diff --git a/src/main/java/at/datasciencelabs/EsperEngineSerializer.java b/src/main/java/at/datasciencelabs/EsperEngineSerializer.java new file mode 100644 index 0000000..56bfdc9 --- /dev/null +++ b/src/main/java/at/datasciencelabs/EsperEngineSerializer.java @@ -0,0 +1,88 @@ +package at.datasciencelabs; + +import com.espertech.esper.client.EPServiceProvider; +import org.apache.flink.api.common.typeutils.CompatibilityResult; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataOutputView; + +import java.io.IOException; + +public class EsperEngineSerializer extends TypeSerializer { + + @Override + public boolean isImmutableType() { + return false; + } + + @Override + public TypeSerializer duplicate() { + return this; + } + + @Override + public EPServiceProvider createInstance() { + return null; + } + + @Override + public EPServiceProvider copy(EPServiceProvider from) { + return null; + } + + @Override + public EPServiceProvider copy(EPServiceProvider from, EPServiceProvider reuse) { + return null; + } + + @Override + public int getLength() { + return -1; + } + + @Override + public void serialize(EPServiceProvider record, DataOutputView target) throws IOException { + + } + + @Override + public EPServiceProvider deserialize(DataInputView source) throws IOException { + return null; + } + + @Override + public EPServiceProvider deserialize(EPServiceProvider reuse, DataInputView source) throws IOException { + return null; + } + + @Override + public void copy(DataInputView source, DataOutputView target) throws IOException { + + } + + @Override + public boolean equals(Object obj) { + return false; + } + + @Override + public boolean canEqual(Object obj) { + return false; + } + + @Override + public int hashCode() { + return 0; + } + + @Override + public TypeSerializerConfigSnapshot snapshotConfiguration() { + return null; + } + + @Override + public CompatibilityResult ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) { + return null; + } +} diff --git a/src/main/java/at/datasciencelabs/EsperStream.java b/src/main/java/at/datasciencelabs/EsperStream.java index dccdefa..140f23c 100644 --- a/src/main/java/at/datasciencelabs/EsperStream.java +++ b/src/main/java/at/datasciencelabs/EsperStream.java @@ -1,12 +1,19 @@ package at.datasciencelabs; +import org.apache.flink.api.common.functions.InvalidTypesException; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.api.java.functions.NullByteKeySelector; +import org.apache.flink.api.java.typeutils.ResultTypeQueryable; +import org.apache.flink.api.java.typeutils.TypeExtractionException; +import org.apache.flink.api.java.typeutils.TypeExtractionUtils; +import org.apache.flink.api.java.typeutils.TypeExtractor; import org.apache.flink.streaming.api.TimeCharacteristic; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; +import java.lang.reflect.Type; + /** * A DataStream which is able to detect event sequences and patterns using Esper @@ -32,17 +39,36 @@ public EsperStream(DataStream inputStream, String esperQuery) { * Select from the EsperStream, must provide the return type of the output DataStream since no type information is * currently extracted from the @see {@link EsperSelectFunction}. */ - public SingleOutputStreamOperator select(EsperSelectFunction esperSelectFunction, TypeInformation dataStreamReturnType) { + public SingleOutputStreamOperator select(EsperSelectFunction esperSelectFunction) { KeySelector keySelector = new NullByteKeySelector<>(); SingleOutputStreamOperator patternStream; - // TODO until the type extractor is capable of extracting non-generic parameters, the return type has to be passed in manually + TypeInformation typeInformation = getTypeInformation(esperSelectFunction); final boolean isProcessingTime = inputStream.getExecutionEnvironment().getStreamTimeCharacteristic() == TimeCharacteristic.ProcessingTime; - patternStream = inputStream.keyBy(keySelector).transform("SelectEsperOperator", dataStreamReturnType, new SelectEsperStreamOperator(inputStream.getType(), esperSelectFunction, isProcessingTime, esperQuery)); + patternStream = inputStream.keyBy(keySelector).transform("SelectEsperOperator", typeInformation, new SelectEsperStreamOperator(inputStream.getType(), esperSelectFunction, isProcessingTime, esperQuery)); return patternStream; } + @SuppressWarnings("unchecked") + private TypeInformation getTypeInformation(EsperSelectFunction esperSelectFunction) { + try { + TypeExtractionUtils.LambdaExecutable lambdaExecutable = TypeExtractionUtils.checkAndExtractLambda(esperSelectFunction); + if (esperSelectFunction instanceof ResultTypeQueryable) { + return ((ResultTypeQueryable) esperSelectFunction).getProducedType(); + } + if (lambdaExecutable != null) { + Type type = lambdaExecutable.getReturnType(); + return (TypeInformation) TypeExtractor.createTypeInfo(type); + } + else { + return TypeExtractor.createTypeInfo(esperSelectFunction, EsperSelectFunction.class, esperSelectFunction.getClass(), 0); + } + } catch (TypeExtractionException e) { + throw new InvalidTypesException("Could not extract types.", e); + } + } + } diff --git a/src/main/java/at/datasciencelabs/SelectEsperStreamOperator.java b/src/main/java/at/datasciencelabs/SelectEsperStreamOperator.java index 92121dd..6f2d54e 100644 --- a/src/main/java/at/datasciencelabs/SelectEsperStreamOperator.java +++ b/src/main/java/at/datasciencelabs/SelectEsperStreamOperator.java @@ -4,6 +4,8 @@ import com.espertech.esper.client.EPServiceProviderManager; import com.espertech.esper.client.EPStatement; import com.espertech.esper.client.EventBean; +import org.apache.flink.api.common.state.ValueState; +import org.apache.flink.api.common.state.ValueStateDescriptor; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.runtime.state.StateInitializationContext; import org.apache.flink.runtime.state.VoidNamespace; @@ -13,27 +15,32 @@ import org.apache.flink.streaming.api.operators.Triggerable; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import java.io.IOException; import java.io.Serializable; /** * An operator which supports detecting event sequences and patterns using Esper. + * * @param Type of the key - * @param Type of the input stream + * @param Type of the input stream * @param Type of the output stream */ public class SelectEsperStreamOperator extends AbstractUdfStreamOperator> implements OneInputStreamOperator, Triggerable, Serializable { private final String query; private final TypeInformation inputType; - private EPServiceProvider engine; + private ValueState engineState; + private static final String ESPER_SERVICE_PROVIDER_STATE = "esperServiceProviderState"; + private final Object lock = new Object[0]; /** * Constructs a new operator. Requires the type of the input DataStream to register its Event Type at Esper. * Currently only processing time evaluation is supported. - * @param inputStreamType type of the input DataStream + * + * @param inputStreamType type of the input DataStream * @param esperSelectFunction function to select from Esper's output - * @param isProcessingTime Flag indicating how time is interpreted (processing time vs event time) - * @param esperQuery The esper query + * @param isProcessingTime Flag indicating how time is interpreted (processing time vs event time) + * @param esperQuery The esper query */ public SelectEsperStreamOperator(TypeInformation inputStreamType, EsperSelectFunction esperSelectFunction, boolean isProcessingTime, String esperQuery) { super(esperSelectFunction); @@ -48,34 +55,58 @@ public SelectEsperStreamOperator(TypeInformation inputStreamType, EsperSelec @Override public void initializeState(StateInitializationContext context) throws Exception { super.initializeState(context); - engine = EPServiceProviderManager.getDefaultProvider(); - engine.getEPAdministrator().getConfiguration().addEventType(inputType.getTypeClass()); - EPStatement statement = engine.getEPAdministrator().createEPL(query); - statement.addListener((newData, oldData) -> { - for (EventBean event : newData) { - EsperSelectFunction userFunction = getUserFunction(); - try { - output.collect(new StreamRecord<>((userFunction.select(event)))); - } catch (Exception e) { - throw new RuntimeException(e); - } - } - }); + + if (this.engineState == null) { + this.engineState = getRuntimeContext().getState(new ValueStateDescriptor<>(ESPER_SERVICE_PROVIDER_STATE, new EsperEngineSerializer())); + } } @Override public void processElement(StreamRecord streamRecord) throws Exception { - engine.getEPRuntime().sendEvent(streamRecord.getValue()); + EPServiceProvider esperServiceProvider = getServiceProvider(this.hashCode() + ""); + esperServiceProvider.getEPRuntime().sendEvent(streamRecord.getValue()); + this.engineState.update(esperServiceProvider); } @Override public void onEventTime(InternalTimer internalTimer) throws Exception { internalTimer.getTimestamp(); - } @Override public void onProcessingTime(InternalTimer internalTimer) throws Exception { } + + private EPServiceProvider getServiceProvider(String context) throws IOException { + EPServiceProvider serviceProvider = engineState.value(); + if (serviceProvider != null) { + return serviceProvider; + } + synchronized (lock) { + serviceProvider = engineState.value(); + if (serviceProvider == null) { + serviceProvider = EPServiceProviderManager.getProvider(context); + serviceProvider.getEPAdministrator().getConfiguration().addEventType(inputType.getTypeClass()); + EPStatement statement = serviceProvider.getEPAdministrator().createEPL(query); + + statement.addListener((newData, oldData) -> { + for (EventBean event : newData) { + System.out.println(event.get("name")); + EsperSelectFunction userFunction = getUserFunction(); + try { + output.collect(new StreamRecord<>((userFunction.select(event)))); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + }); + this.engineState.update(serviceProvider); + return serviceProvider; + + } else { + return engineState.value(); + } + } + } } diff --git a/src/test/java/at/datasciencelabs/EsperStreamTest.java b/src/test/java/at/datasciencelabs/EsperStreamTest.java index 9e88f30..475b3ae 100644 --- a/src/test/java/at/datasciencelabs/EsperStreamTest.java +++ b/src/test/java/at/datasciencelabs/EsperStreamTest.java @@ -1,67 +1,92 @@ package at.datasciencelabs; -import org.apache.flink.api.common.typeinfo.TypeInformation; +import com.espertech.esper.client.EventBean; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.sink.SinkFunction; +import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase; +import org.junit.Before; import org.junit.Test; -public class EsperStreamTest { +import java.io.Serializable; +import java.util.ArrayList; +import java.util.List; - @Test - public void testEsperStream() throws Exception { - - StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment(); +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.core.IsNull.notNullValue; - DataStream dataStream = executionEnvironment.fromElements(new TestEvent("peter", 10), new TestEvent("alex", 25), new TestEvent("maria", 30)); - - EsperStream esperStream = new EsperStream<>(dataStream, "select name, age from TestEvent"); - - DataStream resultStream = esperStream.select((EsperSelectFunction) collector -> { - String name = (String) collector.get("name"); - int age = (int) collector.get("age"); - return new TestEvent(name, age); - }, TypeInformation.of(TestEvent.class)); +public class EsperStreamTest extends StreamingMultipleProgramsTestBase implements Serializable { - resultStream.printToErr(); + private static List result; - executionEnvironment.execute("test"); + @Before + public void before() { + result = new ArrayList<>(); } @Test - public void testMoreComplexEsperStream() throws Exception { + @SuppressWarnings("Convert2Lambda") + public void shouldSelectFromStreamUsingAnonymousClassSelect() throws Exception { StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment(); + executionEnvironment.setParallelism(1); DataStream dataStream = executionEnvironment.fromElements(new TestEvent("peter", 10), new TestEvent("alex", 25), new TestEvent("maria", 30)); - EsperStream eventEsperStream = new EsperStream<>(dataStream, "select avg(age) as average_age from TestEvent"); - - DataStream resultStream = eventEsperStream.select((EsperSelectFunction) collector -> { - Double age = (Double) collector.get("average_age"); - return age; - }, TypeInformation.of(Double.class)); - - resultStream.printToErr(); + EsperStream esperStream = new EsperStream<>(dataStream, "select name, age from TestEvent"); - executionEnvironment.execute("test"); + DataStream resultStream = esperStream.select(new EsperSelectFunction() { + @Override + public TestEvent select(EventBean eventBean) throws Exception { + String name = (String) eventBean.get("name"); + int age = (int) eventBean.get("age"); + return new TestEvent(name, age); + } + }); + + resultStream.addSink(new SinkFunction() { + @Override + public void invoke(TestEvent testEvent) throws Exception { + System.err.println(testEvent); + result.add(testEvent); + } + }); + + executionEnvironment.execute("test-2"); + + assertThat(result, is(notNullValue())); + assertThat(result.size(), is(3)); } @Test - public void testStateChangeRule() throws Exception { + @SuppressWarnings("Convert2Lambda") + public void shouldSelectFromStreamUsingLambdaSelect() throws Exception { + StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment(); + executionEnvironment.setParallelism(1); + + DataStream dataStream = executionEnvironment.fromElements(new TestEvent("peter1", 10), new TestEvent("alex1", 25), new TestEvent("maria1", 30)); - DataStream dataStream = executionEnvironment.fromElements(new TestEvent("peter", 10), new TestEvent("alex", 25)); + EsperStream esperStream = new EsperStream<>(dataStream, "select name, age from TestEvent"); - EsperStream eventEsperStream = new EsperStream<>(dataStream, "select avg(age) as average_age from TestEvent"); + DataStream resultStream = esperStream.select((EsperSelectFunction) collector -> { + String name = (String) collector.get("name"); + int age = (int) collector.get("age"); + return new TestEvent(name, age); + }); - DataStream resultStream = eventEsperStream.select((EsperSelectFunction) collector -> { - Double age = (Double) collector.get("average_age"); - return age; - }, TypeInformation.of(Double.class)); + resultStream.addSink(new SinkFunction() { + @Override + public void invoke(TestEvent testEvent) throws Exception { + result.add(testEvent); + } + }); - resultStream.printToErr(); + executionEnvironment.execute("test-1"); - executionEnvironment.execute("stateChange1"); + assertThat(result, is(notNullValue())); + assertThat(result.size(), is(3)); } } \ No newline at end of file diff --git a/src/test/java/at/datasciencelabs/TestEvent.java b/src/test/java/at/datasciencelabs/TestEvent.java index 9838ed8..6bf616a 100644 --- a/src/test/java/at/datasciencelabs/TestEvent.java +++ b/src/test/java/at/datasciencelabs/TestEvent.java @@ -37,4 +37,12 @@ public String getName() { public int getAge() { return age; } + + @Override + public String toString() { + return "TestEvent{" + + "name='" + name + '\'' + + ", age=" + age + + '}'; + } }