Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,17 @@ class ParquetOptions(
shortParquetCompressionCodecNames(codecName).name()
}

/**
 * Parquet writer version to use, passed straight through from the write option
 * (e.g. "PARQUET_1_0" or "PARQUET_2_0" — see ParquetProperties.WriterVersion).
 * If this returns None, the default format should be used.
 */
val writerVersion: Option[String] = parameters.get(WRITER_VERSION)

/**
 * The Parquet physical type to use for timestamp columns. If None, the default type from the
 * SQL conf should be used. Valid values mirror SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE
 * (e.g. "INT96", "TIMESTAMP_MICROS") — NOTE(review): not validated here; confirm downstream.
 */
val outputTimestampType: Option[String] = parameters.get(OUTPUT_TIMESTAMP_TYPE)

/**
* Whether it merges schemas or not. When the given Parquet files have different schemas,
* the schemas can be merged. By default use the value specified in SQLConf.
Expand Down Expand Up @@ -108,6 +119,8 @@ object ParquetOptions extends DataSourceOptions {
val MERGE_SCHEMA = newOption("mergeSchema")
val PARQUET_COMPRESSION = newOption(ParquetOutputFormat.COMPRESSION)
val COMPRESSION = newOption("compression")
// Reuses Parquet's own hadoop key so the option spelling matches ParquetOutputFormat.
val WRITER_VERSION = newOption(ParquetOutputFormat.WRITER_VERSION)
// Keyed by the SQL conf name so the per-write option and the session conf share one spelling.
val OUTPUT_TIMESTAMP_TYPE = newOption(SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key)

// The option controls rebasing of the DATE and TIMESTAMP values between
// Julian and Proleptic Gregorian calendars. It impacts on the behaviour of the Parquet
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -511,9 +511,9 @@ object ParquetUtils extends Logging {
SQLConf.PARQUET_WRITE_LEGACY_FORMAT.key,
sqlConf.writeLegacyParquetFormat.toString)

conf.set(
SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key,
sqlConf.parquetOutputTimestampType.toString)
val outputTimestampType = parquetOptions.outputTimestampType
.getOrElse(sqlConf.parquetOutputTimestampType.toString)
conf.set(SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key, outputTimestampType)

conf.set(
SQLConf.PARQUET_FIELD_ID_WRITE_ENABLED.key,
Expand All @@ -530,6 +530,8 @@ object ParquetUtils extends Logging {
// Sets compression scheme
conf.set(ParquetOutputFormat.COMPRESSION, parquetOptions.compressionCodecClassName)

parquetOptions.writerVersion.foreach(conf.set(ParquetOptions.WRITER_VERSION, _))

// SPARK-15719: Disables writing Parquet summary files by default.
if (conf.get(ParquetOutputFormat.JOB_SUMMARY_LEVEL) == null
&& conf.get(ParquetOutputFormat.ENABLE_JOB_SUMMARY) == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import scala.jdk.CollectionConverters._
import org.apache.hadoop.fs.Path
import org.apache.parquet.column.{Encoding, ParquetProperties}
import org.apache.parquet.hadoop.ParquetOutputFormat
import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName

import org.apache.spark.TestUtils
import org.apache.spark.memory.MemoryMode
Expand Down Expand Up @@ -236,4 +237,45 @@ class ParquetEncodingSuite extends ParquetCompatibilityTest with SharedSparkSess
}
}
}

test("writer version write option produces v2 data pages") {
  // Writing with WRITER_VERSION = PARQUET_2_0 should produce v2 data pages; we detect
  // that via DELTA_BINARY_PACKED, an encoding the v1 writer does not emit for this data.
  val conf = spark.sessionState.newHadoopConf()
  withTempPath { dir =>
    val outputPath = s"${dir.getCanonicalPath}/test.parquet"
    val df = spark.range(1, 100).toDF("id")
    df.write
      .option(ParquetOutputFormat.WRITER_VERSION,
        ParquetProperties.WriterVersion.PARQUET_2_0.toString)
      .mode("overwrite")
      .parquet(outputPath)

    readAllFootersWithoutSummaryFiles(new Path(outputPath), conf).foreach { footer =>
      footer.getParquetMetadata.getBlocks.asScala.foreach { block =>
        // Checking the first column chunk of each row group is sufficient here.
        val firstColumn = block.getColumns.asScala.head
        assert(firstColumn.getEncodings.contains(Encoding.DELTA_BINARY_PACKED))
      }
    }
  }
}

test("output timestamp type write option overrides session default") {
  // Session conf asks for INT96, but the per-write option requests TIMESTAMP_MICROS;
  // the option must win, yielding an INT64 physical type for the timestamp column.
  val conf = spark.sessionState.newHadoopConf()
  withTempPath { dir =>
    val outputPath = s"${dir.getCanonicalPath}/test.parquet"
    withSQLConf(SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key -> "INT96") {
      spark.sql("SELECT TIMESTAMP '2024-01-01 12:00:00' AS ts")
        .write
        .option(SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key, "TIMESTAMP_MICROS")
        .mode("overwrite")
        .parquet(outputPath)
    }

    readAllFootersWithoutSummaryFiles(new Path(outputPath), conf).foreach { footer =>
      val fileSchema = footer.getParquetMetadata.getFileMetaData.getSchema
      val tsType = fileSchema.getFields.asScala
        .find(_.getName == "ts")
        .get
        .asPrimitiveType()
      assert(tsType.getPrimitiveTypeName === PrimitiveTypeName.INT64)
    }
  }
}
}