From 03b79393e71910a33a39864e563fcbeb2de56658 Mon Sep 17 00:00:00 2001
From: Niharika Dutta
Date: Sun, 19 Apr 2020 22:31:05 -0700
Subject: [PATCH 01/14] Adding section for UDF serialization

---
 docs/broadcast-guide.md |  92 +++++++++++++++++++++
 docs/udf-guide.md       | 172 ++++++++++++++++++++++++++++++++++++++++
 2 files changed, 264 insertions(+)
 create mode 100644 docs/broadcast-guide.md
 create mode 100644 docs/udf-guide.md

diff --git a/docs/broadcast-guide.md b/docs/broadcast-guide.md
new file mode 100644
index 000000000..4286c569e
--- /dev/null
+++ b/docs/broadcast-guide.md
@@ -0,0 +1,92 @@
+# Guide to using Broadcast Variables
+
+This is a guide that shows how to use broadcast variables in .NET for Apache Spark.
+
+## What are Broadcast Variables
+
+[Broadcast variables in Apache Spark](https://spark.apache.org/docs/2.2.0/rdd-programming-guide.html#broadcast-variables) are a mechanism for sharing read-only variables across executors. They allow the programmer to keep a read-only variable cached on each machine rather than shipping a copy of it with tasks. They can be used, for example, to give every node a copy of a large input dataset in an efficient manner.
+
+### How to use broadcast variables in .NET for Apache Spark
+
+Broadcast variables are created from a variable `v` by calling `SparkContext.Broadcast(v)`. The broadcast variable is a wrapper around `v`, and its value can be accessed by calling the `Value()` method.
+
+Example:
+
+```csharp
+string v = "Variable to be broadcasted";
+Broadcast<string> bv = SparkContext.Broadcast(v);
+
+// Using the broadcast variable in a UDF:
+Func<Column, Column> udf = Udf<string, string>(
+    str => $"{str}: {bv.Value()}");
+```
+
+The type of the broadcast variable is captured using generics in C#, as seen in the above example.
+
+### Deleting broadcast variables
+
+A broadcast variable can be deleted from all executors by calling the `Destroy()` method on it.
+
+```csharp
+// Destroying the broadcast variable bv:
+bv.Destroy();
+```
+
+> Note: `Destroy` deletes all data and metadata related to the broadcast variable. Use it with caution; once a broadcast variable has been destroyed, it cannot be used again.
+
+#### Caveat of using Destroy
+
+One important thing to keep in mind while using broadcast variables in UDFs is to limit the scope of the variable to only the UDF that references it. The [guide to using UDFs](udf-guide.md) describes this phenomenon in detail. This is especially crucial when calling `Destroy` on the broadcast variable. If a destroyed broadcast variable is visible to or accessible from other UDFs, it gets picked up for serialization by all of those UDFs, even if they do not reference it. This throws an error, as .NET for Apache Spark is not able to serialize the destroyed broadcast variable.
+
+Example to demonstrate:
+
+```csharp
+string v = "Variable to be broadcasted";
+Broadcast<string> bv = SparkContext.Broadcast(v);
+
+// Using the broadcast variable in a UDF:
+Func<Column, Column> udf1 = Udf<string, string>(
+    str => $"{str}: {bv.Value()}");
+
+// Destroying bv
+bv.Destroy();
+
+// Calling udf1 after destroying bv throws the following expected exception:
+// org.apache.spark.SparkException: Attempted to use Broadcast(0) after it was destroyed
+df.Select(udf1(df["_1"])).Show();
+
+// Different UDF udf2 that is not referencing bv
+Func<Column, Column> udf2 = Udf<string, string>(
+    str => $"{str}: not referencing broadcast variable");
+
+// Calling udf2 throws the following (unexpected) exception:
+// [Error] [JvmBridge] org.apache.spark.SparkException: Task not serializable
+df.Select(udf2(df["_1"])).Show();
+```
+
+The recommended way of implementing the desired behavior:
+
+```csharp
+string v = "Variable to be broadcasted";
+// Restricting the visibility of bv to only the UDF referencing it
+{
+    Broadcast<string> bv = SparkContext.Broadcast(v);
+
+    // Using the broadcast variable in a UDF:
+    Func<Column, Column> udf1 = Udf<string, string>(
+        str => $"{str}: {bv.Value()}");
+
+    // Destroying bv
+    bv.Destroy();
+}
+
+// Different UDF udf2 that is not referencing bv
+Func<Column, Column> udf2 = Udf<string, string>(
+    str => $"{str}: not referencing broadcast variable");
+
+// Calling udf2 works fine as expected
+df.Select(udf2(df["_1"])).Show();
+```
+
+This ensures that destroying `bv` doesn't affect calls to `udf2` through unexpected serialization behavior.
+
+Broadcast variables are very useful for transmitting read-only data to all executors: the data is sent only once, which gives a huge performance benefit compared with local variables that get shipped to the executors with each task. Please refer to the [official documentation](https://spark.apache.org/docs/2.2.0/rdd-programming-guide.html#broadcast-variables) for a deeper understanding of broadcast variables and why they are used.
\ No newline at end of file
diff --git a/docs/udf-guide.md b/docs/udf-guide.md
new file mode 100644
index 000000000..bb308815d
--- /dev/null
+++ b/docs/udf-guide.md
@@ -0,0 +1,172 @@
+# Guide to User-Defined Functions (UDFs)
+
+This is a guide that shows how to use UDFs in .NET for Apache Spark.
+
+## What are UDFs
+
+[User-Defined Functions (UDFs)](https://spark.apache.org/docs/latest/api/java/org/apache/spark/sql/expressions/UserDefinedFunction.html) are a Spark feature that lets developers extend the system's built-in functionality with custom functions. A UDF transforms values from a single row of a table to produce a single corresponding output value per row, based on the logic defined in the UDF.
+
+Let's take the following as an example of a UDF definition:
+
+```csharp
+string s1 = "hello";
+Func<Column, Column> udf = Udf<string, string>(
+    str => $"{s1} {str}");
+```
+
+The UDF defined above takes a `string` as input (in the form of a [Column](https://github.com/dotnet/spark/blob/master/src/csharp/Microsoft.Spark/Sql/Column.cs#L14) of a [DataFrame](https://github.com/dotnet/spark/blob/master/src/csharp/Microsoft.Spark/Sql/DataFrame.cs#L24)) and returns a `string` with `hello` prepended to the input.
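+
+For context, here is a self-contained sketch of how the snippet above is typically wired up. This is illustrative only: the app name and sample data are made up, and it assumes `using Microsoft.Spark.Sql;` together with `using static Microsoft.Spark.Sql.Functions;` so that `Udf` resolves to the UDF helper.
+
+```csharp
+using System;
+using Microsoft.Spark.Sql;
+using static Microsoft.Spark.Sql.Functions;
+
+class Program
+{
+    static void Main()
+    {
+        // Hypothetical session setup; any existing SparkSession works the same way.
+        SparkSession spark = SparkSession.Builder().AppName("udf-example").GetOrCreate();
+        DataFrame df = spark.Sql("SELECT 'Michael' AS name");
+
+        string s1 = "hello";
+        Func<Column, Column> udf = Udf<string, string>(str => $"{s1} {str}");
+
+        // Apply the UDF to the 'name' column and print the result.
+        df.Select(udf(df["name"])).Show();
+    }
+}
+```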
+
+For a sample DataFrame, let's take the following DataFrame `df`:
+
+```text
++-------+
+|   name|
++-------+
+|Michael|
+|   Andy|
+| Justin|
++-------+
+```
+
+Now let's apply the above defined `udf` to the DataFrame `df`:
+
+```csharp
+DataFrame udfResult = df.Select(udf(df["name"]));
+```
+
+This would return the following DataFrame as `udfResult`:
+
+```text
++-------------+
+|         name|
++-------------+
+|hello Michael|
+|   hello Andy|
+| hello Justin|
++-------------+
+```
+
+To get a better understanding of how to implement UDFs, please take a look at the [UDF helper functions](https://github.com/dotnet/spark/blob/master/src/csharp/Microsoft.Spark/Sql/Functions.cs#L3616) and some [test examples](https://github.com/dotnet/spark/blob/master/src/csharp/Microsoft.Spark.E2ETest/UdfTests/UdfSimpleTypesTests.cs#L49).
+
+## UDF serialization
+
+Since UDFs are functions that need to be executed on the workers, they have to be serialized and sent to the workers as part of the payload from the driver. This involves serializing the [delegate](https://docs.microsoft.com/en-us/dotnet/csharp/programming-guide/delegates/), which is a reference to the method, along with its [target](https://docs.microsoft.com/en-us/dotnet/api/system.delegate.target?view=netframework-4.8), which is the class instance on which the delegate invokes the instance method. Please take a look at this [code](https://github.com/dotnet/spark/blob/master/src/csharp/Microsoft.Spark/Utils/CommandSerDe.cs#L149) to get a better understanding of how UDF serialization is done.
+
+## Good to know while implementing UDFs
+
+One behavior to be aware of while implementing UDFs in .NET for Apache Spark is how the target of the UDF gets serialized. .NET for Apache Spark uses .NET Core, which does not support serializing delegates, so serialization is instead done by using reflection on the target class in which the delegate is defined. When multiple delegates are defined in a common scope, they have a shared closure that becomes the target of reflection for serialization. Let's take an example to illustrate what that means.
+
+The following code snippet defines two string variables that are referenced in two function delegates, which simply return the respective strings as their result:
+
+```csharp
+using System;
+
+public class C {
+    public void M() {
+        string s1 = "s1";
+        string s2 = "s2";
+        Func<string, string> a = str => s1;
+        Func<string, string> b = str => s2;
+    }
+}
+```
+
+The above C# code produces the following decompiled C# (generated with [SharpLab](https://sharplab.io)):
+
+```csharp
+public class C
+{
+    [CompilerGenerated]
+    private sealed class <>c__DisplayClass0_0
+    {
+        public string s1;
+
+        public string s2;
+
+        internal string <M>b__0(string str)
+        {
+            return s1;
+        }
+
+        internal string <M>b__1(string str)
+        {
+            return s2;
+        }
+    }
+
+    public void M()
+    {
+        <>c__DisplayClass0_0 <>c__DisplayClass0_ = new <>c__DisplayClass0_0();
+        <>c__DisplayClass0_.s1 = "s1";
+        <>c__DisplayClass0_.s2 = "s2";
+        Func<string, string> func = new Func<string, string>(<>c__DisplayClass0_.<M>b__0);
+        Func<string, string> func2 = new Func<string, string>(<>c__DisplayClass0_.<M>b__1);
+    }
+}
+```
+
+As can be seen in the decompiled code above, both `func` and `func2` share the same closure `<>c__DisplayClass0_0`, which is the target that is serialized when serializing the delegates `func` and `func2`. Hence, even though `Func<string, string> a` references only `s1`, `s2` also gets serialized when sending the bytes over to the workers.
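+
+The shared closure can be observed directly with plain reflection. The following standalone sketch (not part of the Spark codebase; it uses only `System.Delegate.Target` and reflection from the base class library) prints the fields captured on each delegate's target:
+
+```csharp
+using System;
+using System.Linq;
+
+public class ClosureDemo
+{
+    public static void Main()
+    {
+        string s1 = "s1";
+        string s2 = "s2";
+        Func<string, string> a = str => s1;
+        Func<string, string> b = str => s2;
+
+        // Both delegates point at the same compiler-generated closure instance.
+        Console.WriteLine(ReferenceEquals(a.Target, b.Target)); // True
+
+        // The closure carries both captured variables, so serializing a's
+        // target drags s2 along even though a never uses it.
+        Console.WriteLine(string.Join(", ",
+            a.Target.GetType().GetFields().Select(f => f.Name))); // s1, s2
+    }
+}
+```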
+
+This can lead to unexpected behaviors at runtime (as in the case of using [broadcast variables](broadcast-guide.md)), which is why we recommend restricting the visibility of the variables used in a function to that function's scope.
+
+Taking the above example, the recommended user code to implement the desired behavior of the previous snippet is:
+
+```csharp
+using System;
+
+public class C {
+    public void M() {
+        {
+            string s1 = "s1";
+            Func<string, string> a = str => s1;
+        }
+        {
+            string s2 = "s2";
+            Func<string, string> b = str => s2;
+        }
+    }
+}
+```
+
+The above C# code produces the following decompiled C# (generated with [SharpLab](https://sharplab.io)):
+
+```csharp
+public class C
+{
+    [CompilerGenerated]
+    private sealed class <>c__DisplayClass0_0
+    {
+        public string s1;
+
+        internal string <M>b__0(string str)
+        {
+            return s1;
+        }
+    }
+
+    [CompilerGenerated]
+    private sealed class <>c__DisplayClass0_1
+    {
+        public string s2;
+
+        internal string <M>b__1(string str)
+        {
+            return s2;
+        }
+    }
+
+    public void M()
+    {
+        <>c__DisplayClass0_0 <>c__DisplayClass0_ = new <>c__DisplayClass0_0();
+        <>c__DisplayClass0_.s1 = "s1";
+        Func<string, string> func = new Func<string, string>(<>c__DisplayClass0_.<M>b__0);
+        <>c__DisplayClass0_1 <>c__DisplayClass0_2 = new <>c__DisplayClass0_1();
+        <>c__DisplayClass0_2.s2 = "s2";
+        Func<string, string> func2 = new Func<string, string>(<>c__DisplayClass0_2.<M>b__1);
+    }
+}
```
+
+Here we see that `func` and `func2` no longer share a closure; they have their own separate closures, `<>c__DisplayClass0_0` and `<>c__DisplayClass0_1` respectively. When used as the target for serialization, nothing other than the variables referenced by the delegate gets serialized.
+
+This behavior is important to keep in mind while implementing multiple UDFs in a common scope.
+To learn more about UDFs in general, please review the following articles that explain UDFs and how to use them: [UDFs in Databricks (Scala)](https://docs.databricks.com/spark/latest/spark-sql/udf-scala.html) and [Spark UDFs and some gotchas](https://medium.com/@achilleus/spark-udfs-we-can-use-them-but-should-we-use-them-2c5a561fde6d).
\ No newline at end of file

From 4ef693dbf7616b738a6ae70d1e9dc8c12dd8e5d3 Mon Sep 17 00:00:00 2001
From: Niharika Dutta
Date: Sun, 19 Apr 2020 22:32:56 -0700
Subject: [PATCH 02/14] removing guides from master

---
 docs/broadcast-guide.md |  92 ---------------------
 docs/udf-guide.md       | 172 ----------------------------------------
 2 files changed, 264 deletions(-)
 delete mode 100644 docs/broadcast-guide.md
 delete mode 100644 docs/udf-guide.md

diff --git a/docs/broadcast-guide.md b/docs/broadcast-guide.md
deleted file mode 100644
index 4286c569e..000000000
--- a/docs/broadcast-guide.md
+++ /dev/null
@@ -1,92 +0,0 @@
-# Guide to using Broadcast Variables
-
-This is a guide that shows how to use broadcast variables in .NET for Apache Spark.
-
-## What are Broadcast Variables
-
-[Broadcast variables in Apache Spark](https://spark.apache.org/docs/2.2.0/rdd-programming-guide.html#broadcast-variables) are a mechanism for sharing read-only variables across executors. They allow the programmer to keep a read-only variable cached on each machine rather than shipping a copy of it with tasks. They can be used, for example, to give every node a copy of a large input dataset in an efficient manner.
-
-### How to use broadcast variables in .NET for Apache Spark
-
-Broadcast variables are created from a variable `v` by calling `SparkContext.Broadcast(v)`. The broadcast variable is a wrapper around `v`, and its value can be accessed by calling the `Value()` method.
-
-Example:
-
-```csharp
-string v = "Variable to be broadcasted";
-Broadcast<string> bv = SparkContext.Broadcast(v);
-
-// Using the broadcast variable in a UDF:
-Func<Column, Column> udf = Udf<string, string>(
-    str => $"{str}: {bv.Value()}");
-```
-
-The type of the broadcast variable is captured using generics in C#, as seen in the above example.
-
-### Deleting broadcast variables
-
-A broadcast variable can be deleted from all executors by calling the `Destroy()` method on it.
-
-```csharp
-// Destroying the broadcast variable bv:
-bv.Destroy();
-```
-
-> Note: `Destroy` deletes all data and metadata related to the broadcast variable. Use it with caution; once a broadcast variable has been destroyed, it cannot be used again.
-
-#### Caveat of using Destroy
-
-One important thing to keep in mind while using broadcast variables in UDFs is to limit the scope of the variable to only the UDF that references it. The [guide to using UDFs](udf-guide.md) describes this phenomenon in detail. This is especially crucial when calling `Destroy` on the broadcast variable. If a destroyed broadcast variable is visible to or accessible from other UDFs, it gets picked up for serialization by all of those UDFs, even if they do not reference it. This throws an error, as .NET for Apache Spark is not able to serialize the destroyed broadcast variable.
-
-Example to demonstrate:
-
-```csharp
-string v = "Variable to be broadcasted";
-Broadcast<string> bv = SparkContext.Broadcast(v);
-
-// Using the broadcast variable in a UDF:
-Func<Column, Column> udf1 = Udf<string, string>(
-    str => $"{str}: {bv.Value()}");
-
-// Destroying bv
-bv.Destroy();
-
-// Calling udf1 after destroying bv throws the following expected exception:
-// org.apache.spark.SparkException: Attempted to use Broadcast(0) after it was destroyed
-df.Select(udf1(df["_1"])).Show();
-
-// Different UDF udf2 that is not referencing bv
-Func<Column, Column> udf2 = Udf<string, string>(
-    str => $"{str}: not referencing broadcast variable");
-
-// Calling udf2 throws the following (unexpected) exception:
-// [Error] [JvmBridge] org.apache.spark.SparkException: Task not serializable
-df.Select(udf2(df["_1"])).Show();
-```
-
-The recommended way of implementing the desired behavior:
-
-```csharp
-string v = "Variable to be broadcasted";
-// Restricting the visibility of bv to only the UDF referencing it
-{
-    Broadcast<string> bv = SparkContext.Broadcast(v);
-
-    // Using the broadcast variable in a UDF:
-    Func<Column, Column> udf1 = Udf<string, string>(
-        str => $"{str}: {bv.Value()}");
-
-    // Destroying bv
-    bv.Destroy();
-}
-
-// Different UDF udf2 that is not referencing bv
-Func<Column, Column> udf2 = Udf<string, string>(
-    str => $"{str}: not referencing broadcast variable");
-
-// Calling udf2 works fine as expected
-df.Select(udf2(df["_1"])).Show();
-```
-
-This ensures that destroying `bv` doesn't affect calls to `udf2` through unexpected serialization behavior.
-
-Broadcast variables are very useful for transmitting read-only data to all executors: the data is sent only once, which gives a huge performance benefit compared with local variables that get shipped to the executors with each task. Please refer to the [official documentation](https://spark.apache.org/docs/2.2.0/rdd-programming-guide.html#broadcast-variables) for a deeper understanding of broadcast variables and why they are used.
\ No newline at end of file
diff --git a/docs/udf-guide.md b/docs/udf-guide.md
deleted file mode 100644
index bb308815d..000000000
--- a/docs/udf-guide.md
+++ /dev/null
@@ -1,172 +0,0 @@
-# Guide to User-Defined Functions (UDFs)
-
-This is a guide that shows how to use UDFs in .NET for Apache Spark.
-
-## What are UDFs
-
-[User-Defined Functions (UDFs)](https://spark.apache.org/docs/latest/api/java/org/apache/spark/sql/expressions/UserDefinedFunction.html) are a Spark feature that lets developers extend the system's built-in functionality with custom functions. A UDF transforms values from a single row of a table to produce a single corresponding output value per row, based on the logic defined in the UDF.
-
-Let's take the following as an example of a UDF definition:
-
-```csharp
-string s1 = "hello";
-Func<Column, Column> udf = Udf<string, string>(
-    str => $"{s1} {str}");
-```
-
-The UDF defined above takes a `string` as input (in the form of a [Column](https://github.com/dotnet/spark/blob/master/src/csharp/Microsoft.Spark/Sql/Column.cs#L14) of a [DataFrame](https://github.com/dotnet/spark/blob/master/src/csharp/Microsoft.Spark/Sql/DataFrame.cs#L24)) and returns a `string` with `hello` prepended to the input.
-
-For a sample DataFrame, let's take the following DataFrame `df`:
-
-```text
-+-------+
-|   name|
-+-------+
-|Michael|
-|   Andy|
-| Justin|
-+-------+
-```
-
-Now let's apply the above defined `udf` to the DataFrame `df`:
-
-```csharp
-DataFrame udfResult = df.Select(udf(df["name"]));
-```
-
-This would return the following DataFrame as `udfResult`:
-
-```text
-+-------------+
-|         name|
-+-------------+
-|hello Michael|
-|   hello Andy|
-| hello Justin|
-+-------------+
-```
-
-To get a better understanding of how to implement UDFs, please take a look at the [UDF helper functions](https://github.com/dotnet/spark/blob/master/src/csharp/Microsoft.Spark/Sql/Functions.cs#L3616) and some [test examples](https://github.com/dotnet/spark/blob/master/src/csharp/Microsoft.Spark.E2ETest/UdfTests/UdfSimpleTypesTests.cs#L49).
-
-## UDF serialization
-
-Since UDFs are functions that need to be executed on the workers, they have to be serialized and sent to the workers as part of the payload from the driver. This involves serializing the [delegate](https://docs.microsoft.com/en-us/dotnet/csharp/programming-guide/delegates/), which is a reference to the method, along with its [target](https://docs.microsoft.com/en-us/dotnet/api/system.delegate.target?view=netframework-4.8), which is the class instance on which the delegate invokes the instance method. Please take a look at this [code](https://github.com/dotnet/spark/blob/master/src/csharp/Microsoft.Spark/Utils/CommandSerDe.cs#L149) to get a better understanding of how UDF serialization is done.
-
-## Good to know while implementing UDFs
-
-One behavior to be aware of while implementing UDFs in .NET for Apache Spark is how the target of the UDF gets serialized. .NET for Apache Spark uses .NET Core, which does not support serializing delegates, so serialization is instead done by using reflection on the target class in which the delegate is defined. When multiple delegates are defined in a common scope, they have a shared closure that becomes the target of reflection for serialization. Let's take an example to illustrate what that means.
-
-The following code snippet defines two string variables that are referenced in two function delegates, which simply return the respective strings as their result:
-
-```csharp
-using System;
-
-public class C {
-    public void M() {
-        string s1 = "s1";
-        string s2 = "s2";
-        Func<string, string> a = str => s1;
-        Func<string, string> b = str => s2;
-    }
-}
-```
-
-The above C# code produces the following decompiled C# (generated with [SharpLab](https://sharplab.io)):
-
-```csharp
-public class C
-{
-    [CompilerGenerated]
-    private sealed class <>c__DisplayClass0_0
-    {
-        public string s1;
-
-        public string s2;
-
-        internal string <M>b__0(string str)
-        {
-            return s1;
-        }
-
-        internal string <M>b__1(string str)
-        {
-            return s2;
-        }
-    }
-
-    public void M()
-    {
-        <>c__DisplayClass0_0 <>c__DisplayClass0_ = new <>c__DisplayClass0_0();
-        <>c__DisplayClass0_.s1 = "s1";
-        <>c__DisplayClass0_.s2 = "s2";
-        Func<string, string> func = new Func<string, string>(<>c__DisplayClass0_.<M>b__0);
-        Func<string, string> func2 = new Func<string, string>(<>c__DisplayClass0_.<M>b__1);
-    }
-}
-```
-
-As can be seen in the decompiled code above, both `func` and `func2` share the same closure `<>c__DisplayClass0_0`, which is the target that is serialized when serializing the delegates `func` and `func2`. Hence, even though `Func<string, string> a` references only `s1`, `s2` also gets serialized when sending the bytes over to the workers.
-
-This can lead to unexpected behaviors at runtime (as in the case of using [broadcast variables](broadcast-guide.md)), which is why we recommend restricting the visibility of the variables used in a function to that function's scope.
-
-Taking the above example, the recommended user code to implement the desired behavior of the previous snippet is:
-
-```csharp
-using System;
-
-public class C {
-    public void M() {
-        {
-            string s1 = "s1";
-            Func<string, string> a = str => s1;
-        }
-        {
-            string s2 = "s2";
-            Func<string, string> b = str => s2;
-        }
-    }
-}
-```
-
-The above C# code produces the following decompiled C# (generated with [SharpLab](https://sharplab.io)):
-
-```csharp
-public class C
-{
-    [CompilerGenerated]
-    private sealed class <>c__DisplayClass0_0
-    {
-        public string s1;
-
-        internal string <M>b__0(string str)
-        {
-            return s1;
-        }
-    }
-
-    [CompilerGenerated]
-    private sealed class <>c__DisplayClass0_1
-    {
-        public string s2;
-
-        internal string <M>b__1(string str)
-        {
-            return s2;
-        }
-    }
-
-    public void M()
-    {
-        <>c__DisplayClass0_0 <>c__DisplayClass0_ = new <>c__DisplayClass0_0();
-        <>c__DisplayClass0_.s1 = "s1";
-        Func<string, string> func = new Func<string, string>(<>c__DisplayClass0_.<M>b__0);
-        <>c__DisplayClass0_1 <>c__DisplayClass0_2 = new <>c__DisplayClass0_1();
-        <>c__DisplayClass0_2.s2 = "s2";
-        Func<string, string> func2 = new Func<string, string>(<>c__DisplayClass0_2.<M>b__1);
-    }
-}
-```
-
-Here we see that `func` and `func2` no longer share a closure; they have their own separate closures, `<>c__DisplayClass0_0` and `<>c__DisplayClass0_1` respectively. When used as the target for serialization, nothing other than the variables referenced by the delegate gets serialized.
-
-This behavior is important to keep in mind while implementing multiple UDFs in a common scope.
-To learn more about UDFs in general, please review the following articles that explain UDFs and how to use them: [UDFs in Databricks (Scala)](https://docs.databricks.com/spark/latest/spark-sql/udf-scala.html) and [Spark UDFs and some gotchas](https://medium.com/@achilleus/spark-udfs-we-can-use-them-but-should-we-use-them-2c5a561fde6d).
\ No newline at end of file From c2c66cc88a9cdecd6e778616bdc5f253f38ed4bc Mon Sep 17 00:00:00 2001 From: Niharika Dutta Date: Fri, 5 Jun 2020 02:20:36 -0700 Subject: [PATCH 03/14] Initial commit --- .../Microsoft.Spark.E2ETest/SparkFixture.cs | 3 ++ .../Network/DefaultSocketWrapper.cs | 11 +++++- .../Services/ConfigurationService.cs | 12 +++++++ .../Services/IConfigurationService.cs | 5 +++ .../spark/api/dotnet/DotnetBackend.scala | 4 +-- .../spark/deploy/dotnet/DotnetRunner.scala | 35 +++++++++++++++---- 6 files changed, 61 insertions(+), 9 deletions(-) diff --git a/src/csharp/Microsoft.Spark.E2ETest/SparkFixture.cs b/src/csharp/Microsoft.Spark.E2ETest/SparkFixture.cs index 6d8dadbac..5674017d0 100644 --- a/src/csharp/Microsoft.Spark.E2ETest/SparkFixture.cs +++ b/src/csharp/Microsoft.Spark.E2ETest/SparkFixture.cs @@ -7,6 +7,7 @@ using System.IO; using System.Reflection; using System.Runtime.InteropServices; +using Microsoft.Spark.Interop; using Microsoft.Spark.Interop.Ipc; using Microsoft.Spark.Sql; using Microsoft.Spark.UnitTest.TestUtils; @@ -57,6 +58,8 @@ public SparkFixture() $"Environment variable '{EnvironmentVariableNames.WorkerDir}' must be set."); } + Environment.SetEnvironmentVariable("DOTNETBACKEND_IP_ADDRESS", "localhost"); + BuildSparkCmd(out var filename, out var args); // Configure the process using the StartInfo properties. diff --git a/src/csharp/Microsoft.Spark/Network/DefaultSocketWrapper.cs b/src/csharp/Microsoft.Spark/Network/DefaultSocketWrapper.cs index 8647a14cb..1e3caa0ac 100644 --- a/src/csharp/Microsoft.Spark/Network/DefaultSocketWrapper.cs +++ b/src/csharp/Microsoft.Spark/Network/DefaultSocketWrapper.cs @@ -6,6 +6,7 @@ using System.IO; using System.Net; using System.Net.Sockets; +using Microsoft.Spark.Interop; using Microsoft.Spark.Services; using Microsoft.Spark.Utils; @@ -29,7 +30,15 @@ internal sealed class DefaultSocketWrapper : ISocketWrapper public DefaultSocketWrapper() : this(new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp)) { - _innerSocket.Bind(new IPEndPoint(IPAddress.Loopback, 0)); + string dotnetBackendIPAddress = SparkEnvironment.ConfigurationService.GetBackendIPAddress(); + if (dotnetBackendIPAddress == "localhost") + { + _innerSocket.Bind(new IPEndPoint(IPAddress.Loopback, 0)); + } + else + { + _innerSocket.Bind(new IPEndPoint(IPAddress.Parse(dotnetBackendIPAddress), 0)); + } } /// diff --git a/src/csharp/Microsoft.Spark/Services/ConfigurationService.cs b/src/csharp/Microsoft.Spark/Services/ConfigurationService.cs index 3b7de1555..cb58409b2 100644 --- a/src/csharp/Microsoft.Spark/Services/ConfigurationService.cs +++ b/src/csharp/Microsoft.Spark/Services/ConfigurationService.cs @@ -20,6 +20,7 @@ internal sealed class ConfigurationService : IConfigurationService "spark.dotnet.worker.writeBufferSize"; private const string DotnetBackendPortEnvVarName = "DOTNETBACKEND_PORT"; + private const string DotnetBackendIPAddressEnvVarName = "DOTNETBACKEND_IP_ADDRESS"; private const int DotnetBackendDebugPort = 5567; private static readonly string s_procBaseFileName = "Microsoft.Spark.Worker"; @@ -51,6 +52,17 @@ public int GetBackendPortNumber() return portNumber; } + /// + /// Returns the IP address for socket communication between JVM and CLR. + /// + public string GetBackendIPAddress() + { + string ipAddress = Environment.GetEnvironmentVariable(DotnetBackendIPAddressEnvVarName); + _logger.LogInfo($"Using IP address {ipAddress} for connection."); + + return ipAddress; + } + /// /// Returns the worker executable path. 
/// diff --git a/src/csharp/Microsoft.Spark/Services/IConfigurationService.cs b/src/csharp/Microsoft.Spark/Services/IConfigurationService.cs index 5c7a4074f..bf4d7e769 100644 --- a/src/csharp/Microsoft.Spark/Services/IConfigurationService.cs +++ b/src/csharp/Microsoft.Spark/Services/IConfigurationService.cs @@ -14,6 +14,11 @@ internal interface IConfigurationService /// int GetBackendPortNumber(); + /// + /// The IP address used for communicating with the .NET backend process. + /// + string GetBackendIPAddress(); + /// /// The full path to the .NET worker executable. /// diff --git a/src/scala/microsoft-spark-2.4.x/src/main/scala/org/apache/spark/api/dotnet/DotnetBackend.scala b/src/scala/microsoft-spark-2.4.x/src/main/scala/org/apache/spark/api/dotnet/DotnetBackend.scala index 45b3cd5a4..ca39ec20a 100644 --- a/src/scala/microsoft-spark-2.4.x/src/main/scala/org/apache/spark/api/dotnet/DotnetBackend.scala +++ b/src/scala/microsoft-spark-2.4.x/src/main/scala/org/apache/spark/api/dotnet/DotnetBackend.scala @@ -30,7 +30,7 @@ class DotnetBackend extends Logging { private[this] var bootstrap: ServerBootstrap = _ private[this] var bossGroup: EventLoopGroup = _ - def init(portNumber: Int): Int = { + def init(portNumber: Int, ipAddress: String): Int = { // need at least 3 threads, use 10 here for safety bossGroup = new NioEventLoopGroup(10) val workerGroup = bossGroup @@ -57,7 +57,7 @@ class DotnetBackend extends Logging { } }) - channelFuture = bootstrap.bind(new InetSocketAddress("localhost", portNumber)) + channelFuture = bootstrap.bind(new InetSocketAddress(ipAddress, portNumber)) channelFuture.syncUninterruptibly() channelFuture.channel().localAddress().asInstanceOf[InetSocketAddress].getPort } diff --git a/src/scala/microsoft-spark-2.4.x/src/main/scala/org/apache/spark/deploy/dotnet/DotnetRunner.scala b/src/scala/microsoft-spark-2.4.x/src/main/scala/org/apache/spark/deploy/dotnet/DotnetRunner.scala index 65a56e3e8..bad99dc2e 100644 --- a/src/scala/microsoft-spark-2.4.x/src/main/scala/org/apache/spark/deploy/dotnet/DotnetRunner.scala +++ b/src/scala/microsoft-spark-2.4.x/src/main/scala/org/apache/spark/deploy/dotnet/DotnetRunner.scala @@ -7,7 +7,8 @@ package org.apache.spark.deploy.dotnet import java.io.File -import java.net.URI +import java.lang.NumberFormatException +import java.net.{InetAddress, URI, UnknownHostException} import java.nio.file.attribute.PosixFilePermissions import java.nio.file.{FileSystems, Files, Paths} import java.util.Locale @@ -51,6 +52,7 @@ object DotnetRunner extends Logging { // In debug mode this runner will not launch a .NET process. 
val runInDebugMode = settings._1 @volatile var dotnetBackendPortNumber = settings._2 + val dotnetBackendIPAddress = settings._3 var dotnetExecutable = "" var otherArgs: Array[String] = null @@ -98,8 +100,9 @@ object DotnetRunner extends Logging { override def run() { // need to get back dotnetBackendPortNumber because if the value passed to init is 0 // the port number is dynamically assigned in the backend - dotnetBackendPortNumber = dotnetBackend.init(dotnetBackendPortNumber) - logInfo(s"Port number used by DotnetBackend is $dotnetBackendPortNumber") + dotnetBackendPortNumber = dotnetBackend.init(dotnetBackendPortNumber, dotnetBackendIPAddress) + logInfo(s"Port number used by DotnetBackend is $dotnetBackendPortNumber on IP address " + + s"$dotnetBackendIPAddress") initialized.release() dotnetBackend.run() } @@ -115,6 +118,7 @@ object DotnetRunner extends Logging { val builder = new ProcessBuilder(processParameters) val env = builder.environment() env.put("DOTNETBACKEND_PORT", dotnetBackendPortNumber.toString) + env.put("DOTNETBACKEND_IP_ADDRESS", dotnetBackendIPAddress) for ((key, value) <- Utils.getSystemProperties if key.startsWith("spark.")) { env.put(key, value) @@ -264,19 +268,38 @@ object DotnetRunner extends Logging { returnCode } - private def initializeSettings(args: Array[String]): (Boolean, Int) = { - val runInDebugMode = (args.length == 1 || args.length == 2) && args(0).equalsIgnoreCase( + private def initializeSettings(args: Array[String]): (Boolean, Int, String) = { + val runInDebugMode = (args.length == 1 || args.length == 2 || args.length == 3) && args(0).equalsIgnoreCase( "debug") var portNumber = 0 + var dotnetBackendIPAddress = "localhost" if (runInDebugMode) { if (args.length == 1) { portNumber = DEBUG_PORT } else if (args.length == 2) { + try { + portNumber = Integer.parseInt(args(1)) + } + catch { + case e: Exception => + portNumber = DEBUG_PORT + dotnetBackendIPAddress = args(1) + } + } else if (args.length == 3) { portNumber = Integer.parseInt(args(1)) + dotnetBackendIPAddress = args(2) + } + } else { + try { + var addr = InetAddress.getByName(args(0)) + } + catch { + case e: UnknownHostException => + dotnetBackendIPAddress = "localhost" } } - (runInDebugMode, portNumber) + (runInDebugMode, portNumber, dotnetBackendIPAddress) } private def logThrowable(throwable: Throwable): Unit = From e846bcbdd1e6db3c0e061f9cf4713682de401210 Mon Sep 17 00:00:00 2001 From: Niharika Dutta Date: Fri, 5 Jun 2020 03:00:48 -0700 Subject: [PATCH 04/14] If environment variable is not set, use localhost --- src/csharp/Microsoft.Spark/Network/DefaultSocketWrapper.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/csharp/Microsoft.Spark/Network/DefaultSocketWrapper.cs b/src/csharp/Microsoft.Spark/Network/DefaultSocketWrapper.cs index 1e3caa0ac..56e439d8b 100644 --- a/src/csharp/Microsoft.Spark/Network/DefaultSocketWrapper.cs +++ b/src/csharp/Microsoft.Spark/Network/DefaultSocketWrapper.cs @@ -31,7 +31,7 @@ public DefaultSocketWrapper() : this(new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp)) { string dotnetBackendIPAddress = SparkEnvironment.ConfigurationService.GetBackendIPAddress(); - if (dotnetBackendIPAddress == "localhost") + if (dotnetBackendIPAddress == "localhost" || dotnetBackendIPAddress == null) { _innerSocket.Bind(new IPEndPoint(IPAddress.Loopback, 0)); } From 1a3deac02d1f5acfb972a252dd768444c8e69bb3 Mon Sep 17 00:00:00 2001 From: Niharika Dutta Date: Wed, 10 Jun 2020 18:29:08 -0700 Subject: [PATCH 05/14] Adding change to all 
spark versions --- .../Microsoft.Spark.E2ETest/SparkFixture.cs | 3 --- .../Network/DefaultSocketWrapper.cs | 2 +- .../spark/api/dotnet/DotnetBackend.scala | 4 +-- .../spark/deploy/dotnet/DotnetRunner.scala | 26 +++++++++++++++---- .../spark/deploy/dotnet/DotnetRunner.scala | 14 +++------- .../spark/api/dotnet/DotnetBackend.scala | 4 +-- .../spark/deploy/dotnet/DotnetRunner.scala | 26 +++++++++++++++---- 7 files changed, 50 insertions(+), 29 deletions(-) diff --git a/src/csharp/Microsoft.Spark.E2ETest/SparkFixture.cs b/src/csharp/Microsoft.Spark.E2ETest/SparkFixture.cs index 5674017d0..6d8dadbac 100644 --- a/src/csharp/Microsoft.Spark.E2ETest/SparkFixture.cs +++ b/src/csharp/Microsoft.Spark.E2ETest/SparkFixture.cs @@ -7,7 +7,6 @@ using System.IO; using System.Reflection; using System.Runtime.InteropServices; -using Microsoft.Spark.Interop; using Microsoft.Spark.Interop.Ipc; using Microsoft.Spark.Sql; using Microsoft.Spark.UnitTest.TestUtils; @@ -58,8 +57,6 @@ public SparkFixture() $"Environment variable '{EnvironmentVariableNames.WorkerDir}' must be set."); } - Environment.SetEnvironmentVariable("DOTNETBACKEND_IP_ADDRESS", "localhost"); - BuildSparkCmd(out var filename, out var args); // Configure the process using the StartInfo properties. diff --git a/src/csharp/Microsoft.Spark/Network/DefaultSocketWrapper.cs b/src/csharp/Microsoft.Spark/Network/DefaultSocketWrapper.cs index 56e439d8b..9dfa2dba6 100644 --- a/src/csharp/Microsoft.Spark/Network/DefaultSocketWrapper.cs +++ b/src/csharp/Microsoft.Spark/Network/DefaultSocketWrapper.cs @@ -25,7 +25,7 @@ internal sealed class DefaultSocketWrapper : ISocketWrapper /// Default constructor that creates a new instance of DefaultSocket class which represents /// a traditional socket (System.Net.Socket.Socket). /// - /// This socket is bound to Loopback with port 0. + /// This socket is bound to provided IP address with port 0. 
/// public DefaultSocketWrapper() : this(new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp)) diff --git a/src/scala/microsoft-spark-2.3.x/src/main/scala/org/apache/spark/api/dotnet/DotnetBackend.scala b/src/scala/microsoft-spark-2.3.x/src/main/scala/org/apache/spark/api/dotnet/DotnetBackend.scala index 45b3cd5a4..ca39ec20a 100644 --- a/src/scala/microsoft-spark-2.3.x/src/main/scala/org/apache/spark/api/dotnet/DotnetBackend.scala +++ b/src/scala/microsoft-spark-2.3.x/src/main/scala/org/apache/spark/api/dotnet/DotnetBackend.scala @@ -30,7 +30,7 @@ class DotnetBackend extends Logging { private[this] var bootstrap: ServerBootstrap = _ private[this] var bossGroup: EventLoopGroup = _ - def init(portNumber: Int): Int = { + def init(portNumber: Int, ipAddress: String): Int = { // need at least 3 threads, use 10 here for safety bossGroup = new NioEventLoopGroup(10) val workerGroup = bossGroup @@ -57,7 +57,7 @@ class DotnetBackend extends Logging { } }) - channelFuture = bootstrap.bind(new InetSocketAddress("localhost", portNumber)) + channelFuture = bootstrap.bind(new InetSocketAddress(ipAddress, portNumber)) channelFuture.syncUninterruptibly() channelFuture.channel().localAddress().asInstanceOf[InetSocketAddress].getPort } diff --git a/src/scala/microsoft-spark-2.3.x/src/main/scala/org/apache/spark/deploy/dotnet/DotnetRunner.scala b/src/scala/microsoft-spark-2.3.x/src/main/scala/org/apache/spark/deploy/dotnet/DotnetRunner.scala index 99e86bdd0..64d03134b 100644 --- a/src/scala/microsoft-spark-2.3.x/src/main/scala/org/apache/spark/deploy/dotnet/DotnetRunner.scala +++ b/src/scala/microsoft-spark-2.3.x/src/main/scala/org/apache/spark/deploy/dotnet/DotnetRunner.scala @@ -8,6 +8,8 @@ package org.apache.spark.deploy.dotnet import java.io.File import java.net.URI +import java.lang.NumberFormatException +import java.net.{InetAddress, URI, UnknownHostException} import java.nio.file.attribute.PosixFilePermissions import java.nio.file.{FileSystems, Files, Paths} import java.util.Locale @@ -51,6 +53,7 @@ object DotnetRunner extends Logging { // In debug mode this runner will not launch a .NET process. 
val runInDebugMode = settings._1 @volatile var dotnetBackendPortNumber = settings._2 + val dotnetBackendIPAddress = settings._3 var dotnetExecutable = "" var otherArgs: Array[String] = null @@ -98,8 +101,9 @@ object DotnetRunner extends Logging { override def run() { // need to get back dotnetBackendPortNumber because if the value passed to init is 0 // the port number is dynamically assigned in the backend - dotnetBackendPortNumber = dotnetBackend.init(dotnetBackendPortNumber) - logInfo(s"Port number used by DotnetBackend is $dotnetBackendPortNumber") + dotnetBackendPortNumber = dotnetBackend.init(dotnetBackendPortNumber, dotnetBackendIPAddress) + logInfo(s"Port number used by DotnetBackend is $dotnetBackendPortNumber on IP address " + + s"$dotnetBackendIPAddress") initialized.release() dotnetBackend.run() } @@ -115,6 +119,7 @@ object DotnetRunner extends Logging { val builder = new ProcessBuilder(processParameters) val env = builder.environment() env.put("DOTNETBACKEND_PORT", dotnetBackendPortNumber.toString) + env.put("DOTNETBACKEND_IP_ADDRESS", dotnetBackendIPAddress) for ((key, value) <- Utils.getSystemProperties if key.startsWith("spark.")) { env.put(key, value) @@ -264,19 +269,30 @@ object DotnetRunner extends Logging { returnCode } - private def initializeSettings(args: Array[String]): (Boolean, Int) = { + private def initializeSettings(args: Array[String]): (Boolean, Int, String) = { val runInDebugMode = (args.length == 1 || args.length == 2) && args(0).equalsIgnoreCase( "debug") var portNumber = 0 + var dotnetBackendIPAddress = "localhost" if (runInDebugMode) { if (args.length == 1) { portNumber = DEBUG_PORT } else if (args.length == 2) { - portNumber = Integer.parseInt(args(1)) + portNumber = Integer.parseInt(args(1)) + } + } + else { + try { + var addr = InetAddress.getByName(args(0)) + dotnetBackendIPAddress = args(0) + } + catch { + case e: UnknownHostException => + dotnetBackendIPAddress = "localhost" } } - (runInDebugMode, portNumber) + (runInDebugMode, portNumber, dotnetBackendIPAddress) } private def logThrowable(throwable: Throwable): Unit = diff --git a/src/scala/microsoft-spark-2.4.x/src/main/scala/org/apache/spark/deploy/dotnet/DotnetRunner.scala b/src/scala/microsoft-spark-2.4.x/src/main/scala/org/apache/spark/deploy/dotnet/DotnetRunner.scala index bad99dc2e..573bf0965 100644 --- a/src/scala/microsoft-spark-2.4.x/src/main/scala/org/apache/spark/deploy/dotnet/DotnetRunner.scala +++ b/src/scala/microsoft-spark-2.4.x/src/main/scala/org/apache/spark/deploy/dotnet/DotnetRunner.scala @@ -269,7 +269,7 @@ object DotnetRunner extends Logging { } private def initializeSettings(args: Array[String]): (Boolean, Int, String) = { - val runInDebugMode = (args.length == 1 || args.length == 2 || args.length == 3) && args(0).equalsIgnoreCase( + val runInDebugMode = (args.length == 1 || args.length == 2) && args(0).equalsIgnoreCase( "debug") var portNumber = 0 var dotnetBackendIPAddress = "localhost" @@ -277,21 +277,13 @@ object DotnetRunner extends Logging { if (args.length == 1) { portNumber = DEBUG_PORT } else if (args.length == 2) { - try { portNumber = Integer.parseInt(args(1)) } - catch { - case e: Exception => - portNumber = DEBUG_PORT - dotnetBackendIPAddress = args(1) - } - } else if (args.length == 3) { - portNumber = Integer.parseInt(args(1)) - dotnetBackendIPAddress = args(2) } - } else { + else { try { var addr = InetAddress.getByName(args(0)) + dotnetBackendIPAddress = args(0) } catch { case e: UnknownHostException => diff --git 
a/src/scala/microsoft-spark-3.0.x/src/main/scala/org/apache/spark/api/dotnet/DotnetBackend.scala b/src/scala/microsoft-spark-3.0.x/src/main/scala/org/apache/spark/api/dotnet/DotnetBackend.scala index 45b3cd5a4..ca39ec20a 100644 --- a/src/scala/microsoft-spark-3.0.x/src/main/scala/org/apache/spark/api/dotnet/DotnetBackend.scala +++ b/src/scala/microsoft-spark-3.0.x/src/main/scala/org/apache/spark/api/dotnet/DotnetBackend.scala @@ -30,7 +30,7 @@ class DotnetBackend extends Logging { private[this] var bootstrap: ServerBootstrap = _ private[this] var bossGroup: EventLoopGroup = _ - def init(portNumber: Int): Int = { + def init(portNumber: Int, ipAddress: String): Int = { // need at least 3 threads, use 10 here for safety bossGroup = new NioEventLoopGroup(10) val workerGroup = bossGroup @@ -57,7 +57,7 @@ class DotnetBackend extends Logging { } }) - channelFuture = bootstrap.bind(new InetSocketAddress("localhost", portNumber)) + channelFuture = bootstrap.bind(new InetSocketAddress(ipAddress, portNumber)) channelFuture.syncUninterruptibly() channelFuture.channel().localAddress().asInstanceOf[InetSocketAddress].getPort } diff --git a/src/scala/microsoft-spark-3.0.x/src/main/scala/org/apache/spark/deploy/dotnet/DotnetRunner.scala b/src/scala/microsoft-spark-3.0.x/src/main/scala/org/apache/spark/deploy/dotnet/DotnetRunner.scala index daf6d3f89..05d29ad64 100644 --- a/src/scala/microsoft-spark-3.0.x/src/main/scala/org/apache/spark/deploy/dotnet/DotnetRunner.scala +++ b/src/scala/microsoft-spark-3.0.x/src/main/scala/org/apache/spark/deploy/dotnet/DotnetRunner.scala @@ -8,6 +8,8 @@ package org.apache.spark.deploy.dotnet import java.io.File import java.net.URI +import java.lang.NumberFormatException +import java.net.{InetAddress, URI, UnknownHostException} import java.nio.file.attribute.PosixFilePermissions import java.nio.file.{FileSystems, Files, Paths} import java.util.Locale @@ -51,6 +53,7 @@ object DotnetRunner extends Logging { // In debug mode this runner will not launch a .NET process. 
val runInDebugMode = settings._1 @volatile var dotnetBackendPortNumber = settings._2 + val dotnetBackendIPAddress = settings._3 var dotnetExecutable = "" var otherArgs: Array[String] = null @@ -98,8 +101,9 @@ object DotnetRunner extends Logging { override def run() { // need to get back dotnetBackendPortNumber because if the value passed to init is 0 // the port number is dynamically assigned in the backend - dotnetBackendPortNumber = dotnetBackend.init(dotnetBackendPortNumber) - logInfo(s"Port number used by DotnetBackend is $dotnetBackendPortNumber") + dotnetBackendPortNumber = dotnetBackend.init(dotnetBackendPortNumber, dotnetBackendIPAddress) + logInfo(s"Port number used by DotnetBackend is $dotnetBackendPortNumber on IP address " + + s"$dotnetBackendIPAddress") initialized.release() dotnetBackend.run() } @@ -115,6 +119,7 @@ object DotnetRunner extends Logging { val builder = new ProcessBuilder(processParameters) val env = builder.environment() env.put("DOTNETBACKEND_PORT", dotnetBackendPortNumber.toString) + env.put("DOTNETBACKEND_IP_ADDRESS", dotnetBackendIPAddress) for ((key, value) <- Utils.getSystemProperties if key.startsWith("spark.")) { env.put(key, value) @@ -264,19 +269,30 @@ object DotnetRunner extends Logging { returnCode } - private def initializeSettings(args: Array[String]): (Boolean, Int) = { + private def initializeSettings(args: Array[String]): (Boolean, Int, String) = { val runInDebugMode = (args.length == 1 || args.length == 2) && args(0).equalsIgnoreCase( "debug") var portNumber = 0 + var dotnetBackendIPAddress = "localhost" if (runInDebugMode) { if (args.length == 1) { portNumber = DEBUG_PORT } else if (args.length == 2) { - portNumber = Integer.parseInt(args(1)) + portNumber = Integer.parseInt(args(1)) + } + } + else { + try { + var addr = InetAddress.getByName(args(0)) + dotnetBackendIPAddress = args(0) + } + catch { + case e: UnknownHostException => + dotnetBackendIPAddress = "localhost" } } - (runInDebugMode, portNumber) + (runInDebugMode, portNumber, dotnetBackendIPAddress) } private def logThrowable(throwable: Throwable): Unit = From 8b3fd40b970eb480fb5d6677e29518242fdb9e40 Mon Sep 17 00:00:00 2001 From: Niharika Dutta Date: Sat, 13 Jun 2020 21:43:50 -0700 Subject: [PATCH 06/14] Adding environment variable for dotnet backend ip address --- .../Microsoft.Spark/Interop/Ipc/JvmBridge.cs | 3 ++- .../Network/DefaultSocketWrapper.cs | 10 +------ src/csharp/Microsoft.Spark/RDD.cs | 4 ++- .../Services/ConfigurationService.cs | 26 ++++++++++++++++++- .../Services/IConfigurationService.cs | 7 +++++ src/csharp/Microsoft.Spark/Sql/DataFrame.cs | 3 ++- .../spark/api/dotnet/DotnetBackend.scala | 2 +- .../spark/deploy/dotnet/DotnetRunner.scala | 20 +++----------- 8 files changed, 45 insertions(+), 30 deletions(-) diff --git a/src/csharp/Microsoft.Spark/Interop/Ipc/JvmBridge.cs b/src/csharp/Microsoft.Spark/Interop/Ipc/JvmBridge.cs index abfa63b19..3f6e6c8d5 100644 --- a/src/csharp/Microsoft.Spark/Interop/Ipc/JvmBridge.cs +++ b/src/csharp/Microsoft.Spark/Interop/Ipc/JvmBridge.cs @@ -51,8 +51,9 @@ private ISocketWrapper GetConnection() { if (!_sockets.TryDequeue(out ISocketWrapper socket)) { + IPEndPoint dotnetBackendIPEndpoint = SparkEnvironment.ConfigurationService.GetBackendIPEndpoint(); socket = SocketFactory.CreateSocket(); - socket.Connect(IPAddress.Loopback, _portNumber); + socket.Connect(dotnetBackendIPEndpoint.Address, dotnetBackendIPEndpoint.Port); } return socket; diff --git a/src/csharp/Microsoft.Spark/Network/DefaultSocketWrapper.cs 
b/src/csharp/Microsoft.Spark/Network/DefaultSocketWrapper.cs index 9dfa2dba6..b44d1ba47 100644 --- a/src/csharp/Microsoft.Spark/Network/DefaultSocketWrapper.cs +++ b/src/csharp/Microsoft.Spark/Network/DefaultSocketWrapper.cs @@ -30,15 +30,7 @@ internal sealed class DefaultSocketWrapper : ISocketWrapper public DefaultSocketWrapper() : this(new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp)) { - string dotnetBackendIPAddress = SparkEnvironment.ConfigurationService.GetBackendIPAddress(); - if (dotnetBackendIPAddress == "localhost" || dotnetBackendIPAddress == null) - { - _innerSocket.Bind(new IPEndPoint(IPAddress.Loopback, 0)); - } - else - { - _innerSocket.Bind(new IPEndPoint(IPAddress.Parse(dotnetBackendIPAddress), 0)); - } + _innerSocket.Bind(new IPEndPoint(IPAddress.Any, 0)); } /// diff --git a/src/csharp/Microsoft.Spark/RDD.cs b/src/csharp/Microsoft.Spark/RDD.cs index 17133a057..b36b091f4 100644 --- a/src/csharp/Microsoft.Spark/RDD.cs +++ b/src/csharp/Microsoft.Spark/RDD.cs @@ -6,6 +6,7 @@ using System.Collections.Generic; using System.Linq; using System.Net; +using Microsoft.Spark.Interop; using Microsoft.Spark.Interop.Ipc; using Microsoft.Spark.Network; using Microsoft.Spark.Utils; @@ -262,7 +263,8 @@ public IEnumerable Collect() { (int port, string secret) = CollectAndServe(); using ISocketWrapper socket = SocketFactory.CreateSocket(); - socket.Connect(IPAddress.Loopback, port, secret); + IPEndPoint dotnetBackendIPEndpoint = SparkEnvironment.ConfigurationService.GetBackendIPEndpoint(); + socket.Connect(dotnetBackendIPEndpoint.Address, port, secret); var collector = new RDD.Collector(); System.IO.Stream stream = socket.InputStream; diff --git a/src/csharp/Microsoft.Spark/Services/ConfigurationService.cs b/src/csharp/Microsoft.Spark/Services/ConfigurationService.cs index cb58409b2..552407af4 100644 --- a/src/csharp/Microsoft.Spark/Services/ConfigurationService.cs +++ b/src/csharp/Microsoft.Spark/Services/ConfigurationService.cs @@ -4,6 +4,7 @@ using System; using System.IO; +using System.Net; using System.Runtime.InteropServices; namespace Microsoft.Spark.Services @@ -20,7 +21,7 @@ internal sealed class ConfigurationService : IConfigurationService "spark.dotnet.worker.writeBufferSize"; private const string DotnetBackendPortEnvVarName = "DOTNETBACKEND_PORT"; - private const string DotnetBackendIPAddressEnvVarName = "DOTNETBACKEND_IP_ADDRESS"; + private const string DotnetBackendIPAddressEnvVarName = "DOTNET_SPARK_BACKEND_IP_ADDRESS"; private const int DotnetBackendDebugPort = 5567; private static readonly string s_procBaseFileName = "Microsoft.Spark.Worker"; @@ -34,6 +35,29 @@ internal sealed class ConfigurationService : IConfigurationService private string _workerPath; + /// + /// Returns the IP Endpoint for socket communication between JVM and CLR. 
+ /// + public IPEndPoint GetBackendIPEndpoint() + { + if (!int.TryParse( + Environment.GetEnvironmentVariable(DotnetBackendPortEnvVarName), + out int portNumber)) + { + _logger.LogInfo($"'{DotnetBackendPortEnvVarName}' environment variable is not set."); + portNumber = DotnetBackendDebugPort; + } + string ipAddress = Environment.GetEnvironmentVariable(DotnetBackendIPAddressEnvVarName); + if (ipAddress == null) + { + _logger.LogInfo($"'{DotnetBackendIPAddressEnvVarName}' environment variable is not set."); + ipAddress = "127.0.0.1"; + } + _logger.LogInfo($"Using IP address {ipAddress} and port {portNumber} for connection."); + + return new IPEndPoint(IPAddress.Parse(ipAddress), portNumber); + } + /// /// Returns the port number for socket communication between JVM and CLR. /// diff --git a/src/csharp/Microsoft.Spark/Services/IConfigurationService.cs b/src/csharp/Microsoft.Spark/Services/IConfigurationService.cs index bf4d7e769..01b3f7b5f 100644 --- a/src/csharp/Microsoft.Spark/Services/IConfigurationService.cs +++ b/src/csharp/Microsoft.Spark/Services/IConfigurationService.cs @@ -2,6 +2,8 @@ // The .NET Foundation licenses this file to you under the MIT license. // See the LICENSE file in the project root for more information. +using System.Net; + namespace Microsoft.Spark.Services { /// @@ -14,6 +16,11 @@ internal interface IConfigurationService /// int GetBackendPortNumber(); + /// + /// The IP Endpoint used for communicating with the .NET backend process. + /// + IPEndPoint GetBackendIPEndpoint(); + /// /// The IP address used for communicating with the .NET backend process. /// diff --git a/src/csharp/Microsoft.Spark/Sql/DataFrame.cs b/src/csharp/Microsoft.Spark/Sql/DataFrame.cs index afb455f73..5a8b81672 100644 --- a/src/csharp/Microsoft.Spark/Sql/DataFrame.cs +++ b/src/csharp/Microsoft.Spark/Sql/DataFrame.cs @@ -886,9 +886,10 @@ public DataStreamWriter WriteStream() => private IEnumerable GetRows(string funcName) { (int port, string secret) = GetConnectionInfo(funcName); + IPEndPoint dotnetBackendIPEndpoint = SparkEnvironment.ConfigurationService.GetBackendIPEndpoint(); using (ISocketWrapper socket = SocketFactory.CreateSocket()) { - socket.Connect(IPAddress.Loopback, port, secret); + socket.Connect(dotnetBackendIPEndpoint.Address, port, secret); foreach (Row row in new RowCollector().Collect(socket)) { yield return row; diff --git a/src/scala/microsoft-spark-2.4.x/src/main/scala/org/apache/spark/api/dotnet/DotnetBackend.scala b/src/scala/microsoft-spark-2.4.x/src/main/scala/org/apache/spark/api/dotnet/DotnetBackend.scala index ca39ec20a..fc823c0aa 100644 --- a/src/scala/microsoft-spark-2.4.x/src/main/scala/org/apache/spark/api/dotnet/DotnetBackend.scala +++ b/src/scala/microsoft-spark-2.4.x/src/main/scala/org/apache/spark/api/dotnet/DotnetBackend.scala @@ -30,7 +30,7 @@ class DotnetBackend extends Logging { private[this] var bootstrap: ServerBootstrap = _ private[this] var bossGroup: EventLoopGroup = _ - def init(portNumber: Int, ipAddress: String): Int = { + def init(ipAddress: String, portNumber: Int): Int = { // need at least 3 threads, use 10 here for safety bossGroup = new NioEventLoopGroup(10) val workerGroup = bossGroup diff --git a/src/scala/microsoft-spark-2.4.x/src/main/scala/org/apache/spark/deploy/dotnet/DotnetRunner.scala b/src/scala/microsoft-spark-2.4.x/src/main/scala/org/apache/spark/deploy/dotnet/DotnetRunner.scala index 573bf0965..b5d893642 100644 --- a/src/scala/microsoft-spark-2.4.x/src/main/scala/org/apache/spark/deploy/dotnet/DotnetRunner.scala +++ 
b/src/scala/microsoft-spark-2.4.x/src/main/scala/org/apache/spark/deploy/dotnet/DotnetRunner.scala @@ -52,7 +52,6 @@ object DotnetRunner extends Logging { // In debug mode this runner will not launch a .NET process. val runInDebugMode = settings._1 @volatile var dotnetBackendPortNumber = settings._2 - val dotnetBackendIPAddress = settings._3 var dotnetExecutable = "" var otherArgs: Array[String] = null @@ -91,6 +90,7 @@ object DotnetRunner extends Logging { // Time to wait for DotnetBackend to initialize in seconds. val backendTimeout = sys.env.getOrElse("DOTNETBACKEND_TIMEOUT", "120").toInt + val dotnetBackendIPAddress = sys.env.getOrElse("DOTNET_SPARK_BACKEND_IP_ADDRESS", "127.0.0.1") // Launch a DotnetBackend server for the .NET process to connect to; this will let it see our // Java system properties etc. @@ -100,7 +100,7 @@ object DotnetRunner extends Logging { override def run() { // need to get back dotnetBackendPortNumber because if the value passed to init is 0 // the port number is dynamically assigned in the backend - dotnetBackendPortNumber = dotnetBackend.init(dotnetBackendPortNumber, dotnetBackendIPAddress) + dotnetBackendPortNumber = dotnetBackend.init(dotnetBackendIPAddress, dotnetBackendPortNumber) logInfo(s"Port number used by DotnetBackend is $dotnetBackendPortNumber on IP address " + s"$dotnetBackendIPAddress") initialized.release() @@ -118,7 +118,6 @@ object DotnetRunner extends Logging { val builder = new ProcessBuilder(processParameters) val env = builder.environment() env.put("DOTNETBACKEND_PORT", dotnetBackendPortNumber.toString) - env.put("DOTNETBACKEND_IP_ADDRESS", dotnetBackendIPAddress) for ((key, value) <- Utils.getSystemProperties if key.startsWith("spark.")) { env.put(key, value) @@ -268,11 +267,10 @@ object DotnetRunner extends Logging { returnCode } - private def initializeSettings(args: Array[String]): (Boolean, Int, String) = { + private def initializeSettings(args: Array[String]): (Boolean, Int) = { val runInDebugMode = (args.length == 1 || args.length == 2) && args(0).equalsIgnoreCase( "debug") var portNumber = 0 - var dotnetBackendIPAddress = "localhost" if (runInDebugMode) { if (args.length == 1) { portNumber = DEBUG_PORT @@ -280,18 +278,8 @@ object DotnetRunner extends Logging { portNumber = Integer.parseInt(args(1)) } } - else { - try { - var addr = InetAddress.getByName(args(0)) - dotnetBackendIPAddress = args(0) - } - catch { - case e: UnknownHostException => - dotnetBackendIPAddress = "localhost" - } - } - (runInDebugMode, portNumber, dotnetBackendIPAddress) + (runInDebugMode, portNumber) } private def logThrowable(throwable: Throwable): Unit = From 19aad8bc9919b5cf50a5e0dfcb16fc51267103c8 Mon Sep 17 00:00:00 2001 From: Niharika Dutta Date: Sat, 13 Jun 2020 22:30:30 -0700 Subject: [PATCH 07/14] Fixing DaemonWorkerTests by connecting on loopback --- src/csharp/Microsoft.Spark.Worker.UnitTest/DaemonWorkerTests.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/csharp/Microsoft.Spark.Worker.UnitTest/DaemonWorkerTests.cs b/src/csharp/Microsoft.Spark.Worker.UnitTest/DaemonWorkerTests.cs index 5fac38035..c90c09f75 100644 --- a/src/csharp/Microsoft.Spark.Worker.UnitTest/DaemonWorkerTests.cs +++ b/src/csharp/Microsoft.Spark.Worker.UnitTest/DaemonWorkerTests.cs @@ -42,7 +42,7 @@ private static void CreateAndVerifyConnection(ISocketWrapper daemonSocket) var ipEndpoint = (IPEndPoint)daemonSocket.LocalEndPoint; int port = ipEndpoint.Port; ISocketWrapper clientSocket = SocketFactory.CreateSocket(); - 
clientSocket.Connect(ipEndpoint.Address, port); + clientSocket.Connect(IPAddress.Loopback, port); // Now process the bytes flowing in from the client. PayloadWriter payloadWriter = new PayloadWriterFactory().Create(); From 6a4f3b78057142ca3edf6778d1d86a1f3f6393cd Mon Sep 17 00:00:00 2001 From: Niharika Dutta Date: Sun, 14 Jun 2020 16:15:42 -0700 Subject: [PATCH 08/14] Binding to 0.0.0.0 on JVM side --- .../scala/org/apache/spark/deploy/dotnet/DotnetRunner.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/scala/microsoft-spark-2.4.x/src/main/scala/org/apache/spark/deploy/dotnet/DotnetRunner.scala b/src/scala/microsoft-spark-2.4.x/src/main/scala/org/apache/spark/deploy/dotnet/DotnetRunner.scala index b5d893642..ad434063b 100644 --- a/src/scala/microsoft-spark-2.4.x/src/main/scala/org/apache/spark/deploy/dotnet/DotnetRunner.scala +++ b/src/scala/microsoft-spark-2.4.x/src/main/scala/org/apache/spark/deploy/dotnet/DotnetRunner.scala @@ -90,7 +90,7 @@ object DotnetRunner extends Logging { // Time to wait for DotnetBackend to initialize in seconds. val backendTimeout = sys.env.getOrElse("DOTNETBACKEND_TIMEOUT", "120").toInt - val dotnetBackendIPAddress = sys.env.getOrElse("DOTNET_SPARK_BACKEND_IP_ADDRESS", "127.0.0.1") + val dotnetBackendIPAddress = sys.env.getOrElse("DOTNET_SPARK_BACKEND_IP_ADDRESS", "0.0.0.0") // Launch a DotnetBackend server for the .NET process to connect to; this will let it see our // Java system properties etc. From cc98a4be69b9b53719925dfa3c12109d37843b33 Mon Sep 17 00:00:00 2001 From: Niharika Dutta Date: Fri, 1 Apr 2022 20:23:19 -0700 Subject: [PATCH 09/14] changes --- .../Interop/Ipc/IJvmBridgeFactory.cs | 4 +++ .../Microsoft.Spark/Interop/Ipc/JvmBridge.cs | 11 ++++-- .../Interop/SparkEnvironment.cs | 4 ++- .../Network/DefaultSocketWrapper.cs | 4 +-- .../Microsoft.Spark/Network/SocketFactory.cs | 7 ++++ .../Services/ConfigurationService.cs | 34 ++++++------------- .../Services/IConfigurationService.cs | 9 ++--- src/csharp/Microsoft.Spark/Sql/DataFrame.cs | 3 +- .../spark/deploy/dotnet/DotnetRunner.scala | 7 ++-- .../spark/api/dotnet/DotnetBackend.scala | 2 +- .../spark/deploy/dotnet/DotnetRunner.scala | 6 ++-- .../spark/api/dotnet/DotnetBackend.scala | 2 +- .../spark/deploy/dotnet/DotnetRunner.scala | 8 ++--- .../spark/api/dotnet/DotnetBackend.scala | 2 +- .../spark/deploy/dotnet/DotnetRunner.scala | 8 ++--- 15 files changed, 57 insertions(+), 54 deletions(-) diff --git a/src/csharp/Microsoft.Spark/Interop/Ipc/IJvmBridgeFactory.cs b/src/csharp/Microsoft.Spark/Interop/Ipc/IJvmBridgeFactory.cs index 428565527..b8be2918c 100644 --- a/src/csharp/Microsoft.Spark/Interop/Ipc/IJvmBridgeFactory.cs +++ b/src/csharp/Microsoft.Spark/Interop/Ipc/IJvmBridgeFactory.cs @@ -2,10 +2,14 @@ // The .NET Foundation licenses this file to you under the MIT license. // See the LICENSE file in the project root for more information. 
+using System.Net;
+
 namespace Microsoft.Spark.Interop.Ipc
 {
     internal interface IJvmBridgeFactory
     {
         IJvmBridge Create(int portNumber);
+
+        IJvmBridge Create(IPAddress ip, int portNumber);
     }
 }
diff --git a/src/csharp/Microsoft.Spark/Interop/Ipc/JvmBridge.cs b/src/csharp/Microsoft.Spark/Interop/Ipc/JvmBridge.cs
index ebd6d9e7d..f10d3b071 100644
--- a/src/csharp/Microsoft.Spark/Interop/Ipc/JvmBridge.cs
+++ b/src/csharp/Microsoft.Spark/Interop/Ipc/JvmBridge.cs
@@ -41,17 +41,22 @@ internal sealed class JvmBridge : IJvmBridge
             new ConcurrentQueue<ISocketWrapper>();
         private readonly ILoggerService _logger =
             LoggerServiceFactory.GetLogger(typeof(JvmBridge));
+        private readonly IPAddress _ipAddress;
         private readonly int _portNumber;
         private readonly JvmThreadPoolGC _jvmThreadPoolGC;
         private readonly bool _isRunningRepl;
 
-        internal JvmBridge(int portNumber)
+        internal JvmBridge(int portNumber) : this(IPAddress.Loopback, portNumber)
+        {
+        }
+
+        internal JvmBridge(IPAddress ipAddress, int portNumber)
         {
             if (portNumber == 0)
             {
                 throw new Exception("Port number is not set.");
             }
-
+            _ipAddress = ipAddress;
             _portNumber = portNumber;
             _logger.LogInfo($"JvMBridge port is {portNumber}");
@@ -85,7 +90,7 @@ private ISocketWrapper GetConnection()
             {
                 IPEndPoint dotnetBackendIPEndpoint = SparkEnvironment.ConfigurationService.GetBackendIPEndpoint();
                 socket = SocketFactory.CreateSocket();
-                socket.Connect(dotnetBackendIPEndpoint.Address, dotnetBackendIPEndpoint.Port);
+                socket.Connect(_ipAddress, _portNumber);
             }
 
             return socket;
diff --git a/src/csharp/Microsoft.Spark/Interop/SparkEnvironment.cs b/src/csharp/Microsoft.Spark/Interop/SparkEnvironment.cs
index cf2c2dc0e..893318954 100644
--- a/src/csharp/Microsoft.Spark/Interop/SparkEnvironment.cs
+++ b/src/csharp/Microsoft.Spark/Interop/SparkEnvironment.cs
@@ -3,6 +3,7 @@
 // See the LICENSE file in the project root for more information.
 
 using System;
+using System.Net;
 using Microsoft.Spark.Interop.Ipc;
 using Microsoft.Spark.Services;
 
@@ -70,8 +71,9 @@ public static IJvmBridge JvmBridge
         {
             get
             {
+                IPEndPoint jvmBackendEndPoint = ConfigurationService.GetBackendIPEndpoint();
                 return s_jvmBridge ??=
-                    JvmBridgeFactory.Create(ConfigurationService.GetBackendPortNumber());
+                    JvmBridgeFactory.Create(jvmBackendEndPoint.Address, jvmBackendEndPoint.Port);
             }
             set
             {
diff --git a/src/csharp/Microsoft.Spark/Network/DefaultSocketWrapper.cs b/src/csharp/Microsoft.Spark/Network/DefaultSocketWrapper.cs
index f4e4843e0..3f4e6b7ad 100644
--- a/src/csharp/Microsoft.Spark/Network/DefaultSocketWrapper.cs
+++ b/src/csharp/Microsoft.Spark/Network/DefaultSocketWrapper.cs
@@ -27,10 +27,10 @@ internal sealed class DefaultSocketWrapper : ISocketWrapper
         /// <summary>
         /// This socket is bound to provided IP address with port 0.
         /// </summary>
-        public DefaultSocketWrapper() :
+        public DefaultSocketWrapper(IPAddress ipAddress) :
             this(new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp))
         {
-            _innerSocket.Bind(new IPEndPoint(IPAddress.Any, 0));
+            _innerSocket.Bind(new IPEndPoint(ipAddress, 0));
         }
 
         /// <summary>
diff --git a/src/csharp/Microsoft.Spark/Network/SocketFactory.cs b/src/csharp/Microsoft.Spark/Network/SocketFactory.cs
index 46c0f6c0f..090fe59b0 100644
--- a/src/csharp/Microsoft.Spark/Network/SocketFactory.cs
+++ b/src/csharp/Microsoft.Spark/Network/SocketFactory.cs
@@ -2,6 +2,8 @@
 // The .NET Foundation licenses this file to you under the MIT license.
 // See the LICENSE file in the project root for more information.
 
+using System.Net;
+
 namespace Microsoft.Spark.Network
 {
     /// <summary>
@@ -19,5 +21,10 @@ public static ISocketWrapper CreateSocket()
         {
             return new DefaultSocketWrapper();
         }
+
+        public static ISocketWrapper CreateSocket(IPAddress ip)
+        {
+            return new DefaultSocketWrapper(ip);
+        }
     }
 }
diff --git a/src/csharp/Microsoft.Spark/Services/ConfigurationService.cs b/src/csharp/Microsoft.Spark/Services/ConfigurationService.cs
index 73c874ec8..e431a1fcd 100644
--- a/src/csharp/Microsoft.Spark/Services/ConfigurationService.cs
+++ b/src/csharp/Microsoft.Spark/Services/ConfigurationService.cs
@@ -26,6 +26,7 @@ internal sealed class ConfigurationService : IConfigurationService
 
         private const string DotnetBackendPortEnvVarName = "DOTNETBACKEND_PORT";
         private const string DotnetBackendIPAddressEnvVarName = "DOTNET_SPARK_BACKEND_IP_ADDRESS";
+        private const string DotnetCallbackServerIPAddressEnvVarName = "DOTNET_SPARK_CALLBACK_SERVER_IP_ADDRESS";
         private const int DotnetBackendDebugPort = 5567;
 
         private const string DotnetNumBackendThreadsEnvVarName = "DOTNET_SPARK_NUM_BACKEND_THREADS";
@@ -123,24 +124,6 @@ public IPEndPoint GetBackendIPEndpoint()
             return new IPEndPoint(IPAddress.Parse(ipAddress), portNumber);
         }
 
-        /// <summary>
-        /// Returns the port number for socket communication between JVM and CLR.
-        /// </summary>
-        public int GetBackendPortNumber()
-        {
-            if (!int.TryParse(
-                GetEnvironmentVariable(DotnetBackendPortEnvVarName),
-                out int portNumber))
-            {
-                _logger.LogInfo($"'{DotnetBackendPortEnvVarName}' environment variable is not set.");
-                portNumber = DotnetBackendDebugPort;
-            }
-
-            _logger.LogInfo($"Using port {portNumber} for connection.");
-
-            return portNumber;
-        }
-
         /// <summary>
         /// Returns the max number of threads for socket communication between JVM and CLR.
         /// </summary>
@@ -157,14 +140,19 @@ public int GetNumBackendThreads()
         }
 
         /// <summary>
-        /// Returns the IP address for socket communication between JVM and CLR.
+        /// Returns the IP address for socket communication between the JVM and the Callback Server.
         /// </summary>
-        public string GetBackendIPAddress()
+        public IPAddress GetCallbackServerIPAddress()
         {
-            string ipAddress = Environment.GetEnvironmentVariable(DotnetBackendIPAddressEnvVarName);
-            _logger.LogInfo($"Using IP address {ipAddress} for connection.");
+            string ipAddress = Environment.GetEnvironmentVariable(DotnetCallbackServerIPAddressEnvVarName);
+            if (ipAddress == null)
+            {
+                _logger.LogInfo($"'{DotnetCallbackServerIPAddressEnvVarName}' environment variable is not set.");
+                ipAddress = "127.0.0.1";
+            }
+            _logger.LogInfo($"Using IP address {ipAddress} for connection with the Callback Server.");
 
-            return ipAddress;
+            return IPAddress.Parse(ipAddress);
         }
 
         /// <summary>
diff --git a/src/csharp/Microsoft.Spark/Services/IConfigurationService.cs b/src/csharp/Microsoft.Spark/Services/IConfigurationService.cs
index 3c7bcb80d..447e4aac4 100644
--- a/src/csharp/Microsoft.Spark/Services/IConfigurationService.cs
+++ b/src/csharp/Microsoft.Spark/Services/IConfigurationService.cs
@@ -17,11 +17,6 @@ internal interface IConfigurationService
         /// </summary>
         TimeSpan JvmThreadGCInterval { get; }
 
-        /// <summary>
-        /// The port number used for communicating with the .NET backend process.
-        /// </summary>
-        int GetBackendPortNumber();
-
         /// <summary>
         /// Returns the max number of threads for socket communication between JVM and CLR.
        /// </summary>
@@ -33,9 +28,9 @@ internal interface IConfigurationService
         IPEndPoint GetBackendIPEndpoint();
 
         /// <summary>
-        /// The IP address used for communicating with the .NET backend process.
+        /// The IP address used for communicating with the Callback Server.
         /// </summary>
-        string GetBackendIPAddress();
+        IPAddress GetCallbackServerIPAddress();
 
         /// <summary>
         /// The full path to the .NET worker executable.
diff --git a/src/csharp/Microsoft.Spark/Sql/DataFrame.cs b/src/csharp/Microsoft.Spark/Sql/DataFrame.cs
index 289237671..eba977aae 100644
--- a/src/csharp/Microsoft.Spark/Sql/DataFrame.cs
+++ b/src/csharp/Microsoft.Spark/Sql/DataFrame.cs
@@ -849,7 +849,8 @@ public IEnumerable<Row> ToLocalIterator(bool prefetchPartitions)
                 Reference.Invoke("toPythonIterator", prefetchPartitions),
                 true);
             using ISocketWrapper socket = SocketFactory.CreateSocket();
-            socket.Connect(IPAddress.Loopback, port, secret);
+            IPEndPoint dotnetBackendIPEndpoint = SparkEnvironment.ConfigurationService.GetBackendIPEndpoint();
+            socket.Connect(dotnetBackendIPEndpoint.Address, port, secret);
             foreach (Row row in new RowCollector().Collect(socket, server))
             {
                 yield return row;
diff --git a/src/scala/microsoft-spark-2-4/src/main/scala/org/apache/spark/deploy/dotnet/DotnetRunner.scala b/src/scala/microsoft-spark-2-4/src/main/scala/org/apache/spark/deploy/dotnet/DotnetRunner.scala
index caf52257b..fb96ccf85 100644
--- a/src/scala/microsoft-spark-2-4/src/main/scala/org/apache/spark/deploy/dotnet/DotnetRunner.scala
+++ b/src/scala/microsoft-spark-2-4/src/main/scala/org/apache/spark/deploy/dotnet/DotnetRunner.scala
@@ -64,7 +64,7 @@ object DotnetRunner extends Logging {
     // In debug mode this runner will not launch a .NET process.
     val runInDebugMode = settings._1
     @volatile var dotnetBackendPortNumber = settings._2
-    val dotnetBackendIPAddress = sys.env.getOrElse("DOTNET_SPARK_BACKEND_IP_ADDRESS", "127.0.0.1").toInt
+    val dotnetBackendIPAddress = sys.env.getOrElse("DOTNET_SPARK_BACKEND_IP_ADDRESS", "127.0.0.1")
     var dotnetExecutable = ""
     var otherArgs: Array[String] = null
 
@@ -112,8 +112,9 @@ object DotnetRunner extends Logging {
       override def run() {
         // need to get back dotnetBackendPortNumber because if the value passed to init is 0
         // the port number is dynamically assigned in the backend
-        dotnetBackendPortNumber = dotnetBackend.init(dotnetBackendPortNumber)
-        logInfo(s"Port number used by DotnetBackend is $dotnetBackendPortNumber")
+        dotnetBackendPortNumber = dotnetBackend.init(dotnetBackendIPAddress, dotnetBackendPortNumber)
+        logInfo(s"IP address used by DotnetBackend is $dotnetBackendIPAddress and " +
+          s"Port number used is $dotnetBackendPortNumber")
         initialized.release()
         dotnetBackend.run()
       }
diff --git a/src/scala/microsoft-spark-3-0/src/main/scala/org/apache/spark/api/dotnet/DotnetBackend.scala b/src/scala/microsoft-spark-3-0/src/main/scala/org/apache/spark/api/dotnet/DotnetBackend.scala
index 07863f761..b59ef8b76 100644
--- a/src/scala/microsoft-spark-3-0/src/main/scala/org/apache/spark/api/dotnet/DotnetBackend.scala
+++ b/src/scala/microsoft-spark-3-0/src/main/scala/org/apache/spark/api/dotnet/DotnetBackend.scala
@@ -34,7 +34,7 @@ class DotnetBackend extends Logging {
   @volatile
   private[dotnet] var callbackClient: Option[CallbackClient] = None
 
-  def init(portNumber: Int): Int = {
+  def init(ipAddress: String, portNumber: Int): Int = {
     val conf = Option(SparkEnv.get).map(_.conf).getOrElse(new SparkConf())
     val numBackendThreads = conf.get(DOTNET_NUM_BACKEND_THREADS)
     logInfo(s"The number of DotnetBackend threads is set to $numBackendThreads.")
diff --git a/src/scala/microsoft-spark-3-0/src/main/scala/org/apache/spark/deploy/dotnet/DotnetRunner.scala b/src/scala/microsoft-spark-3-0/src/main/scala/org/apache/spark/deploy/dotnet/DotnetRunner.scala
index c46436f9b..8e3b37d4a 100644
--- a/src/scala/microsoft-spark-3-0/src/main/scala/org/apache/spark/deploy/dotnet/DotnetRunner.scala
+++ b/src/scala/microsoft-spark-3-0/src/main/scala/org/apache/spark/deploy/dotnet/DotnetRunner.scala
@@ -63,6 +63,7 @@ object DotnetRunner extends Logging {
     // In debug mode this runner will not launch a .NET process.
     val runInDebugMode = settings._1
     @volatile var dotnetBackendPortNumber = settings._2
+    val dotnetBackendIPAddress = sys.env.getOrElse("DOTNET_SPARK_BACKEND_IP_ADDRESS", "127.0.0.1")
     var dotnetExecutable = ""
     var otherArgs: Array[String] = null
 
@@ -101,7 +102,6 @@ object DotnetRunner extends Logging {
 
     // Time to wait for DotnetBackend to initialize in seconds.
     val backendTimeout = sys.env.getOrElse("DOTNETBACKEND_TIMEOUT", "120").toInt
-    val dotnetBackendIPAddress = sys.env.getOrElse("DOTNET_SPARK_BACKEND_IP_ADDRESS", "0.0.0.0")
 
     // Launch a DotnetBackend server for the .NET process to connect to; this will let it see our
     // Java system properties etc.
@@ -112,8 +112,8 @@ object DotnetRunner extends Logging {
         // need to get back dotnetBackendPortNumber because if the value passed to init is 0
         // the port number is dynamically assigned in the backend
         dotnetBackendPortNumber = dotnetBackend.init(dotnetBackendIPAddress, dotnetBackendPortNumber)
-        logInfo(s"Port number used by DotnetBackend is $dotnetBackendPortNumber on IP address " +
-          s"$dotnetBackendIPAddress")
+        logInfo(s"IP address used by DotnetBackend is $dotnetBackendIPAddress and " +
+          s"Port number used is $dotnetBackendPortNumber")
         initialized.release()
         dotnetBackend.run()
       }
diff --git a/src/scala/microsoft-spark-3-1/src/main/scala/org/apache/spark/api/dotnet/DotnetBackend.scala b/src/scala/microsoft-spark-3-1/src/main/scala/org/apache/spark/api/dotnet/DotnetBackend.scala
index 07863f761..b59ef8b76 100644
--- a/src/scala/microsoft-spark-3-1/src/main/scala/org/apache/spark/api/dotnet/DotnetBackend.scala
+++ b/src/scala/microsoft-spark-3-1/src/main/scala/org/apache/spark/api/dotnet/DotnetBackend.scala
@@ -34,7 +34,7 @@ class DotnetBackend extends Logging {
   @volatile
   private[dotnet] var callbackClient: Option[CallbackClient] = None
 
-  def init(portNumber: Int): Int = {
+  def init(ipAddress: String, portNumber: Int): Int = {
     val conf = Option(SparkEnv.get).map(_.conf).getOrElse(new SparkConf())
     val numBackendThreads = conf.get(DOTNET_NUM_BACKEND_THREADS)
     logInfo(s"The number of DotnetBackend threads is set to $numBackendThreads.")
diff --git a/src/scala/microsoft-spark-3-1/src/main/scala/org/apache/spark/deploy/dotnet/DotnetRunner.scala b/src/scala/microsoft-spark-3-1/src/main/scala/org/apache/spark/deploy/dotnet/DotnetRunner.scala
index e0b9fe4f4..76ccee30d 100644
--- a/src/scala/microsoft-spark-3-1/src/main/scala/org/apache/spark/deploy/dotnet/DotnetRunner.scala
+++ b/src/scala/microsoft-spark-3-1/src/main/scala/org/apache/spark/deploy/dotnet/DotnetRunner.scala
@@ -64,7 +64,7 @@ object DotnetRunner extends Logging {
     // In debug mode this runner will not launch a .NET process.
     val runInDebugMode = settings._1
     @volatile var dotnetBackendPortNumber = settings._2
-    val dotnetBackendIPAddress = settings._3
+    val dotnetBackendIPAddress = sys.env.getOrElse("DOTNET_SPARK_BACKEND_IP_ADDRESS", "127.0.0.1")
     var dotnetExecutable = ""
     var otherArgs: Array[String] = null
 
@@ -112,9 +112,9 @@ object DotnetRunner extends Logging {
       override def run() {
         // need to get back dotnetBackendPortNumber because if the value passed to init is 0
         // the port number is dynamically assigned in the backend
-        dotnetBackendPortNumber = dotnetBackend.init(dotnetBackendPortNumber, dotnetBackendIPAddress)
-        logInfo(s"Port number used by DotnetBackend is $dotnetBackendPortNumber on IP address " +
-          s"$dotnetBackendIPAddress")
+        dotnetBackendPortNumber = dotnetBackend.init(dotnetBackendIPAddress, dotnetBackendPortNumber)
+        logInfo(s"IP address used by DotnetBackend is $dotnetBackendIPAddress and " +
+          s"Port number used is $dotnetBackendPortNumber")
         initialized.release()
         dotnetBackend.run()
       }
diff --git a/src/scala/microsoft-spark-3-2/src/main/scala/org/apache/spark/api/dotnet/DotnetBackend.scala b/src/scala/microsoft-spark-3-2/src/main/scala/org/apache/spark/api/dotnet/DotnetBackend.scala
index 07863f761..b59ef8b76 100644
--- a/src/scala/microsoft-spark-3-2/src/main/scala/org/apache/spark/api/dotnet/DotnetBackend.scala
+++ b/src/scala/microsoft-spark-3-2/src/main/scala/org/apache/spark/api/dotnet/DotnetBackend.scala
@@ -34,7 +34,7 @@ class DotnetBackend extends Logging {
   @volatile
   private[dotnet] var callbackClient: Option[CallbackClient] = None
 
-  def init(portNumber: Int): Int = {
+  def init(ipAddress: String, portNumber: Int): Int = {
     val conf = Option(SparkEnv.get).map(_.conf).getOrElse(new SparkConf())
     val numBackendThreads = conf.get(DOTNET_NUM_BACKEND_THREADS)
     logInfo(s"The number of DotnetBackend threads is set to $numBackendThreads.")
diff --git a/src/scala/microsoft-spark-3-2/src/main/scala/org/apache/spark/deploy/dotnet/DotnetRunner.scala b/src/scala/microsoft-spark-3-2/src/main/scala/org/apache/spark/deploy/dotnet/DotnetRunner.scala
index a7f4df9ba..3333401ce 100644
--- a/src/scala/microsoft-spark-3-2/src/main/scala/org/apache/spark/deploy/dotnet/DotnetRunner.scala
+++ b/src/scala/microsoft-spark-3-2/src/main/scala/org/apache/spark/deploy/dotnet/DotnetRunner.scala
@@ -64,7 +64,7 @@ object DotnetRunner extends Logging {
     // In debug mode this runner will not launch a .NET process.
     val runInDebugMode = settings._1
     @volatile var dotnetBackendPortNumber = settings._2
-    val dotnetBackendIPAddress = settings._3
+    val dotnetBackendIPAddress = sys.env.getOrElse("DOTNET_SPARK_BACKEND_IP_ADDRESS", "127.0.0.1")
     var dotnetExecutable = ""
     var otherArgs: Array[String] = null
 
@@ -112,9 +112,9 @@ object DotnetRunner extends Logging {
      override def run() {
         // need to get back dotnetBackendPortNumber because if the value passed to init is 0
         // the port number is dynamically assigned in the backend
-        dotnetBackendPortNumber = dotnetBackend.init(dotnetBackendPortNumber, dotnetBackendIPAddress)
-        logInfo(s"Port number used by DotnetBackend is $dotnetBackendPortNumber on IP address " +
-          s"$dotnetBackendIPAddress")
+        dotnetBackendPortNumber = dotnetBackend.init(dotnetBackendIPAddress, dotnetBackendPortNumber)
+        logInfo(s"IP address used by DotnetBackend is $dotnetBackendIPAddress and " +
+          s"Port number used is $dotnetBackendPortNumber")
         initialized.release()
         dotnetBackend.run()
       }

From 87035bf2fe832439074ad144ab888bda63dd54d5 Mon Sep 17 00:00:00 2001
From: Niharika Dutta
Date: Mon, 4 Apr 2022 11:20:29 -0700
Subject: [PATCH 10/14] fix build

---
 src/csharp/Microsoft.Spark/Interop/Ipc/CallbackServer.cs    | 3 ++-
 src/scala/microsoft-spark-2-4/pom.xml                       | 1 -
 .../scala/org/apache/spark/deploy/dotnet/DotnetRunner.scala | 1 -
 3 files changed, 2 insertions(+), 3 deletions(-)

diff --git a/src/csharp/Microsoft.Spark/Interop/Ipc/CallbackServer.cs b/src/csharp/Microsoft.Spark/Interop/Ipc/CallbackServer.cs
index d86fd7305..09b5a8b0b 100644
--- a/src/csharp/Microsoft.Spark/Interop/Ipc/CallbackServer.cs
+++ b/src/csharp/Microsoft.Spark/Interop/Ipc/CallbackServer.cs
@@ -155,7 +155,8 @@ internal void Run(ISocketWrapper listener)
         /// </summary>
         private void Run()
         {
-            Run(SocketFactory.CreateSocket());
+            IPAddress dotnetCallbackServerIpAddress = SparkEnvironment.ConfigurationService.GetCallbackServerIPAddress();
+            Run(SocketFactory.CreateSocket(dotnetCallbackServerIpAddress));
         }
 
         /// <summary>
diff --git a/src/scala/microsoft-spark-2-4/pom.xml b/src/scala/microsoft-spark-2-4/pom.xml
index 1fd756c43..1c959bb05 100644
--- a/src/scala/microsoft-spark-2-4/pom.xml
+++ b/src/scala/microsoft-spark-2-4/pom.xml
@@ -45,7 +45,6 @@
       <version>1.2.5</version>
       <scope>test</scope>
     </dependency>
-
diff --git a/src/scala/microsoft-spark-2-4/src/main/scala/org/apache/spark/deploy/dotnet/DotnetRunner.scala b/src/scala/microsoft-spark-2-4/src/main/scala/org/apache/spark/deploy/dotnet/DotnetRunner.scala
index fb96ccf85..6d3f13288 100644
--- a/src/scala/microsoft-spark-2-4/src/main/scala/org/apache/spark/deploy/dotnet/DotnetRunner.scala
+++ b/src/scala/microsoft-spark-2-4/src/main/scala/org/apache/spark/deploy/dotnet/DotnetRunner.scala
@@ -14,7 +14,6 @@ import java.util.Locale
 import java.util.concurrent.{Semaphore, TimeUnit}
 
 import org.apache.commons.io.FilenameUtils
-import org.apache.commons.validator.routines.InetAddressValidator
 import org.apache.hadoop.fs.Path
 import org.apache.spark
 import org.apache.spark.api.dotnet.DotnetBackend

From 79a3e924c5d1240f5040dfdb71aef2b5fa832a1c Mon Sep 17 00:00:00 2001
From: Niharika Dutta
Date: Mon, 4 Apr 2022 13:49:21 -0700
Subject: [PATCH 11/14] Adding IPAddress overloads and loopback defaults

---
 .../PayloadProcessorTests.cs                               | 4 ++--
 .../Microsoft.Spark.Worker.UnitTest/TaskRunnerTests.cs     | 4 ++--
 src/csharp/Microsoft.Spark/Interop/Ipc/JvmBridgeFactory.cs | 7 +++++++
 src/csharp/Microsoft.Spark/Network/SocketFactory.cs        | 2 +-
 4 files changed, 12 insertions(+), 5 deletions(-)

diff --git a/src/csharp/Microsoft.Spark.Worker.UnitTest/PayloadProcessorTests.cs b/src/csharp/Microsoft.Spark.Worker.UnitTest/PayloadProcessorTests.cs
index b2edba995..fb93df55d 100644
--- a/src/csharp/Microsoft.Spark.Worker.UnitTest/PayloadProcessorTests.cs
+++ b/src/csharp/Microsoft.Spark.Worker.UnitTest/PayloadProcessorTests.cs
@@ -71,11 +71,11 @@ public void TestClosedStreamWithSocket()
             PayloadWriter payloadWriter = new PayloadWriterFactory().Create();
             Payload payload = TestData.GetDefaultPayload();
 
-            using var serverListener = new DefaultSocketWrapper();
+            using var serverListener = new DefaultSocketWrapper(IPAddress.Loopback);
             serverListener.Listen();
 
             var port = (serverListener.LocalEndPoint as IPEndPoint).Port;
-            using var clientSocket = new DefaultSocketWrapper();
+            using var clientSocket = new DefaultSocketWrapper(IPAddress.Loopback);
             clientSocket.Connect(IPAddress.Loopback, port, null);
 
             using (ISocketWrapper serverSocket = serverListener.Accept())
diff --git a/src/csharp/Microsoft.Spark.Worker.UnitTest/TaskRunnerTests.cs b/src/csharp/Microsoft.Spark.Worker.UnitTest/TaskRunnerTests.cs
index f495e142b..23a1a5a46 100644
--- a/src/csharp/Microsoft.Spark.Worker.UnitTest/TaskRunnerTests.cs
+++ b/src/csharp/Microsoft.Spark.Worker.UnitTest/TaskRunnerTests.cs
@@ -16,11 +16,11 @@ public class TaskRunnerTests
         [Fact]
         public void TestTaskRunner()
         {
-            using var serverListener = new DefaultSocketWrapper();
+            using var serverListener = new DefaultSocketWrapper(IPAddress.Loopback);
             serverListener.Listen();
 
             var port = (serverListener.LocalEndPoint as IPEndPoint).Port;
-            var clientSocket = new DefaultSocketWrapper();
+            var clientSocket = new DefaultSocketWrapper(IPAddress.Loopback);
             clientSocket.Connect(IPAddress.Loopback, port, null);
 
             PayloadWriter payloadWriter = new PayloadWriterFactory().Create();
diff --git a/src/csharp/Microsoft.Spark/Interop/Ipc/JvmBridgeFactory.cs b/src/csharp/Microsoft.Spark/Interop/Ipc/JvmBridgeFactory.cs
index 9c9f4ca43..f8bee0e65 100644
--- a/src/csharp/Microsoft.Spark/Interop/Ipc/JvmBridgeFactory.cs
+++ b/src/csharp/Microsoft.Spark/Interop/Ipc/JvmBridgeFactory.cs
@@ -2,6 +2,8 @@
 // The .NET Foundation licenses this file to you under the MIT license.
 // See the LICENSE file in the project root for more information.
 
+using System.Net;
+
 namespace Microsoft.Spark.Interop.Ipc
 {
     internal class JvmBridgeFactory : IJvmBridgeFactory
     {
         public IJvmBridge Create(int portNumber)
         {
             return new JvmBridge(portNumber);
         }
+
+        public IJvmBridge Create(IPAddress ipAddress, int portNumber)
+        {
+            return new JvmBridge(ipAddress, portNumber);
+        }
     }
 }
diff --git a/src/csharp/Microsoft.Spark/Network/SocketFactory.cs b/src/csharp/Microsoft.Spark/Network/SocketFactory.cs
index 090fe59b0..e0a4a62ea 100644
--- a/src/csharp/Microsoft.Spark/Network/SocketFactory.cs
+++ b/src/csharp/Microsoft.Spark/Network/SocketFactory.cs
@@ -19,7 +19,7 @@ internal static class SocketFactory
         /// </summary>
         public static ISocketWrapper CreateSocket()
         {
-            return new DefaultSocketWrapper();
+            return new DefaultSocketWrapper(IPAddress.Loopback);
         }
 
         public static ISocketWrapper CreateSocket(IPAddress ip)

From fbbc6cfc11b1c9d22f110be4104e1832ca8f0461 Mon Sep 17 00:00:00 2001
From: Niharika Dutta
Date: Mon, 4 Apr 2022 18:14:10 -0700
Subject: [PATCH 12/14] fix .net unit tests

---
 src/csharp/Microsoft.Spark.UnitTest/SparkFixture.cs | 3 ++-
 src/csharp/Microsoft.Spark/Interop/Ipc/JvmBridge.cs | 2 +-
 2 files changed, 3 insertions(+), 2 deletions(-)

diff --git a/src/csharp/Microsoft.Spark.UnitTest/SparkFixture.cs b/src/csharp/Microsoft.Spark.UnitTest/SparkFixture.cs
index 06c9a3fe2..c7aed2d4d 100644
--- a/src/csharp/Microsoft.Spark.UnitTest/SparkFixture.cs
+++ b/src/csharp/Microsoft.Spark.UnitTest/SparkFixture.cs
@@ -3,6 +3,7 @@
 // See the LICENSE file in the project root for more information.
 
 using System;
+using System.Net;
 using Microsoft.Spark.Interop;
 using Microsoft.Spark.Interop.Ipc;
 using Moq;
@@ -27,7 +28,7 @@ public SparkFixture()
 
             var mockJvmBridgeFactory = new Mock<IJvmBridgeFactory>();
             mockJvmBridgeFactory
-                .Setup(m => m.Create(It.IsAny<int>()))
+                .Setup(m => m.Create(It.IsAny<IPAddress>(), It.IsAny<int>()))
                 .Returns(MockJvm.Object);
 
             SparkEnvironment.JvmBridgeFactory = mockJvmBridgeFactory.Object;
diff --git a/src/csharp/Microsoft.Spark/Interop/Ipc/JvmBridge.cs b/src/csharp/Microsoft.Spark/Interop/Ipc/JvmBridge.cs
index f10d3b071..ba28eacdf 100644
--- a/src/csharp/Microsoft.Spark/Interop/Ipc/JvmBridge.cs
+++ b/src/csharp/Microsoft.Spark/Interop/Ipc/JvmBridge.cs
@@ -58,7 +58,7 @@ internal JvmBridge(IPAddress ipAddress, int portNumber)
             }
             _ipAddress = ipAddress;
             _portNumber = portNumber;
-            _logger.LogInfo($"JvMBridge port is {portNumber}");
+            _logger.LogInfo($"JvmBridge IP is {_ipAddress} port is {_portNumber}");
 
             _jvmThreadPoolGC = new JvmThreadPoolGC(
                 _logger, this, SparkEnvironment.ConfigurationService.JvmThreadGCInterval, _processId);

From 8f58e6104ad8d21430d9a2b931660d5134b4af2a Mon Sep 17 00:00:00 2001
From: Niharika Dutta
Date: Wed, 6 Apr 2022 12:58:19 -0700
Subject: [PATCH 13/14] make compatible for both ipv4 and ipv6

---
 src/csharp/Microsoft.Spark/Network/DefaultSocketWrapper.cs | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/src/csharp/Microsoft.Spark/Network/DefaultSocketWrapper.cs b/src/csharp/Microsoft.Spark/Network/DefaultSocketWrapper.cs
index 3f4e6b7ad..efcf9aab0 100644
--- a/src/csharp/Microsoft.Spark/Network/DefaultSocketWrapper.cs
+++ b/src/csharp/Microsoft.Spark/Network/DefaultSocketWrapper.cs
@@ -28,7 +28,7 @@ internal sealed class DefaultSocketWrapper : ISocketWrapper
         /// This socket is bound to provided IP address with port 0.
         /// </summary>
         public DefaultSocketWrapper(IPAddress ipAddress) :
-            this(new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp))
+            this(new Socket(ipAddress.AddressFamily, SocketType.Stream, ProtocolType.Tcp))
         {
             _innerSocket.Bind(new IPEndPoint(ipAddress, 0));
         }

From 716cdac61c6808c50640a435fa0ef6a8ecb658be Mon Sep 17 00:00:00 2001
From: Niharika Dutta
Date: Fri, 15 Apr 2022 17:07:09 -0700
Subject: [PATCH 14/14] Binding JvmBridge client sockets to the backend address

---
 src/csharp/Microsoft.Spark/Interop/Ipc/JvmBridge.cs | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/src/csharp/Microsoft.Spark/Interop/Ipc/JvmBridge.cs b/src/csharp/Microsoft.Spark/Interop/Ipc/JvmBridge.cs
index ba28eacdf..2d187b12d 100644
--- a/src/csharp/Microsoft.Spark/Interop/Ipc/JvmBridge.cs
+++ b/src/csharp/Microsoft.Spark/Interop/Ipc/JvmBridge.cs
@@ -89,7 +89,7 @@ private ISocketWrapper GetConnection()
             if (!_sockets.TryDequeue(out ISocketWrapper socket))
             {
                 IPEndPoint dotnetBackendIPEndpoint = SparkEnvironment.ConfigurationService.GetBackendIPEndpoint();
-                socket = SocketFactory.CreateSocket();
+                socket = SocketFactory.CreateSocket(dotnetBackendIPEndpoint.Address);
                 socket.Connect(_ipAddress, _portNumber);
             }
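
Taken together, these patches route every bridge socket through two environment variables: `DOTNET_SPARK_BACKEND_IP_ADDRESS`, read by `DotnetRunner` on the JVM side and by `ConfigurationService.GetBackendIPEndpoint` on the .NET side, and `DOTNET_SPARK_CALLBACK_SERVER_IP_ADDRESS`, read by `ConfigurationService.GetCallbackServerIPAddress` for the `CallbackServer` listener, with loopback as the fallback in both cases. A minimal sketch of the resulting lookup behavior, using hypothetical helper names rather than the internal `ConfigurationService` members:

```csharp
using System;
using System.Net;

// Sketch only: mirrors the env-var fallback these patches introduce.
// BridgeAddressSketch and its methods are made-up names; the real logic
// lives in the internal ConfigurationService type and is not public API.
internal static class BridgeAddressSketch
{
    public static IPAddress ResolveBackendAddress() =>
        Resolve("DOTNET_SPARK_BACKEND_IP_ADDRESS");

    public static IPAddress ResolveCallbackServerAddress() =>
        Resolve("DOTNET_SPARK_CALLBACK_SERVER_IP_ADDRESS");

    private static IPAddress Resolve(string envVarName)
    {
        // Fall back to IPv4 loopback when the variable is unset, matching
        // the "127.0.0.1" defaults in ConfigurationService and DotnetRunner.
        string value = Environment.GetEnvironmentVariable(envVarName);
        return value == null ? IPAddress.Loopback : IPAddress.Parse(value);
    }
}
```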
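
The one-line change in the "make compatible for both ipv4 and ipv6" patch is what makes non-IPv4 values of those variables viable: `DefaultSocketWrapper` now derives the socket's address family from the address it is given instead of hardcoding `AddressFamily.InterNetwork`. A small illustration of the difference using raw sockets (`DefaultSocketWrapper` itself is internal, so this shows the underlying `System.Net.Sockets` behavior rather than the wrapper's API):

```csharp
using System.Net;
using System.Net.Sockets;

// A socket hardcoded to AddressFamily.InterNetwork is IPv4-only, so binding
// it to an IPv6 address such as ::1 throws a SocketException. Deriving the
// family from the address makes both of these binds succeed:
var v4 = new Socket(IPAddress.Loopback.AddressFamily, SocketType.Stream, ProtocolType.Tcp);
v4.Bind(new IPEndPoint(IPAddress.Loopback, 0));         // 127.0.0.1, port auto-assigned

var v6 = new Socket(IPAddress.IPv6Loopback.AddressFamily, SocketType.Stream, ProtocolType.Tcp);
v6.Bind(new IPEndPoint(IPAddress.IPv6Loopback, 0));     // ::1, port auto-assigned
```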