Skip to content

C-SPARQL SLIDING WINDOW OPERATOR

Riccardo Tommasini edited this page Jan 2, 2021 · 2 revisions

Factory

/**
 * Factory that builds a {@link CSPARQLStreamToRelationOp} for a stream and registers it as a
 * consumer of that stream.
 *
 * <p>All settings are captured once at construction time and reused for every operator created
 * by {@link #apply(WebDataStream, IRI)}.
 */
public class CSPARQLTimeWindowOperatorFactory implements StreamToRelationOperatorFactory<Graph, Graph> {

    /** First of the two window parameters forwarded to the operator. */
    private final long a;
    /** Second of the two window parameters forwarded to the operator. */
    private final long b;
    /** Stored but not forwarded by {@link #apply(WebDataStream, IRI)} in this class. */
    private final long t0;
    private final Time time;
    private final Tick tick;
    private final Report report;
    private final ReportGrain grain;
    /** The SDS handed to the operator's {@code set} method when it is created. */
    private final SDS<Graph> context;

    /**
     * Captures the configuration shared by all operators this factory creates.
     *
     * @param a       first window parameter passed to the operator
     * @param b       second window parameter passed to the operator
     * @param t0      retained in the factory only
     * @param time    time instance passed to the operator
     * @param tick    tick policy passed to the operator
     * @param report  report policy passed to the operator
     * @param grain   report grain passed to the operator
     * @param context SDS that each created operator is attached to
     */
    public CSPARQLTimeWindowOperatorFactory(long a, long b, long t0, Time time, Tick tick, Report report, ReportGrain grain, SDS<Graph> context) {
        this.context = context;
        this.grain = grain;
        this.report = report;
        this.tick = tick;
        this.time = time;
        this.t0 = t0;
        this.b = b;
        this.a = a;
    }


    /**
     * Creates a new window operator identified by {@code iri}, subscribes it to {@code s} and
     * attaches it to the configured SDS.
     *
     * @param s   stream the operator will consume
     * @param iri identifier given to the operator
     * @return the time-varying graph returned by the operator's {@code set} call
     */
    @Override
    public TimeVarying<Graph> apply(WebDataStream<Graph> s, IRI iri) {
        StreamToRelationOp<Graph, Graph> operator =
                new CSPARQLStreamToRelationOp(iri, a, b, time, tick, report, grain);
        s.addConsumer(operator);
        return operator.set(context);
    }
}

Window Instances

public class WindowImpl implements Window {

    private long c, o;

    public WindowImpl(long o, long c) {
        this.o = o;
        this.c = c;
    }

    public long getC() {
        return c;
    }

    public long getO() {
        return o;
    }

Operator

public class CSPARQLStreamToRelationOp extends ObservableStreamToRelationOp<Graph, Graph> {

    // a is added to a window's opening time to obtain its closing time; b is the step by which
    // successive opening times advance (see scope()).
    private final long a, b;

    // Windows currently tracked by the operator, each mapped to the content gathered for it.
    private Map<Window, Content<Graph,Graph>> active_windows;
    // Windows flagged during windowing(); they are removed from active_windows and this set is
    // cleared at the end of each windowing() call.
    private Set<Window> to_evict;
    // Reference timestamp taken from the Time instance's getScope() in the constructor; scope()
    // measures the distance of each element's timestamp from it.
    private long t0;
    // Starts at 0 and is rewritten while evicting windows; it is not read anywhere else in the
    // code visible here.
    private long toi;

    /**
     * Creates the operator with no windows open.
     *
     * @param iri      identifier passed to the superclass
     * @param a        amount added to a window's opening time to obtain its closing time; must be positive
     * @param b        step between the opening times of successive windows; must be positive
     * @param instance time instance; its {@code getScope()} value becomes the reference timestamp
     * @param tick     tick policy passed to the superclass
     * @param report   report policy passed to the superclass
     * @param grain    report grain passed to the superclass
     * @throws IllegalArgumentException if {@code a} or {@code b} is not strictly positive
     */
    public CSPARQLStreamToRelationOp(IRI iri, long a, long b, Time instance, Tick tick, Report report, ReportGrain grain) {
        super(iri, instance, tick, report, grain);
        // A non-positive step would keep the window-creation loop in scope() from ever passing
        // the element timestamp, so it would never terminate.
        if (b <= 0) {
            throw new IllegalArgumentException("Window slide b must be positive, got " + b);
        }
        // With a non-positive width the closing time is not after the opening time, so no
        // element could ever fall inside a window.
        if (a <= 0) {
            throw new IllegalArgumentException("Window width a must be positive, got " + a);
        }
        this.a = a;
        this.b = b;
        this.t0 = instance.getScope();
        this.toi = 0;
        this.active_windows = new HashMap<>();
        this.to_evict = new HashSet<>();
    }

    /**
     * @return the time instance this operator works with
     */
    @Override
    public Time time() {
        return this.time;
    }

    /**
     * Returns the content of the tracked window with the greatest closing time among those that
     * opened before {@code t_e} and closed at or before {@code t_e}.
     *
     * @param t_e reference timestamp
     * @return the content of that window, or a new {@code EmptyGraphContent} when no tracked
     *         window qualifies
     */
    @Override
    public Content<Graph,Graph> getContent(long t_e) {
        Window latestClosed = null;

        for (Window candidate : active_windows.keySet()) {
            boolean qualifies = candidate.getO() < t_e && candidate.getC() <= t_e;
            if (!qualifies) {
                continue;
            }
            // Strict comparison: on equal closing times the window seen first is kept.
            if (latestClosed == null || candidate.getC() > latestClosed.getC()) {
                latestClosed = candidate;
            }
        }

        if (latestClosed == null) {
            return new EmptyGraphContent();
        }
        return active_windows.get(latestClosed);
    }

