Watermarks
1. Introduction
A watermark is Apache Beam's moving estimate of how far event time has progressed in a streaming pipeline. A watermark at time T expresses the system's belief that all data with event timestamps earlier than T has arrived, so windows that end at or before T can be treated as complete.
2. Why This Concept Exists
In batch processing, all data is present at the start. In streaming, network delays, mobile disconnections, and system retries cause data to arrive late and out of order (e.g., an event that happened at 12:00 might reach the pipeline only at 12:15). The watermark is the mechanism that answers the fundamental streaming question: "How long should the pipeline wait for slow or delayed data before closing a window and emitting its results?"
3. Key Terminology
- Event Time: The time when the event actually occurred on the device (embedded inside the record payload).
- Processing Time: The wall-clock time on the worker machine currently executing the transform.
- Late Data: Elements that arrive after the watermark has passed the end of the window they belong to.
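In Beam, lateness is judged against the end of the element's window rather than its raw event time. The rule can be sketched in plain Python, assuming fixed windows aligned to the Unix epoch and timestamps in seconds. `window_bounds` and `is_late` are hypothetical helpers for illustration, not Beam APIs.

```python
from typing import Tuple


def window_bounds(event_time: float, size: float) -> Tuple[float, float]:
    """Return the [start, end) bounds of the epoch-aligned fixed window holding event_time."""
    start = event_time - (event_time % size)
    return start, start + size


def is_late(event_time: float, watermark: float, size: float) -> bool:
    """Late means the watermark has already reached the end of the element's window."""
    _, end = window_bounds(event_time, size)
    return watermark >= end


print(window_bounds(125.0, 60))                  # (120.0, 180.0)
print(is_late(125.0, watermark=170.0, size=60))  # False: window [120, 180) still open
print(is_late(125.0, watermark=185.0, size=60))  # True: watermark has passed 180
```

An element stamped 12:02 is therefore not late just because the watermark has reached 12:03; it becomes late only once the watermark passes the end of its window.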
4. How It Works
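- Every element carries an event timestamp, and each source reports a watermark estimating how far event time has progressed. Downstream stages inherit that watermark, held back by any elements still in flight.
- The watermark only moves forward (it is monotonic). If the watermark reaches 12:10:00, the runner assumes: "We have seen (almost) all data with event times before 12:10:00."
- Heuristic watermark: an estimate computed by the source from whatever progress information it has (for example, the timestamp of the oldest unacknowledged message, or backlog statistics). Because it is a guess, some data can still arrive behind it as late data. Unordered message queues such as Pub/Sub generally fall into this category.
- Perfect (exact) watermark: possible only when the source has complete knowledge of its input, for example when records arrive in strict event-time order or are stamped with ingestion time. With a perfect watermark, no data is late.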
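One simple heuristic, borrowed from bounded-out-of-orderness watermarking rather than taken from any particular Beam source, is to trail the largest event time seen so far by a fixed allowance. The sketch below uses a hypothetical `HeuristicWatermark` class and an assumed 120-second bound; it also shows the monotonic guarantee, since an out-of-order record never moves the watermark backwards.

```python
class HeuristicWatermark:
    """Hypothetical estimator: watermark = max event time seen - max_delay."""

    def __init__(self, max_delay: float):
        self.max_delay = max_delay            # assumed bound on out-of-orderness (seconds)
        self.max_event_time = float("-inf")   # no data seen yet
        self.current = float("-inf")          # current watermark

    def observe(self, event_time: float) -> float:
        self.max_event_time = max(self.max_event_time, event_time)
        candidate = self.max_event_time - self.max_delay
        # Monotonic: never move backwards, even for out-of-order input.
        self.current = max(self.current, candidate)
        return self.current


wm = HeuristicWatermark(max_delay=120)
for t in [600, 720, 660, 900]:  # event times in seconds; 660 arrives out of order
    print(t, wm.observe(t))
# 600 480
# 720 600
# 660 600
# 900 780
```

If a record stamped 500 arrived after the watermark reached 600, the heuristic would have been wrong for it, which is exactly how late data arises.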
5. Visual Diagram
Event Time (when it happened) | Processing Time (wall clock, when it reached the pipeline)
12:00 | 12:05
12:05 | 12:12
12:10 | 12:20
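The delays in the diagram can be checked with a few lines of Python; the calendar date below is an arbitrary assumption, since only the clock times matter.

```python
from datetime import datetime, timedelta

DAY = "2024-01-01"  # arbitrary date; only the times of day matter

# (event time, processing time) pairs from the diagram
pairs = [("12:00", "12:05"), ("12:05", "12:12"), ("12:10", "12:20")]


def at(hhmm: str) -> datetime:
    return datetime.strptime(f"{DAY} {hhmm}", "%Y-%m-%d %H:%M")


delays = [at(p) - at(e) for e, p in pairs]
for (e, p), d in zip(pairs, delays):
    print(f"event {e} arrived {p}: delay {d // timedelta(minutes=1)} min")
# event 12:00 arrived 12:05: delay 5 min
# event 12:05 arrived 12:12: delay 7 min
# event 12:10 arrived 12:20: delay 10 min
```

The gap between event time and processing time grows from 5 to 10 minutes, so no fixed offset from the wall clock describes event-time progress exactly.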
6. Code Example
Extracting custom event timestamps from logs and configuring allowed lateness:
import apache_beam as beam
from apache_beam.transforms.window import FixedWindows


class ExtractTimestampDoFn(beam.DoFn):
    def process(self, element):
        # element is assumed to be an already-parsed JSON record (a dict) whose
        # "timestamp" field holds the event time in seconds since the epoch.
        event_time_epoch = float(element["timestamp"])
        # Emit the element with its event timestamp attached.
        yield beam.window.TimestampedValue(element, event_time_epoch)


# In your pipeline (60-second windows, 300 seconds of allowed lateness):
# output = (
#     logs
#     | beam.ParDo(ExtractTimestampDoFn())
#     | beam.WindowInto(FixedWindows(60), allowed_lateness=300)
# )
7. Code Explanation
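- beam.window.TimestampedValue(element, timestamp) tells the runner when the log event actually occurred. The runner uses this timestamp to assign the element to a window and to decide whether it is late relative to the watermark. The watermark itself is reported by the source: timestamps assigned in a DoFn after reading do not advance it, which is why it is best to attach event timestamps at the source (for example, through the timestamp_attribute parameter of beam.io.ReadFromPubSub).
- allowed_lateness=300 tells the runner: "Keep each window's state for 300 seconds (5 minutes) after the watermark passes the end of the window, so late elements can still be added." With the default trigger, each late element that arrives within this period produces an updated result for its window; elements arriving after it are dropped.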
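The three possible outcomes for an element under FixedWindows(60) with allowed_lateness=300 can be modelled as below. This is a simplified, hypothetical model of the rule, not Beam's implementation: it ignores triggers and exact boundary handling, and assumes epoch-aligned windows with times in seconds.

```python
def classify(event_time: float, watermark: float,
             window_size: float, allowed_lateness: float) -> str:
    """Hypothetical model of how a runner treats one element."""
    start = event_time - (event_time % window_size)
    end = start + window_size
    if watermark < end:
        return "on_time"         # watermark has not reached the window's end
    if watermark < end + allowed_lateness:
        return "late_accepted"   # late, but the window's state is still retained
    return "dropped"             # state already garbage-collected; element discarded


# Element stamped t=125 belongs to window [120, 180).
print(classify(125, watermark=150, window_size=60, allowed_lateness=300))  # on_time
print(classify(125, watermark=300, window_size=60, allowed_lateness=300))  # late_accepted
print(classify(125, watermark=500, window_size=60, allowed_lateness=300))  # dropped
print(classify(125, watermark=200, window_size=60, allowed_lateness=0))    # dropped
```

With an allowed lateness of zero, the middle branch can never be taken, which matches the silent-drop behaviour described under Common Mistakes.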
8. Real Production Example
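In a mobile-network telemetry pipeline, devices lose connectivity when they pass through tunnels. A device might buffer logs locally at 14:00 and upload them at 14:30. Because those logs keep their 14:00 event timestamps, the watermark and the allowed lateness together decide whether they still update the 14:00 hourly metrics: if the watermark has not yet passed the end of that window (15:00), the logs are on time; if it has, they are accepted as late data only while they fall within the allowed lateness.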
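The tunnel scenario can be replayed with plain fixed-window arithmetic, assuming one-hour windows, an arbitrary date in UTC, and a hypothetical watermark of 14:25 at upload time. The buffered records keep their original event timestamps, so they land in the 14:00-15:00 window regardless of when they arrive.

```python
from datetime import datetime, timezone

HOUR = 3600  # one-hour fixed windows, in seconds


def ts(hhmm: str) -> float:
    """Epoch seconds for hh:mm on an arbitrary date (2024-01-01, UTC)."""
    h, m = map(int, hhmm.split(":"))
    return datetime(2024, 1, 1, h, m, tzinfo=timezone.utc).timestamp()


def hhmm(t: float) -> str:
    return datetime.fromtimestamp(t, tz=timezone.utc).strftime("%H:%M")


def hourly_window(event_time: float):
    start = event_time - (event_time % HOUR)
    return start, start + HOUR


# Records buffered in the tunnel around 14:00, uploaded at 14:30.
buffered = [ts("14:00"), ts("14:02"), ts("14:05")]
watermark_at_upload = ts("14:25")  # assumption: watermark trails the wall clock slightly

for t in buffered:
    start, end = hourly_window(t)
    status = "late" if watermark_at_upload >= end else "on time"
    print(hhmm(t), "->", f"{hhmm(start)}-{hhmm(end)}", status)
# 14:00 -> 14:00-15:00 on time
# 14:02 -> 14:00-15:00 on time
# 14:05 -> 14:00-15:00 on time
```

Had the upload happened after the watermark passed 15:00, the same records would be late and would count only while the allowed lateness still covered them.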
9. Common Mistakes
- Assuming the watermark equals processing time: watermarks measure event-time progress, not wall-clock time. If a source stops receiving data, its watermark can stall (depending on how the source estimates it) even as wall-clock processing time keeps ticking.
- Discarding late data silently: Beam's default allowed lateness is zero, so if you do not set one, any element that arrives after the watermark has passed the end of its window is dropped without raising an error.
10. Interview Perspective
- Question: What causes a watermark to lag (drift)?
- Answer: Slow or disconnected producers, network latency, consumer bottlenecks, or a large backlog. The watermark is held back by the oldest data that has not yet been processed, so when workers fall behind, the gap between wall-clock time and the watermark (the watermark lag) grows.
- Question: What is the consequence of setting an excessively large allowed lateness?
- Answer: The runner must keep each window's state in memory or persistent storage for longer, which increases storage costs and can lead to out-of-memory failures on workers.
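A back-of-the-envelope model makes the cost of a large allowed lateness concrete. Assuming fixed windows whose state is kept per key until the watermark passes the window end plus the allowed lateness, the number of windows retained at once grows linearly with the lateness. This is an illustrative upper-bound estimate (`max_windows_retained` is a hypothetical helper), not a figure any runner reports.

```python
import math


def max_windows_retained(window_size: int, allowed_lateness: int) -> int:
    """Rough upper bound on fixed windows per key whose state is kept at once.

    A window [s, s + size) is kept until the watermark passes s + size + lateness,
    so retained windows have starts spread over an interval of length
    size + lateness. Windows ahead of the watermark (early data) are ignored.
    """
    return 1 + math.ceil(allowed_lateness / window_size)


for lateness in (0, 300, 3600, 86400):
    print(lateness, max_windows_retained(60, lateness))
# 0 1
# 300 6
# 3600 61
# 86400 1441
```

With one-minute windows, a one-day allowed lateness means holding on the order of 1,441 windows of state per key instead of one.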
11. Best Practices
- Extract event timestamps as close to the ingestion source as possible.
- Set an allowed lateness that covers roughly the 99th percentile of the arrival delays (processing time minus event time) you observe in production.
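One way to pick that value is to measure processing time minus event time for a sample of records and take the 99th percentile. The sketch uses the nearest-rank percentile definition; the helper name and the sample delays are made up for illustration.

```python
def percentile_nearest_rank(values, p: int):
    """Nearest-rank percentile: smallest value with at least p% of values <= it."""
    if not values or not 0 < p <= 100:
        raise ValueError("need non-empty values and 0 < p <= 100")
    ordered = sorted(values)
    rank = (p * len(ordered) + 99) // 100  # ceil(p * n / 100) using integer math
    return ordered[rank - 1]


# Hypothetical arrival delays in seconds (processing time - event time).
delays = [2, 3, 3, 4, 5, 5, 6, 8, 12, 240]
print(percentile_nearest_rank(delays, 99))  # 240
print(percentile_nearest_rank(delays, 90))  # 12
```

With only ten samples the 99th percentile is simply the maximum, so a realistic estimate needs a much larger sample of production traffic.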
12. Summary
- Watermarks track event-time progress in streaming pipelines.
- They rely on the distinction between Event Time (on-device) and Processing Time (on-worker).
- Together with allowed lateness, they govern when windows close and how late-arriving data is handled.