-
Notifications
You must be signed in to change notification settings - Fork 12
CQELS SLIDING WINDOW OPERATOR
Riccardo Tommasini edited this page Jan 2, 2021
·
1 revision
public class CQELSTimeWindowOperatorFactory implements StreamToRelationOperatorFactory<Graph, Graph> {
private final long a, t0;
private final Time time;
private final Tick tick;
private final Report report;
private final ReportGrain grain;
private SDS<Graph> context;
public CQELSTimeWindowOperatorFactory(long a, long t0, Time time, Tick tick, Report report, ReportGrain grain, SDS<Graph> context) {
this.a = a;
this.t0 = t0;
this.time = time;
this.tick = tick;
this.report = report;
this.grain = grain;
this.context=context;
}
@Override
public TimeVarying<Graph> apply(WebDataStream<Graph> s, IRI iri) {
StreamToRelationOp<Graph, Graph> windowStreamToRelationOp = new CQELSStreamToRelationOp(iri, a, time, tick, report, grain);
s.addConsumer(windowStreamToRelationOp);
return windowStreamToRelationOp.set(this.context);
}
}
public class WindowImpl implements Window {
private long c, o;
public WindowImpl(long o, long c) {
this.o = o;
this.c = c;
}
public long getC() {
return c;
}
public long getO() {
return o;
}
...
public class CQELSStreamToRelationOp extends ObservableStreamToRelationOp<Graph, Graph> {
private final long a;
private Map<Window, Content<Graph, Graph>> windows;
private Map<Graph, Long> r_stream;
private Map<Graph, Long> d_stream;
private Set<Window> to_evict;
private long tc0;
private long toi;
public CQELSStreamToRelationOp(IRI iri, long a, Time instance, Tick tick, Report report, ReportGrain grain) {
super(iri, instance, tick, report, grain);
this.a = a;
this.tc0 = instance.getScope();
this.toi = 0;
this.windows = new HashMap<>();
this.to_evict = new HashSet<>();
this.r_stream = new HashMap<>();
this.d_stream = new HashMap<>();
}
@Override
public Time time() {
return time;
}
@Override
public Content<Graph, Graph> getContent(long t_e) {
Optional<Window> max = windows.keySet().stream()
.filter(w -> w.getO() < t_e && w.getC() <= t_e)
.max(Comparator.comparingLong(Window::getC));
if (max.isPresent())
return windows.get(max.get());
return new EmptyGraphContent();
}
@Override
public List<Content<Graph, Graph>> getContents(long t_e) {
return windows.keySet().stream()
.filter(w -> w.getO() <= t_e && t_e < w.getC())
.map(windows::get).collect(Collectors.toList());
}
protected void windowing(Graph e, long ts) {
log.debug("Received element (" + e + "," + ts + ")");
long t_e = ts;
if (time.getAppTime() > t_e) {
log.error("OUT OF ORDER NOT HANDLED");
throw new OutOfOrderElementException("(" + e + "," + ts + ")");
}
Window active = scope(t_e);
Content<Graph, Graph> content = windows.get(active);
r_stream.entrySet().stream().filter(ee -> ee.getValue() < active.getO()).forEach(ee -> d_stream.put(ee.getKey(), ee.getValue()));
r_stream.entrySet().stream().filter(ee -> ee.getValue() >= active.getO()).map(Map.Entry::getKey).forEach(content::add);
r_stream.put(e, ts);
content.add(e);
if (report.report(active, content, t_e, System.currentTimeMillis())) {
ticker.tick(t_e, active);
}
//REMOVE ALL THE WINDOWS THAT CONTAIN DSTREAM ELEMENTS
//Theoretically active window has always size 1
d_stream.entrySet().forEach(ee -> {
log.debug("Evicting [" + ee + "]");
windows.forEach((window, content1) -> {
if (window.getO() <= ee.getValue() && window.getC() < ee.getValue())
schedule_for_eviction(window);
});
r_stream.remove(ee);
});
to_evict.forEach(windows::remove);
to_evict.clear();
}
private Window scope(long t_e) {
long o_i = t_e - a;
log.debug("Calculating the Windows to Open. First one opens at [" + o_i + "] and closes at [" + t_e + "]");
log.debug("Computing Window [" + o_i + "," + (o_i + a) + ") if absent");
WindowImpl active = new WindowImpl(o_i, t_e);
windows.computeIfAbsent(active, window -> new ContentGraph());
return active;
}
private void schedule_for_eviction(Window w) {
to_evict.add(w);
}
public Content<Graph, Graph> compute(long t_e, Window w) {
Content<Graph, Graph> content = windows.containsKey(w) ? windows.get(w) : new EmptyGraphContent();
time.setAppTime(t_e);
return setVisible(t_e, w, content);
}
@Override
public TimeVaryingGraph set(SDS<Graph> content) {
this.addObserver((Observer) content);
//TODO Generalize the type of content using an ENUM
return new TimeVaryingGraph(this, iri, RDFUtils.createGraph());
}
}