diff --git a/flink-esper-test/flink-esper-test.iml b/flink-esper-test/flink-esper-test.iml new file mode 100644 index 0000000..12ebfeb --- /dev/null +++ b/flink-esper-test/flink-esper-test.iml @@ -0,0 +1,88 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + \ No newline at end of file diff --git a/flink-esper-test/pom.xml b/flink-esper-test/pom.xml new file mode 100644 index 0000000..7e29f91 --- /dev/null +++ b/flink-esper-test/pom.xml @@ -0,0 +1,61 @@ + + + + flink-esper-parent + at.datasciencelabs.research + 0.0.1-SNAPSHOT + + 4.0.0 + + flink-esper-test + + + + + maven-assembly-plugin + + + + at.datasciencelabs.test.FlinkTestClass + + + + jar-with-dependencies + + + + + make-assembly + package + + single + + + + + + + + + + + at.datasciencelabs.research + flink-esper + ${parent.version} + + + + org.apache.flink + flink-core + provided + + + + org.apache.flink + flink-streaming-java_${scala.binary.version} + provided + + + \ No newline at end of file diff --git a/flink-esper-test/src/main/java/at/datasciencelabs/test/FlinkTestClass.java b/flink-esper-test/src/main/java/at/datasciencelabs/test/FlinkTestClass.java new file mode 100644 index 0000000..c37f240 --- /dev/null +++ b/flink-esper-test/src/main/java/at/datasciencelabs/test/FlinkTestClass.java @@ -0,0 +1,31 @@ +package at.datasciencelabs.test; + +import at.datasciencelabs.Esper; +import at.datasciencelabs.EsperSelectFunction; +import at.datasciencelabs.EsperStream; +import com.espertech.esper.client.EventBean; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction; + +public class FlinkTestClass { + + public static void main(String[] args) throws Exception { + StreamExecutionEnvironment streamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment(); + DataStream dataStream = streamExecutionEnvironment.readTextFile("file:///tmp/flink-test"); + + EsperStream esperStream = Esper.pattern(dataStream, "select bytes from String"); + + DataStream result = esperStream.select(new EsperSelectFunction() { + @Override + public String select(EventBean eventBean) throws Exception { + return new String((byte[]) eventBean.get("bytes")); + } + }); + + result.addSink(new PrintSinkFunction<>(true)); + + streamExecutionEnvironment.execute("Kafka 0.10 Example"); + } + +} diff --git a/flink-esper/pom.xml b/flink-esper/pom.xml new file mode 100644 index 0000000..b22d626 --- /dev/null +++ b/flink-esper/pom.xml @@ -0,0 +1,63 @@ + + + + flink-esper-parent + at.datasciencelabs.research + 0.0.1-SNAPSHOT + + 4.0.0 + + flink-esper + + + + com.espertech + esper + + + + org.apache.flink + flink-core + provided + + + + org.apache.flink + flink-streaming-java_${scala.binary.version} + provided + + + + org.apache.flink + flink-shaded-guava + + + + org.apache.flink + flink-test-utils_${scala.binary.version} + test + + + + org.apache.flink + flink-streaming-java_${scala.binary.version} + test-jar + test + + + + org.apache.flink + flink-runtime_${scala.binary.version} + test-jar + test + + + + org.apache.flink + flink-statebackend-rocksdb_${scala.binary.version} + test + + + \ No newline at end of file diff --git a/flink-esper/src/main/java/at/datasciencelabs/Esper.java b/flink-esper/src/main/java/at/datasciencelabs/Esper.java new file mode 100644 index 0000000..ac4c8a1 --- /dev/null +++ b/flink-esper/src/main/java/at/datasciencelabs/Esper.java @@ -0,0 +1,10 @@ +package at.datasciencelabs; + +import org.apache.flink.streaming.api.datastream.DataStream; + +public class Esper { + + public static EsperStream pattern(DataStream dataStream, String query) { + return new EsperStream(dataStream, query); + } +} diff --git a/src/main/java/at/datasciencelabs/EsperEngineSerializer.java b/flink-esper/src/main/java/at/datasciencelabs/EsperEngineSerializer.java similarity index 100% rename from src/main/java/at/datasciencelabs/EsperEngineSerializer.java rename to flink-esper/src/main/java/at/datasciencelabs/EsperEngineSerializer.java diff --git a/src/main/java/at/datasciencelabs/EsperSelectFunction.java b/flink-esper/src/main/java/at/datasciencelabs/EsperSelectFunction.java similarity index 100% rename from src/main/java/at/datasciencelabs/EsperSelectFunction.java rename to flink-esper/src/main/java/at/datasciencelabs/EsperSelectFunction.java diff --git a/src/main/java/at/datasciencelabs/EsperStream.java b/flink-esper/src/main/java/at/datasciencelabs/EsperStream.java similarity index 100% rename from src/main/java/at/datasciencelabs/EsperStream.java rename to flink-esper/src/main/java/at/datasciencelabs/EsperStream.java diff --git a/src/main/java/at/datasciencelabs/SelectEsperStreamOperator.java b/flink-esper/src/main/java/at/datasciencelabs/SelectEsperStreamOperator.java similarity index 98% rename from src/main/java/at/datasciencelabs/SelectEsperStreamOperator.java rename to flink-esper/src/main/java/at/datasciencelabs/SelectEsperStreamOperator.java index 6f2d54e..bf6a54e 100644 --- a/src/main/java/at/datasciencelabs/SelectEsperStreamOperator.java +++ b/flink-esper/src/main/java/at/datasciencelabs/SelectEsperStreamOperator.java @@ -92,7 +92,6 @@ private EPServiceProvider getServiceProvider(String context) throws IOException statement.addListener((newData, oldData) -> { for (EventBean event : newData) { - System.out.println(event.get("name")); EsperSelectFunction userFunction = getUserFunction(); try { output.collect(new StreamRecord<>((userFunction.select(event)))); diff --git a/src/test/java/at/datasciencelabs/EsperStreamTest.java b/flink-esper/src/test/java/at/datasciencelabs/EsperStreamTest.java similarity index 71% rename from src/test/java/at/datasciencelabs/EsperStreamTest.java rename to flink-esper/src/test/java/at/datasciencelabs/EsperStreamTest.java index 475b3ae..2f5c5c4 100644 --- a/src/test/java/at/datasciencelabs/EsperStreamTest.java +++ b/flink-esper/src/test/java/at/datasciencelabs/EsperStreamTest.java @@ -10,6 +10,7 @@ import java.io.Serializable; import java.util.ArrayList; +import java.util.Arrays; import java.util.List; import static org.hamcrest.CoreMatchers.is; @@ -19,11 +20,13 @@ public class EsperStreamTest extends StreamingMultipleProgramsTestBase implements Serializable { private static List result; + private static List stringResult; @Before public void before() { result = new ArrayList<>(); + stringResult = new ArrayList<>(); } @Test @@ -89,4 +92,35 @@ public void invoke(TestEvent testEvent) throws Exception { assertThat(result.size(), is(3)); } + @Test + @SuppressWarnings("Convert2Lambda") + public void shouldSelectFromStringDataStream() throws Exception { + StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment(); + executionEnvironment.setParallelism(1); + + List expectedValues = Arrays.asList("first", "second"); + DataStream dataStream = executionEnvironment.fromCollection(expectedValues); + + EsperStream esperStream = Esper.pattern(dataStream, "select bytes from String"); + + DataStream resultStream = esperStream.select((EsperSelectFunction) collector -> { + byte[] bytes = (byte[]) collector.get("bytes"); + return new String(bytes); + }); + + resultStream.addSink(new SinkFunction() { + @Override + public void invoke(String testEvent) throws Exception { + System.err.println(testEvent); + stringResult.add(testEvent); + } + }); + + executionEnvironment.execute("test-2"); + + assertThat(stringResult, is(notNullValue())); + assertThat(stringResult.size(), is(2)); + assertThat(stringResult, is(expectedValues)); + } + } \ No newline at end of file diff --git a/src/test/java/at/datasciencelabs/EsperTest.java b/flink-esper/src/test/java/at/datasciencelabs/EsperTest.java similarity index 100% rename from src/test/java/at/datasciencelabs/EsperTest.java rename to flink-esper/src/test/java/at/datasciencelabs/EsperTest.java diff --git a/src/test/java/at/datasciencelabs/TestEvent.java b/flink-esper/src/test/java/at/datasciencelabs/TestEvent.java similarity index 100% rename from src/test/java/at/datasciencelabs/TestEvent.java rename to flink-esper/src/test/java/at/datasciencelabs/TestEvent.java diff --git a/pom.xml b/pom.xml index 1c6c2ec..42ae760 100644 --- a/pom.xml +++ b/pom.xml @@ -5,8 +5,13 @@ 4.0.0 at.datasciencelabs.research - flink-esper + flink-esper-parent + pom 0.0.1-SNAPSHOT + + flink-esper + flink-esper-test + 2.11 @@ -15,65 +20,67 @@ 1.8 - - - com.espertech - esper - 5.3.0 - + + + + com.espertech + esper + 5.3.0 + - - org.apache.flink - flink-core - ${flink.version} - provided - + + org.apache.flink + flink-core + ${flink.version} + provided + - - org.apache.flink - flink-streaming-java_${scala.binary.version} - ${flink.version} - provided - + + org.apache.flink + flink-streaming-java_${scala.binary.version} + ${flink.version} + provided + - - org.apache.flink - flink-shaded-guava - 18.0-1.0 - + + org.apache.flink + flink-shaded-guava + 18.0-1.0 + - + - - org.apache.flink - flink-test-utils_${scala.binary.version} - ${flink.version} - test - + + org.apache.flink + flink-test-utils_${scala.binary.version} + ${flink.version} + test + - - org.apache.flink - flink-streaming-java_${scala.binary.version} - ${flink.version} - test-jar - test - + + org.apache.flink + flink-streaming-java_${scala.binary.version} + ${flink.version} + test-jar + test + - - org.apache.flink - flink-runtime_${scala.binary.version} - ${flink.version} - test-jar - test - + + org.apache.flink + flink-runtime_${scala.binary.version} + ${flink.version} + test-jar + test + - - org.apache.flink - flink-statebackend-rocksdb_${scala.binary.version} - ${flink.version} - test - + + org.apache.flink + flink-statebackend-rocksdb_${scala.binary.version} + ${flink.version} + test + + + - \ No newline at end of file