Metrics
1. Introduction
Metrics in Apache Beam provide a way to instrument and monitor pipeline execution at runtime. Unlike logging, which records discrete textual events, metrics track quantitative values (such as counts of processed items, processing latency distributions, or queue depths) in a lightweight, aggregatable format. Dataflow collects these metrics and surfaces them in both the Cloud Console and Cloud Monitoring.
2. Why This Concept Exists
Telemetry is crucial for assessing pipeline health. However, relying on standard logs to track progress has disadvantages:
- High Network Overhead: Logging "Processed record 1", "Processed record 2" generates immense data volumes that slow down processing.
- Lack of Aggregation: Logs are split across VMs. Calculating the total number of records processed requires downloading and parsing gigabytes of text.
Metrics solve these issues by accumulating counts locally on the worker VMs. The aggregated numbers are periodically reported back to the control plane, providing cheap, near-real-time, cluster-wide telemetry.
3. Key Terminology
- Counter: A metric that tracks a single 64-bit integer value that can be incremented or decremented (e.g., total rows processed).
- Distribution: A metric that tracks the statistics of a set of values, including minimum, maximum, sum, count, and average (e.g., string lengths or execution durations).
- Gauge: A metric that tracks the latest reported value (e.g., current temperature, watermarks, or system memory).
- System Metric: Built-in metrics generated by Dataflow, such as CPU utilization, system lag, and throughput rates.
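To make the Distribution definition concrete, the following plain-Python sketch models how per-worker summaries could be combined without shipping the raw values. The DistributionSnapshot class and the summarize and merge helpers are hypothetical names for illustration and are not part of the Beam SDK.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class DistributionSnapshot:
    """Hypothetical per-worker summary of a Distribution metric."""
    count: int
    total: int
    minimum: int
    maximum: int

    @property
    def mean(self) -> float:
        # The mean is derived from sum and count, never averaged directly.
        return self.total / self.count if self.count else 0.0


def summarize(values):
    """Fold a non-empty list of observations into one snapshot."""
    values = list(values)
    return DistributionSnapshot(len(values), sum(values), min(values), max(values))


def merge(a, b):
    """Combine two snapshots using only their summary fields."""
    return DistributionSnapshot(
        count=a.count + b.count,
        total=a.total + b.total,
        minimum=min(a.minimum, b.minimum),
        maximum=max(a.maximum, b.maximum),
    )


worker_1 = summarize([5, 6])   # e.g. lengths of "apple" and "banana"
worker_2 = summarize([10])     # e.g. length of "watermelon"
combined = merge(worker_1, worker_2)
print(combined.count, combined.total, combined.minimum, combined.maximum, combined.mean)
```

Carrying sum and count rather than a precomputed mean is what keeps the merge correct when workers have seen different numbers of values.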
4. How It Works
- SDK Declaration: The developer instantiates a metric object globally or inside a DoFn namespace.
- Local Accumulation: While executing the process loop on worker VM threads, the SDK increments or updates the metric. These updates occur in-memory, making them extremely fast.
- Heartbeat Report: Every few seconds, the worker process aggregates the local values and sends them to the Dataflow service during standard health heartbeats.
- Global Consolidation: The control plane sums or aggregates the metrics across all workers and updates the Cloud Monitoring system.
- Programmatic Access: You can query these metrics via the PipelineResult object returned at the end of execution, or view them in the Dataflow monitoring console.
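The accumulate, report, and consolidate steps above can be modeled in a few lines of plain Python. This is a toy simulation only: WorkerMetrics and ControlPlane are hypothetical classes, and the assumption that workers send deltas and then reset is a simplification, not a description of the actual Dataflow wire protocol.

```python
from collections import Counter


class WorkerMetrics:
    """Hypothetical in-memory metric buffer for a single worker."""

    def __init__(self):
        self._pending = Counter()

    def inc(self, name, amount=1):
        # Local accumulation: a dictionary update, no network call.
        self._pending[name] += amount

    def heartbeat(self):
        # Hand the accumulated deltas to the caller and start a fresh buffer.
        delta, self._pending = self._pending, Counter()
        return delta


class ControlPlane:
    """Hypothetical stand-in for the service that consolidates reports."""

    def __init__(self):
        self.totals = Counter()

    def receive(self, delta):
        # Global consolidation: counters from all workers are summed.
        self.totals.update(delta)


workers = [WorkerMetrics() for _ in range(3)]
for worker, processed in zip(workers, (5, 8, 3)):
    for _ in range(processed):
        worker.inc("processed_elements")

service = ControlPlane()
for worker in workers:
    service.receive(worker.heartbeat())

print(service.totals["processed_elements"])  # 16
```

The per-element cost is one in-memory increment; the network is touched once per heartbeat, regardless of how many elements were processed in between.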
5. Visual Diagram
The metric aggregation lifecycle across multiple stateless workers:
[ Worker VM 1 ] ──(Increment counter: +5)──┐
                                           │
[ Worker VM 2 ] ──(Increment counter: +8)──┼─(Heartbeat update)─> [ Dataflow Control Plane ]
                                           │                                  │
[ Worker VM 3 ] ──(Increment counter: +3)──┘                         (Aggregates to 16)
                                                                              │
                                                                              ▼
                                                                  [ Dataflow Console / UI ]
                                                                  [ Cloud Monitoring / API ]
6. Code Example
The following code demonstrates how to define, increment, and record Counter, Distribution, and Gauge metrics in a custom DoFn:
import apache_beam as beam
from apache_beam.metrics import Metrics
from apache_beam.metrics.metric import MetricsFilter
from apache_beam.options.pipeline_options import PipelineOptions


class InstrumentPipelineDoFn(beam.DoFn):
    def __init__(self):
        # 1. Define namespaces and names for metrics.
        # Passing the class as the namespace yields "<module>.<class name>".
        self.processed_counter = Metrics.counter(self.__class__, "processed_elements")
        self.malformed_counter = Metrics.counter(self.__class__, "malformed_elements")
        self.length_distribution = Metrics.distribution(self.__class__, "word_lengths")
        self.latest_gauge = Metrics.gauge(self.__class__, "last_seen_value_length")

    def process(self, element):
        # Increment total counter
        self.processed_counter.inc()
        if not element:
            # Increment malformed counter for empty strings and None
            self.malformed_counter.inc()
            return
        word_len = len(element)
        # Record distribution statistics (min, max, mean, count)
        self.length_distribution.update(word_len)
        # Track the latest value length
        self.latest_gauge.set(word_len)
        yield element.upper()


def run_metrics_job():
    options = PipelineOptions()
    # Build the pipeline without a `with` block: leaving a `with` block runs
    # the pipeline implicitly, so an extra p.run() would execute it twice.
    p = beam.Pipeline(options=options)
    words = p | "CreateData" >> beam.Create(["apple", "", "banana", "watermelon", None])
    _ = words | "ProcessWords" >> beam.ParDo(InstrumentPipelineDoFn())
    # Retrieve pipeline result
    result = p.run()
    # Wait for completion (required for batch pipelines to fetch final metrics)
    result.wait_until_finish()
    # Query metrics programmatically. Pass the class itself so the filter
    # matches the fully qualified namespace used when the metrics were created.
    metrics_filter = MetricsFilter().with_namespace(InstrumentPipelineDoFn)
    query_result = result.metrics().query(metrics_filter)
    print("--- Final Job Metrics ---")
    for counter in query_result['counters']:
        print(f"Counter {counter.key.metric.name}: {counter.committed}")
    for dist in query_result['distributions']:
        print(f"Distribution {dist.key.metric.name} - Mean: {dist.committed.mean}, Max: {dist.committed.max}")


if __name__ == "__main__":
    run_metrics_job()
7. Code Explanation
- Metrics.counter, Metrics.distribution, and Metrics.gauge are instantiated. They take a namespace (usually the executing class or module name) and a metric name.
- self.processed_counter.inc() increases the value by 1. You can also pass custom amounts, e.g., .inc(5).
- self.length_distribution.update(word_len) feeds the length into a running stats collector.
- result.metrics().query(metrics_filter) pulls the metric results from the completed job metadata.
- committed metrics represent updates validated by the runner.
8. Real Production Example
A data engineering team monitors a pipeline's ingest quality. They define a Counter metric named invalid_records and configure a GCP Cloud Monitoring alerting policy on the dataflow_job resource for that user-defined counter. If the counter increases by more than 100 in 5 minutes, the policy triggers an alert, which in their setup usually indicates that a source database schema has changed and is feeding corrupt data.
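The alert condition described above (more than 100 new invalid records within 5 minutes) is simple arithmetic on a cumulative counter. The sketch below shows that arithmetic in plain Python; the function names and sample data are hypothetical, and this is not how Cloud Monitoring evaluates alerting policies internally.

```python
def increase_over_window(samples, now, window_seconds):
    """Return how much a cumulative counter grew inside the window.

    samples is a time-sorted list of (timestamp_seconds, cumulative_value).
    """
    in_window = [(t, v) for t, v in samples if now - window_seconds <= t <= now]
    if len(in_window) < 2:
        return 0
    return in_window[-1][1] - in_window[0][1]


def should_alert(samples, now, window_seconds=300, threshold=100):
    """True when the counter grew by more than threshold within the window."""
    return increase_over_window(samples, now, window_seconds) > threshold


# Hypothetical readings of invalid_records, one per minute.
samples = [(0, 10), (60, 12), (120, 15), (180, 90), (240, 140), (300, 150)]
print(increase_over_window(samples, now=300, window_seconds=300))  # 140
print(should_alert(samples, now=300))  # True
```

Alerting on the increase rather than the absolute value matters because a counter only grows over the life of a job, so a fixed threshold on the total would eventually fire on any long-running pipeline.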
9. Common Mistakes
- Dynamic Metric Names: Naming metrics using data values, e.g., Metrics.counter(self.__class__, f"user_{element.user_id}"). If your system has 1 million users, this attempts to provision 1 million metrics, which can exhaust Dataflow and Cloud Monitoring quotas and destabilize the job. Metric names must remain static.
- Assuming Real-time Accuracy in Active Jobs: Relying on metric queries for precise billing or transaction accounting. Metrics are gathered asynchronously through periodic heartbeats. While helpful for statistics, they do not guarantee exact, up-to-the-moment values for live data billing.
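One way to keep metric names static while still capturing some detail is to map free-form values onto a small, fixed set of buckets before building the name. The sketch below is plain Python; ALLOWED_REASONS and invalid_record_metric_name are hypothetical names, and the resulting string is what would be passed as the metric name to Metrics.counter.

```python
# Hypothetical closed set of failure reasons worth tracking separately.
ALLOWED_REASONS = ("missing_field", "bad_type", "too_long")


def invalid_record_metric_name(reason):
    """Map an arbitrary reason onto one of a fixed number of metric names."""
    bucket = reason if reason in ALLOWED_REASONS else "other"
    return f"invalid_records_{bucket}"


# Data-dependent values such as user IDs all collapse into the same bucket.
observed = ["missing_field", "user_123", "user_456", "bad_type"]
names = {invalid_record_metric_name(reason) for reason in observed}
print(sorted(names))
```

However many distinct input values arrive, at most len(ALLOWED_REASONS) + 1 metric names can ever exist. Per-user or per-record detail is better carried in logs or in an output collection than in metric names.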
10. Interview Perspective
- Question: What is the difference between attempted and committed metrics in Apache Beam?
- Answer: Attempted metrics reflect all operations, including work on bundles that failed on worker nodes and were retried by the runner. Committed metrics only include updates that were part of successfully committed bundles. For production statistics, prefer committed values where the runner supports them.
- Question: Are Apache Beam user-defined metrics visible in Cloud Monitoring dashboards?
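- Answer: Yes. Dataflow automatically exports user-defined Counter and Distribution metrics to Cloud Monitoring under the metric type dataflow.googleapis.com/job/user_counter, with the metric name carried in the metric_name label. For backward compatibility, they are also reported under custom.googleapis.com/dataflow/metric_name.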
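The attempted versus committed distinction from the first question can be illustrated with a small plain-Python model of bundle retries. The run_bundles function and its input format are hypothetical; real runners track this per bundle internally.

```python
def run_bundles(bundles):
    """Tally attempted and committed element counts.

    bundles is a list of bundles; each bundle is a list of attempts, and each
    attempt is a tuple (elements_processed, succeeded).
    """
    attempted = 0
    committed = 0
    for attempts in bundles:
        for processed, succeeded in attempts:
            # Every attempt contributes to the attempted total, even failures.
            attempted += processed
            if succeeded:
                # Only the successful attempt is committed.
                committed += processed
                break
    return attempted, committed


bundles = [
    [(4, True)],               # succeeded on the first try
    [(2, False), (3, True)],   # failed after 2 elements, retry processed all 3
]
attempted, committed = run_bundles(bundles)
print(attempted, committed)  # 9 7
```

The gap between the two totals (2 elements here) is work that was redone after a failure, which is why attempted values can overcount relative to the data actually committed.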
11. Best Practices
- Keep metric names static and use descriptive camelCase or snake_case names and namespaces.
- Use Distributions when tracking numeric performance bounds (like payload size or execution latency) to get averages and extremes.
- Use Counters for monitoring throughput and error frequencies.
12. Summary
- Metrics provide lightweight telemetry without the performance cost of logs.
- Beam supports Counters (sum), Distributions (min/max/average), and Gauges (latest value).
- Metrics are accumulated locally on workers and periodically reported to the service.
- User metrics are exported to Cloud Monitoring for dashboarding and alerting.