Apache Spark & Google Cloud Dataproc

$ gcloud auth login
You are now logged in as [...@gmail.com].
Your current project is [...]. You can change this setting by running:
$ gcloud config set project PROJECT_ID

The Project

$ gcloud projects create <project-name>
Waiting for [operations/...] to finish...done.                                                                                                                               
Enabling service [cloudapis.googleapis.com] on project [...]...
Operation "operations/..." finished successfully.
$ gcloud config set project <project-name>
$ gcloud services enable dataproc.googleapis.com
$ gcloud services enable bigquery.googleapis.com
Each line of the log file contains the following fields: clientip, ident, auth, timestamp, verb, request, httpversion, response, bytes, referrer and agent.
$ gsutil mb gs://dataproc_test_x_25/
$ gsutil cp apache_logs.csv gs://dataproc_test_x_25/data/input/
$ bq mk --dataset <project-name>:apache_logs
$ bq mk --table <project-name>:apache_logs.access_logs
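The bq commands above create an empty dataset and table. If you would rather create the table with an explicit schema matching the columns the Spark job writes, a rough equivalent using the google-cloud-bigquery Python client looks like the sketch below (the file name create_table.py is just for illustration; it assumes the client library is installed and credentials are configured, and types datetime as STRING because the job extracts it as plain text):

# create_table.py -- optional: create the target table with an explicit schema.
# Assumes the google-cloud-bigquery client library is installed and authenticated.
from google.cloud import bigquery

client = bigquery.Client(project='<project-name>')
schema = [
    bigquery.SchemaField('host', 'STRING'),
    bigquery.SchemaField('datetime', 'STRING'),
    bigquery.SchemaField('method', 'STRING'),
    bigquery.SchemaField('endpoint', 'STRING'),
    bigquery.SchemaField('protocol', 'STRING'),
    bigquery.SchemaField('status', 'INTEGER'),
]
table = bigquery.Table('<project-name>.apache_logs.access_logs', schema=schema)
client.create_table(table)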
The PySpark job will:
  • Read the log files from the bucket.
  • Extract the host, datetime, method, endpoint, protocol and status code from each line.
  • Save the result into BigQuery.
# script.py
import sys

from pyspark.sql import SparkSession
from pyspark.sql.functions import regexp_extract


host_pattern = r'(^\S+\.[\S+\.]+\S+)\s'
datetime_pattern = r'\[(\d{2}/\w{3}/\d{4}:\d{2}:\d{2}:\d{2})'
method_uri_protocol_pattern = r'\"(\S+)\s(\S+)\s*(\S*)\"'
status_pattern = r'\s(\d{3})\s'


def transform_dataframe(data_frame):
    # Extract the interesting fields from the raw 'value' column of each log line.
    return data_frame.select(
        regexp_extract('value', host_pattern, 1).alias('host'),
        regexp_extract('value', datetime_pattern, 1).alias('datetime'),
        regexp_extract('value', method_uri_protocol_pattern, 1).alias('method'),
        regexp_extract('value', method_uri_protocol_pattern, 2).alias('endpoint'),
        regexp_extract('value', method_uri_protocol_pattern, 3).alias('protocol'),
        regexp_extract('value', status_pattern, 1).cast('integer').alias('status'),
    )


if __name__ == '__main__':
    if len(sys.argv) != 3:
        raise Exception(
            'You must specify the input file/folder and the destination table '
            'in the form <project>:dataset.table!'
        )

    input_folder, output = sys.argv[1], sys.argv[2]
    spark = SparkSession.builder \
        .appName('Test Spark - DataProc') \
        .getOrCreate()
    # The BigQuery connector stages the data in a GCS bucket before loading it.
    spark.conf.set('temporaryGcsBucket', 'dataproc_test_x_25')

    # Read the raw log lines, parse them and append the result to BigQuery.
    df = spark.read.text(input_folder)
    logs_df = transform_dataframe(df)

    logs_df.write.format('bigquery') \
        .option('table', output).mode('append') \
        .save()
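Before shipping the script to a cluster you can sanity-check the extraction patterns locally. A minimal sketch follows, assuming pyspark is installed and script.py is importable from the current directory; the file name check_patterns.py and the sample log line are purely illustrative, not taken from apache_logs.csv:

# check_patterns.py -- quick local sanity check for the extraction patterns.
# The sample line below is illustrative only; it is not taken from the real dataset.
from pyspark.sql import Row, SparkSession

from script import transform_dataframe

spark = SparkSession.builder.appName('pattern-check').getOrCreate()

sample = ('127.0.0.1 - - [17/May/2015:10:05:03 +0000] '
          '"GET /index.html HTTP/1.1" 200 1043 "-" "curl/7.68.0"')

df = spark.createDataFrame([Row(value=sample)])
transform_dataframe(df).show(truncate=False)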
$ gsutil cp script.py gs://dataproc_test_x_25/
$ gcloud dataproc clusters create spark-cluster \
--region=us-west1 --single-node \
--master-machine-type=n1-standard-1
Waiting for cluster creation operation...done.
Created [https://dataproc.googleapis.com/v1/projects/<project-name>/regions/us-west1/clusters/spark-cluster] Cluster placed in zone [us-west1-a]
$ gcloud dataproc jobs submit pyspark \
gs://dataproc_test_x_25/script.py \
--cluster=spark-cluster \
--region=us-west1 \
--jars=gs://spark-lib/bigquery/spark-bigquery-latest.jar \
-- gs://dataproc_test_x_25/data/input/ <project-name>.apache_logs.access_logs
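Once the job finishes, you can confirm that the rows landed in BigQuery. A hedged sketch with the google-cloud-bigquery client (the file name verify_load.py is illustrative; same installation and credential assumptions as before):

# verify_load.py -- count rows per status code once the Dataproc job has finished.
# Assumes google-cloud-bigquery is installed and <project-name> is the default project.
from google.cloud import bigquery

client = bigquery.Client(project='<project-name>')
query = """
    SELECT status, COUNT(*) AS hits
    FROM `apache_logs.access_logs`
    GROUP BY status
    ORDER BY hits DESC
"""
for row in client.query(query).result():
    print(row.status, row.hits)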