Relation to Stream Operators
Riccardo Tommasini edited this page Jan 2, 2021
·
6 revisions
RSP-QL defines three relation-to-stream (R2S) operators, adopting those from CQL. Given two consecutive time instants τ - 1 and τ, we formally define the R2S operators as follows:
- The insert stream operator streams out all entries that are new w.r.t. the previous instant. More formally, the insert stream operator applied to a relation R emits an element <s,τ> if and only if the tuple s is in R(τ) - R(τ - 1) at time τ.
- The delete stream operator streams out all entries that were deleted w.r.t. the previous instant. More formally, the delete stream operator applied to a relation R emits an element <s,τ> if and only if the tuple s is in R(τ - 1) - R(τ) at time τ.
- The relation stream operator streams out all elements that are in the source relation at a certain instant. More formally, the relation stream operator applied to a relation R emits an element <s,τ> if and only if the tuple s is in R(τ) at time τ.
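The three definitions can be read as plain set operations on two consecutive snapshots of R. The following is a minimal sketch under that reading only: the class `R2SSketch` and its method names are hypothetical and are not part of the library, and tuples are modelled as strings for brevity.

```java
import java.util.Set;
import java.util.TreeSet;

class R2SSketch {
    // Insert stream: tuples in R(τ) that were not in R(τ - 1).
    static <S extends Comparable<? super S>> Set<S> istream(Set<S> previous, Set<S> current) {
        Set<S> out = new TreeSet<>(current);
        out.removeAll(previous);
        return out;
    }

    // Delete stream: tuples in R(τ - 1) that are no longer in R(τ).
    static <S extends Comparable<? super S>> Set<S> dstream(Set<S> previous, Set<S> current) {
        Set<S> out = new TreeSet<>(previous);
        out.removeAll(current);
        return out;
    }

    // Relation stream: every tuple in R(τ).
    static <S extends Comparable<? super S>> Set<S> rstream(Set<S> current) {
        return new TreeSet<>(current);
    }

    public static void main(String[] args) {
        Set<String> previous = Set.of("a", "b"); // R(τ - 1)
        Set<String> current = Set.of("b", "c");  // R(τ)
        System.out.println(istream(previous, current)); // prints [c]
        System.out.println(dstream(previous, current)); // prints [a]
        System.out.println(rstream(current));           // prints [b, c]
    }
}
```

Each emitted tuple s would be paired with the current instant τ to form the stream element <s,τ>; the sketch leaves the timestamp out to keep the set arithmetic visible.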
In RS4J, the enumeration it.polimi.deib.sr.rsp.api.enums.StreamOperator lists the default operators recognised during parsing:
- DSTREAM
- ISTREAM
- RSTREAM
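As an illustration of how such an enumeration can back a parser, the sketch below maps a query keyword to a constant. It declares its own local stand-in `StreamOperator` with only the three constants listed above, and the `parse` helper is hypothetical; the actual enumeration in the library may differ.

```java
import java.util.Locale;

// Local stand-in with the three constants listed above (assumption, not the library's source).
enum StreamOperator { ISTREAM, DSTREAM, RSTREAM }

class StreamOperatorParsing {
    // Hypothetical helper: maps a query keyword to the matching constant, ignoring case.
    // Enum.valueOf throws IllegalArgumentException for an unknown keyword.
    static StreamOperator parse(String keyword) {
        return StreamOperator.valueOf(keyword.trim().toUpperCase(Locale.ROOT));
    }

    public static void main(String[] args) {
        System.out.println(parse("rstream")); // prints RSTREAM
    }
}
```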
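import java.util.stream.Stream;

public interface RelationToStreamOperator<T> {
    // Applies the operator to the solution mapping evaluated at time ts.
    T eval(SolutionMapping<T> sm, long ts);

    // Applies the operator to every solution mapping in the given stream.
    default Stream<T> eval(Stream<SolutionMapping<T>> sml, long ts) {
        return sml.map(sm -> eval(sm, ts));
    }
}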
public interface SolutionMapping<I> {
    long getCreationTime();

    I get();

    SolutionMapping<I> difference(SolutionMapping<I> r);

    SolutionMapping<I> intersection(SolutionMapping<I> new_response);
}
public class Istream<T> implements RelationToStreamOperator<T> {
    // Not used by eval in the code shown here.
    private final int i;
    private SolutionMapping<T> last_response;

    public Istream(int i) {
        this.i = i;
    }

    // Generic factory, so callers do not fall back to raw types.
    public static <T> RelationToStreamOperator<T> get() {
        return new Istream<>(1);
    }

    @Override
    public T eval(SolutionMapping<T> new_response, long ts) {
        if (last_response == null) {
            // First evaluation: there is no previous instant, so every entry is new.
            last_response = new_response;
            return last_response.get();
        } else {
            // Emit R(τ) - R(τ - 1): the entries added since the last evaluation.
            SolutionMapping<T> diff = new_response.difference(last_response);
            last_response = new_response;
            return diff.get();
        }
    }
}
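public class Dstream<I> implements RelationToStreamOperator<I> {
    // Not used by eval in the code shown here.
    private final int i;
    private SolutionMapping<I> last_response;

    public Dstream(int i) {
        this.i = i;
    }

    // Generic factory, so callers do not fall back to raw types.
    public static <I> RelationToStreamOperator<I> get() {
        return new Dstream<>(1);
    }

    @Override
    public I eval(SolutionMapping<I> new_response, long ts) {
        if (last_response == null) {
            // First evaluation: no previous instant exists, so nothing was deleted.
            // Assumes difference() is a set difference, so R(τ) - R(τ) is empty.
            last_response = new_response;
            return new_response.difference(new_response).get();
        }
        // Emit R(τ - 1) - R(τ): the entries removed since the last evaluation.
        SolutionMapping<I> diff = last_response.difference(new_response);
        last_response = new_response;
        return diff.get();
    }
}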
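public class Rstream<T> implements RelationToStreamOperator<T> {
    // Generic factory, so callers do not fall back to raw types.
    public static <T> RelationToStreamOperator<T> get() {
        return new Rstream<>();
    }

    @Override
    public T eval(SolutionMapping<T> last_response, long ts) {
        // Stateless: emit the whole relation R(τ) as it is.
        return last_response.get();
    }
}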
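To see the operators at work on concrete data, the sketch below feeds two consecutive responses to an insert stream operator. It is self-contained, so the two interfaces are repeated in reduced form (the `Stream` overload is omitted). `SetMapping`, `IstreamSketch` and `IstreamDemo` are hypothetical names introduced only for this example, and the set-backed mapping assumes that `difference` and `intersection` behave as set difference and set intersection.

```java
import java.util.Set;
import java.util.TreeSet;

// Reduced copies of the interfaces above, so the sketch compiles on its own.
interface SolutionMapping<I> {
    long getCreationTime();
    I get();
    SolutionMapping<I> difference(SolutionMapping<I> r);
    SolutionMapping<I> intersection(SolutionMapping<I> r);
}

interface RelationToStreamOperator<T> {
    T eval(SolutionMapping<T> sm, long ts);
}

// Hypothetical set-backed mapping; not part of the library.
final class SetMapping implements SolutionMapping<Set<String>> {
    private final Set<String> tuples;
    private final long creationTime;

    SetMapping(Set<String> tuples, long creationTime) {
        this.tuples = new TreeSet<>(tuples); // sorted copy for stable printing
        this.creationTime = creationTime;
    }

    @Override public long getCreationTime() { return creationTime; }
    @Override public Set<String> get() { return tuples; }

    @Override
    public SolutionMapping<Set<String>> difference(SolutionMapping<Set<String>> r) {
        Set<String> out = new TreeSet<>(tuples);
        out.removeAll(r.get());
        return new SetMapping(out, creationTime);
    }

    @Override
    public SolutionMapping<Set<String>> intersection(SolutionMapping<Set<String>> r) {
        Set<String> out = new TreeSet<>(tuples);
        out.retainAll(r.get());
        return new SetMapping(out, creationTime);
    }
}

// Compact restatement of the Istream logic shown above.
final class IstreamSketch<T> implements RelationToStreamOperator<T> {
    private SolutionMapping<T> last;

    @Override
    public T eval(SolutionMapping<T> now, long ts) {
        // The first call emits everything; later calls emit only the additions.
        SolutionMapping<T> diff = (last == null) ? now : now.difference(last);
        last = now;
        return diff.get();
    }
}

class IstreamDemo {
    public static void main(String[] args) {
        RelationToStreamOperator<Set<String>> istream = new IstreamSketch<>();
        System.out.println(istream.eval(new SetMapping(Set.of("a", "b"), 1), 1)); // prints [a, b]
        System.out.println(istream.eval(new SetMapping(Set.of("b", "c"), 2), 2)); // prints [c]
    }
}
```

The operator keeps the previous response as state, so a separate instance is needed per query; sharing one instance across queries would mix their histories.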