Skip to content

Incomplete implementation of MQTT binary mode #296

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion RELEASING.md
Original file line number Diff line number Diff line change
Expand Up @@ -43,13 +43,20 @@ Within this repository, this is achieved by the following mechanisms:

- Individual csproj files do not specify a version
- The [Directory.Build.props](src/Directory.Build.props) file has a `<Version>` element
specifying the version of all packages
specifying the version of all packages which don't need a separate major

A single GitHub release (and tag) will be created for each beta release, to cover all packages.

- Example tag name: "CloudNative.CloudEvents.All-2.0.0"
- Example release title: "All packages version 2.0.0"

### Exception: new major versions

Sometimes, we need a new major version of a "satellite" package, typically
to adopt a new major version of a dependency. In this case, the satellite package
will have its own major version, but keep the minor and patch version of everything
else.

## New / unstable package versioning

New packages are introduced with alpha and beta versions as would
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,15 @@
<TargetFrameworks>netstandard2.0;netstandard2.1;net8.0</TargetFrameworks>
<Description>MQTT extensions for CloudNative.CloudEvents</Description>
<PackageTags>cncf;cloudnative;cloudevents;events;mqtt</PackageTags>
<LangVersion>8.0</LangVersion>
<LangVersion>latest</LangVersion>
<Version>3.$(MinorVersion).$(PatchVersion)</Version>
<!-- After the first release of v3, we'll change the major here to 3. -->
<PackageValidationBaselineVersion>2.$(PackageValidationMinor).0</PackageValidationBaselineVersion>
<Nullable>enable</Nullable>
</PropertyGroup>

<ItemGroup>
<PackageReference Include="MQTTnet" Version="3.0.15" />
<PackageReference Include="MQTTnet" Version="4.3.6.1152" />
<ProjectReference Include="..\CloudNative.CloudEvents\CloudNative.CloudEvents.csproj" />
</ItemGroup>

Expand Down
53 changes: 45 additions & 8 deletions src/CloudNative.CloudEvents.Mqtt/MqttExtensions.cs
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
// Copyright (c) Cloud Native Foundation.
// Copyright (c) Cloud Native Foundation.
// Licensed under the Apache 2.0 license.
// See LICENSE file in the project root for full license information.

using CloudNative.CloudEvents.Core;
using MQTTnet;
using MQTTnet.Packets;
using System;
using System.Collections.Generic;
using System.Linq;

namespace CloudNative.CloudEvents.Mqtt
{
Expand Down Expand Up @@ -38,16 +40,40 @@ public static CloudEvent ToCloudEvent(this MqttApplicationMessage message,
Validation.CheckNotNull(formatter, nameof(formatter));
Validation.CheckNotNull(message, nameof(message));

// TODO: Determine if there's a sensible content type we should apply.
return formatter.DecodeStructuredModeMessage(message.Payload, contentType: null, extensionAttributes);
// Check if the spec version is specified in user properties.
// If it is, we'll assume it's binary mode. Otherwise, we'll assume it's structured mode.
if (message.UserProperties?.FirstOrDefault(p => p.Name == CloudEventsSpecVersion.SpecVersionAttribute.Name)
is not MqttUserProperty specVersionProperty)
{
// TODO: Determine if there's a sensible content type we should apply.
return formatter.DecodeStructuredModeMessage(message.PayloadSegment, contentType: null, extensionAttributes);
}

var specVersion = CloudEventsSpecVersion.FromVersionId(specVersionProperty.Value)
?? throw new ArgumentException($"Unknown CloudEvents spec version '{specVersionProperty.Value}'", nameof(message));
var cloudEvent = new CloudEvent(specVersion, extensionAttributes);

foreach (var userProperty in message.UserProperties)
{
if (userProperty == specVersionProperty)
{
continue;
}
cloudEvent.SetAttributeFromString(userProperty.Name, userProperty.Value);
}

if (message.PayloadSegment.Array is not null)
{
formatter.DecodeBinaryModeEventData(message.PayloadSegment, cloudEvent);
}
return cloudEvent;
}

// TODO: Update to a newer version of MQTTNet and support both binary and structured mode?
/// <summary>
/// Converts a CloudEvent to <see cref="MqttApplicationMessage"/>.
/// </summary>
/// <param name="cloudEvent">The CloudEvent to convert. Must not be null, and must be a valid CloudEvent.</param>
/// <param name="contentMode">Content mode. Currently only structured mode is supported.</param>
/// <param name="contentMode">Content mode. Both structured mode and binary mode are supported.</param>
/// <param name="formatter">The formatter to use within the conversion. Must not be null.</param>
/// <param name="topic">The MQTT topic for the message. May be null.</param>
public static MqttApplicationMessage ToMqttApplicationMessage(this CloudEvent cloudEvent, ContentMode contentMode, CloudEventFormatter formatter, string? topic)
Expand All @@ -58,14 +84,25 @@ public static MqttApplicationMessage ToMqttApplicationMessage(this CloudEvent cl
switch (contentMode)
{
case ContentMode.Structured:
var arraySegment = BinaryDataUtilities.GetArraySegment(formatter.EncodeStructuredModeMessage(cloudEvent, out var contentType));
return new MqttApplicationMessage
{
Topic = topic,
Payload = BinaryDataUtilities.AsArray(formatter.EncodeStructuredModeMessage(cloudEvent, out _))
ContentType = contentType.ToString(),
PayloadSegment = arraySegment
};
default:
throw new ArgumentOutOfRangeException(nameof(contentMode), $"Unsupported content mode: {contentMode}");
return new MqttApplicationMessage
{
ContentType = formatter.GetOrInferDataContentType(cloudEvent),
UserProperties = cloudEvent.GetPopulatedAttributes()
.Select(pair => new MqttUserProperty(pair.Key.Name, pair.Key.Format(pair.Value)))
.Append(new MqttUserProperty(CloudEventsSpecVersion.SpecVersionAttribute.Name, cloudEvent.SpecVersion.VersionId))
.ToList(),
Topic = topic,
PayloadSegment = BinaryDataUtilities.GetArraySegment(formatter.EncodeBinaryModeEventData(cloudEvent))
};
}
}
}
}
}
11 changes: 10 additions & 1 deletion src/CloudNative.CloudEvents/Core/BinaryDataUtilities.cs
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,16 @@ public static byte[] AsArray(ReadOnlyMemory<byte> memory)
}

