@@ -357,6 +357,7 @@ public static class Options {
private int mInterval = 128;
private int mHeight = 2;
private int initialCapacityMB = 20;
private int avroSyncInterval = 10 << 20; // default Avro sync interval: ~10 MB between sync markers

private GenericData model = SpecificData.get();

@@ -443,6 +444,15 @@ public Options withInitialBufferSizeMB(int mb) {
public CodecFactory getCodec() {
return this.codec;
}

public int getAvroSyncInterval() {
return avroSyncInterval;
}

public Options withAvroSyncInterval(int avroSyncInterval) {
this.avroSyncInterval = avroSyncInterval;
return this;
}
}

/**
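For illustration, a minimal sketch of how a caller might set the new option alongside the existing builder methods. The instantiation of `Writer.Options`, the import path of `AvroBtreeFile`, and the concrete values are assumptions, since they are not shown in this diff:

```scala
import org.apache.avro.file.CodecFactory
import org.apache.hadoop.fs.Path
// import path of AvroBtreeFile is assumed; adjust to its actual package
import com.paypal.dione.kvstorage.hadoop.avro.btree.AvroBtreeFile

// Sketch: configure the writer options with an explicit Avro sync interval.
val writerOptions = new AvroBtreeFile.Writer.Options()
  .withInterval(1000)                            // B-tree node interval (illustrative)
  .withHeight(4)                                 // B-tree height (illustrative)
  .withCodec(CodecFactory.fromString("deflate"))
  .withAvroSyncInterval(10 << 20)                // ~10 MB per Avro block; was hard-coded to 1 << 20 (1 MB)
  .withPath(new Path("/tmp/btree-example"))
```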
@@ -578,7 +588,7 @@ public BufferedWriter(Writer.Options options, Schema schema, String keyValueFiel

this.schema = schema;
this.recordsBuffer = recordsBuffer;
this.memoryWriter = inMemoryWriter.setSyncInterval(1 << 20);
this.memoryWriter = inMemoryWriter.setSyncInterval(options.getAvroSyncInterval());
this.headerPosition = inMemoryWriter.sync();
this.options = options;
syncs = new LinkedList<>();
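For context on what this value controls: Avro's `DataFileWriter.setSyncInterval` sets the approximate number of uncompressed bytes written between sync markers (block boundaries), and `sync()` forces a boundary and returns its position. A small, self-contained sketch against the plain Avro API, with an illustrative schema and file path:

```scala
import java.io.File
import org.apache.avro.{Schema, SchemaBuilder}
import org.apache.avro.file.DataFileWriter
import org.apache.avro.generic.{GenericData, GenericDatumWriter, GenericRecord}

// Sketch: a plain Avro container-file writer with an explicit sync interval.
val schema: Schema = SchemaBuilder.record("kv").fields()
  .requiredString("key").requiredString("value").endRecord()

val writer = new DataFileWriter[GenericRecord](new GenericDatumWriter[GenericRecord](schema))
  .setSyncInterval(10 << 20)                   // ~10 MB of serialized data per block
  .create(schema, new File("/tmp/example.avro"))

val rec = new GenericData.Record(schema)
rec.put("key", "k1")
rec.put("value", "v1")
writer.append(rec)

val blockStart: Long = writer.sync()           // force a block boundary; usable later with DataFileReader.seek
writer.close()
```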
@@ -592,7 +602,6 @@ private DataFileWriter<GenericRecord> createMemoryFileWriter(Writer.Options opti
DatumWriter<GenericRecord> datumWriter = model.createDatumWriter(schema);
DataFileWriter<GenericRecord> inMemoryWriter =
new DataFileWriter<>(datumWriter)
.setSyncInterval(1 << 20)
.setCodec(options.getCodec())
.create(schema, recordsBuffer);
return inMemoryWriter;
@@ -628,7 +637,7 @@ public void reverseAndClose(FSDataOutputStream output) throws IOException {
.setMeta(DATA_SIZE_KEY, dataSize) // put data size in metadata:
.setMeta(KEY_VALUE_HEADER_NAME, keyValueFields) // put key/value field names in metadata:
.setCodec(options.getCodec())
.setSyncInterval(1 << 20)
.setSyncInterval(options.getAvroSyncInterval())
.create(schema, output);

// read blocks backwards, and append to the real file:
@@ -63,6 +63,7 @@ class AvroBtreeOutputWriter( path: String,
.withInterval(jobOptions.interval)
.withHeight(jobOptions.height)
.withCodec(codecFactory)
.withAvroSyncInterval(context.getConfiguration.getInt("btree.avro.syncInterval", 10 << 20))
.withPath(new Path(path))

new AvroBtreeFile.Writer(avroBtreeFileOptions)
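The writer now takes its sync interval from the task's Hadoop configuration under the key `btree.avro.syncInterval`, defaulting to `10 << 20`. A hedged sketch of setting it from a Spark job before the write runs; the session setup and chosen value are illustrative:

```scala
import org.apache.spark.sql.SparkSession

// Sketch: propagate a custom Avro sync interval to the output writer
// via the Hadoop configuration key read above.
val spark = SparkSession.builder().appName("btree-writer-example").getOrCreate()
spark.sparkContext.hadoopConfiguration.setInt("btree.avro.syncInterval", 4 << 20) // ~4 MB blocks
```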
@@ -1,12 +1,7 @@
package com.paypal.dione.spark.avro.btree

import java.io.{ByteArrayInputStream, DataInputStream}
import java.util.Base64

import org.apache.avro.file.CodecFactory
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.types.StructType

import scala.util.Try
@@ -19,31 +14,30 @@ class AvroBtreeJobOptions(val schema: StructType,
) extends Serializable {
validate()

def hadoopConf = {
val conf = new Configuration()
conf.readFields(new DataInputStream(new ByteArrayInputStream(Base64.getDecoder.decode(map("__hadoop.conf")))))
conf
}

def basePath: Path = new Path(map("path"))
// def stagingPath: Path = basePath.child(".staging")

// def stagingPath: Path = basePath.child(".staging")
def keyRecordName: String = map.getOrElse("avro.keyRecordName", "key")

def valueRecordName: String = map.getOrElse("avro.valueRecordName", "value")

def recordNamespace: String = map.getOrElse("avro.recordNamespace", "indexer")

def compression: String = map.getOrElse("avro.compression", "deflate")

def keyFields: Seq[String] = map("key.fields").split(",").map(_.trim)

def valueFields: Seq[String] = map("value.fields").split(",").map(_.trim)

def interval: Int = map.getOrElse("btree.interval", "1000").toInt

def height: Int = map.getOrElse("btree.height", "4").toInt

def cleanup = map.get("cleanup").forall(_.toBoolean)
def cleanup: Boolean = map.get("cleanup").forall(_.toBoolean)


private def validate() = {
// required options:
// Seq("key.fields", "value.fields", "__hadoop.conf").foreach { key =>
Seq("key.fields", "value.fields").foreach { key =>
map.getOrElse(key, throw new IllegalArgumentException("missing option: " + key))
}
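For reference, a sketch of the kind of options map these accessors expect. `validate()` only requires `key.fields` and `value.fields`; everything else falls back to the defaults above. The constructor parameters elided from this diff are not shown, and all values here are illustrative:

```scala
// Sketch: an options map satisfying validate(); keys and defaults are taken from the accessors above.
val btreeOptions: Map[String, String] = Map(
  "path"             -> "/tmp/index_tbl", // basePath
  "key.fields"       -> "key",            // required
  "value.fields"     -> "val1,strstr",    // required
  "btree.interval"   -> "1000",           // default "1000"
  "btree.height"     -> "4",              // default "4"
  "avro.compression" -> "deflate",        // default "deflate"
  "cleanup"          -> "true"            // default: true when absent
)
```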
@@ -188,16 +188,19 @@ object IndexManagerUtils {
val schemaStr = Seq(colsSchema, IndexManager.indexSchema)
.flatMap(schema => schema.fields.map(field => field.name + " " + field.dataType.typeName)).mkString(", ")

// resolve partitions' schema
val partitionsKeys = spark.catalog.listColumns(dataTableName).filter(_.isPartition).collect().map(_.name)
val partitionsSchema = spark.table(dataTableName).select(partitionsKeys.map(col): _*).schema
val partitionsSchemaStr = partitionsSchema.fields.map(field => field.name + " " + field.dataType.typeName).mkString(", ")

val tblproperties = Seq("index.meta.dataTableName" -> dataTableName, "index.meta.keys" -> keys.mkString("|"),
"index.meta.moreFields" -> moreFields.mkString("|")).map(t => "'" + t._1 + "'='" + t._2 + "'")

spark.sql(s"create table $indexTableName (" + schemaStr + ") partitioned by (" +
partitionsSchemaStr + ") stored as avro TBLPROPERTIES (" + tblproperties.mkString(",") + ")")
// resolve partitions' schema
val partitionsKeys = spark.catalog.listColumns(dataTableName).filter(_.isPartition).collect().map(_.name)
val partitionedStr = if (partitionsKeys.nonEmpty) {
val partitionsSchema = spark.table(dataTableName).select(partitionsKeys.map(col): _*).schema
val partitionsSchemaStr = partitionsSchema.fields.map(field => field.name + " " + field.dataType.typeName).mkString(", ")
" partitioned by (" + partitionsSchemaStr + ") "
} else ""

spark.sql(s"create table $indexTableName ($schemaStr)" +
s"$partitionedStr stored as avro TBLPROPERTIES (${tblproperties.mkString(",")})")
}

def getTablePartitions(tableName: String, spark: SparkSession): Seq[Seq[(String, String)]] = {
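To make the new branch concrete, a hedged sketch of the same logic with made-up table and column names and a hypothetical helper name, showing the two shapes of DDL it produces:

```scala
// Sketch: the "partitioned by" clause is emitted only when the data table has partition columns.
val schemaStr = "key string, val1 string"
val tblproperties = Seq("'index.meta.dataTableName'='tbl_data'")

def createIndexTableDdl(partitionCols: Seq[(String, String)]): String = {
  val partitionedStr =
    if (partitionCols.nonEmpty)
      " partitioned by (" + partitionCols.map { case (n, t) => s"$n $t" }.mkString(", ") + ") "
    else ""
  s"create table index_tbl ($schemaStr)" +
    s"$partitionedStr stored as avro TBLPROPERTIES (${tblproperties.mkString(",")})"
}

createIndexTableDdl(Seq.empty)             // no "partitioned by" clause
createIndexTableDdl(Seq("dt" -> "string")) // ... partitioned by (dt string) stored as avro ...
```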
dione-spark/src/test/resources/log4j.properties (6 changes: 3 additions & 3 deletions)
@@ -13,7 +13,7 @@ log4j.appender.stdout.layout.ConversionPattern=%d %5p %c{3} - %m%n
#######################################
### Logger Categories #################
#######################################
log4j.category.com.paypal.dione=INFO
log4j.category.com.paypal.dione=WARN
log4j.category.com.paypal.dione.kvstorage.hadoop.avro=DEBUG
log4j.category.com.paypal.dione.avro=INFO
#log4j.category.com.paypal.dione.spark=DEBUG
@@ -27,8 +27,8 @@ log4j.category.org.apache.hadoop.hive=WARN
log4j.category.hive.log=WARN
log4j.category.ql.metadata.Hive=WARN
log4j.category.hive.ql.parse.ParseDriver=WARN
log4j.category.org.apache.parquet=WARN
log4j.category.org.apache.hadoop.io=WARN
log4j.category.org.apache.parquet=ERROR
log4j.category.org.apache.hadoop=WARN

log4j.category.org.spark-project.jetty.server.handler=WARN
log4j.category.org.spark_project.jetty.server.handler=WARN
@@ -0,0 +1,45 @@
package com.paypal.dione.spark.index.avro

import com.paypal.dione.SparkCleanTestDB
import com.paypal.dione.spark.avro.btree.SparkAvroBtreeUtils
import com.paypal.dione.spark.index.{IndexManager, IndexSpec}
import org.junit.jupiter.api.MethodOrderer.OrderAnnotation
import org.junit.jupiter.api._


object TestAvroPartialBlock extends SparkCleanTestDB {

override val baseTestPath: String = "TestData/TestAvroPartialBlock/"
override val dbName: String = "TestAvroPartialBlock"

@BeforeAll
def initData(): Unit = {
import spark.implicits._

val N = 100000
println("ohad: " + N)
val varsDF = (1 to N).map(i => (i, "v1_" + i, "asdasd_"+i+"_blahblah", ("asd"+i)*10))
.toDF("key", "val1", "strstr", "strstr2")

varsDF.createOrReplaceTempView("t")
spark.table("t").write.saveAsTable("tbl_data")

}

}

@TestMethodOrder(classOf[OrderAnnotation])
class TestAvroPartialBlock {

import TestAvroPartialBlock.spark

@Test
@Order(1)
def test1(): Unit = {
val im = IndexManager.createNew(IndexSpec("tbl_data", "index_tbl_data", Seq("key"), Seq("val1", "strstr", "strstr2")))(spark)
SparkAvroBtreeUtils.writeDFasAvroBtree(spark.table("tbl_data"),
Seq("key"), im.indexFolder, 1, 10, 2)(spark)
spark.table("index_tbl_data").show()
}

}