FlatMap Transform
1. Introduction
The FlatMap transform is a one-to-many mapping step in Apache Beam. It takes each element from an input PCollection, applies a mapping function that returns a list (or iterable) of items, and "flattens" those items into individual records in the output PCollection.
2. Why This Concept Exists
While Map converts 1 input element into exactly 1 output element, you frequently need to expand or discard elements. For example, you might want to split a paragraph string into a collection of individual words, or parse a list of items inside a shopping cart. FlatMap allows you to output any number of elements (zero, one, or multiple) for each incoming element.
3. Key Terminology
- One-to-Many: Outputting zero, one, or multiple elements per input.
- Flattening: Taking an iterable output (like a list [A, B]) and unpacking it so elements flow as individual records instead of a nested container.
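Flattening is not specific to Beam. The plain-Python analogy below is not Beam code. It uses itertools.chain.from_iterable to unpack a list of per-element results in the same way:

```python
from itertools import chain

# Per-element results of a one-to-many function form a nested structure.
nested = [["a", "b"], ["c"], []]

# Flattening unpacks each inner iterable so its items become individual records.
flat = list(chain.from_iterable(nested))
print(flat)  # ['a', 'b', 'c']
```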
4. How It Works
- The mapping function returns an iterable (e.g. list, set, or generator).
- FlatMap loops through this iterable and emits each item individually.
- If the function returns an empty list [], nothing is emitted, so FlatMap can act as a filter.
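The steps above can be modeled locally with a small generator. The helper flat_map is hypothetical and exists only to illustrate the semantics. It is not how the Beam SDK implements FlatMap, and it runs without Beam installed:

```python
from typing import Callable, Iterable, Iterator, List, TypeVar

T = TypeVar("T")
U = TypeVar("U")


def flat_map(fn: Callable[[T], Iterable[U]], elements: Iterable[T]) -> Iterator[U]:
    """Hypothetical local model of FlatMap's per-element behavior (not Beam's code)."""
    for element in elements:
        # fn may return a list, set, tuple, or generator.
        for item in fn(element):
            yield item  # each item is emitted as its own record


def keep_even(n: int) -> List[int]:
    # Returning [] emits nothing, so the one-to-many step doubles as a filter.
    return [n] if n % 2 == 0 else []


print(list(flat_map(str.split, ["a b", "c", ""])))  # ['a', 'b', 'c']
print(list(flat_map(keep_even, [1, 2, 3, 4])))      # [2, 4]
```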
5. Visual Diagram
Input
[ "a b", "c", "" ]
FlatMap (split)
"a b" ➔ ["a", "b"] | "c" ➔ ["c"] | "" ➔ []
Output
[ "a", "b", "c" ]
6. Code Example
Splitting comma-separated strings into individual records:
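```python
import apache_beam as beam

with beam.Pipeline() as p:
    csv_rows = p | beam.Create(["apple,banana", "cherry", ""])
    # The empty row returns [], so it emits nothing.
    individual_fruits = csv_rows | "SplitFruits" >> beam.FlatMap(lambda x: x.split(",") if x else [])
    individual_fruits | "Print" >> beam.Map(print)
```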
7. Code Explanation
- FlatMap executes the split logic on each line.
- "apple,banana" returns ["apple", "banana"]. FlatMap flattens this, yielding two records.
- "cherry" returns ["cherry"], yielding one record.
- "" returns [], yielding zero records (skipped).
- The output collection contains ["apple", "banana", "cherry"], in no guaranteed order.
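The if x else [] guard in the lambda from section 6 is what makes the empty row produce zero records. Calling str.split(",") on an empty string returns a list containing one empty string, not an empty list. Here is a quick plain-Python check, where split_fruits is a hypothetical name for the same logic:

```python
def split_fruits(row):
    # Same logic as the lambda passed to FlatMap in section 6.
    return row.split(",") if row else []


print("".split(","))                 # [''] : one empty-string record without the guard
print(split_fruits(""))              # []   : zero records with the guard
print(split_fruits("apple,banana"))  # ['apple', 'banana']
```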
8. Real Production Example
When parsing nested JSON invoices, you use FlatMap to extract individual order items so you can perform product-level metrics downstream:
```python
# invoices: a PCollection of parsed invoice dicts
order_items = invoices | "GetItems" >> beam.FlatMap(lambda invoice: invoice.get("line_items", []))
```
9. Common Mistakes
- Using Map instead of FlatMap: For the inputs "apple,banana" and "cherry", calling beam.Map(lambda x: x.split(",")) yields a PCollection of lists, [["apple", "banana"], ["cherry"]]. Use FlatMap to get flat elements.
- Forgetting to return an iterable: A custom function passed to FlatMap must return an iterable such as a list, tuple, or generator. Returning a single number such as an int or float fails at runtime because the value cannot be iterated (Python raises a TypeError).
10. Interview Perspective
- Question: Can FlatMap act as a filter?
- Answer: Yes. If the function returns an empty list [] or a generator that yields nothing, that input produces no output. This matches how Filter drops elements its predicate rejects.
- Question: How is FlatMap implemented inside the SDK?
- Answer: FlatMap is built on ParDo. It applies a wrapper DoFn that calls your function and emits each item of the returned iterable as a separate element.
11. Best Practices
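- Use Python generators (yield items instead of returning a list) inside custom FlatMap functions. Each output is then produced on demand rather than held in memory as a complete list, which can reduce worker memory use when one input expands into many outputs.
- Return a default empty list [] when keys are missing from dictionary inputs, for example invoice.get("line_items", []) as in the production example.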
12. Summary
- Applies a 1-to-many mapping function.
- Flattens list/iterable outputs into flat PCollections.
- Crucial for splitting, expanding, and unpacking nested data records.