Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] Allow setting arbitrary table properties on Iceberg tables #8

Draft
wants to merge 9 commits into
base: iceberg-extra-props
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/src/main/sphinx/connector/iceberg.md
Original file line number Diff line number Diff line change
Expand Up @@ -814,6 +814,11 @@ connector using a {doc}`WITH </sql/create-table-as>` clause.
- Comma-separated list of columns to use for Parquet bloom filter. It improves
the performance of queries using Equality and IN predicates when reading
Parquet files. Requires Parquet format. Defaults to `[]`.
* - ``extra_properties``
- Additional properties added to an Iceberg table. The properties are not
used by Trino, and are available in the ``$properties`` metadata table.
The properties are not included in the output of ``SHOW CREATE TABLE``
statements.
:::

The table definition below specifies to use Parquet files, partitioning by columns
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -263,8 +263,10 @@
import static io.trino.plugin.iceberg.IcebergTableName.isIcebergTableName;
import static io.trino.plugin.iceberg.IcebergTableName.isMaterializedViewStorage;
import static io.trino.plugin.iceberg.IcebergTableName.tableNameFrom;
import static io.trino.plugin.iceberg.IcebergTableProperties.EXTRA_PROPERTIES;
import static io.trino.plugin.iceberg.IcebergTableProperties.FILE_FORMAT_PROPERTY;
import static io.trino.plugin.iceberg.IcebergTableProperties.FORMAT_VERSION_PROPERTY;
import static io.trino.plugin.iceberg.IcebergTableProperties.ILLEGAL_EXTRA_PROPERTIES;
import static io.trino.plugin.iceberg.IcebergTableProperties.PARTITIONING_PROPERTY;
import static io.trino.plugin.iceberg.IcebergTableProperties.SORTED_BY_PROPERTY;
import static io.trino.plugin.iceberg.IcebergTableProperties.getPartitioning;
Expand Down Expand Up @@ -364,7 +366,7 @@ public class IcebergMetadata
private static final int CLEANING_UP_PROCEDURES_MAX_SUPPORTED_TABLE_VERSION = 2;
private static final String RETENTION_THRESHOLD = "retention_threshold";
private static final String UNKNOWN_SNAPSHOT_TOKEN = "UNKNOWN";
public static final Set<String> UPDATABLE_TABLE_PROPERTIES = ImmutableSet.of(FILE_FORMAT_PROPERTY, FORMAT_VERSION_PROPERTY, PARTITIONING_PROPERTY, SORTED_BY_PROPERTY);
public static final Set<String> UPDATABLE_TABLE_PROPERTIES = ImmutableSet.of(FILE_FORMAT_PROPERTY, FORMAT_VERSION_PROPERTY, PARTITIONING_PROPERTY, SORTED_BY_PROPERTY, EXTRA_PROPERTIES);

public static final String NUMBER_OF_DISTINCT_VALUES_NAME = "NUMBER_OF_DISTINCT_VALUES";
private static final FunctionName NUMBER_OF_DISTINCT_VALUES_FUNCTION = new FunctionName(IcebergThetaSketchForStats.NAME);
Expand Down Expand Up @@ -2125,6 +2127,18 @@ public void setTableProperties(ConnectorSession session, ConnectorTableHandle ta
}
}

if (properties.containsKey(EXTRA_PROPERTIES)) {
Map<String, String> extraProperties = (Map<String, String>) properties.get(EXTRA_PROPERTIES)
.orElseThrow(() -> new IllegalArgumentException("extra_properties property cannot be empty"));

Set<String> illegalExtraProperties = Sets.intersection(ILLEGAL_EXTRA_PROPERTIES, extraProperties.keySet());
if (!illegalExtraProperties.isEmpty()) {
throw new TrinoException(
INVALID_TABLE_PROPERTY,
"Illegal keys in extra_properties: " + illegalExtraProperties);
}
extraProperties.forEach(updateProperties::set);
}
commitTransaction(transaction, "set table properties");
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,19 @@
package io.trino.plugin.iceberg;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.inject.Inject;
import io.trino.plugin.hive.orc.OrcWriterConfig;
import io.trino.spi.TrinoException;
import io.trino.spi.session.PropertyMetadata;
import io.trino.spi.type.ArrayType;
import io.trino.spi.type.MapType;
import io.trino.spi.type.TypeManager;

import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

