diff --git a/python/.ci/Jenkinsfile b/python/.ci/Jenkinsfile
index c579e4468..65515feb1 100644
--- a/python/.ci/Jenkinsfile
+++ b/python/.ci/Jenkinsfile
@@ -244,6 +244,33 @@ pipeline {
                 }
             }
         }
+        stage('JDBC To JDBC with secrets') {
+            steps{
+                retry(count: stageRetryCount) {
+                    sh '''
+
+                    source env/bin/activate
+
+                    export GCS_STAGING_LOCATION="gs://python-dataproc-templates-temp"
+                    export JARS="gs://datproc_template_nk/jars/mysql-connector-java-8.0.29.jar,gs://datproc_template_nk/jars/postgresql-42.2.6.jar,gs://datproc_template_nk/jars/mssql-jdbc-6.4.0.jre8.jar"
+                    export SKIP_BUILD=true
+
+                    cd python
+
+                    ./bin/start.sh \
+                    -- --template=JDBCTOJDBC \
+                    --jdbctojdbc.input.url.secret="jdbctobqconn" \
+                    --jdbctojdbc.input.driver="com.mysql.cj.jdbc.Driver" \
+                    --jdbctojdbc.input.table="demo" \
+                    --jdbctojdbc.output.url.secret="jdbctobqconn" \
+                    --jdbctojdbc.output.driver="com.mysql.cj.jdbc.Driver" \
+                    --jdbctojdbc.output.table="demo_out" \
+                    --jdbctojdbc.output.mode="overwrite" \
+                    --jdbctojdbc.output.batch.size="1000"
+                    '''
+                }
+            }
+        }
         stage('GCS To MONGO') {
             steps{
                 retry(count: stageRetryCount) {
@@ -451,6 +478,36 @@ pipeline {
                 }
             }
         }
+        stage('JDBC TO BIGQUERY with secret') {
+            steps{
+                retry(count: stageRetryCount) {
+                    sh '''
+
+                    python3.8 -m pip install --user virtualenv
+
+                    python3.8 -m venv env
+
+                    source env/bin/activate
+
+                    export GCS_STAGING_LOCATION="gs://python-dataproc-templates-temp"
+                    export JARS="gs://dataproc-templates/jars/mysql-connector-java-8.0.29.jar,gs://spark-lib/bigquery/spark-bigquery-latest_2.12.jar"
+                    export SKIP_BUILD=true
+
+                    cd python
+
+                    ./bin/start.sh \
+                    -- --template=JDBCTOBIGQUERY \
+                    --jdbc.bigquery.input.url.secret="jdbctobqconn" \
+                    --jdbc.bigquery.input.driver="com.mysql.jdbc.Driver" \
+                    --jdbc.bigquery.input.table="test.demo" \
+                    --jdbc.bigquery.output.mode="overwrite" \
+                    --jdbc.bigquery.output.dataset="dataproc_templates" \
+                    --jdbc.bigquery.output.table="jdbctobq" \
+                    --jdbc.bigquery.temp.bucket.name="dataproc-templates/integration-testing/jdbctobq"
+                    '''
+                }
+            }
+        }
         stage('Cassandra TO GCS') {
             steps{
                 retry(count: stageRetryCount) {
@@ -513,6 +570,35 @@ pipeline {
                 }
             }
         }
+        stage('JDBC TO GCS with secret') {
+            steps{
+                retry(count: stageRetryCount) {
+                    sh '''
+
+                    python3.8 -m pip install --user virtualenv
+
+                    python3.8 -m venv env
+
+                    source env/bin/activate
+
+                    export GCS_STAGING_LOCATION="gs://python-dataproc-templates-temp"
+                    export JARS="gs://dataproc-templates/jars/mysql-connector-java-8.0.29.jar"
+                    export SKIP_BUILD=true
+
+                    cd python
+
+                    ./bin/start.sh \
+                    -- --template=JDBCTOGCS \
+                    --jdbctogcs.input.url.secret="jdbctobqconn" \
+                    --jdbctogcs.input.driver="com.mysql.jdbc.Driver" \
+                    --jdbctogcs.input.table="test.demo" \
+                    --jdbctogcs.output.location="gs://dataproc-templates/integration-testing/jdbctogcs/output-python" \
+                    --jdbctogcs.output.mode="overwrite" \
+                    --jdbctogcs.output.format="csv"
+                    '''
+                }
+            }
+        }
         stage('GCS TO GCS') {
             steps{
                 retry(count: stageRetryCount) {
@@ -583,7 +669,9 @@ pipeline {
             steps{
                 retry(count: stageRetryCount) {
                     sh '''
-
+                    python3.8 -m pip install --user virtualenv
+
+                    python3.8 -m venv env
                     source env/bin/activate

                     export GCS_STAGING_LOCATION="gs://python-dataproc-templates-temp"
@@ -593,7 +681,7 @@ pipeline {
                     cd python

                     ./bin/start.sh \
-                    --container-image="gcr.io/$GCP_PROJECT/hbase-to-gcs:1.0.1" \
+                    --container-image="gcr.io/$GCP_PROJECT/hbase-to-gcs:1.0.2" \
                     --properties="spark.dataproc.driverEnv.SPARK_EXTRA_CLASSPATH=/etc/hbase/conf/" \
                     -- --template=HBASETOGCS \
                     --hbase.gcs.output.location="gs://python-dataproc-templates/hbase-gcs-output" \
@@ -622,13 +710,18 @@ pipeline {
                     sh '''
                     whoami
                     sudo su
+
+                    python3.8 -m pip install --user virtualenv
+
+                    python3.8 -m venv env
+
                     source env/bin/activate

                     gsutil cp gs://python-dataproc-templates/surjitsh/hbase-site.xml .

                     export GCS_STAGING_LOCATION=gs://python-dataproc-templates-temp
                     export CATALOG='{"table":{"namespace":"default","name":"my_table"},"rowkey":"key","columns":{"key":{"cf":"rowkey","col":"key","type":"string"},"name":{"cf":"cf","col":"name","type":"string"}}}'
-                    export IMAGE_NAME_VERSION=hbase-to-gcs:1.0.1
+                    export IMAGE_NAME_VERSION=hbase-to-gcs:1.0.2
                     export HBASE_SITE_PATH=../hbase-site.xml
                     export IMAGE=gcr.io/${GCP_PROJECT}/${IMAGE_NAME_VERSION}
                     export SKIP_IMAGE_BUILD=TRUE
@@ -670,7 +763,7 @@ pipeline {
                     cd python

                     ./bin/start.sh \
-                    --container-image="gcr.io/$GCP_PROJECT/bt-templates-test:1.0.0" \
+                    --container-image="gcr.io/$GCP_PROJECT/bt-templates-test:1.0.1" \
                     --properties="spark.dataproc.driverEnv.SPARK_EXTRA_CLASSPATH=/etc/hbase/conf/" \
                     -- --template=GCSTOBIGTABLE \
                     --gcs.bigtable.input.format="csv" \
@@ -714,4 +807,4 @@ pipeline {
         }
     }
 }
-}
\ No newline at end of file
+}
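The three new stages above pass `--*.url.secret="jdbctobqconn"`, so they assume a secret with that name already exists in the CI project and holds the complete JDBC connection URL, credentials included. Below is a minimal sketch of seeding such a secret with the same `google-cloud-secret-manager` client this PR adds as a dependency; the secret name and connection URL are illustrative placeholders, not values defined by the PR:

```
# Sketch only: create and seed the JDBC URL secret the CI stages reference.
# The secret name and connection URL are illustrative placeholders.
import google.auth
from google.cloud import secretmanager_v1 as secretmanager


def seed_jdbc_url_secret(secret_id: str, jdbc_url: str) -> None:
    _, project_id = google.auth.default()
    client = secretmanager.SecretManagerServiceClient()
    parent = f"projects/{project_id}"

    # Create the secret container (raises AlreadyExists if it was created before).
    client.create_secret(
        request={
            "parent": parent,
            "secret_id": secret_id,
            "secret": {"replication": {"automatic": {}}},
        }
    )

    # Store the full JDBC URL, credentials included, as the latest version.
    client.add_secret_version(
        request={
            "parent": f"{parent}/secrets/{secret_id}",
            "payload": {"data": jdbc_url.encode("UTF-8")},
        }
    )


# seed_jdbc_url_secret("jdbctobqconn", "jdbc:mysql://host:3306/test?user=...&password=...")
```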
docker push "${IMAGE}" diff --git a/python/dataproc_templates/hbase/Dockerfile b/python/dataproc_templates/hbase/Dockerfile index 5be09e51a..b1231e40d 100644 --- a/python/dataproc_templates/hbase/Dockerfile +++ b/python/dataproc_templates/hbase/Dockerfile @@ -31,6 +31,30 @@ RUN mkdir -p "${SPARK_EXTRA_JARS_DIR}" # hbase-site.xml to be altered based to set server connection details, ref-: src/main/resources/hbase-site.xml COPY ${HBASE_SITE_NAME} /etc/hbase/conf/ +RUN chmod 777 /etc/hbase/conf/${HBASE_SITE_NAME} + +# (Optional) Install and configure Miniconda3. +ENV CONDA_HOME=/opt/miniconda3 +ENV PYSPARK_PYTHON=${CONDA_HOME}/bin/python +ENV PATH=${CONDA_HOME}/bin:${PATH} +COPY Miniconda3-latest-Linux-x86_64.sh . +RUN bash Miniconda3-latest-Linux-x86_64.sh -b -p /opt/miniconda3 \ + && ${CONDA_HOME}/bin/conda config --system --set always_yes True \ + && ${CONDA_HOME}/bin/conda config --system --set auto_update_conda False \ + && ${CONDA_HOME}/bin/conda config --system --prepend channels conda-forge \ + && ${CONDA_HOME}/bin/conda config --system --set channel_priority strict + +# (Optional) Install Conda packages. +# +# The following packages are installed in the default image, it is strongly +# recommended to include all of them. +# +# Use mamba to install packages quickly. +RUN ${CONDA_HOME}/bin/conda install mamba -n base -c conda-forge \ + && ${CONDA_HOME}/bin/mamba install \ + google-cloud-bigquery \ + google-cloud-bigtable \ + google-cloud-secret-manager # (Required) Create the 'spark' group/user. # The GID and UID must be 1099. Home directory is required. diff --git a/python/dataproc_templates/hbase/README.md b/python/dataproc_templates/hbase/README.md index 9dfc8ae47..26ab32840 100644 --- a/python/dataproc_templates/hbase/README.md +++ b/python/dataproc_templates/hbase/README.md @@ -16,6 +16,7 @@ Template for reading files from Hbase and writing to Google Cloud Storage. It su ``` - You can use and adapt the Dockerfile from the guide above, building and pushing it to GCP Container Registry with: ``` + wget https://repo.anaconda.com/miniconda/Miniconda3-latest-Linux-x86_64.sh IMAGE=gcr.io//: docker build -t "${IMAGE}" . docker push "${IMAGE}" diff --git a/python/dataproc_templates/jdbc/README.md b/python/dataproc_templates/jdbc/README.md index c945f0373..e6cca6a0b 100644 --- a/python/dataproc_templates/jdbc/README.md +++ b/python/dataproc_templates/jdbc/README.md @@ -56,6 +56,8 @@ jdbc:sqlserver://:;databaseName=;user=;passwor ``` jdbc:oracle:thin:@//:/?user=&password= ``` +* **Note: +JDBC Connections now allow use of secrets created in Cloud Secret Manager. Please check the examples in respective sections.** ## Other important properties @@ -95,10 +97,14 @@ Template for reading data from JDBC table and writing them to a JDBC table. It s ## Arguments -* `jdbctojdbc.input.url`: JDBC input URL +* `jdbctojdbc.input.url`: JDBC input URL +* `jdbctojdbc.input.url.secret`: JDBC input URL secret. Pass the secret name as created in Cloud Secret Manager +> Note: Please provide only one of the above two properties (`jdbctojdbc.input.url` or `jdbctojdbc.input.url.secret`) * `jdbctojdbc.input.driver`: JDBC input driver name * `jdbctojdbc.input.table`: JDBC input table name -* `jdbctojdbc.output.url`: JDBC output url. When the JDBC target is PostgreSQL it is recommended to include the connection parameter reWriteBatchedInserts=true in the URL to provide a significant performance improvement over the default setting. +* `jdbctojdbc.output.url`: JDBC output url. 
When the JDBC target is PostgreSQL it is recommended to include the connection parameter reWriteBatchedInserts=true in the URL to provide a significant performance improvement over the default setting. OR provide secret name enclosed inside { } +* `jdbctojdbc.output.url.secret`: JDBC output URL secret. Pass the secret name as created in Cloud Secret Manager. +> Note: Please provide only one of the above two properties (`jdbctojdbc.output.url` or `jdbctojdbc.output.url.secret`) * `jdbctojdbc.output.driver`: JDBC output driver name * `jdbctojdbc.output.table`: JDBC output table name * `jdbctojdbc.input.partitioncolumn` (Optional): JDBC input table partition column name @@ -126,12 +132,14 @@ usage: main.py --template JDBCTOJDBC \ optional arguments: -h, --help show this help message and exit + --jdbctojdbc.input.url.secret JDBCTOJDBC.INPUT.URL.SECRET \ --jdbctojdbc.input.partitioncolumn JDBCTOJDBC.INPUT.PARTITIONCOLUMN \ --jdbctojdbc.input.lowerbound JDBCTOJDBC.INPUT.LOWERBOUND \ --jdbctojdbc.input.upperbound JDBCTOJDBC.INPUT.UPPERBOUND \ --jdbctojdbc.numpartitions JDBCTOJDBC.NUMPARTITIONS \ --jdbctojdbc.input.fetchsize JDBCTOJDBC.INPUT.FETCHSIZE \ --jdbctojdbc.input.sessioninitstatement JDBCTOJDBC.INPUT.SESSIONINITSTATEMENT \ + --jdbctojdbc.output.url.secret JDBCTOJDBC.OUTPUT.URL.SECRET \ --jdbctojdbc.output.create_table.option JDBCTOJDBC.OUTPUT.CREATE_TABLE.OPTION \ --jdbctojdbc.output.mode {overwrite,append,ignore,errorifexists} \ --jdbctojdbc.output.batch.size JDBCTOJDBC.OUTPUT.BATCH.SIZE \ @@ -174,6 +182,26 @@ export JARS="/mysql-connector-java-8.0.29.jar, \ --jdbctojdbc.output.mode= \ --jdbctojdbc.output.batch.size= + +WITH SECRET + +./bin/start.sh \ +-- --template=JDBCTOJDBC \ +--jdbctojdbc.input.url.secret="" \ +--jdbctojdbc.input.driver= \ +--jdbctojdbc.input.table= \ +--jdbctojdbc.input.partitioncolumn= \ +--jdbctojdbc.input.lowerbound= \ +--jdbctojdbc.input.upperbound= \ +--jdbctojdbc.numpartitions= \ +--jdbctojdbc.input.fetchsize= \ +--jdbctojdbc.input.sessioninitstatement= \ +--jdbctojdbc.output.url.secret="" \ +--jdbctojdbc.output.driver= \ +--jdbctojdbc.output.table= \ +--jdbctojdbc.output.create_table.option= \ +--jdbctojdbc.output.mode= \ +--jdbctojdbc.output.batch.size= ``` ## Example execution: @@ -202,6 +230,24 @@ export JARS="gs://my-gcp-proj/jars/mysql-connector-java-8.0.29.jar,gs://my-gcp-p --jdbctojdbc.output.create_table.option="PARTITION BY RANGE(id) (PARTITION p0 VALUES LESS THAN (5),PARTITION p1 VALUES LESS THAN (10),PARTITION p2 VALUES LESS THAN (15),PARTITION p3 VALUES LESS THAN MAXVALUE)" \ --jdbctojdbc.output.mode="overwrite" \ --jdbctojdbc.output.batch.size="1000" + +WITH SECRET + +./bin/start.sh \ +-- --template=JDBCTOJDBC \ +--jdbctojdbc.input.url.secret="jdbctojdbcurlsecret" \ +--jdbctojdbc.input.driver="com.mysql.cj.jdbc.Driver" \ +--jdbctojdbc.input.table="(select * from employees where id <10) as employees" \ +--jdbctojdbc.input.partitioncolumn=id \ +--jdbctojdbc.input.lowerbound="1" \ +--jdbctojdbc.input.upperbound="10" \ +--jdbctojdbc.numpartitions="4" \ +--jdbctojdbc.output.url.secret="jdbctojdbcoutputurlsecret" \ +--jdbctojdbc.output.driver="com.mysql.cj.jdbc.Driver" \ +--jdbctojdbc.output.table="employees_out" \ +--jdbctojdbc.output.create_table.option="PARTITION BY RANGE(id) (PARTITION p0 VALUES LESS THAN (5),PARTITION p1 VALUES LESS THAN (10),PARTITION p2 VALUES LESS THAN (15),PARTITION p3 VALUES LESS THAN MAXVALUE)" \ +--jdbctojdbc.output.mode="overwrite" \ +--jdbctojdbc.output.batch.size="1000" ``` * PostgreSQL to PostgreSQL @@ -315,7 +361,9 @@ 
The only thing needs to keep in mind is that, the name of the Spark temporary vi Template for reading data from JDBC table and writing into files in Google Cloud Storage. It supports reading partition tabels and supports writing in JSON, CSV, Parquet and Avro formats. ## Arguments -* `jdbctogcs.input.url`: JDBC input URL +* `jdbctogcs.input.url`: JDBC input URL +* `jdbctogcs.input.url.secret`: JDBC input URL secret. Pass the secret name as created in Cloud Secret Manager. +> Note: Please provide only one of the above two properties (`jdbctogcs.input.url` or `jdbctogcs.input.url.secret`) * `jdbctogcs.input.driver`: JDBC input driver name * `jdbctogcs.input.table`: JDBC input table name * `jdbctogcs.input.sql.query`: JDBC input SQL query @@ -391,6 +439,8 @@ options: -h, --help show this help message and exit --jdbctogcs.input.url JDBCTOGCS.INPUT.URL JDBC input URL + --jdbctogcs.input.url.secret JDBCTOGCS.INPUT.URL.SECRET + JDBC input URL secret. Pass the secret name as created in Cloud Secret Manager. --jdbctogcs.input.driver JDBCTOGCS.INPUT.DRIVER JDBC input driver name --jdbctogcs.input.table JDBCTOGCS.INPUT.TABLE @@ -467,7 +517,25 @@ export JARS="/mysql-connector-java-8.0.29.jar, \ +--jdbctogcs.input.table= \ +--jdbctogcs.input.partitioncolumn= \ +--jdbctogcs.input.lowerbound= \ +--jdbctogcs.input.upperbound= \ +--jdbctogcs.numpartitions= \ +--jdbctogcs.input.fetchsize= \ +--jdbctogcs.input.sessioninitstatement= \ +--jdbctogcs.output.location= \ +--jdbctogcs.output.mode= \ +--jdbctogcs.output.format= \ +--jdbctogcs.output.partitioncolumn= + +WITH SECRET + +./bin/start.sh \ +-- --template=JDBCTOGCS \ +--jdbctogcs.input.url.secret="" \ --jdbctogcs.input.driver= \ --jdbctogcs.input.table= \ --jdbctogcs.input.partitioncolumn= \ @@ -512,6 +580,22 @@ export JARS="gs://my-gcp-proj/jars/mysql-connector-java-8.0.29.jar,gs://my-gcp-p --jdbctogcs.output.mode="overwrite" \ --jdbctogcs.output.format="csv" \ --jdbctogcs.output.partitioncolumn="department_id" + +WITH SECRET + +./bin/start.sh \ +-- --template=JDBCTOGCS \ +--jdbctogcs.input.url.secret="jdbctogcsinputurl" \ +--jdbctogcs.input.driver="com.mysql.cj.jdbc.Driver" \ +--jdbctogcs.input.table="(select * from employees where id <10) as employees" \ +--jdbctogcs.input.partitioncolumn="id" \ +--jdbctogcs.input.lowerbound="11" \ +--jdbctogcs.input.upperbound="20" \ +--jdbctogcs.numpartitions="4" \ +--jdbctogcs.output.location="gs://output_bucket/output/" \ +--jdbctogcs.output.mode="overwrite" \ +--jdbctogcs.output.format="csv" \ +--jdbctogcs.output.partitioncolumn="department_id" ``` * PostgreSQL to Cloud Storage @@ -587,6 +671,8 @@ This template requires the JBDC jar files mentioned, and also the [Spark BigQuer ## Arguments * `jdbc.bigquery.input.url`: JDBC input URL +* `jdbc.bigquery.input.url.secret`: JDBC input URL secret. Pass the secret name as created in Cloud Secret Manager. 
+> Note: Please provide only one of the above two properties (`jdbc.bigquery.input.url` or `jdbc.bigquery.input.url.secret`)
 * `jdbc.bigquery.input.driver`: JDBC input driver name
 * `jdbc.bigquery.input.table`: JDBC input table name
 * `jdbc.bigquery.input.partitioncolumn` (Optional): JDBC input table partition column name
@@ -622,6 +708,7 @@ usage: main.py [-h] --jdbc.bigquery.output.dataset

 optional arguments:
   -h, --help            show this help message and exit
+  --jdbc.bigquery.input.url.secret JDBC.BIGQUERY.INPUT.URL.SECRET
   --jdbc.bigquery.output.dataset JDBC.BIGQUERY.OUTPUT.DATASET
                         BigQuery dataset for the output table
   --jdbc.bigquery.output.table JDBC.BIGQUERY.OUTPUT.TABLE
@@ -671,7 +758,19 @@ export JARS="/mysql-connector-java-8.0.29.jar,
diff --git a/python/dataproc_templates/jdbc/jdbc_to_bigquery.py b/python/dataproc_templates/jdbc/jdbc_to_bigquery.py
@@ ... @@ def parse_args(args: Optional[Sequence[str]] = None) -> Dict[str, Any]:
             required=True,
             help='Spark BigQuery connector temporary bucket'
         )
-        parser.add_argument(
+
+        group = parser.add_mutually_exclusive_group(required=True)
+        group.add_argument(
             f'--{constants.JDBC_BQ_INPUT_URL}',
             dest=constants.JDBC_BQ_INPUT_URL,
-            required=True,
+            required=False,
+            default="",
             help='JDBC input URL'
         )
+        group.add_argument(
+            f'--{constants.JDBC_BQ_INPUT_URL_SECRET}',
+            dest=constants.JDBC_BQ_INPUT_URL_SECRET,
+            required=False,
+            default="",
+            help='JDBC input URL secret name'
+        )
         parser.add_argument(
             f'--{constants.JDBC_BQ_INPUT_DRIVER}',
             dest=constants.JDBC_BQ_INPUT_DRIVER,
@@ -144,7 +155,13 @@ def run(self, spark: SparkSession, args: Dict[str, Any]) -> None:
         big_query_dataset: str = args[constants.JDBC_BQ_OUTPUT_DATASET]
         big_query_table: str = args[constants.JDBC_BQ_OUTPUT_TABLE]
         bq_temp_bucket: str = args[constants.JDBC_BQ_LD_TEMP_BUCKET_NAME]
-        input_jdbc_url: str = args[constants.JDBC_BQ_INPUT_URL]
+
+        # Check whether the connection URL was passed directly or as a Secret Manager secret name
+        if str(args[constants.JDBC_BQ_INPUT_URL]) == "":
+            input_jdbc_url: str = secret_manager.access_secret_version(args[constants.JDBC_BQ_INPUT_URL_SECRET])
+        else:
+            input_jdbc_url: str = args[constants.JDBC_BQ_INPUT_URL]
+
         input_jdbc_driver: str = args[constants.JDBC_BQ_INPUT_DRIVER]
         input_jdbc_table: str = args[constants.JDBC_BQ_INPUT_TABLE]
         input_jdbc_partitioncolumn: str = args[constants.JDBC_BQ_INPUT_PARTITIONCOLUMN]
diff --git a/python/dataproc_templates/jdbc/jdbc_to_gcs.py b/python/dataproc_templates/jdbc/jdbc_to_gcs.py
index 709dd8231..28bcc5bea 100644
--- a/python/dataproc_templates/jdbc/jdbc_to_gcs.py
+++ b/python/dataproc_templates/jdbc/jdbc_to_gcs.py
@@ -23,6 +23,7 @@
 from dataproc_templates.util.argument_parsing import add_spark_options
 from dataproc_templates.util.dataframe_writer_wrappers import persist_dataframe_to_cloud_storage
 import dataproc_templates.util.template_constants as constants
+import dataproc_templates.util.secret_manager as secret_manager

 __all__ = ['JDBCToGCSTemplate']

@@ -37,12 +38,21 @@ class JDBCToGCSTemplate(BaseTemplate):
     def parse_args(args: Optional[Sequence[str]] = None) -> Dict[str, Any]:
         parser: argparse.ArgumentParser = argparse.ArgumentParser()

-        parser.add_argument(
+        group = parser.add_mutually_exclusive_group(required=True)
+        group.add_argument(
             f'--{constants.JDBCTOGCS_INPUT_URL}',
             dest=constants.JDBCTOGCS_INPUT_URL,
-            required=True,
+            required=False,
+            default="",
             help='JDBC input URL'
         )
+        group.add_argument(
+            f'--{constants.JDBCTOGCS_INPUT_URL_SECRET}',
+            dest=constants.JDBCTOGCS_INPUT_URL_SECRET,
+            required=False,
+            default="",
+            help='JDBC input URL secret name'
+        )
         parser.add_argument(
             f'--{constants.JDBCTOGCS_INPUT_DRIVER}',
             dest=constants.JDBCTOGCS_INPUT_DRIVER,
@@ -178,7 +188,14 @@ def run(self, spark: SparkSession, args: Dict[str, Any]) -> None:
         logger: Logger = self.get_logger(spark=spark)

         # Arguments
-        input_jdbc_url: str = args[constants.JDBCTOGCS_INPUT_URL]
+        # Check whether the connection URL was passed directly or as a Secret Manager secret name
+        if str(args[constants.JDBCTOGCS_INPUT_URL]) == "":
+            input_jdbc_url: str = secret_manager.access_secret_version(args[constants.JDBCTOGCS_INPUT_URL_SECRET])
+        else:
+            input_jdbc_url: str = args[constants.JDBCTOGCS_INPUT_URL]
+
         input_jdbc_driver: str = args[constants.JDBCTOGCS_INPUT_DRIVER]
         input_jdbc_table: str = args[constants.JDBCTOGCS_INPUT_TABLE]
         input_jdbc_sql_query: str = args[constants.JDBCTOGCS_INPUT_SQL_QUERY]
diff --git a/python/dataproc_templates/jdbc/jdbc_to_jdbc.py b/python/dataproc_templates/jdbc/jdbc_to_jdbc.py
index 6362b85f0..e522c78ac 100644
--- a/python/dataproc_templates/jdbc/jdbc_to_jdbc.py
+++ b/python/dataproc_templates/jdbc/jdbc_to_jdbc.py
@@ -21,6 +21,7 @@
 from dataproc_templates import BaseTemplate
 import dataproc_templates.util.template_constants as constants
+import dataproc_templates.util.secret_manager as secret_manager

 __all__ = ['JDBCToJDBCTemplate']

@@ -34,12 +35,22 @@ class JDBCToJDBCTemplate(BaseTemplate):
     def parse_args(args: Optional[Sequence[str]] = None) -> Dict[str, Any]:
         parser: argparse.ArgumentParser = argparse.ArgumentParser()

-        parser.add_argument(
+        groupinput = parser.add_mutually_exclusive_group(required=True)
+        groupinput.add_argument(
             f'--{constants.JDBCTOJDBC_INPUT_URL}',
             dest=constants.JDBCTOJDBC_INPUT_URL,
-            required=True,
+            required=False,
+            default="",
             help='JDBC input URL'
         )
+        groupinput.add_argument(
+            f'--{constants.JDBCTOJDBC_INPUT_URL_SECRET}',
+            dest=constants.JDBCTOJDBC_INPUT_URL_SECRET,
+            required=False,
+            default="",
+            help='JDBC input URL secret name'
+        )
+
         parser.add_argument(
             f'--{constants.JDBCTOJDBC_INPUT_DRIVER}',
             dest=constants.JDBCTOJDBC_INPUT_DRIVER,
@@ -95,12 +106,23 @@ def parse_args(args: Optional[Sequence[str]] = None) -> Dict[str, Any]:
             default="",
             help='Custom SQL statement to execute in each reader database session'
         )
-        parser.add_argument(
+
+        groupoutput = parser.add_mutually_exclusive_group(required=True)
+        groupoutput.add_argument(
             f'--{constants.JDBCTOJDBC_OUTPUT_URL}',
             dest=constants.JDBCTOJDBC_OUTPUT_URL,
-            required=True,
-            help='JDBC output URL'
+            required=False,
+            default="",
+            help='JDBC output URL'
+        )
+        groupoutput.add_argument(
+            f'--{constants.JDBCTOJDBC_OUTPUT_URL_SECRET}',
+            dest=constants.JDBCTOJDBC_OUTPUT_URL_SECRET,
+            required=False,
+            default="",
+            help='JDBC output URL secret name'
         )
+
         parser.add_argument(
             f'--{constants.JDBCTOJDBC_OUTPUT_DRIVER}',
             dest=constants.JDBCTOJDBC_OUTPUT_DRIVER,
@@ -172,7 +194,12 @@ def run(self, spark: SparkSession, args: Dict[str, Any]) -> None:
         logger: Logger = self.get_logger(spark=spark)

         # Arguments
-        input_jdbc_url: str = args[constants.JDBCTOJDBC_INPUT_URL]
+        # Check whether the connection URL was passed directly or as a Secret Manager secret name
+        if str(args[constants.JDBCTOJDBC_INPUT_URL]) == "":
+            input_jdbc_url: str = secret_manager.access_secret_version(args[constants.JDBCTOJDBC_INPUT_URL_SECRET])
+        else:
+            input_jdbc_url: str = args[constants.JDBCTOJDBC_INPUT_URL]
+
         input_jdbc_driver: str = args[constants.JDBCTOJDBC_INPUT_DRIVER]
         input_jdbc_table: str = args[constants.JDBCTOJDBC_INPUT_TABLE]
         input_jdbc_partitioncolumn: str = args[constants.JDBCTOJDBC_INPUT_PARTITIONCOLUMN]
@@ -181,7 +208,14 @@ def run(self, spark: SparkSession, args: Dict[str, Any]) -> None:
         jdbc_numpartitions: str = args[constants.JDBCTOJDBC_NUMPARTITIONS]
         input_jdbc_fetchsize: int = args[constants.JDBCTOJDBC_INPUT_FETCHSIZE]
         input_jdbc_sessioninitstatement: str = args[constants.JDBCTOJDBC_SESSIONINITSTATEMENT]
-        output_jdbc_url: str = args[constants.JDBCTOJDBC_OUTPUT_URL]
+
+        # Check whether the connection URL was passed directly or as a Secret Manager secret name
+        if str(args[constants.JDBCTOJDBC_OUTPUT_URL]) == "":
+            output_jdbc_url: str = secret_manager.access_secret_version(args[constants.JDBCTOJDBC_OUTPUT_URL_SECRET])
+        else:
+            output_jdbc_url: str = args[constants.JDBCTOJDBC_OUTPUT_URL]
+
         output_jdbc_driver: str = args[constants.JDBCTOJDBC_OUTPUT_DRIVER]
         output_jdbc_table: str = args[constants.JDBCTOJDBC_OUTPUT_TABLE]
         output_jdbc_create_table_option: str = args[constants.JDBCTOJDBC_OUTPUT_CREATE_TABLE_OPTION]
diff --git a/python/dataproc_templates/util/secret_manager.py b/python/dataproc_templates/util/secret_manager.py
new file mode 100644
index 000000000..3c927f26d
--- /dev/null
+++ b/python/dataproc_templates/util/secret_manager.py
@@ -0,0 +1,42 @@
+from google.cloud import secretmanager_v1 as secretmanager
+import google.auth
+import re
+
+
+def access_secret_version(secret_id, version_id="latest"):
+    """
+    Get a secret value from Secret Manager.
+
+    Args:
+        secret_id: secret name
+        version_id: secret version, "latest" by default
+
+    Returns:
+        Secret value
+
+    """
+    project_id: str
+    _, project_id = google.auth.default()
+
+    # Create the Secret Manager client.
+    client = secretmanager.SecretManagerServiceClient()
+
+    if validate_secret(secret_id):
+        # Build the resource name of the secret version.
+        name = f"projects/{project_id}/secrets/{secret_id}/versions/{version_id}"
+    else:
+        raise Exception("Invalid secret name. Secret names may only contain letters, digits, - and _")
+
+    # Access the secret version.
+    response = client.access_secret_version(name=name)
+
+    # Return the decoded payload.
+    return response.payload.data.decode('UTF-8')
+
+
+def validate_secret(secret_id):
+    valid_secret = True
+    regexp = re.compile('[^0-9a-zA-Z_-]+')
+    if regexp.search(secret_id):
+        valid_secret = False
+    return valid_secret
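For reference, this is the resolution path the templates follow with the helper above: the plain `*.url` argument wins when it is non-empty, otherwise the URL is fetched from Secret Manager using the `*.url.secret` name. The sketch below mirrors the JDBCTOGCS branch of the diff; the standalone function name `resolve_input_jdbc_url` is illustrative and not part of the PR:

```
# Sketch of the URL resolution added to the templates' run() methods.
from typing import Any, Dict

import dataproc_templates.util.secret_manager as secret_manager
import dataproc_templates.util.template_constants as constants


def resolve_input_jdbc_url(args: Dict[str, Any]) -> str:
    plain_url = str(args[constants.JDBCTOGCS_INPUT_URL])
    if plain_url == "":
        # Only the secret *name* travels through the job arguments; the
        # connection string with credentials stays in Secret Manager.
        return secret_manager.access_secret_version(
            args[constants.JDBCTOGCS_INPUT_URL_SECRET]
        )
    return plain_url
```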
diff --git a/python/dataproc_templates/util/template_constants.py b/python/dataproc_templates/util/template_constants.py
index d7ab397c7..fb2df9d9f 100644
--- a/python/dataproc_templates/util/template_constants.py
+++ b/python/dataproc_templates/util/template_constants.py
@@ -384,6 +384,7 @@ def get_csv_output_spark_options(prefix):
 # JDBC to JDBC
 JDBCTOJDBC_INPUT_URL = "jdbctojdbc.input.url"
+JDBCTOJDBC_INPUT_URL_SECRET = "jdbctojdbc.input.url.secret"
 JDBCTOJDBC_INPUT_DRIVER = "jdbctojdbc.input.driver"
 JDBCTOJDBC_INPUT_TABLE = "jdbctojdbc.input.table"
 JDBCTOJDBC_INPUT_FETCHSIZE = "jdbctojdbc.input.fetchsize"
@@ -393,6 +394,7 @@ def get_csv_output_spark_options(prefix):
 JDBCTOJDBC_SESSIONINITSTATEMENT = "jdbctojdbc.input.sessioninitstatement"
 JDBCTOJDBC_NUMPARTITIONS = "jdbctojdbc.numpartitions"
 JDBCTOJDBC_OUTPUT_URL = "jdbctojdbc.output.url"
+JDBCTOJDBC_OUTPUT_URL_SECRET = "jdbctojdbc.output.url.secret"
 JDBCTOJDBC_OUTPUT_DRIVER = "jdbctojdbc.output.driver"
 JDBCTOJDBC_OUTPUT_TABLE = "jdbctojdbc.output.table"
 JDBCTOJDBC_OUTPUT_CREATE_TABLE_OPTION = "jdbctojdbc.output.create_table.option"
@@ -403,6 +405,7 @@ def get_csv_output_spark_options(prefix):
 # JDBC to GCS
 JDBCTOGCS_INPUT_URL = "jdbctogcs.input.url"
+JDBCTOGCS_INPUT_URL_SECRET = "jdbctogcs.input.url.secret"
 JDBCTOGCS_INPUT_DRIVER = "jdbctogcs.input.driver"
 JDBCTOGCS_INPUT_TABLE = "jdbctogcs.input.table"
 JDBCTOGCS_INPUT_SQL_QUERY = "jdbctogcs.input.sql.query"
@@ -421,6 +424,7 @@ def get_csv_output_spark_options(prefix):
 # JDBC to BigQuery
 JDBC_BQ_INPUT_URL = "jdbc.bigquery.input.url"
+JDBC_BQ_INPUT_URL_SECRET = "jdbc.bigquery.input.url.secret"
 JDBC_BQ_INPUT_DRIVER = "jdbc.bigquery.input.driver"
 JDBC_BQ_INPUT_TABLE = "jdbc.bigquery.input.table"
 JDBC_BQ_INPUT_FETCHSIZE = "jdbc.bigquery.input.fetchsize"
diff --git a/python/requirements.txt b/python/requirements.txt
index bee86138c..580107660 100644
--- a/python/requirements.txt
+++ b/python/requirements.txt
@@ -5,4 +5,5 @@ pytest>=7.1.1
 mock>=4.0.3
 pytest-cov>=3.0.0
 coverage>=6.3.2
-google-cloud-bigtable>=2.17.0
\ No newline at end of file
+google-cloud-bigtable>=2.17.0
+google-cloud-secret-manager>=2.19.0
\ No newline at end of file
diff --git a/python/setup.py b/python/setup.py
index 5f2f284ca..e047ae381 100644
--- a/python/setup.py
+++ b/python/setup.py
@@ -76,7 +76,8 @@ def run(self):

 dependencies = [
     "pyspark>=3.2.0",
-    "google-cloud-bigquery>=3.4.0"
+    "google-cloud-bigquery>=3.4.0",
+    "google-cloud-secret-manager>=2.19.0"
 ]

diff --git a/python/test/jdbc/test_jdbc_to_bigquery.py b/python/test/jdbc/test_jdbc_to_bigquery.py
index 06206ac03..f98e09fb1 100644
--- a/python/test/jdbc/test_jdbc_to_bigquery.py
+++ b/python/test/jdbc/test_jdbc_to_bigquery.py
@@ -19,6 +19,7 @@
 from dataproc_templates.jdbc.jdbc_to_bigquery import JDBCToBigQueryTemplate
 import dataproc_templates.util.template_constants as constants
+import dataproc_templates.util.secret_manager as secret_manager


 class TestJDBCToBigQueryTemplate:
@@ -172,3 +173,43 @@ def test_run_pass_args4(self, mock_spark_session):
         ).option().option().mode.assert_called_once_with(constants.OUTPUT_MODE_APPEND)
         mock_spark_session.dataframe.DataFrame.write.format(
         ).option().option().mode().save.assert_called_once()
+
+
+    @mock.patch.object(pyspark.sql, 'SparkSession')
+    def test_run_pass_args5(self, mock_spark_session):
+        """Tests JDBCToBigQueryTemplate pass args without partition column with secret"""
+
+        jdbc_to_bigquery_template = JDBCToBigQueryTemplate()
+
+        mock_parsed_args = jdbc_to_bigquery_template.parse_args(
+            ["--jdbc.bigquery.input.url.secret=jdbctobqconn",
+             "--jdbc.bigquery.input.driver=driver",
+             "--jdbc.bigquery.input.table=table1",
+             "--jdbc.bigquery.input.fetchsize=100",
+             "--jdbc.bigquery.input.sessioninitstatement=EXEC some_setup_sql('data4')",
+             "--jdbc.bigquery.output.mode=append",
+             "--jdbc.bigquery.output.dataset=bq-dataset",
+             "--jdbc.bigquery.output.table=bq-table",
+             "--jdbc.bigquery.temp.bucket.name=bucket-name",
+             ])
+        mock_spark_session.read.format().options().load.return_value = mock_spark_session.dataframe.DataFrame
+        jdbc_to_bigquery_template.run(mock_spark_session, mock_parsed_args)
+        mock_spark_session.read.format.assert_called_with(constants.FORMAT_JDBC)
+        _, kwargs = mock_spark_session.read.format().options.call_args
+        assert (constants.JDBC_URL, secret_manager.access_secret_version("jdbctobqconn")) in kwargs.items()
+        assert (constants.JDBC_DRIVER, "driver") in kwargs.items()
+        assert (constants.JDBC_TABLE, "table1") in kwargs.items()
+        assert (constants.JDBC_NUMPARTITIONS, "10") in kwargs.items()
+        assert (constants.JDBC_FETCHSIZE, 100) in kwargs.items()
+        assert (constants.JDBC_SESSIONINITSTATEMENT, "EXEC some_setup_sql('data4')") in kwargs.items()
+        mock_spark_session.read.format().options().load()
+        mock_spark_session.dataframe.DataFrame.write.format.assert_called_once_with(
+            constants.FORMAT_BIGQUERY)
+        mock_spark_session.dataframe.DataFrame.write.format(
+        ).option.assert_called_once_with(constants.TABLE, "bq-dataset.bq-table")
+        mock_spark_session.dataframe.DataFrame.write.format().option(
+        ).option.assert_called_once_with(constants.JDBC_BQ_TEMP_BUCKET, "bucket-name")
+        mock_spark_session.dataframe.DataFrame.write.format(
+        ).option().option().mode.assert_called_once_with(constants.OUTPUT_MODE_APPEND)
+        mock_spark_session.dataframe.DataFrame.write.format(
+        ).option().option().mode().save.assert_called_once()
diff --git a/python/test/jdbc/test_jdbc_to_gcs.py b/python/test/jdbc/test_jdbc_to_gcs.py
index 6459508b5..0a1d06785 100644
--- a/python/test/jdbc/test_jdbc_to_gcs.py
+++ b/python/test/jdbc/test_jdbc_to_gcs.py
@@ -19,6 +19,7 @@
 from dataproc_templates.jdbc.jdbc_to_gcs import JDBCToGCSTemplate
 import dataproc_templates.util.template_constants as constants
+import dataproc_templates.util.secret_manager as secret_manager


 class TestJDBCToGCSTemplate:
@@ -268,3 +269,37 @@ def test_run_pass_args7(self, mock_spark_session):
         mock_spark_session.dataframe.DataFrame.write.mode().partitionBy.assert_called_once_with("column")
         mock_spark_session.dataframe.DataFrame.write.mode().partitionBy().options.assert_called_once_with(**{constants.CSV_HEADER: 'true'})
         mock_spark_session.dataframe.DataFrame.write.mode().partitionBy().options().csv.assert_called_once_with("gs://test")
+
+
+    @mock.patch.object(pyspark.sql, 'SparkSession')
+    def test_run_pass_args8(self, mock_spark_session):
+        """Tests JDBCToGCSTemplate pass args with secret"""
+
+        jdbc_to_gcs_template = JDBCToGCSTemplate()
+
+        mock_parsed_args = jdbc_to_gcs_template.parse_args(
+            ["--jdbctogcs.input.url.secret=jdbctobqconn",
+             "--jdbctogcs.input.driver=driver",
+             "--jdbctogcs.input.table=table1",
+             "--jdbctogcs.output.location=gs://test",
+             "--jdbctogcs.output.format=csv",
+             "--jdbctogcs.output.mode=append",
+             "--jdbctogcs.output.partitioncolumn=column"
+             ])
+        mock_spark_session.read.format().options().load.return_value = mock_spark_session.dataframe.DataFrame
+        jdbc_to_gcs_template.run(mock_spark_session, mock_parsed_args)
+        mock_spark_session.read.format.assert_called_with(
+            constants.FORMAT_JDBC)
+        mock_spark_session.read.format().options.assert_called_with(**{
+            constants.JDBC_URL: secret_manager.access_secret_version("jdbctobqconn"),
+            constants.JDBC_DRIVER: "driver",
+            constants.JDBC_TABLE: "table1",
+            constants.JDBC_NUMPARTITIONS: "10",
+            constants.JDBC_FETCHSIZE: 0
+        })
+        mock_spark_session.read.format().options().load()
+        mock_spark_session.dataframe.DataFrame.write.mode.assert_called_once_with(constants.OUTPUT_MODE_APPEND)
+        mock_spark_session.dataframe.DataFrame.write.mode().partitionBy.assert_called_once_with("column")
+        mock_spark_session.dataframe.DataFrame.write.mode().partitionBy().options.assert_called_once_with(**{constants.CSV_HEADER: 'true'})
+        mock_spark_session.dataframe.DataFrame.write.mode().partitionBy().options().csv.assert_called_once_with("gs://test")
+
diff --git a/python/test/jdbc/test_jdbc_to_jdbc.py b/python/test/jdbc/test_jdbc_to_jdbc.py
index 6538943d6..aa51024db 100644
--- a/python/test/jdbc/test_jdbc_to_jdbc.py
+++ b/python/test/jdbc/test_jdbc_to_jdbc.py
@@ -19,6 +19,7 @@
 from dataproc_templates.jdbc.jdbc_to_jdbc import JDBCToJDBCTemplate
 import dataproc_templates.util.template_constants as constants
+import dataproc_templates.util.secret_manager as secret_manager


 class TestJDBCToJDBCTemplate:
@@ -191,3 +192,40 @@ def test_run_pass_args3(self, mock_spark_session):
         mock_spark_session.dataframe.DataFrame.write.format().option().option().option().option().option().option().mode.assert_called_once_with(constants.OUTPUT_MODE_APPEND)
         mock_spark_session.dataframe.DataFrame.write.format().option().option().option().option().option().option().mode().save.assert_called_once()
+
+    @mock.patch.object(pyspark.sql, 'SparkSession')
+    def test_run_pass_args4(self, mock_spark_session):
+        """Tests JDBCToJDBCTemplate pass args with secret"""
+
+        jdbc_to_jdbc_template = JDBCToJDBCTemplate()
+
+        mock_parsed_args = jdbc_to_jdbc_template.parse_args(
+            ["--jdbctojdbc.input.url.secret=jdbctobqconn",
+             "--jdbctojdbc.input.driver=driver",
+             "--jdbctojdbc.input.table=table1",
+             "--jdbctojdbc.input.fetchsize=20",
+             "--jdbctojdbc.output.url.secret=jdbctobqconn",
+             "--jdbctojdbc.output.driver=driver",
+             "--jdbctojdbc.output.table=table2"
+             ])
+        mock_spark_session.read.format().options().load.return_value = mock_spark_session.dataframe.DataFrame
+        jdbc_to_jdbc_template.run(mock_spark_session, mock_parsed_args)
+        mock_spark_session.read.format.assert_called_with(constants.FORMAT_JDBC)
+        _, kwargs = mock_spark_session.read.format().options.call_args
+        assert (constants.JDBC_URL, secret_manager.access_secret_version("jdbctobqconn")) in kwargs.items()
+        assert (constants.JDBC_DRIVER, "driver") in kwargs.items()
+        assert (constants.JDBC_TABLE, "table1") in kwargs.items()
+        assert (constants.JDBC_NUMPARTITIONS, "10") in kwargs.items()
+        assert (constants.JDBC_FETCHSIZE, 20) in kwargs.items()
+        assert constants.JDBC_SESSIONINITSTATEMENT not in kwargs
+        mock_spark_session.read.format().options().load()
+        mock_spark_session.dataframe.DataFrame.write.format.assert_called_once_with(constants.FORMAT_JDBC)
+        mock_spark_session.dataframe.DataFrame.write.format().option.assert_called_once_with(constants.JDBC_URL, secret_manager.access_secret_version("jdbctobqconn"))
+        mock_spark_session.dataframe.DataFrame.write.format().option().option.assert_called_once_with(constants.JDBC_DRIVER, "driver")
+        mock_spark_session.dataframe.DataFrame.write.format().option().option().option.assert_called_once_with(constants.JDBC_TABLE, "table2")
+        mock_spark_session.dataframe.DataFrame.write.format().option().option().option().option.assert_called_once_with(constants.JDBC_CREATE_TABLE_OPTIONS, "")
+        mock_spark_session.dataframe.DataFrame.write.format().option().option().option().option().option.assert_called_once_with(constants.JDBC_BATCH_SIZE, "1000")
+        mock_spark_session.dataframe.DataFrame.write.format().option().option().option().option().option().option.assert_called_once_with(constants.JDBC_NUMPARTITIONS, "10")
+        mock_spark_session.dataframe.DataFrame.write.format().option().option().option().option().option().option().mode.assert_called_once_with(constants.OUTPUT_MODE_APPEND)
+        mock_spark_session.dataframe.DataFrame.write.format().option().option().option().option().option().option().mode().save.assert_called_once()
+
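Note that the three new unit tests call `secret_manager.access_secret_version("jdbctobqconn")` directly inside their assertions, so they only pass when Application Default Credentials are available and a secret with that name actually exists. Below is a sketch of the same kind of check with the helper patched out, so no GCP access is needed; it assumes the JDBCTOGCS template, and the function name and fake URL are illustrative placeholders:

```
# Sketch only: secret-based check with Secret Manager patched out.
from unittest import mock

import dataproc_templates.util.template_constants as constants
from dataproc_templates.jdbc.jdbc_to_gcs import JDBCToGCSTemplate

FAKE_URL = "jdbc:mysql://localhost:3306/test?user=u&password=p"


@mock.patch("dataproc_templates.util.secret_manager.access_secret_version", return_value=FAKE_URL)
def run_secret_check(mock_spark_session, mock_access):
    template = JDBCToGCSTemplate()
    args = template.parse_args(
        ["--jdbctogcs.input.url.secret=jdbctobqconn",
         "--jdbctogcs.input.driver=driver",
         "--jdbctogcs.input.table=table1",
         "--jdbctogcs.output.location=gs://test",
         "--jdbctogcs.output.format=csv",
         "--jdbctogcs.output.mode=append"])
    mock_spark_session.read.format().options().load.return_value = mock_spark_session.dataframe.DataFrame
    template.run(mock_spark_session, args)
    # The template should have looked the URL up by secret name only.
    mock_access.assert_called_once_with("jdbctobqconn")
    _, kwargs = mock_spark_session.read.format().options.call_args
    assert kwargs[constants.JDBC_URL] == FAKE_URL
```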