Skip to content

Commit

Permalink
restructured maven project, added test for simple String datastream, …
Browse files Browse the repository at this point in the history
…first version of deployable test program
  • Loading branch information
phil3k committed Nov 13, 2017
1 parent ef827b5 commit c9a4507
Show file tree
Hide file tree
Showing 13 changed files with 346 additions and 53 deletions.
88 changes: 88 additions & 0 deletions flink-esper-test/flink-esper-test.iml
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
<?xml version="1.0" encoding="UTF-8"?>
<module org.jetbrains.idea.maven.project.MavenProjectsManager.isMavenModule="true" type="JAVA_MODULE" version="4">
<component name="NewModuleRootManager" LANGUAGE_LEVEL="JDK_1_8" inherit-compiler-output="false">
<output url="file://$MODULE_DIR$/target/classes" />
<output-test url="file://$MODULE_DIR$/target/test-classes" />
<content url="file://$MODULE_DIR$">
<sourceFolder url="file://$MODULE_DIR$/src/main/java" isTestSource="false" />
<sourceFolder url="file://$MODULE_DIR$/src/main/resources" type="java-resource" />
<sourceFolder url="file://$MODULE_DIR$/src/test/java" isTestSource="true" />
<excludeFolder url="file://$MODULE_DIR$/target" />
</content>
<orderEntry type="inheritedJdk" />
<orderEntry type="sourceFolder" forTests="false" />
<orderEntry type="module" module-name="flink-esper" />
<orderEntry type="library" name="Maven: com.espertech:esper:5.3.0" level="project" />
<orderEntry type="library" name="Maven: log4j:log4j:1.2.17" level="project" />
<orderEntry type="library" name="Maven: commons-logging:commons-logging:1.1.3" level="project" />
<orderEntry type="library" name="Maven: org.antlr:antlr4-runtime:4.1" level="project" />
<orderEntry type="library" name="Maven: org.abego.treelayout:org.abego.treelayout.core:1.0.1" level="project" />
<orderEntry type="library" name="Maven: cglib:cglib-nodep:3.1" level="project" />
<orderEntry type="library" name="Maven: org.apache.flink:flink-shaded-guava:18.0-1.0" level="project" />
<orderEntry type="library" scope="PROVIDED" name="Maven: org.apache.flink:flink-core:1.3.2" level="project" />
<orderEntry type="library" scope="PROVIDED" name="Maven: org.apache.flink:flink-annotations:1.3.2" level="project" />
<orderEntry type="library" scope="PROVIDED" name="Maven: org.apache.flink:flink-metrics-core:1.3.2" level="project" />
<orderEntry type="library" scope="PROVIDED" name="Maven: org.apache.commons:commons-lang3:3.3.2" level="project" />
<orderEntry type="library" scope="PROVIDED" name="Maven: com.esotericsoftware.kryo:kryo:2.24.0" level="project" />
<orderEntry type="library" scope="PROVIDED" name="Maven: com.esotericsoftware.minlog:minlog:1.2" level="project" />
<orderEntry type="library" scope="PROVIDED" name="Maven: org.objenesis:objenesis:2.1" level="project" />
<orderEntry type="library" scope="PROVIDED" name="Maven: commons-collections:commons-collections:3.2.2" level="project" />
<orderEntry type="library" scope="PROVIDED" name="Maven: org.apache.commons:commons-compress:1.4.1" level="project" />
<orderEntry type="library" scope="PROVIDED" name="Maven: org.tukaani:xz:1.0" level="project" />
<orderEntry type="library" scope="PROVIDED" name="Maven: org.apache.avro:avro:1.7.7" level="project" />
<orderEntry type="library" scope="PROVIDED" name="Maven: org.codehaus.jackson:jackson-core-asl:1.9.13" level="project" />
<orderEntry type="library" scope="PROVIDED" name="Maven: org.codehaus.jackson:jackson-mapper-asl:1.9.13" level="project" />
<orderEntry type="library" scope="PROVIDED" name="Maven: com.thoughtworks.paranamer:paranamer:2.3" level="project" />
<orderEntry type="library" scope="PROVIDED" name="Maven: org.xerial.snappy:snappy-java:1.0.5" level="project" />
<orderEntry type="library" scope="PROVIDED" name="Maven: org.slf4j:slf4j-api:1.7.7" level="project" />
<orderEntry type="library" scope="PROVIDED" name="Maven: com.google.code.findbugs:jsr305:1.3.9" level="project" />
<orderEntry type="library" scope="PROVIDED" name="Maven: org.apache.flink:force-shading:1.3.2" level="project" />
<orderEntry type="library" scope="PROVIDED" name="Maven: org.apache.flink:flink-streaming-java_2.11:1.3.2" level="project" />
<orderEntry type="library" scope="PROVIDED" name="Maven: org.apache.flink:flink-runtime_2.11:1.3.2" level="project" />
<orderEntry type="library" scope="PROVIDED" name="Maven: org.apache.flink:flink-java:1.3.2" level="project" />
<orderEntry type="library" scope="PROVIDED" name="Maven: org.apache.flink:flink-shaded-hadoop2:1.3.2" level="project" />
<orderEntry type="library" scope="PROVIDED" name="Maven: xmlenc:xmlenc:0.52" level="project" />
<orderEntry type="library" scope="PROVIDED" name="Maven: commons-codec:commons-codec:1.4" level="project" />
<orderEntry type="library" scope="PROVIDED" name="Maven: commons-io:commons-io:2.4" level="project" />
<orderEntry type="library" scope="PROVIDED" name="Maven: commons-net:commons-net:3.1" level="project" />
<orderEntry type="library" scope="PROVIDED" name="Maven: javax.servlet:servlet-api:2.5" level="project" />
<orderEntry type="library" scope="PROVIDED" name="Maven: org.mortbay.jetty:jetty-util:6.1.26" level="project" />
<orderEntry type="library" scope="PROVIDED" name="Maven: com.sun.jersey:jersey-core:1.9" level="project" />
<orderEntry type="library" scope="PROVIDED" name="Maven: commons-el:commons-el:1.0" level="project" />
<orderEntry type="library" scope="PROVIDED" name="Maven: com.jamesmurty.utils:java-xmlbuilder:0.4" level="project" />
<orderEntry type="library" scope="PROVIDED" name="Maven: commons-lang:commons-lang:2.6" level="project" />
<orderEntry type="library" scope="PROVIDED" name="Maven: commons-configuration:commons-configuration:1.7" level="project" />
<orderEntry type="library" scope="PROVIDED" name="Maven: commons-digester:commons-digester:1.8.1" level="project" />
<orderEntry type="library" scope="PROVIDED" name="Maven: com.jcraft:jsch:0.1.42" level="project" />
<orderEntry type="library" scope="PROVIDED" name="Maven: commons-beanutils:commons-beanutils-bean-collections:1.8.3" level="project" />
<orderEntry type="library" scope="PROVIDED" name="Maven: commons-daemon:commons-daemon:1.0.13" level="project" />
<orderEntry type="library" scope="PROVIDED" name="Maven: javax.xml.bind:jaxb-api:2.2.2" level="project" />
<orderEntry type="library" scope="PROVIDED" name="Maven: javax.xml.stream:stax-api:1.0-2" level="project" />
<orderEntry type="library" scope="PROVIDED" name="Maven: javax.activation:activation:1.1" level="project" />
<orderEntry type="library" scope="PROVIDED" name="Maven: commons-cli:commons-cli:1.3.1" level="project" />
<orderEntry type="library" scope="PROVIDED" name="Maven: io.netty:netty-all:4.0.27.Final" level="project" />
<orderEntry type="library" scope="PROVIDED" name="Maven: org.javassist:javassist:3.18.2-GA" level="project" />
<orderEntry type="library" scope="PROVIDED" name="Maven: org.scala-lang:scala-library:2.11.7" level="project" />
<orderEntry type="library" scope="PROVIDED" name="Maven: com.data-artisans:flakka-actor_2.11:2.3-custom" level="project" />
<orderEntry type="library" scope="PROVIDED" name="Maven: com.typesafe:config:1.2.1" level="project" />
<orderEntry type="library" scope="PROVIDED" name="Maven: com.data-artisans:flakka-remote_2.11:2.3-custom" level="project" />
<orderEntry type="library" scope="PROVIDED" name="Maven: io.netty:netty:3.8.0.Final" level="project" />
<orderEntry type="library" scope="PROVIDED" name="Maven: com.google.protobuf:protobuf-java:2.5.0" level="project" />
<orderEntry type="library" scope="PROVIDED" name="Maven: org.uncommons.maths:uncommons-maths:1.2.2a" level="project" />
<orderEntry type="library" scope="PROVIDED" name="Maven: com.data-artisans:flakka-slf4j_2.11:2.3-custom" level="project" />
<orderEntry type="library" scope="PROVIDED" name="Maven: org.clapper:grizzled-slf4j_2.11:1.0.2" level="project" />
<orderEntry type="library" scope="PROVIDED" name="Maven: com.github.scopt:scopt_2.11:3.5.0" level="project" />
<orderEntry type="library" scope="PROVIDED" name="Maven: com.fasterxml.jackson.core:jackson-core:2.7.4" level="project" />
<orderEntry type="library" scope="PROVIDED" name="Maven: com.fasterxml.jackson.core:jackson-databind:2.7.4" level="project" />
<orderEntry type="library" scope="PROVIDED" name="Maven: com.fasterxml.jackson.core:jackson-annotations:2.7.0" level="project" />
<orderEntry type="library" scope="PROVIDED" name="Maven: org.apache.zookeeper:zookeeper:3.4.6" level="project" />
<orderEntry type="library" scope="PROVIDED" name="Maven: jline:jline:0.9.94" level="project" />
<orderEntry type="library" scope="PROVIDED" name="Maven: junit:junit:3.8.1" level="project" />
<orderEntry type="library" scope="PROVIDED" name="Maven: com.twitter:chill_2.11:0.7.4" level="project" />
<orderEntry type="library" scope="PROVIDED" name="Maven: com.twitter:chill-java:0.7.4" level="project" />
<orderEntry type="library" scope="PROVIDED" name="Maven: org.apache.flink:flink-clients_2.11:1.3.2" level="project" />
<orderEntry type="library" scope="PROVIDED" name="Maven: org.apache.flink:flink-optimizer_2.11:1.3.2" level="project" />
<orderEntry type="library" scope="PROVIDED" name="Maven: org.apache.commons:commons-math3:3.5" level="project" />
<orderEntry type="library" scope="PROVIDED" name="Maven: org.apache.sling:org.apache.sling.commons.json:2.0.6" level="project" />
</component>
</module>
61 changes: 61 additions & 0 deletions flink-esper-test/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>flink-esper-parent</artifactId>
<groupId>at.datasciencelabs.research</groupId>
<version>0.0.1-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>

