Advanced Lab · Hard

Lab: Kafka Pipeline

Estimated time: 60 mins

Who This Lab Is For

Advanced developers looking to implement production-level integration with Apache Kafka message brokers.

What You Will Learn

  • How to read streaming events from Kafka using ReadFromKafka.
  • How to deserialize byte-string payloads to JSON strings and objects.
  • How to filter the parsed dictionaries by field value in real time.

1. Business Scenario

Consume transaction events from an Apache Kafka topic and output the parsed JSON payloads.

2. Input Dataset (`dataset.csv`)

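Save the following raw rows locally as `dataset.csv` to test your pipeline. Despite the extension, the file is not comma-separated: the first line names the topic, and each remaining line holds one labeled JSON payload.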

```text
topic: transactions
payload-1: {"tx_id": "1001", "amount": 250.00, "status": "approved"}
payload-2: {"tx_id": "1002", "amount": 15.50, "status": "approved"}
payload-3: {"tx_id": "1003", "amount": 1200.00, "status": "flagged"}
```
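
Because the sample uses a `label: value` layout rather than real CSV, it may help to turn it into byte strings that resemble Kafka message values before testing any transforms. The following is a minimal sketch, not part of the lab itself: the helper name `load_sample` and the inline `SAMPLE` string are assumptions made for illustration.

```python
# Inline copy of the sample so the sketch runs without any file on disk.
SAMPLE = """\
topic: transactions
payload-1: {"tx_id": "1001", "amount": 250.00, "status": "approved"}
payload-2: {"tx_id": "1002", "amount": 15.50, "status": "approved"}
payload-3: {"tx_id": "1003", "amount": 1200.00, "status": "flagged"}
"""


def load_sample(text):
    """Split the sample text into (topic, [payload_bytes, ...])."""
    topic = None
    payloads = []
    for line in text.splitlines():
        line = line.strip()
        if not line:
            continue
        # Split on the first ": " only; the JSON body contains further colons.
        label, _, value = line.partition(": ")
        if label == "topic":
            topic = value
        elif label.startswith("payload-"):
            # Kafka hands message values to consumers as bytes, so mimic that.
            payloads.append(value.encode("utf-8"))
    return topic, payloads


topic, payloads = load_sample(SAMPLE)
print(topic, len(payloads))  # transactions 3
```

To read the saved file instead of the inline string, pass the result of `open("dataset.csv", encoding="utf-8").read()` to `load_sample`.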

3. Starter Code Skeleton

Create a local file named `starter.py` and copy the following skeleton. Complete the missing transformations:

```python
# starter.py - Kafka Pipeline
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

def run_pipeline():
    options = PipelineOptions()
    with beam.Pipeline(options=options) as p:
        # TODO: Implement ReadFromKafka ingestion
        # TODO: Deserialize JSON bytes
        # TODO: Filter status == 'flagged'
        pass

if __name__ == "__main__":
    run_pipeline()
```

4. Lab Requirements

  • Ingest message bytes from a Kafka topic.
  • Deserialize the payload bytes into Python dictionaries.
  • Keep only the transactions whose status is "flagged".
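
The deserialize and filter requirements can be prototyped in plain Python before any Kafka or Beam wiring is in place. The sketch below is one possible approach under stated assumptions: the helper names `parse_payload` and `is_flagged` are hypothetical, and silently dropping malformed messages is a simplification that the lab does not specify.

```python
import json


def parse_payload(raw):
    """Decode UTF-8 bytes and parse them as JSON; return None if malformed."""
    try:
        record = json.loads(raw.decode("utf-8"))
    except (UnicodeDecodeError, json.JSONDecodeError):
        return None
    # Only JSON objects are treated as valid transaction records.
    return record if isinstance(record, dict) else None


def is_flagged(record):
    """True when the record parsed successfully and its status is 'flagged'."""
    return record is not None and record.get("status") == "flagged"


# The three sample payloads, plus one malformed message to exercise the error path.
raw_messages = [
    b'{"tx_id": "1001", "amount": 250.00, "status": "approved"}',
    b'{"tx_id": "1002", "amount": 15.50, "status": "approved"}',
    b'{"tx_id": "1003", "amount": 1200.00, "status": "flagged"}',
    b"not json",
]

flagged = [rec for rec in map(parse_payload, raw_messages) if is_flagged(rec)]
print(flagged)
# [{'tx_id': '1003', 'amount': 1200.0, 'status': 'flagged'}]
```

Using `record.get("status")` rather than `record["status"]` means that a payload without a status field is skipped instead of raising an error. A production pipeline would more likely route unparseable messages to a separate output than discard them.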

5. Step-by-Step Guide & Solution

Solution for Kafka Pipeline

Click below to reveal the complete, runnable Python SDK implementation solution and the step-by-step walkthrough to complete the lab.
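
Independently of the lab's official solution, the three requirements could be wired together roughly as follows. This is a sketch under assumptions, not a definitive implementation: the broker address `localhost:9092`, the helper names `value_to_record` and `build_pipeline`, and printing as the output step are all illustrative choices. It relies on `ReadFromKafka` emitting `(key, value)` pairs of bytes with its default deserializers. Because `ReadFromKafka` is a cross-language transform, a Java runtime is also expected to be available when the pipeline is constructed.

```python
import json


def value_to_record(kv):
    """Parse the value of a (key, value) pair of bytes into a dict."""
    _key, value = kv
    return json.loads(value.decode("utf-8"))


def build_pipeline(options, bootstrap_servers="localhost:9092", topic="transactions"):
    """Assemble (but do not run) the read, parse, filter pipeline."""
    # Imported lazily so value_to_record can be tried without Beam installed.
    import apache_beam as beam
    from apache_beam.io.kafka import ReadFromKafka

    p = beam.Pipeline(options=options)
    (
        p
        | "ReadKafka" >> ReadFromKafka(
            consumer_config={"bootstrap.servers": bootstrap_servers},
            topics=[topic],
        )
        | "ParseJson" >> beam.Map(value_to_record)
        | "KeepFlagged" >> beam.Filter(lambda rec: rec.get("status") == "flagged")
        | "Print" >> beam.Map(print)
    )
    return p


def run_pipeline():
    from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions

    options = PipelineOptions()
    # Kafka is an unbounded source, so the pipeline runs in streaming mode.
    options.view_as(StandardOptions).streaming = True
    build_pipeline(options).run().wait_until_finish()


# Quick local check of the parsing step, with no broker involved.
sample = (None, b'{"tx_id": "1003", "amount": 1200.00, "status": "flagged"}')
print(value_to_record(sample)["status"])  # flagged
```

Calling `run_pipeline()` would start the streaming job against the assumed broker. Keeping the parsing step as a separate function makes it possible to check that logic locally, as the last two lines do, before a Kafka cluster is available.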
