Apache Spark & Google Cloud DataProc

Dataproc is a managed Apache Spark and Apache Hadoop service that lets you use open source data tools for batch processing, querying, streaming, and machine learning.

This project explains how to run an Apache Spark script on Google Cloud Dataproc. To do so, we will write a script that reads an Apache server log file, extracts the host, datetime, method, endpoint, protocol, and status code, and saves the result into BigQuery.

In this project, we will use the Google Cloud SDK to create and manage the resources we need. If you have not installed the SDK yet, you can follow these instructions.

Then you need to authenticate from your console:

$ gcloud auth login

After a successful login, you will receive a message like this:

You are now logged in as [...@gmail.com].
Your current project is [...]. You can change this setting by running:
$ gcloud config set project PROJECT_ID

Once the Cloud SDK is installed and we have logged in from the console, it’s time to create our project. We will use the following command (of course, you could use the Web UI instead).

$ gcloud projects create <project-name>

You should see output like this:

Waiting for [operations/...] to finish...done.
Enabling service [cloudapis.googleapis.com] on project [...]...
Operation "operations/..." finished successfully.

Next, we need to set the project we will be working in and enable the Dataproc and BigQuery APIs.

$ gcloud config set project <project-name>
$ gcloud services enable dataproc.googleapis.com
$ gcloud services enable bigquery.googleapis.com

The log file we are using is available here. Each line has the following structure:

clientip ident auth timestamp verb request httpversion response bytes referrer agent

Create a bucket and copy the log file into it.

$ gsutil mb gs://dataproc_test_x_25/
$ gsutil cp apache_logs.csv gs://dataproc_test_x_25/data/input/

We also need to create a dataset and a table in BigQuery. For this we can use the CLI or the user interface.

$ bq mk --dataset <project-name>:apache_logs
$ bq mk --table <project-name>:apache_logs.access_logs
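
The command above creates the table without a schema. If you would rather define the columns up front, bq mk --table also accepts a JSON schema file as an extra argument. The following is a minimal sketch that prints such a schema. The column names and types are an assumption, chosen to match the fields we plan to extract (host, datetime, method, endpoint, protocol, status code), and build_schema is a hypothetical helper, not part of any Google library.

```python
import json


def build_schema():
    """Return a BigQuery schema (list of field dicts) for the parsed log columns.

    Assumption: five string columns plus an integer status, all nullable.
    """
    string_columns = ["host", "datetime", "method", "endpoint", "protocol"]
    schema = [
        {"name": name, "type": "STRING", "mode": "NULLABLE"}
        for name in string_columns
    ]
    schema.append({"name": "status", "type": "INTEGER", "mode": "NULLABLE"})
    return schema


# Print the schema as JSON so it can be saved to a file.
print(json.dumps(build_schema(), indent=2))
```

If you save the output as schema.json (a hypothetical file name), you could pass it as the last argument: bq mk --table <project-name>:apache_logs.access_logs schema.json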

Now it’s time to create our PySpark script. The flow is:

  • Read the log files from the bucket.
  • Extract host, datetime, method, endpoint, protocol, status code.
  • Save into BigQuery.

# script.py
import sys

from pyspark.sql import SparkSession
from pyspark.sql.functions import regexp_extract


# Regular expressions for the fields we want from each log line.
host_pattern = r'(^\S+\.[\S+\.]+\S+)\s'
datetime_pattern = r'\[(\d{2}/\w{3}/\d{4}:\d{2}:\d{2}:\d{2})'
method_uri_protocol_pattern = r'\"(\S+)\s(\S+)\s*(\S*)\"'
status_pattern = r'\s(\d{3})\s'


def transform_dataframe(data_frame):
    # spark.read.text() yields one row per line in a single 'value' column.
    return data_frame.select(
        regexp_extract('value', host_pattern, 1).alias('host'),
        regexp_extract('value', datetime_pattern, 1).alias('datetime'),
        regexp_extract('value', method_uri_protocol_pattern, 1).alias('method'),
        regexp_extract('value', method_uri_protocol_pattern, 2).alias('endpoint'),
        regexp_extract('value', method_uri_protocol_pattern, 3).alias('protocol'),
        regexp_extract('value', status_pattern, 1).cast('integer').alias('status'),
    )


if __name__ == '__main__':
    if len(sys.argv) != 3:
        raise Exception("You must specify the input file/folder and the destination table in the form <project>:dataset.table!")

    input_folder, output = sys.argv[1], sys.argv[2]
    spark = SparkSession.builder \
        .appName('Test Spark - DataProc') \
        .getOrCreate()
    # The BigQuery connector stages the data in this bucket before loading it.
    spark.conf.set('temporaryGcsBucket', 'dataproc_test_x_25')

    # Read the raw log lines, extract the fields and append them to the table.
    df = spark.read.text(input_folder)
    logs_df = transform_dataframe(df)

    logs_df.write.format('bigquery') \
        .option('table', output).mode('append') \
        .save()
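
Before uploading the script, it can help to check the regular expressions locally without Spark. The sketch below applies the same four patterns with Python's re module. The sample line is invented for illustration (it only follows the structure shown earlier), extract and parse_line are hypothetical helpers, and it assumes Python's re behaves like Spark's Java regex engine for these particular patterns.

```python
import re

# Same patterns as in script.py.
host_pattern = r'(^\S+\.[\S+\.]+\S+)\s'
datetime_pattern = r'\[(\d{2}/\w{3}/\d{4}:\d{2}:\d{2}:\d{2})'
method_uri_protocol_pattern = r'\"(\S+)\s(\S+)\s*(\S*)\"'
status_pattern = r'\s(\d{3})\s'


def extract(pattern, line, group):
    """Mimic regexp_extract: return the group, or '' when nothing matches."""
    match = re.search(pattern, line)
    return match.group(group) if match else ''


def parse_line(line):
    """Return the six fields of one log line as a dict."""
    status = extract(status_pattern, line, 1)
    return {
        'host': extract(host_pattern, line, 1),
        'datetime': extract(datetime_pattern, line, 1),
        'method': extract(method_uri_protocol_pattern, line, 1),
        'endpoint': extract(method_uri_protocol_pattern, line, 2),
        'protocol': extract(method_uri_protocol_pattern, line, 3),
        # Casting '' to integer yields null in Spark; use None here.
        'status': int(status) if status else None,
    }


# Invented sample line, not taken from the real log file.
sample = ('83.149.9.216 - - [17/May/2015:10:05:03 +0000] '
          '"GET /index.html HTTP/1.1" 200 1024 "-" "Mozilla/5.0"')
print(parse_line(sample))
```

Lines that do not match a pattern produce an empty string for that field (and no status), which is also what you would find in the BigQuery table for malformed log entries.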

Copy the script into your bucket.

$ gsutil cp script.py gs://dataproc_test_x_25/

At this point, we can create a Dataproc cluster and submit the job to it.

$ gcloud dataproc clusters create spark-cluster \
--region=us-west1 --single-node \
--master-machine-type=n1-standard-1
Waiting for cluster creation operation...done.
Created [https://dataproc.googleapis.com/v1/projects/<project-name>/regions/us-west1/clusters/spark-cluster] Cluster placed in zone [us-west1-a]

Send the job.

$ gcloud dataproc jobs submit pyspark \
gs://dataproc_test_x_25/script.py \
--cluster=spark-cluster \
--region=us-west1 \
--jars=gs://spark-lib/bigquery/spark-bigquery-latest.jar \
-- gs://dataproc_test_x_25/data/input/ <project-name>.apache_logs.access_logs
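
If you prefer to launch the job from Python (for example from a scheduler), the same command can be assembled as an argument list. This is only a sketch: build_submit_command is a hypothetical helper, my-project is a placeholder project name, and the flags are exactly the ones used in the command above.

```python
import shlex


def build_submit_command(bucket, cluster, region, table):
    """Return the argv list for `gcloud dataproc jobs submit pyspark`."""
    return [
        "gcloud", "dataproc", "jobs", "submit", "pyspark",
        f"gs://{bucket}/script.py",
        f"--cluster={cluster}",
        f"--region={region}",
        "--jars=gs://spark-lib/bigquery/spark-bigquery-latest.jar",
        "--",  # everything after this is passed to script.py as sys.argv[1:]
        f"gs://{bucket}/data/input/",
        table,
    ]


command = build_submit_command(
    bucket="dataproc_test_x_25",
    cluster="spark-cluster",
    region="us-west1",
    table="my-project.apache_logs.access_logs",  # placeholder project name
)

# Print the command for inspection; to execute it you could call
# subprocess.run(command, check=True) instead.
print(shlex.join(command))
```

Passing the arguments as a list avoids shell quoting problems, and the two values after the -- separator are the input folder and output table that script.py reads from sys.argv.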

At this point, you can go to the Google Cloud Console Web UI and check the status and the log of your job.

You can then query your data in BigQuery.

Well, that’s all. I hope this information will be useful in your next project! The example covered here is pretty simple, but going through it should give you a better understanding of how to run an Apache Spark job on Google Cloud Dataproc.

Data Architect | Python Developer