Pablo Beltran
Software Engineer, Statsig

Automating BigQuery load jobs from GCS: Our scalable approach

Thu Mar 06 2025

At Statsig, efficiently loading data from Google Cloud Storage (GCS) into BigQuery is essential for analytics and data-driven decision-making.

Initially, our automated ingestion system was rigid—tables had to be predefined, and adding new tables required manual updates. As our data needs evolved, we created a flexible and dynamic ingestion solution.

Here’s how we built it:

The challenge: Flexibility and reliability

Our original ingestion pipeline worked without manual intervention but lacked flexibility. Every new data source required explicit configuration. Additionally, relying solely on BigQuery's INFORMATION_SCHEMA to verify job statuses was problematic due to potential delays. We needed to:

  • Automatically detect and ingest data into new tables dynamically.

  • Group files into time-based buckets for organized ingestion.

  • Reliably track ingestion jobs, accounting for potential delays in status reporting.

Our dynamic, automated solution

We built a Python-based system managed by an orchestrator that triggers our load scripts on a cron-like schedule. The solution leverages GCS, MongoDB, and BigQuery, focusing on these key aspects:

1. Dynamic bucket discovery

We detect GCS buckets automatically using naming conventions: any bucket matching a certain prefix is considered for load jobs. Each such bucket contains top-level folders named after the destination table the data should be written to.


import re
from typing import List

from google.cloud import storage


def list_buckets_with_prefix(project_id: str, prefix: str) -> List[str]:
    """List all GCS buckets in the project whose names start with the given prefix."""
    storage_client = storage.Client(project=project_id)
    buckets = storage_client.list_buckets()
    return [bucket.name for bucket in buckets if bucket.name.startswith(prefix)]

def list_table_files_in_bucket(project_id: str, bucket_name: str) -> List[str]:
    """List the top-level folders in a GCS bucket that name a destination table."""
    storage_client = storage.Client(project=project_id)
    blobs = storage_client.list_blobs(bucket_name, delimiter="/", include_folders_as_prefixes=True)

    # The prefixes attribute is only populated once the iterator is consumed,
    # so the line below is needed.
    list(blobs)
    files = blobs.prefixes

    # Folder names should all be of the form <project_id>.<dataset>.<table_name>
    pattern = r"(?P<project_id>[a-zA-Z0-9\-_]+)\.(?P<dataset>[a-zA-Z0-9\-_]+)\.(?P<table_name>[a-zA-Z0-9\-_]+)"

    # Drop the trailing "/" from each matching folder prefix.
    return [blob[:-1] for blob in files if re.search(pattern, blob)]
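
Putting the two helpers together, discovery boils down to a small loop like the sketch below. The project ID and bucket prefix here are hypothetical placeholders, not our real values:


PROJECT_ID = "my-gcp-project"    # hypothetical project
BUCKET_PREFIX = "bq-autoload-"   # hypothetical bucket naming convention

for bucket_name in list_buckets_with_prefix(PROJECT_ID, BUCKET_PREFIX):
    for table_folder in list_table_files_in_bucket(PROJECT_ID, bucket_name):
        # table_folder looks like "<project_id>.<dataset>.<table_name>"
        print(f"{bucket_name} -> {table_folder}")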

2. Time-based bucketing of files

We organize incoming data into discrete time buckets (e.g., every 1,000 seconds). This enables systematic ingestion and easy tracking:


def get_time_bucket(timestamp_ms, bucket_size_ms):
    return (timestamp_ms // bucket_size_ms) * bucket_size_ms

This means data files written between timestamps [N, N + bucket_size) are grouped and ingested together.
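
For concreteness, with 1,000-second buckets (1,000,000 ms), two files written 999 seconds apart share a bucket key, while a file written one second later starts a new one. The timestamps below are arbitrary examples:


BUCKET_SIZE_MS = 1_000 * 1000  # 1,000-second buckets

# Two files written 999 seconds apart land in the same bucket...
assert get_time_bucket(1_741_000_000_000, BUCKET_SIZE_MS) == 1_741_000_000_000
assert get_time_bucket(1_741_000_999_000, BUCKET_SIZE_MS) == 1_741_000_000_000

# ...while a file written one second later starts a new bucket.
assert get_time_bucket(1_741_001_000_000, BUCKET_SIZE_MS) == 1_741_001_000_000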

3. Reliable job status tracking

We track ingestion job statuses in two ways:

  • BigQuery's INFORMATION_SCHEMA.JOBS: for historical job statuses and to identify completed or failed jobs.

  • MongoDB: for tracking pending and initiated jobs to mitigate delays in BigQuery's INFORMATION_SCHEMA updates.

To retrieve the job states from INFORMATION_SCHEMA, we use:


SELECT labels, state, error_result
FROM `region-us`.INFORMATION_SCHEMA.JOBS
WHERE job_type = 'LOAD'
    AND creation_time > TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 7 DAY)
    AND EXISTS (
        SELECT 1 FROM UNNEST(labels) WHERE key = 'bq_load_source_bucket_name'
    )

The labels contain the key metadata (such as the bucket name and time bucket) that we use to verify whether specific datasets have been successfully ingested:

  • bq_load_source_bucket_name: Indicates the originating bucket for the load job.

  • bq_load_dest_table_name: Indicates the destination table for the load job.

  • bq_load_bucket_timestamp: Indicates the specific time bucket processed.

This labeling strategy simplifies tracking and status verification.
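
To make this concrete, here is a simplified sketch of how the two sources can be merged into one picture of the actual state. The MongoDB database, collection, and field names are illustrative assumptions rather than our exact schema, and the client handles are assumed to be set up elsewhere:


def get_completed_jobs(bq_client) -> set:
    """Keys of load jobs BigQuery already reports as DONE without errors."""
    # JOBS_QUERY holds the INFORMATION_SCHEMA SQL shown above.
    rows = bq_client.query(JOBS_QUERY).result()
    completed = set()
    for row in rows:
        if row.state == "DONE" and row.error_result is None:
            labels = {label["key"]: label["value"] for label in row.labels}
            completed.add((
                labels.get("bq_load_source_bucket_name"),
                labels.get("bq_load_dest_table_name"),
                labels.get("bq_load_bucket_timestamp"),
            ))
    return completed

def get_pending_jobs(mongo_client) -> set:
    """Keys of jobs we launched recently that may not appear in INFORMATION_SCHEMA yet."""
    collection = mongo_client["ingestion"]["pending_load_jobs"]  # illustrative names
    return {
        (doc["bucket_name"], doc["dest_table"], doc["bucket_timestamp"])  # illustrative fields
        for doc in collection.find({})
    }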

Declarative approach

Our ingestion system is declarative. The Python script explicitly determines the desired state (datasets that should be loaded) and compares it against the actual state (data successfully ingested or pending). The script then launches any necessary jobs to correct discrepancies.

This declarative approach simplifies management and ensures the actual state of our ingestion matches our expectations.
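
Conceptually, each run of the script is one reconcile pass along these lines. This is a sketch built on the helpers above; compute_desired_state, parse_table_folder, and record_pending_job are hypothetical placeholders for logic described elsewhere in this post:


def reconcile(bq_client, mongo_client):
    """One declarative pass: launch a load job for every (bucket, table, time bucket)
    that should be loaded but has neither completed nor been launched yet."""
    desired = compute_desired_state()  # hypothetical: keys derived from the GCS listing
    actual = get_completed_jobs(bq_client) | get_pending_jobs(mongo_client)

    for bucket_name, dest_table, bucket_timestamp in desired - actual:
        table_info = parse_table_folder(dest_table)  # hypothetical: split project/dataset/table
        launch_bq_load_job(bucket_name, dest_table, bucket_timestamp, table_info)  # see next section
        record_pending_job(mongo_client, bucket_name, dest_table, bucket_timestamp)  # hypothetical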

4. Launching BigQuery load jobs

Our Python script triggers ingestion using BigQuery's API. Example:


from google.cloud import bigquery

bq_client = bigquery.Client()

def launch_bq_load_job(bucket_name, folder, timestamp_bucket, table_info):
    # Match every file in the table folder whose name starts with this time bucket.
    full_path = f"gs://{bucket_name}/{folder}/{timestamp_bucket}*"
    full_table_id = f"{table_info.project_id}.{table_info.dataset}.{table_info.tablename}"

    job_config = bigquery.LoadJobConfig(
        source_format="AVRO",
        write_disposition="WRITE_APPEND",
        labels={
            "bq_load_source_bucket_name": bucket_name,
            "bq_load_dest_table_name": table_info.tablename,
            "bq_load_bucket_timestamp": str(timestamp_bucket),
        }
    )

    return bq_client.load_table_from_uri(
        source_uris=full_path,
        destination=full_table_id,
        job_config=job_config,
    )
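
With hypothetical values such as bucket_name="bq-autoload-events", folder="my-project.analytics.events", and timestamp_bucket=1741000000000, the source URI becomes gs://bq-autoload-events/my-project.analytics.events/1741000000000*, which matches every file written for that table in that time bucket. The load job runs asynchronously; instead of blocking on its result, we rely on the labels above (and the MongoDB record of the launch) to verify completion on a later run.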

Orchestrating workflows

We use an orchestrator configured to trigger our Python ingestion script periodically, following a cron-like schedule. This ensures frequent, automated checks and ingestion cycles:


*/2 * * * * /usr/bin/python3 /path/to/load_jobs.py

System architecture


GCS Buckets
│
├── Cron Orchestrator (Python)
│   ├── Check for new buckets/files
│   ├── Compute time buckets
│   ├── Verify load status (MongoDB & BigQuery INFORMATION_SCHEMA)
│   └── Trigger load job
│       └── BigQuery
│
└── MongoDB
    └── Track pending jobs for immediate consistency

Writing data for auto-ingestion

Teams write data to GCS using clear conventions:


<bucket-prefix>/<project_id>.<dataset>.<tablename>/<timestamp>_<uuid>.avro

  • <bucket-prefix>: Signals that the bucket should be considered for ingestion.

  • <project_id>.<dataset>.<tablename>: Identifies the destination table.

  • <timestamp>_<uuid>.avro: Determines which time bucket the data belongs to.
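
For illustration only, a writer could build a conforming object name along these lines. This is a hypothetical sketch, not our actual writer library; it keys the file name by the time bucket so that it matches the load job's prefix glob shown earlier:


import time
import uuid

BUCKET_SIZE_MS = 1_000 * 1000  # must match the ingestion side's bucket size

def build_object_name(dataset_table: str) -> str:
    """Build an object name of the form <project_id>.<dataset>.<tablename>/<timestamp>_<uuid>.avro."""
    now_ms = int(time.time() * 1000)
    bucket_ts = get_time_bucket(now_ms, BUCKET_SIZE_MS)
    return f"{dataset_table}/{bucket_ts}_{uuid.uuid4().hex}.avro"

# e.g. "my-project.analytics.events/1741000000000_0f3a9c....avro" (hypothetical table)
print(build_object_name("my-project.analytics.events"))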

Why BigQuery load jobs?

BigQuery load jobs offer significant advantages:

  • Cost-effective: No ingestion costs for batch loads from GCS.

  • Highly scalable: Efficient handling of massive datasets.

  • Reliable: Optimized for quick and robust data loading.

Results

Our updated system:

  • Enables rapid onboarding of new data sources without manual intervention.

  • Maintains consistent and accurate data ingestion states.

  • Optimizes resource usage by avoiding unnecessary jobs.

Recommendations

  • Standardize bucket and file naming conventions.

  • Use a durable datastore to persist launched jobs so status stays accurate even when ingestion cycles are short.

  • For simpler setups, rely solely on BigQuery and accept minor delays.

To summarize

Automating BigQuery data ingestion from GCS using declarative scripts and lightweight orchestration delivers scalable, reliable, and efficient data workflows.

We currently use this system to process over a trillion rows daily into hundreds of tables.
