Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: decorator support for Kafka extension #234

Merged
merged 7 commits into from
Aug 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions azure/functions/decorators/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
EVENT_HUB = "eventHub"
HTTP_TRIGGER = "httpTrigger"
HTTP_OUTPUT = "http"
KAFKA = "kafka"
KAFKA_TRIGGER = "kafkaTrigger"
QUEUE = "queue"
QUEUE_TRIGGER = "queueTrigger"
SERVICE_BUS = "serviceBus"
Expand Down
316 changes: 316 additions & 0 deletions azure/functions/decorators/function_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
from azure.functions.decorators.eventhub import EventHubTrigger, EventHubOutput
from azure.functions.decorators.http import HttpTrigger, HttpOutput, \
HttpMethod
from azure.functions.decorators.kafka import KafkaTrigger, KafkaOutput, \
BrokerAuthenticationMode, BrokerProtocol, OAuthBearerMethod
from azure.functions.decorators.queue import QueueTrigger, QueueOutput
from azure.functions.decorators.servicebus import ServiceBusQueueTrigger, \
ServiceBusQueueOutput, ServiceBusTopicTrigger, \
Expand Down Expand Up @@ -1229,6 +1231,155 @@ def decorator():

return wrap

def kafka_trigger(self,
arg_name: str,
topic: str,
broker_list: str,
event_hub_connection_string: Optional[str] = None,
consumer_group: Optional[str] = None,
avro_schema: Optional[str] = None,
username: Optional[str] = None,
password: Optional[str] = None,
ssl_key_location: Optional[str] = None,
ssl_ca_location: Optional[str] = None,
ssl_certificate_location: Optional[str] = None,
ssl_key_password: Optional[str] = None,
schema_registry_url: Optional[str] = None,
schema_registry_username: Optional[str] = None,
schema_registry_password: Optional[str] = None,
o_auth_bearer_method: Optional[Union[OAuthBearerMethod, str]] = None, # noqa E501
o_auth_bearer_client_id: Optional[str] = None,
o_auth_bearer_client_secret: Optional[str] = None,
o_auth_bearer_scope: Optional[str] = None,
o_auth_bearer_token_endpoint_url: Optional[str] = None,
o_auth_bearer_extensions: Optional[str] = None,
authentication_mode: Optional[Union[BrokerAuthenticationMode, str]] = "NotSet", # noqa E501
protocol: Optional[Union[BrokerProtocol, str]] = "NotSet", # noqa E501
cardinality: Optional[Union[Cardinality, str]] = "One",
lag_threshold: int = 1000,
data_type: Optional[Union[DataType, str]] = None,
**kwargs) -> Callable[..., Any]:
"""
The kafka_trigger decorator adds
:class:`KafkaTrigger`
to the :class:`FunctionBuilder` object
for building :class:`Function` object used in worker function
indexing model. This is equivalent to defining kafka trigger
in the function.json which enables function to be triggered to
respond to an event sent to a kafka topic.
All optional fields will be given default value by function host when
they are parsed by function host.

Ref: https://aka.ms/kafkatrigger

:param arg_name: the variable name used in function code for the
parameter that has the kafka event data.
:param topic: The topic monitored by the trigger.
:param broker_list: The list of Kafka brokers monitored by the trigger.
:param event_hub_connection_string: The name of an app setting that
contains the connection string for the eventhub when using Kafka
protocol header feature of Azure EventHubs.
:param consumer_group: Kafka consumer group used by the trigger.
:param avro_schema: This should be used only if a generic record
should be generated.
:param username: SASL username for use with the PLAIN and SASL-SCRAM-..
mechanisms. Default is empty string. This is equivalent to
'sasl.username' in librdkafka.
:param password: SASL password for use with the PLAIN and SASL-SCRAM-..
mechanisms. Default is empty string. This is equivalent to
'sasl.password' in librdkafka.
:param ssl_key_location: Path to client's private key (PEM) used for
authentication. Default is empty string. This is equivalent to
'ssl.key.location' in librdkafka.
:param ssl_ca_location: Path to CA certificate file for verifying the
broker's certificate. This is equivalent to 'ssl.ca.location' in
librdkafka.
:param ssl_certificate_location: Path to client's certificate. This is
equivalent to 'ssl.certificate.location' in librdkafka.
:param ssl_key_password: Password for client's certificate. This is
equivalent to 'ssl.key.password' in librdkafka.
:param schema_registry_url: URL for the Avro Schema Registry.
:param schema_registry_username: Username for the Avro Schema Registry.
:param schema_registry_password: Password for the Avro Schema Registry.
:param o_auth_bearer_method: Either 'default' or 'oidc'.
sasl.oauthbearer in librdkafka.
:param o_auth_bearer_client_id: Specify only when o_auth_bearer_method
is 'oidc'. sasl.oauthbearer.client.id in librdkafka.
:param o_auth_bearer_client_secret: Specify only when
o_auth_bearer_method is 'oidc'. sasl.oauthbearer.client.secret in
librdkafka.
:param o_auth_bearer_scope: Specify only when o_auth_bearer_method
is 'oidc'. Client use this to specify the scope of the access request
to the broker. sasl.oauthbearer.scope in librdkafka.
:param o_auth_bearer_token_endpoint_url: Specify only when
o_auth_bearer_method is 'oidc'. sasl.oauthbearer.token.endpoint.url
in librdkafka.
:param o_auth_bearer_extensions: Allow additional information to be
provided to the broker. Comma-separated list of key=value pairs. E.g.,
"supportFeatureX=true,organizationId=sales-emea".
sasl.oauthbearer.extensions in librdkafka
:param authentication_mode: SASL mechanism to use for authentication.
Allowed values: Gssapi, Plain, ScramSha256, ScramSha512. Default is
Plain. This is equivalent to 'sasl.mechanism' in librdkafka.
:param protocol: Gets or sets the security protocol used to communicate
with brokers. Default is plain text. This is equivalent to
'security.protocol' in librdkafka. TODO
:param lag_threshold: Maximum number of unprocessed messages a worker
is expected to have at an instance. When target-based scaling is not
disabled, this is used to divide total unprocessed event count to
determine the number of worker instances, which will then be rounded
up to a worker instance count that creates a balanced partition
distribution. Default is 1000.
:param data_type: Defines how Functions runtime should treat the
parameter value.
:param kwargs: Keyword arguments for specifying additional binding
fields to include in the binding json
:return: Decorator function.
"""

