From ef827b50a5fa9a928001cb6f6bce08950b191330 Mon Sep 17 00:00:00 2001 From: phil3k Date: Wed, 25 Oct 2017 20:44:22 +0200 Subject: [PATCH 1/3] moved Esper Engine to value state, started with type serialization --- .../EsperEngineSerializer.java | 88 +++++++++++++++++ .../java/at/datasciencelabs/EsperStream.java | 32 +++++- .../SelectEsperStreamOperator.java | 71 ++++++++++---- .../at/datasciencelabs/EsperStreamTest.java | 97 ++++++++++++------- .../java/at/datasciencelabs/TestEvent.java | 8 ++ 5 files changed, 237 insertions(+), 59 deletions(-) create mode 100644 src/main/java/at/datasciencelabs/EsperEngineSerializer.java diff --git a/src/main/java/at/datasciencelabs/EsperEngineSerializer.java b/src/main/java/at/datasciencelabs/EsperEngineSerializer.java new file mode 100644 index 0000000..56bfdc9 --- /dev/null +++ b/src/main/java/at/datasciencelabs/EsperEngineSerializer.java @@ -0,0 +1,88 @@ +package at.datasciencelabs; + +import com.espertech.esper.client.EPServiceProvider; +import org.apache.flink.api.common.typeutils.CompatibilityResult; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataOutputView; + +import java.io.IOException; + +public class EsperEngineSerializer extends TypeSerializer { + + @Override + public boolean isImmutableType() { + return false; + } + + @Override + public TypeSerializer duplicate() { + return this; + } + + @Override + public EPServiceProvider createInstance() { + return null; + } + + @Override + public EPServiceProvider copy(EPServiceProvider from) { + return null; + } + + @Override + public EPServiceProvider copy(EPServiceProvider from, EPServiceProvider reuse) { + return null; + } + + @Override + public int getLength() { + return -1; + } + + @Override + public void serialize(EPServiceProvider record, DataOutputView target) throws IOException { + + } + + @Override + public EPServiceProvider deserialize(DataInputView source) throws IOException { + return null; + } + + @Override + public EPServiceProvider deserialize(EPServiceProvider reuse, DataInputView source) throws IOException { + return null; + } + + @Override + public void copy(DataInputView source, DataOutputView target) throws IOException { + + } + + @Override + public boolean equals(Object obj) { + return false; + } + + @Override + public boolean canEqual(Object obj) { + return false; + } + + @Override + public int hashCode() { + return 0; + } + + @Override + public TypeSerializerConfigSnapshot snapshotConfiguration() { + return null; + } + + @Override + public CompatibilityResult ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) { + return null; + } +} diff --git a/src/main/java/at/datasciencelabs/EsperStream.java b/src/main/java/at/datasciencelabs/EsperStream.java index dccdefa..140f23c 100644 --- a/src/main/java/at/datasciencelabs/EsperStream.java +++ b/src/main/java/at/datasciencelabs/EsperStream.java @@ -1,12 +1,19 @@ package at.datasciencelabs; +import org.apache.flink.api.common.functions.InvalidTypesException; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.api.java.functions.NullByteKeySelector; +import org.apache.flink.api.java.typeutils.ResultTypeQueryable; +import org.apache.flink.api.java.typeutils.TypeExtractionException; +import org.apache.flink.api.java.typeutils.TypeExtractionUtils; +import org.apache.flink.api.java.typeutils.TypeExtractor; import org.apache.flink.streaming.api.TimeCharacteristic; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; +import java.lang.reflect.Type; + /** * A DataStream which is able to detect event sequences and patterns using Esper @@ -32,17 +39,36 @@ public EsperStream(DataStream inputStream, String esperQuery) { * Select from the EsperStream, must provide the return type of the output DataStream since no type information is * currently extracted from the @see {@link EsperSelectFunction}. */ - public SingleOutputStreamOperator select(EsperSelectFunction esperSelectFunction, TypeInformation dataStreamReturnType) { + public SingleOutputStreamOperator select(EsperSelectFunction esperSelectFunction) { KeySelector keySelector = new NullByteKeySelector<>(); SingleOutputStreamOperator patternStream; - // TODO until the type extractor is capable of extracting non-generic parameters, the return type has to be passed in manually + TypeInformation typeInformation = getTypeInformation(esperSelectFunction); final boolean isProcessingTime = inputStream.getExecutionEnvironment().getStreamTimeCharacteristic() == TimeCharacteristic.ProcessingTime; - patternStream = inputStream.keyBy(keySelector).transform("SelectEsperOperator", dataStreamReturnType, new SelectEsperStreamOperator(inputStream.getType(), esperSelectFunction, isProcessingTime, esperQuery)); + patternStream = inputStream.keyBy(keySelector).transform("SelectEsperOperator", typeInformation, new SelectEsperStreamOperator(inputStream.getType(), esperSelectFunction, isProcessingTime, esperQuery)); return patternStream; } + @SuppressWarnings("unchecked") + private TypeInformation getTypeInformation(EsperSelectFunction esperSelectFunction) { + try { + TypeExtractionUtils.LambdaExecutable lambdaExecutable = TypeExtractionUtils.checkAndExtractLambda(esperSelectFunction); + if (esperSelectFunction instanceof ResultTypeQueryable) { + return ((ResultTypeQueryable) esperSelectFunction).getProducedType(); + } + if (lambdaExecutable != null) { + Type type = lambdaExecutable.getReturnType(); + return (TypeInformation) TypeExtractor.createTypeInfo(type); + } + else { + return TypeExtractor.createTypeInfo(esperSelectFunction, EsperSelectFunction.class, esperSelectFunction.getClass(), 0); + } + } catch (TypeExtractionException e) { + throw new InvalidTypesException("Could not extract types.", e); + } + } + } diff --git a/src/main/java/at/datasciencelabs/SelectEsperStreamOperator.java b/src/main/java/at/datasciencelabs/SelectEsperStreamOperator.java index 92121dd..6f2d54e 100644 --- a/src/main/java/at/datasciencelabs/SelectEsperStreamOperator.java +++ b/src/main/java/at/datasciencelabs/SelectEsperStreamOperator.java @@ -4,6 +4,8 @@ import com.espertech.esper.client.EPServiceProviderManager; import com.espertech.esper.client.EPStatement; import com.espertech.esper.client.EventBean; +import org.apache.flink.api.common.state.ValueState; +import org.apache.flink.api.common.state.ValueStateDescriptor; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.runtime.state.StateInitializationContext; import org.apache.flink.runtime.state.VoidNamespace; @@ -13,27 +15,32 @@ import org.apache.flink.streaming.api.operators.Triggerable; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import java.io.IOException; import java.io.Serializable; /** * An operator which supports detecting event sequences and patterns using Esper. + * * @param Type of the key - * @param Type of the input stream + * @param Type of the input stream * @param Type of the output stream */ public class SelectEsperStreamOperator extends AbstractUdfStreamOperator> implements OneInputStreamOperator, Triggerable, Serializable { private final String query; private final TypeInformation inputType; - private EPServiceProvider engine; + private ValueState engineState; + private static final String ESPER_SERVICE_PROVIDER_STATE = "esperServiceProviderState"; + private final Object lock = new Object[0]; /** * Constructs a new operator. Requires the type of the input DataStream to register its Event Type at Esper. * Currently only processing time evaluation is supported. - * @param inputStreamType type of the input DataStream + * + * @param inputStreamType type of the input DataStream * @param esperSelectFunction function to select from Esper's output - * @param isProcessingTime Flag indicating how time is interpreted (processing time vs event time) - * @param esperQuery The esper query + * @param isProcessingTime Flag indicating how time is interpreted (processing time vs event time) + * @param esperQuery The esper query */ public SelectEsperStreamOperator(TypeInformation inputStreamType, EsperSelectFunction esperSelectFunction, boolean isProcessingTime, String esperQuery) { super(esperSelectFunction); @@ -48,34 +55,58 @@ public SelectEsperStreamOperator(TypeInformation inputStreamType, EsperSelec @Override public void initializeState(StateInitializationContext context) throws Exception { super.initializeState(context); - engine = EPServiceProviderManager.getDefaultProvider(); - engine.getEPAdministrator().getConfiguration().addEventType(inputType.getTypeClass()); - EPStatement statement = engine.getEPAdministrator().createEPL(query); - statement.addListener((newData, oldData) -> { - for (EventBean event : newData) { - EsperSelectFunction userFunction = getUserFunction(); - try { - output.collect(new StreamRecord<>((userFunction.select(event)))); - } catch (Exception e) { - throw new RuntimeException(e); - } - } - }); + + if (this.engineState == null) { + this.engineState = getRuntimeContext().getState(new ValueStateDescriptor<>(ESPER_SERVICE_PROVIDER_STATE, new EsperEngineSerializer())); + } } @Override public void processElement(StreamRecord streamRecord) throws Exception { - engine.getEPRuntime().sendEvent(streamRecord.getValue()); + EPServiceProvider esperServiceProvider = getServiceProvider(this.hashCode() + ""); + esperServiceProvider.getEPRuntime().sendEvent(streamRecord.getValue()); + this.engineState.update(esperServiceProvider); } @Override public void onEventTime(InternalTimer internalTimer) throws Exception { internalTimer.getTimestamp(); - } @Override public void onProcessingTime(InternalTimer internalTimer) throws Exception { } + + private EPServiceProvider getServiceProvider(String context) throws IOException { + EPServiceProvider serviceProvider = engineState.value(); + if (serviceProvider != null) { + return serviceProvider; + } + synchronized (lock) { + serviceProvider = engineState.value(); + if (serviceProvider == null) { + serviceProvider = EPServiceProviderManager.getProvider(context); + serviceProvider.getEPAdministrator().getConfiguration().addEventType(inputType.getTypeClass()); + EPStatement statement = serviceProvider.getEPAdministrator().createEPL(query); + + statement.addListener((newData, oldData) -> { + for (EventBean event : newData) { + System.out.println(event.get("name")); + EsperSelectFunction userFunction = getUserFunction(); + try { + output.collect(new StreamRecord<>((userFunction.select(event)))); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + }); + this.engineState.update(serviceProvider); + return serviceProvider; + + } else { + return engineState.value(); + } + } + } } diff --git a/src/test/java/at/datasciencelabs/EsperStreamTest.java b/src/test/java/at/datasciencelabs/EsperStreamTest.java index 9e88f30..475b3ae 100644 --- a/src/test/java/at/datasciencelabs/EsperStreamTest.java +++ b/src/test/java/at/datasciencelabs/EsperStreamTest.java @@ -1,67 +1,92 @@ package at.datasciencelabs; -import org.apache.flink.api.common.typeinfo.TypeInformation; +import com.espertech.esper.client.EventBean; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.sink.SinkFunction; +import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase; +import org.junit.Before; import org.junit.Test; -public class EsperStreamTest { +import java.io.Serializable; +import java.util.ArrayList; +import java.util.List; - @Test - public void testEsperStream() throws Exception { - - StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment(); +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.core.IsNull.notNullValue; - DataStream dataStream = executionEnvironment.fromElements(new TestEvent("peter", 10), new TestEvent("alex", 25), new TestEvent("maria", 30)); - - EsperStream esperStream = new EsperStream<>(dataStream, "select name, age from TestEvent"); - - DataStream resultStream = esperStream.select((EsperSelectFunction) collector -> { - String name = (String) collector.get("name"); - int age = (int) collector.get("age"); - return new TestEvent(name, age); - }, TypeInformation.of(TestEvent.class)); +public class EsperStreamTest extends StreamingMultipleProgramsTestBase implements Serializable { - resultStream.printToErr(); + private static List result; - executionEnvironment.execute("test"); + @Before + public void before() { + result = new ArrayList<>(); } @Test - public void testMoreComplexEsperStream() throws Exception { + @SuppressWarnings("Convert2Lambda") + public void shouldSelectFromStreamUsingAnonymousClassSelect() throws Exception { StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment(); + executionEnvironment.setParallelism(1); DataStream dataStream = executionEnvironment.fromElements(new TestEvent("peter", 10), new TestEvent("alex", 25), new TestEvent("maria", 30)); - EsperStream eventEsperStream = new EsperStream<>(dataStream, "select avg(age) as average_age from TestEvent"); - - DataStream resultStream = eventEsperStream.select((EsperSelectFunction) collector -> { - Double age = (Double) collector.get("average_age"); - return age; - }, TypeInformation.of(Double.class)); - - resultStream.printToErr(); + EsperStream esperStream = new EsperStream<>(dataStream, "select name, age from TestEvent"); - executionEnvironment.execute("test"); + DataStream resultStream = esperStream.select(new EsperSelectFunction() { + @Override + public TestEvent select(EventBean eventBean) throws Exception { + String name = (String) eventBean.get("name"); + int age = (int) eventBean.get("age"); + return new TestEvent(name, age); + } + }); + + resultStream.addSink(new SinkFunction() { + @Override + public void invoke(TestEvent testEvent) throws Exception { + System.err.println(testEvent); + result.add(testEvent); + } + }); + + executionEnvironment.execute("test-2"); + + assertThat(result, is(notNullValue())); + assertThat(result.size(), is(3)); } @Test - public void testStateChangeRule() throws Exception { + @SuppressWarnings("Convert2Lambda") + public void shouldSelectFromStreamUsingLambdaSelect() throws Exception { + StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment(); + executionEnvironment.setParallelism(1); + + DataStream dataStream = executionEnvironment.fromElements(new TestEvent("peter1", 10), new TestEvent("alex1", 25), new TestEvent("maria1", 30)); - DataStream dataStream = executionEnvironment.fromElements(new TestEvent("peter", 10), new TestEvent("alex", 25)); + EsperStream esperStream = new EsperStream<>(dataStream, "select name, age from TestEvent"); - EsperStream eventEsperStream = new EsperStream<>(dataStream, "select avg(age) as average_age from TestEvent"); + DataStream resultStream = esperStream.select((EsperSelectFunction) collector -> { + String name = (String) collector.get("name"); + int age = (int) collector.get("age"); + return new TestEvent(name, age); + }); - DataStream resultStream = eventEsperStream.select((EsperSelectFunction) collector -> { - Double age = (Double) collector.get("average_age"); - return age; - }, TypeInformation.of(Double.class)); + resultStream.addSink(new SinkFunction() { + @Override + public void invoke(TestEvent testEvent) throws Exception { + result.add(testEvent); + } + }); - resultStream.printToErr(); + executionEnvironment.execute("test-1"); - executionEnvironment.execute("stateChange1"); + assertThat(result, is(notNullValue())); + assertThat(result.size(), is(3)); } } \ No newline at end of file diff --git a/src/test/java/at/datasciencelabs/TestEvent.java b/src/test/java/at/datasciencelabs/TestEvent.java index 9838ed8..6bf616a 100644 --- a/src/test/java/at/datasciencelabs/TestEvent.java +++ b/src/test/java/at/datasciencelabs/TestEvent.java @@ -37,4 +37,12 @@ public String getName() { public int getAge() { return age; } + + @Override + public String toString() { + return "TestEvent{" + + "name='" + name + '\'' + + ", age=" + age + + '}'; + } } From c9a45076797661e8ee55d12d18a9cb039aedd804 Mon Sep 17 00:00:00 2001 From: phil3k Date: Mon, 13 Nov 2017 19:21:14 +0100 Subject: [PATCH 2/3] restructured maven project, added test for simple String datastream, first version of deployable test program --- flink-esper-test/flink-esper-test.iml | 88 ++++++++++++++ flink-esper-test/pom.xml | 61 ++++++++++ .../datasciencelabs/test/FlinkTestClass.java | 31 +++++ flink-esper/pom.xml | 63 ++++++++++ .../main/java/at/datasciencelabs/Esper.java | 10 ++ .../EsperEngineSerializer.java | 0 .../datasciencelabs/EsperSelectFunction.java | 0 .../java/at/datasciencelabs/EsperStream.java | 0 .../SelectEsperStreamOperator.java | 1 - .../at/datasciencelabs/EsperStreamTest.java | 34 ++++++ .../java/at/datasciencelabs/EsperTest.java | 0 .../java/at/datasciencelabs/TestEvent.java | 0 pom.xml | 111 ++++++++++-------- 13 files changed, 346 insertions(+), 53 deletions(-) create mode 100644 flink-esper-test/flink-esper-test.iml create mode 100644 flink-esper-test/pom.xml create mode 100644 flink-esper-test/src/main/java/at/datasciencelabs/test/FlinkTestClass.java create mode 100644 flink-esper/pom.xml create mode 100644 flink-esper/src/main/java/at/datasciencelabs/Esper.java rename {src => flink-esper/src}/main/java/at/datasciencelabs/EsperEngineSerializer.java (100%) rename {src => flink-esper/src}/main/java/at/datasciencelabs/EsperSelectFunction.java (100%) rename {src => flink-esper/src}/main/java/at/datasciencelabs/EsperStream.java (100%) rename {src => flink-esper/src}/main/java/at/datasciencelabs/SelectEsperStreamOperator.java (98%) rename {src => flink-esper/src}/test/java/at/datasciencelabs/EsperStreamTest.java (71%) rename {src => flink-esper/src}/test/java/at/datasciencelabs/EsperTest.java (100%) rename {src => flink-esper/src}/test/java/at/datasciencelabs/TestEvent.java (100%) diff --git a/flink-esper-test/flink-esper-test.iml b/flink-esper-test/flink-esper-test.iml new file mode 100644 index 0000000..12ebfeb --- /dev/null +++ b/flink-esper-test/flink-esper-test.iml @@ -0,0 +1,88 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + \ No newline at end of file diff --git a/flink-esper-test/pom.xml b/flink-esper-test/pom.xml new file mode 100644 index 0000000..7e29f91 --- /dev/null +++ b/flink-esper-test/pom.xml @@ -0,0 +1,61 @@ + + + + flink-esper-parent + at.datasciencelabs.research + 0.0.1-SNAPSHOT + + 4.0.0 + + flink-esper-test + + + + + maven-assembly-plugin + + + + at.datasciencelabs.test.FlinkTestClass + + + + jar-with-dependencies + + + + + make-assembly + package + + single + + + + + + + + + + + at.datasciencelabs.research + flink-esper + ${parent.version} + + + + org.apache.flink + flink-core + provided + + + + org.apache.flink + flink-streaming-java_${scala.binary.version} + provided + + + \ No newline at end of file diff --git a/flink-esper-test/src/main/java/at/datasciencelabs/test/FlinkTestClass.java b/flink-esper-test/src/main/java/at/datasciencelabs/test/FlinkTestClass.java new file mode 100644 index 0000000..c37f240 --- /dev/null +++ b/flink-esper-test/src/main/java/at/datasciencelabs/test/FlinkTestClass.java @@ -0,0 +1,31 @@ +package at.datasciencelabs.test; + +import at.datasciencelabs.Esper; +import at.datasciencelabs.EsperSelectFunction; +import at.datasciencelabs.EsperStream; +import com.espertech.esper.client.EventBean; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction; + +public class FlinkTestClass { + + public static void main(String[] args) throws Exception { + StreamExecutionEnvironment streamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment(); + DataStream dataStream = streamExecutionEnvironment.readTextFile("file:///tmp/flink-test"); + + EsperStream esperStream = Esper.pattern(dataStream, "select bytes from String"); + + DataStream result = esperStream.select(new EsperSelectFunction() { + @Override + public String select(EventBean eventBean) throws Exception { + return new String((byte[]) eventBean.get("bytes")); + } + }); + + result.addSink(new PrintSinkFunction<>(true)); + + streamExecutionEnvironment.execute("Kafka 0.10 Example"); + } + +} diff --git a/flink-esper/pom.xml b/flink-esper/pom.xml new file mode 100644 index 0000000..b22d626 --- /dev/null +++ b/flink-esper/pom.xml @@ -0,0 +1,63 @@ + + + + flink-esper-parent + at.datasciencelabs.research + 0.0.1-SNAPSHOT + + 4.0.0 + + flink-esper + + + + com.espertech + esper + + + + org.apache.flink + flink-core + provided + + + + org.apache.flink + flink-streaming-java_${scala.binary.version} + provided + + + + org.apache.flink + flink-shaded-guava + + + + org.apache.flink + flink-test-utils_${scala.binary.version} + test + + + + org.apache.flink + flink-streaming-java_${scala.binary.version} + test-jar + test + + + + org.apache.flink + flink-runtime_${scala.binary.version} + test-jar + test + + + + org.apache.flink + flink-statebackend-rocksdb_${scala.binary.version} + test + + + \ No newline at end of file diff --git a/flink-esper/src/main/java/at/datasciencelabs/Esper.java b/flink-esper/src/main/java/at/datasciencelabs/Esper.java new file mode 100644 index 0000000..ac4c8a1 --- /dev/null +++ b/flink-esper/src/main/java/at/datasciencelabs/Esper.java @@ -0,0 +1,10 @@ +package at.datasciencelabs; + +import org.apache.flink.streaming.api.datastream.DataStream; + +public class Esper { + + public static EsperStream pattern(DataStream dataStream, String query) { + return new EsperStream(dataStream, query); + } +} diff --git a/src/main/java/at/datasciencelabs/EsperEngineSerializer.java b/flink-esper/src/main/java/at/datasciencelabs/EsperEngineSerializer.java similarity index 100% rename from src/main/java/at/datasciencelabs/EsperEngineSerializer.java rename to flink-esper/src/main/java/at/datasciencelabs/EsperEngineSerializer.java diff --git a/src/main/java/at/datasciencelabs/EsperSelectFunction.java b/flink-esper/src/main/java/at/datasciencelabs/EsperSelectFunction.java similarity index 100% rename from src/main/java/at/datasciencelabs/EsperSelectFunction.java rename to flink-esper/src/main/java/at/datasciencelabs/EsperSelectFunction.java diff --git a/src/main/java/at/datasciencelabs/EsperStream.java b/flink-esper/src/main/java/at/datasciencelabs/EsperStream.java similarity index 100% rename from src/main/java/at/datasciencelabs/EsperStream.java rename to flink-esper/src/main/java/at/datasciencelabs/EsperStream.java diff --git a/src/main/java/at/datasciencelabs/SelectEsperStreamOperator.java b/flink-esper/src/main/java/at/datasciencelabs/SelectEsperStreamOperator.java similarity index 98% rename from src/main/java/at/datasciencelabs/SelectEsperStreamOperator.java rename to flink-esper/src/main/java/at/datasciencelabs/SelectEsperStreamOperator.java index 6f2d54e..bf6a54e 100644 --- a/src/main/java/at/datasciencelabs/SelectEsperStreamOperator.java +++ b/flink-esper/src/main/java/at/datasciencelabs/SelectEsperStreamOperator.java @@ -92,7 +92,6 @@ private EPServiceProvider getServiceProvider(String context) throws IOException statement.addListener((newData, oldData) -> { for (EventBean event : newData) { - System.out.println(event.get("name")); EsperSelectFunction userFunction = getUserFunction(); try { output.collect(new StreamRecord<>((userFunction.select(event)))); diff --git a/src/test/java/at/datasciencelabs/EsperStreamTest.java b/flink-esper/src/test/java/at/datasciencelabs/EsperStreamTest.java similarity index 71% rename from src/test/java/at/datasciencelabs/EsperStreamTest.java rename to flink-esper/src/test/java/at/datasciencelabs/EsperStreamTest.java index 475b3ae..2f5c5c4 100644 --- a/src/test/java/at/datasciencelabs/EsperStreamTest.java +++ b/flink-esper/src/test/java/at/datasciencelabs/EsperStreamTest.java @@ -10,6 +10,7 @@ import java.io.Serializable; import java.util.ArrayList; +import java.util.Arrays; import java.util.List; import static org.hamcrest.CoreMatchers.is; @@ -19,11 +20,13 @@ public class EsperStreamTest extends StreamingMultipleProgramsTestBase implements Serializable { private static List result; + private static List stringResult; @Before public void before() { result = new ArrayList<>(); + stringResult = new ArrayList<>(); } @Test @@ -89,4 +92,35 @@ public void invoke(TestEvent testEvent) throws Exception { assertThat(result.size(), is(3)); } + @Test + @SuppressWarnings("Convert2Lambda") + public void shouldSelectFromStringDataStream() throws Exception { + StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment(); + executionEnvironment.setParallelism(1); + + List expectedValues = Arrays.asList("first", "second"); + DataStream dataStream = executionEnvironment.fromCollection(expectedValues); + + EsperStream esperStream = Esper.pattern(dataStream, "select bytes from String"); + + DataStream resultStream = esperStream.select((EsperSelectFunction) collector -> { + byte[] bytes = (byte[]) collector.get("bytes"); + return new String(bytes); + }); + + resultStream.addSink(new SinkFunction() { + @Override + public void invoke(String testEvent) throws Exception { + System.err.println(testEvent); + stringResult.add(testEvent); + } + }); + + executionEnvironment.execute("test-2"); + + assertThat(stringResult, is(notNullValue())); + assertThat(stringResult.size(), is(2)); + assertThat(stringResult, is(expectedValues)); + } + } \ No newline at end of file diff --git a/src/test/java/at/datasciencelabs/EsperTest.java b/flink-esper/src/test/java/at/datasciencelabs/EsperTest.java similarity index 100% rename from src/test/java/at/datasciencelabs/EsperTest.java rename to flink-esper/src/test/java/at/datasciencelabs/EsperTest.java diff --git a/src/test/java/at/datasciencelabs/TestEvent.java b/flink-esper/src/test/java/at/datasciencelabs/TestEvent.java similarity index 100% rename from src/test/java/at/datasciencelabs/TestEvent.java rename to flink-esper/src/test/java/at/datasciencelabs/TestEvent.java diff --git a/pom.xml b/pom.xml index 1c6c2ec..42ae760 100644 --- a/pom.xml +++ b/pom.xml @@ -5,8 +5,13 @@ 4.0.0 at.datasciencelabs.research - flink-esper + flink-esper-parent + pom 0.0.1-SNAPSHOT + + flink-esper + flink-esper-test + 2.11 @@ -15,65 +20,67 @@ 1.8 - - - com.espertech - esper - 5.3.0 - + + + + com.espertech + esper + 5.3.0 + - - org.apache.flink - flink-core - ${flink.version} - provided - + + org.apache.flink + flink-core + ${flink.version} + provided + - - org.apache.flink - flink-streaming-java_${scala.binary.version} - ${flink.version} - provided - + + org.apache.flink + flink-streaming-java_${scala.binary.version} + ${flink.version} + provided + - - org.apache.flink - flink-shaded-guava - 18.0-1.0 - + + org.apache.flink + flink-shaded-guava + 18.0-1.0 + - + - - org.apache.flink - flink-test-utils_${scala.binary.version} - ${flink.version} - test - + + org.apache.flink + flink-test-utils_${scala.binary.version} + ${flink.version} + test + - - org.apache.flink - flink-streaming-java_${scala.binary.version} - ${flink.version} - test-jar - test - + + org.apache.flink + flink-streaming-java_${scala.binary.version} + ${flink.version} + test-jar + test + - - org.apache.flink - flink-runtime_${scala.binary.version} - ${flink.version} - test-jar - test - + + org.apache.flink + flink-runtime_${scala.binary.version} + ${flink.version} + test-jar + test + - - org.apache.flink - flink-statebackend-rocksdb_${scala.binary.version} - ${flink.version} - test - + + org.apache.flink + flink-statebackend-rocksdb_${scala.binary.version} + ${flink.version} + test + + + - \ No newline at end of file From 2d6214b194b5a5ab1d974f181a891090b1530d26 Mon Sep 17 00:00:00 2001 From: phil3k Date: Mon, 13 Nov 2017 19:23:31 +0100 Subject: [PATCH 3/3] removed project file --- flink-esper.iml | 96 ------------------------------------------------- 1 file changed, 96 deletions(-) delete mode 100644 flink-esper.iml diff --git a/flink-esper.iml b/flink-esper.iml deleted file mode 100644 index 1d93ba2..0000000 --- a/flink-esper.iml +++ /dev/null @@ -1,96 +0,0 @@ - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - \ No newline at end of file