Skip to content

Commit

Permalink
[fix]修正WebSocket消息组包,Payload可能是链式数据包
Browse files Browse the repository at this point in the history
  • Loading branch information
nnhy committed Oct 9, 2024
1 parent 7d20a2d commit 65be42d
Show file tree
Hide file tree
Showing 11 changed files with 209 additions and 30 deletions.
84 changes: 75 additions & 9 deletions NewLife.Core/Data/IPacket.cs
Original file line number Diff line number Diff line change
Expand Up @@ -381,14 +381,14 @@ protected override void Dispose(Boolean disposing)
/// <returns></returns>
public Memory<Byte> GetMemory() => new(_buffer, _offset, _length);

/// <summary>切片得到新数据包,同时转移内存管理权,当前数据包应尽快停止使用</summary>
/// <remarks>
/// 可能是引用同一块内存,也可能是新的内存。
/// 可能就是当前数据包,也可能引用相同的所有者或数组。
/// </remarks>
/// <param name="offset">偏移</param>
/// <param name="count">个数。默认-1表示到末尾</param>
IPacket IPacket.Slice(Int32 offset, Int32 count) => Slice(offset, count);
///// <summary>切片得到新数据包,同时转移内存管理权,当前数据包应尽快停止使用</summary>
///// <remarks>
///// 可能是引用同一块内存,也可能是新的内存。
///// 可能就是当前数据包,也可能引用相同的所有者或数组。
///// </remarks>
///// <param name="offset">偏移</param>
///// <param name="count">个数。默认-1表示到末尾</param>
//IPacket IPacket.Slice(Int32 offset, Int32 count) => Slice(offset, count);

/// <summary>切片得到新数据包,同时转移内存管理权,当前数据包应尽快停止使用</summary>
/// <remarks>
Expand All @@ -397,7 +397,7 @@ protected override void Dispose(Boolean disposing)
/// </remarks>
/// <param name="offset">偏移</param>
/// <param name="count">个数。默认-1表示到末尾</param>
public OwnerPacket Slice(Int32 offset, Int32 count)
public OwnerPacket SliceSingle(Int32 offset, Int32 count)
{
// 带有Next时,不支持Slice
if (Next != null) throw new NotSupportedException("Slice with Next");
Expand All @@ -415,6 +415,72 @@ public OwnerPacket Slice(Int32 offset, Int32 count)
return new OwnerPacket(buffer, _offset + offset, count);
}

/// <summary>切片得到新数据包,同时转移内存管理权,当前数据包应尽快停止使用</summary>
/// <remarks>
/// 可能是引用同一块内存,也可能是新的内存。
/// 可能就是当前数据包,也可能引用相同的所有者或数组。
/// </remarks>
/// <param name="offset">偏移</param>
/// <param name="count">个数。默认-1表示到末尾</param>
public IPacket Slice(Int32 offset, Int32 count)
{
//// 带有Next时,不支持Slice
//if (Next != null) throw new NotSupportedException("Slice with Next");

// 只有一次Slice机会
if (_buffer == null) throw new InvalidDataException();

var start = _offset + offset;
var remain = _length - offset;

// 超出范围
if (count > Total - offset) throw new ArgumentOutOfRangeException(nameof(count), "count must be non-negative and less than or equal to the memory owner's length.");

// 单个数据包
if (Next == null)
{
// 转移管理权
var buffer = _buffer;
_buffer = null!;

if (count < 0 || count > remain) count = remain;
return new OwnerPacket(buffer, start, count);
}
else
{
// 如果当前段用完,则取下一段
if (remain <= 0)
{
var rs = Next.Slice(offset - _length, count);
// 切分后数据包要用到Next,这里清空避免当前包释放时把Next也释放
Next = null;
return rs;
}

// 转移管理权
var buffer = _buffer;
_buffer = null!;

// 当前包用一截,剩下的全部
if (count < 0)
{
var rs = new OwnerPacket(buffer, start, remain) { Next = Next };
Next = null;
return rs;
}

// 当前包可以读完
if (count <= remain) return new OwnerPacket(buffer, start, count);

// 当前包用一截,剩下的再截取
{
var rs = new OwnerPacket(buffer, start, remain) { Next = Next.Slice(0, count - remain) };
Next = null;
return rs;
}
}
}

/// <summary>尝试获取数据段</summary>
/// <param name="segment"></param>
/// <returns></returns>
Expand Down
2 changes: 1 addition & 1 deletion NewLife.Core/Http/HttpBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ public virtual IOwnerPacket Build()

if (body != null) writer.Write(body.GetSpan());

return pk.Slice(0, writer.Position);
return pk.SliceSingle(0, writer.Position);
}

