Skip to content

Commit

Permalink
prepare streaming in management sdk. (#2129)
Browse files Browse the repository at this point in the history
  • Loading branch information
vwxyzh authored Mar 5, 2025
1 parent 1759ac6 commit 049ef04
Show file tree
Hide file tree
Showing 18 changed files with 1,226 additions and 940 deletions.
Original file line number Diff line number Diff line change
@@ -1,26 +1,27 @@
// Copyright (c) Microsoft. All rights reserved.
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.

using System;
using System.Collections.Generic;
using System.Net.Http;

using Microsoft.AspNetCore.SignalR.Protocol;

#nullable enable

namespace Microsoft.Azure.SignalR.Common
namespace Microsoft.Azure.SignalR.Common;

internal class BinaryPayloadContentBuilder : IPayloadContentBuilder
{
internal class BinaryPayloadContentBuilder : IPayloadContentBuilder
private readonly IReadOnlyList<IHubProtocol> _hubProtocols;

public BinaryPayloadContentBuilder(IReadOnlyList<IHubProtocol> hubProtocols)
{
private readonly IReadOnlyList<IHubProtocol> _hubProtocols;
public BinaryPayloadContentBuilder(IReadOnlyList<IHubProtocol> hubProtocols)
{
_hubProtocols = hubProtocols;
}
_hubProtocols = hubProtocols;
}

public HttpContent? Build(PayloadMessage? payload)
{
return payload == null ? null : (HttpContent)new BinaryPayloadMessageContent(payload, _hubProtocols);
}
public HttpContent? Build(HubMessage? payload, Type? typeHint)
{
return payload == null ? null : (HttpContent)new BinaryPayloadMessageContent(payload, _hubProtocols);
}
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright (c) Microsoft. All rights reserved.
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.

using System;
Expand All @@ -15,51 +15,50 @@

using Microsoft.AspNetCore.SignalR.Protocol;

namespace Microsoft.Azure.SignalR.Common
namespace Microsoft.Azure.SignalR.Common;

internal class BinaryPayloadMessageContent : HttpContent
{
internal class BinaryPayloadMessageContent : HttpContent
private static readonly Dictionary<string, byte[]> ProtocolMap = new(2)
{
private static readonly Dictionary<string, byte[]> ProtocolMap = new Dictionary<string, byte[]>(2)
{
{Constants.Protocol.Json, Encoding.UTF8.GetBytes(Constants.Protocol.Json) },
{Constants.Protocol.MessagePack,Encoding.UTF8.GetBytes(Constants.Protocol.MessagePack)}
};
private static readonly MediaTypeHeaderValue ContentType = new("application/octet-stream");
{Constants.Protocol.Json, Encoding.UTF8.GetBytes(Constants.Protocol.Json) },
{Constants.Protocol.MessagePack,Encoding.UTF8.GetBytes(Constants.Protocol.MessagePack)}
};

private readonly PayloadMessage _payloadMessage;
private readonly IReadOnlyList<IHubProtocol> _hubProtocols;
private static readonly MediaTypeHeaderValue ContentType = new("application/octet-stream");

public BinaryPayloadMessageContent(PayloadMessage payloadMessage, IReadOnlyList<IHubProtocol> hubProtocols)
{
_payloadMessage = payloadMessage ?? throw new ArgumentNullException(nameof(payloadMessage));
_hubProtocols = hubProtocols ?? throw new ArgumentNullException(nameof(hubProtocols));
Headers.ContentType = ContentType;
}
private readonly HubMessage _payloadMessage;
private readonly IReadOnlyList<IHubProtocol> _hubProtocols;

protected override async Task SerializeToStreamAsync(Stream stream, TransportContext context)
{
using var memoryBufferWriter = new MemoryBufferWriter();
WriteMessageCore(memoryBufferWriter);
await memoryBufferWriter.CopyToAsync(stream);
}
public BinaryPayloadMessageContent(HubMessage payloadMessage, IReadOnlyList<IHubProtocol> hubProtocols)
{
_payloadMessage = payloadMessage ?? throw new ArgumentNullException(nameof(payloadMessage));
_hubProtocols = hubProtocols ?? throw new ArgumentNullException(nameof(hubProtocols));
Headers.ContentType = ContentType;
}

protected override bool TryComputeLength(out long length)
{
length = 0;
return false;
}
protected override async Task SerializeToStreamAsync(Stream stream, TransportContext context)
{
using var memoryBufferWriter = new MemoryBufferWriter();
WriteMessageCore(memoryBufferWriter);
await memoryBufferWriter.CopyToAsync(stream);
}

private void WriteMessageCore(IBufferWriter<byte> bufferWriter)
protected override bool TryComputeLength(out long length)
{
length = 0;
return false;
}

private void WriteMessageCore(IBufferWriter<byte> bufferWriter)
{
var messagePackWriter = new MessagePackWriter(bufferWriter);
messagePackWriter.WriteMapHeader(_hubProtocols.Count);
foreach (var hubProtocol in _hubProtocols)
{
var invocationMessage = new InvocationMessage(_payloadMessage.Target, _payloadMessage.Arguments);
var messagePackWriter = new MessagePackWriter(bufferWriter);
messagePackWriter.WriteMapHeader(_hubProtocols.Count);
foreach (var hubProtocol in _hubProtocols)
{
messagePackWriter.WriteString(ProtocolMap[hubProtocol.Name]);
messagePackWriter.Write(hubProtocol.GetMessageBytes(invocationMessage).Span);
}
messagePackWriter.Flush();
messagePackWriter.WriteString(ProtocolMap[hubProtocol.Name]);
messagePackWriter.Write(hubProtocol.GetMessageBytes(_payloadMessage).Span);
}
messagePackWriter.Flush();
}
}
Original file line number Diff line number Diff line change
@@ -1,14 +1,16 @@
// Copyright (c) Microsoft. All rights reserved.
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.

using System;
using System.Net.Http;

using Microsoft.AspNetCore.SignalR.Protocol;

#nullable enable

namespace Microsoft.Azure.SignalR.Common
namespace Microsoft.Azure.SignalR.Common;

internal interface IPayloadContentBuilder
{
internal interface IPayloadContentBuilder
{
HttpContent? Build(PayloadMessage? payload);
}
HttpContent? Build(HubMessage? payload, Type? typeHint);
}
Original file line number Diff line number Diff line change
@@ -1,25 +1,27 @@
// Copyright (c) Microsoft. All rights reserved.
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.

using System;
using System.Net.Http;

using Azure.Core.Serialization;

using Microsoft.AspNetCore.SignalR.Protocol;

#nullable enable
namespace Microsoft.Azure.SignalR.Common
namespace Microsoft.Azure.SignalR.Common;

internal class JsonPayloadContentBuilder : IPayloadContentBuilder
{
internal class JsonPayloadContentBuilder : IPayloadContentBuilder
{
private readonly ObjectSerializer _jsonObjectSerializer;
private readonly ObjectSerializer _jsonObjectSerializer;

public JsonPayloadContentBuilder(ObjectSerializer jsonObjectSerializer)
{
_jsonObjectSerializer = jsonObjectSerializer;
}
public JsonPayloadContentBuilder(ObjectSerializer jsonObjectSerializer)
{
_jsonObjectSerializer = jsonObjectSerializer;
}

public HttpContent? Build(PayloadMessage? payload)
{
return payload == null ? null : new JsonPayloadMessageContent(payload, _jsonObjectSerializer);
}
public HttpContent? Build(HubMessage? payload, Type? typeHint)
{
return payload == null ? null : new JsonPayloadMessageContent(payload, _jsonObjectSerializer, typeHint);
}
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
// Copyright (c) Microsoft. All rights reserved.
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.

using System;
using System.IO;
using System.Net;
using System.Net.Http;
Expand All @@ -10,45 +11,55 @@

using Azure.Core.Serialization;

namespace Microsoft.Azure.SignalR
using Microsoft.AspNetCore.SignalR.Protocol;

namespace Microsoft.Azure.SignalR;

internal class JsonPayloadMessageContent : HttpContent
{
internal class JsonPayloadMessageContent : HttpContent
private static readonly MediaTypeHeaderValue ContentType = new("application/json")
{
private static readonly MediaTypeHeaderValue ContentType = new("application/json")
{
CharSet = "utf-8"
};
private static readonly JsonWriterOptions JsonWriterOptions = new()
{
// We must skip validation because what we break the writing midway and write JSON in other ways.
SkipValidation = true
};
private readonly PayloadMessage _payloadMessage;
private readonly ObjectSerializer _jsonObjectSerializer;
CharSet = "utf-8"
};
private static readonly JsonWriterOptions JsonWriterOptions = new()
{
// We must skip validation because what we break the writing midway and write JSON in other ways.
SkipValidation = true
};
private readonly HubMessage _payloadMessage;
private readonly ObjectSerializer _jsonObjectSerializer;
private readonly Type _typeHint;

public JsonPayloadMessageContent(PayloadMessage payloadMessage, ObjectSerializer jsonObjectSerializer)
{
_payloadMessage = payloadMessage ?? throw new System.ArgumentNullException(nameof(payloadMessage));
_jsonObjectSerializer = jsonObjectSerializer;
Headers.ContentType = ContentType;
}
public JsonPayloadMessageContent(HubMessage payloadMessage, ObjectSerializer jsonObjectSerializer, Type typeHint)
{
_payloadMessage = payloadMessage ?? throw new System.ArgumentNullException(nameof(payloadMessage));
_jsonObjectSerializer = jsonObjectSerializer;
_typeHint = typeHint;
Headers.ContentType = ContentType;
}

protected override async Task SerializeToStreamAsync(Stream stream, TransportContext context)
protected override async Task SerializeToStreamAsync(Stream stream, TransportContext context)
{
if (_payloadMessage is InvocationMessage invocationMessage)
{
using var jsonWriter = new Utf8JsonWriter(stream, JsonWriterOptions);
jsonWriter.WriteStartObject();
jsonWriter.WriteString(nameof(PayloadMessage.Target), _payloadMessage.Target);
jsonWriter.WriteString(nameof(PayloadMessage.Target), invocationMessage.Target);
jsonWriter.WritePropertyName(nameof(PayloadMessage.Arguments));
await jsonWriter.FlushAsync();
await _jsonObjectSerializer.SerializeAsync(stream, _payloadMessage.Arguments, typeof(object[]), default);
await _jsonObjectSerializer.SerializeAsync(stream, invocationMessage.Arguments, typeof(object[]), default);
jsonWriter.WriteEndObject();
await jsonWriter.FlushAsync();
}

protected override bool TryComputeLength(out long length)
else if (_payloadMessage is StreamItemMessage streamItemMessage)
{
length = 0;
return false;
await _jsonObjectSerializer.SerializeAsync(stream, streamItemMessage.Item, _typeHint, default);
}
}

protected override bool TryComputeLength(out long length)
{
length = 0;
return false;
}
}
Loading

0 comments on commit 049ef04

Please sign in to comment.