beginner

JSON Files

6 min read | Last updated: 2026-07-01

1. Introduction

JSON (JavaScript Object Notation) is a popular semi-structured format for logs and configuration files. Apache Beam pipelines can parse JSON at scale, reading either line-delimited records or whole documents.

2. Why This Concept Exists

Modern platforms frequently exchange semi-structured data as newline-delimited JSON (NDJSON) or as standard JSON arrays. A Beam pipeline reads these inputs as text and parses them into native Python dictionaries, which later transforms (e.g. filtering, grouping by key) can work with directly.

3. Key Terminology

  • NDJSON: Newline-delimited JSON. Each line of the file is a separate valid JSON object.
  • JSON Array: A single document wrapped in square brackets [ ... ] containing multiple objects.
  • json.loads: Python standard library function that parses a JSON string into a Python object (a dictionary when the JSON value is an object, a list when it is an array).
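
To make the two layouts concrete, here is a minimal standard-library sketch. The sample records are made up for illustration and are not taken from any real dataset.

```python
import json

# Hypothetical sample data: the same two records in both layouts.
ndjson_text = '{"id": 1, "status": "ok"}\n{"id": 2, "status": "error"}\n'
array_text = '[{"id": 1, "status": "ok"}, {"id": 2, "status": "error"}]'

# NDJSON: every non-empty line is parsed independently.
ndjson_records = [
    json.loads(line) for line in ndjson_text.splitlines() if line.strip()
]

# JSON array: the whole document is parsed in one call and yields a list.
array_records = json.loads(array_text)

print(ndjson_records == array_records)  # both layouts carry the same records
print(type(array_records).__name__)     # a list here, not a dict
```

The practical difference is the unit of parsing: one line for NDJSON, the whole document for an array.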

4. How It Works

  • NDJSON: ReadFromText reads each line as a string, and json.loads converts the string to a dictionary.
  • JSON Array: The file cannot be parsed line by line, so a custom reader loads each file as a whole document, parses it, and emits the array's elements as separate records.
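
The whole-document approach for JSON arrays can be sketched as a small helper. The name iter_array_records and the sample document are assumptions made for illustration; this is one possible shape, not a Beam API.

```python
import json
from typing import Any, Iterator


def iter_array_records(document: str) -> Iterator[Any]:
    """Yield each element of a JSON document whose top level is an array.

    Hypothetical helper for illustration; it is not part of Apache Beam.
    """
    parsed = json.loads(document)
    if not isinstance(parsed, list):
        raise ValueError("expected a top-level JSON array")
    yield from parsed


# Hypothetical pretty-printed array, as it might appear in a whole file.
sample = """[
  {"id": 1, "status": "ok"},
  {"id": 2, "status": "error"}
]"""

records = list(iter_array_records(sample))
print(len(records))
```

In a pipeline, one plausible wiring is to match files with beam.io.fileio.MatchFiles, open them with beam.io.fileio.ReadMatches, and apply beam.FlatMap(lambda f: iter_array_records(f.read_utf8())). Each file is then read and parsed in full by a single worker, so this tends to suit many moderately sized files better than one very large one.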

5. Visual Diagram

NDJSON File (newline-delimited JSON)
  → json.loads Map (string-to-object parse)
  → PCollection[Dict] (parsed JSON dictionaries)

6. Code Example

Reading and writing NDJSON:

python
import json
import apache_beam as beam

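with beam.Pipeline() as p:
    (p
     # Each line of the input file becomes one string element
     | "ReadJSON" >> beam.io.ReadFromText("data/events.json")
     # Parse every line into a Python dict
     | "ParseJSON" >> beam.Map(json.loads)
     # Keep only records whose "status" field is "error"
     | "FilterErrors" >> beam.Filter(lambda item: item.get("status") == "error")
     # Serialize each dict back to a single-line JSON string
     | "FormatString" >> beam.Map(json.dumps)
     # Write one JSON string per line to sharded files named data/errors-*.json
     | "WriteErrors" >> beam.io.WriteToText("data/errors", file_name_suffix=".json")
    )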

7. Code Explanation

  • json.loads parses each line (a string) into a Python dict.
  • json.dumps converts the filtered dictionaries back into strings, because WriteToText writes one string per output line.
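
The parse, filter, and serialize steps are ordinary Python callables, so they can be checked without running a pipeline. The following is a minimal sketch over an in-memory list; the input lines and the is_error helper name are hypothetical.

```python
import json


def is_error(record: dict) -> bool:
    """Same predicate as the FilterErrors step in the pipeline."""
    return record.get("status") == "error"


# Hypothetical input lines standing in for data/events.json.
lines = [
    '{"id": 1, "status": "ok"}',
    '{"id": 2, "status": "error", "msg": "disk full"}',
]

parsed = [json.loads(line) for line in lines]      # ParseJSON
errors = [rec for rec in parsed if is_error(rec)]  # FilterErrors
out_lines = [json.dumps(rec) for rec in errors]    # FormatString

print(out_lines)
# → ['{"id": 2, "status": "error", "msg": "disk full"}']
```

Because json.dumps without an indent argument emits a single line and escapes newlines inside string values, each output string remains a valid NDJSON line.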

8. Real Production Example

Handling malformed JSON records gracefully with a tagged (additional) output that serves as a dead-letter queue:

python
import json
import apache_beam as beam

class ParseJsonFn(beam.DoFn):
    """Parses one NDJSON line; routes unparseable lines to a tagged output."""

    def process(self, element):
        try:
            yield json.loads(element)
        except json.JSONDecodeError as e:
            # Route the raw line and the error message to the "dead_letter" output
            yield beam.pvalue.TaggedOutput("dead_letter", (element, str(e)))

with beam.Pipeline() as p:
    results = (
        p
        | "ReadLogs" >> beam.io.ReadFromText("gs://raw-logs/*.json")
        | "ParseJSON" >> beam.ParDo(ParseJsonFn()).with_outputs("dead_letter", main="parsed")
    )

    # Process parsed logs
    results.parsed | "Process" >> beam.Map(print)

    # Save errors for investigation, one JSON line per (raw line, error) pair
    (results.dead_letter
     | "FormatErrors" >> beam.Map(lambda pair: json.dumps({"line": pair[0], "error": pair[1]}))
     | "WriteErrors" >> beam.io.WriteToText("gs://errors/malformed"))

9. Common Mistakes

  • Reading standard JSON arrays with ReadFromText: If the file is one large array pretty-printed across multiple lines, ReadFromText still emits it line by line. Most of those lines are not valid JSON on their own, so json.loads fails on them.
  • Skipping error handling: An uncaught json.JSONDecodeError fails the work item that contains the bad record. Once the runner's retries are exhausted, a single corrupted record can typically fail the whole job.
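
The first mistake is easy to reproduce with the standard library alone. This sketch feeds a hypothetical pretty-printed array to json.loads one line at a time, which is what a line-based reader would hand to the parse step.

```python
import json

# Hypothetical pretty-printed array; a line-based reader emits it line by line.
pretty_array = """[
  {"id": 1},
  {"id": 2}
]"""

failures = []
for line in pretty_array.splitlines():
    try:
        json.loads(line)
    except json.JSONDecodeError as exc:
        # Record the offending line together with the parser's message
        failures.append((line, exc.msg))

for line, reason in failures:
    print(repr(line), "->", reason)

print(len(failures), "of", len(pretty_array.splitlines()), "lines failed")
```

Note that the last object line has no trailing comma and happens to parse on its own, so a pipeline with a dead-letter output would keep a fraction of the records. That partial success can hide the underlying format problem.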

10. Interview Perspective

  • Question: Why is NDJSON preferred over standard JSON arrays in large pipelines?
  • Answer: An uncompressed NDJSON file can be split at any newline boundary, so workers can read separate byte ranges in parallel. A standard JSON array has to be parsed as one document, so each file is read sequentially by a single worker, which bottlenecks throughput.
  • Question: How do you write JSON to a database?
  • Answer: Parse the JSON to a dictionary first, and then map the fields to columns or mutation payloads.
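
The field-to-column mapping from the second answer can be sketched as follows. The COLUMNS tuple, the to_row helper, and the scores table are all hypothetical, and an in-memory SQLite database stands in for whatever sink a real pipeline would write to.

```python
import json
import sqlite3

# Hypothetical column list for illustration; a real table defines its own schema.
COLUMNS = ("user", "score")


def to_row(record: dict) -> tuple:
    """Pick the known columns in a fixed order; missing fields become None."""
    return tuple(record.get(col) for col in COLUMNS)


row = to_row(json.loads('{"user": "Alice", "score": 95, "extra": "ignored"}'))

# An in-memory SQLite table stands in for the real database sink.
conn = sqlite3.connect(":memory:")
conn.execute('CREATE TABLE scores ("user" TEXT, score INTEGER)')
conn.execute("INSERT INTO scores VALUES (?, ?)", row)
print(conn.execute("SELECT * FROM scores").fetchall())
# → [('Alice', 95)]
```

Selecting fields explicitly, rather than passing the whole dictionary through, keeps unexpected keys out of the write and makes missing fields visible as NULLs.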

11. Best Practices

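  • Wrap json.loads calls inside pipelines in try-except blocks so that a bad record can be handled instead of failing the job.
  • Prefer NDJSON for files that pipelines read, since it can be split and processed in parallel.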

12. Summary

  • NDJSON files are read line-by-line using ReadFromText and parsed with json.loads.
  • Tagged outputs used as a dead-letter queue keep invalid JSON records from failing the pipeline.

13. Interactive Challenges

Challenge 1: JSON Parser Map (Beginner)

Write a map transform that takes a JSON string {"user": "Alice", "score": 95} and parses it into a dictionary.

Challenge 2: JSON Serializer Map (Intermediate)

Create a map transform that takes a dictionary record {"id": 1, "active": True} and serializes it to a JSON formatted string.

Challenge 3: Safe JSON Ingestion (Advanced)

Write a DoFn subclass named SafeJsonParser that yields a dictionary for valid JSON strings, but yields nothing (ignoring errors) for invalid JSON strings.
