Skip to content

Commit

Permalink
ci(bors): merge #162
Browse files Browse the repository at this point in the history
162: feat: add source timestamps r=cailloumajor a=cailloumajor



Co-authored-by: Arnaud Rocher <[email protected]>
  • Loading branch information
bors[bot] and cailloumajor authored Jan 10, 2023
2 parents b80c7f5 + 473123a commit e32bfe3
Show file tree
Hide file tree
Showing 6 changed files with 104 additions and 113 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ Queries to MongoDB will use following parameters:
For each data change notification received from the OPC-UA server, an update query will be issued to MongoDB on collection `data`, as a document comprising following fields:

- `data`: mapping of tag names to their values;
- `sourceTimestamps`: mapping of tag names to to the timestamp of last value change;
- `updatedAt`: MongoDB current date and time.

### Health
Expand Down
12 changes: 9 additions & 3 deletions integration/smoke_test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ while :; do
esac
done

set -ex
set -eux

# Create a fresh pki tree
rm -r pki || true
Expand Down Expand Up @@ -109,15 +109,21 @@ fi
# Check result
got=$(echo "$result" | jq '.theAnswer')
want=42
if [ "$got" -ne $want ]; then
if ! [ "$got" -eq $want ]; then
die "Assert error for \"theAnswer\": want $want, got $got"
fi

got=$(echo "$result" | jq '.timeDiff')
want=100
if [ "$got" -gt $want ]; then
if ! [ "$got" -le $want ]; then
die "Assert error for \"timeDiff\": want less than $want, got $got"
fi

got=$(echo "$result" | jq '.sourceTimestampDiff')
want=0
if ! [ "$got" -gt $want ]; then
die "Assert error for \"sourceTimestampDiff\": want more than $want, got $got"
fi

echo "🎉 success"
teardown
7 changes: 7 additions & 0 deletions integration/tests.mongodb
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,13 @@ result = db.data.findOne(
{ $dateFromString: { dateString: "$data.currentTime" } }
]
}
},
sourceTimestampDiff: {
$dateDiff: {
startDate: "$sourceTimestamps.theAnswer",
endDate: "$$NOW",
unit: "millisecond"
}
}
}
);
Expand Down
132 changes: 58 additions & 74 deletions src/db.rs
Original file line number Diff line number Diff line change
@@ -1,22 +1,22 @@
use std::collections::HashMap;
use std::fmt;
use std::time::Duration;
use std::vec::IntoIter;

use actix::prelude::*;
use anyhow::{Context as _, Result};
use futures_util::FutureExt;
use mongodb::{
bson::{self, doc, DateTime, Document},
options::{ClientOptions, UpdateOptions},
Client, Database,
};
use mongodb::bson::{self, doc, DateTime, Document};
use mongodb::options::{ClientOptions, UpdateOptions};
use mongodb::{Client, Database};
use tracing::{debug, debug_span, error, info, Instrument};

use opcua_proxy::{DATABASE, OPCUA_DATA_COLL, OPCUA_HEALTH_COLL};

use crate::variant::Variant;

const VALUES_KEY: &str = "data";
const TIMESTAMPS_KEY: &str = "sourceTimestamps";

pub(crate) type DatabaseActorAddress = Addr<DatabaseActor>;

#[tracing::instrument(skip_all)]
Expand Down Expand Up @@ -51,40 +51,27 @@ impl Actor for DatabaseActor {
type Context = Context<Self>;
}

pub(crate) struct DataChangeMessage(Vec<(String, Variant)>);

impl IntoIterator for DataChangeMessage {
type Item = (String, Variant);
type IntoIter = IntoIter<(String, Variant)>;

fn into_iter(self) -> Self::IntoIter {
self.0.into_iter()
}
}

