
dedeco/structured-streaming-job-pyspark


A sample Spark Structured Streaming job in PySpark that consumes a Kafka topic and writes the stream to an output path.

Run the job (local)

Build

Build the wheel (written to dist/) that is shipped to Spark via --py-files:

poetry build

Running

poetry run spark-submit \
  --master local \
  --packages 'org.apache.spark:spark-streaming-kafka-0-10_2.12:3.1.3,org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.3' \
  --py-files dist/structured_streaming-*.whl jobs/sample_job.py \
  <IP_KAFKA_BROKER>:9092 com.google.sample.purchases.2 ./tmp/output ./tmp/checkpoint '60 seconds'
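
The five positional arguments are the Kafka broker list, the topic, the output path, the checkpoint path, and the trigger interval. A minimal sketch of what jobs/sample_job.py plausibly does with them (the structure and names below are assumptions for illustration, not the repository's actual code):

import sys

from pyspark.sql import SparkSession


def main() -> None:
    # Positional args, in the order used by the spark-submit call above.
    brokers, topic, output_path, checkpoint_path, interval = sys.argv[1:6]

    spark = SparkSession.builder.appName("sample-structured-streaming").getOrCreate()

    # Kafka delivers key/value as binary, so cast to strings before writing.
    df = (
        spark.readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", brokers)
        .option("subscribe", topic)
        .load()
        .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
    )

    # Append each micro-batch to the output path; progress is tracked in
    # the checkpoint location, and batches fire every `interval`.
    query = (
        df.writeStream
        .format("json")
        .outputMode("append")
        .option("path", output_path)
        .option("checkpointLocation", checkpoint_path)
        .trigger(processingTime=interval)
        .start()
    )
    query.awaitTermination()


if __name__ == "__main__":
    main()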

Submit to Dataproc

Preparing dependencies

Export the locked dependencies to a requirements file for the cluster's initialization script:

poetry export -f requirements.txt --output requirements.txt

Copy the requirements file to the GCS bucket that hosts the initialization script

gsutil cp requirements.txt gs://andresousa-experimental-scripts

Create Dataproc cluster

Create the cluster with an initialization action that installs the Python dependencies (the script is expected to pip-install the requirements.txt exported above on each node):

export REGION=us-central1
gcloud dataproc clusters create cluster-sample \
  --region=${REGION} \
  --image-version=2.0-debian10 \
  --initialization-actions=gs://andresousa-experimental-scripts/initialize-cluster.sh

Submit job

Note the ^#^ prefix on --properties: it switches the gcloud list delimiter from ',' to '#', so the commas inside the spark.jars.packages value are passed through intact:

gcloud dataproc jobs submit pyspark \
  --cluster=cluster-sample \
  --region=us-central1 \
  --properties=^#^spark.jars.packages='org.apache.spark:spark-streaming-kafka-0-10_2.12:3.1.3,org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.3' \
  --py-files dist/structured_streaming-*.whl \
  jobs/sample_job.py \
  -- <IP_KAFKA_BROKER>:9092 com.google.sample.purchases.2 gs://andresousa-experimental-streaming-test/output gs://andresousa-experimental-checkpoints/checkpoint '60 seconds'

Debugging

To debug, write the streaming query's output to the console instead of the file sink:

# df is the streaming DataFrame built by the job; print each
# micro-batch to the console instead of writing to files.
query = df \
    .writeStream \
    .outputMode('append') \
    .format('console') \
    .start()

query.awaitTermination()
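
To inspect everything already on the topic rather than only newly arriving records, a one-shot variant can read from the earliest offsets and drain the topic in a single micro-batch (a sketch, reusing the broker and topic from the commands above; option values are illustrative):

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("debug-console").getOrCreate()

# Re-read the topic from the beginning.
debug_df = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "<IP_KAFKA_BROKER>:9092")
    .option("subscribe", "com.google.sample.purchases.2")
    .option("startingOffsets", "earliest")
    .load()
    .selectExpr("CAST(value AS STRING)")
)

query = (
    debug_df.writeStream
    .format("console")
    .option("truncate", "false")  # print full row values
    .option("numRows", 50)        # show up to 50 rows per micro-batch
    .trigger(once=True)           # process available data once, then stop
    .start()
)
query.awaitTermination()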
