Skip to content

Commit 9ab164b

Browse files
Adding support for DataFrameWriterV2 (#677)
1 parent f828221 commit 9ab164b

File tree

3 files changed

+241
-0
lines changed

3 files changed

+241
-0
lines changed
Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
// Licensed to the .NET Foundation under one or more agreements.
2+
// The .NET Foundation licenses this file to you under the MIT license.
3+
// See the LICENSE file in the project root for more information.
4+
5+
using System;
6+
using System.Collections.Generic;
7+
using Microsoft.Spark.E2ETest.Utils;
8+
using Microsoft.Spark.Sql;
9+
using Xunit;
10+
11+
namespace Microsoft.Spark.E2ETest.IpcTests
12+
{
13+
[Collection("Spark E2E Tests")]
14+
public class DataFrameWriterV2Tests
15+
{
16+
private readonly SparkSession _spark;
17+
18+
public DataFrameWriterV2Tests(SparkFixture fixture)
19+
{
20+
_spark = fixture.Spark;
21+
}
22+
23+
/// <summary>
24+
/// Test signatures for APIs introduced in Spark 3.*.
25+
/// </summary>
26+
[SkipIfSparkVersionIsLessThan(Versions.V3_0_0)]
27+
public void TestSignaturesV3_0_X()
28+
{
29+
DataFrame df = _spark
30+
.Read()
31+
.Schema("age INT, name STRING")
32+
.Json($"{TestEnvironment.ResourceDirectory}people.json");
33+
34+
DataFrameWriterV2 dfwV2 = df.WriteTo("testtable");
35+
36+
Assert.IsType<DataFrameWriterV2>(dfwV2.Using("json"));
37+
38+
Assert.IsType<DataFrameWriterV2>(dfwV2.Option("key1", "value"));
39+
Assert.IsType<DataFrameWriterV2>(dfwV2.Option("key2", true));
40+
Assert.IsType<DataFrameWriterV2>(dfwV2.Option("key3", 1L));
41+
Assert.IsType<DataFrameWriterV2>(dfwV2.Option("key4", 2D));
42+
43+
Assert.IsType<DataFrameWriterV2>(dfwV2.Options(
44+
new Dictionary<string, string>() { { "key", "value" } }));
45+
46+
Assert.IsType<DataFrameWriterV2>(dfwV2.TableProperty("prop", "value"));
47+
48+
_spark.Sql("DROP TABLE IF EXISTS default.testtable");
49+
dfwV2.Create();
50+
51+
Assert.IsType<DataFrameWriterV2>(dfwV2.PartitionedBy(df.Col("age")));
52+
53+
// Throws the following exception:
54+
// org.apache.spark.sql.AnalysisException: REPLACE TABLE AS SELECT is only supported
55+
// with v2 tables.
56+
Assert.Throws<Exception>(() => dfwV2.Replace());
57+
58+
// Throws the following exception:
59+
// org.apache.spark.sql.AnalysisException: REPLACE TABLE AS SELECT is only supported
60+
// with v2 tables.
61+
Assert.Throws<Exception>(() => dfwV2.CreateOrReplace());
62+
63+
// Throws the following exception:
64+
// org.apache.spark.sql.AnalysisException: Table default.testtable does not support
65+
// append in batch mode.
66+
Assert.Throws<Exception>(() => dfwV2.Append());
67+
68+
// Throws the following exception:
69+
// org.apache.spark.sql.AnalysisException: Table default.testtable does not support
70+
// overwrite by filter in batch mode.
71+
Assert.Throws<Exception>(() => dfwV2.Overwrite(df.Col("age")));
72+
73+
// Throws the following exception:
74+
// org.apache.spark.sql.AnalysisException: Table default.testtable does not support
75+
// dynamic overwrite in batch mode.
76+
Assert.Throws<Exception>(() => dfwV2.OverwritePartitions());
77+
}
78+
}
79+
}

src/csharp/Microsoft.Spark/Sql/DataFrame.cs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -535,6 +535,15 @@ public DataFrame Agg(Column expr, params Column[] exprs) =>
535535
public DataFrame Observe(string name, Column expr, params Column[] exprs) =>
536536
WrapAsDataFrame(_jvmObject.Invoke("observe", name, expr, exprs));
537537

538+
/// <summary>
539+
/// Create a write configuration builder for v2 sources.
540+
/// </summary>
541+
/// <param name="table">Name of table to write to</param>
542+
/// <returns>DataFrameWriterV2 object</returns>
543+
[Since(Versions.V3_0_0)]
544+
public DataFrameWriterV2 WriteTo(string table) =>
545+
new DataFrameWriterV2((JvmObjectReference)_jvmObject.Invoke("writeTo", table));
546+
538547
/// <summary>
539548
/// Returns a new `DataFrame` by taking the first `number` rows.
540549
/// </summary>
Lines changed: 153 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,153 @@
1+
// Licensed to the .NET Foundation under one or more agreements.
2+
// The .NET Foundation licenses this file to you under the MIT license.
3+
// See the LICENSE file in the project root for more information.
4+
5+
using System.Collections.Generic;
6+
using Microsoft.Spark.Interop.Ipc;
7+
8+
namespace Microsoft.Spark.Sql
9+
{
10+
/// <summary>
11+
/// Interface used to write a <see cref="DataFrame"/> to external storage using the v2
12+
/// API.
13+
/// </summary>
14+
[Since(Versions.V3_0_0)]
15+
public sealed class DataFrameWriterV2 : IJvmObjectReferenceProvider
16+
{
17+
private readonly JvmObjectReference _jvmObject;
18+
19+
internal DataFrameWriterV2(JvmObjectReference jvmObject) => _jvmObject = jvmObject;
20+
21+
JvmObjectReference IJvmObjectReferenceProvider.Reference => _jvmObject;
22+
23+
/// <summary>
24+
/// Specifies a provider for the underlying output data source. Spark's default catalog
25+
/// supports "parquet", "json", etc.
26+
/// </summary>
27+
/// <param name="provider">Provider name</param>
28+
/// <returns>This DataFrameWriterV2 object</returns>
29+
public DataFrameWriterV2 Using(string provider)
30+
{
31+
_jvmObject.Invoke("using", provider);
32+
return this;
33+
}
34+
35+
/// <summary>
36+
/// Adds an output option for the underlying data source.
37+
/// </summary>
38+
/// <param name="key">Name of the option</param>
39+
/// <param name="value">string value of the option</param>
40+
/// <returns>This DataFrameWriterV2 object</returns>
41+
public DataFrameWriterV2 Option(string key, string value)
42+
{
43+
_jvmObject.Invoke("option", key, value);
44+
return this;
45+
}
46+
47+
/// <summary>
48+
/// Adds an output option for the underlying data source.
49+
/// </summary>
50+
/// <param name="key">Name of the option</param>
51+
/// <param name="value">bool value of the option</param>
52+
/// <returns>This DataFrameWriterV2 object</returns>
53+
public DataFrameWriterV2 Option(string key, bool value)
54+
{
55+
_jvmObject.Invoke("option", key, value);
56+
return this;
57+
}
58+
59+
/// <summary>
60+
/// Adds an output option for the underlying data source.
61+
/// </summary>
62+
/// <param name="key">Name of the option</param>
63+
/// <param name="value">Long value of the option</param>
64+
/// <returns>This DataFrameWriterV2 object</returns>
65+
public DataFrameWriterV2 Option(string key, long value)
66+
{
67+
_jvmObject.Invoke("option", key, value);
68+
return this;
69+
}
70+
71+
/// <summary>
72+
/// Adds an output option for the underlying data source.
73+
/// </summary>
74+
/// <param name="key">Name of the option</param>
75+
/// <param name="value">Double value of the option</param>
76+
/// <returns>This DataFrameWriterV2 object</returns>
77+
public DataFrameWriterV2 Option(string key, double value)
78+
{
79+
_jvmObject.Invoke("option", key, value);
80+
return this;
81+
}
82+
83+
/// <summary>
84+
/// Adds output options for the underlying data source.
85+
/// </summary>
86+
/// <param name="options">Key/value options</param>
87+
/// <returns>This DataFrameWriterV2 object</returns>
88+
public DataFrameWriterV2 Options(Dictionary<string, string> options)
89+
{
90+
_jvmObject.Invoke("options", options);
91+
return this;
92+
}
93+
94+
/// <summary>
95+
/// Add a table property.
96+
/// </summary>
97+
/// <param name="property">Name of property</param>
98+
/// <param name="value">Value of the property</param>
99+
/// <returns>This DataFrameWriterV2 object</returns>
100+
public DataFrameWriterV2 TableProperty(string property, string value)
101+
{
102+
_jvmObject.Invoke("tableProperty", property, value);
103+
return this;
104+
}
105+
106+
/// <summary>
107+
/// Partition the output table created by <see cref="Create"/>,
108+
/// <see cref="CreateOrReplace"/>, or <see cref="Replace"/> using the given columns or
109+
/// transforms.
110+
/// </summary>
111+
/// <param name="column">Column name to partition on</param>
112+
/// <param name="columns">Columns to partition on</param>
113+
/// <returns>This DataFrameWriterV2 object</returns>
114+
public DataFrameWriterV2 PartitionedBy(Column column, params Column[] columns)
115+
{
116+
_jvmObject.Invoke("partitionedBy", column, columns);
117+
return this;
118+
}
119+
120+
/// <summary>
121+
/// Create a new table from the contents of the data frame.
122+
/// </summary>
123+
public void Create() => _jvmObject.Invoke("create");
124+
125+
/// <summary>
126+
/// Replace an existing table with the contents of the data frame.
127+
/// </summary>
128+
public void Replace() => _jvmObject.Invoke("replace");
129+
130+
/// <summary>
131+
/// Create a new table or replace an existing table with the contents of the data frame.
132+
/// </summary>
133+
public void CreateOrReplace() => _jvmObject.Invoke("createOrReplace");
134+
135+
/// <summary>
136+
/// Append the contents of the data frame to the output table.
137+
/// </summary>
138+
public void Append() => _jvmObject.Invoke("append");
139+
140+
/// <summary>
141+
/// Overwrite rows matching the given filter condition with the contents of the data frame
142+
/// in the output table.
143+
/// </summary>
144+
/// <param name="condition">Condition filter to overwrite based on</param>
145+
public void Overwrite(Column condition) => _jvmObject.Invoke("overwrite", condition);
146+
147+
/// <summary>
148+
/// Overwrite all partition for which the data frame contains at least one row with the
149+
/// contents of the data frame in the output table.
150+
/// </summary>
151+
public void OverwritePartitions() => _jvmObject.Invoke("overwritePartitions");
152+
}
153+
}

0 commit comments

Comments
 (0)