advanced

Timers & Scheduling

8 min read | Last updated: 2026-06-30

1. Introduction

A Timer in Apache Beam allows a stateful DoFn to register a callback at a specific point in time. When that time is reached, the runner triggers a callback method defined inside your class.

2. Why This Concept Exists

Stateful processing allows you to store data, but you also need a way to act based on the passage of time. For example:

  • Timeouts: Alerting if an IoT device fails to send a heartbeat within 5 minutes.
  • Garbage Collection: Clearing old state cells to free memory if a user session goes inactive.
  • Window overrides: Forcing results to emit at exact intervals.

Timers provide the mechanism to schedule and trigger these time-based operations.

3. Key Terminology

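  • Event-Time Timers: Fire only when the stream's watermark passes the registered timestamp, so they follow the progress of the data rather than the clock. In the Python SDK this domain is named TimeDomain.WATERMARK.
  • Processing-Time Timers: Fire based on wall-clock time on the worker machine. In the Python SDK this domain is named TimeDomain.REAL_TIME.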
  • Timer Spec: The declaration of the timer slot (TimerSpec).
  • On-Timer Callback: The method annotated with @on_timer that executes when the timer fires.
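
The difference between the two time domains can be sketched with a small standard-library model. This is not the Beam API: ToyTimer, is_due, and the two domain constants are hypothetical names invented for illustration, and treating "watermark equals the timestamp" as due is a simplification.

```python
from dataclasses import dataclass

EVENT_TIME = "event_time"            # hypothetical label for the watermark domain
PROCESSING_TIME = "processing_time"  # hypothetical label for the wall-clock domain


@dataclass(frozen=True)
class ToyTimer:
    domain: str      # EVENT_TIME or PROCESSING_TIME
    fire_at: float   # target time in seconds


def is_due(timer: ToyTimer, watermark: float, wall_clock: float) -> bool:
    """Return True once the clock for the timer's own domain reaches fire_at."""
    clock = watermark if timer.domain == EVENT_TIME else wall_clock
    return clock >= timer.fire_at


# Both timers target t=900, but the stream is lagging: the watermark is
# only at 600 while the worker's wall clock already reads 1000.
event_timer = ToyTimer(EVENT_TIME, fire_at=900)
processing_timer = ToyTimer(PROCESSING_TIME, fire_at=900)

event_due = is_due(event_timer, watermark=600, wall_clock=1000)
processing_due = is_due(processing_timer, watermark=600, wall_clock=1000)
print(event_due, processing_due)  # prints: False True
```

The event-time timer stays pending until the data catches up, which is why it behaves the same way during a backfill as it does on live traffic.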

4. How It Works

  1. Declare: Define a TimerSpec at the class level of your stateful DoFn.
  2. Schedule: Inside process(), set the timer to a specific timestamp using timer.set(timestamp).
  3. Execute: When the target time arrives, Beam executes the callback method associated with that timer.
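
The three steps above can be sketched as a toy, standard-library model of what a runner does with event-time timers. ToyTimerRunner and on_alert are hypothetical names, not part of Beam, and a real runner also tracks windows and persists timers durably, which this sketch ignores.

```python
class ToyTimerRunner:
    """Keeps at most one pending timer per key and fires on watermark advance."""

    def __init__(self, callback):
        self._callback = callback  # stands in for the @on_timer method
        self._pending = {}         # key -> scheduled event-time timestamp

    def set(self, key, timestamp):
        # Setting again for the same key replaces the earlier timestamp.
        self._pending[key] = timestamp

    def advance_watermark(self, watermark):
        """Fire every timer whose timestamp is <= watermark, earliest first."""
        due = sorted(
            (ts, key) for key, ts in self._pending.items() if ts <= watermark
        )
        outputs = []
        for ts, key in due:
            del self._pending[key]
            outputs.append(self._callback(key, ts))
        return outputs


def on_alert(key, fired_at):
    return (key, f"timer fired at {fired_at}")


runner = ToyTimerRunner(on_alert)   # 1. Declare
runner.set("device-a", 900)         # 2. Schedule
runner.set("device-b", 1200)
runner.set("device-a", 1500)        # a newer element pushes device-a's timer out

first = runner.advance_watermark(1200)   # 3. Execute: only device-b is due
second = runner.advance_watermark(1500)  # now device-a fires
```

Because the second set for device-a replaced the first, nothing fires for that key at 900.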

5. Visual Diagram

Element ingested (12:00) -> sets timer for 12:15
Watermark reaches 12:15 -> runner calls the @on_timer callback

6. Code Example

Setting an event-time timer to fire 5 minutes after an element's timestamp:

python
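import apache_beam as beam
from apache_beam.coders import VarIntCoder
from apache_beam.transforms.timeutil import TimeDomain
from apache_beam.transforms.userstate import (
    ReadModifyWriteStateSpec, TimerSpec, on_timer)

class TimeoutAlertDoFn(beam.DoFn):
    # 1. Declare state and timer specs
    LAST_SEEN_STATE = ReadModifyWriteStateSpec("last_seen", VarIntCoder())
    # TimeDomain.WATERMARK is the Python SDK's name for the event-time domain
    ALERT_TIMER = TimerSpec("alert_timer", TimeDomain.WATERMARK)

    def process(self, element,
                timestamp=beam.DoFn.TimestampParam,
                last_seen=beam.DoFn.StateParam(LAST_SEEN_STATE),
                timer=beam.DoFn.TimerParam(ALERT_TIMER)):
        # Input must be keyed: each element is a (key, value) tuple
        key, val = element

        # Save the element's event timestamp as whole seconds
        last_seen.write(int(timestamp))

        # 2. Schedule the timer 5 minutes (300 seconds) later in event time.
        # Setting it again for the same key replaces the earlier deadline.
        timer.set(int(timestamp) + 300)
        yield element

    # 3. Callback invoked by the runner when the timer fires
    @on_timer(ALERT_TIMER)
    def trigger_alert(self,
                      key=beam.DoFn.KeyParam,
                      last_seen=beam.DoFn.StateParam(LAST_SEEN_STATE)):
        # Clear the state for this key so it does not linger
        last_seen.clear()
        yield (key, "ALERT: No events seen in last 5 minutes!")

7. Code Explanation

  • TimerSpec, ReadModifyWriteStateSpec, and on_timer come from apache_beam.transforms.userstate; TimeDomain comes from apache_beam.transforms.timeutil.
  • ALERT_TIMER = TimerSpec("alert_timer", TimeDomain.WATERMARK) declares an event-time timer. WATERMARK is the Python SDK's name for the event-time domain.
  • timer.set(...) schedules the callback for 5 minutes past the element's timestamp, replacing any timer already pending for that key.
  • @on_timer(ALERT_TIMER) binds the trigger_alert method to the timer. The callback receives the key through beam.DoFn.KeyParam and clears last_seen before emitting the alert.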

8. Real Production Example

In session tracking, you store active cart contents in BagState and want to clear a user's cart after 30 minutes of inactivity. On every event you set an event-time timer for last_activity_time + 1800, which replaces the previous one. If no new event resets the timer within 30 minutes, the timer fires; the callback emits an abandoned-cart record (for example, to trigger an email to the user) and clears the state.
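
The cart scenario can be modelled with plain Python to make the reset-and-fire behaviour concrete. This is a sketch, not Beam code: find_abandoned_carts and its data shapes are hypothetical, a dict of lists stands in for BagState, and the watermark is assumed to equal the timestamp of the event being processed, which real pipelines do not guarantee.

```python
from collections import defaultdict

INACTIVITY_SECONDS = 1800  # 30 minutes, as in the text


def find_abandoned_carts(events, final_watermark):
    """events: iterable of (user, item, event_time_seconds), in any order.

    Returns (user, items, fired_at) for every cart whose timer fired.
    """
    carts = defaultdict(list)  # per-user bag of items
    timers = {}                # per-user pending timer timestamp
    abandoned = []

    def fire_due(watermark):
        # sorted() copies the items, so deleting inside the loop is safe.
        for user, fire_at in sorted(timers.items(), key=lambda kv: kv[1]):
            if fire_at <= watermark:
                abandoned.append((user, list(carts[user]), fire_at))
                del carts[user]   # clear state so it does not leak
                del timers[user]

    for user, item, ts in sorted(events, key=lambda e: e[2]):
        fire_due(ts)  # simplification: watermark = current event time
        carts[user].append(item)
        timers[user] = ts + INACTIVITY_SECONDS  # reset the deadline
    fire_due(final_watermark)
    return abandoned


events = [
    ("ann", "book", 0),
    ("bob", "lamp", 100),
    ("ann", "pen", 600),    # pushes ann's deadline from 1800 to 2400
    ("ann", "ink", 2000),   # pushes it again, to 3800
]
early = find_abandoned_carts(events, final_watermark=3000)
late = find_abandoned_carts(events, final_watermark=4000)
```

With the watermark at 3000 only bob's cart has timed out; ann's repeated activity keeps moving her deadline, so her cart is reported only once the watermark passes 3800.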

9. Common Mistakes

  • Setting timers without keys: Like stateful processing, timers require key-value inputs. Applying a DoFn that uses timers to non-keyed data fails with an error instead of running.
  • Neglecting time zone offsets: When you derive a processing-time timer target from a calendar date and time, the local time zone of the worker VM can shift the result. Always compute timer targets in UTC.
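
One way to avoid the time zone mistake is to build timer targets only from timezone-aware datetimes. The sketch below uses the standard library only; utc_epoch_seconds is a hypothetical helper, and it assumes the timer target is expressed in seconds since the Unix epoch, as in the code example above.

```python
from datetime import datetime, timedelta, timezone


def utc_epoch_seconds(moment: datetime) -> float:
    """Convert a timezone-aware datetime to seconds since the Unix epoch."""
    if moment.tzinfo is None:
        # A naive datetime would be interpreted in the worker's local zone.
        raise ValueError("expected a timezone-aware datetime")
    return moment.timestamp()


# The same instant written in two zones maps to one epoch value.
noon_utc = datetime(2026, 1, 1, 12, 0, tzinfo=timezone.utc)
same_instant = datetime(
    2026, 1, 1, 17, 30, tzinfo=timezone(timedelta(hours=5, minutes=30))
)

fire_at = utc_epoch_seconds(noon_utc) + 10  # e.g. a timer 10 seconds later
```

Rejecting naive datetimes outright makes the result independent of how each worker VM is configured.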

10. Interview Perspective

  • Question: What happens if you call timer.set() multiple times for the same key?
  • Answer: Calling set() overwrites the existing timer for that key/window. Only the last scheduled timestamp is active. This is ideal for resetting inactivity timeouts.
  • Question: Can a timer output data?
  • Answer: Yes. The @on_timer callback can yield elements to downstream collections just as process() does, and it can read and write state. It does not receive an input element, only parameters such as the key.

11. Best Practices

  • Always clear state cells associated with a key inside the timer callback to prevent leaks.
  • Use Event-Time timers for backfills and deterministic testing, and Processing-Time timers solely for wall-clock notifications.

12. Summary

  • Timers schedule callbacks based on time domains (Event/Processing).
  • Declared with a TimerSpec, scheduled with timer.set(), and handled by a method bound with @on_timer(spec).
  • Crucial for session timeouts, heartbeats, and state memory cleanup.

13. Interactive Challenges

Challenge 1: Processing-Time Timeout Alert (Beginner)

Define a DoFn class named HeartbeatAlertDoFn that sets a processing-time timer to fire 10 seconds of wall-clock time after processing a record.

Challenge 2: Inactivity Timer Resetter (Intermediate)

Write a DoFn class named InactivitySessionDoFn that uses a ReadModifyWriteStateSpec (the Python SDK's single-value state) to hold session activity, and sets a 60-second event-time timer that gets reset (pushed forward) every time a new event for that key arrives.

Challenge 3: State Garbage Collector (Advanced)

Define a DoFn that accumulates integers into a BagStateSpec and sets an event-time timer for the end of the window. When the timer fires, it yields the average of all buffered values and clears the state.
