
Relation to Stream Operators

Riccardo Tommasini edited this page Jan 2, 2021 · 6 revisions

RSP-QL defines three relation-to-stream (R2S) operators, adopted from CQL. Given two consecutive time instants τ - 1 and τ, the R2S operators are formally defined as follows:

  • The insert stream operator streams out all entries that are new w.r.t. the previous instant. More formally, the insert stream operator applied to a relation R emits an element <s,τ> if and only if the tuple s is in R(τ) - R(τ - 1) at time τ:

IStream(R) = ⋃_{τ ≥ 0} ((R(τ) - R(τ - 1)) × {τ})

  • The delete stream operator streams out all entries that were deleted w.r.t. the previous instant. More formally, the delete stream operator applied to a relation R emits an element <s,τ> if and only if the tuple s is in R(τ - 1) - R(τ) at time τ:

DStream(R) = ⋃_{τ ≥ 0} ((R(τ - 1) - R(τ)) × {τ})

  • The relation stream operator that streams out all elements at a certain instant in the source relation. More formally, the relation stream operator applied to a relation R emits an element <s,τ> if and only if the tuple s is in R(τ) at time τ:

RStream(R) = ⋃_{τ ≥ 0} (R(τ) × {τ})
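The three definitions can be sketched with plain Java sets. This is only an illustration of the set algebra, not RSP4J code: the class `R2SSketch` and its method names are hypothetical, and a relation at one instant is assumed to be a `Set` of tuples.

```java
import java.util.LinkedHashSet;
import java.util.Set;

// Hypothetical illustration class; not part of the library.
class R2SSketch {

    // Insert stream: tuples in R(τ) but not in R(τ - 1).
    static <T> Set<T> insertStream(Set<T> previous, Set<T> current) {
        Set<T> out = new LinkedHashSet<>(current);
        out.removeAll(previous);
        return out;
    }

    // Delete stream: tuples in R(τ - 1) but not in R(τ).
    static <T> Set<T> deleteStream(Set<T> previous, Set<T> current) {
        Set<T> out = new LinkedHashSet<>(previous);
        out.removeAll(current);
        return out;
    }

    // Relation stream: every tuple in R(τ).
    static <T> Set<T> relationStream(Set<T> current) {
        return new LinkedHashSet<>(current);
    }

    public static void main(String[] args) {
        Set<String> previous = Set.of("a", "b"); // R(τ - 1)
        Set<String> current = Set.of("b", "c");  // R(τ)
        System.out.println("IStream: " + insertStream(previous, current));
        System.out.println("DStream: " + deleteStream(previous, current));
        System.out.println("RStream: " + relationStream(current));
    }
}
```

With R(τ - 1) = {a, b} and R(τ) = {b, c}, the insert stream contains only c, the delete stream contains only a, and the relation stream contains both b and c.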

In RSP4J, the enumeration it.polimi.deib.sr.rsp.api.enums.StreamOperator lists the default operators used for parsing:

  • DSTREAM
  • ISTREAM
  • RSTREAM
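As a rough sketch of how such an enumeration might be used during parsing, the example below maps a query keyword to a constant. The nested enum is a local stand-in that carries only the three constants listed above, and the `parse` helper is hypothetical; the actual library enumeration may offer more than this.

```java
import java.util.Locale;

// Hypothetical illustration class; not part of the library.
class StreamOperatorParsing {

    // Local stand-in for the library enumeration; only the three
    // constants are taken from the page.
    enum StreamOperator { DSTREAM, ISTREAM, RSTREAM }

    // Hypothetical helper: maps a keyword found in a query to a constant.
    // Throws IllegalArgumentException for an unknown keyword.
    static StreamOperator parse(String keyword) {
        return StreamOperator.valueOf(keyword.trim().toUpperCase(Locale.ROOT));
    }

    public static void main(String[] args) {
        System.out.println(parse("istream")); // prints ISTREAM
    }
}
```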

Relation To Stream

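import java.util.stream.Stream;

// Turns the result of a query evaluation at time ts into output stream elements.
public interface RelationToStreamOperator<T> {

    // Evaluates the operator on a single solution mapping at time ts.
    T eval(SolutionMapping<T> sm, long ts);

    // Applies eval to every solution mapping in the stream, all at the same time ts.
    default Stream<T> eval(Stream<SolutionMapping<T>> sml, long ts) {
        return sml.map(sm -> eval(sm, ts));
    }
}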

Solution Mappings

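public interface SolutionMapping<I> {
    // Time instant at which this mapping was created.
    long getCreationTime();

    // The wrapped result.
    I get();

    // The part of this mapping that is not in r.
    SolutionMapping<I> difference(SolutionMapping<I> r);

    // The part of this mapping that is also in new_response.
    SolutionMapping<I> intersection(SolutionMapping<I> new_response);
}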
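A minimal sketch of one possible implementation follows, assuming the wrapped result is a set of tuples. The interface is repeated inside the class so the example compiles on its own; `SetMapping` is a hypothetical name and does not come from YASPER.

```java
import java.util.LinkedHashSet;
import java.util.Set;

// Hypothetical illustration class; not part of the library.
class SolutionMappingSketch {

    // Copy of the interface above, nested here for self-containment.
    interface SolutionMapping<I> {
        long getCreationTime();
        I get();
        SolutionMapping<I> difference(SolutionMapping<I> r);
        SolutionMapping<I> intersection(SolutionMapping<I> new_response);
    }

    // Assumption: the payload is a set, so difference and intersection
    // are the usual set operations.
    static final class SetMapping<E> implements SolutionMapping<Set<E>> {
        private final Set<E> tuples;
        private final long creationTime;

        SetMapping(Set<E> tuples, long creationTime) {
            this.tuples = new LinkedHashSet<>(tuples);
            this.creationTime = creationTime;
        }

        @Override public long getCreationTime() { return creationTime; }

        @Override public Set<E> get() { return tuples; }

        @Override
        public SolutionMapping<Set<E>> difference(SolutionMapping<Set<E>> r) {
            Set<E> out = new LinkedHashSet<>(tuples);
            out.removeAll(r.get());
            return new SetMapping<>(out, creationTime);
        }

        @Override
        public SolutionMapping<Set<E>> intersection(SolutionMapping<Set<E>> new_response) {
            Set<E> out = new LinkedHashSet<>(tuples);
            out.retainAll(new_response.get());
            return new SetMapping<>(out, creationTime);
        }
    }

    public static void main(String[] args) {
        SetMapping<String> a = new SetMapping<>(Set.of("x", "y"), 1L);
        SetMapping<String> b = new SetMapping<>(Set.of("y", "z"), 2L);
        System.out.println(a.difference(b).get());   // prints [x]
        System.out.println(a.intersection(b).get()); // prints [y]
    }
}
```

Keeping the creation time of the receiver on the derived mappings is a choice made for this sketch only.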

YASPER Implementations

IStream

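public class Istream<T> implements RelationToStreamOperator<T> {
    private final int i;
    // Response seen at the previous evaluation, i.e. R(τ - 1).
    private SolutionMapping<T> last_response;

    public Istream(int i) {
        this.i = i;
    }

    // Typed factory; avoids the raw RelationToStreamOperator type.
    public static <T> RelationToStreamOperator<T> get() {
        return new Istream<>(1);
    }

    @Override
    public T eval(SolutionMapping<T> new_response, long ts) {
        if (last_response == null) {
            // First evaluation: there is no previous response, so everything is new.
            last_response = new_response;
            return last_response.get();
        } else {
            // R(τ) - R(τ - 1): what is in the new response but not in the last one.
            SolutionMapping<T> diff = new_response.difference(last_response);
            last_response = new_response;
            return diff.get();
        }
    }

}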

DStream

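public class Dstream<I> implements RelationToStreamOperator<I> {
    private final int i;
    // Response seen at the previous evaluation, i.e. R(τ - 1).
    private SolutionMapping<I> last_response;

    public Dstream(int i) {
        this.i = i;
    }

    // Typed factory; avoids the raw RelationToStreamOperator type.
    public static <I> RelationToStreamOperator<I> get() {
        return new Dstream<>(1);
    }

    @Override
    public I eval(SolutionMapping<I> new_response, long ts) {
        if (last_response == null) {
            // First evaluation: without this guard the call below throws a
            // NullPointerException. Nothing can have been deleted yet, so the
            // self-difference is returned (empty, assuming set semantics).
            last_response = new_response;
            return new_response.difference(new_response).get();
        }
        // R(τ - 1) - R(τ): what was in the last response but is no longer there.
        SolutionMapping<I> diff = last_response.difference(new_response);
        last_response = new_response;
        return diff.get();
    }

}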

RStream

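public class Rstream<T> implements RelationToStreamOperator<T> {

    // Typed factory; avoids the raw RelationToStreamOperator type.
    public static <T> RelationToStreamOperator<T> get() {
        return new Rstream<>();
    }

    // Stateless: emits the whole current response, i.e. R(τ).
    @Override
    public T eval(SolutionMapping<T> last_response, long ts) {
        return last_response.get();
    }
}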
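To show how the stateful operators behave across several evaluations, the sketch below runs IStream-like and DStream-like logic over three consecutive snapshots. It is a simplified stand-in, not the YASPER classes: `Mapping`, `SetMapping`, `IstreamSketch`, and `DstreamSketch` are hypothetical names, the timestamp argument is omitted, and the DStream sketch assumes an empty result is the right answer for the first evaluation, where no previous response exists.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical illustration class; not part of the library.
class R2SOperatorsDemo {

    // Reduced stand-in for SolutionMapping: only what the operators call.
    interface Mapping<I> {
        I get();
        Mapping<I> difference(Mapping<I> other);
    }

    // Set-backed mapping used as sample data.
    static final class SetMapping implements Mapping<Set<String>> {
        private final Set<String> tuples;
        SetMapping(Set<String> tuples) { this.tuples = new LinkedHashSet<>(tuples); }
        @Override public Set<String> get() { return tuples; }
        @Override public Mapping<Set<String>> difference(Mapping<Set<String>> other) {
            Set<String> out = new LinkedHashSet<>(tuples);
            out.removeAll(other.get());
            return new SetMapping(out);
        }
    }

    // IStream logic: the first evaluation emits the whole response.
    static final class IstreamSketch<T> {
        private Mapping<T> last;
        T eval(Mapping<T> current) {
            Mapping<T> out = (last == null) ? current : current.difference(last);
            last = current;
            return out.get();
        }
    }

    // DStream logic, guarded so the first evaluation emits an empty result.
    static final class DstreamSketch<T> {
        private Mapping<T> last;
        T eval(Mapping<T> current) {
            Mapping<T> out = (last == null) ? current.difference(current) : last.difference(current);
            last = current;
            return out.get();
        }
    }

    // Feeds the snapshots to one IstreamSketch instance, in order.
    static List<Set<String>> istreamOver(List<Set<String>> snapshots) {
        IstreamSketch<Set<String>> op = new IstreamSketch<>();
        List<Set<String>> out = new ArrayList<>();
        for (Set<String> s : snapshots) out.add(op.eval(new SetMapping(s)));
        return out;
    }

    // Feeds the snapshots to one DstreamSketch instance, in order.
    static List<Set<String>> dstreamOver(List<Set<String>> snapshots) {
        DstreamSketch<Set<String>> op = new DstreamSketch<>();
        List<Set<String>> out = new ArrayList<>();
        for (Set<String> s : snapshots) out.add(op.eval(new SetMapping(s)));
        return out;
    }

    public static void main(String[] args) {
        List<Set<String>> snapshots = List.of(Set.of("a", "b"), Set.of("b", "c"), Set.of("c"));
        System.out.println("IStream: " + istreamOver(snapshots));
        System.out.println("DStream: " + dstreamOver(snapshots));
    }
}
```

For the snapshots {a, b}, {b, c}, {c}, the IStream sketch emits {a, b}, {c}, {} and the DStream sketch emits {}, {a}, {b}. Each operator instance keeps the last response it saw, so one instance should be used per query and not shared.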