poetry build
poetry run spark-submit \
--master local \
--packages 'org.apache.spark:spark-streaming-kafka-0-10_2.12:3.1.3,org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.3' \
--py-files dist/structured_streaming-*.whl jobs/sample_job.py \
<IP_KAFKA_BROKER>:9092 com.google.sample.purchases.2 ./tmp/output ./tmp/checkpoint '60 seconds'
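The positional arguments after the script are the Kafka bootstrap server, the topic, the output path, the checkpoint location, and a duration. A minimal sketch of how jobs/sample_job.py might read them (the variable names are assumptions, not the actual job code):

import sys

# Positional arguments, in the order used on the command line above:
# <bootstrap_servers> <topic> <output_path> <checkpoint_path> <duration>
bootstrap_servers, topic, output_path, checkpoint_path, duration = sys.argv[1:6]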
poetry export -f requirements.txt --output requirements.txt
gsutil cp requirements.txt gs://andresousa-experimental-scripts
Create the cluster with Python dependencies and submit the job:
export REGION=us-central1
gcloud dataproc clusters create cluster-sample \
--region=${REGION} \
--image-version 2.0-debian10 \
--initialization-actions=gs://andresousa-experimental-scripts/initialize-cluster.sh
gcloud dataproc jobs submit pyspark \
--cluster=cluster-sample \
--region=${REGION} \
--properties=^#^spark.jars.packages='org.apache.spark:spark-streaming-kafka-0-10_2.12:3.1.3,org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.3' \
--py-files dist/structured_streaming-*.whl \
jobs/sample_job.py \
-- <IP_KAFKA_BROKER>:9092 com.google.sample.purchases.2 gs://andresousa-experimental-streaming-test/output gs://andresousa-experimental-checkpoints/checkpoint '60 seconds'
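Inside the job, the Kafka source DataFrame referenced as df in the snippet below could be built along these lines (a sketch that reuses the argument names from the sketch above; not the actual job code):

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('structured-streaming-sample').getOrCreate()

# Subscribe to the Kafka topic passed on the command line.
df = spark.readStream \
    .format('kafka') \
    .option('kafka.bootstrap.servers', bootstrap_servers) \
    .option('subscribe', topic) \
    .load()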
# Write the streaming DataFrame to the console sink (useful for local debugging).
query = df \
    .writeStream \
    .outputMode('append') \
    .format('console') \
    .start()

# Block until the streaming query is stopped or fails.
query.awaitTermination()
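The console sink is handy for local runs. On Dataproc, where the output and checkpoint arguments point at GCS, a file sink with an explicit checkpoint location and trigger would look roughly like this (again a sketch reusing the names above; whether '60 seconds' is meant as a trigger interval or a window duration depends on the actual job):

# Persist the stream as JSON files with checkpointing for fault tolerance.
query = df.selectExpr('CAST(key AS STRING) AS key', 'CAST(value AS STRING) AS value') \
    .writeStream \
    .format('json') \
    .option('path', output_path) \
    .option('checkpointLocation', checkpoint_path) \
    .trigger(processingTime=duration) \
    .outputMode('append') \
    .start()

query.awaitTermination()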