
Commit 061c928

#776 Implement BCD encoders in the COBOL data writer.

1 parent 6b6e276 commit 061c928

File tree: 5 files changed, +152 −40 lines

cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/parser/Copybook.scala

Lines changed: 33 additions & 32 deletions
@@ -26,6 +26,7 @@ import scala.collection.mutable.ArrayBuffer
 
 
 class Copybook(val ast: CopybookAST) extends Logging with Serializable {
+  import Copybook._
 
   def getCobolSchema: CopybookAST = ast

@@ -215,38 +216,6 @@ class Copybook(val ast: CopybookAST) extends Logging with Serializable {
     field.decodeTypeValue(0, slicedBytes)
   }
 
-  /**
-    * Set value of a field of the copybook record by the AST object of the field
-    *
-    * Nested field names can contain '.' to identify the exact field.
-    * If the field name is unique '.' is not required.
-    *
-    * @param field       The AST object of the field
-    * @param bytes       Binary encoded data of the record
-    * @param startOffset An offset to the beginning of the field in the data (in bytes).
-    * @return The value of the field
-    *
-    */
-  def setPrimitiveField(field: Primitive, recordBytes: Array[Byte], value: Any, startOffset: Int = 0): Unit = {
-    field.encode match {
-      case Some(encode) =>
-        val fieldBytes = encode(value)
-        val startByte = field.binaryProperties.offset + startOffset
-        val endByte = field.binaryProperties.offset + startOffset + field.binaryProperties.actualSize
-
-        if (startByte < 0 || endByte > recordBytes.length) {
-          throw new IllegalArgumentException(s"Cannot set value for field '${field.name}' because the field is out of bounds of the record.")
-        }
-        if (fieldBytes.length != field.binaryProperties.dataSize) {
-          throw new IllegalArgumentException(s"Cannot set value for field '${field.name}' because the encoded value has a different size than the field size.")
-        }
-
-        System.arraycopy(fieldBytes, 0, recordBytes, startByte, fieldBytes.length)
-      case None =>
-        throw new IllegalStateException(s"Cannot set value for field '${field.name}' because it does not have an encoder defined.")
-    }
-  }
-
   /** This routine is used for testing by generating a layout position information to compare with mainframe output */
   def generateRecordLayoutPositions(): String = {
     var fieldCounter: Int = 0
@@ -442,4 +411,36 @@ object Copybook {
 
     new Copybook(schema)
   }
+
+  /**
+    * Set value of a field of the copybook record by the AST object of the field.
+    *
+    * Nested field names can contain '.' to identify the exact field.
+    * If the field name is unique, '.' is not required.
+    *
+    * @param field       The AST object of the field
+    * @param recordBytes Binary encoded data of the record
+    * @param value       The value to encode into the field
+    * @param startOffset An offset to the beginning of the field in the data (in bytes).
+    *
+    */
+  def setPrimitiveField(field: Primitive, recordBytes: Array[Byte], value: Any, startOffset: Int = 0): Unit = {
+    field.encode match {
+      case Some(encode) =>
+        val fieldBytes = encode(value)
+        val startByte = field.binaryProperties.offset + startOffset
+        val endByte = field.binaryProperties.offset + startOffset + field.binaryProperties.actualSize
+
+        if (startByte < 0 || endByte > recordBytes.length) {
+          throw new IllegalArgumentException(s"Cannot set value for field '${field.name}' because the field is out of bounds of the record.")
+        }
+        if (fieldBytes.length != field.binaryProperties.dataSize) {
+          throw new IllegalArgumentException(s"Cannot set value for field '${field.name}' because the encoded value has a different size than the field size.")
+        }
+
+        System.arraycopy(fieldBytes, 0, recordBytes, startByte, fieldBytes.length)
+      case None =>
+        throw new IllegalStateException(s"Cannot set value for field '${field.name}' because it does not have an encoder defined.")
+    }
+  }
 }
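
The setter's behaviour is unchanged; it only moves from the Copybook class to its companion object, so callers no longer need a Copybook instance to mutate a record buffer. A minimal usage sketch (the field name is taken from BinaryExtractorSpec; the copybook and record values are assumed to exist in the caller):

    import za.co.absa.cobrix.cobol.parser.Copybook
    import za.co.absa.cobrix.cobol.parser.ast.Primitive

    // 'copybook' is a parsed Copybook; 'record' is a byte array holding one fixed-length record.
    val field: Primitive = copybook.getFieldByName("COMPANY.COMPANY-ID-NUM").asInstanceOf[Primitive]

    // The call is now static: only the field's AST node and the record buffer are needed.
    Copybook.setPrimitiveField(field, record, 12345, startOffset = 0)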

cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/parser/encoding/EncoderSelector.scala

Lines changed: 31 additions & 1 deletion
@@ -16,7 +16,7 @@
 
 package za.co.absa.cobrix.cobol.parser.encoding
 
-import za.co.absa.cobrix.cobol.parser.ast.datatype.{AlphaNumeric, CobolType}
+import za.co.absa.cobrix.cobol.parser.ast.datatype.{AlphaNumeric, COMP3, COMP3U, CobolType, Decimal, Integral}
 import za.co.absa.cobrix.cobol.parser.encoding.codepage.{CodePage, CodePageCommon}
 
 import java.nio.charset.{Charset, StandardCharsets}
@@ -31,6 +31,14 @@ object EncoderSelector {
     dataType match {
       case alphaNumeric: AlphaNumeric if alphaNumeric.compact.isEmpty =>
         getStringEncoder(alphaNumeric.enc.getOrElse(EBCDIC), ebcdicCodePage, asciiCharset, alphaNumeric.length)
+      case integralComp3: Integral if integralComp3.compact.exists(_.isInstanceOf[COMP3]) =>
+        Option(getBdcEncoder(integralComp3.precision, 0, 0, integralComp3.signPosition.isDefined, mandatorySignNibble = true))
+      case integralComp3: Integral if integralComp3.compact.exists(_.isInstanceOf[COMP3U]) =>
+        Option(getBdcEncoder(integralComp3.precision, 0, 0, integralComp3.signPosition.isDefined, mandatorySignNibble = false))
+      case decimalComp3: Decimal if decimalComp3.compact.exists(_.isInstanceOf[COMP3]) =>
+        Option(getBdcEncoder(decimalComp3.precision, decimalComp3.scale, decimalComp3.scaleFactor, decimalComp3.signPosition.isDefined, mandatorySignNibble = true))
+      case decimalComp3: Decimal if decimalComp3.compact.exists(_.isInstanceOf[COMP3U]) =>
+        Option(getBdcEncoder(decimalComp3.precision, decimalComp3.scale, decimalComp3.scaleFactor, decimalComp3.signPosition.isDefined, mandatorySignNibble = false))
       case _ =>
         None
     }
@@ -80,4 +88,26 @@
     buf
   }
 
+  def getBdcEncoder(precision: Int,
+                    scale: Int,
+                    scaleFactor: Int,
+                    signed: Boolean,
+                    mandatorySignNibble: Boolean): Encoder = {
+    if (signed && !mandatorySignNibble)
+      throw new IllegalArgumentException("If signed is true, mandatorySignNibble must also be true.")
+
+    (a: Any) => {
+      val number = a match {
+        case null => null
+        case d: java.math.BigDecimal => d
+        case n: java.math.BigInteger => new java.math.BigDecimal(n)
+        case n: Byte => new java.math.BigDecimal(n)
+        case n: Int => new java.math.BigDecimal(n)
+        case n: Long => new java.math.BigDecimal(n)
+        case x => new java.math.BigDecimal(x.toString)
+      }
+      BCDNumberEncoders.encodeBCDNumber(number, precision, scale, scaleFactor, signed, mandatorySignNibble)
+    }
+  }
 }
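
BCDNumberEncoders.encodeBCDNumber is referenced here but its implementation is not part of this diff. The sketch below shows the packed-decimal (BCD) layout it is expected to produce, matching the bytes asserted in the writer test further down; it is an illustration under those assumptions, not the library code (encodeBcdSketch is a hypothetical name, and scaleFactor handling is omitted):

    import java.math.{BigDecimal => JBigDecimal, RoundingMode}

    // One decimal digit per nibble. When mandatorySignNibble is true (COMP-3) the last
    // nibble is the sign: 0xC positive, 0xD negative, 0xF unsigned. COMP-3U packs digits
    // only. An odd nibble count is left-padded with a zero nibble.
    def encodeBcdSketch(number: JBigDecimal,
                        precision: Int,
                        scale: Int,
                        signed: Boolean,
                        mandatorySignNibble: Boolean): Array[Byte] = {
      val unscaled = number.setScale(scale, RoundingMode.HALF_UP).unscaledValue().abs().toString
      val digits   = ("0" * (precision - unscaled.length)) + unscaled  // pad to the declared precision

      val nibbles: Seq[Int] =
        if (mandatorySignNibble) {
          val signNibble = if (!signed) 0xF else if (number.signum() < 0) 0xD else 0xC
          digits.map(_ - '0') :+ signNibble
        } else {
          digits.map(_ - '0')
        }

      val padded = if (nibbles.length % 2 == 0) nibbles else 0 +: nibbles
      padded.grouped(2).map { case Seq(hi, lo) => ((hi << 4) | lo).toByte }.toArray
    }

    // 100.5 as PIC 9(4)V9(2) COMP-3 (precision 6, scale 2, unsigned, sign nibble kept)
    // packs to 0x00 0x10 0x05 0x0F, which is what the writer test below expects for field B.
    encodeBcdSketch(new JBigDecimal("100.5"), precision = 6, scale = 2, signed = false, mandatorySignNibble = true)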

cobol-parser/src/test/scala/za/co/absa/cobrix/cobol/parser/extract/BinaryExtractorSpec.scala

Lines changed: 3 additions & 3 deletions
@@ -216,7 +216,7 @@ class BinaryExtractorSpec extends AnyFunSuite {
     val fieldName2: String = "COMPANY.COMPANY-ID-NUM"
     val fields2 = copybook.getFieldByName(fieldName2)
     assert(fields2.isInstanceOf[Primitive])
-    assert(fields2.asInstanceOf[Primitive].encode.isEmpty)
+    assert(fields2.asInstanceOf[Primitive].encode.nonEmpty)
   }
 
   test("Test padding when setting field value by name") {
@@ -230,7 +230,7 @@
     val fieldName2: String = "COMPANY.COMPANY-ID-NUM"
     val fields2 = copybook2.getFieldByName(fieldName2)
     assert(fields2.isInstanceOf[Primitive])
-    assert(fields2.asInstanceOf[Primitive].encode.isEmpty)
+    assert(fields2.asInstanceOf[Primitive].encode.nonEmpty)
   }
 
   test("Test truncating when setting field value by name") {
@@ -244,6 +244,6 @@
     val fieldName2: String = "COMPANY.COMPANY-ID-NUM"
     val fields2 = copybook2.getFieldByName(fieldName2)
     assert(fields2.isInstanceOf[Primitive])
-    assert(fields2.asInstanceOf[Primitive].encode.isEmpty)
+    assert(fields2.asInstanceOf[Primitive].encode.nonEmpty)
   }
 }

spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/writer/BasicRecordCombiner.scala

Lines changed: 27 additions & 4 deletions
@@ -18,17 +18,24 @@ package za.co.absa.cobrix.spark.cobol.writer
 
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.DataFrame
+import za.co.absa.cobrix.cobol.parser.Copybook
 import za.co.absa.cobrix.cobol.parser.ast.{Group, Primitive, Statement}
 import za.co.absa.cobrix.cobol.reader.parameters.ReaderParameters
 import za.co.absa.cobrix.cobol.reader.schema.CobolSchema
+import za.co.absa.cobrix.cobol.parser.ast.datatype.{AlphaNumeric, COMP3, COMP3U, CobolType, Decimal, Integral}
+
+import java.io.{ByteArrayOutputStream, ObjectOutputStream}
 
 class BasicRecordCombiner extends RecordCombiner {
+
+  import BasicRecordCombiner._
+
   override def combine(df: DataFrame, cobolSchema: CobolSchema, readerParameters: ReaderParameters): RDD[Array[Byte]] = {
     val ast = getAst(cobolSchema)
     val copybookFields = ast.children.filter {
       case p: Primitive => !p.isFiller
-      case g: Group => !g.isFiller
-      case _ => true
+      case g: Group     => !g.isFiller
+      case _            => true
     }
 
     validateSchema(df, copybookFields.toSeq)
@@ -38,7 +45,9 @@
 
     cobolFields.foreach(cobolField =>
       if (cobolField.encode.isEmpty) {
-        throw new IllegalArgumentException(s"Field '${cobolField.name}' does not have an encoding defined in the copybook. 'PIC ${cobolField.dataType.originalPic}' is not yet supported.")
+        val fieldDefinition = getFieldDefinition(cobolField)
+        throw new IllegalArgumentException(s"Field '${cobolField.name}' does not have an encoding defined in the copybook. " +
+          s"'PIC $fieldDefinition' is not yet supported.")
       }
     )
 
@@ -62,7 +71,7 @@
         if (!row.isNullAt(sparkIdx)) {
           val fieldStr = row.get(sparkIdx)
           val cobolField = cobolFields(cobolIdx)
-          cobolSchema.copybook.setPrimitiveField(cobolField, ar, fieldStr, 0)
+          Copybook.setPrimitiveField(cobolField, ar, fieldStr, 0)
         }
       }
 
@@ -107,3 +116,17 @@
     }
   }
 }
+
+object BasicRecordCombiner {
+  def getFieldDefinition(field: Primitive): String = {
+    val pic = field.dataType.originalPic.getOrElse(field.dataType.pic)
+
+    val usage = field.dataType match {
+      case dt: Integral => dt.compact.map(_.toString).getOrElse("USAGE IS DISPLAY")
+      case dt: Decimal => dt.compact.map(_.toString).getOrElse("USAGE IS DISPLAY")
+      case _ => ""
+    }
+
+    s"$pic $usage".trim
+  }
+}
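
For illustration, getFieldDefinition lets the error message show the full field definition (PIC plus usage) instead of interpolating the raw Option returned by originalPic, which previously rendered as 'PIC Some(...)'. A hypothetical reproduction (the copybook snippet and field name are illustrative, and the exact usage text depends on the parsed usage type's toString):

    import za.co.absa.cobrix.cobol.parser.CopybookParser
    import za.co.absa.cobrix.cobol.parser.ast.Primitive
    import za.co.absa.cobrix.spark.cobol.writer.BasicRecordCombiner

    // A binary (COMP) field has no writer encoder yet, so the combiner would reject it
    // and report its definition using the new helper.
    val copybook = CopybookParser.parseTree(
      """        01  RECORD.
        |           05  AMOUNT   PIC S9(4)  COMP.
        |""".stripMargin)

    val amount = copybook.getFieldByName("AMOUNT").asInstanceOf[Primitive]

    // Expected to render the PIC followed by the usage, e.g. "S9(4) COMP",
    // so the error would read: 'PIC S9(4) COMP' is not yet supported.
    println(BasicRecordCombiner.getFieldDefinition(amount))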

spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/writer/FixedLengthEbcdicWriterSuite.scala

Lines changed: 58 additions & 0 deletions
@@ -126,6 +126,64 @@ class FixedLengthEbcdicWriterSuite extends AnyWordSpec with SparkTestBase with B
       }
     }
 
+    "write data frames with COMP-3 fields" in {
+      withTempDirectory("cobol_writer1") { tempDir =>
+        val df = List(
+          (1, 100.5, new java.math.BigDecimal(10.23), 1, 100.5, new java.math.BigDecimal(10.12)),
+          (2, 800.4, new java.math.BigDecimal(30), 2, 800.4, new java.math.BigDecimal(30)),
+          (3, 22.33, new java.math.BigDecimal(-20), 3, 22.33, new java.math.BigDecimal(-20))
+        ).toDF("A", "B", "C", "D", "E", "F")
+
+        val path = new Path(tempDir, "writer1")
+
+        val copybookContentsWithFilers =
+          """      01 RECORD.
+                05 A PIC S9(1) COMP-3.
+                05 B PIC 9(4)V9(2) COMP-3.
+                05 C PIC S9(2)V9(2) COMP-3.
+                05 D PIC 9(1) COMP-3U.
+                05 E PIC 9(4)V9(2) COMP-3U.
+                05 F PIC 9(2)V9(2) COMP-3U.
+          """
+
+        df.coalesce(1)
+          .orderBy("A")
+          .write
+          .format("cobol")
+          .mode(SaveMode.Overwrite)
+          .option("copybook_contents", copybookContentsWithFilers)
+          .save(path.toString)
+
+        val fs = path.getFileSystem(spark.sparkContext.hadoopConfiguration)
+
+        assert(fs.exists(path), "Output directory should exist")
+        val files = fs.listStatus(path)
+          .filter(_.getPath.getName.startsWith("part-"))
+        assert(files.nonEmpty, "Output directory should contain part files")
+
+        val partFile = files.head.getPath
+        val data = fs.open(partFile)
+        val bytes = new Array[Byte](files.head.getLen.toInt)
+        data.readFully(bytes)
+        data.close()
+
+        // Expected EBCDIC data for sample test data
+        val expected = Array(
+          0x1C, 0x00, 0x10, 0x05, 0x0F, 0x01, 0x02, 0x3C, 0x01, 0x01, 0x00, 0x50, 0x10, 0x12,
+          0x2C, 0x00, 0x80, 0x04, 0x0F, 0x03, 0x00, 0x0C, 0x02, 0x08, 0x00, 0x40, 0x30, 0x00,
+          0x3C, 0x00, 0x02, 0x23, 0x3F, 0x02, 0x00, 0x0D, 0x03, 0x00, 0x22, 0x33, 0x00, 0x00
+        ).map(_.toByte)
+
+        if (!bytes.sameElements(expected)) {
+          println(s"Expected bytes: ${expected.map("%02X" format _).mkString(" ")}")
+          println(s"Actual bytes:   ${bytes.map("%02X" format _).mkString(" ")}")
+
+          assert(bytes.sameElements(expected), "Written data should match expected EBCDIC encoding")
+        }
+      }
+    }
+
+
     "write should fail with save mode append and the path exists" in {
       withTempDirectory("cobol_writer3") { tempDir =>
         val df = List(("A", "First"), ("B", "Scnd"), ("C", "Last")).toDF("A", "B")
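
For readers unfamiliar with packed decimal, the first expected record (14 bytes) breaks down as follows; signed COMP-3 values end in a 0xC (positive) or 0xD (negative) sign nibble, unsigned COMP-3 values end in 0xF, and COMP-3U omits the sign nibble entirely:

    Field  PIC                    Value    Packed bytes
    A      S9(1)      COMP-3      1        1C
    B      9(4)V9(2)  COMP-3      100.50   00 10 05 0F
    C      S9(2)V9(2) COMP-3      10.23    01 02 3C
    D      9(1)       COMP-3U     1        01
    E      9(4)V9(2)  COMP-3U     100.50   01 00 50
    F      9(2)V9(2)  COMP-3U     10.12    10 12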
