Skip to content

Commit

Permalink
Intitial commit
Browse files Browse the repository at this point in the history
  • Loading branch information
danielshaya committed May 19, 2017
1 parent 9be3bcd commit bf6ed7a
Show file tree
Hide file tree
Showing 8 changed files with 100 additions and 56 deletions.
43 changes: 26 additions & 17 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# RxJournal

RxJournal augments the popular [RxJava](https://github.com/ReactiveX/RxJava) library by adding functionality to record
and play reactive streams.
RxJournal augments the popular [RxJava](https://github.com/ReactiveX/RxJava) library by adding
functionality to record and replay reactive streams.

## Primary Motivations Behind RxJournal

Expand All @@ -10,29 +10,29 @@ and play reactive streams.
Testing is a primary motivation for RxJournal. RxJournal allows developers to
blackbox test their code by recording all inputs and outputs in and out of their programs.

One possible use case are unit tests where RxJournal recordings can be used to create
An obvious use case are unit tests where RxJournal recordings can be used to create
comprehensive tests (see [RxPlayerTest] for an example where this is done in this project).

Another powerful use case is to enable users to replay production data into test systems.
By simply copying over the journal file from a production system and replaying all or part of the file
into a test system the exact conditions of the primary system should be able to be reproduced.
into a test system the exact conditions of the primary system will be reproduced.

### 2. Remote Connnections

RxJournal can be recorded on one JVM and can be replayed on a another JVM that has access to
the file location.
the journal file location.

The remote connection can read from the beginning of the recording or just start with live
updates from the recorder. The remote connection (the 'listener') can write back to the
journal effecting a two way conversation. There can be multiple readers and writers to the
updates from the recorder. The remote connection (the 'listener') can optionally write back to the
journal effecting a two way conversation or RPC. There can be multiple readers and writers to the
journal.

The journal is serialised using Chronicle-Queue to a memory mapped file so
RxJournal uses Chronicle-Queue (a memory mapped file solution) serialisation meaning that
the process of moving data from one JVM to another is exceedingly efficient and can be achieved
in single digit micro seconds.

If you need to pass data between JVMs on the same machine this is not only the most efficient way
to do so but you will also have a full recording of the data that goes between the JVMs.
to do so but you will also provide you with a full recording of the data that is transferred between the JVMs.

### 3. Slow consumers (handling back pressure)

Expand All @@ -41,11 +41,11 @@ there are a few options available to your system.

Most often you end up implementing strategies that hold buffers of data in memory until the
consumer catches up. The problem with those sort of strategies are one, if your process
crashes you lose all the data in your buffer, if you need to consume the fast data in a
crashes you lose all the data in your buffer. Therefore if you need to consume the fast data in a
transactional manner this will not be an option. Two, you may run out of memory if the
buffers get really big. At the very least you will probably need to run your JVM with a large
memory setting that many be ineffecient. For latency sensitive applications it will
put pressure on the GC.
put pressure on the GC which will not be acceptable.


## Design Goals
Expand All @@ -54,7 +54,7 @@ put pressure on the GC.
program crashes
- Recording and playback is so fast that it won't slow down the host program.
- Recording and playback can be achieved without any gc overhead
- RxRecorder can be eaily fitted into any RxJava project
- RxRecorder can be eaily added (or even retro-fitted) into any RxJava project

# Quick Start
## Creating a Journal
Expand All @@ -67,12 +67,14 @@ The directory is the location where the serialised file will be created

## Recording a reactive stream
`RxRecorder` allows any RxJava `Observable`/`Flowable` to be journalled to disk using
the record function:
the `record` function:

RxRecorder rxRecorder = rxJournal.createRxRecorder();
rxRexcorder.record(Observable)

## Playing back a reactive stream
For notes on threading see FAQ below.

## Playing back a reactive stream

`RxPlayer` is used to playback the journal recording:

Expand All @@ -90,8 +92,6 @@ The data can be examined in plain ASCII using the writeToDisk function:

rxJournal.writeToDisk(String fileName, boolean printToSdout)

There are 3 primary envisaged purposes for RxRecorder.

## Putting it together with HelloWorld


Expand Down Expand Up @@ -133,7 +133,16 @@ Full code example code [HelloWorldApp].
}
}
```

The results of running this program can be seen below:

````
[main] INFO org.rxjournal.impl.RxJournal - Deleting existing recording [/tmp/Demo]
Hello World!!
[main] INFO org.rxjournal.impl.RxJournal - Writing recording to dir [/tmp/Demo/demo.txt]
[main] INFO org.rxjournal.impl.RxJournal - VALID 1 2017-05-19T08:52:27.156 Hello World!!
[main] INFO org.rxjournal.impl.RxJournal - COMPLETE 2 2017-05-19T08:52:27.157 EndOfStream{}
[main] INFO org.rxjournal.impl.RxJournal - Writing to dir complete
````

## FAQ

Expand Down
3 changes: 1 addition & 2 deletions src/main/java/org/rxjournal/impl/RxJournal.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
*/
public class RxJournal {
private static final Logger LOG = LoggerFactory.getLogger(RxJournal.class.getName());
static final String END_OF_STREAM_FILTER = "endOfStream";
private String dir;

public RxJournal(String dir){
Expand Down Expand Up @@ -97,7 +96,7 @@ private static void writeQueueToFile(ExcerptTailer tailer, String fileName, bool
LocalDateTime dateTime = LocalDateTime.ofInstant(Instant.ofEpochMilli(dim.getTime()), ZoneId.of("Europe/London"));
try {
String item = RxStatus.toString(dim.getStatus()) + "\t" + dim.getMessageCount() + "\t" + dateTime + "\t"
+ dim.getFilter() + "\t" + dim.getMessageCount();
+ dim.getFilter() + "\t" + dim.getObject();
fileWriter.write(item + "\n");
if(toStdout) {
LOG.info(item);
Expand Down
20 changes: 6 additions & 14 deletions src/main/java/org/rxjournal/impl/RxPlayer.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,6 @@ public Observable play(PlayOptions options) {
ValueIn in = w.getValueIn();
dim.process(in, null);

if (testEndOfStream(subscriber, dim.getFilter())) {
stop[0] = true;
return;
}

if (testPastPlayUntil(options, subscriber, dim.getTime())){
stop[0] = true;
return;
Expand All @@ -47,6 +42,12 @@ public Observable play(PlayOptions options) {
pause(options, lastTime, dim.getTime());

if (options.filter().equals(dim.getFilter())) {
if (dim.getStatus()==RxStatus.COMPLETE) {
subscriber.onComplete();
stop[0] = true;
return;
}

if (dim.getStatus()==RxStatus.ERROR){
subscriber.onError((Throwable)dim.getObject());
stop[0] = true;
Expand Down Expand Up @@ -75,15 +76,6 @@ private boolean testPastPlayUntil(PlayOptions options, Emitter<? super Object> s
return false;
}

private boolean testEndOfStream(Emitter<? super Object> s, String storedWithFilter) {
if (storedWithFilter.equals(RxJournal.END_OF_STREAM_FILTER)) {
s.onComplete();
return true;
}

return false;
}

private void pause(PlayOptions options, long[] lastTime, long recordedAtTime) {
if (options.replayStrategy() == PlayOptions.Replay.REAL_TIME && lastTime[0] != Long.MIN_VALUE) {
DSUtil.sleep((int) (recordedAtTime - lastTime[0]));
Expand Down
15 changes: 8 additions & 7 deletions src/main/java/org/rxjournal/impl/RxRecorder.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Function;

/**
* Class to record input into RxJournal.
Expand Down Expand Up @@ -46,13 +47,13 @@ public void record(Observable<?> observable, String filter) {
ExcerptAppender appender = queue.acquireAppender();

TriConsumer<ExcerptAppender, String, Object> onNextConsumer = getOnNextConsumerRecorder();
Consumer<ExcerptAppender> onCompleteConsumer = getOnCompleteRecorder();
BiConsumer<ExcerptAppender, String> onCompleteConsumer = getOnCompleteRecorder();
TriConsumer<ExcerptAppender, String, Throwable> onErrorConsumer = getOnErrorRecorder();

observable.subscribe(
t -> onNextConsumer.accept(appender, filter, t),
e -> onErrorConsumer.accept(appender, filter, e),
() -> onCompleteConsumer.accept(appender)
() -> onCompleteConsumer.accept(appender, filter)
);
}

Expand All @@ -62,9 +63,9 @@ private TriConsumer<ExcerptAppender, String, Object> getOnNextConsumerRecorder()
});
}

private Consumer<ExcerptAppender> getOnCompleteRecorder(){
return a -> a.writeDocument(w -> {
writeObject(w, RxJournal.END_OF_STREAM_FILTER, new EndOfStream(), RxStatus.COMPLETE);
private BiConsumer<ExcerptAppender, String> getOnCompleteRecorder(){
return (a,f) -> a.writeDocument(w -> {
writeObject(w, f, new EndOfStream(), RxStatus.COMPLETE);
LOG.debug("Adding end of stream token");
});
}
Expand Down Expand Up @@ -92,13 +93,13 @@ public void record(Flowable<?> flowable, String filter) {
ExcerptAppender appender = queue.acquireAppender();

TriConsumer<ExcerptAppender, String, Object> onNextConsumer = getOnNextConsumerRecorder();
Consumer<ExcerptAppender> onCompleteConsumer = getOnCompleteRecorder();
BiConsumer<ExcerptAppender, String> onCompleteConsumer = getOnCompleteRecorder();
TriConsumer<ExcerptAppender, String, Throwable> onErrorConsumer = getOnErrorRecorder();

flowable.subscribe(
t -> onNextConsumer.accept(appender, filter, t),
e -> onErrorConsumer.accept(appender, filter, e),
() -> onCompleteConsumer.accept(appender)
() -> onCompleteConsumer.accept(appender, filter)
);
}
}
65 changes: 54 additions & 11 deletions src/test/java/org/rxjournal/impl/RxErrorTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,11 @@

import io.reactivex.*;
import io.reactivex.observables.ConnectableObservable;
import org.junit.Assert;
import org.junit.Test;

import java.io.IOException;
import java.util.concurrent.atomic.AtomicInteger;


/**
Expand All @@ -19,11 +21,21 @@ public void errorTest() throws IOException{
//try a couple of filters one with an error and one without
//one should end with onError and one with onComplete()
//Create the rxRecorder and delete any previous content by clearing the cache
Throwable rte = new RuntimeException("Test Error");
Flowable<String> errorFlowable = Flowable.create(
e -> {
e.onNext("one");
e.onNext("two");
e.onError(new RuntimeException("Test Error"));
e.onError(rte);
},
BackpressureStrategy.BUFFER
);

Flowable<Integer> validFlowable = Flowable.create(
e -> {
e.onNext(100);
e.onNext(200);
e.onComplete();
},
BackpressureStrategy.BUFFER
);
Expand All @@ -35,22 +47,53 @@ public void errorTest() throws IOException{
//Pass the input stream into the rxRecorder which will subscribe to it and record all events.
//The subscription will not be activated on a new thread which will allow this program to continue.
RxRecorder rxRecorder = rxJournal.createRxRecorder();
rxRecorder.recordAsync(errorFlowable, "input");
rxRecorder.recordAsync(errorFlowable, "errorinput");
rxRecorder.recordAsync(validFlowable, "validinput");

//Retrieve a stream of
RxPlayer rxPlayer = rxJournal.createRxPlayer();
PlayOptions options = new PlayOptions().filter("input");
PlayOptions options = new PlayOptions().filter("errorinput");
Observable recordedObservable = rxPlayer.play(options);

AtomicInteger onNext = new AtomicInteger(0);
AtomicInteger onComplete = new AtomicInteger(0);
AtomicInteger onError = new AtomicInteger(0);
Throwable[] tArray = new Throwable[1];

//Pass the output stream (of words) into the rxRecorder which will subscribe to it and record all events.
recordedObservable.subscribe(System.out::println,
System.out::println,
()->System.out.println("Test complete"));
//Only start the recording now because we want to make sure that the BytesToWordsProcessor and the rxRecorder
//are both setup up to receive subscriptions.
//Sometimes useful to see the recording written to a file
rxJournal.writeToFile("/tmp/testError/error.txt",true);
recordedObservable.subscribe(i->onNext.incrementAndGet(),
e->{
onError.incrementAndGet();
tArray[0] = (Throwable) e;
},
()->onComplete.incrementAndGet());

Assert.assertEquals(2, onNext.get());
Assert.assertEquals(1, onError.get());
Assert.assertEquals(0, onComplete.get());
Assert.assertEquals(rte.getMessage(), tArray[0].getMessage());
Assert.assertEquals(rte.getClass(), tArray[0].getClass());
Assert.assertEquals(rte.getStackTrace()[0], tArray[0].getStackTrace()[0]);


options = new PlayOptions().filter("validinput");
recordedObservable = rxPlayer.play(options);

AtomicInteger onNextValid = new AtomicInteger(0);
AtomicInteger onCompleteValid = new AtomicInteger(0);
AtomicInteger onErrorValid = new AtomicInteger(0);

//Pass the output stream (of words) into the rxRecorder which will subscribe to it and record all events.
recordedObservable.subscribe(i->onNextValid.incrementAndGet(),
e->{
onErrorValid.incrementAndGet();
},
()->onComplete.incrementAndGet());

Assert.assertEquals(2, onNextValid.get());
Assert.assertEquals(0, onErrorValid.get());
Assert.assertEquals(1, onCompleteValid.get());

rxJournal.writeToFile("/tmp/testError/error.txt",true);

}
}
Binary file not shown.
4 changes: 2 additions & 2 deletions src/test/resources/playTest.txt
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ VALID 9 2017-05-11T11:01:20.150 input 114
VALID 10 2017-05-11T11:01:20.151 input 108
VALID 11 2017-05-11T11:01:20.152 input 100
VALID 12 2017-05-11T11:01:20.154 input 32
COMPLETE 13 2017-05-11T11:01:20.154 endOfStream EndOfStream{}
COMPLETE 13 2017-05-11T11:01:20.154 input EndOfStream{}
VALID 14 2017-05-11T11:01:20.173 output Hello
VALID 15 2017-05-11T11:01:20.184 output World
COMPLETE 16 2017-05-11T11:01:20.184 endOfStream EndOfStream{}
COMPLETE 16 2017-05-11T11:01:20.184 output EndOfStream{}
6 changes: 3 additions & 3 deletions todolist.txt
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,10 @@
------15. Implement window grouping (this will probably have to be multi-threaded - or maybe not)
------11. Add features to rxRecorder replay to replay from time until or for a few seconds or a number of events
******20. Should there be 2 classes RxRecorder and RxPlayer - project could be called RxJournal. Yes will do register www.rxjournal.org
******5. Try more than one publisher e.g. add market data publisher (play ground)
******21. Deal with errors and test
******22. Add status flag to message and implement error handling

5. Try more than one publisher e.g. add market data publisher (play ground)
13. Work out minimum set of features for release
16. Write tests for record, play until
17. Consider replacing log with sout or exceptions
21. Deal with errors and test
22. Add status flag to message and implement error handling

0 comments on commit bf6ed7a

Please sign in to comment.