
Insert into clickhouse table with toYYYYMM(key) partition key raises org.apache.spark.sql.AnalysisException: months(key) is not currently supported #222

Open
Yxang opened this issue Feb 24, 2023 · 3 comments

Comments

Yxang (Contributor) commented Feb 24, 2023

env

  • spark: 3.3.2
  • clickhouse: 22.12.1.1752
  • package version: master with commit 2158794
  • package config
spark.clickhouse.write.format                      json
spark.clickhouse.ignoreUnsupportedTransform        true
spark.clickhouse.write.batchSize                   100000
spark.clickhouse.write.repartitionByPartition      true
spark.clickhouse.write.repartitionNum              40

reproduce

  • create table in clickhouse
create table test.test
(
    `datetime` DateTime,
    `value` String
)
ENGINE = ReplacingMergeTree
PARTITION BY toYYYYMM(datetime)
ORDER BY datetime
SETTINGS index_granularity = 8192;
  • insert in spark
insert into clickhouse.test.test
select col1 as datetime, col2 as value
from (values ('2023-01-01 12:00:00', '1'),
             ('2023-01-03 12:00:00', '2'),
             ('2023-02-01 12:00:00', '3'),
             ('2023-02-03 12:00:00', '4'))
  • trace (SQL run by spark thrift server)
[2023-02-24 12:39:13] org.apache.hive.service.cli.HiveSQLException: Error running query: org.apache.spark.sql.AnalysisException: months(datetime) is not currently supported
[2023-02-24 12:39:13] 	at org.apache.spark.sql.hive.thriftserver.HiveThriftServerErrors$.runningQueryError(HiveThriftServerErrors.scala:43)
[2023-02-24 12:39:13] 	at org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation.org$apache$spark$sql$hive$thriftserver$SparkExecuteStatementOperation$$execute(SparkExecuteStatementOperation.scala:325)
[2023-02-24 12:39:13] 	at org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation$$anon$2$$anon$3.$anonfun$run$2(SparkExecuteStatementOperation.scala:230)
[2023-02-24 12:39:13] 	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
[2023-02-24 12:39:13] 	at org.apache.spark.sql.hive.thriftserver.SparkOperation.withLocalProperties(SparkOperation.scala:79)
[2023-02-24 12:39:13] 	at org.apache.spark.sql.hive.thriftserver.SparkOperation.withLocalProperties$(SparkOperation.scala:63)
[2023-02-24 12:39:13] 	at org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation.withLocalProperties(SparkExecuteStatementOperation.scala:43)
[2023-02-24 12:39:13] 	at org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation$$anon$2$$anon$3.run(SparkExecuteStatementOperation.scala:230)
[2023-02-24 12:39:13] 	at org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation$$anon$2$$anon$3.run(SparkExecuteStatementOperation.scala:225)
[2023-02-24 12:39:13] 	at java.security.AccessController.doPrivileged(Native Method)
[2023-02-24 12:39:13] 	at javax.security.auth.Subject.doAs(Subject.java:422)
[2023-02-24 12:39:13] 	at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1878)
[2023-02-24 12:39:13] 	at org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation$$anon$2.run(SparkExecuteStatementOperation.scala:239)
[2023-02-24 12:39:13] 	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
[2023-02-24 12:39:13] 	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
[2023-02-24 12:39:13] 	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
[2023-02-24 12:39:13] 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
[2023-02-24 12:39:13] 	at java.lang.Thread.run(Thread.java:745)
[2023-02-24 12:39:13] Caused by: org.apache.spark.sql.AnalysisException: months(datetime) is not currently supported
[2023-02-24 12:39:13] 	at org.apache.spark.sql.catalyst.expressions.V2ExpressionUtils$.$anonfun$toCatalyst$1(V2ExpressionUtils.scala:64)
[2023-02-24 12:39:13] 	at scala.Option.getOrElse(Option.scala:189)
[2023-02-24 12:39:13] 	at org.apache.spark.sql.catalyst.expressions.V2ExpressionUtils$.toCatalyst(V2ExpressionUtils.scala:64)
[2023-02-24 12:39:13] 	at org.apache.spark.sql.execution.datasources.v2.DistributionAndOrderingUtils$.$anonfun$prepareQuery$1(DistributionAndOrderingUtils.scala:36)
[2023-02-24 12:39:13] 	at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
[2023-02-24 12:39:13] 	at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
[2023-02-24 12:39:13] 	at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
[2023-02-24 12:39:13] 	at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:198)
[2023-02-24 12:39:13] 	at scala.collection.TraversableLike.map(TraversableLike.scala:286)
[2023-02-24 12:39:13] 	at scala.collection.TraversableLike.map$(TraversableLike.scala:279)
[2023-02-24 12:39:13] 	at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:198)
[2023-02-24 12:39:13] 	at org.apache.spark.sql.execution.datasources.v2.DistributionAndOrderingUtils$.prepareQuery(DistributionAndOrderingUtils.scala:36)
[2023-02-24 12:39:13] 	at org.apache.spark.sql.execution.datasources.v2.V2Writes$$anonfun$apply$1.applyOrElse(V2Writes.scala:47)
[2023-02-24 12:39:13] 	at org.apache.spark.sql.execution.datasources.v2.V2Writes$$anonfun$apply$1.applyOrElse(V2Writes.scala:43)
[2023-02-24 12:39:13] 	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:584)
[2023-02-24 12:39:13] 	at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:176)
[2023-02-24 12:39:13] 	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:584)
[2023-02-24 12:39:13] 	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30)
[2023-02-24 12:39:13] 	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
[2023-02-24 12:39:13] 	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
[2023-02-24 12:39:13] 	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
[2023-02-24 12:39:13] 	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
[2023-02-24 12:39:13] 	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:560)
[2023-02-24 12:39:13] 	at org.apache.spark.sql.execution.datasources.v2.V2Writes$.apply(V2Writes.scala:43)
[2023-02-24 12:39:13] 	at org.apache.spark.sql.execution.datasources.v2.V2Writes$.apply(V2Writes.scala:39)
[2023-02-24 12:39:13] 	at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$2(RuleExecutor.scala:211)
[2023-02-24 12:39:13] 	at scala.collection.LinearSeqOptimized.foldLeft(LinearSeqOptimized.scala:126)
[2023-02-24 12:39:13] 	at scala.collection.LinearSeqOptimized.foldLeft$(LinearSeqOptimized.scala:122)
[2023-02-24 12:39:13] 	at scala.collection.immutable.List.foldLeft(List.scala:91)
[2023-02-24 12:39:13] 	at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1(RuleExecutor.scala:208)
[2023-02-24 12:39:13] 	at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1$adapted(RuleExecutor.scala:200)
[2023-02-24 12:39:13] 	at scala.collection.immutable.List.foreach(List.scala:431)
[2023-02-24 12:39:13] 	at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:200)
[2023-02-24 12:39:13] 	at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$executeAndTrack$1(RuleExecutor.scala:179)
[2023-02-24 12:39:13] 	at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:88)
[2023-02-24 12:39:13] 	at org.apache.spark.sql.catalyst.rules.RuleExecutor.executeAndTrack(RuleExecutor.scala:179)
[2023-02-24 12:39:13] 	at org.apache.spark.sql.execution.QueryExecution.$anonfun$optimizedPlan$1(QueryExecution.scala:126)
[2023-02-24 12:39:13] 	at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:111)
[2023-02-24 12:39:13] 	at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$2(QueryExecution.scala:185)
[2023-02-24 12:39:13] 	at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:510)
[2023-02-24 12:39:13] 	at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:185)
[2023-02-24 12:39:13] 	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
[2023-02-24 12:39:13] 	at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:184)
[2023-02-24 12:39:13] 	at org.apache.spark.sql.execution.QueryExecution.optimizedPlan$lzycompute(QueryExecution.scala:122)
[2023-02-24 12:39:13] 	at org.apache.spark.sql.execution.QueryExecution.optimizedPlan(QueryExecution.scala:118)
[2023-02-24 12:39:13] 	at org.apache.spark.sql.execution.QueryExecution.assertOptimized(QueryExecution.scala:136)
[2023-02-24 12:39:13] 	at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:154)
[2023-02-24 12:39:13] 	at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:151)
[2023-02-24 12:39:13] 	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:106)
[2023-02-24 12:39:13] 	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:169)
[2023-02-24 12:39:13] 	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:95)
[2023-02-24 12:39:13] 	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
[2023-02-24 12:39:13] 	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
[2023-02-24 12:39:13] 	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:98)
[2023-02-24 12:39:13] 	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:94)
[2023-02-24 12:39:13] 	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:584)
[2023-02-24 12:39:13] 	at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:176)
[2023-02-24 12:39:13] 	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:584)
[2023-02-24 12:39:13] 	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30)
[2023-02-24 12:39:13] 	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
[2023-02-24 12:39:13] 	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
[2023-02-24 12:39:13] 	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
[2023-02-24 12:39:13] 	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
[2023-02-24 12:39:13] 	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:560)
[2023-02-24 12:39:13] 	at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:94)
[2023-02-24 12:39:13] 	at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:81)
[2023-02-24 12:39:13] 	at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:79)
[2023-02-24 12:39:13] 	at org.apache.spark.sql.Dataset.<init>(Dataset.scala:220)
[2023-02-24 12:39:13] 	at org.apache.spark.sql.Dataset$.$anonfun$ofRows$2(Dataset.scala:100)
[2023-02-24 12:39:13] 	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
[2023-02-24 12:39:13] 	at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:97)
[2023-02-24 12:39:13] 	at org.apache.spark.sql.SparkSession.$anonfun$sql$1(SparkSession.scala:622)
[2023-02-24 12:39:13] 	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
[2023-02-24 12:39:13] 	at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:617)
[2023-02-24 12:39:13] 	at org.apache.spark.sql.SQLContext.sql(SQLContext.scala:651)
[2023-02-24 12:39:13] 	at org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation.org$apache$spark$sql$hive$thriftserver$SparkExecuteStatementOperation$$execute(SparkExecuteStatementOperation.scala:291)
[2023-02-24 12:39:13] 	... 16 more

It seems that org.apache.spark.sql.connector.expressions.Expressions#months, provided by Spark, is the problematic part?

pan3793 (Collaborator) commented Feb 24, 2023

On the Spark side, it requires SPARK-39607; on the connector side, we should register v2 functions in the function catalog. So basically, it is a not-yet-implemented feature.
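
For context, registering a v2 function means implementing Spark's org.apache.spark.sql.connector.catalog.functions.UnboundFunction / ScalarFunction API and exposing the instance through the catalog's FunctionCatalog methods (listFunctions / loadFunction), so that, with SPARK-39607 in place, Spark can resolve the months(datetime) transform during write planning. Below is a minimal, hypothetical sketch of what a toYYYYMM v2 function could look like; the object name, UTC time-zone handling, and binding rules are illustrative assumptions, not the connector's actual implementation.

import java.time.{Instant, ZoneOffset}

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.connector.catalog.functions.{BoundFunction, ScalarFunction, UnboundFunction}
import org.apache.spark.sql.types.{DataType, IntegerType, StructType, TimestampType}

// Hypothetical sketch of a v2 function mirroring ClickHouse's toYYYYMM,
// to be returned from the connector's FunctionCatalog.
object ToYYYYMM extends UnboundFunction with ScalarFunction[Int] {
  override def name(): String = "toYYYYMM"
  override def canonicalName(): String = "clickhouse.toYYYYMM"
  override def description(): String = "toYYYYMM(timestamp) -> Int, e.g. 2023-01-01 -> 202301"

  // Accept exactly one timestamp argument; anything else is rejected at bind time.
  override def bind(inputType: StructType): BoundFunction = inputType.fields match {
    case Array(f) if f.dataType == TimestampType => this
    case _ => throw new UnsupportedOperationException("toYYYYMM expects a single timestamp argument")
  }

  override def inputTypes(): Array[DataType] = Array(TimestampType)
  override def resultType(): DataType = IntegerType

  override def produceResult(input: InternalRow): Int = {
    // Spark stores timestamps as microseconds since the epoch; UTC is assumed here
    // for simplicity, while a real implementation would honor the session time zone.
    val micros = input.getLong(0)
    val utc = Instant.ofEpochMilli(micros / 1000L).atZone(ZoneOffset.UTC)
    utc.getYear * 100 + utc.getMonthValue
  }
}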

pan3793 (Collaborator) commented Feb 24, 2023

So the temporary solution is to disable spark.clickhouse.write.repartitionByPartition or to avoid using that expression in the partition definition.
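
A minimal sketch of the first workaround, assuming a spark-shell or notebook session where the connector picks the option up from the runtime conf (with the Spark Thrift Server, the equivalent would be setting it in spark-defaults.conf):

// Sketch of the workaround; `spark` is the active SparkSession and the
// `clickhouse` catalog is assumed to be configured as in the issue.
spark.conf.set("spark.clickhouse.write.repartitionByPartition", "false")

spark.sql(
  """insert into clickhouse.test.test
    |select col1 as datetime, col2 as value
    |from (values ('2023-01-01 12:00:00', '1'),
    |             ('2023-02-01 12:00:00', '3'))""".stripMargin)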

Yxang (Contributor, Author) commented Jun 25, 2023

@pan3793 Hi, I implemented a patch in #253 that supports this, as well as other hash UDFs, with some limitations. Would you take a look at it, or perhaps use it as a starting point for this important feature?