// Note: when this returns, the Array property of the returned segment is guaranteed not to be null.
private static ArraySegment<byte> GetArraySegment(ReadOnlyMemory<byte> memory) =>

/// <summary>
/// Returns the data from <paramref name="memory"/> as a byte array, return the underlying array
/// if there is one, or creating a copy otherwise. This method should be used with care, due to the
/// "sometimes shared, sometimes not" nature of the result. (It is generally safe to use this with the result
/// of encoding a CloudEvent, assuming the same memory is not used elsewhere.)
/// </summary>
/// <param name="memory">The memory to obtain the data from.</param>
/// <returns>The data in <paramref name="memory"/> as an array segment.</returns>
public static ArraySegment<byte> GetArraySegment(ReadOnlyMemory<byte> memory) =>
MemoryMarshal.TryGetArray(memory, out var segment) && segment.Array is not null
? segment
: new ArraySegment<byte>(memory.ToArray());
Expand Down
19 changes: 15 additions & 4 deletions src/Directory.Build.props
Original file line number Diff line number Diff line change
@@ -1,11 +1,22 @@
<Project>
<PropertyGroup>
<!--
- We use the same version number for all stable
- packages. See RELEASING.md for details.
- We use the same minor/patch version number for all stable
- packages, and the same major for most packages.
- See RELEASING.md for details.
-->
<Version>2.7.1</Version>
<PackageValidationBaselineVersion>2.7.0</PackageValidationBaselineVersion>
<MajorVersion>2</MajorVersion>
<MinorVersion>7</MinorVersion>
<PatchVersion>1</PatchVersion>
<PackageValidationMinor>7</PackageValidationMinor>
<Version>$(MajorVersion).$(MinorVersion).$(PatchVersion)</Version>
<!--
- The version used for detecting breaking changes.
- This is always older than the current version (except when creating a new major)
- and is the patch-0 of either the current minor (if the current patch is non-zero)
- or the previous minor (if the current patch is zero).
-->
<PackageValidationBaselineVersion>2.$(PackageValidationMinor).0</PackageValidationBaselineVersion>

<!-- Make the repository root available for other properties -->
<RepoRoot>$([System.IO.Path]::GetDirectoryName($([MSBuild]::GetPathOfFileAbove('.gitignore', '$(MSBuildThisFileDirectory)'))))</RepoRoot>
Expand Down
74 changes: 63 additions & 11 deletions test/CloudNative.CloudEvents.UnitTests/Mqtt/MqttTest.cs
Original file line number Diff line number Diff line change
@@ -1,12 +1,11 @@
// Copyright (c) Cloud Native Foundation.
// Copyright (c) Cloud Native Foundation.
// Licensed under the Apache 2.0 license.
// See LICENSE file in the project root for full license information.

