[SUPPORT] hoodie.parquet.outputtimestamptype setting not converting to TIMESTAMP_MILLIS #12339

Open
KendallRackley opened this issue Nov 26, 2024 · 1 comment
Labels
priority:critical (production down; pipelines stalled; need help asap), schema-and-data-types

Comments

@KendallRackley

Describe the problem you faced

Hey team,
Here's a link to the thread on the Apache Hudi Slack channel where I posted this issue:
https://apache-hudi.slack.com/archives/C4D716NPQ/p1731532187806959

I'm running a PySpark script in AWS Glue ETL. It is reading from a Postgres database table via a JDBC connection and writing the dataframe to Hudi. This DataFrame contains 7 columns. Three of the columns are type Long, with LogicalType "timestamp-micros".

I used these settings in the hoodie config:

"hoodie.table.name": TABLE_NAME,
"hoodie.database.name": "default_database",
"hoodie.datasource.write.storage.type": "COPY_ON_WRITE",
"path": TABLE_PATH,
"hoodie.datasource.write.hive_style_partitioning": "true",
"hoodie.metadata.enable": "false",
"hoodie.index.type": "BUCKET",
"hoodie.parquet.outputtimestamptype": "TIMESTAMP_MILLIS",
"hoodie.datasource.write.operation": "BULK_INSERT",
"hoodie.datasource.hive_sync.support_timestamp": "true",
"hoodie.datasource.write.keygenerator.consistent.logical.timestamp.enabled": "true"

Added this in the spark config also:
conf.set("spark.sql.parquet.outputTimestampType", "TIMESTAMP_MILLIS")
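For context, TIMESTAMP_MICROS and TIMESTAMP_MILLIS both store a signed 64-bit count since the Unix epoch; only the unit differs. A minimal, Hudi-free sketch of that relationship (the epoch value below is illustrative, not taken from the data in this issue):

```python
# Converting a timestamp-micros value to timestamp-millis is integer
# division by 1000, which discards sub-millisecond precision.
epoch_micros = 1_696_154_400_540_040  # hypothetical timestamp-micros value

epoch_millis = epoch_micros // 1000               # timestamp-millis value
lost_micros = epoch_micros - epoch_millis * 1000  # precision that is dropped

print(epoch_millis)  # 1696154400540
print(lost_micros)   # 40
```

This is purely a value-level conversion; which logical type annotation ends up in the Parquet footer is decided by the writer, which is what the settings above are meant to control.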

but it still outputs "timestamp-micros" for field3, field4 and field7:

{
  "type" : "record",
  "name" : "name",
  "namespace" : "namespace",
  "fields" : [ {
    "name" : "field1",
    "type" : "string"
  }, {
    "name" : "field2",
    "type" : [ "null", "string" ],
    "default" : null
  }, {
    "name" : "field3",
    "type" : [ "null", {
      "type" : "long",
      "logicalType" : "timestamp-micros"
    } ],
    "default" : null
  }, {
    "name" : "field4",
    "type" : [ "null", {
      "type" : "long",
      "logicalType" : "timestamp-micros"
    } ],
    "default" : null
  }, {
    "name" : "field5",
    "type" : [ "null", "string" ],
    "default" : null
  }, {
    "name" : "field6",
    "type" : [ "null", "string" ],
    "default" : null
  }, {
    "name" : "field7",
    "type" : [ "null", {
      "type" : "long",
      "logicalType" : "timestamp-micros"
    } ],
    "default" : null
  } ]
}

I tried converting it to timestamp-millis by manually setting the schema and generating a new dataframe from it:

schema = StructType([
    StructField("field1", StringType(), nullable=False),
    StructField("field2", StringType(), nullable=True),
    StructField("field3", TimestampType(), nullable=True, metadata={'logicalType': 'timestamp-millis'}),
    StructField("field4", TimestampType(), nullable=True, metadata={'logicalType': 'timestamp-millis'}),
    StructField("field5", StringType(), nullable=True),
    StructField("field6", StringType(), nullable=True),
    StructField("field7", TimestampType(), nullable=True, metadata={'logicalType': 'timestamp-millis'})
])

new_df = spark.createDataFrame(df.rdd, schema)

I also tried casting the timestamps to milliseconds within the DataFrame, and this does not work either:

new_df = new_df.withColumn("field3", to_timestamp(col("field3"), 'yyyy-MM-dd HH:mm:ss.SSS'))

This truncates the field values from microseconds to milliseconds, but it does not change the logical type written for those columns, e.g.

2007-03-11 15:46:41.540000 -----> 2007-03-11 15:46:41.5400
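That matches what a plain value-level truncation does: the sub-millisecond digits are dropped, but nothing about the column's Parquet logical type changes. A stdlib-only sketch of the truncation, using one of the sample timestamps from the reproduction code further down as an assumed input:

```python
from datetime import datetime

ts = datetime.strptime("2023-10-01 10:00:00.540040", "%Y-%m-%d %H:%M:%S.%f")

# Truncate to millisecond precision: keep only whole milliseconds.
truncated = ts.replace(microsecond=(ts.microsecond // 1000) * 1000)

print(ts.isoformat(sep=" "))         # 2023-10-01 10:00:00.540040
print(truncated.isoformat(sep=" "))  # 2023-10-01 10:00:00.540000
```

The column is still a microsecond-resolution timestamp as far as the writer is concerned, which is consistent with the schema output above still showing "timestamp-micros".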

Does the setting "hoodie.parquet.outputtimestamptype" just not work? Is it not possible to output timestamp-millis with Spark?


To Reproduce

Steps to reproduce the behavior:

Ranga Reddy on the channel attempted to reproduce this issue. He defined a DataFrame schema using the TimestampType class and inserted rows with microsecond-precision timestamps. With hoodie.parquet.outputtimestamptype set to TIMESTAMP_MILLIS, the logical type of the timestamps written to Hudi was still TIMESTAMP_MICROS, even though the schema was set explicitly and the outputtimestamptype setting was applied.

  1. Run Ranga's code sample:
import org.apache.spark.SparkConf
import org.apache.spark.sql.types._
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.internal.SQLConf

object TestTimeStamp extends App {
  val name = this.getClass.getSimpleName.replace("$", "")
  val sparkConf = new SparkConf().setAppName(name).setIfMissing("spark.master", "local[2]")

  val spark = SparkSession.builder.appName(name).config(sparkConf)
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .config("spark.sql.extensions", "org.apache.spark.sql.hudi.HoodieSparkSessionExtension")
    .config("spark.sql.hive.convertMetastoreParquet", "false")
    .getOrCreate()

  val tableName = name
  val basePath = f"file:///tmp/warehouse/$tableName"

  val schema = StructType(Array(
    StructField("field1", IntegerType, nullable = false),
    StructField("field2", StringType, nullable = true),
    StructField("field3", TimestampType, nullable = false)
  ))

  val data = Seq(
    Row(1, "A", java.sql.Timestamp.valueOf("2023-10-01 10:00:00.540040")),
    Row(2, "B", java.sql.Timestamp.valueOf("2023-10-01 11:30:00.240030")),
    Row(3, "C", java.sql.Timestamp.valueOf("2023-10-01 12:45:00.140022"))
  )

  spark.conf.set("spark.sql.parquet.outputTimestampType", "TIMESTAMP_MILLIS")
  spark.sql("SET spark.sql.parquet.outputTimestampType=TIMESTAMP_MILLIS")
  val df = spark.createDataFrame(spark.sparkContext.parallelize(data), schema)

  // Hudi write options
  val hudiOptions = Map(
    "hoodie.table.name" -> tableName,
    "hoodie.datasource.write.recordkey.field" -> "field1",
    "hoodie.datasource.write.precombine.field" -> "field2",
    "hoodie.parquet.outputtimestamptype" -> "TIMESTAMP_MILLIS"
    //"hoodie.datasource.write.keygenerator.consistent.logical.timestamp.enabled" -> "true"
  )

  // Write the DataFrame to Hudi
  df.write.format("hudi").options(hudiOptions).mode("overwrite").save(basePath)
  df.show(truncate = false)
  spark.read.format("hudi").load(basePath).show(false)
  spark.stop()
}

Expected behavior

I expect the parquet schema for field3 to be TIMESTAMP_MILLIS instead of TIMESTAMP_MICROS. This is what the schema output should be:

"fields" : [ {
  "name" : "field1",
  "type" : "int"
}, {
  "name" : "field2",
  "type" : [ "null", "string" ],
  "default" : null
}, {
  "name" : "field3",
  "type" : [ "null", {
    "type" : "long",
    "logicalType" : "timestamp-millis"
  } ],
  "default" : null
} ]

Environment Description

  • Hudi version : Hudi/AWS Bundle 0.14

  • Spark version : 3.3

  • Hive version : Not sure

  • Hadoop version : N/A

  • Storage (HDFS/S3/GCS..) : S3

  • Running on Docker? (yes/no) : No

Stacktrace

No stacktrace; just the schema output shown above.

@rangareddy

Created Hudi Jira - https://issues.apache.org/jira/browse/HUDI-8592

@ad1happy2go ad1happy2go added schema-and-data-types priority:critical production down; pipelines stalled; Need help asap. labels Nov 27, 2024
@github-project-automation github-project-automation bot moved this to ⏳ Awaiting Triage in Hudi Issue Support Nov 27, 2024
@ad1happy2go ad1happy2go moved this from ⏳ Awaiting Triage to 🏁 Triaged in Hudi Issue Support Dec 3, 2024