intermediate
Elasticsearch IO
7 min read · Last updated: 2026-07-01
1. Introduction
Elasticsearch is a distributed search and analytics engine that stores and queries JSON documents. In Apache Beam, the Elasticsearch IO connector (ElasticsearchIO in the Java SDK) lets pipelines read documents that match a query and index or update documents at scale.
2. Why This Concept Exists
Large volumes of logs, security events, and product inventories must be searchable with low latency. Relational databases offer only limited full-text search and relevance ranking compared with an engine built on inverted indexes. Beam's Elasticsearch connector lets you process streams of raw logs, convert them to JSON documents, and write them directly into search indexes.
3. Key Terminology
- Index: A collection of documents sharing similar characteristics (similar to a database table).
- Document: A basic unit of information expressed in JSON (similar to a row).
- Bulk API: The Elasticsearch endpoint used to batch multiple index operations into a single API request.
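To make the Bulk API concrete: its request body is newline-delimited JSON (NDJSON), with an action line followed by the document source for each operation. The sketch below builds such a body using only the standard library. The `to_bulk_body` helper and the convention of taking `_id` from an `"id"` field are illustrative assumptions, not part of any Beam API.

```python
import json


def to_bulk_body(docs, index):
    """Serialize documents into a Bulk API request body (NDJSON).

    Each operation is two lines: an action/metadata line, then the document
    source. Taking _id from doc["id"] is an illustrative convention.
    """
    lines = []
    for doc in docs:
        lines.append(json.dumps({"index": {"_index": index, "_id": doc["id"]}}))
        lines.append(json.dumps(doc))
    # The Bulk API requires the body to end with a trailing newline.
    return "\n".join(lines) + "\n"


body = to_bulk_body(
    [
        {"id": "doc-1", "level": "INFO", "msg": "Job started"},
        {"id": "doc-2", "level": "ERROR", "msg": "Job failed"},
    ],
    index="app-logs",
)
print(body, end="")
```

A client would POST this body to the cluster's `_bulk` endpoint with the header `Content-Type: application/x-ndjson`.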
4. How It Works
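- Input: The pipeline receives structured records (Python dictionaries in the examples below).
- Write: Each record is serialized to JSON and buffered into batches. The connector sends each batch as a single request to the Elasticsearch HTTP Bulk API (the `_bulk` endpoint) and retries transient failures, such as HTTP 429 responses returned when the cluster rejects work because it is overloaded.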
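The batching step can be sketched as a generator that flushes whenever adding a record would exceed either a document-count limit or a serialized-size limit. `batch_records` and its default limits are hypothetical illustrations of the kind of knobs a bulk writer exposes, not the connector's actual implementation or defaults.

```python
import json


def batch_records(records, max_docs=1000, max_bytes=5 * 1024 * 1024):
    """Group records into batches bounded by document count and JSON byte size.

    Hypothetical helper: the limits are illustrative, not connector defaults.
    """
    batch, batch_bytes = [], 0
    for record in records:
        size = len(json.dumps(record).encode("utf-8"))
        # Flush before adding a record that would push the batch over a limit.
        if batch and (len(batch) >= max_docs or batch_bytes + size > max_bytes):
            yield batch
            batch, batch_bytes = [], 0
        batch.append(record)
        batch_bytes += size
    if batch:
        yield batch  # flush the final, partially filled batch


records = [{"id": f"doc-{i}", "msg": "x"} for i in range(2500)]
sizes = [len(b) for b in batch_records(records, max_docs=1000)]
print(sizes)  # [1000, 1000, 500]
```

Bounding by bytes as well as by count matters because log documents vary widely in size; a count-only limit can still produce oversized requests.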
5. Visual Diagram
Raw Log Stream ➔ DoFn JSON Parser ➔ (Bulk POST) ➔ Elasticsearch Cluster
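The parser stage in the diagram can be a plain function that a DoFn or `beam.Map` wraps. The sketch below assumes a hypothetical log format of `<timestamp> <LEVEL> <message>`; adapt the splitting logic to your real format.

```python
import json


def parse_log_line(line):
    """Parse a raw line of the hypothetical form '<timestamp> <LEVEL> <message>'.

    Returns a dict ready for indexing, or None for malformed lines so the
    caller can drop them or route them to a dead-letter output.
    """
    parts = line.strip().split(" ", 2)
    if len(parts) != 3:
        return None
    timestamp, level, msg = parts
    return {"timestamp": timestamp, "level": level.upper(), "msg": msg}


doc = parse_log_line("2026-07-01T12:00:00Z ERROR Job failed")
print(json.dumps(doc))
```

In a pipeline, this could be applied with `beam.Map(parse_log_line)` followed by `beam.Filter(lambda doc: doc is not None)`.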
6. Code Example
Writing parsed logs to an Elasticsearch index:
```python
import apache_beam as beam

# Hypothetical import: the Beam Python SDK does not, to our knowledge, ship a
# WriteToElasticsearch transform (the Java SDK provides ElasticsearchIO).
# Substitute the connector your project actually uses.
# from apache_beam.io.elasticsearch import WriteToElasticsearch

with beam.Pipeline() as p:
    logs = p | "CreateLogs" >> beam.Create([
        {"id": "doc-1", "level": "INFO", "msg": "Job started"},
        {"id": "doc-2", "level": "ERROR", "msg": "Job failed"},
    ])
    # Hypothetical write step; parameter names are illustrative.
    # logs | "IndexLogs" >> WriteToElasticsearch(
    #     hosts=["http://elasticsearch-host:9200"],
    #     index="app-logs",
    #     op_type="index",
    # )
```
7. Code Explanation
`hosts` lists the HTTP endpoints of the Elasticsearch cluster. `index` names the target index. `op_type="index"` inserts each document, or replaces it entirely if a document with the same `_id` already exists.
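The Bulk API's operation types differ in how they treat an existing `_id`. This sketch shows the action and source lines each type produces; `bulk_action_lines` is a hypothetical helper, and `delete` is left out because it carries no source line.

```python
import json


def bulk_action_lines(doc, index, op_type="index"):
    """Return the two NDJSON lines for one Bulk API operation.

    - "index":  insert, or replace the whole document if the _id exists.
    - "create": insert only; Elasticsearch rejects the item (409) if the _id exists.
    - "update": partial update; fields are wrapped in {"doc": ...}, and
      doc_as_upsert=True inserts the document when the _id is missing.
    Taking _id from doc["id"] is an illustrative convention.
    """
    meta = {op_type: {"_index": index, "_id": doc["id"]}}
    if op_type in ("index", "create"):
        source = doc
    elif op_type == "update":
        source = {"doc": doc, "doc_as_upsert": True}
    else:
        raise ValueError(f"unsupported op_type: {op_type}")
    return [json.dumps(meta), json.dumps(source)]


for op in ("index", "create", "update"):
    print(bulk_action_lines({"id": "doc-1", "level": "INFO"}, "app-logs", op))
```

`index` is the simplest choice for idempotent re-writes; `create` is useful when a duplicate should surface as an error rather than silently overwrite.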
8. Real Production Example
Directing documents to different indices dynamically based on payload properties:
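```python
# Route each document to an index derived from its "level" field,
# e.g. {"level": "ERROR"} -> "logs-error".
def index_fn(doc):
    return f"logs-{doc['level'].lower()}"

# Hypothetical: assumes WriteToElasticsearch accepts a callable for `index`.
# Java's ElasticsearchIO exposes the same idea as Write.withIndexFn.
# logs | "DynamicIndex" >> WriteToElasticsearch(
#     hosts=["http://elasticsearch-host:9200"],
#     index=index_fn,
# )
```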
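Dynamic routing works well with the Bulk API because `_index` is set on each action line, so a single `_bulk` request can target several indices. The sketch below extends the routing idea to hypothetical daily indices (one per level per UTC day); the `ts` epoch-seconds field and both helper names are assumptions for illustration.

```python
import json
from datetime import datetime, timezone


def index_fn(doc):
    """Hypothetical router: one index per log level per UTC day."""
    day = datetime.fromtimestamp(doc["ts"], tz=timezone.utc).strftime("%Y.%m.%d")
    return f"logs-{doc['level'].lower()}-{day}"


def to_routed_bulk_body(docs, route):
    """Build one Bulk API body whose actions target per-document indices."""
    lines = []
    for doc in docs:
        lines.append(json.dumps({"index": {"_index": route(doc), "_id": doc["id"]}}))
        lines.append(json.dumps(doc))
    return "\n".join(lines) + "\n"


docs = [
    {"id": "doc-1", "level": "INFO", "msg": "Job started", "ts": 1782907200},
    {"id": "doc-2", "level": "ERROR", "msg": "Job failed", "ts": 1782907260},
]
print(to_routed_bulk_body(docs, index_fn), end="")
```

Time-based index names also make retention simple: old data can be removed by deleting whole indices rather than individual documents.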
9. Common Mistakes
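- Omitting document IDs: Letting Elasticsearch auto-generate IDs when you need upserts or idempotent writes. If a bulk request is retried, or a bundle is reprocessed after a worker failure, each re-sent document is stored again under a new ID. Derive a stable, unique ID from each record so that re-writes overwrite the existing document instead of duplicating it.
- Small batch sizes: Setting the maximum batch size too low, so the pipeline sends many small bulk requests and spends more time on per-request overhead (round trips, request parsing) than on indexing.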
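One way to get stable IDs is to hash either a natural key or the whole document in canonical JSON form. `stable_doc_id` is a hypothetical helper; SHA-256 over sorted-key JSON is one reasonable choice, not a requirement of Elasticsearch or Beam.

```python
import hashlib
import json


def stable_doc_id(doc, key_fields=None):
    """Derive a deterministic document ID so retried writes overwrite, not duplicate.

    If key_fields is given, hash only those fields (a natural key); otherwise
    hash the whole document in canonical JSON (sorted keys, fixed separators).
    """
    payload = {k: doc[k] for k in key_fields} if key_fields else doc
    canonical = json.dumps(payload, sort_keys=True, separators=(",", ":"))
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()


# Key order does not matter, so both calls print the same ID.
print(stable_doc_id({"level": "ERROR", "msg": "Job failed"}))
print(stable_doc_id({"msg": "Job failed", "level": "ERROR"}))
```

Hashing the whole document collapses genuinely distinct events that happen to have identical content; when that matters, hash a natural key that includes something unique, such as a source offset or event ID.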
10. Interview Perspective
- Question: How does the Elasticsearch connector optimize write throughput?
- Answer: It buffers records and writes them through the Bulk API. Instead of issuing one HTTP request per document, it packs many index operations into a single bulk payload, amortizing network round trips and per-request overhead across the whole batch.
- Question: How do you handle schema changes in Elasticsearch?
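- Answer: By relying on dynamic mapping, which lets Elasticsearch infer types for new fields, and on index templates that apply mappings to newly created indices. Time-based index names (e.g., daily indices) let a revised mapping take effect on the next index, because the type of an existing field generally cannot be changed without reindexing.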
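The throughput argument in the first answer is easy to quantify: with a batch size of B, writing N documents takes ceil(N / B) HTTP requests instead of N. The helper below is a hypothetical illustration of that arithmetic, using integer math to avoid floating-point rounding.

```python
def bulk_requests_needed(num_docs, batch_size):
    """Requests needed when each request carries at most batch_size documents."""
    return (num_docs + batch_size - 1) // batch_size  # integer ceiling division


print(bulk_requests_needed(1_000_000, 1))     # 1000000
print(bulk_requests_needed(1_000_000, 1000))  # 1000
```

Going from one document per request to batches of 1,000 cuts the request count by three orders of magnitude, although each larger request takes longer for the cluster to process.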
11. Best Practices
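- Enable compression of HTTP request bodies where both your client and cluster support it; repetitive JSON log documents usually compress well, which reduces network transfer for large bulk payloads.
- Configure retries with exponential backoff so that rejected bulk requests (HTTP 429) are not immediately re-sent to an already overloaded cluster.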
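The backoff policy can be sketched with the standard library. `send_with_backoff`, `TransientBulkError`, and the delay values are hypothetical stand-ins for a real client call and its 429 handling; a real connector exposes its own retry settings (the Java SDK's ElasticsearchIO, for example, has a retry configuration), so treat this only as an illustration of the technique.

```python
import time


class TransientBulkError(Exception):
    """Hypothetical error type for retryable responses such as HTTP 429."""


def send_with_backoff(send, payload, max_retries=5, base_delay=0.5,
                      max_delay=30.0, sleep=time.sleep):
    """Call send(payload), retrying TransientBulkError with exponential backoff.

    The delay before retry n (0-based) is base_delay * 2**n, capped at max_delay.
    After max_retries retries, the last error is re-raised.
    """
    for attempt in range(max_retries + 1):
        try:
            return send(payload)
        except TransientBulkError:
            if attempt == max_retries:
                raise
            sleep(min(max_delay, base_delay * 2 ** attempt))


# Usage: a fake sender that is rejected twice, then succeeds.
calls = {"n": 0}


def flaky_send(payload):
    calls["n"] += 1
    if calls["n"] <= 2:
        raise TransientBulkError("429 Too Many Requests")
    return "ok"


delays = []  # record the waits instead of actually sleeping
result = send_with_backoff(flaky_send, "bulk body", sleep=delays.append)
print(result, delays)  # ok [0.5, 1.0]
```

Injecting `sleep` keeps the sketch testable. Production code commonly adds random jitter to each delay so that many workers do not retry in lockstep.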
12. Summary
- Elasticsearch IO connects pipelines to Elasticsearch, reading documents from indexes and indexing pipeline output for full-text search.
- Writes target the Bulk API to achieve high insertion throughput.
- Index routing can be dynamic based on record fields.