
Adding support for DataFrameWriterV2 #677
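This PR adds C# bindings for the DataFrameWriterV2 API introduced in Spark 3.0: DataFrame.WriteTo plus the builder methods (Using, Option, Options, TableProperty, PartitionedBy) and the terminal operations (Create, Replace, CreateOrReplace, Append, Overwrite, OverwritePartitions). A minimal usage sketch of the surface added here, assuming an existing SparkSession named spark; the file path, option values, and table name are illustrative only:

using Microsoft.Spark.Sql;

// Read some sample data (schema and path are placeholders).
DataFrame df = spark
    .Read()
    .Schema("age INT, name STRING")
    .Json("people.json");

// WriteTo returns a DataFrameWriterV2; Using/Option/TableProperty/PartitionedBy
// each return the same writer, so the configuration chains fluently until a
// terminal verb such as Create() runs the write.
df.WriteTo("people_table")
    .Using("json")
    .Option("compression", "gzip")
    .TableProperty("owner", "spark-dotnet")
    .PartitionedBy(df.Col("age"))
    .Create();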


Merged on Oct 2, 2020
69 commits
03b7939
Adding section for UDF serialization
Niharikadutta Apr 20, 2020
4ef693d
removing guides from master
Niharikadutta Apr 20, 2020
81145ca
Merge latest from master
Niharikadutta May 6, 2020
e4b81af
merging latest from master
Niharikadutta May 7, 2020
4c32173
Merge remote-tracking branch 'upstream/master'
Niharikadutta Jun 2, 2020
4987a09
Merge remote-tracking branch 'upstream/master'
Niharikadutta Jun 14, 2020
ca9612e
Merge remote-tracking branch 'upstream/master'
Niharikadutta Jun 16, 2020
f581c86
Merge remote-tracking branch 'upstream/master'
Niharikadutta Jun 20, 2020
086b325
Merge remote-tracking branch 'upstream/master'
Niharikadutta Jun 23, 2020
2f72907
Merge remote-tracking branch 'upstream/master'
Niharikadutta Jul 25, 2020
6bab996
CountVectorizer
Jul 27, 2020
e2a566b
moving private methods to bottom
Jul 27, 2020
5f682a6
changing wrap method
Jul 28, 2020
31371db
setting min version required
Jul 31, 2020
60eb82f
undoing csproj change
Jul 31, 2020
ed36375
member doesnt need to be internal
Jul 31, 2020
c7baf72
too many lines
Jul 31, 2020
d13303c
removing whitespace change
Jul 31, 2020
f5b477c
removing whitespace change
Jul 31, 2020
73db52b
ionide
Jul 31, 2020
98f5e4d
Merge remote-tracking branch 'upstream/master'
Niharikadutta Aug 7, 2020
4c5d502
Merge remote-tracking branch 'upstream/master'
Niharikadutta Aug 10, 2020
a766146
Merge branch 'master' into ml/countvectorizer
GoEddie Aug 12, 2020
ad6bced
Merge branch 'ml/countvectorizer' of https://github.com/GoEddie/spark
Niharikadutta Aug 13, 2020
8e1685c
Revert "Merge branch 'master' into ml/countvectorizer"
Niharikadutta Aug 13, 2020
255515e
Revert "Merge branch 'ml/countvectorizer' of https://github.com/GoEdd…
Niharikadutta Aug 13, 2020
a44c882
Merge remote-tracking branch 'upstream/master'
Niharikadutta Aug 14, 2020
3c2c936
fixing merge errors
Niharikadutta Aug 14, 2020
88e834d
removing ionid
Niharikadutta Aug 20, 2020
a13de2d
Merge branch 'master' of github.com:Niharikadutta/spark
Niharikadutta Aug 21, 2020
13d0e4a
Merge remote-tracking branch 'upstream/master'
Niharikadutta Aug 24, 2020
595b141
Merge remote-tracking branch 'upstream/master'
Niharikadutta Aug 29, 2020
decfa48
Merge remote-tracking branch 'upstream/master'
Niharikadutta Sep 2, 2020
ce694ff
Merge remote-tracking branch 'upstream/master'
Niharikadutta Sep 8, 2020
8128ba0
Merge remote-tracking branch 'upstream/master'
Niharikadutta Sep 12, 2020
52f0a74
Merge remote-tracking branch 'upstream/master'
Niharikadutta Sep 19, 2020
2cd9a2a
First commit
Niharikadutta Sep 19, 2020
abea46a
exposing public APIs
Niharikadutta Sep 19, 2020
07fbfaa
changes
Niharikadutta Sep 19, 2020
f541348
Adding DataFrameWriterV2 test file
Niharikadutta Sep 20, 2020
9016635
changes
Niharikadutta Sep 20, 2020
436a519
changes
Niharikadutta Sep 22, 2020
6a89f01
Merge remote-tracking branch 'upstream/master'
Niharikadutta Sep 24, 2020
8f30d95
Commenting out tests
Niharikadutta Sep 24, 2020
5f4a294
Merge branch 'master' into nidutta/spark3.0readiness_part3
Niharikadutta Sep 24, 2020
684dd90
changes
Niharikadutta Sep 25, 2020
4b1de41
Merge remote-tracking branch 'upstream/master'
Niharikadutta Sep 25, 2020
030f920
Merge branch 'master' into nidutta/spark3.0readiness_part3
Niharikadutta Sep 25, 2020
c76aec7
Dropping test table if exists
Niharikadutta Sep 25, 2020
c69fac3
Merge branch 'master' into nidutta/spark3.0readiness_part3
Niharikadutta Sep 25, 2020
76b205a
PR comments
Niharikadutta Sep 25, 2020
02bf2ac
Merge branch 'nidutta/spark3.0readiness_part3' of github.com:Niharika…
Niharikadutta Sep 25, 2020
419b084
Adding tests for new APIs
Niharikadutta Sep 25, 2020
b76fb9c
PR comments
Niharikadutta Sep 27, 2020
b08b027
nit
Niharikadutta Sep 27, 2020
9fbfdaa
Merge branch 'master' into nidutta/spark3.0readiness_part3
Niharikadutta Sep 27, 2020
ef4808f
Merge branch 'master' into nidutta/spark3.0readiness_part3
imback82 Sep 28, 2020
c93b72f
Merge branch 'master' into nidutta/spark3.0readiness_part3
imback82 Sep 28, 2020
c3de5c0
PR comments
Niharikadutta Sep 29, 2020
156b207
Merge branch 'nidutta/spark3.0readiness_part3' of github.com:Niharika…
Niharikadutta Sep 29, 2020
355d1ae
Merge branch 'master' into nidutta/spark3.0readiness_part3
imback82 Sep 29, 2020
8605a71
testing hanging after `OverWritePartitions` call
Niharikadutta Sep 29, 2020
18c46f8
Merge branch 'nidutta/spark3.0readiness_part3' of github.com:Niharika…
Niharikadutta Sep 29, 2020
54034bb
testing
Niharikadutta Sep 29, 2020
da76001
testing `OverwritePartitions()` hang
Niharikadutta Sep 30, 2020
31bd1f7
reverting change
Niharikadutta Sep 30, 2020
9ee7938
Merge branch 'master' into nidutta/spark3.0readiness_part3
Niharikadutta Oct 1, 2020
2603938
PR comments
Niharikadutta Oct 1, 2020
c162894
Merge branch 'nidutta/spark3.0readiness_part3' of github.com:Niharika…
Niharikadutta Oct 1, 2020
@@ -0,0 +1,79 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
// See the LICENSE file in the project root for more information.

using System;
using System.Collections.Generic;
using Microsoft.Spark.E2ETest.Utils;
using Microsoft.Spark.Sql;
using Xunit;

namespace Microsoft.Spark.E2ETest.IpcTests
{
    [Collection("Spark E2E Tests")]
    public class DataFrameWriterV2Tests
    {
        private readonly SparkSession _spark;

        public DataFrameWriterV2Tests(SparkFixture fixture)
        {
            _spark = fixture.Spark;
        }

        /// <summary>
        /// Test signatures for APIs introduced in Spark 3.*.
        /// </summary>
        [SkipIfSparkVersionIsLessThan(Versions.V3_0_0)]
        public void TestSignaturesV3_0_X()
        {
            DataFrame df = _spark
                .Read()
                .Schema("age INT, name STRING")
                .Json($"{TestEnvironment.ResourceDirectory}people.json");

            DataFrameWriterV2 dfwV2 = df.WriteTo("testtable");

            Assert.IsType<DataFrameWriterV2>(dfwV2.Using("json"));

            Assert.IsType<DataFrameWriterV2>(dfwV2.Option("key1", "value"));
            Assert.IsType<DataFrameWriterV2>(dfwV2.Option("key2", true));
            Assert.IsType<DataFrameWriterV2>(dfwV2.Option("key3", 1L));
            Assert.IsType<DataFrameWriterV2>(dfwV2.Option("key4", 2D));

            Assert.IsType<DataFrameWriterV2>(dfwV2.Options(
                new Dictionary<string, string>() { { "key", "value" } }));

            Assert.IsType<DataFrameWriterV2>(dfwV2.TableProperty("prop", "value"));

            _spark.Sql("DROP TABLE IF EXISTS default.testtable");
            dfwV2.Create();

            Assert.IsType<DataFrameWriterV2>(dfwV2.PartitionedBy(df.Col("age")));

            // Throws the following exception:
            // org.apache.spark.sql.AnalysisException: REPLACE TABLE AS SELECT is only supported
            // with v2 tables.
            Assert.Throws<Exception>(() => dfwV2.Replace());

            // Throws the following exception:
            // org.apache.spark.sql.AnalysisException: REPLACE TABLE AS SELECT is only supported
            // with v2 tables.
            Assert.Throws<Exception>(() => dfwV2.CreateOrReplace());

            // Throws the following exception:
            // org.apache.spark.sql.AnalysisException: Table default.testtable does not support
            // append in batch mode.
            Assert.Throws<Exception>(() => dfwV2.Append());

            // Throws the following exception:
            // org.apache.spark.sql.AnalysisException: Table default.testtable does not support
            // overwrite by filter in batch mode.
            Assert.Throws<Exception>(() => dfwV2.Overwrite(df.Col("age")));

            // Throws the following exception:
            // org.apache.spark.sql.AnalysisException: Table default.testtable does not support
            // dynamic overwrite in batch mode.
            Assert.Throws<Exception>(() => dfwV2.OverwritePartitions());
        }
    }
}
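Replace, CreateOrReplace, Append, Overwrite, and OverwritePartitions throw in the test above because default.testtable is resolved through the built-in session catalog, which does not provide a v2 table implementation for those operations (hence the "only supported with v2 tables" and "does not support ... in batch mode" messages). A minimal sketch of targeting a catalog that does implement the v2 TableCatalog API instead; the catalog name, implementation class, and table identifier below are placeholders and are not part of this PR:

using Microsoft.Spark.Sql;

// Register a v2 catalog plugin under "mycatalog" (hypothetical class;
// it must be available on the JVM classpath).
SparkSession spark = SparkSession
    .Builder()
    .Config("spark.sql.catalog.mycatalog", "com.example.MyV2Catalog")
    .GetOrCreate();

DataFrame df = spark
    .Read()
    .Schema("age INT, name STRING")
    .Json("people.json");

// Tables addressed through the v2 catalog go down the v2 write path, so the
// verbs that throw against the session catalog are expected to work here,
// subject to the capabilities the catalog reports.
df.WriteTo("mycatalog.db.people").Using("json").CreateOrReplace();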
9 changes: 9 additions & 0 deletions src/csharp/Microsoft.Spark/Sql/DataFrame.cs
@@ -535,6 +535,15 @@ public DataFrame Agg(Column expr, params Column[] exprs) =>
        public DataFrame Observe(string name, Column expr, params Column[] exprs) =>
            WrapAsDataFrame(_jvmObject.Invoke("observe", name, expr, exprs));

        /// <summary>
        /// Create a write configuration builder for v2 sources.
        /// </summary>
        /// <param name="table">Name of table to write to</param>
        /// <returns>DataFrameWriterV2 object</returns>
        [Since(Versions.V3_0_0)]
        public DataFrameWriterV2 WriteTo(string table) =>
            new DataFrameWriterV2((JvmObjectReference)_jvmObject.Invoke("writeTo", table));

        /// <summary>
        /// Returns a new `DataFrame` by taking the first `number` rows.
        /// </summary>
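WriteTo complements the existing DataFrame.Write() entry point: the v1 writer is configured with Format/Mode and saved by name, while the v2 writer added in this PR selects the operation through an explicit verb. A minimal side-by-side sketch, assuming a DataFrame df; the table names and format are illustrative:

// v1 path (existing): DataFrameWriter, configured with Format/Mode.
df.Write()
    .Format("parquet")
    .Mode("overwrite")
    .SaveAsTable("people_v1");

// v2 path (this PR): DataFrameWriterV2, with an explicit Create/Replace/Append verb.
df.WriteTo("people_v2")
    .Using("parquet")
    .Create();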
153 changes: 153 additions & 0 deletions src/csharp/Microsoft.Spark/Sql/DataFrameWriterV2.cs
@@ -0,0 +1,153 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
// See the LICENSE file in the project root for more information.

using System.Collections.Generic;
using Microsoft.Spark.Interop.Ipc;

namespace Microsoft.Spark.Sql
{
    /// <summary>
    /// Interface used to write a <see cref="DataFrame"/> to external storage using the v2
    /// API.
    /// </summary>
    [Since(Versions.V3_0_0)]
    public sealed class DataFrameWriterV2 : IJvmObjectReferenceProvider
    {
        private readonly JvmObjectReference _jvmObject;

        internal DataFrameWriterV2(JvmObjectReference jvmObject) => _jvmObject = jvmObject;

        JvmObjectReference IJvmObjectReferenceProvider.Reference => _jvmObject;

        /// <summary>
        /// Specifies a provider for the underlying output data source. Spark's default catalog
        /// supports "parquet", "json", etc.
        /// </summary>
        /// <param name="provider">Provider name</param>
        /// <returns>This DataFrameWriterV2 object</returns>
        public DataFrameWriterV2 Using(string provider)
        {
            _jvmObject.Invoke("using", provider);
            return this;
        }

        /// <summary>
        /// Adds an output option for the underlying data source.
        /// </summary>
        /// <param name="key">Name of the option</param>
        /// <param name="value">string value of the option</param>
        /// <returns>This DataFrameWriterV2 object</returns>
        public DataFrameWriterV2 Option(string key, string value)
        {
            _jvmObject.Invoke("option", key, value);
            return this;
        }

        /// <summary>
        /// Adds an output option for the underlying data source.
        /// </summary>
        /// <param name="key">Name of the option</param>
        /// <param name="value">bool value of the option</param>
        /// <returns>This DataFrameWriterV2 object</returns>
        public DataFrameWriterV2 Option(string key, bool value)
        {
            _jvmObject.Invoke("option", key, value);
            return this;
        }

        /// <summary>
        /// Adds an output option for the underlying data source.
        /// </summary>
        /// <param name="key">Name of the option</param>
        /// <param name="value">Long value of the option</param>
        /// <returns>This DataFrameWriterV2 object</returns>
        public DataFrameWriterV2 Option(string key, long value)
        {
            _jvmObject.Invoke("option", key, value);
            return this;
        }

        /// <summary>
        /// Adds an output option for the underlying data source.
        /// </summary>
        /// <param name="key">Name of the option</param>
        /// <param name="value">Double value of the option</param>
        /// <returns>This DataFrameWriterV2 object</returns>
        public DataFrameWriterV2 Option(string key, double value)
        {
            _jvmObject.Invoke("option", key, value);
            return this;
        }

        /// <summary>
        /// Adds output options for the underlying data source.
        /// </summary>
        /// <param name="options">Key/value options</param>
        /// <returns>This DataFrameWriterV2 object</returns>
        public DataFrameWriterV2 Options(Dictionary<string, string> options)
        {
            _jvmObject.Invoke("options", options);
            return this;
        }

        /// <summary>
        /// Add a table property.
        /// </summary>
        /// <param name="property">Name of property</param>
        /// <param name="value">Value of the property</param>
        /// <returns>This DataFrameWriterV2 object</returns>
        public DataFrameWriterV2 TableProperty(string property, string value)
        {
            _jvmObject.Invoke("tableProperty", property, value);
            return this;
        }

        /// <summary>
        /// Partition the output table created by <see cref="Create"/>,
        /// <see cref="CreateOrReplace"/>, or <see cref="Replace"/> using the given columns or
        /// transforms.
        /// </summary>
        /// <param name="column">Column name to partition on</param>
        /// <param name="columns">Columns to partition on</param>
        /// <returns>This DataFrameWriterV2 object</returns>
        public DataFrameWriterV2 PartitionedBy(Column column, params Column[] columns)
        {
            _jvmObject.Invoke("partitionedBy", column, columns);
            return this;
        }

        /// <summary>
        /// Create a new table from the contents of the data frame.
        /// </summary>
        public void Create() => _jvmObject.Invoke("create");

        /// <summary>
        /// Replace an existing table with the contents of the data frame.
        /// </summary>
        public void Replace() => _jvmObject.Invoke("replace");

        /// <summary>
        /// Create a new table or replace an existing table with the contents of the data frame.
        /// </summary>
        public void CreateOrReplace() => _jvmObject.Invoke("createOrReplace");

        /// <summary>
        /// Append the contents of the data frame to the output table.
        /// </summary>
        public void Append() => _jvmObject.Invoke("append");

        /// <summary>
        /// Overwrite rows matching the given filter condition with the contents of the data frame
        /// in the output table.
        /// </summary>
        /// <param name="condition">Condition filter to overwrite based on</param>
        public void Overwrite(Column condition) => _jvmObject.Invoke("overwrite", condition);

        /// <summary>
        /// Overwrite all partitions for which the data frame contains at least one row with the
        /// contents of the data frame in the output table.
        /// </summary>
        public void OverwritePartitions() => _jvmObject.Invoke("overwritePartitions");
    }
}