<artifactId>flink-esper-test</artifactId>

<build>
<plugins>
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<archive>
<manifest>
<mainClass>at.datasciencelabs.test.FlinkTestClass</mainClass>
</manifest>
</archive>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
<executions>
<execution>
<id>make-assembly</id> <!-- this is used for inheritance merges -->
<phase>package</phase> <!-- bind to the packaging phase -->
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>


<dependencies>
<dependency>
<groupId>at.datasciencelabs.research</groupId>
<artifactId>flink-esper</artifactId>
<version>${parent.version}</version>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-core</artifactId>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
<scope>provided</scope>
</dependency>
</dependencies>
</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package at.datasciencelabs.test;

import at.datasciencelabs.Esper;
import at.datasciencelabs.EsperSelectFunction;
import at.datasciencelabs.EsperStream;
import com.espertech.esper.client.EventBean;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;

public class FlinkTestClass {

public static void main(String[] args) throws Exception {
StreamExecutionEnvironment streamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<String> dataStream = streamExecutionEnvironment.readTextFile("file:///tmp/flink-test");

EsperStream<String> esperStream = Esper.pattern(dataStream, "select bytes from String");

DataStream<String> result = esperStream.select(new EsperSelectFunction<String>() {
@Override
public String select(EventBean eventBean) throws Exception {
return new String((byte[]) eventBean.get("bytes"));
}
});

result.addSink(new PrintSinkFunction<>(true));

streamExecutionEnvironment.execute("Kafka 0.10 Example");
}

}
63 changes: 63 additions & 0 deletions flink-esper/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>flink-esper-parent</artifactId>
<groupId>at.datasciencelabs.research</groupId>
<version>0.0.1-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>

