File IO
1. Introduction
In Apache Beam, File IO provides a set of patterns and transforms to match, read, and write files. Rather than treating files as static blobs, Beam represents them as PCollections of file metadata or readable file handles, so processing can be spread across workers even when a pattern matches millions of files.
2. Why This Concept Exists
Modern data lakes ingest data constantly in the form of files. Pipelines need to watch directories, discover new files that match a pattern (for example, a wildcard such as *.log), and distribute the reading work across workers. Beam's File IO provides standard transforms for this, so you do not have to write custom directory-crawling code.
3. Key Terminology
- MatchFiles: A transform that accepts a file pattern (such as gs://bucket/*.csv) and outputs a PCollection of file metadata (paths, sizes, etc.).
- ReadMatches: A transform that takes file metadata and converts each record into a ReadableFile object. A file is opened only when your code reads from its ReadableFile.
- ReadableFile: A Beam representation of a matched file. It carries the file's metadata and lets you read its contents either as a byte stream via open(), which can be iterated line by line, or all at once via read() or read_utf8().
4. How It Works
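- MatchFiles is applied with a glob pattern.
- It queries the filesystem (GCS, S3, or local) and returns metadata for each matched file.
- ReadMatches turns each match into a ReadableFile. By default, the decompression codec is detected from the file extension when the file is opened.
- Workers process the files in parallel. Each file is one element, so a single file is read by one worker rather than being split into segments.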
5. Visual Diagram
File pattern: logs/*.txt (source)
  -> MatchFiles
PCollection[Metadata] (paths, sizes)
  -> ReadMatches
PCollection[ReadableFile] (streams)
  -> Process chunks
Files read and parsed in parallel
6. Code Example
Matching files and printing their metadata:
import apache_beam as beam
from apache_beam.io import fileio
with beam.Pipeline() as p:
    (p
     | "MatchLogs" >> fileio.MatchFiles("data/logs/*.log")
     | "ReadMatched" >> fileio.ReadMatches()
     # Each element is a ReadableFile; print its path and size in bytes.
     | "PrintPaths" >> beam.Map(
         lambda readable_file: print(
             f"File: {readable_file.metadata.path}, "
             f"Size: {readable_file.metadata.size_in_bytes}"))
    )
7. Code Explanation
- fileio.MatchFiles("data/logs/*.log") finds the files matching the wildcard pattern.
- fileio.ReadMatches() converts each match into a ReadableFile.
- Each ReadableFile exposes its metadata through .metadata, including the .metadata.path and .metadata.size_in_bytes properties. This example reads only metadata, so no file contents are loaded.
8. Real Production Example
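Watching a storage bucket for newly arriving files and extracting their lines:
import apache_beam as beam
from apache_beam.io import fileio
from apache_beam.options.pipeline_options import PipelineOptions
def process_file(readable_file):
    # Open the file and yield its text line by line instead of loading it all at once.
    with readable_file.open() as f:
        for line in f:
            yield line.decode("utf-8").strip()
# MatchContinuously keeps polling the pattern, so the pipeline must run in streaming mode.
options = PipelineOptions(streaming=True)
with beam.Pipeline(options=options) as p:
    lines = (p
             # Re-checks the pattern every 60 seconds; each new file path is emitted once.
             | "MatchNewFiles" >> fileio.MatchContinuously(
                 "gs://company-bucket/incoming/*", interval=60)
             | "ReadMatches" >> fileio.ReadMatches()
             | "ExtractContent" >> beam.FlatMap(process_file)
             )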
9. Common Mistakes
- Using Python's built-in open() inside maps: open(file_path) only understands local paths, so it fails on gs:// or s3:// URIs and does not decompress files. Always use Beam's ReadableFile.open(), which goes through Beam's FileSystems layer to handle credentials and compression properly.
- Assuming files exist permanently: a file that matched the pattern may be deleted before a worker processes it. Handle missing-file errors gracefully (for example, log and skip the file) instead of letting the whole bundle fail.
10. Interview Perspective
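- Question: Why use MatchFiles and ReadMatches instead of a standard ReadFromText?
- Answer: ReadFromText reads newline-delimited text records from a file pattern and is the simplest choice for that case. MatchFiles and ReadMatches give you a handle to each whole file along with its metadata. This lets you parse custom binary or tabular formats and track which file each record came from. Combined with MatchContinuously (Python) or FileIO.match().continuously() (Java), they can also keep watching for new files.
- Question: How does Beam determine file sizes for parallel splits?
- Answer: The FileSystems layer queries the storage provider (for example, by listing objects in a GCS bucket) and records each file's size in its metadata. File-based sources such as ReadFromText use these sizes to split large files into byte ranges. fileio instead distributes whole files among workers as separate elements.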
11. Best Practices
- Use fileio.MatchFiles when the set of files is only known when the pipeline runs, and fileio.MatchContinuously when new files keep arriving.
- Avoid loading whole files into memory; stream files line by line or chunk by chunk using generator functions.
12. Summary
- Beam File IO allows dynamic matching of file patterns.
- Converts matched filenames into ReadableFile handles.
- Prevents performance bottlenecks by parallelizing file reads across workers, one file per element.