-
Notifications
You must be signed in to change notification settings - Fork 0
Case Study Smart City Application
<![endif]-->
In a city, we have two arrays of sensors, one controlled by the waste company which consists of fill level sensors in each waste container. The second set of sensors are several weather stations which measure several things, between other measurements, the stations measure temperature. The sensors send the sensor data using IoT connectivity such as Syfox, LowRa, M2M ETSI or 5G and the communication protocol MQTT into the corresponding servers. For the sake of the example, we have three stakeholders, the waste company, the city government and the citizen using a citizen app provided by a third party.
- docker-compose
- Mosquitto-clients (or any MQTT client)
- Curl (or any REST client)
- GIT client
- jq (optional, remove it from commands in the case is not installed)
To generate the scenario, we have created a docker-compose which starts three brokers, one agent, one Service Catalog, and one rest endpoint. We will explain this in detail later. To load the scenario by cloning the tutorial (currently only in dev branch):
git clone https://code.linksmart.eu/scm/la/tutorial.git
cd tutorial
And now start the scenario by:
docker-compose -f docker-compose.yml -f docker-compose.tutorial.yml up --abort-on-container-exit
If everything works, you should be able to subscribe to fill level sensors and temperature sensors by:
# subscribe to the temperature measured by the weather stations
mosquitto_sub -t 'LS/weatherStation/ws1/OGC/1.0/Datastreams/#' -p 1880
# subscribe to the fill level measured by the waste bins
mosquitto_sub -t 'LS/bin/bin1/OGC/1.0/Datastreams/#' -p 1881
Troubleshoot
The following ports must be free (not used by any other service) in the host network (the user computer): 1880 (weather-broker), 1881 (waste-broker), 1882 (city-broker), 1883 (ls-broker), 8082 (service catalog), 8319 (agent)
Now the city wants to know the average temperatures in the city using their infrastructure. To do so, they deploy following aggregation statement which gives us the average temperature for all five weather stations:
select avg(intResult) from Observation(datastream.id.toString() like 'tmp%').win:time(30 sec)
The full statement below:
{
"name": "average_temperature",
"statement": "select avg(intResult) from Observation(datastream.id.toString() like 'tmp%').win:time(30 sec) "
}
deploy by (or any REST client):
curl -X PUT \
http://localhost:8319/statement/average_temperature \
-H 'Content-Type: application/json' \
-d '{"name": "average_temperature" ,"statement": "select avg(intResult) from Observation(datastream.id.toString() like '\''tmp%'\'').win:time(30 sec) "}' -v | jq .
now you can subscribe to the average temperatures by:
mosquitto_sub -t LS/+/+/OGC/1.0/Datastreams/average_temperature
The queries are written in EPL this is the query language of ESPER (See Esper 7.1.0). The agent can load any CEP engine, but esper is the prefered one.
Now the waste management company wants to get an alert every time a bin is over 75%, to do so wrote following statement:
select datastream.thing.id from Observation(datastream.id.toString() like 'fill%' and intResult > 75)
This query will create a new measurement for each measurement over 75% with the corresponding bin ID.
The full statement below:
{
"name": "full_alert",
"statement": "select datastream.thing.id from Observation(datastream.id.toString() like 'fill%' and intResult> 75)"
}
Deploy by:
curl -X PUT \
http://localhost:8319/statement/full_alert \
-H 'Content-Type: application/json' \
-d '{"name": "full_alert" ,"statement": "select datastream.thing.id from Observation(datastream.id.toString() like '\''fill%'\'' and intResult> 75)"}' -v | jq .
now you can subscribe to the alerts by:
mosquitto_sub -t LS/+/+/OGC/1.0/Datastreams/full_alert
Now the waste management company wants approximately how much weight the bins have. They multiply the average weight to the fullness percentage
select datastream.thing.id as binID, intResult*10000 as weight from Observation(datastream.id.toString() like 'fill%')
The full statement below:
{
"name": "weight",
"statement": "select datastream.thing.id as binID, intResult*10000 as weight from Observation(datastream.id.toString() like 'fill%')"
}
deploy by:
curl -X PUT \
http://localhost:8319/statement/weight \
-H 'Content-Type: application/json' \
-d '{"name": "weight" ,"statement": "select datastream.thing.id as binID, intResult*10000 as weight from Observation(datastream.id.toString() like '\''fill%'\'')"}' -v | jq .
now you can subscribe to the alerts by:
mosquitto_sub -t LS/+/+/OGC/1.0/Datastreams/weight
The citizens have been complaining that some bins stink when it is too hot. Therefore, they ask the waste management company to pick them up earlier when the temperature is too hot. To do so, the city combines the weather and bin data. They check if the thermometer nearby a specific bin (bin1 in this example) is over 25° and if the bin is also over 50% full, they create an alert.
select 'bin1' as binID from Observation((datastream.id.toString() = 'tmp1' and intResult> 25 ) or ( datastream.id.toString() = 'fill1' and intResult> 50 )).win:time(1 sec) having count(*)=2
The full statement below:
{
"name": "stinky_bin" ,
"statement": "select 'bin1' as binID from Observation((datastream.id.toString() = 'tmp1' and intResult> 25 ) or ( datastream.id.toString() = 'fill1' and intResult> 50 )).win:time(1 sec) having count(*)=2"
}
Deploy by:
curl -X PUT \
http://localhost:8319/statement/stinky_bin \
-H 'Content-Type: application/json' \
-d '{
"name": "stinky_bin1" ,
"statement": "select '\''bin1'\'' as binID from Observation((datastream.id.toString() = '\''tmp1'\'' and intResult> 25 ) or ( datastream.id.toString() = '\''fill1'\'' and intResult> 50 )).win:time(1 sec) having count(*)=2"
}'
now you can subscribe to the alerts by:
mosquitto_sub -t LS/+/+/OGC/1.0/Datastreams/stinky_bin
Important note of the Agent IO
The agent in this tutorial is configured to collect, events from the waste and whether brokers. Additionally, an agent has one default broker output; for this tutorial was configured to the weather broker, which is exposed in the default MQTT unsecured port.
Now as already the note above explained, the events are arriving at the weather network and not to the city network. Now we query a second stinky bin doing as before.
select 'bin2' as binID from Observation((datastream.id.toString() = 'tmp2' and intResult> 25 ) or ( datastream.id.toString() = 'fill2' and intResult> 50 )).win:time(1 sec) having count(*)=2
But now we change the default output of the event to the city network by assign as target scope the city broker.
The full statement below:
{
"name": "stinky_bin_route",
"statement": "select 'bin2' as binID from Observation((datastream.id.toString() = 'tmp2' and intResult> 25 ) or ( datastream.id.toString() = 'fill2' and intResult> 50 )).win:time(1 sec) having count(*)=2",
"scope": [
"city"
]
}
In this tutorial, we used Service catalog as the information point for the Services. This means we resolve the endpoints using aliases as IDs to located the service endpoints which are stored in the Service Catalog. Therefore, when a scope is defined, and there is a Service Catalog deploy, the Agent will search for the endpoint using the alias in the Service Catalog. Alternatively, the resolution of the aliases can be done by the configuration of the Agent using its configuration file.
deploy by:
curl -X PUT \
http://localhost:8319/statement/stinky_bin_route \
-H 'Content-Type: application/json' \
-d '{"name": "stinky_bin_route" ,"statement": "select '\''bin2'\'' as binID from Observation((datastream.id.toString() = '\''tmp2'\'' and intResult> 25 ) or ( datastream.id.toString() = '\''fill2'\'' and intResult> 50 )).win:time(1 sec) having count(*)=2","scope":["city"]}' -v | jq .
now you can subscribe to the alerts by:
mosquitto_sub -p 1882 -t LS/+/+/OGC/1.0/Datastreams/stinky_bin_route
Important note of the Agent IO
The output topic is autogenerated using LinkSmart® specification. For OGC Observations (what we are using in this examples) the topic is LS///OGC/1.0/ . The base topic for generating the events can be changed using the configuration file or environmental variables.
Now let's imagine that the city has different topic structure, therefore want to change the default output topic. Using similar query like the example before.
select 'bin3' as binID from Observation((datastream.id.toString() = 'tmp3' and intResult> 25 ) or ( datastream.id.toString() = 'fill3' and intResult> 50 )).win:time(1 sec) having count(*)=2
Now we change the default output topic of the event sent to the city broker, to do so, we set the output to the desired topic.
The full statement below:
{
"name": "stinky_bin_route2",
"statement": "select 'bin3' as binID from Observation((datastream.id.toString() = 'tmp3' and intResult> 25 ) or ( datastream.id.toString() = 'fill3' and intResult> 50 )).win:time(1 sec) having count(*)=2",
"scope": [
"city"
],
"output": [
"LS/my/topic"
]
}
deploy by:
curl -X PUT \
http://localhost:8319/statement/stinky_bin_route2 \
-H 'Content-Type: application/json' \
-d '{"name": "stinky_bin_route2" ,"statement": "select '\''bin3'\'' as binID from Observation((datastream.id.toString() = '\''tmp3'\'' and intResult> 25 ) or ( datastream.id.toString() = '\''fill3'\'' and intResult> 50 )).win:time(1 sec) having count(*)=2","scope":["city"], "output":["LS/my/topic"]}' -v | jq .
now you can subscribe to the alerts by:
mosquitto_sub -p 1882 -t LS/my/topic
Important note of the Agent IO
The default output payload in the Agent is OGC Observation. The agent supports OGC Observation, SenML, and RAW types. The RAW time can accept and generate any JSON event which based in an object. The default output can be changed using the configuration file or environmental variables
Now let's imagine that the city has different topic structure and also the follow another payload standard, therefore want to change the default output topic and payload. Using a similar query like the example before.
select 'bin4' as binID from Observation((datastream.id.toString() = 'tmp4' and intResult> 25 ) or ( datastream.id.toString() = 'fill4' and intResult> 50 )).win:time(1 sec) having count(*)=2
Now we change the default output topic of the event sent to the city broker, to do so, we set the output to the desired topic.
The full statement below:
{
"name": "stinky_bin_route_translate",
"statement": "select 'bin4' as binID from Observation((datastream.id.toString() = 'tmp4' and intResult> 25 ) or ( datastream.id.toString() = 'fill4' and intResult> 50 )).win:time(1 sec) having count(*)=2",
"scope": [
"city"
],
"output": [
"LS/DPA/1/SenML/10/Event/stinky_bin_route_translate"
],
"resultType": "SenML"
}
deploy by:
curl -X PUT \
http://localhost:8319/statement/stinky_bin_route_translate \
-H 'Content-Type: application/json' \
-d '{"name": "stinky_bin_route_translate" ,"statement": "select '\''bin4'\'' as binID from Observation((datastream.id.toString() = '\''tmp4'\'' and intResult> 25 ) or ( datastream.id.toString() = '\''fill4'\'' and intResult> 50 )).win:time(1 sec) having count(*)=2","scope":["city"],"output":["LS/DPA/1/SenML/10/Event/stinky_bin_route_translate"],"resultType":"SenML"}' -v | jq .
now you can subscribe to the alerts by:
mosquitto_sub -p 1882 -t LS/DPA/1/SenML/10/Event/stinky_bin_route_translate
Now imagine that the city does not follow any standard. Then we need to free configured the output. Using similar query as before
select 'bin5' as binID from Observation((datastream.id.toString() = 'tmp5' and intResult> 25 ) or ( datastream.id.toString() = 'fill5' and intResult> 50 )).win:time(1 sec) having count(*)=2
Now we want a flat map structure as an output. The query below, change the output, broker, and will generate the event as a map with properties name the column name.
The full statement below:
{
"name": "stinky_bin_route_transform",
"statement": "select 'bin5' as binID from Observation((datastream.id.toString() = 'tmp5' and intResult> 25 ) or ( datastream.id.toString() = 'fill5' and intResult> 50 )).win:time(1 sec) having count(*)=2",
"scope": [
"city"
],
"output": [
"LS/DPA/1/RAW/0/RAW/stinky_bin"
],
"resultType": "none"
}
To do so, we deploy:
curl -X PUT \
http://localhost:8319/statement/stinky_bin_route_transform \
-H 'Content-Type: application/json' \
-d '{"name": "stinky_bin_route_transform","statement": "select '\''bin5'\'' as binID from Observation((datastream.id.toString() = '\''tmp5'\'' and intResult> 25 ) or ( datastream.id.toString() = '\''fill5'\'' and intResult> 50 )).win:time(1 sec) having count(*)=2","scope":["city"],"output":["LS/DPA/1/RAW/0/RAW/stinky_bin"],
"resultType":"none"}' -v | jq .
now you can subscribe to the alerts by:
mosquitto_sub -p 1882 -t LS/DPA/1/RAW/0/RAW/stinky_bin
Now imagine that the city wants to send all events to another backend which uses rest for the citizen application. To do so, first we need to query all events and select them as they are:
select event from Observation as event
We indicate the alias of the endpoint and API Key Name and the path to send as scope and output respectively. Then we indicate that the output is a REST_POST
In case of REST endpoints, the key name must be given alongside the scope alias separate by ':' (:), i.e., appbackend:post2pub.
The full statement below:
{
"name": "protocol_translate",
"statement": "select event from Observation as event",
"scope": [
"appbackend:post2pub"
],
"output": [
"routed"
],
"publisher": "REST_POST"
}
curl -X PUT \
http://localhost:8319/statement/protocol_translate \
-H 'Content-Type: application/json' \
-d '{"name": "protocol_translate" ,"statement": "select event from Observation as event","scope":["appbackend"],"output":["routed"],"publisher":"REST_POST"}' -v | jq .
To be able to show the output the endpoint publish in the weather broker to see the output then subscribe to:
mosquitto_sub -t routed
The agent can 'publish' data using MQTT PUB, REST POST, and REST PUT. Also by REST GET and email, both special cases. REST GET instead of sending an event, every time the query is triggered the agent will GET from an endpoint data and inserting it in the CEP engine. The email is a new feature and is handled differently than the other methods.
Originally written by José Ángel Carvajal Soto.