From b04e9296d96043063b676f08cc97191bbbb70062 Mon Sep 17 00:00:00 2001 From: Jon Skeet Date: Tue, 30 Jul 2024 11:09:38 +0100 Subject: [PATCH 1/3] Prepare for a separate major version of CloudNative.CloudEvents.Mqtt This more fine-grained way of referring to the versions will allow us to still only change Directory.Build.props just before a release, but have separate major versions flow from that. --- RELEASING.md | 9 ++++++++- src/Directory.Build.props | 19 +++++++++++++++---- 2 files changed, 23 insertions(+), 5 deletions(-) diff --git a/RELEASING.md b/RELEASING.md index 4ef2774..5ba5cd0 100644 --- a/RELEASING.md +++ b/RELEASING.md @@ -43,13 +43,20 @@ Within this repository, this is achieved by the following mechanisms: - Individual csproj files do not specify a version - The [Directory.Build.props](src/Directory.Build.props) file has a `` element - specifying the version of all packages + specifying the version of all packages which don't need a separate major A single GitHub release (and tag) will be created for each beta release, to cover all packages. - Example tag name: "CloudNative.CloudEvents.All-2.0.0" - Example release title: "All packages version 2.0.0" +### Exception: new major versions + +Sometimes, we need a new major version of a "satellite" package, typically +to adopt a new major version of a dependency. In this case, the satellite package +will have its own major version, but keep the minor and patch version of everything +else. + ## New / unstable package versioning New packages are introduced with alpha and beta versions as would diff --git a/src/Directory.Build.props b/src/Directory.Build.props index 64c7440..a57ec15 100644 --- a/src/Directory.Build.props +++ b/src/Directory.Build.props @@ -1,11 +1,22 @@ - 2.7.1 - 2.7.0 + 2 + 7 + 1 + 7 + $(MajorVersion).$(MinorVersion).$(PatchVersion) + + 2.$(PackageValidationMinor).0 $([System.IO.Path]::GetDirectoryName($([MSBuild]::GetPathOfFileAbove('.gitignore', '$(MSBuildThisFileDirectory)')))) From 81fb9bafb6d73bac86acecb132fb108954926bff Mon Sep 17 00:00:00 2001 From: Jon Skeet Date: Tue, 30 Jul 2024 11:35:39 +0100 Subject: [PATCH 2/3] Update CloudNative.CloudEvents.Mqtt to MQTTnet version 4.3.6.1152 This involves a new major version of CloudNative.CloudEvents.Mqtt. --- .../CloudNative.CloudEvents.Mqtt.csproj | 5 ++++- .../MqttExtensions.cs | 10 ++++----- .../Core/BinaryDataUtilities.cs | 11 +++++++++- .../Mqtt/MqttTest.cs | 22 ++++++++++--------- 4 files changed, 31 insertions(+), 17 deletions(-) diff --git a/src/CloudNative.CloudEvents.Mqtt/CloudNative.CloudEvents.Mqtt.csproj b/src/CloudNative.CloudEvents.Mqtt/CloudNative.CloudEvents.Mqtt.csproj index 5638bd3..2e62d13 100644 --- a/src/CloudNative.CloudEvents.Mqtt/CloudNative.CloudEvents.Mqtt.csproj +++ b/src/CloudNative.CloudEvents.Mqtt/CloudNative.CloudEvents.Mqtt.csproj @@ -5,11 +5,14 @@ MQTT extensions for CloudNative.CloudEvents cncf;cloudnative;cloudevents;events;mqtt 8.0 + 3.$(MinorVersion).$(PatchVersion) + + 2.$(PackageValidationMinor).0 enable - + diff --git a/src/CloudNative.CloudEvents.Mqtt/MqttExtensions.cs b/src/CloudNative.CloudEvents.Mqtt/MqttExtensions.cs index 9effda8..57f7f86 100644 --- a/src/CloudNative.CloudEvents.Mqtt/MqttExtensions.cs +++ b/src/CloudNative.CloudEvents.Mqtt/MqttExtensions.cs @@ -1,4 +1,4 @@ -// Copyright (c) Cloud Native Foundation. +// Copyright (c) Cloud Native Foundation. // Licensed under the Apache 2.0 license. // See LICENSE file in the project root for full license information. @@ -39,10 +39,10 @@ public static CloudEvent ToCloudEvent(this MqttApplicationMessage message, Validation.CheckNotNull(message, nameof(message)); // TODO: Determine if there's a sensible content type we should apply. - return formatter.DecodeStructuredModeMessage(message.Payload, contentType: null, extensionAttributes); + return formatter.DecodeStructuredModeMessage(message.PayloadSegment, contentType: null, extensionAttributes); } - // TODO: Update to a newer version of MQTTNet and support both binary and structured mode? + // TODO: Support both binary and structured mode. /// /// Converts a CloudEvent to . /// @@ -61,11 +61,11 @@ public static MqttApplicationMessage ToMqttApplicationMessage(this CloudEvent cl return new MqttApplicationMessage { Topic = topic, - Payload = BinaryDataUtilities.AsArray(formatter.EncodeStructuredModeMessage(cloudEvent, out _)) + PayloadSegment = BinaryDataUtilities.GetArraySegment(formatter.EncodeStructuredModeMessage(cloudEvent, out _)) }; default: throw new ArgumentOutOfRangeException(nameof(contentMode), $"Unsupported content mode: {contentMode}"); } } } -} \ No newline at end of file +} diff --git a/src/CloudNative.CloudEvents/Core/BinaryDataUtilities.cs b/src/CloudNative.CloudEvents/Core/BinaryDataUtilities.cs index c1915a9..5dfe119 100644 --- a/src/CloudNative.CloudEvents/Core/BinaryDataUtilities.cs +++ b/src/CloudNative.CloudEvents/Core/BinaryDataUtilities.cs @@ -114,7 +114,16 @@ public static byte[] AsArray(ReadOnlyMemory memory) } // Note: when this returns, the Array property of the returned segment is guaranteed not to be null. - private static ArraySegment GetArraySegment(ReadOnlyMemory memory) => + + /// + /// Returns the data from as a byte array, return the underlying array + /// if there is one, or creating a copy otherwise. This method should be used with care, due to the + /// "sometimes shared, sometimes not" nature of the result. (It is generally safe to use this with the result + /// of encoding a CloudEvent, assuming the same memory is not used elsewhere.) + /// + /// The memory to obtain the data from. + /// The data in as an array segment. + public static ArraySegment GetArraySegment(ReadOnlyMemory memory) => MemoryMarshal.TryGetArray(memory, out var segment) && segment.Array is not null ? segment : new ArraySegment(memory.ToArray()); diff --git a/test/CloudNative.CloudEvents.UnitTests/Mqtt/MqttTest.cs b/test/CloudNative.CloudEvents.UnitTests/Mqtt/MqttTest.cs index d9dd3af..071d4f7 100644 --- a/test/CloudNative.CloudEvents.UnitTests/Mqtt/MqttTest.cs +++ b/test/CloudNative.CloudEvents.UnitTests/Mqtt/MqttTest.cs @@ -1,12 +1,10 @@ -// Copyright (c) Cloud Native Foundation. +// Copyright (c) Cloud Native Foundation. // Licensed under the Apache 2.0 license. // See LICENSE file in the project root for full license information. using CloudNative.CloudEvents.NewtonsoftJson; using MQTTnet; using MQTTnet.Client; -using MQTTnet.Client.Options; -using MQTTnet.Client.Receiving; using MQTTnet.Server; using System; using System.Net.Mime; @@ -18,16 +16,17 @@ namespace CloudNative.CloudEvents.Mqtt.UnitTests { public class MqttTest : IDisposable { - private readonly IMqttServer mqttServer; + private readonly MqttServer mqttServer; public MqttTest() { var optionsBuilder = new MqttServerOptionsBuilder() .WithConnectionBacklog(100) + .WithDefaultEndpoint() .WithDefaultEndpointPort(52355); - this.mqttServer = new MqttFactory().CreateMqttServer(); - mqttServer.StartAsync(optionsBuilder.Build()).GetAwaiter().GetResult(); + this.mqttServer = new MqttFactory().CreateMqttServer(optionsBuilder.Build()); + mqttServer.StartAsync().GetAwaiter().GetResult(); } public void Dispose() @@ -55,14 +54,17 @@ public async Task MqttSendTest() var options = new MqttClientOptionsBuilder() .WithClientId("Client1") - .WithTcpServer("localhost", 52355) + .WithTcpServer("127.0.0.1", 52355) .WithCleanSession() .Build(); TaskCompletionSource tcs = new TaskCompletionSource(); await client.ConnectAsync(options); - client.ApplicationMessageReceivedHandler = new MqttApplicationMessageReceivedHandlerDelegate( - args => tcs.SetResult(args.ApplicationMessage.ToCloudEvent(jsonEventFormatter))); + client.ApplicationMessageReceivedAsync += args => + { + tcs.SetResult(args.ApplicationMessage.ToCloudEvent(jsonEventFormatter)); + return Task.CompletedTask; + }; var result = await client.SubscribeAsync("abc"); await client.PublishAsync(cloudEvent.ToMqttApplicationMessage(ContentMode.Structured, new JsonEventFormatter(), topic: "abc")); @@ -79,4 +81,4 @@ public async Task MqttSendTest() Assert.Equal("value", (string?) receivedCloudEvent["comexampleextension1"]); } } -} \ No newline at end of file +} From 59649896442deefbc1423bace5a80a5aafcb6d24 Mon Sep 17 00:00:00 2001 From: Jon Skeet Date: Tue, 30 Jul 2024 12:23:28 +0100 Subject: [PATCH 3/3] Incomplete implementation of MQTT binary mode In some ways this *is* complete - but it assumes a v5 client. Instead, we should only populate ContentType (and allow binary mode) when using v5. This probably requires some API design work. Will fix #147 when complete. --- .../CloudNative.CloudEvents.Mqtt.csproj | 2 +- .../MqttExtensions.cs | 49 ++++++++++++++--- .../Mqtt/MqttTest.cs | 52 ++++++++++++++++++- 3 files changed, 95 insertions(+), 8 deletions(-) diff --git a/src/CloudNative.CloudEvents.Mqtt/CloudNative.CloudEvents.Mqtt.csproj b/src/CloudNative.CloudEvents.Mqtt/CloudNative.CloudEvents.Mqtt.csproj index 2e62d13..b34b212 100644 --- a/src/CloudNative.CloudEvents.Mqtt/CloudNative.CloudEvents.Mqtt.csproj +++ b/src/CloudNative.CloudEvents.Mqtt/CloudNative.CloudEvents.Mqtt.csproj @@ -4,7 +4,7 @@ netstandard2.0;netstandard2.1;net8.0 MQTT extensions for CloudNative.CloudEvents cncf;cloudnative;cloudevents;events;mqtt - 8.0 + latest 3.$(MinorVersion).$(PatchVersion) 2.$(PackageValidationMinor).0 diff --git a/src/CloudNative.CloudEvents.Mqtt/MqttExtensions.cs b/src/CloudNative.CloudEvents.Mqtt/MqttExtensions.cs index 57f7f86..9f73697 100644 --- a/src/CloudNative.CloudEvents.Mqtt/MqttExtensions.cs +++ b/src/CloudNative.CloudEvents.Mqtt/MqttExtensions.cs @@ -4,8 +4,10 @@ using CloudNative.CloudEvents.Core; using MQTTnet; +using MQTTnet.Packets; using System; using System.Collections.Generic; +using System.Linq; namespace CloudNative.CloudEvents.Mqtt { @@ -38,16 +40,40 @@ public static CloudEvent ToCloudEvent(this MqttApplicationMessage message, Validation.CheckNotNull(formatter, nameof(formatter)); Validation.CheckNotNull(message, nameof(message)); - // TODO: Determine if there's a sensible content type we should apply. - return formatter.DecodeStructuredModeMessage(message.PayloadSegment, contentType: null, extensionAttributes); + // Check if the spec version is specified in user properties. + // If it is, we'll assume it's binary mode. Otherwise, we'll assume it's structured mode. + if (message.UserProperties?.FirstOrDefault(p => p.Name == CloudEventsSpecVersion.SpecVersionAttribute.Name) + is not MqttUserProperty specVersionProperty) + { + // TODO: Determine if there's a sensible content type we should apply. + return formatter.DecodeStructuredModeMessage(message.PayloadSegment, contentType: null, extensionAttributes); + } + + var specVersion = CloudEventsSpecVersion.FromVersionId(specVersionProperty.Value) + ?? throw new ArgumentException($"Unknown CloudEvents spec version '{specVersionProperty.Value}'", nameof(message)); + var cloudEvent = new CloudEvent(specVersion, extensionAttributes); + + foreach (var userProperty in message.UserProperties) + { + if (userProperty == specVersionProperty) + { + continue; + } + cloudEvent.SetAttributeFromString(userProperty.Name, userProperty.Value); + } + + if (message.PayloadSegment.Array is not null) + { + formatter.DecodeBinaryModeEventData(message.PayloadSegment, cloudEvent); + } + return cloudEvent; } - // TODO: Support both binary and structured mode. /// /// Converts a CloudEvent to . /// /// The CloudEvent to convert. Must not be null, and must be a valid CloudEvent. - /// Content mode. Currently only structured mode is supported. + /// Content mode. Both structured mode and binary mode are supported. /// The formatter to use within the conversion. Must not be null. /// The MQTT topic for the message. May be null. public static MqttApplicationMessage ToMqttApplicationMessage(this CloudEvent cloudEvent, ContentMode contentMode, CloudEventFormatter formatter, string? topic) @@ -58,13 +84,24 @@ public static MqttApplicationMessage ToMqttApplicationMessage(this CloudEvent cl switch (contentMode) { case ContentMode.Structured: + var arraySegment = BinaryDataUtilities.GetArraySegment(formatter.EncodeStructuredModeMessage(cloudEvent, out var contentType)); return new MqttApplicationMessage { Topic = topic, - PayloadSegment = BinaryDataUtilities.GetArraySegment(formatter.EncodeStructuredModeMessage(cloudEvent, out _)) + ContentType = contentType.ToString(), + PayloadSegment = arraySegment }; default: - throw new ArgumentOutOfRangeException(nameof(contentMode), $"Unsupported content mode: {contentMode}"); + return new MqttApplicationMessage + { + ContentType = formatter.GetOrInferDataContentType(cloudEvent), + UserProperties = cloudEvent.GetPopulatedAttributes() + .Select(pair => new MqttUserProperty(pair.Key.Name, pair.Key.Format(pair.Value))) + .Append(new MqttUserProperty(CloudEventsSpecVersion.SpecVersionAttribute.Name, cloudEvent.SpecVersion.VersionId)) + .ToList(), + Topic = topic, + PayloadSegment = BinaryDataUtilities.GetArraySegment(formatter.EncodeBinaryModeEventData(cloudEvent)) + }; } } } diff --git a/test/CloudNative.CloudEvents.UnitTests/Mqtt/MqttTest.cs b/test/CloudNative.CloudEvents.UnitTests/Mqtt/MqttTest.cs index 071d4f7..3945d1e 100644 --- a/test/CloudNative.CloudEvents.UnitTests/Mqtt/MqttTest.cs +++ b/test/CloudNative.CloudEvents.UnitTests/Mqtt/MqttTest.cs @@ -5,6 +5,7 @@ using CloudNative.CloudEvents.NewtonsoftJson; using MQTTnet; using MQTTnet.Client; +using MQTTnet.Formatter; using MQTTnet.Server; using System; using System.Net.Mime; @@ -35,7 +36,7 @@ public void Dispose() } [Fact] - public async Task MqttSendTest() + public async Task MqttSendTest_Structured() { var jsonEventFormatter = new JsonEventFormatter(); @@ -56,6 +57,7 @@ public async Task MqttSendTest() .WithClientId("Client1") .WithTcpServer("127.0.0.1", 52355) .WithCleanSession() + .WithProtocolVersion(MqttProtocolVersion.V500) .Build(); TaskCompletionSource tcs = new TaskCompletionSource(); @@ -80,5 +82,53 @@ public async Task MqttSendTest() Assert.Equal("value", (string?) receivedCloudEvent["comexampleextension1"]); } + + [Fact] + public async Task MqttSendTest_Binary() + { + + var jsonEventFormatter = new JsonEventFormatter(); + var cloudEvent = new CloudEvent + { + Type = "com.github.pull.create", + Source = new Uri("https://github.com/cloudevents/spec/pull/123"), + Id = "A234-1234-1234", + Time = new DateTimeOffset(2018, 4, 5, 17, 31, 0, TimeSpan.Zero), + DataContentType = MediaTypeNames.Text.Xml, + Data = "", + ["comexampleextension1"] = "value" + }; + + var client = new MqttFactory().CreateMqttClient(); + + var options = new MqttClientOptionsBuilder() + .WithClientId("Client1") + .WithTcpServer("127.0.0.1", 52355) + .WithCleanSession() + .WithProtocolVersion(MqttProtocolVersion.V500) + .Build(); + + TaskCompletionSource tcs = new TaskCompletionSource(); + await client.ConnectAsync(options); + client.ApplicationMessageReceivedAsync += args => + { + tcs.SetResult(args.ApplicationMessage.ToCloudEvent(jsonEventFormatter)); + return Task.CompletedTask; + }; + + var result = await client.SubscribeAsync("abc"); + await client.PublishAsync(cloudEvent.ToMqttApplicationMessage(ContentMode.Binary, new JsonEventFormatter(), topic: "abc")); + var receivedCloudEvent = await tcs.Task; + + Assert.Equal(CloudEventsSpecVersion.Default, receivedCloudEvent.SpecVersion); + Assert.Equal("com.github.pull.create", receivedCloudEvent.Type); + Assert.Equal(new Uri("https://github.com/cloudevents/spec/pull/123"), receivedCloudEvent.Source); + Assert.Equal("A234-1234-1234", receivedCloudEvent.Id); + AssertTimestampsEqual("2018-04-05T17:31:00Z", receivedCloudEvent.Time!.Value); + Assert.Equal(MediaTypeNames.Text.Xml, receivedCloudEvent.DataContentType); + Assert.Equal("", receivedCloudEvent.Data); + + Assert.Equal("value", (string?) receivedCloudEvent["comexampleextension1"]); + } } }