Django application to produce/consume events from Kafka supported by kstreams
pip install django-streams
or with poetry
poetry add django-streams
and add it to INSTALLED_APPS:
INSTALLED_APPS = [
...
"django_streams",
...
"my_streams_app",
# etc...
]
Documentation: https://kpn.github.io/django-streams/
Create the engine:
# my_streams_app/engine.py
from django_streams import create_engine
from kstreams.backends import Kafka
stream_engine = create_engine(
title="test-engine",
backend=Kafka(),
)
To configure the backend, follow the kstreams backend documentation.
Define your streams:
# my_streams_app/streams.py
from kstreams import ConsumerRecord
from .engine import stream_engine
@stream_engine.stream("dev-kpn-des--hello-kpn", group_id="django-streams-principal-group-id") # your consumer
async def consumer_task(cr: ConsumerRecord):
async for cr in stream:
logger.info(f"Event consumed: headers: {cr.headers}, value: {cr.value}")
And then, in your apps.py, you must import the Python module or your coroutines:
# my_streams_app/apps.py
from django.apps import AppConfig
class StreamingAppConfig(AppConfig):
name = "streaming_app"
def ready(self):
from . import streams # import the streams module
Now you can run the worker:
python manage.py worker
Producing events can be sync or async. If you are in a sync context you must use stream_engine.sync_send; otherwise, use stream_engine.send. In both cases a RecordMetadata is returned.
# streaming_app/views.py
from django.http import HttpResponse
from django.views.generic import View
from .engine import stream_engine
class HelloWorldView(View):
def get(self, request, *args, **kwargs):
record_metadata = stream_engine.sync_send(
"hello-kpn",
value=b"hello world",
key="hello",
partition=None,
timestamp_ms=None,
headers=None,
)
return HttpResponse(f"Event metadata: {record_metadata}")
Producer:
| Total produced events | Time (seconds) |
|---|---|
| 1 | 0.004278898239135742 |
| 10 | 0.030963897705078125 |
| 100 | 0.07049298286437988 |
| 1000 | 0.6609988212585449 |
| 10000 | 6.501222133636475 |
./scripts/test
./scripts/format