Apache Beam & Google Cloud Dataflow to define and execute data processing pipelines
Even though we already have tools such as Apache Hadoop, Apache Spark, and Apache Flink, there are situations where we need a tool that provides an abstraction between our application logic and the big data ecosystem, or a unified model for both batch and stream processing. This is where Apache Beam comes onto the scene.
In this post, we’ll cover two simple examples, running our pipelines in a local environment as well as on Google Cloud Dataflow.
Apache Beam
Apache Beam is an open source unified programming model to define and execute data processing pipelines, including ETL, batch and stream processing.
There are some elements you need to know before you start writing your data processing code/application.
- SDKs: You can use the following SDKs (Python SDK, Java SDK or Go SDK) to write your code.
- DataSource: Your application data source could be batch/micro-batch or streaming.
- Runner: Specify the target system/platform where your Beam pipeline will run.
Available runners
- DirectRunner: Runs locally on your machine — great for developing, testing, and debugging.
- ApexRunner: Runs on Apache Apex.
- FlinkRunner: Runs on Apache Flink.
- SparkRunner: Runs on Apache Spark.
- DataflowRunner: Runs on Google Cloud Dataflow, a fully managed service within Google Cloud Platform.
- GearpumpRunner: Runs on Apache Gearpump (incubating).
- SamzaRunner: Runs on Apache Samza.
- NemoRunner: Runs on Apache Nemo.
- JetRunner: Runs on Hazelcast Jet.
Although Apache Beam is designed to make pipelines portable across runners, in practice each runner has different capabilities and implements the core concepts of the Beam model to a different degree. Check the Capability Matrix to understand each runner’s functionality in more depth.
You also need to understand the core concepts of Apache Beam: Pipeline, PCollection, PTransform, ParDo, and DoFn.
Pipeline: a pipeline encapsulates the workflow of your entire data processing task, including reading input data, transforming that data, and writing output data. All Beam driver programs must create a Pipeline, and when you create it you must also specify the execution options that tell it where and how to run.
PCollection: an abstraction that represents a potentially distributed, multi-element data set. You can think of it as “pipeline” data. Apache Beam transforms use this kind of object as input and output.
PTransform: the operations in your pipeline, which provide a generic processing framework. You provide processing logic in the form of a function object, and your user code is applied to each element of one or more input PCollections. Depending on the pipeline runner you choose, many different workers across a cluster may execute instances of your code in parallel. The code running on each worker generates the output elements that are ultimately added to the final output PCollection produced by the transform.
ParDo: a transform for generic parallel processing. The ParDo processing paradigm is similar to the Map phase of a Map/Shuffle/Reduce job on Hadoop. A ParDo transform considers each element in the input PCollection, performs some processing function on that element, and emits zero, one, or multiple elements to an output PCollection.
DoFn: a DoFn applies your logic to each element in the input PCollection and lets you populate the elements of an output PCollection. To include it in your pipeline, you wrap it in a ParDo PTransform.
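To make the “zero, one, or multiple elements” behaviour of ParDo concrete, here is a minimal single-machine sketch in plain Python. It does not use Beam at all: `par_do` and `split_words` are hypothetical names invented for illustration, and a real runner would distribute the calls across workers instead of looping.

```python
from typing import Callable, Iterable, List, TypeVar

T = TypeVar("T")
U = TypeVar("U")


def par_do(process: Callable[[T], Iterable[U]], elements: Iterable[T]) -> List[U]:
    """Toy model of ParDo: call `process` once per input element and
    concatenate whatever each call emits into one output collection."""
    output: List[U] = []
    for element in elements:
        output.extend(process(element))
    return output


def split_words(line: str) -> List[str]:
    # Plays the role of a DoFn's process method: a blank line emits
    # zero elements, any other line emits one element per word.
    return line.split()


words = par_do(split_words, ["hello beam", "", "dataflow"])
print(words)  # ['hello', 'beam', 'dataflow']
```

Three input elements produce three output elements here only by coincidence: the first line emits two words, the blank line emits none, and the last emits one.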
Let’s do it
For both examples, we need to create an isolated Python environment and install the appropriate requirements.
# Create Python environment...
$ pip3 install virtualenv
$ virtualenv --python=/usr/bin/python3.7 .venv
# Activate environment...
$ source .venv/bin/activate
# Install requirements...
$ pip install apache-beam[gcp]
Then, we will create our application. For this first example we will create a pipeline that takes a CSV file as input (you could create your own dataset using http://generatedata.com/). The fields are separated by the pipe character, in the form:
COUNTRY|CITY|ZIP|FULL_NAME
Bulgaria|Fusagasugá|2689|Lani Hoffman
Ireland|Sant'Elia a Pianisi|10214|Naida Livingston
Turks and Caicos Islands|Heredia|41302|Scarlett Burris
China|Prato Carnico|259646|Aurelia Witt
...
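Before handing lines like these to a pipeline, it can help to check how a single line is parsed. The following is a plain-Python sketch, not part of the pipeline itself: `parse_line`, `FIELDS`, and `HEADER` are hypothetical names, and it assumes the file starts with the header row shown in the sample. If your file has such a header, Beam’s `ReadFromText` also accepts a `skip_header_lines` argument that drops it at read time.

```python
from typing import Dict, Optional

FIELDS = ("country", "city", "zip", "full_name")
HEADER = "COUNTRY|CITY|ZIP|FULL_NAME"  # assumed first line of the file


def parse_line(line: str) -> Optional[Dict[str, str]]:
    """Return a record dict, or None for the header, a blank line,
    or a line that does not have exactly four fields."""
    line = line.strip()
    if not line or line == HEADER:
        return None
    parts = line.split("|")
    if len(parts) != len(FIELDS):
        return None
    return dict(zip(FIELDS, parts))


record = parse_line("Bulgaria|Fusagasugá|2689|Lani Hoffman")
print(record)
# {'country': 'Bulgaria', 'city': 'Fusagasugá', 'zip': '2689', 'full_name': 'Lani Hoffman'}
```

Returning `None` for bad lines is only one possible design; raising an error or routing such lines to a separate output are equally reasonable choices.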
First example
For our first example the expected output is a list of people by country.
# -*- coding: utf-8 -*-
import apache_beam as beam
from apache_beam.io import ReadFromText, WriteToText
from apache_beam.options.pipeline_options import (
    PipelineOptions,
    GoogleCloudOptions,
    StandardOptions
)
Then, we need to define our functions (DoFn) to be applied over each element of a PCollection.
class Split(beam.DoFn):
    def process(self, element, *args, **kwargs):
        """ Parse a pipe-delimited line into a dict """
        country, city, zip_code, full_name = element.split('|')
        return [{
            'full_name': full_name,
            'country': country,
            'city': city,
            'zip': zip_code
        }]


class CollectPeople(beam.DoFn):
    def process(self, element, *args, **kwargs):
        """ Return list of tuples in the form (country, person) """
        return [(element['country'], element['full_name'])]


class WriteToCSV(beam.DoFn):
    def process(self, element, *args, **kwargs):
        """ Prepare rows to be written in the csv file """
        # element is a (country, people) pair; emit one text row per country
        country, people = element
        return ['{}|{}'.format(country, ','.join(people))]
Then, create the pipeline (using the local runner) to read the input data, apply all the needed transformations and write the results.
if __name__ == '__main__':
    options = PipelineOptions()
    # Run locally with the DirectRunner
    options.view_as(StandardOptions).runner = 'DirectRunner'

    with beam.Pipeline(options=options) as pipeline:
        (
            pipeline
            | ReadFromText('<file_name>.csv')
            | beam.ParDo(Split())
            | beam.ParDo(CollectPeople())
            | beam.GroupByKey()
            | WriteToText('results.csv')
        )
The results (depending on your input data) should look like:
('Bulgaria', ['Lani Hoffman', 'Jane Hoffman'])
('Ireland', ['Naida Livingston', 'Raven Sutton'])
('Turks and Caicos Islands', ['Scarlett Burris'])
('China', ['Aurelia Witt'])
('Denmark', ['Tucker Simmons', 'Shelley Ray'])
('Micronesia', ['Frances Waller', 'Micah Ross'])
('Russian Federation', ['September Puckett'])
('Viet Nam', ['Davis Frederick'])
...
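The grouping step is what turns individual (country, person) pairs into one entry per country. As a rough single-machine model of that step, written in plain Python with no Beam calls, consider the sketch below. The name `group_by_key` is hypothetical, and unlike this sketch a distributed runner generally makes no promise about the order of the grouped values.

```python
from collections import defaultdict
from typing import Dict, Hashable, Iterable, List, Tuple, TypeVar

K = TypeVar("K", bound=Hashable)
V = TypeVar("V")


def group_by_key(pairs: Iterable[Tuple[K, V]]) -> Dict[K, List[V]]:
    """Toy model of a group-by-key step: collect every value that
    shares a key into a single list for that key."""
    grouped = defaultdict(list)
    for key, value in pairs:
        grouped[key].append(value)
    return dict(grouped)


pairs = [
    ("Bulgaria", "Lani Hoffman"),
    ("Ireland", "Naida Livingston"),
    ("Bulgaria", "Jane Hoffman"),
]
people_by_country = group_by_key(pairs)
print(people_by_country)
# {'Bulgaria': ['Lani Hoffman', 'Jane Hoffman'], 'Ireland': ['Naida Livingston']}
```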
Second example
For our second example, we will run the pipeline on Google Cloud Dataflow, and the expected output is the number of people by country.
But first, we need to create a bucket on GCP and copy the file that contains our dataset into it. We use gsutil for this; if you need to install it, see: https://cloud.google.com/storage/docs/gsutil_install.
# Create the bucket...
gsutil mb gs://<bucket_name>/
# Upload your file...
gsutil cp <file_name>.csv gs://<bucket_name>/
This second example looks like:
# -*- coding: utf-8 -*-
import apache_beam as beam
from apache_beam.io import ReadFromText, WriteToText
from apache_beam.options.pipeline_options import PipelineOptions, GoogleCloudOptions
With a few changes to our functions:
class Split(beam.DoFn):
    def process(self, element, *args, **kwargs):
        """ Parse a pipe-delimited line into a dict """
        country, city, zip_code, full_name = element.split('|')
        return [{
            'full_name': full_name,
            'country': country,
            'city': city,
            'zip': zip_code
        }]


class CollectPeople(beam.DoFn):
    def process(self, element, *args, **kwargs):
        """ Return list of tuples in the form (country, 1) """
        return [(element['country'], 1)]


class WriteToCSV(beam.DoFn):
    def process(self, element, *args, **kwargs):
        """ Sum the 1s collected for each country: {country: count} """
        return [{element[0]: sum(element[1])}]
Then, the pipeline (using the DataflowRunner) to read the input data, apply all the needed transformations and write the results.
if __name__ == '__main__':
    flow_options = [
        '--project=<gcp-project-id>',
        '--runner=DataflowRunner',
        '--job_name=xtest-dataflow-job',
        '--temp_location=gs://<bucket_name>/temp',
        '--staging_location=gs://<bucket_name>/stages',
        # Not read by this pipeline: the output path is hard-coded in WriteToText below
        '--output=gs://<bucket_name>/results/output',
    ]
    options = PipelineOptions(flow_options)
    gcp_options = options.view_as(GoogleCloudOptions)

    with beam.Pipeline(options=options) as pipeline:
        (
            pipeline
            | ReadFromText('gs://<bucket_name>/<file_name>.csv')
            | beam.ParDo(Split())
            | beam.ParDo(CollectPeople())
            | beam.GroupByKey()
            | beam.ParDo(WriteToCSV())
            | WriteToText('gs://<bucket_name>/results.csv')
        )
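If you launch the same job against several projects or buckets, the flag list can be built by a small helper rather than edited by hand. The sketch below is plain Python with no Beam calls. `dataflow_flags` and its parameters are hypothetical names, and the `--region` flag is an addition that the list above does not include; whether you need it depends on your Beam version and project setup, so treat it as an assumption.

```python
from typing import List


def dataflow_flags(project: str, bucket: str, job_name: str, region: str) -> List[str]:
    """Build the command-line style flags for a Dataflow job.
    The region flag is an assumption; drop it if your setup does not need it."""
    return [
        f"--project={project}",
        "--runner=DataflowRunner",
        f"--job_name={job_name}",
        f"--region={region}",
        f"--temp_location=gs://{bucket}/temp",
        f"--staging_location=gs://{bucket}/stages",
    ]


# Placeholder values for illustration only
flags = dataflow_flags("my-project", "my-bucket", "xtest-dataflow-job", "us-central1")
for flag in flags:
    print(flag)
```

The resulting list can be passed to `PipelineOptions` in place of the hand-written one.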
Then, if you go to https://console.cloud.google.com/dataflow, you can see your job status.
Going through the job details you can check the job info, the job graph, the job metrics, resource metrics and other information.
The results (depending on your input data) should look like:
...
{'Djibouti': 2}
{'Falkland Islands': 1}
{'Cambodia': 1}
{'Saint Helena, Ascension and Tristan da Cunha': 3}
{'Virgin Islands, British': 2}
{'Anguilla': 1}
{'Palau': 2}
{'Kiribati': 1}
{'Sierra Leone': 1}
{'Chile': 1}
{'Iran': 2}
{'French Southern Territories': 2}
{'Martinique': 1}
{'France': 2}
{'Azerbaijan': 1}
{'Gambia': 1}
{'Rwanda': 2}
{'Kenya': 2}
...
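The counts above come from grouping a list of 1s per country and then summing each list. A plain-Python sketch, with no Beam calls and hypothetical function names, shows that keeping a running total per key gives the same answer without ever building those lists. In Beam itself, the built-in `beam.CombinePerKey(sum)` transform follows that second idea and could stand in for the group-then-sum pair, though this post does not use it.

```python
from collections import Counter, defaultdict
from typing import Dict, Iterable, List, Tuple

Pair = Tuple[str, int]


def count_by_grouping(pairs: Iterable[Pair]) -> Dict[str, int]:
    """Collect all the 1s per country first, then sum each list."""
    grouped: Dict[str, List[int]] = defaultdict(list)
    for country, one in pairs:
        grouped[country].append(one)
    return {country: sum(ones) for country, ones in grouped.items()}


def count_incrementally(pairs: Iterable[Pair]) -> Dict[str, int]:
    """Keep one running total per country instead of a list of 1s."""
    totals: Counter = Counter()
    for country, one in pairs:
        totals[country] += one
    return dict(totals)


pairs = [("Djibouti", 1), ("Chile", 1), ("Djibouti", 1)]
print(count_by_grouping(pairs))    # {'Djibouti': 2, 'Chile': 1}
print(count_incrementally(pairs))  # {'Djibouti': 2, 'Chile': 1}
```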
Well, that’s all. I hope this information will be useful in your next project! The examples covered here are pretty simple, but by going through them you can get a better understanding of what Apache Beam is and how you can use it on a distributed processing system such as Google Cloud Dataflow.