diff --git a/flink-esper.iml b/flink-esper-test/flink-esper-test.iml similarity index 88% rename from flink-esper.iml rename to flink-esper-test/flink-esper-test.iml index 1d93ba2..12ebfeb 100644 --- a/flink-esper.iml +++ b/flink-esper-test/flink-esper-test.iml @@ -11,12 +11,14 @@ + + @@ -37,19 +39,6 @@ - - - - - - - - - - - - - @@ -88,9 +77,12 @@ + - - + + + + \ No newline at end of file diff --git a/flink-esper-test/pom.xml b/flink-esper-test/pom.xml new file mode 100644 index 0000000..7e29f91 --- /dev/null +++ b/flink-esper-test/pom.xml @@ -0,0 +1,61 @@ + + + + flink-esper-parent + at.datasciencelabs.research + 0.0.1-SNAPSHOT + + 4.0.0 + + flink-esper-test + + + + + maven-assembly-plugin + + + + at.datasciencelabs.test.FlinkTestClass + + + + jar-with-dependencies + + + + + make-assembly + package + + single + + + + + + + + + + + at.datasciencelabs.research + flink-esper + ${parent.version} + + + + org.apache.flink + flink-core + provided + + + + org.apache.flink + flink-streaming-java_${scala.binary.version} + provided + + + \ No newline at end of file diff --git a/flink-esper-test/src/main/java/at/datasciencelabs/test/FlinkTestClass.java b/flink-esper-test/src/main/java/at/datasciencelabs/test/FlinkTestClass.java new file mode 100644 index 0000000..c37f240 --- /dev/null +++ b/flink-esper-test/src/main/java/at/datasciencelabs/test/FlinkTestClass.java @@ -0,0 +1,31 @@ +package at.datasciencelabs.test; + +import at.datasciencelabs.Esper; +import at.datasciencelabs.EsperSelectFunction; +import at.datasciencelabs.EsperStream; +import com.espertech.esper.client.EventBean; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction; + +public class FlinkTestClass { + + public static void main(String[] args) throws Exception { + StreamExecutionEnvironment streamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment(); + DataStream dataStream = streamExecutionEnvironment.readTextFile("file:///tmp/flink-test"); + + EsperStream esperStream = Esper.pattern(dataStream, "select bytes from String"); + + DataStream result = esperStream.select(new EsperSelectFunction() { + @Override + public String select(EventBean eventBean) throws Exception { + return new String((byte[]) eventBean.get("bytes")); + } + }); + + result.addSink(new PrintSinkFunction<>(true)); + + streamExecutionEnvironment.execute("Kafka 0.10 Example"); + } + +} diff --git a/flink-esper/pom.xml b/flink-esper/pom.xml new file mode 100644 index 0000000..b22d626 --- /dev/null +++ b/flink-esper/pom.xml @@ -0,0 +1,63 @@ + + + + flink-esper-parent + at.datasciencelabs.research + 0.0.1-SNAPSHOT + + 4.0.0 + + flink-esper + + + + com.espertech + esper + + + + org.apache.flink + flink-core + provided + + + + org.apache.flink + flink-streaming-java_${scala.binary.version} + provided + + + + org.apache.flink + flink-shaded-guava + + + + org.apache.flink + flink-test-utils_${scala.binary.version} + test + + + + org.apache.flink + flink-streaming-java_${scala.binary.version} + test-jar + test + + + + org.apache.flink + flink-runtime_${scala.binary.version} + test-jar + test + + + + org.apache.flink + flink-statebackend-rocksdb_${scala.binary.version} + test + + + \ No newline at end of file diff --git a/flink-esper/src/main/java/at/datasciencelabs/Esper.java b/flink-esper/src/main/java/at/datasciencelabs/Esper.java new file mode 100644 index 0000000..ac4c8a1 --- /dev/null +++ b/flink-esper/src/main/java/at/datasciencelabs/Esper.java @@ -0,0 +1,10 @@ +package at.datasciencelabs; + +import org.apache.flink.streaming.api.datastream.DataStream; + +public class Esper { + + public static EsperStream pattern(DataStream dataStream, String query) { + return new EsperStream(dataStream, query); + } +} diff --git a/flink-esper/src/main/java/at/datasciencelabs/EsperEngineSerializer.java b/flink-esper/src/main/java/at/datasciencelabs/EsperEngineSerializer.java new file mode 100644 index 0000000..56bfdc9 --- /dev/null +++ b/flink-esper/src/main/java/at/datasciencelabs/EsperEngineSerializer.java @@ -0,0 +1,88 @@ +package at.datasciencelabs; + +import com.espertech.esper.client.EPServiceProvider; +import org.apache.flink.api.common.typeutils.CompatibilityResult; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataOutputView; + +import java.io.IOException; + +public class EsperEngineSerializer extends TypeSerializer { + + @Override + public boolean isImmutableType() { + return false; + } + + @Override + public TypeSerializer duplicate() { + return this; + } + + @Override + public EPServiceProvider createInstance() { + return null; + } + + @Override + public EPServiceProvider copy(EPServiceProvider from) { + return null; + } + + @Override + public EPServiceProvider copy(EPServiceProvider from, EPServiceProvider reuse) { + return null; + } + + @Override + public int getLength() { + return -1; + } + + @Override + public void serialize(EPServiceProvider record, DataOutputView target) throws IOException { + + } + + @Override + public EPServiceProvider deserialize(DataInputView source) throws IOException { + return null; + } + + @Override + public EPServiceProvider deserialize(EPServiceProvider reuse, DataInputView source) throws IOException { + return null; + } + + @Override + public void copy(DataInputView source, DataOutputView target) throws IOException { + + } + + @Override + public boolean equals(Object obj) { + return false; + } + + @Override + public boolean canEqual(Object obj) { + return false; + } + + @Override + public int hashCode() { + return 0; + } + + @Override + public TypeSerializerConfigSnapshot snapshotConfiguration() { + return null; + } + + @Override + public CompatibilityResult ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) { + return null; + } +} diff --git a/src/main/java/at/datasciencelabs/EsperSelectFunction.java b/flink-esper/src/main/java/at/datasciencelabs/EsperSelectFunction.java similarity index 100% rename from src/main/java/at/datasciencelabs/EsperSelectFunction.java rename to flink-esper/src/main/java/at/datasciencelabs/EsperSelectFunction.java diff --git a/src/main/java/at/datasciencelabs/EsperStream.java b/flink-esper/src/main/java/at/datasciencelabs/EsperStream.java similarity index 100% rename from src/main/java/at/datasciencelabs/EsperStream.java rename to flink-esper/src/main/java/at/datasciencelabs/EsperStream.java diff --git a/src/main/java/at/datasciencelabs/SelectEsperStreamOperator.java b/flink-esper/src/main/java/at/datasciencelabs/SelectEsperStreamOperator.java similarity index 51% rename from src/main/java/at/datasciencelabs/SelectEsperStreamOperator.java rename to flink-esper/src/main/java/at/datasciencelabs/SelectEsperStreamOperator.java index 92121dd..bf6a54e 100644 --- a/src/main/java/at/datasciencelabs/SelectEsperStreamOperator.java +++ b/flink-esper/src/main/java/at/datasciencelabs/SelectEsperStreamOperator.java @@ -4,6 +4,8 @@ import com.espertech.esper.client.EPServiceProviderManager; import com.espertech.esper.client.EPStatement; import com.espertech.esper.client.EventBean; +import org.apache.flink.api.common.state.ValueState; +import org.apache.flink.api.common.state.ValueStateDescriptor; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.runtime.state.StateInitializationContext; import org.apache.flink.runtime.state.VoidNamespace; @@ -13,27 +15,32 @@ import org.apache.flink.streaming.api.operators.Triggerable; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import java.io.IOException; import java.io.Serializable; /** * An operator which supports detecting event sequences and patterns using Esper. + * * @param Type of the key - * @param Type of the input stream + * @param Type of the input stream * @param Type of the output stream */ public class SelectEsperStreamOperator extends AbstractUdfStreamOperator> implements OneInputStreamOperator, Triggerable, Serializable { private final String query; private final TypeInformation inputType; - private EPServiceProvider engine; + private ValueState engineState; + private static final String ESPER_SERVICE_PROVIDER_STATE = "esperServiceProviderState"; + private final Object lock = new Object[0]; /** * Constructs a new operator. Requires the type of the input DataStream to register its Event Type at Esper. * Currently only processing time evaluation is supported. - * @param inputStreamType type of the input DataStream + * + * @param inputStreamType type of the input DataStream * @param esperSelectFunction function to select from Esper's output - * @param isProcessingTime Flag indicating how time is interpreted (processing time vs event time) - * @param esperQuery The esper query + * @param isProcessingTime Flag indicating how time is interpreted (processing time vs event time) + * @param esperQuery The esper query */ public SelectEsperStreamOperator(TypeInformation inputStreamType, EsperSelectFunction esperSelectFunction, boolean isProcessingTime, String esperQuery) { super(esperSelectFunction); @@ -48,34 +55,57 @@ public SelectEsperStreamOperator(TypeInformation inputStreamType, EsperSelec @Override public void initializeState(StateInitializationContext context) throws Exception { super.initializeState(context); - engine = EPServiceProviderManager.getDefaultProvider(); - engine.getEPAdministrator().getConfiguration().addEventType(inputType.getTypeClass()); - EPStatement statement = engine.getEPAdministrator().createEPL(query); - statement.addListener((newData, oldData) -> { - for (EventBean event : newData) { - EsperSelectFunction userFunction = getUserFunction(); - try { - output.collect(new StreamRecord<>((userFunction.select(event)))); - } catch (Exception e) { - throw new RuntimeException(e); - } - } - }); + + if (this.engineState == null) { + this.engineState = getRuntimeContext().getState(new ValueStateDescriptor<>(ESPER_SERVICE_PROVIDER_STATE, new EsperEngineSerializer())); + } } @Override public void processElement(StreamRecord streamRecord) throws Exception { - engine.getEPRuntime().sendEvent(streamRecord.getValue()); + EPServiceProvider esperServiceProvider = getServiceProvider(this.hashCode() + ""); + esperServiceProvider.getEPRuntime().sendEvent(streamRecord.getValue()); + this.engineState.update(esperServiceProvider); } @Override public void onEventTime(InternalTimer internalTimer) throws Exception { internalTimer.getTimestamp(); - } @Override public void onProcessingTime(InternalTimer internalTimer) throws Exception { } + + private EPServiceProvider getServiceProvider(String context) throws IOException { + EPServiceProvider serviceProvider = engineState.value(); + if (serviceProvider != null) { + return serviceProvider; + } + synchronized (lock) { + serviceProvider = engineState.value(); + if (serviceProvider == null) { + serviceProvider = EPServiceProviderManager.getProvider(context); + serviceProvider.getEPAdministrator().getConfiguration().addEventType(inputType.getTypeClass()); + EPStatement statement = serviceProvider.getEPAdministrator().createEPL(query); + + statement.addListener((newData, oldData) -> { + for (EventBean event : newData) { + EsperSelectFunction userFunction = getUserFunction(); + try { + output.collect(new StreamRecord<>((userFunction.select(event)))); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + }); + this.engineState.update(serviceProvider); + return serviceProvider; + + } else { + return engineState.value(); + } + } + } } diff --git a/flink-esper/src/test/java/at/datasciencelabs/EsperStreamTest.java b/flink-esper/src/test/java/at/datasciencelabs/EsperStreamTest.java new file mode 100644 index 0000000..2f5c5c4 --- /dev/null +++ b/flink-esper/src/test/java/at/datasciencelabs/EsperStreamTest.java @@ -0,0 +1,126 @@ +package at.datasciencelabs; + +import com.espertech.esper.client.EventBean; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.sink.SinkFunction; +import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase; +import org.junit.Before; +import org.junit.Test; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.core.IsNull.notNullValue; + +public class EsperStreamTest extends StreamingMultipleProgramsTestBase implements Serializable { + + private static List result; + private static List stringResult; + + + @Before + public void before() { + result = new ArrayList<>(); + stringResult = new ArrayList<>(); + } + + @Test + @SuppressWarnings("Convert2Lambda") + public void shouldSelectFromStreamUsingAnonymousClassSelect() throws Exception { + StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment(); + executionEnvironment.setParallelism(1); + + DataStream dataStream = executionEnvironment.fromElements(new TestEvent("peter", 10), new TestEvent("alex", 25), new TestEvent("maria", 30)); + + EsperStream esperStream = new EsperStream<>(dataStream, "select name, age from TestEvent"); + + DataStream resultStream = esperStream.select(new EsperSelectFunction() { + @Override + public TestEvent select(EventBean eventBean) throws Exception { + String name = (String) eventBean.get("name"); + int age = (int) eventBean.get("age"); + return new TestEvent(name, age); + } + }); + + resultStream.addSink(new SinkFunction() { + @Override + public void invoke(TestEvent testEvent) throws Exception { + System.err.println(testEvent); + result.add(testEvent); + } + }); + + executionEnvironment.execute("test-2"); + + assertThat(result, is(notNullValue())); + assertThat(result.size(), is(3)); + } + + @Test + @SuppressWarnings("Convert2Lambda") + public void shouldSelectFromStreamUsingLambdaSelect() throws Exception { + + StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment(); + executionEnvironment.setParallelism(1); + + DataStream dataStream = executionEnvironment.fromElements(new TestEvent("peter1", 10), new TestEvent("alex1", 25), new TestEvent("maria1", 30)); + + EsperStream esperStream = new EsperStream<>(dataStream, "select name, age from TestEvent"); + + DataStream resultStream = esperStream.select((EsperSelectFunction) collector -> { + String name = (String) collector.get("name"); + int age = (int) collector.get("age"); + return new TestEvent(name, age); + }); + + resultStream.addSink(new SinkFunction() { + @Override + public void invoke(TestEvent testEvent) throws Exception { + result.add(testEvent); + } + }); + + executionEnvironment.execute("test-1"); + + assertThat(result, is(notNullValue())); + assertThat(result.size(), is(3)); + } + + @Test + @SuppressWarnings("Convert2Lambda") + public void shouldSelectFromStringDataStream() throws Exception { + StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment(); + executionEnvironment.setParallelism(1); + + List expectedValues = Arrays.asList("first", "second"); + DataStream dataStream = executionEnvironment.fromCollection(expectedValues); + + EsperStream esperStream = Esper.pattern(dataStream, "select bytes from String"); + + DataStream resultStream = esperStream.select((EsperSelectFunction) collector -> { + byte[] bytes = (byte[]) collector.get("bytes"); + return new String(bytes); + }); + + resultStream.addSink(new SinkFunction() { + @Override + public void invoke(String testEvent) throws Exception { + System.err.println(testEvent); + stringResult.add(testEvent); + } + }); + + executionEnvironment.execute("test-2"); + + assertThat(stringResult, is(notNullValue())); + assertThat(stringResult.size(), is(2)); + assertThat(stringResult, is(expectedValues)); + } + +} \ No newline at end of file diff --git a/src/test/java/at/datasciencelabs/EsperTest.java b/flink-esper/src/test/java/at/datasciencelabs/EsperTest.java similarity index 100% rename from src/test/java/at/datasciencelabs/EsperTest.java rename to flink-esper/src/test/java/at/datasciencelabs/EsperTest.java diff --git a/src/test/java/at/datasciencelabs/TestEvent.java b/flink-esper/src/test/java/at/datasciencelabs/TestEvent.java similarity index 81% rename from src/test/java/at/datasciencelabs/TestEvent.java rename to flink-esper/src/test/java/at/datasciencelabs/TestEvent.java index 9838ed8..6bf616a 100644 --- a/src/test/java/at/datasciencelabs/TestEvent.java +++ b/flink-esper/src/test/java/at/datasciencelabs/TestEvent.java @@ -37,4 +37,12 @@ public String getName() { public int getAge() { return age; } + + @Override + public String toString() { + return "TestEvent{" + + "name='" + name + '\'' + + ", age=" + age + + '}'; + } } diff --git a/pom.xml b/pom.xml index 1c6c2ec..42ae760 100644 --- a/pom.xml +++ b/pom.xml @@ -5,8 +5,13 @@ 4.0.0 at.datasciencelabs.research - flink-esper + flink-esper-parent + pom 0.0.1-SNAPSHOT + + flink-esper + flink-esper-test + 2.11 @@ -15,65 +20,67 @@ 1.8 - - - com.espertech - esper - 5.3.0 - + + + + com.espertech + esper + 5.3.0 + - - org.apache.flink - flink-core - ${flink.version} - provided - + + org.apache.flink + flink-core + ${flink.version} + provided + - - org.apache.flink - flink-streaming-java_${scala.binary.version} - ${flink.version} - provided - + + org.apache.flink + flink-streaming-java_${scala.binary.version} + ${flink.version} + provided + - - org.apache.flink - flink-shaded-guava - 18.0-1.0 - + + org.apache.flink + flink-shaded-guava + 18.0-1.0 + - + - - org.apache.flink - flink-test-utils_${scala.binary.version} - ${flink.version} - test - + + org.apache.flink + flink-test-utils_${scala.binary.version} + ${flink.version} + test + - - org.apache.flink - flink-streaming-java_${scala.binary.version} - ${flink.version} - test-jar - test - + + org.apache.flink + flink-streaming-java_${scala.binary.version} + ${flink.version} + test-jar + test + - - org.apache.flink - flink-runtime_${scala.binary.version} - ${flink.version} - test-jar - test - + + org.apache.flink + flink-runtime_${scala.binary.version} + ${flink.version} + test-jar + test + - - org.apache.flink - flink-statebackend-rocksdb_${scala.binary.version} - ${flink.version} - test - + + org.apache.flink + flink-statebackend-rocksdb_${scala.binary.version} + ${flink.version} + test + + + - \ No newline at end of file diff --git a/src/test/java/at/datasciencelabs/EsperStreamTest.java b/src/test/java/at/datasciencelabs/EsperStreamTest.java deleted file mode 100644 index 7786181..0000000 --- a/src/test/java/at/datasciencelabs/EsperStreamTest.java +++ /dev/null @@ -1,86 +0,0 @@ -package at.datasciencelabs; - -import com.espertech.esper.client.EventBean; -import org.apache.flink.streaming.api.datastream.DataStream; -import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.streaming.api.functions.sink.SinkFunction; -import org.junit.Assert; -import org.junit.Test; - -import java.io.Serializable; -import java.util.ArrayList; -import java.util.List; - -import static org.hamcrest.CoreMatchers.is; -import static org.hamcrest.MatcherAssert.assertThat; -import static org.hamcrest.core.IsNull.notNullValue; - -public class EsperStreamTest implements Serializable { - - @Test - public void shouldSelectFromStreamUsingLambdaSelect() throws Exception { - - StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment(); - - DataStream dataStream = executionEnvironment.fromElements(new TestEvent("peter", 10), new TestEvent("alex", 25), new TestEvent("maria", 30)); - - EsperStream esperStream = new EsperStream<>(dataStream, "select name, age from TestEvent"); - - DataStream resultStream = esperStream.select((EsperSelectFunction) collector -> { - String name = (String) collector.get("name"); - int age = (int) collector.get("age"); - return new TestEvent(name, age); - }); - - final List result = new ArrayList<>(); - resultStream.addSink((SinkFunction) result::add); - - executionEnvironment.execute("test"); - - assertThat(result, is(notNullValue())); - assertThat(result.size(), is(3)); - } - - @Test - @SuppressWarnings("Convert2Lambda") - public void shouldSelectFromStreamUsingAnonymousClassSelect() throws Exception { - StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment(); - - DataStream dataStream = executionEnvironment.fromElements(new TestEvent("peter", 10), new TestEvent("alex", 25), new TestEvent("maria", 30)); - - EsperStream esperStream = new EsperStream<>(dataStream, "select name, age from TestEvent"); - - DataStream resultStream = esperStream.select(new EsperSelectFunction() { - @Override - public TestEvent select(EventBean eventBean) throws Exception { - String name = (String) eventBean.get("name"); - int age = (int) eventBean.get("age"); - return new TestEvent(name, age); - } - }); - - resultStream.printToErr(); - - executionEnvironment.execute("test"); - - Assert.fail("not finished"); - } - - @Test - public void shouldCalculateAverageAge() throws Exception { - StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment(); - - DataStream dataStream = executionEnvironment.fromElements(new TestEvent("peter", 10), new TestEvent("alex", 25), new TestEvent("maria", 30)); - - EsperStream eventEsperStream = new EsperStream<>(dataStream, "select avg(age) as average_age from TestEvent"); - - DataStream resultStream = eventEsperStream.select((EsperSelectFunction) collector -> (Double) collector.get("average_age")); - - resultStream.printToErr(); - - executionEnvironment.execute("test"); - - Assert.fail("not finished"); - } - -} \ No newline at end of file