Slipstream provides a data-flow model to simplify development of stateful streaming applications.
pip install slipstream-async
from asyncio import run
from slipstream import handle, stream
async def messages():
for emoji in '🏆📞🐟👌':
yield emoji
@handle(messages(), sink=[print])
def handle_message(msg):
yield f'Hello {msg}!'
if __name__ == '__main__':
run(stream())
Hello 🏆!
Hello 📞!
Hello 🐟!
Hello 👌!
Slipstream components interoperate with basic python building blocks:
Any
-thing can be passed around as data- Any
Callable
may be used as a sink AsyncIterables
act as sources- Parallelize through
handle
A many-to-many relation is established by passing multiple sources / sinks.
Install Slipstream along with aiokafka
(latest):
pip install slipstream-async[kafka]
Spin up a local Kafka broker with docker-compose.yml, using localhost:29091
to connect:
docker compose up broker -d
Copy-paste this snippet.
slipstream.handle
: bind streams (iterables) and sinks (callables) to user defined handler functionsslipstream.stream
: start streamingslipstream.Topic
: consume from (iterable), and produce to (callable) kafka using aiokafkaslipstream.Cache
: store data to disk using rocksdictslipstream.Conf
: set global kafka configuration (can be overridden per topic)slipstream.codecs.JsonCodec
: serialize and deserialize json messagesslipstream.checkpointing.Checkpoint
: recover from stream downtimes