In stream processing, time is a slippery concept. Unlike batch processing, where all data is available before computation begins, streaming engines process a continuous, unbounded flow of events.
To make sense of this continuous flow, we group events into windows (e.g., hourly sales). But how does the engine know when an hour has actually finished and it can output the window's sum?
The answer is watermarks.
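Before understanding watermarks, we must distinguish between two types of time: event time (when an event actually occurred at its source) and processing time (when the engine observes that event).
Because of network latency, user disconnections, and system buffering, events often arrive out of order, and processing time can lag well behind event time. Watermarks bridge this gap between the two.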
A watermark is a monotonically increasing timestamp that tracks how far the stream has progressed in event time.
> [!NOTE]
> When the watermark reaches timestamp T, the system is asserting: "We believe no more events with an event time earlier than T will arrive in this stream."
Once the watermark passes the end of a window (e.g., passing 1:00 PM for the 12:00 PM - 1:00 PM window), the engine knows it is safe to close that window and output the results.
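The window-closing rule can be illustrated with a small engine-agnostic sketch. It is not how any particular engine implements windowing; the `WindowedSum` class, its method names, and the use of plain epoch seconds are all assumptions made for illustration.

```python
from collections import defaultdict

WINDOW_SIZE = 3600  # one-hour tumbling windows, in seconds


def window_start(event_time):
    """Return the start of the tumbling window that contains event_time."""
    return event_time - (event_time % WINDOW_SIZE)


class WindowedSum:
    """Hypothetical helper: sums values per window and emits a window
    once the watermark reaches the window's end."""

    def __init__(self):
        self.open_windows = defaultdict(int)  # window start -> running sum
        self.watermark = float("-inf")

    def add(self, event_time, value):
        self.open_windows[window_start(event_time)] += value

    def advance_watermark(self, new_watermark):
        # Watermarks are monotonic, so never move backwards.
        self.watermark = max(self.watermark, new_watermark)
        closed = []
        for start in sorted(self.open_windows):
            if start + WINDOW_SIZE <= self.watermark:
                closed.append((start, self.open_windows.pop(start)))
        return closed


sums = WindowedSum()
sums.add(12 * 3600 + 300, 10)   # 12:05
sums.add(12 * 3600 + 3000, 5)   # 12:50
sums.add(13 * 3600 + 60, 7)     # 13:01, belongs to the next window

early = sums.advance_watermark(12 * 3600 + 3500)  # 12:58, nothing closes yet
closed = sums.advance_watermark(13 * 3600)        # 13:00, closes the 12:00 window
print(early, closed)  # [] [(43200, 15)]
```

The 13:01 event stays buffered in its own window, which only closes once the watermark reaches 2:00 PM.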
Watermarks are typically classified into two types: heuristic and perfect.
In most real-world scenarios, the engine cannot predict the future (e.g., a user might turn off their mobile device and upload logs 3 hours later). Therefore, the engine computes a heuristic watermark based on:
Heuristic watermarks are approximations, so they can be wrong. An event that arrives with an event time earlier than the current watermark is classified as late data.
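One common heuristic, used here purely as an illustration, is to take the largest event time seen so far and subtract a fixed delay. The sketch below assumes that strategy; the `BoundedDelayWatermark` class and the `max_delay` parameter are hypothetical names, and real engines may combine other signals.

```python
class BoundedDelayWatermark:
    """Hypothetical heuristic: watermark = max event time seen - max_delay."""

    def __init__(self, max_delay):
        self.max_delay = max_delay
        self.max_event_time = float("-inf")

    @property
    def current(self):
        return self.max_event_time - self.max_delay

    def observe(self, event_time):
        """Return True if the event is on time, False if it is late.

        The check runs against the watermark as it stood before this event,
        then the watermark is advanced if the event is the newest seen.
        """
        on_time = event_time >= self.current
        self.max_event_time = max(self.max_event_time, event_time)
        return on_time


wm = BoundedDelayWatermark(max_delay=60)
results = [(t, wm.observe(t)) for t in [100, 160, 130, 250, 150]]
print(results)
# [(100, True), (160, True), (130, True), (250, True), (150, False)]
```

The event at 130 arrives out of order but is still on time because the watermark is only at 100. The event at 150 is late because the event at 250 has already pushed the watermark to 190.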
When all source events are guaranteed to arrive in order (such as a database log stream or an ordered file sequence), the watermark can track event time exactly. This is a perfect watermark, and with it there is never any late data.
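For an ordered source, the watermark can simply equal the event time of the most recent element. A minimal sketch, assuming integer event times and a hypothetical `perfect_watermarks` generator:

```python
def perfect_watermarks(event_times):
    """Yield (event_time, watermark) pairs for a source guaranteed to be in order."""
    watermark = float("-inf")
    for event_time in event_times:
        if event_time < watermark:
            # The ordering guarantee is what makes the watermark "perfect".
            raise ValueError("source violated its ordering guarantee")
        watermark = event_time
        yield event_time, watermark


pairs = list(perfect_watermarks([10, 20, 20, 35]))
print(pairs)  # [(10, 10), (20, 20), (20, 20), (35, 35)]
```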
In Apache Beam, if your raw input data does not carry explicit event timestamps, you can assign them in a custom DoFn by wrapping each element in beam.window.TimestampedValue.
Here is how you parse custom event times from messages and set them as the official event time in Apache Beam:
```python
import apache_beam as beam
from datetime import datetime, timezone


class AddTimestampFn(beam.DoFn):
    def process(self, element):
        # Element format: {"event_id": "123", "event_time": "2026-06-18 12:05:00", "payload": "..."}
        dt = datetime.strptime(element["event_time"], "%Y-%m-%d %H:%M:%S")
        # Assumption: event_time is recorded in UTC. A naive datetime would
        # otherwise be interpreted in the worker's local time zone.
        timestamp = dt.replace(tzinfo=timezone.utc).timestamp()
        # Yield the element wrapped with its official event time
        yield beam.window.TimestampedValue(element, timestamp)
```
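The string-to-timestamp step can be exercised on its own, without Beam installed. This sketch assumes the event_time strings are in UTC and pins the time zone explicitly, because calling .timestamp() on a naive datetime interprets it in the machine's local time zone. The `to_epoch_seconds` helper is a hypothetical name.

```python
from datetime import datetime, timezone


def to_epoch_seconds(event_time):
    """Parse 'YYYY-MM-DD HH:MM:SS' as UTC and return seconds since the epoch."""
    dt = datetime.strptime(event_time, "%Y-%m-%d %H:%M:%S")
    return dt.replace(tzinfo=timezone.utc).timestamp()


print(to_epoch_seconds("1970-01-02 00:00:00"))  # 86400.0
```

Pinning the zone keeps the result identical on every worker, regardless of how each machine's local time zone is configured.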
To design robust streaming pipelines, keep these principles in mind:
with_allowed_lateness().
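Allowed lateness can be pictured as a grace period after the watermark passes the end of a window. The sketch below is engine-agnostic and is not Beam's implementation; the `classify` function, its labels, and the one-hour window size are assumptions for illustration.

```python
WINDOW_SIZE = 3600  # one-hour tumbling windows, in seconds


def window_end(event_time):
    """Return the end of the tumbling window that contains event_time."""
    return event_time - (event_time % WINDOW_SIZE) + WINDOW_SIZE


def classify(event_time, watermark, allowed_lateness):
    """Decide what happens to an event given the current watermark."""
    end = window_end(event_time)
    if watermark < end:
        return "on_time"          # window is still open
    if watermark < end + allowed_lateness:
        return "late_accepted"    # window closed, but within the grace period
    return "dropped"              # window state has been discarded


event = 12 * 3600 + 300  # 12:05, window ends at 13:00 (46800)
print(classify(event, watermark=46000, allowed_lateness=600))  # on_time
print(classify(event, watermark=47000, allowed_lateness=600))  # late_accepted
print(classify(event, watermark=47400, allowed_lateness=600))  # dropped
```

A longer grace period accepts more late events at the cost of keeping window state around for longer.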