From 473123accda141e506c64a59e3b4af32469ef03a Mon Sep 17 00:00:00 2001 From: Arnaud Rocher Date: Mon, 9 Jan 2023 13:19:13 +0100 Subject: [PATCH] feat: add source timestamps --- README.md | 1 + integration/smoke_test.sh | 12 +++- integration/tests.mongodb | 7 ++ src/db.rs | 132 +++++++++++++++++--------------------- src/opcua.rs | 55 ++++++++-------- src/variant.rs | 10 +-- 6 files changed, 104 insertions(+), 113 deletions(-) diff --git a/README.md b/README.md index 1220e679..1c50686e 100644 --- a/README.md +++ b/README.md @@ -47,6 +47,7 @@ Queries to MongoDB will use following parameters: For each data change notification received from the OPC-UA server, an update query will be issued to MongoDB on collection `data`, as a document comprising following fields: - `data`: mapping of tag names to their values; +- `sourceTimestamps`: mapping of tag names to to the timestamp of last value change; - `updatedAt`: MongoDB current date and time. ### Health diff --git a/integration/smoke_test.sh b/integration/smoke_test.sh index 8222898a..c41ccdc6 100755 --- a/integration/smoke_test.sh +++ b/integration/smoke_test.sh @@ -50,7 +50,7 @@ while :; do esac done -set -ex +set -eux # Create a fresh pki tree rm -r pki || true @@ -109,15 +109,21 @@ fi # Check result got=$(echo "$result" | jq '.theAnswer') want=42 -if [ "$got" -ne $want ]; then +if ! [ "$got" -eq $want ]; then die "Assert error for \"theAnswer\": want $want, got $got" fi got=$(echo "$result" | jq '.timeDiff') want=100 -if [ "$got" -gt $want ]; then +if ! [ "$got" -le $want ]; then die "Assert error for \"timeDiff\": want less than $want, got $got" fi +got=$(echo "$result" | jq '.sourceTimestampDiff') +want=0 +if ! [ "$got" -gt $want ]; then + die "Assert error for \"sourceTimestampDiff\": want more than $want, got $got" +fi + echo "🎉 success" teardown diff --git a/integration/tests.mongodb b/integration/tests.mongodb index a0e388be..a814e17e 100644 --- a/integration/tests.mongodb +++ b/integration/tests.mongodb @@ -11,6 +11,13 @@ result = db.data.findOne( { $dateFromString: { dateString: "$data.currentTime" } } ] } + }, + sourceTimestampDiff: { + $dateDiff: { + startDate: "$sourceTimestamps.theAnswer", + endDate: "$$NOW", + unit: "millisecond" + } } } ); diff --git a/src/db.rs b/src/db.rs index 4c5ad8c3..b14df155 100644 --- a/src/db.rs +++ b/src/db.rs @@ -1,22 +1,22 @@ use std::collections::HashMap; use std::fmt; use std::time::Duration; -use std::vec::IntoIter; use actix::prelude::*; use anyhow::{Context as _, Result}; use futures_util::FutureExt; -use mongodb::{ - bson::{self, doc, DateTime, Document}, - options::{ClientOptions, UpdateOptions}, - Client, Database, -}; +use mongodb::bson::{self, doc, DateTime, Document}; +use mongodb::options::{ClientOptions, UpdateOptions}; +use mongodb::{Client, Database}; use tracing::{debug, debug_span, error, info, Instrument}; use opcua_proxy::{DATABASE, OPCUA_DATA_COLL, OPCUA_HEALTH_COLL}; use crate::variant::Variant; +const VALUES_KEY: &str = "data"; +const TIMESTAMPS_KEY: &str = "sourceTimestamps"; + pub(crate) type DatabaseActorAddress = Addr; #[tracing::instrument(skip_all)] @@ -51,40 +51,27 @@ impl Actor for DatabaseActor { type Context = Context; } -pub(crate) struct DataChangeMessage(Vec<(String, Variant)>); - -impl IntoIterator for DataChangeMessage { - type Item = (String, Variant); - type IntoIter = IntoIter<(String, Variant)>; - - fn into_iter(self) -> Self::IntoIter { - self.0.into_iter() - } -} - -impl FromIterator<(String, Variant)> for DataChangeMessage { - fn from_iter>(iter: T) -> Self { - let mut c = Self(Vec::new()); - for i in iter { - c.0.push(i) - } - c - } +#[derive(Debug)] +struct DataValue { + value: Variant, + source_timestamp: DateTime, } -impl fmt::Display for DataChangeMessage { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - write!(f, "[ ")?; +#[derive(Debug)] +pub(crate) struct DataChangeMessage(HashMap); - let mut tag_values = self.0.iter().peekable(); - while let Some(tag_value) = tag_values.next() { - write!(f, "{}={}", tag_value.0, tag_value.1)?; - if tag_values.peek().is_some() { - write!(f, ", ")?; - } - } +impl DataChangeMessage { + pub(crate) fn with_capacity(cap: usize) -> Self { + Self(HashMap::with_capacity(cap)) + } - write!(f, " ]") + pub(crate) fn insert(&mut self, tag_name: String, value: Variant, source_millis: i64) { + let source_timestamp = DateTime::from_millis(source_millis); + let data_value = DataValue { + value, + source_timestamp, + }; + self.0.insert(tag_name, data_value); } } @@ -96,35 +83,52 @@ impl Handler for DatabaseActor { type Result = ResponseFuture<()>; fn handle(&mut self, msg: DataChangeMessage, _ctx: &mut Self::Context) -> Self::Result { - let message_display = msg.to_string(); let collection = self.db.collection::(OPCUA_DATA_COLL); let query = doc! { "_id": &self.partner_id }; - let update_data: HashMap = msg - .into_iter() - .map(|(k, v)| ("data.".to_owned() + k.as_str(), v)) + let values_map: HashMap = msg + .0 + .iter() + .map(|(tag_name, data_value)| { + ( + format!("{}.{}", VALUES_KEY, tag_name), + data_value.value.to_owned(), + ) + }) + .collect(); + let timestamps_map: HashMap = msg + .0 + .iter() + .map(|(tag_name, data_value)| { + ( + format!("{}.{}", TIMESTAMPS_KEY, tag_name), + data_value.source_timestamp, + ) + }) .collect(); let options = UpdateOptions::builder().upsert(true).build(); async move { - debug!(received = "msg", msg = message_display); - let update_data_doc = match bson::to_document(&update_data) { + debug!(event = "message received", ?msg); + let values_doc = match bson::to_document(&values_map) { Ok(doc) => doc, Err(err) => { - error!( - when = "encoding data update document", - err = err.to_string() - ); + error!(when = "encoding values document", %err); return; } }; - let update = doc! { - "$currentDate": { "updatedAt": true }, - "$set": update_data_doc, - }; - match collection.update_one(query, update, options).await { - Ok(_) => (), + let timestamps_doc = match bson::to_document(×tamps_map) { + Ok(doc) => doc, Err(err) => { - error!(when = "updating document", %err); + error!(when = "encoding timestamps document", %err); + return; } + }; + let update = vec![ + doc! { "$addFields": { "updatedAt": "$$NOW" } }, + doc! { "$addFields": values_doc }, + doc! { "$addFields": timestamps_doc }, + ]; + if let Err(err) = collection.update_one(query, update, options).await { + error!(when = "updating document", %err); } } .instrument(debug_span!("handle data change message")) @@ -162,7 +166,7 @@ impl Handler for DatabaseActor { }; let options = UpdateOptions::builder().upsert(true).build(); async move { - debug!(received = "msg", %msg); + debug!(event="message received", %msg); match collection.update_one(query, update, options).await { Ok(_) => (), Err(err) => { @@ -174,23 +178,3 @@ impl Handler for DatabaseActor { .boxed() } } - -#[cfg(test)] -mod tests { - use super::*; - - mod data_change_message { - use super::*; - use opcua::types::Variant as OpcUaVariant; - - #[test] - fn display() { - let dcm = DataChangeMessage(vec![ - ("first".into(), OpcUaVariant::from("a value").into()), - ("second".into(), OpcUaVariant::from(42).into()), - ("third".into(), OpcUaVariant::from(false).into()), - ]); - assert_eq!(dcm.to_string(), "[ first=a value, second=42, third=false ]") - } - } -} diff --git a/src/opcua.rs b/src/opcua.rs index 1a5c6a6a..d36d1b32 100644 --- a/src/opcua.rs +++ b/src/opcua.rs @@ -213,32 +213,33 @@ where let cloned_tag_set = tag_set.clone(); let data_change_callback = DataChangeCallback::new(move |monitored_items| { let _entered = info_span!("tags values change handler").entered(); - let message: DataChangeMessage = monitored_items - .iter() - .filter_map(|item| { - let node_id = &item.item_to_monitor().node_id; - let client_handle = item.client_handle(); - let value = &item.last_value().value; - if value.is_none() { - error!(%node_id, err="missing value"); - } - let index = usize::try_from(client_handle).unwrap() - 1; - let name_and_value = value.as_ref().and_then(|value| { - cloned_tag_set - .0 - .get(index) - .map(|tag| (tag.name.to_owned(), value.to_owned().into())) - }); - if name_and_value.is_none() { - error!( - %node_id, - client_handle, - err = "tag not found for client handle" - ); - } - name_and_value - }) - .collect(); + let mut message = DataChangeMessage::with_capacity(monitored_items.len()); + for item in monitored_items { + let node_id = &item.item_to_monitor().node_id; + let client_handle = item.client_handle(); + let index = usize::try_from(client_handle).unwrap() - 1; + let Some(tag) = cloned_tag_set.0.get(index) else { + error!(%node_id, client_handle, err="tag not found for client handle"); + continue; + }; + let Some(last_value) = &item.last_value().value else { + error!(%node_id, err="missing value"); + continue; + }; + let source_millis = item + .last_value() + .source_timestamp + .map(|dt| dt.as_chrono().timestamp_millis()); + let Some(source_millis) = source_millis else { + error!(%node_id, err="missing source timestamp"); + continue; + }; + message.insert( + tag.name.to_owned(), + last_value.to_owned().into(), + source_millis, + ) + } send_addr.do_send(message); }); @@ -253,7 +254,7 @@ where let results = session .create_monitored_items( subscription_id, - TimestampsToReturn::Neither, + TimestampsToReturn::Source, &items_to_create, ) .context("error creating monitored items")?; diff --git a/src/variant.rs b/src/variant.rs index 62e52d6d..49946ca5 100644 --- a/src/variant.rs +++ b/src/variant.rs @@ -1,5 +1,3 @@ -use std::fmt; - use opcua::types::{Array, Variant as OpcUaVariant}; use serde::ser::{self, Serialize, Serializer}; @@ -15,15 +13,9 @@ impl Serialize for Bytes { } /// Wraps [opcua::types::Variant] to provide custom, seamless serializing. -#[derive(Clone)] +#[derive(Debug, Clone)] pub(crate) struct Variant(OpcUaVariant); -impl fmt::Display for Variant { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - self.0.fmt(f) - } -} - impl From for Variant { fn from(v: OpcUaVariant) -> Self { Self(v)