Description
Issue
Currently, there is no way to define windowing semantics in RML.
Windowing is crucial when evaluating joins between different live streaming
data sources.
Furthermore, windowing could also provide the buffering that aggregation
functions need when processing streaming data sources, for example to
calculate the average of the values over the last 5 minutes.
Requirements
According to Gedik B.,
a window's behaviour is defined by its type and its policies.
There are two main types of windows: tumbling and sliding.
An illustration about how these windows work can be found here.
Note: A session window is a special case of a tumbling window in which the
window is only dropped once an inactivity threshold is exceeded.
The policies control when the window evicts the tuples it holds
(eviction policy), and when it triggers the processing of those
tuples using the operator logic defined inside the window (trigger policy).
Policies are further divided into four categories:
- Count-based
- Uses the number of incoming tuples to inform when to evict/trigger.
- Delta-based
- Uses a threshold of an attribute of the incoming tuples to
inform when to evict/trigger, e.g. when the temperature value of a sensor is above 40 °C.
- Time-based
- Uses the timestamp of the incoming tuple.
- Punctuation-based
- Injects punctuations inside the incoming data stream as markers to decide
when to evict/trigger.
Thus, we need vocabulary to define and configure windows by
describing:
- Window Type
- Eviction policy
- Trigger policy
The exact semantics and the valid combinations of the policies are further
explained by Gedik B.
Example
Given the following RML with a join condition:
<#TM1>
rml:logicalSource <#STREAM1> ;
rml:subjectMap <#SM1> ;
rml:predicateObjectMap [
rml:predicateMap <#PM1> ;
rml:objectMap [
rml:parentTriplesMap <#TM2>;
rr:joinCondition [
rr:child "id";
rr:parent "p_id";
];
];
].
<#TM2>
rml:logicalSource <#STREAM2> ;
rml:subjectMap <#SM2> ;
rml:predicateObjectMap [
rml:predicateMap <#PM2> ;
rml:objectMap <#OM2> ] .
Windows could be defined in the object map:
<#TM1>
rml:logicalSource <#STREAM1> ;
rml:subjectMap <#SM1> ;
rml:predicateObjectMap [
rml:predicateMap <#PM1> ;
rml:objectMap [
# Define the window to be used for joining
rml:window [
# Define window types
rml:windowType rml:Tumbling;
# Define the trigger policy for the window
# Every 5th record will execute the join
rml:trigger [ a rml:CountPolicy;
rml:countValue 5;
];
# Define the eviction policy for the window
# Clean up window after processing the 15th record
rml:evict [ a rml:CountPolicy;
rml:countValue 15;
];
];
rml:parentTriplesMap <#TM2>;
rr:joinCondition [
rr:child "id";
rr:parent "p_id";
];
];
].
<#TM2>
rml:logicalSource <#STREAM2> ;
rml:subjectMap <#SM2> ;
rml:predicateObjectMap [
rml:predicateMap <#PM2> ;
rml:objectMap <#OM2> ] .
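The same structure could also carry the other window types and policy categories listed in the Requirements. As a rough sketch only, a sliding window with time-based policies might look as follows. The terms rml:Sliding, rml:TimePolicy and rml:timeValue, the use of xsd:duration literals, and the prefix and base declarations are all assumptions made for illustration; none of them is defined by this proposal.

```turtle
# Prefixes and base are assumed for this sketch; the issue does not declare them.
@prefix rml: <http://w3id.org/rml/> .
@prefix rr:  <http://www.w3.org/ns/r2rml#> .
@prefix xsd: <http://www.w3.org/2001/XMLSchema#> .
@base <http://example.org/mapping> .

<#TM1>
    rml:logicalSource <#STREAM1> ;
    rml:subjectMap <#SM1> ;
    rml:predicateObjectMap [
        rml:predicateMap <#PM1> ;
        rml:objectMap [
            rml:window [
                # Hypothetical window type, mirroring rml:Tumbling
                rml:windowType rml:Sliding ;
                # Hypothetical time-based trigger: execute the join every minute
                rml:trigger [ a rml:TimePolicy ;
                    rml:timeValue "PT1M"^^xsd:duration ;
                ] ;
                # Hypothetical time-based eviction: drop tuples older than 5 minutes
                rml:evict [ a rml:TimePolicy ;
                    rml:timeValue "PT5M"^^xsd:duration ;
                ] ;
            ] ;
            # Join against the triples map <#TM2>
            rml:parentTriplesMap <#TM2> ;
            rr:joinCondition [
                rr:child "id" ;
                rr:parent "p_id" ;
            ] ;
        ] ;
    ] .
```

Using one policy class per category (count, delta, time, punctuation) with its own value property would keep the trigger and eviction policies independently configurable, but whether that is the right modelling is open for discussion.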