Hi @jonaslb, you need to define the "end" of the stream in your application logic, for example:

    from rstream import AMQPMessage

    # Regular messages carry isTheLast=False.
    application_properties = {
        "isTheLast": False,
    }
    amqp_message = AMQPMessage(
        body=bytes("hello", "utf-8"),
        application_properties=application_properties,
    )
    await producer.send(stream=STREAM, message=amqp_message)

    # The final message carries isTheLast=True so the consumer knows where to stop.
    application_properties = {
        "isTheLast": True,
    }
    amqp_message = AMQPMessage(
        # i is a counter from the surrounding send loop (not shown here)
        body=bytes("hello: {}".format(i), "utf-8"),
        application_properties=application_properties,
    )
    await producer.send(stream=STREAM, message=amqp_message)
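On the consuming side, the marker could be handled roughly as follows. This is only a sketch of the idea, not rstream's consumer API: `FakeMessage` and `BatchCollector` are hypothetical names invented for illustration, and the handler would need to be adapted to whatever callback signature the client actually uses. It also assumes a single producer, so that the flagged message really is the last one.

```python
import asyncio
from dataclasses import dataclass, field


@dataclass
class FakeMessage:
    """Hypothetical stand-in for a received AMQP message (not an rstream type)."""
    body: bytes
    application_properties: dict = field(default_factory=dict)


class BatchCollector:
    """Collects messages until one is flagged with isTheLast=True."""

    def __init__(self) -> None:
        self.messages: list = []
        self.done = asyncio.Event()

    async def on_message(self, message: FakeMessage) -> None:
        if self.done.is_set():
            return  # ignore anything delivered after the end marker
        self.messages.append(message)
        if message.application_properties.get("isTheLast", False):
            self.done.set()  # wake up whoever is waiting for the batch


async def demo() -> list:
    collector = BatchCollector()
    # Simulate three deliveries; only the third carries the end marker.
    for i in range(3):
        props = {"isTheLast": i == 2}
        await collector.on_message(FakeMessage(f"hello: {i}".encode(), props))
    # A message arriving after the marker is dropped.
    await collector.on_message(FakeMessage(b"late", {"isTheLast": False}))
    await collector.done.wait()
    return [m.body for m in collector.messages]


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

The caller would await `collector.done` and then unsubscribe or close the consumer.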
@jonaslb For my own purposes I define "end of stream" as "there is no message for a period of time". In RbFly you can subscribe with a timeout parameter (see https://wrobell.dcmod.org/rbfly/api.html#rbfly.streams.StreamsClient, sorry for the plug). @Gsantomaggio Does rstream support something similar?
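Independent of any client library, the "no message for a period of time" rule can be sketched with plain asyncio. This is not RbFly's or rstream's API: it assumes the consumer callback pushes each message onto an `asyncio.Queue`, and `read_until_idle` and `idle_timeout` are hypothetical names chosen for this example.

```python
import asyncio


async def read_until_idle(queue: asyncio.Queue, idle_timeout: float) -> list:
    """Drain `queue` until no message arrives within `idle_timeout` seconds."""
    batch = []
    while True:
        try:
            # Each wait restarts the timer, so the timeout measures idle time
            # between messages, not total reading time.
            item = await asyncio.wait_for(queue.get(), timeout=idle_timeout)
        except asyncio.TimeoutError:
            return batch
        batch.append(item)


async def demo() -> list:
    queue: asyncio.Queue = asyncio.Queue()
    # Stand-in for a consumer callback that enqueues received messages.
    for i in range(3):
        queue.put_nowait(f"hello: {i}")
    return await read_until_idle(queue, idle_timeout=0.05)


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

The trade-off is that a slow producer can look like the end of the stream, so the timeout has to be chosen with the expected publish rate in mind.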
Currently the consumer API is very oriented toward stream processing, which makes sense, but I think there is also a use case where it is desired to "read to the end of stream" as a single batch and then no more. With Kafka, I believe it is possible to react to end-of-stream on a consumer, but I don't believe a similar thing is possible for rmq streams?
I have a crude local wrapper I use for this purpose, but it involves subscribing once using the LAST offset spec to get the offset, and then subscribing once more using the stored/initial offset and stopping when the last offset is received. That turns out to be a relatively large amount of code for what I feel is a conceptually simple problem, especially taking into account that the stream might not exist and that multiple streams can be queried. So perhaps such a solution, in polished form, would be appropriate for inclusion here?
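For reference, the stopping rule of that two-pass approach boils down to something like the sketch below. It is not the wrapper itself and does not use the rstream API: the stream is modelled as an iterable of `(offset, body)` pairs, and `read_to_snapshot_end` and `last_offset` are hypothetical names. The first pass (subscribing with LAST) is assumed to have produced `last_offset`, or `None` if the stream is empty or missing.

```python
from typing import Iterable, Iterator, Optional, Tuple

Record = Tuple[int, bytes]  # (offset, body)


def read_to_snapshot_end(
    records: Iterable[Record], last_offset: Optional[int]
) -> Iterator[Record]:
    """Yield records up to and including `last_offset`, then stop.

    `last_offset` is the offset seen by the first pass; None means there
    was nothing to read, so the generator yields nothing.
    """
    if last_offset is None:
        return
    for offset, body in records:
        if offset > last_offset:
            break  # published after the snapshot was taken
        yield offset, body
        if offset == last_offset:
            break  # stop right away instead of waiting for another message


if __name__ == "__main__":
    stream = [(0, b"a"), (1, b"b"), (2, b"c"), (3, b"d")]
    print(list(read_to_snapshot_end(stream, last_offset=2)))
    print(list(read_to_snapshot_end(stream, last_offset=None)))
```

Stopping as soon as the snapshot offset is reached matters with a live subscription, since waiting for the next record could block indefinitely.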