config.yaml:
# Kafka producer entity; the worker below references it as its output.
producer:
  cls: aioworkers_kafka.producer.KafkaProducer
  format: json
  # Client options under `kafka` use dotted Kafka property names
  # (presumably passed through to the underlying client; confirm).
  kafka:
    bootstrap.servers: kafka:9092
  topic: test

# Kafka consumer entity; the worker below references it as its input.
consumer:
  cls: aioworkers_kafka.consumer.KafkaConsumer
  format: json  # default format is json
  kafka:
    bootstrap.servers: kafka:9092
    group.id: test
  topics:
    - test

# Worker wiring: values consumed from `input` are passed to MyWorker.run,
# and its return value is produced to `output`.
worker:
  cls: mymodule.MyWorker
  input: .consumer
  output: .producer
  autorun: true
mymodule.py:
from collections.abc import Mapping
from time import time

from aioworkers.worker.base import Worker
class MyWorker(Worker):
    """Worker that adds a ``"ts"`` timestamp to each consumed message."""

    async def run(self, value):  # consume value from input
        """Return a copy of ``value`` with a ``"ts"`` entry added.

        Args:
            value: Mapping received from the configured input.

        Returns:
            A new ``dict`` holding the items of ``value`` plus ``"ts"`` set
            to the result of ``time()``. The incoming mapping is not modified.

        Raises:
            AssertionError: If ``value`` is not a ``Mapping``.
        """
        assert isinstance(value, Mapping)
        # Write to the copy, not the input: a Mapping is not guaranteed to
        # support item assignment, and the caller's object must stay intact.
        out = dict(value)
        out["ts"] = time()
        return out  # produce value to output
$ aioworkers -c config.yaml
Check code:
hatch run lint:all
Format code:
hatch run lint:fmt
Run tests:
hatch run pytest
Run tests with coverage:
hatch run cov