Apache Spark & Google Cloud DataProc

$ gcloud auth login
You are now logged in as [...@gmail.com].
Your current project is [...]. You can change this setting by running:
$ gcloud config set project PROJECT_ID

The Project

$ gcloud projects create <project-name>
Waiting for [operations/...] to finish...done.
Enabling service [cloudapis.googleapis.com] on project [...]...
Operation "operations/..." finished successfully.
$ gcloud config set project <project-name>
$ gcloud services enable dataproc.googleapis.com
$ gcloud services enable bigquery.googleapis.com
Fields in each log record: clientip, ident, auth, timestamp, verb, request, httpversion, response, bytes, referrer, agent
$ gsutil mb gs://dataproc_test_x_25/
$ gsutil cp apache_logs.csv gs://dataproc_test_x_25/data/input/
$ bq mk --dataset <project-name>:apache_logs
$ bq mk --table <project-name>:apache_logs.access_logs
  • Read the log files from the bucket.
  • Extract host, datetime, method, endpoint, protocol, status code.
  • Save into BigQuery.
# script.py
import sys

from pyspark.sql import SparkSession
from pyspark.sql.functions import regexp_extract

# Regular expressions for the fields pulled out of each log line.
host_pattern = r'(^\S+\.[\S+\.]+\S+)\s'
datetime_pattern = r'\[(\d{2}/\w{3}/\d{4}:\d{2}:\d{2}:\d{2})'
method_uri_protocol_pattern = r'\"(\S+)\s(\S+)\s*(\S*)\"'
status_pattern = r'\s(\d{3})\s'


def transform_dataframe(data_frame):
    # spark.read.text() yields a single string column named 'value'.
    return data_frame.select(
        regexp_extract('value', host_pattern, 1).alias('host'),
        regexp_extract('value', datetime_pattern, 1).alias('datetime'),
        regexp_extract('value', method_uri_protocol_pattern, 1).alias('method'),
        regexp_extract('value', method_uri_protocol_pattern, 2).alias('endpoint'),
        regexp_extract('value', method_uri_protocol_pattern, 3).alias('protocol'),
        regexp_extract('value', status_pattern, 1).cast('integer').alias('status'),
    )


if __name__ == '__main__':
    if len(sys.argv) != 3:
        raise Exception("You must specify the input file/folder and the table to save the data to, in the form <project>:dataset.table!")

    input_folder, output = sys.argv[1], sys.argv[2]
    spark = SparkSession.builder \
        .appName('Test Spark - DataProc') \
        .getOrCreate()
    # Bucket the BigQuery connector uses to stage data before loading it.
    spark.conf.set('temporaryGcsBucket', 'dataproc_test_x_25')

    df = spark.read.text(input_folder)
    logs_df = transform_dataframe(df)

    # Append the extracted columns to the BigQuery table.
    logs_df.write.format('bigquery') \
        .option('table', output).mode('append') \
        .save()
$ gsutil cp script.py gs://dataproc_test_x_25/
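Before submitting the job, the extraction patterns from script.py can be tried locally with Python's re module. This is only a rough sketch: the sample log line is made up for illustration, the helper names extract and parse_line are hypothetical, and Spark's regexp_extract runs on Java regular expressions, which may differ from Python's in edge cases.

```python
import re

# Same patterns as in script.py.
HOST_PATTERN = r'(^\S+\.[\S+\.]+\S+)\s'
DATETIME_PATTERN = r'\[(\d{2}/\w{3}/\d{4}:\d{2}:\d{2}:\d{2})'
METHOD_URI_PROTOCOL_PATTERN = r'\"(\S+)\s(\S+)\s*(\S*)\"'
STATUS_PATTERN = r'\s(\d{3})\s'

# Hypothetical log line in Apache combined format, made up for this check.
SAMPLE_LINE = (
    '192.168.0.1 - - [17/May/2015:10:05:03 +0000] '
    '"GET /index.html HTTP/1.1" 200 1024 "-" "curl/7.68.0"'
)


def extract(pattern, text, group):
    """Mimic regexp_extract: return the group, or '' when nothing matches."""
    match = re.search(pattern, text)
    return match.group(group) if match else ''


def parse_line(line):
    """Return the same six fields that transform_dataframe selects."""
    status = extract(STATUS_PATTERN, line, 1)
    return {
        'host': extract(HOST_PATTERN, line, 1),
        'datetime': extract(DATETIME_PATTERN, line, 1),
        'method': extract(METHOD_URI_PROTOCOL_PATTERN, line, 1),
        'endpoint': extract(METHOD_URI_PROTOCOL_PATTERN, line, 2),
        'protocol': extract(METHOD_URI_PROTOCOL_PATTERN, line, 3),
        # An empty string cast to integer becomes null in Spark; None mirrors that.
        'status': int(status) if status else None,
    }


print(parse_line(SAMPLE_LINE))
```

A line that matches none of the patterns comes back with empty strings and a None status, which is roughly what the Spark job would write for malformed records.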
$ gcloud dataproc clusters create spark-cluster \
--region=us-west1 --single-node
Waiting for cluster creation operation...done.
Created [https://dataproc.googleapis.com/v1/projects/<project-name>/regions/us-west1/clusters/spark-cluster] Cluster placed in zone [us-west1-a]
$ gcloud dataproc jobs submit pyspark \
gs://dataproc_test_x_25/script.py \
--cluster=spark-cluster \
--region=us-west1 \
--jars=gs://spark-lib/bigquery/spark-bigquery-latest.jar \
-- gs://dataproc_test_x_25/data/input/ <project-name>.apache_logs.access_logs
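Once the job finishes, one way to check the result is to count the loaded rows per status code. The sketch below assumes the google-cloud-bigquery client library is installed and that application default credentials are configured; the function names build_status_query and count_by_status are hypothetical and not part of the original setup.

```python
def build_status_query(table):
    """Build a standard SQL query that counts rows per HTTP status code."""
    return (
        'SELECT status, COUNT(*) AS hits '
        f'FROM `{table}` '
        'GROUP BY status '
        'ORDER BY hits DESC'
    )


def count_by_status(table):
    """Run the query and return a list of (status, hits) tuples."""
    # Imported lazily so the query builder works without the client library.
    from google.cloud import bigquery

    client = bigquery.Client()
    rows = client.query(build_status_query(table)).result()
    return [(row['status'], row['hits']) for row in rows]
```

Calling count_by_status with the table in project.dataset.table form (for example, the table passed to the job above) should return one tuple per distinct status value written by the Spark job.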




