In streaming data pipelines, high availability is paramount. Pipelines must process millions of events continuously without halting. However, bad data is inevitable. Malformed JSON payloads, schema drift, or corrupted bytes will periodically arrive from external sources.
If your processing code raises an unhandled exception (for example, while parsing a corrupt JSON string), the runner fails the work containing that record and retries it. In streaming, this creates a poison pill scenario: the bad record stays at the head of the ingestion queue and can never be processed successfully, so the pipeline keeps failing and retrying it indefinitely while the valid events behind it are blocked.
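To make the failure mode concrete, here is a toy simulation in plain Python (not Beam). It assumes a consumer that acknowledges a record only after it parses successfully and retries the head of the queue on failure. The `consume` function and its `max_attempts` parameter are hypothetical; the cap exists only so the simulation terminates, whereas the scenario above assumes the runner keeps retrying.

```python
from collections import deque
import json


def consume(queue, max_attempts):
    """Toy consumer: processes the head of the queue, retrying on failure.

    max_attempts is a hypothetical cap so this simulation ends; it stands in
    for "retry forever". Returns the list of successfully processed events.
    """
    processed = []
    attempts = 0
    while queue and attempts < max_attempts:
        head = queue[0]
        try:
            event = json.loads(head)  # raises on malformed JSON
        except json.JSONDecodeError:
            attempts += 1  # "crash and retry": the bad record stays at the head
            continue
        queue.popleft()  # acknowledge only after success
        processed.append(event)
        attempts = 0
    return processed


events = deque(['{"id": 1}', '{"id": 2', '{"id": 3}', '{"id": 4}'])
print(consume(events, max_attempts=5))  # [{'id': 1}]
print(list(events))  # ['{"id": 2', '{"id": 3}', '{"id": 4}']
```

Only the first event gets through; the malformed second record is retried until the cap is hit, and the two valid events behind it are never processed.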
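To prevent this, implement the Dead Letter Queue (DLQ) pattern: instead of letting a bad record crash the pipeline, catch the failure and route the record to a separate destination.

The DLQ pattern separates the processing logic into three steps:

1. Wrap the parsing and validation logic in a try-except block.
2. Route any record that fails, together with its error message, to a dead-letter output instead of re-raising the exception.
3. Write valid records to the primary sink and dead-letter records to durable storage, where they can be inspected later.

You can implement this in Apache Beam using tagged outputs: `beam.pvalue.TaggedOutput` combined with `.with_outputs()` in the Python SDK (the Java SDK uses `TupleTag`).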
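```python
import json

import apache_beam as beam
from apache_beam.io import WriteToText
from apache_beam.io.gcp.bigquery import WriteToBigQuery

REQUIRED_FIELDS = ("user_id", "event_type")


class ParseAndValidateFn(beam.DoFn):
    def process(self, element):
        # `element` is raw bytes, e.g. a message payload read from Pub/Sub.
        try:
            # Attempt to decode and parse the JSON payload.
            parsed = json.loads(element.decode("utf-8"))
        except Exception as e:
            # Route processing exceptions. errors="replace" keeps this handler
            # from raising a second UnicodeDecodeError on corrupt bytes.
            yield beam.pvalue.TaggedOutput("dlq", {
                "error": str(e),
                "raw_payload": element.decode("utf-8", errors="replace"),
            })
            return

        # Perform schema check; route validation failures to the DLQ.
        if isinstance(parsed, dict) and all(f in parsed for f in REQUIRED_FIELDS):
            yield parsed
        else:
            yield beam.pvalue.TaggedOutput("dlq", {
                "error": "Missing required fields",
                "raw_payload": element.decode("utf-8", errors="replace"),
            })


# Apply inside the pipeline; `raw_messages` is a PCollection of bytes.
results = (
    raw_messages
    | "ParseAndValidate" >> beam.ParDo(ParseAndValidateFn()).with_outputs("dlq", main="valid")
)

# Route main output to BigQuery ("my_project:my_dataset.my_table" is a placeholder).
results.valid | "WriteToBQ" >> WriteToBigQuery("my_project:my_dataset.my_table")

# Route DLQ output to GCS, one JSON object per line. In streaming mode the
# DLQ collection may need windowing before a file sink; check your SDK version.
(
    results.dlq
    | "SerializeDLQ" >> beam.Map(json.dumps)
    | "WriteToDLQ" >> WriteToText("gs://my-bucket/dlq/errors")
)
```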
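Because the routing rules are the part most likely to change, it can help to pull them out of the DoFn so they can be unit-tested without a runner. The sketch below assumes raw messages arrive as bytes; `classify` is a hypothetical helper that applies the same rules as `ParseAndValidateFn` and returns a `(tag, record)` pair. Unlike the DoFn, it deliberately catches only decoding and parsing errors, so genuine bugs still surface in tests; that narrower catch is a design choice, not something the pattern requires.

```python
import json

# Hypothetical helper: the same routing rules as ParseAndValidateFn,
# written as a plain function so it runs without Apache Beam.
REQUIRED_FIELDS = ("user_id", "event_type")


def classify(element: bytes):
    """Return ("valid", record) or ("dlq", error_record) for one raw message."""
    try:
        parsed = json.loads(element.decode("utf-8"))
    except (UnicodeDecodeError, json.JSONDecodeError) as e:
        return "dlq", {
            "error": str(e),
            "raw_payload": element.decode("utf-8", errors="replace"),
        }
    if isinstance(parsed, dict) and all(f in parsed for f in REQUIRED_FIELDS):
        return "valid", parsed
    return "dlq", {
        "error": "Missing required fields",
        "raw_payload": element.decode("utf-8", errors="replace"),
    }


samples = [
    b'{"user_id": 7, "event_type": "click"}',  # valid
    b'{"user_id": 7',                           # malformed JSON
    b'{"user_id": 7}',                          # schema drift: field missing
    b'\xff\xfe\x00',                            # corrupted bytes
]
for raw in samples:
    tag, record = classify(raw)
    print(tag, record)
```

With a helper like this, `process()` could reduce to calling `classify(element)` and yielding either the record or `beam.pvalue.TaggedOutput(tag, record)`.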