<artifactId>flink-esper</artifactId>

<dependencies>
<dependency>
<groupId>com.espertech</groupId>
<artifactId>esper</artifactId>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-core</artifactId>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-shaded-guava</artifactId>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-test-utils_${scala.binary.version}</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
<type>test-jar</type>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-runtime_${scala.binary.version}</artifactId>
<type>test-jar</type>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-statebackend-rocksdb_${scala.binary.version}</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
</project>
10 changes: 10 additions & 0 deletions flink-esper/src/main/java/at/datasciencelabs/Esper.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package at.datasciencelabs;

import org.apache.flink.streaming.api.datastream.DataStream;

public class Esper {

public static <IN> EsperStream<IN> pattern(DataStream<IN> dataStream, String query) {
return new EsperStream<IN>(dataStream, query);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,6 @@ private EPServiceProvider getServiceProvider(String context) throws IOException

statement.addListener((newData, oldData) -> {
for (EventBean event : newData) {
System.out.println(event.get("name"));
EsperSelectFunction<OUT> userFunction = getUserFunction();
try {
output.collect(new StreamRecord<>((userFunction.select(event))));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import static org.hamcrest.CoreMatchers.is;
Expand All @@ -19,11 +20,13 @@
public class EsperStreamTest extends StreamingMultipleProgramsTestBase implements Serializable {

private static List<TestEvent> result;
private static List<String> stringResult;


@Before
public void before() {
result = new ArrayList<>();
stringResult = new ArrayList<>();
}

@Test
Expand Down Expand Up @@ -89,4 +92,35 @@ public void invoke(TestEvent testEvent) throws Exception {
assertThat(result.size(), is(3));
}

@Test
@SuppressWarnings("Convert2Lambda")
public void shouldSelectFromStringDataStream() throws Exception {
StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
executionEnvironment.setParallelism(1);

List<String> expectedValues = Arrays.asList("first", "second");
DataStream<String> dataStream = executionEnvironment.fromCollection(expectedValues);

EsperStream<String> esperStream = Esper.pattern(dataStream, "select bytes from String");

DataStream<String> resultStream = esperStream.select((EsperSelectFunction<String>) collector -> {
byte[] bytes = (byte[]) collector.get("bytes");
return new String(bytes);
});

resultStream.addSink(new SinkFunction<String>() {
@Override
public void invoke(String testEvent) throws Exception {
System.err.println(testEvent);
stringResult.add(testEvent);
}
});

executionEnvironment.execute("test-2");

assertThat(stringResult, is(notNullValue()));
assertThat(stringResult.size(), is(2));
assertThat(stringResult, is(expectedValues));
}

}
Loading

0 comments on commit c9a4507

Please sign in to comment.