advanced

Kafka Streaming

7 min readLast updated: 2026-07-01

1. Introduction

Apache Kafka is an open-source distributed event store and stream processing platform. In enterprise architectures, Kafka often serves as the central log stream. Apache Beam integrates with Kafka using the KafkaIO connector, allowing you to ingest partition-based streams directly into your pipeline.

2. Why This Concept Exists

Kafka is designed for high-throughput, horizontal scalability, and message persistence. Unlike message brokers that delete data upon delivery, Kafka retains messages for a configurable retention window. When integrating Kafka with Apache Beam, the connector must handle dynamic partition discovery, manage offset commits, and translate Kafka's partitioning model into Beam's parallel execution model.

3. Key Terminology

  • Broker: A Kafka server node that stores and serves message streams.
  • Partition: A single ordered sequence of messages on a broker. Topics are divided into partitions for parallel processing.
  • Offset: A unique sequential integer assigned to each message within a partition, used to track consumption progress.
  • Bootstrap Servers: A list of host/port pairs used by the Kafka client to establish initial connections to the cluster.
  • Cross-Language Transform: A Beam feature that allows the Python SDK to execute Java-based transforms (like KafkaIO) by communicating with a Java expansion service.

4. How It Works

In the Python SDK, ReadFromKafka uses a cross-language expansion service because the underlying Kafka client is written in Java.

  1. Partition Reading: Beam spawns readers for each partition. The runner tracks the offsets of each partition.
  2. Watermarking: Beam tracks the event time of records inside Kafka. The watermark of the overall pipeline is determined by the minimum watermark among all active partitions.
  3. Offset Commit: By default, Beam handles offset tracking internally in its checkpoints. You can configure Kafka to commit offsets back to Kafka brokers for external progress visibility.

5. Visual Diagram

Kafka Topic Partitions (0, 1, 2)
Message Broker Data source

Java Expansion Service
Cross-language KafkaIO SDK

▼ (gRPC Serialization)

Worker 1 (Python)
Business logic

Worker 2 (Python)
Business logic

6. Code Example

Here is how to set up a Python Apache Beam pipeline to read from a Kafka cluster, decode the key/value byte pairs, and process them:

python
import apache_beam as beam
from apache_beam.io.kafka import ReadFromKafka
from apache_beam.options.pipeline_options import PipelineOptions

options = PipelineOptions(streaming=True)

with beam.Pipeline(options=options) as p:
    (
        p
        # 1. Read from Kafka using a cross-language connector
        | "ReadKafka" >> ReadFromKafka(
            consumer_config={
                "bootstrap.servers": "10.0.0.1:9092,10.0.0.2:9092",
                "group.id": "beam-consumer-group",
                "auto.offset.reset": "earliest"
            },
            topics=["device-telemetry"],
            key_deserializer="org.apache.kafka.common.serialization.StringDeserializer",
            value_deserializer="org.apache.kafka.common.serialization.StringDeserializer"
        )
        # 2. Unpack key and value (tuples of key-value are yielded by KafkaIO)
        | "ExtractKeyValue" >> beam.Map(lambda record: {
            "key": record[0],
            "value": record[1]
        })
        # 3. Log results
        | "LogEvents" >> beam.Map(print)
    )

7. Code Explanation

  • ReadFromKafka is imported from apache_beam.io.kafka. It automatically sets up the local Java expansion service under the hood.
  • consumer_config specifies standard Kafka client configurations: bootstrap.servers, consumer group.id, and auto.offset.reset.
  • key_deserializer and value_deserializer specify the Java class names for decoding raw message payloads (e.g. StringDeserializer or ByteArrayDeserializer).
  • The output of the read step is a PCollection of key-value tuples: (key_bytes_or_string, value_bytes_or_string).

8. Real Production Example

A retail bank streams transaction logs from Kafka. The Beam pipeline reads the transactions, uses stateful processing to detect potential double-swipe events, and alerts a security application by writing the offending transactions back to a critical alerts Kafka topic.

9. Common Mistakes

  • Missing Java Runtime Environment: Because KafkaIO is a cross-language transform, the system running the Python pipeline must have Java (JDK) installed on the machine initiating the pipeline to start the expansion service, otherwise the pipeline will fail to initialize.
  • Idle Partitions Watermark Freeze: If a Kafka topic has 10 partitions, but 2 partitions receive no data, the watermark of the pipeline might stop advancing because the watermark is bound to the slowest (idle) partition. Setting with_max_num_records or utilizing custom idle-timeout features is required.

10. Interview Perspective

  • Question: Why is Kafka's watermark advancement dependent on all partitions?
  • Answer: In Kafka, data is distributed across multiple partitions. Since the runner cannot guarantee which partition contains the oldest event time, it must advance its watermark based on the minimum watermark of all partitions to avoid processing late data.
  • Question: How does Apache Beam guarantee exactly-once processing when writing to Kafka?
  • Answer: Beam writes to Kafka using transaction IDs. By writing in transaction blocks that match the Beam checkpointing frequency, the runner guarantees that writes are committed only when the pipeline checkpoint completes successfully.

11. Best Practices

  • Ensure that the number of Kafka partitions matches your consumer parallelism needs to prevent consumer bottlenecks.
  • Always define a dead-letter queue (DLQ) path for serialization errors when parsing raw Kafka messages.
  • Configure network access rules to allow worker machines to talk to all brokers in the Kafka cluster.

12. Summary

  • ReadFromKafka connects Apache Beam to Kafka clusters.
  • In Python, KafkaIO runs as a cross-language transform via a Java expansion service.
  • Watermarks progress based on the slowest partition in the active subscription.

13. Interactive Challenges

Challenge 1: Basic Kafka Ingestion Config (Beginner)

Write a Python code statement configuring ReadFromKafka to connect to bootstrap server localhost:9092 to read from the topic "clicks". Use String deserialization for both keys and values.

Challenge 2: Accessing Kafka Record Values (Intermediate)

Given a PCollection of Kafka records named kafka_records (represented as key-value tuples), extract only the value portion and decode it as a JSON string into a Python dictionary.

Challenge 3: Write Key-Value Pairs to Kafka (Advanced)

Configure WriteToKafka to write a PCollection of (str, str) key-value tuples named alerts to the topic "alerts-topic" on bootstrap server "192.168.1.50:9092".

14. Related Content

Advertisement
AdSense Slot #000001Leaderboard Banner (728x90)