diff --git a/Changelog.md b/Changelog.md index 2cd5a67..dc1bd8f 100644 --- a/Changelog.md +++ b/Changelog.md @@ -11,7 +11,7 @@ Change history * **Version 1.3.3.0 (2024-03-03)**: Fixed conversion issues. Fixes https://github.com/SeppPenner/SparkplugNet/issues/64, https://github.com/SeppPenner/SparkplugNet/issues/70, https://github.com/SeppPenner/SparkplugNet/issues/71, https://github.com/SeppPenner/SparkplugNet/issues/72. Special thanks go to https://github.com/shouidar, who really helped me with the conversion of all the array types. * **Version 1.3.2.0 (2024-02-10)**: Fixed conversion for data types (Hopefully): https://github.com/SeppPenner/SparkplugNet/issues/63, https://github.com/SeppPenner/SparkplugNet/issues/64, https://github.com/SeppPenner/SparkplugNet/issues/66. Updated some NuGet packages. * **Version 1.3.1.0 (2024-01-05)**: Removed validation for incoming metrics on application (Still needs review). -* **Version 1.3.0.0 (2023-12-18)**: Removed support for NetCore3.1, added support for Net8.0, updated NuGet packages, tried to get towards 3.0 compatibility, updated supported data types and adjusted data type handling (Use carefully, not yet fully tested). +* **Version 1.3.0.0 (2023-12-18)**: Removed support for NetCore3.1, added support for net9.0, updated NuGet packages, tried to get towards 3.0 compatibility, updated supported data types and adjusted data type handling (Use carefully, not yet fully tested). * **Version 1.2.0.0 (2022-11-20)**: Updated nuget packages, removed support for Net5.0, added support for Net7.0, fixed a bug with the device storage (Thanks to @Patrick2607, https://github.com/SeppPenner/SparkplugNet/pull/31). * **Version 1.1.1.0 (2022-10-30)** : Fixed a bug within the metric conversion (Thanks to @49564B0F, https://github.com/SeppPenner/SparkplugNet/issues/30), fixed a bug with the device metrics on publishing (Thanks to @emmanuelbertho, https://github.com/SeppPenner/SparkplugNet/issues/28), unified even more things (Thanks to @Patrick2607, https://github.com/SeppPenner/SparkplugNet/pull/29), updated NuGet packages. * **Version 1.1.0.0 (2022-10-04)** : Updated events (Thanks to @cjmurph, https://github.com/SeppPenner/SparkplugNet/pull/19), added abstract base classes (Thanks to @BoBiene, https://github.com/SeppPenner/SparkplugNet/pull/20), introducing interfaces, unified things, rework to async events (Thanks to @BoBiene, https://github.com/SeppPenner/SparkplugNet/pull/21), fixed version B metric conversion error (Thanks to @Patrick2607, https://github.com/SeppPenner/SparkplugNet/pull/26), updated NuGet packages. diff --git a/src/SparkplugNet.Examples/SparkplugNet.Examples.csproj b/src/SparkplugNet.Examples/SparkplugNet.Examples.csproj index f34c804..3723c94 100644 --- a/src/SparkplugNet.Examples/SparkplugNet.Examples.csproj +++ b/src/SparkplugNet.Examples/SparkplugNet.Examples.csproj @@ -2,7 +2,7 @@ Exe - net8.0 + net9.0 latest enable enable diff --git a/src/SparkplugNet.Tests/SparkplugNet.Tests.csproj b/src/SparkplugNet.Tests/SparkplugNet.Tests.csproj index 9d3560d..9d0639a 100644 --- a/src/SparkplugNet.Tests/SparkplugNet.Tests.csproj +++ b/src/SparkplugNet.Tests/SparkplugNet.Tests.csproj @@ -2,7 +2,7 @@ Library - net8.0 + net9.0 latest enable enable diff --git a/src/SparkplugNet.sln b/src/SparkplugNet.sln index 69a4139..b041d41 100644 --- a/src/SparkplugNet.sln +++ b/src/SparkplugNet.sln @@ -1,4 +1,4 @@ - + Microsoft Visual Studio Solution File, Format Version 12.00 # Visual Studio Version 17 VisualStudioVersion = 17.1.32210.238 diff --git a/src/SparkplugNet.sln.DotSettings b/src/SparkplugNet.sln.DotSettings index e8c8224..ba5f49a 100644 --- a/src/SparkplugNet.sln.DotSettings +++ b/src/SparkplugNet.sln.DotSettings @@ -1,4 +1,4 @@ - + True True True diff --git a/src/SparkplugNet/Core/Application/SparkplugApplicationBase.cs b/src/SparkplugNet/Core/Application/SparkplugApplicationBase.cs index c140274..2ad7f4a 100644 --- a/src/SparkplugNet/Core/Application/SparkplugApplicationBase.cs +++ b/src/SparkplugNet/Core/Application/SparkplugApplicationBase.cs @@ -47,14 +47,9 @@ public SparkplugApplicationBase( } /// - /// Gets the node states. + /// Get the group states. /// - public ConcurrentDictionary> NodeStates { get; } = new(); - - /// - /// Gets the device states. - /// - public ConcurrentDictionary> DeviceStates { get; } = new(); + public ConcurrentDictionary> GroupStates { get; } = new(); /// /// Gets the options. @@ -85,8 +80,7 @@ public async Task Start(SparkplugApplicationOptions applicationOptions) } // Clear states. - this.NodeStates.Clear(); - this.DeviceStates.Clear(); + this.GroupStates.Clear(); // Add handlers. this.AddEventHandlers(); @@ -364,7 +358,7 @@ private async Task ConnectInternal() } else { - builder.WithWebSocketServer(options => + builder.WithWebSocketServer(options => options.WithCookieContainer(this.Options.MqttWebSocketOptions.CookieContainer) .WithCookieContainer(this.Options.MqttWebSocketOptions.Credentials) .WithProxyOptions(this.Options.MqttWebSocketOptions.ProxyOptions) @@ -450,11 +444,12 @@ private async Task SubscribeInternal() /// The metric state. private void UpdateMetricState(SparkplugMetricStatus metricState) { - var keys = new List(this.NodeStates.Keys.ToList()); - - foreach (string key in keys) + foreach (var group in this.GroupStates) { - this.NodeStates[key].MetricStatus = metricState; + foreach (var node in group.Value.NodeStates) + { + node.Value.MetricStatus = metricState; + } } } diff --git a/src/SparkplugNet/Core/GroupState .cs b/src/SparkplugNet/Core/GroupState .cs new file mode 100644 index 0000000..5088007 --- /dev/null +++ b/src/SparkplugNet/Core/GroupState .cs @@ -0,0 +1,22 @@ +// -------------------------------------------------------------------------------------------------------------------- +// +// The project is licensed under the MIT license. +// +// +// A state class for the metrics. +// +// -------------------------------------------------------------------------------------------------------------------- + +namespace SparkplugNet.Core; + +/// +/// The group state class. +/// +/// The type parameter. +public sealed class GroupState where T : IMetric, new() +{ + /// + /// Get the device states. + /// + public ConcurrentDictionary> NodeStates { get; } = new(); +} diff --git a/src/SparkplugNet/Core/Messages/SparkplugMessageGenerator.cs b/src/SparkplugNet/Core/Messages/SparkplugMessageGenerator.cs index d96c248..18ca17f 100644 --- a/src/SparkplugNet/Core/Messages/SparkplugMessageGenerator.cs +++ b/src/SparkplugNet/Core/Messages/SparkplugMessageGenerator.cs @@ -672,7 +672,7 @@ private MqttApplicationMessage GetSparkplugNodeBirthB( Timestamp = (ulong)dateTime.ToUnixTimeMilliseconds() }; - var convertedPayload = VersionB.PayloadConverter.ConvertVersionBPayload(payload); + var convertedPayload = VersionB.PayloadConverter.ConvertVersionBPayload(payload, SparkplugMessageType.NodeBirth); var serialized = PayloadHelper.Serialize(convertedPayload); return new MqttApplicationMessageBuilder() @@ -756,7 +756,7 @@ private MqttApplicationMessage GetSparkplugDeviceBirthB( Timestamp = (ulong)dateTime.ToUnixTimeMilliseconds() }; - var convertedPayload = VersionB.PayloadConverter.ConvertVersionBPayload(payload); + var convertedPayload = VersionB.PayloadConverter.ConvertVersionBPayload(payload, SparkplugMessageType.DeviceBirth); var serialized = PayloadHelper.Serialize(convertedPayload); return new MqttApplicationMessageBuilder() @@ -827,7 +827,7 @@ private MqttApplicationMessage GetSparkplugNodeDeathB( Metrics = metrics.ToList() }; - var convertedPayload = VersionB.PayloadConverter.ConvertVersionBPayload(payload); + var convertedPayload = VersionB.PayloadConverter.ConvertVersionBPayload(payload, SparkplugMessageType.NodeDeath); var serialized = PayloadHelper.Serialize(convertedPayload); return new MqttApplicationMessageBuilder() @@ -911,7 +911,7 @@ private MqttApplicationMessage GetSparkplugDeviceDeathB( Timestamp = (ulong)dateTime.ToUnixTimeMilliseconds() }; - var convertedPayload = VersionB.PayloadConverter.ConvertVersionBPayload(payload); + var convertedPayload = VersionB.PayloadConverter.ConvertVersionBPayload(payload, SparkplugMessageType.DeviceDeath); var serialized = PayloadHelper.Serialize(convertedPayload); return new MqttApplicationMessageBuilder() @@ -991,7 +991,7 @@ private MqttApplicationMessage GetSparkplugNodeDataB( Timestamp = (ulong)dateTime.ToUnixTimeMilliseconds() }; - var convertedPayload = VersionB.PayloadConverter.ConvertVersionBPayload(payload); + var convertedPayload = VersionB.PayloadConverter.ConvertVersionBPayload(payload, SparkplugMessageType.NodeData); var serialized = PayloadHelper.Serialize(convertedPayload); return new MqttApplicationMessageBuilder() @@ -1075,7 +1075,7 @@ private MqttApplicationMessage GetSparkplugDeviceDataB( Timestamp = (ulong)dateTime.ToUnixTimeMilliseconds() }; - var convertedPayload = VersionB.PayloadConverter.ConvertVersionBPayload(payload); + var convertedPayload = VersionB.PayloadConverter.ConvertVersionBPayload(payload, SparkplugMessageType.DeviceData); var serialized = PayloadHelper.Serialize(convertedPayload); return new MqttApplicationMessageBuilder() @@ -1154,7 +1154,7 @@ private static MqttApplicationMessage GetSparkplugNodeCommandB( Timestamp = (ulong)dateTime.ToUnixTimeMilliseconds() }; - var convertedPayload = VersionB.PayloadConverter.ConvertVersionBPayload(payload); + var convertedPayload = VersionB.PayloadConverter.ConvertVersionBPayload(payload, SparkplugMessageType.NodeCommand); var serialized = PayloadHelper.Serialize(convertedPayload); return new MqttApplicationMessageBuilder() @@ -1236,7 +1236,7 @@ private static MqttApplicationMessage GetSparkplugDeviceCommandB( Timestamp = (ulong)dateTime.ToUnixTimeMilliseconds() }; - var convertedPayload = VersionB.PayloadConverter.ConvertVersionBPayload(payload); + var convertedPayload = VersionB.PayloadConverter.ConvertVersionBPayload(payload, SparkplugMessageType.DeviceCommand); var serialized = PayloadHelper.Serialize(convertedPayload); return new MqttApplicationMessageBuilder() diff --git a/src/SparkplugNet/Core/Messages/SparkplugTopicGenerator.cs b/src/SparkplugNet/Core/Messages/SparkplugTopicGenerator.cs index 0621bd2..b4a7485 100644 --- a/src/SparkplugNet/Core/Messages/SparkplugTopicGenerator.cs +++ b/src/SparkplugNet/Core/Messages/SparkplugTopicGenerator.cs @@ -61,16 +61,6 @@ internal static string GetDeviceCommandSubscribeTopic(SparkplugNamespace nameSpa return $"{nameSpace.GetDescription()}/{groupIdentifier}/{SparkplugMessageType.DeviceCommand.GetDescription()}/{edgeNodeIdentifier}/{deviceIdentifier}"; } - /// - /// Gets state subscription topic. - /// - /// The SCADA host identifier. - /// The state subscription topic . - internal static string GetStateSubscribeTopic(string scadaHostIdentifier) - { - return $"{SparkplugMessageType.StateMessage.GetDescription()}/{scadaHostIdentifier}"; - } - /// /// Gets the topic (Except STATE messages). /// diff --git a/src/SparkplugNet/Core/MetricState.cs b/src/SparkplugNet/Core/MetricState.cs index 104919d..b2722d5 100644 --- a/src/SparkplugNet/Core/MetricState.cs +++ b/src/SparkplugNet/Core/MetricState.cs @@ -13,7 +13,7 @@ namespace SparkplugNet.Core; /// The metric state class. /// /// The type parameter. -public sealed class MetricState where T : IMetric, new() +public class MetricState where T : IMetric, new() { /// /// Gets or sets the metric status. diff --git a/src/SparkplugNet/Core/Node/SparkplugNodeBase.cs b/src/SparkplugNet/Core/Node/SparkplugNodeBase.cs index 27df22d..76b345e 100644 --- a/src/SparkplugNet/Core/Node/SparkplugNodeBase.cs +++ b/src/SparkplugNet/Core/Node/SparkplugNodeBase.cs @@ -86,7 +86,11 @@ public async Task Start(SparkplugNodeOptions nodeOptions, KnownMetricStorage? kn // Connect, subscribe to incoming messages and send a state message. await this.ConnectInternal(); await this.SubscribeInternal(); - await this.PublishNodeAndDeviceBirthsInternal(); + + if (string.IsNullOrEmpty(this.Options.ScadaHostIdentifier)) + { + await this.PublishNodeAndDeviceBirthsInternal(); + } } /// @@ -124,14 +128,11 @@ public async Task PublishMetrics(IEnumerable metrics } /// - /// Does a node rebirth. + /// Does a node birth. /// /// The new metrics. - public async Task Rebirth(IEnumerable metrics) + public async Task Birth(IEnumerable metrics) { - // Send node death first. - await this.SendNodeDeathMessage(); - // Reset the known metrics. this.knownMetrics = new KnownMetricStorage(metrics); @@ -139,6 +140,18 @@ public async Task Rebirth(IEnumerable metrics) await this.PublishNodeAndDeviceBirthsInternal(); } + /// + /// Does a node rebirth. + /// + /// The new metrics. + public async Task Rebirth(IEnumerable metrics) + { + // Send node death first. + await this.SendNodeDeathMessage(); + + await this.Birth(metrics); + } + /// /// Publishes metrics for a node. /// @@ -403,7 +416,8 @@ private async Task SubscribeInternal() await this.client.SubscribeAsync(deviceCommandSubscribeTopic, (MqttQualityOfServiceLevel)SparkplugQualityOfServiceLevel.AtLeastOnce); // Subscribe to the state topic. - var stateSubscribeTopic = SparkplugTopicGenerator.GetStateSubscribeTopic(this.Options.ScadaHostIdentifier); + //var stateSubscribeTopic = SparkplugTopicGenerator.GetStateSubscribeTopic(this.Options.ScadaHostIdentifier); + var stateSubscribeTopic = SparkplugTopicGenerator.GetSparkplugStateMessageTopic(this.Options.ScadaHostIdentifier, this.specificationVersion); await this.client.SubscribeAsync(stateSubscribeTopic, (MqttQualityOfServiceLevel)SparkplugQualityOfServiceLevel.AtLeastOnce); } diff --git a/src/SparkplugNet/Core/NodeState .cs b/src/SparkplugNet/Core/NodeState .cs new file mode 100644 index 0000000..e0816b9 --- /dev/null +++ b/src/SparkplugNet/Core/NodeState .cs @@ -0,0 +1,22 @@ +// -------------------------------------------------------------------------------------------------------------------- +// +// The project is licensed under the MIT license. +// +// +// A state class for the metrics. +// +// -------------------------------------------------------------------------------------------------------------------- + +namespace SparkplugNet.Core; + +/// +/// The node state class. +/// +/// The type parameter. +public sealed class NodeState : MetricState where T : IMetric, new() +{ + /// + /// Get the device states. + /// + public ConcurrentDictionary> DeviceStates { get; set; } = new(); +} diff --git a/src/SparkplugNet/Core/SparkplugBase.KnownMetricStorage.cs b/src/SparkplugNet/Core/SparkplugBase.KnownMetricStorage.cs index 83590ed..9244f49 100644 --- a/src/SparkplugNet/Core/SparkplugBase.KnownMetricStorage.cs +++ b/src/SparkplugNet/Core/SparkplugBase.KnownMetricStorage.cs @@ -313,5 +313,11 @@ private void AddVersionBMetric(T metric, Metric versionBMetric) this.knownMetricsByName[metric.Name] = metric; } } + + /// + /// Return the known metrics by name. + /// + /// + public ConcurrentDictionary GetKnownMetricsByName() { return this.knownMetricsByName; } } } diff --git a/src/SparkplugNet/Core/SparkplugBase.cs b/src/SparkplugNet/Core/SparkplugBase.cs index 7e0b7da..a3aa994 100644 --- a/src/SparkplugNet/Core/SparkplugBase.cs +++ b/src/SparkplugNet/Core/SparkplugBase.cs @@ -17,6 +17,11 @@ namespace SparkplugNet.Core; /// public partial class SparkplugBase : ISparkplugConnection where T : IMetric, new() { + /// + /// The sparkplug specification version. + /// + internal readonly SparkplugSpecificationVersion specificationVersion; + /// /// The message generator. /// @@ -54,6 +59,7 @@ public SparkplugBase(IEnumerable knownMetrics, SparkplugSpecificationVersion /// public SparkplugBase(KnownMetricStorage knownMetricsStorage, SparkplugSpecificationVersion specificationVersion) { + this.specificationVersion = specificationVersion; this.knownMetrics = knownMetricsStorage; if (typeof(T).IsAssignableFrom(typeof(VersionAData.KuraMetric))) diff --git a/src/SparkplugNet/SparkplugNet.csproj b/src/SparkplugNet/SparkplugNet.csproj index 4774d34..dc4f562 100644 --- a/src/SparkplugNet/SparkplugNet.csproj +++ b/src/SparkplugNet/SparkplugNet.csproj @@ -1,7 +1,7 @@ - net6.0;net8.0 + net9.0 SparkplugNet SparkplugNet true @@ -28,13 +28,10 @@ NU1803,CS0618,CS0809,NU1901,NU1902 true all + Debug;Release;p1600sedac;p1000edge;p1600sedacedge;p1800epsi;p1000master;p0501betafencepll - - all - runtime; build; native; contentfiles; analyzers; buildtransitive - all diff --git a/src/SparkplugNet/ValueClasses.cd b/src/SparkplugNet/ValueClasses.cd index 0e00271..384c37b 100644 --- a/src/SparkplugNet/ValueClasses.cd +++ b/src/SparkplugNet/ValueClasses.cd @@ -1,4 +1,4 @@ - + diff --git a/src/SparkplugNet/VersionA/SparkplugApplication.cs b/src/SparkplugNet/VersionA/SparkplugApplication.cs index 1f76637..df6a564 100644 --- a/src/SparkplugNet/VersionA/SparkplugApplication.cs +++ b/src/SparkplugNet/VersionA/SparkplugApplication.cs @@ -222,30 +222,51 @@ await this.FireDeviceBirthReceived(topic.GroupIdentifier, topic.EdgeNodeIdentifi /// The topic. /// The metrics. /// The metric status. - /// Thrown if the edge node identifier is invalid. + /// Thrown if any identifier is invalid. /// Thrown if the metric cast is invalid. private IEnumerable ProcessPayload( SparkplugMessageTopic topic, List metrics, SparkplugMetricStatus metricStatus) { - var metricState = new MetricState + // Check group id. + if (string.IsNullOrWhiteSpace(topic.GroupIdentifier)) + { + throw new InvalidOperationException($"The group identifier is invalid {topic.GroupIdentifier}."); + } + + if (!this.GroupStates.ContainsKey(topic.GroupIdentifier)) + { + this.GroupStates[topic.GroupIdentifier] = new GroupState(); + } + + NodeState metricState = new() { MetricStatus = metricStatus }; + // Check node id. + if (string.IsNullOrWhiteSpace(topic.EdgeNodeIdentifier)) + { + throw new InvalidOperationException($"The edge node identifier is invalid {topic.EdgeNodeIdentifier}."); + } + if (!string.IsNullOrWhiteSpace(topic.DeviceIdentifier)) { - if (string.IsNullOrWhiteSpace(topic.EdgeNodeIdentifier)) + if (!this.GroupStates[topic.GroupIdentifier].NodeStates.ContainsKey(topic.EdgeNodeIdentifier)) { - throw new InvalidOperationException($"The edge node identifier is invalid for device {topic.DeviceIdentifier}."); + this.GroupStates[topic.GroupIdentifier] + .NodeStates[topic.EdgeNodeIdentifier] = metricState; } - this.DeviceStates[$"{topic.EdgeNodeIdentifier}/{topic.DeviceIdentifier}"] = metricState; + this.GroupStates[topic.GroupIdentifier] + .NodeStates[topic.EdgeNodeIdentifier] + .DeviceStates[topic.DeviceIdentifier] = metricState; } else { - this.NodeStates[topic.EdgeNodeIdentifier] = metricState; + this.GroupStates[topic.GroupIdentifier] + .NodeStates[topic.EdgeNodeIdentifier] = metricState; } foreach (var payloadMetric in metrics) diff --git a/src/SparkplugNet/VersionB/PayloadConverter.cs b/src/SparkplugNet/VersionB/PayloadConverter.cs index fcc2242..0229d7b 100644 --- a/src/SparkplugNet/VersionB/PayloadConverter.cs +++ b/src/SparkplugNet/VersionB/PayloadConverter.cs @@ -18,39 +18,56 @@ internal static class PayloadConverter /// Gets the version B payload converted from the ProtoBuf payload. /// /// The . + /// /// The . - public static Payload ConvertVersionBPayload(VersionBProtoBuf.ProtoBufPayload protoPayload) + public static Payload ConvertVersionBPayload(VersionBProtoBuf.ProtoBufPayload protoPayload, ConcurrentDictionary? metrics) => new() { Body = protoPayload.Body, - Metrics = protoPayload.Metrics.Select(ConvertVersionBMetric).ToList(), + Metrics = protoPayload.Metrics.Select(m => ConvertVersionBMetric(m, metrics)).ToList(), Seq = protoPayload.Seq, Timestamp = protoPayload.Timestamp, Uuid = protoPayload.Uuid ?? string.Empty }; + + public static VersionBProtoBuf.ProtoBufPayload ConvertVersionBPayload(Payload payload) + => ConvertVersionBPayload(payload, null); + /// /// Gets the ProtoBuf payload converted from the version B payload. /// /// The . + /// The . /// The . - public static VersionBProtoBuf.ProtoBufPayload ConvertVersionBPayload(Payload payload) + public static VersionBProtoBuf.ProtoBufPayload ConvertVersionBPayload(Payload payload, SparkplugMessageType? sparkplugMessageType) => new() { Body = payload.Body, - Metrics = payload.Metrics.Select(ConvertVersionBMetric).ToList(), + Metrics = payload.Metrics.Select(m => ConvertVersionBMetric(m, sparkplugMessageType)).ToList(), Seq = payload.Seq, Timestamp = payload.Timestamp, Uuid = payload.Uuid }; + /// + /// + /// + /// + /// + public static Metric ConvertVersionBMetric(VersionBProtoBuf.ProtoBufPayload.Metric protoMetric) + { + return ConvertVersionBMetric(protoMetric, null); + } + /// /// Gets the version B metric from the version B ProtoBuf metric. /// /// The . + /// The . /// Thrown if the metric data type is unknown. /// The . - public static Metric ConvertVersionBMetric(VersionBProtoBuf.ProtoBufPayload.Metric protoMetric) + public static Metric ConvertVersionBMetric(VersionBProtoBuf.ProtoBufPayload.Metric protoMetric, ConcurrentDictionary? metrics) { var metric = new Metric() { @@ -62,7 +79,34 @@ public static Metric ConvertVersionBMetric(VersionBProtoBuf.ProtoBufPayload.Metr Timestamp = protoMetric.Timestamp }; - var dataType = ConvertVersionBDataType((VersionBProtoBuf.DataType?)protoMetric.DataType); + // Get properties + if (protoMetric.PropertySetValue is not null) + { + PropertySet propertySet = new(); + propertySet.Keys = protoMetric.PropertySetValue.Keys; + + propertySet.Values = []; + + foreach (var item in protoMetric.PropertySetValue.Values) + { + propertySet.Values.Add(ConvertVersionBPropertyValue(item)); + } + + metric.Properties = propertySet; + } + + // [tck-id-payloads-metric-datatype-not-req] + // The datatype SHOULD NOT be included with metric definitions in NDATA, NCMD, DDATA, and DCMD messages. + VersionBDataTypeEnum dataType; + + if (metrics is null) + { + dataType = ConvertVersionBDataType((VersionBProtoBuf.DataType?)protoMetric.DataType); + } + else + { + dataType = metrics.Where(o => o.Key == metric.Name).Select(o => o.Value.DataType).FirstOrDefault(); + } switch (dataType) { @@ -139,7 +183,7 @@ public static Metric ConvertVersionBMetric(VersionBProtoBuf.ProtoBufPayload.Metr metric.SetValue(VersionBDataTypeEnum.Int32Array, int32Array); break; case VersionBDataTypeEnum.Int64Array: - var int64Array = PayloadHelper.GetArrayOfTFromBytes(protoMetric.BytesValue, BinaryPrimitives.ReadInt64LittleEndian); + var int64Array = PayloadHelper.GetArrayOfTFromBytes(protoMetric.BytesValue, BinaryPrimitives.ReadInt64LittleEndian); metric.SetValue(VersionBDataTypeEnum.Int64Array, int64Array); break; case VersionBDataTypeEnum.UInt8Array: @@ -177,7 +221,7 @@ public static Metric ConvertVersionBMetric(VersionBProtoBuf.ProtoBufPayload.Metr var dateTimeArray = PayloadHelper.GetArrayOfTFromBytes(protoMetric.BytesValue, BinaryPrimitives.ReadUInt64LittleEndian); metric.SetValue(VersionBDataTypeEnum.DateTimeArray, dateTimeArray.Select(x => DateTimeOffset.FromUnixTimeMilliseconds((long)x)).ToArray()); break; - // Todo: What to do here? + // Todo: What to do here? case VersionBDataTypeEnum.PropertySetList: case VersionBDataTypeEnum.Unknown: default: @@ -191,15 +235,35 @@ public static Metric ConvertVersionBMetric(VersionBProtoBuf.ProtoBufPayload.Metr /// Gets the version B ProtoBuf metric from the version B metric. /// /// The . + /// + public static VersionBProtoBuf.ProtoBufPayload.Metric ConvertVersionBMetric(Metric metric) + { + return ConvertVersionBMetric(metric, null); + } + + /// + /// Gets the version B ProtoBuf metric from the version B metric. + /// + /// The . + /// The . /// Thrown if the property set data type is set for a metric. /// Thrown if the metric data type is unknown. /// The . - public static VersionBProtoBuf.ProtoBufPayload.Metric ConvertVersionBMetric(Metric metric) + public static VersionBProtoBuf.ProtoBufPayload.Metric ConvertVersionBMetric(Metric metric, SparkplugMessageType? sparkplugMessageType) { + // [tck-id-payloads-metric-datatype-not-req] + // The datatype SHOULD NOT be included with metric definitions in NDATA, NCMD, DDATA, and DCMD messages. + uint? dataType = null; + + if (sparkplugMessageType == null || sparkplugMessageType == SparkplugMessageType.NodeBirth || sparkplugMessageType == SparkplugMessageType.DeviceBirth) + { + dataType = (uint?)ConvertVersionBDataType(metric.DataType); + } + var protoMetric = new VersionBProtoBuf.ProtoBufPayload.Metric() { Alias = metric.Alias, - DataType = (uint?)ConvertVersionBDataType(metric.DataType), + DataType = dataType, IsHistorical = metric.IsHistorical, IsNull = metric.IsNull, IsTransient = metric.IsTransient, diff --git a/src/SparkplugNet/VersionB/SparkplugApplication.cs b/src/SparkplugNet/VersionB/SparkplugApplication.cs index 76f896b..38a8c9e 100644 --- a/src/SparkplugNet/VersionB/SparkplugApplication.cs +++ b/src/SparkplugNet/VersionB/SparkplugApplication.cs @@ -9,6 +9,8 @@ namespace SparkplugNet.VersionB; +using System.Diagnostics.Metrics; + /// /// /// A class that handles a Sparkplug application. @@ -126,17 +128,39 @@ protected override async Task OnMessageReceived(SparkplugMessageTopic topic, byt { var payloadVersionB = PayloadHelper.Deserialize(payload); - if (payloadVersionB is not null) + if (payloadVersionB == null) { return; } + + ConcurrentDictionary? metrics = null; + + if (!(topic.MessageType == SparkplugMessageType.NodeBirth || topic.MessageType == SparkplugMessageType.DeviceBirth)) { - var convertedPayload = PayloadConverter.ConvertVersionBPayload(payloadVersionB); + // Get known metrics + if (!this.GroupStates.TryGetValue(topic.GroupIdentifier, out var groupState)) { return; } + + if (!groupState.NodeStates.TryGetValue(topic.EdgeNodeIdentifier, out var nodeState)) { return; } - if (convertedPayload is not Payload _) + if (topic.DeviceIdentifier is null) { - throw new InvalidCastException("The metric cast didn't work properly."); + metrics = nodeState.Metrics; + } + else if (nodeState.DeviceStates.TryGetValue(topic.DeviceIdentifier, out var metricState)) + { + metrics = metricState.Metrics; } + else + { + return; + } + } + + var convertedPayload = PayloadConverter.ConvertVersionBPayload(payloadVersionB, metrics); - await this.HandleMessagesForVersionB(topic, convertedPayload); + if (convertedPayload is not Payload _) + { + throw new InvalidCastException("The metric cast didn't work properly."); } + + await this.HandleMessagesForVersionB(topic, convertedPayload); } /// @@ -149,12 +173,12 @@ private async Task HandleMessagesForVersionB(SparkplugMessageTopic topic, Payloa { // Filter out session number metric. var sessionNumberMetric = payload.Metrics.FirstOrDefault(m => m.Name == Constants.SessionNumberMetricName); - var metricsWithoutSequenceMetric = payload.Metrics.Where(m => m.Name != Constants.SessionNumberMetricName); - var filteredMetrics = this.KnownMetricsStorage.FilterMetrics(metricsWithoutSequenceMetric, topic.MessageType).ToList(); + var metrics = payload.Metrics.Where(m => m.Name != Constants.SessionNumberMetricName).ToList(); + // var filteredMetrics = this.KnownMetricsStorage.FilterMetrics(metricsWithoutSequenceMetric, topic.MessageType).ToList(); if (sessionNumberMetric is not null) { - filteredMetrics.Add(sessionNumberMetric); + metrics.Add(sessionNumberMetric); } // Handle messages. @@ -162,7 +186,7 @@ private async Task HandleMessagesForVersionB(SparkplugMessageTopic topic, Payloa { case SparkplugMessageType.NodeBirth: await this.FireNodeBirthReceived(topic.GroupIdentifier, topic.EdgeNodeIdentifier, - this.ProcessPayload(topic, filteredMetrics, SparkplugMetricStatus.Online)); + this.ProcessPayload(topic, metrics, SparkplugMetricStatus.Online)); break; case SparkplugMessageType.DeviceBirth: if (string.IsNullOrWhiteSpace(topic.DeviceIdentifier)) @@ -171,10 +195,10 @@ await this.FireNodeBirthReceived(topic.GroupIdentifier, topic.EdgeNodeIdentifier } await this.FireDeviceBirthReceived(topic.GroupIdentifier, topic.EdgeNodeIdentifier, topic.DeviceIdentifier, - this.ProcessPayload(topic, filteredMetrics, SparkplugMetricStatus.Online)); + this.ProcessPayload(topic, metrics, SparkplugMetricStatus.Online)); break; case SparkplugMessageType.NodeData: - var nodeDataMetrics = this.ProcessPayload(topic, filteredMetrics, SparkplugMetricStatus.Online); + var nodeDataMetrics = this.ProcessPayload(topic, metrics, SparkplugMetricStatus.Online); await this.FireNodeDataReceived(topic.GroupIdentifier, topic.EdgeNodeIdentifier, nodeDataMetrics); break; case SparkplugMessageType.DeviceData: @@ -183,11 +207,11 @@ await this.FireDeviceBirthReceived(topic.GroupIdentifier, topic.EdgeNodeIdentifi throw new InvalidOperationException($"Topic {topic} is invalid!"); } - var deviceDataMetrics = this.ProcessPayload(topic, filteredMetrics, SparkplugMetricStatus.Online); + var deviceDataMetrics = this.ProcessPayload(topic, metrics, SparkplugMetricStatus.Online); await this.FireDeviceDataReceived(topic.GroupIdentifier, topic.EdgeNodeIdentifier, topic.DeviceIdentifier, deviceDataMetrics); break; case SparkplugMessageType.NodeDeath: - this.ProcessPayload(topic, filteredMetrics, SparkplugMetricStatus.Offline); + this.ProcessPayload(topic, metrics, SparkplugMetricStatus.Offline); await this.FireNodeDeathReceived(topic.GroupIdentifier, topic.EdgeNodeIdentifier, sessionNumberMetric); break; case SparkplugMessageType.DeviceDeath: @@ -196,7 +220,7 @@ await this.FireDeviceBirthReceived(topic.GroupIdentifier, topic.EdgeNodeIdentifi throw new InvalidOperationException($"Topic {topic} is invalid!"); } - this.ProcessPayload(topic, filteredMetrics, SparkplugMetricStatus.Offline); + this.ProcessPayload(topic, metrics, SparkplugMetricStatus.Offline); await this.FireDeviceDeathReceived(topic.GroupIdentifier, topic.EdgeNodeIdentifier, topic.DeviceIdentifier); break; } @@ -208,27 +232,63 @@ await this.FireDeviceBirthReceived(topic.GroupIdentifier, topic.EdgeNodeIdentifi /// The topic. /// The metrics. /// The metric status. - /// Thrown if the edge node identifier is invalid. + /// Thrown if any identifier is invalid. /// Thrown if the metric cast is invalid. private IEnumerable ProcessPayload(SparkplugMessageTopic topic, List metrics, SparkplugMetricStatus metricStatus) { - var metricState = new MetricState + // Check group id. + if (string.IsNullOrWhiteSpace(topic.GroupIdentifier)) + { + throw new InvalidOperationException($"The group identifier is invalid {topic.GroupIdentifier}."); + } + + if (!this.GroupStates.ContainsKey(topic.GroupIdentifier)) + { + this.GroupStates[topic.GroupIdentifier] = new GroupState(); + } + + NodeState metricState = new() { MetricStatus = metricStatus }; + // Check node id. + if (string.IsNullOrWhiteSpace(topic.EdgeNodeIdentifier)) + { + throw new InvalidOperationException($"The edge node identifier is invalid {topic.EdgeNodeIdentifier}."); + } + if (!string.IsNullOrWhiteSpace(topic.DeviceIdentifier)) { - if (string.IsNullOrWhiteSpace(topic.EdgeNodeIdentifier)) + // If the group doesn't contain the node, create a new node. + if (!this.GroupStates[topic.GroupIdentifier].NodeStates.ContainsKey(topic.EdgeNodeIdentifier)) + { + this.GroupStates[topic.GroupIdentifier] + .NodeStates[topic.EdgeNodeIdentifier] = metricState; + } + + if (this.GroupStates[topic.GroupIdentifier] + .NodeStates[topic.EdgeNodeIdentifier] + .DeviceStates.TryGetValue(topic.DeviceIdentifier, out var metric)) { - throw new InvalidOperationException($"The edge node identifier is invalid for device {topic.DeviceIdentifier}."); + metricState.Metrics = metric.Metrics; } - this.DeviceStates[$"{topic.EdgeNodeIdentifier}/{topic.DeviceIdentifier}"] = metricState; + this.GroupStates[topic.GroupIdentifier] + .NodeStates[topic.EdgeNodeIdentifier] + .DeviceStates[topic.DeviceIdentifier] = metricState; } else { - this.NodeStates[topic.EdgeNodeIdentifier] = metricState; + if (this.GroupStates[topic.GroupIdentifier] + .NodeStates.TryGetValue(topic.EdgeNodeIdentifier, out var metric)) + { + metricState.Metrics = metric.Metrics; + metricState.DeviceStates = metric.DeviceStates; + } + + this.GroupStates[topic.GroupIdentifier] + .NodeStates[topic.EdgeNodeIdentifier] = metricState; } foreach (var payloadMetric in metrics) diff --git a/src/SparkplugNet/VersionB/SparkplugNode.cs b/src/SparkplugNet/VersionB/SparkplugNode.cs index 58ed39f..de76e06 100644 --- a/src/SparkplugNet/VersionB/SparkplugNode.cs +++ b/src/SparkplugNet/VersionB/SparkplugNode.cs @@ -91,17 +91,36 @@ protected override async Task OnMessageReceived(SparkplugMessageTopic topic, byt { var payloadVersionB = PayloadHelper.Deserialize(payload); - if (payloadVersionB is not null) - { - var convertedPayload = PayloadConverter.ConvertVersionBPayload(payloadVersionB); + if (payloadVersionB == null) { return; } + + ConcurrentDictionary? metrics = null; - if (convertedPayload is not Payload _) + if (!(topic.MessageType == SparkplugMessageType.NodeBirth || topic.MessageType == SparkplugMessageType.DeviceBirth)) + { + // Get known metrics + if (topic.DeviceIdentifier is null) + { + metrics = this.knownMetrics.GetKnownMetricsByName(); + } + else if (this.KnownDevices.TryGetValue(topic.DeviceIdentifier, out var knownMetricStorage)) + { + metrics = knownMetricStorage.GetKnownMetricsByName(); + } + else { - throw new InvalidCastException("The metric cast didn't work properly."); + return; } + } + + var convertedPayload = PayloadConverter.ConvertVersionBPayload(payloadVersionB, metrics); - await this.HandleMessagesForVersionB(topic, convertedPayload); + if (convertedPayload is not Payload _) + { + throw new InvalidCastException("The metric cast didn't work properly."); } + + await this.HandleMessagesForVersionB(topic, convertedPayload); + } /// @@ -114,12 +133,12 @@ private async Task HandleMessagesForVersionB(SparkplugMessageTopic topic, Payloa { // Filter out session number metric. var sessionNumberMetric = payload.Metrics.FirstOrDefault(m => m.Name == Constants.SessionNumberMetricName); - var metricsWithoutSequenceMetric = payload.Metrics.Where(m => m.Name != Constants.SessionNumberMetricName); - var filteredMetrics = this.KnownMetricsStorage.FilterMetrics(metricsWithoutSequenceMetric, topic.MessageType).ToList(); + var metrics = payload.Metrics.Where(m => m.Name != Constants.SessionNumberMetricName).ToList(); + // var filteredMetrics = this.KnownMetricsStorage.FilterMetrics(metricsWithoutSequenceMetric, topic.MessageType).ToList(); if (sessionNumberMetric is not null) { - filteredMetrics.Add(sessionNumberMetric); + metrics.Add(sessionNumberMetric); } // Handle messages. @@ -131,11 +150,11 @@ private async Task HandleMessagesForVersionB(SparkplugMessageTopic topic, Payloa throw new InvalidOperationException($"Topic {topic} is invalid!"); } - await this.FireDeviceCommandReceived(topic.DeviceIdentifier, filteredMetrics); + await this.FireDeviceCommandReceived(topic.DeviceIdentifier, metrics); break; case SparkplugMessageType.NodeCommand: - await this.FireNodeCommandReceived(filteredMetrics); + await this.FireNodeCommandReceived(metrics); break; } }