    /**
     * Collects the contents of every tracked window whose interval contains {@code t_e}, i.e.
     * windows with opening time at or before {@code t_e} and closing time after it.
     *
     * @param t_e reference timestamp
     * @return the matching contents; empty when no tracked window contains {@code t_e}
     */
    @Override
    public List<Content<Graph,Graph>> getContents(long t_e) {
        return active_windows.entrySet().stream()
                .filter(entry -> {
                    Window w = entry.getKey();
                    return w.getO() <= t_e && t_e < w.getC();
                })
                .map(Map.Entry::getValue)
                .collect(Collectors.toList());
    }

    /**
     * Processes one timestamped element: ensures the relevant windows exist, adds the element to
     * every tracked window that contains its timestamp, notifies the ticker about the reporting
     * window with the greatest closing time, and finally drops the windows that have expired.
     *
     * <p>A window whose closing time equals the element's timestamp neither receives the element
     * nor is scheduled for eviction in this call.
     *
     * @param e         the element to assign
     * @param timestamp the element's timestamp
     * @throws OutOfOrderElementException if {@code timestamp} is earlier than the current
     *                                    application time
     */
    protected void windowing(Graph e, long timestamp) {

        log.debug("Received element (" + e + "," + timestamp + ")");
        final long t_e = timestamp;

        // Elements older than the current application time are rejected.
        if (t_e < time.getAppTime()) {
            log.error("OUT OF ORDER NOT HANDLED");
            throw new OutOfOrderElementException("(" + e + "," + timestamp + ")");
        }

        // Create any missing windows before assigning the element.
        scope(t_e);

        for (Map.Entry<Window, Content<Graph,Graph>> slot : active_windows.entrySet()) {
            Window w = slot.getKey();
            log.debug("Processing Window [" + w.getO() + "," + w.getC() + ") for element (" + e + "," + timestamp + ")");

            boolean containsElement = w.getO() <= t_e && t_e < w.getC();
            if (containsElement) {
                log.debug("Adding element [" + e + "] to Window [" + w.getO() + "," + w.getC() + ")");
                slot.getValue().add(e);
            } else if (t_e > w.getC()) {
                log.debug("Scheduling for Eviction [" + w.getO() + "," + w.getC() + ")");
                schedule_for_eviction(w);
            }
        }


        // The report policy is consulted for every tracked window (the content argument is
        // passed as null); only the reporting window that closes last is ticked.
        Optional<Window> latestReporting = active_windows.keySet().stream()
                .filter(w -> report.report(w, null, t_e, System.currentTimeMillis()))
                .max(Comparator.comparingLong(Window::getC));
        latestReporting.ifPresent(window -> ticker.tick(t_e, window));

        // Eviction happens last, after reporting.
        for (Window expired : to_evict) {
            log.debug("Evicting [" + expired.getO() + "," + expired.getC() + ")");
            active_windows.remove(expired);
            if (toi < expired.getC()) {
                toi = expired.getC() + b;
            }
        }
        to_evict.clear();
    }

    /**
     * Makes sure a window entry exists for every opening time from the first candidate up to
     * {@code t_e}.
     *
     * <p>The first candidate closes at the distance between {@code t_e} and {@code t0}, rounded up
     * to a multiple of {@code b}, and opens {@code a} before that. Opening times then advance by
     * {@code b}; at least one window is always considered.
     *
     * @param t_e timestamp of the element being processed
     */
    private void scope(long t_e) {
        double distance = (double) Math.abs(t_e - t0);
        long steps = (long) Math.ceil(distance / (double) b);
        long c_sup = steps * b;
        long o_i = c_sup - a;
        log.debug("Calculating the Windows to Open. First one opens at [" + o_i + "] and closes at [" + c_sup + "]");

        long opening = o_i;
        do {
            long closing = opening + a;
            log.debug("Computing Window [" + opening + "," + closing + ") if absent");

            // Existing entries are left untouched; only missing windows get fresh content.
            active_windows.computeIfAbsent(new WindowImpl(opening, closing), x -> new ContentGraph());
            opening += b;

        } while (opening <= t_e);

    }


    /**
     * Flags a window for removal; the removal itself is carried out at the end of windowing().
     *
     * @param w the window to flag
     */
    private void schedule_for_eviction(Window w) {
        this.to_evict.add(w);
    }


    /**
     * Looks up the content of window {@code w}, advances the application time to {@code t_e} and
     * hands the content to {@code setVisible}.
     *
     * @param t_e timestamp to set as the application time
     * @param w   window whose content is requested
     * @return whatever {@code setVisible} returns for the window's content, or for a new
     *         {@code EmptyGraphContent} when the window is not tracked
     */
    public Content<Graph,Graph> compute(long t_e, Window w) {
        Content<Graph,Graph> content;
        if (active_windows.containsKey(w)) {
            content = active_windows.get(w);
        } else {
            content = new EmptyGraphContent();
        }
        time.setAppTime(t_e);
        return setVisible(t_e, w, content);
    }


    /**
     * Registers the given SDS as an observer of this operator and creates the time-varying graph
     * backed by it.
     *
     * @param content the SDS to notify; it is cast to {@code Observer}, so it must be one
     * @return a new {@code TimeVaryingGraph} bound to this operator, its IRI and a fresh graph
     */
    @Override
    public TimeVaryingGraph set(SDS<Graph> content) {
        Observer sdsObserver = (Observer) content;
        this.addObserver(sdsObserver);
        //TODO Generalize the type of content using an ENUM
        return new TimeVaryingGraph(this, iri, RDFUtils.createGraph());
    }