def kafka_trigger(self,
                  arg_name: str,
                  topic: str,
                  broker_list: str,
                  event_hub_connection_string: Optional[str] = None,
                  consumer_group: Optional[str] = None,
                  avro_schema: Optional[str] = None,
                  username: Optional[str] = None,
                  password: Optional[str] = None,
                  ssl_key_location: Optional[str] = None,
                  ssl_ca_location: Optional[str] = None,
                  ssl_certificate_location: Optional[str] = None,
                  ssl_key_password: Optional[str] = None,
                  schema_registry_url: Optional[str] = None,
                  schema_registry_username: Optional[str] = None,
                  schema_registry_password: Optional[str] = None,
                  o_auth_bearer_method: Optional[Union[OAuthBearerMethod, str]] = None,  # noqa: E501
                  o_auth_bearer_client_id: Optional[str] = None,
                  o_auth_bearer_client_secret: Optional[str] = None,
                  o_auth_bearer_scope: Optional[str] = None,
                  o_auth_bearer_token_endpoint_url: Optional[str] = None,
                  o_auth_bearer_extensions: Optional[str] = None,
                  authentication_mode: Optional[Union[BrokerAuthenticationMode, str]] = "NotSet",  # noqa: E501
                  protocol: Optional[Union[BrokerProtocol, str]] = "NotSet",  # noqa: E501
                  cardinality: Optional[Union[Cardinality, str]] = "One",
                  lag_threshold: int = 1000,
                  data_type: Optional[Union[DataType, str]] = None,
                  **kwargs) -> Callable[..., Any]:
    """
    The kafka_trigger decorator adds :class:`KafkaTrigger`
    to the :class:`FunctionBuilder` object
    for building :class:`Function` object used in worker function
    indexing model. This is equivalent to defining kafka trigger
    in the function.json which enables function to be triggered to
    respond to an event sent to a kafka topic.
    All optional fields will be given default value by function host when
    they are parsed by function host.

    Ref: https://aka.ms/kafkatrigger

    :param arg_name: the variable name used in function code for the
    parameter that has the kafka event data.
    :param topic: The topic monitored by the trigger.
    :param broker_list: The list of Kafka brokers monitored by the trigger.
    :param event_hub_connection_string: The name of an app setting that
    contains the connection string for the eventhub when using Kafka
    protocol header feature of Azure EventHubs.
    :param consumer_group: Kafka consumer group used by the trigger.
    :param avro_schema: This should be used only if a generic record
    should be generated.
    :param username: SASL username for use with the PLAIN and SASL-SCRAM-..
    mechanisms. Default is empty string. This is equivalent to
    'sasl.username' in librdkafka.
    :param password: SASL password for use with the PLAIN and SASL-SCRAM-..
    mechanisms. Default is empty string. This is equivalent to
    'sasl.password' in librdkafka.
    :param ssl_key_location: Path to client's private key (PEM) used for
    authentication. Default is empty string. This is equivalent to
    'ssl.key.location' in librdkafka.
    :param ssl_ca_location: Path to CA certificate file for verifying the
    broker's certificate. This is equivalent to 'ssl.ca.location' in
    librdkafka.
    :param ssl_certificate_location: Path to client's certificate. This is
    equivalent to 'ssl.certificate.location' in librdkafka.
    :param ssl_key_password: Password for client's certificate. This is
    equivalent to 'ssl.key.password' in librdkafka.
    :param schema_registry_url: URL for the Avro Schema Registry.
    :param schema_registry_username: Username for the Avro Schema Registry.
    :param schema_registry_password: Password for the Avro Schema Registry.
    :param o_auth_bearer_method: Either 'default' or 'oidc'.
    sasl.oauthbearer in librdkafka.
    :param o_auth_bearer_client_id: Specify only when o_auth_bearer_method
    is 'oidc'. sasl.oauthbearer.client.id in librdkafka.
    :param o_auth_bearer_client_secret: Specify only when
    o_auth_bearer_method is 'oidc'. sasl.oauthbearer.client.secret in
    librdkafka.
    :param o_auth_bearer_scope: Specify only when o_auth_bearer_method
    is 'oidc'. Clients use this to specify the scope of the access request
    to the broker. sasl.oauthbearer.scope in librdkafka.
    :param o_auth_bearer_token_endpoint_url: Specify only when
    o_auth_bearer_method is 'oidc'. sasl.oauthbearer.token.endpoint.url
    in librdkafka.
    :param o_auth_bearer_extensions: Allow additional information to be
    provided to the broker. Comma-separated list of key=value pairs. E.g.,
    "supportFeatureX=true,organizationId=sales-emea".
    sasl.oauthbearer.extensions in librdkafka
    :param authentication_mode: SASL mechanism to use for authentication.
    Allowed values: Gssapi, Plain, ScramSha256, ScramSha512. Default is
    NotSet. This is equivalent to 'sasl.mechanism' in librdkafka.
    :param protocol: Gets or sets the security protocol used to communicate
    with brokers. Default is NotSet. This is equivalent to
    'security.protocol' in librdkafka.
    :param cardinality: Cardinality of the trigger input. Set to Many in
    order to enable batching; with the default One, a single event is
    delivered per invocation.
    :param lag_threshold: Maximum number of unprocessed messages a worker
    is expected to have at an instance. When target-based scaling is not
    disabled, this is used to divide total unprocessed event count to
    determine the number of worker instances, which will then be rounded
    up to a worker instance count that creates a balanced partition
    distribution. Default is 1000.
    :param data_type: Defines how Functions runtime should treat the
    parameter value.
    :param kwargs: Keyword arguments for specifying additional binding
    fields to include in the binding json
    :return: Decorator function.
    """

    @self._configure_function_builder
    def wrap(fb):
        def decorator():
            # String enum-ish arguments are normalized to their enum members
            # before being handed to the binding definition.
            fb.add_trigger(
                trigger=KafkaTrigger(
                    name=arg_name,
                    topic=topic,
                    broker_list=broker_list,
                    event_hub_connection_string=event_hub_connection_string,  # noqa: E501
                    consumer_group=consumer_group,
                    avro_schema=avro_schema,
                    username=username,
                    password=password,
                    ssl_key_location=ssl_key_location,
                    ssl_ca_location=ssl_ca_location,
                    ssl_certificate_location=ssl_certificate_location,
                    ssl_key_password=ssl_key_password,
                    schema_registry_url=schema_registry_url,
                    schema_registry_username=schema_registry_username,
                    schema_registry_password=schema_registry_password,
                    o_auth_bearer_method=parse_singular_param_to_enum(
                        o_auth_bearer_method, OAuthBearerMethod),
                    o_auth_bearer_client_id=o_auth_bearer_client_id,
                    o_auth_bearer_client_secret=o_auth_bearer_client_secret,  # noqa: E501
                    o_auth_bearer_scope=o_auth_bearer_scope,
                    o_auth_bearer_token_endpoint_url=o_auth_bearer_token_endpoint_url,  # noqa: E501
                    o_auth_bearer_extensions=o_auth_bearer_extensions,
                    authentication_mode=parse_singular_param_to_enum(
                        authentication_mode, BrokerAuthenticationMode),
                    protocol=parse_singular_param_to_enum(protocol,
                                                          BrokerProtocol),
                    cardinality=parse_singular_param_to_enum(cardinality,
                                                             Cardinality),
                    lag_threshold=lag_threshold,
                    data_type=parse_singular_param_to_enum(data_type,
                                                           DataType),
                    **kwargs))
            return fb

        return decorator()

    return wrap