import static com.google.common.collect.ImmutableList.toImmutableList;
import static io.trino.plugin.iceberg.IcebergConfig.FORMAT_VERSION_SUPPORT_MAX;
Expand All @@ -46,13 +50,23 @@ public class IcebergTableProperties
public static final String ORC_BLOOM_FILTER_COLUMNS_PROPERTY = "orc_bloom_filter_columns";
public static final String ORC_BLOOM_FILTER_FPP_PROPERTY = "orc_bloom_filter_fpp";
public static final String PARQUET_BLOOM_FILTER_COLUMNS_PROPERTY = "parquet_bloom_filter_columns";
public static final String EXTRA_PROPERTIES = "extra_properties";
public static final Set<String> ILLEGAL_EXTRA_PROPERTIES = ImmutableSet.of(
FILE_FORMAT_PROPERTY,
PARTITIONING_PROPERTY,
SORTED_BY_PROPERTY,
LOCATION_PROPERTY,
FORMAT_VERSION_PROPERTY,
ORC_BLOOM_FILTER_COLUMNS,
ORC_BLOOM_FILTER_FPP);

private final List<PropertyMetadata<?>> tableProperties;

@Inject
public IcebergTableProperties(
IcebergConfig icebergConfig,
OrcWriterConfig orcWriterConfig)
OrcWriterConfig orcWriterConfig,
TypeManager typeManager)
{
tableProperties = ImmutableList.<PropertyMetadata<?>>builder()
.add(enumProperty(
Expand Down Expand Up @@ -120,6 +134,24 @@ public IcebergTableProperties(
.map(name -> name.toLowerCase(ENGLISH))
.collect(toImmutableList()),
value -> value))
.add(new PropertyMetadata<>(
EXTRA_PROPERTIES,
"Extra table properties",
new MapType(VARCHAR, VARCHAR, typeManager.getTypeOperators()),
Map.class,
null,
true, // These properties are not listed in SHOW CREATE TABLE
value -> {
Map<String, String> extraProperties = (Map<String, String>) value;
if (extraProperties.containsValue(null)) {
throw new TrinoException(INVALID_TABLE_PROPERTY, format("Extra table property value cannot be null '%s'", extraProperties));
}
if (extraProperties.containsKey(null)) {
throw new TrinoException(INVALID_TABLE_PROPERTY, format("Extra table property key cannot be null '%s'", extraProperties));
}
return extraProperties;
},
value -> value))
.build();
}

Expand Down Expand Up @@ -188,4 +220,9 @@ public static List<String> getParquetBloomFilterColumns(Map<String, Object> tabl
List<String> parquetBloomFilterColumns = (List<String>) tableProperties.get(PARQUET_BLOOM_FILTER_COLUMNS_PROPERTY);
return parquetBloomFilterColumns == null ? ImmutableList.of() : ImmutableList.copyOf(parquetBloomFilterColumns);
}

public static Optional<Map<String, String>> getExtraProperties(Map<String, Object> tableProperties)
{
return Optional.ofNullable((Map<String, String>) tableProperties.get(EXTRA_PROPERTIES));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -118,12 +118,16 @@
import static io.trino.plugin.iceberg.IcebergErrorCode.ICEBERG_INVALID_PARTITION_VALUE;
import static io.trino.plugin.iceberg.IcebergTableProperties.FILE_FORMAT_PROPERTY;
import static io.trino.plugin.iceberg.IcebergTableProperties.FORMAT_VERSION_PROPERTY;
import static io.trino.plugin.iceberg.IcebergTableProperties.ILLEGAL_EXTRA_PROPERTIES;
import static io.trino.plugin.iceberg.IcebergTableProperties.LOCATION_PROPERTY;
import static io.trino.plugin.iceberg.IcebergTableProperties.ORC_BLOOM_FILTER_COLUMNS_PROPERTY;
import static io.trino.plugin.iceberg.IcebergTableProperties.ORC_BLOOM_FILTER_FPP_PROPERTY;
import static io.trino.plugin.iceberg.IcebergTableProperties.PARQUET_BLOOM_FILTER_COLUMNS_PROPERTY;
import static io.trino.plugin.iceberg.IcebergTableProperties.PARTITIONING_PROPERTY;
import static io.trino.plugin.iceberg.IcebergTableProperties.SORTED_BY_PROPERTY;
import static io.trino.plugin.iceberg.IcebergTableProperties.getExtraProperties;
import static io.trino.plugin.iceberg.IcebergTableProperties.getOrcBloomFilterColumns;
import static io.trino.plugin.iceberg.IcebergTableProperties.getOrcBloomFilterFpp;
import static io.trino.plugin.iceberg.IcebergTableProperties.getPartitioning;
import static io.trino.plugin.iceberg.IcebergTableProperties.getSortOrder;
import static io.trino.plugin.iceberg.PartitionFields.parsePartitionFields;
Expand Down Expand Up @@ -832,7 +836,25 @@ public static Map<String, String> createTableProperties(ConnectorTableMetadata t
if (tableMetadata.getComment().isPresent()) {
propertiesBuilder.put(TABLE_COMMENT, tableMetadata.getComment().get());
}
return propertiesBuilder.buildOrThrow();

Map<String, String> baseProperties = propertiesBuilder.buildOrThrow();

// Add properties set via "extra_properties" table property.
Map<String, String> extraProperties = getExtraProperties(tableMetadata.getProperties())
.orElseGet(ImmutableMap::of);
Set<String> illegalExtraProperties = Sets.intersection(ILLEGAL_EXTRA_PROPERTIES, extraProperties.keySet());
if (!illegalExtraProperties.isEmpty()) {
throw new TrinoException(
INVALID_TABLE_PROPERTY,
"Illegal keys in extra_properties: " + illegalExtraProperties);
}

Map<String, String> properties = ImmutableMap.<String, String>builder()
.putAll(baseProperties)
.putAll(extraProperties)
.buildOrThrow();

return catalog.newCreateTableTransaction(session, schemaTableName, schema, partitionSpec, sortOrder, targetPath, properties);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import io.trino.testing.DistributedQueryRunner;
import io.trino.testing.MaterializedResult;
import io.trino.testing.MaterializedRow;
import io.trino.testing.QueryFailedException;
import io.trino.testing.QueryRunner;
import io.trino.testing.QueryRunner.MaterializedResultWithPlan;
import io.trino.testing.TestingConnectorBehavior;
Expand All @@ -62,6 +63,7 @@
import org.apache.iceberg.TableMetadataParser;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.util.JsonUtil;
import org.assertj.core.api.Condition;
import org.intellij.lang.annotations.Language;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
Expand Down Expand Up @@ -8577,4 +8579,105 @@ private void assertQueryIdStored(String tableName, QueryId queryId)
assertThat(getFieldFromLatestSnapshotSummary(tableName, TRINO_QUERY_ID_NAME))
.isEqualTo(queryId.toString());
}

@Test
public void testExtraProperties()
{
    String tableName = format("%s.%s.create_table_with_multiple_extra_properties_%s", getSession().getCatalog().get(), getSession().getSchema().get(), randomNameSuffix());
    assertUpdate("CREATE TABLE %s (c1 integer) WITH (extra_properties = MAP(ARRAY['extra.property.one', 'extra.property.two'], ARRAY['one', 'two']))".formatted(tableName));

    // The extra properties must be surfaced through the $properties metadata table
    assertQuery(
            "SELECT \"extra.property.one\", \"extra.property.two\" FROM \"%s$properties\"".formatted(tableName),
            "SELECT 'one', 'two'");

    // Assert that SHOW CREATE TABLE does not contain extra_properties
    // (the property is registered as hidden, so it must not round-trip through DDL).
    // Note: predicate was previously inverted — it asserted the output DOES contain extra_properties.
    assertThat((String) computeActual("SHOW CREATE TABLE %s".formatted(tableName)).getOnlyValue())
            .satisfies(new Condition<>(
                    queryResult -> !queryResult.contains("extra_properties"), "noExtraProperties"
            ));

    assertUpdate("DROP TABLE %s".formatted(tableName));
}

@Test
public void testExtraPropertiesWithCtas()
{
    String tableName = format("%s.%s.create_table_ctas_with_multiple_extra_properties_%s", getSession().getCatalog().get(), getSession().getSchema().get(), randomNameSuffix());
    // Use an actual CTAS statement — the previous version of this test issued a plain
    // CREATE TABLE and therefore never exercised the CREATE TABLE AS code path.
    assertUpdate("CREATE TABLE %s WITH (extra_properties = MAP(ARRAY['extra.property.one', 'extra.property.two'], ARRAY['one', 'two'])) AS SELECT 1 as c1".formatted(tableName), 1);

    // The extra properties must be surfaced through the $properties metadata table
    assertQuery(
            "SELECT \"extra.property.one\", \"extra.property.two\" FROM \"%s$properties\"".formatted(tableName),
            "SELECT 'one', 'two'");

    // Assert that SHOW CREATE TABLE does not contain extra_properties
    // (the property is hidden; the previous predicate was inverted and asserted the opposite).
    assertThat((String) computeActual("SHOW CREATE TABLE %s".formatted(tableName)).getOnlyValue())
            .satisfies(new Condition<>(
                    queryResult -> !queryResult.contains("extra_properties"), "noExtraProperties"
            ));

    assertUpdate("DROP TABLE %s".formatted(tableName));
}

@Test
public void testShowCreateWithExtraProperties()
{
    String tableName = format("%s.%s.show_create_table_with_extra_properties_%s", getSession().getCatalog().get(), getSession().getSchema().get(), randomNameSuffix());
    assertUpdate("CREATE TABLE %s (c1 integer) WITH (extra_properties = MAP(ARRAY['extra.property.one', 'extra.property.two'], ARRAY['one', 'two']))".formatted(tableName));

    // Assert that SHOW CREATE TABLE does not contain extra_properties
    // (hidden property; the previous predicate was inverted — it passed only when
    // the output DID contain extra_properties, contradicting the condition name).
    assertThat((String) computeActual("SHOW CREATE TABLE " + tableName).getOnlyValue())
            .satisfies(new Condition<>(
                    queryResult -> !queryResult.contains("extra_properties"), "noExtraProperties"
            ));

    assertUpdate("DROP TABLE %s".formatted(tableName));
}

@Test
public void testDuplicateExtraProperties()
{
    // Duplicate keys inside the extra_properties MAP literal must be rejected while
    // decoding the table property value, for both CREATE TABLE and CTAS.
    assertQueryFails(
            "CREATE TABLE create_table_with_duplicate_extra_properties (c1 integer) WITH (extra_properties = MAP(ARRAY['extra.property', 'extra.property'], ARRAY['true', 'false']))",
            "Invalid value for catalog 'iceberg' table property 'extra_properties': Cannot convert.*");
    // The second statement is now an actual CTAS — previously it was a plain CREATE TABLE
    // despite its "select_as" name, so the CTAS path was never exercised.
    assertQueryFails(
            "CREATE TABLE create_table_select_as_with_duplicate_extra_properties WITH (extra_properties = MAP(ARRAY['extra.property', 'extra.property'], ARRAY['true', 'false'])) AS SELECT 1 as c1",
            "Invalid value for catalog 'iceberg' table property 'extra_properties': Cannot convert.*");
}

@Test
public void testOverwriteExistingPropertyWithExtraProperties()
{
    // Keys that Trino manages itself (file format, format version, ...) must be rejected
    // when smuggled in through extra_properties, on both CREATE TABLE and CTAS.
    String createWithReservedKey = "CREATE TABLE create_table_with_overwrite_extra_properties (c1 integer) WITH (extra_properties = MAP(ARRAY['write.format.default'], ARRAY['foobar']))";
    assertThatThrownBy(() -> assertUpdate(createWithReservedKey))
            .isInstanceOf(QueryFailedException.class)
            .hasMessage("Illegal keys in extra_properties: [write.format.default]");

    String ctasWithReservedKey = "CREATE TABLE create_table_as_select_with_extra_properties WITH (extra_properties = MAP(ARRAY['format-version'], ARRAY['10'])) AS SELECT 1 as c1";
    assertThatThrownBy(() -> assertUpdate(ctasWithReservedKey))
            .isInstanceOf(QueryFailedException.class)
            .hasMessage("Illegal keys in extra_properties: [format-version]");
}

@Test
public void testNullExtraProperty()
{
    // A null value in the extra_properties map must fail property decoding,
    // for both CREATE TABLE and CTAS.
    String expectedMessage = ".*Extra table property value cannot be null '\\{null.property=null}'.*";
    assertQueryFails(
            "CREATE TABLE create_table_with_duplicate_extra_properties (c1 integer) WITH (extra_properties = MAP(ARRAY['null.property'], ARRAY[null]))",
            expectedMessage);
    assertQueryFails(
            "CREATE TABLE create_table_as_select_with_extra_properties WITH (extra_properties = MAP(ARRAY['null.property'], ARRAY[null])) AS SELECT 1 as c1",
            expectedMessage);
}

@Test
public void testCollidingMixedCaseProperty()
{
    // Two extra property keys that differ only by case can both be stored,
    // but reading the $properties table currently fails on the duplicate column name.
    String table = "create_table_with_mixed_case_extra_properties" + randomNameSuffix();

    assertUpdate("CREATE TABLE %s (c1 integer) WITH (extra_properties = MAP(ARRAY['one', 'ONE'], ARRAY['one', 'ONE']))".formatted(table));
    // TODO: (https://github.com/trinodb/trino/issues/17) This should run successfully
    assertThatThrownBy(() -> query("SELECT * FROM \"%s$properties\"".formatted(table)))
            .isInstanceOf(QueryFailedException.class)
            .hasMessageContaining("Multiple entries with same key: one=one and one=one");

    assertUpdate("DROP TABLE %s".formatted(table));
}
}
Loading