From 68badeb81269b47ed0d1418486542274ff3b1dcd Mon Sep 17 00:00:00 2001 From: phil3k Date: Sun, 26 Nov 2017 10:16:11 +0100 Subject: [PATCH 1/5] project cleanup, sanitized simple example, drop Esper's timer in favor of Flink's own processing time --- .gitignore | 3 + flink-esper-test/flink-esper-test.iml | 88 ------------------- .../datasciencelabs/test/FlinkTestClass.java | 8 +- .../SelectEsperStreamOperator.java | 14 ++- 4 files changed, 18 insertions(+), 95 deletions(-) delete mode 100644 flink-esper-test/flink-esper-test.iml diff --git a/.gitignore b/.gitignore index 6143e53..36149fb 100644 --- a/.gitignore +++ b/.gitignore @@ -20,3 +20,6 @@ # virtual machine crash logs, see http://www.java.com/en/download/help/error_hotspot.xml hs_err_pid* + +# IntelliJ project files +*.iml diff --git a/flink-esper-test/flink-esper-test.iml b/flink-esper-test/flink-esper-test.iml deleted file mode 100644 index 12ebfeb..0000000 --- a/flink-esper-test/flink-esper-test.iml +++ /dev/null @@ -1,88 +0,0 @@ - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - \ No newline at end of file diff --git a/flink-esper-test/src/main/java/at/datasciencelabs/test/FlinkTestClass.java b/flink-esper-test/src/main/java/at/datasciencelabs/test/FlinkTestClass.java index c37f240..0941f78 100644 --- a/flink-esper-test/src/main/java/at/datasciencelabs/test/FlinkTestClass.java +++ b/flink-esper-test/src/main/java/at/datasciencelabs/test/FlinkTestClass.java @@ -6,13 +6,13 @@ import com.espertech.esper.client.EventBean; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction; public class FlinkTestClass { + @SuppressWarnings("Convert2Lambda") public static void main(String[] args) throws Exception { StreamExecutionEnvironment streamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment(); - DataStream dataStream = streamExecutionEnvironment.readTextFile("file:///tmp/flink-test"); + DataStream dataStream = streamExecutionEnvironment.readTextFile("file:///tmp/flink-esper-input"); EsperStream esperStream = Esper.pattern(dataStream, "select bytes from String"); @@ -23,9 +23,9 @@ public String select(EventBean eventBean) throws Exception { } }); - result.addSink(new PrintSinkFunction<>(true)); + result.writeAsText("file:///tmp/flink-esper-output"); - streamExecutionEnvironment.execute("Kafka 0.10 Example"); + streamExecutionEnvironment.execute("Simple Flink Esper Example"); } } diff --git a/flink-esper/src/main/java/at/datasciencelabs/SelectEsperStreamOperator.java b/flink-esper/src/main/java/at/datasciencelabs/SelectEsperStreamOperator.java index bf6a54e..a22258b 100644 --- a/flink-esper/src/main/java/at/datasciencelabs/SelectEsperStreamOperator.java +++ b/flink-esper/src/main/java/at/datasciencelabs/SelectEsperStreamOperator.java @@ -1,9 +1,12 @@ package at.datasciencelabs; +import com.espertech.esper.client.Configuration; import com.espertech.esper.client.EPServiceProvider; import com.espertech.esper.client.EPServiceProviderManager; import com.espertech.esper.client.EPStatement; import com.espertech.esper.client.EventBean; +import com.espertech.esper.client.time.CurrentTimeEvent; +import com.espertech.esper.client.time.CurrentTimeSpanEvent; import org.apache.flink.api.common.state.ValueState; import org.apache.flink.api.common.state.ValueStateDescriptor; import org.apache.flink.api.common.typeinfo.TypeInformation; @@ -70,12 +73,14 @@ public void processElement(StreamRecord streamRecord) throws Exception { @Override public void onEventTime(InternalTimer internalTimer) throws Exception { - internalTimer.getTimestamp(); + // not supported yet } @Override public void onProcessingTime(InternalTimer internalTimer) throws Exception { - + EPServiceProvider epServiceProvider = getServiceProvider(this.hashCode() + ""); + epServiceProvider.getEPRuntime().sendEvent(new CurrentTimeSpanEvent(internalTimer.getTimestamp())); + this.engineState.update(epServiceProvider); } private EPServiceProvider getServiceProvider(String context) throws IOException { @@ -86,8 +91,11 @@ private EPServiceProvider getServiceProvider(String context) throws IOException synchronized (lock) { serviceProvider = engineState.value(); if (serviceProvider == null) { - serviceProvider = EPServiceProviderManager.getProvider(context); + Configuration configuration = new Configuration(); + configuration.getEngineDefaults().getThreading().setInternalTimerEnabled(false); + serviceProvider = EPServiceProviderManager.getProvider(context, configuration); serviceProvider.getEPAdministrator().getConfiguration().addEventType(inputType.getTypeClass()); + serviceProvider.getEPRuntime().sendEvent(new CurrentTimeEvent(0)); EPStatement statement = serviceProvider.getEPAdministrator().createEPL(query); statement.addListener((newData, oldData) -> { From 471134a96d5315ac80ecfa3a6126489f4a397d6e Mon Sep 17 00:00:00 2001 From: phil3k Date: Sun, 26 Nov 2017 10:30:55 +0100 Subject: [PATCH 2/5] some documentation --- .../datasciencelabs/SelectEsperStreamOperator.java | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/flink-esper/src/main/java/at/datasciencelabs/SelectEsperStreamOperator.java b/flink-esper/src/main/java/at/datasciencelabs/SelectEsperStreamOperator.java index a22258b..ee572e8 100644 --- a/flink-esper/src/main/java/at/datasciencelabs/SelectEsperStreamOperator.java +++ b/flink-esper/src/main/java/at/datasciencelabs/SelectEsperStreamOperator.java @@ -30,12 +30,20 @@ */ public class SelectEsperStreamOperator extends AbstractUdfStreamOperator> implements OneInputStreamOperator, Triggerable, Serializable { + private static final String ESPER_SERVICE_PROVIDER_STATE = "esperServiceProviderState"; + + /** The Esper query to execute */ private final String query; + + /** The inferred input type of the user function */ private final TypeInformation inputType; - private ValueState engineState; - private static final String ESPER_SERVICE_PROVIDER_STATE = "esperServiceProviderState"; + + /** The lock for creating a thread-safe instance of an Esper service provider */ private final Object lock = new Object[0]; + /** The state containing the Esper engine */ + private ValueState engineState; + /** * Constructs a new operator. Requires the type of the input DataStream to register its Event Type at Esper. * Currently only processing time evaluation is supported. From 3a5f759cb602b2d2d9376098c056b08d0b037728 Mon Sep 17 00:00:00 2001 From: phil3k Date: Fri, 12 Jan 2018 21:34:28 +0100 Subject: [PATCH 3/5] added pure Esper tests for testing a possible state change pattern, two of them still failing because inital build failure events are not yet matched correctly --- .gitignore | 1 + .../java/at/datasciencelabs/BuildEvent.java | 5 + .../java/at/datasciencelabs/BuildFailure.java | 47 +++++ .../java/at/datasciencelabs/BuildSuccess.java | 47 +++++ .../java/at/datasciencelabs/EsperTest.java | 4 + .../StateChangePatternTest.java | 188 ++++++++++++++++++ 6 files changed, 292 insertions(+) create mode 100644 flink-esper/src/test/java/at/datasciencelabs/BuildEvent.java create mode 100644 flink-esper/src/test/java/at/datasciencelabs/BuildFailure.java create mode 100644 flink-esper/src/test/java/at/datasciencelabs/BuildSuccess.java create mode 100644 flink-esper/src/test/java/at/datasciencelabs/StateChangePatternTest.java diff --git a/.gitignore b/.gitignore index 36149fb..57517d9 100644 --- a/.gitignore +++ b/.gitignore @@ -23,3 +23,4 @@ hs_err_pid* # IntelliJ project files *.iml +.idea/* diff --git a/flink-esper/src/test/java/at/datasciencelabs/BuildEvent.java b/flink-esper/src/test/java/at/datasciencelabs/BuildEvent.java new file mode 100644 index 0000000..54a3e86 --- /dev/null +++ b/flink-esper/src/test/java/at/datasciencelabs/BuildEvent.java @@ -0,0 +1,5 @@ +package at.datasciencelabs; + + +interface BuildEvent { +} diff --git a/flink-esper/src/test/java/at/datasciencelabs/BuildFailure.java b/flink-esper/src/test/java/at/datasciencelabs/BuildFailure.java new file mode 100644 index 0000000..84eb4f5 --- /dev/null +++ b/flink-esper/src/test/java/at/datasciencelabs/BuildFailure.java @@ -0,0 +1,47 @@ +package at.datasciencelabs; + +public class BuildFailure implements BuildEvent { + + private String project; + private int buildId; + + BuildFailure(String project, int buildId) { + this.project = project; + this.buildId = buildId; + } + + public int getBuildId() { + return buildId; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + BuildFailure that = (BuildFailure) o; + + if (buildId != that.buildId) return false; + return project != null ? project.equals(that.project) : that.project == null; + } + + @Override + public int hashCode() { + int result = project != null ? project.hashCode() : 0; + result = 31 * result + buildId; + return result; + } + + public void setBuildId(int buildId) { + this.buildId = buildId; + } + + public String getProject() { + return project; + } + + public void setProject(String project) { + this.project = project; + } + +} diff --git a/flink-esper/src/test/java/at/datasciencelabs/BuildSuccess.java b/flink-esper/src/test/java/at/datasciencelabs/BuildSuccess.java new file mode 100644 index 0000000..02583bf --- /dev/null +++ b/flink-esper/src/test/java/at/datasciencelabs/BuildSuccess.java @@ -0,0 +1,47 @@ +package at.datasciencelabs; + +public class BuildSuccess implements BuildEvent { + + private String project; + private int buildId; + + BuildSuccess(String project, int buildId) { + this.project = project; + this.buildId = buildId; + } + + public String getProject() { + return project; + } + + public void setProject(String project) { + this.project = project; + } + + + public int getBuildId() { + return buildId; + } + + public void setBuildId(int buildId) { + this.buildId = buildId; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + BuildSuccess that = (BuildSuccess) o; + + if (buildId != that.buildId) return false; + return project != null ? project.equals(that.project) : that.project == null; + } + + @Override + public int hashCode() { + int result = project != null ? project.hashCode() : 0; + result = 31 * result + buildId; + return result; + } +} diff --git a/flink-esper/src/test/java/at/datasciencelabs/EsperTest.java b/flink-esper/src/test/java/at/datasciencelabs/EsperTest.java index 902b3f0..c48ec25 100644 --- a/flink-esper/src/test/java/at/datasciencelabs/EsperTest.java +++ b/flink-esper/src/test/java/at/datasciencelabs/EsperTest.java @@ -3,6 +3,10 @@ import com.espertech.esper.client.EPServiceProvider; import com.espertech.esper.client.EPServiceProviderManager; import com.espertech.esper.client.EPStatement; +import com.espertech.esper.event.map.MapEventBean; +import com.google.common.base.Joiner; +import com.google.common.collect.Lists; +import org.junit.Ignore; import org.junit.Test; public class EsperTest { diff --git a/flink-esper/src/test/java/at/datasciencelabs/StateChangePatternTest.java b/flink-esper/src/test/java/at/datasciencelabs/StateChangePatternTest.java new file mode 100644 index 0000000..8e8f6ab --- /dev/null +++ b/flink-esper/src/test/java/at/datasciencelabs/StateChangePatternTest.java @@ -0,0 +1,188 @@ +package at.datasciencelabs; + +import com.espertech.esper.client.EPServiceProvider; +import com.espertech.esper.client.EPServiceProviderManager; +import com.espertech.esper.client.EPStatement; +import com.espertech.esper.client.EventBean; +import com.espertech.esper.event.map.MapEventBean; +import com.google.common.base.Joiner; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +public class StateChangePatternTest { + + private EPServiceProvider engine; + + private static final String STATE_CHANGE_PATTERN = "(every(A=BuildSuccess) -> (B=BuildFailure(project=A.project) and not A=BuildSuccess))"; + + private EventBuilder eventsBuilder; + + @Before + public void onBefore() { + engine = EPServiceProviderManager.getDefaultProvider(); + + engine.getEPAdministrator().getConfiguration().addEventType(BuildSuccess.class); + engine.getEPAdministrator().getConfiguration().addEventType(BuildFailure.class); + + eventsBuilder = new EventBuilder(); + } + + @Test + public void patternDetected() { + List buildEvents = Lists.newArrayList( + eventsBuilder.expectedBuildSuccess(), + eventsBuilder.expectedBuildFailure() + ); + + runWithPatternAndEvents(STATE_CHANGE_PATTERN, buildEvents); + } + + @Test + public void precedingBuildSuccessIgnored() { + List buildEvents = Lists.newArrayList( + eventsBuilder.buildSuccess(), + eventsBuilder.expectedBuildSuccess(), + eventsBuilder.expectedBuildFailure() + ); + + runWithPatternAndEvents(STATE_CHANGE_PATTERN, buildEvents); + } + + @Test + public void patternDetectedTwice() { + + List buildEvents = Lists.newArrayList( + eventsBuilder.expectedBuildSuccess(), + eventsBuilder.expectedBuildFailure(), + eventsBuilder.expectedBuildSuccess(), + eventsBuilder.expectedBuildFailure() + ); + + runWithPatternAndEvents(STATE_CHANGE_PATTERN, buildEvents); + } + + @Test + public void eventsSeparatedByProject() { + + List buildEvents = Lists.newArrayList( + eventsBuilder.buildSuccess(), + eventsBuilder.expectedBuildSuccess("project2"), + eventsBuilder.expectedBuildFailure("project2"), + eventsBuilder.expectedBuildSuccess(), + eventsBuilder.expectedBuildFailure() + ); + + runWithPatternAndEvents(STATE_CHANGE_PATTERN, buildEvents); + } + + @Test + public void singleBuildFailureDetected() { + List buildEvents = Lists.newArrayList( + eventsBuilder.expectedBuildFailure() + ); + runWithPatternAndEvents(STATE_CHANGE_PATTERN, buildEvents); + } + + @Test + public void precedingBuildFailureDetected() { + + List buildEvents = Lists.newArrayList( + eventsBuilder.expectedBuildFailure(), + eventsBuilder.expectedBuildSuccess(), + eventsBuilder.expectedBuildFailure() + ); + + runWithPatternAndEvents(STATE_CHANGE_PATTERN, buildEvents); + } + + private void runWithPatternAndEvents(String pattern, List buildEvents) { + EPStatement epStatement = engine.getEPAdministrator().createPattern(pattern); + + Map> actualBuildEvents = Maps.newHashMap(); + epStatement.addListener((newData, oldData) -> { + Joiner.MapJoiner joiner = Joiner.on(",").withKeyValueSeparator("="); + Lists.newArrayList(newData).forEach(___ -> System.out.println(joiner.join(((MapEventBean) ___).getProperties()))); + Lists.newArrayList(newData).forEach(___ -> ((MapEventBean) ___).getProperties().entrySet().forEach(entry -> { + actualBuildEvents.putIfAbsent(entry.getKey(), new ArrayList<>()); + actualBuildEvents.get(entry.getKey()).add((BuildEvent) ((EventBean) entry.getValue()).getUnderlying()); + })); + }); + + buildEvents.forEach(___ -> engine.getEPRuntime().sendEvent(___)); + + Map> expectedBuildEvents = Maps.newHashMap(); + for (BuildEvent buildEvent : buildEvents) { + if (buildEvent instanceof Expected) { + String key = ((Expected) buildEvent).getKey(); + expectedBuildEvents.putIfAbsent(key, new ArrayList<>()); + expectedBuildEvents.get(key).add(buildEvent); + } + } + + Assert.assertEquals(expectedBuildEvents, actualBuildEvents); + } + + private class ExpectedBuildFailure extends BuildFailure implements Expected { + + private final String key; + + ExpectedBuildFailure(String project, int buildId, String key) { + super(project, buildId); + this.key = key; + } + + @Override + public String getKey() { + return key; + } + } + + private class ExpectedBuildSuccess extends BuildSuccess implements Expected { + private String key; + + ExpectedBuildSuccess(String project, int buildId, String key) { + super(project, buildId); + this.key = key; + } + + @Override + public String getKey() { + return key; + } + } + + private class EventBuilder { + private int buildId = 1; + + BuildFailure expectedBuildFailure(String project) { + return new ExpectedBuildFailure(project, buildId++, "B"); + } + + BuildSuccess expectedBuildSuccess(String project) { + return new ExpectedBuildSuccess(project, buildId++, "A"); + } + + BuildSuccess expectedBuildSuccess() { + return expectedBuildSuccess("project1"); + } + + BuildFailure expectedBuildFailure() { + return expectedBuildFailure("project1"); + } + + BuildSuccess buildSuccess() { + return new BuildSuccess("project1", buildId++); + } + } + + private interface Expected { + String getKey(); + } +} From 81c12922731f13d7ea8bcade975f04e77ada6690 Mon Sep 17 00:00:00 2001 From: phil3k Date: Sun, 14 Jan 2018 13:53:16 +0100 Subject: [PATCH 4/5] distinguish between esper pattern and query. cleanup tests. move state change pattern tests to test project --- flink-esper-test/pom.xml | 15 ++ .../datasciencelabs/test/FlinkTestClass.java | 2 +- .../at/datasciencelabs/test/BuildEvent.java | 5 + .../datasciencelabs/test}/BuildFailure.java | 2 +- .../datasciencelabs/test}/BuildSuccess.java | 2 +- .../test}/StateChangePatternTest.java | 2 +- .../main/java/at/datasciencelabs/Esper.java | 29 +++- .../java/at/datasciencelabs/EsperPattern.java | 18 +++ .../java/at/datasciencelabs/EsperQuery.java | 19 +++ .../EsperStatementFactory.java | 10 ++ .../java/at/datasciencelabs/EsperStream.java | 14 +- .../SelectEsperStreamOperator.java | 6 +- .../java/at/datasciencelabs/BuildEvent.java | 5 - .../at/datasciencelabs/EsperPatternTest.java | 138 ++++++++++++++++++ ...perStreamTest.java => EsperQueryTest.java} | 8 +- .../java/at/datasciencelabs/EsperTest.java | 4 - 16 files changed, 253 insertions(+), 26 deletions(-) create mode 100644 flink-esper-test/src/test/java/at/datasciencelabs/test/BuildEvent.java rename {flink-esper/src/test/java/at/datasciencelabs => flink-esper-test/src/test/java/at/datasciencelabs/test}/BuildFailure.java (96%) rename {flink-esper/src/test/java/at/datasciencelabs => flink-esper-test/src/test/java/at/datasciencelabs/test}/BuildSuccess.java (96%) rename {flink-esper/src/test/java/at/datasciencelabs => flink-esper-test/src/test/java/at/datasciencelabs/test}/StateChangePatternTest.java (99%) create mode 100644 flink-esper/src/main/java/at/datasciencelabs/EsperPattern.java create mode 100644 flink-esper/src/main/java/at/datasciencelabs/EsperQuery.java create mode 100644 flink-esper/src/main/java/at/datasciencelabs/EsperStatementFactory.java delete mode 100644 flink-esper/src/test/java/at/datasciencelabs/BuildEvent.java create mode 100644 flink-esper/src/test/java/at/datasciencelabs/EsperPatternTest.java rename flink-esper/src/test/java/at/datasciencelabs/{EsperStreamTest.java => EsperQueryTest.java} (91%) diff --git a/flink-esper-test/pom.xml b/flink-esper-test/pom.xml index 7e29f91..aba1166 100644 --- a/flink-esper-test/pom.xml +++ b/flink-esper-test/pom.xml @@ -57,5 +57,20 @@ flink-streaming-java_${scala.binary.version} provided + + + junit + junit + 4.12 + test + + + + com.google.guava + guava + 19.0 + test + + \ No newline at end of file diff --git a/flink-esper-test/src/main/java/at/datasciencelabs/test/FlinkTestClass.java b/flink-esper-test/src/main/java/at/datasciencelabs/test/FlinkTestClass.java index 0941f78..8c5ca09 100644 --- a/flink-esper-test/src/main/java/at/datasciencelabs/test/FlinkTestClass.java +++ b/flink-esper-test/src/main/java/at/datasciencelabs/test/FlinkTestClass.java @@ -13,7 +13,7 @@ public class FlinkTestClass { public static void main(String[] args) throws Exception { StreamExecutionEnvironment streamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment(); DataStream dataStream = streamExecutionEnvironment.readTextFile("file:///tmp/flink-esper-input"); - + EsperStream esperStream = Esper.pattern(dataStream, "select bytes from String"); DataStream result = esperStream.select(new EsperSelectFunction() { diff --git a/flink-esper-test/src/test/java/at/datasciencelabs/test/BuildEvent.java b/flink-esper-test/src/test/java/at/datasciencelabs/test/BuildEvent.java new file mode 100644 index 0000000..71676cc --- /dev/null +++ b/flink-esper-test/src/test/java/at/datasciencelabs/test/BuildEvent.java @@ -0,0 +1,5 @@ +package at.datasciencelabs.test; + + +interface BuildEvent { +} diff --git a/flink-esper/src/test/java/at/datasciencelabs/BuildFailure.java b/flink-esper-test/src/test/java/at/datasciencelabs/test/BuildFailure.java similarity index 96% rename from flink-esper/src/test/java/at/datasciencelabs/BuildFailure.java rename to flink-esper-test/src/test/java/at/datasciencelabs/test/BuildFailure.java index 84eb4f5..6f782de 100644 --- a/flink-esper/src/test/java/at/datasciencelabs/BuildFailure.java +++ b/flink-esper-test/src/test/java/at/datasciencelabs/test/BuildFailure.java @@ -1,4 +1,4 @@ -package at.datasciencelabs; +package at.datasciencelabs.test; public class BuildFailure implements BuildEvent { diff --git a/flink-esper/src/test/java/at/datasciencelabs/BuildSuccess.java b/flink-esper-test/src/test/java/at/datasciencelabs/test/BuildSuccess.java similarity index 96% rename from flink-esper/src/test/java/at/datasciencelabs/BuildSuccess.java rename to flink-esper-test/src/test/java/at/datasciencelabs/test/BuildSuccess.java index 02583bf..e25c3dc 100644 --- a/flink-esper/src/test/java/at/datasciencelabs/BuildSuccess.java +++ b/flink-esper-test/src/test/java/at/datasciencelabs/test/BuildSuccess.java @@ -1,4 +1,4 @@ -package at.datasciencelabs; +package at.datasciencelabs.test; public class BuildSuccess implements BuildEvent { diff --git a/flink-esper/src/test/java/at/datasciencelabs/StateChangePatternTest.java b/flink-esper-test/src/test/java/at/datasciencelabs/test/StateChangePatternTest.java similarity index 99% rename from flink-esper/src/test/java/at/datasciencelabs/StateChangePatternTest.java rename to flink-esper-test/src/test/java/at/datasciencelabs/test/StateChangePatternTest.java index 8e8f6ab..4ac6544 100644 --- a/flink-esper/src/test/java/at/datasciencelabs/StateChangePatternTest.java +++ b/flink-esper-test/src/test/java/at/datasciencelabs/test/StateChangePatternTest.java @@ -1,4 +1,4 @@ -package at.datasciencelabs; +package at.datasciencelabs.test; import com.espertech.esper.client.EPServiceProvider; import com.espertech.esper.client.EPServiceProviderManager; diff --git a/flink-esper/src/main/java/at/datasciencelabs/Esper.java b/flink-esper/src/main/java/at/datasciencelabs/Esper.java index ac4c8a1..42bb1ec 100644 --- a/flink-esper/src/main/java/at/datasciencelabs/Esper.java +++ b/flink-esper/src/main/java/at/datasciencelabs/Esper.java @@ -2,9 +2,34 @@ import org.apache.flink.streaming.api.datastream.DataStream; +/** + * Utility class for complex event processing using Esper. + * + *

