
Defining window operations in RML #85

Open
@s-minoo


Issue

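Currently, there is no way to define windowing semantics in RML.
Windowing is crucial when evaluating joins between different live streaming
data sources: because streams are unbounded, a join can only be evaluated over
a bounded subset (a window) of the tuples from each stream.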

Furthermore, windowing could also provide the buffering that aggregation
functions need when processing streaming data sources, for example,
calculating the average of the values over the last 5 minutes.
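A minimal sketch of that buffering in Python, assuming numeric timestamps in seconds that arrive in non-decreasing order. `SlidingTimeAverage` is a hypothetical helper for illustration, not part of RML or of any engine: it drops values older than the window length (time-based eviction) and reports the average on every arrival.

```python
from collections import deque
from typing import Deque, Tuple


class SlidingTimeAverage:
    """Hypothetical helper: average of the values seen in the last `window_seconds`.

    Assumes timestamps are plain numbers of seconds, arriving in order.
    """

    def __init__(self, window_seconds: float = 300.0) -> None:
        self.window_seconds = window_seconds
        self._buffer: Deque[Tuple[float, float]] = deque()  # (timestamp, value)
        self._sum = 0.0

    def add(self, timestamp: float, value: float) -> float:
        # Buffer the new value.
        self._buffer.append((timestamp, value))
        self._sum += value
        # Time-based eviction: drop values outside (timestamp - window, timestamp].
        while self._buffer[0][0] <= timestamp - self.window_seconds:
            _, old = self._buffer.popleft()
            self._sum -= old
        # Report the current average on every arrival.
        return self._sum / len(self._buffer)


avg = SlidingTimeAverage(window_seconds=300)
print(avg.add(0, 10.0))    # 10.0
print(avg.add(120, 20.0))  # 15.0
print(avg.add(400, 30.0))  # 25.0 (the value at t=0 has been evicted)
```

The eviction loop never empties the buffer, because the value just added always falls inside the window.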

Requirements

According to Gedik B.,
a window's behaviour is defined by its type and its policies.

There are 2 main types of windows: tumbling and sliding windows.
An illustration of how these windows work can be found here.
Note: a session window is a special case of a tumbling window, where the window
is only dropped when an inactivity threshold is exceeded.
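The three window types can be sketched in Python as functions that split a finite, ordered sequence into windows. This is an illustration under simplifying assumptions (count-based tumbling and sliding windows, sorted numeric timestamps for sessions); the function names are hypothetical.

```python
from typing import List, Sequence, TypeVar

T = TypeVar("T")


def tumbling(items: Sequence[T], size: int) -> List[List[T]]:
    """Count-based tumbling windows: consecutive, non-overlapping chunks.

    The last window may be partial if len(items) is not a multiple of size.
    """
    return [list(items[i:i + size]) for i in range(0, len(items), size)]


def sliding(items: Sequence[T], size: int, slide: int) -> List[List[T]]:
    """Count-based sliding windows: each full window starts `slide` items
    after the previous one, so consecutive windows overlap when slide < size."""
    return [list(items[i:i + size]) for i in range(0, len(items) - size + 1, slide)]


def session(timestamps: Sequence[float], gap: float) -> List[List[float]]:
    """Session windows: a window closes once no event arrives within `gap`.

    Assumes the timestamps are sorted in ascending order.
    """
    windows: List[List[float]] = []
    for ts in timestamps:
        if windows and ts - windows[-1][-1] <= gap:
            windows[-1].append(ts)  # still within the inactivity threshold
        else:
            windows.append([ts])    # threshold exceeded: start a new window
    return windows


print(tumbling([1, 2, 3, 4, 5, 6], 3))        # [[1, 2, 3], [4, 5, 6]]
print(sliding([1, 2, 3, 4, 5, 6], 3, 1))      # [[1, 2, 3], [2, 3, 4], [3, 4, 5], [4, 5, 6]]
print(session([0, 1, 2, 10, 11, 30], gap=5))  # [[0, 1, 2], [10, 11], [30]]
```

A sliding window whose slide equals its size produces the same full windows as a tumbling window, which is why tumbling windows are often described as the non-overlapping special case.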

The policies control when the window evicts the tuples inside it
(eviction policy), and when it triggers the processing of those tuples
using the operator logic defined for the window (trigger policy).

Policies are further divided into 4 categories:

  1. Count-based
    • Uses the number of incoming tuples to decide when to evict/trigger.
  2. Delta-based
    • Uses a threshold on an attribute of the incoming tuples to
      decide when to evict/trigger, e.g. when the temperature value of a sensor is above 40 °C.
  3. Time-based
    • Uses the timestamp of the incoming tuple.
  4. Punctuation-based
    • Uses punctuations injected into the incoming data stream as markers to decide
      when to evict/trigger.
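The four categories can be sketched in Python as interchangeable trigger policies plugged into one window. This is an illustration under stated assumptions, not the formal semantics from Gedik B.: delta-based is interpreted as the change of an attribute since the last firing, time-based as a delta policy over a hypothetical `ts` field, punctuations are records carrying a hypothetical `punct` flag, and eviction is kept count-based (via `deque(maxlen=...)`) for brevity. All class and field names are hypothetical.

```python
from collections import deque
from typing import Any, Callable, Deque, Dict, List, Optional

Record = Dict[str, Any]


class CountTrigger:
    """Count-based: fire on every n-th incoming tuple."""

    def __init__(self, n: int) -> None:
        self.n, self.seen = n, 0

    def fires(self, rec: Record) -> bool:
        self.seen += 1
        return self.seen % self.n == 0


class DeltaTrigger:
    """Delta-based (assumed interpretation): fire when `attr` has moved more
    than `threshold` away from its value at the last firing."""

    def __init__(self, attr: str, threshold: float) -> None:
        self.attr, self.threshold = attr, threshold
        self.ref: Optional[float] = None

    def fires(self, rec: Record) -> bool:
        value = rec[self.attr]
        if self.ref is None:
            self.ref = value  # the first tuple only sets the reference value
            return False
        if abs(value - self.ref) > self.threshold:
            self.ref = value
            return True
        return False


class TimeTrigger(DeltaTrigger):
    """Time-based: a delta policy over the (hypothetical) timestamp field `ts`."""

    def __init__(self, period: float) -> None:
        super().__init__("ts", period)


class PunctuationTrigger:
    """Punctuation-based: fire when a (hypothetical) `punct` marker arrives."""

    def fires(self, rec: Record) -> bool:
        return bool(rec.get("punct"))


class Window:
    """Keeps at most `capacity` tuples (count-based eviction) and applies
    `operator` to the buffered tuples whenever the trigger policy fires."""

    def __init__(self, capacity: int, trigger: Any,
                 operator: Callable[[List[Record]], Any]) -> None:
        # deque(maxlen=...) evicts the oldest tuple automatically when full
        self.buffer: Deque[Record] = deque(maxlen=capacity)
        self.trigger, self.operator = trigger, operator

    def insert(self, rec: Record) -> Optional[Any]:
        if not rec.get("punct"):  # punctuations are markers, not data
            self.buffer.append(rec)
        if self.trigger.fires(rec):
            return self.operator(list(self.buffer))
        return None


def run(trigger: Any, stream: List[Record]) -> List[Any]:
    """Feed a finite stream through a 3-tuple window that sums `temp`."""
    window = Window(3, trigger, lambda recs: sum(r["temp"] for r in recs))
    results = []
    for rec in stream:
        out = window.insert(rec)
        if out is not None:
            results.append(out)
    return results


stream = [{"ts": t, "temp": v} for t, v in [(0, 20), (1, 22), (2, 35), (3, 36), (4, 50)]]
print(run(CountTrigger(2), stream))          # [42, 93]
print(run(DeltaTrigger("temp", 10), stream))  # [77, 121]
print(run(TimeTrigger(2.5), stream))          # [93]
```

Swapping the trigger object changes when the same operator runs, without touching the window itself, which mirrors the separation of window type, eviction policy and trigger policy that the vocabulary below needs to describe.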

Thus, we need a vocabulary to define and configure windows by
describing:

  1. Window Type
  2. Eviction policy
  3. Trigger policy

The exact semantics of each policy, and of their combinations, are explained
further by Gedik B.

Example

Given the following RML mapping with a join condition:

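@prefix rml: <http://w3id.org/rml/> .

<#TM1>
    rml:logicalSource <#STREAM1> ;
    rml:subjectMap <#SM1> ;
    rml:predicateObjectMap [
        rml:predicateMap <#PM1> ;
        rml:objectMap [
            # Refer to the parent triples map <#TM2>, not to its subject map
            rml:parentTriplesMap <#TM2> ;
            rml:joinCondition [
                rml:child "id" ;
                rml:parent "p_id" ;
            ] ;
        ] ;
    ] .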



<#TM2> 
    rml:logicalSource <#STREAM2> ;
    rml:subjectMap <#SM2> ;
    rml:predicateObjectMap [
        rml:predicateMap <#PM2> ;
        rml:objectMap <#OM2> ] .

Windows could be defined in the referencing object map that holds the join condition:

@prefix rml: <http://w3id.org/rml/> .

<#TM1>
    rml:logicalSource <#STREAM1> ;
    rml:subjectMap <#SM1> ;
    rml:predicateObjectMap [
        rml:predicateMap <#PM1> ;
        rml:objectMap [
            # Define the window to be used for joining
            rml:window [
                # Define the window type
                rml:windowType rml:Tumbling ;

                # Define the trigger policy for the window:
                # every 5th record will execute the join
                rml:trigger [ a rml:CountPolicy ;
                    rml:countValue 5 ;
                ] ;

                # Define the eviction policy for the window:
                # clean up the window after processing the 15th record
                rml:evict [ a rml:CountPolicy ;
                    rml:countValue 15 ;
                ] ;
            ] ;
            rml:parentTriplesMap <#TM2> ;
            rml:joinCondition [
                rml:child "id" ;
                rml:parent "p_id" ;
            ] ;
        ] ;
    ] .

<#TM2> 
    rml:logicalSource <#STREAM2> ;
    rml:subjectMap <#SM2> ;
    rml:predicateObjectMap [
        rml:predicateMap <#PM2> ;
        rml:objectMap <#OM2> ] .
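To make the intended behaviour of this window concrete, the following Python sketch simulates it for the join between `<#TM1>` and `<#TM2>`. It rests on assumptions the proposal does not fix: records from both streams arrive interleaved and share one counter, the tumbling window flushes all buffered records on the 15th record, and each trigger emits only child/parent pairs not yet emitted in the current window (to avoid duplicate triples). `CountWindowJoin` and the `"STREAM1"`/`"STREAM2"` source labels are hypothetical.

```python
from typing import Any, Dict, List, Set, Tuple

Record = Dict[str, Any]


class CountWindowJoin:
    """Hypothetical simulation of the window above: count-based trigger
    (every 5th record) and count-based eviction (after the 15th record)."""

    def __init__(self, trigger_count: int = 5, evict_count: int = 15,
                 child_key: str = "id", parent_key: str = "p_id") -> None:
        self.trigger_count, self.evict_count = trigger_count, evict_count
        self.child_key, self.parent_key = child_key, parent_key
        self.children: List[Record] = []  # records from STREAM1 (<#TM1>)
        self.parents: List[Record] = []   # records from STREAM2 (<#TM2>)
        self.seen = 0                     # records received in this window
        self.emitted: Set[Tuple[int, int]] = set()  # (child idx, parent idx)

    def insert(self, source: str, rec: Record) -> List[Tuple[Record, Record]]:
        # Any source other than "STREAM1" is treated as the parent stream.
        if source == "STREAM1":
            self.children.append(rec)
        else:
            self.parents.append(rec)
        self.seen += 1
        joined: List[Tuple[Record, Record]] = []
        if self.seen % self.trigger_count == 0:  # trigger policy
            joined = self._join()
        if self.seen == self.evict_count:        # eviction policy (tumbling)
            self.children.clear()
            self.parents.clear()
            self.emitted.clear()
            self.seen = 0
        return joined

    def _join(self) -> List[Tuple[Record, Record]]:
        # Nested-loop evaluation of the join condition child "id" = parent "p_id".
        out: List[Tuple[Record, Record]] = []
        for i, child in enumerate(self.children):
            for j, parent in enumerate(self.parents):
                if (child[self.child_key] == parent[self.parent_key]
                        and (i, j) not in self.emitted):
                    self.emitted.add((i, j))
                    out.append((child, parent))
        return out


window = CountWindowJoin()
events = [("STREAM1", {"id": 1}), ("STREAM2", {"p_id": 1}), ("STREAM1", {"id": 2}),
          ("STREAM1", {"id": 3}), ("STREAM2", {"p_id": 2})]
for source, rec in events:
    for child, parent in window.insert(source, rec):
        print(child["id"], "joins", parent["p_id"])  # printed after the 5th record
```

Under these assumptions the join runs on the 5th, 10th and 15th record of each window; a child record whose matching parent only arrives after the eviction is never joined, which is the trade-off the trigger and eviction values control.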
