Define custom element processing logic using the standard DoFn lifecycle.
import apache_beam as beam
class ParseRecordFn(beam.DoFn):
    """Pass valid elements through, showing each DoFn lifecycle hook.

    The hooks run in this order: ``setup`` -> (``start_bundle`` ->
    ``process`` per element -> ``finish_bundle``) per bundle -> ``teardown``.
    """

    def setup(self):
        """Create the database client used by this DoFn instance.

        Beam calls this once per DoFn instance before any bundle is
        processed, which makes it the place for expensive, non-picklable
        resources such as client connections.
        """
        self.client = initialize_database_client()

    def start_bundle(self):
        """Reset the per-bundle buffer at the start of every bundle."""
        # Nothing in this class appends to or flushes the buffer yet; it is
        # reset here so state can never leak from one bundle into the next.
        self.batch = []

    def process(self, element):
        """Yield ``element`` unchanged when ``element.is_valid`` is truthy.

        Elements whose ``is_valid`` is falsy produce no output, so this
        step acts as a filter.
        """
        if element.is_valid:
            yield element

    def finish_bundle(self):
        """Run after the last element of a bundle; currently a no-op."""

    def teardown(self):
        """Close the database client, if one was created.

        Teardown is best-effort and may run on an instance whose ``setup``
        never completed, so a missing client is tolerated rather than
        raising ``AttributeError`` and masking the original failure.
        """
        client = getattr(self, "client", None)
        if client is None:
            return
        try:
            client.close()
        finally:
            # Drop the reference so a repeated teardown does not close twice.
            self.client = None