Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

pythonv2 samples [To be merged post python library is released] #514

Draft
wants to merge 1 commit into
base: dev
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -288,4 +288,5 @@ local.settings.json.org
samples/dotnet/KafkaFunctionSample/Properties/

temp
packages-microsoft-prod.deb
packages-microsoft-prod.deb
.venv
12 changes: 12 additions & 0 deletions samples/python-v2/function_app.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
import azure.functions as func

from kafka_trigger_avro import KafkaTriggerAvro
from kafka_output import KafkaOutput
from kafka_trigger import KafkaTrigger
from kafka_trigger_retry import KafkaTriggerRetry

app = func.FunctionApp(http_auth_level=func.AuthLevel.ANONYMOUS)
app.register_blueprint(KafkaTriggerAvro)
app.register_blueprint(KafkaOutput)
app.register_blueprint(KafkaTrigger)
app.register_blueprint(KafkaTriggerRetry)
19 changes: 19 additions & 0 deletions samples/python-v2/host.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
{
"version": "2.0",
"logging": {
"applicationInsights": {
"samplingSettings": {
"isEnabled": true,
"excludedTypes": "Request"
}
},
"console": {
"isEnabled": true,
"DisableColors": false
}
},
"extensionBundle": {
"id": "Microsoft.Azure.Functions.ExtensionBundle",
"version": "[4.*, 5.0.0)"
}
}
38 changes: 38 additions & 0 deletions samples/python-v2/kafka_output.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
import azure.functions as func
import json

KafkaOutput = func.Blueprint()

@KafkaOutput.function_name(name="KafkaOutput")
@KafkaOutput.route(route="kafka_output")
@KafkaOutput.kafka_output(arg_name="outputMessage", topic="KafkaTopic", broker_list="KafkaBrokerList", username="KafkaUsername", password="KafkaPassword", protocol="SaslSsl", authentication_mode="Plain")
def kafka_output(req: func.HttpRequest, outputMessage: func.Out[str]) -> func.HttpResponse:
input_msg = req.params.get('message')
outputMessage.set(input_msg)
return 'OK'


@KafkaOutput.function_name(name="KafkaOutputMany")
@KafkaOutput.route(route="kafka_output_many")
@KafkaOutput.kafka_output(arg_name="outputMessage", topic="KafkaTopic", broker_list="KafkaBrokerList", username="KafkaUsername", password="KafkaPassword", protocol="SaslSsl", authentication_mode="Plain", data_type="string")
def kafka_output_many(req: func.HttpRequest, outputMessage: func.Out[str] ) -> func.HttpResponse:
outputMessage.set(json.dumps(['one', 'two']))
return 'OK'

@KafkaOutput.function_name(name="KafkaOutputWithHeaders")
@KafkaOutput.route(route="kafka_output_with_headers")
@KafkaOutput.kafka_output(arg_name="out", topic="KafkaTopic", broker_list="KafkaBrokerList", username="KafkaUsername", password="KafkaPassword", protocol="SaslSsl", authentication_mode="Plain")
def kafka_output_with_headers(req: func.HttpRequest, out: func.Out[str]) -> func.HttpResponse:
message = req.params.get('message')
kevent = { "Offset":0,"Partition":0,"Topic":"dummy","Timestamp":"2022-04-09T03:20:06.591Z", "Value": message, "Headers": [{ "Key": "test", "Value": "python" }] }
out.set(json.dumps(kevent))
return 'OK'

@KafkaOutput.function_name(name="KafkaOutputManyWithHeaders")
@KafkaOutput.route(route="kafka_output_many_with_headers")
@KafkaOutput.kafka_output(arg_name="out", topic="KafkaTopic", broker_list="KafkaBrokerList", username="KafkaUsername", password="KafkaPassword", protocol="SaslSsl", authentication_mode="Plain")
def kafka_output_many_with_headers(req: func.HttpRequest, out: func.Out[str]) -> func.HttpResponse:
kevent = [{ "Offset": 364, "Partition":0,"Topic":"kafkaeventhubtest1","Timestamp":"2022-04-09T03:20:06.591Z", "Value": "one", "Headers": [{ "Key": "test", "Value": "python" }] },
{ "Offset": 364, "Partition":0,"Topic":"kafkaeventhubtest1","Timestamp":"2022-04-09T03:20:06.591Z", "Value": "two", "Headers": [{ "Key": "test", "Value": "python" }] }]
out.set(json.dumps(kevent))
return 'OK'
77 changes: 77 additions & 0 deletions samples/python-v2/kafka_trigger.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
import logging
import typing
import azure.functions as func
import json
import base64

KafkaTrigger = func.Blueprint()


@KafkaTrigger.function_name(name="KafkaTrigger")
@KafkaTrigger.kafka_trigger(
arg_name="kevent",
topic="KafkaTopic",
broker_list="KafkaBrokerList",
username="KafkaUsername",
password="KafkaPassword",
protocol="SaslSsl",
authentication_mode="Plain",
consumer_group="$Default1")
def kafka_trigger(kevent : func.KafkaEvent):
logging.info(kevent.get_body().decode('utf-8'))
logging.info(kevent.metadata)

@KafkaTrigger.function_name(name="KafkaTriggerMany")
@KafkaTrigger.kafka_trigger(
arg_name="kevents",
topic="KafkaTopic",
broker_list="KafkaBrokerList",
username="KafkaUsername",
password="KafkaPassword",
protocol="SaslSsl",
authentication_mode="Plain",
cardinality="MANY",
data_type="string",
consumer_group="$Default2")
def kafka_trigger_many(kevents : typing.List[func.KafkaEvent]):
for event in kevents:
logging.info(event.get_body())

