2
2
3
3
import java .util .ArrayList ;
4
4
import java .util .Collections ;
5
+ import java .util .Iterator ;
5
6
import java .util .LinkedHashMap ;
7
+ import java .util .LinkedList ;
6
8
import java .util .List ;
7
9
import java .util .Map ;
10
+ import java .util .Queue ;
8
11
9
12
import org .slf4j .Logger ;
10
13
import org .slf4j .LoggerFactory ;
@@ -59,14 +62,17 @@ public class EsperSingleQueryOperator implements StatelessOperator {
59
62
private boolean enableLoggingOfMatches = true ;
60
63
private List <DataTuple > matchCache ;
61
64
62
-
65
+ private Queue <DataTuple > initCache ;
66
+ private boolean initialised = false ;
67
+
63
68
public EsperSingleQueryOperator (String query , String url , String name ) {
64
69
this .esperQuery = query ;
65
70
this .esperEngineURL = url ;
66
71
this .name = name ;
67
72
if (enableLoggingOfMatches ) {
68
73
this .matchCache = Collections .synchronizedList (new ArrayList <DataTuple >());
69
74
}
75
+ this .initCache = new LinkedList <DataTuple >();
70
76
}
71
77
72
78
public EsperSingleQueryOperator (String query , String url , String streamKey , String name , String [] typeBinding ) {
@@ -80,7 +86,42 @@ public EsperSingleQueryOperator(String query, String url, String name, Map<Strin
80
86
this .typesPerStream .put (stream , getTypes (typeBinding .get (stream )));
81
87
}
82
88
89
+ public void initStatement () {
90
+
91
+ if (statement != null ) {
92
+ statement .removeAllListeners ();
93
+ statement .destroy ();
94
+ }
95
+
96
+ log .info ("Creating ESPER query..." );
97
+
98
+ /*
99
+ * Build the ESPER statement
100
+ */
101
+ statement = epService .getEPAdministrator ().createEPL (this .esperQuery );
83
102
103
+ /*
104
+ * Set a listener called when statement matches
105
+ */
106
+ statement .addListener (new UpdateListener () {
107
+ @ Override
108
+ public void update (EventBean [] newEvents , EventBean [] oldEvents ) {
109
+ if (newEvents == null ) {
110
+ // we don't care about events leaving the window (old
111
+ // events)
112
+ return ;
113
+ }
114
+ for (EventBean theEvent : newEvents ) {
115
+ sendOutput (theEvent );
116
+ }
117
+ }
118
+ });
119
+
120
+ initialised = true ;
121
+ log .info ("Done with init: {}" , this .esperQuery );
122
+ }
123
+
124
+
84
125
@ Override
85
126
public void setUp () {
86
127
/*
@@ -112,43 +153,23 @@ public void setUp() {
112
153
epService = EPServiceProviderManager .getProvider (esperEngineURL ,
113
154
configuration );
114
155
115
- log .info ("Creating ESPER query..." );
116
-
117
156
/*
118
- * Build the ESPER statement
119
- */
120
- statement = epService .getEPAdministrator ().createEPL (this .esperQuery );
121
-
122
- /*
123
- * Set a listener called when statement matches
157
+ * Initialise the query statement
124
158
*/
125
- statement .addListener (new UpdateListener () {
126
- @ Override
127
- public void update (EventBean [] newEvents , EventBean [] oldEvents ) {
128
- if (newEvents == null ) {
129
- // we don't care about events leaving the window (old
130
- // events)
131
- return ;
132
- }
133
- for (EventBean theEvent : newEvents ) {
134
- sendOutput (theEvent );
135
- }
136
- }
137
- });
159
+ initStatement ();
138
160
139
161
/*
140
162
* Register rest API handler
141
163
*/
142
164
NodeManager .restAPIRegistry .put ("/query" , new RestAPIEsperGetQueryDesc (this ));
143
165
NodeManager .restAPIRegistry .put ("/matches" , new RestAPIEsperGetMatches (this ));
144
-
166
+ NodeManager . restAPIRegistry . put ( "/query_update" , new RestAPIEsperPostQueryUpdate ( this ));
145
167
146
168
}
147
169
148
170
protected void sendOutput (EventBean out ) {
171
+ log .debug ("Query returned a new result event: {}" , out );
149
172
150
- log .info ("Query returned a new result event: {}" , out );
151
-
152
173
DataTuple output = new DataTuple (api .getDataMapper (), new TuplePayload ());
153
174
List <Object > objects = new ArrayList <>();
154
175
@@ -158,22 +179,38 @@ protected void sendOutput(EventBean out) {
158
179
continue ;
159
180
objects .add (value );
160
181
}
161
- DataTuple t = output .setValues (objects .toArray ());
182
+ DataTuple outTuple = output .setValues (objects .toArray ());
183
+ outTuple .getPayload ().timestamp = System .currentTimeMillis ();
162
184
163
- log .info ("Sending output {}" , t .getPayload ().attrValues );
185
+ log .debug ("At {}, sending output {}" , outTuple .getPayload ().timestamp , outTuple .getPayload ().attrValues );
186
+
187
+ if (this .enableLoggingOfMatches ) {
188
+ long cutOffTime = System .currentTimeMillis () - 1000 *60 *20 ;
189
+ synchronized (matchCache ) {
190
+
191
+ matchCache .add (outTuple );
192
+
193
+ // Remove old items
194
+ Iterator <DataTuple > iter = matchCache .iterator ();
195
+ boolean run = true ;
196
+ while (iter .hasNext () && run ) {
197
+ DataTuple t = iter .next ();
198
+ if (t .getPayload ().timestamp < cutOffTime ) {
199
+ iter .remove ();
200
+ }
201
+ else {
202
+ run = false ;
203
+ }
204
+ }
205
+ }
206
+ }
164
207
165
- if (this .enableLoggingOfMatches )
166
- matchCache .add (t );
208
+ log .info ("Match cache size: {}" , this .matchCache .size ());
167
209
168
- api .send (t );
210
+ api .send (outTuple );
169
211
}
170
212
171
- @ Override
172
- public void processData (DataTuple input ) {
173
-
174
- log .info ("Received input tuple {}" , input .toString ());
175
- log .info ("Map of received input tuple {}" , input .getMap ().toString ());
176
-
213
+ public void sendData (DataTuple input ) {
177
214
String stream = input .getString (STREAM_IDENTIFIER );
178
215
179
216
Map <String , Object > item = new LinkedHashMap <String , Object >();
@@ -182,12 +219,29 @@ public void processData(DataTuple input) {
182
219
for (String key : this .typesPerStream .get (stream ).keySet ())
183
220
item .put (key , input .getValue (key ));
184
221
185
- log .info ("Sending item {} with name '{}' to esper engine" , item ,
222
+ log .debug ("Sending item {} with name '{}' to esper engine" , item ,
186
223
stream );
187
-
224
+
188
225
epService .getEPRuntime ().sendEvent (item , stream );
189
226
}
190
227
228
+ @ Override
229
+ public void processData (DataTuple input ) {
230
+
231
+ log .debug ("Received input tuple {}" , input .toString ());
232
+ log .debug ("Map of received input tuple {}" , input .getMap ().toString ());
233
+
234
+ if (!initialised ) {
235
+ this .initCache .add (input );
236
+ }
237
+ else {
238
+ while (!this .initCache .isEmpty ()) {
239
+ sendData (this .initCache .poll ());
240
+ }
241
+ sendData (input );
242
+ }
243
+ }
244
+
191
245
@ Override
192
246
public void processData (List <DataTuple > dataList ) {
193
247
for (DataTuple tuple : dataList )
@@ -248,6 +302,14 @@ public String getEsperQuery() {
248
302
return esperQuery ;
249
303
}
250
304
305
+ public void initWithNewEsperQuery (String query ) {
306
+ log .info ("init with new esper query: {}" , query );
307
+ initialised = false ;
308
+ this .esperQuery = query ;
309
+ NodeManager .restAPIRegistry .put ("/query" , new RestAPIEsperGetQueryDesc (this ));
310
+ initStatement ();
311
+ }
312
+
251
313
public String getName () {
252
314
return name ;
253
315
}
0 commit comments