def kafka_output(self,
                 arg_name: str,
                 topic: str,
                 broker_list: str,
                 avro_schema: Optional[str] = None,
                 username: Optional[str] = None,
                 password: Optional[str] = None,
                 ssl_key_location: Optional[str] = None,
                 ssl_ca_location: Optional[str] = None,
                 ssl_certificate_location: Optional[str] = None,
                 ssl_key_password: Optional[str] = None,
                 schema_registry_url: Optional[str] = None,
                 schema_registry_username: Optional[str] = None,
                 schema_registry_password: Optional[str] = None,
                 o_auth_bearer_method: Optional[Union[OAuthBearerMethod, str]] = None,  # noqa: E501
                 o_auth_bearer_client_id: Optional[str] = None,
                 o_auth_bearer_client_secret: Optional[str] = None,
                 o_auth_bearer_scope: Optional[str] = None,
                 o_auth_bearer_token_endpoint_url: Optional[str] = None,
                 o_auth_bearer_extensions: Optional[str] = None,
                 max_message_bytes: int = 1_000_000,
                 batch_size: int = 10_000,
                 enable_idempotence: bool = False,
                 message_timeout_ms: int = 300_000,
                 request_timeout_ms: int = 5_000,
                 max_retries: int = 2_147_483_647,
                 authentication_mode: Optional[Union[BrokerAuthenticationMode, str]] = "NOTSET",  # noqa: E501
                 protocol: Optional[Union[BrokerProtocol, str]] = "NOTSET",
                 linger_ms: int = 5,
                 data_type: Optional[Union[DataType, str]] = None,
                 **kwargs) -> Callable[..., Any]:
    """
    The kafka_output decorator adds :class:`KafkaOutput`
    to the :class:`FunctionBuilder` object
    for building :class:`Function` object used in worker function
    indexing model. This is equivalent to defining output binding
    in the function.json which enables function to
    write events to a kafka topic.
    All optional fields will be given default value by function host when
    they are parsed by function host.

    Ref: https://aka.ms/kafkaoutput

    :param arg_name: The variable name used in function code that
    represents the event.
    :param topic: The topic to which the output events are written.
    :param broker_list: The list of Kafka brokers to which the output is
    sent.
    :param avro_schema: This should be used only if a generic record
    should be generated.
    :param username: SASL username for use with the PLAIN and SASL-SCRAM-..
    mechanisms. Default is empty string. This is equivalent to
    'sasl.username' in librdkafka.
    :param password: SASL password for use with the PLAIN and SASL-SCRAM-..
    mechanisms. Default is empty string. This is equivalent to
    'sasl.password' in librdkafka.
    :param ssl_key_location: Path to client's private key (PEM) used for
    authentication. Default is empty string. This is equivalent to
    'ssl.key.location' in librdkafka.
    :param ssl_ca_location: Path to CA certificate file for verifying the
    broker's certificate. This is equivalent to 'ssl.ca.location' in
    librdkafka.
    :param ssl_certificate_location: Path to client's certificate. This is
    equivalent to 'ssl.certificate.location' in librdkafka.
    :param ssl_key_password: Password for client's certificate. This is
    equivalent to 'ssl.key.password' in librdkafka.
    :param schema_registry_url: URL for the Avro Schema Registry.
    :param schema_registry_username: Username for the Avro Schema Registry.
    :param schema_registry_password: Password for the Avro Schema Registry.
    :param o_auth_bearer_method: Either 'default' or 'oidc'.
    sasl.oauthbearer in librdkafka.
    :param o_auth_bearer_client_id: Specify only when o_auth_bearer_method
    is 'oidc'. sasl.oauthbearer.client.id in librdkafka.
    :param o_auth_bearer_client_secret: Specify only when
    o_auth_bearer_method is 'oidc'. sasl.oauthbearer.client.secret in
    librdkafka.
    :param o_auth_bearer_scope: Specify only when o_auth_bearer_method
    is 'oidc'. Clients use this to specify the scope of the access request
    to the broker. sasl.oauthbearer.scope in librdkafka.
    :param o_auth_bearer_token_endpoint_url: Specify only when
    o_auth_bearer_method is 'oidc'. sasl.oauthbearer.token.endpoint.url
    in librdkafka.
    :param o_auth_bearer_extensions: Allow additional information to be
    provided to the broker. Comma-separated list of key=value pairs. E.g.,
    "supportFeatureX=true,organizationId=sales-emea".
    sasl.oauthbearer.extensions in librdkafka
    :param max_message_bytes: Maximum transmit message size. Default is 1MB
    :param batch_size: Maximum number of messages batched in one MessageSet
    Default is 10000.
    :param enable_idempotence: When set to `true`, the producer will ensure
    that messages are successfully produced exactly once and in the
    original produce order. Default is false.
    :param message_timeout_ms: Local message timeout. This value is only
    enforced locally and limits the time a produced message waits for
    successful delivery. A time of 0 is infinite. This is the maximum time
    used to deliver a message (including retries). Delivery error occurs
    when either the retry count or the message timeout are exceeded.
    Default is 300000.
    :param request_timeout_ms: The ack timeout of the producer request in
    milliseconds. Default is 5000.
    :param max_retries: How many times to retry sending a failing Message.
    Default is 2147483647. Retrying may cause reordering unless
    'EnableIdempotence' is set to 'True'.
    :param authentication_mode: SASL mechanism to use for authentication.
    Allowed values: Gssapi, Plain, ScramSha256, ScramSha512. Default is
    NotSet. This is equivalent to 'sasl.mechanism' in librdkafka.
    :param protocol: Gets or sets the security protocol used to communicate
    with brokers. Default is NotSet. This is equivalent to
    'security.protocol' in librdkafka.
    :param linger_ms: Linger.MS property provides the time between batches
    of messages being sent to cluster. Larger value allows more batching
    results in high throughput.
    :param data_type: Defines how Functions runtime should treat the
    parameter value.
    :param kwargs: Keyword arguments for specifying additional binding
    fields to include in the binding json

    :return: Decorator function.
    """

    @self._configure_function_builder
    def wrap(fb):
        def decorator():
            # String enum-ish arguments are normalized to their enum members
            # before being handed to the binding definition.
            fb.add_binding(
                binding=KafkaOutput(
                    name=arg_name,
                    topic=topic,
                    broker_list=broker_list,
                    avro_schema=avro_schema,
                    username=username,
                    password=password,
                    ssl_key_location=ssl_key_location,
                    ssl_ca_location=ssl_ca_location,
                    ssl_certificate_location=ssl_certificate_location,
                    ssl_key_password=ssl_key_password,
                    schema_registry_url=schema_registry_url,
                    schema_registry_username=schema_registry_username,
                    schema_registry_password=schema_registry_password,
                    o_auth_bearer_method=parse_singular_param_to_enum(
                        o_auth_bearer_method, OAuthBearerMethod),
                    o_auth_bearer_client_id=o_auth_bearer_client_id,
                    o_auth_bearer_client_secret=o_auth_bearer_client_secret,  # noqa: E501
                    o_auth_bearer_scope=o_auth_bearer_scope,
                    o_auth_bearer_token_endpoint_url=o_auth_bearer_token_endpoint_url,  # noqa: E501
                    o_auth_bearer_extensions=o_auth_bearer_extensions,
                    max_message_bytes=max_message_bytes,
                    batch_size=batch_size,
                    enable_idempotence=enable_idempotence,
                    message_timeout_ms=message_timeout_ms,
                    request_timeout_ms=request_timeout_ms,
                    max_retries=max_retries,
                    authentication_mode=parse_singular_param_to_enum(
                        authentication_mode, BrokerAuthenticationMode),
                    protocol=parse_singular_param_to_enum(protocol,
                                                          BrokerProtocol),
                    linger_ms=linger_ms,
                    data_type=parse_singular_param_to_enum(data_type,
                                                           DataType),
                    **kwargs))
            return fb

        return decorator()

    return wrap
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License.

