Commit 1104279

#415 Implement a basic fixed record length record combiner.
1 parent ed463d4 commit 1104279
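This commit wires up a minimal write path for fixed-length records. A usage sketch of what it enables, assuming an illustrative copybook, column names and output path (the format name, SaveMode.Overwrite and the "copybook_contents" option mirror the test added in this commit):

// Illustrative session, copybook and data; only the options noted above come from this commit's test.
import org.apache.spark.sql.{SaveMode, SparkSession}

val spark = SparkSession.builder().master("local[*]").appName("cobol-writer-sketch").getOrCreate()
import spark.implicits._

val copybookContents =
  """        01  RECORD.
    |           05  A  PIC X(1).
    |           05  B  PIC X(5).
    |""".stripMargin

val df = Seq(("A", "First"), ("B", "Scnd"), ("C", "Last")).toDF("A", "B")

df.write
  .format("cobol")
  .mode(SaveMode.Overwrite)
  .option("copybook_contents", copybookContents)
  .save("/tmp/ebcdic_out")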

10 files changed: +158 -22 lines

cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/parser/antlr/ParserVisitor.scala

Lines changed: 1 addition & 1 deletion
@@ -23,7 +23,7 @@ import za.co.absa.cobrix.cobol.parser.CopybookParser.CopybookAST
 import za.co.absa.cobrix.cobol.parser.ast.datatype._
 import za.co.absa.cobrix.cobol.parser.ast.{Group, Primitive}
 import za.co.absa.cobrix.cobol.parser.common.Constants
-import za.co.absa.cobrix.cobol.parser.decoders.{DecoderSelector, EncoderSelector}
+import za.co.absa.cobrix.cobol.parser.decoders.DecoderSelector
 import za.co.absa.cobrix.cobol.parser.decoders.FloatingPointFormat.FloatingPointFormat
 import za.co.absa.cobrix.cobol.parser.encoding.codepage.CodePage
 import za.co.absa.cobrix.cobol.parser.encoding._

cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/parser/ast/Primitive.scala

Lines changed: 2 additions & 1 deletion
@@ -17,7 +17,8 @@
 package za.co.absa.cobrix.cobol.parser.ast
 
 import za.co.absa.cobrix.cobol.parser.ast.datatype.{AlphaNumeric, CobolType, Decimal, Integral}
-import za.co.absa.cobrix.cobol.parser.decoders.{BinaryUtils, DecoderSelector, EncoderSelector}
+import za.co.absa.cobrix.cobol.parser.decoders.{BinaryUtils, DecoderSelector}
+import za.co.absa.cobrix.cobol.parser.encoding.EncoderSelector
 
 /** An abstraction of the statements describing fields of primitive data types in the COBOL copybook
   *

cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/parser/asttransform/NonTerminalsAdder.scala

Lines changed: 2 additions & 2 deletions
@@ -20,9 +20,9 @@ import za.co.absa.cobrix.cobol.parser.CopybookParser.CopybookAST
 import za.co.absa.cobrix.cobol.parser.ast.datatype.AlphaNumeric
 import za.co.absa.cobrix.cobol.parser.ast.{Group, Primitive, Statement}
 import za.co.absa.cobrix.cobol.parser.common.Constants
-import za.co.absa.cobrix.cobol.parser.decoders.{DecoderSelector, EncoderSelector}
+import za.co.absa.cobrix.cobol.parser.decoders.DecoderSelector
 import za.co.absa.cobrix.cobol.parser.decoders.FloatingPointFormat.FloatingPointFormat
-import za.co.absa.cobrix.cobol.parser.encoding.Encoding
+import za.co.absa.cobrix.cobol.parser.encoding.{EncoderSelector, Encoding}
 import za.co.absa.cobrix.cobol.parser.encoding.codepage.CodePage
 import za.co.absa.cobrix.cobol.parser.policies.StringTrimmingPolicy.StringTrimmingPolicy

cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/parser/decoders/EncoderSelector.scala renamed to cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/parser/encoding/EncoderSelector.scala

Lines changed: 1 addition & 2 deletions
@@ -14,11 +14,10 @@
  * limitations under the License.
  */
 
-package za.co.absa.cobrix.cobol.parser.decoders
+package za.co.absa.cobrix.cobol.parser.encoding
 
 import za.co.absa.cobrix.cobol.parser.ast.datatype.{AlphaNumeric, CobolType}
 import za.co.absa.cobrix.cobol.parser.encoding.codepage.{CodePage, CodePageCommon}
-import za.co.absa.cobrix.cobol.parser.encoding.{ASCII, EBCDIC, Encoding}
 
 import java.nio.charset.{Charset, StandardCharsets}

cobol-parser/src/test/scala/za/co/absa/cobrix/cobol/parser/extract/BinaryExtractorSpec.scala

Lines changed: 2 additions & 2 deletions
@@ -20,8 +20,8 @@ import org.scalatest.funsuite.AnyFunSuite
 import za.co.absa.cobrix.cobol.parser.CopybookParser
 import za.co.absa.cobrix.cobol.parser.ast.datatype.{AlphaNumeric, CobolType}
 import za.co.absa.cobrix.cobol.parser.ast.{BinaryProperties, Group, Primitive}
-import za.co.absa.cobrix.cobol.parser.decoders.{DecoderSelector, EncoderSelector}
-import za.co.absa.cobrix.cobol.parser.encoding.EBCDIC
+import za.co.absa.cobrix.cobol.parser.decoders.DecoderSelector
+import za.co.absa.cobrix.cobol.parser.encoding.{EBCDIC, EncoderSelector}
 
 class BinaryExtractorSpec extends AnyFunSuite {
 
spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/DefaultSource.scala

Lines changed: 7 additions & 3 deletions
@@ -22,14 +22,14 @@ import org.apache.spark.sql.sources._
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.{DataFrame, SQLContext, SaveMode, SparkSession}
 import za.co.absa.cobrix.cobol.internal.Logging
-import za.co.absa.cobrix.cobol.reader.parameters.{CobolParameters, CobolParametersParser, Parameters}
 import za.co.absa.cobrix.cobol.reader.parameters.CobolParametersParser._
+import za.co.absa.cobrix.cobol.reader.parameters.{CobolParameters, CobolParametersParser, Parameters}
 import za.co.absa.cobrix.cobol.reader.schema.CobolSchema
 import za.co.absa.cobrix.spark.cobol.reader._
 import za.co.absa.cobrix.spark.cobol.source.copybook.CopybookContentLoader
 import za.co.absa.cobrix.spark.cobol.source.parameters._
 import za.co.absa.cobrix.spark.cobol.utils.{BuildProperties, SparkUtils}
-import za.co.absa.cobrix.spark.cobol.writer.{BasicRecordCombiner, RawBinaryOutputFormat}
+import za.co.absa.cobrix.spark.cobol.writer.{RawBinaryOutputFormat, RecordCombinerSelector}
 
 /**
  * This class represents a Cobol data source.
@@ -82,13 +82,17 @@ class DefaultSource
          fs.delete(outputPath, true)
        }
      case SaveMode.Append =>
+       throw new IllegalArgumentException(
+         s"Save mode '$mode' is not supported by the 'spark-cobol' data source at the moment. " +
+           "Please use 'Overwrite' mode to write data to a file or folder."
+       )
      case _ =>
    }
 
    val copybookContent = CopybookContentLoader.load(cobolParameters, sqlContext.sparkContext.hadoopConfiguration)
    val cobolSchema = CobolSchema.fromReaderParameters(copybookContent, readerParameters)
 
-   val combiner = new BasicRecordCombiner
+   val combiner = RecordCombinerSelector.selectCombiner(cobolSchema, readerParameters)
 
    val rdd = combiner.combine(data, cobolSchema, readerParameters)
 
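With the change above, an Append write fails fast instead of silently falling through. A sketch of the resulting behaviour, reusing the illustrative DataFrame, copybook and path from the sketch at the top:

// SaveMode.Append is rejected with an IllegalArgumentException; use SaveMode.Overwrite instead.
df.write
  .format("cobol")
  .mode(SaveMode.Append) // throws: "Save mode 'Append' is not supported by the 'spark-cobol' data source at the moment. ..."
  .option("copybook_contents", copybookContents)
  .save("/tmp/ebcdic_out")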
spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/writer/BasicRecordCombiner.scala

Lines changed: 69 additions & 6 deletions
@@ -18,21 +18,84 @@ package za.co.absa.cobrix.spark.cobol.writer
 
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.DataFrame
+import za.co.absa.cobrix.cobol.parser.ast.{Group, Primitive}
 import za.co.absa.cobrix.cobol.reader.parameters.ReaderParameters
 import za.co.absa.cobrix.cobol.reader.schema.CobolSchema
 
-import java.util
-import scala.util.Random
-
 class BasicRecordCombiner extends RecordCombiner {
   override def combine(df: DataFrame, cobolSchema: CobolSchema, readerParameters: ReaderParameters): RDD[Array[Byte]] = {
+    val ast = getAst(cobolSchema)
+    validateSchema(df, ast)
+
+    val cobolFields = ast.children.map(_.asInstanceOf[Primitive])
+    val sparkFields = df.schema.fields.map(_.name.toLowerCase)
+
+    cobolFields.foreach(cobolField =>
+      if (cobolField.encode.isEmpty) {
+        throw new IllegalArgumentException(s"Field '${cobolField.name}' does not have an encoding defined in the copybook. 'PIC ${cobolField.dataType.originalPic}' is not yet supported.")
+      }
+    )
+
+    val sparkFieldPositions = cobolFields.map { cobolField =>
+      val fieldName = cobolField.name.toLowerCase
+      val position = sparkFields.indexOf(fieldName)
+
+      if (position < 0) {
+        throw new IllegalArgumentException(s"Field '${cobolField.name}' from the copybook is not found in the DataFrame schema.")
+      }
+
+      position
+    }
+
+    val size = cobolSchema.getRecordSize
+
     df.rdd.map { row =>
-      val r = Random.nextInt(100)
-      val ar = new Array[Byte](10)
+      val ar = new Array[Byte](size)
 
-      util.Arrays.fill(ar, r.toByte)
+      sparkFieldPositions.foreach { index =>
+        val fieldStr = row.get(index)
+        val cobolField = cobolFields(index)
+        cobolSchema.copybook.setPrimitiveField(cobolField, ar, fieldStr, 0)
+      }
 
       ar
     }
   }
+
+  private def validateSchema(df: DataFrame, ast: Group): Unit = {
+    val dfFields = df.schema.fields.map(_.name.toLowerCase).toSet
+
+    val notFoundFields = ast.children.flatMap { field =>
+      if (dfFields.contains(field.name.toLowerCase)) {
+        None
+      } else {
+        Some(field.name)
+      }
+    }
+
+    if (notFoundFields.nonEmpty) {
+      throw new IllegalArgumentException(s"The following fields from the copybook are not found in the DataFrame: ${notFoundFields.mkString(", ")}")
+    }
+
+    val unsupportedDataTypeFields = ast.children.filter { field =>
+      field.isInstanceOf[Group] ||
+        (field.isInstanceOf[Primitive] && field.asInstanceOf[Primitive].occurs.isDefined) ||
+        field.redefines.nonEmpty
+    }
+
+    if (unsupportedDataTypeFields.nonEmpty) {
+      throw new IllegalArgumentException(s"The following fields from the copybook are not supported by the 'spark-cobol' at the moment: " +
+        s"${unsupportedDataTypeFields.map(_.name).mkString(", ")}. Only primitive fields without redefines and occurs are supported.")
+    }
+  }
+
+  private def getAst(cobolSchema: CobolSchema): Group = {
+    val rootAst = cobolSchema.copybook.ast
+
+    if (rootAst.children.length == 1 && rootAst.children.head.isInstanceOf[Group]) {
+      rootAst.children.head.asInstanceOf[Group]
+    } else {
+      rootAst
+    }
+  }
 }
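BasicRecordCombiner matches DataFrame columns to copybook fields by name, case-insensitively, requires every field to have an encoder, and accepts only primitive fields without OCCURS or REDEFINES. A sketch of input that fails this validation (the copybook and column names are illustrative):

// The copybook declares fields A and B, but the DataFrame only has column "a", so validateSchema
// throws: "The following fields from the copybook are not found in the DataFrame: B"
val copybookWithTwoFields =
  """        01  RECORD.
    |           05  A  PIC X(1).
    |           05  B  PIC X(5).
    |""".stripMargin

val badDf = Seq("x", "y", "z").toDF("a")

badDf.write
  .format("cobol")
  .mode(SaveMode.Overwrite)
  .option("copybook_contents", copybookWithTwoFields)
  .save("/tmp/ebcdic_out") // expected to fail during schema validation, before any data is written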

spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/writer/RawBinaryOutputFormat.scala

Lines changed: 13 additions & 4 deletions
@@ -16,6 +16,7 @@
 
 package za.co.absa.cobrix.spark.cobol.writer
 
+import org.apache.hadoop.fs.Path
 import org.apache.hadoop.mapreduce._
 import org.apache.hadoop.io.{BytesWritable, NullWritable}
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat
@@ -36,15 +37,23 @@ import java.io.DataOutputStream
  * - The key type for the output is `NullWritable` because the key is not used.
  * - The value type for the output is `BytesWritable`, which represents the binary data to be written.
  */
+
 class RawBinaryOutputFormat extends FileOutputFormat[NullWritable, BytesWritable] {
   override def getRecordWriter(context: TaskAttemptContext): RecordWriter[NullWritable, BytesWritable] = {
-    val file = getDefaultWorkFile(context, "")
-    val out: DataOutputStream = file.getFileSystem(context.getConfiguration).create(file)
+    val path: Path = getDefaultWorkFile(context, ".dat")
+    val fs = path.getFileSystem(context.getConfiguration)
+    val out: DataOutputStream = fs.create(path, false)
+
     new RecordWriter[NullWritable, BytesWritable] {
       override def write(key: NullWritable, value: BytesWritable): Unit = {
-        out.write(value.getBytes, 0, value.getLength)
+        if (value != null) {
+          out.write(value.getBytes, 0, value.getLength) // No separator
+        }
+      }
+      override def close(context: TaskAttemptContext): Unit = {
+        out.close()
       }
-      override def close(context: TaskAttemptContext): Unit = out.close()
     }
   }
 }
+
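Because RawBinaryOutputFormat writes record bytes back to back with no separators, the resulting .dat part files are plain fixed-length binary and can be read back with the spark-cobol reader. A round-trip sketch, reusing the illustrative session, copybook and path from the sketches above (default reader settings, which assume EBCDIC fixed-length records):

// Read the fixed-length EBCDIC output back; the record length is derived from the copybook.
val readBack = spark.read
  .format("cobol")
  .option("copybook_contents", copybookContents)
  .load("/tmp/ebcdic_out")

readBack.show(false)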
spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/writer/RecordCombinerSelector.scala

Lines changed: 38 additions & 0 deletions
@@ -0,0 +1,38 @@
+/*
+ * Copyright 2018 ABSA Group Limited
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package za.co.absa.cobrix.spark.cobol.writer
+
+import za.co.absa.cobrix.cobol.reader.parameters.ReaderParameters
+import za.co.absa.cobrix.cobol.reader.schema.CobolSchema
+
+object RecordCombinerSelector {
+  /**
+    * Selects and returns an appropriate implementation of the `RecordCombiner` based on the provided COBOL schema
+    * and reader parameters.
+    *
+    * Currently, only a basic fixed record length combiner is implemented.
+    * This method is to be extended as writing capabilities evolve.
+    *
+    * @param cobolSchema      The COBOL schema of the output record.
+    * @param readerParameters Configuration parameters that specify how records should be formed.
+    * @return A `RecordCombiner` implementation suitable for combining records based on the given schema and parameters.
+    */
+  def selectCombiner(cobolSchema: CobolSchema, readerParameters: ReaderParameters): RecordCombiner = {
+    new BasicRecordCombiner
+  }
+
+}
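RecordCombinerSelector currently has a single branch. A hypothetical sketch of how it might grow as more combiners are added; the match on readerParameters.recordFormat is an assumption about the existing reader API, not part of this commit:

// Hypothetical future shape (not in this commit): pick a combiner per requested record format.
import za.co.absa.cobrix.cobol.parser.recordformats.RecordFormat

def selectCombiner(cobolSchema: CobolSchema, readerParameters: ReaderParameters): RecordCombiner = {
  readerParameters.recordFormat match {
    case RecordFormat.FixedLength => new BasicRecordCombiner // the combiner implemented by this commit
    case other => throw new IllegalArgumentException(s"Writing records in format '$other' is not supported yet.")
  }
}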

spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/writer/FixedLengthEbcdicWriterSuite.scala

Lines changed: 23 additions & 1 deletion
@@ -38,7 +38,9 @@ class FixedLengthEbcdicWriterSuite extends AnyWordSpec with SparkTestBase with B
 
       val path = new Path(tempDir, "writer1")
 
-      df.write
+      df.repartition(1)
+        .orderBy("A")
+        .write
         .format("cobol")
         .mode(SaveMode.Overwrite)
         .option("copybook_contents", copybookContents)
@@ -50,6 +52,26 @@
       val files = fs.listStatus(path)
         .filter(_.getPath.getName.startsWith("part-"))
       assert(files.nonEmpty, "Output directory should contain part files")
+
+      val partFile = files.head.getPath
+      val data = fs.open(partFile)
+      val bytes = new Array[Byte](files.head.getLen.toInt)
+      data.readFully(bytes)
+      data.close()
+
+      // Expected EBCDIC data for sample test data
+      val expected = Array[Byte](
+        0xC1.toByte, 0xC6.toByte, 0x89.toByte, 0x99.toByte, 0xA2.toByte, 0xA3.toByte, // A,First
+        0xC2.toByte, 0xE2.toByte, 0x83.toByte, 0x95.toByte, 0x84.toByte, 0x00.toByte, // B,Scnd_
+        0xC3.toByte, 0xD3.toByte, 0x81.toByte, 0xA2.toByte, 0xA3.toByte, 0x00.toByte  // C,Last_
+      )
+
+      if (!bytes.sameElements(expected)) {
+        println(s"Expected bytes: ${expected.map("%02X" format _).mkString(" ")}")
+        println(s"Actual bytes: ${bytes.map("%02X" format _).mkString(" ")}")
+
+        assert(bytes.sameElements(expected), "Written data should match expected EBCDIC encoding")
+      }
     }
   }
 }
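As a sanity check, the expected byte array decodes back to the test data under a common EBCDIC code page (Cp037 is used below as an assumption; Cobrix's "common" EBCDIC table is close but not guaranteed identical):

// Each record is 6 bytes: a 1-byte key followed by a 5-byte value padded with 0x00.
import java.nio.charset.Charset

val expected = Array(
  0xC1, 0xC6, 0x89, 0x99, 0xA2, 0xA3, // "AFirst"
  0xC2, 0xE2, 0x83, 0x95, 0x84, 0x00, // "BScnd" + padding
  0xC3, 0xD3, 0x81, 0xA2, 0xA3, 0x00  // "CLast" + padding
).map(_.toByte)

println(new String(expected, Charset.forName("Cp037")).replace('\u0000', '_')) // AFirstBScnd_CLast_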
