diff --git a/src/csharp/Microsoft.Spark.E2ETest/Resources/neighbors.json b/src/csharp/Microsoft.Spark.E2ETest/Resources/neighbors.json
new file mode 100644
index 000000000..03c8c1587
--- /dev/null
+++ b/src/csharp/Microsoft.Spark.E2ETest/Resources/neighbors.json
@@ -0,0 +1,6 @@
+{"Id":"Id1@72F988BF-86F1-41AF-91AB-2D7CD011DB47","DisplayName":"AliceFN AliceLN","GivenName":"AliceFN","Surname":"AliceLN","IMAddress":"sip:alice@microsoft.com","EmailAddress":"alice@microsoft.com","RelevanceScore":1101.0,"puser":"ruih","ptenant":"MSFT"}
+{"Id":"Id2@72F988BF-86F1-41AF-91AB-2D7CD011DB47","DisplayName":"BobFN BobLN","GivenName":"BobFN","Surname":"BobLN","IMAddress":"sip:bob@microsoft.com","EmailAddress":"bob@microsoft.com","RelevanceScore":900.0,"puser":"ruih","ptenant":"MSFT"}
+{"Id":"Id3@72F988BF-86F1-41AF-91AB-2D7CD011DB47","DisplayName":"CharlieFN CharlieLN","GivenName":"CharlieFN","Surname":"CharlieLN","IMAddress":"sip:charlie@microsoft.com","EmailAddress":"charlie@microsoft.com","RelevanceScore":857.0,"puser":"ruih","ptenant":"MSFT"}
+{"Id":"Id4@72F988BF-86F1-41AF-91AB-2D7CD011DB47","DisplayName":"DougFN DougLN","GivenName":"DougFN","Surname":"DougLN","IMAddress":"sip:doug@microsoft.com","EmailAddress":"doug@microsoft.com","RelevanceScore":1101.0,"puser":"rui","ptenant":"MSFT"}
+{"Id":"Id5@72F988BF-86F1-41AF-91AB-2D7CD011DB47","DisplayName":"ElvaFN ElvaLN","GivenName":"ElvaFN","Surname":"ElvaLN","IMAddress":"elva@microsoft.com","EmailAddress":"elva@microsoft.com","RelevanceScore":900.0,"puser":"rui","ptenant":"MSFT"}
+{"Id":"Id6@72F988BF-86F1-41AF-91AB-2D7CD011DB47","DisplayName":"FrankFN FrankLN","GivenName":"FrankFN","Surname":"FrankLN","IMAddress":"sip:frank@microsoft.com","EmailAddress":"frank@microsoft.com","RelevanceScore":857.0,"puser":"rui","ptenant":"MSFT"}
diff --git a/src/csharp/Microsoft.Spark.E2ETest/Resources/search_actions.json b/src/csharp/Microsoft.Spark.E2ETest/Resources/search_actions.json
new file mode 100644
index 000000000..ffa502ebf
--- /dev/null
+++ b/src/csharp/Microsoft.Spark.E2ETest/Resources/search_actions.json
@@ -0,0 +1 @@
+{"SessionId":"AC6B34DE-9168-4D27-89A5-8DBCB9EB2C3C","ImpressionId":"Imp1","ConversationId":"DD8A6B40-B4C9-426F-8194-895E9053077C","RequestTime":"10/12/2018 7:33:14 PM","ClientLocalTime":"10/12/2018 7:33:14 PM","PartnerName":"officeshared","RequestStartTime":"10/12/2018 7:33:14 PM","ScenarioName":"outlookdesktop","RouteName":"query","FlightInfo":"ABCD;PQR;XYZ","HttpStatus":"200","UserType":"Business","UserIdPUID":"0003000080000BF9","Ring":"MSIT","Query":"face","EntityType":"Message","FolderIdList":"F1;F2;F3","ReferenceIdList":"R1;R2;R3","TitleList":"[NonUrgent] [EXO] [PROD]: GriffinV2 MDM Alert - Low success rate in SDFV2.suggestions.GuidedFormulationEmailEntitiesProvider;RESOLVED: [Incident] [EXO] [PROD]: Native Csg is crashing causing Search AppPool restarts;RE: [Incident] [EXO] [PROD]: MDM Notification - High latency in WW.gbrp123.suggestions","ConversationIdList":"434C5A69DE4A504B87650551E90343C7;DDDC67FB24D1094BB012C4F09118BA64;2B1D7646F91FD5488344D491AB3C671C","ItemIdList":"ItemId1;ItemId2;ItemId3","ItemImmutableIdList":"ItemImmutableId1;ItemImmutableId2;ItemImmutableId3","TimeReceivedList":"10/4/2018 4:51:57 PM;9/27/2018 6:53:15 AM;6/7/2018 8:41:02 PM","EventDetails":"{\"Type\":\"ReplyAll\",\"ReferenceId\":\"784d4c42-460e-4022-ad44-0fc48eae529a.1000.1\",\"EventLocalTime\":\"2018-10-12T19:33:15-07:00\"}","ErrorMessage":null}
diff --git a/src/csharp/Microsoft.Spark.E2ETest/ScenarioTests/EmailSearchTests.cs b/src/csharp/Microsoft.Spark.E2ETest/ScenarioTests/EmailSearchTests.cs
new file mode 100644
index 000000000..78253fa33
--- /dev/null
+++ b/src/csharp/Microsoft.Spark.E2ETest/ScenarioTests/EmailSearchTests.cs
@@ -0,0 +1,163 @@
+// Licensed to the .NET Foundation under one or more agreements.
+// The .NET Foundation licenses this file to you under the MIT license.
+// See the LICENSE file in the project root for more information.
+
+using System;
+using Microsoft.Spark.Sql;
+using Xunit;
+using static Microsoft.Spark.Sql.Functions;
+using Column = Microsoft.Spark.Sql.Column;
+
+namespace Microsoft.Spark.E2ETest.ScenarioTests
+{
+ [Collection("Spark E2E Tests")]
+ public class EmailSearchTests
+ {
+ private readonly SparkSession _spark;
+
+ public EmailSearchTests(SparkFixture fixture)
+ {
+ _spark = fixture.Spark;
+ }
+
+ ///
+ /// This is a mimic of Email Search Top People Reducer.
+ /// https://msasg.visualstudio.com/DefaultCollection/Shared%20Data/_search?action=contents&text=TopPeopleReducer&type=code&lp=code-Project&filters=ProjectFilters%7BShared%20Data%7DRepositoryFilters%7BMatrixCompliant%7D&pageSize=25&result=DefaultCollection%2FShared%20Data%2FMatrixCompliant%2FGBmaster%2F%2Fsrc%2FODIN-ML%2FPartner%2FEmailRelevance%2FEmailRelevance%2FEmailRelevanceHelper%2FTopPeopleReducer.cs
+ ///
+ [Fact]
+ public void TestEmailSearchTopNReducerBasics()
+ {
+ // Read the sample data.
+ DataFrame df = _spark
+ .Read()
+ .Schema("Id STRING, DisplayName STRING, GivenName STRING, Surname STRING, IMAddress STRING, EmailAddress STRING, RelevanceScore DOUBLE, puser STRING, ptenant STRING")
+ .Json($"{TestEnvironment.ResourceDirectory}neighbors.json");
+
+ // Trim the IMAddress column.
+ Func trimIMAddress = Udf((str) => str.StartsWith("sip:") ? str.Substring(4) : str);
+ df = df.WithColumn("IMAddress", trimIMAddress(df["IMAddress"]));
+
+ // Reduce
+ df = df.GroupBy("puser", "ptenant").Agg(CollectList("GivenName").Alias("GivenNames"),
+ CollectList("Surname").Alias("Surnames"),
+ CollectList("DisplayName").Alias("DisplayNames"),
+ CollectList("EmailAddress").Alias("EmailAddresses"),
+ CollectList("RelevanceScore").Alias("RelevanceScores"),
+ CollectList("IMAddress").Alias("IMAddresses"));
+ // Format the output.
+ df = df.Select(df["puser"],
+ df["ptenant"],
+ ConcatWs(";", df["GivenNames"]).Alias("GivenNames"),
+ ConcatWs(";", df["Surnames"]).Alias("Surnames"),
+ ConcatWs(";", df["DisplayNames"]).Alias("DisplayNames"),
+ ConcatWs(";", df["EmailAddresses"]).Alias("EmailAddresses"),
+ ConcatWs(";", df["RelevanceScores"]).Alias("RelevanceScores"),
+ ConcatWs(";", df["IMAddresses"]).Alias("IMAddresses"));
+
+ Assert.Equal(2, df.Count());
+ foreach (Row row in df.Collect())
+ {
+ string puser = row.GetAs("puser");
+ Assert.Equal("MSFT", row.GetAs("ptenant"));
+ Assert.Equal("1101.0;900.0;857.0", row.GetAs("RelevanceScores"));
+ switch (puser)
+ {
+ case "ruih":
+ Assert.Equal("AliceFN;BobFN;CharlieFN", row.GetAs("GivenNames"));
+ Assert.Equal("AliceLN;BobLN;CharlieLN", row.GetAs("Surnames"));
+ Assert.Equal("AliceFN AliceLN;BobFN BobLN;CharlieFN CharlieLN", row.GetAs("DisplayNames"));
+ Assert.Equal("alice@microsoft.com;bob@microsoft.com;charlie@microsoft.com", row.GetAs("EmailAddresses"));
+ Assert.Equal("alice@microsoft.com;bob@microsoft.com;charlie@microsoft.com", row.GetAs("IMAddresses"));
+ break;
+ case "rui":
+ Assert.Equal("DougFN;ElvaFN;FrankFN", row.GetAs("GivenNames"));
+ Assert.Equal("DougLN;ElvaLN;FrankLN", row.GetAs("Surnames"));
+ Assert.Equal("DougFN DougLN;ElvaFN ElvaLN;FrankFN FrankLN", row.GetAs("DisplayNames"));
+ Assert.Equal("doug@microsoft.com;elva@microsoft.com;frank@microsoft.com", row.GetAs("EmailAddresses"));
+ Assert.Equal("doug@microsoft.com;elva@microsoft.com;frank@microsoft.com", row.GetAs("IMAddresses"));
+ break;
+ default:
+ throw new Exception($"Unexpected age: {puser}.");
+ }
+ }
+ }
+
+ ///
+ /// This is a mimic of Email Search Success Action Reducer.
+ /// https://msasg.visualstudio.com/DefaultCollection/Shared%20Data/_git/MatrixCompliant?path=%2Fsrc%2FODIN-ML%2FPartner%2FEmailRelevance%2FImpressionView%2FAnalysisTools%2FReducer%2FSearchActionSuccessReducer.cs&_a=contents&version=GBmaster
+ ///
+ [Fact]
+ public void TestEmailSearchSuccessActionReducerBasics()
+ {
+ // Read the sample data.
+ DataFrame df = _spark.Read().Json($"{TestEnvironment.ResourceDirectory}search_actions.json");
+
+ // Select the required columns.
+ df = df.Select("ImpressionId", "ConversationId", "EntityType", "FolderIdList", "ReferenceIdList", "ItemIdList", "ItemImmutableIdList");
+
+ // Convert columns of concatenated string to array of strings.
+ Func toStringArrayUdf = Udf((str) => str.Split(';'));
+ df = df.WithColumn("FolderIdList", toStringArrayUdf(df["FolderIdList"]))
+ .WithColumn("ReferenceIdList", toStringArrayUdf(df["ReferenceIdList"]))
+ .WithColumn("ItemIdList", toStringArrayUdf(df["ItemIdList"]))
+ .WithColumn("ItemImmutableIdList", toStringArrayUdf(df["ItemImmutableIdList"]));
+
+ // Apply the ArrayZip function to combine the i-th element of each array.
+ df = df.Select(df["ConversationId"], df["ImpressionId"], df["EntityType"], ArraysZip(df["FolderIdList"], df["ReferenceIdList"], df["ItemIdList"], df["ItemImmutableIdList"]).Alias("ConcatedColumn"));
+
+ // Apply the Explode function to split into multiple rows.
+ df = df.Select(df["ConversationId"], df["ImpressionId"], df["EntityType"], Explode(df["ConcatedColumn"]).Alias("NewColumn"));
+
+ // Create multiple columns.
+ df = df.WithColumn("FolderId", df["NewColumn"].GetField("FolderIdList"))
+ .WithColumn("ReferenceId", df["NewColumn"].GetField("ReferenceIdList"))
+ .WithColumn("ItemId", df["NewColumn"].GetField("ItemIdList"))
+ .WithColumn("ItemImmutableId", df["NewColumn"].GetField("ItemImmutableIdList"))
+ .Select("ConversationId", "ImpressionId", "EntityType", "FolderId", "ItemId", "ReferenceId", "ItemImmutableId");
+
+ // Check the results.
+ Assert.Equal(3, df.Count());
+ int i = 0;
+ foreach (Row row in df.Collect())
+ {
+ string impressionId = row.GetAs("ImpressionId");
+ string conversationId = row.GetAs("ConversationId");
+ string entityType = row.GetAs("EntityType");
+ Assert.Equal("Imp1", impressionId);
+ Assert.Equal("DD8A6B40-B4C9-426F-8194-895E9053077C", conversationId);
+ Assert.Equal("Message", entityType);
+ string folderId = row.GetAs("FolderId");
+ string itemId = row.GetAs("ItemId");
+ string referenceId = row.GetAs("ReferenceId");
+ string itemImmutableId = row.GetAs("ItemImmutableId");
+ if (i == 0)
+ {
+ Assert.Equal("F1", folderId);
+ Assert.Equal("ItemId1", itemId);
+ Assert.Equal("R1", referenceId);
+ Assert.Equal("ItemImmutableId1", itemImmutableId);
+ }
+ else if (i == 1)
+ {
+ Assert.Equal("F2", folderId);
+ Assert.Equal("ItemId2", itemId);
+ Assert.Equal("R2", referenceId);
+ Assert.Equal("ItemImmutableId2", itemImmutableId);
+ }
+ else if (i == 2)
+ {
+ Assert.Equal("F3", folderId);
+ Assert.Equal("ItemId3", itemId);
+ Assert.Equal("R3", referenceId);
+ Assert.Equal("ItemImmutableId3", itemImmutableId);
+ }
+ else
+ {
+ throw new Exception(string.Format("Unexpected row: ConversationId={0}, ImpressionId={1}", conversationId, impressionId));
+ }
+
+ i++;
+ }
+ }
+ }
+}