Skip to content

Commit

Permalink
Intitial commit
Browse files Browse the repository at this point in the history
  • Loading branch information
danielshaya committed May 18, 2017
1 parent 3de0eed commit 9be3bcd
Show file tree
Hide file tree
Showing 6 changed files with 86 additions and 56 deletions.
52 changes: 52 additions & 0 deletions src/main/java/org/rxjournal/impl/DataItemProcessor.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package org.rxjournal.impl;

import net.openhft.chronicle.wire.ValueIn;

/**
* Created by daniel on 18/05/17.
*/
public class DataItemProcessor {

private byte status;
private long messageCount;
private long time;
private String storedFilter;
private Object valueFromQueue;

public void process(ValueIn in, Object using){
status = in.int8();
messageCount = in.int64();
time = in.int64();
storedFilter = in.text();

if(status == RxStatus.ERROR){
valueFromQueue = in.throwable(false);
}else {
if(using== null) {
valueFromQueue = in.object();
}else{
valueFromQueue = in.object(using, using.getClass());
}
}
}

public byte getStatus() {
return status;
}

public long getMessageCount() {
return messageCount;
}

public long getTime() {
return time;
}

public String getFilter() {
return storedFilter;
}

public Object getObject() {
return valueFromQueue;
}
}
14 changes: 5 additions & 9 deletions src/main/java/org/rxjournal/impl/RxJournal.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
public class RxJournal {
private static final Logger LOG = LoggerFactory.getLogger(RxJournal.class.getName());
static final String END_OF_STREAM_FILTER = "endOfStream";
static final String ERROR_FILTER = "error";
private String dir;

public RxJournal(String dir){
Expand Down Expand Up @@ -89,19 +88,16 @@ private static void writeQueueToFile(ExcerptTailer tailer, String fileName) thro
private static void writeQueueToFile(ExcerptTailer tailer, String fileName, boolean toStdout) throws IOException {
FileWriter fileWriter = new FileWriter(fileName);
tailer.toStart();
DataItemProcessor dim = new DataItemProcessor();
while(tailer.readDocument(
w -> {
ValueIn in = w.getValueIn();
byte status = in.int8();
long messageCount = in.int64();
long time = in.int64();
dim.process(in,null);
//todo timezone should be configurable
LocalDateTime dateTime = LocalDateTime.ofInstant(Instant.ofEpochMilli(time), ZoneId.of("Europe/London"));
String filter = in.text();
Object valueFromQueue = in.object();
LocalDateTime dateTime = LocalDateTime.ofInstant(Instant.ofEpochMilli(dim.getTime()), ZoneId.of("Europe/London"));
try {
String item = RxStatus.toString(status) + "\t" + messageCount + "\t" + dateTime + "\t"
+ filter + "\t" + valueFromQueue;
String item = RxStatus.toString(dim.getStatus()) + "\t" + dim.getMessageCount() + "\t" + dateTime + "\t"
+ dim.getFilter() + "\t" + dim.getMessageCount();
fileWriter.write(item + "\n");
if(toStdout) {
LOG.info(item);
Expand Down
42 changes: 12 additions & 30 deletions src/main/java/org/rxjournal/impl/RxPlayer.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
*/
public class RxPlayer {
private RxJournal rxJournal;
private DataItemProcessor dim = new DataItemProcessor();

RxPlayer(RxJournal rxJournal) {
this.rxJournal = rxJournal;
Expand All @@ -29,34 +30,31 @@ public Observable play(PlayOptions options) {

boolean foundItem = tailer.readDocument(w -> {
ValueIn in = w.getValueIn();
byte status = in.int8();
long messageCount = in.int64();
long recordedAtTime = in.int64();
String storedWithFilter = in.text();
dim.process(in, null);

if (testEndOfStream(subscriber, storedWithFilter)) {
if (testEndOfStream(subscriber, dim.getFilter())) {
stop[0] = true;
return;
}

if (testPastPlayUntil(options, subscriber, recordedAtTime)){
if (testPastPlayUntil(options, subscriber, dim.getTime())){
stop[0] = true;
return;
}

if (options.playFrom() > recordedAtTime
&& (!options.playFromNow() || fromTime < recordedAtTime)) {
pause(options, lastTime, recordedAtTime);
if (options.playFrom() > dim.getTime()
&& (!options.playFromNow() || fromTime < dim.getTime())) {
pause(options, lastTime, dim.getTime());

if (options.filter().equals(storedWithFilter) || RxJournal.ERROR_FILTER.equals(storedWithFilter)) {
if (storedWithFilter.equals(RxJournal.ERROR_FILTER)){
subscriber.onError(getThrowable(in));
if (options.filter().equals(dim.getFilter())) {
if (dim.getStatus()==RxStatus.ERROR){
subscriber.onError((Throwable)dim.getObject());
stop[0] = true;
return;
}
subscriber.onNext(getStoredObject(options, in));
subscriber.onNext(dim.getObject());
}
lastTime[0] = recordedAtTime;
lastTime[0] = dim.getTime();
}
});
if (!foundItem && !options.completeAtEndOfFile() || stop[0]) {
Expand All @@ -69,12 +67,6 @@ public Observable play(PlayOptions options) {
});
}

//todo need to do this until bug inChronicle is resolved.
private Throwable getThrowable(ValueIn in) {
String errorMessage = in.text();
return new RuntimeException(errorMessage);
}

private boolean testPastPlayUntil(PlayOptions options, Emitter<? super Object> s, long recordedAtTime) {
if(options.playUntil() > recordedAtTime){
s.onComplete();
Expand All @@ -92,16 +84,6 @@ private boolean testEndOfStream(Emitter<? super Object> s, String storedWithFilt
return false;
}

private Object getStoredObject(PlayOptions options, ValueIn in) {
Object storedObject;
if (options.using() != null) {
storedObject = in.object(options.using(), options.using().getClass());
} else {
storedObject = in.object();
}
return storedObject;
}

private void pause(PlayOptions options, long[] lastTime, long recordedAtTime) {
if (options.replayStrategy() == PlayOptions.Replay.REAL_TIME && lastTime[0] != Long.MIN_VALUE) {
DSUtil.sleep((int) (recordedAtTime - lastTime[0]));
Expand Down
21 changes: 12 additions & 9 deletions src/main/java/org/rxjournal/impl/RxRecorder.java
Original file line number Diff line number Diff line change
Expand Up @@ -47,11 +47,11 @@ public void record(Observable<?> observable, String filter) {

TriConsumer<ExcerptAppender, String, Object> onNextConsumer = getOnNextConsumerRecorder();
Consumer<ExcerptAppender> onCompleteConsumer = getOnCompleteRecorder();
BiConsumer<ExcerptAppender, Throwable> onErrorConsumer = getOnErrorRecorder();
TriConsumer<ExcerptAppender, String, Throwable> onErrorConsumer = getOnErrorRecorder();

observable.subscribe(
t -> onNextConsumer.accept(appender, filter, t),
e -> onErrorConsumer.accept(appender, e),
e -> onErrorConsumer.accept(appender, filter, e),
() -> onCompleteConsumer.accept(appender)
);
}
Expand All @@ -69,10 +69,9 @@ private Consumer<ExcerptAppender> getOnCompleteRecorder(){
});
}

private BiConsumer<ExcerptAppender, Throwable> getOnErrorRecorder(){
return (a, t) -> a.writeDocument(w -> {
//todo Throwable should go here once Chronicle bug is fixed
writeObject(w, RxJournal.ERROR_FILTER, t.getMessage(), RxStatus.ERROR);
private TriConsumer<ExcerptAppender, String, Throwable> getOnErrorRecorder(){
return (a, f, t) -> a.writeDocument(w -> {
writeObject(w, f, t, RxStatus.ERROR);
});
}

Expand All @@ -81,7 +80,11 @@ private void writeObject(WireOut wireOut, String filter, Object obj, byte status
wireOut.getValueOut().int64(messageCounter.incrementAndGet());
wireOut.getValueOut().int64(System.currentTimeMillis());
wireOut.getValueOut().text(filter);
wireOut.getValueOut().object(obj);
if(status==RxStatus.ERROR){
wireOut.getValueOut().throwable((Throwable)obj);
}else {
wireOut.getValueOut().object(obj);
}
}

public void record(Flowable<?> flowable, String filter) {
Expand All @@ -90,11 +93,11 @@ public void record(Flowable<?> flowable, String filter) {

TriConsumer<ExcerptAppender, String, Object> onNextConsumer = getOnNextConsumerRecorder();
Consumer<ExcerptAppender> onCompleteConsumer = getOnCompleteRecorder();
BiConsumer<ExcerptAppender, Throwable> onErrorConsumer = getOnErrorRecorder();
TriConsumer<ExcerptAppender, String, Throwable> onErrorConsumer = getOnErrorRecorder();

flowable.subscribe(
t -> onNextConsumer.accept(appender, filter, t),
e -> onErrorConsumer.accept(appender, e),
e -> onErrorConsumer.accept(appender, filter, e),
() -> onCompleteConsumer.accept(appender)
);
}
Expand Down
12 changes: 4 additions & 8 deletions src/main/java/org/rxjournal/impl/RxValidator.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
public class RxValidator {
private static final Logger LOG = LoggerFactory.getLogger(RxValidator.class.getName());
private ValidationResult validationResult;
private DataItemProcessor dataItemProcessor = new DataItemProcessor();

public Observable<ValidationResult> validate(String fileName, Observable observable, String filter) {
Subject<ValidationResult> validatorPublisher = PublishSubject.create();
Expand Down Expand Up @@ -57,14 +58,10 @@ private Object getNextMatchingFilter(ExcerptTailer tailer, String filter){
}

ValueIn in = dc.wire().getValueIn();
byte status = in.int8();
long messageCount = in.int64();
long time = in.int64();
String storedFilter = in.text();
Object valueFromQueue = in.object();
dataItemProcessor.process(in, null);

if(storedFilter.equals(filter)){
return valueFromQueue;
if(dataItemProcessor.getFilter().equals(filter)){
return dataItemProcessor.getObject();
}else{
tailer.moveToIndex(++index);
return getNextMatchingFilter(tailer, filter);
Expand All @@ -75,5 +72,4 @@ private Object getNextMatchingFilter(ExcerptTailer tailer, String filter){
public ValidationResult getValidationResult(){
return validationResult;
}

}
1 change: 1 addition & 0 deletions src/test/java/org/rxjournal/impl/RxErrorTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ public void errorTest() throws IOException{
);

RxJournal rxJournal = new RxJournal("/tmp/testError");
rxJournal.writeToFile("/tmp/testError/error.txt",true);
rxJournal.clearCache();

//Pass the input stream into the rxRecorder which will subscribe to it and record all events.
Expand Down

0 comments on commit 9be3bcd

Please sign in to comment.