Flatten (Merging)
1. Introduction
The Flatten transform merges multiple separate PCollections of the same data type into a single unified PCollection. It acts as a union operator.
2. Why This Concept Exists
Data pipelines often ingest similar data from different sources (such as active logs from a web server and historical logs from cold storage). You want to run the same cleaning and aggregation steps on both datasets without duplicating pipeline code. Flatten lets you merge the separate streams into one PCollection so they can flow through the same downstream transforms.
3. Key Terminology
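- Flatten: The pipeline transform class (beam.Flatten).
- PCollectionList: A container class in the Java SDK (PCollectionList) that groups multiple PCollections together before passing them to the Flatten operator. The Python SDK has no beam.PCollectionList; there you pass a plain tuple (or other iterable) of PCollections to beam.Flatten().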
4. How It Works
- Collect the PCollections into a tuple: (pcoll1, pcoll2). The Python SDK does not provide a PCollectionList class.
- Apply the beam.Flatten() transform to the tuple.
- The output PCollection contains all elements from all input collections combined. Flatten performs no shuffle and no grouping by key, so it adds very little overhead.
5. Visual Diagram
PCollection A
[1, 2]
PCollection B
[3, 4]
Output PCollection
[1, 2, 3, 4]
6. Code Example
Merging names from two different sources:
import apache_beam as beam

with beam.Pipeline() as p:
    web_users = p | "WebUsers" >> beam.Create(["Alice", "Bob"])
    app_users = p | "AppUsers" >> beam.Create(["Charles", "David"])

    # 1. Group the inputs into a tuple (the Python SDK has no PCollectionList)
    users = (web_users, app_users)

    # 2. Flatten into a single PCollection
    all_users = users | "MergeUsers" >> beam.Flatten()
7. Code Explanation
- (web_users, app_users) is a plain Python tuple holding the two PCollections.
- beam.Flatten() merges them.
- The resulting all_users PCollection contains the elements "Alice", "Bob", "Charles", and "David", in no guaranteed order.
8. Real Production Example
In log warehousing, you ingest logs from three separate GCP regions. Each region reads from local Pub/Sub subscriptions in parallel. You merge the regional streams into a single global stream using Flatten before writing records to a centralized BigQuery table.
9. Common Mistakes
- Flattening mismatched element types: If you merge a PCollection of integers with a PCollection of dictionaries, the pipeline will still build, but downstream transforms that assume a single type (such as string operations) will fail at runtime. Ensure all inputs share the same element type or schema before flattening.
- Using Flatten on lists of lists: If you have a single PCollection whose elements are lists (e.g. [[1, 2], [3]]), use FlatMap, not Flatten. Flatten merges separate PCollections, not nested lists inside one collection.
10. Interview Perspective
- Question: Does Flatten perform sorting or deduplication?
- Answer: No. Flatten is a simple merge. It does not sort elements, remove duplicates, or guarantee order. If you need duplicate removal, apply beam.Distinct() downstream.
- Question: How does Flatten perform in terms of cluster execution cost?
- Answer: It is very cheap. On many runners Flatten is a metadata-only operation. It does not require a shuffle, so no data has to move across the network. It simply directs upstream workers to feed their outputs into the same downstream step.
11. Best Practices
- Ensure all PCollections share identical element type hints before flattening.
- Combine sources early using Flatten to keep your pipeline graph clean and DRY (Don't Repeat Yourself).
12. Summary
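- Flatten merges multiple PCollections of the same type.
- In the Python SDK, pass the inputs as a tuple: (pcoll1, pcoll2) | beam.Flatten(). PCollectionList is the Java SDK's wrapper and does not exist in Python.
- Performs no sorting, deduplication, or network shuffling.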