diff --git a/src/main/java/at/datasciencelabs/EsperSelectFunction.java b/src/main/java/at/datasciencelabs/EsperSelectFunction.java index 8845d0c..ffafff9 100644 --- a/src/main/java/at/datasciencelabs/EsperSelectFunction.java +++ b/src/main/java/at/datasciencelabs/EsperSelectFunction.java @@ -10,5 +10,12 @@ * @param Type of the output DataStream */ public interface EsperSelectFunction extends Function, Serializable { + + /** + * Select by transforming an {@link EventBean} to an instance of the OUT type. + * @param eventBean Result event of an esper pattern + * @return The transformed instance + * @throws Exception If there is an error in the transformation. + */ OUT select(EventBean eventBean) throws Exception; } diff --git a/src/main/java/at/datasciencelabs/EsperStream.java b/src/main/java/at/datasciencelabs/EsperStream.java index 76c79e9..dccdefa 100644 --- a/src/main/java/at/datasciencelabs/EsperStream.java +++ b/src/main/java/at/datasciencelabs/EsperStream.java @@ -37,7 +37,7 @@ public SingleOutputStreamOperator select(EsperSelectFunction esperSele SingleOutputStreamOperator patternStream; - // TODO until the typeextractor is capable of extracing non-generic parameters, the return type has to be passed in manually + // TODO until the type extractor is capable of extracting non-generic parameters, the return type has to be passed in manually final boolean isProcessingTime = inputStream.getExecutionEnvironment().getStreamTimeCharacteristic() == TimeCharacteristic.ProcessingTime; patternStream = inputStream.keyBy(keySelector).transform("SelectEsperOperator", dataStreamReturnType, new SelectEsperStreamOperator(inputStream.getType(), esperSelectFunction, isProcessingTime, esperQuery)); diff --git a/src/main/java/at/datasciencelabs/SelectEsperStreamOperator.java b/src/main/java/at/datasciencelabs/SelectEsperStreamOperator.java index 2630497..92121dd 100644 --- a/src/main/java/at/datasciencelabs/SelectEsperStreamOperator.java +++ b/src/main/java/at/datasciencelabs/SelectEsperStreamOperator.java @@ -30,10 +30,10 @@ public class SelectEsperStreamOperator extends AbstractUdfStreamOp /** * Constructs a new operator. Requires the type of the input DataStream to register its Event Type at Esper. * Currently only processing time evaluation is supported. - * @param inputStreamType - * @param esperSelectFunction - * @param isProcessingTime - * @param esperQuery + * @param inputStreamType type of the input DataStream + * @param esperSelectFunction function to select from Esper's output + * @param isProcessingTime Flag indicating how time is interpreted (processing time vs event time) + * @param esperQuery The esper query */ public SelectEsperStreamOperator(TypeInformation inputStreamType, EsperSelectFunction esperSelectFunction, boolean isProcessingTime, String esperQuery) { super(esperSelectFunction);