advanced

Tagged Outputs

9 min readLast updated: 2026-07-01

1. Introduction

Tagged Outputs are an advanced routing mechanism in Apache Beam. When a ParDo transform produces multiple outputs via tags, Beam returns a specialized DoOutputsTuple object, which represents the gateway to multi-path DAG branching.

2. Why This Concept Exists

For complex workflows, data processing is not linear. You need to branch your processing pipeline dynamically. While side outputs provide the basic mechanism to emit records with tags, Tagged Outputs define the structured routing API that lets you separate, unpack, type-hint, and consume these multi-branch streams cleanly without mixing the underlying code logic.

3. Key Terminology

  • DoOutputsTuple: The dictionary-like container returned by a ParDo with side outputs, which holds multiple PCollections keyed by their tag names.
  • Tuple Tag: An identifier used to distinguish different outputs of a single transform.
  • Branching DAG: A pipeline graph design where a single stage splits into multiple parallel tracks.

4. How It Works

  • When configuring a ParDo that produces multiple outputs, you apply .with_outputs(*tags, main=main_tag).
  • The returned object is a DoOutputsTuple.
  • You unpack individual PCollections using attribute access (e.g. results.my_tag) or dictionary indexing (e.g. results['my_tag']).
  • Each of these unpacked collections is a standard PCollection that can be mapped, grouped, or written to separate databases.

5. Visual Diagram

PCollection Input

results.domestic
Write to domestic DB

results.global
Write to global DB

6. Code Example

Branching transaction events by geographical source location:

python
import apache_beam as beam

class GeoRouter(beam.DoFn):
    def process(self, element):
        country = element.get("country", "US")
        if country == "US":
            yield element # Main Output
        else:
            yield beam.pvalue.TaggedOutput("international", element)

with beam.Pipeline() as p:
    txns = p | "CreateTxns" >> beam.Create([
        {"id": 1, "country": "US", "amount": 10.0},
        {"id": 2, "country": "CA", "amount": 25.0},
        {"id": 3, "country": "MX", "amount": 40.0}
    ])
    
    # Apply ParDo and register the "international" side output tag
    routed = txns | "RouteGeo" >> beam.ParDo(GeoRouter()).with_outputs("international", main="domestic")
    
    # Process domestic transactions
    routed.domestic | "ProcessDomestic" >> beam.Map(lambda x: print(f"Domestic Txn: {x}"))
    
    # Process international transactions
    routed.international | "ProcessIntl" >> beam.Map(lambda x: print(f"International Txn: {x}"))

7. Code Explanation

  • The GeoRouter class yields non-US transactions to TaggedOutput("international", element).
  • Chaining .with_outputs("international", main="domestic") gives us access to attributes .domestic (the main output) and .international (the side output).
  • Each attribute represents a distinct PCollection that flows down its own path.

8. Real Production Example

Routing system metrics to separate monitoring services:

python
import apache_beam as beam

class MetricsRouter(beam.DoFn):
    def process(self, element):
        metric_type = element.get("type")
        if metric_type == "cpu":
            yield beam.pvalue.TaggedOutput("cpu_metrics", element)
        elif metric_type == "memory":
            yield beam.pvalue.TaggedOutput("mem_metrics", element)
        else:
            yield element # Default main stream

with beam.Pipeline() as p:
    metrics = p | "LoadMetrics" >> beam.Create([
        {"type": "cpu", "usage": 90.0},
        {"type": "memory", "usage": 70.0},
        {"type": "disk", "usage": 45.0}
    ])
    
    routed = metrics | "RouteMetrics" >> beam.ParDo(MetricsRouter()).with_outputs("cpu_metrics", "mem_metrics", main="other_metrics")
    
    # Route specific metrics to distinct destinations
    routed.cpu_metrics | "WriteCPUMetrics" >> beam.io.WriteToText("cpu-report")
    routed.mem_metrics | "WriteMemMetrics" >> beam.io.WriteToText("mem-report")
    routed.other_metrics | "WriteOtherMetrics" >> beam.io.WriteToText("other-report")

9. Common Mistakes

  • Unpacking Mismatched Variable Lengths: Attempting to unpack the DoOutputsTuple directly using tuple assignment like a, b = txns | beam.ParDo(...) instead of referencing the explicit names via dot notation or keys.
  • Accessing Non-Registered Tags: Accessing an attribute on the DoOutputsTuple that was not declared inside the .with_outputs() function, which raises an AttributeError.

10. Interview Perspective

  • Question: What object does ParDo.with_outputs() return, and how do you access individual outputs?
  • Answer: It returns a DoOutputsTuple object. You access individual outputs either by using attribute notation (e.g. tuple_obj.tag_name) or key indexing (e.g. tuple_obj['tag_name']).
  • Question: Can a side output collect elements of different types than the main output?
  • Answer: Yes. Each output branch in a DoOutputsTuple is a separate PCollection, meaning they can have different schemas, structures, or type hints.

11. Best Practices

  • Always define the main output explicitly using the main keyword argument inside with_outputs() for readability.
  • Store tag names as string constants if they are referenced across multiple locations or files.

12. Summary

  • Tagged Outputs construct multi-path branching pipelines.
  • The with_outputs method returns a DoOutputsTuple.
  • Branches can be processed, transformed, and written to separate targets independently.

13. Interactive Challenges

Challenge 1: Accessing Named Tags (Beginner)

Given a DoOutputsTuple named results containing tags 'high' and 'low', write code to extract the PCollection for 'high' using attribute dot notation and write it to text files.

Challenge 2: Split Odd/Even Tuple Unpacking (Intermediate)

Write a complete pipeline block using a custom DoFn that routes odd and even numbers, registers the tags 'odd' and 'even', extracts both from the result container using dictionary lookup syntax, and prints them.

Challenge 3: Multi-Branch Logger Routing (Advanced)

Write a pipeline segment that routes lines of raw text containing log levels. Create a DoFn that routes messages to tags 'critical', 'warning', and 'info'. Set up the pipeline, define the outputs, unpack them, and write each group to its own sharded text files.

14. Related Content

Advertisement
AdSense Slot #000001Leaderboard Banner (728x90)