Skip to content

Commit

Permalink
naming of parameters
Browse files Browse the repository at this point in the history
  • Loading branch information
phil3k committed Oct 22, 2017
1 parent 419604c commit 4def369
Show file tree
Hide file tree
Showing 3 changed files with 18 additions and 18 deletions.
2 changes: 1 addition & 1 deletion src/main/java/at/datasciencelabs/EsperSelectFunction.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,5 +10,5 @@
* @param <OUT> Type of the output DataStream
*/
public interface EsperSelectFunction<OUT> extends Function, Serializable {
OUT select(EventBean collector) throws Exception;
OUT select(EventBean eventBean) throws Exception;
}
20 changes: 10 additions & 10 deletions src/main/java/at/datasciencelabs/EsperStream.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,33 +14,33 @@
*/
public class EsperStream<IN> {

private final DataStream<IN> input;
private final String query;
private final DataStream<IN> inputStream;
private final String esperQuery;


/**
* Create a new EsperStream instance.
* @param input The input DataStream
* @param query An Esper query
* @param inputStream The input DataStream
* @param esperQuery An Esper query
*/
public EsperStream(DataStream<IN> input, String query) {
this.input = input;
this.query = query;
public EsperStream(DataStream<IN> inputStream, String esperQuery) {
this.inputStream = inputStream;
this.esperQuery = esperQuery;
}

/**
* Select from the EsperStream, must provide the return type of the output DataStream since no type information is
* currently extracted from the @see {@link EsperSelectFunction}.
*/
public <R> SingleOutputStreamOperator<R> select(EsperSelectFunction<R> selectFunction, TypeInformation<R> returnType) {
public <R> SingleOutputStreamOperator<R> select(EsperSelectFunction<R> esperSelectFunction, TypeInformation<R> dataStreamReturnType) {
KeySelector<IN, Byte> keySelector = new NullByteKeySelector<>();

SingleOutputStreamOperator<R> patternStream;

// TODO until the typeextractor is capable of extracing non-generic parameters, the return type has to be passed in manually

final boolean isProcessingTime = input.getExecutionEnvironment().getStreamTimeCharacteristic() == TimeCharacteristic.ProcessingTime;
patternStream = input.keyBy(keySelector).transform("SelectEsperOperator", returnType, new SelectEsperStreamOperator<Byte, IN, R>(input.getType(), selectFunction, isProcessingTime, query));
final boolean isProcessingTime = inputStream.getExecutionEnvironment().getStreamTimeCharacteristic() == TimeCharacteristic.ProcessingTime;
patternStream = inputStream.keyBy(keySelector).transform("SelectEsperOperator", dataStreamReturnType, new SelectEsperStreamOperator<Byte, IN, R>(inputStream.getType(), esperSelectFunction, isProcessingTime, esperQuery));

return patternStream;
}
Expand Down
14 changes: 7 additions & 7 deletions src/main/java/at/datasciencelabs/SelectEsperStreamOperator.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,15 +30,15 @@ public class SelectEsperStreamOperator<KEY, IN, OUT> extends AbstractUdfStreamOp
/**
* Constructs a new operator. Requires the type of the input DataStream to register its Event Type at Esper.
* Currently only processing time evaluation is supported.
* @param inputType
* @param userFunction
* @param inputStreamType
* @param esperSelectFunction
* @param isProcessingTime
* @param query
* @param esperQuery
*/
public SelectEsperStreamOperator(TypeInformation<IN> inputType, EsperSelectFunction<OUT> userFunction, boolean isProcessingTime, String query) {
super(userFunction);
this.inputType = inputType;
this.query = query;
public SelectEsperStreamOperator(TypeInformation<IN> inputStreamType, EsperSelectFunction<OUT> esperSelectFunction, boolean isProcessingTime, String esperQuery) {
super(esperSelectFunction);
this.inputType = inputStreamType;
this.query = esperQuery;

if (!isProcessingTime) {
throw new UnsupportedOperationException("Event-time is not supported");
Expand Down

0 comments on commit 4def369

Please sign in to comment.