From d26a7c2366c4fe8a47ec9d98140fe8a8fc82b31b Mon Sep 17 00:00:00 2001 From: phil3k Date: Wed, 25 Oct 2017 20:44:22 +0200 Subject: [PATCH] type extraction, started with unit tests WIP (DO NOT PUSH LIKE THIS!) --- .../java/at/datasciencelabs/EsperStream.java | 32 ++++++++++- .../at/datasciencelabs/EsperStreamTest.java | 55 +++++++++++++------ 2 files changed, 66 insertions(+), 21 deletions(-) diff --git a/src/main/java/at/datasciencelabs/EsperStream.java b/src/main/java/at/datasciencelabs/EsperStream.java index dccdefa..140f23c 100644 --- a/src/main/java/at/datasciencelabs/EsperStream.java +++ b/src/main/java/at/datasciencelabs/EsperStream.java @@ -1,12 +1,19 @@ package at.datasciencelabs; +import org.apache.flink.api.common.functions.InvalidTypesException; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.api.java.functions.NullByteKeySelector; +import org.apache.flink.api.java.typeutils.ResultTypeQueryable; +import org.apache.flink.api.java.typeutils.TypeExtractionException; +import org.apache.flink.api.java.typeutils.TypeExtractionUtils; +import org.apache.flink.api.java.typeutils.TypeExtractor; import org.apache.flink.streaming.api.TimeCharacteristic; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; +import java.lang.reflect.Type; + /** * A DataStream which is able to detect event sequences and patterns using Esper @@ -32,17 +39,36 @@ public EsperStream(DataStream inputStream, String esperQuery) { * Select from the EsperStream, must provide the return type of the output DataStream since no type information is * currently extracted from the @see {@link EsperSelectFunction}. */ - public SingleOutputStreamOperator select(EsperSelectFunction esperSelectFunction, TypeInformation dataStreamReturnType) { + public SingleOutputStreamOperator select(EsperSelectFunction esperSelectFunction) { KeySelector keySelector = new NullByteKeySelector<>(); SingleOutputStreamOperator patternStream; - // TODO until the type extractor is capable of extracting non-generic parameters, the return type has to be passed in manually + TypeInformation typeInformation = getTypeInformation(esperSelectFunction); final boolean isProcessingTime = inputStream.getExecutionEnvironment().getStreamTimeCharacteristic() == TimeCharacteristic.ProcessingTime; - patternStream = inputStream.keyBy(keySelector).transform("SelectEsperOperator", dataStreamReturnType, new SelectEsperStreamOperator(inputStream.getType(), esperSelectFunction, isProcessingTime, esperQuery)); + patternStream = inputStream.keyBy(keySelector).transform("SelectEsperOperator", typeInformation, new SelectEsperStreamOperator(inputStream.getType(), esperSelectFunction, isProcessingTime, esperQuery)); return patternStream; } + @SuppressWarnings("unchecked") + private TypeInformation getTypeInformation(EsperSelectFunction esperSelectFunction) { + try { + TypeExtractionUtils.LambdaExecutable lambdaExecutable = TypeExtractionUtils.checkAndExtractLambda(esperSelectFunction); + if (esperSelectFunction instanceof ResultTypeQueryable) { + return ((ResultTypeQueryable) esperSelectFunction).getProducedType(); + } + if (lambdaExecutable != null) { + Type type = lambdaExecutable.getReturnType(); + return (TypeInformation) TypeExtractor.createTypeInfo(type); + } + else { + return TypeExtractor.createTypeInfo(esperSelectFunction, EsperSelectFunction.class, esperSelectFunction.getClass(), 0); + } + } catch (TypeExtractionException e) { + throw new InvalidTypesException("Could not extract types.", e); + } + } + } diff --git a/src/test/java/at/datasciencelabs/EsperStreamTest.java b/src/test/java/at/datasciencelabs/EsperStreamTest.java index 9e88f30..7786181 100644 --- a/src/test/java/at/datasciencelabs/EsperStreamTest.java +++ b/src/test/java/at/datasciencelabs/EsperStreamTest.java @@ -1,14 +1,24 @@ package at.datasciencelabs; -import org.apache.flink.api.common.typeinfo.TypeInformation; +import com.espertech.esper.client.EventBean; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.sink.SinkFunction; +import org.junit.Assert; import org.junit.Test; -public class EsperStreamTest { +import java.io.Serializable; +import java.util.ArrayList; +import java.util.List; + +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.core.IsNull.notNullValue; + +public class EsperStreamTest implements Serializable { @Test - public void testEsperStream() throws Exception { + public void shouldSelectFromStreamUsingLambdaSelect() throws Exception { StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment(); @@ -20,48 +30,57 @@ public void testEsperStream() throws Exception { String name = (String) collector.get("name"); int age = (int) collector.get("age"); return new TestEvent(name, age); - }, TypeInformation.of(TestEvent.class)); + }); - resultStream.printToErr(); + final List result = new ArrayList<>(); + resultStream.addSink((SinkFunction) result::add); executionEnvironment.execute("test"); + assertThat(result, is(notNullValue())); + assertThat(result.size(), is(3)); } @Test - public void testMoreComplexEsperStream() throws Exception { + @SuppressWarnings("Convert2Lambda") + public void shouldSelectFromStreamUsingAnonymousClassSelect() throws Exception { StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment(); DataStream dataStream = executionEnvironment.fromElements(new TestEvent("peter", 10), new TestEvent("alex", 25), new TestEvent("maria", 30)); - EsperStream eventEsperStream = new EsperStream<>(dataStream, "select avg(age) as average_age from TestEvent"); + EsperStream esperStream = new EsperStream<>(dataStream, "select name, age from TestEvent"); - DataStream resultStream = eventEsperStream.select((EsperSelectFunction) collector -> { - Double age = (Double) collector.get("average_age"); - return age; - }, TypeInformation.of(Double.class)); + DataStream resultStream = esperStream.select(new EsperSelectFunction() { + @Override + public TestEvent select(EventBean eventBean) throws Exception { + String name = (String) eventBean.get("name"); + int age = (int) eventBean.get("age"); + return new TestEvent(name, age); + } + }); resultStream.printToErr(); executionEnvironment.execute("test"); + + Assert.fail("not finished"); } @Test - public void testStateChangeRule() throws Exception { + public void shouldCalculateAverageAge() throws Exception { StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment(); - DataStream dataStream = executionEnvironment.fromElements(new TestEvent("peter", 10), new TestEvent("alex", 25)); + DataStream dataStream = executionEnvironment.fromElements(new TestEvent("peter", 10), new TestEvent("alex", 25), new TestEvent("maria", 30)); EsperStream eventEsperStream = new EsperStream<>(dataStream, "select avg(age) as average_age from TestEvent"); - DataStream resultStream = eventEsperStream.select((EsperSelectFunction) collector -> { - Double age = (Double) collector.get("average_age"); - return age; - }, TypeInformation.of(Double.class)); + DataStream resultStream = eventEsperStream.select((EsperSelectFunction) collector -> (Double) collector.get("average_age")); resultStream.printToErr(); - executionEnvironment.execute("stateChange1"); + executionEnvironment.execute("test"); + + Assert.fail("not finished"); } } \ No newline at end of file