Build an end-to-end streaming ingestion pipeline that collects cell tower call-health metrics, computes dropped-call rates, and writes the results downstream for alerting.
This streaming pipeline consumes real-time call-health events from Pub/Sub, parses each raw JSON payload in a ParDo, buckets records into 1-minute fixed windows, aggregates metrics per tower with a custom CombineFn to compute the percentage of dropped calls, and writes the results to BigQuery.
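For reference, here is a plain-Python sketch (no Beam involved) of the result the pipeline computes: events are grouped by tower and by 1-minute window, and each group's dropped-call rate is dropped_calls / total_calls × 100. The helper name reference_dropped_rates and the sample events are hypothetical; window starts are computed as timestamp - timestamp % 60, which is how 60-second fixed windows with no offset align to the Unix epoch.

```python
from collections import defaultdict

WINDOW_SECONDS = 60  # mirrors FixedWindows(60) in the pipeline


def reference_dropped_rates(events):
    """Hypothetical helper: dropped-call stats per (tower_id, window_start).

    Each event is a dict shaped like the mock Pub/Sub payload, e.g.
    {"tower_id": "tower_A", "status": "DROPPED", "timestamp": 150}.
    """
    counts = defaultdict(lambda: {"total_calls": 0, "dropped_calls": 0})
    for event in events:
        # Align the event to the start of its 60-second window
        window_start = event["timestamp"] - event["timestamp"] % WINDOW_SECONDS
        acc = counts[(event["tower_id"], window_start)]
        acc["total_calls"] += 1
        if event["status"] == "DROPPED":
            acc["dropped_calls"] += 1

    results = {}
    for key, acc in counts.items():
        total, dropped = acc["total_calls"], acc["dropped_calls"]
        rate = (dropped / total * 100.0) if total > 0 else 0.0
        results[key] = {**acc, "dropped_call_rate": rate}
    return results


# Hypothetical sample events (timestamps in epoch seconds)
sample = [
    {"tower_id": "tower_A", "status": "COMPLETED", "timestamp": 120},
    {"tower_id": "tower_A", "status": "DROPPED", "timestamp": 150},
    {"tower_id": "tower_A", "status": "COMPLETED", "timestamp": 179},
    {"tower_id": "tower_A", "status": "COMPLETED", "timestamp": 180},
    {"tower_id": "tower_B", "status": "DROPPED", "timestamp": 130},
]
print(reference_dropped_rates(sample))
```

Note that the event at timestamp 180 opens a new window for tower_A, so it does not affect the 33.3% rate of the window starting at 120.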
To test this streaming pipeline, you need an active stream of records. Create a local file named generate_stream.py and paste in the following Python script, which publishes mock tower events to a GCP Pub/Sub topic (the call-logs topic must already exist in your project):
```python
import json
import random
import time

from google.cloud import pubsub_v1

# 1. Configure GCP identifiers
project_id = "your-gcp-project-id"
topic_id = "call-logs"

publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(project_id, topic_id)

# Weighted list: roughly 1 in 4 mock calls is DROPPED
statuses = ["COMPLETED", "COMPLETED", "COMPLETED", "DROPPED"]

print("Starting streaming log publisher. Press Ctrl+C to exit.")
while True:
    # 2. Publish a random mock telemetry record
    data = {
        "call_id": f"call_{random.randint(1000, 9999)}",
        "tower_id": f"tower_{random.choice(['A', 'B', 'C'])}",
        "status": random.choice(statuses),
        "duration": random.randint(10, 300),
        "timestamp": int(time.time()),
    }
    payload = json.dumps(data).encode("utf-8")
    publisher.publish(topic_path, payload)
    print(f"Published event: {data}")
    time.sleep(1.0)
```

Start by setting up your pipeline configuration. Because the pipeline reads from an unbounded Pub/Sub source, you must set the streaming=True option.
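```python
from apache_beam.options.pipeline_options import PipelineOptions

options = PipelineOptions(
    streaming=True,  # Required: Pub/Sub is an unbounded source
    runner="DataflowRunner",
    project="your-gcp-project-id",
    region="us-central1",  # placeholder: use your Dataflow region
    temp_location="gs://your-bucket/tmp",  # placeholder: a GCS bucket you own
)
```

Next, read raw byte strings from your Pub/Sub topic and parse each JSON payload into a (tower_id, record) key-value pair.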
```python
import json

import apache_beam as beam


class ParseCallLogFn(beam.DoFn):
    def process(self, element):
        # element is the raw Pub/Sub message body (bytes)
        data = json.loads(element.decode("utf-8"))
        # Emit a (key, value) tuple keyed by tower for per-tower aggregation
        yield (data["tower_id"], {
            "call_id": data["call_id"],
            "status": data["status"],  # "COMPLETED" or "DROPPED"
            "duration": int(data["duration"]),
        })
```

Apply a FixedWindows(60) transform to bucket elements into 1-minute windows.
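```python
import apache_beam as beam
from apache_beam.transforms.window import FixedWindows

# parsed_records: the output of beam.ParDo(ParseCallLogFn())
# Bucket records into 60-second fixed windows
windowed_stream = parsed_records | "Window 1 min" >> beam.WindowInto(FixedWindows(60))
```

Subclass beam.CombineFn to count total calls and dropped calls per tower, then compute the dropped-call rate as a percentage.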
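A runner may call add_input on several partial accumulators (for example, one per bundle or worker) and then combine them with merge_accumulators, so the accumulator logic has to give the same answer however the input is split. The plain-Python sketch below mirrors the dict accumulator used by AggregateTowerMetricsFn; the standalone function names are hypothetical and simply mimic the four CombineFn methods.

```python
def create_accumulator():
    return {"total_calls": 0, "dropped_calls": 0}


def add_input(acc, status):
    acc["total_calls"] += 1
    if status == "DROPPED":
        acc["dropped_calls"] += 1
    return acc


def merge_accumulators(accs):
    merged = create_accumulator()
    for acc in accs:
        merged["total_calls"] += acc["total_calls"]
        merged["dropped_calls"] += acc["dropped_calls"]
    return merged


def extract_output(acc):
    total, dropped = acc["total_calls"], acc["dropped_calls"]
    return (dropped / total * 100.0) if total > 0 else 0.0


statuses = ["COMPLETED", "DROPPED", "COMPLETED", "DROPPED", "COMPLETED"]

# Single pass over every input
single = create_accumulator()
for s in statuses:
    add_input(single, s)

# Two partial accumulators (e.g. two bundles), merged afterwards
left, right = create_accumulator(), create_accumulator()
for s in statuses[:2]:
    add_input(left, s)
for s in statuses[2:]:
    add_input(right, s)
merged = merge_accumulators([left, right])

print(extract_output(single), extract_output(merged))
```

Because counting is associative and commutative, the merged and single-pass accumulators agree, and the percentage is only computed once in extract_output rather than averaged across partial results.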
```python
import apache_beam as beam


class AggregateTowerMetricsFn(beam.CombineFn):
    def create_accumulator(self):
        # Running counts for one tower within one window
        return {"total_calls": 0, "dropped_calls": 0}

    def add_input(self, accumulator, element):
        # element is the record dict emitted by ParseCallLogFn
        accumulator["total_calls"] += 1
        if element["status"] == "DROPPED":
            accumulator["dropped_calls"] += 1
        return accumulator

    def merge_accumulators(self, accumulators):
        # Combine partial counts computed on different bundles/workers
        merged = {"total_calls": 0, "dropped_calls": 0}
        for acc in accumulators:
            merged["total_calls"] += acc["total_calls"]
            merged["dropped_calls"] += acc["dropped_calls"]
        return merged

    def extract_output(self, accumulator):
        total = accumulator["total_calls"]
        dropped = accumulator["dropped_calls"]
        rate = (dropped / total * 100.0) if total > 0 else 0.0
        return {"total_calls": total, "dropped_calls": dropped, "dropped_call_rate": rate}
```

Map each output element to a row matching the BigQuery schema and append the rows to the table.
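beam.CombinePerKey emits (tower_id, metrics) tuples, while WriteToBigQuery expects one dict per row whose keys are the schema's column names, so tower_id has to be folded into the dict before the write. Here is a plain-Python sketch of that mapping, with a hypothetical to_bq_row helper and a small check against the comma-separated name:TYPE schema string used for the table.

```python
SCHEMA = "tower_id:STRING, total_calls:INTEGER, dropped_calls:INTEGER, dropped_call_rate:FLOAT"


def to_bq_row(keyed_metrics):
    """Hypothetical helper: (tower_id, metrics_dict) -> flat BigQuery row dict."""
    tower_id, metrics = keyed_metrics
    return {"tower_id": tower_id, **metrics}


def schema_columns(schema):
    """Column names from a comma-separated 'name:TYPE' schema string."""
    return [field.split(":")[0].strip() for field in schema.split(",")]


row = to_bq_row(("tower_A", {"total_calls": 4, "dropped_calls": 1, "dropped_call_rate": 25.0}))
print(row)
# In the pipeline this could be applied as: | "Format rows" >> beam.Map(to_bq_row)
```

Checking row keys against the schema string in a unit test catches column-name typos before they surface as insert errors at runtime.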
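```python
# Aggregate per tower and window, flatten each (tower_id, metrics) pair into a row, then append
(windowed_stream
 | "Aggregate per tower" >> beam.CombinePerKey(AggregateTowerMetricsFn())
 | "Format rows" >> beam.Map(lambda kv: {"tower_id": kv[0], **kv[1]})
 | "Write to BQ" >> beam.io.WriteToBigQuery(
     "project:dataset.table",
     schema="tower_id:STRING, total_calls:INTEGER, dropped_calls:INTEGER, dropped_call_rate:FLOAT",
     write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND))
```

Try writing the pipeline code yourself using the steps above. Once you are finished, review the complete solution script below.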