Checkpointing Concepts
1. Introduction
Checkpointing is the core operational mechanism used by distributed stream processing runners to capture consistent, global snapshots of a running pipeline's state. It is the foundation of fault tolerance and exactly-once processing in streaming systems.
2. Why This Concept Exists
In batch systems, a worker failure can be resolved by re-running the affected dataset partition from the beginning. In an unbounded streaming system this is impractical: the stream never ends, so restarting from the beginning would mean replaying an ever-growing history. Checkpointing lets the runner periodically save the pipeline's state (read offsets, active windows, user state, and in-flight records). When a crash occurs, the pipeline rolls back to the most recent snapshot and reprocesses only the records that arrived after it, rather than the full historical stream.
3. Key Terminology
- State Snapshot: The recorded state of a worker's memory (e.g., active accumulators) saved to durable storage.
- Checkpoint Barrier: A special control message injected by the runner into the data stream to mark the boundary between the records that belong to one checkpoint and those that belong to the next.
- In-flight Messages: Records that have been read from the source but are still undergoing processing inside the transforms.
- Incremental Checkpointing: An optimization where only the changes (deltas) in state since the last checkpoint are written to storage, rather than writing the entire state again.
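To make the difference between a full and an incremental checkpoint concrete, here is a minimal sketch in plain Python. It does not reflect how any runner stores state internally: state is modeled as a flat dict, and `compute_delta` and `apply_delta` are hypothetical helpers introduced only for illustration.

```python
# Toy model: state is a flat dict of key -> value.
# compute_delta / apply_delta are hypothetical helpers, not a runner API.

_DELETED = object()  # marker for keys removed since the last checkpoint


def compute_delta(previous: dict, current: dict) -> dict:
    """Return only the entries that changed since the previous checkpoint."""
    delta = {k: v for k, v in current.items()
             if k not in previous or previous[k] != v}
    for k in previous:
        if k not in current:
            delta[k] = _DELETED
    return delta


def apply_delta(base: dict, delta: dict) -> dict:
    """Rebuild the full state from a base snapshot plus one delta."""
    restored = dict(base)
    for k, v in delta.items():
        if v is _DELETED:
            restored.pop(k, None)
        else:
            restored[k] = v
    return restored


checkpoint_1 = {"user_a": 3, "user_b": 7, "user_c": 1}  # full snapshot
state_now = {"user_a": 3, "user_b": 9, "user_d": 4}     # state at checkpoint 2

delta_2 = compute_delta(checkpoint_1, state_now)  # only the changes are written
restored = apply_delta(checkpoint_1, delta_2)     # base + delta = full state
```

The unchanged key `user_a` never appears in `delta_2`, which is where the savings come from when most of a large state is untouched between checkpoints.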
4. How It Works
Runners such as Flink use a variant of the Chandy-Lamport algorithm for distributed checkpointing; other runners may implement checkpointing differently. The barrier-based flow works as follows:
- Barrier Injection: The runner's coordinator injects a checkpoint barrier into the input streams.
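- Barrier Propagation: The barrier flows through the pipeline alongside data elements. When a transform with several input channels receives the barrier on one channel, it temporarily stops processing input from that channel until the barrier has arrived on all of its other channels, so that no post-barrier record is folded into the snapshot.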
- State Snapshotting: The worker writes the transform's current local state to durable storage (e.g. GCS, S3, or HDFS).
- Acknowledgment: The worker forwards the barrier to downstream transforms and, once its state has been durably written, sends an acknowledgment back to the coordinator.
- Commit: When the barrier reaches the sinks and all workers have acknowledged, the checkpoint is declared complete, and the runner commits the corresponding input offsets back to the message broker.
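The alignment behavior in this flow can be sketched with a toy single-operator simulation. This is an illustrative model, not Flink or Beam code: `run_aligned_operator` and the `BARRIER` marker are hypothetical names, the operator simply sums its inputs, and channels are read round-robin. The operator stops reading a channel once that channel's barrier has arrived and snapshots its state only when every channel has delivered its barrier.

```python
from collections import deque

BARRIER = "BARRIER"  # stand-in for a checkpoint barrier control message


def run_aligned_operator(channels):
    """Sum records from several input channels with aligned checkpointing.

    Returns (snapshot, final_total): the running sum captured once all
    barriers have arrived, and the sum after every record is processed.
    """
    queues = [deque(ch) for ch in channels]
    blocked = set()   # channels whose barrier has already arrived
    total = 0         # the operator's local state
    snapshot = None

    while any(queues):
        progressed = False
        for i, q in enumerate(queues):
            if i in blocked or not q:
                continue
            item = q.popleft()
            progressed = True
            if item == BARRIER:
                blocked.add(i)            # stop reading this channel
                if len(blocked) == len(queues):
                    snapshot = total      # all barriers aligned: snapshot state
                    blocked.clear()       # resume every channel
            else:
                total += item
        if not progressed:
            break  # only blocked channels have data; alignment cannot finish
    return snapshot, total


# Channel 0 delivers its barrier early; the record 100 behind it must wait.
snapshot, final_total = run_aligned_operator([
    [1, BARRIER, 100],
    [5, 6, 7, BARRIER],
])
```

Here the snapshot contains exactly the pre-barrier records from both channels (1 + 5 + 6 + 7), while the post-barrier record 100 is processed only after the snapshot is taken.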
5. Visual Diagram
[Diagram: a checkpoint barrier (CP #5) triggers a local worker snapshot, which is written to the durable state store (state backups in GCS / HDFS).]
6. Code Example
The following code demonstrates how to configure runner-specific checkpoint options inside Apache Beam's PipelineOptions when using the Flink Runner:
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions


def run_flink_pipeline():
    # Configure pipeline options, targeting the Flink runner in streaming mode
    options = PipelineOptions([
        "--runner=FlinkRunner",
        # --streaming is a boolean flag in the Python SDK, so it takes no value
        "--streaming",
        "--flink_master=localhost:8081",
        # 1. Set the checkpointing interval in milliseconds (e.g., every 30 seconds)
        "--checkpointing_interval=30000",
        # 2. Configure exactly-once checkpointing mode
        "--checkpointing_mode=EXACTLY_ONCE",
        # 3. Define the state backend and the storage location for snapshots
        "--state_backend=rocksdb",
        "--state_backend_storage_path=hdfs:///flink/checkpoints",
    ])

    with beam.Pipeline(options=options) as p:
        (
            p
            # In real apps, replace with ReadFromKafka/PubSub
            | "CreateMock" >> beam.Create([1, 2, 3])
            | "Process" >> beam.Map(lambda x: x * 2)
            | "Log" >> beam.Map(print)
        )


if __name__ == "__main__":
    run_flink_pipeline()
7. Code Explanation
- --runner=FlinkRunner designates Apache Flink as our execution engine.
- --checkpointing_interval=30000 tells Flink to initiate a new global checkpoint barrier every 30 seconds (30,000 milliseconds).
- --checkpointing_mode=EXACTLY_ONCE configures Flink to align barriers, guaranteeing exactly-once semantics inside the pipeline.
- --state_backend=rocksdb configures Flink to store active state in local RocksDB databases on each worker, offloading snapshot files to the designated --state_backend_storage_path path.
8. Real Production Example
An online transaction platform runs an Apache Beam pipeline on Flink to monitor checkout logs. The Flink runner is configured to snapshot state to an AWS S3 bucket. At 14:05, a network routing switch fails, knocking out 2 of the 10 worker nodes. Flink detects the failure, launches replacement containers, restores the state of the active checkouts from the 14:04:30 checkpoint in S3, and resumes reading from the Kafka offsets recorded in that checkpoint. Records that arrived after 14:04:30 are replayed, so no checkout events are dropped.
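The recovery path in this scenario (restore state, then replay from the recorded offset) can be sketched in a few lines of plain Python. This is a toy model under stated assumptions: `events` stands in for a replayable source such as a Kafka partition, `Checkpoint` and `process` are hypothetical names, and the operator just sums its input.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Checkpoint:
    offset: int   # next source position to read after a restore
    total: int    # operator state at that position


def process(events, start_offset=0, total=0, checkpoint_every=3, crash_at=None):
    """Sum events, checkpointing periodically; optionally crash mid-stream.

    Returns (total, last_checkpoint, crashed).
    """
    last_checkpoint = Checkpoint(start_offset, total)
    for offset in range(start_offset, len(events)):
        if crash_at is not None and offset == crash_at:
            # Simulated failure: in-memory state is lost, the checkpoint survives
            return total, last_checkpoint, True
        total += events[offset]
        if (offset + 1) % checkpoint_every == 0:
            last_checkpoint = Checkpoint(offset + 1, total)
    return total, last_checkpoint, False


events = [4, 8, 15, 16, 23, 42]

# First attempt crashes just before processing offset 4.
_, cp, crashed = process(events, crash_at=4)

# Recovery: restore state from the checkpoint and replay from its offset.
recovered_total, _, _ = process(events, start_offset=cp.offset, total=cp.total)
```

The record at offset 3 is processed twice (once before the crash, once during replay), yet it is counted only once in the final state because the state was rolled back together with the offset. That pairing is what makes the result match an uninterrupted run.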
9. Common Mistakes
- Excessive Checkpoint Frequency: Setting the checkpoint interval too low (e.g., 500 milliseconds) causes workers to spend more time snapshotting state and writing to storage than processing data. This leads to backpressure, where processing falls behind ingestion.
- Non-durable Checkpoint Storage: Checkpoints are stored on the local disk of the worker virtual machines. If a VM crashes or is deleted, the checkpoint data is lost with it. Checkpoints must always be written to shared, durable storage (like GCS, S3, or HDFS).
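A rough back-of-the-envelope model shows why a very short interval hurts. The sketch below makes a deliberately pessimistic assumption that checkpointing and processing never overlap (real runners snapshot at least partly asynchronously), and the 400 ms checkpoint duration is a hypothetical figure, not a measurement. `checkpoint_overhead` is an illustrative helper.

```python
def checkpoint_overhead(checkpoint_duration_ms: float, interval_ms: float) -> float:
    """Fraction of wall-clock time spent checkpointing (capped at 1.0).

    Simplifying assumption: checkpoints do not overlap with processing.
    """
    if interval_ms <= 0:
        raise ValueError("interval_ms must be positive")
    return min(1.0, checkpoint_duration_ms / interval_ms)


# Hypothetical pipeline whose checkpoints each take 400 ms to complete.
for interval_ms in (500, 30_000, 300_000):
    share = checkpoint_overhead(400, interval_ms)
    print(f"interval={interval_ms:>7} ms  overhead={share:.1%}")
```

Under these assumptions a 500 ms interval leaves only a fifth of the time for actual processing, while a 30-second interval brings the overhead down to roughly one percent.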
10. Interview Perspective
- Question: What is the difference between aligned and unaligned checkpoints?
- Answer: In aligned checkpointing, a transform waits for barriers to arrive from all input channels before writing its state snapshot. In unaligned checkpointing, the transform snapshots its state as soon as the first barrier arrives and persists the in-flight records still queued ahead of the other barriers as part of the checkpoint. This removes the alignment wait (which matters most under backpressure) at the cost of larger checkpoint files.
- Question: Why are checkpoints not the same as database backups?
- Answer: Checkpoints are internal system snapshots designed for crash recovery, containing raw binary states optimized for fast recovery. Database backups are clean, structured exports meant for long-term historical records and query access.
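The aligned versus unaligned trade-off from the first answer can be illustrated with a small model of what ends up in the checkpoint. This is a conceptual sketch, not runner code: the function names are hypothetical, the operator state is a single running sum, and `pending_before_barrier` represents records queued ahead of the barriers that have not yet arrived.

```python
def aligned_snapshot(state, pending_before_barrier):
    """Wait for all barriers: drain pre-barrier records, then snapshot state only."""
    for channel in pending_before_barrier:
        for record in channel:
            state += record
    return {"state": state, "in_flight": []}


def unaligned_snapshot(state, pending_before_barrier):
    """Snapshot immediately: persist current state plus the in-flight records."""
    in_flight = [r for channel in pending_before_barrier for r in channel]
    return {"state": state, "in_flight": in_flight}


def restore(snapshot):
    """Rebuild operator state, replaying any persisted in-flight records."""
    return snapshot["state"] + sum(snapshot["in_flight"])


# Operator state is 10 when the first barrier arrives; two other channels
# still hold records queued ahead of their barriers.
pending = [[1, 2], [3]]
a = aligned_snapshot(10, pending)
u = unaligned_snapshot(10, pending)
```

Both snapshots restore to the same logical state. The aligned one is smaller but could only be taken after the pending records were processed, whereas the unaligned one was available immediately and carries the three in-flight records with it.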
11. Best Practices
- Use incremental checkpointing (e.g. RocksDB incremental snapshots) for pipelines that maintain large user states to reduce write size and network overhead.
- As a starting point, set the checkpointing interval between 30 seconds and 5 minutes to balance recovery time against pipeline performance, then tune it based on observed checkpoint duration and state size.
- Ensure the checkpoint storage system is geographically located near your processing cluster to avoid network bottlenecking.
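The recovery-time side of the interval trade-off can be estimated with simple arithmetic. The sketch below is a rough model with hypothetical throughput numbers: it assumes a failure just before the next checkpoint completes, a constant ingest rate, and ignores restart and state-download time. Both helper functions are illustrative names.

```python
def worst_case_backlog(interval_s: float, ingest_rate: float) -> float:
    """Records to replay if the failure hits just before the next checkpoint."""
    return interval_s * ingest_rate


def catch_up_seconds(backlog: float, ingest_rate: float,
                     max_processing_rate: float) -> float:
    """Time to drain the backlog while new records keep arriving."""
    spare = max_processing_rate - ingest_rate
    if spare <= 0:
        raise ValueError("pipeline has no spare capacity to catch up")
    return backlog / spare


# Hypothetical pipeline: 2,000 records/s arriving, 2,500 records/s capacity.
for interval_s in (30, 300):
    backlog = worst_case_backlog(interval_s, 2_000)
    delay = catch_up_seconds(backlog, 2_000, 2_500)
    print(f"interval={interval_s:>3}s  replay={backlog:,.0f} records  "
          f"catch-up={delay:,.0f}s")
```

With these numbers, stretching the interval from 30 seconds to 5 minutes multiplies both the replay volume and the catch-up time by ten, which is the cost that has to be weighed against lower checkpointing overhead.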
12. Summary
- Checkpointing provides consistent state snapshots for streaming recovery.
- Runners such as Flink use barrier propagation (a Chandy-Lamport variant) to align operator state across the pipeline.
- Checkpoints must be saved to durable, distributed storage systems.