Tagged Outputs
1. Introduction
Tagged Outputs are a routing mechanism in Apache Beam. When a ParDo transform declares multiple outputs via tags, Beam returns a DoOutputsTuple object: a container holding one PCollection per output, and the entry point for branching a pipeline DAG into multiple paths.
2. Why This Concept Exists
In complex workflows, data processing is rarely linear, and a single stream often has to be split into several branches. While side outputs provide the basic mechanism to emit records with tags, Tagged Outputs define the structured routing API that lets you separate, unpack, type-hint, and consume these multi-branch streams cleanly, without tangling the logic of one branch with another.
3. Key Terminology
- DoOutputsTuple: The dictionary-like container returned by a ParDo with side outputs, which holds multiple PCollections keyed by their tag names.
- Tuple Tag: An identifier used to distinguish different outputs of a single transform.
- Branching DAG: A pipeline graph design where a single stage splits into multiple parallel tracks.
4. How It Works
- When configuring a ParDo that produces multiple outputs, you apply .with_outputs(*tags, main=main_tag).
- The returned object is a DoOutputsTuple.
- You unpack individual PCollections using attribute access (e.g. results.my_tag) or dictionary indexing (e.g. results['my_tag']).
- Each of these unpacked collections is a standard PCollection that can be mapped, grouped, or written to separate databases.
5. Visual Diagram
[Diagram: one ParDo stage branching into two paths]
results.domestic -> Write to domestic DB
results.global -> Write to global DB
6. Code Example
Branching transaction events by geographical source location:
import apache_beam as beam


class GeoRouter(beam.DoFn):
    def process(self, element):
        country = element.get("country", "US")
        if country == "US":
            yield element  # Main Output
        else:
            yield beam.pvalue.TaggedOutput("international", element)


with beam.Pipeline() as p:
    txns = p | "CreateTxns" >> beam.Create([
        {"id": 1, "country": "US", "amount": 10.0},
        {"id": 2, "country": "CA", "amount": 25.0},
        {"id": 3, "country": "MX", "amount": 40.0}
    ])

    # Apply ParDo and register the "international" side output tag
    routed = txns | "RouteGeo" >> beam.ParDo(GeoRouter()).with_outputs("international", main="domestic")

    # Process domestic transactions
    routed.domestic | "ProcessDomestic" >> beam.Map(lambda x: print(f"Domestic Txn: {x}"))

    # Process international transactions
    routed.international | "ProcessIntl" >> beam.Map(lambda x: print(f"International Txn: {x}"))
7. Code Explanation
- The GeoRouter class yields US transactions directly to the main output and wraps every other transaction in TaggedOutput("international", element).
- Chaining .with_outputs("international", main="domestic") gives us access to the attributes .domestic (the main output) and .international (the side output).
- Each attribute represents a distinct PCollection that flows down its own path.
8. Real Production Example
Routing system metrics to separate monitoring services:
import apache_beam as beam


class MetricsRouter(beam.DoFn):
    def process(self, element):
        metric_type = element.get("type")
        if metric_type == "cpu":
            yield beam.pvalue.TaggedOutput("cpu_metrics", element)
        elif metric_type == "memory":
            yield beam.pvalue.TaggedOutput("mem_metrics", element)
        else:
            yield element  # Default main stream


with beam.Pipeline() as p:
    metrics = p | "LoadMetrics" >> beam.Create([
        {"type": "cpu", "usage": 90.0},
        {"type": "memory", "usage": 70.0},
        {"type": "disk", "usage": 45.0}
    ])

    routed = metrics | "RouteMetrics" >> beam.ParDo(MetricsRouter()).with_outputs("cpu_metrics", "mem_metrics", main="other_metrics")

    # Route specific metrics to distinct destinations
    routed.cpu_metrics | "WriteCPUMetrics" >> beam.io.WriteToText("cpu-report")
    routed.mem_metrics | "WriteMemMetrics" >> beam.io.WriteToText("mem-report")
    routed.other_metrics | "WriteOtherMetrics" >> beam.io.WriteToText("other-report")
9. Common Mistakes
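- Unpacking Mismatched Variable Lengths: Unpacking the result with tuple assignment such as a, b = txns | beam.ParDo(...) when the number of targets does not match the declared outputs. A DoOutputsTuple iterates over the main output first (when a main tag is given) and then the tags in the order passed to with_outputs(); referencing outputs by name via dot notation or keys avoids depending on that order.
- Accessing Non-Registered Tags: Accessing a tag on the DoOutputsTuple that was not declared in the .with_outputs() call, which raises a ValueError in the Python SDK.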
10. Interview Perspective
- Question: What object does ParDo.with_outputs() return, and how do you access individual outputs?
- Answer: It returns a DoOutputsTuple object. You access individual outputs either by using attribute notation (e.g. tuple_obj.tag_name) or key indexing (e.g. tuple_obj['tag_name']).
- Question: Can a side output collect elements of different types than the main output?
- Answer: Yes. Each output branch in a DoOutputsTuple is a separate PCollection, meaning they can have different schemas, structures, or type hints.
11. Best Practices
- Always define the main output explicitly using the main keyword argument inside with_outputs() for readability.
- Store tag names as string constants if they are referenced across multiple locations or files.
12. Summary
- Tagged Outputs construct multi-path branching pipelines.
- The with_outputs method returns a DoOutputsTuple.
- Branches can be processed, transformed, and written to separate targets independently.