From 6d254a9f0654877fdc2d37adb73777ae26d704a2 Mon Sep 17 00:00:00 2001 From: Darcy Date: Wed, 15 Jan 2025 20:20:39 +0800 Subject: [PATCH 1/3] feature: add mvp code. --- .../server/table/TableConfigurations.java | 5 ++ .../apache/amoro/config/OptimizingConfig.java | 15 ++++ amoro-format-iceberg/pom.xml | 33 +++++++ .../plan/AbstractOptimizingEvaluator.java | 89 ++++++++++++++++++- .../apache/amoro/table/TableProperties.java | 3 + 5 files changed, 144 insertions(+), 1 deletion(-) diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/table/TableConfigurations.java b/amoro-ams/src/main/java/org/apache/amoro/server/table/TableConfigurations.java index bb7823c99e..e500f9f9c7 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/table/TableConfigurations.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/table/TableConfigurations.java @@ -294,6 +294,11 @@ public static OptimizingConfig parseOptimizingConfig(Map propert properties, TableProperties.SELF_OPTIMIZING_FULL_REWRITE_ALL_FILES, TableProperties.SELF_OPTIMIZING_FULL_REWRITE_ALL_FILES_DEFAULT)) + .setPartitionFilter( + CompatiblePropertyUtil.propertyAsString( + properties, + TableProperties.SELF_OPTIMIZING_PARTITION_FILTER, + TableProperties.SELF_OPTIMIZING_PARTITION_FILTER_DEFAULT)) .setBaseHashBucket( CompatiblePropertyUtil.propertyAsInt( properties, diff --git a/amoro-common/src/main/java/org/apache/amoro/config/OptimizingConfig.java b/amoro-common/src/main/java/org/apache/amoro/config/OptimizingConfig.java index ef92ac5fb1..54078367f8 100644 --- a/amoro-common/src/main/java/org/apache/amoro/config/OptimizingConfig.java +++ b/amoro-common/src/main/java/org/apache/amoro/config/OptimizingConfig.java @@ -74,6 +74,9 @@ public class OptimizingConfig { // self-optimizing.full.rewrite-all-files private boolean fullRewriteAllFiles; + // self-optimizing.partition-filter + private String partitionFilter; + // base.file-index.hash-bucket private int baseHashBucket; @@ -240,6 +243,15 @@ public 
OptimizingConfig setFullRewriteAllFiles(boolean fullRewriteAllFiles) { return this; } + public OptimizingConfig setPartitionFilter(String partitionFilter) { + this.partitionFilter = partitionFilter; + return this; + } + + public String getPartitionFilter() { + return partitionFilter; + } + public int getBaseHashBucket() { return baseHashBucket; } @@ -291,6 +303,7 @@ public boolean equals(Object o) { && Double.compare(that.majorDuplicateRatio, majorDuplicateRatio) == 0 && fullTriggerInterval == that.fullTriggerInterval && fullRewriteAllFiles == that.fullRewriteAllFiles + && Objects.equal(partitionFilter, that.partitionFilter) && baseHashBucket == that.baseHashBucket && baseRefreshInterval == that.baseRefreshInterval && hiveRefreshInterval == that.hiveRefreshInterval @@ -317,6 +330,7 @@ public int hashCode() { majorDuplicateRatio, fullTriggerInterval, fullRewriteAllFiles, + partitionFilter, baseHashBucket, baseRefreshInterval, hiveRefreshInterval, @@ -341,6 +355,7 @@ public String toString() { .add("majorDuplicateRatio", majorDuplicateRatio) .add("fullTriggerInterval", fullTriggerInterval) .add("fullRewriteAllFiles", fullRewriteAllFiles) + .add("partitionFilter", partitionFilter) .add("baseHashBucket", baseHashBucket) .add("baseRefreshInterval", baseRefreshInterval) .add("hiveRefreshInterval", hiveRefreshInterval) diff --git a/amoro-format-iceberg/pom.xml b/amoro-format-iceberg/pom.xml index 47919fce4e..13fa3a1157 100644 --- a/amoro-format-iceberg/pom.xml +++ b/amoro-format-iceberg/pom.xml @@ -154,6 +154,39 @@ paimon-bundle + + org.apache.spark + spark-sql_2.12 + ${terminal.spark.version} + compile + + + hive-storage-api + org.apache.hive + + + jackson-databind + com.fasterxml.jackson.core + + + orc-core + org.apache.orc + + + orc-mapreduce + org.apache.orc + + + parquet-column + org.apache.parquet + + + parquet-hadoop + org.apache.parquet + + + + org.roaringbitmap RoaringBitmap diff --git 
a/amoro-format-iceberg/src/main/java/org/apache/amoro/optimizing/plan/AbstractOptimizingEvaluator.java b/amoro-format-iceberg/src/main/java/org/apache/amoro/optimizing/plan/AbstractOptimizingEvaluator.java index abd30b002f..6373d1036e 100644 --- a/amoro-format-iceberg/src/main/java/org/apache/amoro/optimizing/plan/AbstractOptimizingEvaluator.java +++ b/amoro-format-iceberg/src/main/java/org/apache/amoro/optimizing/plan/AbstractOptimizingEvaluator.java @@ -41,6 +41,20 @@ import org.apache.iceberg.io.CloseableIterable; import org.apache.iceberg.util.Pair; import org.apache.iceberg.util.PropertyUtil; +import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute; +import org.apache.spark.sql.catalyst.expressions.Alias; +import org.apache.spark.sql.catalyst.expressions.And; +import org.apache.spark.sql.catalyst.expressions.AttributeReference; +import org.apache.spark.sql.catalyst.expressions.EqualTo; +import org.apache.spark.sql.catalyst.expressions.GreaterThan; +import org.apache.spark.sql.catalyst.expressions.LessThan; +import org.apache.spark.sql.catalyst.expressions.Literal; +import org.apache.spark.sql.catalyst.expressions.Or; +import org.apache.spark.sql.catalyst.plans.logical.Filter; +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan; +import org.apache.spark.sql.execution.SparkSqlParser; +import org.apache.spark.sql.types.Decimal; +import org.apache.spark.unsafe.types.UTF8String; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -111,7 +125,80 @@ protected void initEvaluator() { } protected Expression getPartitionFilter() { - return Expressions.alwaysTrue(); + String partitionFilter = config.getPartitionFilter(); + return partitionFilter == null + ? 
Expressions.alwaysTrue() + : convertSqlToIcebergExpression(partitionFilter); + } + + protected static Expression convertSqlToIcebergExpression(String sql) { + try { + SparkSqlParser parser = new SparkSqlParser(); + LogicalPlan logicalPlan = parser.parsePlan("SELECT * FROM dummy WHERE " + sql); + + Filter filter = (Filter) logicalPlan.children().head(); + org.apache.spark.sql.catalyst.expressions.Expression sparkExpr = filter.condition(); + return convertSparkExpressionToIceberg(sparkExpr); + } catch (Exception e) { + throw new IllegalArgumentException("Failed to parse where condition: " + sql, e); + } + } + + private static Expression convertSparkExpressionToIceberg( + org.apache.spark.sql.catalyst.expressions.Expression sparkExpr) { + if (sparkExpr instanceof EqualTo) { + EqualTo eq = (EqualTo) sparkExpr; + return Expressions.equal(getColumnName(eq.left()), getValue(eq.right())); + } else if (sparkExpr instanceof GreaterThan) { + GreaterThan gt = (GreaterThan) sparkExpr; + return Expressions.greaterThan(getColumnName(gt.left()), getValue(gt.right())); + } else if (sparkExpr instanceof LessThan) { + LessThan lt = (LessThan) sparkExpr; + return Expressions.lessThan(getColumnName(lt.left()), getValue(lt.right())); + } else if (sparkExpr instanceof And) { + And and = (And) sparkExpr; + return Expressions.and( + convertSparkExpressionToIceberg(and.left()), + convertSparkExpressionToIceberg(and.right())); + } else if (sparkExpr instanceof Or) { + Or or = (Or) sparkExpr; + return Expressions.or( + convertSparkExpressionToIceberg(or.left()), convertSparkExpressionToIceberg(or.right())); + } + + throw new UnsupportedOperationException("Unsupported expression: " + sparkExpr); + } + + private static String getColumnName(org.apache.spark.sql.catalyst.expressions.Expression expr) { + if (expr instanceof AttributeReference) { + return ((AttributeReference) expr).name(); + } + + if (expr instanceof Alias) { + return getColumnName(((Alias) expr).child()); + } + + if (expr 
instanceof UnresolvedAttribute) { + return ((UnresolvedAttribute) expr).name(); + } + throw new IllegalArgumentException("Expected column reference, got: " + expr); + } + + private static Object getValue(org.apache.spark.sql.catalyst.expressions.Expression expr) { + if (expr instanceof Literal) { + return convertLiteral((Literal) expr); + } + + throw new IllegalArgumentException("Expected literal value, got: " + expr); + } + + private static Object convertLiteral(Literal literal) { + if (literal.value() instanceof UTF8String) { + return ((UTF8String) literal.value()).toString(); + } else if (literal.value() instanceof Decimal) { + return ((Decimal) literal.value()).toJavaBigDecimal(); + } + return literal.value(); } private void initPartitionPlans(TableFileScanHelper tableFileScanHelper) { diff --git a/amoro-format-iceberg/src/main/java/org/apache/amoro/table/TableProperties.java b/amoro-format-iceberg/src/main/java/org/apache/amoro/table/TableProperties.java index 9f2aeca5d4..9bead26961 100644 --- a/amoro-format-iceberg/src/main/java/org/apache/amoro/table/TableProperties.java +++ b/amoro-format-iceberg/src/main/java/org/apache/amoro/table/TableProperties.java @@ -115,6 +115,9 @@ private TableProperties() {} "self-optimizing.full.rewrite-all-files"; public static final boolean SELF_OPTIMIZING_FULL_REWRITE_ALL_FILES_DEFAULT = true; + public static final String SELF_OPTIMIZING_PARTITION_FILTER = "self-optimizing.partition-filter"; + public static final String SELF_OPTIMIZING_PARTITION_FILTER_DEFAULT = null; + public static final String SELF_OPTIMIZING_MIN_PLAN_INTERVAL = "self-optimizing.min-plan-interval"; public static final long SELF_OPTIMIZING_MIN_PLAN_INTERVAL_DEFAULT = 60000; From f8c5734f039cbba31ccfde965f5625e47910d27f Mon Sep 17 00:00:00 2001 From: Darcy Date: Tue, 18 Feb 2025 18:00:36 +0800 Subject: [PATCH 2/3] feature: use jsqlparser replace spark-sql --- amoro-format-iceberg/pom.xml | 33 +-- .../plan/AbstractOptimizingEvaluator.java | 230 
+++++++++++++----- .../plan/TestAbstractOptimizingEvaluator.java | 123 ++++++++++ docs/user-guides/configurations.md | 35 +-- 4 files changed, 311 insertions(+), 110 deletions(-) create mode 100644 amoro-format-iceberg/src/test/java/org/apache/amoro/optimizing/plan/TestAbstractOptimizingEvaluator.java diff --git a/amoro-format-iceberg/pom.xml b/amoro-format-iceberg/pom.xml index 13fa3a1157..5c758396c1 100644 --- a/amoro-format-iceberg/pom.xml +++ b/amoro-format-iceberg/pom.xml @@ -155,36 +155,9 @@ - org.apache.spark - spark-sql_2.12 - ${terminal.spark.version} - compile - - - hive-storage-api - org.apache.hive - - - jackson-databind - com.fasterxml.jackson.core - - - orc-core - org.apache.orc - - - orc-mapreduce - org.apache.orc - - - parquet-column - org.apache.parquet - - - parquet-hadoop - org.apache.parquet - - + com.github.jsqlparser + jsqlparser + 5.1 diff --git a/amoro-format-iceberg/src/main/java/org/apache/amoro/optimizing/plan/AbstractOptimizingEvaluator.java b/amoro-format-iceberg/src/main/java/org/apache/amoro/optimizing/plan/AbstractOptimizingEvaluator.java index 6373d1036e..0b5d003038 100644 --- a/amoro-format-iceberg/src/main/java/org/apache/amoro/optimizing/plan/AbstractOptimizingEvaluator.java +++ b/amoro-format-iceberg/src/main/java/org/apache/amoro/optimizing/plan/AbstractOptimizingEvaluator.java @@ -18,6 +18,26 @@ package org.apache.amoro.optimizing.plan; +import net.sf.jsqlparser.expression.BooleanValue; +import net.sf.jsqlparser.expression.CastExpression; +import net.sf.jsqlparser.expression.DoubleValue; +import net.sf.jsqlparser.expression.LongValue; +import net.sf.jsqlparser.expression.NotExpression; +import net.sf.jsqlparser.expression.StringValue; +import net.sf.jsqlparser.expression.operators.conditional.AndExpression; +import net.sf.jsqlparser.expression.operators.conditional.OrExpression; +import net.sf.jsqlparser.expression.operators.relational.EqualsTo; +import net.sf.jsqlparser.expression.operators.relational.ExpressionList; +import 
net.sf.jsqlparser.expression.operators.relational.GreaterThan; +import net.sf.jsqlparser.expression.operators.relational.GreaterThanEquals; +import net.sf.jsqlparser.expression.operators.relational.InExpression; +import net.sf.jsqlparser.expression.operators.relational.IsNullExpression; +import net.sf.jsqlparser.expression.operators.relational.MinorThan; +import net.sf.jsqlparser.expression.operators.relational.MinorThanEquals; +import net.sf.jsqlparser.expression.operators.relational.NotEqualsTo; +import net.sf.jsqlparser.parser.CCJSqlParserUtil; +import net.sf.jsqlparser.schema.Column; +import net.sf.jsqlparser.statement.select.PlainSelect; import org.apache.amoro.ServerTableIdentifier; import org.apache.amoro.TableFormat; import org.apache.amoro.config.OptimizingConfig; @@ -39,29 +59,24 @@ import org.apache.iceberg.expressions.Expression; import org.apache.iceberg.expressions.Expressions; import org.apache.iceberg.io.CloseableIterable; +import org.apache.iceberg.types.Types; import org.apache.iceberg.util.Pair; import org.apache.iceberg.util.PropertyUtil; -import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute; -import org.apache.spark.sql.catalyst.expressions.Alias; -import org.apache.spark.sql.catalyst.expressions.And; -import org.apache.spark.sql.catalyst.expressions.AttributeReference; -import org.apache.spark.sql.catalyst.expressions.EqualTo; -import org.apache.spark.sql.catalyst.expressions.GreaterThan; -import org.apache.spark.sql.catalyst.expressions.LessThan; -import org.apache.spark.sql.catalyst.expressions.Literal; -import org.apache.spark.sql.catalyst.expressions.Or; -import org.apache.spark.sql.catalyst.plans.logical.Filter; -import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan; -import org.apache.spark.sql.execution.SparkSqlParser; -import org.apache.spark.sql.types.Decimal; -import org.apache.spark.unsafe.types.UTF8String; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; import 
java.io.UncheckedIOException; +import java.sql.Date; +import java.sql.Time; +import java.sql.Timestamp; +import java.time.ZoneOffset; +import java.time.temporal.ChronoField; +import java.util.ArrayList; import java.util.Collection; +import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Set; import java.util.stream.Collectors; @@ -128,77 +143,166 @@ protected Expression getPartitionFilter() { String partitionFilter = config.getPartitionFilter(); return partitionFilter == null ? Expressions.alwaysTrue() - : convertSqlToIcebergExpression(partitionFilter); + : convertSqlToIcebergExpression(partitionFilter, mixedTable.schema().columns()); } - protected static Expression convertSqlToIcebergExpression(String sql) { + protected static Expression convertSqlToIcebergExpression( + String sql, List tableColumns) { try { - SparkSqlParser parser = new SparkSqlParser(); - LogicalPlan logicalPlan = parser.parsePlan("SELECT * FROM dummy WHERE " + sql); - - Filter filter = (Filter) logicalPlan.children().head(); - org.apache.spark.sql.catalyst.expressions.Expression sparkExpr = filter.condition(); - return convertSparkExpressionToIceberg(sparkExpr); + PlainSelect select = (PlainSelect) CCJSqlParserUtil.parse("SELECT * FROM dummy WHERE " + sql); + return convertSparkExpressionToIceberg(select.getWhere(), tableColumns); } catch (Exception e) { throw new IllegalArgumentException("Failed to parse where condition: " + sql, e); } } private static Expression convertSparkExpressionToIceberg( - org.apache.spark.sql.catalyst.expressions.Expression sparkExpr) { - if (sparkExpr instanceof EqualTo) { - EqualTo eq = (EqualTo) sparkExpr; - return Expressions.equal(getColumnName(eq.left()), getValue(eq.right())); - } else if (sparkExpr instanceof GreaterThan) { - GreaterThan gt = (GreaterThan) sparkExpr; - return Expressions.greaterThan(getColumnName(gt.left()), getValue(gt.right())); - } else if (sparkExpr instanceof LessThan) { - LessThan lt = (LessThan) 
sparkExpr; - return Expressions.lessThan(getColumnName(lt.left()), getValue(lt.right())); - } else if (sparkExpr instanceof And) { - And and = (And) sparkExpr; + net.sf.jsqlparser.expression.Expression whereExpr, List tableColumns) { + if (whereExpr instanceof IsNullExpression) { + IsNullExpression isNull = (IsNullExpression) whereExpr; + Types.NestedField column = getColumn(isNull.getLeftExpression(), tableColumns); + return isNull.isNot() + ? Expressions.notNull(column.name()) + : Expressions.isNull(column.name()); + } else if (whereExpr instanceof EqualsTo) { + EqualsTo eq = (EqualsTo) whereExpr; + Types.NestedField column = getColumn(eq.getLeftExpression(), tableColumns); + return Expressions.equal(column.name(), getValue(eq.getRightExpression(), column)); + } else if (whereExpr instanceof NotEqualsTo) { + NotEqualsTo ne = (NotEqualsTo) whereExpr; + Types.NestedField column = getColumn(ne.getLeftExpression(), tableColumns); + return Expressions.notEqual(column.name(), getValue(ne.getRightExpression(), column)); + } else if (whereExpr instanceof GreaterThan) { + GreaterThan gt = (GreaterThan) whereExpr; + Types.NestedField column = getColumn(gt.getLeftExpression(), tableColumns); + return Expressions.greaterThan(column.name(), getValue(gt.getRightExpression(), column)); + } else if (whereExpr instanceof GreaterThanEquals) { + GreaterThanEquals ge = (GreaterThanEquals) whereExpr; + Types.NestedField column = getColumn(ge.getLeftExpression(), tableColumns); + return Expressions.greaterThanOrEqual( + column.name(), getValue(ge.getRightExpression(), column)); + } else if (whereExpr instanceof MinorThan) { + MinorThan lt = (MinorThan) whereExpr; + Types.NestedField column = getColumn(lt.getLeftExpression(), tableColumns); + return Expressions.lessThan(column.name(), getValue(lt.getRightExpression(), column)); + } else if (whereExpr instanceof MinorThanEquals) { + MinorThanEquals le = (MinorThanEquals) whereExpr; + Types.NestedField column = 
getColumn(le.getLeftExpression(), tableColumns); + return Expressions.lessThanOrEqual(column.name(), getValue(le.getRightExpression(), column)); + } else if (whereExpr instanceof InExpression) { + InExpression in = (InExpression) whereExpr; + Types.NestedField column = getColumn(in.getLeftExpression(), tableColumns); + net.sf.jsqlparser.expression.Expression rightExpr = in.getRightExpression(); + List values = new ArrayList<>(); + if (rightExpr instanceof ExpressionList) { + for (net.sf.jsqlparser.expression.Expression expr : ((ExpressionList) rightExpr)) { + values.add(getValue(expr, column)); + } + } else { + throw new UnsupportedOperationException("Subquery IN not supported"); + } + return in.isNot() + ? Expressions.notIn(column.name(), values) + : Expressions.in(column.name(), values); + } else if (whereExpr instanceof NotExpression) { + NotExpression not = (NotExpression) whereExpr; + return Expressions.not(convertSparkExpressionToIceberg(not.getExpression(), tableColumns)); + } else if (whereExpr instanceof AndExpression) { + AndExpression and = (AndExpression) whereExpr; return Expressions.and( - convertSparkExpressionToIceberg(and.left()), - convertSparkExpressionToIceberg(and.right())); - } else if (sparkExpr instanceof Or) { - Or or = (Or) sparkExpr; + convertSparkExpressionToIceberg(and.getLeftExpression(), tableColumns), + convertSparkExpressionToIceberg(and.getRightExpression(), tableColumns)); + } else if (whereExpr instanceof OrExpression) { + OrExpression or = (OrExpression) whereExpr; return Expressions.or( - convertSparkExpressionToIceberg(or.left()), convertSparkExpressionToIceberg(or.right())); + convertSparkExpressionToIceberg(or.getLeftExpression(), tableColumns), + convertSparkExpressionToIceberg(or.getRightExpression(), tableColumns)); } - - throw new UnsupportedOperationException("Unsupported expression: " + sparkExpr); + throw new UnsupportedOperationException("Unsupported expression: " + whereExpr); } - private static String 
getColumnName(org.apache.spark.sql.catalyst.expressions.Expression expr) { - if (expr instanceof AttributeReference) { - return ((AttributeReference) expr).name(); - } - - if (expr instanceof Alias) { - return getColumnName(((Alias) expr).child()); + private static Types.NestedField getColumn( + net.sf.jsqlparser.expression.Expression expr, List tableColumns) { + if (expr instanceof Column) { + String columnName = ((Column) expr).getColumnName(); + Optional column = + tableColumns.stream().filter(c -> c.name().equals(columnName)).findFirst(); + if (column.isPresent()) { + return column.get(); + } + throw new IllegalArgumentException("Column not found: " + columnName); } - if (expr instanceof UnresolvedAttribute) { - return ((UnresolvedAttribute) expr).name(); - } throw new IllegalArgumentException("Expected column reference, got: " + expr); } - private static Object getValue(org.apache.spark.sql.catalyst.expressions.Expression expr) { - if (expr instanceof Literal) { - return convertLiteral((Literal) expr); + private static Object getValue( + net.sf.jsqlparser.expression.Expression expr, Types.NestedField column) { + try { + return convertValue(expr, column); + } catch (Exception e) { + throw new IllegalArgumentException("Failed to convert value: " + expr, e); } - - throw new IllegalArgumentException("Expected literal value, got: " + expr); } - private static Object convertLiteral(Literal literal) { - if (literal.value() instanceof UTF8String) { - return ((UTF8String) literal.value()).toString(); - } else if (literal.value() instanceof Decimal) { - return ((Decimal) literal.value()).toJavaBigDecimal(); - } - return literal.value(); + private static Object convertValue( + net.sf.jsqlparser.expression.Expression expr, Types.NestedField column) { + switch (column.type().typeId()) { + case BOOLEAN: + return ((BooleanValue) expr).getValue(); + case STRING: + return ((StringValue) expr).getValue(); + case INTEGER: + case LONG: + return ((LongValue) expr).getValue(); + 
case FLOAT: + case DOUBLE: + return ((DoubleValue) expr).getValue(); + case DATE: + String dateStr = null; + if (expr instanceof StringValue) { + dateStr = ((StringValue) expr).getValue(); + } else if (expr instanceof CastExpression + && ((CastExpression) expr).getColDataType().getDataType().equalsIgnoreCase("date")) { + dateStr = ((StringValue) ((CastExpression) expr).getLeftExpression()).getValue(); + } + if (dateStr != null) { + return Date.valueOf(dateStr).toLocalDate().toEpochDay(); + } + break; + case TIME: + String timeStr = null; + if (expr instanceof StringValue) { + timeStr = ((StringValue) expr).getValue(); + } else if (expr instanceof CastExpression + && ((CastExpression) expr).getColDataType().getDataType().equalsIgnoreCase("time")) { + timeStr = ((StringValue) ((CastExpression) expr).getLeftExpression()).getValue(); + } + if (timeStr != null) { + return Time.valueOf(timeStr).toLocalTime().getLong(ChronoField.MICRO_OF_DAY); + } + break; + case TIMESTAMP: + String timestampStr = null; + if (expr instanceof StringValue) { + timestampStr = ((StringValue) expr).getValue(); + } else if (expr instanceof CastExpression + && ((CastExpression) expr) + .getColDataType() + .getDataType() + .equalsIgnoreCase("timestamp")) { + timestampStr = ((StringValue) ((CastExpression) expr).getLeftExpression()).getValue(); + } + if (timestampStr != null) { + return Timestamp.valueOf(timestampStr) + .toLocalDateTime() + .toEpochSecond(ZoneOffset.ofHours(0)) + * 1_000_000L; + } + break; + } + throw new IllegalArgumentException( + expr + " can not be converted to column type: " + column.type()); } private void initPartitionPlans(TableFileScanHelper tableFileScanHelper) { diff --git a/amoro-format-iceberg/src/test/java/org/apache/amoro/optimizing/plan/TestAbstractOptimizingEvaluator.java b/amoro-format-iceberg/src/test/java/org/apache/amoro/optimizing/plan/TestAbstractOptimizingEvaluator.java new file mode 100644 index 0000000000..03b42b5606 --- /dev/null +++ 
b/amoro-format-iceberg/src/test/java/org/apache/amoro/optimizing/plan/TestAbstractOptimizingEvaluator.java @@ -0,0 +1,123 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.amoro.optimizing.plan; + +import static org.apache.amoro.optimizing.plan.AbstractOptimizingEvaluator.convertSqlToIcebergExpression; + +import org.apache.iceberg.expressions.Expression; +import org.apache.iceberg.expressions.Expressions; +import org.apache.iceberg.types.Type; +import org.apache.iceberg.types.Types; +import org.junit.Assert; +import org.junit.Test; + +import java.sql.Date; +import java.sql.Time; +import java.sql.Timestamp; +import java.time.ZoneOffset; +import java.time.temporal.ChronoField; +import java.util.ArrayList; +import java.util.List; + +public class TestAbstractOptimizingEvaluator { + @Test + public void testConvertSqlToIcebergExpression() { + List fields = new ArrayList<>(); + fields.add(Types.NestedField.optional(1, "column_a", Types.IntegerType.get())); + assertEqualExpressions( + Expressions.isNull("column_a"), convertSqlToIcebergExpression("column_a IS NULL", fields)); + assertEqualExpressions( + Expressions.notNull("column_a"), + convertSqlToIcebergExpression("column_a IS NOT NULL", 
fields)); + + testConvertSqlToIcebergExpressionByType("1", "'1'", Types.StringType.get()); + testConvertSqlToIcebergExpressionByType(1, "1", Types.IntegerType.get()); + testConvertSqlToIcebergExpressionByType(1L, "1", Types.LongType.get()); + testConvertSqlToIcebergExpressionByType(1.15f, "1.15", Types.FloatType.get()); + testConvertSqlToIcebergExpressionByType(1.15, "1.15", Types.DoubleType.get()); + testConvertSqlToIcebergExpressionByType(true, "true", Types.BooleanType.get()); + testConvertSqlToIcebergExpressionByType(false, "false", Types.BooleanType.get()); + + long epochDay = Date.valueOf("2022-01-01").toLocalDate().toEpochDay(); + testConvertSqlToIcebergExpressionByType(epochDay, "DATE '2022-01-01'", Types.DateType.get()); + testConvertSqlToIcebergExpressionByType(epochDay, "'2022-01-01'", Types.DateType.get()); + + long microOfDay = Time.valueOf("12:12:12").toLocalTime().getLong(ChronoField.MICRO_OF_DAY); + testConvertSqlToIcebergExpressionByType(microOfDay, "TIME '12:12:12'", Types.TimeType.get()); + testConvertSqlToIcebergExpressionByType(microOfDay, "'12:12:12'", Types.TimeType.get()); + + long epochMicroSecond = + Timestamp.valueOf("2022-01-01 12:12:12") + .toLocalDateTime() + .toEpochSecond(ZoneOffset.ofHours(0)) + * 1_000_000L; + testConvertSqlToIcebergExpressionByType( + epochMicroSecond, "TIMESTAMP '2022-01-01 12:12:12'", Types.TimestampType.withoutZone()); + testConvertSqlToIcebergExpressionByType( + epochMicroSecond, "'2022-01-01 12:12:12'", Types.TimestampType.withoutZone()); + } + + public void testConvertSqlToIcebergExpressionByType(T exprValue, String sqlValue, Type type) { + List fields = new ArrayList<>(); + fields.add(Types.NestedField.optional(1, "column_a", type)); + fields.add(Types.NestedField.optional(2, "column_b", type)); + + assertEqualExpressions( + Expressions.equal("column_a", exprValue), + convertSqlToIcebergExpression("column_a = " + sqlValue, fields)); + assertEqualExpressions( + Expressions.notEqual("column_a", exprValue), + 
convertSqlToIcebergExpression("column_a != " + sqlValue, fields)); + assertEqualExpressions( + Expressions.greaterThan("column_a", exprValue), + convertSqlToIcebergExpression("column_a > " + sqlValue, fields)); + assertEqualExpressions( + Expressions.greaterThanOrEqual("column_a", exprValue), + convertSqlToIcebergExpression("column_a >= " + sqlValue, fields)); + assertEqualExpressions( + Expressions.lessThan("column_a", exprValue), + convertSqlToIcebergExpression("column_a < " + sqlValue, fields)); + assertEqualExpressions( + Expressions.lessThanOrEqual("column_a", exprValue), + convertSqlToIcebergExpression("column_a <= " + sqlValue, fields)); + assertEqualExpressions( + Expressions.in("column_a", exprValue), + convertSqlToIcebergExpression("column_a IN (" + sqlValue + ")", fields)); + assertEqualExpressions( + Expressions.notIn("column_a", exprValue), + convertSqlToIcebergExpression("column_a NOT IN (" + sqlValue + ")", fields)); + assertEqualExpressions( + Expressions.not(Expressions.equal("column_a", exprValue)), + convertSqlToIcebergExpression("NOT column_a = " + sqlValue, fields)); + assertEqualExpressions( + Expressions.and( + Expressions.equal("column_a", exprValue), Expressions.equal("column_b", exprValue)), + convertSqlToIcebergExpression( + "column_a = " + sqlValue + " AND column_b = " + sqlValue, fields)); + assertEqualExpressions( + Expressions.or( + Expressions.equal("column_a", exprValue), Expressions.equal("column_b", exprValue)), + convertSqlToIcebergExpression( + "column_a = " + sqlValue + " OR column_b = " + sqlValue, fields)); + } + + private void assertEqualExpressions(Expression exp1, Expression exp2) { + Assert.assertEquals(exp1.toString(), exp2.toString()); + } +} diff --git a/docs/user-guides/configurations.md b/docs/user-guides/configurations.md index 5bcf7afbbd..cdc9f87e76 100644 --- a/docs/user-guides/configurations.md +++ b/docs/user-guides/configurations.md @@ -43,23 +43,24 @@ modified through [Alter 
Table](../using-tables/#modify-table) operations. Self-optimizing configurations are applicable to both Iceberg Format and Mixed streaming Format. -| Key | Default | Description | -|-----------------------------------------------|------------------|------------------------------------------------------------------------------------------------------------------------------------------| -| self-optimizing.enabled | true | Enables Self-optimizing | -| self-optimizing.group | default | Optimizer group for Self-optimizing | -| self-optimizing.quota | 0.1 | Quota for Self-optimizing, indicating the CPU resource the table can take up | -| self-optimizing.execute.num-retries | 5 | Number of retries after failure of Self-optimizing | -| self-optimizing.target-size | 134217728(128MB) | Target size for Self-optimizing | -| self-optimizing.max-file-count | 10000 | Maximum number of files processed by a Self-optimizing process | -| self-optimizing.max-task-size-bytes | 134217728(128MB) | Maximum file size bytes in a single task for splitting tasks | -| self-optimizing.fragment-ratio | 8 | The fragment file size threshold. We could divide self-optimizing.target-size by this ratio to get the actual fragment file size | -| self-optimizing.min-target-size-ratio | 0.75 | The undersized segment file size threshold. 
Segment files under this threshold will be considered for rewriting | -| self-optimizing.minor.trigger.file-count | 12 | The minimum number of files to trigger minor optimizing is determined by the sum of fragment file count and equality delete file count | -| self-optimizing.minor.trigger.interval | 3600000(1 hour) | The time interval in milliseconds to trigger minor optimizing | -| self-optimizing.major.trigger.duplicate-ratio | 0.1 | The ratio of duplicate data of segment files to trigger major optimizing | -| self-optimizing.full.trigger.interval | -1(closed) | The time interval in milliseconds to trigger full optimizing | -| self-optimizing.full.rewrite-all-files | true | Whether full optimizing rewrites all files or skips files that do not need to be optimized | -| self-optimizing.min-plan-interval | 60000 | The minimum time interval between two self-optimizing planning action | +| Key | Default | Description | +|------------------------------------------------|------------------|----------------------------------------------------------------------------------------------------------------------------------------| +| self-optimizing.enabled | true | Enables Self-optimizing | +| self-optimizing.group | default | Optimizer group for Self-optimizing | +| self-optimizing.quota | 0.1 | Quota for Self-optimizing, indicating the CPU resource the table can take up | +| self-optimizing.execute.num-retries | 5 | Number of retries after failure of Self-optimizing | +| self-optimizing.target-size | 134217728(128MB) | Target size for Self-optimizing | +| self-optimizing.max-file-count | 10000 | Maximum number of files processed by a Self-optimizing process | +| self-optimizing.max-task-size-bytes | 134217728(128MB) | Maximum file size bytes in a single task for splitting tasks | +| self-optimizing.fragment-ratio | 8 | The fragment file size threshold. 
We could divide self-optimizing.target-size by this ratio to get the actual fragment file size | +| self-optimizing.min-target-size-ratio | 0.75 | The undersized segment file size threshold. Segment files under this threshold will be considered for rewriting | +| self-optimizing.minor.trigger.file-count | 12 | The minimum number of files to trigger minor optimizing is determined by the sum of fragment file count and equality delete file count | +| self-optimizing.minor.trigger.interval | 3600000(1 hour) | The time interval in milliseconds to trigger minor optimizing | +| self-optimizing.major.trigger.duplicate-ratio | 0.1 | The ratio of duplicate data of segment files to trigger major optimizing | +| self-optimizing.full.trigger.interval | -1(closed) | The time interval in milliseconds to trigger full optimizing | +| self-optimizing.full.rewrite-all-files | true | Whether full optimizing rewrites all files or skips files that do not need to be optimized | +| self-optimizing.min-plan-interval | 60000 | The minimum time interval between two self-optimizing planning actions | +| self-optimizing.partition-filter | null | The partition filter for Self-optimizing, written as a SQL-like WHERE condition on partition fields; only matching partitions are optimized, and null (the default) means all partitions | ## Data-cleaning configurations From 72767042316e9a8d051f99012618a81cdccff90f Mon Sep 17 00:00:00 2001 From: Darcy Date: Tue, 18 Feb 2025 19:48:03 +0800 Subject: [PATCH 3/3] feature: degrade jsqlparser version to 4.5 --- amoro-format-iceberg/pom.xml | 2 +- .../plan/AbstractOptimizingEvaluator.java | 56 +++++++++---------- 2 files changed, 26 insertions(+), 32 deletions(-) diff --git a/amoro-format-iceberg/pom.xml b/amoro-format-iceberg/pom.xml index 5c758396c1..7fda5200aa 100644 --- a/amoro-format-iceberg/pom.xml +++ b/amoro-format-iceberg/pom.xml @@ -157,7 +157,7 @@ com.github.jsqlparser jsqlparser - 5.1 + 4.5 diff --git a/amoro-format-iceberg/src/main/java/org/apache/amoro/optimizing/plan/AbstractOptimizingEvaluator.java
b/amoro-format-iceberg/src/main/java/org/apache/amoro/optimizing/plan/AbstractOptimizingEvaluator.java index 0b5d003038..e33066d93d 100644 --- a/amoro-format-iceberg/src/main/java/org/apache/amoro/optimizing/plan/AbstractOptimizingEvaluator.java +++ b/amoro-format-iceberg/src/main/java/org/apache/amoro/optimizing/plan/AbstractOptimizingEvaluator.java @@ -18,8 +18,7 @@ package org.apache.amoro.optimizing.plan; -import net.sf.jsqlparser.expression.BooleanValue; -import net.sf.jsqlparser.expression.CastExpression; +import net.sf.jsqlparser.expression.DateTimeLiteralExpression; import net.sf.jsqlparser.expression.DoubleValue; import net.sf.jsqlparser.expression.LongValue; import net.sf.jsqlparser.expression.NotExpression; @@ -32,12 +31,14 @@ import net.sf.jsqlparser.expression.operators.relational.GreaterThanEquals; import net.sf.jsqlparser.expression.operators.relational.InExpression; import net.sf.jsqlparser.expression.operators.relational.IsNullExpression; +import net.sf.jsqlparser.expression.operators.relational.ItemsList; import net.sf.jsqlparser.expression.operators.relational.MinorThan; import net.sf.jsqlparser.expression.operators.relational.MinorThanEquals; import net.sf.jsqlparser.expression.operators.relational.NotEqualsTo; import net.sf.jsqlparser.parser.CCJSqlParserUtil; import net.sf.jsqlparser.schema.Column; import net.sf.jsqlparser.statement.select.PlainSelect; +import net.sf.jsqlparser.statement.select.Select; import org.apache.amoro.ServerTableIdentifier; import org.apache.amoro.TableFormat; import org.apache.amoro.config.OptimizingConfig; @@ -149,7 +150,8 @@ protected Expression getPartitionFilter() { protected static Expression convertSqlToIcebergExpression( String sql, List tableColumns) { try { - PlainSelect select = (PlainSelect) CCJSqlParserUtil.parse("SELECT * FROM dummy WHERE " + sql); + Select statement = (Select) CCJSqlParserUtil.parse("SELECT * FROM dummy WHERE " + sql); + PlainSelect select = (PlainSelect) statement.getSelectBody(); return 
convertSparkExpressionToIceberg(select.getWhere(), tableColumns); } catch (Exception e) { throw new IllegalArgumentException("Failed to parse where condition: " + sql, e); @@ -192,10 +194,11 @@ private static Expression convertSparkExpressionToIceberg( } else if (whereExpr instanceof InExpression) { InExpression in = (InExpression) whereExpr; Types.NestedField column = getColumn(in.getLeftExpression(), tableColumns); - net.sf.jsqlparser.expression.Expression rightExpr = in.getRightExpression(); + ItemsList rightItems = in.getRightItemsList(); List values = new ArrayList<>(); - if (rightExpr instanceof ExpressionList) { - for (net.sf.jsqlparser.expression.Expression expr : ((ExpressionList) rightExpr)) { + if (rightItems instanceof ExpressionList) { + for (net.sf.jsqlparser.expression.Expression expr : + ((ExpressionList) rightItems).getExpressions()) { values.add(getValue(expr, column)); } } else { @@ -249,7 +252,7 @@ private static Object convertValue( net.sf.jsqlparser.expression.Expression expr, Types.NestedField column) { switch (column.type().typeId()) { case BOOLEAN: - return ((BooleanValue) expr).getValue(); + return Boolean.valueOf(((Column) expr).getColumnName()); case STRING: return ((StringValue) expr).getValue(); case INTEGER: @@ -259,40 +262,19 @@ private static Object convertValue( case DOUBLE: return ((DoubleValue) expr).getValue(); case DATE: - String dateStr = null; - if (expr instanceof StringValue) { - dateStr = ((StringValue) expr).getValue(); - } else if (expr instanceof CastExpression - && ((CastExpression) expr).getColDataType().getDataType().equalsIgnoreCase("date")) { - dateStr = ((StringValue) ((CastExpression) expr).getLeftExpression()).getValue(); - } + String dateStr = getDateTimeLiteralStr(expr, "date"); if (dateStr != null) { return Date.valueOf(dateStr).toLocalDate().toEpochDay(); } break; case TIME: - String timeStr = null; - if (expr instanceof StringValue) { - timeStr = ((StringValue) expr).getValue(); - } else if (expr instanceof 
CastExpression - && ((CastExpression) expr).getColDataType().getDataType().equalsIgnoreCase("time")) { - timeStr = ((StringValue) ((CastExpression) expr).getLeftExpression()).getValue(); - } + String timeStr = getDateTimeLiteralStr(expr, "time"); if (timeStr != null) { return Time.valueOf(timeStr).toLocalTime().getLong(ChronoField.MICRO_OF_DAY); } break; case TIMESTAMP: - String timestampStr = null; - if (expr instanceof StringValue) { - timestampStr = ((StringValue) expr).getValue(); - } else if (expr instanceof CastExpression - && ((CastExpression) expr) - .getColDataType() - .getDataType() - .equalsIgnoreCase("timestamp")) { - timestampStr = ((StringValue) ((CastExpression) expr).getLeftExpression()).getValue(); - } + String timestampStr = getDateTimeLiteralStr(expr, "timestamp"); if (timestampStr != null) { return Timestamp.valueOf(timestampStr) .toLocalDateTime() @@ -305,6 +287,18 @@ private static Object convertValue( expr + " can not be converted to column type: " + column.type()); } + private static String getDateTimeLiteralStr( + net.sf.jsqlparser.expression.Expression expr, String type) { + String timestampStr = null; + if (expr instanceof StringValue) { + timestampStr = ((StringValue) expr).getValue(); + } else if (expr instanceof DateTimeLiteralExpression + && ((DateTimeLiteralExpression) expr).getType().name().equalsIgnoreCase(type)) { + timestampStr = ((DateTimeLiteralExpression) expr).getValue().replaceAll("^'(.*)'$", "$1"); + } + return timestampStr; + } + private void initPartitionPlans(TableFileScanHelper tableFileScanHelper) { long startTime = System.currentTimeMillis(); long count = 0;