Initially, our automated ingestion system was rigid—tables had to be predefined, and adding new tables required manual updates. As our data needs evolved, we created a flexible and dynamic ingestion solution.
Here’s how we built it:
Our original ingestion pipeline worked without manual intervention but lacked flexibility. Every new data source required explicit configuration. Additionally, relying solely on BigQuery's INFORMATION_SCHEMA to verify job statuses was problematic due to potential delays. We needed to:
Automatically detect and ingest data into new tables dynamically.
Group files into time-based buckets for organized ingestion.
Reliably track ingestion jobs, accounting for potential delays in status reporting.
We built a Python-based system managed by an orchestrator that triggers our load scripts on a cron-like schedule. The solution leverages GCS, MongoDB, and BigQuery, focusing on these key aspects:
We detect GCS buckets automatically using naming conventions: any bucket matching a certain prefix is considered for load jobs. Each such bucket should contain top-level folders named after the destination table the data will be written to.
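A minimal sketch of this discovery step, assuming the google-cloud-storage client and a hypothetical "bq-ingest-" prefix:

```python
from google.cloud import storage

# Hypothetical prefix; the real convention is specific to our environment.
INGEST_BUCKET_PREFIX = "bq-ingest-"

client = storage.Client()

def discover_ingest_targets():
    """Yield (bucket_name, destination_table) pairs for every candidate bucket."""
    for bucket in client.list_buckets(prefix=INGEST_BUCKET_PREFIX):
        # Top-level "folders" are the destination tables; listing with a
        # delimiter exposes them as prefixes on each result page.
        blobs = client.list_blobs(bucket.name, delimiter="/")
        for page in blobs.pages:
            for table_folder in page.prefixes:
                yield bucket.name, table_folder.rstrip("/")
```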
We organize incoming data into discrete time buckets (e.g., every 1,000 seconds). This enables systematic ingestion and easy tracking:
This means data files written between timestamps [N, N + bucket_size) are grouped and ingested together.
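To make the bucketing arithmetic concrete, here is a small sketch; the 1,000-second width is just the example figure from above:

```python
BUCKET_SIZE_SECONDS = 1_000  # example width; in practice this is a tunable setting

def time_bucket(unix_ts: int, bucket_size: int = BUCKET_SIZE_SECONDS) -> int:
    """Return the bucket start N such that unix_ts falls in [N, N + bucket_size)."""
    return (unix_ts // bucket_size) * bucket_size

# Files whose embedded timestamps map to the same bucket start are loaded together.
assert time_bucket(1_699_999_001) == time_bucket(1_699_999_999) == 1_699_999_000
```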
We track ingestion job statuses in two ways:
BigQuery's INFORMATION_SCHEMA.JOBS: for historical job statuses and to identify completed or failed jobs.
MongoDB: for tracking pending and initiated jobs to mitigate delays in BigQuery's INFORMATION_SCHEMA updates.
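On the MongoDB side, here is a minimal sketch of recording an in-flight job with pymongo; the connection string, collection, and field names are illustrative rather than our actual schema:

```python
from datetime import datetime, timezone
from pymongo import MongoClient

# Illustrative connection string and collection name.
jobs = MongoClient("mongodb://localhost:27017")["ingestion"]["launched_jobs"]

def record_launched_job(job_id: str, bucket_name: str, table_name: str, time_bucket: int) -> None:
    """Persist a just-launched load job so it counts as in flight immediately,
    even before it appears in INFORMATION_SCHEMA.JOBS."""
    jobs.insert_one({
        "job_id": job_id,
        "bucket_name": bucket_name,
        "table_name": table_name,
        "time_bucket": time_bucket,
        "launched_at": datetime.now(timezone.utc),
    })
```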
To retrieve job states from INFORMATION_SCHEMA, we query the jobs view directly.
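The following is a sketch of that lookup; the region qualifier, lookback window, and exact columns are illustrative assumptions:

```python
from google.cloud import bigquery

bq = bigquery.Client()

# Pull recent load jobs along with their labels, then match the labels against
# the (bucket, table, time bucket) combinations we expect to have loaded.
JOBS_QUERY = """
SELECT job_id, state, error_result, labels
FROM `region-us`.INFORMATION_SCHEMA.JOBS
WHERE job_type = 'LOAD'
  AND creation_time > TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 1 DAY)
"""

job_states = {}
for row in bq.query(JOBS_QUERY).result():
    labels = {label["key"]: label["value"] for label in row.labels}
    key = (
        labels.get("bq_load_source_bucket_name"),
        labels.get("bq_load_dest_table_name"),
        labels.get("bq_load_bucket_timestamp"),
    )
    job_states[key] = row.state  # PENDING, RUNNING, or DONE
```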
The labels contain key metadata (like the bucket name and time bucket) we use to verify whether specific datasets have been successfully ingested:
bq_load_source_bucket_name: Indicates the originating bucket for the load job.
bq_load_dest_table_name: Indicates the destination table for the load job.
bq_load_bucket_timestamp: Indicates the specific time bucket processed.
This labeling strategy simplifies tracking and status verification.
Our ingestion system is declarative. The Python script explicitly determines the desired state (datasets that should be loaded) and compares it against the actual state (data successfully ingested or pending). The script then launches any necessary jobs to correct discrepancies.
This declarative approach simplifies management and ensures the actual state of our ingestion matches our expectations.
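A stripped-down sketch of that reconcile loop; the helper names here are hypothetical stand-ins for the GCS scan and the status lookups described above:

```python
def reconcile():
    # Desired state: every (bucket, table, time_bucket) combination found in GCS.
    desired = set(scan_gcs_for_work())          # hypothetical helper
    # Actual state: combinations already loaded or currently in flight.
    actual = set(completed_or_pending_loads())  # hypothetical helper
    for bucket_name, table_name, time_bucket in sorted(desired - actual):
        launch_load_job(bucket_name, table_name, time_bucket)  # hypothetical helper
```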
Our Python script triggers ingestion using BigQuery's load job API.
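A minimal sketch of that call, assuming the google-cloud-bigquery client and illustrative bucket, table, and timestamp values:

```python
from google.cloud import bigquery

bq = bigquery.Client()

# Illustrative work item; real values come from the GCS scan described earlier.
bucket_name = "bq-ingest-demo"
destination_table = "my-project.my_dataset.events"
time_bucket = 1_700_000_000

# Files whose embedded timestamps fall inside this time bucket (hypothetical names).
source_uris = [
    f"gs://{bucket_name}/{destination_table}/{time_bucket + 7}_0f9c1a.avro",
    f"gs://{bucket_name}/{destination_table}/{time_bucket + 420}_7d2b33.avro",
]

job_config = bigquery.LoadJobConfig(
    source_format=bigquery.SourceFormat.AVRO,
    write_disposition=bigquery.WriteDisposition.WRITE_APPEND,
    labels={
        "bq_load_source_bucket_name": bucket_name,
        "bq_load_dest_table_name": "events",  # label values cannot contain dots
        "bq_load_bucket_timestamp": str(time_bucket),
    },
)

load_job = bq.load_table_from_uri(source_uris, destination_table, job_config=job_config)
# We do not block on load_job.result(); the job ID is recorded so later cycles
# can check its state via INFORMATION_SCHEMA or the pending-jobs store.
print("launched", load_job.job_id)
```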
We use an orchestrator configured to trigger our Python ingestion script periodically, following a cron-like schedule. This ensures frequent, automated checks and ingestion cycles.
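Any scheduler with cron semantics works; a plain crontab entry like the following (paths and interval are illustrative) is enough:

```
*/5 * * * * /usr/bin/python3 /opt/ingestion/run_ingestion.py
```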
Teams write data to GCS using clear conventions:
<bucket-prefix>: Marks the bucket as a candidate for ingestion.
<project_id.dataset.tablename>: Top-level folder that names the destination table.
<timestamp>_<uuid>.avro: File name that encodes which time bucket the data belongs to.
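Putting the conventions together, a single data file might land at a path like this (all names illustrative):

```
gs://bq-ingest-demo/my-project.my_dataset.events/1700000420_3f2a9c1e.avro
```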
BigQuery load jobs offer significant advantages:
Cost-effective: No ingestion costs for batch loads from GCS.
Highly scalable: Efficient handling of massive datasets.
Reliable: Optimized for quick and robust data loading.
Our updated system:
Enables rapid onboarding of new data sources without manual intervention.
Maintains consistent and accurate data ingestion states.
Optimizes resource usage by avoiding unnecessary jobs.
Standardize bucket and file naming conventions.
If short ingestion cycles are crucial, persist launched jobs in a durable datastore so their status is accurate immediately after launch.
For simpler setups, rely solely on BigQuery and accept minor delays.
Automating BigQuery data ingestion from GCS using declarative scripts and lightweight orchestration delivers scalable, reliable, and efficient data workflows.
We currently use this system to process over a trillion rows daily into hundreds of tables.