Skip to content

Commit

Permalink
Merge branch 'master' of https://github.com/phil3k3/flink-esper
Browse files Browse the repository at this point in the history
# Conflicts:
#	src/test/java/at/datasciencelabs/EsperStreamTest.java
  • Loading branch information
phil3k committed Nov 13, 2017
2 parents d26a7c2 + 1991b73 commit 4f31c59
Show file tree
Hide file tree
Showing 14 changed files with 503 additions and 173 deletions.
22 changes: 7 additions & 15 deletions flink-esper.iml → flink-esper-test/flink-esper-test.iml
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,14 @@
</content>
<orderEntry type="inheritedJdk" />
<orderEntry type="sourceFolder" forTests="false" />
<orderEntry type="module" module-name="flink-esper" />
<orderEntry type="library" name="Maven: com.espertech:esper:5.3.0" level="project" />
<orderEntry type="library" name="Maven: log4j:log4j:1.2.17" level="project" />
<orderEntry type="library" name="Maven: commons-logging:commons-logging:1.1.3" level="project" />
<orderEntry type="library" name="Maven: org.antlr:antlr4-runtime:4.1" level="project" />
<orderEntry type="library" name="Maven: org.abego.treelayout:org.abego.treelayout.core:1.0.1" level="project" />
<orderEntry type="library" name="Maven: cglib:cglib-nodep:3.1" level="project" />
<orderEntry type="library" name="Maven: org.apache.flink:flink-shaded-guava:18.0-1.0" level="project" />
<orderEntry type="library" scope="PROVIDED" name="Maven: org.apache.flink:flink-core:1.3.2" level="project" />
<orderEntry type="library" scope="PROVIDED" name="Maven: org.apache.flink:flink-annotations:1.3.2" level="project" />
<orderEntry type="library" scope="PROVIDED" name="Maven: org.apache.flink:flink-metrics-core:1.3.2" level="project" />
Expand All @@ -37,19 +39,6 @@
<orderEntry type="library" scope="PROVIDED" name="Maven: org.apache.flink:force-shading:1.3.2" level="project" />
<orderEntry type="library" scope="PROVIDED" name="Maven: org.apache.flink:flink-streaming-java_2.11:1.3.2" level="project" />
<orderEntry type="library" scope="PROVIDED" name="Maven: org.apache.flink:flink-runtime_2.11:1.3.2" level="project" />
<orderEntry type="library" scope="PROVIDED" name="Maven: org.apache.flink:flink-clients_2.11:1.3.2" level="project" />
<orderEntry type="library" scope="PROVIDED" name="Maven: org.apache.flink:flink-optimizer_2.11:1.3.2" level="project" />
<orderEntry type="library" scope="PROVIDED" name="Maven: org.apache.commons:commons-math3:3.5" level="project" />
<orderEntry type="library" scope="PROVIDED" name="Maven: org.apache.sling:org.apache.sling.commons.json:2.0.6" level="project" />
<orderEntry type="library" name="Maven: org.apache.flink:flink-shaded-guava:18.0-1.0" level="project" />
<orderEntry type="library" scope="TEST" name="Maven: org.apache.flink:flink-test-utils_2.11:1.3.2" level="project" />
<orderEntry type="library" scope="TEST" name="Maven: org.apache.flink:flink-test-utils-junit:1.3.2" level="project" />
<orderEntry type="library" scope="PROVIDED" name="Maven: junit:junit:4.12" level="project" />
<orderEntry type="library" scope="PROVIDED" name="Maven: org.hamcrest:hamcrest-core:1.3" level="project" />
<orderEntry type="library" scope="TEST" name="Maven: org.apache.curator:curator-test:2.12.0" level="project" />
<orderEntry type="library" scope="TEST" name="Maven: com.google.guava:guava:16.0.1" level="project" />
<orderEntry type="library" scope="TEST" name="Maven: org.apache.flink:flink-streaming-java_2.11:test-jar:tests:1.3.2" level="project" />
<orderEntry type="library" scope="TEST" name="Maven: org.apache.flink:flink-runtime_2.11:test-jar:tests:1.3.2" level="project" />
<orderEntry type="library" scope="PROVIDED" name="Maven: org.apache.flink:flink-java:1.3.2" level="project" />
<orderEntry type="library" scope="PROVIDED" name="Maven: org.apache.flink:flink-shaded-hadoop2:1.3.2" level="project" />
<orderEntry type="library" scope="PROVIDED" name="Maven: xmlenc:xmlenc:0.52" level="project" />
Expand Down Expand Up @@ -88,9 +77,12 @@
<orderEntry type="library" scope="PROVIDED" name="Maven: com.fasterxml.jackson.core:jackson-annotations:2.7.0" level="project" />
<orderEntry type="library" scope="PROVIDED" name="Maven: org.apache.zookeeper:zookeeper:3.4.6" level="project" />
<orderEntry type="library" scope="PROVIDED" name="Maven: jline:jline:0.9.94" level="project" />
<orderEntry type="library" scope="PROVIDED" name="Maven: junit:junit:3.8.1" level="project" />
<orderEntry type="library" scope="PROVIDED" name="Maven: com.twitter:chill_2.11:0.7.4" level="project" />
<orderEntry type="library" scope="PROVIDED" name="Maven: com.twitter:chill-java:0.7.4" level="project" />
<orderEntry type="library" scope="TEST" name="Maven: org.apache.flink:flink-statebackend-rocksdb_2.11:1.3.2" level="project" />
<orderEntry type="library" scope="TEST" name="Maven: com.data-artisans:frocksdbjni:4.11.2-artisans" level="project" />
<orderEntry type="library" scope="PROVIDED" name="Maven: org.apache.flink:flink-clients_2.11:1.3.2" level="project" />
<orderEntry type="library" scope="PROVIDED" name="Maven: org.apache.flink:flink-optimizer_2.11:1.3.2" level="project" />
<orderEntry type="library" scope="PROVIDED" name="Maven: org.apache.commons:commons-math3:3.5" level="project" />
<orderEntry type="library" scope="PROVIDED" name="Maven: org.apache.sling:org.apache.sling.commons.json:2.0.6" level="project" />
</component>
</module>
61 changes: 61 additions & 0 deletions flink-esper-test/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>flink-esper-parent</artifactId>
<groupId>at.datasciencelabs.research</groupId>
<version>0.0.1-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>