impl FromIterator<(String, Variant)> for DataChangeMessage {
fn from_iter<T: IntoIterator<Item = (String, Variant)>>(iter: T) -> Self {
let mut c = Self(Vec::new());
for i in iter {
c.0.push(i)
}
c
}
#[derive(Debug)]
struct DataValue {
value: Variant,
source_timestamp: DateTime,
}

impl fmt::Display for DataChangeMessage {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "[ ")?;
#[derive(Debug)]
pub(crate) struct DataChangeMessage(HashMap<String, DataValue>);

let mut tag_values = self.0.iter().peekable();
while let Some(tag_value) = tag_values.next() {
write!(f, "{}={}", tag_value.0, tag_value.1)?;
if tag_values.peek().is_some() {
write!(f, ", ")?;
}
}
impl DataChangeMessage {
pub(crate) fn with_capacity(cap: usize) -> Self {
Self(HashMap::with_capacity(cap))
}

write!(f, " ]")
pub(crate) fn insert(&mut self, tag_name: String, value: Variant, source_millis: i64) {
let source_timestamp = DateTime::from_millis(source_millis);
let data_value = DataValue {
value,
source_timestamp,
};
self.0.insert(tag_name, data_value);
}
}

Expand All @@ -96,35 +83,52 @@ impl Handler<DataChangeMessage> for DatabaseActor {
type Result = ResponseFuture<()>;

fn handle(&mut self, msg: DataChangeMessage, _ctx: &mut Self::Context) -> Self::Result {
let message_display = msg.to_string();
let collection = self.db.collection::<Document>(OPCUA_DATA_COLL);
let query = doc! { "_id": &self.partner_id };
let update_data: HashMap<String, Variant> = msg
.into_iter()
.map(|(k, v)| ("data.".to_owned() + k.as_str(), v))
let values_map: HashMap<String, Variant> = msg
.0
.iter()
.map(|(tag_name, data_value)| {
(
format!("{}.{}", VALUES_KEY, tag_name),
data_value.value.to_owned(),
)
})
.collect();
let timestamps_map: HashMap<String, DateTime> = msg
.0
.iter()
.map(|(tag_name, data_value)| {
(
format!("{}.{}", TIMESTAMPS_KEY, tag_name),
data_value.source_timestamp,
)
})
.collect();
let options = UpdateOptions::builder().upsert(true).build();
async move {
debug!(received = "msg", msg = message_display);
let update_data_doc = match bson::to_document(&update_data) {
debug!(event = "message received", ?msg);
let values_doc = match bson::to_document(&values_map) {
Ok(doc) => doc,
Err(err) => {
error!(
when = "encoding data update document",
err = err.to_string()
);
error!(when = "encoding values document", %err);
return;
}
};
let update = doc! {
"$currentDate": { "updatedAt": true },
"$set": update_data_doc,
};
match collection.update_one(query, update, options).await {
Ok(_) => (),
let timestamps_doc = match bson::to_document(&timestamps_map) {
Ok(doc) => doc,
Err(err) => {
error!(when = "updating document", %err);
error!(when = "encoding timestamps document", %err);
return;
}
};
let update = vec![
doc! { "$addFields": { "updatedAt": "$$NOW" } },
doc! { "$addFields": values_doc },
doc! { "$addFields": timestamps_doc },
];
if let Err(err) = collection.update_one(query, update, options).await {
error!(when = "updating document", %err);
}
}
.instrument(debug_span!("handle data change message"))
Expand Down Expand Up @@ -162,7 +166,7 @@ impl Handler<HealthMessage> for DatabaseActor {
};
let options = UpdateOptions::builder().upsert(true).build();
async move {
debug!(received = "msg", %msg);
debug!(event="message received", %msg);
match collection.update_one(query, update, options).await {
Ok(_) => (),
Err(err) => {
Expand All @@ -174,23 +178,3 @@ impl Handler<HealthMessage> for DatabaseActor {
.boxed()
}
}

#[cfg(test)]
mod tests {
use super::*;

mod data_change_message {
use super::*;
use opcua::types::Variant as OpcUaVariant;

#[test]
fn display() {
let dcm = DataChangeMessage(vec![
("first".into(), OpcUaVariant::from("a value").into()),
("second".into(), OpcUaVariant::from(42).into()),
("third".into(), OpcUaVariant::from(false).into()),
]);
assert_eq!(dcm.to_string(), "[ first=a value, second=42, third=false ]")
}
}
}
55 changes: 28 additions & 27 deletions src/opcua.rs
Original file line number Diff line number Diff line change
Expand Up @@ -213,32 +213,33 @@ where
let cloned_tag_set = tag_set.clone();
let data_change_callback = DataChangeCallback::new(move |monitored_items| {
let _entered = info_span!("tags values change handler").entered();
let message: DataChangeMessage = monitored_items
.iter()
.filter_map(|item| {
let node_id = &item.item_to_monitor().node_id;
let client_handle = item.client_handle();
let value = &item.last_value().value;
if value.is_none() {
error!(%node_id, err="missing value");
}
let index = usize::try_from(client_handle).unwrap() - 1;
let name_and_value = value.as_ref().and_then(|value| {
cloned_tag_set
.0
.get(index)
.map(|tag| (tag.name.to_owned(), value.to_owned().into()))
});
if name_and_value.is_none() {
error!(
%node_id,
client_handle,
err = "tag not found for client handle"
);
}
name_and_value
})
.collect();
let mut message = DataChangeMessage::with_capacity(monitored_items.len());
for item in monitored_items {
let node_id = &item.item_to_monitor().node_id;
let client_handle = item.client_handle();
let index = usize::try_from(client_handle).unwrap() - 1;
let Some(tag) = cloned_tag_set.0.get(index) else {
error!(%node_id, client_handle, err="tag not found for client handle");
continue;
};
let Some(last_value) = &item.last_value().value else {
error!(%node_id, err="missing value");
continue;
};
let source_millis = item
.last_value()
.source_timestamp
.map(|dt| dt.as_chrono().timestamp_millis());
let Some(source_millis) = source_millis else {
error!(%node_id, err="missing source timestamp");
continue;
};
message.insert(
tag.name.to_owned(),
last_value.to_owned().into(),
source_millis,
)
}
send_addr.do_send(message);
});

Expand All @@ -253,7 +254,7 @@ where
let results = session
.create_monitored_items(
subscription_id,
TimestampsToReturn::Neither,
TimestampsToReturn::Source,
&items_to_create,
)
.context("error creating monitored items")?;
Expand Down
10 changes: 1 addition & 9 deletions src/variant.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
use std::fmt;

use opcua::types::{Array, Variant as OpcUaVariant};
use serde::ser::{self, Serialize, Serializer};

Expand All @@ -15,15 +13,9 @@ impl Serialize for Bytes {
}

/// Wraps [opcua::types::Variant] to provide custom, seamless serializing.
#[derive(Clone)]
#[derive(Debug, Clone)]
pub(crate) struct Variant(OpcUaVariant);

impl fmt::Display for Variant {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
self.0.fmt(f)
}
}

impl From<OpcUaVariant> for Variant {
fn from(v: OpcUaVariant) -> Self {
Self(v)
Expand Down

0 comments on commit e32bfe3

Please sign in to comment.