feat: 965 upgrade dataproc serverless version to 1.2 python (#995)
* initial commit for python 1.2 upgrade
surjits254 authored Oct 18, 2024
1 parent 98f0caa commit 817ec52
Showing 17 changed files with 77 additions and 194 deletions.
8 changes: 4 additions & 4 deletions README.md
@@ -65,7 +65,7 @@ Please refer to the [Dataproc Templates (Python - PySpark) README](/python) for
* [ElasticsearchToBigtable](/python/dataproc_templates/elasticsearch#elasticsearch-to-bigtable)
* [ElasticsearchToGCS](/python/dataproc_templates/elasticsearch#elasticsearch-to-gcs)
* [GCSToBigQuery](/python/dataproc_templates/gcs#gcs-to-bigquery) (blogpost [link](https://medium.com/@ppaglilla/getting-started-with-dataproc-serverless-pyspark-templates-e32278a6a06e))
* [GCSToBigTable](/python/dataproc_templates/gcs#gcs-to-bigtable)(blogpost [link](https://medium.com/google-cloud/pyspark-load-data-from-gcs-to-bigtable-using-gcp-dataproc-serverless-c373430fe157))
* [GCSToBigTable](/python/dataproc_templates/gcs#gcs-to-bigtable)(blogpost [link](https://medium.com/google-cloud/load-data-from-gcs-to-bigtable-with-gcp-dataproc-serverless-3862399718d2))
* [GCSToGCS](/python/dataproc_templates/gcs#gcs-to-gcs---sql-transformation) (blogpost [link](https://medium.com/@ankuljain/migrate-gcs-to-gcs-using-dataproc-serverless-3b7b0f6ad6b9))
* [GCSToJDBC](/python/dataproc_templates/gcs#gcs-to-jdbc) (blogpost [link](https://medium.com/google-cloud/import-data-from-gcs-to-jdbc-databases-using-dataproc-serverless-c7154b242430))
* [GCSToMongo](/python/dataproc_templates/gcs#gcs-to-mongodb) (blogpost [link](https://medium.com/google-cloud/importing-data-from-gcs-to-mongodb-using-dataproc-serverless-fed58904633a))
@@ -79,11 +79,11 @@ Please refer to the [Dataproc Templates (Python - PySpark) README](/python) for
* [KafkaToBigQuery](/python/dataproc_templates/kafka/#kafka-to-bq) (Blogpost [link](https://medium.com/google-cloud/dataproc-serverless-template-to-migrate-your-data-from-kafka-to-bigquery-9b635a66b9dc))
* [MongoToBigQuery](/python/dataproc_templates/mongo#mongot-to-bq)
* [MongoToGCS](/python/dataproc_templates/mongo#mongo-to-gcs) (blogpost [link](https://medium.com/google-cloud/exporting-data-from-mongodb-to-gcs-buckets-using-dataproc-serverless-64830fb15b51))
* [PubSubLiteToBigtable](/python/dataproc_templates/pubsublite#pubsublite-to-bigtable)
* [RedshiftToGCS](/python/dataproc_templates/redshift#redshift-to-gcs) (blogpost [link](https://medium.com/google-cloud/exporting-data-from-redshift-to-gcs-using-gcp-dataproc-serverless-and-pyspark-9ab78de11405))
* [PubSubLiteToBigtable](/python/dataproc_templates/pubsublite#pubsublite-to-bigtable) **Deprecated and will be removed in Q1 2025**
* [RedshiftToGCS](/python/dataproc_templates/redshift#redshift-to-gcs) (blogpost [link](https://medium.com/google-cloud/exporting-data-from-redshift-to-gcs-using-gcp-dataproc-serverless-and-pyspark-9ab78de11405)) **Deprecated and will be removed in Q1 2025**
* [S3ToBigQuery](/python/dataproc_templates/s3#amazon-s3-to-bigquery)
* [SnowflakeToGCS](/python/dataproc_templates/snowflake#1-snowflake-to-gcs) (blogpost [link](https://medium.com/@varunikagupta96/exporting-data-from-snowflake-to-gcs-using-pyspark-on-dataproc-serverless-363d3bed551b))
* [TextToBigQuery](/python/dataproc_templates/gcs#text-to-bigquery) (blogpost [link](https://medium.com/google-cloud/dataproc-serverless-pyspark-template-for-ingesting-compressed-text-files-to-bigquery-c6eab8fb6bc9))
* [TextToBigQuery](/python/dataproc_templates/gcs#text-to-bigquery) (blogpost [link](https://medium.com/google-cloud/dataproc-serverless-pyspark-template-for-ingesting-compressed-text-files-to-bigquery-c6eab8fb6bc9)) **Deprecated and will be removed in Q1 2025**


## Dataproc Templates (Notebooks)
105 changes: 24 additions & 81 deletions python/.ci/Jenkinsfile
@@ -21,38 +21,6 @@ pipeline {
git branch: "${GIT_BRANCH_LOCAL}", changelog: false, poll: false, url: 'https://github.com/GoogleCloudPlatform/dataproc-templates/'
}
}
stage('Clean Resources'){
steps {
catchError {
sh '''
gcloud pubsub lite-topics publish psltobt \
--location=us-west1 \
--message=\'\'\'
{
"rowkey":"rk1",
"columns": [
{
"columnfamily":"place",
"columnname":"city",
"columnvalue":"Bangalore"
},
{
"columnfamily":"place",
"columnname":"state",
"columnvalue":"Karnataka"
},
{
"columnfamily":"date",
"columnname":"year",
"columnvalue":"2023"
}
]
}
\'\'\'
'''
}
}
}
stage('Cluster Creation'){
when {
// Run this stage only if JOB_TYPE is not set to CLUSTER
@@ -141,7 +109,6 @@ pipeline {
source env/bin/activate
export GCS_STAGING_LOCATION="gs://python-dataproc-templates-temp"
export JARS="gs://spark-lib/bigquery/spark-bigquery-latest_2.12.jar"
export SKIP_BUILD=true
cd python
@@ -189,35 +156,6 @@
}
stage('Parallel Execution 2'){
parallel{
stage('TEXT TO BIGQUERY') {
steps{
retry(count: stageRetryCount) {
sh '''
python3.8 -m pip install --user virtualenv
python3.8 -m venv env
source env/bin/activate
export GCS_STAGING_LOCATION="gs://python-dataproc-templates-temp"
export JARS="gs://spark-lib/bigquery/spark-bigquery-latest_2.12.jar"
export SKIP_BUILD=true
cd python
./bin/start.sh -- \
--template=TEXTTOBIGQUERY \
--text.bigquery.input.compression="gzip" \
--text.bigquery.input.delimiter="," \
--text.bigquery.input.location="gs://python-dataproc-templates/text-bq-input/" \
--text.bigquery.output.dataset="python_dataproc_templates" \
--text.bigquery.output.table="python_text_bq_table" \
--text.bigquery.output.mode=overwrite \
--text.bigquery.temp.bucket.name="python-dataproc-templates-temp-bq"
'''
}
}
}
stage('JDBC To JDBC') {
steps{
retry(count: stageRetryCount) {
@@ -312,7 +250,6 @@
source env/bin/activate
export GCS_STAGING_LOCATION="gs://python-dataproc-templates-temp"
export JARS="gs://spark-lib/bigquery/spark-bigquery-latest_2.12.jar"
export SKIP_BUILD=true
cd python
@@ -341,7 +278,6 @@
source env/bin/activate
export GCS_STAGING_LOCATION="gs://python-dataproc-templates-temp"
export JARS="gs://spark-lib/bigquery/spark-bigquery-latest_2.12.jar"
export SKIP_BUILD=true
cd python
@@ -369,7 +305,7 @@
source env/bin/activate
export GCS_STAGING_LOCATION="gs://python-dataproc-templates-temp"
export JARS="gs://spark-cassandra-connector/spark-cassandra-connector-assembly_2.12-3.2.0.jar,gs://spark-cassandra-connector/jnr-posix-3.1.8.jar,gs://spark-lib/bigquery/spark-bigquery-latest_2.12.jar"
export JARS="gs://spark-cassandra-connector/spark-cassandra-connector-assembly_2.12-3.2.0.jar,gs://spark-cassandra-connector/jnr-posix-3.1.8.jar"
export SKIP_BUILD=true
cd python
@@ -431,7 +367,7 @@
source env/bin/activate
export GCS_STAGING_LOCATION="gs://python-dataproc-templates-temp"
export JARS="gs://python-dataproc-templates-temp/jars/mongo-java-driver-3.9.1.jar,gs://python-dataproc-templates-temp/jars/mongo-spark-connector_2.12-2.4.0.jar,gs://spark-lib/bigquery/spark-bigquery-latest_2.12.jar"
export JARS="gs://python-dataproc-templates-temp/jars/mongo-java-driver-3.9.1.jar,gs://python-dataproc-templates-temp/jars/mongo-spark-connector_2.12-2.4.0.jar"
export SKIP_BUILD=true
cd python
@@ -461,7 +397,7 @@
source env/bin/activate
export GCS_STAGING_LOCATION="gs://python-dataproc-templates-temp"
export JARS="gs://dataproc-templates/jars/mysql-connector-java-8.0.29.jar,gs://spark-lib/bigquery/spark-bigquery-latest_2.12.jar"
export JARS="gs://dataproc-templates/jars/mysql-connector-java-8.0.29.jar"
export SKIP_BUILD=true
cd python
@@ -491,7 +427,7 @@
source env/bin/activate
export GCS_STAGING_LOCATION="gs://python-dataproc-templates-temp"
export JARS="gs://dataproc-templates/jars/mysql-connector-java-8.0.29.jar,gs://spark-lib/bigquery/spark-bigquery-latest_2.12.jar"
export JARS="gs://dataproc-templates/jars/mysql-connector-java-8.0.29.jar"
export SKIP_BUILD=true
cd python
@@ -629,38 +565,45 @@
}
}
}
stage('PUB/SUB LITE TO BIGTABLE') {
}
}
stage('Parallel Execution 6'){
parallel{
stage('S3 TO BigQuery (avro)') {
steps{
retry(count: stageRetryCount) {
sh '''
withCredentials([usernamePassword(credentialsId: 'aws-s3-ro-credentials',
passwordVariable: 'S3_SECRET', usernameVariable: 'S3_KEY')])
{sh '''
python3.8 -m pip install --user virtualenv
python3.8 -m venv env
source env/bin/activate
export GCS_STAGING_LOCATION="gs://python-dataproc-templates-temp"
export JARS="gs://spark-lib/pubsublite/pubsublite-spark-sql-streaming-LATEST-with-dependencies.jar"
export SKIP_BUILD=true
cd python
./bin/start.sh \
-- --template=PUBSUBLITETOBIGTABLE \
--pubsublite.bigtable.subscription.path=projects/$GCP_PROJECT/locations/$REGION/subscriptions/psltobt-sub \
--pubsublite.bigtable.streaming.checkpoint.location="gs://dataproc-templates/integration-testing/psltobt/checkpoint" \
--pubsublite.bigtable.output.project=$GCP_PROJECT \
--pubsublite.bigtable.output.instance=$ENV_TEST_BIGTABLE_INSTANCE \
--pubsublite.bigtable.output.table="output_table" \
--pubsublite.bigtable.streaming.timeout=20
-- --template=S3TOBIGQUERY \
--s3.bq.input.location="s3a://dataproc-templates-integration-tests/cities.avro" \
--s3.bq.access.key="${S3_KEY}" \
--s3.bq.secret.key="${S3_SECRET}" \
--s3.bq.input.format="avro" \
--s3.bq.output.dataset.name="dataproc_templates" \
--s3.bq.output.table.name="s3_to_bq_avro_py" \
--s3.bq.output.mode="overwrite" \
--s3.bq.temp.bucket.name="${GCS_STAGING_LOCATION}"
'''
}
}
}
}
}
}
stage('Parallel Execution 6'){
}
stage('Parallel Execution 7'){
parallel{
stage('Hbase TO GCS (Manual)') {
when {
8 changes: 4 additions & 4 deletions python/README.md
@@ -12,7 +12,7 @@
- [ElasticsearchToBigtable](/python/dataproc_templates/elasticsearch#elasticsearch-to-bigtable)
- [ElasticsearchToGCS](/python/dataproc_templates/elasticsearch#elasticsearch-to-gcs)
- [GCSToBigQuery](/python/dataproc_templates/gcs#gcs-to-bigquery) (blogpost [link](https://medium.com/@ppaglilla/getting-started-with-dataproc-serverless-pyspark-templates-e32278a6a06e))
- [GCSToBigTable](/python/dataproc_templates/gcs#gcs-to-bigtable) (blogpost [link](https://medium.com/google-cloud/pyspark-load-data-from-gcs-to-bigtable-using-gcp-dataproc-serverless-c373430fe157))
- [GCSToBigTable](/python/dataproc_templates/gcs#gcs-to-bigtable) (blogpost [link](https://medium.com/google-cloud/load-data-from-gcs-to-bigtable-with-gcp-dataproc-serverless-3862399718d2))
- [GCSToGCS](/python/dataproc_templates/gcs#gcs-to-gcs---sql-transformation)(blogpost [link](https://medium.com/@ankuljain/migrate-gcs-to-gcs-using-dataproc-serverless-3b7b0f6ad6b9))
- [GCSToJDBC](/python/dataproc_templates/gcs#gcs-to-jdbc) (blogpost [link](https://medium.com/google-cloud/import-data-from-gcs-to-jdbc-databases-using-dataproc-serverless-c7154b242430))
- [GCSToMongo](/python/dataproc_templates/gcs#gcs-to-mongodb) (blogpost [link](https://medium.com/google-cloud/importing-data-from-gcs-to-mongodb-using-dataproc-serverless-fed58904633a))
@@ -26,11 +26,11 @@
- [KafkaToBigQuery](/python/dataproc_templates/kafka/#kafka-to-bigquery)
- [MongoToGCS](/python/dataproc_templates/mongo#mongo-to-gcs)(blogpost [link](https://medium.com/google-cloud/exporting-data-from-mongodb-to-gcs-buckets-using-dataproc-serverless-64830fb15b51))
- [MongoToBigQuery](/python/dataproc_templates/mongo#mongot-to-bq)
- [PubSubLiteToBigtable](/python/dataproc_templates/pubsublite#pubsublite-to-bigtable)
- [RedshiftToGCS](/python/dataproc_templates/redshift#redshift-to-gcs)(blogpost [link](https://medium.com/google-cloud/exporting-data-from-redshift-to-gcs-using-gcp-dataproc-serverless-and-pyspark-9ab78de11405))
- [PubSubLiteToBigtable](/python/dataproc_templates/pubsublite#pubsublite-to-bigtable) **Deprecated and will be removed in Q1 2025**
- [RedshiftToGCS](/python/dataproc_templates/redshift#redshift-to-gcs)(blogpost [link](https://medium.com/google-cloud/exporting-data-from-redshift-to-gcs-using-gcp-dataproc-serverless-and-pyspark-9ab78de11405)) **Deprecated and will be removed in Q1 2025**
- [S3ToBigQuery](/python/dataproc_templates/s3#amazon-s3-to-bigquery)
- [SnowflakeToGCS](/python/dataproc_templates/snowflake#1-snowflake-to-gcs)(blogpost [link](https://medium.com/@varunikagupta96/exporting-data-from-snowflake-to-gcs-using-pyspark-on-dataproc-serverless-363d3bed551b))
- [TextToBigQuery](/python/dataproc_templates/gcs#text-to-bigquery)
- [TextToBigQuery](/python/dataproc_templates/gcs#text-to-bigquery) **Deprecated and will be removed in Q1 2025**

Dataproc Templates (Python - PySpark) supports submitting jobs both to Dataproc Serverless, using [batches submit pyspark](https://cloud.google.com/sdk/gcloud/reference/dataproc/batches/submit/pyspark), and to an existing Dataproc cluster, using [jobs submit pyspark](https://cloud.google.com/sdk/gcloud/reference/dataproc/jobs/submit/pyspark), as sketched below.

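To make the serverless-versus-cluster sentence above concrete, here is an illustrative sketch of the two submission paths in plain gcloud terms. Project, bucket, and cluster names are placeholders; start.sh assembles the equivalent command with the version and JAR options shown in the next file.

```bash
# Illustrative sketch only; all resource names below are hypothetical.
export GCP_PROJECT=my-project
export REGION=us-central1
export GCS_STAGING_LOCATION="gs://my-staging-bucket"

# Dataproc Serverless: the templates wrap `gcloud dataproc batches submit pyspark`.
gcloud dataproc batches submit pyspark main.py \
    --project="${GCP_PROJECT}" \
    --region="${REGION}" \
    --version=1.2 \
    --deps-bucket="${GCS_STAGING_LOCATION}" \
    -- --template=GCSTOBIGQUERY

# Existing Dataproc cluster: the templates wrap `gcloud dataproc jobs submit pyspark`.
gcloud dataproc jobs submit pyspark main.py \
    --project="${GCP_PROJECT}" \
    --region="${REGION}" \
    --cluster=my-cluster \
    -- --template=GCSTOBIGQUERY
```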
11 changes: 8 additions & 3 deletions python/bin/start.sh
@@ -36,15 +36,20 @@ if [ -z "$SKIP_BUILD" ]; then
python3 ${PROJECT_ROOT_DIR}/setup.py bdist_egg --output=$PACKAGE_EGG_FILE
fi

if [ $4 = "--template=HBASETOGCS" ] || [ $4 = "--template=GCSTOBIGTABLE" ]; then
if [ $4 = "--template=HBASETOGCS" ]; then
OPT_SPARK_VERSION="--version=1.0.29"
else
OPT_SPARK_VERSION="--version=1.1"
OPT_SPARK_VERSION="--version=1.2"
fi

OPT_PROJECT="--project=${GCP_PROJECT}"
OPT_REGION="--region=${REGION}"
OPT_JARS="--jars=file:///usr/lib/spark/external/spark-avro.jar"
#OPT_JARS="--jars=file:///usr/lib/spark/external/spark-avro.jar"
OPT_JARS="--jars=file:///usr/lib/spark/connector/spark-avro.jar"
if [[ $OPT_SPARK_VERSION == *"=1.1"* || $JOB_TYPE == "CLUSTER" ]]; then
echo "Dataproc Serverless Runtime 1.1 or CLUSTER Job Type Detected"
OPT_JARS="--jars=file:///usr/lib/spark/external/spark-avro.jar"
fi
OPT_LABELS="--labels=job_type=dataproc_template"
OPT_DEPS_BUCKET="--deps-bucket=${GCS_STAGING_LOCATION}"
OPT_PY_FILES="--py-files=${PROJECT_ROOT_DIR}/${PACKAGE_EGG_FILE}"
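The net effect of the start.sh changes above, consolidated into a standalone sketch. The fourth positional argument carrying the --template flag and the JOB_TYPE environment variable follow the script's own conventions; everything else mirrors the diff.

```bash
#!/usr/bin/env bash
# Sketch: runtime and spark-avro JAR resolution after the 1.2 upgrade.

if [ "$4" = "--template=HBASETOGCS" ]; then
  # HBASETOGCS stays pinned to runtime 1.0.29; GCSTOBIGTABLE no longer needs the pin.
  OPT_SPARK_VERSION="--version=1.0.29"
else
  # All other templates now default to Dataproc Serverless runtime 1.2.
  OPT_SPARK_VERSION="--version=1.2"
fi

# Runtime 1.2 relocated the bundled Avro connector to /usr/lib/spark/connector/.
OPT_JARS="--jars=file:///usr/lib/spark/connector/spark-avro.jar"
if [[ $OPT_SPARK_VERSION == *"=1.1"* || $JOB_TYPE == "CLUSTER" ]]; then
  # Runtime 1.1 and Dataproc cluster images keep the old /usr/lib/spark/external/ path.
  OPT_JARS="--jars=file:///usr/lib/spark/external/spark-avro.jar"
fi

echo "Resolved: ${OPT_SPARK_VERSION} ${OPT_JARS}"
```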
6 changes: 0 additions & 6 deletions python/dataproc_templates/bigquery/README.md
@@ -105,16 +105,10 @@ options:
--bigquery.gcs.output.timestampntzformat BIGQUERY.GCS.OUTPUT.TIMESTAMPNTZFORMAT
Sets the string that indicates a timestamp without timezone format
```

## Required JAR files

This template requires the [Spark BigQuery connector](https://cloud.google.com/dataproc-serverless/docs/guides/bigquery-connector-spark-example) to be available in the Dataproc cluster.

## Example submission

```
export GCP_PROJECT=my-project
export JARS="gs://spark-lib/bigquery/spark-bigquery-latest_2.12.jar"
export GCS_STAGING_LOCATION="gs://my-bucket"
export REGION=us-central1
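Because runtime 1.2 bundles the BigQuery connector, the example submission no longer exports a connector JAR. A completed sketch under that assumption follows; the bigquery.gcs.* parameter names are assumed from the options documented earlier in this README, and every resource name is a placeholder.

```bash
# Sketch: BIGQUERYTOGCS submission on runtime 1.2; no connector JAR export needed.
export GCP_PROJECT=my-project
export GCS_STAGING_LOCATION="gs://my-bucket"
export REGION=us-central1

./bin/start.sh -- --template=BIGQUERYTOGCS \
    --bigquery.gcs.input.table="my-project.my_dataset.my_table" \
    --bigquery.gcs.output.format="avro" \
    --bigquery.gcs.output.mode="overwrite" \
    --bigquery.gcs.output.location="gs://my-bucket/bigquery-to-gcs-output"
```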