
Commit 74e9518

#415 Add data source writer skeleton.

1 parent f899488

File tree

2 files changed (+79 -2 lines)

spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/DefaultSource.scala

Lines changed: 34 additions & 2 deletions
@@ -17,9 +17,9 @@
 package za.co.absa.cobrix.spark.cobol.source
 
 import org.apache.hadoop.fs.Path
-import org.apache.spark.sql.sources.{BaseRelation, DataSourceRegister, RelationProvider, SchemaRelationProvider}
+import org.apache.spark.sql.sources._
 import org.apache.spark.sql.types.StructType
-import org.apache.spark.sql.{SQLContext, SparkSession}
+import org.apache.spark.sql.{DataFrame, SQLContext, SaveMode, SparkSession}
 import za.co.absa.cobrix.cobol.internal.Logging
 import za.co.absa.cobrix.cobol.reader.parameters.{CobolParameters, CobolParametersParser, Parameters}
 import za.co.absa.cobrix.cobol.reader.parameters.CobolParametersParser._
@@ -34,6 +34,7 @@ import za.co.absa.cobrix.spark.cobol.utils.{BuildProperties, SparkUtils}
 class DefaultSource
   extends RelationProvider
     with SchemaRelationProvider
+    with CreatableRelationProvider
     with DataSourceRegister
     with ReaderFactory
     with Logging {
@@ -44,6 +45,7 @@ class DefaultSource
     createRelation(sqlContext, parameters, null)
   }
 
+  /** Reader relation */
   override def createRelation(sqlContext: SQLContext, parameters: Map[String, String], schema: StructType): BaseRelation = {
     CobolParametersValidator.validateOrThrow(parameters, sqlContext.sparkSession.sparkContext.hadoopConfiguration)
 
@@ -58,6 +60,36 @@ class DefaultSource
       cobolParameters.debugIgnoreFileSize)(sqlContext)
   }
 
+  /** Writer relation */
+  override def createRelation(sqlContext: SQLContext, mode: SaveMode, parameters: Map[String, String], data: DataFrame): BaseRelation = {
+    val path = parameters.getOrElse("path",
+      throw new IllegalArgumentException("Path is required for this data source."))
+
+    mode match {
+      case SaveMode.Overwrite =>
+        val outputPath = new Path(path)
+        val hadoopConf = sqlContext.sparkContext.hadoopConfiguration
+        val fs = outputPath.getFileSystem(hadoopConf)
+        if (fs.exists(outputPath)) {
+          fs.delete(outputPath, true)
+        }
+      case SaveMode.Append =>
+      case _ =>
+    }
+
+    // Simply save each row as comma-separated values in a text file
+    data.rdd
+      .map(row => row.mkString(","))
+      .saveAsTextFile(path)
+
+    new BaseRelation {
+      override def sqlContext: SQLContext = data.sqlContext // avoids self-reference: the method parameter is shadowed by this override
+
+      override def schema: StructType = data.schema
+    }
+  }
+
+
   //TODO fix with the correct implementation once the correct Reader hierarchy is put in place.
   override def buildReader(spark: SparkSession, parameters: Map[String, String]): FixedLenReader = null
 
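For context, here is a minimal sketch (not part of this commit) of how the new writer path is driven from the DataFrame API. It assumes the "cobol" short name resolves to this DefaultSource via DataSourceRegister; the output path and column names are illustrative only.

// Sketch only: exercises the writer skeleton above under the assumption
// that "cobol" is registered as this source's short name.
import org.apache.spark.sql.{SaveMode, SparkSession}

object WriterSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("cobrix-writer-sketch")
      .master("local[*]")
      .getOrCreate()

    import spark.implicits._

    val df = Seq(("A", 1), ("B", 2), ("C", 3)).toDF("a", "b")

    // SaveMode.Overwrite takes the branch that deletes an existing output
    // directory before the rows are saved as comma-separated text.
    df.write
      .format("cobol")
      .mode(SaveMode.Overwrite)
      .save("/tmp/cobrix-writer-sketch") // illustrative path

    // Each row was written via row.mkString(","), so the directory reads
    // back as plain text lines: "A,1", "B,2", "C,3".
    spark.read.text("/tmp/cobrix-writer-sketch").show(false)

    spark.stop()
  }
}

Note that only SaveMode.Overwrite does any pre-processing; SaveMode.Append and the remaining modes currently fall through and the rows are written regardless.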

spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/WriterSourceSpec.scala

Lines changed: 45 additions & 0 deletions
@@ -0,0 +1,45 @@
+/*
+ * Copyright 2018 ABSA Group Limited
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package za.co.absa.cobrix.spark.cobol.source
+
+import org.apache.hadoop.fs.Path
+import org.scalatest.wordspec.AnyWordSpec
+import za.co.absa.cobrix.spark.cobol.source.base.SparkTestBase
+import za.co.absa.cobrix.spark.cobol.source.fixtures.BinaryFileFixture
+import za.co.absa.cobrix.spark.cobol.utils.FileUtils
+
+class WriterSourceSpec extends AnyWordSpec with SparkTestBase with BinaryFileFixture {
+
+  import spark.implicits._
+
+  "writer" should {
+    "be able to write a basic dataframe" in {
+      withTempDirectory("writer") { tempDir =>
+        val df = List(("A", 1), ("B", 2), ("C", 3)).toDF("a", "b")
+
+        val outputPath = new Path(tempDir, "test")
+
+        df.write.format("cobol").save(outputPath.toString)
+
+        val files = FileUtils.getFiles(outputPath.toString, spark.sparkContext.hadoopConfiguration)
+
+        assert(files.nonEmpty)
+      }
+    }
+  }
+
+}
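The committed spec only asserts that output files exist. A hypothetical follow-up (not part of this commit) could also verify the written content, reusing the same SparkTestBase and BinaryFileFixture helpers; the class and test names below are illustrative.

// Hypothetical companion spec: checks what was written, not just that
// something was written. Assumes the same test helpers as WriterSourceSpec.
package za.co.absa.cobrix.spark.cobol.source

import org.apache.hadoop.fs.Path
import org.scalatest.wordspec.AnyWordSpec
import za.co.absa.cobrix.spark.cobol.source.base.SparkTestBase
import za.co.absa.cobrix.spark.cobol.source.fixtures.BinaryFileFixture

class WriterContentSpec extends AnyWordSpec with SparkTestBase with BinaryFileFixture {

  import spark.implicits._

  "writer" should {
    "store rows as comma-separated text" in {
      withTempDirectory("writer_content") { tempDir =>
        val df = List(("A", 1), ("B", 2), ("C", 3)).toDF("a", "b")
        val outputPath = new Path(tempDir, "test_content")

        df.write.format("cobol").save(outputPath.toString)

        // The skeleton writes row.mkString(","), so each row is one text line.
        val lines = spark.read.textFile(outputPath.toString).collect().sorted
        assert(lines.sameElements(Array("A,1", "B,2", "C,3")))
      }
    }
  }
}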