from typing import Optional

from azure.functions.decorators.constants import KAFKA, KAFKA_TRIGGER
from azure.functions.decorators.core import Cardinality, DataType, \
    OutputBinding, Trigger
from .utils import StringifyEnum


class BrokerAuthenticationMode(StringifyEnum):
    """SASL mechanism used to authenticate with the Kafka broker.

    Equivalent to 'sasl.mechanism' in librdkafka.
    """
    NOTSET = -1
    GSSAPI = 0
    PLAIN = 1
    SCRAMSHA256 = 2
    SCRAMSHA512 = 3


class BrokerProtocol(StringifyEnum):
    """Security protocol used to communicate with Kafka brokers.

    Equivalent to 'security.protocol' in librdkafka.
    """
    NOTSET = -1
    PLAINTEXT = 0
    SSL = 1
    SASLPLAINTEXT = 2
    SASLSSL = 3


class OAuthBearerMethod(StringifyEnum):
    """Method used to obtain the SASL/OAUTHBEARER token ('default' or
    'oidc')."""
    # Fix: the original declaration read `DEFAULT = 0,` — the stray trailing
    # comma made the member value the tuple (0,) instead of the int 0.
    DEFAULT = 0
    OIDC = 1


class KafkaOutput(OutputBinding):
    """Binding definition for a Kafka output, produced by the
    ``kafka_output`` decorator and serialized into the function metadata.

    All constructor arguments are stored verbatim as attributes; the host
    applies defaults for any value left unset.
    """

    @staticmethod
    def get_binding_name() -> str:
        # Binding type string the functions host dispatches on.
        return KAFKA

    def __init__(self,
                 name: str,
                 topic: str,
                 broker_list: str,
                 avro_schema: Optional[str],
                 username: Optional[str],
                 password: Optional[str],
                 ssl_key_location: Optional[str],
                 ssl_ca_location: Optional[str],
                 ssl_certificate_location: Optional[str],
                 ssl_key_password: Optional[str],
                 schema_registry_url: Optional[str],
                 schema_registry_username: Optional[str],
                 schema_registry_password: Optional[str],
                 o_auth_bearer_method: Optional[OAuthBearerMethod] = None,
                 o_auth_bearer_client_id: Optional[str] = None,
                 o_auth_bearer_client_secret: Optional[str] = None,
                 o_auth_bearer_scope: Optional[str] = None,
                 o_auth_bearer_token_endpoint_url: Optional[str] = None,
                 o_auth_bearer_extensions: Optional[str] = None,
                 max_message_bytes: int = 1_000_000,
                 batch_size: int = 10_000,
                 enable_idempotence: bool = False,
                 message_timeout_ms: int = 300_000,
                 request_timeout_ms: int = 5_000,
                 max_retries: int = 2_147_483_647,
                 authentication_mode: Optional[BrokerAuthenticationMode] = BrokerAuthenticationMode.NOTSET,  # noqa: E501
                 protocol: Optional[BrokerProtocol] = BrokerProtocol.NOTSET,
                 linger_ms: int = 5,
                 data_type: Optional[DataType] = None,
                 **kwargs):
        self.topic = topic
        self.broker_list = broker_list
        self.avro_schema = avro_schema
        self.username = username
        self.password = password
        self.ssl_key_location = ssl_key_location
        self.ssl_ca_location = ssl_ca_location
        self.ssl_certificate_location = ssl_certificate_location
        self.ssl_key_password = ssl_key_password
        self.schema_registry_url = schema_registry_url
        self.schema_registry_username = schema_registry_username
        self.schema_registry_password = schema_registry_password
        self.o_auth_bearer_method = o_auth_bearer_method
        self.o_auth_bearer_client_id = o_auth_bearer_client_id
        self.o_auth_bearer_client_secret = o_auth_bearer_client_secret
        self.o_auth_bearer_scope = o_auth_bearer_scope
        self.o_auth_bearer_token_endpoint_url = o_auth_bearer_token_endpoint_url  # noqa: E501
        self.o_auth_bearer_extensions = o_auth_bearer_extensions
        self.max_message_bytes = max_message_bytes
        self.batch_size = batch_size
        self.enable_idempotence = enable_idempotence
        self.message_timeout_ms = message_timeout_ms
        self.request_timeout_ms = request_timeout_ms
        self.max_retries = max_retries
        self.authentication_mode = authentication_mode
        self.protocol = protocol
        self.linger_ms = linger_ms
        # Base class records the parameter name and data-type hint.
        super().__init__(name=name, data_type=data_type)


