Handling Out-of-Order Events
1. Introduction
In real-time systems, events frequently arrive at the processing pipeline in a different order than they occurred in the physical world. Handling out-of-order events means using event-time semantics, watermarks, allowed lateness, and triggers so that results reflect when events occurred rather than when they happened to arrive.
2. Why This Concept Exists
Distributed networks, mobile connections, and retry mechanisms introduce unpredictable latency. A user might perform an action in a mobile app at 12:00 (Event Time), but because they enter a tunnel, the message is queued offline and only reaches the cloud pipeline at 12:15 (Processing Time). If the pipeline grouped events purely by arrival time, it would associate this event with the 12:15 interval, undercounting the 12:00 interval and overcounting the 12:15 one.
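The tunnel scenario can be worked through as a minimal sketch in plain Python. The `window_start` helper and the calendar date are assumptions made for illustration; they are not part of any Beam API.

```python
from datetime import datetime, timedelta

WINDOW = timedelta(minutes=5)
EPOCH = datetime(1970, 1, 1)


def window_start(ts: datetime, size: timedelta = WINDOW) -> datetime:
    """Return the start of the fixed window that contains ts (hypothetical helper)."""
    return ts - (ts - EPOCH) % size


# The date is arbitrary; only the times of day come from the scenario above.
event_time = datetime(2024, 1, 1, 12, 0)        # when the user acted
processing_time = datetime(2024, 1, 1, 12, 15)  # when the message reached the pipeline

by_event_time = window_start(event_time)            # window starting at 12:00
by_processing_time = window_start(processing_time)  # window starting at 12:15
skew = processing_time - event_time                 # 15 minutes
```

Grouping by `event_time` places the action in the window in which it happened; grouping by `processing_time` moves it three windows later.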
3. Key Terminology
- Event Time Skew: The time difference between when an event occurred (Event Time) and when it is processed by the pipeline (Processing Time).
- Allowed Lateness: A configurable duration during which the runner preserves window state to process late-arriving events.
- Trigger: A rule that determines when the accumulated results of a window should be materialized and emitted.
- Accumulating Mode: A trigger setting where subsequent panes for a window include all historical data plus the new late data.
- Discarding Mode: A trigger setting where subsequent panes for a window emit only the delta (new data) since the last trigger firing.
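The difference between the two modes is easiest to see with numbers. The sketch below is plain Python rather than Beam code, and the `panes` function is a hypothetical helper that models a single window with a sum aggregation.

```python
def panes(firings, accumulating):
    """Return the value emitted at each trigger firing for one window.

    firings: list of lists; each inner list holds the values that arrived
    since the previous firing.
    """
    emitted = []
    running_total = 0
    for batch in firings:
        if accumulating:
            # Accumulating: each pane repeats everything seen so far.
            running_total += sum(batch)
            emitted.append(running_total)
        else:
            # Discarding: each pane carries only the new values.
            emitted.append(sum(batch))
    return emitted


firings = [[4, 6], [5]]  # on-time pane, then one late element

accumulating_panes = panes(firings, accumulating=True)
discarding_panes = panes(firings, accumulating=False)
```

Here `accumulating_panes` is `[10, 15]` and `discarding_panes` is `[10, 5]`: the same late element yields an updated total in one mode and a delta in the other.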
4. How It Works
- Timestamping: Every element entering the pipeline is associated with an event timestamp.
- Watermarking: The watermark is the system's estimate of how far event time has progressed. When the watermark passes the end of a window (e.g., `12:05`), the window is considered complete.
- Allowed Lateness: If an event with timestamp `12:03` arrives after the watermark has passed `12:05`, it is marked as "late." If it arrives within the `allowed_lateness` period, the runner retrieves the window state, updates the aggregation, and emits a new pane.
- Triggers: The trigger controls when these updates are sent downstream (e.g., emit immediately when late data arrives).
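The decision the runner makes for each arriving element can be approximated as a three-way classification. This is a simplified model in plain Python, not Beam's implementation: `Status`, `minutes`, and `classify` are hypothetical names, times are minutes since midnight, and exact boundary handling is glossed over.

```python
from enum import Enum


class Status(Enum):
    ON_TIME = "on time"
    LATE = "late"
    DROPPED = "dropped"


def minutes(hhmm: str) -> int:
    """Convert 'HH:MM' to minutes since midnight."""
    hours, mins = hhmm.split(":")
    return int(hours) * 60 + int(mins)


def classify(window_end: int, allowed_lateness: int, watermark: int) -> Status:
    """Classify an element by where the watermark stands when it arrives."""
    if watermark < window_end:
        return Status.ON_TIME   # window not yet considered complete
    if watermark < window_end + allowed_lateness:
        return Status.LATE      # state still retained; an updated pane can fire
    return Status.DROPPED       # window state already discarded


WINDOW_END = minutes("12:05")
ALLOWED_LATENESS = 10  # minutes

early = classify(WINDOW_END, ALLOWED_LATENESS, minutes("12:04"))
late = classify(WINDOW_END, ALLOWED_LATENESS, minutes("12:12"))
too_late = classify(WINDOW_END, ALLOWED_LATENESS, minutes("12:20"))
```

An element for the window ending at 12:05 is on time while the watermark is before 12:05, late while it is between 12:05 and 12:15, and dropped after that.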
5. Visual Diagram
Event-time window (12:00 - 12:05)
    e1: 12:01 (on time)
    e2: 12:04 (on time)
Late event (event time 12:03)
    Arrives at wall-clock time 12:12.
    Processed because the window's allowed-lateness period is still open.
6. Code Example
The following pipeline configures 5-minute fixed windows that allow data to arrive up to 10 minutes late, updating the aggregation results:
    import ast

    import apache_beam as beam
    from apache_beam.options.pipeline_options import PipelineOptions
    from apache_beam.transforms.trigger import AccumulationMode, AfterCount, AfterWatermark
    from apache_beam.transforms.window import FixedWindows

    # Pub/Sub is an unbounded source, so the pipeline must run in streaming mode
    options = PipelineOptions(streaming=True)

    with beam.Pipeline(options=options) as p:
        (
            p
            | "ReadStream" >> beam.io.ReadFromPubSub(subscription="projects/my-proj/subscriptions/logs")
            # literal_eval parses a "(user, score)" literal without executing arbitrary code
            | "Parse" >> beam.Map(lambda x: ast.literal_eval(x.decode("utf-8")))
            # Configure windowing with late data support
            | "WindowWithLateness" >> beam.WindowInto(
                FixedWindows(5 * 60),  # 5-minute windows
                trigger=AfterWatermark(
                    late=AfterCount(1)  # Fire on each late element
                ),
                allowed_lateness=10 * 60,  # Keep state for 10 minutes past the end of the window
                accumulation_mode=AccumulationMode.ACCUMULATING,  # Emit full updated totals
            )
            | "SumScores" >> beam.CombinePerKey(sum)
            | "Format" >> beam.Map(print)
        )
7. Code Explanation
- `FixedWindows(5 * 60)` creates 5-minute windows.
- `allowed_lateness=10 * 60` (10 minutes) instructs the runner to keep the window's state (accumulators) in memory or on disk until the watermark is 600 seconds past the end of the window. For the window ending at `12:05`, that is when the watermark reaches `12:15`.
- `trigger=AfterWatermark(late=AfterCount(1))` tells the runner: "Fire once when the watermark passes the end of the window. If any late elements arrive afterward, fire an updated pane immediately for each one."
- `accumulation_mode=AccumulationMode.ACCUMULATING` ensures that if the first pane emitted `10` and a late element of value `5` arrives, the next pane emits the updated total `15` (instead of just the delta `5`).
8. Real Production Example
A ridesharing company receives trip distance logs. Drivers in remote areas accumulate logs on their local phones. The pipeline calculates daily mileage. By allowing 24 hours of lateness, the system incorporates rides from offline drivers into the correct day's financial metrics when they re-establish network connection.
9. Common Mistakes
- Leaving `allowed_lateness` at 0: By default, `allowed_lateness` is 0. Any data arriving after the watermark passes the end of the window is dropped, leading to undercounted metrics.
- Infinite Lateness Memory Exhaustion: Setting `allowed_lateness` to a huge value (or infinity) forces the runner to retain the state of every window for that long, possibly forever. On a long-running pipeline this can exhaust worker RAM or local disk space.
- Mismatch of Accumulation Modes: Using `DISCARDING` mode when your downstream database expects complete totals leaves it with partial values, while using `ACCUMULATING` mode when the sink expects deltas double-counts data in the external database.
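The accumulation-mode mismatch can be demonstrated with two toy sinks. This is an illustrative sketch in plain Python; both sink functions are hypothetical stand-ins for an external database, and the pane values reuse the 10-then-5 example from the code explanation.

```python
def additive_sink(panes):
    """Sink that adds every incoming pane to the stored value (expects deltas)."""
    stored = 0
    for value in panes:
        stored += value
    return stored


def overwrite_sink(panes):
    """Sink that replaces the stored value with each pane (expects full totals)."""
    stored = 0
    for value in panes:
        stored = value
    return stored


accumulating_panes = [10, 15]  # full totals: on-time pane, then updated pane
discarding_panes = [10, 5]     # deltas: on-time pane, then the late element only

# Matching pairs arrive at the correct total of 15.
correct_additive = additive_sink(discarding_panes)
correct_overwrite = overwrite_sink(accumulating_panes)

# Mismatched pairs do not.
double_counted = additive_sink(accumulating_panes)  # 10 + 15
partial_total = overwrite_sink(discarding_panes)    # only the last delta survives
```

The mismatched cases store 25 and 5 respectively, while both matching cases store the true total of 15.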
10. Interview Perspective
- Question: What happens to elements that arrive after the allowed lateness window has expired?
- Answer: They are considered "droppably late." The runner discards these elements immediately without processing them. They do not trigger window execution.
- Question: How does event-time processing differ from processing-time processing?
- Answer: Processing-time processing assigns events to windows based on the machine clock when the event is processed, ignoring when it happened. Event-time processing groups events based on their embedded timestamps, ensuring deterministic results even if data is replayed.
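The replay claim in the last answer can be checked with a small simulation. This is a sketch in plain Python, not Beam code: `sum_by_window` is a hypothetical helper, timestamps are seconds, and the arrival times are invented for illustration.

```python
import random
from collections import defaultdict

WINDOW_SECONDS = 300  # 5-minute fixed windows


def sum_by_window(timestamped_values):
    """Sum values into fixed windows keyed by window start (in seconds)."""
    totals = defaultdict(int)
    for ts, value in timestamped_values:
        totals[ts - ts % WINDOW_SECONDS] += value
    return dict(totals)


# (event_time_seconds, value) pairs
events = [(60, 1), (180, 2), (240, 3), (330, 4)]

# Event time: the result depends only on the embedded timestamps,
# so a shuffled replay produces the same windows.
shuffled = events[:]
random.Random(0).shuffle(shuffled)
event_time_first_run = sum_by_window(events)
event_time_replay = sum_by_window(shuffled)

# Processing time: the result depends on when each element happened to arrive.
live_arrivals = [70, 190, 250, 340]     # near real time
replay_arrivals = [900, 901, 902, 903]  # backlog replayed in one burst
values = [value for _, value in events]
processing_time_first_run = sum_by_window(zip(live_arrivals, values))
processing_time_replay = sum_by_window(zip(replay_arrivals, values))
```

Both event-time runs produce `{0: 6, 300: 4}`, whereas the processing-time replay collapses everything into the window starting at 900.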
11. Best Practices
- Analyze your historical network skew to set an `allowed_lateness` that covers 99.9% of late-arriving data.
- Use `ACCUMULATING` mode for updating internal metrics and `DISCARDING` mode if you are sending deltas to append-only databases.
- Monitor your pipeline's "system lag" metric to ensure the watermark is advancing and not stuck due to slow workers.
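One way to turn the first practice into a number is a nearest-rank percentile over observed skews. This is a minimal sketch in plain Python; `lateness_for_coverage` is a hypothetical helper and the sample data is invented for illustration, not taken from a real pipeline.

```python
import math


def lateness_for_coverage(skews_seconds, coverage=0.999):
    """Return the smallest observed skew covering at least `coverage` of samples.

    Uses the nearest-rank percentile method on historical skew measurements.
    """
    if not skews_seconds:
        raise ValueError("need at least one skew sample")
    ordered = sorted(skews_seconds)
    rank = max(1, math.ceil(coverage * len(ordered)))
    return ordered[rank - 1]


# Invented sample: most events are 2 s behind, a few are 30 s behind,
# and one offline client delivered an hour late.
samples = [2] * 990 + [30] * 9 + [3600]

suggested = lateness_for_coverage(samples)           # covers 99.9% of samples
cover_everything = lateness_for_coverage(samples, 1.0)
```

With this sample, 30 seconds covers 99.9% of events, while covering the single outlier would require retaining state for a full hour, which is the trade-off between completeness and state size.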
12. Summary
- Out-of-order data is caused by network latency and client-side queuing.
- Beam uses allowed lateness to retain state for late events.
- Triggers determine when to emit updated results (panes) for windows.