Skip to content

Commit

Permalink
moved Esper Engine to value state, started with type serialization
Browse files Browse the repository at this point in the history
  • Loading branch information
phil3k committed Oct 31, 2017
1 parent 73c71ae commit ef827b5
Show file tree
Hide file tree
Showing 5 changed files with 237 additions and 59 deletions.
88 changes: 88 additions & 0 deletions src/main/java/at/datasciencelabs/EsperEngineSerializer.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
package at.datasciencelabs;

import com.espertech.esper.client.EPServiceProvider;
import org.apache.flink.api.common.typeutils.CompatibilityResult;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;

import java.io.IOException;

/**
 * Flink {@link TypeSerializer} for Esper's {@link EPServiceProvider}, used so that the
 * engine can be registered as Flink state.
 *
 * <p>This is a work-in-progress skeleton: nothing is written to or read from the binary
 * views yet ({@link #serialize} is a no-op and both {@code deserialize} variants return
 * {@code null}), so an engine does not survive a round trip through this serializer.
 *
 * <p>The serializer carries no configuration, therefore all instances are interchangeable
 * and compare equal to each other.
 */
public class EsperEngineSerializer extends TypeSerializer<EPServiceProvider> {

    /** The engine is mutable, so it is never reported as an immutable type. */
    @Override
    public boolean isImmutableType() {
        return false;
    }

    /** The serializer holds no fields, so the same instance can be handed out. */
    @Override
    public TypeSerializer<EPServiceProvider> duplicate() {
        return this;
    }

    /**
     * No default engine is created here.
     *
     * @return always {@code null}
     */
    @Override
    public EPServiceProvider createInstance() {
        return null;
    }

    /**
     * Returns the given engine itself instead of a copy.
     *
     * <p>Returning {@code null} here would silently drop the engine whenever a copy is
     * requested; no deep copy is attempted, so the same reference is handed back.
     *
     * @param from the engine to copy, may be {@code null}
     * @return {@code from}
     */
    @Override
    public EPServiceProvider copy(EPServiceProvider from) {
        return from;
    }

    /**
     * Returns the given engine itself; {@code reuse} is ignored.
     *
     * @param from  the engine to copy, may be {@code null}
     * @param reuse unused
     * @return {@code from}
     */
    @Override
    public EPServiceProvider copy(EPServiceProvider from, EPServiceProvider reuse) {
        return from;
    }

    /** @return {@code -1}, i.e. no fixed serialized length is declared */
    @Override
    public int getLength() {
        return -1;
    }

    /** Not implemented yet: nothing is written to {@code target}. */
    @Override
    public void serialize(EPServiceProvider record, DataOutputView target) throws IOException {
        // TODO: write the engine to the target view
    }

    /**
     * Not implemented yet: nothing is read from {@code source}.
     *
     * @return always {@code null}
     */
    @Override
    public EPServiceProvider deserialize(DataInputView source) throws IOException {
        // TODO: rebuild the engine from the source view
        return null;
    }

    /**
     * Not implemented yet: nothing is read from {@code source} and {@code reuse} is ignored.
     *
     * @return always {@code null}
     */
    @Override
    public EPServiceProvider deserialize(EPServiceProvider reuse, DataInputView source) throws IOException {
        // TODO: rebuild the engine from the source view
        return null;
    }

    /** No-op, consistent with {@link #serialize} writing no bytes. */
    @Override
    public void copy(DataInputView source, DataOutputView target) throws IOException {
        // nothing to transfer while serialize() writes nothing
    }

    /**
     * Two serializers are equal when both are {@code EsperEngineSerializer}s; there is no
     * further configuration to compare.
     */
    @Override
    public boolean equals(Object obj) {
        return obj instanceof EsperEngineSerializer
                && ((EsperEngineSerializer) obj).canEqual(this);
    }

    /** @return {@code true} if {@code obj} is an {@code EsperEngineSerializer} */
    @Override
    public boolean canEqual(Object obj) {
        return obj instanceof EsperEngineSerializer;
    }

    /** Derived from the class name so that it agrees with {@link #equals(Object)}. */
    @Override
    public int hashCode() {
        return EsperEngineSerializer.class.getName().hashCode();
    }

    /**
     * Not implemented yet.
     *
     * @return always {@code null}
     */
    @Override
    public TypeSerializerConfigSnapshot snapshotConfiguration() {
        // NOTE(review): a null snapshot presumably breaks checkpointing of state using this
        // serializer - confirm against the TypeSerializer contract before relying on it.
        return null;
    }

