Custom Windows
1. Introduction
While standard windowing strategies—such as Fixed, Sliding, and Session windows—cover the majority of stream processing use cases, some applications require more specialized partitioning logic.
A Custom Window is defined by extending Apache Beam's WindowFn class. This allows developers to control exactly how elements are assigned to windows and how windows are merged, opening the door to dynamic, data-driven windowing behaviors.
2. Why This Concept Exists
Standard windowing strategies assume static, uniform temporal boundaries. However, real-world business rules are often non-uniform and complex.
Custom windows are necessary for scenarios such as:
- Dynamic Sizes: Window durations that change based on payload attributes (e.g., high-priority user events get shorter windows, while low-priority events get longer ones).
- Non-Standard Time Intervals: Grouping data based on calendar concepts that do not map to simple hourly or daily multiples (e.g., custom factory shifts or fiscal quarters).
- Custom Session Rules: Sessions that merge not just based on inactivity gaps, but on transaction boundaries (e.g., grouping events from checkout start to purchase completion).
3. Key Terminology
- WindowFn: The base class in Apache Beam for all windowing strategies.
- IntervalWindow: The standard representation of a time interval, defined by a closed start time and an open end time: [start, end).
- Window Merging: The process of combining multiple windows into a single window.
- Window Coder: A serializer that transmits window metadata across workers in a distributed cluster.
4. How It Works
To create a custom windowing strategy in Apache Beam:
- Inherit: Subclass the WindowFn base class. In the Python SDK, assign, merge, and get_window_coder are all abstract on WindowFn, so a strategy whose windows never merge typically subclasses NonMergingWindowFn, which supplies a no-op merge.
- Assign: Override the assign(self, context) method. This method receives an AssignContext (which provides access to the element and its timestamp) and returns a list of windows that the element belongs to.
- Merge (merging strategies only): For merging window types (like sessions), override the merge(self, context) method to inspect active windows and combine overlapping ones.
- Coder: Override get_window_coder(self) to provide a coder for window serialization.
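The merge step is the least obvious of the four, so here is a minimal sketch of the underlying idea: sort the active windows and fold together any that overlap. It deliberately uses plain (start, end) tuples and a hypothetical helper name, merge_overlapping, so that it runs without Beam installed. In Beam's Python SDK the merge context exposes the active windows and a merge(to_be_merged, merge_result) callback; that wiring is left out here.

```python
def merge_overlapping(windows):
    """Group overlapping half-open (start, end) intervals.

    Returns a list of (sources, merged) pairs, one for each group of two or
    more overlapping windows. Windows that overlap nothing are omitted,
    because they need no merge.
    """
    merges = []
    group, span_start, span_end = [], None, None
    for start, end in sorted(windows):
        if group and start < span_end:
            # Overlaps the current group: extend the merged span
            group.append((start, end))
            span_end = max(span_end, end)
        else:
            # Gap found: flush the previous group if it actually merged anything
            if len(group) > 1:
                merges.append((group, (span_start, span_end)))
            group, span_start, span_end = [(start, end)], start, end
    if len(group) > 1:
        merges.append((group, (span_start, span_end)))
    return merges


active = [(0, 10), (5, 15), (20, 30), (14, 18)]
for sources, merged in merge_overlapping(active):
    print(sources, "->", merged)
```

Because intervals are half-open, two windows that merely touch, such as (0, 10) and (10, 20), are treated as separate and are not merged.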
5. Visual Diagram
[Diagram: a 1-minute window (12:00 - 12:01) shown alongside a 10-minute window (12:00 - 12:10).]
6. Code Example
The following code implements a custom PriorityBasedWindowFn that assigns elements to 1-minute windows if they are flagged as "high" priority, or 10-minute windows otherwise:
import apache_beam as beam
from apache_beam import coders
from apache_beam.transforms.window import IntervalWindow, NonMergingWindowFn


# NonMergingWindowFn is a WindowFn that supplies the no-op merge()
# which the WindowFn base class leaves abstract.
class PriorityBasedWindowFn(NonMergingWindowFn):
    def assign(self, assign_context):
        # Access the element payload and its event timestamp (seconds since epoch)
        element = assign_context.element
        timestamp = float(assign_context.timestamp)

        # Determine window size based on payload property
        if isinstance(element, dict) and element.get("priority") == "high":
            size = 60  # 1 minute
        else:
            size = 600  # 10 minutes

        # Round the timestamp down to the nearest multiple of the window size
        start = timestamp - (timestamp % size)

        # Return a list of windows (must be iterable)
        return [IntervalWindow(start, start + size)]

    def get_window_coder(self):
        # Use the built-in coder for IntervalWindow
        return coders.IntervalWindowCoder()


# Usage in pipeline:
# windowed = stream | beam.WindowInto(PriorityBasedWindowFn())
7. Code Explanation
- PriorityBasedWindowFn is a WindowFn subclass.
- assign_context.element extracts the actual Python dictionary payload.
- assign_context.timestamp fetches the element's event timestamp.
- start = timestamp - (timestamp % size) rounds the timestamp down to the nearest multiple of size, so window starts always fall on a fixed grid.
- return [IntervalWindow(start, start + size)] returns a list containing a single IntervalWindow covering the range.
- get_window_coder returns the built-in coder for IntervalWindow, which is sufficient since we are using standard IntervalWindow bounds.
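To make the alignment arithmetic concrete, the sketch below repeats the calculation from PriorityBasedWindowFn.assign in a standalone function. The name window_bounds is hypothetical, and it returns a plain tuple rather than an IntervalWindow so that it runs without Beam.

```python
def window_bounds(element, timestamp):
    """Return (start, end) in seconds for the window an element falls into.

    Hypothetical helper: same arithmetic as PriorityBasedWindowFn.assign,
    but with a plain tuple in place of IntervalWindow.
    """
    is_high = isinstance(element, dict) and element.get("priority") == "high"
    size = 60 if is_high else 600
    start = timestamp - (timestamp % size)
    return (start, start + size)


# The same event time, t = 725 s, lands in different windows by priority
print(window_bounds({"priority": "high"}, 725))  # (720, 780)
print(window_bounds({"priority": "low"}, 725))   # (600, 1200)
```

For the high-priority element, 725 % 60 is 5, so the window starts at 720. For the low-priority element, 725 % 600 is 125, so the window starts at 600.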
8. Real Production Example
In industrial manufacturing, plants run three daily shifts that do not align with standard clock divisions: Morning (06:00 - 14:00), Afternoon (14:00 - 22:00), and Night (22:00 - 06:00). To track machinery status per shift, a custom windowing strategy calculates shift-specific intervals based on the timestamp of the telemetry reading. This allows aggregations to run precisely within shift boundaries, letting operators analyze team efficiency.
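One possible way to compute the shift boundaries is sketched below. It is an illustration under stated assumptions, not the plant's actual implementation: the function name shift_window is hypothetical, and shift times are taken to be in UTC, whereas a real deployment would likely need the plant's local time zone. Inside a custom WindowFn, the returned start and end would be wrapped in an IntervalWindow.

```python
from datetime import datetime, timedelta, timezone

# (start hour, shift name); hours are assumed to be UTC for this sketch
SHIFT_STARTS = ((6, "Morning"), (14, "Afternoon"), (22, "Night"))
SHIFT_LENGTH = timedelta(hours=8)


def shift_window(epoch_seconds):
    """Return (shift_name, start, end), with start and end in epoch seconds."""
    moment = datetime.fromtimestamp(epoch_seconds, tz=timezone.utc)
    midnight = moment.replace(hour=0, minute=0, second=0, microsecond=0)
    if moment.hour < 6:
        # Early-morning readings belong to the Night shift that began
        # at 22:00 on the previous day
        name, start = "Night", midnight - timedelta(hours=2)
    else:
        # Otherwise pick the latest shift start that is not after the reading
        hour, name = max((h, n) for h, n in SHIFT_STARTS if h <= moment.hour)
        start = midnight + timedelta(hours=hour)
    end = start + SHIFT_LENGTH
    return name, start.timestamp(), end.timestamp()


reading = datetime(2024, 1, 2, 3, 0, tzinfo=timezone.utc).timestamp()
print(shift_window(reading)[0])  # Night
```

The Night shift is the case that needs care: a reading at 23:30 and one at 03:00 the next morning must resolve to the same window even though they fall on different calendar days.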
9. Common Mistakes
- Returning a Single Window Directly: Returning IntervalWindow(...) instead of wrapping it in a list [IntervalWindow(...)]. The assign method must return an iterable collection of windows.
- Non-Deterministic Assignment: Writing assignment logic that relies on local worker system time or random state. Window assignment must be a pure function of the element and its timestamp so that results stay consistent across retries and workers.
- Forgetting the Window Coder: Omitting get_window_coder. The runner needs this coder to serialize windows when elements are shuffled between worker nodes in a distributed environment.
10. Interview Perspective
- Question: Why does the assign method return a list of windows instead of a single window?
- Answer: In Apache Beam, a single element can be assigned to multiple windows simultaneously (e.g., in a sliding window strategy, an element belongs to multiple overlapping slices).
- Question: What is the purpose of the merge method in a custom WindowFn?
- Answer: The merge method allows the runner to group and combine overlapping active windows into a single window. It is used to implement session windows, where new elements bridge the gaps between existing windows.
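The first answer can be illustrated with a small sketch of sliding-window assignment. It uses plain (start, end) tuples, a hypothetical function name, whole-second values, and a window grid with zero offset, so it runs without Beam.

```python
def sliding_windows(timestamp, size, period):
    """Return every half-open (start, end) window of length `size`,
    spaced `period` seconds apart, that contains `timestamp`."""
    # Start of the most recent window that begins at or before the timestamp
    last_start = timestamp - (timestamp % period)
    # Walk backwards one period at a time while the window still covers it
    return [(s, s + size) for s in range(last_start, last_start - size, -period)]


print(sliding_windows(125, size=60, period=30))  # [(120, 180), (90, 150)]
```

With 60-second windows that start every 30 seconds, the element at t = 125 belongs to two windows at once, which is why assign returns a list.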
11. Best Practices
- Ensure the custom assign logic is highly optimized; it executes for every record passing through the transform and can easily bottleneck a high-throughput pipeline.
- Whenever possible, reuse the standard IntervalWindow to avoid implementing a custom window class and coder.
- Test your custom window functions with unit tests using mock timestamps to verify boundary conditions.
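A boundary-condition test might look like the sketch below. The aligned_window function is a hypothetical stand-in for the assignment logic under test. In a real suite, the test would more likely call the WindowFn's assign method with a hand-built context.

```python
import unittest


def aligned_window(timestamp, size):
    """Hypothetical stand-in for the assignment logic under test."""
    start = timestamp - (timestamp % size)
    return (start, start + size)


class WindowBoundaryTest(unittest.TestCase):
    def test_last_second_stays_in_window(self):
        self.assertEqual(aligned_window(59, 60), (0, 60))

    def test_boundary_belongs_to_next_window(self):
        # Windows are [start, end), so t = 60 opens a new window
        self.assertEqual(aligned_window(60, 60), (60, 120))

    def test_pre_epoch_timestamp(self):
        # Python's % is non-negative for a positive divisor,
        # so negative timestamps still round down
        self.assertEqual(aligned_window(-1, 60), (-60, 0))
```

Saved to a file, these tests can be run with python -m unittest. The pre-epoch case is worth keeping, because languages differ in how the modulo operator treats negative operands.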
12. Summary
- Custom windows allow arbitrary partitioning rules beyond standard types.
- They are created by extending WindowFn and overriding assign.
- assign receives element and time metadata, returning a list of windows.
- get_window_coder is required to handle network serialization.
- Useful for dynamic window sizing and shift-aligned window intervals.