@self._configure_function_builder
def wrap(fb):
def decorator():
fb.add_trigger(
trigger=KafkaTrigger(
name=arg_name,
topic=topic,
broker_list=broker_list,
event_hub_connection_string=event_hub_connection_string, # noqa: E501
consumer_group=consumer_group,
avro_schema=avro_schema,
username=username,
password=password,
ssl_key_location=ssl_key_location,
ssl_ca_location=ssl_ca_location,
ssl_certificate_location=ssl_certificate_location,
ssl_key_password=ssl_key_password,
schema_registry_url=schema_registry_url,
schema_registry_username=schema_registry_username,
schema_registry_password=schema_registry_password,
o_auth_bearer_method=parse_singular_param_to_enum(
o_auth_bearer_method, OAuthBearerMethod),
o_auth_bearer_client_id=o_auth_bearer_client_id,
o_auth_bearer_client_secret=o_auth_bearer_client_secret, # noqa: E501
o_auth_bearer_scope=o_auth_bearer_scope,
o_auth_bearer_token_endpoint_url=o_auth_bearer_token_endpoint_url, # noqa: E501
o_auth_bearer_extensions=o_auth_bearer_extensions,
authentication_mode=parse_singular_param_to_enum(
authentication_mode, BrokerAuthenticationMode),
protocol=parse_singular_param_to_enum(protocol,
BrokerProtocol),
cardinality=parse_singular_param_to_enum(cardinality,
Cardinality),
lag_threshold=lag_threshold,
data_type=parse_singular_param_to_enum(data_type,
DataType),
**kwargs))
return fb

