From 79028e6669da2798eb82713bf7e5725cf55659a8 Mon Sep 17 00:00:00 2001 From: Vivek Jilla Date: Wed, 31 Jul 2024 17:36:52 +0530 Subject: [PATCH] pythonv2 samples --- .gitignore | 3 +- samples/python-v2/function_app.py | 12 ++++ samples/python-v2/host.json | 19 ++++++ samples/python-v2/kafka_output.py | 38 ++++++++++++ samples/python-v2/kafka_trigger.py | 77 ++++++++++++++++++++++++ samples/python-v2/kafka_trigger_avro.py | 37 ++++++++++++ samples/python-v2/kafka_trigger_retry.py | 38 ++++++++++++ samples/python-v2/requirements.txt | 5 ++ 8 files changed, 228 insertions(+), 1 deletion(-) create mode 100644 samples/python-v2/function_app.py create mode 100644 samples/python-v2/host.json create mode 100644 samples/python-v2/kafka_output.py create mode 100644 samples/python-v2/kafka_trigger.py create mode 100644 samples/python-v2/kafka_trigger_avro.py create mode 100644 samples/python-v2/kafka_trigger_retry.py create mode 100644 samples/python-v2/requirements.txt diff --git a/.gitignore b/.gitignore index 2ec48a61..2a77582f 100644 --- a/.gitignore +++ b/.gitignore @@ -288,4 +288,5 @@ local.settings.json.org samples/dotnet/KafkaFunctionSample/Properties/ temp -packages-microsoft-prod.deb \ No newline at end of file +packages-microsoft-prod.deb +.venv \ No newline at end of file diff --git a/samples/python-v2/function_app.py b/samples/python-v2/function_app.py new file mode 100644 index 00000000..f8aa9d9a --- /dev/null +++ b/samples/python-v2/function_app.py @@ -0,0 +1,12 @@ +import azure.functions as func + +from kafka_trigger_avro import KafkaTriggerAvro +from kafka_output import KafkaOutput +from kafka_trigger import KafkaTrigger +from kafka_trigger_retry import KafkaTriggerRetry + +app = func.FunctionApp(http_auth_level=func.AuthLevel.ANONYMOUS) +app.register_blueprint(KafkaTriggerAvro) +app.register_blueprint(KafkaOutput) +app.register_blueprint(KafkaTrigger) +app.register_blueprint(KafkaTriggerRetry) \ No newline at end of file diff --git a/samples/python-v2/host.json b/samples/python-v2/host.json new file mode 100644 index 00000000..84912631 --- /dev/null +++ b/samples/python-v2/host.json @@ -0,0 +1,19 @@ +{ + "version": "2.0", + "logging": { + "applicationInsights": { + "samplingSettings": { + "isEnabled": true, + "excludedTypes": "Request" + } + }, + "console": { + "isEnabled": true, + "DisableColors": false + } + }, + "extensionBundle": { + "id": "Microsoft.Azure.Functions.ExtensionBundle", + "version": "[4.*, 5.0.0)" + } +} \ No newline at end of file diff --git a/samples/python-v2/kafka_output.py b/samples/python-v2/kafka_output.py new file mode 100644 index 00000000..a929385e --- /dev/null +++ b/samples/python-v2/kafka_output.py @@ -0,0 +1,38 @@ +import azure.functions as func +import json + +KafkaOutput = func.Blueprint() + +@KafkaOutput.function_name(name="KafkaOutput") +@KafkaOutput.route(route="kafka_output") +@KafkaOutput.kafka_output(arg_name="outputMessage", topic="KafkaTopic", broker_list="KafkaBrokerList", username="KafkaUsername", password="KafkaPassword", protocol="SaslSsl", authentication_mode="Plain") +def kafka_output(req: func.HttpRequest, outputMessage: func.Out[str]) -> func.HttpResponse: + input_msg = req.params.get('message') + outputMessage.set(input_msg) + return 'OK' + + +@KafkaOutput.function_name(name="KafkaOutputMany") +@KafkaOutput.route(route="kafka_output_many") +@KafkaOutput.kafka_output(arg_name="outputMessage", topic="KafkaTopic", broker_list="KafkaBrokerList", username="KafkaUsername", password="KafkaPassword", protocol="SaslSsl", authentication_mode="Plain", data_type="string") +def kafka_output_many(req: func.HttpRequest, outputMessage: func.Out[str] ) -> func.HttpResponse: + outputMessage.set(json.dumps(['one', 'two'])) + return 'OK' + +@KafkaOutput.function_name(name="KafkaOutputWithHeaders") +@KafkaOutput.route(route="kafka_output_with_headers") +@KafkaOutput.kafka_output(arg_name="out", topic="KafkaTopic", broker_list="KafkaBrokerList", username="KafkaUsername", password="KafkaPassword", protocol="SaslSsl", authentication_mode="Plain") +def kafka_output_with_headers(req: func.HttpRequest, out: func.Out[str]) -> func.HttpResponse: + message = req.params.get('message') + kevent = { "Offset":0,"Partition":0,"Topic":"dummy","Timestamp":"2022-04-09T03:20:06.591Z", "Value": message, "Headers": [{ "Key": "test", "Value": "python" }] } + out.set(json.dumps(kevent)) + return 'OK' + +@KafkaOutput.function_name(name="KafkaOutputManyWithHeaders") +@KafkaOutput.route(route="kafka_output_many_with_headers") +@KafkaOutput.kafka_output(arg_name="out", topic="KafkaTopic", broker_list="KafkaBrokerList", username="KafkaUsername", password="KafkaPassword", protocol="SaslSsl", authentication_mode="Plain") +def kafka_output_many_with_headers(req: func.HttpRequest, out: func.Out[str]) -> func.HttpResponse: + kevent = [{ "Offset": 364, "Partition":0,"Topic":"kafkaeventhubtest1","Timestamp":"2022-04-09T03:20:06.591Z", "Value": "one", "Headers": [{ "Key": "test", "Value": "python" }] }, + { "Offset": 364, "Partition":0,"Topic":"kafkaeventhubtest1","Timestamp":"2022-04-09T03:20:06.591Z", "Value": "two", "Headers": [{ "Key": "test", "Value": "python" }] }] + out.set(json.dumps(kevent)) + return 'OK' \ No newline at end of file diff --git a/samples/python-v2/kafka_trigger.py b/samples/python-v2/kafka_trigger.py new file mode 100644 index 00000000..a4378408 --- /dev/null +++ b/samples/python-v2/kafka_trigger.py @@ -0,0 +1,77 @@ +import logging +import typing +import azure.functions as func +import json +import base64 + +KafkaTrigger = func.Blueprint() + + +@KafkaTrigger.function_name(name="KafkaTrigger") +@KafkaTrigger.kafka_trigger( + arg_name="kevent", + topic="KafkaTopic", + broker_list="KafkaBrokerList", + username="KafkaUsername", + password="KafkaPassword", + protocol="SaslSsl", + authentication_mode="Plain", + consumer_group="$Default1") +def kafka_trigger(kevent : func.KafkaEvent): + logging.info(kevent.get_body().decode('utf-8')) + logging.info(kevent.metadata) + +@KafkaTrigger.function_name(name="KafkaTriggerMany") +@KafkaTrigger.kafka_trigger( + arg_name="kevents", + topic="KafkaTopic", + broker_list="KafkaBrokerList", + username="KafkaUsername", + password="KafkaPassword", + protocol="SaslSsl", + authentication_mode="Plain", + cardinality="MANY", + data_type="string", + consumer_group="$Default2") +def kafka_trigger_many(kevents : typing.List[func.KafkaEvent]): + for event in kevents: + logging.info(event.get_body()) + +@KafkaTrigger.function_name(name="KafkaTriggerWithHeaders") +@KafkaTrigger.kafka_trigger( + arg_name="kevent", + topic="KafkaTopic", + broker_list="KafkaBrokerList", + username="KafkaUsername", + password="KafkaPassword", + protocol="SaslSsl", + authentication_mode="Plain", + data_type="string", + consumer_group="$Default3") +def kafka_trigger_with_headers(kevent : func.KafkaEvent): + logging.info("Python Kafka trigger function called for message " + kevent.metadata["Value"]) + headers = json.loads(kevent.metadata["Headers"]) + for header in headers: + logging.info("Key: "+ header['Key'] + " Value: "+ str(base64.b64decode(header['Value']).decode('ascii'))) + + +@KafkaTrigger.function_name(name="KafkaTriggerManyWithHeaders") +@KafkaTrigger.kafka_trigger( + arg_name="kevents", + topic="KafkaTopic", + broker_list="KafkaBrokerList", + username="KafkaUsername", + password="KafkaPassword", + protocol="SaslSsl", + authentication_mode="Plain", + cardinality="MANY", + data_type="string", + consumer_group="$Default4") +def kafka_trigger_many_with_headers(kevents : typing.List[func.KafkaEvent]): + for event in kevents: + event_dec = event.get_body().decode('utf-8') + event_json = json.loads(event_dec) + logging.info("Python Kafka trigger function called for message " + event_json["Value"]) + headers = event_json["Headers"] + for header in headers: + logging.info("Key: "+ header['Key'] + " Value: "+ str(base64.b64decode(header['Value']).decode('ascii'))) diff --git a/samples/python-v2/kafka_trigger_avro.py b/samples/python-v2/kafka_trigger_avro.py new file mode 100644 index 00000000..7e4f1cf8 --- /dev/null +++ b/samples/python-v2/kafka_trigger_avro.py @@ -0,0 +1,37 @@ +import logging +import typing +import azure.functions as func + +KafkaTriggerAvro = func.Blueprint() + +@KafkaTriggerAvro.function_name(name="KafkaTriggerAvroMany") +@KafkaTriggerAvro.kafka_trigger( + arg_name="kafkaAvroGenericTriggerMany", + topic="KafkaTopic", + broker_list="KafkaBrokerList", + username="KafkaUsername", + password="KafkaPassword", + protocol="SaslSsl", + authentication_mode="Plain", + data_type="string", + consumer_group="$Default", + avro_schema= "{\"type\":\"record\",\"name\":\"Payment\",\"namespace\":\"io.confluent.examples.clients.basicavro\",\"fields\":[{\"name\":\"id\",\"type\":\"string\"},{\"name\":\"amount\",\"type\":\"double\"},{\"name\":\"type\",\"type\":\"string\"}]}", + cardinality="MANY") +def kafka_trigger_avro_many(kafkaAvroGenericTriggerMany : typing.List[func.KafkaEvent]): + for event in kafkaAvroGenericTriggerMany: + logging.info(event.get_body()) + +@KafkaTriggerAvro.function_name(name="KafkaTriggerAvroOne") +@KafkaTriggerAvro.kafka_trigger( + arg_name="kafkaTriggerAvroGeneric", + topic="KafkaTopic", + broker_list="KafkaBrokerList", + username="KafkaUsername", + password="KafkaPassword", + protocol="SaslSsl", + authentication_mode="Plain", + consumer_group="$Default", + avro_schema= "{\"type\":\"record\",\"name\":\"Payment\",\"namespace\":\"io.confluent.examples.clients.basicavro\",\"fields\":[{\"name\":\"id\",\"type\":\"string\"},{\"name\":\"amount\",\"type\":\"double\"},{\"name\":\"type\",\"type\":\"string\"}]}") +def kafka_trigger_avro_one(kafkaTriggerAvroGeneric : func.KafkaEvent): + logging.info(kafkaTriggerAvroGeneric.get_body().decode('utf-8')) + logging.info(kafkaTriggerAvroGeneric.metadata) \ No newline at end of file diff --git a/samples/python-v2/kafka_trigger_retry.py b/samples/python-v2/kafka_trigger_retry.py new file mode 100644 index 00000000..9246e0e2 --- /dev/null +++ b/samples/python-v2/kafka_trigger_retry.py @@ -0,0 +1,38 @@ +import logging +import azure.functions as func + +KafkaTriggerRetry = func.Blueprint() + + +@KafkaTriggerRetry.function_name(name="KafkaTriggerRetryFixed") +@KafkaTriggerRetry.kafka_trigger( + arg_name="kevent", + topic="KafkaTopic", + broker_list="KafkaBrokerList", + username="KafkaUsername", + password="KafkaPassword", + protocol="SaslSsl", + authentication_mode="Plain", + consumer_group="$Default") +@KafkaTriggerRetry.retry(strategy="fixed_delay", max_retry_count="3", delay_interval="00:00:10") +def kafka_trigger_retry_fixed(kevent : func.KafkaEvent): + logging.info(kevent.get_body().decode('utf-8')) + logging.info(kevent.metadata) + raise Exception("unhandled error") + + +@KafkaTriggerRetry.function_name(name="KafkaTriggerRetryExponential") +@KafkaTriggerRetry.kafka_trigger( + arg_name="kevent", + topic="KafkaTopic", + broker_list="KafkaBrokerList", + username="KafkaUsername", + password="KafkaPassword", + protocol="SaslSsl", + authentication_mode="Plain", + consumer_group="$Default") +@KafkaTriggerRetry.retry(strategy="exponential_backoff", max_retry_count="5", minimum_interval="00:00:10", maximum_interval="00:15:00") +def kafka_trigger_retry_exponential(kevent : func.KafkaEvent): + logging.info(kevent.get_body().decode('utf-8')) + logging.info(kevent.metadata) + raise Exception("unhandled error") \ No newline at end of file diff --git a/samples/python-v2/requirements.txt b/samples/python-v2/requirements.txt new file mode 100644 index 00000000..85267c5a --- /dev/null +++ b/samples/python-v2/requirements.txt @@ -0,0 +1,5 @@ +# DO NOT include azure-functions-worker in this file +# The Python Worker is managed by Azure Functions platform +# Manually managing azure-functions-worker may cause unexpected issues + +azure-functions \ No newline at end of file