() {
diff --git a/flink-esper-test/src/test/java/at/datasciencelabs/test/BuildEvent.java b/flink-esper-test/src/test/java/at/datasciencelabs/test/BuildEvent.java
new file mode 100644
index 0000000..71676cc
--- /dev/null
+++ b/flink-esper-test/src/test/java/at/datasciencelabs/test/BuildEvent.java
@@ -0,0 +1,5 @@
+package at.datasciencelabs.test;
+
+
+interface BuildEvent {
+}
diff --git a/flink-esper/src/test/java/at/datasciencelabs/BuildFailure.java b/flink-esper-test/src/test/java/at/datasciencelabs/test/BuildFailure.java
similarity index 96%
rename from flink-esper/src/test/java/at/datasciencelabs/BuildFailure.java
rename to flink-esper-test/src/test/java/at/datasciencelabs/test/BuildFailure.java
index 84eb4f5..6f782de 100644
--- a/flink-esper/src/test/java/at/datasciencelabs/BuildFailure.java
+++ b/flink-esper-test/src/test/java/at/datasciencelabs/test/BuildFailure.java
@@ -1,4 +1,4 @@
-package at.datasciencelabs;
+package at.datasciencelabs.test;
public class BuildFailure implements BuildEvent {
diff --git a/flink-esper/src/test/java/at/datasciencelabs/BuildSuccess.java b/flink-esper-test/src/test/java/at/datasciencelabs/test/BuildSuccess.java
similarity index 96%
rename from flink-esper/src/test/java/at/datasciencelabs/BuildSuccess.java
rename to flink-esper-test/src/test/java/at/datasciencelabs/test/BuildSuccess.java
index 02583bf..e25c3dc 100644
--- a/flink-esper/src/test/java/at/datasciencelabs/BuildSuccess.java
+++ b/flink-esper-test/src/test/java/at/datasciencelabs/test/BuildSuccess.java
@@ -1,4 +1,4 @@
-package at.datasciencelabs;
+package at.datasciencelabs.test;
public class BuildSuccess implements BuildEvent {
diff --git a/flink-esper/src/test/java/at/datasciencelabs/StateChangePatternTest.java b/flink-esper-test/src/test/java/at/datasciencelabs/test/StateChangePatternTest.java
similarity index 99%
rename from flink-esper/src/test/java/at/datasciencelabs/StateChangePatternTest.java
rename to flink-esper-test/src/test/java/at/datasciencelabs/test/StateChangePatternTest.java
index 8e8f6ab..4ac6544 100644
--- a/flink-esper/src/test/java/at/datasciencelabs/StateChangePatternTest.java
+++ b/flink-esper-test/src/test/java/at/datasciencelabs/test/StateChangePatternTest.java
@@ -1,4 +1,4 @@
-package at.datasciencelabs;
+package at.datasciencelabs.test;
import com.espertech.esper.client.EPServiceProvider;
import com.espertech.esper.client.EPServiceProviderManager;
diff --git a/flink-esper/src/main/java/at/datasciencelabs/Esper.java b/flink-esper/src/main/java/at/datasciencelabs/Esper.java
index ac4c8a1..42bb1ec 100644
--- a/flink-esper/src/main/java/at/datasciencelabs/Esper.java
+++ b/flink-esper/src/main/java/at/datasciencelabs/Esper.java
@@ -2,9 +2,34 @@
import org.apache.flink.streaming.api.datastream.DataStream;
+/**
+ * Utility class for complex event processing using Esper.
+ *
+ * Methods which transform a {@link DataStream} into a {@link EsperStream} to do CEP.
+ */
public class Esper {
- public static EsperStream pattern(DataStream dataStream, String query) {
- return new EsperStream(dataStream, query);
+ /**
+ * Creates a {@link EsperStream} from an input data stream and a pattern.
+ *
+ * @param input DataStream containing the input events
+ * @param pattern Esper pattern specification which shall be detected
+ * @param Type of the input events
+ * @return Resulting esper stream
+ */
+ public static EsperStream pattern(DataStream input, String pattern) {
+ return new EsperStream(input, new EsperPattern(pattern));
+ }
+
+ /**
+ * Creates a {@link EsperStream} from an input data stream and a query.
+ *
+ * @param input DataStream containing the input events
+ * @param query Query of describing which events should be selected from the stream
+ * @param Type of the input events
+ * @return Resulting esper stream
+ */
+ public static EsperStream query(DataStream input, String query) {
+ return new EsperStream(input, new EsperQuery(query));
}
}
diff --git a/flink-esper/src/main/java/at/datasciencelabs/EsperPattern.java b/flink-esper/src/main/java/at/datasciencelabs/EsperPattern.java
new file mode 100644
index 0000000..7580485
--- /dev/null
+++ b/flink-esper/src/main/java/at/datasciencelabs/EsperPattern.java
@@ -0,0 +1,18 @@
+package at.datasciencelabs;
+
+import com.espertech.esper.client.EPAdministrator;
+import com.espertech.esper.client.EPStatement;
+
+class EsperPattern implements EsperStatementFactory {
+
+ private String pattern;
+
+ EsperPattern(String pattern) {
+ this.pattern = pattern;
+ }
+
+ @Override
+ public EPStatement createStatement(EPAdministrator administrator) {
+ return administrator.createPattern(pattern);
+ }
+}
diff --git a/flink-esper/src/main/java/at/datasciencelabs/EsperQuery.java b/flink-esper/src/main/java/at/datasciencelabs/EsperQuery.java
new file mode 100644
index 0000000..bdd5a24
--- /dev/null
+++ b/flink-esper/src/main/java/at/datasciencelabs/EsperQuery.java
@@ -0,0 +1,19 @@
+package at.datasciencelabs;
+
+import com.espertech.esper.client.EPAdministrator;
+import com.espertech.esper.client.EPStatement;
+
+class EsperQuery implements EsperStatementFactory {
+
+ private String query;
+
+ EsperQuery(String query) {
+ this.query = query;
+ }
+
+
+ @Override
+ public EPStatement createStatement(EPAdministrator administrator) {
+ return administrator.createEPL(query);
+ }
+}
diff --git a/flink-esper/src/main/java/at/datasciencelabs/EsperStatementFactory.java b/flink-esper/src/main/java/at/datasciencelabs/EsperStatementFactory.java
new file mode 100644
index 0000000..dc0afab
--- /dev/null
+++ b/flink-esper/src/main/java/at/datasciencelabs/EsperStatementFactory.java
@@ -0,0 +1,10 @@
+package at.datasciencelabs;
+
+import com.espertech.esper.client.EPAdministrator;
+import com.espertech.esper.client.EPStatement;
+
+import java.io.Serializable;
+
+interface EsperStatementFactory extends Serializable {
+ EPStatement createStatement(EPAdministrator administrator);
+}
diff --git a/flink-esper/src/main/java/at/datasciencelabs/EsperStream.java b/flink-esper/src/main/java/at/datasciencelabs/EsperStream.java
index 140f23c..4228e5d 100644
--- a/flink-esper/src/main/java/at/datasciencelabs/EsperStream.java
+++ b/flink-esper/src/main/java/at/datasciencelabs/EsperStream.java
@@ -22,7 +22,7 @@
public class EsperStream {
private final DataStream inputStream;
- private final String esperQuery;
+ private final EsperStatementFactory esperQuery;
/**
@@ -30,14 +30,20 @@ public class EsperStream {
* @param inputStream The input DataStream
* @param esperQuery An Esper query
*/
- public EsperStream(DataStream inputStream, String esperQuery) {
+ EsperStream(DataStream inputStream, EsperStatementFactory esperQuery) {
this.inputStream = inputStream;
this.esperQuery = esperQuery;
}
/**
- * Select from the EsperStream, must provide the return type of the output DataStream since no type information is
- * currently extracted from the @see {@link EsperSelectFunction}.
+ * Applies a select function to the detected pattern sequence or query results. For each pattern sequence or query result the
+ * provided {@link EsperSelectFunction} is called. The pattern select function can produce
+ * exactly one resulting element.
+ *
+ * @param esperSelectFunction The pattern select function which is called for each detected pattern sequence.
+ * @param Type of the resulting elements
+ * @return {@link DataStream} which contains the resulting elements from the pattern select
+ * function.
*/
public SingleOutputStreamOperator select(EsperSelectFunction esperSelectFunction) {
KeySelector keySelector = new NullByteKeySelector<>();
diff --git a/flink-esper/src/main/java/at/datasciencelabs/SelectEsperStreamOperator.java b/flink-esper/src/main/java/at/datasciencelabs/SelectEsperStreamOperator.java
index ee572e8..1c7ca8f 100644
--- a/flink-esper/src/main/java/at/datasciencelabs/SelectEsperStreamOperator.java
+++ b/flink-esper/src/main/java/at/datasciencelabs/SelectEsperStreamOperator.java
@@ -33,7 +33,7 @@ public class SelectEsperStreamOperator extends AbstractUdfStreamOp
private static final String ESPER_SERVICE_PROVIDER_STATE = "esperServiceProviderState";
/** The Esper query to execute */
- private final String query;
+ private final EsperStatementFactory query;
/** The inferred input type of the user function */
private final TypeInformation inputType;
@@ -53,7 +53,7 @@ public class SelectEsperStreamOperator extends AbstractUdfStreamOp
* @param isProcessingTime Flag indicating how time is interpreted (processing time vs event time)
* @param esperQuery The esper query
*/
- public SelectEsperStreamOperator(TypeInformation inputStreamType, EsperSelectFunction esperSelectFunction, boolean isProcessingTime, String esperQuery) {
+ public SelectEsperStreamOperator(TypeInformation inputStreamType, EsperSelectFunction esperSelectFunction, boolean isProcessingTime, EsperStatementFactory esperQuery) {
super(esperSelectFunction);
this.inputType = inputStreamType;
this.query = esperQuery;
@@ -104,7 +104,7 @@ private EPServiceProvider getServiceProvider(String context) throws IOException
serviceProvider = EPServiceProviderManager.getProvider(context, configuration);
serviceProvider.getEPAdministrator().getConfiguration().addEventType(inputType.getTypeClass());
serviceProvider.getEPRuntime().sendEvent(new CurrentTimeEvent(0));
- EPStatement statement = serviceProvider.getEPAdministrator().createEPL(query);
+ EPStatement statement = query.createStatement(serviceProvider.getEPAdministrator());
statement.addListener((newData, oldData) -> {
for (EventBean event : newData) {
diff --git a/flink-esper/src/test/java/at/datasciencelabs/BuildEvent.java b/flink-esper/src/test/java/at/datasciencelabs/BuildEvent.java
deleted file mode 100644
index 54a3e86..0000000
--- a/flink-esper/src/test/java/at/datasciencelabs/BuildEvent.java
+++ /dev/null
@@ -1,5 +0,0 @@
-package at.datasciencelabs;
-
-
-interface BuildEvent {
-}
diff --git a/flink-esper/src/test/java/at/datasciencelabs/EsperPatternTest.java b/flink-esper/src/test/java/at/datasciencelabs/EsperPatternTest.java
new file mode 100644
index 0000000..1881a49
--- /dev/null
+++ b/flink-esper/src/test/java/at/datasciencelabs/EsperPatternTest.java
@@ -0,0 +1,138 @@
+package at.datasciencelabs;
+
+import com.espertech.esper.client.EventBean;
+import com.google.common.collect.Lists;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.core.Is.is;
+
+public class EsperPatternTest extends StreamingMultipleProgramsTestBase implements Serializable {
+
+
+ private static List resultingEvents;
+
+ @Before
+ public void setUp() throws Exception {
+ resultingEvents = Lists.newArrayList();
+ }
+
+ @Test
+ public void testEsperPattern() throws Exception {
+ StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
+ executionEnvironment.setParallelism(1);
+
+ List expectedValues = Lists.newArrayList();
+ ComplexEvent complexEvent = new ComplexEvent(Event.start(), Event.end());
+ expectedValues.add(complexEvent);
+
+ List events = Arrays.asList(complexEvent.getStartEvent(), complexEvent.getEndEvent());
+ DataStream dataStream = executionEnvironment.fromCollection(events);
+
+ EsperStream esperStream = Esper.pattern(dataStream, "every (A=Event(type='start') -> B=Event(type='end'))");
+
+ DataStream complexEventDataStream = esperStream.select(new EsperSelectFunction() {
+ @Override
+ public ComplexEvent select(EventBean eventBean) throws Exception {
+ return new ComplexEvent((Event) eventBean.get("A"), (Event) eventBean.get("B"));
+ }
+ });
+
+ complexEventDataStream.addSink(new SinkFunction() {
+ @Override
+ public void invoke(ComplexEvent value) throws Exception {
+ System.err.println(value);
+ resultingEvents.add(value);
+ }
+ });
+
+ executionEnvironment.execute("test-2");
+
+ assertThat(resultingEvents, is(expectedValues));
+ }
+
+ private static class ComplexEvent {
+ private Event startEvent;
+ private Event endEvent;
+
+ ComplexEvent(Event startEvent, Event endEvent) {
+ this.startEvent = startEvent;
+ this.endEvent = endEvent;
+ }
+
+ Event getStartEvent() {
+ return startEvent;
+ }
+
+ Event getEndEvent() {
+ return endEvent;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+
+ ComplexEvent that = (ComplexEvent) o;
+
+ if (startEvent != null ? !startEvent.equals(that.startEvent) : that.startEvent != null) return false;
+ return endEvent != null ? endEvent.equals(that.endEvent) : that.endEvent == null;
+ }
+
+ @Override
+ public int hashCode() {
+ int result = startEvent != null ? startEvent.hashCode() : 0;
+ result = 31 * result + (endEvent != null ? endEvent.hashCode() : 0);
+ return result;
+ }
+ }
+
+ public static class Event {
+
+ private String type;
+
+ private Event(String type) {
+ this.type = type;
+ }
+
+ public String getType() {
+ return type;
+ }
+
+ public void setType(String type) {
+ this.type = type;
+ }
+
+ static Event start() {
+ return new Event("start");
+ }
+
+ static Event end() {
+ return new Event("end");
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+
+ Event event = (Event) o;
+
+ return type != null ? type.equals(event.type) : event.type == null;
+ }
+
+ @Override
+ public int hashCode() {
+ return type != null ? type.hashCode() : 0;
+ }
+ }
+}
diff --git a/flink-esper/src/test/java/at/datasciencelabs/EsperStreamTest.java b/flink-esper/src/test/java/at/datasciencelabs/EsperQueryTest.java
similarity index 91%
rename from flink-esper/src/test/java/at/datasciencelabs/EsperStreamTest.java
rename to flink-esper/src/test/java/at/datasciencelabs/EsperQueryTest.java
index 2f5c5c4..5b4f550 100644
--- a/flink-esper/src/test/java/at/datasciencelabs/EsperStreamTest.java
+++ b/flink-esper/src/test/java/at/datasciencelabs/EsperQueryTest.java
@@ -17,7 +17,7 @@
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.core.IsNull.notNullValue;
-public class EsperStreamTest extends StreamingMultipleProgramsTestBase implements Serializable {
+public class EsperQueryTest extends StreamingMultipleProgramsTestBase implements Serializable {
private static List result;
private static List stringResult;
@@ -37,7 +37,7 @@ public void shouldSelectFromStreamUsingAnonymousClassSelect() throws Exception {
DataStream dataStream = executionEnvironment.fromElements(new TestEvent("peter", 10), new TestEvent("alex", 25), new TestEvent("maria", 30));
- EsperStream esperStream = new EsperStream<>(dataStream, "select name, age from TestEvent");
+ EsperStream esperStream = Esper.query(dataStream, "select name, age from TestEvent");
DataStream resultStream = esperStream.select(new EsperSelectFunction() {
@Override
@@ -71,7 +71,7 @@ public void shouldSelectFromStreamUsingLambdaSelect() throws Exception {
DataStream dataStream = executionEnvironment.fromElements(new TestEvent("peter1", 10), new TestEvent("alex1", 25), new TestEvent("maria1", 30));
- EsperStream esperStream = new EsperStream<>(dataStream, "select name, age from TestEvent");
+ EsperStream esperStream = Esper.query(dataStream, "select name, age from TestEvent");
DataStream resultStream = esperStream.select((EsperSelectFunction) collector -> {
String name = (String) collector.get("name");
@@ -101,7 +101,7 @@ public void shouldSelectFromStringDataStream() throws Exception {
List expectedValues = Arrays.asList("first", "second");
DataStream dataStream = executionEnvironment.fromCollection(expectedValues);
- EsperStream esperStream = Esper.pattern(dataStream, "select bytes from String");
+ EsperStream esperStream = Esper.query(dataStream, "select bytes from String");
DataStream resultStream = esperStream.select((EsperSelectFunction) collector -> {
byte[] bytes = (byte[]) collector.get("bytes");
diff --git a/flink-esper/src/test/java/at/datasciencelabs/EsperTest.java b/flink-esper/src/test/java/at/datasciencelabs/EsperTest.java
index c48ec25..902b3f0 100644
--- a/flink-esper/src/test/java/at/datasciencelabs/EsperTest.java
+++ b/flink-esper/src/test/java/at/datasciencelabs/EsperTest.java
@@ -3,10 +3,6 @@
import com.espertech.esper.client.EPServiceProvider;
import com.espertech.esper.client.EPServiceProviderManager;
import com.espertech.esper.client.EPStatement;
-import com.espertech.esper.event.map.MapEventBean;
-import com.google.common.base.Joiner;
-import com.google.common.collect.Lists;
-import org.junit.Ignore;
import org.junit.Test;
public class EsperTest {