As the number of streaming queries grew, we wanted a centralized place where we could quickly view a snapshot of all our pipelines.
When it comes to monitoring our queries, we are primarily interested in answering three questions:
What is the input rate?
What is the processing rate?
What is the age of the freshest data being processed?
Structured Streaming queries on Databricks already come with a UI attached to them when they are started, which helps monitor the input rate and processing rate (#1 and #2) at the job level:
These graphs are useful for looking at the performance of a specific job, but when you have a lot of queries, it is not practical to check each one individually. Additionally, this UI does not answer question number three: what is the age of the freshest data being processed?
Before monitoring the query, we first need to install the Datadog Agent on the cluster. You can follow the ‘Driver only’ instructions in the Datadog Databricks integration guide. Before running the script provided by Datadog, you need to modify it to set ‘enable_query_name_tag’ to true. This setting goes under ‘instances’, like this:
instances:
  - spark_url: http://\$DB_DRIVER_IP:\$DB_DRIVER_PORT
    spark_cluster_mode: spark_standalone_mode
    cluster_name: ${hostip}
    streaming_metrics: true
    enable_query_name_tag: true   # <-- add this line
This will tag your metrics with the QueryName you provide, allowing you to view them individually even if there is more than one query per cluster. You also need to enable dogstatsd by adding the following two lines to the script:

# Enable dogstatsd
echo "use_dogstatsd: true" >> /etc/datadog-agent/datadog.yaml
echo "dogstatsd_port: 8125" >> /etc/datadog-agent/datadog.yaml
To run the Datadog Agent on your cluster, configure the install script you generated as a cluster init script, enable streaming metrics, and pass in your Datadog API key:
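In a cluster spec created through the Databricks Clusters API, that configuration amounts to roughly the following sketch. The script path here is a placeholder, and DD_API_KEY/DD_SITE are the environment variables the Datadog install script typically reads; confirm the exact names against the integration guide:

{
  "init_scripts": [
    { "workspace": { "destination": "/Users/<you>/datadog-install-driver-only.sh" } }
  ],
  "spark_env_vars": {
    "DD_API_KEY": "<your Datadog API key>",
    "DD_SITE": "datadoghq.com"
  }
}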
In order to select a specific query when building a dashboard, we need to give each query a QueryName. This can be set when writing the query:

(
    query
    .writeStream
    .queryName(query_name)  # <-- names the query so its metrics can be filtered in Datadog
    .outputMode("append")
    .format("delta")
    .option("checkpointLocation", checkpoint)
    .toTable(tablename)
)
Input rate and processing rate are automatically tracked for you. However, if you want to know your data freshness, you need to track it yourself using foreachBatch. First, pip install the datadog package on the cluster. Then you can track freshness like this:
from datetime import datetime
from datadog import statsd

def record_freshness(df, epoch_id):
    # Skip empty micro-batches so collect() doesn't fail
    rows = df.limit(1).collect()
    if not rows:
        return
    # Age of the sampled record, in seconds
    timestamp = rows[0]['enqueued_time']
    freshness = (datetime.now() - timestamp).total_seconds()
    statsd.gauge(
        'streaming.freshness_seconds',
        freshness,
        tags=['query_name:' + query_name]
    )
(
    query
    .writeStream
    .queryName(query_name)
    .outputMode("append")
    .foreachBatch(record_freshness)
    .format("delta")
    .option("checkpointLocation", checkpoint)
    .start()
)
If your query does not use ‘foreachBatch’, you can create a second query that reads the updates from your first query and records metrics:
(
    spark.readStream
    .option('startingVersion', 'latest')
    .format("delta")
    .table(tablename)
    .select("event_time")
    .writeStream
    .foreachBatch(update_freshness)
    .start()
)
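Here, update_freshness is the same idea as record_freshness above, just computed from the table's event_time column. A minimal sketch, assuming event_time is a timestamp column and query_name is in scope, might look like this:

def update_freshness(df, epoch_id):
    # Same pattern as record_freshness, but based on the table's event_time column
    rows = df.limit(1).collect()
    if not rows:
        return
    freshness = (datetime.now() - rows[0]['event_time']).total_seconds()
    statsd.gauge(
        'streaming.freshness_seconds',
        freshness,
        tags=['query_name:' + query_name]
    )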
Now that all the metrics we care about are being tracked, we can build a dashboard on Datadog. We can chart each of these metrics with a time-series graph. Here is a quick guide on setting up a graph.
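As a starting point, the graph queries might look something like the following. The spark.structured_streaming.* names come from the Datadog Spark integration and may vary by integration version, so check the integration's metrics list; streaming.freshness_seconds is the custom gauge emitted above:

avg:spark.structured_streaming.input_rate{query_name:$query_name}
avg:spark.structured_streaming.processing_rate{query_name:$query_name}
avg:streaming.freshness_seconds{query_name:$query_name}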
This is what the dashboard could look like when all the charts are set up:
You can quickly swap to a different $query_name to view graphs for different queries. These dashboards will allow you to ensure that your queries are keeping up with incoming data and track how changes are affecting performance.