Project Structure
1. Introduction
When building production data pipelines, keeping all code in a single script file quickly becomes unmaintainable. A production-grade Apache Beam project needs a directory structure that separates pipeline logic, configuration parameters, reusable transformations, and unit tests, and that packages the code and its dependencies for distribution to remote workers.
2. Why This Concept Exists
When a pipeline runs on a distributed cluster (such as Apache Flink or Google Cloud Dataflow), the runner serializes the pipeline and ships it to multiple worker VMs. If the pipeline imports local helper modules or external libraries that are not installed on those workers, the workers fail with a ModuleNotFoundError.
Creating a standardized project structure with a setup.py configuration solves this by packaging your project code as a redistributable module that the runner installs on all remote workers.
3. Key Terminology
- setup.py: A Python packaging script used to package the pipeline and distribute custom modules to cluster worker nodes.
- requirements.txt: Lists the external Python libraries (such as pandas or requests) that must be installed on worker environments.
- TestPipeline: A Pipeline subclass provided by the Beam SDK for testing pipeline transforms and verifying outputs using assertions.
4. How It Works
- Divide Code: Separate entry points (executables) from transformation modules.
- Write setup.py: Configure packages using Python's standard setuptools.
- Build Tests: Write unit tests in a dedicated folder using standard testing frameworks (like pytest or unittest).
- Package for Execution: When submitting the pipeline, pass the --setup_file option to tell the runner to package and install the custom code on worker VMs before processing data.
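As a rough illustration of the first and last steps, here is a minimal sketch of what an entry point such as run_pipeline.py might look like. The --greeting flag and the split_args helper are hypothetical names invented for this example; the inline lambda stands in for a function that would normally live in a module under transforms/. Flags the script does not recognize (such as --runner or --setup_file) are passed through to Beam untouched.

```python
# run_pipeline.py (illustrative sketch, not a definitive layout)
import argparse


def split_args(argv):
    """Separate this script's own flags from the flags meant for Beam."""
    parser = argparse.ArgumentParser()
    # Hypothetical application flag, used only for illustration.
    parser.add_argument("--greeting", default="hello")
    known_args, pipeline_args = parser.parse_known_args(argv)
    return known_args, pipeline_args


def run(argv=None):
    # Imported inside the function so that split_args can be exercised
    # on a machine without Beam installed.
    import apache_beam as beam
    from apache_beam.options.pipeline_options import PipelineOptions

    known_args, pipeline_args = split_args(argv)
    greeting = known_args.greeting
    # Everything argparse did not recognize (--runner, --setup_file, ...)
    # is handed to Beam as pipeline options.
    options = PipelineOptions(pipeline_args)
    with beam.Pipeline(options=options) as p:
        (
            p
            | "Create" >> beam.Create([" ALICE ", "Bob "])
            | "Clean" >> beam.Map(lambda name: name.strip().lower())
            | "Greet" >> beam.Map(lambda name: f"{greeting}, {name}")
            | "Print" >> beam.Map(print)
        )
```

A real entry script would typically end with an if __name__ == "__main__" guard that calls run(); it is left out here to keep the sketch import-safe.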
5. Visual Diagram
A typical production-ready directory structure:
📂 my_beam_project/
    📄 setup.py
    📄 requirements.txt
    📄 run_pipeline.py
    📂 pipelines/
        📄 main_pipeline.py
    📂 transforms/
        📄 cleaning.py
    📂 tests/
        📄 test_cleaning.py
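The layout above can be scaffolded with a few lines of standard-library Python. This is only a sketch: the PROJECT_FILES list and the scaffold function are names made up for this example, and the __init__.py entries are an addition that is not drawn in the tree but is needed for setuptools.find_packages() to discover pipelines/ and transforms/.

```python
from pathlib import Path
import tempfile

# Relative paths taken from the tree above, plus __init__.py files
# (an addition in this sketch) so the folders count as packages.
PROJECT_FILES = [
    "setup.py",
    "requirements.txt",
    "run_pipeline.py",
    "pipelines/__init__.py",
    "pipelines/main_pipeline.py",
    "transforms/__init__.py",
    "transforms/cleaning.py",
    "tests/test_cleaning.py",
]


def scaffold(root):
    """Create empty placeholder files for the layout under root."""
    root = Path(root)
    for relative in PROJECT_FILES:
        path = root / relative
        path.parent.mkdir(parents=True, exist_ok=True)
        path.touch(exist_ok=True)
    # Return every file now present, as POSIX-style relative paths.
    return sorted(
        p.relative_to(root).as_posix() for p in root.rglob("*") if p.is_file()
    )


# Demonstrate in a throwaway directory so nothing is left behind.
with tempfile.TemporaryDirectory() as tmp:
    for created in scaffold(Path(tmp) / "my_beam_project"):
        print(created)
```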
6. Code Example
Here is a sample setup.py script configuration:
# setup.py
import setuptools

setuptools.setup(
    name="my_beam_project",
    version="1.0.0",
    install_requires=[
        "pandas>=2.0.0",
        "requests>=2.28.0",
    ],
    packages=setuptools.find_packages(),
)
And here is a unit test in tests/test_cleaning.py that utilizes TestPipeline:
# tests/test_cleaning.py
import unittest

import apache_beam as beam
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that, equal_to


def clean_name(name):
    return name.strip().lower()


class TestCleaningTransforms(unittest.TestCase):
    def test_clean_name(self):
        # Create a TestPipeline context specifically designed for assertions
        with TestPipeline() as p:
            inputs = p | beam.Create([" ALICE ", "Bob ", " charlie"])
            outputs = inputs | beam.Map(clean_name)
            # Verify outputs match expectations
            assert_that(outputs, equal_to(["alice", "bob", "charlie"]))


if __name__ == "__main__":
    unittest.main()
7. Code Explanation
- The setup.py uses setuptools.setup with find_packages() to package the subfolders (pipelines/, transforms/) and declare external dependencies.
- In the test code, we import TestPipeline instead of the standard beam.Pipeline.
- We use the assert_that utility and the equal_to matcher. assert_that adds a verification step to the pipeline graph, so the check runs when the pipeline executes (on leaving the with block), and the test fails if the output elements do not match. equal_to compares elements without regard to order.
8. Real Production Example
In a CI/CD deployment pipeline, a CI job (for example, a GitHub Actions workflow) runs pytest on the tests/ directory to execute all pipeline unit tests. If the tests pass, the deployment script executes the entry script:
python run_pipeline.py \
    --runner=DataflowRunner \
    --project=my-gcp-project \
    --setup_file=./setup.py \
    --staging_location=gs://my-bucket/staging
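The --setup_file=./setup.py flag tells the runner to build the project into a source package, upload it to the staging location, and install it on each worker, so that imports of the custom transformations resolve during cluster execution.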
9. Common Mistakes
- Missing __init__.py files: Forgetting to add empty __init__.py files to subdirectories. find_packages() only discovers directories that contain an __init__.py, so without these files the modules are silently left out of the package and workers cannot import them.
- Using standard assert statements: Using standard Python assertions (e.g. assert value == "expected") inside unit tests to verify PCollection values. A PCollection is a deferred, distributed dataset, not an in-memory list, so a plain assert never compares its elements. Use assert_that(pcoll, equal_to(...)) instead.
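The first mistake is easy to catch before deployment. The following is a minimal standard-library sketch of such a check; dirs_missing_init is a hypothetical helper written for this example, not part of Beam or setuptools, and it skips tests/ by default on the assumption that the test folder is not shipped to workers.

```python
from pathlib import Path
import tempfile


def dirs_missing_init(root, skip=("tests",)):
    """Return subdirectories that hold .py files but no __init__.py."""
    root = Path(root)
    missing = []
    for directory in sorted(p for p in root.rglob("*") if p.is_dir()):
        rel = directory.relative_to(root)
        # Ignore skipped top-level folders and hidden directories.
        if rel.parts[0] in skip or any(part.startswith(".") for part in rel.parts):
            continue
        has_modules = any(directory.glob("*.py"))
        if has_modules and not (directory / "__init__.py").exists():
            missing.append(rel.as_posix())
    return missing


# Demonstrate on a throwaway project where transforms/ lacks __init__.py.
with tempfile.TemporaryDirectory() as tmp:
    root = Path(tmp)
    (root / "pipelines").mkdir()
    (root / "pipelines" / "__init__.py").touch()
    (root / "pipelines" / "main_pipeline.py").touch()
    (root / "transforms").mkdir()
    (root / "transforms" / "cleaning.py").touch()
    print(dirs_missing_init(root))  # ['transforms']
```

A check like this could run in CI alongside the unit tests so that a missing file is reported before the job is submitted.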
10. Interview Perspective
- Question: Why is the --setup_file flag needed when submitting a job to a runner like Dataflow?
- Answer: At submission time, Python resolves imports from your local directories. The workers that execute the job do not have access to those local packages. --setup_file packages the local module as a tarball, uploads it to staging, and installs it on each worker machine.
- Question: What is the difference between TestPipeline and a regular Pipeline?
- Answer: TestPipeline is a subclass of Pipeline designed for testing. It can pick up pipeline options supplied on the test command line, and by default it blocks until the pipeline finishes, so the checks complete before the test returns. Combined with assert_that, it verifies PCollection contents without writing outputs to external storage.
11. Best Practices
- Never hardcode database credentials or API keys in code folders; store them in a secret manager and pass them to the pipeline through options.
- Write unit tests for every custom DoFn and composite PTransform using TestPipeline.
12. Summary
- Production pipelines require packaging logic into modules.
- setup.py defines the packages and declares the dependencies to install on remote workers.
- __init__.py files are required for find_packages() to recognize a folder as a Python package.
- Use TestPipeline and assert_that to verify pipeline logic in unit tests.