Commit ed463d4

#415 Implement a simple binary record writer.

1 parent 74e9518, commit ed463d4

7 files changed: +199 additions, −50 deletions

cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/parameters/CobolParametersParser.scala

Lines changed: 6 additions & 1 deletion
@@ -246,9 +246,14 @@ object CobolParametersParser extends Logging {
       recordFormatDefined
     }
 
+    val copybookPaths = params.get(PARAM_MULTI_COPYBOOK_PATH) match {
+      case Some(paths) => paths.split(',').toSeq
+      case None => Seq.empty[String]
+    }
+
     val cobolParameters = CobolParameters(
       getParameter(PARAM_COPYBOOK_PATH, params),
-      params.getOrElse(PARAM_MULTI_COPYBOOK_PATH, "").split(','),
+      copybookPaths,
       getParameter(PARAM_COPYBOOK_CONTENTS, params),
       paths,
       recordFormat,
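
Note that this refactoring is not just cosmetic: splitting the old default empty string produces a spurious one-element sequence, while the new match yields a genuinely empty one when the option is absent. A quick check in the Scala REPL:

  "".split(',').toSeq   // Seq("") — old behavior: one empty copybook path
  Seq.empty[String]     // Seq()   — new behavior when the option is absent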

spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/DefaultSource.scala

Lines changed: 22 additions & 4 deletions
@@ -17,16 +17,19 @@
 package za.co.absa.cobrix.spark.cobol.source
 
 import org.apache.hadoop.fs.Path
+import org.apache.hadoop.io.{BytesWritable, NullWritable}
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.{DataFrame, SQLContext, SaveMode, SparkSession}
 import za.co.absa.cobrix.cobol.internal.Logging
 import za.co.absa.cobrix.cobol.reader.parameters.{CobolParameters, CobolParametersParser, Parameters}
 import za.co.absa.cobrix.cobol.reader.parameters.CobolParametersParser._
+import za.co.absa.cobrix.cobol.reader.schema.CobolSchema
 import za.co.absa.cobrix.spark.cobol.reader._
 import za.co.absa.cobrix.spark.cobol.source.copybook.CopybookContentLoader
 import za.co.absa.cobrix.spark.cobol.source.parameters._
 import za.co.absa.cobrix.spark.cobol.utils.{BuildProperties, SparkUtils}
+import za.co.absa.cobrix.spark.cobol.writer.{BasicRecordCombiner, RawBinaryOutputFormat}
 
 /**
   * This class represents a Cobol data source.
@@ -65,6 +68,11 @@ class DefaultSource
     val path = parameters.getOrElse("path",
       throw new IllegalArgumentException("Path is required for this data source."))
 
+    val cobolParameters = CobolParametersParser.parse(new Parameters(parameters))
+    CobolParametersValidator.checkSanity(cobolParameters)
+
+    val readerParameters = CobolParametersParser.getReaderProperties(cobolParameters, None)
+
     mode match {
       case SaveMode.Overwrite =>
         val outputPath = new Path(path)
@@ -77,10 +85,20 @@ class DefaultSource
       case _ =>
     }
 
-    // Simply save each row as comma-separated values in a text file
-    data.rdd
-      .map(row => row.mkString(","))
-      .saveAsTextFile(path)
+    val copybookContent = CopybookContentLoader.load(cobolParameters, sqlContext.sparkContext.hadoopConfiguration)
+    val cobolSchema = CobolSchema.fromReaderParameters(copybookContent, readerParameters)
+
+    val combiner = new BasicRecordCombiner
+
+    val rdd = combiner.combine(data, cobolSchema, readerParameters)
+
+    rdd.map(bytes => (NullWritable.get(), new BytesWritable(bytes)))
+      .saveAsNewAPIHadoopFile(
+        path,
+        classOf[NullWritable],
+        classOf[BytesWritable],
+        classOf[RawBinaryOutputFormat]
+      )
 
     new BaseRelation {
       override def sqlContext: SQLContext = sqlContext
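
For context: `saveAsNewAPIHadoopFile` is only available on pair RDDs, which is why each record is wrapped in a `(NullWritable, BytesWritable)` tuple even though the output format ignores the key. A minimal standalone sketch of the same save path (assuming an existing `SparkContext` named `sc`; the output path is a placeholder):

  import org.apache.hadoop.io.{BytesWritable, NullWritable}
  import org.apache.spark.rdd.RDD
  import za.co.absa.cobrix.spark.cobol.writer.RawBinaryOutputFormat

  // An RDD of already-encoded records (contents are placeholders)
  val records: RDD[Array[Byte]] = sc.parallelize(Seq(Array[Byte](0x01, 0x02), Array[Byte](0x03)))

  records
    .map(bytes => (NullWritable.get(), new BytesWritable(bytes))) // pair RDD required by the Hadoop API
    .saveAsNewAPIHadoopFile(
      "/tmp/raw-out",
      classOf[NullWritable],
      classOf[BytesWritable],
      classOf[RawBinaryOutputFormat]
    )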
spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/writer/BasicRecordCombiner.scala

Lines changed: 38 additions & 0 deletions
@@ -0,0 +1,38 @@
+/*
+ * Copyright 2018 ABSA Group Limited
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package za.co.absa.cobrix.spark.cobol.writer
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.DataFrame
+import za.co.absa.cobrix.cobol.reader.parameters.ReaderParameters
+import za.co.absa.cobrix.cobol.reader.schema.CobolSchema
+
+import java.util
+import scala.util.Random
+
+class BasicRecordCombiner extends RecordCombiner {
+  override def combine(df: DataFrame, cobolSchema: CobolSchema, readerParameters: ReaderParameters): RDD[Array[Byte]] = {
+    df.rdd.map { row =>
+      val r = Random.nextInt(100)
+      val ar = new Array[Byte](10)
+
+      util.Arrays.fill(ar, r.toByte)
+
+      ar
+    }
+  }
+}
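
As the commit title says, this is a simple first cut: the combiner ignores both the DataFrame contents and the copybook schema, emitting ten-byte records filled with a random value. A real implementation would encode each field according to the copybook. A hypothetical sketch of fixed-width text encoding (not part of this commit; assumes the JVM ships the IBM037 EBCDIC charset, which standard JDKs do):

  import java.nio.charset.Charset

  // Hypothetical helper, for illustration only
  val ebcdic: Charset = Charset.forName("IBM037") // EBCDIC US/Canada code page

  def encodeTextField(value: String, width: Int): Array[Byte] = {
    // Pad with spaces to the copybook field width (PIC X(n)), truncating overflow
    value.padTo(width, ' ').take(width).getBytes(ebcdic)
  }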
spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/writer/RawBinaryOutputFormat.scala

Lines changed: 50 additions & 0 deletions
@@ -0,0 +1,50 @@
+/*
+ * Copyright 2018 ABSA Group Limited
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package za.co.absa.cobrix.spark.cobol.writer
+
+import org.apache.hadoop.mapreduce._
+import org.apache.hadoop.io.{BytesWritable, NullWritable}
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat
+
+import java.io.DataOutputStream
+
+/**
+  * A custom implementation of `FileOutputFormat` that outputs raw binary data for fixed record length
+  * outputs, or for variable record length outputs when record size headers are already embedded into
+  * each record's byte array.
+  *
+  * The `RawBinaryOutputFormat` class is designed to write binary data into output files
+  * without adding any additional structure or metadata. Each record is written directly
+  * as a stream of bytes to the output.
+  *
+  * This output format only handles records that are represented as `BytesWritable` and ignores the key.
+  *
+  *  - The key type for the output is `NullWritable` because the key is not used.
+  *  - The value type for the output is `BytesWritable`, which represents the binary data to be written.
+  */
+class RawBinaryOutputFormat extends FileOutputFormat[NullWritable, BytesWritable] {
+  override def getRecordWriter(context: TaskAttemptContext): RecordWriter[NullWritable, BytesWritable] = {
+    val file = getDefaultWorkFile(context, "")
+    val out: DataOutputStream = file.getFileSystem(context.getConfiguration).create(file)
+    new RecordWriter[NullWritable, BytesWritable] {
+      override def write(key: NullWritable, value: BytesWritable): Unit = {
+        out.write(value.getBytes, 0, value.getLength)
+      }
+      override def close(context: TaskAttemptContext): Unit = out.close()
+    }
+  }
+}
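
Since the format writes nothing but the record bytes themselves, one quick way to verify a write is to read the part files back as raw binary. A sketch using the standard Spark API (the output path is a placeholder):

  // Each part file comes back as a (path, stream) pair; toArray() yields its raw bytes
  val partBytes: Array[Array[Byte]] = spark.sparkContext
    .binaryFiles("/tmp/raw-out")
    .map { case (_, stream) => stream.toArray() }
    .collect()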
spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/writer/RecordCombiner.scala

Lines changed: 26 additions & 0 deletions
@@ -0,0 +1,26 @@
+/*
+ * Copyright 2018 ABSA Group Limited
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package za.co.absa.cobrix.spark.cobol.writer
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.DataFrame
+import za.co.absa.cobrix.cobol.reader.parameters.ReaderParameters
+import za.co.absa.cobrix.cobol.reader.schema.CobolSchema
+
+trait RecordCombiner {
+  def combine(df: DataFrame, cobolSchema: CobolSchema, readerParameters: ReaderParameters): RDD[Array[Byte]]
+}
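
The trait is the extension point for different record layouts. Purely as an illustration of its shape (not part of the commit), a combiner that serializes each row as UTF-8 CSV bytes could look like:

  import org.apache.spark.rdd.RDD
  import org.apache.spark.sql.DataFrame
  import za.co.absa.cobrix.cobol.reader.parameters.ReaderParameters
  import za.co.absa.cobrix.cobol.reader.schema.CobolSchema

  // Illustrative only: ignores the copybook schema and reader parameters
  class CsvBytesCombiner extends RecordCombiner {
    override def combine(df: DataFrame, cobolSchema: CobolSchema, readerParameters: ReaderParameters): RDD[Array[Byte]] =
      df.rdd.map(row => (row.mkString(",") + "\n").getBytes("UTF-8"))
  }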

spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/WriterSourceSpec.scala

Lines changed: 0 additions & 45 deletions
This file was deleted.
spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/writer/FixedLengthEbcdicWriterSuite.scala

Lines changed: 57 additions & 0 deletions
@@ -0,0 +1,57 @@
+/*
+ * Copyright 2018 ABSA Group Limited
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package za.co.absa.cobrix.spark.cobol.writer
+
+import org.apache.hadoop.fs.Path
+import org.apache.spark.sql.SaveMode
+import org.scalatest.wordspec.AnyWordSpec
+import za.co.absa.cobrix.spark.cobol.source.base.SparkTestBase
+import za.co.absa.cobrix.spark.cobol.source.fixtures.BinaryFileFixture
+
+class FixedLengthEbcdicWriterSuite extends AnyWordSpec with SparkTestBase with BinaryFileFixture {
+  import spark.implicits._
+
+  private val copybookContents =
+    """      01 RECORD.
+             05 A PIC X(1).
+             05 B PIC X(5).
+    """
+
+  "cobol writer" should {
+    "write simple fixed-record-length EBCDIC data files" in {
+      withTempDirectory("cobol_writer1") { tempDir =>
+        val df = List(("A", "First"), ("B", "Scnd"), ("C", "Last")).toDF("A", "B")
+
+        val path = new Path(tempDir, "writer1")
+
+        df.write
+          .format("cobol")
+          .mode(SaveMode.Overwrite)
+          .option("copybook_contents", copybookContents)
+          .save(path.toString)
+
+        val fs = path.getFileSystem(spark.sparkContext.hadoopConfiguration)
+
+        assert(fs.exists(path), "Output directory should exist")
+        val files = fs.listStatus(path)
+          .filter(_.getPath.getName.startsWith("part-"))
+        assert(files.nonEmpty, "Output directory should contain part files")
+      }
+    }
+  }
+}
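
The test only checks that part files appear. A natural follow-up check (a sketch, not in this commit) would be to round-trip the data through the existing Cobrix reader inside the same test body:

  val readBack = spark.read
    .format("cobol")
    .option("copybook_contents", copybookContents)
    .load(path.toString)

  // With the placeholder BasicRecordCombiner the contents won't match the input yet,
  // but once real field encoding lands this should reproduce the written rows.
  readBack.show()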
