Top
1. Introduction
The Top transform in Apache Beam is a built-in combining transform that finds the largest or smallest $N$ elements of a PCollection. You can compute these elements globally across the entire collection, or per key to get the top $N$ values for each key.
2. Why This Concept Exists
In big data analytics, finding extreme values is a common requirement: the top 10 highest-selling items, the 5 slowest network routes, or the top 3 scores per class. Sorting the entire dataset to find them is expensive. Every element has to be shuffled across the network and ordered, even though only $N$ of them are kept. Top avoids the full sort by keeping only the current best $N$ candidates on each worker and shuffling just those candidates.
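The trade-off is visible even on a single machine. This plain-Python sketch contrasts a full sort with bounded selection via `heapq.nlargest`. It involves no Beam, and the function names and random data are hypothetical. Bounded selection is the same keep-only-$N$-candidates idea that Top applies on each worker.

```python
import heapq
import random

def top_n_by_full_sort(values, n):
    # Orders every element (O(len * log len) time) and then keeps the first n.
    return sorted(values, reverse=True)[:n]

def top_n_by_bounded_heap(values, n):
    # Single pass that retains at most n candidates (O(len * log n) time, O(n) memory).
    return heapq.nlargest(n, values)

# Hypothetical dataset standing in for one worker's input.
random.seed(7)
data = [random.randint(0, 1_000_000) for _ in range(100_000)]

# Both strategies return the same top 10 (largest first); only the cost differs.
assert top_n_by_full_sort(data, 10) == top_n_by_bounded_heap(data, 10)
```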
3. Key Terminology
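- Top.Largest(N): Returns a single list holding the $N$ largest elements of the PCollection, ordered from largest to smallest.
- Top.Smallest(N): Returns a single list holding the $N$ smallest elements of the PCollection, ordered from smallest to largest.
- Top.PerKey(N, key=None, reverse=False): For a PCollection of (key, value) pairs, returns one (key, list) pair per key holding that key's top $N$ values. The optional `key` argument is a function that maps each value to the quantity used for ranking, and `reverse=True` keeps the smallest values instead of the largest.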
4. How It Works
- Local Bounded Heap: Each worker node maintains a local bounded priority queue (heap) of size $N$ for its partition.
- Pruning: As elements are read, the worker updates the heap. It discards any element smaller than the current $N$-th largest element, keeping worker memory usage bounded.
- Shuffle: Only the top $N$ elements from each worker partition are sent over the network to the combiner node.
- Final Merge: The combiner node merges the workers' heaps and outputs a single list of the top $N$ elements.
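The four steps can be simulated in plain Python to make the data flow concrete. This is a minimal sketch, not Beam's internal implementation. Each inner list stands in for one worker's partition, and the helper names `local_top` and `merge_tops` are hypothetical.

```python
import heapq

def local_top(partition, n):
    """Worker step: keep the n largest values of one partition in a bounded min-heap."""
    if n <= 0:
        return []
    heap = []  # heap[0] is always the smallest candidate currently kept
    for value in partition:
        if len(heap) < n:
            heapq.heappush(heap, value)
        elif value > heap[0]:
            # The new value beats the current n-th largest, so it replaces it.
            heapq.heapreplace(heap, value)
        # Any other value cannot reach the top n and is pruned immediately.
    return sorted(heap, reverse=True)

def merge_tops(candidate_lists, n):
    """Final merge: pool every worker's candidates and keep the global top n."""
    pooled = [value for candidates in candidate_lists for value in candidates]
    return heapq.nlargest(n, pooled)

# Hypothetical partitions held by three workers.
partitions = [[12, 80, 7, 3], [45, 90, 1], [60, 2, 33, 5]]
n = 2

shuffled = [local_top(part, n) for part in partitions]  # at most n values per worker
print(shuffled)                 # [[80, 12], [90, 45], [60, 33]]
print(merge_tops(shuffled, n))  # [90, 80]
```

Only six values cross the simulated "network" here instead of eleven, and the gap grows with partition size because each worker ships at most $N$ values.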
5. Visual Diagram
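Each worker keeps only its local Top 2, and the merge step combines those candidates:
- Worker 1 local Top 2: [80, 12]
- Worker 2 local Top 2: [90, 45]
- Global Top 2 result: [90, 80]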
6. Code Example
Finding the 3 largest numbers and the top score per student:
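```python
import apache_beam as beam

with beam.Pipeline() as p:
    scores = p | "Scores" >> beam.Create([55, 92, 84, 100, 67])

    # 1. Get the top 3 largest scores globally.
    top_three = scores | "Top3" >> beam.combiners.Top.Largest(3)

    # 2. Get the top score per student from (name, score) pairs.
    student_scores = p | "StudentScores" >> beam.Create([
        ("Alice", 78),
        ("Bob", 95),
        ("Alice", 98),
        ("Bob", 88),
    ])
    top_per_student = student_scores | "TopPerStudent" >> beam.combiners.Top.PerKey(1)

    # Print both results so they can be inspected when running locally.
    top_three | "PrintTop3" >> beam.Map(print)
    top_per_student | "PrintTopPerStudent" >> beam.Map(print)
```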
7. Code Explanation
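`top_three` is a PCollection holding a single element: the list [100, 92, 84], sorted from largest to smallest. Its contents are therefore [[100, 92, 84]]. `top_per_student` ranks scores separately for each key and emits one (key, list) pair per student: [("Alice", [98]), ("Bob", [95])]. The order of these pairs within the PCollection is not guaranteed.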
8. Real Production Example
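An e-commerce platform might need the top 5 highest-grossing products every hour. To compute this:
- Assign transactions to one-hour fixed windows.
- Map each transaction to (product_id, revenue) and sum the revenue per product.
- Apply Top to the resulting (product_id, total) pairs with a key on the total, e.g. `Top.Of(5, key=lambda kv: kv[1])`. Without the key, Top would compare the tuples by product_id first.

Because this combine runs in non-global windows, the Python SDK also requires `.without_defaults()` on it.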
9. Common Mistakes
- Assuming flat output: `Top.Largest(N)` outputs a PCollection containing a single list (e.g. [[elem1, elem2]]), not the individual elements. To process the elements one by one, apply `beam.FlatMap(lambda x: x)` downstream.
- Using large values of N: If you set $N$ to a very large number (e.g., $N = 1{,}000{,}000$), each worker must maintain a huge heap and more candidates are shuffled. The final list of $N$ elements is also built on a single worker. The result is high memory usage and latency.
10. Interview Perspective
- Question: Why is Top more efficient than a GroupByKey followed by a custom sort?
- Answer: Top uses bounded heaps to prune elements locally on workers, which reduces the network shuffle from $O(\text{Total})$ to $O(N \times \text{Workers})$ elements. GroupByKey, by contrast, shuffles every element before any sorting happens.
- Question: Can I use Top with custom objects?
- Answer: Yes. Pass a `key` function that maps each element to a comparable value, e.g. `beam.combiners.Top.Of(N, key=lambda x: x.price)`.
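This is a back-of-the-envelope version of the first answer, with hypothetical figures: 1 billion elements, 500 workers and N = 10. Like the answer, it counts one candidate list per worker. A runner may actually emit candidates per bundle, which raises the count but still keeps it independent of the total data size.

```python
def shuffled_elements(total_elements, n, workers):
    """Rough number of elements sent over the network by each strategy."""
    group_then_sort = total_elements                     # every element is shuffled
    top_with_pruning = min(total_elements, n * workers)  # at most n per worker
    return group_then_sort, top_with_pruning

full, pruned = shuffled_elements(total_elements=1_000_000_000, n=10, workers=500)
print(f"GroupByKey + sort: {full:,} elements")  # GroupByKey + sort: 1,000,000,000 elements
print(f"Top with pruning: {pruned:,} elements")  # Top with pruning: 5,000 elements
```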
11. Best Practices
- Ensure that the elements being compared support the comparison operators (`<`, `>`), or pass a custom `key` argument to define the sort order.
- Keep $N$ small (typically less than 1,000) to keep memory consumption low.
12. Summary
- `Top` retrieves the largest or smallest elements in a PCollection, either globally or per key.
- It reduces network traffic by pruning candidates locally on worker nodes before the shuffle.
- Its global output is a PCollection containing a single element: the list of top elements.