Allowed Lateness
1. Introduction
In real-time streaming, the watermark indicates the progress of time in the pipeline. Once the watermark passes the end of a window, the runner assumes all data for that window has arrived, closes the window, and emits the output.
However, in real production systems, network lag, client offline states, or server crashes can delay data elements. Allowed Lateness is the policy that tells Apache Beam how long to keep a window's state open after the watermark has passed its end boundary, allowing late-arriving data to update the results.
2. Why This Concept Exists
By default, Apache Beam has an allowed lateness of zero. This means that any event arriving after the watermark has passed the end of its window is immediately discarded.
Allowed Lateness exists to prevent this data loss:
- Completeness: Ensures delayed messages are still aggregated and reflected in the final output.
- Late Refinement: Allows pipelines to emit "on-time" results as soon as the watermark passes the window end, and then issue updated "late" results if delayed data arrives.
- State Management: Bounds how long the runner must preserve window state. Without a limit, the runner would have to retain state for every window indefinitely, and state size would grow without bound.
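To make the state-management point concrete, the following is a rough back-of-the-envelope sketch in plain Python, not a Beam API. It assumes fixed, non-overlapping windows and ignores elements that arrive ahead of the watermark; the function name max_live_windows_per_key is hypothetical.

```python
import math


def max_live_windows_per_key(window_size_s: int, allowed_lateness_s: int) -> int:
    """Rough upper bound on fixed windows per key whose state is still retained."""
    # One window is currently open; each earlier window stays alive until
    # allowed_lateness_s has elapsed past its end.
    return 1 + math.ceil(allowed_lateness_s / window_size_s)


print(max_live_windows_per_key(5 * 60, 0))                 # 1
print(max_live_windows_per_key(5 * 60, 3 * 60))            # 2
print(max_live_windows_per_key(5 * 60, 7 * 24 * 60 * 60))  # 2017
```

Under these assumptions, a 7-day lateness on 5-minute windows keeps roughly two thousand windows of state alive per key, which is why the bound matters on high-throughput streams.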
3. Key Terminology
- On-Time Firing: The first execution of the window's aggregation, triggered when the watermark passes the end of the window.
- Late Data: Elements that arrive after the watermark has already passed the end of the window their event timestamp belongs to.
- Allowed Lateness: The duration (given in seconds in the Python SDK) after the watermark passes the window end during which late elements are still processed.
- State Garbage Collection (GC): The permanent deletion of window state after the allowed lateness duration expires.
4. How It Works
- Watermark Passes: The watermark passes the end of a window (e.g. 12:05). The runner triggers the "On-Time" output.
- Late Element Arrives: A late element with event time 12:03 arrives at processing time 12:07, by which point the watermark in this example has also advanced to 12:07.
- Lateness Check: The runner checks whether current_watermark (12:07) < window_end (12:05) + allowed_lateness (e.g., 5 minutes, giving 12:10).
- Process or Drop:
  - If it is within the bounds, the runner restores the window state, adds the late element, and emits an updated result.
  - If the watermark has passed the allowed lateness threshold, the element is discarded.
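The lateness check above can be sketched in plain Python. This is an illustration of the comparison only, not how any runner implements it: the function name classify_element and the calendar date are hypothetical, and exact behavior at the boundary instants may differ between runners.

```python
from datetime import datetime, timedelta


def classify_element(watermark: datetime, window_end: datetime,
                     allowed_lateness: timedelta) -> str:
    """Classify an arriving element by where the watermark stands relative to its window."""
    if watermark < window_end:
        return "on-time"   # the window has not fired yet
    if watermark < window_end + allowed_lateness:
        return "late"      # window state is still retained; the result is updated
    return "dropped"       # window state has been garbage collected


window_end = datetime(2024, 1, 1, 12, 5)   # hypothetical date, window ends at 12:05
lateness = timedelta(minutes=5)            # state is kept until 12:10

print(classify_element(datetime(2024, 1, 1, 12, 4), window_end, lateness))   # on-time
print(classify_element(datetime(2024, 1, 1, 12, 7), window_end, lateness))   # late
print(classify_element(datetime(2024, 1, 1, 12, 11), window_end, lateness))  # dropped
```

Note that the decision depends on the watermark and the window end, not on the element's own event time within the window.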
5. Visual Diagram
Before 12:05 (window end): on-time data accepted
12:05 to 12:10 (allowed lateness): late data accepted
After 12:10 (GC boundary): late data dropped (state cleared)
6. Code Example
The following code configures 5-minute fixed windows while allowing late-arriving data to be processed for up to 3 minutes after the watermark passes:
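    import json

    import apache_beam as beam
    from apache_beam.options.pipeline_options import PipelineOptions
    from apache_beam.transforms.window import FixedWindows

    # ReadFromPubSub is an unbounded source, so the pipeline runs in streaming mode.
    options = PipelineOptions(streaming=True)

    with beam.Pipeline(options=options) as p:
        (p
         | "ReadTransactions" >> beam.io.ReadFromPubSub(subscription="projects/my-gcp/subs/sales")
         # Pub/Sub delivers raw bytes; this step assumes each message is a JSON object.
         | "ParseJson" >> beam.Map(json.loads)
         | "FormatKV" >> beam.Map(lambda x: (x["user_id"], float(x["amount"])))
         | "WindowWithLateness" >> beam.WindowInto(
             FixedWindows(5 * 60),     # 5-minute windows
             allowed_lateness=3 * 60   # 3 minutes allowed lateness (180 seconds)
         )
         | "SumSales" >> beam.CombinePerKey(sum)
         | "WriteOutput" >> beam.io.WriteToText("d:/Builds/beam-acadamy/beam-academy-hub/content/lessons/sales_output")
        )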
7. Code Explanation
- FixedWindows(5 * 60) segments transactions into 5-minute windows.
- allowed_lateness=3 * 60 configures the runner to preserve window state (e.g., user purchase tallies) for an additional 180 seconds after the watermark passes the window end.
- CombinePerKey(sum) can emit a result more than once per key and window if late elements arrive: once when the watermark passes the window end, and again when late elements arrive before the allowed lateness expires. Whether a late result holds the full updated total or only the late elements depends on the accumulation mode.
8. Real Production Example
In cellular billing systems, users make voice calls and send texts. If a user enters a region with poor connectivity, call detail records (CDRs) are cached on the device and uploaded late. By setting an allowed lateness of 2 hours, the mobile carrier's streaming billing pipeline can update the hourly usage metrics once the cached CDRs upload, preventing billing discrepancies.
9. Common Mistakes
- Excessive Lateness Durations: Setting allowed lateness to large values (e.g. 7 days) on high-throughput streams. The runner must hold the intermediate window states in persistent storage, which can degrade pipeline performance and cause storage costs to soar.
- Forgetting to Set Triggers: Relying on allowed lateness without configuring appropriate trigger behaviors (e.g. accumulation modes), which can cause downstream sinks to receive confusing duplicate or partial aggregates.
- Ignoring Discarded Elements: Failing to monitor dropped late data. If allowed lateness is set too low, critical records are silently dropped.
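The trigger and accumulation-mode pitfall is easier to see with numbers. The following plain-Python simulation is a simplified model, not Beam code: it assumes one firing when the watermark passes and one firing per late element, and the function name simulate_panes is hypothetical.

```python
def simulate_panes(on_time, late, accumulating):
    """Return the sums emitted for one window: the on-time pane, then one pane per late element."""
    panes = []
    state = sum(on_time)
    panes.append(state)            # on-time firing
    for value in late:
        if accumulating:
            state += value         # keep prior contents, emit the refreshed total
        else:
            state = value          # prior contents were cleared after the last firing
        panes.append(state)
    return panes


print(simulate_panes([10.0, 20.0], [5.0], accumulating=True))   # [30.0, 35.0]
print(simulate_panes([10.0, 20.0], [5.0], accumulating=False))  # [30.0, 5.0]
```

A sink that adds up every pane would overcount in the accumulating case (30 + 35), while a sink that overwrites with the latest pane would undercount in the discarding case (5). The sink's write semantics need to match the accumulation mode.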
10. Interview Perspective
- Question: What is the default allowed lateness in Apache Beam if you do not specify it?
- Answer: The default is 0 seconds. Late data is discarded immediately.
- Question: What happens to window state after the allowed lateness period expires?
- Answer: The runner performs garbage collection on the window. All state for that window is permanently deleted, and any subsequent data falling into that window is discarded.
11. Best Practices
- Analyze your source's latency profile to set allowed lateness at the 99th percentile of data arrival delays.
- For pipelines writing to databases that do not support updates or idempotent writes (upserts), avoid using allowed lateness, as it can trigger multiple writes for the same window.
- Monitor metrics such as droppedDueToLateness in your runner (e.g., Google Cloud Dataflow) to tune your lateness settings.
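As one way to act on the percentile recommendation, the sketch below picks a candidate allowed lateness from a sample of observed arrival delays using the nearest-rank method. The function name suggest_allowed_lateness and the sample data are hypothetical; how the delays are measured (for example, processing time minus event time) is left to your own instrumentation.

```python
import math


def suggest_allowed_lateness(delays_s, percentile=99.0):
    """Nearest-rank percentile of observed arrival delays, in seconds."""
    if not delays_s:
        raise ValueError("need at least one observed delay")
    ordered = sorted(delays_s)
    rank = math.ceil(percentile * len(ordered) / 100)  # 1-based nearest rank
    return ordered[max(rank, 1) - 1]


# Hypothetical sample: 98 prompt arrivals plus two stragglers.
sample = [2.0] * 98 + [170.0, 3600.0]
print(suggest_allowed_lateness(sample))        # 170.0
print(suggest_allowed_lateness(sample, 100))   # 3600.0
```

Here a 99th-percentile setting covers the 170-second straggler but still drops the one-hour outlier, which illustrates the trade-off between completeness and retained state.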
12. Summary
- Allowed lateness keeps window state open for delayed elements.
- Prevents data loss due to network lag or system backlog.
- By default, allowed lateness is 0, meaning late data is dropped.
- Window state is garbage collected after the lateness period passes.
- It is configured inside the WindowInto transform.