-
Notifications
You must be signed in to change notification settings - Fork 12
C-SPARQL Sliding Window Operator
Riccardo Tommasini edited this page Jan 2, 2021
·
2 revisions
/**
 * Factory that builds a {@link CSPARQLStreamToRelationOp} for a stream, subscribes it to
 * that stream and hands back the time-varying graph the operator exposes.
 */
public class CSPARQLTimeWindowOperatorFactory implements StreamToRelationOperatorFactory<Graph, Graph> {

    // Window parameters forwarded verbatim to the operator's constructor.
    private final long a;
    private final long b;
    // NOTE(review): t0 is stored but never forwarded to the operator in this class — confirm whether it is still needed.
    private final long t0;
    private final Time time;
    private final Tick tick;
    private final Report report;
    private final ReportGrain grain;
    // SDS passed to the operator's set(...) call in apply().
    private final SDS<Graph> context;

    /**
     * Stores the configuration used for every operator created by {@link #apply}.
     *
     * @param a       first window parameter handed to the operator
     * @param b       second window parameter handed to the operator
     * @param t0      stored only; not used by {@link #apply}
     * @param time    time instance handed to the operator
     * @param tick    tick policy handed to the operator
     * @param report  report policy handed to the operator
     * @param grain   report grain handed to the operator
     * @param context SDS the created operator is attached to
     */
    public CSPARQLTimeWindowOperatorFactory(long a, long b, long t0, Time time, Tick tick, Report report, ReportGrain grain, SDS<Graph> context) {
        this.context = context;
        this.grain = grain;
        this.report = report;
        this.tick = tick;
        this.time = time;
        this.t0 = t0;
        this.b = b;
        this.a = a;
    }

