Skip to content

Commit

Permalink
type extraction, started with unit tests WIP (DO NOT PUSH LIKE THIS!)
Browse files Browse the repository at this point in the history
  • Loading branch information
phil3k committed Oct 25, 2017
1 parent 73c71ae commit d26a7c2
Show file tree
Hide file tree
Showing 2 changed files with 66 additions and 21 deletions.
32 changes: 29 additions & 3 deletions src/main/java/at/datasciencelabs/EsperStream.java
Original file line number Diff line number Diff line change
@@ -1,12 +1,19 @@
package at.datasciencelabs;

import org.apache.flink.api.common.functions.InvalidTypesException;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.functions.NullByteKeySelector;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.api.java.typeutils.TypeExtractionException;
import org.apache.flink.api.java.typeutils.TypeExtractionUtils;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;

import java.lang.reflect.Type;


/**
* A DataStream which is able to detect event sequences and patterns using Esper
Expand All @@ -32,17 +39,36 @@ public EsperStream(DataStream<IN> inputStream, String esperQuery) {
* Select from the EsperStream, must provide the return type of the output DataStream since no type information is
* currently extracted from the @see {@link EsperSelectFunction}.
*/
public <R> SingleOutputStreamOperator<R> select(EsperSelectFunction<R> esperSelectFunction, TypeInformation<R> dataStreamReturnType) {
public <R> SingleOutputStreamOperator<R> select(EsperSelectFunction<R> esperSelectFunction) {
KeySelector<IN, Byte> keySelector = new NullByteKeySelector<>();

SingleOutputStreamOperator<R> patternStream;

// TODO until the type extractor is capable of extracting non-generic parameters, the return type has to be passed in manually
TypeInformation<R> typeInformation = getTypeInformation(esperSelectFunction);

final boolean isProcessingTime = inputStream.getExecutionEnvironment().getStreamTimeCharacteristic() == TimeCharacteristic.ProcessingTime;
patternStream = inputStream.keyBy(keySelector).transform("SelectEsperOperator", dataStreamReturnType, new SelectEsperStreamOperator<Byte, IN, R>(inputStream.getType(), esperSelectFunction, isProcessingTime, esperQuery));
patternStream = inputStream.keyBy(keySelector).transform("SelectEsperOperator", typeInformation, new SelectEsperStreamOperator<Byte, IN, R>(inputStream.getType(), esperSelectFunction, isProcessingTime, esperQuery));

return patternStream;
}

@SuppressWarnings("unchecked")
private <OUT> TypeInformation<OUT> getTypeInformation(EsperSelectFunction<OUT> esperSelectFunction) {
try {
TypeExtractionUtils.LambdaExecutable lambdaExecutable = TypeExtractionUtils.checkAndExtractLambda(esperSelectFunction);
if (esperSelectFunction instanceof ResultTypeQueryable) {
return ((ResultTypeQueryable<OUT>) esperSelectFunction).getProducedType();
}
if (lambdaExecutable != null) {
Type type = lambdaExecutable.getReturnType();
return (TypeInformation<OUT>) TypeExtractor.createTypeInfo(type);
}
else {
return TypeExtractor.createTypeInfo(esperSelectFunction, EsperSelectFunction.class, esperSelectFunction.getClass(), 0);
}
} catch (TypeExtractionException e) {
throw new InvalidTypesException("Could not extract types.", e);
}
}

}
55 changes: 37 additions & 18 deletions src/test/java/at/datasciencelabs/EsperStreamTest.java
Original file line number Diff line number Diff line change
@@ -1,14 +1,24 @@
package at.datasciencelabs;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import com.espertech.esper.client.EventBean;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.junit.Assert;
import org.junit.Test;

public class EsperStreamTest {
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.core.IsNull.notNullValue;

public class EsperStreamTest implements Serializable {

@Test
public void testEsperStream() throws Exception {
public void shouldSelectFromStreamUsingLambdaSelect() throws Exception {

StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();

Expand All @@ -20,48 +30,57 @@ public void testEsperStream() throws Exception {
String name = (String) collector.get("name");
int age = (int) collector.get("age");
return new TestEvent(name, age);
}, TypeInformation.of(TestEvent.class));
});

resultStream.printToErr();
final List<TestEvent> result = new ArrayList<>();
resultStream.addSink((SinkFunction<TestEvent>) result::add);

executionEnvironment.execute("test");

assertThat(result, is(notNullValue()));
assertThat(result.size(), is(3));
}

@Test
public void testMoreComplexEsperStream() throws Exception {
@SuppressWarnings("Convert2Lambda")
public void shouldSelectFromStreamUsingAnonymousClassSelect() throws Exception {
StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();

DataStream<TestEvent> dataStream = executionEnvironment.fromElements(new TestEvent("peter", 10), new TestEvent("alex", 25), new TestEvent("maria", 30));

EsperStream<TestEvent> eventEsperStream = new EsperStream<>(dataStream, "select avg(age) as average_age from TestEvent");
EsperStream<TestEvent> esperStream = new EsperStream<>(dataStream, "select name, age from TestEvent");

DataStream<Double> resultStream = eventEsperStream.select((EsperSelectFunction<Double>) collector -> {
Double age = (Double) collector.get("average_age");
return age;
}, TypeInformation.of(Double.class));
DataStream<TestEvent> resultStream = esperStream.select(new EsperSelectFunction<TestEvent>() {
@Override
public TestEvent select(EventBean eventBean) throws Exception {
String name = (String) eventBean.get("name");
int age = (int) eventBean.get("age");
return new TestEvent(name, age);
}
});

resultStream.printToErr();

executionEnvironment.execute("test");

Assert.fail("not finished");
}

@Test
public void testStateChangeRule() throws Exception {
public void shouldCalculateAverageAge() throws Exception {
StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();

DataStream<TestEvent> dataStream = executionEnvironment.fromElements(new TestEvent("peter", 10), new TestEvent("alex", 25));
DataStream<TestEvent> dataStream = executionEnvironment.fromElements(new TestEvent("peter", 10), new TestEvent("alex", 25), new TestEvent("maria", 30));

EsperStream<TestEvent> eventEsperStream = new EsperStream<>(dataStream, "select avg(age) as average_age from TestEvent");

DataStream<Double> resultStream = eventEsperStream.select((EsperSelectFunction<Double>) collector -> {
Double age = (Double) collector.get("average_age");
return age;
}, TypeInformation.of(Double.class));
DataStream<Double> resultStream = eventEsperStream.select((EsperSelectFunction<Double>) collector -> (Double) collector.get("average_age"));

resultStream.printToErr();

executionEnvironment.execute("stateChange1");
executionEnvironment.execute("test");

Assert.fail("not finished");
}

}

0 comments on commit d26a7c2

Please sign in to comment.