class KafkaTrigger(Trigger):
    """Binding definition for a Kafka trigger, produced by the
    ``kafka_trigger`` decorator and serialized into the function metadata.

    All constructor arguments are stored verbatim as attributes; the host
    applies defaults for any value left unset.
    """

    @staticmethod
    def get_binding_name() -> str:
        # Binding type string the functions host dispatches on.
        return KAFKA_TRIGGER

    def __init__(self,
                 name: str,
                 topic: str,
                 broker_list: str,
                 event_hub_connection_string: Optional[str],
                 consumer_group: Optional[str],
                 avro_schema: Optional[str],
                 username: Optional[str],
                 password: Optional[str],
                 ssl_key_location: Optional[str],
                 ssl_ca_location: Optional[str],
                 ssl_certificate_location: Optional[str],
                 ssl_key_password: Optional[str],
                 schema_registry_url: Optional[str],
                 schema_registry_username: Optional[str],
                 schema_registry_password: Optional[str],
                 o_auth_bearer_method: Optional[OAuthBearerMethod] = None,
                 o_auth_bearer_client_id: Optional[str] = None,
                 o_auth_bearer_client_secret: Optional[str] = None,
                 o_auth_bearer_scope: Optional[str] = None,
                 o_auth_bearer_token_endpoint_url: Optional[str] = None,
                 o_auth_bearer_extensions: Optional[str] = None,
                 authentication_mode: Optional[BrokerAuthenticationMode] = BrokerAuthenticationMode.NOTSET,  # noqa: E501
                 protocol: Optional[BrokerProtocol] = BrokerProtocol.NOTSET,
                 cardinality: Optional[Cardinality] = Cardinality.ONE,
                 lag_threshold: int = 1000,
                 data_type: Optional[DataType] = None,
                 **kwargs):
        self.topic = topic
        self.broker_list = broker_list
        self.event_hub_connection_string = event_hub_connection_string
        self.consumer_group = consumer_group
        self.avro_schema = avro_schema
        self.username = username
        self.password = password
        self.ssl_key_location = ssl_key_location
        self.ssl_ca_location = ssl_ca_location
        self.ssl_certificate_location = ssl_certificate_location
        self.ssl_key_password = ssl_key_password
        self.schema_registry_url = schema_registry_url
        self.schema_registry_username = schema_registry_username
        self.schema_registry_password = schema_registry_password
        self.o_auth_bearer_method = o_auth_bearer_method
        self.o_auth_bearer_client_id = o_auth_bearer_client_id
        self.o_auth_bearer_client_secret = o_auth_bearer_client_secret
        self.o_auth_bearer_scope = o_auth_bearer_scope
        self.o_auth_bearer_token_endpoint_url = o_auth_bearer_token_endpoint_url  # noqa: E501
        self.o_auth_bearer_extensions = o_auth_bearer_extensions
        self.authentication_mode = authentication_mode
        self.protocol = protocol
        self.cardinality = cardinality
        self.lag_threshold = lag_threshold
        # Base class records the parameter name and data-type hint.
        super().__init__(name=name, data_type=data_type)
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License.
import unittest

