Skip to content
This repository has been archived by the owner on Jan 27, 2023. It is now read-only.

Support multiple-entry flowables #18

Draft
wants to merge 7 commits into
base: master
Choose a base branch
from

Conversation

badgerwithagun
Copy link

Been playing around with some RX patterns and wondered if this was possible in a generic way. Turns out it is, sort of.

With these changes, the following spits out each item as a string on a new line as the items arrive, all using async:

        @GET
        @Path("stringStream")
        @Streamable // Activates the new code
        public Flowable<String> stringStream() {
            return Flowable.range(1, 49)
                .zipWith(Flowable.interval(1, SECOND), (it, interval) -> it)
                .map(i -> "Item " + i + "\n");
        }

What I've put together is also compatible with Jackson's JsonCreator, so you can just use @Streamable(writer = SomeCustomWriter.class and call JsonCreator from there:

        @GET
        @Path("objectStream")
        @Streamable(writer = SomeCustomWriter.class)
        public Flowable<Tuple> objectStream() {
            return Flowable.range(1, 49)
                .zipWith(Flowable.interval(1, SECOND), (it, interval) -> it)
                .map(Tuple::new);
        }

Not sure it's perfect, and I need to test it a while lot more, but I thought I'd pop it here here for thoughts.

@badgerwithagun
Copy link
Author

@alex-shpak
Copy link
Owner

Hi, thank you for your work!
I will need to check more deeply. In general when I think about emitting multiple values from flowable I see two alternatives:

  1. Flowable could be streamed as list automatically;
  2. Or Flowable could emit server-sent events: https://eclipse-ee4j.github.io/jersey.github.io/documentation/latest/sse.html

Perhaps if time between emits is large it is not expected by client to keep listen on connection.

@badgerwithagun
Copy link
Author

In my use-case, I am using JsonFactory in Jackson to stream a JSON array back to the client. This allows a fully-reactive pipeline:

  • Asynchronous JAX-RS entrypoint
  • Non-blocking DB query loading millions of records (e.g. using R2DBC)
  • Each record is piped to the client as it arrives
  • Client can pipe those to its own processing code as they arrive
  • Server and client are now processing the same data stream simultaneously
  • No-one is blocking any threads :)

Sign up for free to subscribe to this conversation on GitHub. Already have an account? Sign in.
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants