This repository contains a few samples to help you get started quickly with the Kafka extension.
To test the sample applications, you need to have access to a Kafka instance. Here are some ways you can get access to one.
There are several DevContainer samples here. If you start the Visual Studio Code on the target sample directory, it will automatically start a development environment on a Docker container with a local Kafka cluster. It is the easiest option for starting a Kafka cluster. Developing inside a Container
We provide the Confluent Docker Compose sample to get started with a local Kafka and data generator. Follow the guide at https://docs.confluent.io/current/quickstart/ce-docker-quickstart.html#cp-quick-start-docker.
Make sure you complete the steps at least until the topics page views, users, and pageviews_female are created (including data generators). The included .NET sample function contains a consumer for each of those three topics.
Confluent Cloud is a fully managed, cloud-native event stream platform powered by Apache Kafka. The samples include Confluence Cloud samples to understand real-world configuration.
Azure Functions Kafka Extension support several languages with the following samples. For more details and getting started, please refer to the links below.
Language | Description | Link | DevContainer |
---|---|---|---|
C# | C# precompiled sample with Visual Studio | Readme | No |
Java | Java 8 sample | Readme | Yes |
JavaScript | Node 12 sample | Readme | Yes |
PowerShell | PowerShell 6 Sample | Readme | No |
Python | Python 3.8 sample | Readme | Yes |
TypeScript | TypeScript sample (Node 12) | Readme | Yes |
Custom containers enable us to deploy a custom container to the Function App. We can use any other languages; however, as an example, we provide a java sample to explain how to develop it.
Language | Description | Link | DevContainer |
---|---|---|---|
Java | Custom container sample | Readme | No |
Kafka extension supports several languages, however, it uses the same Azure Functions host. For this reason, there is a common configuration for each language. Please find below some common notes with applying to all the languages.
You can find all Kafka related configuration on the function.json.
In the case of Java, you specify it as an annotation. However, the maven plugin generates the function.json.
If your function doesn't work well, please check your code and function.json
at first.
function.json
{
"scriptFile" : "../kafka-function-1.0-SNAPSHOT.jar",
"entryPoint" : "com.contoso.kafka.TriggerFunction.runMany",
"bindings" : [ {
"type" : "kafkaTrigger",
"direction" : "in",
"name" : "kafkaEvents",
"password" : "%ConfluentCloudPassword%",
"protocol" : "SASLSSL",
"dataType" : "string",
"topic" : "message",
"authenticationMode" : "PLAIN",
"consumerGroup" : "$Default",
"cardinality" : "MANY",
"username" : "%ConfluentCloudUsername%",
"brokerList" : "%BrokerList%"
} ]
}
{
"scriptFile" : "../kafka-function-1.0-SNAPSHOT.jar",
"entryPoint" : "com.contoso.kafka.TriggerFunction.runMany",
"bindings" : [ {
"type" : "kafkaTrigger",
"direction" : "in",
"name" : "kafkaEvents",
"password" : "%EventHubConnectionString%",
"protocol" : "SASLSSL",
"dataType" : "string",
"topic" : "message",
"authenticationMode" : "PLAIN",
"consumerGroup" : "$Default",
"cardinality" : "MANY",
"username" : "$ConnectionString",
"brokerList" : "%BrokerList%"
} ]
}
NOTE For EventHub, username should be set to "$ConnectionString" only. The password should be the actual connection string value that could be set in local.settings.json or appsettings (Please see local-settings section for more details).
It is the configuration of a local function runtime. If you deploy the target application on Azure with a local.settings.json,
you will require the same settings on the Function App App settings.
NOTE All the passwords and connection strings settings are recommended to be put in appsettings. For more details, refer to Local settings file.
{
"IsEncrypted": false,
"Values": {
"BrokerList": "{YOUR_CONFLUENT_CLOUD_BROKER}",
"ConfluentCloudUserName": "{YOUR_CONFLUENT_CLOUD_USERNAME}",
"ConfluentCloudPassword": "{YOUR_CONFLUENT_CLOUD_PASSWORD}",
"FUNCTIONS_WORKER_RUNTIME": "<runtime>",
"AzureWebJobsStorage": "",
"topic": "{YOUR_KAFKA_TOPIC_NAME}"
}
}
{
"IsEncrypted": false,
"Values": {
"AzureWebJobsStorage": "UseDevelopmentStorage=true",
"FUNCTIONS_WORKER_RUNTIME": "<runtime>",
"BrokerList": "<YOUR_EVENTHUB_NAMESPACE_NAME>.servicebus.windows.net:9093",
"EventHubConnectionString": "<YOUR_EVENTHUB_CONNECTIONSTRING>",
"topic": "{YOUR_KAFKA_TOPIC_NAME}"
}
}
Currently, in Azure Functions - most triggers and bindings are ordinarily obtained using the extension bundle. However, currently, the Kafka extension is not part of the extension bundle (will be added in the future). Meanwhile, you will have to install the Kafka extension manually.
For installing Kafka extension manually:
Azure Functions extension is written in C#. We need to install with .NET Core capability. csproj
file is a project file for .NET. For more information refer to Understanding the project file
extensions.csproj
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<TargetFramework>netcoreapp3.1</TargetFramework>
<WarningsAsErrors></WarningsAsErrors>
<DefaultItemExcludes>**</DefaultItemExcludes>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="Microsoft.Azure.WebJobs.Extensions.Kafka" Version="3.2.1" />
<PackageReference Include="Microsoft.Azure.WebJobs.Script.ExtensionsMetadataGenerator" Version="1.1.7" />
</ItemGroup>
<ItemGroup>
<None Update="confluent_cloud_cacert.pem">
<CopyToOutputDirectory>PreserveNewest</CopyToOutputDirectory>
</None>
</ItemGroup>
</Project>
You can use this command. It will refer to the extensions.csproj
and install the related extension.
$ func extensions install
In the case of Java, they need to specify the extension name.
$ func extensions install --package Microsoft.Azure.WebJobs.Extensions.Kafka --version ${EXTENSION_VERSION}
You can go bin/runtimes/
if you find librdkafka native libraries, the installation is succeeded.
Kafka extensions use libkafka native libraries. That is included in the Kafka Extensions NuGet package. However, for the Linux and OSX environment, you need to specify LD_LIBRARY_PATH
for the Azure Functions runtime refer to the native library.
$ export LD_LIBRARY_PATH=/workspace/bin/runtimes/linux-x64/native
For the devcontainer, you will find the configuration on the devcontainer.json.
If you deploy your app on the Linux Premium Functions, you need to configure App settings with LD_LIBRARY_PATH.
For more details, refer to Linux Premium plan configuration
You can find the configuration for the Confluent Cloud for C# in Connecting to Confluent Cloud in Azure.
Java and Python have a binding library. Currently, it resides in this repository. In the near feature, it will move to the official repo. So you don't need to install manually.
However, currently, we need to install it manually. Please follow the instruction for each README.md.
For the KafkaTrigger and non-C# implementation, if we want to execute Kafka trigger with batch, you can configure cardinality
and dataType
. For more details, refer to Language support configuration
If you have problems connecting to localhost:9092 try to add broker 127.0.0.1
to your host file and use instead of localhost.
The sample provides a devcontainer profile. Open the folder in VsCode and perform the action Remote-Containers: Reopen in Container
. The action will reopen VsCode inside a container, together with the Confluent's Kafka starter sample. Then run the function inside the remote container using the following local.settings.json file:
{
"IsEncrypted": false,
"Values": {
"FUNCTIONS_WORKER_RUNTIME": "node",
"AzureWebJobsStorage": "{AzureWebJobsStorage}",
"BrokerList":"broker:29092"
}
}
Headers are supported for both Kafka Trigger and Kafka Output binding. You can find the samples for headers in this folder with name KafkaTriggerWithHeaders
, KafkaTriggerManyWithHeaders
for Trigger functions and KafakOutputWithHeaders
, KafkaOutputManyWithHeaders
for output binding functions.
KafkaOutputWithHeaders
is a sample for single event type while KafkaOutputManyWithHeaders
is for batch events.
To run KafkaOutputWithHeaders
function, send a http GET request with message at url http://localhost:7071/api/KafkaOutputWithHeaders?message=<your_message>
. It will create a new Kafka Event with payload as your_message and headers as { Key: 'test', Value: '<language>'}
.
Similarly, to run KafkaOutputManyWithHeaders
function, send a http GET request at url http://localhost:7071/api/KafkaOutputManyWithHeaders
. It would create two messages with headers on given topic.
KafkaTriggerWithHeaders
is a sample for single event type while KafkaTriggerManyWithHeaders
is for batch events.
KafkaTriggerWithHeaders
will be triggered whenever there is a Kafka Event. It prints the message and the corresponding headers for that message.
Similarly, KafkaTriggerManyWithHeaders
is a trigger function which processes batch of Kafka events. For all the events in the batch, it prints the message and corresponding headers.