    /**
     * Not implemented yet.
     *
     * @return always {@code null}
     */
    @Override
    public CompatibilityResult<EPServiceProvider> ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) {
        // NOTE(review): a null result presumably breaks state restore - confirm and replace
        // once serialize()/deserialize() are implemented.
        return null;
    }
}
32 changes: 29 additions & 3 deletions src/main/java/at/datasciencelabs/EsperStream.java
Original file line number Diff line number Diff line change
@@ -1,12 +1,19 @@
package at.datasciencelabs;

import org.apache.flink.api.common.functions.InvalidTypesException;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.functions.NullByteKeySelector;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.api.java.typeutils.TypeExtractionException;
import org.apache.flink.api.java.typeutils.TypeExtractionUtils;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;

import java.lang.reflect.Type;


/**
* A DataStream which is able to detect event sequences and patterns using Esper
Expand All @@ -32,17 +39,36 @@ public EsperStream(DataStream<IN> inputStream, String esperQuery) {
* Select from the EsperStream, must provide the return type of the output DataStream since no type information is
* currently extracted from the @see {@link EsperSelectFunction}.
*/
public <R> SingleOutputStreamOperator<R> select(EsperSelectFunction<R> esperSelectFunction, TypeInformation<R> dataStreamReturnType) {
public <R> SingleOutputStreamOperator<R> select(EsperSelectFunction<R> esperSelectFunction) {
KeySelector<IN, Byte> keySelector = new NullByteKeySelector<>();

SingleOutputStreamOperator<R> patternStream;

// TODO until the type extractor is capable of extracting non-generic parameters, the return type has to be passed in manually
TypeInformation<R> typeInformation = getTypeInformation(esperSelectFunction);

final boolean isProcessingTime = inputStream.getExecutionEnvironment().getStreamTimeCharacteristic() == TimeCharacteristic.ProcessingTime;
patternStream = inputStream.keyBy(keySelector).transform("SelectEsperOperator", dataStreamReturnType, new SelectEsperStreamOperator<Byte, IN, R>(inputStream.getType(), esperSelectFunction, isProcessingTime, esperQuery));
patternStream = inputStream.keyBy(keySelector).transform("SelectEsperOperator", typeInformation, new SelectEsperStreamOperator<Byte, IN, R>(inputStream.getType(), esperSelectFunction, isProcessingTime, esperQuery));

return patternStream;
}

/**
 * Determines the output {@link TypeInformation} for the given select function.
 *
 * <p>Resolution order: a function implementing {@link ResultTypeQueryable} reports its own
 * type; otherwise, if the function is a lambda, the lambda's return type is used; otherwise
 * the type is extracted from the first type parameter of {@link EsperSelectFunction}.
 *
 * @param esperSelectFunction the user function whose result type is needed
 * @param <OUT>               the result type of the function
 * @return type information describing the function's result
 * @throws InvalidTypesException if the lambda inspection fails
 */
@SuppressWarnings("unchecked")
private <OUT> TypeInformation<OUT> getTypeInformation(EsperSelectFunction<OUT> esperSelectFunction) {
    try {
        // Lambda inspection runs first, exactly as before, so a failure here still surfaces
        // even for functions that could report their own type.
        final TypeExtractionUtils.LambdaExecutable executable =
                TypeExtractionUtils.checkAndExtractLambda(esperSelectFunction);

        if (esperSelectFunction instanceof ResultTypeQueryable) {
            final ResultTypeQueryable<OUT> selfDescribing = (ResultTypeQueryable<OUT>) esperSelectFunction;
            return selfDescribing.getProducedType();
        }

        if (executable == null) {
            // Not a lambda: resolve type parameter 0 of EsperSelectFunction reflectively.
            return TypeExtractor.createTypeInfo(esperSelectFunction, EsperSelectFunction.class, esperSelectFunction.getClass(), 0);
        }

        final Type lambdaReturnType = executable.getReturnType();
        return (TypeInformation<OUT>) TypeExtractor.createTypeInfo(lambdaReturnType);
    } catch (TypeExtractionException e) {
        throw new InvalidTypesException("Could not extract types.", e);
    }
}

}
71 changes: 51 additions & 20 deletions src/main/java/at/datasciencelabs/SelectEsperStreamOperator.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
import com.espertech.esper.client.EPServiceProviderManager;
import com.espertech.esper.client.EPStatement;
import com.espertech.esper.client.EventBean;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.state.VoidNamespace;
Expand All @@ -13,27 +15,32 @@
import org.apache.flink.streaming.api.operators.Triggerable;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

import java.io.IOException;
import java.io.Serializable;

