@@ -26,6 +26,7 @@
 import io.trino.spi.connector.ConnectorSession;
 import io.trino.spi.connector.ConnectorSplitManager;
 import io.trino.spi.connector.ConnectorTransactionHandle;
+import io.trino.spi.connector.TableProcedureMetadata;
 import io.trino.spi.session.PropertyMetadata;
 import io.trino.spi.transaction.IsolationLevel;

@@ -51,6 +52,7 @@ public class LakehouseConnector
     private final LakehouseSessionProperties sessionProperties;
     private final LakehouseTableProperties tableProperties;
     private final IcebergMaterializedViewProperties materializedViewProperties;
+    private final Set<TableProcedureMetadata> tableProcedures;

     @Inject
     public LakehouseConnector(
@@ -62,7 +64,8 @@ public LakehouseConnector(
             LakehouseNodePartitioningProvider nodePartitioningProvider,
             LakehouseSessionProperties sessionProperties,
             LakehouseTableProperties tableProperties,
-            IcebergMaterializedViewProperties materializedViewProperties)
+            IcebergMaterializedViewProperties materializedViewProperties,
+            Set<TableProcedureMetadata> tableProcedures)
     {
         this.lifeCycleManager = requireNonNull(lifeCycleManager, "lifeCycleManager is null");
         this.transactionManager = requireNonNull(transactionManager, "transactionManager is null");
@@ -73,6 +76,7 @@
         this.sessionProperties = requireNonNull(sessionProperties, "sessionProperties is null");
         this.tableProperties = requireNonNull(tableProperties, "tableProperties is null");
         this.materializedViewProperties = requireNonNull(materializedViewProperties, "materializedViewProperties is null");
+        this.tableProcedures = requireNonNull(tableProcedures, "tableProcedures is null");
     }

     @Override
@@ -148,6 +152,12 @@ public List<PropertyMetadata<?>> getMaterializedViewProperties()
         return materializedViewProperties.getMaterializedViewProperties();
     }

+    @Override
+    public Set<TableProcedureMetadata> getTableProcedures()
+    {
+        return tableProcedures;
Comment on lines +156 to +158:
suggestion: Returning internal Set directly may expose mutability.

Return an unmodifiable view of tableProcedures to safeguard internal state from external changes.

Suggested implementation:

+    @Override
+    public Set<TableProcedureMetadata> getTableProcedures()
+    {
+        return Collections.unmodifiableSet(tableProcedures);
+    }

with the additional import (java.util.Set is already imported by this file):

+import java.util.Collections;

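A reasonable alternative, assuming Guava's com.google.common.collect.ImmutableSet is available as it is in other Trino plugin modules: copy the injected set once in the constructor, so the getter stays a plain field read and callers cannot observe a mutable view:

+        this.tableProcedures = ImmutableSet.copyOf(requireNonNull(tableProcedures, "tableProcedures is null"));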
+    }
+
     @Override
     public void shutdown()
     {
@@ -16,6 +16,7 @@
 import com.google.inject.Binder;
 import com.google.inject.Key;
 import com.google.inject.Scopes;
+import com.google.inject.multibindings.Multibinder;
 import io.airlift.configuration.AbstractConfigurationAwareModule;
 import io.trino.plugin.base.metrics.FileFormatDataSourceStats;
 import io.trino.plugin.hive.HideDeltaLakeTables;
@@ -24,7 +25,17 @@
 import io.trino.plugin.hive.orc.OrcWriterConfig;
 import io.trino.plugin.hive.parquet.ParquetReaderConfig;
 import io.trino.plugin.hive.parquet.ParquetWriterConfig;
+import io.trino.plugin.iceberg.procedure.AddFilesTableFromTableProcedure;
+import io.trino.plugin.iceberg.procedure.AddFilesTableProcedure;
+import io.trino.plugin.iceberg.procedure.DropExtendedStatsTableProcedure;
+import io.trino.plugin.iceberg.procedure.ExpireSnapshotsTableProcedure;
+import io.trino.plugin.iceberg.procedure.OptimizeManifestsTableProcedure;
+import io.trino.plugin.iceberg.procedure.OptimizeTableProcedure;
+import io.trino.plugin.iceberg.procedure.RemoveOrphanFilesTableProcedure;
+import io.trino.plugin.iceberg.procedure.RollbackToSnapshotTableProcedure;
+import io.trino.spi.connector.TableProcedureMetadata;

+import static com.google.inject.multibindings.Multibinder.newSetBinder;
 import static io.airlift.configuration.ConfigBinder.configBinder;
 import static org.weakref.jmx.guice.ExportBinder.newExporter;

@@ -53,6 +64,16 @@ protected void setup(Binder binder)
         binder.bind(FileFormatDataSourceStats.class).in(Scopes.SINGLETON);
         newExporter(binder).export(FileFormatDataSourceStats.class).withGeneratedName();

+        Multibinder<TableProcedureMetadata> tableProcedures = newSetBinder(binder, TableProcedureMetadata.class);
+        tableProcedures.addBinding().toProvider(OptimizeTableProcedure.class).in(Scopes.SINGLETON);
+        tableProcedures.addBinding().toProvider(OptimizeManifestsTableProcedure.class).in(Scopes.SINGLETON);
+        tableProcedures.addBinding().toProvider(DropExtendedStatsTableProcedure.class).in(Scopes.SINGLETON);
+        tableProcedures.addBinding().toProvider(RollbackToSnapshotTableProcedure.class).in(Scopes.SINGLETON);
+        tableProcedures.addBinding().toProvider(ExpireSnapshotsTableProcedure.class).in(Scopes.SINGLETON);
+        tableProcedures.addBinding().toProvider(RemoveOrphanFilesTableProcedure.class).in(Scopes.SINGLETON);
+        tableProcedures.addBinding().toProvider(AddFilesTableProcedure.class).in(Scopes.SINGLETON);
+        tableProcedures.addBinding().toProvider(AddFilesTableFromTableProcedure.class).in(Scopes.SINGLETON);
+
         binder.bind(Key.get(boolean.class, HideDeltaLakeTables.class)).toInstance(false);
     }
 }
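Each procedure above is bound with toProvider(...), so the procedure metadata is constructed by an injectable provider rather than eagerly. A minimal sketch of that provider pattern, assuming the Trino SPI shapes implied by the imports above (the class name, procedure name, and property are illustrative, not part of this PR):

    import com.google.inject.Provider;
    import io.trino.spi.connector.TableProcedureMetadata;

    import java.util.List;

    import static io.trino.spi.connector.TableProcedureExecutionMode.coordinatorOnly;
    import static io.trino.spi.session.PropertyMetadata.stringProperty;

    public class ExampleTableProcedure
            implements Provider<TableProcedureMetadata>
    {
        @Override
        public TableProcedureMetadata get()
        {
            // Procedure name, where it executes, and the properties accepted by
            // ALTER TABLE ... EXECUTE example_procedure(option => ...)
            return new TableProcedureMetadata(
                    "example_procedure",
                    coordinatorOnly(),
                    List.of(stringProperty("option", "Illustrative option", null, false)));
        }
    }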
@@ -62,6 +62,7 @@ protected QueryRunner createQueryRunner()
                 .addLakehouseProperty("s3.endpoint", hiveMinio.getMinio().getMinioAddress())
                 .addLakehouseProperty("s3.path-style-access", "true")
                 .addLakehouseProperty("s3.streaming.part-size", "5MB")
+                .addLakehouseProperty("iceberg.add-files-procedure.enabled", "true")
                 .build();
     }
@@ -16,6 +16,7 @@
 import org.junit.jupiter.api.Test;

 import static io.trino.plugin.lakehouse.TableType.DELTA;
+import static io.trino.testing.TestingNames.randomNameSuffix;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;

@@ -59,4 +60,17 @@ public void testShowCreateTable()
                 type = 'DELTA'
                 )\\E""");
     }
+
+    @Test
+    public void testOptimize()
+    {
+        String tableName = "test_optimize_" + randomNameSuffix();
+        assertUpdate("CREATE TABLE " + tableName + " (key integer, value varchar)");
+        try {
+            assertThat(query("ALTER TABLE " + tableName + " EXECUTE optimize(file_size_threshold => '10kB')")).succeeds();
+        }
+        finally {
+            assertUpdate("DROP TABLE " + tableName);
+        }
+    }
 }
@@ -13,11 +13,13 @@
  */
 package io.trino.plugin.lakehouse;

+import io.trino.Session;
 import io.trino.testing.TestingConnectorBehavior;
 import io.trino.testing.sql.TestTable;
 import org.junit.jupiter.api.Test;

 import static io.trino.plugin.lakehouse.TableType.HIVE;
+import static io.trino.testing.TestingNames.randomNameSuffix;
 import static org.assertj.core.api.Assertions.assertThat;

 public class TestLakehouseHiveConnectorSmokeTest
@@ -66,4 +68,28 @@ comment varchar(152)
                 type = 'HIVE'
                 )""");
     }
+
+    @Test
+    public void testOptimize()
Comment on lines +72 to +73:
suggestion (testing): Add test for optimize procedure with invalid session property value.

Please add a test case for an invalid session property value to verify the connector's handling of unexpected input.
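A hedged sketch of such a test (the method name is hypothetical, and no error message is asserted, since the exact text for a malformed boolean value comes from the engine rather than this connector):

    @Test
    public void testOptimizeWithInvalidSessionPropertyValue()
    {
        String tableName = "test_optimize_invalid_" + randomNameSuffix();
        assertUpdate("CREATE TABLE " + tableName + " (key integer, value varchar)");
        try {
            // "not-a-boolean" cannot be decoded into the boolean session property
            Session invalidSession = Session.builder(getSession())
                    .setCatalogSessionProperty(getSession().getCatalog().orElseThrow(), "non_transactional_optimize_enabled", "not-a-boolean")
                    .build();
            assertThat(query(invalidSession, "ALTER TABLE " + tableName + " EXECUTE optimize(file_size_threshold => '10kB')")).failure();
        }
        finally {
            assertUpdate("DROP TABLE " + tableName);
        }
    }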

+    {
+        String tableName = "test_optimize_" + randomNameSuffix();
+        assertUpdate("CREATE TABLE " + tableName + " (key integer, value varchar)");
+        try {
+            assertThat(query("ALTER TABLE " + tableName + " EXECUTE optimize(file_size_threshold => '10kB')"))
+                    .failure().hasMessage("OPTIMIZE procedure must be explicitly enabled via non_transactional_optimize_enabled session property");
+
+            Session session = optimizeEnabledSession();
+            assertThat(query(session, "ALTER TABLE " + tableName + " EXECUTE optimize(file_size_threshold => '10kB')")).succeeds();
+        }
+        finally {
+            assertUpdate("DROP TABLE " + tableName);
+        }
+    }
+
+    private Session optimizeEnabledSession()
+    {
+        return Session.builder(getSession())
+                .setCatalogSessionProperty(getSession().getCatalog().orElseThrow(), "non_transactional_optimize_enabled", "true")
+                .build();
+    }
 }
@@ -16,6 +16,7 @@
 import org.junit.jupiter.api.Test;

 import static io.trino.plugin.lakehouse.TableType.ICEBERG;
+import static io.trino.testing.TestingNames.randomNameSuffix;
 import static org.assertj.core.api.Assertions.assertThat;

 public class TestLakehouseIcebergConnectorSmokeTest
@@ -44,4 +45,46 @@ public void testShowCreateTable()
                 type = 'ICEBERG'
                 )\\E""");
     }
+
+    @Test
+    public void testOptimize()
+    {
+        String tableName = "test_optimize_" + randomNameSuffix();
+        assertUpdate("CREATE TABLE " + tableName + " (key integer, value varchar)");
+        try {
+            assertThat(query("ALTER TABLE " + tableName + " EXECUTE optimize(file_size_threshold => '10kB')")).succeeds();
+
+            assertThat(query("ALTER TABLE " + tableName + " EXECUTE optimize_manifests")).succeeds();
+
+            assertThat(query("ALTER TABLE " + tableName + " EXECUTE drop_extended_stats")).succeeds();
+
+            long currentSnapshotId = getCurrentSnapshotId(tableName);
+            assertThat(currentSnapshotId).isGreaterThan(0);
+            assertThat(query("ALTER TABLE " + tableName + " EXECUTE rollback_to_snapshot(" + currentSnapshotId + ")")).succeeds();
+
+            assertThat(query("ALTER TABLE " + tableName + " EXECUTE expire_snapshots(retention_threshold => '7d')")).succeeds();
+
+            assertThat(query("ALTER TABLE " + tableName + " EXECUTE remove_orphan_files(retention_threshold => '7d')")).succeeds();
+
+            assertThat(query("ALTER TABLE " + tableName + " EXECUTE add_files(" +
+                    " location => 's3://my-bucket/a/path'," +
+                    " format => 'ORC')"))
+                    .failure().hasMessage("Failed to add files: Failed to list location: s3://my-bucket/a/path");
+
+            String tableName2 = "test_optimize2_" + randomNameSuffix();
+            assertUpdate("CREATE TABLE " + tableName2 + " (key integer, value varchar)");
+            assertThat(query("ALTER TABLE " + tableName + " EXECUTE add_files_from_table(" +
+                    " schema_name => CURRENT_SCHEMA," +
+                    " table_name => '" + tableName2 + "')"))
+                    .failure().hasMessage("Adding files from non-Hive tables is unsupported");
+        }
+        finally {
+            assertUpdate("DROP TABLE " + tableName);
+        }
+    }
+
+    private long getCurrentSnapshotId(String tableName)
+    {
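+        // Latest snapshot by commit time; WITH TIES assumes committed_at values
+        // are distinct here, since computeScalar expects exactly one row.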
+        return (long) computeScalar("SELECT snapshot_id FROM \"" + tableName + "$snapshots\" ORDER BY committed_at DESC FETCH FIRST 1 ROW WITH TIES");
+    }
 }