Skip to content

Commit

Permalink
Merge pull request phil3k3#2 from phil3k3/correcting-processing-time
Browse files Browse the repository at this point in the history
Distinguish between pattern and query
  • Loading branch information
phil3k3 authored Jan 16, 2018
2 parents 1aa5914 + 6e001d7 commit 548857e
Show file tree
Hide file tree
Showing 17 changed files with 580 additions and 111 deletions.
4 changes: 4 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -20,3 +20,7 @@

# virtual machine crash logs, see http://www.java.com/en/download/help/error_hotspot.xml
hs_err_pid*

# IntelliJ project files
*.iml
.idea/*
88 changes: 0 additions & 88 deletions flink-esper-test/flink-esper-test.iml

This file was deleted.

15 changes: 15 additions & 0 deletions flink-esper-test/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -57,5 +57,20 @@
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>19.0</version>
<scope>test</scope>
</dependency>

</dependencies>
</project>
Original file line number Diff line number Diff line change
Expand Up @@ -6,26 +6,28 @@
import com.espertech.esper.client.EventBean;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;

public class FlinkTestClass {

@SuppressWarnings("Convert2Lambda")
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment streamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<String> dataStream = streamExecutionEnvironment.readTextFile("file:///tmp/flink-test");

DataStream<String> dataStream = streamExecutionEnvironment.readTextFile("file:///tmp/flink-esper-input");
EsperStream<String> esperStream = Esper.pattern(dataStream, "select bytes from String");

DataStream<String> result = esperStream.select(new EsperSelectFunction<String>() {
private static final long serialVersionUID = 7093943872082195786L;

@Override
public String select(EventBean eventBean) throws Exception {
return new String((byte[]) eventBean.get("bytes"));
}
});

result.addSink(new PrintSinkFunction<>(true));
result.writeAsText("file:///tmp/flink-esper-output");

streamExecutionEnvironment.execute("Kafka 0.10 Example");
streamExecutionEnvironment.execute("Simple Flink Esper Example");
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package at.datasciencelabs.test;


interface BuildEvent {
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package at.datasciencelabs.test;

public class BuildFailure implements BuildEvent {

private String project;
private int buildId;

BuildFailure(String project, int buildId) {
this.project = project;
this.buildId = buildId;
}

public int getBuildId() {
return buildId;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;

BuildFailure that = (BuildFailure) o;

if (buildId != that.buildId) return false;
return project != null ? project.equals(that.project) : that.project == null;
}

@Override
public int hashCode() {
int result = project != null ? project.hashCode() : 0;
result = 31 * result + buildId;
return result;
}

public void setBuildId(int buildId) {
this.buildId = buildId;
}

public String getProject() {
return project;
}

public void setProject(String project) {
this.project = project;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package at.datasciencelabs.test;

public class BuildSuccess implements BuildEvent {

private String project;
private int buildId;

BuildSuccess(String project, int buildId) {
this.project = project;
this.buildId = buildId;
}

public String getProject() {
return project;
}

public void setProject(String project) {
this.project = project;
}


public int getBuildId() {
return buildId;
}

public void setBuildId(int buildId) {
this.buildId = buildId;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;

BuildSuccess that = (BuildSuccess) o;

if (buildId != that.buildId) return false;
return project != null ? project.equals(that.project) : that.project == null;
}

@Override
public int hashCode() {
int result = project != null ? project.hashCode() : 0;
result = 31 * result + buildId;
return result;
}
}
Loading

0 comments on commit 548857e

Please sign in to comment.