Timers & Scheduling
1. Introduction
A Timer in Apache Beam allows a stateful DoFn to register a callback at a specific point in time. When that time is reached, the runner triggers a callback method defined inside your class.
2. Why This Concept Exists
Stateful processing allows you to store data, but you also need a way to act based on the passage of time. For example:
- Timeouts: Alerting if an IoT device fails to send a heartbeat within 5 minutes.
- Garbage Collection: Clearing old state cells to free memory if a user session goes inactive.
- Window overrides: Forcing results to emit at exact intervals.
Timers provide the mechanism to schedule and trigger these time-based operations.
3. Key Terminology
- Event-Time Timers: Fire based on watermark progress, that is, only once the stream's watermark passes the registered timestamp.
- Processing-Time Timers: Fire based on real-world wall-clock time on the worker machine.
- Timer Spec: The declaration of the timer slot (TimerSpec).
- On-Timer Callback: The method annotated with @on_timer that executes when the timer fires.
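The difference between the two time domains can be illustrated with a toy model in plain Python. This is only a sketch of the semantics described above, not Beam's implementation; the names Domain, ToyTimer, and is_due are hypothetical and do not exist in the Beam SDK.

```python
from dataclasses import dataclass
from enum import Enum


class Domain(Enum):
    """Hypothetical stand-in for the two timer time domains."""
    EVENT_TIME = "event_time"
    PROCESSING_TIME = "processing_time"


@dataclass(frozen=True)
class ToyTimer:
    domain: Domain
    fire_at: int  # seconds since an arbitrary epoch


def is_due(timer: ToyTimer, watermark: int, wall_clock: int) -> bool:
    """An event-time timer compares against the watermark,
    a processing-time timer against the worker's wall clock."""
    current = watermark if timer.domain is Domain.EVENT_TIME else wall_clock
    return current >= timer.fire_at


# Same target timestamp, different clocks driving the decision.
event_timer = ToyTimer(Domain.EVENT_TIME, fire_at=900)
wall_timer = ToyTimer(Domain.PROCESSING_TIME, fire_at=900)
```

With a watermark of 600 and a wall clock of 5000, only wall_timer is due in this model: the event-time timer keeps waiting until the watermark itself reaches 900, however much real time has elapsed.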
4. How It Works
- Declare: Define a TimerSpec at the class level of your stateful DoFn.
- Schedule: Inside process(), set the timer to a specific timestamp using timer.set(timestamp).
- Execute: When the target time arrives, Beam executes the callback method associated with that timer.
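The declare, schedule, and execute steps above can be sketched with a small pure-Python simulation. It assumes an event-time timer and a runner that fires due timers whenever the watermark advances; ToyStatefulFn and advance_watermark are hypothetical names used for illustration, not Beam APIs.

```python
class ToyStatefulFn:
    """Toy model of a stateful function with one timer slot per key."""

    TIMEOUT_SECONDS = 300

    def __init__(self):
        # Declare: one timer slot per key (key -> target timestamp).
        self.timers = {}

    def process(self, key, timestamp):
        # Schedule: set the key's timer relative to the element timestamp.
        self.timers[key] = timestamp + self.TIMEOUT_SECONDS

    def on_timer(self, key):
        # Execute: the callback can emit output, just like process().
        return (key, "timer fired")

    def advance_watermark(self, watermark):
        """Fire every timer whose target time the watermark has reached."""
        due = sorted((t, k) for k, t in self.timers.items() if t <= watermark)
        outputs = []
        for _, key in due:
            del self.timers[key]  # a fired timer is no longer pending
            outputs.append(self.on_timer(key))
        return outputs


fn = ToyStatefulFn()
fn.process("device-1", timestamp=0)    # timer set for t=300
fn.process("device-2", timestamp=100)  # timer set for t=400
fired = fn.advance_watermark(300)      # only device-1 is due
```

In this model nothing fires while the watermark is below 300, and each key's timer fires independently of the others, which mirrors the per-key scoping of timers.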
5. Visual Diagram
Element ingested (12:00)
    |
    v
Sets timer at 12:15
    |
    v
Timer fires (12:15): callback runs
6. Code Example
Setting an event-time timer to fire 5 minutes after an element's timestamp:
import apache_beam as beam
from apache_beam.coders import VarIntCoder
from apache_beam.transforms.timeutil import TimeDomain
from apache_beam.transforms.userstate import (
    ReadModifyWriteStateSpec, TimerSpec, on_timer)


class TimeoutAlertDoFn(beam.DoFn):
    # 1. Declare state and timer specs.
    LAST_SEEN_STATE = ReadModifyWriteStateSpec("last_seen", VarIntCoder())
    # The Python SDK names the event-time domain TimeDomain.WATERMARK.
    ALERT_TIMER = TimerSpec("alert_timer", TimeDomain.WATERMARK)

    def process(self, element,
                timestamp=beam.DoFn.TimestampParam,
                last_seen=beam.DoFn.StateParam(LAST_SEEN_STATE),
                timer=beam.DoFn.TimerParam(ALERT_TIMER)):
        key, val = element  # input must be (key, value) pairs
        # Save the element's timestamp as whole seconds.
        last_seen.write(int(timestamp))
        # 2. Schedule the timer to fire 5 minutes (300 seconds) later in event time.
        timer.set(int(timestamp) + 300)
        yield element

    # 3. Define the callback method triggered when the timer fires.
    @on_timer(ALERT_TIMER)
    def trigger_alert(self, key=beam.DoFn.KeyParam):
        # Emit the alert payload for this key.
        yield (key, "ALERT: No events seen in last 5 minutes!")
7. Code Explanation
- ALERT_TIMER = TimerSpec("alert_timer", TimeDomain.WATERMARK) creates an event-time timer cell.
- timer.set(...) schedules the callback to run 5 minutes past the element's timestamp.
- @on_timer(ALERT_TIMER) binds the trigger_alert method to the timer; the callback receives the corresponding key through beam.DoFn.KeyParam.
8. Real Production Example
In session tracking, you store active cart contents in BagState. When a user exits the site, you want to clear their cart after 30 minutes of inactivity. You set an event-time timer for last_activity_time + 1800. If no new event resets the timer within 30 minutes, the timer fires, emails the user about their abandoned cart, and clears the state.
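The reset-on-activity behavior in this scenario can be sketched as a plain-Python simulation. It is a toy model under stated assumptions, not Beam code: ToyCartTracker is a hypothetical class, a dict of lists stands in for BagState, and a list of tuples stands in for sending emails.

```python
INACTIVITY_SECONDS = 1800  # 30 minutes


class ToyCartTracker:
    """Toy model of per-user cart state with an inactivity timer."""

    def __init__(self):
        self.carts = {}   # user -> list of items (stand-in for BagState)
        self.timers = {}  # user -> event-time timestamp at which to fire
        self.emails = []  # (user, abandoned items), stand-in for email delivery

    def on_event(self, user, item, event_time):
        self.carts.setdefault(user, []).append(item)
        # Setting the timer again overwrites the previous target time,
        # which is what resets the inactivity countdown.
        self.timers[user] = event_time + INACTIVITY_SECONDS

    def advance_watermark(self, watermark):
        due_users = [u for u, t in self.timers.items() if t <= watermark]
        for user in due_users:
            self.emails.append((user, list(self.carts[user])))
            # Clear state and the timer so the key leaves nothing behind.
            del self.carts[user]
            del self.timers[user]


tracker = ToyCartTracker()
tracker.on_event("u1", "book", event_time=0)    # timer at 1800
tracker.on_event("u1", "pen", event_time=1000)  # timer moved to 2800
tracker.advance_watermark(1800)  # nothing fires: the timer was reset
tracker.advance_watermark(2800)  # fires: email recorded, state cleared
```

The second event pushes the deadline from 1800 to 2800, so the first watermark advance finds no due timer. Only after 30 minutes of event time without activity does the callback run and clear the cart.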
9. Common Mistakes
- Setting timers without keys: Like state, timers are scoped per key, so a stateful DoFn requires key-value input. Applying one to non-keyed data fails with an error.
- Neglecting timezone offsets: When using processing-time timers, be careful with system local time zones on worker VMs. Always stick to UTC.
10. Interview Perspective
- Question: What happens if you call timer.set() multiple times for the same key?
- Answer: Calling set() overwrites the existing timer for that key/window. Only the last scheduled timestamp is active. This is ideal for resetting inactivity timeouts.
- Question: Can a timer output data?
- Answer: Yes. Like process(), the @on_timer callback method can yield elements to downstream collections.
11. Best Practices
- Always clear state cells associated with a key inside the timer callback to prevent leaks.
- Use Event-Time timers for backfills and deterministic testing, and Processing-Time timers solely for wall-clock notifications.
12. Summary
- Timers schedule callbacks based on time domains (Event/Processing).
- Configured using @on_timer(TimerSpec).
- Crucial for session timeouts, heartbeats, and state memory cleanup.