> expectedBuildEvents = Maps.newHashMap();
+ for (BuildEvent buildEvent : buildEvents) {
+ if (buildEvent instanceof Expected) {
+ String key = ((Expected) buildEvent).getKey();
+ expectedBuildEvents.putIfAbsent(key, new ArrayList<>());
+ expectedBuildEvents.get(key).add(buildEvent);
+ }
+ }
+
+ Assert.assertEquals(expectedBuildEvents, actualBuildEvents);
+ }
+
+ private class ExpectedBuildFailure extends BuildFailure implements Expected {
+
+ private final String key;
+
+ ExpectedBuildFailure(String project, int buildId, String key) {
+ super(project, buildId);
+ this.key = key;
+ }
+
+ @Override
+ public String getKey() {
+ return key;
+ }
+ }
+
+ private class ExpectedBuildSuccess extends BuildSuccess implements Expected {
+ private String key;
+
+ ExpectedBuildSuccess(String project, int buildId, String key) {
+ super(project, buildId);
+ this.key = key;
+ }
+
+ @Override
+ public String getKey() {
+ return key;
+ }
+ }
+
+ private class EventBuilder {
+ private int buildId = 1;
+
+ BuildFailure expectedBuildFailure(String project) {
+ return new ExpectedBuildFailure(project, buildId++, "B");
+ }
+
+ BuildSuccess expectedBuildSuccess(String project) {
+ return new ExpectedBuildSuccess(project, buildId++, "A");
+ }
+
+ BuildSuccess expectedBuildSuccess() {
+ return expectedBuildSuccess("project1");
+ }
+
+ BuildFailure expectedBuildFailure() {
+ return expectedBuildFailure("project1");
+ }
+
+ BuildSuccess buildSuccess() {
+ return new BuildSuccess("project1", buildId++);
+ }
+ }
+
+ private interface Expected {
+ String getKey();
+ }
+}
diff --git a/flink-esper/src/main/java/at/datasciencelabs/Esper.java b/flink-esper/src/main/java/at/datasciencelabs/Esper.java
index ac4c8a1..42bb1ec 100644
--- a/flink-esper/src/main/java/at/datasciencelabs/Esper.java
+++ b/flink-esper/src/main/java/at/datasciencelabs/Esper.java
@@ -2,9 +2,34 @@
import org.apache.flink.streaming.api.datastream.DataStream;
+/**
+ * Utility class for complex event processing using Esper.
+ *
+ * Methods which transform a {@link DataStream} into a {@link EsperStream} to do CEP.
+ */
public class Esper {
- public static EsperStream pattern(DataStream dataStream, String query) {
- return new EsperStream(dataStream, query);
+ /**
+ * Creates a {@link EsperStream} from an input data stream and a pattern.
+ *
+ * @param input DataStream containing the input events
+ * @param pattern Esper pattern specification which shall be detected
+ * @param Type of the input events
+ * @return Resulting esper stream
+ */
+ public static EsperStream pattern(DataStream input, String pattern) {
+ return new EsperStream(input, new EsperPattern(pattern));
+ }
+
+ /**
+ * Creates a {@link EsperStream} from an input data stream and a query.
+ *
+ * @param input DataStream containing the input events
+ * @param query Query of describing which events should be selected from the stream
+ * @param Type of the input events
+ * @return Resulting esper stream
+ */
+ public static EsperStream query(DataStream input, String query) {
+ return new EsperStream(input, new EsperQuery(query));
}
}
diff --git a/flink-esper/src/main/java/at/datasciencelabs/EsperEngineSerializer.java b/flink-esper/src/main/java/at/datasciencelabs/EsperEngineSerializer.java
index 56bfdc9..2869524 100644
--- a/flink-esper/src/main/java/at/datasciencelabs/EsperEngineSerializer.java
+++ b/flink-esper/src/main/java/at/datasciencelabs/EsperEngineSerializer.java
@@ -11,6 +11,8 @@
public class EsperEngineSerializer extends TypeSerializer {
+ private static final long serialVersionUID = -5523322497977330283L;
+
@Override
public boolean isImmutableType() {
return false;
diff --git a/flink-esper/src/main/java/at/datasciencelabs/EsperPattern.java b/flink-esper/src/main/java/at/datasciencelabs/EsperPattern.java
new file mode 100644
index 0000000..7580485
--- /dev/null
+++ b/flink-esper/src/main/java/at/datasciencelabs/EsperPattern.java
@@ -0,0 +1,18 @@
+package at.datasciencelabs;
+
+import com.espertech.esper.client.EPAdministrator;
+import com.espertech.esper.client.EPStatement;
+
+class EsperPattern implements EsperStatementFactory {
+
+ private String pattern;
+
+ EsperPattern(String pattern) {
+ this.pattern = pattern;
+ }
+
+ @Override
+ public EPStatement createStatement(EPAdministrator administrator) {
+ return administrator.createPattern(pattern);
+ }
+}
diff --git a/flink-esper/src/main/java/at/datasciencelabs/EsperQuery.java b/flink-esper/src/main/java/at/datasciencelabs/EsperQuery.java
new file mode 100644
index 0000000..bdd5a24
--- /dev/null
+++ b/flink-esper/src/main/java/at/datasciencelabs/EsperQuery.java
@@ -0,0 +1,19 @@
+package at.datasciencelabs;
+
+import com.espertech.esper.client.EPAdministrator;
+import com.espertech.esper.client.EPStatement;
+
+class EsperQuery implements EsperStatementFactory {
+
+ private String query;
+
+ EsperQuery(String query) {
+ this.query = query;
+ }
+
+
+ @Override
+ public EPStatement createStatement(EPAdministrator administrator) {
+ return administrator.createEPL(query);
+ }
+}
diff --git a/flink-esper/src/main/java/at/datasciencelabs/EsperStatementFactory.java b/flink-esper/src/main/java/at/datasciencelabs/EsperStatementFactory.java
new file mode 100644
index 0000000..dc0afab
--- /dev/null
+++ b/flink-esper/src/main/java/at/datasciencelabs/EsperStatementFactory.java
@@ -0,0 +1,10 @@
+package at.datasciencelabs;
+
+import com.espertech.esper.client.EPAdministrator;
+import com.espertech.esper.client.EPStatement;
+
+import java.io.Serializable;
+
+interface EsperStatementFactory extends Serializable {
+ EPStatement createStatement(EPAdministrator administrator);
+}
diff --git a/flink-esper/src/main/java/at/datasciencelabs/EsperStream.java b/flink-esper/src/main/java/at/datasciencelabs/EsperStream.java
index 140f23c..4228e5d 100644
--- a/flink-esper/src/main/java/at/datasciencelabs/EsperStream.java
+++ b/flink-esper/src/main/java/at/datasciencelabs/EsperStream.java
@@ -22,7 +22,7 @@
public class EsperStream {
private final DataStream inputStream;
- private final String esperQuery;
+ private final EsperStatementFactory esperQuery;
/**
@@ -30,14 +30,20 @@ public class EsperStream {
* @param inputStream The input DataStream
* @param esperQuery An Esper query
*/
- public EsperStream(DataStream inputStream, String esperQuery) {
+ EsperStream(DataStream inputStream, EsperStatementFactory esperQuery) {
this.inputStream = inputStream;
this.esperQuery = esperQuery;
}
/**
- * Select from the EsperStream, must provide the return type of the output DataStream since no type information is
- * currently extracted from the @see {@link EsperSelectFunction}.
+ * Applies a select function to the detected pattern sequence or query results. For each pattern sequence or query result the
+ * provided {@link EsperSelectFunction} is called. The pattern select function can produce
+ * exactly one resulting element.
+ *
+ * @param esperSelectFunction The pattern select function which is called for each detected pattern sequence.
+ * @param Type of the resulting elements
+ * @return {@link DataStream} which contains the resulting elements from the pattern select
+ * function.
*/
public SingleOutputStreamOperator select(EsperSelectFunction esperSelectFunction) {
KeySelector keySelector = new NullByteKeySelector<>();
diff --git a/flink-esper/src/main/java/at/datasciencelabs/SelectEsperStreamOperator.java b/flink-esper/src/main/java/at/datasciencelabs/SelectEsperStreamOperator.java
index bf6a54e..1c7ca8f 100644
--- a/flink-esper/src/main/java/at/datasciencelabs/SelectEsperStreamOperator.java
+++ b/flink-esper/src/main/java/at/datasciencelabs/SelectEsperStreamOperator.java
@@ -1,9 +1,12 @@
package at.datasciencelabs;
+import com.espertech.esper.client.Configuration;
import com.espertech.esper.client.EPServiceProvider;
import com.espertech.esper.client.EPServiceProviderManager;
import com.espertech.esper.client.EPStatement;
import com.espertech.esper.client.EventBean;
+import com.espertech.esper.client.time.CurrentTimeEvent;
+import com.espertech.esper.client.time.CurrentTimeSpanEvent;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeInformation;
@@ -27,12 +30,20 @@
*/
public class SelectEsperStreamOperator extends AbstractUdfStreamOperator> implements OneInputStreamOperator, Triggerable, Serializable {
- private final String query;
- private final TypeInformation inputType;
- private ValueState engineState;
private static final String ESPER_SERVICE_PROVIDER_STATE = "esperServiceProviderState";
+
+ /** The Esper query to execute */
+ private final EsperStatementFactory query;
+
+ /** The inferred input type of the user function */
+ private final TypeInformation inputType;
+
+ /** The lock for creating a thread-safe instance of an Esper service provider */
private final Object lock = new Object[0];
+ /** The state containing the Esper engine */
+ private ValueState engineState;
+
/**
* Constructs a new operator. Requires the type of the input DataStream to register its Event Type at Esper.
* Currently only processing time evaluation is supported.
@@ -42,7 +53,7 @@ public class SelectEsperStreamOperator extends AbstractUdfStreamOp
* @param isProcessingTime Flag indicating how time is interpreted (processing time vs event time)
* @param esperQuery The esper query
*/
- public SelectEsperStreamOperator(TypeInformation inputStreamType, EsperSelectFunction esperSelectFunction, boolean isProcessingTime, String esperQuery) {
+ public SelectEsperStreamOperator(TypeInformation inputStreamType, EsperSelectFunction esperSelectFunction, boolean isProcessingTime, EsperStatementFactory esperQuery) {
super(esperSelectFunction);
this.inputType = inputStreamType;
this.query = esperQuery;
@@ -70,12 +81,14 @@ public void processElement(StreamRecord streamRecord) throws Exception {
@Override
public void onEventTime(InternalTimer internalTimer) throws Exception {
- internalTimer.getTimestamp();
+ // not supported yet
}
@Override
public void onProcessingTime(InternalTimer internalTimer) throws Exception {
-
+ EPServiceProvider epServiceProvider = getServiceProvider(this.hashCode() + "");
+ epServiceProvider.getEPRuntime().sendEvent(new CurrentTimeSpanEvent(internalTimer.getTimestamp()));
+ this.engineState.update(epServiceProvider);
}
private EPServiceProvider getServiceProvider(String context) throws IOException {
@@ -86,9 +99,12 @@ private EPServiceProvider getServiceProvider(String context) throws IOException
synchronized (lock) {
serviceProvider = engineState.value();
if (serviceProvider == null) {
- serviceProvider = EPServiceProviderManager.getProvider(context);
+ Configuration configuration = new Configuration();
+ configuration.getEngineDefaults().getThreading().setInternalTimerEnabled(false);
+ serviceProvider = EPServiceProviderManager.getProvider(context, configuration);
serviceProvider.getEPAdministrator().getConfiguration().addEventType(inputType.getTypeClass());
- EPStatement statement = serviceProvider.getEPAdministrator().createEPL(query);
+ serviceProvider.getEPRuntime().sendEvent(new CurrentTimeEvent(0));
+ EPStatement statement = query.createStatement(serviceProvider.getEPAdministrator());
statement.addListener((newData, oldData) -> {
for (EventBean event : newData) {
diff --git a/flink-esper/src/test/java/at/datasciencelabs/EsperPatternTest.java b/flink-esper/src/test/java/at/datasciencelabs/EsperPatternTest.java
new file mode 100644
index 0000000..1881a49
--- /dev/null
+++ b/flink-esper/src/test/java/at/datasciencelabs/EsperPatternTest.java
@@ -0,0 +1,138 @@
+package at.datasciencelabs;
+
+import com.espertech.esper.client.EventBean;
+import com.google.common.collect.Lists;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.core.Is.is;
+
+public class EsperPatternTest extends StreamingMultipleProgramsTestBase implements Serializable {
+
+
+ private static List resultingEvents;
+
+ @Before
+ public void setUp() throws Exception {
+ resultingEvents = Lists.newArrayList();
+ }
+
+ @Test
+ public void testEsperPattern() throws Exception {
+ StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
+ executionEnvironment.setParallelism(1);
+
+ List expectedValues = Lists.newArrayList();
+ ComplexEvent complexEvent = new ComplexEvent(Event.start(), Event.end());
+ expectedValues.add(complexEvent);
+
+ List events = Arrays.asList(complexEvent.getStartEvent(), complexEvent.getEndEvent());
+ DataStream dataStream = executionEnvironment.fromCollection(events);
+
+ EsperStream esperStream = Esper.pattern(dataStream, "every (A=Event(type='start') -> B=Event(type='end'))");
+
+ DataStream complexEventDataStream = esperStream.select(new EsperSelectFunction() {
+ @Override
+ public ComplexEvent select(EventBean eventBean) throws Exception {
+ return new ComplexEvent((Event) eventBean.get("A"), (Event) eventBean.get("B"));
+ }
+ });
+
+ complexEventDataStream.addSink(new SinkFunction() {
+ @Override
+ public void invoke(ComplexEvent value) throws Exception {
+ System.err.println(value);
+ resultingEvents.add(value);
+ }
+ });
+
+ executionEnvironment.execute("test-2");
+
+ assertThat(resultingEvents, is(expectedValues));
+ }
+
+ private static class ComplexEvent {
+ private Event startEvent;
+ private Event endEvent;
+
+ ComplexEvent(Event startEvent, Event endEvent) {
+ this.startEvent = startEvent;
+ this.endEvent = endEvent;
+ }
+
+ Event getStartEvent() {
+ return startEvent;
+ }
+
+ Event getEndEvent() {
+ return endEvent;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+
+ ComplexEvent that = (ComplexEvent) o;
+
+ if (startEvent != null ? !startEvent.equals(that.startEvent) : that.startEvent != null) return false;
+ return endEvent != null ? endEvent.equals(that.endEvent) : that.endEvent == null;
+ }
+
+ @Override
+ public int hashCode() {
+ int result = startEvent != null ? startEvent.hashCode() : 0;
+ result = 31 * result + (endEvent != null ? endEvent.hashCode() : 0);
+ return result;
+ }
+ }
+
+ public static class Event {
+
+ private String type;
+
+ private Event(String type) {
+ this.type = type;
+ }
+
+ public String getType() {
+ return type;
+ }
+
+ public void setType(String type) {
+ this.type = type;
+ }
+
+ static Event start() {
+ return new Event("start");
+ }
+
+ static Event end() {
+ return new Event("end");
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+
+ Event event = (Event) o;
+
+ return type != null ? type.equals(event.type) : event.type == null;
+ }
+
+ @Override
+ public int hashCode() {
+ return type != null ? type.hashCode() : 0;
+ }
+ }
+}
diff --git a/flink-esper/src/test/java/at/datasciencelabs/EsperStreamTest.java b/flink-esper/src/test/java/at/datasciencelabs/EsperQueryTest.java
similarity index 84%
rename from flink-esper/src/test/java/at/datasciencelabs/EsperStreamTest.java
rename to flink-esper/src/test/java/at/datasciencelabs/EsperQueryTest.java
index 2f5c5c4..8aa7f53 100644
--- a/flink-esper/src/test/java/at/datasciencelabs/EsperStreamTest.java
+++ b/flink-esper/src/test/java/at/datasciencelabs/EsperQueryTest.java
@@ -17,8 +17,9 @@
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.core.IsNull.notNullValue;
-public class EsperStreamTest extends StreamingMultipleProgramsTestBase implements Serializable {
+public class EsperQueryTest extends StreamingMultipleProgramsTestBase implements Serializable {
+ private static final long serialVersionUID = 3151045298871771992L;
private static List result;
private static List stringResult;
@@ -37,9 +38,11 @@ public void shouldSelectFromStreamUsingAnonymousClassSelect() throws Exception {
DataStream dataStream = executionEnvironment.fromElements(new TestEvent("peter", 10), new TestEvent("alex", 25), new TestEvent("maria", 30));
- EsperStream esperStream = new EsperStream<>(dataStream, "select name, age from TestEvent");
+ EsperStream esperStream = Esper.query(dataStream, "select name, age from TestEvent");
DataStream resultStream = esperStream.select(new EsperSelectFunction() {
+ private static final long serialVersionUID = 8802852465465541287L;
+
@Override
public TestEvent select(EventBean eventBean) throws Exception {
String name = (String) eventBean.get("name");
@@ -49,6 +52,9 @@ public TestEvent select(EventBean eventBean) throws Exception {
});
resultStream.addSink(new SinkFunction() {
+
+ private static final long serialVersionUID = -8260794084029816089L;
+
@Override
public void invoke(TestEvent testEvent) throws Exception {
System.err.println(testEvent);
@@ -71,7 +77,7 @@ public void shouldSelectFromStreamUsingLambdaSelect() throws Exception {
DataStream dataStream = executionEnvironment.fromElements(new TestEvent("peter1", 10), new TestEvent("alex1", 25), new TestEvent("maria1", 30));
- EsperStream esperStream = new EsperStream<>(dataStream, "select name, age from TestEvent");
+ EsperStream esperStream = Esper.query(dataStream, "select name, age from TestEvent");
DataStream resultStream = esperStream.select((EsperSelectFunction) collector -> {
String name = (String) collector.get("name");
@@ -80,6 +86,9 @@ public void shouldSelectFromStreamUsingLambdaSelect() throws Exception {
});
resultStream.addSink(new SinkFunction() {
+
+ private static final long serialVersionUID = 5588530728493738002L;
+
@Override
public void invoke(TestEvent testEvent) throws Exception {
result.add(testEvent);
@@ -101,7 +110,7 @@ public void shouldSelectFromStringDataStream() throws Exception {
List expectedValues = Arrays.asList("first", "second");
DataStream dataStream = executionEnvironment.fromCollection(expectedValues);
- EsperStream esperStream = Esper.pattern(dataStream, "select bytes from String");
+ EsperStream esperStream = Esper.query(dataStream, "select bytes from String");
DataStream resultStream = esperStream.select((EsperSelectFunction) collector -> {
byte[] bytes = (byte[]) collector.get("bytes");
@@ -109,6 +118,9 @@ public void shouldSelectFromStringDataStream() throws Exception {
});
resultStream.addSink(new SinkFunction() {
+
+ private static final long serialVersionUID = 284955963055337762L;
+
@Override
public void invoke(String testEvent) throws Exception {
System.err.println(testEvent);