Exactly Once Processing
1. Introduction
Exactly-once processing is a delivery guarantee in streaming applications. It ensures that, even if workers crash, network connections drop, or data is replayed, the final results in external databases or storage systems match what they would have been had every event been processed exactly once.
2. Why This Concept Exists
In a distributed streaming architecture, the default delivery guarantee is usually at-least-once: when a failure occurs, unacknowledged messages are re-delivered so that no data is lost. If you are calculating financial balances, counting inventory, or billing clients, processing the same message twice can have severe business consequences. Exactly-once processing prevents these duplicates from reaching downstream databases.
3. Key Terminology
- At-Least-Once: Guarantees that no messages are lost, but duplicates may occur.
- Exactly-Once: Guarantees that the final state of the system reflects the effect of processing each message exactly once, even if some messages were physically processed more than once.
- Idempotent Write: A write operation that can be executed multiple times with the same input, resulting in the same final state (e.g., database upserts using a primary key).
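- Two-Phase Commit (2PC): A protocol that coordinates a transaction across distributed workers and external storage. Every participant first prepares (stages its writes and votes), and the transaction commits only if all participants vote yes; otherwise every participant aborts.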
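To make the two phases concrete, here is a minimal in-memory sketch. Participant, two_phase_commit, and the fail_prepare flag are hypothetical names chosen for illustration; a real coordinator also logs its decision durably and handles timeouts and participant recovery, all of which this sketch omits.

```python
from dataclasses import dataclass, field


@dataclass
class Participant:
    """Hypothetical storage participant that stages writes before committing."""
    name: str
    fail_prepare: bool = False  # simulate a participant that votes "no"
    staged: list = field(default_factory=list)
    committed: list = field(default_factory=list)

    def prepare(self, records):
        # Phase 1: stage the records and vote yes (True) or no (False).
        if self.fail_prepare:
            return False
        self.staged = list(records)
        return True

    def commit(self):
        # Phase 2 (success): make the staged records visible.
        self.committed.extend(self.staged)
        self.staged = []

    def abort(self):
        # Phase 2 (failure): discard staged records; nothing becomes visible.
        self.staged = []


def two_phase_commit(participants, records):
    """Commit `records` to every participant, or to none of them."""
    votes = [p.prepare(records) for p in participants]
    if all(votes):
        for p in participants:
            p.commit()
        return True
    for p in participants:
        p.abort()
    return False
```

If any participant votes no, every participant aborts, so no storage system ends up holding a partial write.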
4. How It Works
Achieving exactly-once processing requires a combination of three factors:
- Replayable Sources: The source must allow messages to be re-read after a failure and provide stable, unique identifiers (e.g., Kafka topic-partition offsets or Pub/Sub message IDs).
- Stateful Checkpointing: The runner (such as Dataflow or Flink) records source offsets and internal state together in consistent checkpoints, so that after a failure both can be restored to the same point.
- Transactional or Idempotent Sinks: The sink must support writing data transactionally (committing only when the checkpoint succeeds) or idempotently (overwriting instead of appending duplicates).
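The three ingredients above can be simulated in plain Python. This is a toy model under stated assumptions, not how Dataflow or Flink is implemented: a list stands in for a replayable log (list index = offset), a dict stands in for durable checkpoint storage, and TransactionalSink and run are hypothetical names.

```python
import copy


class TransactionalSink:
    """Buffers output and exposes it only when a checkpoint commits."""

    def __init__(self):
        self.pending = []
        self.committed = []

    def write(self, record):
        self.pending.append(record)

    def commit(self):
        self.committed.extend(self.pending)
        self.pending = []

    def abort(self):
        self.pending = []


def run(source, crash_at=None, checkpoint_every=2):
    """Sum amounts per key; crash once at offset `crash_at`, then recover."""
    sink = TransactionalSink()
    checkpoint = {"offset": 0, "state": {}}  # last durable snapshot
    crashed = False
    while True:
        # (Re)start from the last checkpoint: restore offset and state together.
        offset = checkpoint["offset"]
        state = copy.deepcopy(checkpoint["state"])
        try:
            while offset < len(source):
                if offset == crash_at and not crashed:
                    crashed = True
                    raise RuntimeError("simulated worker crash")
                key, amount = source[offset]
                state[key] = state.get(key, 0) + amount
                sink.write((offset, key, amount))
                offset += 1
                if offset % checkpoint_every == 0 or offset == len(source):
                    # Offset, state, and sink output advance together. A real
                    # runner coordinates these steps atomically (e.g. via 2PC).
                    checkpoint = {"offset": offset, "state": copy.deepcopy(state)}
                    sink.commit()
            return state, sink.committed
        except RuntimeError:
            # Drop uncommitted output, then replay from the checkpointed offset.
            sink.abort()
```

Calling run(source, crash_at=3) replays offset 2 after the crash, yet the final totals and committed output are identical to a crash-free run, because uncommitted output is discarded and state is rolled back to the same offset the source is rewound to.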
5. Visual Diagram
Durable Source (replay via Kafka offsets) → Beam Pipeline (stateful checkpoints) → Idempotent Sink (UPSERT / two-phase commit)
6. Code Example
The following code demonstrates how to construct an idempotent write transform that updates database records using a unique record identifier as the key, so that replayed elements cannot create duplicate records:
```python
import apache_beam as beam


class IdempotentWriteFn(beam.DoFn):
    def __init__(self, db_client_config):
        self.config = db_client_config
        self.db = None

    def setup(self):
        # Called once per DoFn instance on the worker: open the connection here.
        self.db = MockDatabaseClient(self.config)

    def process(self, element):
        tx_id, amount = element
        # Idempotent write: UPSERT (update or insert) using tx_id as primary key.
        # If tx_id is written again during replay, the database state remains correct.
        self.db.upsert(
            key=tx_id,
            value={"amount": amount, "status": "processed"},
        )
        # Terminal sink step: nothing is emitted downstream.


# Mock database client for illustrative purposes (in-memory, per worker).
class MockDatabaseClient:
    def __init__(self, config):
        self.config = config
        self.store = {}

    def upsert(self, key, value):
        self.store[key] = value  # Overwrites any existing value for this key
```
7. Code Explanation
- IdempotentWriteFn simulates writing to an external database.
- In process, the database operation is an upsert (update or insert) keyed on the event's unique transaction ID (tx_id).
- If a network failure occurs and the runner replays the last batch, the same tx_id will be written to the database again. Because it is an upsert rather than an append, it overwrites the existing entry, resulting in zero duplicates.
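The difference between an append and an upsert under replay can be shown without Beam. In this sketch a list plays the role of an append-only table and a dict plays the role of the keyed store inside MockDatabaseClient; replay_batch, append_write, and upsert_write are hypothetical helpers that deliver the same batch twice, as an at-least-once runner might after a failure.

```python
append_log = []    # append-only sink: one row per write
upsert_store = {}  # keyed sink: at most one row per tx_id


def append_write(tx_id, amount):
    append_log.append((tx_id, amount))


def upsert_write(tx_id, amount):
    upsert_store[tx_id] = {"amount": amount, "status": "processed"}


def replay_batch(write, batch, times):
    """Deliver the same batch `times` times, simulating redelivery."""
    for _ in range(times):
        for tx_id, amount in batch:
            write(tx_id, amount)


batch = [("tx-1", 100), ("tx-2", 250)]
replay_batch(append_write, batch, times=2)
replay_batch(upsert_write, batch, times=2)

print(len(append_log))    # 4: the replay doubled every row
print(len(upsert_store))  # 2: the replay overwrote the same keys
```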
8. Real Production Example
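A billing engine streams cellular phone data usage logs to charge customer accounts. The pipeline aggregates byte usage in 1-hour fixed windows and writes billing transactions to Google Cloud BigQuery. Writing through the BigQuery Storage Write API (the STORAGE_WRITE_API method of Beam's WriteToBigQuery) provides exactly-once semantics, so if a network failure replays worker outputs, the duplicate rows are not committed and each customer is billed exactly once. The older streaming-insert path, which deduplicates on insertion IDs (insertId), is only best-effort and can still let duplicates through.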
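The core idea behind this example, deterministic row IDs that let the sink recognize a replayed row, can be simulated in plain Python. This sketch does not call BigQuery: DedupingSink is a hypothetical stand-in for any sink that ignores rows whose ID it has already stored, and the "customer:window_start" row-ID scheme is an illustrative assumption.

```python
from collections import defaultdict

WINDOW_SECONDS = 3600  # 1-hour fixed windows


def window_start(ts):
    """Start of the fixed 1-hour window containing epoch-second `ts`."""
    return ts - ts % WINDOW_SECONDS


def billing_rows(usage_logs):
    """Sum bytes per (customer, window); give each row a deterministic ID."""
    totals = defaultdict(int)
    for customer, ts, num_bytes in usage_logs:
        totals[(customer, window_start(ts))] += num_bytes
    return [
        {"row_id": f"{customer}:{start}", "customer": customer,
         "window_start": start, "bytes": total}
        for (customer, start), total in sorted(totals.items())
    ]


class DedupingSink:
    """Hypothetical sink that keeps only the first row written per row_id."""

    def __init__(self):
        self.rows = {}

    def write(self, rows):
        for row in rows:
            self.rows.setdefault(row["row_id"], row)


logs = [("alice", 10, 500), ("alice", 3599, 300),
        ("alice", 3600, 200), ("bob", 42, 1000)]
sink = DedupingSink()
rows = billing_rows(logs)
sink.write(rows)
sink.write(rows)  # simulated replay of the same worker output
```

Because each row ID is derived from the customer and window rather than generated randomly, the replayed write maps onto the rows already stored, leaving three billing rows instead of six.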
9. Common Mistakes
- Assuming runner exactly-once equals end-to-end exactly-once: A common misconception is that the runner's exactly-once mode guarantees duplicate-free records in your database. If your output sink is an append-only SQL table (INSERT INTO logs ...), replayed writes will still create duplicate rows. The sink must be transactional or idempotent.
- Using unstable event IDs: Generating transaction IDs inside the pipeline (e.g., with uuid.uuid4()) is nondeterministic. When a failed step is retried, it generates different IDs for the same input records, so the sink's deduplication treats the replayed records as new.
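The contrast between unstable and stable IDs fits in a few lines. This is a sketch under stated assumptions: PIPELINE_NAMESPACE, unstable_id, and stable_id are hypothetical, and deriving IDs from Kafka topic/partition/offset assumes the topic is never recreated (which would restart its offsets). An ID assigned by the producer when the event is generated remains the better choice when available.

```python
import uuid

# Hypothetical fixed namespace for this pipeline's record IDs.
PIPELINE_NAMESPACE = uuid.UUID("12345678-1234-5678-1234-567812345678")


def unstable_id(record):
    # A new random UUID on every call: a retried step yields a different ID
    # for the same record, so sink-side deduplication cannot match it.
    return str(uuid.uuid4())


def stable_id(topic, partition, offset):
    # Name-based UUID (uuid5) derived only from the record's source
    # coordinates, so every replay of the same record yields the same ID.
    return str(uuid.uuid5(PIPELINE_NAMESPACE, f"{topic}/{partition}/{offset}"))
```

For example, stable_id("usage-logs", 3, 1042) returns the same string on every attempt, while two calls to unstable_id for the same record never match.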
10. Interview Perspective
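- Question: How does Apache Beam ensure exactly-once semantics inside its shuffle operations?
- Answer: Beam delegates this to the runner. Dataflow, for example, assigns each record a stable ID and durably commits each stage's output together with its state changes. If a worker fails, the upstream stage resends the affected records, and the receiving stage discards any record whose ID it has already processed. Other runners, such as Flink, instead restore all operators from a consistent checkpoint and replay the source from the checkpointed offsets.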
- Question: What is the performance cost of exactly-once compared to at-least-once?
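- Answer: Exactly-once requires frequent state snapshot writes (checkpointing) and coordination protocols such as two-phase commit. These increase end-to-end latency, especially with transactional sinks, whose output becomes visible only after the enclosing checkpoint commits, and they consume more disk and network bandwidth.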
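The receiver-side deduplication described in the shuffle answer can be sketched as a stage that remembers the IDs it has processed. DedupingStage is a hypothetical name, and the sketch assumes every record already carries a stable ID; a real runner persists the seen-ID set with the stage's checkpointed state and eventually garbage-collects old entries, whereas this sketch keeps it in memory.

```python
class DedupingStage:
    """Hypothetical downstream stage that drops records it has already seen."""

    def __init__(self):
        self.seen_ids = set()
        self.total = 0

    def receive(self, record_id, value):
        if record_id in self.seen_ids:
            return False  # duplicate resend from a retried upstream worker
        self.seen_ids.add(record_id)
        self.total += value
        return True


stage = DedupingStage()
shuffled = [("r1", 5), ("r2", 7)]
for rid, v in shuffled:
    stage.receive(rid, v)
# The upstream worker failed before its output was acknowledged, so it resends.
for rid, v in shuffled:
    stage.receive(rid, v)
print(stage.total)  # 12
```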
11. Best Practices
- Ensure every input event has a unique, deterministic identifier assigned at the moment of generation.
- Prefer native, well-tested Beam sinks (such as WriteToBigQuery with the Storage Write API) that manage exactly-once commits for you. Connectors such as WriteToJdbc may re-execute writes when a bundle is retried, so pair them with an idempotent statement such as an upsert.
- Keep transaction sizes small to prevent locking up target databases during commits.
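The last two practices combine naturally: write in short transactions, and make each statement an upsert so that a retried batch overwrites rather than duplicates. In this sketch Python's built-in sqlite3 stands in for the target database; the payments table and write_in_small_transactions are hypothetical, and the INSERT ... ON CONFLICT DO UPDATE syntax assumes SQLite 3.24 or newer (bundled with recent Python releases).

```python
import sqlite3

UPSERT_SQL = (
    "INSERT INTO payments (tx_id, amount) VALUES (?, ?) "
    "ON CONFLICT(tx_id) DO UPDATE SET amount = excluded.amount"
)


def write_in_small_transactions(conn, rows, batch_size=2):
    """Upsert (tx_id, amount) rows, one short transaction per batch."""
    for i in range(0, len(rows), batch_size):
        batch = rows[i:i + batch_size]
        with conn:  # commits the batch on success, rolls it back on error
            conn.executemany(UPSERT_SQL, batch)


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE payments (tx_id TEXT PRIMARY KEY, amount INTEGER)")
rows = [("tx-1", 100), ("tx-2", 250), ("tx-3", 75)]
write_in_small_transactions(conn, rows)
write_in_small_transactions(conn, rows)  # a full replay leaves the table unchanged
```

Smaller batches hold locks for less time, and because every statement is keyed on tx_id, a batch that is retried after a partial failure converges to the same table contents.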
12. Summary
- Exactly-once ensures processing results are correct despite cluster failures.
- Requires a replayable source, stateful checkpoints, and an idempotent/transactional sink.
- Avoid generating random IDs inside the pipeline; use source-assigned IDs.