intermediate

Monitoring & Logs

6 min read | Last updated: 2026-06-30

1. Introduction

Monitoring and logging are the tools and practices used to observe, debug, and optimize Apache Beam pipelines while they run on Google Cloud Dataflow.

2. Why This Concept Exists

Data processing happens in the cloud across dozens of virtual machine workers. When a pipeline slows down or throws an error, you cannot simply attach a debugger or read console output. You need a centralized place to track throughput, read system error logs, and inspect custom pipeline metrics (such as the total number of rows that failed to parse).

3. Key Terminology

  • System Lag: In streaming pipelines, the maximum time an element has currently been processing or waiting to be processed (measures pipeline delays). The related Data Freshness metric measures how far the output watermark trails real time.
  • Throughput: The volume of records processed per second.
  • Beam Metrics: Custom counters, distributions, or gauges declared inside your code; their values are reported to the console dashboard while the job runs.

4. How It Works

  • Dataflow Console: GCP provides a visual job graph (the pipeline DAG) showing metrics (elements/sec, execution time) for every step.
  • Cloud Logging: Records written with the standard Python logging module are collected automatically and sent to Google Cloud Logging.
  • Metrics SDK: Developers declare counters inside a DoFn to track custom execution events.

5. Visual Diagram

Telemetry Sources (CPU / RAM / Lag metrics) -> Dataflow Agent (collects worker metrics) -> GCP Dataflow Dashboard (custom UI charts & logs)

6. Code Example

Creating custom counter and distribution metrics inside a DoFn:

python
import logging
import apache_beam as beam
from apache_beam.metrics import Metrics

class ProcessTransactionsDoFn(beam.DoFn):
    def __init__(self):
        super().__init__()
        # Declare metrics once, when the DoFn is constructed
        self.error_counter = Metrics.counter(self.__class__, "error_count")
        self.amount_distribution = Metrics.distribution(self.__class__, "transaction_amounts")

    def process(self, element):
        try:
            amount = float(element["amount"])
        except (KeyError, TypeError, ValueError) as e:
            # Count the bad record and log why it was rejected
            self.error_counter.inc()
            logging.error("Failed to parse transaction: %s", e)
            return
        # Update distribution metric (tracks min, max, sum, count)
        self.amount_distribution.update(amount)
        yield element

7. Code Explanation

  • Metrics.counter(...) declares a counter metric named "error_count".
  • self.error_counter.inc() increments the counter when an exception occurs.
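  • logging.error(...) writes the error message to GCP Cloud Logging at ERROR severity. It does not include a stack trace; use logging.exception(...) inside the except block if you need one.
  • amount_distribution.update(amount) feeds the amount into a distribution that tracks min, max, sum, and count (the mean is derived from sum and count). The Python SDK stores distribution values as integers, so fractional amounts are truncated; record whole units such as cents if you need finer resolution.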

8. Real Production Example

When auditing pipelines, you check the Dataflow Job Details page. If the "System Lag" graph climbs steadily, data is usually arriving faster than the workers can process it. Allocate more workers or enable autoscaling, and check the per-step metrics to rule out a single slow or stuck step.

9. Common Mistakes

  • Excessive logging: Writing logging.info(...) inside the process() method for every single record floods Cloud Logging and leads to high log ingestion bills. Log only warnings, errors, or periodic samples.
  • Declaring metrics dynamically inside process: Always declare your metrics inside __init__ or as class variables. Re-creating metric objects inside process() repeats the lookup for every element and adds avoidable per-record overhead.
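
One way to keep log volume down is to sample low-severity records. The sketch below uses only the standard logging module; EveryNthFilter and build_sampled_logger are hypothetical helpers, not part of Beam or Dataflow. The counter lives in process memory, so each worker process samples independently.

```python
import logging


class EveryNthFilter(logging.Filter):
    """Pass WARNING and above; pass only every n-th lower-severity record."""

    def __init__(self, n):
        super().__init__()
        self.n = n
        self._seen = 0

    def filter(self, record):
        if record.levelno >= logging.WARNING:
            return True
        self._seen += 1
        return self._seen % self.n == 0


def build_sampled_logger(name, n):
    """Return a logger that emits 1 in n INFO records and every warning."""
    logger = logging.getLogger(name)
    logger.setLevel(logging.INFO)
    logger.addFilter(EveryNthFilter(n))
    return logger
```

A DoFn could create such a logger once at module level and call logger.info(...) inside process() without producing one log entry per element.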

10. Interview Perspective

  • Question: What are the three types of user metrics supported in Apache Beam?
  • Answer:
    1. Counter: Tracks a running integer sum (can be incremented or decremented).
    2. Distribution: Tracks summary stats (min, max, sum, count, mean).
    3. Gauge: Tracks only the latest reported integer value.
  • Question: How do you access worker logs?
  • Answer: Logs are collected by the Cloud Logging agent running on worker VMs. You can query them in the GCP console with the Logs Explorer, or use the log panel and its search filters on the Dataflow job page.

11. Best Practices

  • Use structured logging (JSON formatted logs) to allow easy filtering in Cloud Logging.
  • Add counter metrics to track dirty, skipped, or truncated rows to evaluate data quality.
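
As a rough illustration of structured logging, the formatter below renders each record as one JSON object. It is a sketch using only the standard library: JsonFormatter and the "fields" attribute are assumptions made for this example, and whether Cloud Logging indexes the result as a structured payload depends on how the worker forwards logs, which is not covered here.

```python
import json
import logging


class JsonFormatter(logging.Formatter):
    """Render a log record as a single-line JSON object."""

    def format(self, record):
        payload = {
            "severity": record.levelname,
            "message": record.getMessage(),
            "logger": record.name,
        }
        # Hypothetical convention: callers pass extra={"fields": {...}}
        # to attach filterable key/value pairs to the record.
        payload.update(getattr(record, "fields", {}))
        return json.dumps(payload, sort_keys=True)
```

With this formatter attached to a handler, a call such as logging.error("bad amount %s", raw, extra={"fields": {"transaction_id": tid}}) yields a line that can be filtered by transaction_id instead of by free-text search.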

12. Summary

  • Logs are handled by Python standard logging and shipped to Cloud Logging.
  • Dataflow Console provides step-by-step telemetry (lag, throughput, CPU).
  • Beam Metrics SDK supports Counters, Distributions, and Gauges.

13. Interactive Challenges

Challenge 1: Declare Counter Metric (Beginner)

Define a DoFn class named InputCounterDoFn that increments a Beam counter metric named "records_processed" for every element processed.

Challenge 2: Declare Distribution Metric (Intermediate)

Define a DoFn class named WordLengthDistributionDoFn that measures the distribution of string lengths for processed text elements using a distribution metric named "word_lengths".

Challenge 3: Query Metrics Programmatically (Advanced)

Write a code snippet that runs a pipeline, waits until it finishes, and queries the execution results to print the final value of counter metric "records_processed".