using CloudNative.CloudEvents.NewtonsoftJson;
using MQTTnet;
using MQTTnet.Client;
using MQTTnet.Client.Options;
using MQTTnet.Client.Receiving;
using MQTTnet.Formatter;
using MQTTnet.Server;
using System;
using System.Net.Mime;
Expand All @@ -18,16 +17,17 @@ namespace CloudNative.CloudEvents.Mqtt.UnitTests
{
public class MqttTest : IDisposable
{
private readonly IMqttServer mqttServer;
private readonly MqttServer mqttServer;

public MqttTest()
{
var optionsBuilder = new MqttServerOptionsBuilder()
.WithConnectionBacklog(100)
.WithDefaultEndpoint()
.WithDefaultEndpointPort(52355);

this.mqttServer = new MqttFactory().CreateMqttServer();
mqttServer.StartAsync(optionsBuilder.Build()).GetAwaiter().GetResult();
this.mqttServer = new MqttFactory().CreateMqttServer(optionsBuilder.Build());
mqttServer.StartAsync().GetAwaiter().GetResult();
}

public void Dispose()
Expand All @@ -36,7 +36,7 @@ public void Dispose()
}

[Fact]
public async Task MqttSendTest()
public async Task MqttSendTest_Structured()
{

var jsonEventFormatter = new JsonEventFormatter();
Expand All @@ -55,14 +55,18 @@ public async Task MqttSendTest()

var options = new MqttClientOptionsBuilder()
.WithClientId("Client1")
.WithTcpServer("localhost", 52355)
.WithTcpServer("127.0.0.1", 52355)
.WithCleanSession()
.WithProtocolVersion(MqttProtocolVersion.V500)
.Build();

TaskCompletionSource<CloudEvent> tcs = new TaskCompletionSource<CloudEvent>();
await client.ConnectAsync(options);
client.ApplicationMessageReceivedHandler = new MqttApplicationMessageReceivedHandlerDelegate(
args => tcs.SetResult(args.ApplicationMessage.ToCloudEvent(jsonEventFormatter)));
client.ApplicationMessageReceivedAsync += args =>
{
tcs.SetResult(args.ApplicationMessage.ToCloudEvent(jsonEventFormatter));
return Task.CompletedTask;
};

var result = await client.SubscribeAsync("abc");
await client.PublishAsync(cloudEvent.ToMqttApplicationMessage(ContentMode.Structured, new JsonEventFormatter(), topic: "abc"));
Expand All @@ -78,5 +82,53 @@ public async Task MqttSendTest()

Assert.Equal("value", (string?) receivedCloudEvent["comexampleextension1"]);
}

[Fact]
public async Task MqttSendTest_Binary()
{

var jsonEventFormatter = new JsonEventFormatter();
var cloudEvent = new CloudEvent
{
Type = "com.github.pull.create",
Source = new Uri("https://github.com/cloudevents/spec/pull/123"),
Id = "A234-1234-1234",
Time = new DateTimeOffset(2018, 4, 5, 17, 31, 0, TimeSpan.Zero),
DataContentType = MediaTypeNames.Text.Xml,
Data = "<much wow=\"xml\"/>",
["comexampleextension1"] = "value"
};

var client = new MqttFactory().CreateMqttClient();

var options = new MqttClientOptionsBuilder()
.WithClientId("Client1")
.WithTcpServer("127.0.0.1", 52355)
.WithCleanSession()
.WithProtocolVersion(MqttProtocolVersion.V500)
.Build();

TaskCompletionSource<CloudEvent> tcs = new TaskCompletionSource<CloudEvent>();
await client.ConnectAsync(options);
client.ApplicationMessageReceivedAsync += args =>
{
tcs.SetResult(args.ApplicationMessage.ToCloudEvent(jsonEventFormatter));
return Task.CompletedTask;
};

var result = await client.SubscribeAsync("abc");
await client.PublishAsync(cloudEvent.ToMqttApplicationMessage(ContentMode.Binary, new JsonEventFormatter(), topic: "abc"));
var receivedCloudEvent = await tcs.Task;

Assert.Equal(CloudEventsSpecVersion.Default, receivedCloudEvent.SpecVersion);
Assert.Equal("com.github.pull.create", receivedCloudEvent.Type);
Assert.Equal(new Uri("https://github.com/cloudevents/spec/pull/123"), receivedCloudEvent.Source);
Assert.Equal("A234-1234-1234", receivedCloudEvent.Id);
AssertTimestampsEqual("2018-04-05T17:31:00Z", receivedCloudEvent.Time!.Value);
Assert.Equal(MediaTypeNames.Text.Xml, receivedCloudEvent.DataContentType);
Assert.Equal("<much wow=\"xml\"/>", receivedCloudEvent.Data);

Assert.Equal("value", (string?) receivedCloudEvent["comexampleextension1"]);
}
}
}
}
Loading