/**
* An operator which supports detecting event sequences and patterns using Esper.
*
* @param <KEY> Type of the key
* @param <IN> Type of the input stream
* @param <IN> Type of the input stream
* @param <OUT> Type of the output stream
*/
public class SelectEsperStreamOperator<KEY, IN, OUT> extends AbstractUdfStreamOperator<OUT, EsperSelectFunction<OUT>> implements OneInputStreamOperator<IN, OUT>, Triggerable<KEY, VoidNamespace>, Serializable {

private final String query;
private final TypeInformation<IN> inputType;
private EPServiceProvider engine;
private ValueState<EPServiceProvider> engineState;
private static final String ESPER_SERVICE_PROVIDER_STATE = "esperServiceProviderState";
private final Object lock = new Object[0];

/**
* Constructs a new operator. Requires the type of the input DataStream to register its Event Type at Esper.
* Currently only processing time evaluation is supported.
* @param inputStreamType type of the input DataStream
*
* @param inputStreamType type of the input DataStream
* @param esperSelectFunction function to select from Esper's output
* @param isProcessingTime Flag indicating how time is interpreted (processing time vs event time)
* @param esperQuery The esper query
* @param isProcessingTime Flag indicating how time is interpreted (processing time vs event time)
* @param esperQuery The esper query
*/
public SelectEsperStreamOperator(TypeInformation<IN> inputStreamType, EsperSelectFunction<OUT> esperSelectFunction, boolean isProcessingTime, String esperQuery) {
super(esperSelectFunction);
Expand All @@ -48,34 +55,58 @@ public SelectEsperStreamOperator(TypeInformation<IN> inputStreamType, EsperSelec
@Override
public void initializeState(StateInitializationContext context) throws Exception {
super.initializeState(context);
engine = EPServiceProviderManager.getDefaultProvider();
engine.getEPAdministrator().getConfiguration().addEventType(inputType.getTypeClass());
EPStatement statement = engine.getEPAdministrator().createEPL(query);
statement.addListener((newData, oldData) -> {
for (EventBean event : newData) {
EsperSelectFunction<OUT> userFunction = getUserFunction();
try {
output.collect(new StreamRecord<>((userFunction.select(event))));
} catch (Exception e) {
throw new RuntimeException(e);
}
}
});

if (this.engineState == null) {
this.engineState = getRuntimeContext().getState(new ValueStateDescriptor<>(ESPER_SERVICE_PROVIDER_STATE, new EsperEngineSerializer()));
}
}

@Override
public void processElement(StreamRecord<IN> streamRecord) throws Exception {
engine.getEPRuntime().sendEvent(streamRecord.getValue());
EPServiceProvider esperServiceProvider = getServiceProvider(this.hashCode() + "");
esperServiceProvider.getEPRuntime().sendEvent(streamRecord.getValue());
this.engineState.update(esperServiceProvider);
}

/**
 * Timer callback for event time. No event-time handling is performed here.
 *
 * @param internalTimer the timer that fired
 */
@Override
public void onEventTime(InternalTimer<KEY, VoidNamespace> internalTimer) throws Exception {
// The timestamp is read but its value is not used.
internalTimer.getTimestamp();

}

/**
 * Timer callback for processing time. Intentionally empty: the method body performs no work.
 *
 * @param internalTimer the timer that fired (unused)
 */
@Override
public void onProcessingTime(InternalTimer<KEY, VoidNamespace> internalTimer) throws Exception {

}

/**
 * Returns the Esper engine held in {@code engineState}, creating and registering it on
 * first use.
 *
 * <p>On creation the input type is registered as an Esper event type, the configured query
 * is compiled, and a listener is attached that maps every new result event through the user
 * function and emits it downstream. The new engine is then stored in {@code engineState}.
 *
 * @param context provider URI passed to {@link EPServiceProviderManager#getProvider(String)}
 * @return the engine from state, or the newly created one
 * @throws IOException if reading or updating {@code engineState} fails
 */
private EPServiceProvider getServiceProvider(String context) throws IOException {
    EPServiceProvider serviceProvider = engineState.value();
    if (serviceProvider != null) {
        return serviceProvider;
    }
    synchronized (lock) {
        // Re-check under the lock so that only one engine is created and registered.
        serviceProvider = engineState.value();
        if (serviceProvider != null) {
            return serviceProvider;
        }

        serviceProvider = EPServiceProviderManager.getProvider(context);
        serviceProvider.getEPAdministrator().getConfiguration().addEventType(inputType.getTypeClass());
        EPStatement statement = serviceProvider.getEPAdministrator().createEPL(query);

        statement.addListener((newData, oldData) -> {
            // Esper may deliver updates that carry only old (remove-stream) events.
            if (newData == null) {
                return;
            }
            EsperSelectFunction<OUT> userFunction = getUserFunction();
            for (EventBean event : newData) {
                // No property is read from the event here: the query decides which
                // properties exist, and only the user function knows how to access them.
                try {
                    output.collect(new StreamRecord<>(userFunction.select(event)));
                } catch (Exception e) {
                    throw new RuntimeException(e);
                }
            }
        });

        this.engineState.update(serviceProvider);
        return serviceProvider;
    }
}
}
97 changes: 61 additions & 36 deletions src/test/java/at/datasciencelabs/EsperStreamTest.java
Original file line number Diff line number Diff line change
@@ -1,67 +1,92 @@
package at.datasciencelabs;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import com.espertech.esper.client.EventBean;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
import org.junit.Before;
import org.junit.Test;

public class EsperStreamTest {
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

@Test
public void testEsperStream() throws Exception {

StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.core.IsNull.notNullValue;

DataStream<TestEvent> dataStream = executionEnvironment.fromElements(new TestEvent("peter", 10), new TestEvent("alex", 25), new TestEvent("maria", 30));

EsperStream<TestEvent> esperStream = new EsperStream<>(dataStream, "select name, age from TestEvent");

DataStream<TestEvent> resultStream = esperStream.select((EsperSelectFunction<TestEvent>) collector -> {
String name = (String) collector.get("name");
int age = (int) collector.get("age");
return new TestEvent(name, age);
}, TypeInformation.of(TestEvent.class));
public class EsperStreamTest extends StreamingMultipleProgramsTestBase implements Serializable {

resultStream.printToErr();
private static List<TestEvent> result;

executionEnvironment.execute("test");

@Before
public void before() {
result = new ArrayList<>();
}

@Test
public void testMoreComplexEsperStream() throws Exception {
@SuppressWarnings("Convert2Lambda")
public void shouldSelectFromStreamUsingAnonymousClassSelect() throws Exception {
StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
executionEnvironment.setParallelism(1);

DataStream<TestEvent> dataStream = executionEnvironment.fromElements(new TestEvent("peter", 10), new TestEvent("alex", 25), new TestEvent("maria", 30));

EsperStream<TestEvent> eventEsperStream = new EsperStream<>(dataStream, "select avg(age) as average_age from TestEvent");

DataStream<Double> resultStream = eventEsperStream.select((EsperSelectFunction<Double>) collector -> {
Double age = (Double) collector.get("average_age");
return age;
}, TypeInformation.of(Double.class));

resultStream.printToErr();
EsperStream<TestEvent> esperStream = new EsperStream<>(dataStream, "select name, age from TestEvent");

executionEnvironment.execute("test");
DataStream<TestEvent> resultStream = esperStream.select(new EsperSelectFunction<TestEvent>() {
@Override
public TestEvent select(EventBean eventBean) throws Exception {
String name = (String) eventBean.get("name");
int age = (int) eventBean.get("age");
return new TestEvent(name, age);
}
});

resultStream.addSink(new SinkFunction<TestEvent>() {
@Override
public void invoke(TestEvent testEvent) throws Exception {
System.err.println(testEvent);
result.add(testEvent);
}
});

executionEnvironment.execute("test-2");

assertThat(result, is(notNullValue()));
assertThat(result.size(), is(3));
}

@Test
public void testStateChangeRule() throws Exception {
@SuppressWarnings("Convert2Lambda")
public void shouldSelectFromStreamUsingLambdaSelect() throws Exception {

StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
executionEnvironment.setParallelism(1);

DataStream<TestEvent> dataStream = executionEnvironment.fromElements(new TestEvent("peter1", 10), new TestEvent("alex1", 25), new TestEvent("maria1", 30));

DataStream<TestEvent> dataStream = executionEnvironment.fromElements(new TestEvent("peter", 10), new TestEvent("alex", 25));
EsperStream<TestEvent> esperStream = new EsperStream<>(dataStream, "select name, age from TestEvent");

EsperStream<TestEvent> eventEsperStream = new EsperStream<>(dataStream, "select avg(age) as average_age from TestEvent");
DataStream<TestEvent> resultStream = esperStream.select((EsperSelectFunction<TestEvent>) collector -> {
String name = (String) collector.get("name");
int age = (int) collector.get("age");
return new TestEvent(name, age);
});

DataStream<Double> resultStream = eventEsperStream.select((EsperSelectFunction<Double>) collector -> {
Double age = (Double) collector.get("average_age");
return age;
}, TypeInformation.of(Double.class));
resultStream.addSink(new SinkFunction<TestEvent>() {
@Override
public void invoke(TestEvent testEvent) throws Exception {
result.add(testEvent);
}
});

resultStream.printToErr();
executionEnvironment.execute("test-1");

executionEnvironment.execute("stateChange1");
assertThat(result, is(notNullValue()));
assertThat(result.size(), is(3));
}

}
Loading

0 comments on commit ef827b5

Please sign in to comment.