Methods which transform a {@link DataStream} into a {@link EsperStream} to do CEP. + */ public class Esper { - public static EsperStream pattern(DataStream dataStream, String query) { - return new EsperStream(dataStream, query); + /** + * Creates a {@link EsperStream} from an input data stream and a pattern. + * + * @param input DataStream containing the input events + * @param pattern Esper pattern specification which shall be detected + * @param Type of the input events + * @return Resulting esper stream + */ + public static EsperStream pattern(DataStream input, String pattern) { + return new EsperStream(input, new EsperPattern(pattern)); + } + + /** + * Creates a {@link EsperStream} from an input data stream and a query. + * + * @param input DataStream containing the input events + * @param query Query of describing which events should be selected from the stream + * @param Type of the input events + * @return Resulting esper stream + */ + public static EsperStream query(DataStream input, String query) { + return new EsperStream(input, new EsperQuery(query)); } } diff --git a/flink-esper/src/main/java/at/datasciencelabs/EsperPattern.java b/flink-esper/src/main/java/at/datasciencelabs/EsperPattern.java new file mode 100644 index 0000000..7580485 --- /dev/null +++ b/flink-esper/src/main/java/at/datasciencelabs/EsperPattern.java @@ -0,0 +1,18 @@ +package at.datasciencelabs; + +import com.espertech.esper.client.EPAdministrator; +import com.espertech.esper.client.EPStatement; + +class EsperPattern implements EsperStatementFactory { + + private String pattern; + + EsperPattern(String pattern) { + this.pattern = pattern; + } + + @Override + public EPStatement createStatement(EPAdministrator administrator) { + return administrator.createPattern(pattern); + } +} diff --git a/flink-esper/src/main/java/at/datasciencelabs/EsperQuery.java b/flink-esper/src/main/java/at/datasciencelabs/EsperQuery.java new file mode 100644 index 0000000..bdd5a24 --- /dev/null +++ b/flink-esper/src/main/java/at/datasciencelabs/EsperQuery.java @@ -0,0 +1,19 @@ +package at.datasciencelabs; + +import com.espertech.esper.client.EPAdministrator; +import com.espertech.esper.client.EPStatement; + +class EsperQuery implements EsperStatementFactory { + + private String query; + + EsperQuery(String query) { + this.query = query; + } + + + @Override + public EPStatement createStatement(EPAdministrator administrator) { + return administrator.createEPL(query); + } +} diff --git a/flink-esper/src/main/java/at/datasciencelabs/EsperStatementFactory.java b/flink-esper/src/main/java/at/datasciencelabs/EsperStatementFactory.java new file mode 100644 index 0000000..dc0afab --- /dev/null +++ b/flink-esper/src/main/java/at/datasciencelabs/EsperStatementFactory.java @@ -0,0 +1,10 @@ +package at.datasciencelabs; + +import com.espertech.esper.client.EPAdministrator; +import com.espertech.esper.client.EPStatement; + +import java.io.Serializable; + +interface EsperStatementFactory extends Serializable { + EPStatement createStatement(EPAdministrator administrator); +} diff --git a/flink-esper/src/main/java/at/datasciencelabs/EsperStream.java b/flink-esper/src/main/java/at/datasciencelabs/EsperStream.java index 140f23c..4228e5d 100644 --- a/flink-esper/src/main/java/at/datasciencelabs/EsperStream.java +++ b/flink-esper/src/main/java/at/datasciencelabs/EsperStream.java @@ -22,7 +22,7 @@ public class EsperStream { private final DataStream inputStream; - private final String esperQuery; + private final EsperStatementFactory esperQuery; /** @@ -30,14 +30,20 @@ public class EsperStream { * @param inputStream The input DataStream * @param esperQuery An Esper query */ - public EsperStream(DataStream inputStream, String esperQuery) { + EsperStream(DataStream inputStream, EsperStatementFactory esperQuery) { this.inputStream = inputStream; this.esperQuery = esperQuery; } /** - * Select from the EsperStream, must provide the return type of the output DataStream since no type information is - * currently extracted from the @see {@link EsperSelectFunction}. + * Applies a select function to the detected pattern sequence or query results. For each pattern sequence or query result the + * provided {@link EsperSelectFunction} is called. The pattern select function can produce + * exactly one resulting element. + * + * @param esperSelectFunction The pattern select function which is called for each detected pattern sequence. + * @param Type of the resulting elements + * @return {@link DataStream} which contains the resulting elements from the pattern select + * function. */ public SingleOutputStreamOperator select(EsperSelectFunction esperSelectFunction) { KeySelector keySelector = new NullByteKeySelector<>(); diff --git a/flink-esper/src/main/java/at/datasciencelabs/SelectEsperStreamOperator.java b/flink-esper/src/main/java/at/datasciencelabs/SelectEsperStreamOperator.java index ee572e8..1c7ca8f 100644 --- a/flink-esper/src/main/java/at/datasciencelabs/SelectEsperStreamOperator.java +++ b/flink-esper/src/main/java/at/datasciencelabs/SelectEsperStreamOperator.java @@ -33,7 +33,7 @@ public class SelectEsperStreamOperator extends AbstractUdfStreamOp private static final String ESPER_SERVICE_PROVIDER_STATE = "esperServiceProviderState"; /** The Esper query to execute */ - private final String query; + private final EsperStatementFactory query; /** The inferred input type of the user function */ private final TypeInformation inputType; @@ -53,7 +53,7 @@ public class SelectEsperStreamOperator extends AbstractUdfStreamOp * @param isProcessingTime Flag indicating how time is interpreted (processing time vs event time) * @param esperQuery The esper query */ - public SelectEsperStreamOperator(TypeInformation inputStreamType, EsperSelectFunction esperSelectFunction, boolean isProcessingTime, String esperQuery) { + public SelectEsperStreamOperator(TypeInformation inputStreamType, EsperSelectFunction esperSelectFunction, boolean isProcessingTime, EsperStatementFactory esperQuery) { super(esperSelectFunction); this.inputType = inputStreamType; this.query = esperQuery; @@ -104,7 +104,7 @@ private EPServiceProvider getServiceProvider(String context) throws IOException serviceProvider = EPServiceProviderManager.getProvider(context, configuration); serviceProvider.getEPAdministrator().getConfiguration().addEventType(inputType.getTypeClass()); serviceProvider.getEPRuntime().sendEvent(new CurrentTimeEvent(0)); - EPStatement statement = serviceProvider.getEPAdministrator().createEPL(query); + EPStatement statement = query.createStatement(serviceProvider.getEPAdministrator()); statement.addListener((newData, oldData) -> { for (EventBean event : newData) { diff --git a/flink-esper/src/test/java/at/datasciencelabs/BuildEvent.java b/flink-esper/src/test/java/at/datasciencelabs/BuildEvent.java deleted file mode 100644 index 54a3e86..0000000 --- a/flink-esper/src/test/java/at/datasciencelabs/BuildEvent.java +++ /dev/null @@ -1,5 +0,0 @@ -package at.datasciencelabs; - - -interface BuildEvent { -} diff --git a/flink-esper/src/test/java/at/datasciencelabs/EsperPatternTest.java b/flink-esper/src/test/java/at/datasciencelabs/EsperPatternTest.java new file mode 100644 index 0000000..1881a49 --- /dev/null +++ b/flink-esper/src/test/java/at/datasciencelabs/EsperPatternTest.java @@ -0,0 +1,138 @@ +package at.datasciencelabs; + +import com.espertech.esper.client.EventBean; +import com.google.common.collect.Lists; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.sink.SinkFunction; +import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase; +import org.junit.Before; +import org.junit.Test; + +import java.io.Serializable; +import java.util.Arrays; +import java.util.List; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.core.Is.is; + +public class EsperPatternTest extends StreamingMultipleProgramsTestBase implements Serializable { + + + private static List resultingEvents; + + @Before + public void setUp() throws Exception { + resultingEvents = Lists.newArrayList(); + } + + @Test + public void testEsperPattern() throws Exception { + StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment(); + executionEnvironment.setParallelism(1); + + List expectedValues = Lists.newArrayList(); + ComplexEvent complexEvent = new ComplexEvent(Event.start(), Event.end()); + expectedValues.add(complexEvent); + + List events = Arrays.asList(complexEvent.getStartEvent(), complexEvent.getEndEvent()); + DataStream dataStream = executionEnvironment.fromCollection(events); + + EsperStream esperStream = Esper.pattern(dataStream, "every (A=Event(type='start') -> B=Event(type='end'))"); + + DataStream complexEventDataStream = esperStream.select(new EsperSelectFunction() { + @Override + public ComplexEvent select(EventBean eventBean) throws Exception { + return new ComplexEvent((Event) eventBean.get("A"), (Event) eventBean.get("B")); + } + }); + + complexEventDataStream.addSink(new SinkFunction() { + @Override + public void invoke(ComplexEvent value) throws Exception { + System.err.println(value); + resultingEvents.add(value); + } + }); + + executionEnvironment.execute("test-2"); + + assertThat(resultingEvents, is(expectedValues)); + } + + private static class ComplexEvent { + private Event startEvent; + private Event endEvent; + + ComplexEvent(Event startEvent, Event endEvent) { + this.startEvent = startEvent; + this.endEvent = endEvent; + } + + Event getStartEvent() { + return startEvent; + } + + Event getEndEvent() { + return endEvent; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + ComplexEvent that = (ComplexEvent) o; + + if (startEvent != null ? !startEvent.equals(that.startEvent) : that.startEvent != null) return false; + return endEvent != null ? endEvent.equals(that.endEvent) : that.endEvent == null; + } + + @Override + public int hashCode() { + int result = startEvent != null ? startEvent.hashCode() : 0; + result = 31 * result + (endEvent != null ? endEvent.hashCode() : 0); + return result; + } + } + + public static class Event { + + private String type; + + private Event(String type) { + this.type = type; + } + + public String getType() { + return type; + } + + public void setType(String type) { + this.type = type; + } + + static Event start() { + return new Event("start"); + } + + static Event end() { + return new Event("end"); + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + Event event = (Event) o; + + return type != null ? type.equals(event.type) : event.type == null; + } + + @Override + public int hashCode() { + return type != null ? type.hashCode() : 0; + } + } +} diff --git a/flink-esper/src/test/java/at/datasciencelabs/EsperStreamTest.java b/flink-esper/src/test/java/at/datasciencelabs/EsperQueryTest.java similarity index 91% rename from flink-esper/src/test/java/at/datasciencelabs/EsperStreamTest.java rename to flink-esper/src/test/java/at/datasciencelabs/EsperQueryTest.java index 2f5c5c4..5b4f550 100644 --- a/flink-esper/src/test/java/at/datasciencelabs/EsperStreamTest.java +++ b/flink-esper/src/test/java/at/datasciencelabs/EsperQueryTest.java @@ -17,7 +17,7 @@ import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.core.IsNull.notNullValue; -public class EsperStreamTest extends StreamingMultipleProgramsTestBase implements Serializable { +public class EsperQueryTest extends StreamingMultipleProgramsTestBase implements Serializable { private static List result; private static List stringResult; @@ -37,7 +37,7 @@ public void shouldSelectFromStreamUsingAnonymousClassSelect() throws Exception { DataStream dataStream = executionEnvironment.fromElements(new TestEvent("peter", 10), new TestEvent("alex", 25), new TestEvent("maria", 30)); - EsperStream esperStream = new EsperStream<>(dataStream, "select name, age from TestEvent"); + EsperStream esperStream = Esper.query(dataStream, "select name, age from TestEvent"); DataStream resultStream = esperStream.select(new EsperSelectFunction() { @Override @@ -71,7 +71,7 @@ public void shouldSelectFromStreamUsingLambdaSelect() throws Exception { DataStream dataStream = executionEnvironment.fromElements(new TestEvent("peter1", 10), new TestEvent("alex1", 25), new TestEvent("maria1", 30)); - EsperStream esperStream = new EsperStream<>(dataStream, "select name, age from TestEvent"); + EsperStream esperStream = Esper.query(dataStream, "select name, age from TestEvent"); DataStream resultStream = esperStream.select((EsperSelectFunction) collector -> { String name = (String) collector.get("name"); @@ -101,7 +101,7 @@ public void shouldSelectFromStringDataStream() throws Exception { List expectedValues = Arrays.asList("first", "second"); DataStream dataStream = executionEnvironment.fromCollection(expectedValues); - EsperStream esperStream = Esper.pattern(dataStream, "select bytes from String"); + EsperStream esperStream = Esper.query(dataStream, "select bytes from String"); DataStream resultStream = esperStream.select((EsperSelectFunction) collector -> { byte[] bytes = (byte[]) collector.get("bytes"); diff --git a/flink-esper/src/test/java/at/datasciencelabs/EsperTest.java b/flink-esper/src/test/java/at/datasciencelabs/EsperTest.java index c48ec25..902b3f0 100644 --- a/flink-esper/src/test/java/at/datasciencelabs/EsperTest.java +++ b/flink-esper/src/test/java/at/datasciencelabs/EsperTest.java @@ -3,10 +3,6 @@ import com.espertech.esper.client.EPServiceProvider; import com.espertech.esper.client.EPServiceProviderManager; import com.espertech.esper.client.EPStatement; -import com.espertech.esper.event.map.MapEventBean; -import com.google.common.base.Joiner; -import com.google.common.collect.Lists; -import org.junit.Ignore; import org.junit.Test; public class EsperTest { From 6e001d7cec1945efe78d8d24a38b0e9ad2bf56b6 Mon Sep 17 00:00:00 2001 From: "Limbeck, Philip" Date: Tue, 16 Jan 2018 09:10:53 +0100 Subject: [PATCH 5/5] added missing serialVersionUIDs, cleanup, ignored failing tests --- .gitignore | 2 +- .../java/at/datasciencelabs/test/FlinkTestClass.java | 2 ++ .../datasciencelabs/test/StateChangePatternTest.java | 9 ++++++--- .../at/datasciencelabs/EsperEngineSerializer.java | 2 ++ .../test/java/at/datasciencelabs/EsperQueryTest.java | 12 ++++++++++++ 5 files changed, 23 insertions(+), 4 deletions(-) diff --git a/.gitignore b/.gitignore index 57517d9..f136654 100644 --- a/.gitignore +++ b/.gitignore @@ -23,4 +23,4 @@ hs_err_pid* # IntelliJ project files *.iml -.idea/* +.idea/* \ No newline at end of file diff --git a/flink-esper-test/src/main/java/at/datasciencelabs/test/FlinkTestClass.java b/flink-esper-test/src/main/java/at/datasciencelabs/test/FlinkTestClass.java index 8c5ca09..1acf121 100644 --- a/flink-esper-test/src/main/java/at/datasciencelabs/test/FlinkTestClass.java +++ b/flink-esper-test/src/main/java/at/datasciencelabs/test/FlinkTestClass.java @@ -17,6 +17,8 @@ public static void main(String[] args) throws Exception { EsperStream esperStream = Esper.pattern(dataStream, "select bytes from String"); DataStream result = esperStream.select(new EsperSelectFunction() { + private static final long serialVersionUID = 7093943872082195786L; + @Override public String select(EventBean eventBean) throws Exception { return new String((byte[]) eventBean.get("bytes")); diff --git a/flink-esper-test/src/test/java/at/datasciencelabs/test/StateChangePatternTest.java b/flink-esper-test/src/test/java/at/datasciencelabs/test/StateChangePatternTest.java index 4ac6544..12c4854 100644 --- a/flink-esper-test/src/test/java/at/datasciencelabs/test/StateChangePatternTest.java +++ b/flink-esper-test/src/test/java/at/datasciencelabs/test/StateChangePatternTest.java @@ -10,6 +10,7 @@ import com.google.common.collect.Maps; import org.junit.Assert; import org.junit.Before; +import org.junit.Ignore; import org.junit.Test; import java.util.ArrayList; @@ -83,6 +84,7 @@ public void eventsSeparatedByProject() { } @Test + @Ignore("State change pattern does not detect initial failures yet") public void singleBuildFailureDetected() { List buildEvents = Lists.newArrayList( eventsBuilder.expectedBuildFailure() @@ -91,6 +93,7 @@ public void singleBuildFailureDetected() { } @Test + @Ignore("State change pattern does not detect initial failures yet") public void precedingBuildFailureDetected() { List buildEvents = Lists.newArrayList( @@ -109,9 +112,9 @@ private void runWithPatternAndEvents(String pattern, List buildEvent epStatement.addListener((newData, oldData) -> { Joiner.MapJoiner joiner = Joiner.on(",").withKeyValueSeparator("="); Lists.newArrayList(newData).forEach(___ -> System.out.println(joiner.join(((MapEventBean) ___).getProperties()))); - Lists.newArrayList(newData).forEach(___ -> ((MapEventBean) ___).getProperties().entrySet().forEach(entry -> { - actualBuildEvents.putIfAbsent(entry.getKey(), new ArrayList<>()); - actualBuildEvents.get(entry.getKey()).add((BuildEvent) ((EventBean) entry.getValue()).getUnderlying()); + Lists.newArrayList(newData).forEach(___ -> ((MapEventBean) ___).getProperties().forEach((key, value) -> { + actualBuildEvents.putIfAbsent(key, new ArrayList<>()); + actualBuildEvents.get(key).add((BuildEvent) ((EventBean) value).getUnderlying()); })); }); diff --git a/flink-esper/src/main/java/at/datasciencelabs/EsperEngineSerializer.java b/flink-esper/src/main/java/at/datasciencelabs/EsperEngineSerializer.java index 56bfdc9..2869524 100644 --- a/flink-esper/src/main/java/at/datasciencelabs/EsperEngineSerializer.java +++ b/flink-esper/src/main/java/at/datasciencelabs/EsperEngineSerializer.java @@ -11,6 +11,8 @@ public class EsperEngineSerializer extends TypeSerializer { + private static final long serialVersionUID = -5523322497977330283L; + @Override public boolean isImmutableType() { return false; diff --git a/flink-esper/src/test/java/at/datasciencelabs/EsperQueryTest.java b/flink-esper/src/test/java/at/datasciencelabs/EsperQueryTest.java index 5b4f550..8aa7f53 100644 --- a/flink-esper/src/test/java/at/datasciencelabs/EsperQueryTest.java +++ b/flink-esper/src/test/java/at/datasciencelabs/EsperQueryTest.java @@ -19,6 +19,7 @@ public class EsperQueryTest extends StreamingMultipleProgramsTestBase implements Serializable { + private static final long serialVersionUID = 3151045298871771992L; private static List result; private static List stringResult; @@ -40,6 +41,8 @@ public void shouldSelectFromStreamUsingAnonymousClassSelect() throws Exception { EsperStream esperStream = Esper.query(dataStream, "select name, age from TestEvent"); DataStream resultStream = esperStream.select(new EsperSelectFunction() { + private static final long serialVersionUID = 8802852465465541287L; + @Override public TestEvent select(EventBean eventBean) throws Exception { String name = (String) eventBean.get("name"); @@ -49,6 +52,9 @@ public TestEvent select(EventBean eventBean) throws Exception { }); resultStream.addSink(new SinkFunction() { + + private static final long serialVersionUID = -8260794084029816089L; + @Override public void invoke(TestEvent testEvent) throws Exception { System.err.println(testEvent); @@ -80,6 +86,9 @@ public void shouldSelectFromStreamUsingLambdaSelect() throws Exception { }); resultStream.addSink(new SinkFunction() { + + private static final long serialVersionUID = 5588530728493738002L; + @Override public void invoke(TestEvent testEvent) throws Exception { result.add(testEvent); @@ -109,6 +118,9 @@ public void shouldSelectFromStringDataStream() throws Exception { }); resultStream.addSink(new SinkFunction() { + + private static final long serialVersionUID = 284955963055337762L; + @Override public void invoke(String testEvent) throws Exception { System.err.println(testEvent);