return decorator()

return wrap

def sql_trigger(self,
arg_name: str,
table_name: str,
Expand Down Expand Up @@ -2212,6 +2363,171 @@ def decorator():

return wrap

def kafka_output(self,
arg_name: str,
topic: str,
broker_list: str,
avro_schema: Optional[str] = None,
username: Optional[str] = None,
password: Optional[str] = None,
ssl_key_location: Optional[str] = None,
ssl_ca_location: Optional[str] = None,
ssl_certificate_location: Optional[str] = None,
ssl_key_password: Optional[str] = None,
schema_registry_url: Optional[str] = None,
schema_registry_username: Optional[str] = None,
schema_registry_password: Optional[str] = None,
o_auth_bearer_method: Optional[Union[OAuthBearerMethod, str]] = None, # noqa E501
o_auth_bearer_client_id: Optional[str] = None,
o_auth_bearer_client_secret: Optional[str] = None,
o_auth_bearer_scope: Optional[str] = None,
o_auth_bearer_token_endpoint_url: Optional[str] = None,
o_auth_bearer_extensions: Optional[str] = None,
max_message_bytes: int = 1_000_000,
batch_size: int = 10_000,
enable_idempotence: bool = False,
message_timeout_ms: int = 300_000,
request_timeout_ms: int = 5_000,
max_retries: int = 2_147_483_647,
authentication_mode: Optional[Union[BrokerAuthenticationMode, str]] = "NOTSET", # noqa E501
protocol: Optional[Union[BrokerProtocol, str]] = "NOTSET",
linger_ms: int = 5,
data_type: Optional[Union[DataType, str]] = None,
**kwargs) -> Callable[..., Any]:
"""
The kafka_output decorator adds
:class:`KafkaOutput`
to the :class:`FunctionBuilder` object
for building :class:`Function` object used in worker function
indexing model. This is equivalent to defining output binding
in the function.json which enables function to
write events to a kafka topic.
All optional fields will be given default value by function host when
they are parsed by function host.

Ref: https://aka.ms/kafkaoutput

:param arg_name: The variable name used in function code that
represents the event.
:param topic: The topic monitored by the trigger.
:param broker_list: The list of Kafka brokers monitored by the trigger.
:param avro_schema: This should be used only if a generic record
should be generated.
:param username: SASL username for use with the PLAIN and SASL-SCRAM-..
mechanisms. Default is empty string. This is equivalent to
'sasl.username' in librdkafka.
:param password: SASL password for use with the PLAIN and SASL-SCRAM-..
mechanisms. Default is empty string. This is equivalent to
'sasl.password' in librdkafka.
:param ssl_key_location: Path to client's private key (PEM) used for
authentication. Default is empty string. This is equivalent to
'ssl.key.location' in librdkafka.
:param ssl_ca_location: Path to CA certificate file for verifying the
broker's certificate. This is equivalent to 'ssl.ca.location' in
librdkafka.
:param ssl_certificate_location: Path to client's certificate. This is
equivalent to 'ssl.certificate.location' in librdkafka.
:param ssl_key_password: Password for client's certificate. This is
equivalent to 'ssl.key.password' in librdkafka.
:param schema_registry_url: URL for the Avro Schema Registry.
:param schema_registry_username: Username for the Avro Schema Registry.
:param schema_registry_password: Password for the Avro Schema Registry.
:param o_auth_bearer_method: Either 'default' or 'oidc'.
sasl.oauthbearer in librdkafka.
:param o_auth_bearer_client_id: Specify only when o_auth_bearer_method
is 'oidc'. sasl.oauthbearer.client.id in librdkafka.
:param o_auth_bearer_client_secret: Specify only when
o_auth_bearer_method is 'oidc'. sasl.oauthbearer.client.secret in
librdkafka.
:param o_auth_bearer_scope: Specify only when o_auth_bearer_method
is 'oidc'. Client use this to specify the scope of the access request
to the broker. sasl.oauthbearer.scope in librdkafka.
:param o_auth_bearer_token_endpoint_url: Specify only when
o_auth_bearer_method is 'oidc'. sasl.oauthbearer.token.endpoint.url
in librdkafka.
:param o_auth_bearer_extensions: Allow additional information to be
provided to the broker. Comma-separated list of key=value pairs. E.g.,
"supportFeatureX=true,organizationId=sales-emea".
sasl.oauthbearer.extensions in librdkafka
:param max_message_bytes: Maximum transmit message size. Default is 1MB
:param batch_size: Maximum number of messages batched in one MessageSet
Default is 10000.
:param enable_idempotence: When set to `true`, the producer will ensure
that messages are successfully produced exactly once and in the
original produce order. Default is false.
:param message_timeout_ms: Local message timeout. This value is only
enforced locally and limits the time a produced message waits for
successful delivery. A time of 0 is infinite. This is the maximum time
used to deliver a message (including retries). Delivery error occurs
when either the retry count or the message timeout are exceeded.
Default is 300000.
:param request_timeout_ms: The ack timeout of the producer request in
milliseconds. Default is 5000.
:param max_retries: How many times to retry sending a failing Message.
Default is 2147483647. Retrying may cause reordering unless
'EnableIdempotence' is set to 'True'.
:param authentication_mode: SASL mechanism to use for authentication.
Allowed values: Gssapi, Plain, ScramSha256, ScramSha512. Default is
Plain. This is equivalent to 'sasl.mechanism' in librdkafka.
:param protocol: Gets or sets the security protocol used to communicate
with brokers. Default is plain text. This is equivalent to
'security.protocol' in librdkafka.
:param linger_ms: Linger.MS property provides the time between batches
of messages being sent to cluster. Larger value allows more batching
results in high throughput.
:param data_type: Defines how Functions runtime should treat the
parameter value.
:param kwargs: Keyword arguments for specifying additional binding
fields to include in the binding json

:return: Decorator function.
"""

@self._configure_function_builder
def wrap(fb):
def decorator():
fb.add_binding(
binding=KafkaOutput(
name=arg_name,
topic=topic,
broker_list=broker_list,
avro_schema=avro_schema,
username=username,
password=password,
ssl_key_location=ssl_key_location,
ssl_ca_location=ssl_ca_location,
ssl_certificate_location=ssl_certificate_location,
ssl_key_password=ssl_key_password,
schema_registry_url=schema_registry_url,
schema_registry_username=schema_registry_username,
schema_registry_password=schema_registry_password,
o_auth_bearer_method=parse_singular_param_to_enum(
o_auth_bearer_method, OAuthBearerMethod),
o_auth_bearer_client_id=o_auth_bearer_client_id,
o_auth_bearer_client_secret=o_auth_bearer_client_secret, # noqa: E501
o_auth_bearer_scope=o_auth_bearer_scope,
o_auth_bearer_token_endpoint_url=o_auth_bearer_token_endpoint_url, # noqa: E501
o_auth_bearer_extensions=o_auth_bearer_extensions,
max_message_bytes=max_message_bytes,
batch_size=batch_size,
enable_idempotence=enable_idempotence,
message_timeout_ms=message_timeout_ms,
request_timeout_ms=request_timeout_ms,
max_retries=max_retries,
authentication_mode=parse_singular_param_to_enum(
authentication_mode, BrokerAuthenticationMode),
protocol=parse_singular_param_to_enum(protocol,
BrokerProtocol),
linger_ms=linger_ms,
data_type=parse_singular_param_to_enum(data_type,
DataType),
**kwargs))
return fb

return decorator()

return wrap

def table_input(self,
arg_name: str,
connection: str,
Expand Down
Loading