diff --git a/flink-esper.iml b/flink-esper-test/flink-esper-test.iml
similarity index 88%
rename from flink-esper.iml
rename to flink-esper-test/flink-esper-test.iml
index 1d93ba2..12ebfeb 100644
--- a/flink-esper.iml
+++ b/flink-esper-test/flink-esper-test.iml
@@ -11,12 +11,14 @@
+
+
@@ -37,19 +39,6 @@
-
-
-
-
-
-
-
-
-
-
-
-
-
@@ -88,9 +77,12 @@
+
-
-
+
+
+
+
\ No newline at end of file
diff --git a/flink-esper-test/pom.xml b/flink-esper-test/pom.xml
new file mode 100644
index 0000000..7e29f91
--- /dev/null
+++ b/flink-esper-test/pom.xml
@@ -0,0 +1,61 @@
+
+
+
+ flink-esper-parent
+ at.datasciencelabs.research
+ 0.0.1-SNAPSHOT
+
+ 4.0.0
+
+ flink-esper-test
+
+
+
+
+ maven-assembly-plugin
+
+
+
+ at.datasciencelabs.test.FlinkTestClass
+
+
+
+ jar-with-dependencies
+
+
+
+
+ make-assembly
+ package
+
+ single
+
+
+
+
+
+
+
+
+
+
+ at.datasciencelabs.research
+ flink-esper
+ ${parent.version}
+
+
+
+ org.apache.flink
+ flink-core
+ provided
+
+
+
+ org.apache.flink
+ flink-streaming-java_${scala.binary.version}
+ provided
+
+
+
\ No newline at end of file
diff --git a/flink-esper-test/src/main/java/at/datasciencelabs/test/FlinkTestClass.java b/flink-esper-test/src/main/java/at/datasciencelabs/test/FlinkTestClass.java
new file mode 100644
index 0000000..c37f240
--- /dev/null
+++ b/flink-esper-test/src/main/java/at/datasciencelabs/test/FlinkTestClass.java
@@ -0,0 +1,31 @@
+package at.datasciencelabs.test;
+
+import at.datasciencelabs.Esper;
+import at.datasciencelabs.EsperSelectFunction;
+import at.datasciencelabs.EsperStream;
+import com.espertech.esper.client.EventBean;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;
+
+public class FlinkTestClass {
+
+ public static void main(String[] args) throws Exception {
+ StreamExecutionEnvironment streamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
+ DataStream dataStream = streamExecutionEnvironment.readTextFile("file:///tmp/flink-test");
+
+ EsperStream esperStream = Esper.pattern(dataStream, "select bytes from String");
+
+ DataStream result = esperStream.select(new EsperSelectFunction() {
+ @Override
+ public String select(EventBean eventBean) throws Exception {
+ return new String((byte[]) eventBean.get("bytes"));
+ }
+ });
+
+ result.addSink(new PrintSinkFunction<>(true));
+
+ streamExecutionEnvironment.execute("Kafka 0.10 Example");
+ }
+
+}
diff --git a/flink-esper/pom.xml b/flink-esper/pom.xml
new file mode 100644
index 0000000..b22d626
--- /dev/null
+++ b/flink-esper/pom.xml
@@ -0,0 +1,63 @@
+
+
+
+ flink-esper-parent
+ at.datasciencelabs.research
+ 0.0.1-SNAPSHOT
+
+ 4.0.0
+
+ flink-esper
+
+
+
+ com.espertech
+ esper
+
+
+
+ org.apache.flink
+ flink-core
+ provided
+
+
+
+ org.apache.flink
+ flink-streaming-java_${scala.binary.version}
+ provided
+
+
+
+ org.apache.flink
+ flink-shaded-guava
+
+
+
+ org.apache.flink
+ flink-test-utils_${scala.binary.version}
+ test
+
+
+
+ org.apache.flink
+ flink-streaming-java_${scala.binary.version}
+ test-jar
+ test
+
+
+
+ org.apache.flink
+ flink-runtime_${scala.binary.version}
+ test-jar
+ test
+
+
+
+ org.apache.flink
+ flink-statebackend-rocksdb_${scala.binary.version}
+ test
+
+
+
\ No newline at end of file
diff --git a/flink-esper/src/main/java/at/datasciencelabs/Esper.java b/flink-esper/src/main/java/at/datasciencelabs/Esper.java
new file mode 100644
index 0000000..ac4c8a1
--- /dev/null
+++ b/flink-esper/src/main/java/at/datasciencelabs/Esper.java
@@ -0,0 +1,10 @@
+package at.datasciencelabs;
+
+import org.apache.flink.streaming.api.datastream.DataStream;
+
+public class Esper {
+
+ public static EsperStream pattern(DataStream dataStream, String query) {
+ return new EsperStream(dataStream, query);
+ }
+}
diff --git a/flink-esper/src/main/java/at/datasciencelabs/EsperEngineSerializer.java b/flink-esper/src/main/java/at/datasciencelabs/EsperEngineSerializer.java
new file mode 100644
index 0000000..56bfdc9
--- /dev/null
+++ b/flink-esper/src/main/java/at/datasciencelabs/EsperEngineSerializer.java
@@ -0,0 +1,88 @@
+package at.datasciencelabs;
+
+import com.espertech.esper.client.EPServiceProvider;
+import org.apache.flink.api.common.typeutils.CompatibilityResult;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+
+import java.io.IOException;
+
+public class EsperEngineSerializer extends TypeSerializer {
+
+ @Override
+ public boolean isImmutableType() {
+ return false;
+ }
+
+ @Override
+ public TypeSerializer duplicate() {
+ return this;
+ }
+
+ @Override
+ public EPServiceProvider createInstance() {
+ return null;
+ }
+
+ @Override
+ public EPServiceProvider copy(EPServiceProvider from) {
+ return null;
+ }
+
+ @Override
+ public EPServiceProvider copy(EPServiceProvider from, EPServiceProvider reuse) {
+ return null;
+ }
+
+ @Override
+ public int getLength() {
+ return -1;
+ }
+
+ @Override
+ public void serialize(EPServiceProvider record, DataOutputView target) throws IOException {
+
+ }
+
+ @Override
+ public EPServiceProvider deserialize(DataInputView source) throws IOException {
+ return null;
+ }
+
+ @Override
+ public EPServiceProvider deserialize(EPServiceProvider reuse, DataInputView source) throws IOException {
+ return null;
+ }
+
+ @Override
+ public void copy(DataInputView source, DataOutputView target) throws IOException {
+
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ return false;
+ }
+
+ @Override
+ public boolean canEqual(Object obj) {
+ return false;
+ }
+
+ @Override
+ public int hashCode() {
+ return 0;
+ }
+
+ @Override
+ public TypeSerializerConfigSnapshot snapshotConfiguration() {
+ return null;
+ }
+
+ @Override
+ public CompatibilityResult ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) {
+ return null;
+ }
+}
diff --git a/src/main/java/at/datasciencelabs/EsperSelectFunction.java b/flink-esper/src/main/java/at/datasciencelabs/EsperSelectFunction.java
similarity index 100%
rename from src/main/java/at/datasciencelabs/EsperSelectFunction.java
rename to flink-esper/src/main/java/at/datasciencelabs/EsperSelectFunction.java
diff --git a/src/main/java/at/datasciencelabs/EsperStream.java b/flink-esper/src/main/java/at/datasciencelabs/EsperStream.java
similarity index 50%
rename from src/main/java/at/datasciencelabs/EsperStream.java
rename to flink-esper/src/main/java/at/datasciencelabs/EsperStream.java
index dccdefa..140f23c 100644
--- a/src/main/java/at/datasciencelabs/EsperStream.java
+++ b/flink-esper/src/main/java/at/datasciencelabs/EsperStream.java
@@ -1,12 +1,19 @@
package at.datasciencelabs;
+import org.apache.flink.api.common.functions.InvalidTypesException;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.functions.NullByteKeySelector;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.api.java.typeutils.TypeExtractionException;
+import org.apache.flink.api.java.typeutils.TypeExtractionUtils;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import java.lang.reflect.Type;
+
/**
* A DataStream which is able to detect event sequences and patterns using Esper
@@ -32,17 +39,36 @@ public EsperStream(DataStream inputStream, String esperQuery) {
* Select from the EsperStream, must provide the return type of the output DataStream since no type information is
* currently extracted from the @see {@link EsperSelectFunction}.
*/
- public SingleOutputStreamOperator select(EsperSelectFunction esperSelectFunction, TypeInformation dataStreamReturnType) {
+ public SingleOutputStreamOperator select(EsperSelectFunction esperSelectFunction) {
KeySelector keySelector = new NullByteKeySelector<>();
SingleOutputStreamOperator patternStream;
- // TODO until the type extractor is capable of extracting non-generic parameters, the return type has to be passed in manually
+ TypeInformation typeInformation = getTypeInformation(esperSelectFunction);
final boolean isProcessingTime = inputStream.getExecutionEnvironment().getStreamTimeCharacteristic() == TimeCharacteristic.ProcessingTime;
- patternStream = inputStream.keyBy(keySelector).transform("SelectEsperOperator", dataStreamReturnType, new SelectEsperStreamOperator(inputStream.getType(), esperSelectFunction, isProcessingTime, esperQuery));
+ patternStream = inputStream.keyBy(keySelector).transform("SelectEsperOperator", typeInformation, new SelectEsperStreamOperator(inputStream.getType(), esperSelectFunction, isProcessingTime, esperQuery));
return patternStream;
}
+ @SuppressWarnings("unchecked")
+ private TypeInformation getTypeInformation(EsperSelectFunction esperSelectFunction) {
+ try {
+ TypeExtractionUtils.LambdaExecutable lambdaExecutable = TypeExtractionUtils.checkAndExtractLambda(esperSelectFunction);
+ if (esperSelectFunction instanceof ResultTypeQueryable) {
+ return ((ResultTypeQueryable) esperSelectFunction).getProducedType();
+ }
+ if (lambdaExecutable != null) {
+ Type type = lambdaExecutable.getReturnType();
+ return (TypeInformation) TypeExtractor.createTypeInfo(type);
+ }
+ else {
+ return TypeExtractor.createTypeInfo(esperSelectFunction, EsperSelectFunction.class, esperSelectFunction.getClass(), 0);
+ }
+ } catch (TypeExtractionException e) {
+ throw new InvalidTypesException("Could not extract types.", e);
+ }
+ }
+
}
diff --git a/src/main/java/at/datasciencelabs/SelectEsperStreamOperator.java b/flink-esper/src/main/java/at/datasciencelabs/SelectEsperStreamOperator.java
similarity index 51%
rename from src/main/java/at/datasciencelabs/SelectEsperStreamOperator.java
rename to flink-esper/src/main/java/at/datasciencelabs/SelectEsperStreamOperator.java
index 92121dd..bf6a54e 100644
--- a/src/main/java/at/datasciencelabs/SelectEsperStreamOperator.java
+++ b/flink-esper/src/main/java/at/datasciencelabs/SelectEsperStreamOperator.java
@@ -4,6 +4,8 @@
import com.espertech.esper.client.EPServiceProviderManager;
import com.espertech.esper.client.EPStatement;
import com.espertech.esper.client.EventBean;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.state.VoidNamespace;
@@ -13,27 +15,32 @@
import org.apache.flink.streaming.api.operators.Triggerable;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import java.io.IOException;
import java.io.Serializable;
/**
* An operator which supports detecting event sequences and patterns using Esper.
+ *
* @param Type of the key
- * @param Type of the input stream
+ * @param Type of the input stream
* @param Type of the output stream
*/
public class SelectEsperStreamOperator extends AbstractUdfStreamOperator> implements OneInputStreamOperator, Triggerable, Serializable {
private final String query;
private final TypeInformation inputType;
- private EPServiceProvider engine;
+ private ValueState engineState;
+ private static final String ESPER_SERVICE_PROVIDER_STATE = "esperServiceProviderState";
+ private final Object lock = new Object[0];
/**
* Constructs a new operator. Requires the type of the input DataStream to register its Event Type at Esper.
* Currently only processing time evaluation is supported.
- * @param inputStreamType type of the input DataStream
+ *
+ * @param inputStreamType type of the input DataStream
* @param esperSelectFunction function to select from Esper's output
- * @param isProcessingTime Flag indicating how time is interpreted (processing time vs event time)
- * @param esperQuery The esper query
+ * @param isProcessingTime Flag indicating how time is interpreted (processing time vs event time)
+ * @param esperQuery The esper query
*/
public SelectEsperStreamOperator(TypeInformation inputStreamType, EsperSelectFunction esperSelectFunction, boolean isProcessingTime, String esperQuery) {
super(esperSelectFunction);
@@ -48,34 +55,57 @@ public SelectEsperStreamOperator(TypeInformation inputStreamType, EsperSelec
@Override
public void initializeState(StateInitializationContext context) throws Exception {
super.initializeState(context);
- engine = EPServiceProviderManager.getDefaultProvider();
- engine.getEPAdministrator().getConfiguration().addEventType(inputType.getTypeClass());
- EPStatement statement = engine.getEPAdministrator().createEPL(query);
- statement.addListener((newData, oldData) -> {
- for (EventBean event : newData) {
- EsperSelectFunction userFunction = getUserFunction();
- try {
- output.collect(new StreamRecord<>((userFunction.select(event))));
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
- });
+
+ if (this.engineState == null) {
+ this.engineState = getRuntimeContext().getState(new ValueStateDescriptor<>(ESPER_SERVICE_PROVIDER_STATE, new EsperEngineSerializer()));
+ }
}
@Override
public void processElement(StreamRecord streamRecord) throws Exception {
- engine.getEPRuntime().sendEvent(streamRecord.getValue());
+ EPServiceProvider esperServiceProvider = getServiceProvider(this.hashCode() + "");
+ esperServiceProvider.getEPRuntime().sendEvent(streamRecord.getValue());
+ this.engineState.update(esperServiceProvider);
}
@Override
public void onEventTime(InternalTimer internalTimer) throws Exception {
internalTimer.getTimestamp();
-
}
@Override
public void onProcessingTime(InternalTimer internalTimer) throws Exception {
}
+
+ private EPServiceProvider getServiceProvider(String context) throws IOException {
+ EPServiceProvider serviceProvider = engineState.value();
+ if (serviceProvider != null) {
+ return serviceProvider;
+ }
+ synchronized (lock) {
+ serviceProvider = engineState.value();
+ if (serviceProvider == null) {
+ serviceProvider = EPServiceProviderManager.getProvider(context);
+ serviceProvider.getEPAdministrator().getConfiguration().addEventType(inputType.getTypeClass());
+ EPStatement statement = serviceProvider.getEPAdministrator().createEPL(query);
+
+ statement.addListener((newData, oldData) -> {
+ for (EventBean event : newData) {
+ EsperSelectFunction userFunction = getUserFunction();
+ try {
+ output.collect(new StreamRecord<>((userFunction.select(event))));
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+ });
+ this.engineState.update(serviceProvider);
+ return serviceProvider;
+
+ } else {
+ return engineState.value();
+ }
+ }
+ }
}
diff --git a/flink-esper/src/test/java/at/datasciencelabs/EsperStreamTest.java b/flink-esper/src/test/java/at/datasciencelabs/EsperStreamTest.java
new file mode 100644
index 0000000..2f5c5c4
--- /dev/null
+++ b/flink-esper/src/test/java/at/datasciencelabs/EsperStreamTest.java
@@ -0,0 +1,126 @@
+package at.datasciencelabs;
+
+import com.espertech.esper.client.EventBean;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.core.IsNull.notNullValue;
+
+public class EsperStreamTest extends StreamingMultipleProgramsTestBase implements Serializable {
+
+ private static List result;
+ private static List stringResult;
+
+
+ @Before
+ public void before() {
+ result = new ArrayList<>();
+ stringResult = new ArrayList<>();
+ }
+
+ @Test
+ @SuppressWarnings("Convert2Lambda")
+ public void shouldSelectFromStreamUsingAnonymousClassSelect() throws Exception {
+ StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
+ executionEnvironment.setParallelism(1);
+
+ DataStream dataStream = executionEnvironment.fromElements(new TestEvent("peter", 10), new TestEvent("alex", 25), new TestEvent("maria", 30));
+
+ EsperStream esperStream = new EsperStream<>(dataStream, "select name, age from TestEvent");
+
+ DataStream resultStream = esperStream.select(new EsperSelectFunction() {
+ @Override
+ public TestEvent select(EventBean eventBean) throws Exception {
+ String name = (String) eventBean.get("name");
+ int age = (int) eventBean.get("age");
+ return new TestEvent(name, age);
+ }
+ });
+
+ resultStream.addSink(new SinkFunction() {
+ @Override
+ public void invoke(TestEvent testEvent) throws Exception {
+ System.err.println(testEvent);
+ result.add(testEvent);
+ }
+ });
+
+ executionEnvironment.execute("test-2");
+
+ assertThat(result, is(notNullValue()));
+ assertThat(result.size(), is(3));
+ }
+
+ @Test
+ @SuppressWarnings("Convert2Lambda")
+ public void shouldSelectFromStreamUsingLambdaSelect() throws Exception {
+
+ StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
+ executionEnvironment.setParallelism(1);
+
+ DataStream dataStream = executionEnvironment.fromElements(new TestEvent("peter1", 10), new TestEvent("alex1", 25), new TestEvent("maria1", 30));
+
+ EsperStream esperStream = new EsperStream<>(dataStream, "select name, age from TestEvent");
+
+ DataStream resultStream = esperStream.select((EsperSelectFunction) collector -> {
+ String name = (String) collector.get("name");
+ int age = (int) collector.get("age");
+ return new TestEvent(name, age);
+ });
+
+ resultStream.addSink(new SinkFunction() {
+ @Override
+ public void invoke(TestEvent testEvent) throws Exception {
+ result.add(testEvent);
+ }
+ });
+
+ executionEnvironment.execute("test-1");
+
+ assertThat(result, is(notNullValue()));
+ assertThat(result.size(), is(3));
+ }
+
+ @Test
+ @SuppressWarnings("Convert2Lambda")
+ public void shouldSelectFromStringDataStream() throws Exception {
+ StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
+ executionEnvironment.setParallelism(1);
+
+ List expectedValues = Arrays.asList("first", "second");
+ DataStream dataStream = executionEnvironment.fromCollection(expectedValues);
+
+ EsperStream esperStream = Esper.pattern(dataStream, "select bytes from String");
+
+ DataStream resultStream = esperStream.select((EsperSelectFunction) collector -> {
+ byte[] bytes = (byte[]) collector.get("bytes");
+ return new String(bytes);
+ });
+
+ resultStream.addSink(new SinkFunction() {
+ @Override
+ public void invoke(String testEvent) throws Exception {
+ System.err.println(testEvent);
+ stringResult.add(testEvent);
+ }
+ });
+
+ executionEnvironment.execute("test-2");
+
+ assertThat(stringResult, is(notNullValue()));
+ assertThat(stringResult.size(), is(2));
+ assertThat(stringResult, is(expectedValues));
+ }
+
+}
\ No newline at end of file
diff --git a/src/test/java/at/datasciencelabs/EsperTest.java b/flink-esper/src/test/java/at/datasciencelabs/EsperTest.java
similarity index 100%
rename from src/test/java/at/datasciencelabs/EsperTest.java
rename to flink-esper/src/test/java/at/datasciencelabs/EsperTest.java
diff --git a/src/test/java/at/datasciencelabs/TestEvent.java b/flink-esper/src/test/java/at/datasciencelabs/TestEvent.java
similarity index 81%
rename from src/test/java/at/datasciencelabs/TestEvent.java
rename to flink-esper/src/test/java/at/datasciencelabs/TestEvent.java
index 9838ed8..6bf616a 100644
--- a/src/test/java/at/datasciencelabs/TestEvent.java
+++ b/flink-esper/src/test/java/at/datasciencelabs/TestEvent.java
@@ -37,4 +37,12 @@ public String getName() {
public int getAge() {
return age;
}
+
+ @Override
+ public String toString() {
+ return "TestEvent{" +
+ "name='" + name + '\'' +
+ ", age=" + age +
+ '}';
+ }
}
diff --git a/pom.xml b/pom.xml
index 1c6c2ec..42ae760 100644
--- a/pom.xml
+++ b/pom.xml
@@ -5,8 +5,13 @@
4.0.0
at.datasciencelabs.research
- flink-esper
+ flink-esper-parent
+ pom
0.0.1-SNAPSHOT
+
+ flink-esper
+ flink-esper-test
+
2.11
@@ -15,65 +20,67 @@
1.8
-
-
- com.espertech
- esper
- 5.3.0
-
+
+
+
+ com.espertech
+ esper
+ 5.3.0
+
-
- org.apache.flink
- flink-core
- ${flink.version}
- provided
-
+
+ org.apache.flink
+ flink-core
+ ${flink.version}
+ provided
+
-
- org.apache.flink
- flink-streaming-java_${scala.binary.version}
- ${flink.version}
- provided
-
+
+ org.apache.flink
+ flink-streaming-java_${scala.binary.version}
+ ${flink.version}
+ provided
+
-
- org.apache.flink
- flink-shaded-guava
- 18.0-1.0
-
+
+ org.apache.flink
+ flink-shaded-guava
+ 18.0-1.0
+
-
+
-
- org.apache.flink
- flink-test-utils_${scala.binary.version}
- ${flink.version}
- test
-
+
+ org.apache.flink
+ flink-test-utils_${scala.binary.version}
+ ${flink.version}
+ test
+
-
- org.apache.flink
- flink-streaming-java_${scala.binary.version}
- ${flink.version}
- test-jar
- test
-
+
+ org.apache.flink
+ flink-streaming-java_${scala.binary.version}
+ ${flink.version}
+ test-jar
+ test
+
-
- org.apache.flink
- flink-runtime_${scala.binary.version}
- ${flink.version}
- test-jar
- test
-
+
+ org.apache.flink
+ flink-runtime_${scala.binary.version}
+ ${flink.version}
+ test-jar
+ test
+
-
- org.apache.flink
- flink-statebackend-rocksdb_${scala.binary.version}
- ${flink.version}
- test
-
+
+ org.apache.flink
+ flink-statebackend-rocksdb_${scala.binary.version}
+ ${flink.version}
+ test
+
+
+
-
\ No newline at end of file
diff --git a/src/test/java/at/datasciencelabs/EsperStreamTest.java b/src/test/java/at/datasciencelabs/EsperStreamTest.java
deleted file mode 100644
index 9e88f30..0000000
--- a/src/test/java/at/datasciencelabs/EsperStreamTest.java
+++ /dev/null
@@ -1,67 +0,0 @@
-package at.datasciencelabs;
-
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.junit.Test;
-
-public class EsperStreamTest {
-
- @Test
- public void testEsperStream() throws Exception {
-
- StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
-
- DataStream dataStream = executionEnvironment.fromElements(new TestEvent("peter", 10), new TestEvent("alex", 25), new TestEvent("maria", 30));
-
- EsperStream esperStream = new EsperStream<>(dataStream, "select name, age from TestEvent");
-
- DataStream resultStream = esperStream.select((EsperSelectFunction) collector -> {
- String name = (String) collector.get("name");
- int age = (int) collector.get("age");
- return new TestEvent(name, age);
- }, TypeInformation.of(TestEvent.class));
-
- resultStream.printToErr();
-
- executionEnvironment.execute("test");
-
- }
-
- @Test
- public void testMoreComplexEsperStream() throws Exception {
- StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
-
- DataStream dataStream = executionEnvironment.fromElements(new TestEvent("peter", 10), new TestEvent("alex", 25), new TestEvent("maria", 30));
-
- EsperStream eventEsperStream = new EsperStream<>(dataStream, "select avg(age) as average_age from TestEvent");
-
- DataStream resultStream = eventEsperStream.select((EsperSelectFunction) collector -> {
- Double age = (Double) collector.get("average_age");
- return age;
- }, TypeInformation.of(Double.class));
-
- resultStream.printToErr();
-
- executionEnvironment.execute("test");
- }
-
- @Test
- public void testStateChangeRule() throws Exception {
- StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
-
- DataStream dataStream = executionEnvironment.fromElements(new TestEvent("peter", 10), new TestEvent("alex", 25));
-
- EsperStream eventEsperStream = new EsperStream<>(dataStream, "select avg(age) as average_age from TestEvent");
-
- DataStream resultStream = eventEsperStream.select((EsperSelectFunction) collector -> {
- Double age = (Double) collector.get("average_age");
- return age;
- }, TypeInformation.of(Double.class));
-
- resultStream.printToErr();
-
- executionEnvironment.execute("stateChange1");
- }
-
-}
\ No newline at end of file