Skip to content

Commit

Permalink
[improv]改进SRMP多层通信的内存分配,Body>SRMP>WebSocket时,尽量一次性把内存分配到位。
Browse files Browse the repository at this point in the history
  • Loading branch information
nnhy committed Oct 9, 2024
1 parent 65be42d commit a5f5d77
Show file tree
Hide file tree
Showing 4 changed files with 105 additions and 8 deletions.
36 changes: 35 additions & 1 deletion NewLife.Core/Data/IPacket.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
using System.Buffers;
using System.ComponentModel;
using System.Diagnostics.CodeAnalysis;
using System.Runtime.InteropServices;
using System.Text;
using NewLife.Collections;
Expand Down Expand Up @@ -293,6 +294,32 @@ public static Boolean TryGetSpan(this IPacket pk, out Span<Byte> span)
span = default;
return false;
}

/// <summary>尝试扩展头部,用于填充包头,减少内存分配</summary>
/// <param name="pk"></param>
/// <param name="size"></param>
/// <param name="newPacket"></param>
/// <returns></returns>
public static Boolean TryExpandHeader(this IPacket pk, Int32 size, [NotNullWhen(true)] out IPacket? newPacket)
{
newPacket = null;

if (pk is ArrayPacket ap && ap.Offset >= size)
{
newPacket = new ArrayPacket(ap.Buffer, ap.Offset - size, ap.Length + size) { Next = ap.Next };

return true;
}
else if (pk is OwnerPacket owner && owner.Offset >= size)
{
newPacket = new OwnerPacket(owner.Buffer, owner.Offset - size, owner.Length + size) { Next = owner.Next };
owner.Free();

return true;
}

return false;
}
}

/// <summary>所有权内存包。具有所有权管理,不再使用时释放</summary>
Expand Down Expand Up @@ -346,7 +373,7 @@ public OwnerPacket(Int32 length)
/// <param name="offset"></param>
/// <param name="length">长度</param>
/// <exception cref="ArgumentOutOfRangeException"></exception>
private OwnerPacket(Byte[] buffer, Int32 offset, Int32 length)
public OwnerPacket(Byte[] buffer, Int32 offset, Int32 length)
{
if (offset < 0 || length < 0 || offset + length > buffer.Length)
throw new ArgumentOutOfRangeException(nameof(length), "Length must be non-negative and less than or equal to the memory owner's length.");
Expand Down Expand Up @@ -502,6 +529,13 @@ Boolean IPacket.TryGetArray(out ArraySegment<Byte> segment)
return false;
}

/// <summary>释放所有权,不再使用</summary>
public void Free()
{
_buffer = null!;
Next = null;
}

/// <summary>钉住内存</summary>
/// <param name="elementIndex"></param>
/// <returns></returns>
Expand Down
27 changes: 23 additions & 4 deletions NewLife.Core/Http/WebSocketMessage.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using System.Buffers.Binary;
using System.Drawing;
using System.Text;
using NewLife.Buffers;
using NewLife.Data;
Expand Down Expand Up @@ -132,6 +133,7 @@ public virtual IPacket ToPacket()
{
var body = Payload;
var len = body == null ? 0 : body.Total;
var masks = MaskKey;

// 特殊处理关闭消息
if (len == 0 && Type == WebSocketMessageType.Close)
Expand All @@ -140,16 +142,29 @@ public virtual IPacket ToPacket()
if (!StatusDescription.IsNullOrEmpty()) len += Encoding.UTF8.GetByteCount(StatusDescription);
}

var rs = new OwnerPacket(1 + 1 + 8 + 4 + len);
//var rs = new OwnerPacket(1 + 1 + 8 + 4 + len);
var size = len switch
{
< 126 => 1 + 1,
< 0xFFFF => 1 + 1 + 2,
_ => 1 + 1 + 8,
};
if (masks != null) size += masks.Length;

if (body == null || !body.TryExpandHeader(size, out var rs))
{
if (Type == WebSocketMessageType.Close)
rs = new OwnerPacket(size + len);
else
rs = new OwnerPacket(size) { Next = body };
}
var writer = new SpanWriter(rs.GetSpan())
{
IsLittleEndian = false
};

writer.WriteByte((Byte)(0x80 | (Byte)Type));

var masks = MaskKey;

/*
* 数据长度
* len < 126 单字节表示长度
Expand Down Expand Up @@ -208,7 +223,11 @@ public virtual IPacket ToPacket()
{
// 注意body可能是链式数据包
//writer.Write(body.GetSpan());
return rs.Slice(0, writer.Position).Append(body);

// 扩展得到的数据包,直接写入了头部,尾部数据不用拷贝也无需切片
return rs;

//return rs.Slice(0, writer.Position).Append(body);
}
else if (Type == WebSocketMessageType.Close)
{
Expand Down
4 changes: 1 addition & 3 deletions NewLife.Core/Messaging/DefaultMessage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -123,9 +123,7 @@ public override IPacket ToPacket()

// 增加4字节头部,如果负载数据之前有足够空间则直接使用,否则新建数据包形成链式结构
var size = len < 0xFFFF ? 4 : 8;
var pk = body is ArrayPacket ap && ap.Offset >= size
? new ArrayPacket(ap.Buffer, ap.Offset - size, ap.Length + size) { Next = ap.Next }
: new ArrayPacket(new Byte[size]) { Next = body };
if (body == null || !body.TryExpandHeader(size, out var pk)) pk = new OwnerPacket(size) { Next = body };

// 标记位
var header = pk.GetSpan();
Expand Down
46 changes: 46 additions & 0 deletions XUnitTest.Core/Http/WebSocketMessageTests.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using System;
using NewLife.Buffers;
using NewLife.Data;
using NewLife.Http;
using NewLife.Messaging;
Expand Down Expand Up @@ -104,4 +105,49 @@ public void DefaultMessageOverWebsocket()
Assert.Equal(dm.Sequence, dm2.Sequence);
Assert.Equal(dm.Payload.ToHex(), dm2.Payload.ToHex());
}

[Fact]
public void DefaultMessageOverWebsocket2()
{
var str = "Hello NewLife";
var buf = new Byte[8 + str.Length];
var src = new ArrayPacket(buf, 8, buf.Length - 8);
var writer = new SpanWriter(src.GetSpan());
writer.Write(str, -1);

var dm = new DefaultMessage
{
Flag = 0x01,
Sequence = 0xAB,
Payload = src
};
var pk = dm.ToPacket();
Assert.Null(pk.Next);

var msg = new WebSocketMessage
{
Type = WebSocketMessageType.Binary,
Payload = pk,
};

pk = msg.ToPacket();
Assert.Null(pk.Next);
Assert.Equal("821101AB0D0048656C6C6F204E65774C696665", pk.ToHex());

var msg2 = new WebSocketMessage();
var rs = msg2.Read(pk);
Assert.True(rs);

Assert.Equal(msg.Type, msg2.Type);
Assert.Equal(msg.Payload.ToHex(), msg2.Payload.ToHex());

var dm2 = new DefaultMessage();
rs = dm2.Read(msg2.Payload);
Assert.True(rs);

Assert.Equal(dm.Flag, dm2.Flag);
Assert.Equal(dm.Sequence, dm2.Sequence);
Assert.Equal(dm.Payload.ToHex(), dm2.Payload.ToHex());
Assert.Equal(str, dm2.Payload.ToStr());
}
}

0 comments on commit a5f5d77

Please sign in to comment.