Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Replace BinaryFormatter with MessagePack serialization #1166

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@

namespace Microsoft.Spark.E2ETest.ExternalLibrary
{
[Serializable]
public class ExternalClass
{
private string _s;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@

namespace Microsoft.Spark.E2ETest.IpcTests
{
[Serializable]
public class TestBroadcastVariable
{
public int IntValue { get; private set; }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -298,7 +298,6 @@ private void WriteCsv(int start, int count, string path)
}
}

[Serializable]
private class TestForeachWriter : IForeachWriter
{
[NonSerialized]
Expand Down Expand Up @@ -354,7 +353,6 @@ public virtual void Process(Row value)
}
}

[Serializable]
private class TestForeachWriterOpenFailure : TestForeachWriter
{
public override bool Open(long partitionId, long epochId)
Expand All @@ -364,7 +362,6 @@ public override bool Open(long partitionId, long epochId)
}
}

[Serializable]
private class TestForeachWriterProcessFailure : TestForeachWriter
{
public override void Process(Row value)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@

<ItemGroup>
<PackageReference Include="Moq" Version="4.10.0" />
<PackageReference Include="System.Memory" Version="4.5.3" />
<PackageReference Include="System.Memory" Version="4.5.5" />
<PackageReference Include="MessagePack" Version="2.5.140" />
</ItemGroup>

<ItemGroup>
Expand Down
17 changes: 8 additions & 9 deletions src/csharp/Microsoft.Spark.UnitTest/UdfSerDeTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,20 +5,24 @@
using System;
using System.IO;
using System.Reflection;
using System.Runtime.Serialization.Formatters.Binary;
using Microsoft.Spark.Utils;
using Xunit;
using MessagePack;

namespace Microsoft.Spark.UnitTest
{
[Collection("Spark Unit Tests")]
public class UdfSerDeTests
{
[Serializable]
private class TestClass
{
private readonly string _str;

// TODO: find out why MessagePack is requiring a parameterless constructor.
public TestClass()
{
}
Comment on lines +21 to +24
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There are several more classes that require similar empty ctors, such as ExternalClass or Broadcast

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Now that the standalone nuget package for BF is now available, I'm leaning towards scraping the whole PR and moving to that instead, because this is IMO a major limitation that could break any code that uses user/3rd party classes that reference a class without an empty ctor. What do you think?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since microsoft doesn't add [Serializable] attributes to new types since .NET 6, sticking to NuGet will still not work long-term - As soon as there's a new commonly used type - integration will fall apart.

Imho the best option long-term would be either to investigate MsgPack further (Custom resolver for classes without default ctor and workaround for the bug with empty stream), or, if there's no way to work-around ctor behavior - make serializer configurable and include both options

I created a PR with version bump up for .NET and dependencies, for .NET8 it's still possible to use BinaryFormatter from library, for .NET9 we'll have to migrate to NuGet


public TestClass(string s)
{
_str = s;
Expand Down Expand Up @@ -149,16 +153,13 @@ private Delegate SerDe(Delegate udf)
return Deserialize(Serialize(udf));
}

#pragma warning disable SYSLIB0011 // Type or member is obsolete
// TODO: Replace BinaryFormatter with a new, secure serializer.
private byte[] Serialize(Delegate udf)
{
UdfSerDe.UdfData udfData = UdfSerDe.Serialize(udf);

using (var ms = new MemoryStream())
{
var bf = new BinaryFormatter();
bf.Serialize(ms, udfData);
MessagePackSerializer.Typeless.Serialize(ms, udfData);
return ms.ToArray();
}
}
Expand All @@ -167,11 +168,9 @@ private Delegate Deserialize(byte[] serializedUdf)
{
using (var ms = new MemoryStream(serializedUdf, false))
{
var bf = new BinaryFormatter();
UdfSerDe.UdfData udfData = (UdfSerDe.UdfData)bf.Deserialize(ms);
UdfSerDe.UdfData udfData = (UdfSerDe.UdfData)MessagePackSerializer.Typeless.Deserialize(ms);
return UdfSerDe.Deserialize(udfData);
}
}
#pragma warning restore
}
}
13 changes: 3 additions & 10 deletions src/csharp/Microsoft.Spark.Worker.UnitTest/CommandExecutorTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
using System.IO;
using System.Linq;
using System.Reflection;
using System.Runtime.Serialization.Formatters.Binary;
using System.Threading;
using System.Threading.Tasks;
using Apache.Arrow;
Expand All @@ -21,6 +20,7 @@
using Razorvine.Pickle;
using Xunit;
using static Microsoft.Spark.UnitTest.TestUtils.ArrowTestUtils;
using MessagePack;

namespace Microsoft.Spark.Worker.UnitTest
{
Expand Down Expand Up @@ -1050,7 +1050,6 @@ public void TestRDDCommandExecutor(Version sparkVersion, IpcOptions ipcOptions)
using var inputStream = new MemoryStream();
using var outputStream = new MemoryStream();
// Write test data to the input stream.
var formatter = new BinaryFormatter();
var memoryStream = new MemoryStream();

var inputs = new int[] { 0, 1, 2, 3, 4 };
Expand All @@ -1059,10 +1058,7 @@ public void TestRDDCommandExecutor(Version sparkVersion, IpcOptions ipcOptions)
foreach (int input in inputs)
{
memoryStream.Position = 0;
#pragma warning disable SYSLIB0011 // Type or member is obsolete
// TODO: Replace BinaryFormatter with a new, secure serializer.
formatter.Serialize(memoryStream, input);
#pragma warning restore SYSLIB0011 // Type or member is obsolete
MessagePackSerializer.Typeless.Serialize(memoryStream, input);
values.Add(memoryStream.ToArray());
}

Expand Down Expand Up @@ -1092,12 +1088,9 @@ public void TestRDDCommandExecutor(Version sparkVersion, IpcOptions ipcOptions)
for (int i = 0; i < inputs.Length; ++i)
{
Assert.True(SerDe.ReadInt32(outputStream) > 0);
#pragma warning disable SYSLIB0011 // Type or member is obsolete
// TODO: Replace BinaryFormatter with a new, secure serializer.
Assert.Equal(
mapUdf(i),
formatter.Deserialize(outputStream));
#pragma warning restore SYSLIB0011 // Type or member is obsolete
MessagePackSerializer.Typeless.Deserialize(outputStream));
}

// Validate all the data on the stream is read.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
</PropertyGroup>
<ItemGroup>
<PackageReference Include="Moq" Version="4.10.0" />
<PackageReference Include="MessagePack" Version="2.5.140" />
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\Microsoft.Spark.Worker\Microsoft.Spark.Worker.csproj" />
Expand Down
10 changes: 2 additions & 8 deletions src/csharp/Microsoft.Spark.Worker/Command/RDDCommandExecutor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@
using System;
using System.Collections.Generic;
using System.IO;
using System.Runtime.Serialization.Formatters.Binary;
using Microsoft.Spark.Interop.Ipc;
using Microsoft.Spark.Utils;
using MessagePack;

namespace Microsoft.Spark.Worker.Command
{
Expand All @@ -19,8 +19,6 @@ internal class RDDCommandExecutor
{
[ThreadStatic]
private static MemoryStream s_writeOutputStream;
[ThreadStatic]
private static BinaryFormatter s_binaryFormatter;

/// <summary>
/// Executes the commands on the input data read from input stream
Expand Down Expand Up @@ -111,11 +109,7 @@ private void Serialize(
switch (serializerMode)
{
case CommandSerDe.SerializedMode.Byte:
BinaryFormatter formatter = s_binaryFormatter ??= new BinaryFormatter();
#pragma warning disable SYSLIB0011 // Type or member is obsolete
// TODO: Replace BinaryFormatter with a new, secure serializer.
formatter.Serialize(stream, message);
#pragma warning restore SYSLIB0011 // Type or member is obsolete
MessagePackSerializer.Typeless.Serialize(stream, message);
break;
case CommandSerDe.SerializedMode.None:
case CommandSerDe.SerializedMode.String:
Expand Down
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @arsdragonfly

I was looking at the dependencies, and the MessagePack library only seems to be compatible with .net standard 2.0.
https://www.nuget.org/packages/MessagePack/2.5.140#dependencies-body-tab

Should we start removing the targeting for .net standard 2.1?

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oops. meant to put this comment on the Microsoft.Spark.csproj.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not sure why the netstandard2.1 tf was there before, looks redundant to me

Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@
</ItemGroup>
<ItemGroup>
<PackageReference Include="Microsoft.DotNet.DependencyManager" Version="10.10.0-beta.20254.4" />
<PackageReference Include="System.Memory" Version="4.5.3" />
<PackageReference Include="System.Memory" Version="4.5.5" />
<PackageReference Include="MessagePack" Version="2.5.140" />
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\Microsoft.Spark\Microsoft.Spark.csproj" />
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@
using System.Diagnostics;
using System.IO;
using System.Net;
using System.Runtime.Serialization.Formatters.Binary;
using Microsoft.Spark.Interop.Ipc;
using Microsoft.Spark.Network;
using MessagePack;

namespace Microsoft.Spark.Worker.Processor
{
Expand Down Expand Up @@ -47,7 +47,6 @@ internal BroadcastVariables Process(Stream stream)
}
}

var formatter = new BinaryFormatter();
for (int i = 0; i < broadcastVars.Count; ++i)
{
long bid = SerDe.ReadInt64(stream);
Expand All @@ -62,21 +61,15 @@ internal BroadcastVariables Process(Stream stream)
$"server {readBid} is different from the Broadcast Id received " +
$"from the payload {bid}.");
}
#pragma warning disable SYSLIB0011 // Type or member is obsolete
// TODO: Replace BinaryFormatter with a new, secure serializer.
object value = formatter.Deserialize(socket.InputStream);
#pragma warning restore SYSLIB0011 // Type or member is obsolete
object value = MessagePackSerializer.Typeless.Deserialize(socket.InputStream);
BroadcastRegistry.Add(bid, value);
}
else
{
string path = SerDe.ReadString(stream);
using FileStream fStream =
File.Open(path, FileMode.Open, FileAccess.Read, FileShare.Read);
#pragma warning disable SYSLIB0011 // Type or member is obsolete
// TODO: Replace BinaryFormatter with a new, secure serializer.
object value = formatter.Deserialize(fStream);
#pragma warning restore SYSLIB0011 // Type or member is obsolete
object value = MessagePackSerializer.Typeless.Deserialize(fStream);
BroadcastRegistry.Add(bid, value);
}
}
Expand Down
6 changes: 2 additions & 4 deletions src/csharp/Microsoft.Spark/Broadcast.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,12 @@
using System.IO;
using System.Net;
using System.Runtime.Serialization;
using System.Runtime.Serialization.Formatters.Binary;
using System.Threading;
using Microsoft.Spark.Interop;
using Microsoft.Spark.Interop.Ipc;
using Microsoft.Spark.Network;
using Microsoft.Spark.Services;
using MessagePack;

namespace Microsoft.Spark
{
Expand All @@ -20,7 +20,6 @@ namespace Microsoft.Spark
/// also attempts to distribute broadcast variables using efficient broadcast algorithms to
/// reduce communication cost.
/// </summary>
[Serializable]
public sealed class Broadcast<T> : IJvmObjectReferenceProvider
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In order to register broadcast var in the registry on it being serialized, we can implement IMessagePackSerializationCallbackReceiver, and rename existing OnSerialized to OnBeforeSerialize(). Otherwise it's impossible to access var from worker context

{
[NonSerialized]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's replace [NonSerialized] wtih [IgnoreMember] attribute from MessagePack to ensure non-breaking changes

Expand Down Expand Up @@ -223,8 +222,7 @@ private void WriteToFile(object value)
/// <param name="stream">Stream to which the object is serialized</param>
private void Dump(object value, Stream stream)
{
var formatter = new BinaryFormatter();
formatter.Serialize(stream, value);
MessagePackSerializer.Typeless.Serialize(stream, value);
}
}

Expand Down
3 changes: 2 additions & 1 deletion src/csharp/Microsoft.Spark/Microsoft.Spark.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@
<PackageReference Include="Microsoft.Data.Analysis" Version="0.18.0" />
<PackageReference Include="Newtonsoft.Json" Version="13.0.1" />
<PackageReference Include="Razorvine.Pyrolite" Version="4.26.0" />
<PackageReference Include="System.Memory" Version="4.5.3" />
<PackageReference Include="System.Memory" Version="4.5.5" />
<PackageReference Include="MessagePack" Version="2.5.140" />
</ItemGroup>

<ItemGroup>
Expand Down
8 changes: 3 additions & 5 deletions src/csharp/Microsoft.Spark/RDD/Collector.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,11 @@
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Runtime.Serialization.Formatters.Binary;
using Microsoft.Spark.Interop.Ipc;
using Microsoft.Spark.Sql;
using Microsoft.Spark.Utils;
using static Microsoft.Spark.Utils.CommandSerDe;
using MessagePack;

namespace Microsoft.Spark.RDD
{
Expand Down Expand Up @@ -66,15 +66,13 @@ internal interface IDeserializer
}

/// <summary>
/// Deserializer using the BinaryFormatter.
/// Deserializer using MessagePack.
/// </summary>
private sealed class BinaryDeserializer : IDeserializer
{
private readonly BinaryFormatter _formater = new BinaryFormatter();

public object Deserialize(Stream stream, int length)
{
return _formater.Deserialize(stream);
return MessagePackSerializer.Typeless.Deserialize(stream);
}
Comment on lines 73 to 76
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There's a bug in MessagePack I found while tried to reuse these changes, when called from Collector::Collect: Deserializer erases entire stream after the first read.

Workaround for it is to either read stream in small chunks and pass to MessagePack, or load entire stream in MemoryStream and then use it.

MessagePack-CSharp/MessagePack-CSharp#1881

}

Expand Down
5 changes: 2 additions & 3 deletions src/csharp/Microsoft.Spark/SparkContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,11 @@

using System.Collections.Generic;
using System.IO;
using System.Runtime.Serialization.Formatters.Binary;
using Microsoft.Spark.Hadoop.Conf;
using Microsoft.Spark.Interop.Internal.Scala;
using Microsoft.Spark.Interop.Ipc;
using static Microsoft.Spark.Utils.CommandSerDe;
using MessagePack;

namespace Microsoft.Spark
{
Expand Down Expand Up @@ -225,13 +225,12 @@ public void ClearJobGroup()
/// <returns>RDD representing distributed collection</returns>
internal RDD<T> Parallelize<T>(IEnumerable<T> seq, int? numSlices = null)
{
var formatter = new BinaryFormatter();
using var memoryStream = new MemoryStream();

var values = new List<byte[]>();
foreach (T obj in seq)
{
formatter.Serialize(memoryStream, obj);
MessagePackSerializer.Typeless.Serialize(memoryStream, obj);
values.Add(memoryStream.ToArray());
memoryStream.SetLength(0);
}
Expand Down
Loading