From bdac3f43f4a164237da56863a12a8730572da416 Mon Sep 17 00:00:00 2001 From: Vivek Jilla Date: Mon, 22 Jul 2024 21:42:51 +0530 Subject: [PATCH 1/5] initial changes for decorator support for Kafka --- azure/functions/decorators/constants.py | 2 + azure/functions/decorators/function_app.py | 249 +++++++++++++++++++++ azure/functions/decorators/kafka.py | 124 ++++++++++ tests/decorators/test_kafka.py | 103 +++++++++ 4 files changed, 478 insertions(+) create mode 100644 azure/functions/decorators/kafka.py create mode 100644 tests/decorators/test_kafka.py diff --git a/azure/functions/decorators/constants.py b/azure/functions/decorators/constants.py index 2787cb27..8d699b99 100644 --- a/azure/functions/decorators/constants.py +++ b/azure/functions/decorators/constants.py @@ -8,6 +8,8 @@ EVENT_HUB = "eventHub" HTTP_TRIGGER = "httpTrigger" HTTP_OUTPUT = "http" +KAFKA = "kafka" +KAFKA_TRIGGER = "kafkaTrigger" QUEUE = "queue" QUEUE_TRIGGER = "queueTrigger" SERVICE_BUS = "serviceBus" diff --git a/azure/functions/decorators/function_app.py b/azure/functions/decorators/function_app.py index 14af303f..8eb02e5e 100644 --- a/azure/functions/decorators/function_app.py +++ b/azure/functions/decorators/function_app.py @@ -24,6 +24,8 @@ from azure.functions.decorators.eventhub import EventHubTrigger, EventHubOutput from azure.functions.decorators.http import HttpTrigger, HttpOutput, \ HttpMethod +from azure.functions.decorators.kafka import KafkaTrigger, KafkaOutput, \ + BrokerAuthenticationMode, BrokerProtocol from azure.functions.decorators.queue import QueueTrigger, QueueOutput from azure.functions.decorators.servicebus import ServiceBusQueueTrigger, \ ServiceBusQueueOutput, ServiceBusTopicTrigger, \ @@ -1229,6 +1231,120 @@ def decorator(): return wrap + def kafka_trigger(self, + arg_name: str, + topic: str, + broker_list: str, + event_hub_connection_string: Optional[str], + consumer_group: Optional[str], + avro_schema: Optional[str], + username: Optional[str], + password: 
Optional[str], + ssl_key_location: Optional[str], + ssl_ca_location: Optional[str], + ssl_certificate_location: Optional[str], + ssl_key_password: Optional[str], + schema_registry_url: Optional[str], + schema_registry_username: Optional[str], + schema_registry_password: Optional[str], + authentication_mode: BrokerAuthenticationMode = BrokerAuthenticationMode.NotSet, # noqa: E501 + protocol: BrokerProtocol = BrokerProtocol.NotSet, + lag_threshold: int = 1000, + data_type: Optional[Union[DataType, str]] = None, + **kwargs) -> Callable[..., Any]: + """ + The kafka_trigger decorator adds + :class:`KafkaTrigger` + to the :class:`FunctionBuilder` object + for building :class:`Function` object used in worker function + indexing model. This is equivalent to defining kafka trigger + in the function.json which enables function to be triggered to + respond to an event sent to a kafka topic. + All optional fields will be given default value by function host when + they are parsed by function host. + + Ref: https://aka.ms/kafkatrigger TODO + + :param arg_name: the variable name used in function code for the + parameter that receives the kafka event data. + :param topic: The topic monitored by the trigger. + :param broker_list: The list of Kafka brokers monitored by the trigger. + :param event_hub_connection_string: The name of an app setting that + contains the connection string for the eventhub when using Kafka + protocol header feature of Azure EventHubs. + :param consumer_group: Kafka consumer group used by the trigger. + :param avro_schema: This should be used only if a generic record + should be generated. + :param username: SASL username for use with the PLAIN and SASL-SCRAM-.. + mechanisms. Default is empty string. This is equivalent to + 'sasl.username' in librdkafka. + :param password: SASL password for use with the PLAIN and SASL-SCRAM-.. + mechanisms. Default is empty string. This is equivalent to + 'sasl.password' in librdkafka. 
+ :param ssl_key_location: Path to client's private key (PEM) used for + authentication. Default is empty string. This is equivalent to + 'ssl.key.location' in librdkafka. + :param ssl_ca_location: Path to CA certificate file for verifying the + broker's certificate. This is equivalent to 'ssl.ca.location' in + librdkafka. + :param ssl_certificate_location: Path to client's certificate. This is + equivalent to 'ssl.certificate.location' in librdkafka. + :param ssl_key_password: Password for client's certificate. This is + equivalent to 'ssl.key.password' in librdkafka. + :param schema_registry_url: URL for the Avro Schema Registry. + :param schema_registry_username: Username for the Avro Schema Registry. + :param schema_registry_password: Password for the Avro Schema Registry. + :param authentication_mode: SASL mechanism to use for authentication. + Allowed values: Gssapi, Plain, ScramSha256, ScramSha512. Default is + Plain. This is equivalent to 'sasl.mechanism' in librdkafka. + :param protocol: Gets or sets the security protocol used to communicate + with brokers. Default is plain text. This is equivalent to + 'security.protocol' in librdkafka. TODO + :param lag_threshold: Maximum number of unprocessed messages a worker + is expected to have at an instance. When target-based scaling is not + disabled, this is used to divide total unprocessed event count to + determine the number of worker instances, which will then be rounded + up to a worker instance count that creates a balanced partition + distribution. Default is 1000. + :param data_type: Defines how Functions runtime should treat the + parameter value. + :param kwargs: Keyword arguments for specifying additional binding + fields to include in the binding json + :return: Decorator function. 
+ """ + + @self._configure_function_builder + def wrap(fb): + def decorator(): + fb.add_trigger( + trigger=KafkaTrigger( + name=arg_name, + topic=topic, + broker_list=broker_list, + event_hub_connection_string=event_hub_connection_string, # noqa: E501 + consumer_group=consumer_group, + avro_schema=avro_schema, + username=username, + password=password, + ssl_key_location=ssl_key_location, + ssl_ca_location=ssl_ca_location, + ssl_certificate_location=ssl_certificate_location, + ssl_key_password=ssl_key_password, + schema_registry_url=schema_registry_url, + schema_registry_username=schema_registry_username, + schema_registry_password=schema_registry_password, + authentication_mode=authentication_mode, + protocol=protocol, + lag_threshold=lag_threshold, + data_type=parse_singular_param_to_enum(data_type, + DataType), + **kwargs)) + return fb + + return decorator() + + return wrap + def sql_trigger(self, arg_name: str, table_name: str, @@ -2212,6 +2328,139 @@ def decorator(): return wrap + def kafka_output(self, + arg_name: str, + topic: str, + broker_list: str, + avro_schema: Optional[str], + username: Optional[str], + password: Optional[str], + ssl_key_location: Optional[str], + ssl_ca_location: Optional[str], + ssl_certificate_location: Optional[str], + ssl_key_password: Optional[str], + schema_registry_url: Optional[str], + schema_registry_username: Optional[str], + schema_registry_password: Optional[str], + max_message_bytes: int = 1_000_000, + batch_size: int = 10_000, + enable_idempotence: bool = False, + message_timeout_ms: int = 300_000, + request_timeout_ms: int = 5_000, + max_retries: int = 2_147_483_647, + authentication_mode: BrokerAuthenticationMode = BrokerAuthenticationMode.NotSet, # noqa: E501 + protocol: BrokerProtocol = BrokerProtocol.NotSet, + linger_ms: int = 5, + data_type: Optional[Union[DataType, str]] = None, + **kwargs) -> Callable[..., Any]: + """ + The kafka_output decorator adds + :class:`KafkaOutput` + to the :class:`FunctionBuilder` object 
+ for building :class:`Function` object used in worker function + indexing model. This is equivalent to defining output binding + in the function.json which enables function to + write events to a kafka topic. + All optional fields will be given default value by function host when + they are parsed by function host. + + Ref: https://aka.ms/kafkaoutput + + :param arg_name: The variable name used in function code that + represents the event. + :param topic: The topic monitored by the trigger. + :param broker_list: The list of Kafka brokers monitored by the trigger. + :param avro_schema: This should be used only if a generic record + should be generated. + :param username: SASL username for use with the PLAIN and SASL-SCRAM-.. + mechanisms. Default is empty string. This is equivalent to + 'sasl.username' in librdkafka. + :param password: SASL password for use with the PLAIN and SASL-SCRAM-.. + mechanisms. Default is empty string. This is equivalent to + 'sasl.password' in librdkafka. + :param ssl_key_location: Path to client's private key (PEM) used for + authentication. Default is empty string. This is equivalent to + 'ssl.key.location' in librdkafka. + :param ssl_ca_location: Path to CA certificate file for verifying the + broker's certificate. This is equivalent to 'ssl.ca.location' in + librdkafka. + :param ssl_certificate_location: Path to client's certificate. This is + equivalent to 'ssl.certificate.location' in librdkafka. + :param ssl_key_password: Password for client's certificate. This is + equivalent to 'ssl.key.password' in librdkafka. + :param schema_registry_url: URL for the Avro Schema Registry. + :param schema_registry_username: Username for the Avro Schema Registry. + :param schema_registry_password: Password for the Avro Schema Registry. + :param max_message_bytes: Maximum transmit message size. Default is 1MB + :param batch_size: Maximum number of messages batched in one MessageSet + Default is 10000. 
+ :param enable_idempotence: When set to `true`, the producer will ensure + that messages are successfully produced exactly once and in the + original produce order. Default is false. + :param message_timeout_ms: Local message timeout. This value is only + enforced locally and limits the time a produced message waits for + successful delivery. A time of 0 is infinite. This is the maximum time + used to deliver a message (including retries). Delivery error occurs + when either the retry count or the message timeout are exceeded. + Default is 300000. + :param request_timeout_ms: The ack timeout of the producer request in + milliseconds. Default is 5000. + :param max_retries: How many times to retry sending a failing Message. + Default is 2147483647. Retrying may cause reordering unless + 'EnableIdempotence' is set to 'True'. + :param authentication_mode: SASL mechanism to use for authentication. + Allowed values: Gssapi, Plain, ScramSha256, ScramSha512. Default is + Plain. This is equivalent to 'sasl.mechanism' in librdkafka. + :param protocol: Gets or sets the security protocol used to communicate + with brokers. Default is plain text. This is equivalent to + 'security.protocol' in librdkafka. TODO + :param linger_ms: Linger.MS property provides the time between batches + of messages being sent to cluster. Larger value allows more batching + results in high throughput. + :param data_type: Defines how Functions runtime should treat the + parameter value. + :param kwargs: Keyword arguments for specifying additional binding + fields to include in the binding json + + :return: Decorator function. 
+ """ + + @self._configure_function_builder + def wrap(fb): + def decorator(): + fb.add_binding( + binding=KafkaOutput( + name=arg_name, + topic=topic, + broker_list=broker_list, + avro_schema=avro_schema, + username=username, + password=password, + ssl_key_location=ssl_key_location, + ssl_ca_location=ssl_ca_location, + ssl_certificate_location=ssl_certificate_location, + ssl_key_password=ssl_key_password, + schema_registry_url=schema_registry_url, + schema_registry_username=schema_registry_username, + schema_registry_password=schema_registry_password, + max_message_bytes=max_message_bytes, + batch_size=batch_size, + enable_idempotence=enable_idempotence, + message_timeout_ms=message_timeout_ms, + request_timeout_ms=request_timeout_ms, + max_retries=max_retries, + authentication_mode=authentication_mode, + protocol=protocol, + linger_ms=linger_ms, + data_type=parse_singular_param_to_enum(data_type, + DataType), + **kwargs)) + return fb + + return decorator() + + return wrap + def table_input(self, arg_name: str, connection: str, diff --git a/azure/functions/decorators/kafka.py b/azure/functions/decorators/kafka.py new file mode 100644 index 00000000..0a715a0e --- /dev/null +++ b/azure/functions/decorators/kafka.py @@ -0,0 +1,124 @@ +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. 
+from typing import Optional +from enum import Enum + +from azure.functions.decorators.constants import KAFKA, KAFKA_TRIGGER +from azure.functions.decorators.core import DataType, \ + OutputBinding, Trigger + + +class BrokerAuthenticationMode(Enum): + NotSet = -1 + Gssapi = 0 + Plain = 1 + ScramSha256 = 2 + ScramSha512 = 3 + + +class BrokerProtocol(Enum): + NotSet = -1 + Plaintext = 0 + Ssl = 1 + SaslPlaintext = 2 + SaslSsl = 3 + + +class KafkaOutput(OutputBinding): + @staticmethod + def get_binding_name() -> str: + return KAFKA + + def __init__(self, + name: str, + topic: str, + broker_list: str, + avro_schema: Optional[str], + username: Optional[str], + password: Optional[str], + ssl_key_location: Optional[str], + ssl_ca_location: Optional[str], + ssl_certificate_location: Optional[str], + ssl_key_password: Optional[str], + schema_registry_url: Optional[str], + schema_registry_username: Optional[str], + schema_registry_password: Optional[str], + max_message_bytes: int = 1_000_000, + batch_size: int = 10_000, + enable_idempotence: bool = False, + message_timeout_ms: int = 300_000, + request_timeout_ms: int = 5_000, + max_retries: int = 2_147_483_647, + authentication_mode: BrokerAuthenticationMode = BrokerAuthenticationMode.NotSet, # noqa: E501 + protocol: BrokerProtocol = BrokerProtocol.NotSet, + linger_ms: int = 5, + data_type: Optional[DataType] = None, + **kwargs): + self.topic = topic + self.broker_list = broker_list + self.avro_schema = avro_schema + self.username = username + self.password = password + self.ssl_key_location = ssl_key_location + self.ssl_ca_location = ssl_ca_location + self.ssl_certificate_location = ssl_certificate_location + self.ssl_key_password = ssl_key_password + self.schema_registry_url = schema_registry_url + self.schema_registry_username = schema_registry_username + self.schema_registry_password = schema_registry_password + self.max_message_bytes = max_message_bytes + self.batch_size = batch_size + self.enable_idempotence = 
enable_idempotence + self.message_timeout_ms = message_timeout_ms + self.request_timeout_ms = request_timeout_ms + self.max_retries = max_retries + self.authentication_mode = authentication_mode + self.protocol = protocol + self.linger_ms = linger_ms + super().__init__(name=name, data_type=data_type) + + +class KafkaTrigger(Trigger): + @staticmethod + def get_binding_name() -> str: + return KAFKA_TRIGGER + + def __init__(self, + name: str, + topic: str, + broker_list: str, + event_hub_connection_string: Optional[str], + consumer_group: Optional[str], + avro_schema: Optional[str], + username: Optional[str], + password: Optional[str], + ssl_key_location: Optional[str], + ssl_ca_location: Optional[str], + ssl_certificate_location: Optional[str], + ssl_key_password: Optional[str], + schema_registry_url: Optional[str], + schema_registry_username: Optional[str], + schema_registry_password: Optional[str], + authentication_mode: BrokerAuthenticationMode = BrokerAuthenticationMode.NotSet, # noqa: E501 + protocol: BrokerProtocol = BrokerProtocol.NotSet, + lag_threshold: int = 1000, + data_type: Optional[DataType] = None, + **kwargs): + self.topic = topic + self.broker_list = broker_list + self.event_hub_connection_string = event_hub_connection_string + self.consumer_group = consumer_group + self.avro_schema = avro_schema + self.username = username + self.password = password + self.ssl_key_location = ssl_key_location + self.ssl_ca_location = ssl_ca_location + self.ssl_certificate_location = ssl_certificate_location + self.ssl_key_password = ssl_key_password + self.schema_registry_url = schema_registry_url + self.schema_registry_username = schema_registry_username + self.schema_registry_password = schema_registry_password + self.authentication_mode = authentication_mode + self.protocol = protocol + self.lag_threshold = lag_threshold + super().__init__(name=name, data_type=data_type) diff --git a/tests/decorators/test_kafka.py b/tests/decorators/test_kafka.py new file mode 
100644 index 00000000..95a51484 --- /dev/null +++ b/tests/decorators/test_kafka.py @@ -0,0 +1,103 @@ +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. +import unittest + +from azure.functions.decorators.constants import KAFKA_TRIGGER, KAFKA +from azure.functions.decorators.core import BindingDirection, \ + DataType +from azure.functions.decorators.kafka import KafkaTrigger, KafkaOutput, \ + BrokerAuthenticationMode, BrokerProtocol + + +class TestKafka(unittest.TestCase): + def test_kafka_trigger_valid_creation(self): + trigger = KafkaTrigger(name="arg_name", + topic="topic", + broker_list="broker_list", + event_hub_connection_string="ehcs", + consumer_group="consumer_group", + avro_schema="avro_schema", + username="username", + password="password", + ssl_key_location="ssl_key_location", + ssl_ca_location="ssl_ca_location", + ssl_certificate_location="scl", + ssl_key_password="ssl_key_password", + schema_registry_url="srurl", + schema_registry_username="sruser", + schema_registry_password="srp", + authentication_mode=BrokerAuthenticationMode.Plain, # noqa: E501 + data_type=DataType.UNDEFINED, + dummy_field="dummy") + + self.assertEqual(trigger.get_binding_name(), "kafkaTrigger") + self.assertEqual(trigger.get_dict_repr(), + {"authenticationMode": BrokerAuthenticationMode.Plain, + "avroSchema": "avro_schema", + "brokerList": "broker_list", + "consumerGroup": "consumer_group", + "dataType": DataType.UNDEFINED, + "direction": BindingDirection.IN, + "dummyField": "dummy", + "eventHubConnectionString": "ehcs", + "lagThreshold": 1000, + "name": "arg_name", + "password": "password", + "protocol": BrokerProtocol.NotSet, + "schemaRegistryPassword": "srp", + "schemaRegistryUrl": "srurl", + "schemaRegistryUsername": "sruser", + "sslCaLocation": "ssl_ca_location", + "sslCertificateLocation": "scl", + "sslKeyLocation": "ssl_key_location", + "sslKeyPassword": "ssl_key_password", + "topic": "topic", + "type": KAFKA_TRIGGER, + 
"username": "username"}) + + def test_kafka_output_valid_creation(self): + output = KafkaOutput(name="arg_name", + topic="topic", + broker_list="broker_list", + avro_schema="avro_schema", + username="username", + password="password", + ssl_key_location="ssl_key_location", + ssl_ca_location="ssl_ca_location", + ssl_certificate_location="scl", + ssl_key_password="ssl_key_password", + schema_registry_url="schema_registry_url", + schema_registry_username="sru", + schema_registry_password="srp", + max_retries=10, + data_type=DataType.UNDEFINED, + dummy_field="dummy") + + self.assertEqual(output.get_binding_name(), "kafka") + self.assertEqual(output.get_dict_repr(), + {'authenticationMode': BrokerAuthenticationMode.NotSet, # noqa: E501 + 'avroSchema': 'avro_schema', + 'batchSize': 10000, + 'brokerList': 'broker_list', + 'dataType': DataType.UNDEFINED, + 'direction': BindingDirection.OUT, + 'dummyField': 'dummy', + 'enableIdempotence': False, + 'lingerMs': 5, + 'maxMessageBytes': 1000000, + 'maxRetries': 10, + 'messageTimeoutMs': 300000, + 'name': 'arg_name', + 'password': 'password', + 'protocol': BrokerProtocol.NotSet, + 'requestTimeoutMs': 5000, + 'schemaRegistryPassword': 'srp', + 'schemaRegistryUrl': 'schema_registry_url', + 'schemaRegistryUsername': 'sru', + 'sslCaLocation': 'ssl_ca_location', + 'sslCertificateLocation': 'scl', + 'sslKeyLocation': 'ssl_key_location', + 'sslKeyPassword': 'ssl_key_password', + 'topic': 'topic', + 'type': KAFKA, + 'username': 'username'}) From ad95faad0bc07c4388616a6b7a82c0f7fe06d9b2 Mon Sep 17 00:00:00 2001 From: Vivek Jilla Date: Tue, 30 Jul 2024 14:29:22 +0530 Subject: [PATCH 2/5] Fixing minor issues --- azure/functions/decorators/function_app.py | 64 ++++++++++++---------- azure/functions/decorators/kafka.py | 33 +++++------ 2 files changed, 51 insertions(+), 46 deletions(-) diff --git a/azure/functions/decorators/function_app.py b/azure/functions/decorators/function_app.py index 8eb02e5e..17bcbbfa 100644 --- 
a/azure/functions/decorators/function_app.py +++ b/azure/functions/decorators/function_app.py @@ -1235,20 +1235,20 @@ def kafka_trigger(self, arg_name: str, topic: str, broker_list: str, - event_hub_connection_string: Optional[str], - consumer_group: Optional[str], - avro_schema: Optional[str], - username: Optional[str], - password: Optional[str], - ssl_key_location: Optional[str], - ssl_ca_location: Optional[str], - ssl_certificate_location: Optional[str], - ssl_key_password: Optional[str], - schema_registry_url: Optional[str], - schema_registry_username: Optional[str], - schema_registry_password: Optional[str], - authentication_mode: BrokerAuthenticationMode = BrokerAuthenticationMode.NotSet, # noqa: E501 - protocol: BrokerProtocol = BrokerProtocol.NotSet, + event_hub_connection_string: Optional[str] = None, + consumer_group: Optional[str] = None, + avro_schema: Optional[str] = None, + username: Optional[str] = None, + password: Optional[str] = None, + ssl_key_location: Optional[str] = None, + ssl_ca_location: Optional[str] = None, + ssl_certificate_location: Optional[str] = None, + ssl_key_password: Optional[str] = None, + schema_registry_url: Optional[str] = None, + schema_registry_username: Optional[str] = None, + schema_registry_password: Optional[str] = None, + authentication_mode: str = "NotSet", + protocol: str = "NotSet", lag_threshold: int = 1000, data_type: Optional[Union[DataType, str]] = None, **kwargs) -> Callable[..., Any]: @@ -1333,8 +1333,10 @@ def decorator(): schema_registry_url=schema_registry_url, schema_registry_username=schema_registry_username, schema_registry_password=schema_registry_password, - authentication_mode=authentication_mode, - protocol=protocol, + authentication_mode=parse_singular_param_to_enum( + authentication_mode, BrokerAuthenticationMode), + protocol=parse_singular_param_to_enum(protocol, + BrokerProtocol), lag_threshold=lag_threshold, data_type=parse_singular_param_to_enum(data_type, DataType), @@ -2332,24 +2334,24 @@ def 
kafka_output(self, arg_name: str, topic: str, broker_list: str, - avro_schema: Optional[str], - username: Optional[str], - password: Optional[str], - ssl_key_location: Optional[str], - ssl_ca_location: Optional[str], - ssl_certificate_location: Optional[str], - ssl_key_password: Optional[str], - schema_registry_url: Optional[str], - schema_registry_username: Optional[str], - schema_registry_password: Optional[str], + avro_schema: Optional[str] = None, + username: Optional[str] = None, + password: Optional[str] = None, + ssl_key_location: Optional[str] = None, + ssl_ca_location: Optional[str] = None, + ssl_certificate_location: Optional[str] = None, + ssl_key_password: Optional[str] = None, + schema_registry_url: Optional[str] = None, + schema_registry_username: Optional[str] = None, + schema_registry_password: Optional[str] = None, max_message_bytes: int = 1_000_000, batch_size: int = 10_000, enable_idempotence: bool = False, message_timeout_ms: int = 300_000, request_timeout_ms: int = 5_000, max_retries: int = 2_147_483_647, - authentication_mode: BrokerAuthenticationMode = BrokerAuthenticationMode.NotSet, # noqa: E501 - protocol: BrokerProtocol = BrokerProtocol.NotSet, + authentication_mode: str = "NOTSET", + protocol: str = "NOTSET", linger_ms: int = 5, data_type: Optional[Union[DataType, str]] = None, **kwargs) -> Callable[..., Any]: @@ -2449,8 +2451,10 @@ def decorator(): message_timeout_ms=message_timeout_ms, request_timeout_ms=request_timeout_ms, max_retries=max_retries, - authentication_mode=authentication_mode, - protocol=protocol, + authentication_mode=parse_singular_param_to_enum( + authentication_mode, BrokerAuthenticationMode), + protocol=parse_singular_param_to_enum(protocol, + BrokerProtocol), linger_ms=linger_ms, data_type=parse_singular_param_to_enum(data_type, DataType), diff --git a/azure/functions/decorators/kafka.py b/azure/functions/decorators/kafka.py index 0a715a0e..2fc5dac1 100644 --- a/azure/functions/decorators/kafka.py +++ 
b/azure/functions/decorators/kafka.py @@ -6,22 +6,23 @@ from azure.functions.decorators.constants import KAFKA, KAFKA_TRIGGER from azure.functions.decorators.core import DataType, \ OutputBinding, Trigger +from .utils import StringifyEnum -class BrokerAuthenticationMode(Enum): - NotSet = -1 - Gssapi = 0 - Plain = 1 - ScramSha256 = 2 - ScramSha512 = 3 +class BrokerAuthenticationMode(StringifyEnum): + NOTSET = -1 + GSSAPI = 0 + PLAIN = 1 + SCRAMSHA256 = 2 + SCRAMSHA512 = 3 -class BrokerProtocol(Enum): - NotSet = -1 - Plaintext = 0 - Ssl = 1 - SaslPlaintext = 2 - SaslSsl = 3 +class BrokerProtocol(StringifyEnum): + NOTSET = -1 + PLAINTEXT = 0 + SSL = 1 + SASLPLAINTEXT = 2 + SASLSSL = 3 class KafkaOutput(OutputBinding): @@ -49,8 +50,8 @@ def __init__(self, message_timeout_ms: int = 300_000, request_timeout_ms: int = 5_000, max_retries: int = 2_147_483_647, - authentication_mode: BrokerAuthenticationMode = BrokerAuthenticationMode.NotSet, # noqa: E501 - protocol: BrokerProtocol = BrokerProtocol.NotSet, + authentication_mode: BrokerAuthenticationMode = BrokerAuthenticationMode.NOTSET, # noqa: E501 + protocol: BrokerProtocol = BrokerProtocol.NOTSET, linger_ms: int = 5, data_type: Optional[DataType] = None, **kwargs): @@ -99,8 +100,8 @@ def __init__(self, schema_registry_url: Optional[str], schema_registry_username: Optional[str], schema_registry_password: Optional[str], - authentication_mode: BrokerAuthenticationMode = BrokerAuthenticationMode.NotSet, # noqa: E501 - protocol: BrokerProtocol = BrokerProtocol.NotSet, + authentication_mode: BrokerAuthenticationMode = BrokerAuthenticationMode.NOTSET, # noqa: E501 + protocol: BrokerProtocol = BrokerProtocol.NOTSET, lag_threshold: int = 1000, data_type: Optional[DataType] = None, **kwargs): From 89ec9f8f6ee95cd2d2fa3ee90a8f1d86bafa61fb Mon Sep 17 00:00:00 2001 From: Vivek Jilla Date: Tue, 30 Jul 2024 18:33:10 +0530 Subject: [PATCH 3/5] fixing tests --- tests/decorators/test_kafka.py | 10 +++++----- 1 file changed, 5 
insertions(+), 5 deletions(-) diff --git a/tests/decorators/test_kafka.py b/tests/decorators/test_kafka.py index 95a51484..6d7f1b07 100644 --- a/tests/decorators/test_kafka.py +++ b/tests/decorators/test_kafka.py @@ -26,13 +26,13 @@ def test_kafka_trigger_valid_creation(self): schema_registry_url="srurl", schema_registry_username="sruser", schema_registry_password="srp", - authentication_mode=BrokerAuthenticationMode.Plain, # noqa: E501 + authentication_mode=BrokerAuthenticationMode.PLAIN, # noqa: E501 data_type=DataType.UNDEFINED, dummy_field="dummy") self.assertEqual(trigger.get_binding_name(), "kafkaTrigger") self.assertEqual(trigger.get_dict_repr(), - {"authenticationMode": BrokerAuthenticationMode.Plain, + {"authenticationMode": BrokerAuthenticationMode.PLAIN, "avroSchema": "avro_schema", "brokerList": "broker_list", "consumerGroup": "consumer_group", @@ -43,7 +43,7 @@ def test_kafka_trigger_valid_creation(self): "lagThreshold": 1000, "name": "arg_name", "password": "password", - "protocol": BrokerProtocol.NotSet, + "protocol": BrokerProtocol.NOTSET, "schemaRegistryPassword": "srp", "schemaRegistryUrl": "srurl", "schemaRegistryUsername": "sruser", @@ -75,7 +75,7 @@ def test_kafka_output_valid_creation(self): self.assertEqual(output.get_binding_name(), "kafka") self.assertEqual(output.get_dict_repr(), - {'authenticationMode': BrokerAuthenticationMode.NotSet, # noqa: E501 + {'authenticationMode': BrokerAuthenticationMode.NOTSET, # noqa: E501 'avroSchema': 'avro_schema', 'batchSize': 10000, 'brokerList': 'broker_list', @@ -89,7 +89,7 @@ def test_kafka_output_valid_creation(self): 'messageTimeoutMs': 300000, 'name': 'arg_name', 'password': 'password', - 'protocol': BrokerProtocol.NotSet, + 'protocol': BrokerProtocol.NOTSET, 'requestTimeoutMs': 5000, 'schemaRegistryPassword': 'srp', 'schemaRegistryUrl': 'schema_registry_url', From 8b6168bbc22352c6f951eada7c0f3d90662d732e Mon Sep 17 00:00:00 2001 From: Vivek Jilla Date: Wed, 31 Jul 2024 17:45:30 +0530 Subject: 
[PATCH 4/5] some more changes post validations --- azure/functions/decorators/function_app.py | 17 ++++++++++------- azure/functions/decorators/kafka.py | 4 +++- 2 files changed, 13 insertions(+), 8 deletions(-) diff --git a/azure/functions/decorators/function_app.py b/azure/functions/decorators/function_app.py index 17bcbbfa..2429fd77 100644 --- a/azure/functions/decorators/function_app.py +++ b/azure/functions/decorators/function_app.py @@ -1247,8 +1247,9 @@ def kafka_trigger(self, schema_registry_url: Optional[str] = None, schema_registry_username: Optional[str] = None, schema_registry_password: Optional[str] = None, - authentication_mode: str = "NotSet", - protocol: str = "NotSet", + authentication_mode: Optional[Union[BrokerAuthenticationMode, str]] = "NotSet", + protocol: Optional[Union[BrokerProtocol, str]] = "NotSet", + cardinality : Optional[Union[Cardinality, str]] = "One", lag_threshold: int = 1000, data_type: Optional[Union[DataType, str]] = None, **kwargs) -> Callable[..., Any]: @@ -1263,10 +1264,10 @@ def kafka_trigger(self, All optional fields will be given default value by function host when they are parsed by function host. - Ref: https://aka.ms/kafkatrigger TODO + Ref: https://aka.ms/kafkatrigger :param arg_name: the variable name used in function code for the - parameter that receives the kafka event data. + parameter that has the kafka event data. :param topic: The topic monitored by the trigger. :param broker_list: The list of Kafka brokers monitored by the trigger. 
:param event_hub_connection_string: The name of an app setting that @@ -1337,6 +1338,8 @@ def decorator(): authentication_mode, BrokerAuthenticationMode), protocol=parse_singular_param_to_enum(protocol, BrokerProtocol), + cardinality=parse_singular_param_to_enum(cardinality, + Cardinality), lag_threshold=lag_threshold, data_type=parse_singular_param_to_enum(data_type, DataType), @@ -2350,8 +2353,8 @@ def kafka_output(self, message_timeout_ms: int = 300_000, request_timeout_ms: int = 5_000, max_retries: int = 2_147_483_647, - authentication_mode: str = "NOTSET", - protocol: str = "NOTSET", + authentication_mode: Optional[Union[BrokerAuthenticationMode, str]] = "NOTSET", + protocol: Optional[Union[BrokerProtocol, str]] = "NOTSET", linger_ms: int = 5, data_type: Optional[Union[DataType, str]] = None, **kwargs) -> Callable[..., Any]: @@ -2415,7 +2418,7 @@ def kafka_output(self, Plain. This is equivalent to 'sasl.mechanism' in librdkafka. :param protocol: Gets or sets the security protocol used to communicate with brokers. Default is plain text. This is equivalent to - 'security.protocol' in librdkafka. TODO + 'security.protocol' in librdkafka. :param linger_ms: Linger.MS property provides the time between batches of messages being sent to cluster. Larger value allows more batching results in high throughput. 
diff --git a/azure/functions/decorators/kafka.py b/azure/functions/decorators/kafka.py index 2fc5dac1..ea52ce5a 100644 --- a/azure/functions/decorators/kafka.py +++ b/azure/functions/decorators/kafka.py @@ -4,7 +4,7 @@ from enum import Enum from azure.functions.decorators.constants import KAFKA, KAFKA_TRIGGER -from azure.functions.decorators.core import DataType, \ +from azure.functions.decorators.core import Cardinality, DataType, \ OutputBinding, Trigger from .utils import StringifyEnum @@ -102,6 +102,7 @@ def __init__(self, schema_registry_password: Optional[str], authentication_mode: BrokerAuthenticationMode = BrokerAuthenticationMode.NOTSET, # noqa: E501 protocol: BrokerProtocol = BrokerProtocol.NOTSET, + cardinality : Cardinality = Cardinality.ONE, lag_threshold: int = 1000, data_type: Optional[DataType] = None, **kwargs): @@ -121,5 +122,6 @@ def __init__(self, self.schema_registry_password = schema_registry_password self.authentication_mode = authentication_mode self.protocol = protocol + self.cardinality = cardinality self.lag_threshold = lag_threshold super().__init__(name=name, data_type=data_type) From 22a3138fb6bcfff0ec6d9c6bb6a3587f1358e753 Mon Sep 17 00:00:00 2001 From: Vivek Jilla Date: Wed, 31 Jul 2024 19:36:10 +0530 Subject: [PATCH 5/5] Added Oauthbearer options support and fixing tests --- azure/functions/decorators/function_app.py | 70 ++++++++++++++++++++-- azure/functions/decorators/kafka.py | 40 +++++++++++-- tests/decorators/test_kafka.py | 3 +- 3 files changed, 101 insertions(+), 12 deletions(-) diff --git a/azure/functions/decorators/function_app.py b/azure/functions/decorators/function_app.py index 2429fd77..feaa7605 100644 --- a/azure/functions/decorators/function_app.py +++ b/azure/functions/decorators/function_app.py @@ -25,7 +25,7 @@ from azure.functions.decorators.http import HttpTrigger, HttpOutput, \ HttpMethod from azure.functions.decorators.kafka import KafkaTrigger, KafkaOutput, \ - BrokerAuthenticationMode, BrokerProtocol + 
BrokerAuthenticationMode, BrokerProtocol, OAuthBearerMethod from azure.functions.decorators.queue import QueueTrigger, QueueOutput from azure.functions.decorators.servicebus import ServiceBusQueueTrigger, \ ServiceBusQueueOutput, ServiceBusTopicTrigger, \ @@ -1247,9 +1247,15 @@ def kafka_trigger(self, schema_registry_url: Optional[str] = None, schema_registry_username: Optional[str] = None, schema_registry_password: Optional[str] = None, - authentication_mode: Optional[Union[BrokerAuthenticationMode, str]] = "NotSet", - protocol: Optional[Union[BrokerProtocol, str]] = "NotSet", - cardinality : Optional[Union[Cardinality, str]] = "One", + o_auth_bearer_method: Optional[Union[OAuthBearerMethod, str]] = None, # noqa E501 + o_auth_bearer_client_id: Optional[str] = None, + o_auth_bearer_client_secret: Optional[str] = None, + o_auth_bearer_scope: Optional[str] = None, + o_auth_bearer_token_endpoint_url: Optional[str] = None, + o_auth_bearer_extensions: Optional[str] = None, + authentication_mode: Optional[Union[BrokerAuthenticationMode, str]] = "NotSet", # noqa E501 + protocol: Optional[Union[BrokerProtocol, str]] = "NotSet", # noqa E501 + cardinality: Optional[Union[Cardinality, str]] = "One", lag_threshold: int = 1000, data_type: Optional[Union[DataType, str]] = None, **kwargs) -> Callable[..., Any]: @@ -1295,6 +1301,23 @@ def kafka_trigger(self, :param schema_registry_url: URL for the Avro Schema Registry. :param schema_registry_username: Username for the Avro Schema Registry. :param schema_registry_password: Password for the Avro Schema Registry. + :param o_auth_bearer_method: Either 'default' or 'oidc'. + sasl.oauthbearer.method in librdkafka. + :param o_auth_bearer_client_id: Specify only when o_auth_bearer_method + is 'oidc'. sasl.oauthbearer.client.id in librdkafka. + :param o_auth_bearer_client_secret: Specify only when + o_auth_bearer_method is 'oidc'. sasl.oauthbearer.client.secret in + librdkafka. 
+ :param o_auth_bearer_scope: Specify only when o_auth_bearer_method + is 'oidc'. Clients use this to specify the scope of the access request + to the broker. sasl.oauthbearer.scope in librdkafka. + :param o_auth_bearer_token_endpoint_url: Specify only when + o_auth_bearer_method is 'oidc'. sasl.oauthbearer.token.endpoint.url + in librdkafka. + :param o_auth_bearer_extensions: Allow additional information to be + provided to the broker. Comma-separated list of key=value pairs. E.g., + "supportFeatureX=true,organizationId=sales-emea". + sasl.oauthbearer.extensions in librdkafka. :param authentication_mode: SASL mechanism to use for authentication. Allowed values: Gssapi, Plain, ScramSha256, ScramSha512. Default is Plain. This is equivalent to 'sasl.mechanism' in librdkafka. @@ -1334,6 +1357,13 @@ def decorator(): schema_registry_url=schema_registry_url, schema_registry_username=schema_registry_username, schema_registry_password=schema_registry_password, + o_auth_bearer_method=parse_singular_param_to_enum( + o_auth_bearer_method, OAuthBearerMethod), + o_auth_bearer_client_id=o_auth_bearer_client_id, + o_auth_bearer_client_secret=o_auth_bearer_client_secret, # noqa: E501 + o_auth_bearer_scope=o_auth_bearer_scope, + o_auth_bearer_token_endpoint_url=o_auth_bearer_token_endpoint_url, # noqa: E501 + o_auth_bearer_extensions=o_auth_bearer_extensions, authentication_mode=parse_singular_param_to_enum( authentication_mode, BrokerAuthenticationMode), protocol=parse_singular_param_to_enum(protocol, @@ -2347,13 +2377,19 @@ def kafka_output(self, schema_registry_url: Optional[str] = None, schema_registry_username: Optional[str] = None, schema_registry_password: Optional[str] = None, + o_auth_bearer_method: Optional[Union[OAuthBearerMethod, str]] = None, # noqa E501 + o_auth_bearer_client_id: Optional[str] = None, + o_auth_bearer_client_secret: Optional[str] = None, + o_auth_bearer_scope: Optional[str] = None, + o_auth_bearer_token_endpoint_url: Optional[str] = None, + 
o_auth_bearer_extensions: Optional[str] = None, max_message_bytes: int = 1_000_000, batch_size: int = 10_000, enable_idempotence: bool = False, message_timeout_ms: int = 300_000, request_timeout_ms: int = 5_000, max_retries: int = 2_147_483_647, - authentication_mode: Optional[Union[BrokerAuthenticationMode, str]] = "NOTSET", + authentication_mode: Optional[Union[BrokerAuthenticationMode, str]] = "NOTSET", # noqa E501 protocol: Optional[Union[BrokerProtocol, str]] = "NOTSET", linger_ms: int = 5, data_type: Optional[Union[DataType, str]] = None, **kwargs) -> Callable[..., Any]: @@ -2396,6 +2432,23 @@ def kafka_output(self, :param schema_registry_url: URL for the Avro Schema Registry. :param schema_registry_username: Username for the Avro Schema Registry. :param schema_registry_password: Password for the Avro Schema Registry. + :param o_auth_bearer_method: Either 'default' or 'oidc'. + sasl.oauthbearer.method in librdkafka. + :param o_auth_bearer_client_id: Specify only when o_auth_bearer_method + is 'oidc'. sasl.oauthbearer.client.id in librdkafka. + :param o_auth_bearer_client_secret: Specify only when + o_auth_bearer_method is 'oidc'. sasl.oauthbearer.client.secret in + librdkafka. + :param o_auth_bearer_scope: Specify only when o_auth_bearer_method + is 'oidc'. Clients use this to specify the scope of the access request + to the broker. sasl.oauthbearer.scope in librdkafka. + :param o_auth_bearer_token_endpoint_url: Specify only when + o_auth_bearer_method is 'oidc'. sasl.oauthbearer.token.endpoint.url + in librdkafka. + :param o_auth_bearer_extensions: Allow additional information to be + provided to the broker. Comma-separated list of key=value pairs. E.g., + "supportFeatureX=true,organizationId=sales-emea". + sasl.oauthbearer.extensions in librdkafka. :param max_message_bytes: Maximum transmit message size. Default is 1MB :param batch_size: Maximum number of messages batched in one MessageSet Default is 10000. 
@@ -2448,6 +2501,13 @@ def decorator(): schema_registry_url=schema_registry_url, schema_registry_username=schema_registry_username, schema_registry_password=schema_registry_password, + o_auth_bearer_method=parse_singular_param_to_enum( + o_auth_bearer_method, OAuthBearerMethod), + o_auth_bearer_client_id=o_auth_bearer_client_id, + o_auth_bearer_client_secret=o_auth_bearer_client_secret, # noqa: E501 + o_auth_bearer_scope=o_auth_bearer_scope, + o_auth_bearer_token_endpoint_url=o_auth_bearer_token_endpoint_url, # noqa: E501 + o_auth_bearer_extensions=o_auth_bearer_extensions, max_message_bytes=max_message_bytes, batch_size=batch_size, enable_idempotence=enable_idempotence, diff --git a/azure/functions/decorators/kafka.py b/azure/functions/decorators/kafka.py index ea52ce5a..3e726f88 100644 --- a/azure/functions/decorators/kafka.py +++ b/azure/functions/decorators/kafka.py @@ -1,7 +1,6 @@ # Copyright (c) Microsoft Corporation. All rights reserved. # Licensed under the MIT License. from typing import Optional -from enum import Enum from azure.functions.decorators.constants import KAFKA, KAFKA_TRIGGER from azure.functions.decorators.core import Cardinality, DataType, \ OutputBinding, Trigger from .utils import StringifyEnum @@ -25,6 +24,11 @@ class BrokerProtocol(StringifyEnum): SASLSSL = 3 +class OAuthBearerMethod(StringifyEnum): + DEFAULT = 0 + OIDC = 1 + + class KafkaOutput(OutputBinding): @staticmethod def get_binding_name() -> str: @@ -44,14 +48,20 @@ def __init__(self, schema_registry_url: Optional[str], schema_registry_username: Optional[str], schema_registry_password: Optional[str], + o_auth_bearer_method: Optional[OAuthBearerMethod] = None, + o_auth_bearer_client_id: Optional[str] = None, + o_auth_bearer_client_secret: Optional[str] = None, + o_auth_bearer_scope: Optional[str] = None, + o_auth_bearer_token_endpoint_url: Optional[str] = None, + o_auth_bearer_extensions: Optional[str] = None, max_message_bytes: int = 1_000_000, batch_size: int = 10_000, enable_idempotence: bool = False, message_timeout_ms: 
300_000, request_timeout_ms: int = 5_000, max_retries: int = 2_147_483_647, - authentication_mode: BrokerAuthenticationMode = BrokerAuthenticationMode.NOTSET, # noqa: E501 - protocol: BrokerProtocol = BrokerProtocol.NOTSET, + authentication_mode: Optional[BrokerAuthenticationMode] = BrokerAuthenticationMode.NOTSET, # noqa: E501 + protocol: Optional[BrokerProtocol] = BrokerProtocol.NOTSET, linger_ms: int = 5, data_type: Optional[DataType] = None, **kwargs): @@ -67,6 +77,12 @@ def __init__(self, self.schema_registry_url = schema_registry_url self.schema_registry_username = schema_registry_username self.schema_registry_password = schema_registry_password + self.o_auth_bearer_method = o_auth_bearer_method + self.o_auth_bearer_client_id = o_auth_bearer_client_id + self.o_auth_bearer_client_secret = o_auth_bearer_client_secret + self.o_auth_bearer_scope = o_auth_bearer_scope + self.o_auth_bearer_token_endpoint_url = o_auth_bearer_token_endpoint_url # noqa: E501 + self.o_auth_bearer_extensions = o_auth_bearer_extensions self.max_message_bytes = max_message_bytes self.batch_size = batch_size self.enable_idempotence = enable_idempotence @@ -100,9 +116,15 @@ def __init__(self, schema_registry_url: Optional[str], schema_registry_username: Optional[str], schema_registry_password: Optional[str], - authentication_mode: BrokerAuthenticationMode = BrokerAuthenticationMode.NOTSET, # noqa: E501 - protocol: BrokerProtocol = BrokerProtocol.NOTSET, - cardinality : Cardinality = Cardinality.ONE, + o_auth_bearer_method: Optional[OAuthBearerMethod] = None, + o_auth_bearer_client_id: Optional[str] = None, + o_auth_bearer_client_secret: Optional[str] = None, + o_auth_bearer_scope: Optional[str] = None, + o_auth_bearer_token_endpoint_url: Optional[str] = None, + o_auth_bearer_extensions: Optional[str] = None, + authentication_mode: Optional[BrokerAuthenticationMode] = BrokerAuthenticationMode.NOTSET, # noqa: E501 + protocol: Optional[BrokerProtocol] = BrokerProtocol.NOTSET, + cardinality: 
Optional[Cardinality] = Cardinality.ONE, lag_threshold: int = 1000, data_type: Optional[DataType] = None, **kwargs): @@ -120,6 +142,12 @@ def __init__(self, self.schema_registry_url = schema_registry_url self.schema_registry_username = schema_registry_username self.schema_registry_password = schema_registry_password + self.o_auth_bearer_method = o_auth_bearer_method + self.o_auth_bearer_client_id = o_auth_bearer_client_id + self.o_auth_bearer_client_secret = o_auth_bearer_client_secret + self.o_auth_bearer_scope = o_auth_bearer_scope + self.o_auth_bearer_token_endpoint_url = o_auth_bearer_token_endpoint_url # noqa: E501 + self.o_auth_bearer_extensions = o_auth_bearer_extensions self.authentication_mode = authentication_mode self.protocol = protocol self.cardinality = cardinality diff --git a/tests/decorators/test_kafka.py b/tests/decorators/test_kafka.py index 6d7f1b07..409df275 100644 --- a/tests/decorators/test_kafka.py +++ b/tests/decorators/test_kafka.py @@ -3,7 +3,7 @@ import unittest from azure.functions.decorators.constants import KAFKA_TRIGGER, KAFKA -from azure.functions.decorators.core import BindingDirection, \ +from azure.functions.decorators.core import BindingDirection, Cardinality, \ DataType from azure.functions.decorators.kafka import KafkaTrigger, KafkaOutput, \ BrokerAuthenticationMode, BrokerProtocol @@ -52,6 +52,7 @@ def test_kafka_trigger_valid_creation(self): "sslKeyLocation": "ssl_key_location", "sslKeyPassword": "ssl_key_password", "topic": "topic", + "cardinality": Cardinality.ONE, "type": KAFKA_TRIGGER, "username": "username"})