
Commit 01e8709

#415 Fix valuable nitpick PR suggestions from @coderabbitai.
1 parent 1facde1 commit 01e8709

8 files changed: +87 −19 lines changed

README.md

Lines changed: 9 additions & 0 deletions

@@ -1691,6 +1691,15 @@ The writer is still in its early stages and has several limitations:
 - Save mode `append` is not supported; only `overwrite` is.
 - Partitioning by DataFrame fields is not supported.
 
+### Implementation details
+Handling of `PIC X(n)`:
+- Values are truncated when longer than n and right-padded when shorter.
+- The padding byte is EBCDIC space `0x40`.
+- `null` values in DataFrames are written as `0x00` bytes.
+
+Handling of `FILLER`s:
+- FILLER record spaces are populated with `0x00` bytes.
+
 ## Performance Analysis
 
 Performance tests were performed on synthetic datasets. The setup and results are as follows.
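The padding and truncation rules documented above can be illustrated with a small standalone sketch. This is not Cobrix's actual encoder; the `Cp037` charset is an assumption standing in for whichever EBCDIC code page is configured.

```scala
// Illustrative sketch of the documented PIC X(n) rules; not the library's API.
def layoutPicX(value: String, n: Int): Array[Byte] = {
  if (value == null) {
    Array.fill[Byte](n)(0x00.toByte)                 // nulls are written as 0x00 bytes
  } else {
    val buf = Array.fill[Byte](n)(0x40.toByte)       // right-pad with EBCDIC space 0x40
    val bytes = value.getBytes("Cp037")              // assumed EBCDIC code page (IBM037)
    System.arraycopy(bytes, 0, buf, 0, math.min(bytes.length, n)) // truncate when longer than n
    buf
  }
}
```

For example, `layoutPicX("NEWN", 10)` yields the four encoded characters followed by six `0x40` bytes, while an 18-character value written into the same field keeps only its first ten encoded bytes.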

cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/parser/encoding/EncoderSelector.scala

Lines changed: 5 additions & 1 deletion

@@ -20,6 +20,7 @@ import za.co.absa.cobrix.cobol.parser.ast.datatype.{AlphaNumeric, CobolType}
 import za.co.absa.cobrix.cobol.parser.encoding.codepage.{CodePage, CodePageCommon}
 
 import java.nio.charset.{Charset, StandardCharsets}
+import java.util
 
 object EncoderSelector {
   type Encoder = Any => Array[Byte]
@@ -35,7 +36,7 @@ object EncoderSelector {
     }
   }
 
-  /** Gets a decoder function for a string data type. Encoder is chosen depending on whether input encoding is EBCDIC or ASCII */
+  /** Gets an encoder function for a string data type. The encoder is chosen depending on whether the output encoding is EBCDIC or ASCII. */
   private def getStringEncoder(encoding: Encoding,
                                ebcdicCodePage: CodePage,
                                asciiCharset: Charset,
@@ -68,6 +69,9 @@ object EncoderSelector {
       var i = 0
       val buf = new Array[Byte](length)
 
+      // PIC X fields are space-filled on mainframe. Use EBCDIC space 0x40.
+      util.Arrays.fill(buf, 0x40.toByte)
+
       while (i < string.length && i < length) {
         val asciiByte = string(i).toByte
         buf(i) = conversionTable((asciiByte + 256) % 256)
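Taken together, the hunk above amounts to the following encoding logic. This is a condensed sketch for readability; `conversionTable` is assumed to be the 256-entry ASCII-to-EBCDIC byte table that `EncoderSelector` builds from the code page.

```scala
import java.util

// Sketch: pre-fill the output with EBCDIC spaces, then convert characters
// through the code-page table, truncating at the field length.
def encodeEbcdicString(string: String, length: Int, conversionTable: Array[Byte]): Array[Byte] = {
  val buf = new Array[Byte](length)
  util.Arrays.fill(buf, 0x40.toByte)                    // unfilled tail stays as EBCDIC spaces
  var i = 0
  while (i < string.length && i < length) {
    val asciiByte = string(i).toByte
    buf(i) = conversionTable((asciiByte + 256) % 256)   // index safely despite signed bytes
    i += 1
  }
  buf
}
```

Before this change the unfilled tail stayed `0x00`, which is why the writer test expectations below switch from `0x00` to `0x40` for short values.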

cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/parameters/CobolParametersParser.scala

Lines changed: 14 additions & 2 deletions

@@ -247,12 +247,24 @@ object CobolParametersParser extends Logging {
     }
 
     val copybookPaths = params.get(PARAM_MULTI_COPYBOOK_PATH) match {
-      case Some(paths) => paths.split(',').toSeq
+      case Some(paths) =>
+        paths.split(',')
+          .iterator
+          .map(_.trim)
+          .filter(_.nonEmpty)
+          .toSeq
       case None => Seq.empty[String]
     }
 
+    val copybookPathOpt = getParameter(PARAM_COPYBOOK_PATH, params)
+    if (copybookPathOpt.nonEmpty && copybookPaths.nonEmpty) {
+      throw new IllegalArgumentException(
+        s"Options '$PARAM_COPYBOOK_PATH' (single path) and '$PARAM_MULTI_COPYBOOK_PATH' (comma-separated list) are mutually exclusive. Use only one."
+      )
+    }
+
     val cobolParameters = CobolParameters(
-      getParameter(PARAM_COPYBOOK_PATH, params),
+      copybookPathOpt,
      copybookPaths,
      getParameter(PARAM_COPYBOOK_CONTENTS, params),
      paths,
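The effect of the new list parsing can be shown with a tiny example (the paths are illustrative):

```scala
// Whitespace around entries and empty entries are now dropped.
val raw = " /copybooks/a.cpy , /copybooks/b.cpy ,, "
val copybookPaths = raw.split(',').iterator.map(_.trim).filter(_.nonEmpty).toSeq
// copybookPaths == Seq("/copybooks/a.cpy", "/copybooks/b.cpy")
```

Supplying both the single-path and the multi-path option now fails fast with an `IllegalArgumentException`.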

cobol-parser/src/test/scala/za/co/absa/cobrix/cobol/parser/extract/BinaryExtractorSpec.scala

Lines changed: 29 additions & 0 deletions

@@ -22,6 +22,7 @@ import za.co.absa.cobrix.cobol.parser.ast.datatype.{AlphaNumeric, CobolType}
 import za.co.absa.cobrix.cobol.parser.ast.{BinaryProperties, Group, Primitive}
 import za.co.absa.cobrix.cobol.parser.decoders.DecoderSelector
 import za.co.absa.cobrix.cobol.parser.encoding.{EBCDIC, EncoderSelector}
+import za.co.absa.cobrix.cobol.parser.policies.StringTrimmingPolicy
 
 class BinaryExtractorSpec extends AnyFunSuite {
 
@@ -217,4 +218,32 @@ class BinaryExtractorSpec extends AnyFunSuite {
     assert(fields2.isInstanceOf[Primitive])
     assert(fields2.asInstanceOf[Primitive].encode.isEmpty)
   }
+
+  test("Test padding when setting field value by name") {
+    val fieldName1: String = "COMPANY.SHORT-NAME"
+    val newValue1: String = "NEWN"
+    val copybook2 = CopybookParser.parseTree(copyBookContents, stringTrimmingPolicy = StringTrimmingPolicy.KeepAll)
+    copybook2.setFieldValueByName(fieldName1, bytes, newValue1, startOffset)
+    val result1: Any = copybook2.getFieldValueByName(fieldName1, bytes, startOffset)
+    assert(result1.asInstanceOf[String] === "NEWN      ")
+
+    val fieldName2: String = "COMPANY.COMPANY-ID-NUM"
+    val fields2 = copybook2.getFieldByName(fieldName2)
+    assert(fields2.isInstanceOf[Primitive])
+    assert(fields2.asInstanceOf[Primitive].encode.isEmpty)
+  }
+
+  test("Test truncating when setting field value by name") {
+    val fieldName1: String = "COMPANY.SHORT-NAME"
+    val newValue1: String = "NEWNAME_TEST123345"
+    val copybook2 = CopybookParser.parseTree(copyBookContents, stringTrimmingPolicy = StringTrimmingPolicy.KeepAll)
+    copybook2.setFieldValueByName(fieldName1, bytes, newValue1, startOffset)
+    val result1: Any = copybook2.getFieldValueByName(fieldName1, bytes, startOffset)
+    assert(result1.asInstanceOf[String] === "NEWNAME_TE")
+
+    val fieldName2: String = "COMPANY.COMPANY-ID-NUM"
+    val fields2 = copybook2.getFieldByName(fieldName2)
+    assert(fields2.isInstanceOf[Primitive])
+    assert(fields2.asInstanceOf[Primitive].encode.isEmpty)
+  }
 }

spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/writer/BasicRecordCombiner.scala

Lines changed: 12 additions & 6 deletions

@@ -18,16 +18,22 @@ package za.co.absa.cobrix.spark.cobol.writer
 
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.DataFrame
-import za.co.absa.cobrix.cobol.parser.ast.{Group, Primitive}
+import za.co.absa.cobrix.cobol.parser.ast.{Group, Primitive, Statement}
 import za.co.absa.cobrix.cobol.reader.parameters.ReaderParameters
 import za.co.absa.cobrix.cobol.reader.schema.CobolSchema
 
 class BasicRecordCombiner extends RecordCombiner {
   override def combine(df: DataFrame, cobolSchema: CobolSchema, readerParameters: ReaderParameters): RDD[Array[Byte]] = {
     val ast = getAst(cobolSchema)
-    validateSchema(df, ast)
+    val copybookFields = ast.children.filter {
+      case p: Primitive => !p.isFiller
+      case g: Group => !g.isFiller
+      case _ => true
+    }
+
+    validateSchema(df, copybookFields.toSeq)
 
-    val cobolFields = ast.children.map(_.asInstanceOf[Primitive])
+    val cobolFields = copybookFields.map(_.asInstanceOf[Primitive])
     val sparkFields = df.schema.fields.map(_.name.toLowerCase)
 
     cobolFields.foreach(cobolField =>
@@ -64,10 +70,10 @@ class BasicRecordCombiner extends RecordCombiner {
       }
     }
 
-  private def validateSchema(df: DataFrame, ast: Group): Unit = {
+  private def validateSchema(df: DataFrame, copybookFields: Seq[Statement]): Unit = {
     val dfFields = df.schema.fields.map(_.name.toLowerCase).toSet
 
-    val notFoundFields = ast.children.flatMap { field =>
+    val notFoundFields = copybookFields.flatMap { field =>
       if (dfFields.contains(field.name.toLowerCase)) {
         None
       } else {
@@ -79,7 +85,7 @@ class BasicRecordCombiner extends RecordCombiner {
       throw new IllegalArgumentException(s"The following fields from the copybook are not found in the DataFrame: ${notFoundFields.mkString(", ")}")
     }
 
-    val unsupportedDataTypeFields = ast.children.filter { field =>
+    val unsupportedDataTypeFields = copybookFields.filter { field =>
       field.isInstanceOf[Group] ||
       (field.isInstanceOf[Primitive] && field.asInstanceOf[Primitive].occurs.isDefined) ||
       field.redefines.nonEmpty

spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/writer/RawBinaryOutputFormat.scala

Lines changed: 2 additions & 1 deletion

@@ -40,7 +40,8 @@ import java.io.DataOutputStream
 
 class RawBinaryOutputFormat extends FileOutputFormat[NullWritable, BytesWritable] {
   override def getRecordWriter(context: TaskAttemptContext): RecordWriter[NullWritable, BytesWritable] = {
-    val path: Path = getDefaultWorkFile(context, ".dat")
+    val extension = context.getConfiguration.get("cobol.writer.output.extension", ".dat")
+    val path: Path = getDefaultWorkFile(context, extension)
     val fs = path.getFileSystem(context.getConfiguration)
     val out: DataOutputStream = fs.create(path, false)
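Since the extension is read from the Hadoop configuration, one way to change it from a Spark job would look like the sketch below. This is hypothetical usage; the diff only confirms the `cobol.writer.output.extension` key and its `.dat` default.

```scala
// Assumed usage: set the key on the Hadoop configuration before writing.
spark.sparkContext.hadoopConfiguration.set("cobol.writer.output.extension", ".bin")

df.write
  .format("cobol")
  .option("copybook_contents", copybookContents)  // copybookContents assumed defined elsewhere
  .mode("overwrite")
  .save("/output/path")
// Part files should then end in ".bin" instead of the default ".dat".
```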
4647

spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/writer/RecordCombinerSelector.scala

Lines changed: 1 addition & 1 deletion

@@ -27,7 +27,7 @@ object RecordCombinerSelector {
    * Currently, only basic fixed record length combiner is implemented.
    * This method is to be extended as writing capabilities evolve.
    *
-   * @param cobolSchema The COBOL schema ot output record.
+   * @param cobolSchema The COBOL schema of the output record.
    * @param readerParameters Configuration parameters that specify how records should be formed.
    * @return A `RecordCombiner` implementation suitable for combining records based on the given schema and parameters.
    */

spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/writer/FixedLengthEbcdicWriterSuite.scala

Lines changed: 15 additions & 8 deletions

@@ -39,7 +39,7 @@ class FixedLengthEbcdicWriterSuite extends AnyWordSpec with SparkTestBase with B
 
       val path = new Path(tempDir, "writer1")
 
-      df.repartition(1)
+      df.coalesce(1)
        .orderBy("A")
        .write
        .format("cobol")
@@ -63,8 +63,8 @@ class FixedLengthEbcdicWriterSuite extends AnyWordSpec with SparkTestBase with B
      // Expected EBCDIC data for sample test data
      val expected = Array[Byte](
        0xC1.toByte, 0xC6.toByte, 0x89.toByte, 0x99.toByte, 0xa2.toByte, 0xa3.toByte, // A,First
-        0xC2.toByte, 0xE2.toByte, 0x83.toByte, 0x95.toByte, 0x84.toByte, 0x00.toByte, // B,Scnd_
-        0xC3.toByte, 0xD3.toByte, 0x81.toByte, 0xa2.toByte, 0xa3.toByte, 0x00.toByte  // C,Last_
+        0xC2.toByte, 0xE2.toByte, 0x83.toByte, 0x95.toByte, 0x84.toByte, 0x40.toByte, // B,Scnd_
+        0xC3.toByte, 0xD3.toByte, 0x81.toByte, 0xa2.toByte, 0xa3.toByte, 0x40.toByte  // C,Last_
      )
 
      if (!bytes.sameElements(expected)) {
@@ -82,12 +82,19 @@ class FixedLengthEbcdicWriterSuite extends AnyWordSpec with SparkTestBase with B
 
      val path = new Path(tempDir, "writer1")
 
-      df.repartition(1)
+      val copybookContentsWithFilers =
+        """      01 RECORD.
+                    05 A PIC X(1).
+                    05 FILLER PIC X(1).
+                    05 B PIC X(5).
+        """
+
+      df.coalesce(1)
        .orderBy("A")
        .write
        .format("cobol")
        .mode(SaveMode.Overwrite)
-        .option("copybook_contents", copybookContents)
+        .option("copybook_contents", copybookContentsWithFilers)
        .save(path.toString)
 
      val fs = path.getFileSystem(spark.sparkContext.hadoopConfiguration)
@@ -105,9 +112,9 @@ class FixedLengthEbcdicWriterSuite extends AnyWordSpec with SparkTestBase with B
 
      // Expected EBCDIC data for sample test data
      val expected = Array[Byte](
-        0xC1.toByte, 0xC6.toByte, 0x89.toByte, 0x99.toByte, 0xa2.toByte, 0xa3.toByte, // A,First
-        0xC2.toByte, 0xE2.toByte, 0x83.toByte, 0x95.toByte, 0x84.toByte, 0x00.toByte, // B,Scnd_
-        0xC3.toByte, 0x00.toByte, 0x00.toByte, 0x00.toByte, 0x00.toByte, 0x00.toByte  // C,Last_
+        0xC1.toByte, 0x00.toByte, 0xC6.toByte, 0x89.toByte, 0x99.toByte, 0xa2.toByte, 0xa3.toByte, // A,First
+        0xC2.toByte, 0x00.toByte, 0xE2.toByte, 0x83.toByte, 0x95.toByte, 0x84.toByte, 0x40.toByte, // B,Scnd_
+        0xC3.toByte, 0x00.toByte, 0x00.toByte, 0x00.toByte, 0x00.toByte, 0x00.toByte, 0x00.toByte  // C,Last_
      )
 
      if (!bytes.sameElements(expected)) {
