Audience: advanced developers who want to build production-grade integrations with Apache Kafka message brokers.
Objective: consume messages from an Apache Kafka topic and output the parsed JSON payloads.
Save the following raw rows locally as `dataset.csv` to test your pipeline:
topic: transactions
payload-1: {"tx_id": "1001", "amount": 250.00, "status": "approved"}
payload-2: {"tx_id": "1002", "amount": 15.50, "status": "approved"}
payload-3: {"tx_id": "1003", "amount": 1200.00, "status": "flagged"}

Create a local file named `starter.py`, copy the following skeleton into it, and complete the missing transformations:
```python
# starter.py - Kafka Pipeline
import json  # needed to deserialize the JSON payloads

import apache_beam as beam
from apache_beam.io.kafka import ReadFromKafka  # cross-language Kafka source
from apache_beam.options.pipeline_options import PipelineOptions


def run_pipeline():
    options = PipelineOptions()
    with beam.Pipeline(options=options) as p:
        # TODO: Implement ReadFromKafka ingestion
        # TODO: Deserialize JSON bytes
        # TODO: Filter status == 'flagged'
        pass


if __name__ == "__main__":
    run_pipeline()
```
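Below is a sketch of one possible way to fill in the TODOs. It is not a reference solution. Several details are assumptions made for illustration:

- the helper names (`parse_sample_lines`, `decode_payload`, `is_flagged`);
- the `sample_path` parameter;
- the broker address `localhost:9092`;
- the `auto.offset.reset` setting.

The sketch relies on `ReadFromKafka` emitting `(key, value)` pairs of raw bytes, which is what it does with its default byte-array deserializers. `ReadFromKafka` is a cross-language transform backed by Beam's Java Kafka connector. The Kafka path therefore also needs a Java runtime and a runner that supports cross-language transforms, so check the Beam documentation for your version. The `sample_path` mode replays `dataset.csv` through `beam.Create` instead, which lets you check the transforms without a broker.

```python
# A possible completion of starter.py (sketch; helper names are hypothetical)
import json
from typing import Any, Dict, Iterable, List, Optional, Tuple

# Shape of ReadFromKafka's default output: (key, value) pairs of raw bytes.
KafkaRecord = Tuple[Optional[bytes], bytes]


def parse_sample_lines(lines: Iterable[str]) -> List[KafkaRecord]:
    """Turn 'payload-N: {...}' rows from dataset.csv into (key, value) pairs
    shaped like ReadFromKafka output. Assumes the 'label: value' layout above."""
    records: List[KafkaRecord] = []
    for line in lines:
        label, _, value = line.strip().partition(": ")
        if label.startswith("payload-"):
            records.append((None, value.encode("utf-8")))
    return records


def decode_payload(record: KafkaRecord) -> Dict[str, Any]:
    """Deserialize the UTF-8 JSON bytes in a Kafka record's value."""
    _key, value = record
    return json.loads(value.decode("utf-8"))


def is_flagged(payload: Dict[str, Any]) -> bool:
    """Keep only transactions whose status is 'flagged'."""
    return payload.get("status") == "flagged"


def run_pipeline(bootstrap_servers: str = "localhost:9092",
                 topic: str = "transactions",
                 sample_path: Optional[str] = None) -> None:
    # Beam is imported lazily so the helpers above can be unit-tested
    # without apache_beam (or the Java runtime ReadFromKafka needs).
    import apache_beam as beam
    from apache_beam.io.kafka import ReadFromKafka
    from apache_beam.options.pipeline_options import PipelineOptions

    if sample_path:
        # Broker-free mode: replay dataset.csv as a bounded, in-memory source.
        with open(sample_path, encoding="utf-8") as f:
            source = beam.Create(parse_sample_lines(f))
        options = PipelineOptions()
    else:
        # Kafka mode: the read is unbounded, so run in streaming mode.
        # "auto.offset.reset": "earliest" (an assumption) replays existing messages.
        source = ReadFromKafka(
            consumer_config={"bootstrap.servers": bootstrap_servers,
                             "auto.offset.reset": "earliest"},
            topics=[topic],
        )
        options = PipelineOptions(streaming=True)

    with beam.Pipeline(options=options) as p:
        (
            p
            | "Read" >> source
            | "DeserializeJson" >> beam.Map(decode_payload)
            | "FilterFlagged" >> beam.Filter(is_flagged)
            | "Print" >> beam.Map(print)
        )
```

For example, `run_pipeline(sample_path="dataset.csv")` should print only the record with `tx_id` 1003. `run_pipeline()` reads the `transactions` topic once you have published the three payloads to it. Because the parsing and filtering live in plain functions, you can unit-test them without starting a pipeline.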