Using Apache Beam & Google Cloud Dataflow to define and execute data processing pipelines

Apache Beam

  • SDKs: You write your pipeline code with one of the available SDKs: Python, Java, or Go.
  • Data source: Your pipeline can read from batch/micro-batch or streaming sources.
  • Runner: Specifies the target system/platform where your Beam pipeline will run (see the sketch after this list).
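
A minimal sketch of how these three pieces map to code (illustrative only; the file names and the uppercase transform are placeholders, not part of the examples below):

# -*- coding: utf-8 -*-
import apache_beam as beam
from apache_beam.io import ReadFromText, WriteToText
from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions

options = PipelineOptions()
# The runner decides where the pipeline executes: DirectRunner runs it locally,
# DataflowRunner submits it to Google Cloud Dataflow.
options.view_as(StandardOptions).runner = 'DirectRunner'

with beam.Pipeline(options=options) as pipeline:
    (pipeline
     | ReadFromText('<input_file>.csv')       # the data source (a batch source here)
     | beam.Map(lambda line: line.upper())    # a transform written with the Python SDK
     | WriteToText('<output_prefix>'))        # the sink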

Available runners

Beam supports several runners; the examples in this post use the DirectRunner (local execution) and the DataflowRunner (Google Cloud Dataflow).

Let’s do it

# Create a Python virtual environment...
$ pip3 install virtualenv
$ virtualenv --python=/usr/bin/python3.7 .venv
# Activate the environment...
$ source .venv/bin/activate
# Install the requirements...
$ pip install 'apache-beam[gcp]'

The examples below read a pipe-delimited CSV file with the following structure:
COUNTRY|CITY|ZIP|FULL_NAME
Bulgaria|Fusagasugá|2689|Lani Hoffman
Ireland|Sant'Elia a Pianisi|10214|Naida Livingston
Turks and Caicos Islands|Heredia|41302|Scarlett Burris
China|Prato Carnico|259646|Aurelia Witt
...

First example

# -*- coding: utf-8 -*-

import apache_beam as beam
from apache_beam.io import ReadFromText, WriteToText

from apache_beam.options.pipeline_options import (
    PipelineOptions,
    GoogleCloudOptions,
    StandardOptions
)


class Split(beam.DoFn):
    def process(self, element, *args, **kwargs):
        """ Split a pipe-delimited line into a dictionary. """
        country, city, zip_code, full_name = element.split('|')
        return [{
            'full_name': full_name,
            'country': country,
            'city': city,
            'zip': zip_code
        }]


class CollectPeople(beam.DoFn):
    def process(self, element, *args, **kwargs):
        """ Return list of tuples in the form (country, person) """
        return [(element['country'], element['full_name'])]


class WriteToCSV(beam.DoFn):
    def process(self, element, *args, **kwargs):
        """ Prepare a row to be written to the csv file. """
        # Note: this DoFn is defined here but not applied in the first pipeline below;
        # the grouped (country, [people]) tuples are written out as-is.
        country, people = element
        return ['{},{}'.format(country, ';'.join(people))]


if __name__ == '__main__':
    options = PipelineOptions()
    options.view_as(StandardOptions).runner = 'DirectRunner'

    with beam.Pipeline(options=options) as pipeline:
        pipeline | \
            ReadFromText('<file_name>.csv') | \
            beam.ParDo(Split()) | \
            beam.ParDo(CollectPeople()) | \
            beam.GroupByKey() | \
            WriteToText('results.csv')

Running the script locally with the DirectRunner writes the grouped records to sharded text files (e.g. results.csv-00000-of-00001):
('Bulgaria', ['Lani Hoffman', 'Jane Hoffman'])
('Ireland', ['Naida Livingston', 'Raven Sutton'])
('Turks and Caicos Islands', ['Scarlett Burris'])
('China', ['Aurelia Witt'])
('Denmark', ['Tucker Simmons', 'Shelley Ray'])
('Micronesia', ['Frances Waller', 'Micah Ross'])
('Russian Federation', ['September Puckett'])
('Viet Nam', ['Davis Frederick'])
...

Second example

# Create the bucket...
$ gsutil mb gs://<bucket_name>/

# Upload your file...
$ gsutil cp <file_name>.csv gs://<bucket_name>/

This time the pipeline counts people per country and runs on Google Cloud Dataflow instead of locally:
# -*- coding: utf-8 -*-

import apache_beam as beam
from apache_beam.io import ReadFromText, WriteToText
from apache_beam.options.pipeline_options import PipelineOptions, GoogleCloudOptions


class Split(beam.DoFn):
    def process(self, element, *args, **kwargs):
        """ Split a pipe-delimited line into a dictionary. """
        country, city, zip_code, full_name = element.split('|')
        return [{
            'full_name': full_name,
            'country': country,
            'city': city,
            'zip': zip_code
        }]


class CollectPeople(beam.DoFn):
    def process(self, element, *args, **kwargs):
        """ Return a (country, 1) tuple for each person. """
        return [(element['country'], 1)]


class WriteToCSV(beam.DoFn):
    def process(self, element, *args, **kwargs):
        """ Sum the 1s emitted per country to get a count of people. """
        return [{element[0]: sum(element[1])}]


if __name__ == '__main__':
    flow_options = [
        '--project=<gcp-project-id>',
        '--runner=DataflowRunner',
        '--job_name=xtest-dataflow-job',
        '--temp_location=gs://<bucket_name>/temp',
        '--staging_location=gs://<bucket_name>/stages',
        '--output=gs://<bucket_name>/results/output',
    ]

    options = PipelineOptions(flow_options)
    gcp_options = options.view_as(GoogleCloudOptions)

    with beam.Pipeline(options=options) as pipeline:
        pipeline | \
            ReadFromText('gs://<bucket_name>/<file_name>.csv') | \
            beam.ParDo(Split()) | \
            beam.ParDo(CollectPeople()) | \
            beam.GroupByKey() | \
            beam.ParDo(WriteToCSV()) | \
            WriteToText('gs://<bucket_name>/results.csv')

When the Dataflow job finishes, the output files in the bucket contain one count per country:
...
{'Djibouti': 2}
{'Falkland Islands': 1}
{'Cambodia': 1}
{'Saint Helena, Ascension and Tristan da Cunha': 3}
{'Virgin Islands, British': 2}
{'Anguilla': 1}
{'Palau': 2}
{'Kiribati': 1}
{'Sierra Leone': 1}
{'Chile': 1}
{'Iran': 2}
{'French Southern Territories': 2}
{'Martinique': 1}
{'France': 2}
{'Azerbaijan': 1}
{'Gambia': 1}
{'Rwanda': 2}
{'Kenya': 2}
...
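
As a side note, the GroupByKey-plus-sum pattern used in WriteToCSV could also be expressed with Beam's built-in Count combiner. Here is a minimal alternative sketch, not the pipeline used above; the paths are the same kind of placeholders:

# Alternative sketch: count people per country with Beam's built-in Count combiner
# instead of GroupByKey + sum. Paths are placeholders, as in the examples above.
import apache_beam as beam
from apache_beam.io import ReadFromText, WriteToText
from apache_beam.transforms.combiners import Count

with beam.Pipeline() as pipeline:
    (pipeline
     | ReadFromText('gs://<bucket_name>/<file_name>.csv', skip_header_lines=1)
     | beam.Map(lambda line: line.split('|')[0])   # keep only the COUNTRY column
     | Count.PerElement()                          # -> (country, count) pairs
     | WriteToText('gs://<bucket_name>/results_counts.csv'))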