@KafkaTrigger.function_name(name="KafkaTriggerWithHeaders")
@KafkaTrigger.kafka_trigger(
arg_name="kevent",
topic="KafkaTopic",
broker_list="KafkaBrokerList",
username="KafkaUsername",
password="KafkaPassword",
protocol="SaslSsl",
authentication_mode="Plain",
data_type="string",
consumer_group="$Default3")
def kafka_trigger_with_headers(kevent : func.KafkaEvent):
logging.info("Python Kafka trigger function called for message " + kevent.metadata["Value"])
headers = json.loads(kevent.metadata["Headers"])
for header in headers:
logging.info("Key: "+ header['Key'] + " Value: "+ str(base64.b64decode(header['Value']).decode('ascii')))


@KafkaTrigger.function_name(name="KafkaTriggerManyWithHeaders")
@KafkaTrigger.kafka_trigger(
arg_name="kevents",
topic="KafkaTopic",
broker_list="KafkaBrokerList",
username="KafkaUsername",
password="KafkaPassword",
protocol="SaslSsl",
authentication_mode="Plain",
cardinality="MANY",
data_type="string",
consumer_group="$Default4")
def kafka_trigger_many_with_headers(kevents : typing.List[func.KafkaEvent]):
for event in kevents:
event_dec = event.get_body().decode('utf-8')
event_json = json.loads(event_dec)
logging.info("Python Kafka trigger function called for message " + event_json["Value"])
headers = event_json["Headers"]
for header in headers:
logging.info("Key: "+ header['Key'] + " Value: "+ str(base64.b64decode(header['Value']).decode('ascii')))
37 changes: 37 additions & 0 deletions samples/python-v2/kafka_trigger_avro.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
import logging
import typing
import azure.functions as func

KafkaTriggerAvro = func.Blueprint()

@KafkaTriggerAvro.function_name(name="KafkaTriggerAvroMany")
@KafkaTriggerAvro.kafka_trigger(
arg_name="kafkaAvroGenericTriggerMany",
topic="KafkaTopic",
broker_list="KafkaBrokerList",
username="KafkaUsername",
password="KafkaPassword",
protocol="SaslSsl",
authentication_mode="Plain",
data_type="string",
consumer_group="$Default",
avro_schema= "{\"type\":\"record\",\"name\":\"Payment\",\"namespace\":\"io.confluent.examples.clients.basicavro\",\"fields\":[{\"name\":\"id\",\"type\":\"string\"},{\"name\":\"amount\",\"type\":\"double\"},{\"name\":\"type\",\"type\":\"string\"}]}",
cardinality="MANY")
def kafka_trigger_avro_many(kafkaAvroGenericTriggerMany : typing.List[func.KafkaEvent]):
for event in kafkaAvroGenericTriggerMany:
logging.info(event.get_body())

@KafkaTriggerAvro.function_name(name="KafkaTriggerAvroOne")
@KafkaTriggerAvro.kafka_trigger(
arg_name="kafkaTriggerAvroGeneric",
topic="KafkaTopic",
broker_list="KafkaBrokerList",
username="KafkaUsername",
password="KafkaPassword",
protocol="SaslSsl",
authentication_mode="Plain",
consumer_group="$Default",
avro_schema= "{\"type\":\"record\",\"name\":\"Payment\",\"namespace\":\"io.confluent.examples.clients.basicavro\",\"fields\":[{\"name\":\"id\",\"type\":\"string\"},{\"name\":\"amount\",\"type\":\"double\"},{\"name\":\"type\",\"type\":\"string\"}]}")
def kafka_trigger_avro_one(kafkaTriggerAvroGeneric : func.KafkaEvent):
logging.info(kafkaTriggerAvroGeneric.get_body().decode('utf-8'))
logging.info(kafkaTriggerAvroGeneric.metadata)
38 changes: 38 additions & 0 deletions samples/python-v2/kafka_trigger_retry.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
import logging
import azure.functions as func

KafkaTriggerRetry = func.Blueprint()


@KafkaTriggerRetry.function_name(name="KafkaTriggerRetryFixed")
@KafkaTriggerRetry.kafka_trigger(
arg_name="kevent",
topic="KafkaTopic",
broker_list="KafkaBrokerList",
username="KafkaUsername",
password="KafkaPassword",
protocol="SaslSsl",
authentication_mode="Plain",
consumer_group="$Default")
@KafkaTriggerRetry.retry(strategy="fixed_delay", max_retry_count="3", delay_interval="00:00:10")
def kafka_trigger_retry_fixed(kevent : func.KafkaEvent):
logging.info(kevent.get_body().decode('utf-8'))
logging.info(kevent.metadata)
raise Exception("unhandled error")


@KafkaTriggerRetry.function_name(name="KafkaTriggerRetryExponential")
@KafkaTriggerRetry.kafka_trigger(
arg_name="kevent",
topic="KafkaTopic",
broker_list="KafkaBrokerList",
username="KafkaUsername",
password="KafkaPassword",
protocol="SaslSsl",
authentication_mode="Plain",
consumer_group="$Default")
@KafkaTriggerRetry.retry(strategy="exponential_backoff", max_retry_count="5", minimum_interval="00:00:10", maximum_interval="00:15:00")
def kafka_trigger_retry_exponential(kevent : func.KafkaEvent):
logging.info(kevent.get_body().decode('utf-8'))
logging.info(kevent.metadata)
raise Exception("unhandled error")
5 changes: 5 additions & 0 deletions samples/python-v2/requirements.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
# DO NOT include azure-functions-worker in this file
# The Python Worker is managed by Azure Functions platform
# Manually managing azure-functions-worker may cause unexpected issues

azure-functions