Dataflow IoT Timeseries Demo

Overview

This repository provides a set of Apache Beam pipelines for processing streaming IoT sensor data from FogLAMP and writing it to BigQuery for downstream analytics. To that end, we leverage the Dataflow Timeseries Streaming library to compute timeseries metrics on the ingested IoT data, and the Apache Beam State and Timers API for DoFns to capture user-defined events in the IoT data.

IoT Demo GCP Architecture

Getting Started

Requirements

  • A GCP project (to create one see here). Make sure billing is activated on the project.
  • Java 8
  • Terraform

Take note of the GCP project name and project number (the numeric project identifier). They will be needed in the following steps.

Setting up the Demo

Executing Terraform will provision the following GCP resources:

  • A virtual machine installed with FogLAMP, Prosys OPC UA server simulator, and Google Chrome Remote Desktop
  • An IoT core registry and telemetry device
  • Three Pub/Sub topics (foglamp-demo, foglamp-demo-raw, and foglamp-demo-events)
  • Two GCS buckets (foglamp_demo_main and foglamp_demo_dataflow)
  • A BigQuery Dataset (foglamp_demo) containing 5 tables (assets, device_connections, devices, event_definitions, and paths)
  • A BigQuery View (events_summary_view) for summarizing Event Frames
  • Three Dataflow Jobs

Terraform will also create the necessary RSA keys to connect to the VM and authenticate the connection between FogLAMP and the IoT Core device.

❗ The RSA keys generated will be stored unencrypted in your Terraform state file. In a production environment, generate your private keys outside of Terraform.

Follow these steps to deploy the Demo resources using Terraform:

  1. Execute gcloud auth login and follow the instructions to authenticate to GCP.
  2. Clone the repository to your local machine:
git clone https://github.com/badal-io/dataflow-timeseries-iot-gas-demo.git
  3. Navigate to the scripts directory inside the Terraform folder:
cd ./terraform/scripts
  4. Run the project_init.sh script to set the project variables. You will be prompted to enter the GCP project name and project number.
  5. Run the setup_gcp_infra.sh script to set up the GCP infrastructure.

❕ The deployment will take approximately 10 minutes.

Once Terraform has finished deploying the GCP resources needed for the Demo, you can start setting up FogLAMP:

  1. Connect to the VM through the Google Cloud Console or the gcloud command-line tool:
gcloud compute ssh --project=${PROJECT} --zone=${ZONE} foglamp-demo-instance 
  2. After you connect, use the browser on your local machine to navigate to Google Chrome Remote Desktop
  3. Click on Begin > Next > Authorize. The page should display a command line for Debian Linux that looks like the following:
DISPLAY= /opt/google/chrome-remote-desktop/start-host \
    --code="4/xxxxxxxxxxxxxxxxxxxxxxxx" \
    --redirect-url="https://remotedesktop.google.com/_/oauthredirect" \
    --name=$(hostname)
  4. Copy the command, paste it into the terminal of your VM in the SSH window that's connected to your instance, and run it. Follow the steps to set up a PIN. Ignore errors like No net_fetcher or Failed to read.
  5. Navigate back to Chrome's Remote Access. You should see the VM's name listed under "Remote Devices". Click on it and enter your PIN when prompted. You are now connected to the desktop environment of your VM.
  6. Query the VM's internal IP address from the Terraform terminal:
terraform output internal_ip
  7. From the desktop environment of your VM, navigate to /opt/prosys-opc-ua-simulation-server and click on the Prosys OPC UA Simulation Server icon. Once the Server Status changes to "Running", copy the "Connection Address (UA TCP)" as you will need this later.
  8. In the OPC UA server, click on the second tab "Objects". Remove the sample nodes, and add a new object node:
    Object Node
  9. Then add a variable node under the object node:
    Variable Node
    ❗ Take note of the node IDs as you will need them later on.
  10. Open the browser of your VM and navigate to http://{{ Your VM's internal IP}}. You are now accessing the FogLAMP dashboard GUI.
  11. Using the menu bar on the left side of the GUI, click on "South" and then click on "Add+" in the upper right of the South Services screen. Select "opcua" from the list and provide a unique name for the asset. Click on "Next".
  12. Copy the Connection Address of your OPC UA server to the "OPCUA Server URL" field, and enter the Node Id of your object node in the "OPCUA Object Subscriptions" field:
    FogLAMP South
  13. Click on "Next" and unselect "Enabled" for now.
  14. From the "South Services" menu, click on your asset and then click on "Applications+". From the Plugin list select "metadata" and click on "Next".
  15. Here you can enter useful metadata associated with your sensor, such as location, configuration version, etc. For the demo we will define the device version:
    Metadata
  16. Click on "Done" to enable the Metadata plugin.
  17. Back on the configuration menu of your asset, click on "Applications+" and from the plugin list select "rename".
  18. Select "datapoint" as the "Operation" and set the "Find" field value to the Node Id of your OPC UA variable. Replace it with the actual property being measured, e.g. "flowrate":
    Rename
  19. Add an additional "rename" plugin. Select "asset" as the "Operation" and set the "Find" field value to the default asset name that FogLAMP has assigned. Replace it with the actual device-Id, e.g. "Coriolis_01":
    Rename Device
  20. Using the menu bar on the left side of the GUI, click on "North" and then click on "Add+" in the upper right of the North Services screen. Select "GCP" from the list and provide a unique name for the asset. Click on "Next".
  21. Enter your Project ID and region, and the following default values for the Registry ID, Device ID, and Key Name. Terraform has already configured the private key required to connect FogLAMP and GCP IoT Core, so all that's required is to enter the default key name:
    FogLAMP North
  22. Click on "Next" to enable the GCP plugin.
  23. Finally, back in the "South Services" menu, click on your asset and select "Enabled" to activate it.
  24. After a few moments, you should be able to see the number of messages that have been read/sent through FogLAMP:
    FogLAMP Final

Now that you have tested end-to-end operation for one variable, you can repeat the process for the following variables:

  • density
  • pressure
  • temperature

You can also replicate these steps to create more sensors (through duplication), as well as more South services associated with them, in order to simulate additional scenarios.

Exploring the Data

Once FogLAMP is transmitting the OPC UA data to the IoT Core, the downstream Pub/Sub topics and Dataflow Jobs will stream them to BigQuery.
To explore the data:

  1. Go to the BigQuery console
  2. Look for the foglamp_demo dataset, where you will have access to the following tables:
    BigQuery
  3. The measurements_raw table is where the raw IoT data land, whereas the IoT data processed with the Dataflow Timeseries Streaming library are inserted into measurements_window_1min. Note that you can adjust the Terraform configuration to deploy as many Timeseries Dataflow jobs as you wish, each covering a different windowing period (e.g. 1 min, 10 min, etc.):
    BigQuery Tables

Simulating Event Frames

One of the features of this demo is the capturing of abnormal device behaviour in the form of events. Events are critical time periods during which an activity that is significant to a process or operation is taking place. Engineers and operations managers need to define the events that are relevant to their process and analyze them in real time as they occur. However, the event data and the process data often sit in different data silos, making their integration challenging. In our architecture, by combining a stream of raw sensor data with the event criteria defined by a user, we are able to detect the sensor data that are part of an event and stream these to the same BigQuery dataset. Let's walk through the following example:

  1. In the desktop environment of your VM, go to the OPC UA server and stop the simulation by clicking on the "Stop" button in the "Objects" tab:
    OPC UA Stop
  2. Back in BigQuery, you will now have a table measurements_raw_events where the outage of the sensor is captured in real time for as long as the outage lasts:
    Events
    What about custom events? The event_definitions table allows a user to define custom events for all or specific devices and measured properties:
    Event Definitions
  3. Let's go back to the OPC UA GUI and start the simulation again, but this time change the flowrate to a value less than 10 to activate the "Low Flowrate" event.
  4. As soon as we change the value, we can see the event rows being inserted into the events table in BigQuery as a separate event:
    Events
  5. Finally, querying the events_summary_view view enables the user to obtain a summary of the key metrics for each event:
    Events View

Apache Beam Pipelines

The first pipeline is intended to be the point-of-entry for the raw IoT data. The pipeline consists of the following components:

  • Sources:
    1. Pub/Sub topic with raw sensor data from FogLAMP (unbounded main-input)
    2. BigQuery table with "event frame" definitions (bounded side-input)
  • Format Pub/Sub messages to key/value pairs where the key is the IoT device-Id and the value is a BigQuery TableRow object
  • Process the key/value pairs through a stateful, looping timer. The timer expires after a user-defined duration when the @ProcessElement DoFn hasn't received any new elements for a given key, thus enabling the detection of devices that have gone silent and potentially lost function. Upon expiry, the @OnTimer DoFn resets the timer for that key and outputs a TableRow with the key / device-id.
  • The EventFilter method describes a ParDo with a TaggedOutput, so that each element in the output PCollection is tagged to indicate whether or not it is part of an event. It does so by comparing the key/value pairs of an element against the conditions defined in the side-input table from BigQuery; if they are satisfied, the event_type field is appended to the value of the element and it is emitted by the @ProcessElement block with the event_measurements tag, whereas all measurements are emitted with the all_measurements tag. The elements produced by the looping timer when a sensor has gone "silent" are also emitted here with the event_measurements tag (a minimal sketch of the looping-timer and tagged-output pattern follows the diagram below).
  • Sinks:
    1. The PCollection with the all_measurements tag is inserted to a BigQuery table containing all "raw" IoT sensor data
    2. The PCollection with the all_measurements tag is published to a Pub/Sub topic for downstream time-series processing
    3. The PCollection with the event_measurements tag is published to a Pub/Sub topic for downstream event processing

Looping Stateful Timer (1)
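
Below is a minimal sketch of the looping stateful timer and tagged-output pattern described above. It is not the repository's actual transform: the class name, state/timer IDs, output-tag names, and column names (device_id, event_type, device_offline) are illustrative assumptions.

```java
import com.google.api.services.bigquery.model.TableRow;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.state.Timer;
import org.apache.beam.sdk.state.TimerSpec;
import org.apache.beam.sdk.state.TimerSpecs;
import org.apache.beam.sdk.state.ValueState;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.TupleTag;
import org.joda.time.Duration;

// Illustrative stand-in for the repository's looping-timer transform.
public class SilentDeviceDoFn extends DoFn<KV<String, TableRow>, KV<String, TableRow>> {

  // Output tags: every measurement vs. measurements that belong to an event.
  public static final TupleTag<KV<String, TableRow>> ALL_MEASUREMENTS =
      new TupleTag<KV<String, TableRow>>() {};
  public static final TupleTag<KV<String, TableRow>> EVENT_MEASUREMENTS =
      new TupleTag<KV<String, TableRow>>() {};

  private final Duration silenceThreshold; // user-defined duration before a device counts as silent

  public SilentDeviceDoFn(Duration silenceThreshold) {
    this.silenceThreshold = silenceThreshold;
  }

  // Remembers the device-Id so the @OnTimer callback can emit it.
  @StateId("deviceId")
  private final StateSpec<ValueState<String>> deviceIdSpec = StateSpecs.value(StringUtf8Coder.of());

  // Per-key processing-time timer that fires when no element arrives within the threshold.
  @TimerId("silenceTimer")
  private final TimerSpec silenceTimerSpec = TimerSpecs.timer(TimeDomain.PROCESSING_TIME);

  @ProcessElement
  public void processElement(
      ProcessContext c,
      @StateId("deviceId") ValueState<String> deviceId,
      @TimerId("silenceTimer") Timer timer) {
    deviceId.write(c.element().getKey());
    // Every new measurement for this device resets the looping timer.
    timer.offset(silenceThreshold).setRelative();
    // All measurements are emitted on the all_measurements output.
    c.output(ALL_MEASUREMENTS, c.element());
  }

  @OnTimer("silenceTimer")
  public void onTimer(
      OnTimerContext c,
      @StateId("deviceId") ValueState<String> deviceId,
      @TimerId("silenceTimer") Timer timer) {
    // The device has gone silent: emit a synthetic event row and keep the timer looping.
    String id = deviceId.read();
    TableRow row = new TableRow().set("device_id", id).set("event_type", "device_offline");
    c.output(EVENT_MEASUREMENTS, KV.of(id, row));
    timer.offset(silenceThreshold).setRelative();
  }
}
```

Applied to a keyed PCollection, this would look like ParDo.of(new SilentDeviceDoFn(Duration.standardMinutes(5))).withOutputTags(SilentDeviceDoFn.ALL_MEASUREMENTS, TupleTagList.of(SilentDeviceDoFn.EVENT_MEASUREMENTS)), with the 5-minute threshold being an arbitrary example value.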

The second pipeline processes the sensor event data emitted by the first pipeline. The pipeline consists of the following components:

  • Sources:
    • Pub/Sub topic with sensor event data from the first pipeline
  • Format Pub/Sub messages to key/value pairs where the key is the IoT device-Id # event_type and the value is a BigQuery TableRow object
  • Process the key/value pairs through a stateful, looping timer. For every device-Id # event_type key, a timer and a random UUID event-Id are initialized and the event-Id is written to the ValueState interface. Every key/value pair of sensor events processed by the @ProcessElement DoFn resets the timer and reads the current event-Id from the ValueState, which is appended as an event-Id field to the TableRow before it is output. After a user-defined duration without new elements for a given device-Id # event_type key, the timer for that key expires and a new event-Id is written to the ValueState, replacing the old value (a minimal sketch of this pattern follows the diagram below).
  • Sinks:
    • The PCollection is inserted to a BigQuery table containing all "event" IoT sensor data

Looping Stateful Timer (2)
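
The sketch below illustrates this ValueState-plus-looping-timer pattern in isolation; the class name, state/timer IDs, gap duration, and the event_id column name are assumptions, not the repository's actual code.

```java
import com.google.api.services.bigquery.model.TableRow;
import java.util.UUID;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.state.Timer;
import org.apache.beam.sdk.state.TimerSpec;
import org.apache.beam.sdk.state.TimerSpecs;
import org.apache.beam.sdk.state.ValueState;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;
import org.joda.time.Duration;

// Illustrative stand-in: groups consecutive event rows per "deviceId#eventType" key under one event-Id.
public class AssignEventIdDoFn extends DoFn<KV<String, TableRow>, TableRow> {

  private final Duration eventGap; // user-defined quiet period that closes an event

  public AssignEventIdDoFn(Duration eventGap) {
    this.eventGap = eventGap;
  }

  // Holds the current event-Id for this key.
  @StateId("eventId")
  private final StateSpec<ValueState<String>> eventIdSpec = StateSpecs.value(StringUtf8Coder.of());

  // Looping timer that rotates the event-Id once the key has been quiet for eventGap.
  @TimerId("gapTimer")
  private final TimerSpec gapTimerSpec = TimerSpecs.timer(TimeDomain.PROCESSING_TIME);

  @ProcessElement
  public void processElement(
      ProcessContext c,
      @StateId("eventId") ValueState<String> eventId,
      @TimerId("gapTimer") Timer timer) {
    // First element for this key: initialize the event-Id.
    String id = eventId.read();
    if (id == null) {
      id = UUID.randomUUID().toString();
      eventId.write(id);
    }
    // Each element extends the current event, so reset the gap timer.
    timer.offset(eventGap).setRelative();
    // Append the event-Id to the row and emit it.
    c.output(c.element().getValue().clone().set("event_id", id));
  }

  @OnTimer("gapTimer")
  public void onTimer(@StateId("eventId") ValueState<String> eventId) {
    // The key has been quiet long enough: the next row will start a new event.
    eventId.write(UUID.randomUUID().toString());
  }
}
```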

The final pipeline is based on the Dataflow Timeseries Streaming library to compute metrics across several time periods, such as the relative strength index (RSI) and moving average (MA). The pipeline consists of the following components:

  • Sources:
    • Pub/Sub topic with formatted sensor data from the first pipeline
  • The custom method ParseTSDataPointFromPubSub transforms the Pub/Sub messages to the native TSDataPoint object of the Timeseries Library. The primary key is set to the device-Id, whereas the secondary key is set to the property_measured in each data point (e.g. mass density, temperature, etc.).
  • The GenerateComputations method of the TimeSeries Library is used to window the elements and compute the metrics declared in the pipeline options (a simplified stand-in sketch of this keyed, windowed aggregation follows this list).
  • Finally, the custom method TSAccumToRowPivot parses the PCollection with the computed metric values into a Row object.
  • Sinks:
    • The PCollection is inserted to a BigQuery table containing all the timeseries-metrics data
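
GenerateComputations and TSDataPoint come from the Timeseries Streaming library itself; rather than reproduce its API, the sketch below is a rough stand-in that shows the same idea (keyed, fixed-window metric computation) using only core Beam transforms and an assumed deviceId,property,value CSV payload in place of the demo's JSON messages. YOUR_PROJECT is a placeholder.

```java
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.Mean;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.joda.time.Duration;

// Simplified stand-in for the Timeseries pipeline: a 1-minute mean per device#property key.
public class SimpleWindowedMetrics {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    p.apply("ReadMeasurements",
            // foglamp-demo is the topic created by Terraform; YOUR_PROJECT is a placeholder.
            PubsubIO.readStrings().fromTopic("projects/YOUR_PROJECT/topics/foglamp-demo"))
        // Assumed "deviceId,property,value" CSV payload standing in for the demo's JSON messages.
        .apply("ToKeyValue", MapElements
            .into(TypeDescriptors.kvs(TypeDescriptors.strings(), TypeDescriptors.doubles()))
            .via((String line) -> {
              String[] parts = line.split(",");
              return KV.of(parts[0] + "#" + parts[1], Double.parseDouble(parts[2]));
            }))
        // 1-minute fixed windows, analogous to the measurements_window_1min table.
        .apply("Window1Min",
            Window.<KV<String, Double>>into(FixedWindows.of(Duration.standardMinutes(1))))
        // A per-key mean; the real pipeline uses GenerateComputations to produce RSI, MA, etc.
        .apply("MeanPerKey", Mean.<String, Double>perKey());
        // A real pipeline would continue by converting results to rows and writing them with BigQueryIO.

    p.run();
  }
}
```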