<artifactId>flink-esper-test</artifactId>

<build>
<plugins>
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<archive>
<manifest>
<mainClass>at.datasciencelabs.test.FlinkTestClass</mainClass>
</manifest>
</archive>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
<executions>
<execution>
<id>make-assembly</id> <!-- this is used for inheritance merges -->
<phase>package</phase> <!-- bind to the packaging phase -->
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>


<dependencies>
<dependency>
<groupId>at.datasciencelabs.research</groupId>
<artifactId>flink-esper</artifactId>
<version>${parent.version}</version>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-core</artifactId>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
<scope>provided</scope>
</dependency>
</dependencies>
</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package at.datasciencelabs.test;

import at.datasciencelabs.Esper;
import at.datasciencelabs.EsperSelectFunction;
import at.datasciencelabs.EsperStream;
import com.espertech.esper.client.EventBean;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;

public class FlinkTestClass {

public static void main(String[] args) throws Exception {
StreamExecutionEnvironment streamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<String> dataStream = streamExecutionEnvironment.readTextFile("file:///tmp/flink-test");

EsperStream<String> esperStream = Esper.pattern(dataStream, "select bytes from String");

DataStream<String> result = esperStream.select(new EsperSelectFunction<String>() {
@Override
public String select(EventBean eventBean) throws Exception {
return new String((byte[]) eventBean.get("bytes"));
}
});

result.addSink(new PrintSinkFunction<>(true));

streamExecutionEnvironment.execute("Kafka 0.10 Example");
}

}
63 changes: 63 additions & 0 deletions flink-esper/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>flink-esper-parent</artifactId>
<groupId>at.datasciencelabs.research</groupId>
<version>0.0.1-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>

<artifactId>flink-esper</artifactId>

<dependencies>
<dependency>
<groupId>com.espertech</groupId>
<artifactId>esper</artifactId>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-core</artifactId>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-shaded-guava</artifactId>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-test-utils_${scala.binary.version}</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
<type>test-jar</type>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-runtime_${scala.binary.version}</artifactId>
<type>test-jar</type>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-statebackend-rocksdb_${scala.binary.version}</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
</project>
10 changes: 10 additions & 0 deletions flink-esper/src/main/java/at/datasciencelabs/Esper.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package at.datasciencelabs;

import org.apache.flink.streaming.api.datastream.DataStream;

public class Esper {

public static <IN> EsperStream<IN> pattern(DataStream<IN> dataStream, String query) {
return new EsperStream<IN>(dataStream, query);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
package at.datasciencelabs;

import com.espertech.esper.client.EPServiceProvider;
import org.apache.flink.api.common.typeutils.CompatibilityResult;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;

import java.io.IOException;

public class EsperEngineSerializer extends TypeSerializer<EPServiceProvider> {

@Override
public boolean isImmutableType() {
return false;
}

@Override
public TypeSerializer<EPServiceProvider> duplicate() {
return this;
}

@Override
public EPServiceProvider createInstance() {
return null;
}

@Override
public EPServiceProvider copy(EPServiceProvider from) {
return null;
}

@Override
public EPServiceProvider copy(EPServiceProvider from, EPServiceProvider reuse) {
return null;
}

@Override
public int getLength() {
return -1;
}

@Override
public void serialize(EPServiceProvider record, DataOutputView target) throws IOException {

}

@Override
public EPServiceProvider deserialize(DataInputView source) throws IOException {
return null;
}

@Override
public EPServiceProvider deserialize(EPServiceProvider reuse, DataInputView source) throws IOException {
return null;
}

@Override
public void copy(DataInputView source, DataOutputView target) throws IOException {

}

@Override
public boolean equals(Object obj) {
return false;
}

@Override
public boolean canEqual(Object obj) {
return false;
}

@Override
public int hashCode() {
return 0;
}

@Override
public TypeSerializerConfigSnapshot snapshotConfiguration() {
return null;
}

@Override
public CompatibilityResult<EPServiceProvider> ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) {
return null;
}
}
Loading

0 comments on commit 4f31c59

Please sign in to comment.