/// <summary>创建头部</summary>
Expand Down
2 changes: 1 addition & 1 deletion NewLife.Core/Http/TinyHttpClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ protected virtual async Task<IOwnerPacket> SendDataAsync(Uri? uri, IPacket? requ
var count = await ns.ReadAsync(pk.Buffer, 0, pk.Length, source.Token).ConfigureAwait(false);
#endif

return pk.Slice(0, count);
return pk.SliceSingle(0, count);
}

/// <summary>异步发出请求,并接收响应</summary>
Expand Down
6 changes: 4 additions & 2 deletions NewLife.Core/Http/WebSocket.cs
Original file line number Diff line number Diff line change
Expand Up @@ -110,11 +110,12 @@ private void Send(WebSocketMessage msg)
var socket = Context?.Socket;
if (session == null && socket == null) throw new ObjectDisposedException(nameof(Context));

using var data = msg.ToPacket();
var data = msg.ToPacket();
if (session != null)
session.Send(data);
else
socket?.Send(data);
data.TryDispose();
}

/// <summary>发送消息</summary>
Expand All @@ -138,8 +139,9 @@ public void SendAll(IPacket data, WebSocketMessageType type, Func<INetSession, B
{
var session = (Context?.Connection) ?? throw new ObjectDisposedException(nameof(Context));
var msg = new WebSocketMessage { Type = type, Payload = data };
using var data2 = msg.ToPacket();
var data2 = msg.ToPacket();
session.Host.SendAllAsync(data2, predicate).Wait();
data.TryDispose();
}

/// <summary>想所有连接发送文本消息</summary>
Expand Down
18 changes: 11 additions & 7 deletions NewLife.Core/Http/WebSocketMessage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -128,10 +128,10 @@ public Boolean Read(IPacket pk)

/// <summary>把消息转为封包</summary>
/// <returns></returns>
public virtual IOwnerPacket ToPacket()
public virtual IPacket ToPacket()
{
var pk = Payload;
var len = pk == null ? 0 : pk.Total;
var body = Payload;
var len = body == null ? 0 : body.Total;

// 特殊处理关闭消息
if (len == 0 && Type == WebSocketMessageType.Close)
Expand Down Expand Up @@ -194,18 +194,22 @@ public virtual IOwnerPacket ToPacket()
writer.Write(masks);

// 掩码混淆数据。直接在数据缓冲区修改,避免拷贝
if (Payload != null)
if (body != null)
{
var data = Payload.GetSpan();
var data = body.GetSpan();
for (var i = 0; i < len; i++)
{
data[i] = (Byte)(data[i] ^ masks[i % 4]);
}
}
}

if (pk != null && pk.Length > 0)
writer.Write(pk.GetSpan());
if (body != null && body.Length > 0)
{
// 注意body可能是链式数据包
//writer.Write(body.GetSpan());
return rs.Slice(0, writer.Position).Append(body);
}
else if (Type == WebSocketMessageType.Close)
{
writer.Write((Int16)CloseStatus);
Expand Down
6 changes: 3 additions & 3 deletions NewLife.Core/Net/Handlers/MessageCodec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -48,14 +48,14 @@ public class MessageCodec<T> : Handler
public override Object? Write(IHandlerContext context, Object message)
{
// 谁申请,谁归还
IOwnerPacket? owner = null;
IPacket? owner = null;
if (message is T msg)
{
var rs = Encode(context, msg);
if (rs == null) return null;

message = rs;
owner = rs as IOwnerPacket;
owner = rs as IPacket;

// 加入队列,忽略请求消息
if (message is IMessage msg2)
Expand All @@ -73,7 +73,7 @@ public class MessageCodec<T> : Handler
finally
{
// 下游可能忘了释放内存,这里兜底释放
owner?.Dispose();
owner.TryDispose();
}
}

Expand Down
4 changes: 2 additions & 2 deletions NewLife.Core/Net/Handlers/WebSocketCodec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ public override Boolean Close(IHandlerContext context, String reason)
message = new WebSocketMessage { Type = WebSocketMessageType.Binary, Payload = pk };

// 谁申请,谁归还
IOwnerPacket? owner = null;
IPacket? owner = null;
if (message is WebSocketMessage msg)
message = owner = msg.ToPacket();

Expand All @@ -80,7 +80,7 @@ public override Boolean Close(IHandlerContext context, String reason)
finally
{
// 下游可能忘了释放内存,这里兜底释放
owner?.Dispose();
owner.TryDispose();
}
}
}
4 changes: 2 additions & 2 deletions NewLife.Core/Net/SessionBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -275,7 +275,7 @@ public Int32 Send(IPacket data)
var size = Client.Receive(pk.Buffer, SocketFlags.None);
if (span != null) span.Value = size;

return pk.Slice(0, size);
return pk.SliceSingle(0, size);
}
catch (Exception ex)
{
Expand Down Expand Up @@ -306,7 +306,7 @@ public Int32 Send(IPacket data)
#endif
if (span != null) span.Value = size;

return pk.Slice(0, size);
return pk.SliceSingle(0, size);
}
catch (Exception ex)
{
Expand Down
2 changes: 1 addition & 1 deletion NewLife.Core/Net/TcpSession.cs
Original file line number Diff line number Diff line change
Expand Up @@ -390,7 +390,7 @@ protected override Int32 OnSend(IPacket pk)
var size = await ss.ReadAsync(pk.Buffer, 0, pk.Length, cancellationToken);
if (span != null) span.Value = size;

return pk.Slice(0, size);
return pk.SliceSingle(0, size);
}
catch (Exception ex)
{
Expand Down
4 changes: 2 additions & 2 deletions NewLife.Core/Net/UdpSession.cs
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,7 @@ public IOwnerPacket Receive()
var size = Server.Client.ReceiveFrom(pk.Buffer, ref ep);
if (span != null) span.Value = size;

return pk.Slice(0, size);
return pk.SliceSingle(0, size);
}
catch (Exception ex)
{
Expand Down Expand Up @@ -289,7 +289,7 @@ public IOwnerPacket Receive()
#endif
if (span != null) span.Value = size;

return pk.Slice(0, size);
return pk.SliceSingle(0, size);
}
catch (Exception ex)
{
Expand Down
107 changes: 107 additions & 0 deletions XUnitTest.Core/Http/WebSocketMessageTests.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
using System;
using NewLife.Data;
using NewLife.Http;
using NewLife.Messaging;
using Xunit;

namespace XUnitTest.Http;

public class WebSocketMessageTests
{
[Fact]
public void Text()
{
var msg = new WebSocketMessage
{
Type = WebSocketMessageType.Text,
Payload = (ArrayPacket)$"Hello NewLife",
};

var pk = msg.ToPacket();
Assert.Equal("810D48656C6C6F204E65774C696665", pk.ToHex());

var msg2 = new WebSocketMessage();
var rs = msg2.Read(pk);
Assert.True(rs);

Assert.Equal(msg.Type, msg2.Type);
Assert.Equal(msg.Payload.ToHex(), msg2.Payload.ToHex());
}

[Fact]
public void Ping()
{
var msg = new WebSocketMessage
{
Type = WebSocketMessageType.Ping,
Payload = (ArrayPacket)$"Ping {DateTime.UtcNow.ToFullString()}",
};

var pk = msg.ToPacket();
Assert.StartsWith("891850696E67", pk.ToHex());

var msg2 = new WebSocketMessage();
var rs = msg2.Read(pk);
Assert.True(rs);

Assert.Equal(msg.Type, msg2.Type);
Assert.Equal(msg.Payload.ToHex(), msg2.Payload.ToHex());
}

[Fact]
public void Close()
{
var msg = new WebSocketMessage
{
Type = WebSocketMessageType.Close,
CloseStatus = 1000,
StatusDescription = "Finish",
};

var pk = msg.ToPacket();
Assert.Equal("880803E846696E697368", pk.ToHex());

var msg2 = new WebSocketMessage();
var rs = msg2.Read(pk);
Assert.True(rs);

Assert.Equal(msg.Type, msg2.Type);
Assert.Equal(msg.CloseStatus, msg2.CloseStatus);
Assert.Equal(msg.StatusDescription, msg2.StatusDescription);
}

[Fact]
public void DefaultMessageOverWebsocket()
{
var dm = new DefaultMessage
{
Flag = 0x01,
Sequence = 0xAB,
Payload = (ArrayPacket)"Hello NewLife"
};

var msg = new WebSocketMessage
{
Type = WebSocketMessageType.Binary,
Payload = dm.ToPacket(),
};

var pk = msg.ToPacket();
Assert.Equal("821101AB0D0048656C6C6F204E65774C696665", pk.ToHex());

var msg2 = new WebSocketMessage();
var rs = msg2.Read(pk);
Assert.True(rs);

Assert.Equal(msg.Type, msg2.Type);
Assert.Equal(msg.Payload.ToHex(), msg2.Payload.ToHex());

var dm2 = new DefaultMessage();
rs = dm2.Read(msg2.Payload);
Assert.True(rs);

Assert.Equal(dm.Flag, dm2.Flag);
Assert.Equal(dm.Sequence, dm2.Sequence);
Assert.Equal(dm.Payload.ToHex(), dm2.Payload.ToHex());
}
}

0 comments on commit 65be42d

Please sign in to comment.