Serialization
1. Introduction
Serialization (often called pickling in Python) is the process of converting program code, function closures, and data objects into a byte stream. This byte stream is then transmitted over the network to execution workers, where it is deserialized back into memory objects.
2. Why This Concept Exists
Apache Beam is a distributed framework. The pipeline you write is constructed and launched on a single machine (the driver node), but its transforms are executed by a fleet of separate worker machines. For workers to process elements, they must receive the actual Python code (functions, classes) and the data elements in a format that can cross network boundaries.
3. Key Terminology
- Serialization / Pickling: Packaging code/data into bytes.
- Deserialization / Unpickling: Restoring bytes back into code/data in worker memory.
- Closure: A function that carries references to variables defined in its outer scope.
- Dill / Pickle: The libraries the Beam Python SDK uses to serialize complex code objects.
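The terms above can be tried out with the standard library alone. The following is a minimal sketch, not Beam code: it uses the built-in pickle module rather than dill, and the names record, make_scaler, and scale_by_ten are made up for illustration.

```python
import pickle

# Serialization: a data object becomes a byte string.
record = {"user": "ada", "scores": [1, 2, 3]}
payload = pickle.dumps(record)

# Deserialization: the bytes become an equal (but separate) object again.
restored = pickle.loads(payload)


def make_scaler(factor):
    """Return a closure that remembers `factor` from the enclosing scope."""
    def scale(value):
        return value * factor
    return scale


scale_by_ten = make_scaler(10)

# The captured variables live in the function's closure cells. Anything
# found here has to travel with the function when it is serialized.
captured = [cell.cell_contents for cell in scale_by_ten.__closure__]
```

Inspecting `__closure__` like this is a quick way to see what a function would drag along with it before handing it to a pipeline.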
4. How It Works
- When you launch a pipeline, Beam serializes your DoFn and lambda functions using dill.
- These serialized packages are sent to remote workers.
- As data elements travel through the pipeline, they are serialized using Coders (e.g., converting strings to UTF-8 bytes) to flow between workers.
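The last step, encoding elements with a coder, can be sketched as a pair of encode/decode methods. The class below is a hypothetical stand-in written for illustration; it is not one of Beam's own coder classes, although it mirrors the string-to-UTF-8 case mentioned above.

```python
class Utf8StringCoder:
    """Hypothetical stand-in for a string coder (not a Beam class)."""

    def encode(self, value: str) -> bytes:
        # Turn an element into bytes that can cross the network.
        return value.encode("utf-8")

    def decode(self, encoded: bytes) -> str:
        # Rebuild the element from bytes on the receiving worker.
        return encoded.decode("utf-8")


coder = Utf8StringCoder()
wire_bytes = coder.encode("café")
restored = coder.decode(wire_bytes)
```

The design point is that the coder, not the element, knows the wire format, so the same element type is always encoded the same way regardless of which worker produced it.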
5. Visual Diagram
Driver Node (user Python code) -> Network Bytes (sent over network) -> Worker Node (runs live worker code)
6. Code Example
Demonstrating how a helper class must be serializable to be passed inside a transform:
import apache_beam as beam


class Multiplier:
    def __init__(self, factor):
        self.factor = factor

    def multiply(self, value):
        return value * self.factor


with beam.Pipeline() as p:
    mult = Multiplier(10)
    (p
     | "CreateNums" >> beam.Create([1, 2, 3])
     | "ApplyMult" >> beam.Map(mult.multiply)
     | "Print" >> beam.Map(print))
7. Code Explanation
- The Multiplier object is instantiated on the driver node.
- When beam.Map(mult.multiply) is applied, Beam serializes both the function reference and the instance data (self.factor = 10).
- The worker deserializes the instance and executes the multiplication successfully.
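Why the instance data travels along with the method can be seen without Beam. This small sketch uses only plain Python attributes of a bound method; the variable names bound, carried_instance, and carried_state are illustrative.

```python
class Multiplier:
    def __init__(self, factor):
        self.factor = factor

    def multiply(self, value):
        return value * self.factor


mult = Multiplier(10)
bound = mult.multiply

# A bound method is a pair: the underlying function plus the instance.
carried_instance = bound.__self__        # the Multiplier object itself
carried_state = vars(bound.__self__)     # its attributes, here the factor
underlying_function = bound.__func__     # the plain function on the class
```

Because the method holds a reference to its instance, every attribute stored on that instance has to be serializable for the method to be shipped to a worker.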
8. Real Production Example
Handling non-serializable objects (like live database clients) by instantiating them on the worker instead of the driver:
import apache_beam as beam


class DBLookupDoFn(beam.DoFn):
    def __init__(self, connection_url):
        # Url string is serializable
        self.connection_url = connection_url
        self.client = None

    def setup(self):
        # Setup runs on the worker AFTER deserialization.
        # This is where we create non-serializable objects.
        self.client = mock_db_connect(self.connection_url)

    def process(self, element):
        yield self.client.fetch_data(element)


def mock_db_connect(url):
    class Client:
        def fetch_data(self, el):
            return f"Data for {el}"
    return Client()
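The driver-to-worker lifecycle of such a DoFn can be simulated in one process. This is a rough sketch under several assumptions: it uses the standard pickle module instead of Beam's serializer, LookupFn is a plain class that does not subclass beam.DoFn, FakeClient and the URL db://example are invented, and setup() is called by hand here whereas a real runner calls it for you.

```python
import pickle


class FakeClient:
    """Hypothetical stand-in for a live database client."""

    def __init__(self, url):
        self.url = url

    def fetch_data(self, element):
        return f"Data for {element} from {self.url}"


class LookupFn:
    """Plain-Python stand-in for a DoFn (does not subclass beam.DoFn)."""

    def __init__(self, connection_url):
        self.connection_url = connection_url  # a string, safe to serialize
        self.client = None                    # placeholder only

    def setup(self):
        # Runs after deserialization, so the client never crosses the wire.
        self.client = FakeClient(self.connection_url)

    def process(self, element):
        yield self.client.fetch_data(element)


# "Driver": only the URL string and the None placeholder are serialized.
payload = pickle.dumps(LookupFn("db://example"))

# "Worker": restore the object, then create the client locally.
worker_fn = pickle.loads(payload)
client_before_setup = worker_fn.client
worker_fn.setup()
results = [row for el in ["a", "b"] for row in worker_fn.process(el)]
```

The object that crosses the boundary holds configuration only; the resource itself is created on the receiving side.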
9. Common Mistakes
- Initializing Clients in __init__: Creating database connections, file handles, or network sockets inside the __init__ constructor of a DoFn. This typically causes a PicklingError because open connections cannot be converted to bytes.
- Capturing Large Contexts in Lambdas: Referencing variables outside your lambda function can cause Beam to attempt to serialize unnecessary parts of your local environment, leading to massive payloads or errors.
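Both mistakes can be reproduced with the standard library. The sketch below is illustrative only: EagerResource and build_filters are made-up names, a threading.Lock stands in for a live connection, and the standard pickle module is used in place of Beam's serializer, so the exact exception type may differ from what Beam reports.

```python
import pickle
import threading


class EagerResource:
    """Holds a live, unpicklable resource created in __init__."""

    def __init__(self):
        self.lock = threading.Lock()  # stand-in for a socket or DB client


try:
    pickle.dumps(EagerResource())
    eager_error = None
except (TypeError, pickle.PicklingError) as exc:
    eager_error = exc  # serialization fails because of the live lock


def build_filters():
    big_table = list(range(100_000))

    # Mistake: the lambda references the whole table, so the table is
    # captured in its closure and would have to travel with it.
    wide = lambda x: x < big_table[-1]

    # Fix: pull out the one value needed and capture only that.
    limit = big_table[-1]
    narrow = lambda x: x < limit
    return wide, narrow


wide, narrow = build_filters()
wide_capture = wide.__closure__[0].cell_contents
narrow_capture = narrow.__closure__[0].cell_contents
wide_size = len(pickle.dumps(wide_capture))
narrow_size = len(pickle.dumps(narrow_capture))
```

Both lambdas compute the same result, but the second one carries a single integer instead of a 100,000-element list.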
10. Interview Perspective
- Question: Why does Beam fail with PicklingError: Can't pickle local object...?
- Answer: This happens when you try to pass an object that cannot be serialized (like an open file pointer, database connection, or a nested/local class definition) from the driver to a worker.
- Question: What is the purpose of the setup() method in a DoFn?
- Answer: The setup() method is called once per DoFn instance on the worker, after deserialization and before any elements are processed, making it the correct place to initialize non-serializable resources like database connections.
11. Best Practices
- Never open sockets, thread pools, or file connections in __init__; store only the serializable configuration needed to create them later.
- Keep mapping functions clean of external object references.
- Use simple primitives or NamedTuples for data objects traversing PCollections.
12. Summary
- All code and data traveling to workers must be serializable.
- Non-serializable dependencies must be initialized in setup() or start_bundle().
- Avoid capturing large local variables in closures.