feat: decorator support for Kafka extension (#234)
* initial changes for decorator support for Kafka

* Fixing minor issues

* fixing tests

* some more changes post validations

* Added Oauthbearer options support and fixing tests
vivekjilla authored Aug 7, 2024
1 parent a0b8692 commit 961e914
Showing 4 changed files with 577 additions and 0 deletions.
2 changes: 2 additions & 0 deletions azure/functions/decorators/constants.py
@@ -8,6 +8,8 @@
EVENT_HUB = "eventHub"
HTTP_TRIGGER = "httpTrigger"
HTTP_OUTPUT = "http"
KAFKA = "kafka"
KAFKA_TRIGGER = "kafkaTrigger"
QUEUE = "queue"
QUEUE_TRIGGER = "queueTrigger"
SERVICE_BUS = "serviceBus"
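
These constants become the binding "type" values that the new decorators below write into each function's binding metadata. As a rough, hand-written illustration (not the library's exact serialization; all values are placeholders), a Kafka trigger binding would look something like:

```python
# Illustrative shape of the metadata a kafka_trigger binding produces;
# field names follow the function.json convention for the Kafka
# extension, and all values here are placeholders.
kafka_trigger_binding = {
    "type": "kafkaTrigger",        # KAFKA_TRIGGER constant above
    "direction": "in",
    "name": "event",               # matches the decorator's arg_name
    "topic": "orders",
    "brokerList": "%BrokerList%",
    "consumerGroup": "$Default",
}
```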
316 changes: 316 additions & 0 deletions azure/functions/decorators/function_app.py
@@ -24,6 +24,8 @@
from azure.functions.decorators.eventhub import EventHubTrigger, EventHubOutput
from azure.functions.decorators.http import HttpTrigger, HttpOutput, \
HttpMethod
from azure.functions.decorators.kafka import KafkaTrigger, KafkaOutput, \
BrokerAuthenticationMode, BrokerProtocol, OAuthBearerMethod
from azure.functions.decorators.queue import QueueTrigger, QueueOutput
from azure.functions.decorators.servicebus import ServiceBusQueueTrigger, \
ServiceBusQueueOutput, ServiceBusTopicTrigger, \
@@ -1229,6 +1231,155 @@ def decorator():

return wrap

def kafka_trigger(self,
arg_name: str,
topic: str,
broker_list: str,
event_hub_connection_string: Optional[str] = None,
consumer_group: Optional[str] = None,
avro_schema: Optional[str] = None,
username: Optional[str] = None,
password: Optional[str] = None,
ssl_key_location: Optional[str] = None,
ssl_ca_location: Optional[str] = None,
ssl_certificate_location: Optional[str] = None,
ssl_key_password: Optional[str] = None,
schema_registry_url: Optional[str] = None,
schema_registry_username: Optional[str] = None,
schema_registry_password: Optional[str] = None,
o_auth_bearer_method: Optional[Union[OAuthBearerMethod, str]] = None, # noqa E501
o_auth_bearer_client_id: Optional[str] = None,
o_auth_bearer_client_secret: Optional[str] = None,
o_auth_bearer_scope: Optional[str] = None,
o_auth_bearer_token_endpoint_url: Optional[str] = None,
o_auth_bearer_extensions: Optional[str] = None,
authentication_mode: Optional[Union[BrokerAuthenticationMode, str]] = "NotSet", # noqa E501
protocol: Optional[Union[BrokerProtocol, str]] = "NotSet", # noqa E501
cardinality: Optional[Union[Cardinality, str]] = "One",
lag_threshold: int = 1000,
data_type: Optional[Union[DataType, str]] = None,
**kwargs) -> Callable[..., Any]:
"""
The kafka_trigger decorator adds
:class:`KafkaTrigger`
to the :class:`FunctionBuilder` object
for building :class:`Function` object used in worker function
indexing model. This is equivalent to defining a Kafka trigger
in function.json, which enables the function to be triggered in
response to events sent to a Kafka topic.
All optional fields are given default values by the function host
when it parses them.
Ref: https://aka.ms/kafkatrigger
:param arg_name: the variable name used in function code for the
parameter that has the kafka event data.
:param topic: The topic monitored by the trigger.
:param broker_list: The list of Kafka brokers monitored by the trigger.
:param event_hub_connection_string: The name of an app setting that
contains the connection string for the Event Hub when using the Kafka
protocol header feature of Azure Event Hubs.
:param consumer_group: Kafka consumer group used by the trigger.
:param avro_schema: This should be used only if a generic record
should be generated.
:param username: SASL username for use with the PLAIN and SASL-SCRAM-..
mechanisms. Default is empty string. This is equivalent to
'sasl.username' in librdkafka.
:param password: SASL password for use with the PLAIN and SASL-SCRAM-..
mechanisms. Default is empty string. This is equivalent to
'sasl.password' in librdkafka.
:param ssl_key_location: Path to client's private key (PEM) used for
authentication. Default is empty string. This is equivalent to
'ssl.key.location' in librdkafka.
:param ssl_ca_location: Path to CA certificate file for verifying the
broker's certificate. This is equivalent to 'ssl.ca.location' in
librdkafka.
:param ssl_certificate_location: Path to client's certificate. This is
equivalent to 'ssl.certificate.location' in librdkafka.
:param ssl_key_password: Password for client's certificate. This is
equivalent to 'ssl.key.password' in librdkafka.
:param schema_registry_url: URL for the Avro Schema Registry.
:param schema_registry_username: Username for the Avro Schema Registry.
:param schema_registry_password: Password for the Avro Schema Registry.
:param o_auth_bearer_method: Either 'default' or 'oidc'. This is
equivalent to 'sasl.oauthbearer.method' in librdkafka.
:param o_auth_bearer_client_id: Specify only when o_auth_bearer_method
is 'oidc'. sasl.oauthbearer.client.id in librdkafka.
:param o_auth_bearer_client_secret: Specify only when
o_auth_bearer_method is 'oidc'. sasl.oauthbearer.client.secret in
librdkafka.
:param o_auth_bearer_scope: Specify only when o_auth_bearer_method
is 'oidc'. Clients use this to specify the scope of the access request
to the broker. sasl.oauthbearer.scope in librdkafka.
:param o_auth_bearer_token_endpoint_url: Specify only when
o_auth_bearer_method is 'oidc'. sasl.oauthbearer.token.endpoint.url
in librdkafka.
:param o_auth_bearer_extensions: Allows additional information to be
provided to the broker. A comma-separated list of key=value pairs,
e.g. "supportFeatureX=true,organizationId=sales-emea". This is
equivalent to 'sasl.oauthbearer.extensions' in librdkafka.
:param authentication_mode: SASL mechanism to use for authentication.
Allowed values: Gssapi, Plain, ScramSha256, ScramSha512. Default is
Plain. This is equivalent to 'sasl.mechanism' in librdkafka.
:param protocol: The security protocol used to communicate with
brokers. Default is plaintext. This is equivalent to
'security.protocol' in librdkafka.
:param cardinality: Set to 'Many' to enable batching, in which case a
batch of events is passed per invocation. Default is 'One', which
passes a single event per invocation.
:param lag_threshold: Maximum number of unprocessed messages a worker
instance is expected to have. When target-based scaling is enabled,
this value is used to divide the total unprocessed event count and
determine the number of worker instances, which is then rounded up to
an instance count that creates a balanced partition distribution.
Default is 1000.
:param data_type: Defines how the Functions runtime should treat the
parameter value.
:param kwargs: Keyword arguments for specifying additional binding
fields to include in the binding JSON.
:return: Decorator function.
"""

@self._configure_function_builder
def wrap(fb):
def decorator():
fb.add_trigger(
trigger=KafkaTrigger(
name=arg_name,
topic=topic,
broker_list=broker_list,
event_hub_connection_string=event_hub_connection_string, # noqa: E501
consumer_group=consumer_group,
avro_schema=avro_schema,
username=username,
password=password,
ssl_key_location=ssl_key_location,
ssl_ca_location=ssl_ca_location,
ssl_certificate_location=ssl_certificate_location,
ssl_key_password=ssl_key_password,
schema_registry_url=schema_registry_url,
schema_registry_username=schema_registry_username,
schema_registry_password=schema_registry_password,
o_auth_bearer_method=parse_singular_param_to_enum(
o_auth_bearer_method, OAuthBearerMethod),
o_auth_bearer_client_id=o_auth_bearer_client_id,
o_auth_bearer_client_secret=o_auth_bearer_client_secret, # noqa: E501
o_auth_bearer_scope=o_auth_bearer_scope,
o_auth_bearer_token_endpoint_url=o_auth_bearer_token_endpoint_url, # noqa: E501
o_auth_bearer_extensions=o_auth_bearer_extensions,
authentication_mode=parse_singular_param_to_enum(
authentication_mode, BrokerAuthenticationMode),
protocol=parse_singular_param_to_enum(protocol,
BrokerProtocol),
cardinality=parse_singular_param_to_enum(cardinality,
Cardinality),
lag_threshold=lag_threshold,
data_type=parse_singular_param_to_enum(data_type,
DataType),
**kwargs))
return fb

return decorator()

return wrap
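
For orientation, here is a minimal usage sketch of the new trigger decorator under the v2 programming model; the broker address, topic, and consumer group are placeholders, not values from this commit:

```python
import logging

import azure.functions as func

app = func.FunctionApp()

# Broker address, topic, and consumer group are placeholders.
@app.kafka_trigger(arg_name="event",
                   topic="orders",
                   broker_list="localhost:9092",
                   consumer_group="$Default",
                   cardinality="One")
def on_order(event: func.KafkaEvent):
    # With cardinality "One", a single KafkaEvent is passed per invocation.
    logging.info("Kafka message: %s", event.get_body())
```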

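To make the lag_threshold arithmetic in the docstring concrete, here is a back-of-the-envelope sketch; the host's exact rounding to a "balanced partition distribution" is not spelled out in this commit, so the sketch approximates it by rounding up to a divisor of the partition count:

```python
import math

def approx_target_instances(total_unprocessed: int,
                            lag_threshold: int = 1000,
                            partition_count: int = 8) -> int:
    # Divide total lag by the per-instance threshold ...
    desired = max(1, math.ceil(total_unprocessed / lag_threshold))
    # ... then round up to an instance count that splits the partitions
    # evenly (an approximation of "balanced partition distribution").
    for n in range(desired, partition_count + 1):
        if partition_count % n == 0:
            return n
    # More instances than partitions would leave some idle.
    return min(desired, partition_count)

# 4500 unprocessed events / 1000 threshold -> ceil = 5, rounded up to 8
# so that 8 partitions split evenly.
print(approx_target_instances(4500))  # 8
```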
def sql_trigger(self,
arg_name: str,
table_name: str,
@@ -2212,6 +2363,171 @@ def decorator():

return wrap

def kafka_output(self,
arg_name: str,
topic: str,
broker_list: str,
avro_schema: Optional[str] = None,
username: Optional[str] = None,
password: Optional[str] = None,
ssl_key_location: Optional[str] = None,
ssl_ca_location: Optional[str] = None,
ssl_certificate_location: Optional[str] = None,
ssl_key_password: Optional[str] = None,
schema_registry_url: Optional[str] = None,
schema_registry_username: Optional[str] = None,
schema_registry_password: Optional[str] = None,
o_auth_bearer_method: Optional[Union[OAuthBearerMethod, str]] = None, # noqa E501
o_auth_bearer_client_id: Optional[str] = None,
o_auth_bearer_client_secret: Optional[str] = None,
o_auth_bearer_scope: Optional[str] = None,
o_auth_bearer_token_endpoint_url: Optional[str] = None,
o_auth_bearer_extensions: Optional[str] = None,
max_message_bytes: int = 1_000_000,
batch_size: int = 10_000,
enable_idempotence: bool = False,
message_timeout_ms: int = 300_000,
request_timeout_ms: int = 5_000,
max_retries: int = 2_147_483_647,
authentication_mode: Optional[Union[BrokerAuthenticationMode, str]] = "NOTSET", # noqa E501
protocol: Optional[Union[BrokerProtocol, str]] = "NOTSET",
linger_ms: int = 5,
data_type: Optional[Union[DataType, str]] = None,
**kwargs) -> Callable[..., Any]:
"""
The kafka_output decorator adds
:class:`KafkaOutput`
to the :class:`FunctionBuilder` object
for building :class:`Function` object used in worker function
indexing model. This is equivalent to defining an output binding
in function.json, which enables the function to write
events to a Kafka topic.
All optional fields are given default values by the function host
when it parses them.
Ref: https://aka.ms/kafkaoutput
:param arg_name: The variable name used in function code that
represents the event.
:param topic: The topic to which the output writes events.
:param broker_list: The list of Kafka brokers to which the output
writes events.
:param avro_schema: This should be used only if a generic record
should be generated.
:param username: SASL username for use with the PLAIN and SASL-SCRAM-..
mechanisms. Default is empty string. This is equivalent to
'sasl.username' in librdkafka.
:param password: SASL password for use with the PLAIN and SASL-SCRAM-..
mechanisms. Default is empty string. This is equivalent to
'sasl.password' in librdkafka.
:param ssl_key_location: Path to client's private key (PEM) used for
authentication. Default is empty string. This is equivalent to
'ssl.key.location' in librdkafka.
:param ssl_ca_location: Path to CA certificate file for verifying the
broker's certificate. This is equivalent to 'ssl.ca.location' in
librdkafka.
:param ssl_certificate_location: Path to client's certificate. This is
equivalent to 'ssl.certificate.location' in librdkafka.
:param ssl_key_password: Password for client's certificate. This is
equivalent to 'ssl.key.password' in librdkafka.
:param schema_registry_url: URL for the Avro Schema Registry.
:param schema_registry_username: Username for the Avro Schema Registry.
:param schema_registry_password: Password for the Avro Schema Registry.
:param o_auth_bearer_method: Either 'default' or 'oidc'. This is
equivalent to 'sasl.oauthbearer.method' in librdkafka.
:param o_auth_bearer_client_id: Specify only when o_auth_bearer_method
is 'oidc'. sasl.oauthbearer.client.id in librdkafka.
:param o_auth_bearer_client_secret: Specify only when
o_auth_bearer_method is 'oidc'. sasl.oauthbearer.client.secret in
librdkafka.
:param o_auth_bearer_scope: Specify only when o_auth_bearer_method
is 'oidc'. Clients use this to specify the scope of the access request
to the broker. sasl.oauthbearer.scope in librdkafka.
:param o_auth_bearer_token_endpoint_url: Specify only when
o_auth_bearer_method is 'oidc'. sasl.oauthbearer.token.endpoint.url
in librdkafka.
:param o_auth_bearer_extensions: Allows additional information to be
provided to the broker. A comma-separated list of key=value pairs,
e.g. "supportFeatureX=true,organizationId=sales-emea". This is
equivalent to 'sasl.oauthbearer.extensions' in librdkafka.
:param max_message_bytes: Maximum transmit message size. Default is
1 MB.
:param batch_size: Maximum number of messages batched in one
MessageSet. Default is 10000.
:param enable_idempotence: When set to `true`, the producer will ensure
that messages are successfully produced exactly once and in the
original produce order. Default is false.
:param message_timeout_ms: Local message timeout. This value is only
enforced locally and limits the time a produced message waits for
successful delivery. A time of 0 is infinite. This is the maximum time
used to deliver a message (including retries). Delivery error occurs
when either the retry count or the message timeout are exceeded.
Default is 300000.
:param request_timeout_ms: The ack timeout of the producer request in
milliseconds. Default is 5000.
:param max_retries: How many times to retry sending a failing message.
Default is 2147483647. Retrying may cause reordering unless
'EnableIdempotence' is set to 'True'.
:param authentication_mode: SASL mechanism to use for authentication.
Allowed values: Gssapi, Plain, ScramSha256, ScramSha512. Default is
Plain. This is equivalent to 'sasl.mechanism' in librdkafka.
:param protocol: The security protocol used to communicate with
brokers. Default is plaintext. This is equivalent to
'security.protocol' in librdkafka.
:param linger_ms: The linger.ms property sets the time between batches
of messages being sent to the cluster. A larger value allows more
batching, which results in higher throughput. Default is 5.
:param data_type: Defines how the Functions runtime should treat the
parameter value.
:param kwargs: Keyword arguments for specifying additional binding
fields to include in the binding JSON.
:return: Decorator function.
"""

@self._configure_function_builder
def wrap(fb):
def decorator():
fb.add_binding(
binding=KafkaOutput(
name=arg_name,
topic=topic,
broker_list=broker_list,
avro_schema=avro_schema,
username=username,
password=password,
ssl_key_location=ssl_key_location,
ssl_ca_location=ssl_ca_location,
ssl_certificate_location=ssl_certificate_location,
ssl_key_password=ssl_key_password,
schema_registry_url=schema_registry_url,
schema_registry_username=schema_registry_username,
schema_registry_password=schema_registry_password,
o_auth_bearer_method=parse_singular_param_to_enum(
o_auth_bearer_method, OAuthBearerMethod),
o_auth_bearer_client_id=o_auth_bearer_client_id,
o_auth_bearer_client_secret=o_auth_bearer_client_secret, # noqa: E501
o_auth_bearer_scope=o_auth_bearer_scope,
o_auth_bearer_token_endpoint_url=o_auth_bearer_token_endpoint_url, # noqa: E501
o_auth_bearer_extensions=o_auth_bearer_extensions,
max_message_bytes=max_message_bytes,
batch_size=batch_size,
enable_idempotence=enable_idempotence,
message_timeout_ms=message_timeout_ms,
request_timeout_ms=request_timeout_ms,
max_retries=max_retries,
authentication_mode=parse_singular_param_to_enum(
authentication_mode, BrokerAuthenticationMode),
protocol=parse_singular_param_to_enum(protocol,
BrokerProtocol),
linger_ms=linger_ms,
data_type=parse_singular_param_to_enum(data_type,
DataType),
**kwargs))
return fb

return decorator()

return wrap
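
Likewise, a minimal sketch pairing the new output binding with an HTTP trigger; the broker address and topic are placeholders:

```python
import azure.functions as func

app = func.FunctionApp()

# Broker address and topic are placeholders.
@app.route(route="publish")
@app.kafka_output(arg_name="out",
                  topic="orders",
                  broker_list="localhost:9092",
                  enable_idempotence=True)
def publish(req: func.HttpRequest,
            out: func.Out[str]) -> func.HttpResponse:
    # Out.set() writes a single event to the configured topic.
    out.set(req.get_body().decode("utf-8"))
    return func.HttpResponse("queued", status_code=202)
```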

def table_input(self,
arg_name: str,
connection: str,