
Implementing Dead Letter Queue (DLQ) in Streams

Published: July 02, 2026

In streaming data pipelines, high availability is paramount. Pipelines must process millions of events continuously without halting. However, bad data is inevitable. Malformed JSON payloads, schema drift, or corrupted bytes will periodically arrive from external sources.

If your processing code throws an unhandled exception (for example, while parsing a corrupt JSON string), the runner fails the work item and retries it. In streaming, this creates a poison pill scenario: the bad record fails the same way on every attempt, so it is never acknowledged, the pipeline retries it indefinitely, and the valid events queued behind it stall.
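The blocking behavior can be illustrated with a toy simulation in plain Python. This is only a sketch under stated assumptions: the in-memory queue, the max_attempts cap, and the function name process_in_order are hypothetical, and a real runner retries bundles of elements rather than single records.

```python
import json
from collections import deque


def process_in_order(queue, max_attempts=3):
    """Process records head-first, retrying the head record on failure.

    Returns (parsed records, failed attempts, records still stuck).
    A real streaming runner may retry forever; max_attempts only keeps
    this toy example finite.
    """
    processed, failures = [], 0
    pending = deque(queue)
    while pending and failures < max_attempts:
        try:
            processed.append(json.loads(pending[0]))
            pending.popleft()  # advance only after the head record succeeds
        except ValueError:  # json.JSONDecodeError is a ValueError
            failures += 1  # the head record is retried; nothing behind it moves
    return processed, failures, list(pending)


events = ['{"id": 1}', "{not json", '{"id": 2}']
done, failed_attempts, stuck = process_in_order(events)
print(done)             # [{'id': 1}]
print(failed_attempts)  # 3
print(stuck)            # ['{not json', '{"id": 2}']
```

The valid record with id 2 is never reached, because every attempt is spent on the malformed record ahead of it.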

To prevent this, you must implement a Dead Letter Queue (DLQ) pattern.


1. Core Architecture of a DLQ

The DLQ pattern separates the processing logic into three steps:

  1. Parse and Validate: Attempt to decode and validate the element inside a try-except block.
  2. Tag Output: If successful, emit to the main output. If an exception occurs, wrap the raw data along with error details (timestamp, exception message) and emit it to a tagged side-output.
  3. Route Sinks: Write valid elements to your primary database (e.g., BigQuery), and write the error records to a separate location (e.g., Cloud Storage or Pub/Sub) for manual review.
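The three steps above can be sketched without any Beam dependency, which makes the control flow easier to see. The names parse_and_tag, route, and REQUIRED_FIELDS are hypothetical, the required fields are an assumed schema, and the two lists stand in for real sinks.

```python
import json
from datetime import datetime, timezone

REQUIRED_FIELDS = ("user_id", "event_type")  # assumed schema for this sketch


def parse_and_tag(raw: bytes):
    """Return ("valid", record) or ("dlq", error_record) for one raw payload."""
    try:
        # Step 1: parse and validate
        record = json.loads(raw.decode("utf-8"))
        if not isinstance(record, dict):
            raise ValueError("payload is not a JSON object")
        missing = [f for f in REQUIRED_FIELDS if f not in record]
        if missing:
            raise ValueError(f"missing required fields: {missing}")
        return "valid", record
    except ValueError as exc:  # covers JSONDecodeError and UnicodeDecodeError
        # Step 2: wrap the raw data with error details and tag it for the DLQ
        return "dlq", {
            "error": str(exc),
            "timestamp": datetime.now(timezone.utc).isoformat(),
            # errors="replace" so that wrapping bad bytes cannot raise again
            "raw_payload": raw.decode("utf-8", errors="replace"),
        }


def route(payloads):
    """Step 3: split payloads into the collections each sink would receive."""
    sinks = {"valid": [], "dlq": []}
    for raw in payloads:
        tag, item = parse_and_tag(raw)
        sinks[tag].append(item)
    return sinks


sinks = route([
    b'{"user_id": 7, "event_type": "click"}',
    b'{"user_id": 7}',
    b"\xff\xfe not utf-8",
])
print(len(sinks["valid"]), len(sinks["dlq"]))  # 1 2
```

Because every failure is turned into a return value rather than a raised exception, one bad payload cannot stop the loop from reaching the next one.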

2. Implementing a DLQ in Apache Beam

You can implement this in Apache Beam using tagged outputs: TaggedOutput together with with_outputs in the Python SDK (the Java SDK equivalent is TupleTag).

python
import json
import apache_beam as beam
from apache_beam.io import WriteToBigQuery, WriteToText

class ParseAndValidateFn(beam.DoFn):
    def process(self, element):
        try:
            # Attempt to parse json
            parsed = json.loads(element.decode("utf-8"))
            
            # Perform schema check
            if "user_id" in parsed and "event_type" in parsed:
                yield parsed
            else:
                # Route custom validation failures
                yield beam.pvalue.TaggedOutput("dlq", {
                    "error": "Missing required fields",
                    "raw_payload": element.decode("utf-8")
                })
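        except Exception as e:
            # Route processing exceptions. Decode with errors="replace" so that
            # invalid UTF-8 (possibly the original failure) cannot raise again here.
            yield beam.pvalue.TaggedOutput("dlq", {
                "error": str(e),
                "raw_payload": element.decode("utf-8", errors="replace")
            })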

# Apply inside pipeline
results = (
    raw_messages 
    | "ParseAndValidate" >> beam.ParDo(ParseAndValidateFn()).with_outputs("dlq", main="valid")
)

# Route main output to BigQuery ("my_table" is a placeholder; a real table
# reference has the form "project:dataset.table")
results.valid | "WriteToBQ" >> WriteToBigQuery("my_table")

# Route DLQ output to GCS
results.dlq | "WriteToDLQ" >> WriteToText("gs://my-bucket/dlq/errors")

3. Best Practices for DLQ Design

  • [ ] Capture Error Metadata: Always attach the error class name, error message, timestamp, and step name to the DLQ payload, so each failure can be traced to the step and cause that produced it.
  • [ ] Set Monitoring Alerts: Configure alert notifications based on the rate of records arriving in your DLQ. A sudden spike usually points to a breaking change upstream, such as a schema change.
  • [ ] Establish Re-drive Capabilities: Ensure that the data written to the DLQ can be read and "re-driven" (re-processed) back into the pipeline once the processing bugs are resolved.
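As one possible shape for the metadata and re-drive items in this checklist, the sketch below builds a DLQ record as a JSON line and later recovers the original bytes from it. The function names, the field names, and the choice of base64 for the payload are assumptions made for illustration, not a format Beam prescribes.

```python
import base64
import json
from datetime import datetime, timezone


def build_dlq_record(raw: bytes, exc: Exception, step_name: str) -> str:
    """Serialize one failed element as a JSON line for the DLQ sink."""
    return json.dumps({
        "error_class": type(exc).__name__,
        "error_message": str(exc),
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "step_name": step_name,
        # base64 keeps the original bytes intact even when they are not valid UTF-8
        "raw_payload_b64": base64.b64encode(raw).decode("ascii"),
    })


def redrive_payloads(dlq_lines):
    """Yield the original raw bytes from stored DLQ lines for re-processing."""
    for line in dlq_lines:
        yield base64.b64decode(json.loads(line)["raw_payload_b64"])


bad = b"\xff{broken"
try:
    json.loads(bad.decode("utf-8"))
except ValueError as exc:
    line = build_dlq_record(bad, exc, step_name="ParseAndValidate")

restored = next(redrive_payloads([line]))
print(restored == bad)                  # True
print(json.loads(line)["error_class"])  # UnicodeDecodeError
```

Storing the payload byte-for-byte means the re-driven element is identical to what originally arrived, so a fixed parser sees the same input that failed. For the alerting item, a counter created with apache_beam.metrics.Metrics.counter and incremented wherever a record is sent to the DLQ is one way to expose a rate that an alert can watch.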