-
Notifications
You must be signed in to change notification settings - Fork 12
Execution Semantics
This package contains all the necessary abstractions to define the exeuction semantics of an RSP engine It wraps the RSP-QL portion that is based on the SECRET MODEL
This package is used to specify the content of the window as for the SECRET definition.
the method coalesce()
provides a time-agnostic representation of the data inside the window, as it is required by the SECRET MODEL.
In the below implementation, the status is maintained as a List whose content is merged together upon request.
public class ContentGraph implements Content<Graph> {
private List<Graph> elements;
@Override
public Graph coalesce() {
if (elements.size() == 1)
return elements.get(0);
else {
Graph g = RDFUtils.createGraph();
elements.stream().flatMap(Graph::stream).forEach(g::add);
return g;
}
}
}
This package is used to describe the reporting approach adopted by a given SPE w.r.t. the windowing mechanism it implements.
To represent SECRET definition of the report, we decoupled report implementation and the strategies it employs.
public class ReportImpl implements Report {
List<ReportingStrategy> strategies = new ArrayList<>();
@Override
public boolean report(Window w, Content c, long application_time, long system_time) {
return strategies.stream().allMatch(strategy -> strategy.match(w, c, tapp, tsys));
}
}
In the SECRET MODEL, the reporting approach is the result of the combination of four strategies.
- ON CONTENT CHANGE, the content is reported anytime it changes
- ON WINDOW CLOSE, the content is reported when the active window closes
- PERIODIC, the content is reported with a period p
- NON-EMPTY CONTENT, only non-empty content is reported
To make the reporting extensible, we defined the ReportingStrategy
as an interface, and we included the aforementioned one.
/**
* Window close (Rwc): reporting is done for t
* only when the active window closes (i.e., |Scope(t)| = w ).
**/
public class OnWindowClose implements ReportingStrategy {
@Override
public boolean match(Window w, Content c, long tapp, long tsys) {
return w.getC() < tapp;
}
}
The Tick dimension in our model defines the condition which drives an SPE to take action on its input (also referred to as “window state change” or “window re-evaluation” ).
Like Report, Tick is also part of a system’s internal execution model. While some systems react to individual tuples as they arrive, others collectively react to all or subsets of tuples with the same tapp value. RSP-QL, like SECRET, identifies three main ways that different systems “tick”:
- tuple-driven, where each tuple arrival causes a system to react;
- time-driven, where the progress of tapp causes a system to react;
- batch-driven, where either a new batch arrival or the progress of tapp causes a system to react.
An enumeration ``it.polimi.deib.sr.rsp.api.enums.Tick``` enlits the default ticks options for parsing purposes.
public interface Ticker {
void tick(long t_e, Window w);
}
All the default Tick definition are implemented in YASPER using the Ticker
interface. Moreover, the Ticker
interface (see above) allows RSP4J's users to implement custom tick logics, i.e.,
The tuple ticker triggers the operator evaluation every time a new item arrives.
@RequiredArgsConstructor
public class TupleTicker implements Ticker {
private final StreamToRelationOp<?, ?> op;
@Override
public void tick(long t_e, Window w) {
op.compute(t_e, w);
}
}
The time ticker triggers the operator evaluation when time advances.
@RequiredArgsConstructor
public class TimeTicker implements Ticker {
private final StreamToRelationOp<?, ?> op;
private final Time time;
@Override
public void tick(long t_e, Window w) {
time.addEvaluationTimeInstants(new TimeInstant(t_e));
if (t_e > time.getAppTime()) {
op.compute(t_e, w);
}
}
}
The time ticker triggers the operator evaluation when a new batch passes.
@RequiredArgsConstructor
public class BatchTicker implements Ticker {
private int curr = 0;
protected final StreamToRelationOp<?, ?> op;
@Setter
private int batch;
@Override
public void tick(long t_e, Window w) {
curr++;
if (curr == batch) {
op.compute(t_e, w);
curr = 0;
}
}
}