Design Point: Replay as Observable #1
Thanks for your comment, I appreciate the feedback. What you say is indeed correct and by design; I will respond fully when I have some time. In the meantime, have a look at this example, where I touch on this issue by converting the Observable to a Flowable that handles back pressure.
Hi David, I've thought a lot about your point and agree with you that In addition, I've had feedback asking me to support more than RxJava. For these reasons I've refactored the project so that it supports any Reactive framework, with RxJava being just one implementation. Please let me know if you prefer this abstraction and if you have any more comments.
I welcome the use of standard Reactive-Streams APIs, letting the client wrap their rich fluent API of choice around a However, targeting Reactive Streams directly exposes you to the rules and requirements of the RS specification, which is much harder than it seems. For example, the reworked
The reason I used
Actually, if you go back to returning something and eliminate the subscriber, having a The other path is to create your own
This is by no means a complete implementation, but it gives you an idea of how the API and internal class structure would look. Note there's no particular request handling, which is the most difficult part (the pattern in RxJava and Reactor is usually a complex drain loop). Note also that the cancellation evaluation could probably be more fine-grained in

```java
import net.openhft.chronicle.queue.ChronicleQueue;
import net.openhft.chronicle.queue.ExcerptTailer;
import net.openhft.chronicle.wire.ValueIn;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

public class RxPlayer {
    private RxJournal rxJournal;

    RxPlayer(RxJournal rxJournal) {
        this.rxJournal = rxJournal;
    }

    /**
     * See documentation on {@link PlayOptions}
     * @param options Options controlling how play is executed.
     */
    public Publisher<Object> play(PlayOptions options) {
        options.validate();
        return new PlayPublisher(rxJournal, options);
    }

    static final class PlayPublisher implements Publisher<Object> {
        private final RxJournal rxJournal;
        private final PlayOptions options;

        PlayPublisher(RxJournal rxJournal, PlayOptions options) {
            this.rxJournal = rxJournal;
            this.options = options;
        }

        @Override
        public void subscribe(Subscriber<? super Object> s) {
            s.onSubscribe(new PlaySubscription(s, rxJournal, options));
        }
    }

    static final class PlaySubscription implements Subscription {
        private final Subscriber<? super Object> actual;
        private final RxJournal rxJournal;
        private final PlayOptions options;
        private final DataItemProcessor dim = new DataItemProcessor();
        private volatile boolean cancelled;

        PlaySubscription(Subscriber<? super Object> actual, RxJournal journal, PlayOptions options) {
            this.actual = actual;
            this.rxJournal = journal;
            this.options = options;
        }

        @Override
        public void request(long n) {
            //this ignores request, but it's a start :o
            fastPath();
        }

        @Override
        public void cancel() {
            cancelled = true;
        }

        // Reads the journal on the calling thread until a stop condition or cancellation.
        private void fastPath() {
            try (ChronicleQueue queue = rxJournal.createQueue()) {
                ExcerptTailer tailer = queue.createTailer();
                long[] lastTime = new long[]{Long.MIN_VALUE};
                boolean[] stop = new boolean[]{false};
                while (true) {
                    boolean foundItem = tailer.readDocument(w -> {
                        if (cancelled) {
                            return;
                        }
                        ValueIn in = w.getValueIn();
                        dim.process(in, options.using());
                        // Past the requested upper bound: complete the sequence.
                        if (dim.getTime() > options.playUntilTime()
                                || dim.getMessageCount() >= options.playUntilSeqNo()) {
                            actual.onComplete();
                            stop[0] = true;
                            return;
                        }
                        if (dim.getTime() > options.playFromTime() && dim.getMessageCount() >= options.playFromSeqNo()) {
                            pause(options, lastTime, dim.getTime());
                            if (options.filter().equals(dim.getFilter())) {
                                if (dim.getStatus() == RxStatus.COMPLETE) {
                                    actual.onComplete();
                                    stop[0] = true;
                                    return;
                                }
                                if (dim.getStatus() == RxStatus.ERROR) {
                                    actual.onError((Throwable) dim.getObject());
                                    stop[0] = true;
                                    return;
                                }
                                actual.onNext(dim.getObject());
                            }
                            lastTime[0] = dim.getTime();
                        }
                    });
                    if (cancelled) {
                        return;
                    }
                    if (!foundItem && !options.completeAtEndOfFile()) {
                        actual.onComplete();
                        return;
                    }
                    if (stop[0]) {
                        return;
                    }
                }
            }
        }

        private void pause(PlayOptions options, long[] lastTime, long recordedAtTime) {
            if (options.replayRate() == ReplayRate.ACTUAL_TIME && lastTime[0] != Long.MIN_VALUE) {
                // Blocking sleep for the recorded gap between two items.
                DSUtil.sleep((int) (recordedAtTime - lastTime[0]));
            } else if (options.pauseStrategy() == PauseStrategy.YIELD) {
                Thread.yield();
            }
            //otherwise SPIN
        }
    }
}
```
Hi Simon, thanks, that's really helpful and gives me a lot to think about. I do like the idea of implementing the In terms of Not 100% sure what you mean by request handling. But in this case I think it's valid to ignore the request because everything is in the Journal, so it is effectively buffered. If the user wants to use a Thanks again
Yeah, in terms of switching to RxJava, that would just be a matter of using In terms of backpressure and request handling, what I meant is that if the JournalSubscription receives a That is usually solved in RxJava by a If you don't do that and there is backpressure, you'll probably end up buffering the whole journal in memory in the queue of the first operator you use after your publisher, which is counterproductive.
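To make the request-handling point concrete, here is a minimal sketch of a drain loop. It is not the project's code: it assumes the JDK's `java.util.concurrent.Flow` interfaces as a stand-in for `org.reactivestreams`, an `Iterator` as a stand-in for the journal tailer, and the class name `DrainingSubscription` is hypothetical. It only emits while there is outstanding demand, so nothing piles up in a downstream queue.

```java
import java.util.Iterator;
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicLong;

/** Sketch only: emits from an Iterator (stand-in for the tailer) while demand is outstanding. */
final class DrainingSubscription<T> implements Flow.Subscription {
    private final Flow.Subscriber<? super T> actual;
    private final Iterator<T> source;
    private final AtomicLong requested = new AtomicLong(); // outstanding demand
    private volatile boolean done;                          // cancelled or terminated

    DrainingSubscription(Flow.Subscriber<? super T> actual, Iterator<T> source) {
        this.actual = actual;
        this.source = source;
    }

    @Override
    public void request(long n) {
        if (done) {
            return;
        }
        if (n <= 0) {
            done = true;
            actual.onError(new IllegalArgumentException("non-positive request: " + n));
            return;
        }
        // Add n to the outstanding demand, capping at Long.MAX_VALUE on overflow.
        long previous = requested.getAndAccumulate(n, (current, add) -> {
            long sum = current + add;
            return sum < 0 ? Long.MAX_VALUE : sum;
        });
        // Only the call that raises demand from zero drains; re-entrant or
        // concurrent calls just add demand for the running loop to pick up.
        if (previous == 0) {
            drain();
        }
    }

    @Override
    public void cancel() {
        done = true;
    }

    private void drain() {
        long emitted = 0;
        long demand = requested.get();
        for (;;) {
            while (emitted != demand) {
                if (done) {
                    return;
                }
                if (!source.hasNext()) {
                    done = true;
                    actual.onComplete();
                    return;
                }
                actual.onNext(source.next());
                emitted++;
            }
            if (done) {
                return;
            }
            demand = requested.get();
            if (demand == emitted) {
                // Nothing new was requested while emitting: settle the account.
                demand = requested.addAndGet(-emitted);
                if (demand == 0) {
                    return;
                }
                emitted = 0;
            }
        }
    }
}
```

With a six-element source, `request(2)` followed by `request(5)` delivers the first two items, then the remaining four, then `onComplete()`. A live journal would need a different end-of-data rule, since an empty read there can mean "nothing yet" rather than "finished".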
Understood, I'll have a go at implementing it... So if you get One other quick question on your code above - should Thanks again for all your input
The requests add up and the publisher is now expected to send a total of 7 values. Of course, if the journal only contains e.g. 6, the publisher will Calls to When the subscription is cancelled, no need to call
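The "requests add up" bookkeeping can be sketched on its own. The helper below is an illustration, not an API of RxJava, Reactor or this project: the class name `Demand` and both method names are hypothetical. It keeps the outstanding demand in an `AtomicLong`, treats `Long.MAX_VALUE` as "unbounded", and caps additions so the counter cannot overflow.

```java
import java.util.concurrent.atomic.AtomicLong;

/** Sketch only: additive demand accounting with an overflow cap. */
final class Demand {
    private Demand() {
    }

    /** Adds n to the outstanding demand, capping at Long.MAX_VALUE; returns the previous value. */
    static long add(AtomicLong requested, long n) {
        for (;;) {
            long current = requested.get();
            if (current == Long.MAX_VALUE) {
                return Long.MAX_VALUE; // already unbounded, nothing to add
            }
            long next = current + n;
            if (next < 0) {
                next = Long.MAX_VALUE; // overflow: treat as unbounded
            }
            if (requested.compareAndSet(current, next)) {
                return current;
            }
        }
    }

    /** Subtracts what was emitted unless demand is unbounded; returns the remaining demand. */
    static long produced(AtomicLong requested, long emitted) {
        for (;;) {
            long current = requested.get();
            if (current == Long.MAX_VALUE) {
                return Long.MAX_VALUE;
            }
            long next = current - emitted;
            if (next < 0) {
                throw new IllegalStateException("more produced than requested");
            }
            if (requested.compareAndSet(current, next)) {
                return next;
            }
        }
    }
}
```

Calling `Demand.add(requested, 2)` and then `Demand.add(requested, 5)` leaves 7 outstanding; after six emissions, `Demand.produced(requested, 6)` leaves 1.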
I've checked in a version of the code based on your snippet. It's not complete, but it works for simple cases and all tests pass. Could you perhaps give me a couple of test cases that break the code, so I can look at fixing them? As usual, your expertise in this area is invaluable to this project! Once the code has settled down I'll update the README and build a new pre-release version.
Does
Another potential problem that I just thought about: the Detecting that case in the (Note that
@akarnokd return false indicates a temporary lack of data, e.g. when we are listening live to a journal... that's why we poll with a
@simonbasle Replaying in ACTUAL_TIME would in reality only be done when the journal is used as a source for a test scenario. Would this be a problem? In real life (without the journal) this is the speed at which the events were produced, and the system would have to deal with it. I'm not sure I understand why executing from a single dedicated Thread is the solution - do you mind explaining? When would
Just a thought - given the changes over the past week, what do you think about renaming the project to ReactiveJournal?
@danielshaya if renaming is still a possibility, I think it is a good idea. writing a About the request, yes About the replay rate, the problem is that by default all the processing happens on the thread on which you In Reactor for instance, you have operators that deal with time and introduce delays. The way these work without blocking all sequences is that they internally use a If you manage to split the reading from the tailer into restartable routines, you can use the same trick to schedule the next read from the queue inside such a dedicated thread, avoiding the blocking pause entirely. This will of course also help in implementing backpressure support (if you have a routine to read one element from the queue, you can hold off calling it while there is no pending request).
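One way to picture "restartable routines" is a read step that schedules its successor with a delay instead of sleeping. The sketch below assumes a plain JDK `ScheduledExecutorService` as the dedicated timer thread and an `Iterator` of timestamped items in place of the tailer. The names `TimedReplay`, `Timed` and `replay` are hypothetical, and this is not how Reactor implements its time operators internally.

```java
import java.util.Iterator;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

/** Sketch only: replays timestamped items at their recorded pace without blocking sleeps. */
final class TimedReplay {
    /** A recorded value with the time (in ms) at which it was journaled. */
    static final class Timed<T> {
        final long timeMillis;
        final T value;

        Timed(long timeMillis, T value) {
            this.timeMillis = timeMillis;
            this.value = value;
        }
    }

    /** Starts the replay; the returned future completes when the items run out. */
    static <T> CompletableFuture<Void> replay(Iterator<Timed<T>> items,
                                              Consumer<? super T> sink,
                                              ScheduledExecutorService timer) {
        CompletableFuture<Void> done = new CompletableFuture<>();
        scheduleNext(items, Long.MIN_VALUE, sink, timer, done);
        return done;
    }

    // One restartable step: read a single item, then schedule its delivery
    // (and the following step) after the recorded gap.
    private static <T> void scheduleNext(Iterator<Timed<T>> items, long lastTime,
                                         Consumer<? super T> sink,
                                         ScheduledExecutorService timer,
                                         CompletableFuture<Void> done) {
        try {
            if (!items.hasNext()) {
                done.complete(null);
                return;
            }
            Timed<T> next = items.next();
            long delay = lastTime == Long.MIN_VALUE ? 0 : Math.max(0, next.timeMillis - lastTime);
            timer.schedule(() -> {
                try {
                    sink.accept(next.value);
                    scheduleNext(items, next.timeMillis, sink, timer, done);
                } catch (Throwable t) {
                    done.completeExceptionally(t);
                }
            }, delay, TimeUnit.MILLISECONDS);
        } catch (Throwable t) {
            done.completeExceptionally(t);
        }
    }
}
```

Because each step returns as soon as the next one is scheduled, the timer thread is free between items. The same step is also a natural place to check for pending demand before reading.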
@simonbasle Whilst the project is still new I took the opportunity to rename - it makes much more sense with this nomenclature. (I'll soon rewrite the README.) I've made a start on handling the I'm going to read @akarnokd's Wiki page. Thanks for the reference. Any further comments much appreciated.
I've taken your suggestions on board and refactored Would you be able to review this, please? The only issue I have with this strategy is that it always calls back the Rgds
I took the decision for now to let the user choose to go on the 'fast path' (to be called back on the same thread and to ignore calls to Before I write this up in the README I'd be very interested in your comments. Thx
I see 4 combinations:
The only thing that makes a separate thread necessary is actually the blocking pauses induced by The idea is to trigger the fastpath by calling You could have a
Don't you think it might be confusing to users to sometimes be called back on their own thread and sometimes not, depending on the subscription? Shouldn't the user have an override if they really want to be called back on their own thread for 2) and 4)?
On one hand it could make sense to provide some form of configuration, but on the other hand you need to be sure that the execution context maintains the invariant of In RxJava or Reactor, this is usually solved by allowing the user to provide a custom edit: oh yeah, it would totally make sense to always run on the user-chosen thread, just as long as it is one dedicated to the operator. What is not desirable is to execute on a thread that is not dedicated to the operator, risking blocking another operator with the blocking pauses.
Thanks again for the points. I think this is the way to go for now (I do want to get the 1.0 release out soon :) ). By default, when the user asks for ACTUAL_TIME I'll create a new thread; otherwise I'll use the user thread (or maybe one that they pass in). I'd like to give the user the option of always being able to run on their own thread even if they have chosen ACTUAL_TIME (I'll configure that as an override option in I'll use One question I still have: even when you choose FAST_TIME you might well still have pauses waiting for the journal to get a message (e.g. if the user is listening remotely) - does that pause require its own thread, or can you use the user thread for that?
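The thread-selection policy described here could be sketched roughly as follows. Everything in it is an assumption for illustration: the `ReplayExecutors` class, its `choose` method and the local `Rate` enum are hypothetical stand-ins and do not reflect the project's actual `PlayOptions` API.

```java
import java.util.concurrent.Executor;

/** Sketch only: picks where the replay loop runs, following the policy described above. */
final class ReplayExecutors {
    /** Hypothetical stand-in for the project's replay-rate option. */
    enum Rate { ACTUAL_TIME, FAST }

    private ReplayExecutors() {
    }

    static Executor choose(Rate rate, Executor userSupplied) {
        if (userSupplied != null) {
            return userSupplied; // explicit override always wins
        }
        if (rate == Rate.ACTUAL_TIME) {
            // Blocking pauses are expected, so run on a dedicated thread.
            return task -> {
                Thread t = new Thread(task, "journal-replay");
                t.setDaemon(true);
                t.start();
            };
        }
        return Runnable::run; // no pauses expected: stay on the caller's thread
    }
}
```

A caller that wants its own thread regardless of the rate passes an executor; passing `null` selects the default for the chosen rate.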
Ah, I forgot that the
It's not so much the network latencies as the fact that delays will occur because the remote event might not be generated for a while. At the end of the day we're just reading from a live journal, which can be populated at any time as and when events become available.
@simonbasle I've rewritten At this point I'd like to tidy up and go for a 1.0 release. Of course I welcome your feedback before then if you have a moment to review.
Continuing the discussion from Twitter:
I assume the journal acts as an unbounded buffer that stores items on disk, thus when replaying it can read elements from this disk buffer on demand, supporting backpressure naturally.
The problem I see with the current setup is that unless one consumes the journal directly with a Consumer, it can run into backpressure problems again, caused by the replay itself. For example, applying observeOn would trigger a replay to flood the internal buffer of Observable.observeOn, thus the backpressure problem is back.