    /**
     * Creates a new window operator identified by {@code iri}, registers it as a consumer
     * of {@code s} and returns the result of attaching it to the configured SDS.
     *
     * @param s   stream the operator will consume
     * @param iri identifier given to the operator
     * @return the time-varying graph returned by the operator's {@code set} call
     */
    @Override
    public TimeVarying<Graph> apply(WebDataStream<Graph> s, IRI iri) {
        StreamToRelationOp<Graph, Graph> operator =
                new CSPARQLStreamToRelationOp(iri, a, b, time, tick, report, grain);
        s.addConsumer(operator);
        return operator.set(context);
    }
}
public class WindowImpl implements Window {
private long c, o;
public WindowImpl(long o, long c) {
this.o = o;
this.c = c;
}
public long getC() {
return c;
}
public long getO() {
return o;
}
public class CSPARQLStreamToRelationOp extends ObservableStreamToRelationOp<Graph, Graph> {
// a: window width — scope() creates windows [o, o + a).
// b: slide — scope() advances the opening timestamp by b between consecutive windows.
private final long a, b;
// Windows currently tracked by the operator, each mapped to the content accumulated for it.
private Map<Window, Content<Graph,Graph>> active_windows;
// Windows flagged by windowing() for removal; drained and cleared at the end of each windowing() call.
private Set<Window> to_evict;
// Taken from the Time instance's getScope() in the constructor; used by scope() when computing window bounds.
private long t0;
// Starts at 0 and is only compared/advanced while evicting windows in windowing().
// NOTE(review): no other use is visible here — confirm its purpose.
private long toi;
/**
 * Builds the operator with empty window bookkeeping.
 *
 * @param iri      identifier passed to the superclass
 * @param a        window width used when creating windows
 * @param b        slide between consecutive window openings
 * @param instance time instance passed to the superclass; its scope initializes {@code t0}
 * @param tick     tick policy passed to the superclass
 * @param report   report policy passed to the superclass
 * @param grain    report grain passed to the superclass
 */
public CSPARQLStreamToRelationOp(IRI iri, long a, long b, Time instance, Tick tick, Report report, ReportGrain grain) {
    super(iri, instance, tick, report, grain);

    // No window is open and nothing is pending eviction at creation time.
    this.active_windows = new HashMap<>();
    this.to_evict = new HashSet<>();

    // Window geometry.
    this.a = a;
    this.b = b;

    // Time references.
    this.toi = 0L;
    this.t0 = instance.getScope();
}
/**
 * @return the time instance this operator works with
 */
@Override
public Time time() {
    return this.time;
}
/**
 * Returns the content of the tracked window with the greatest closing timestamp among
 * those that opened before {@code t_e} and closed at or before {@code t_e}.
 *
 * @param t_e reference timestamp
 * @return the content of the selected window, or a new {@code EmptyGraphContent} when no
 *         tracked window satisfies the condition
 */
@Override
public Content<Graph,Graph> getContent(long t_e) {
    Map.Entry<Window, Content<Graph,Graph>> latest = null;

    for (Map.Entry<Window, Content<Graph,Graph>> candidate : active_windows.entrySet()) {
        Window w = candidate.getKey();
        boolean eligible = w.getO() < t_e && w.getC() <= t_e;
        if (!eligible) {
            continue;
        }
        // Strict comparison: on equal closing timestamps the first window encountered is kept.
        if (latest == null || w.getC() > latest.getKey().getC()) {
            latest = candidate;
        }
    }

    if (latest == null) {
        return new EmptyGraphContent();
    }
    return latest.getValue();
}
/**
 * Collects the contents of every tracked window {@code [o, c)} that contains {@code t_e},
 * i.e. {@code o <= t_e < c}.
 *
 * @param t_e reference timestamp
 * @return the contents of all matching windows (empty list when none match)
 */
@Override
public List<Content<Graph,Graph>> getContents(long t_e) {
    return active_windows.entrySet().stream()
            .filter(entry -> {
                Window w = entry.getKey();
                return w.getO() <= t_e && t_e < w.getC();
            })
            .map(Map.Entry::getValue)
            .collect(Collectors.toList());
}
/**
 * Processes one incoming element: makes sure the windows for its timestamp exist, adds the
 * element to every window containing the timestamp, triggers a tick for the reporting
 * window with the greatest closing timestamp, and finally drops expired windows.
 *
 * @param e         element received from the stream
 * @param timestamp timestamp associated with the element
 * @throws OutOfOrderElementException when the current application time is greater than {@code timestamp}
 */
protected void windowing(Graph e, long timestamp) {
    log.debug("Received element (" + e + "," + timestamp + ")");
    final long t_e = timestamp;

    // Elements older than the current application time are rejected.
    if (time.getAppTime() > t_e) {
        log.error("OUT OF ORDER NOT HANDLED");
        throw new OutOfOrderElementException("(" + e + "," + timestamp + ")");
    }

    // Open whatever windows are still missing for this timestamp.
    scope(t_e);

    // Assign the element to the windows containing t_e; flag windows strictly past their close.
    for (Map.Entry<Window, Content<Graph,Graph>> entry : active_windows.entrySet()) {
        Window w = entry.getKey();
        log.debug("Processing Window [" + w.getO() + "," + w.getC() + ") for element (" + e + "," + timestamp + ")");
        if (w.getO() <= t_e && t_e < w.getC()) {
            log.debug("Adding element [" + e + "] to Window [" + w.getO() + "," + w.getC() + ")");
            entry.getValue().add(e);
        } else if (t_e > w.getC()) {
            log.debug("Scheduling for Eviction [" + w.getO() + "," + w.getC() + ")");
            schedule_for_eviction(w);
        }
    }

    // Among the windows the report policy accepts, tick on the one closing last
    // (the first one encountered wins when closing timestamps are equal).
    Window reporting = null;
    for (Window w : active_windows.keySet()) {
        boolean accepted = report.report(w, null, t_e, System.currentTimeMillis());
        if (accepted && (reporting == null || w.getC() > reporting.getC())) {
            reporting = w;
        }
    }
    if (reporting != null) {
        ticker.tick(t_e, reporting);
    }

    // Remove the windows flagged above and reset the pending set.
    for (Window w : to_evict) {
        log.debug("Evicting [" + w.getO() + "," + w.getC() + ")");
        active_windows.remove(w);
        if (toi < w.getC()) {
            toi = w.getC() + b;
        }
    }
    to_evict.clear();
}
/**
 * Registers, if absent, the windows relevant for {@code t_e}: starting from the window that
 * closes at the first multiple of {@code b} not smaller than the distance between
 * {@code t_e} and {@code t0}, and then every {@code b} time units while the opening
 * timestamp does not exceed {@code t_e}. At least one window is always considered.
 *
 * NOTE(review): the bound is derived from |t_e - t0| but t0 is not added back afterwards —
 * confirm this is intended when t0 is not 0.
 *
 * @param t_e timestamp of the element being processed
 */
private void scope(long t_e) {
    double distance = (double) Math.abs(t_e - t0);
    long slides = (long) Math.ceil(distance / (double) b);
    long closingBound = slides * b;
    long opening = closingBound - a;
    log.debug("Calculating the Windows to Open. First one opens at [" + opening + "] and closes at [" + closingBound + "]");

    do {
        long closing = opening + a;
        log.debug("Computing Window [" + opening + "," + closing + ") if absent");
        active_windows.computeIfAbsent(new WindowImpl(opening, closing), x -> new ContentGraph());
        opening += b;
    } while (opening <= t_e);
}
/**
 * Marks {@code w} for removal; the pending set is drained at the end of {@code windowing}.
 *
 * @param w window to flag
 */
private void schedule_for_eviction(Window w) {
    this.to_evict.add(w);
}
/**
 * Looks up the content of {@code w}, sets the application time to {@code t_e} and
 * delegates to {@code setVisible} with that content.
 *
 * @param t_e timestamp to set as application time
 * @param w   window whose content is requested
 * @return the result of {@code setVisible(t_e, w, content)}, where content is the one
 *         tracked for {@code w} or a new {@code EmptyGraphContent} when {@code w} is not tracked
 */
public Content<Graph,Graph> compute(long t_e, Window w) {
    Content<Graph,Graph> content;
    if (active_windows.containsKey(w)) {
        content = active_windows.get(w);
    } else {
        content = new EmptyGraphContent();
    }

    time.setAppTime(t_e);
    return setVisible(t_e, w, content);
}
/**
 * Registers the given SDS (cast to {@code Observer}) as an observer of this operator and
 * returns a new time-varying graph built from this operator, its IRI and a graph
 * obtained from {@code RDFUtils.createGraph()}.
 *
 * @param content SDS to register as observer
 * @return the newly created {@code TimeVaryingGraph}
 */
@Override
public TimeVaryingGraph set(SDS<Graph> content) {
    Observer sdsObserver = (Observer) content;
    this.addObserver(sdsObserver);
    //TODO Generalize the type of content using an ENUM
    return new TimeVaryingGraph(this, iri, RDFUtils.createGraph());
}