Skip to content

Commit

Permalink
distinguish between esper pattern and query. cleanup tests. move stat…
Browse files Browse the repository at this point in the history
…e change pattern tests to test project
  • Loading branch information
phil3k committed Jan 14, 2018
1 parent 3a5f759 commit 81c1292
Show file tree
Hide file tree
Showing 16 changed files with 253 additions and 26 deletions.
15 changes: 15 additions & 0 deletions flink-esper-test/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -57,5 +57,20 @@
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>19.0</version>
<scope>test</scope>
</dependency>

</dependencies>
</project>
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ public class FlinkTestClass {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment streamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<String> dataStream = streamExecutionEnvironment.readTextFile("file:///tmp/flink-esper-input");

EsperStream<String> esperStream = Esper.pattern(dataStream, "select bytes from String");

DataStream<String> result = esperStream.select(new EsperSelectFunction<String>() {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package at.datasciencelabs.test;


interface BuildEvent {
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package at.datasciencelabs;
package at.datasciencelabs.test;

public class BuildFailure implements BuildEvent {

Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package at.datasciencelabs;
package at.datasciencelabs.test;

public class BuildSuccess implements BuildEvent {

Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package at.datasciencelabs;
package at.datasciencelabs.test;

import com.espertech.esper.client.EPServiceProvider;
import com.espertech.esper.client.EPServiceProviderManager;
Expand Down
29 changes: 27 additions & 2 deletions flink-esper/src/main/java/at/datasciencelabs/Esper.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,34 @@

import org.apache.flink.streaming.api.datastream.DataStream;

/**
* Utility class for complex event processing using Esper.
*
* <p>Methods which transform a {@link DataStream} into a {@link EsperStream} to do CEP.
*/
public class Esper {

public static <IN> EsperStream<IN> pattern(DataStream<IN> dataStream, String query) {
return new EsperStream<IN>(dataStream, query);
/**
* Creates a {@link EsperStream} from an input data stream and a pattern.
*
* @param input DataStream containing the input events
* @param pattern Esper pattern specification which shall be detected
* @param <IN> Type of the input events
* @return Resulting esper stream
*/
public static <IN> EsperStream<IN> pattern(DataStream<IN> input, String pattern) {
return new EsperStream<IN>(input, new EsperPattern(pattern));
}

/**
* Creates a {@link EsperStream} from an input data stream and a query.
*
* @param input DataStream containing the input events
* @param query Query of describing which events should be selected from the stream
* @param <IN> Type of the input events
* @return Resulting esper stream
*/
public static <IN> EsperStream<IN> query(DataStream<IN> input, String query) {
return new EsperStream<IN>(input, new EsperQuery(query));
}
}
18 changes: 18 additions & 0 deletions flink-esper/src/main/java/at/datasciencelabs/EsperPattern.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package at.datasciencelabs;

import com.espertech.esper.client.EPAdministrator;
import com.espertech.esper.client.EPStatement;

class EsperPattern implements EsperStatementFactory {

private String pattern;

EsperPattern(String pattern) {
this.pattern = pattern;
}

@Override
public EPStatement createStatement(EPAdministrator administrator) {
return administrator.createPattern(pattern);
}
}
19 changes: 19 additions & 0 deletions flink-esper/src/main/java/at/datasciencelabs/EsperQuery.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package at.datasciencelabs;

import com.espertech.esper.client.EPAdministrator;
import com.espertech.esper.client.EPStatement;

class EsperQuery implements EsperStatementFactory {

private String query;

EsperQuery(String query) {
this.query = query;
}


@Override
public EPStatement createStatement(EPAdministrator administrator) {
return administrator.createEPL(query);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package at.datasciencelabs;

import com.espertech.esper.client.EPAdministrator;
import com.espertech.esper.client.EPStatement;

import java.io.Serializable;

interface EsperStatementFactory extends Serializable {
EPStatement createStatement(EPAdministrator administrator);
}
14 changes: 10 additions & 4 deletions flink-esper/src/main/java/at/datasciencelabs/EsperStream.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,22 +22,28 @@
public class EsperStream<IN> {

private final DataStream<IN> inputStream;
private final String esperQuery;
private final EsperStatementFactory esperQuery;


/**
* Create a new EsperStream instance.
* @param inputStream The input DataStream
* @param esperQuery An Esper query
*/
public EsperStream(DataStream<IN> inputStream, String esperQuery) {
EsperStream(DataStream<IN> inputStream, EsperStatementFactory esperQuery) {
this.inputStream = inputStream;
this.esperQuery = esperQuery;
}

/**
* Select from the EsperStream, must provide the return type of the output DataStream since no type information is
* currently extracted from the @see {@link EsperSelectFunction}.
* Applies a select function to the detected pattern sequence or query results. For each pattern sequence or query result the
* provided {@link EsperSelectFunction} is called. The pattern select function can produce
* exactly one resulting element.
*
* @param esperSelectFunction The pattern select function which is called for each detected pattern sequence.
* @param <R> Type of the resulting elements
* @return {@link DataStream} which contains the resulting elements from the pattern select
* function.
*/
public <R> SingleOutputStreamOperator<R> select(EsperSelectFunction<R> esperSelectFunction) {
KeySelector<IN, Byte> keySelector = new NullByteKeySelector<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ public class SelectEsperStreamOperator<KEY, IN, OUT> extends AbstractUdfStreamOp
private static final String ESPER_SERVICE_PROVIDER_STATE = "esperServiceProviderState";

/** The Esper query to execute */
private final String query;
private final EsperStatementFactory query;

/** The inferred input type of the user function */
private final TypeInformation<IN> inputType;
Expand All @@ -53,7 +53,7 @@ public class SelectEsperStreamOperator<KEY, IN, OUT> extends AbstractUdfStreamOp
* @param isProcessingTime Flag indicating how time is interpreted (processing time vs event time)
* @param esperQuery The esper query
*/
public SelectEsperStreamOperator(TypeInformation<IN> inputStreamType, EsperSelectFunction<OUT> esperSelectFunction, boolean isProcessingTime, String esperQuery) {
public SelectEsperStreamOperator(TypeInformation<IN> inputStreamType, EsperSelectFunction<OUT> esperSelectFunction, boolean isProcessingTime, EsperStatementFactory esperQuery) {
super(esperSelectFunction);
this.inputType = inputStreamType;
this.query = esperQuery;
Expand Down Expand Up @@ -104,7 +104,7 @@ private EPServiceProvider getServiceProvider(String context) throws IOException
serviceProvider = EPServiceProviderManager.getProvider(context, configuration);
serviceProvider.getEPAdministrator().getConfiguration().addEventType(inputType.getTypeClass());
serviceProvider.getEPRuntime().sendEvent(new CurrentTimeEvent(0));
EPStatement statement = serviceProvider.getEPAdministrator().createEPL(query);
EPStatement statement = query.createStatement(serviceProvider.getEPAdministrator());

statement.addListener((newData, oldData) -> {
for (EventBean event : newData) {
Expand Down
5 changes: 0 additions & 5 deletions flink-esper/src/test/java/at/datasciencelabs/BuildEvent.java

This file was deleted.

138 changes: 138 additions & 0 deletions flink-esper/src/test/java/at/datasciencelabs/EsperPatternTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
package at.datasciencelabs;

import com.espertech.esper.client.EventBean;
import com.google.common.collect.Lists;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
import org.junit.Before;
import org.junit.Test;

import java.io.Serializable;
import java.util.Arrays;
import java.util.List;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.core.Is.is;

public class EsperPatternTest extends StreamingMultipleProgramsTestBase implements Serializable {


private static List<ComplexEvent> resultingEvents;

@Before
public void setUp() throws Exception {
resultingEvents = Lists.newArrayList();
}

@Test
public void testEsperPattern() throws Exception {
StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
executionEnvironment.setParallelism(1);

List<ComplexEvent> expectedValues = Lists.newArrayList();
ComplexEvent complexEvent = new ComplexEvent(Event.start(), Event.end());
expectedValues.add(complexEvent);

List<Event> events = Arrays.asList(complexEvent.getStartEvent(), complexEvent.getEndEvent());
DataStream<Event> dataStream = executionEnvironment.fromCollection(events);

EsperStream<Event> esperStream = Esper.pattern(dataStream, "every (A=Event(type='start') -> B=Event(type='end'))");

DataStream<ComplexEvent> complexEventDataStream = esperStream.select(new EsperSelectFunction<ComplexEvent>() {
@Override
public ComplexEvent select(EventBean eventBean) throws Exception {
return new ComplexEvent((Event) eventBean.get("A"), (Event) eventBean.get("B"));
}
});

complexEventDataStream.addSink(new SinkFunction<ComplexEvent>() {
@Override
public void invoke(ComplexEvent value) throws Exception {
System.err.println(value);
resultingEvents.add(value);
}
});

executionEnvironment.execute("test-2");

assertThat(resultingEvents, is(expectedValues));
}

private static class ComplexEvent {
private Event startEvent;
private Event endEvent;

ComplexEvent(Event startEvent, Event endEvent) {
this.startEvent = startEvent;
this.endEvent = endEvent;
}

Event getStartEvent() {
return startEvent;
}

Event getEndEvent() {
return endEvent;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;

ComplexEvent that = (ComplexEvent) o;

if (startEvent != null ? !startEvent.equals(that.startEvent) : that.startEvent != null) return false;
return endEvent != null ? endEvent.equals(that.endEvent) : that.endEvent == null;
}

@Override
public int hashCode() {
int result = startEvent != null ? startEvent.hashCode() : 0;
result = 31 * result + (endEvent != null ? endEvent.hashCode() : 0);
return result;
}
}

public static class Event {

private String type;

private Event(String type) {
this.type = type;
}

public String getType() {
return type;
}

public void setType(String type) {
this.type = type;
}

static Event start() {
return new Event("start");
}

static Event end() {
return new Event("end");
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;

Event event = (Event) o;

return type != null ? type.equals(event.type) : event.type == null;
}

@Override
public int hashCode() {
return type != null ? type.hashCode() : 0;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.core.IsNull.notNullValue;

public class EsperStreamTest extends StreamingMultipleProgramsTestBase implements Serializable {
public class EsperQueryTest extends StreamingMultipleProgramsTestBase implements Serializable {

private static List<TestEvent> result;
private static List<String> stringResult;
Expand All @@ -37,7 +37,7 @@ public void shouldSelectFromStreamUsingAnonymousClassSelect() throws Exception {

DataStream<TestEvent> dataStream = executionEnvironment.fromElements(new TestEvent("peter", 10), new TestEvent("alex", 25), new TestEvent("maria", 30));

EsperStream<TestEvent> esperStream = new EsperStream<>(dataStream, "select name, age from TestEvent");
EsperStream<TestEvent> esperStream = Esper.query(dataStream, "select name, age from TestEvent");

DataStream<TestEvent> resultStream = esperStream.select(new EsperSelectFunction<TestEvent>() {
@Override
Expand Down Expand Up @@ -71,7 +71,7 @@ public void shouldSelectFromStreamUsingLambdaSelect() throws Exception {

DataStream<TestEvent> dataStream = executionEnvironment.fromElements(new TestEvent("peter1", 10), new TestEvent("alex1", 25), new TestEvent("maria1", 30));

EsperStream<TestEvent> esperStream = new EsperStream<>(dataStream, "select name, age from TestEvent");
EsperStream<TestEvent> esperStream = Esper.query(dataStream, "select name, age from TestEvent");

DataStream<TestEvent> resultStream = esperStream.select((EsperSelectFunction<TestEvent>) collector -> {
String name = (String) collector.get("name");
Expand Down Expand Up @@ -101,7 +101,7 @@ public void shouldSelectFromStringDataStream() throws Exception {
List<String> expectedValues = Arrays.asList("first", "second");
DataStream<String> dataStream = executionEnvironment.fromCollection(expectedValues);

EsperStream<String> esperStream = Esper.pattern(dataStream, "select bytes from String");
EsperStream<String> esperStream = Esper.query(dataStream, "select bytes from String");

DataStream<String> resultStream = esperStream.select((EsperSelectFunction<String>) collector -> {
byte[] bytes = (byte[]) collector.get("bytes");
Expand Down
4 changes: 0 additions & 4 deletions flink-esper/src/test/java/at/datasciencelabs/EsperTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,6 @@
import com.espertech.esper.client.EPServiceProvider;
import com.espertech.esper.client.EPServiceProviderManager;
import com.espertech.esper.client.EPStatement;
import com.espertech.esper.event.map.MapEventBean;
import com.google.common.base.Joiner;
import com.google.common.collect.Lists;
import org.junit.Ignore;
import org.junit.Test;

public class EsperTest {
Expand Down

0 comments on commit 81c1292

Please sign in to comment.