from azure.functions.decorators.constants import KAFKA_TRIGGER, KAFKA
from azure.functions.decorators.core import BindingDirection, Cardinality, \
    DataType
from azure.functions.decorators.kafka import KafkaTrigger, KafkaOutput, \
    BrokerAuthenticationMode, BrokerProtocol


class TestKafka(unittest.TestCase):
    def test_kafka_trigger_valid_creation(self):
        # Construct a trigger with every explicitly supported field plus an
        # extra kwarg, then check both the binding name and the serialized
        # dict representation (unset fields fall back to their defaults).
        subject = KafkaTrigger(name="arg_name",
                               topic="topic",
                               broker_list="broker_list",
                               event_hub_connection_string="ehcs",
                               consumer_group="consumer_group",
                               avro_schema="avro_schema",
                               username="username",
                               password="password",
                               ssl_key_location="ssl_key_location",
                               ssl_ca_location="ssl_ca_location",
                               ssl_certificate_location="scl",
                               ssl_key_password="ssl_key_password",
                               schema_registry_url="srurl",
                               schema_registry_username="sruser",
                               schema_registry_password="srp",
                               authentication_mode=BrokerAuthenticationMode.PLAIN,  # noqa: E501
                               data_type=DataType.UNDEFINED,
                               dummy_field="dummy")

        expected_repr = {
            "authenticationMode": BrokerAuthenticationMode.PLAIN,
            "avroSchema": "avro_schema",
            "brokerList": "broker_list",
            "consumerGroup": "consumer_group",
            "dataType": DataType.UNDEFINED,
            "direction": BindingDirection.IN,
            "dummyField": "dummy",
            "eventHubConnectionString": "ehcs",
            "lagThreshold": 1000,
            "name": "arg_name",
            "password": "password",
            "protocol": BrokerProtocol.NOTSET,
            "schemaRegistryPassword": "srp",
            "schemaRegistryUrl": "srurl",
            "schemaRegistryUsername": "sruser",
            "sslCaLocation": "ssl_ca_location",
            "sslCertificateLocation": "scl",
            "sslKeyLocation": "ssl_key_location",
            "sslKeyPassword": "ssl_key_password",
            "topic": "topic",
            "cardinality": Cardinality.ONE,
            "type": KAFKA_TRIGGER,
            "username": "username",
        }

        self.assertEqual(subject.get_binding_name(), "kafkaTrigger")
        self.assertEqual(subject.get_dict_repr(), expected_repr)

    def test_kafka_output_valid_creation(self):
        # Construct an output binding with the supported fields plus an
        # extra kwarg; unset producer tuning knobs must serialize with their
        # documented defaults (batchSize, lingerMs, ...).
        subject = KafkaOutput(name="arg_name",
                              topic="topic",
                              broker_list="broker_list",
                              avro_schema="avro_schema",
                              username="username",
                              password="password",
                              ssl_key_location="ssl_key_location",
                              ssl_ca_location="ssl_ca_location",
                              ssl_certificate_location="scl",
                              ssl_key_password="ssl_key_password",
                              schema_registry_url="schema_registry_url",
                              schema_registry_username="sru",
                              schema_registry_password="srp",
                              max_retries=10,
                              data_type=DataType.UNDEFINED,
                              dummy_field="dummy")

        expected_repr = {
            'authenticationMode': BrokerAuthenticationMode.NOTSET,
            'avroSchema': 'avro_schema',
            'batchSize': 10000,
            'brokerList': 'broker_list',
            'dataType': DataType.UNDEFINED,
            'direction': BindingDirection.OUT,
            'dummyField': 'dummy',
            'enableIdempotence': False,
            'lingerMs': 5,
            'maxMessageBytes': 1000000,
            'maxRetries': 10,
            'messageTimeoutMs': 300000,
            'name': 'arg_name',
            'password': 'password',
            'protocol': BrokerProtocol.NOTSET,
            'requestTimeoutMs': 5000,
            'schemaRegistryPassword': 'srp',
            'schemaRegistryUrl': 'schema_registry_url',
            'schemaRegistryUsername': 'sru',
            'sslCaLocation': 'ssl_ca_location',
            'sslCertificateLocation': 'scl',
            'sslKeyLocation': 'ssl_key_location',
            'sslKeyPassword': 'ssl_key_password',
            'topic': 'topic',
            'type': KAFKA,
            'username': 'username',
        }

        self.assertEqual(subject.get_binding_name(), "kafka")
        self.assertEqual(subject.get_dict_repr(), expected_repr)