Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[AMORO-3424] Add the self-optimizing.partition-filter parameter #3426

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -294,6 +294,11 @@ public static OptimizingConfig parseOptimizingConfig(Map<String, String> propert
properties,
TableProperties.SELF_OPTIMIZING_FULL_REWRITE_ALL_FILES,
TableProperties.SELF_OPTIMIZING_FULL_REWRITE_ALL_FILES_DEFAULT))
.setPartitionFilter(
CompatiblePropertyUtil.propertyAsString(
properties,
TableProperties.SELF_OPTIMIZING_PARTITION_FILTER,
TableProperties.SELF_OPTIMIZING_PARTITION_FILTER_DEFAULT))
.setBaseHashBucket(
CompatiblePropertyUtil.propertyAsInt(
properties,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,9 @@ public class OptimizingConfig {
// self-optimizing.full.rewrite-all-files
private boolean fullRewriteAllFiles;

// self-optimizing.partition-filter
private String partitionFilter;

// base.file-index.hash-bucket
private int baseHashBucket;

Expand Down Expand Up @@ -240,6 +243,15 @@ public OptimizingConfig setFullRewriteAllFiles(boolean fullRewriteAllFiles) {
return this;
}

/**
 * Sets the partition filter expression (self-optimizing.partition-filter).
 *
 * @param partitionFilter SQL-like predicate string, or null when the property is unset
 * @return this config, for builder-style chaining
 */
public OptimizingConfig setPartitionFilter(String partitionFilter) {
  this.partitionFilter = partitionFilter;
  return this;
}

/** Returns the configured partition filter expression, or null when none was set. */
public String getPartitionFilter() {
  return partitionFilter;
}

/** Returns the base store file-index hash bucket count (base.file-index.hash-bucket). */
public int getBaseHashBucket() {
  return baseHashBucket;
}
Expand Down Expand Up @@ -291,6 +303,7 @@ public boolean equals(Object o) {
&& Double.compare(that.majorDuplicateRatio, majorDuplicateRatio) == 0
&& fullTriggerInterval == that.fullTriggerInterval
&& fullRewriteAllFiles == that.fullRewriteAllFiles
&& Objects.equal(partitionFilter, that.partitionFilter)
&& baseHashBucket == that.baseHashBucket
&& baseRefreshInterval == that.baseRefreshInterval
&& hiveRefreshInterval == that.hiveRefreshInterval
Expand All @@ -317,6 +330,7 @@ public int hashCode() {
majorDuplicateRatio,
fullTriggerInterval,
fullRewriteAllFiles,
partitionFilter,
baseHashBucket,
baseRefreshInterval,
hiveRefreshInterval,
Expand All @@ -341,6 +355,7 @@ public String toString() {
.add("majorDuplicateRatio", majorDuplicateRatio)
.add("fullTriggerInterval", fullTriggerInterval)
.add("fullRewriteAllFiles", fullRewriteAllFiles)
.add("partitionFilter", partitionFilter)
.add("baseHashBucket", baseHashBucket)
.add("baseRefreshInterval", baseRefreshInterval)
.add("hiveRefreshInterval", hiveRefreshInterval)
Expand Down
6 changes: 6 additions & 0 deletions amoro-format-iceberg/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,12 @@
<artifactId>paimon-bundle</artifactId>
</dependency>

<!-- SQL parser used to translate self-optimizing.partition-filter predicates into Iceberg
     expressions. NOTE(review): consider managing this version in the parent pom's
     dependencyManagement instead of pinning 4.5 inline. -->
<dependency>
  <groupId>com.github.jsqlparser</groupId>
  <artifactId>jsqlparser</artifactId>
  <version>4.5</version>
</dependency>

<dependency>
<groupId>org.roaringbitmap</groupId>
<artifactId>RoaringBitmap</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,27 @@

package org.apache.amoro.optimizing.plan;

import net.sf.jsqlparser.expression.DateTimeLiteralExpression;
import net.sf.jsqlparser.expression.DoubleValue;
import net.sf.jsqlparser.expression.LongValue;
import net.sf.jsqlparser.expression.NotExpression;
import net.sf.jsqlparser.expression.StringValue;
import net.sf.jsqlparser.expression.operators.conditional.AndExpression;
import net.sf.jsqlparser.expression.operators.conditional.OrExpression;
import net.sf.jsqlparser.expression.operators.relational.EqualsTo;
import net.sf.jsqlparser.expression.operators.relational.ExpressionList;
import net.sf.jsqlparser.expression.operators.relational.GreaterThan;
import net.sf.jsqlparser.expression.operators.relational.GreaterThanEquals;
import net.sf.jsqlparser.expression.operators.relational.InExpression;
import net.sf.jsqlparser.expression.operators.relational.IsNullExpression;
import net.sf.jsqlparser.expression.operators.relational.ItemsList;
import net.sf.jsqlparser.expression.operators.relational.MinorThan;
import net.sf.jsqlparser.expression.operators.relational.MinorThanEquals;
import net.sf.jsqlparser.expression.operators.relational.NotEqualsTo;
import net.sf.jsqlparser.parser.CCJSqlParserUtil;
import net.sf.jsqlparser.schema.Column;
import net.sf.jsqlparser.statement.select.PlainSelect;
import net.sf.jsqlparser.statement.select.Select;
import org.apache.amoro.ServerTableIdentifier;
import org.apache.amoro.TableFormat;
import org.apache.amoro.config.OptimizingConfig;
Expand All @@ -39,15 +60,24 @@
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.Pair;
import org.apache.iceberg.util.PropertyUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.sql.Date;
import java.sql.Time;
import java.sql.Timestamp;
import java.time.ZoneOffset;
import java.time.temporal.ChronoField;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -111,7 +141,162 @@ protected void initEvaluator() {
}

/**
 * Builds the Iceberg partition filter from the table's
 * {@code self-optimizing.partition-filter} property.
 *
 * @return the parsed filter expression, or {@code alwaysTrue} when the property is unset or
 *     blank (no partition restriction)
 */
protected Expression getPartitionFilter() {
  String partitionFilter = config.getPartitionFilter();
  // A blank value would otherwise be handed to the SQL parser and fail; treat it like unset.
  return partitionFilter == null || partitionFilter.trim().isEmpty()
      ? Expressions.alwaysTrue()
      : convertSqlToIcebergExpression(partitionFilter, mixedTable.schema().columns());
}

/**
 * Parses a SQL boolean predicate (the partition-filter property text) and converts it into an
 * Iceberg {@link Expression} over the given table columns.
 *
 * @param sql predicate text, e.g. {@code "dt >= '2024-01-01' AND region = 'us'"}
 * @param tableColumns columns used to resolve names and literal types
 * @throws IllegalArgumentException if the predicate cannot be parsed or converted
 */
protected static Expression convertSqlToIcebergExpression(
    String sql, List<Types.NestedField> tableColumns) {
  // JSqlParser only accepts full statements, so wrap the predicate in a dummy SELECT.
  String wrappedQuery = "SELECT * FROM dummy WHERE " + sql;
  try {
    Select parsed = (Select) CCJSqlParserUtil.parse(wrappedQuery);
    net.sf.jsqlparser.expression.Expression where =
        ((PlainSelect) parsed.getSelectBody()).getWhere();
    return convertSparkExpressionToIceberg(where, tableColumns);
  } catch (Exception e) {
    throw new IllegalArgumentException("Failed to parse where condition: " + sql, e);
  }
}

/**
 * Recursively converts a parsed JSqlParser WHERE-clause expression into an equivalent Iceberg
 * {@link Expression}. Supported forms: IS [NOT] NULL, =, !=, &gt;, &gt;=, &lt;, &lt;=,
 * [NOT] IN (literal list), NOT, AND, OR, and parenthesized sub-expressions.
 *
 * @param whereExpr parsed predicate node to convert
 * @param tableColumns table columns used to resolve column names and literal types
 * @throws UnsupportedOperationException for any unsupported expression form (including IN with a
 *     subquery)
 */
private static Expression convertSparkExpressionToIceberg(
    net.sf.jsqlparser.expression.Expression whereExpr, List<Types.NestedField> tableColumns) {
  if (whereExpr instanceof net.sf.jsqlparser.expression.Parenthesis) {
    // Unwrap "(expr)" so grouped predicates such as "(a = 1 OR b = 2) AND c = 3" are accepted;
    // without this branch they fell through to the UnsupportedOperationException below.
    return convertSparkExpressionToIceberg(
        ((net.sf.jsqlparser.expression.Parenthesis) whereExpr).getExpression(), tableColumns);
  } else if (whereExpr instanceof IsNullExpression) {
    IsNullExpression isNull = (IsNullExpression) whereExpr;
    Types.NestedField column = getColumn(isNull.getLeftExpression(), tableColumns);
    return isNull.isNot()
        ? Expressions.notNull(column.name())
        : Expressions.isNull(column.name());
  } else if (whereExpr instanceof EqualsTo) {
    EqualsTo eq = (EqualsTo) whereExpr;
    Types.NestedField column = getColumn(eq.getLeftExpression(), tableColumns);
    return Expressions.equal(column.name(), getValue(eq.getRightExpression(), column));
  } else if (whereExpr instanceof NotEqualsTo) {
    NotEqualsTo ne = (NotEqualsTo) whereExpr;
    Types.NestedField column = getColumn(ne.getLeftExpression(), tableColumns);
    return Expressions.notEqual(column.name(), getValue(ne.getRightExpression(), column));
  } else if (whereExpr instanceof GreaterThan) {
    GreaterThan gt = (GreaterThan) whereExpr;
    Types.NestedField column = getColumn(gt.getLeftExpression(), tableColumns);
    return Expressions.greaterThan(column.name(), getValue(gt.getRightExpression(), column));
  } else if (whereExpr instanceof GreaterThanEquals) {
    GreaterThanEquals ge = (GreaterThanEquals) whereExpr;
    Types.NestedField column = getColumn(ge.getLeftExpression(), tableColumns);
    return Expressions.greaterThanOrEqual(
        column.name(), getValue(ge.getRightExpression(), column));
  } else if (whereExpr instanceof MinorThan) {
    MinorThan lt = (MinorThan) whereExpr;
    Types.NestedField column = getColumn(lt.getLeftExpression(), tableColumns);
    return Expressions.lessThan(column.name(), getValue(lt.getRightExpression(), column));
  } else if (whereExpr instanceof MinorThanEquals) {
    MinorThanEquals le = (MinorThanEquals) whereExpr;
    Types.NestedField column = getColumn(le.getLeftExpression(), tableColumns);
    return Expressions.lessThanOrEqual(column.name(), getValue(le.getRightExpression(), column));
  } else if (whereExpr instanceof InExpression) {
    InExpression in = (InExpression) whereExpr;
    Types.NestedField column = getColumn(in.getLeftExpression(), tableColumns);
    ItemsList rightItems = in.getRightItemsList();
    List<Object> values = new ArrayList<>();
    if (rightItems instanceof ExpressionList) {
      for (net.sf.jsqlparser.expression.Expression expr :
          ((ExpressionList) rightItems).getExpressions()) {
        values.add(getValue(expr, column));
      }
    } else {
      // Only literal lists are supported; "col IN (SELECT ...)" cannot be evaluated here.
      throw new UnsupportedOperationException("Subquery IN not supported");
    }
    return in.isNot()
        ? Expressions.notIn(column.name(), values)
        : Expressions.in(column.name(), values);
  } else if (whereExpr instanceof NotExpression) {
    NotExpression not = (NotExpression) whereExpr;
    return Expressions.not(convertSparkExpressionToIceberg(not.getExpression(), tableColumns));
  } else if (whereExpr instanceof AndExpression) {
    AndExpression and = (AndExpression) whereExpr;
    return Expressions.and(
        convertSparkExpressionToIceberg(and.getLeftExpression(), tableColumns),
        convertSparkExpressionToIceberg(and.getRightExpression(), tableColumns));
  } else if (whereExpr instanceof OrExpression) {
    OrExpression or = (OrExpression) whereExpr;
    return Expressions.or(
        convertSparkExpressionToIceberg(or.getLeftExpression(), tableColumns),
        convertSparkExpressionToIceberg(or.getRightExpression(), tableColumns));
  }
  throw new UnsupportedOperationException("Unsupported expression: " + whereExpr);
}

/**
 * Resolves a column reference from the parsed predicate against the table schema.
 *
 * @param expr left-hand side of a comparison; must be a plain column reference
 * @param tableColumns schema columns to search by exact name
 * @throws IllegalArgumentException if {@code expr} is not a column reference or the name does
 *     not match any table column
 */
private static Types.NestedField getColumn(
    net.sf.jsqlparser.expression.Expression expr, List<Types.NestedField> tableColumns) {
  if (!(expr instanceof Column)) {
    throw new IllegalArgumentException("Expected column reference, got: " + expr);
  }
  String columnName = ((Column) expr).getColumnName();
  for (Types.NestedField field : tableColumns) {
    if (field.name().equals(columnName)) {
      return field;
    }
  }
  throw new IllegalArgumentException("Column not found: " + columnName);
}

/**
 * Converts a SQL literal into the Java value matching {@code column}'s Iceberg type, wrapping
 * any conversion failure in an IllegalArgumentException that names the offending literal.
 */
private static Object getValue(
    net.sf.jsqlparser.expression.Expression expr, Types.NestedField column) {
  try {
    return convertValue(expr, column);
  } catch (Exception e) {
    throw new IllegalArgumentException("Failed to convert value: " + expr, e);
  }
}

/**
 * Converts a SQL literal to the Java value Iceberg expects for {@code column}'s type: DATE to
 * epoch days, TIME to microseconds of day, TIMESTAMP to epoch microseconds (interpreted as UTC,
 * sub-second precision preserved).
 *
 * @throws IllegalArgumentException if the literal cannot be converted to the column type
 */
private static Object convertValue(
    net.sf.jsqlparser.expression.Expression expr, Types.NestedField column) {
  switch (column.type().typeId()) {
    case BOOLEAN:
      // JSqlParser represents the bare keywords true/false as Column nodes; note that any other
      // identifier silently maps to false via Boolean.valueOf — TODO confirm stricter validation
      // is desired before tightening this.
      return Boolean.valueOf(((Column) expr).getColumnName());
    case STRING:
      return ((StringValue) expr).getValue();
    case INTEGER:
    case LONG:
      return ((LongValue) expr).getValue();
    case FLOAT:
    case DOUBLE:
      return ((DoubleValue) expr).getValue();
    case DATE:
      String dateStr = getDateTimeLiteralStr(expr, "date");
      if (dateStr != null) {
        return Date.valueOf(dateStr).toLocalDate().toEpochDay();
      }
      break;
    case TIME:
      String timeStr = getDateTimeLiteralStr(expr, "time");
      if (timeStr != null) {
        return Time.valueOf(timeStr).toLocalTime().getLong(ChronoField.MICRO_OF_DAY);
      }
      break;
    case TIMESTAMP:
      String timestampStr = getDateTimeLiteralStr(expr, "timestamp");
      if (timestampStr != null) {
        // Preserve sub-second precision: the previous seconds-only conversion
        // (toEpochSecond * 1_000_000) silently dropped the fractional part of literals such as
        // '2024-01-01 00:00:00.123'.
        Timestamp ts = Timestamp.valueOf(timestampStr);
        return ts.toLocalDateTime().toEpochSecond(ZoneOffset.UTC) * 1_000_000L
            + ts.getNanos() / 1_000L;
      }
      break;
  }
  throw new IllegalArgumentException(
      expr + " can not be converted to column type: " + column.type());
}

/**
 * Extracts the literal text of a date/time value, accepting either a plain string literal
 * ('2024-01-01') or a typed literal (DATE '2024-01-01') whose keyword matches {@code type}.
 *
 * @return the literal text without surrounding quotes, or null when {@code expr} is neither form
 */
private static String getDateTimeLiteralStr(
    net.sf.jsqlparser.expression.Expression expr, String type) {
  if (expr instanceof StringValue) {
    return ((StringValue) expr).getValue();
  }
  if (expr instanceof DateTimeLiteralExpression) {
    DateTimeLiteralExpression literal = (DateTimeLiteralExpression) expr;
    if (literal.getType().name().equalsIgnoreCase(type)) {
      // getValue() keeps the surrounding single quotes; strip them.
      return literal.getValue().replaceAll("^'(.*)'$", "$1");
    }
  }
  return null;
}

private void initPartitionPlans(TableFileScanHelper tableFileScanHelper) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,9 @@ private TableProperties() {}
"self-optimizing.full.rewrite-all-files";
public static final boolean SELF_OPTIMIZING_FULL_REWRITE_ALL_FILES_DEFAULT = true;

// self-optimizing.partition-filter: SQL-like predicate restricting which partitions the
// self-optimizing planner considers; the null default means no restriction (all partitions).
public static final String SELF_OPTIMIZING_PARTITION_FILTER = "self-optimizing.partition-filter";
public static final String SELF_OPTIMIZING_PARTITION_FILTER_DEFAULT = null;

public static final String SELF_OPTIMIZING_MIN_PLAN_INTERVAL =
"self-optimizing.min-plan-interval";
public static final long SELF_OPTIMIZING_MIN_PLAN_INTERVAL_DEFAULT = 60000;
Expand Down
Loading
Loading