
Commit 1facde1

#415 Fix PR suggestions (Thanks @coderabbitai), add new unit test cases.
1 parent 6f7a75a commit 1facde1

3 files changed: +147 -18 lines changed

spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/DefaultSource.scala

Lines changed: 26 additions & 11 deletions
@@ -65,6 +65,7 @@ class DefaultSource
 
   /** Writer relation */
   override def createRelation(sqlContext: SQLContext, mode: SaveMode, parameters: Map[String, String], data: DataFrame): BaseRelation = {
+    val outSqlContext = sqlContext
     val path = parameters.getOrElse("path",
       throw new IllegalArgumentException("Path is required for this data source."))

@@ -73,27 +74,42 @@ class DefaultSource
 
     val readerParameters = CobolParametersParser.getReaderProperties(cobolParameters, None)
 
+    val outputPath = new Path(path)
+    val hadoopConf = sqlContext.sparkContext.hadoopConfiguration
+    val fs = outputPath.getFileSystem(hadoopConf)
+
     mode match {
       case SaveMode.Overwrite =>
-        val outputPath = new Path(path)
-        val hadoopConf = sqlContext.sparkContext.hadoopConfiguration
-        val fs = outputPath.getFileSystem(hadoopConf)
         if (fs.exists(outputPath)) {
           fs.delete(outputPath, true)
         }
       case SaveMode.Append =>
-        throw new IllegalArgumentException(
-          s"Save mode '$mode' is not supported by the 'spark-cobol' data source at the moment. " +
-            "Please use 'Overwrite' mode to write data to a file or folder."
-        )
+        if (fs.exists(outputPath)) {
+          throw new IllegalArgumentException(
+            s"Save mode '$mode' is not supported by the 'spark-cobol' data source at the moment. " +
+              "Please use 'Overwrite' mode to write data to a file or folder."
+          )
+        }
+      case SaveMode.ErrorIfExists =>
+        if (fs.exists(outputPath)) {
+          throw new IllegalArgumentException(
+            s"Path '$path' already exists; SaveMode.ErrorIfExists prevents overwriting."
+          )
+        }
+      case SaveMode.Ignore =>
+        if (fs.exists(outputPath)) {
+          // Skip the write entirely
+          return new BaseRelation {
+            override val sqlContext: SQLContext = outSqlContext
+            override def schema: StructType = data.schema
+          }
+        }
       case _ =>
     }
 
     val copybookContent = CopybookContentLoader.load(cobolParameters, sqlContext.sparkContext.hadoopConfiguration)
     val cobolSchema = CobolSchema.fromReaderParameters(copybookContent, readerParameters)
-
     val combiner = RecordCombinerSelector.selectCombiner(cobolSchema, readerParameters)
-
     val rdd = combiner.combine(data, cobolSchema, readerParameters)
 
     rdd.map(bytes => (NullWritable.get(), new BytesWritable(bytes)))
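
After this hunk the writer covers all four Spark save modes instead of rejecting everything except Overwrite. A minimal usage sketch of the resulting behavior; the session, output path, and copybook below are illustrative, not part of this commit:

import org.apache.spark.sql.{SaveMode, SparkSession}

val spark = SparkSession.builder().master("local[*]").appName("cobol-writer-demo").getOrCreate()
import spark.implicits._

// Matches a copybook with a 1-character field A and a 5-character field B.
val df = List(("A", "First"), ("B", "Scnd")).toDF("A", "B")

val copybook =
  """      01  RECORD.
    |        05  A  PIC X(1).
    |        05  B  PIC X(5).
    |""".stripMargin

// Overwrite: any existing output at the path is deleted, then the data is written.
df.write
  .format("cobol")
  .option("copybook_contents", copybook)
  .mode(SaveMode.Overwrite)
  .save("/tmp/cobol-out")

// Ignore: the path now exists, so this second write is silently skipped.
df.write
  .format("cobol")
  .option("copybook_contents", copybook)
  .mode(SaveMode.Ignore)
  .save("/tmp/cobol-out")

// With an existing path, SaveMode.ErrorIfExists and SaveMode.Append both throw
// IllegalArgumentException (append is simply not implemented by the writer yet).
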
@@ -105,8 +121,7 @@ class DefaultSource
     )
 
     new BaseRelation {
-      override def sqlContext: SQLContext = sqlContext
-
+      override def sqlContext: SQLContext = outSqlContext
       override def schema: StructType = data.schema
     }
   }
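
The `outSqlContext` capture fixes a name-shadowing bug: inside the anonymous `BaseRelation`, the name `sqlContext` resolves to the member being defined rather than to the method parameter, so the old `override def sqlContext: SQLContext = sqlContext` was a definition that calls itself. A self-contained sketch of the pattern, using `String` in place of `SQLContext` (`Rel`, `makeRelation`, and `outCtx` are hypothetical names):

trait Rel {
  def ctx: String
}

def makeRelation(ctx: String): Rel = {
  // Buggy shape: inside the anonymous class the member shadows the parameter,
  // so the body would only ever call itself (scalac rejects this exact shape
  // as a method that "does nothing other than call itself recursively").
  //   new Rel { override def ctx: String = ctx }

  val outCtx = ctx // capture the parameter under a name nothing shadows
  new Rel {
    override def ctx: String = outCtx // returns the captured parameter
  }
}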

spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/writer/BasicRecordCombiner.scala

Lines changed: 8 additions & 6 deletions
@@ -36,26 +36,28 @@ class BasicRecordCombiner extends RecordCombiner {
       }
     )
 
-    val sparkFieldPositions = cobolFields.map { cobolField =>
+    val sparkFieldPositions = cobolFields.zipWithIndex.map { case (cobolField, idx) =>
       val fieldName = cobolField.name.toLowerCase
       val position = sparkFields.indexOf(fieldName)
 
       if (position < 0) {
         throw new IllegalArgumentException(s"Field '${cobolField.name}' from the copybook is not found in the DataFrame schema.")
       }
 
-      position
+      (idx, position)
     }
 
     val size = cobolSchema.getRecordSize
 
     df.rdd.map { row =>
       val ar = new Array[Byte](size)
 
-      sparkFieldPositions.foreach { index =>
-        val fieldStr = row.get(index)
-        val cobolField = cobolFields(index)
-        cobolSchema.copybook.setPrimitiveField(cobolField, ar, fieldStr, 0)
+      sparkFieldPositions.foreach { case (cobolIdx, sparkIdx) =>
+        if (!row.isNullAt(sparkIdx)) {
+          val fieldStr = row.get(sparkIdx)
+          val cobolField = cobolFields(cobolIdx)
+          cobolSchema.copybook.setPrimitiveField(cobolField, ar, fieldStr, 0)
+        }
       }
 
       ar
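
This change decouples copybook field order from DataFrame column order: `sparkFieldPositions` now carries (copybook field index, DataFrame column index) pairs instead of a single position used to index both sides, and null cells are skipped so `setPrimitiveField` is never handed a null value. A self-contained sketch of just the index mapping (the field names are illustrative):

// Copybook fields in copybook order; DataFrame column names, lower-cased.
val cobolFields = Seq("A", "B", "C")
val sparkFields = Seq("c", "b", "a")

val sparkFieldPositions = cobolFields.zipWithIndex.map { case (field, idx) =>
  val position = sparkFields.indexOf(field.toLowerCase)
  if (position < 0) {
    throw new IllegalArgumentException(s"Field '$field' from the copybook is not found in the DataFrame schema.")
  }
  (idx, position) // (copybook index, DataFrame column index)
}

println(sparkFieldPositions) // List((0,2), (1,1), (2,0))

Before this commit the bare DataFrame position was also used to index `cobolFields`, which only lines up when the DataFrame columns happen to be in copybook order.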

spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/writer/FixedLengthEbcdicWriterSuite.scala

Lines changed: 113 additions & 1 deletion
@@ -23,6 +23,7 @@ import za.co.absa.cobrix.spark.cobol.source.base.SparkTestBase
 import za.co.absa.cobrix.spark.cobol.source.fixtures.BinaryFileFixture
 
 class FixedLengthEbcdicWriterSuite extends AnyWordSpec with SparkTestBase with BinaryFileFixture {
+
   import spark.implicits._
 
   private val copybookContents =
@@ -37,7 +38,7 @@ class FixedLengthEbcdicWriterSuite extends AnyWordSpec with SparkTestBase with B
         val df = List(("A", "First"), ("B", "Scnd"), ("C", "Last")).toDF("A", "B")
 
         val path = new Path(tempDir, "writer1")
-
+
         df.repartition(1)
          .orderBy("A")
          .write
@@ -74,6 +75,117 @@ class FixedLengthEbcdicWriterSuite extends AnyWordSpec with SparkTestBase with B
         }
       }
     }
+
+    "write data frames with different field order and null values" in {
+      withTempDirectory("cobol_writer1") { tempDir =>
+        val df = List((1, "First", "A"), (2, "Scnd", "B"), (3, null, "C")).toDF("C", "B", "A")
+
+        val path = new Path(tempDir, "writer1")
+
+        df.repartition(1)
+          .orderBy("A")
+          .write
+          .format("cobol")
+          .mode(SaveMode.Overwrite)
+          .option("copybook_contents", copybookContents)
+          .save(path.toString)
+
+        val fs = path.getFileSystem(spark.sparkContext.hadoopConfiguration)
+
+        assert(fs.exists(path), "Output directory should exist")
+        val files = fs.listStatus(path)
+          .filter(_.getPath.getName.startsWith("part-"))
+        assert(files.nonEmpty, "Output directory should contain part files")
+
+        val partFile = files.head.getPath
+        val data = fs.open(partFile)
+        val bytes = new Array[Byte](files.head.getLen.toInt)
+        data.readFully(bytes)
+        data.close()
+
+        // Expected EBCDIC data for sample test data
+        val expected = Array[Byte](
+          0xC1.toByte, 0xC6.toByte, 0x89.toByte, 0x99.toByte, 0xa2.toByte, 0xa3.toByte, // A,First
+          0xC2.toByte, 0xE2.toByte, 0x83.toByte, 0x95.toByte, 0x84.toByte, 0x00.toByte, // B,Scnd_
+          0xC3.toByte, 0x00.toByte, 0x00.toByte, 0x00.toByte, 0x00.toByte, 0x00.toByte  // C,null (B is null, so zero bytes)
+        )
+
+        if (!bytes.sameElements(expected)) {
+          println(s"Expected bytes: ${expected.map("%02X" format _).mkString(" ")}")
+          println(s"Actual bytes:   ${bytes.map("%02X" format _).mkString(" ")}")
+
+          assert(bytes.sameElements(expected), "Written data should match expected EBCDIC encoding")
+        }
+      }
+    }
+
+    "write should fail with save mode append and the path exists" in {
+      withTempDirectory("cobol_writer3") { tempDir =>
+        val df = List(("A", "First"), ("B", "Scnd"), ("C", "Last")).toDF("A", "B")
+
+        val path = new Path(tempDir, "writer2")
+
+        df.write
+          .format("cobol")
+          .mode(SaveMode.Append)
+          .option("copybook_contents", copybookContents)
+          .save(path.toString)
+
+        assertThrows[IllegalArgumentException] {
+          df.write
+            .format("cobol")
+            .mode(SaveMode.Append)
+            .option("copybook_contents", copybookContents)
+            .save(path.toString)
+        }
+      }
+    }
+
+    "write should fail with save mode fail if exists and the path exists" in {
+      withTempDirectory("cobol_writer3") { tempDir =>
+        val df = List(("A", "First"), ("B", "Scnd"), ("C", "Last")).toDF("A", "B")
+
+        val path = new Path(tempDir, "writer2")
+
+        df.write
+          .format("cobol")
+          .mode(SaveMode.ErrorIfExists)
+          .option("copybook_contents", copybookContents)
+          .save(path.toString)
+
+        assertThrows[IllegalArgumentException] {
+          df.write
+            .format("cobol")
+            .mode(SaveMode.ErrorIfExists)
+            .option("copybook_contents", copybookContents)
+            .save(path.toString)
+        }
+      }
+    }
+
+    "write should be ignored when save mode is ignore" in {
+      withTempDirectory("cobol_writer3") { tempDir =>
+        val df = List(("A", "First"), ("B", "Scnd"), ("C", "Last")).toDF("A", "B")
+
+        val path = new Path(tempDir, "writer2")
+
+        df.write
+          .format("cobol")
+          .mode(SaveMode.Ignore)
+          .option("copybook_contents", copybookContents)
+          .save(path.toString)
+
+        df.write
+          .format("cobol")
+          .mode(SaveMode.Ignore)
+          .option("copybook_contents", copybookContents)
+          .save(path.toString)
+
+        val fs = path.getFileSystem(spark.sparkContext.hadoopConfiguration)
+        assert(fs.exists(path), "Output directory should exist")
+      }
+    }
+
   }
 
 }
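
The expected byte arrays in these tests are EBCDIC with 0x00 used both for padding and for the null field; for letters, the byte values coincide across the common EBCDIC code pages (037, 500, 1047). Assuming a JDK whose extended charsets include `IBM037` (a.k.a. Cp037, shipped with standard JDK builds), the non-null portion can be cross-checked without Spark:

import java.nio.charset.Charset

val ebcdic = Charset.forName("IBM037")

// First record of the null-handling test: field A = "A", field B = "First".
val bytes = "AFirst".getBytes(ebcdic)
println(bytes.map("%02X".format(_)).mkString(" ")) // prints: C1 C6 89 99 A2 A3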
