diff --git a/dione-hadoop/src/main/java/com/paypal/dione/avro/hadoop/file/AvroBtreeFile.java b/dione-hadoop/src/main/java/com/paypal/dione/avro/hadoop/file/AvroBtreeFile.java
index 63938abb..1536df72 100644
--- a/dione-hadoop/src/main/java/com/paypal/dione/avro/hadoop/file/AvroBtreeFile.java
+++ b/dione-hadoop/src/main/java/com/paypal/dione/avro/hadoop/file/AvroBtreeFile.java
@@ -357,6 +357,7 @@ public static class Options {
         private int mInterval = 128;
         private int mHeight = 2;
         private int initialCapacityMB = 20;
+        private int avroSyncInterval = 10 << 20;
         private GenericData model = SpecificData.get();
@@ -443,6 +444,15 @@ public Options withInitialBufferSizeMB(int mb) {
         public CodecFactory getCodec() {
             return this.codec;
         }
+
+        public int getAvroSyncInterval() {
+            return avroSyncInterval;
+        }
+
+        public Options withAvroSyncInterval(int avroSyncInterval) {
+            this.avroSyncInterval = avroSyncInterval;
+            return this;
+        }
     }

     /**
@@ -578,7 +588,7 @@ public BufferedWriter(Writer.Options options, Schema schema, String keyValueFiel
             this.schema = schema;
             this.recordsBuffer = recordsBuffer;
-            this.memoryWriter = inMemoryWriter.setSyncInterval(1 << 20);
+            this.memoryWriter = inMemoryWriter.setSyncInterval(options.getAvroSyncInterval());
             this.headerPosition = inMemoryWriter.sync();
             this.options = options;
             syncs = new LinkedList<>();
@@ -592,7 +602,6 @@ private DataFileWriter createMemoryFileWriter(Writer.Options opti
             DatumWriter datumWriter = model.createDatumWriter(schema);
             DataFileWriter inMemoryWriter = new DataFileWriter<>(datumWriter)
-                    .setSyncInterval(1 << 20)
                     .setCodec(options.getCodec())
                     .create(schema, recordsBuffer);
             return inMemoryWriter;
@@ -628,7 +637,7 @@ public void reverseAndClose(FSDataOutputStream output) throws IOException {
                 .setMeta(DATA_SIZE_KEY, dataSize) // put data size in metadata:
                 .setMeta(KEY_VALUE_HEADER_NAME, keyValueFields) // put data size in metadata:
                 .setCodec(options.getCodec())
-                .setSyncInterval(1 << 20)
+                .setSyncInterval(options.getAvroSyncInterval())
                 .create(schema, output);

             // read blocks backwards, and append to the real file:
diff --git a/dione-spark/src/main/scala/com/paypal/dione/spark/avro/btree/AvroBtreeOutputWriter.scala b/dione-spark/src/main/scala/com/paypal/dione/spark/avro/btree/AvroBtreeOutputWriter.scala
index b3c3e28e..ec625829 100644
--- a/dione-spark/src/main/scala/com/paypal/dione/spark/avro/btree/AvroBtreeOutputWriter.scala
+++ b/dione-spark/src/main/scala/com/paypal/dione/spark/avro/btree/AvroBtreeOutputWriter.scala
@@ -63,6 +63,7 @@ class AvroBtreeOutputWriter( path: String,
       .withInterval(jobOptions.interval)
       .withHeight(jobOptions.height)
       .withCodec(codecFactory)
+      .withAvroSyncInterval(context.getConfiguration.getInt("btree.avro.syncInterval", 10 << 20))
       .withPath(new Path(path))

     new AvroBtreeFile.Writer(avroBtreeFileOptions)
diff --git a/dione-spark/src/main/scala/com/paypal/dione/spark/avro/btree/WriterOptions.scala b/dione-spark/src/main/scala/com/paypal/dione/spark/avro/btree/WriterOptions.scala
index ff02c68d..ee942331 100644
--- a/dione-spark/src/main/scala/com/paypal/dione/spark/avro/btree/WriterOptions.scala
+++ b/dione-spark/src/main/scala/com/paypal/dione/spark/avro/btree/WriterOptions.scala
@@ -1,12 +1,7 @@
 package com.paypal.dione.spark.avro.btree

-import java.io.{ByteArrayInputStream, DataInputStream}
-import java.util.Base64
-
 import org.apache.avro.file.CodecFactory
-import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
-import org.apache.spark.sql.SaveMode
 import org.apache.spark.sql.types.StructType

 import scala.util.Try
@@ -19,31 +14,30 @@ class AvroBtreeJobOptions(val schema: StructType,
                           ) extends Serializable {
   validate()

-  def hadoopConf = {
-    val conf = new Configuration()
-    conf.readFields(new DataInputStream(new ByteArrayInputStream(Base64.getDecoder.decode(map("__hadoop.conf")))))
-    conf
-  }
-
   def basePath: Path = new Path(map("path"))
-// def stagingPath: Path = basePath.child(".staging")
+
+  // def stagingPath: Path = basePath.child(".staging")

   def keyRecordName: String = map.getOrElse("avro.keyRecordName", "key")
+
   def valueRecordName: String = map.getOrElse("avro.valueRecordName", "value")
+
   def recordNamespace: String = map.getOrElse("avro.recordNamespace", "indexer")
+
   def compression: String = map.getOrElse("avro.compression", "deflate")

   def keyFields: Seq[String] = map("key.fields").split(",").map(_.trim)
+
   def valueFields: Seq[String] = map("value.fields").split(",").map(_.trim)

   def interval: Int = map.getOrElse("btree.interval", "1000").toInt
+
   def height: Int = map.getOrElse("btree.height", "4").toInt

-  def cleanup = map.get("cleanup").forall(_.toBoolean)
+  def cleanup: Boolean = map.get("cleanup").forall(_.toBoolean)

   private def validate() = {
     // required options:
-//    Seq("key.fields", "value.fields", "__hadoop.conf").foreach { key =>
     Seq("key.fields", "value.fields").foreach { key =>
       map.getOrElse(key, throw new IllegalArgumentException("missing option: " + key))
     }
diff --git a/dione-spark/src/main/scala/com/paypal/dione/spark/index/IndexManagerUtils.scala b/dione-spark/src/main/scala/com/paypal/dione/spark/index/IndexManagerUtils.scala
index 21f991c1..b0040073 100644
--- a/dione-spark/src/main/scala/com/paypal/dione/spark/index/IndexManagerUtils.scala
+++ b/dione-spark/src/main/scala/com/paypal/dione/spark/index/IndexManagerUtils.scala
@@ -188,16 +188,19 @@ object IndexManagerUtils {
     val schemaStr = Seq(colsSchema, IndexManager.indexSchema)
       .flatMap(schema => schema.fields.map(field => field.name + " " + field.dataType.typeName)).mkString(", ")

-    // resolve partitions' schema
-    val partitionsKeys = spark.catalog.listColumns(dataTableName).filter(_.isPartition).collect().map(_.name)
-    val partitionsSchema = spark.table(dataTableName).select(partitionsKeys.map(col): _*).schema
-    val partitionsSchemaStr = partitionsSchema.fields.map(field => field.name + " " + field.dataType.typeName).mkString(", ")
-
     val tblproperties = Seq("index.meta.dataTableName" -> dataTableName, "index.meta.keys" -> keys.mkString("|"), "index.meta.moreFields" -> moreFields.mkString("|")).map(t => "'" + t._1 + "'='" + t._2 + "'")
-    spark.sql(s"create table $indexTableName (" + schemaStr + ") partitioned by (" +
-      partitionsSchemaStr + ") stored as avro TBLPROPERTIES (" + tblproperties.mkString(",") + ")")
+
+    // resolve partitions' schema
+    val partitionsKeys = spark.catalog.listColumns(dataTableName).filter(_.isPartition).collect().map(_.name)
+    val partitionedStr = if (partitionsKeys.nonEmpty) {
+      val partitionsSchema = spark.table(dataTableName).select(partitionsKeys.map(col): _*).schema
+      val partitionsSchemaStr = partitionsSchema.fields.map(field => field.name + " " + field.dataType.typeName).mkString(", ")
+      " partitioned by (" + partitionsSchemaStr + ") "
+    } else ""
+
+    spark.sql(s"create table $indexTableName ($schemaStr)" +
+      s"$partitionedStr stored as avro TBLPROPERTIES (${tblproperties.mkString(",")})")
   }

   def getTablePartitions(tableName: String, spark: SparkSession): Seq[Seq[(String, String)]] = {
diff --git a/dione-spark/src/test/resources/log4j.properties b/dione-spark/src/test/resources/log4j.properties
index a107573f..782fdb19 100644
--- a/dione-spark/src/test/resources/log4j.properties
+++ b/dione-spark/src/test/resources/log4j.properties
@@ -13,7 +13,7 @@ log4j.appender.stdout.layout.ConversionPattern=%d %5p %c{3} - %m%n
 #######################################
 ### Logger Categories #################
 #######################################
-log4j.category.com.paypal.dione=INFO
+log4j.category.com.paypal.dione=WARN
 log4j.category.com.paypal.dione.kvstorage.hadoop.avro=DEBUG
 log4j.category.com.paypal.dione.avro=INFO
 #log4j.category.com.paypal.dione.spark=DEBUG
@@ -27,8 +27,8 @@ log4j.category.org.apache.hadoop.hive=WARN
 log4j.category.hive.log=WARN
 log4j.category.ql.metadata.Hive=WARN
 log4j.category.hive.ql.parse.ParseDriver=WARN
-log4j.category.org.apache.parquet=WARN
-log4j.category.org.apache.hadoop.io=WARN
+log4j.category.org.apache.parquet=ERROR
+log4j.category.org.apache.hadoop=WARN
 log4j.category.org.spark-project.jetty.server.handler=WARN
 log4j.category.org.spark_project.jetty.server.handler=WARN
diff --git a/dione-spark/src/test/scala/com/paypal/dione/spark/index/avro/TestAvroPartialBlock.scala b/dione-spark/src/test/scala/com/paypal/dione/spark/index/avro/TestAvroPartialBlock.scala
new file mode 100644
index 00000000..3fc6f7fe
--- /dev/null
+++ b/dione-spark/src/test/scala/com/paypal/dione/spark/index/avro/TestAvroPartialBlock.scala
@@ -0,0 +1,45 @@
+package com.paypal.dione.spark.index.avro
+
+import com.paypal.dione.SparkCleanTestDB
+import com.paypal.dione.spark.avro.btree.SparkAvroBtreeUtils
+import com.paypal.dione.spark.index.{IndexManager, IndexSpec}
+import org.junit.jupiter.api.MethodOrderer.OrderAnnotation
+import org.junit.jupiter.api._
+
+
+object TestAvroPartialBlock extends SparkCleanTestDB {
+
+  override val baseTestPath: String = "TestData/TestAvroPartialBlock/"
+  override val dbName: String = "TestAvroPartialBlock"
+
+  @BeforeAll
+  def initData(): Unit = {
+    import spark.implicits._
+
+    val N = 100000
+    println("ohad: " + N)
+    val varsDF = (1 to N).map(i => (i, "v1_" + i, "asdasd_"+i+"_blahblah", ("asd"+i)*10))
+      .toDF("key", "val1", "strstr", "strstr2")
+
+    varsDF.createOrReplaceTempView("t")
+    spark.table("t").write.saveAsTable("tbl_data")
+
+  }
+
+}
+
+@TestMethodOrder(classOf[OrderAnnotation])
+class TestAvroPartialBlock {
+
+  import TestAvroPartialBlock.spark
+
+  @Test
+  @Order(1)
+  def test1(): Unit = {
+    val im = IndexManager.createNew(IndexSpec("tbl_data", "index_tbl_data", Seq("key"), Seq("val1", "strstr", "strstr2")))(spark)
+    SparkAvroBtreeUtils.writeDFasAvroBtree(spark.table("tbl_data"),
+      Seq("key"), im.indexFolder, 1, 10, 2)(spark)
+    spark.table("index_tbl_data").show()
+  }
+
+}
\ No newline at end of file
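
Usage note (not part of the diff): the new sync interval can be tuned per job through the `btree.avro.syncInterval` Hadoop property that `AvroBtreeOutputWriter` now reads, falling back to `10 << 20` (10 MiB). A minimal sketch, assuming the write job inherits `spark.sparkContext.hadoopConfiguration` (the typical Spark setup, not shown in this diff):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().enableHiveSupport().getOrCreate()

// AvroBtreeOutputWriter calls context.getConfiguration.getInt("btree.avro.syncInterval", 10 << 20),
// so setting the property here lowers the Avro block sync interval to 1 MiB for this job.
spark.sparkContext.hadoopConfiguration.setInt("btree.avro.syncInterval", 1 << 20)
```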
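For reference, `AvroBtreeJobOptions` above is driven by a plain string map; a hypothetical map using only the keys and defaults visible in `WriterOptions.scala` could look like the following (how the map reaches the constructor is outside this diff):

```scala
// Hypothetical options map; "__hadoop.conf" is no longer required after this change.
val writerOptions = Map(
  "path"             -> "/tmp/index_tbl_data",  // basePath
  "key.fields"       -> "key",                  // required, validated
  "value.fields"     -> "val1,strstr,strstr2",  // required, validated
  "btree.interval"   -> "1000",                 // default shown in the diff
  "btree.height"     -> "4",                    // default shown in the diff
  "avro.compression" -> "deflate"               // default codec name
)
```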
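And for the non-partitioned case that `IndexManagerUtils.createIndexTable` now handles (exercised by `TestAvroPartialBlock`), the generated DDL simply drops the `partitioned by` clause. Roughly, reusing `spark` from the sketch above and the test's table names (shown unqualified here; the extra columns contributed by `IndexManager.indexSchema` are elided):

```scala
// Illustrative shape of the statement issued when partitionsKeys is empty.
spark.sql(
  "create table index_tbl_data " +
    "(key integer, val1 string, strstr string, strstr2 string) " +
    "stored as avro TBLPROPERTIES (" +
    "'index.meta.dataTableName'='tbl_data'," +
    "'index.meta.keys'='key'," +
    "'index.meta.moreFields'='val1|strstr|strstr2')")
```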