diff --git a/fluss-client/src/main/java/org/apache/fluss/client/admin/Admin.java b/fluss-client/src/main/java/org/apache/fluss/client/admin/Admin.java index 1ef5c30ebd..93d15e9d50 100644 --- a/fluss-client/src/main/java/org/apache/fluss/client/admin/Admin.java +++ b/fluss-client/src/main/java/org/apache/fluss/client/admin/Admin.java @@ -260,6 +260,25 @@ CompletableFuture createTable( CompletableFuture alterTable( TablePath tablePath, List tableChanges, boolean ignoreIfNotExists); + /** + * Rename the table with the given table path asynchronously. + * + *

The following exceptions can be anticipated when calling {@code get()} on returned future. + * + *

    + *
  • {@link TableNotExistException} if the source table does not exist and {@code + * ignoreIfNotExists} is false. + *
  • {@link TableAlreadyExistException} if the target table already exists. + *
+ * + * @param fromTablePath The source table path of the table. + * @param toTablePath The target table path of the table. + * @param ignoreIfNotExists Flag to specify behavior when a table with the given name does not + * exist: if set to false, throw a TableNotExistException, if set to true, do nothing. + */ + CompletableFuture renameTable( + TablePath fromTablePath, TablePath toTablePath, boolean ignoreIfNotExists); + /** * List all partitions in the given table in fluss cluster asynchronously. * diff --git a/fluss-client/src/main/java/org/apache/fluss/client/admin/FlussAdmin.java b/fluss-client/src/main/java/org/apache/fluss/client/admin/FlussAdmin.java index 27124c70fd..fc39ec261a 100644 --- a/fluss-client/src/main/java/org/apache/fluss/client/admin/FlussAdmin.java +++ b/fluss-client/src/main/java/org/apache/fluss/client/admin/FlussAdmin.java @@ -72,6 +72,7 @@ import org.apache.fluss.rpc.messages.PbListOffsetsRespForBucket; import org.apache.fluss.rpc.messages.PbPartitionSpec; import org.apache.fluss.rpc.messages.PbTablePath; +import org.apache.fluss.rpc.messages.RenameTableRequest; import org.apache.fluss.rpc.messages.TableExistsRequest; import org.apache.fluss.rpc.messages.TableExistsResponse; import org.apache.fluss.rpc.protocol.ApiError; @@ -253,6 +254,21 @@ public CompletableFuture alterTable( return gateway.alterTable(request).thenApply(r -> null); } + @Override + public CompletableFuture renameTable( + TablePath fromTablePath, TablePath toTablePath, boolean ignoreIfNotExists) { + RenameTableRequest request = new RenameTableRequest(); + request.setIgnoreIfNotExists(ignoreIfNotExists) + .setFromTablePath( + new PbTablePath() + .setDatabaseName(fromTablePath.getDatabaseName()) + .setTableName(fromTablePath.getTableName())) + .setToTablePath() + .setDatabaseName(toTablePath.getDatabaseName()) + .setTableName(toTablePath.getTableName()); + return gateway.renameTable(request).thenApply(r -> null); + } + @Override public CompletableFuture getTableInfo(TablePath tablePath) { GetTableInfoRequest request = new GetTableInfoRequest(); diff --git a/fluss-common/src/main/java/org/apache/fluss/lake/lakestorage/LakeCatalog.java b/fluss-common/src/main/java/org/apache/fluss/lake/lakestorage/LakeCatalog.java index 4cbccb6c1f..c432050ddd 100644 --- a/fluss-common/src/main/java/org/apache/fluss/lake/lakestorage/LakeCatalog.java +++ b/fluss-common/src/main/java/org/apache/fluss/lake/lakestorage/LakeCatalog.java @@ -57,6 +57,22 @@ void createTable(TablePath tablePath, TableDescriptor tableDescriptor, Context c void alterTable(TablePath tablePath, List tableChanges, Context context) throws TableNotExistException; + /** + * Rename a table in lake. + * + * @param fromTablePath path of the table to be renamed + * @param toTablePath new path of the table to be renamed + * @param tableDescriptor The descriptor of the table to be renamed + * @param context contextual information needed for rename table + * @throws TableAlreadyExistException if the target table already exists + */ + void renameTable( + TablePath fromTablePath, + TablePath toTablePath, + TableDescriptor tableDescriptor, + Context context) + throws TableAlreadyExistException; + @Override default void close() throws Exception { // default do nothing diff --git a/fluss-common/src/main/java/org/apache/fluss/lake/lakestorage/PluginLakeStorageWrapper.java b/fluss-common/src/main/java/org/apache/fluss/lake/lakestorage/PluginLakeStorageWrapper.java index 37cd17ad7e..f78de1b400 100644 --- a/fluss-common/src/main/java/org/apache/fluss/lake/lakestorage/PluginLakeStorageWrapper.java +++ b/fluss-common/src/main/java/org/apache/fluss/lake/lakestorage/PluginLakeStorageWrapper.java @@ -91,6 +91,18 @@ public void alterTable(TablePath tablePath, List tableChanges, Cont } } + @Override + public void renameTable( + TablePath fromTablePath, + TablePath toTablePath, + TableDescriptor tableDescriptor, + Context context) + throws TableAlreadyExistException { + try (TemporaryClassLoaderContext ignored = TemporaryClassLoaderContext.of(loader)) { + inner.renameTable(fromTablePath, toTablePath, tableDescriptor, context); + } + } + @Override public void close() throws Exception { try (TemporaryClassLoaderContext ignored = TemporaryClassLoaderContext.of(loader)) { diff --git a/fluss-common/src/test/java/org/apache/fluss/lake/lakestorage/LakeStorageTest.java b/fluss-common/src/test/java/org/apache/fluss/lake/lakestorage/LakeStorageTest.java index 178ec37af2..e0c27a20ab 100644 --- a/fluss-common/src/test/java/org/apache/fluss/lake/lakestorage/LakeStorageTest.java +++ b/fluss-common/src/test/java/org/apache/fluss/lake/lakestorage/LakeStorageTest.java @@ -153,5 +153,13 @@ public void createTable( @Override public void alterTable(TablePath tablePath, List tableChanges, Context context) throws TableNotExistException {} + + @Override + public void renameTable( + TablePath fromTablePath, + TablePath toTablePath, + TableDescriptor tableDescriptor, + Context context) + throws TableAlreadyExistException {} } } diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkCatalog.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkCatalog.java index 0627e3f688..be88611c33 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkCatalog.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkCatalog.java @@ -376,9 +376,27 @@ public void dropTable(ObjectPath objectPath, boolean ignoreIfNotExists) } @Override - public void renameTable(ObjectPath objectPath, String s, boolean b) + public void renameTable(ObjectPath objectPath, String newTableName, boolean ignoreIfNotExists) throws TableNotExistException, TableAlreadyExistException, CatalogException { - throw new UnsupportedOperationException(); + TablePath fromTablePath = toTablePath(objectPath); + ObjectPath toObjectPath = new ObjectPath(objectPath.getDatabaseName(), newTableName); + TablePath toTablePath = toTablePath(toObjectPath); + try { + admin.renameTable(fromTablePath, toTablePath, ignoreIfNotExists).get(); + } catch (Exception e) { + Throwable t = ExceptionUtils.stripExecutionException(e); + if (isTableNotExist(t)) { + throw new TableNotExistException(getName(), objectPath); + } else if (CatalogExceptionUtils.isTableAlreadyExist(t)) { + throw new TableAlreadyExistException(getName(), toObjectPath); + } else { + throw new CatalogException( + String.format( + "Failed to rename table %s to %s in %s", + objectPath, toObjectPath, getName()), + t); + } + } } @Override diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogTest.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogTest.java index abaf8e7e03..125b777663 100644 --- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogTest.java +++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogTest.java @@ -285,9 +285,6 @@ void testCreateTable() throws Exception { checkEqualsRespectSchema((CatalogTable) tableCreated, expectedTable); - assertThatThrownBy(() -> catalog.renameTable(this.tableInDefaultDb, "newName", false)) - .isInstanceOf(UnsupportedOperationException.class); - // Test lake table handling - should throw TableNotExistException for non-existent lake // table ObjectPath lakePath = new ObjectPath(DEFAULT_DB, "regularTable$lake"); @@ -892,6 +889,29 @@ void testStatisticsOperations() throws Exception { catalog.dropTable(tablePath, false); } + @Test + void testRenameTable() throws Exception { + Map options = new HashMap<>(); + CatalogTable table = this.newCatalogTable(options); + catalog.createTable(this.tableInDefaultDb, table, false); + assertThat(catalog.tableExists(this.tableInDefaultDb)).isTrue(); + // rename table name from 't1' to 't2'. + catalog.renameTable(this.tableInDefaultDb, "t2", false); + ObjectPath t2InDefaultDb = new ObjectPath(DEFAULT_DB, "t2"); + assertThat(catalog.tableExists(t2InDefaultDb)).isTrue(); + assertThat(catalog.tableExists(this.tableInDefaultDb)).isFalse(); + + // rename an existing table from 't2' to 't3', should throw exception. + ObjectPath t3InDefaultDb = new ObjectPath(DEFAULT_DB, "t3"); + catalog.createTable(t3InDefaultDb, table, false); + assertThatThrownBy(() -> catalog.renameTable(t2InDefaultDb, "t3", false)) + .isInstanceOf(TableAlreadyExistException.class) + .hasMessage( + String.format( + "Table (or view) %s already exists in Catalog %s.", + t3InDefaultDb, CATALOG_NAME)); + } + @Test void testViewsAndFunctions() throws Exception { diff --git a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/IcebergLakeCatalog.java b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/IcebergLakeCatalog.java index 4e90cb7003..aa26d917a7 100644 --- a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/IcebergLakeCatalog.java +++ b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/IcebergLakeCatalog.java @@ -123,6 +123,17 @@ public void alterTable(TablePath tablePath, List tableChanges, Cont "Alter table is not supported for Iceberg at the moment"); } + @Override + public void renameTable( + TablePath fromTablePath, + TablePath toTablePath, + TableDescriptor tableDescriptor, + Context context) + throws TableAlreadyExistException { + throw new UnsupportedOperationException( + "Rename table is not supported for Iceberg at the moment"); + } + private TableIdentifier toIcebergTableIdentifier(TablePath tablePath) { return TableIdentifier.of(tablePath.getDatabaseName(), tablePath.getTableName()); } diff --git a/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/LanceLakeCatalog.java b/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/LanceLakeCatalog.java index 600dcbd0d9..4824492ae6 100644 --- a/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/LanceLakeCatalog.java +++ b/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/LanceLakeCatalog.java @@ -19,6 +19,7 @@ import org.apache.fluss.config.Configuration; import org.apache.fluss.exception.InvalidTableException; +import org.apache.fluss.exception.TableAlreadyExistException; import org.apache.fluss.exception.TableNotExistException; import org.apache.fluss.lake.lakestorage.LakeCatalog; import org.apache.fluss.lake.lance.utils.LanceArrowUtils; @@ -77,6 +78,17 @@ public void alterTable(TablePath tablePath, List tableChanges, Cont "Alter table is not supported for Lance at the moment"); } + @Override + public void renameTable( + TablePath fromTablePath, + TablePath toTablePath, + TableDescriptor tableDescriptor, + Context context) + throws TableAlreadyExistException { + throw new UnsupportedOperationException( + "Rename table is not supported for Lance at the moment"); + } + @Override public void close() throws Exception { LakeCatalog.super.close(); diff --git a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/PaimonLakeCatalog.java b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/PaimonLakeCatalog.java index 42bc01942d..b60e661a98 100644 --- a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/PaimonLakeCatalog.java +++ b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/PaimonLakeCatalog.java @@ -96,7 +96,7 @@ public void createTable(TablePath tablePath, TableDescriptor tableDescriptor, Co // shouldn't happen in normal cases throw new RuntimeException( String.format( - "Fail to create table %s in Paimon, because " + "Failed to create table %s in Paimon, because " + "Database %s still doesn't exist although create database " + "successfully, please try again.", tablePath, tablePath.getDatabaseName())); @@ -116,6 +116,27 @@ public void alterTable(TablePath tablePath, List tableChanges, Cont } } + @Override + public void renameTable( + TablePath fromTablePath, + TablePath toTablePath, + TableDescriptor tableDescriptor, + Context context) + throws TableAlreadyExistException { + Identifier fromPaimonPath = toPaimon(fromTablePath); + Identifier toPaimonPath = toPaimon(toTablePath); + try { + paimonCatalog.renameTable(fromPaimonPath, toPaimonPath, false); + } catch (Catalog.TableNotExistException e) { + // if the source table not exists, we need to create the target table directly. + createTable(toTablePath, tableDescriptor, context); + } catch (Catalog.TableAlreadyExistException e) { + // if the target table already exists, we need to validate the schema compatibility. + Schema paimonSchema = toPaimonSchema(tableDescriptor); + validateExistingTable(toTablePath, toPaimonPath, paimonSchema, false); + } + } + private void createTable(TablePath tablePath, Schema schema, boolean isCreatingFlussTable) throws Catalog.DatabaseNotExistException { Identifier paimonPath = toPaimon(tablePath); @@ -123,25 +144,7 @@ private void createTable(TablePath tablePath, Schema schema, boolean isCreatingF // not ignore if table exists paimonCatalog.createTable(paimonPath, schema, false); } catch (Catalog.TableAlreadyExistException e) { - try { - Table table = paimonCatalog.getTable(paimonPath); - FileStoreTable fileStoreTable = (FileStoreTable) table; - validatePaimonSchemaCompatible( - paimonPath, fileStoreTable.schema().toSchema(), schema); - // if creating a new fluss table, we should ensure the lake table is empty - if (isCreatingFlussTable) { - checkTableIsEmpty(tablePath, fileStoreTable); - } - } catch (Catalog.TableNotExistException tableNotExistException) { - // shouldn't happen in normal cases - throw new RuntimeException( - String.format( - "Failed to create table %s in Paimon. The table already existed " - + "during the initial creation attempt, but subsequently " - + "could not be found when trying to get it. " - + "Please check whether the Paimon table was manually deleted, and try again.", - tablePath)); - } + validateExistingTable(tablePath, paimonPath, schema, isCreatingFlussTable); } } @@ -163,6 +166,31 @@ private void alterTable(TablePath tablePath, List tableChanges) } } + private void validateExistingTable( + TablePath tablePath, + Identifier paimonPath, + Schema schema, + boolean isCreatingFlussTable) { + try { + Table table = paimonCatalog.getTable(paimonPath); + FileStoreTable fileStoreTable = (FileStoreTable) table; + validatePaimonSchemaCompatible(paimonPath, fileStoreTable.schema().toSchema(), schema); + // if creating a new fluss table, we should ensure the lake table is empty + if (isCreatingFlussTable) { + checkTableIsEmpty(tablePath, fileStoreTable); + } + } catch (Catalog.TableNotExistException tableNotExistException) { + // shouldn't happen in normal cases + throw new RuntimeException( + String.format( + "Failed to create table %s in Paimon. The table already existed " + + "during the initial creation attempt, but subsequently " + + "could not be found when trying to get it. " + + "Please check whether the Paimon table was manually deleted, and try again.", + tablePath)); + } + } + @Override public void close() { IOUtils.closeQuietly(paimonCatalog, "paimon catalog"); diff --git a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/PaimonLakeCatalogTest.java b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/PaimonLakeCatalogTest.java index 4c99afdf74..78a4d0d352 100644 --- a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/PaimonLakeCatalogTest.java +++ b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/PaimonLakeCatalogTest.java @@ -18,6 +18,7 @@ package org.apache.fluss.lake.paimon; import org.apache.fluss.config.Configuration; +import org.apache.fluss.exception.TableAlreadyExistException; import org.apache.fluss.exception.TableNotExistException; import org.apache.fluss.lake.lakestorage.TestingLakeCatalogContext; import org.apache.fluss.metadata.Schema; @@ -26,6 +27,7 @@ import org.apache.fluss.metadata.TablePath; import org.apache.fluss.types.DataTypes; +import org.apache.paimon.catalog.Catalog; import org.apache.paimon.catalog.Identifier; import org.apache.paimon.table.Table; import org.junit.jupiter.api.BeforeEach; @@ -36,6 +38,7 @@ import java.util.Collections; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatCode; import static org.assertj.core.api.Assertions.assertThatThrownBy; /** Unit test for {@link PaimonLakeCatalog}. */ @@ -113,7 +116,117 @@ void alterTablePropertiesWithNonExistentTable() { .hasMessage("Table alter_props_db.non_existing_table does not exist."); } + @Test + void testRenameTable() { + String database = "test_rename_table_db"; + String tableName = "test_rename_table_table_t1"; + String renamedTableName = "test_rename_table_table_t2"; + + TablePath fromTablePath = TablePath.of(database, tableName); + TablePath toTablePath = TablePath.of(database, renamedTableName); + + // create source table + createTable(database, tableName); + + // rename table + flussPaimonCatalog.renameTable( + fromTablePath, toTablePath, null, new TestingLakeCatalogContext()); + + Identifier identifier = Identifier.create(database, renamedTableName); + assertThatCode( + () -> { + Table paimonTable = + flussPaimonCatalog.getPaimonCatalog().getTable(identifier); + assertThat(paimonTable.rowType()).isNotNull(); + }) + .doesNotThrowAnyException(); + } + + @Test + void testRenameTableWithSourceTableNotExist() { + String database = "test_rename_table_db"; + String tableName = "test_rename_table_table_t3"; + String renamedTableName = "test_rename_table_table_t4"; + + TablePath fromTablePath = TablePath.of(database, tableName); + TablePath toTablePath = TablePath.of(database, renamedTableName); + + assertThatThrownBy( + () -> + flussPaimonCatalog + .getPaimonCatalog() + .getTable(Identifier.create(database, tableName))) + .isInstanceOf(Catalog.TableNotExistException.class); + // rename table + flussPaimonCatalog.renameTable( + fromTablePath, + toTablePath, + createTableDescriptor(), + new TestingLakeCatalogContext()); + + Identifier identifier = Identifier.create(database, renamedTableName); + + assertThatCode( + () -> { + Table paimonTable = + flussPaimonCatalog.getPaimonCatalog().getTable(identifier); + assertThat(paimonTable.rowType()).isNotNull(); + }) + .doesNotThrowAnyException(); + } + + @Test + void testRenameTableWithTargetTableExists() { + String database = "test_rename_table_db"; + String tableName = "test_rename_table_table_t5"; + String renamedTableName = "test_rename_table_table_t6"; + + TablePath fromTablePath = TablePath.of(database, tableName); + TablePath toTablePath = TablePath.of(database, renamedTableName); + + // create source table + createTable(database, tableName); + + // create target table + createTable(database, renamedTableName); + + // rename table + assertThatCode( + () -> + flussPaimonCatalog.renameTable( + fromTablePath, + toTablePath, + createTableDescriptor(), + new TestingLakeCatalogContext())) + .doesNotThrowAnyException(); + + // rename table with incompatible schema + Schema flussSchema = + Schema.newBuilder() + .column("id1", DataTypes.BIGINT()) + .column("name2", DataTypes.STRING()) + .build(); + + TableDescriptor td = TableDescriptor.builder().schema(flussSchema).distributedBy(3).build(); + assertThatThrownBy( + () -> + flussPaimonCatalog.renameTable( + fromTablePath, + toTablePath, + td, + new TestingLakeCatalogContext())) + .isInstanceOf(TableAlreadyExistException.class); + } + private void createTable(String database, String tableName) { + TableDescriptor td = createTableDescriptor(); + + TablePath tablePath = TablePath.of(database, tableName); + + flussPaimonCatalog.createTable(tablePath, td, new TestingLakeCatalogContext()); + } + + private TableDescriptor createTableDescriptor() { Schema flussSchema = Schema.newBuilder() .column("id", DataTypes.BIGINT()) @@ -122,14 +235,9 @@ private void createTable(String database, String tableName) { .column("address", DataTypes.STRING()) .build(); - TableDescriptor td = - TableDescriptor.builder() - .schema(flussSchema) - .distributedBy(3) // no bucket key - .build(); - - TablePath tablePath = TablePath.of(database, tableName); - - flussPaimonCatalog.createTable(tablePath, td, new TestingLakeCatalogContext()); + return TableDescriptor.builder() + .schema(flussSchema) + .distributedBy(3) // no bucket key + .build(); } } diff --git a/fluss-rpc/src/main/java/org/apache/fluss/rpc/gateway/AdminGateway.java b/fluss-rpc/src/main/java/org/apache/fluss/rpc/gateway/AdminGateway.java index 7b322ff67e..19aac3d1d2 100644 --- a/fluss-rpc/src/main/java/org/apache/fluss/rpc/gateway/AdminGateway.java +++ b/fluss-rpc/src/main/java/org/apache/fluss/rpc/gateway/AdminGateway.java @@ -37,6 +37,8 @@ import org.apache.fluss.rpc.messages.DropPartitionResponse; import org.apache.fluss.rpc.messages.DropTableRequest; import org.apache.fluss.rpc.messages.DropTableResponse; +import org.apache.fluss.rpc.messages.RenameTableRequest; +import org.apache.fluss.rpc.messages.RenameTableResponse; import org.apache.fluss.rpc.protocol.ApiKeys; import org.apache.fluss.rpc.protocol.RPC; @@ -76,6 +78,14 @@ public interface AdminGateway extends AdminReadOnlyGateway { @RPC(api = ApiKeys.ALTER_TABLE) CompletableFuture alterTable(AlterTableRequest request); + /** + * Rename a table. + * + * @param request Rename table request + */ + @RPC(api = ApiKeys.RENAME_TABLE) + CompletableFuture renameTable(RenameTableRequest request); + /** * Drop a table. * @@ -119,7 +129,4 @@ public interface AdminGateway extends AdminReadOnlyGateway { @RPC(api = ApiKeys.ALTER_CLUSTER_CONFIGS) CompletableFuture alterClusterConfigs( AlterClusterConfigsRequest request); - - // todo: rename table & alter table - } diff --git a/fluss-rpc/src/main/java/org/apache/fluss/rpc/protocol/ApiKeys.java b/fluss-rpc/src/main/java/org/apache/fluss/rpc/protocol/ApiKeys.java index 8acec7a36d..2f444f39ba 100644 --- a/fluss-rpc/src/main/java/org/apache/fluss/rpc/protocol/ApiKeys.java +++ b/fluss-rpc/src/main/java/org/apache/fluss/rpc/protocol/ApiKeys.java @@ -74,7 +74,8 @@ public enum ApiKeys { CONTROLLED_SHUTDOWN(1043, 0, 0, PRIVATE), ALTER_TABLE(1044, 0, 0, PUBLIC), DESCRIBE_CLUSTER_CONFIGS(1045, 0, 0, PUBLIC), - ALTER_CLUSTER_CONFIGS(1046, 0, 0, PUBLIC); + ALTER_CLUSTER_CONFIGS(1046, 0, 0, PUBLIC), + RENAME_TABLE(1047, 0, 0, PUBLIC); private static final Map ID_TO_TYPE = Arrays.stream(ApiKeys.values()) diff --git a/fluss-rpc/src/main/proto/FlussApi.proto b/fluss-rpc/src/main/proto/FlussApi.proto index a08eeaba4a..00276bd028 100644 --- a/fluss-rpc/src/main/proto/FlussApi.proto +++ b/fluss-rpc/src/main/proto/FlussApi.proto @@ -122,7 +122,15 @@ message AlterTableRequest { message AlterTableResponse { } +// rename table request and response +message RenameTableRequest { + required PbTablePath from_table_path = 1; + required PbTablePath to_table_path = 2; + required bool ignore_if_not_exists = 3; +} +message RenameTableResponse { +} // get table request and response message GetTableInfoRequest { diff --git a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java index 73a7603379..a67e3dad30 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java @@ -42,6 +42,7 @@ import org.apache.fluss.metadata.ResolvedPartitionSpec; import org.apache.fluss.metadata.TableChange; import org.apache.fluss.metadata.TableDescriptor; +import org.apache.fluss.metadata.TableInfo; import org.apache.fluss.metadata.TablePath; import org.apache.fluss.rpc.gateway.CoordinatorGateway; import org.apache.fluss.rpc.messages.AdjustIsrRequest; @@ -81,6 +82,8 @@ import org.apache.fluss.rpc.messages.PbAlterConfig; import org.apache.fluss.rpc.messages.PbHeartbeatReqForTable; import org.apache.fluss.rpc.messages.PbHeartbeatRespForTable; +import org.apache.fluss.rpc.messages.RenameTableRequest; +import org.apache.fluss.rpc.messages.RenameTableResponse; import org.apache.fluss.rpc.netty.server.Session; import org.apache.fluss.rpc.protocol.ApiError; import org.apache.fluss.security.acl.AclBinding; @@ -444,6 +447,48 @@ private boolean isDataLakeEnabled(TableDescriptor tableDescriptor) { return Boolean.parseBoolean(dataLakeEnabledValue); } + @Override + public CompletableFuture renameTable(RenameTableRequest request) { + TablePath fromTablePath = toTablePath(request.getFromTablePath()); + fromTablePath.validate(); + TablePath toTablePath = toTablePath(request.getToTablePath()); + toTablePath.validate(); + if (authorizer != null) { + authorizer.authorize( + currentSession(), OperationType.ALTER, Resource.table(fromTablePath)); + authorizer.authorize( + currentSession(), + OperationType.CREATE, + Resource.database(toTablePath.getDatabaseName())); + } + + TableInfo table = metadataManager.getTable(fromTablePath); + TableDescriptor tableDescriptor = table.toTableDescriptor(); + // before rename table in fluss, we may rename in lake + if (isDataLakeEnabled(tableDescriptor)) { + LakeCatalogDynamicLoader.LakeCatalogContainer lakeCatalogContainer = + lakeCatalogDynamicLoader.getLakeCatalogContainer(); + try { + checkNotNull(lakeCatalogContainer.getLakeCatalog()) + .renameTable( + fromTablePath, + toTablePath, + tableDescriptor, + new DefaultLakeCatalogContext( + true, currentSession().getPrincipal())); + } catch (TableAlreadyExistException e) { + throw new LakeTableAlreadyExistException(e.getMessage(), e); + } + } + + // then rename table; + metadataManager.renameTable( + toTablePath(request.getFromTablePath()), + toTablePath(request.getToTablePath()), + request.isIgnoreIfNotExists()); + return CompletableFuture.completedFuture(new RenameTableResponse()); + } + @Override public CompletableFuture dropTable(DropTableRequest request) { TablePath tablePath = toTablePath(request.getTablePath()); diff --git a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/MetadataManager.java b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/MetadataManager.java index a074827d8c..6b6e035ceb 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/MetadataManager.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/MetadataManager.java @@ -549,6 +549,25 @@ public void removeSensitiveTableOptions(Map tableLakeOptions) { } } + public void renameTable( + TablePath fromTablePath, TablePath toTablePath, boolean ignoreIfNotExists) + throws TableNotExistException, TableAlreadyExistException { + if (!tableExists(fromTablePath)) { + if (ignoreIfNotExists) { + return; + } + throw new TableNotExistException("Table " + fromTablePath + " does not exist."); + } + + if (tableExists(toTablePath)) { + throw new TableAlreadyExistException("Table " + toTablePath + " already exists."); + } + + uncheck( + () -> zookeeperClient.renameTable(fromTablePath, toTablePath), + "Failed to rename table. from: " + fromTablePath + ", to: " + toTablePath); + } + public TableInfo getTable(TablePath tablePath) throws TableNotExistException { Optional optionalTable; try { diff --git a/fluss-server/src/main/java/org/apache/fluss/server/zk/ZooKeeperClient.java b/fluss-server/src/main/java/org/apache/fluss/server/zk/ZooKeeperClient.java index 75ca2b7b4b..09653c196c 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/zk/ZooKeeperClient.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/zk/ZooKeeperClient.java @@ -90,10 +90,12 @@ import javax.annotation.Nonnull; import javax.annotation.Nullable; +import java.util.ArrayDeque; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.Collections; +import java.util.Deque; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -605,6 +607,13 @@ public boolean tableExist(TablePath tablePath) throws Exception { return stat != null && stat.getDataLength() > 0; } + /** Rename the table in ZK. */ + public void renameTable(TablePath fromTablePath, TablePath toTablePath) throws Exception { + copyPath(TableZNode.path(fromTablePath), TableZNode.path(toTablePath)); + deleteTable(fromTablePath); + LOG.info("Renamed table {} to {}.", fromTablePath, toTablePath); + } + /** Get the partitions of a table in ZK. */ public Set getPartitions(TablePath tablePath) throws Exception { String path = PartitionsZNode.path(tablePath); @@ -1231,6 +1240,33 @@ public void deletePath(String path) throws Exception { } } + /** Copy a path recursively from fromPath to toPath. */ + public void copyPath(String fromPath, String toPath) throws Exception { + Deque> stack = new ArrayDeque<>(); + stack.push(Tuple2.of(fromPath, toPath)); + + while (!stack.isEmpty()) { + Tuple2 zkNode = stack.pop(); + Optional fromStat = getStat(zkNode.f0); + if (!fromStat.isPresent()) { + continue; + } + + byte[] data = zkClient.getData().forPath(zkNode.f0); + + zkClient.create() + .creatingParentsIfNeeded() + .withMode(CreateMode.PERSISTENT) + .forPath(zkNode.f1, data); + + List children = zkClient.getChildren().forPath(zkNode.f0); + for (int i = children.size() - 1; i >= 0; i--) { + String child = children.get(i); + stack.push(Tuple2.of(zkNode.f0 + "/" + child, zkNode.f1 + "/" + child)); + } + } + } + public CuratorFramework getCuratorClient() { return zkClient; } diff --git a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/TestCoordinatorGateway.java b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/TestCoordinatorGateway.java index ea20e62f99..e807ea0a95 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/TestCoordinatorGateway.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/TestCoordinatorGateway.java @@ -83,6 +83,8 @@ import org.apache.fluss.rpc.messages.ListTablesResponse; import org.apache.fluss.rpc.messages.MetadataRequest; import org.apache.fluss.rpc.messages.MetadataResponse; +import org.apache.fluss.rpc.messages.RenameTableRequest; +import org.apache.fluss.rpc.messages.RenameTableResponse; import org.apache.fluss.rpc.messages.TableExistsRequest; import org.apache.fluss.rpc.messages.TableExistsResponse; import org.apache.fluss.rpc.protocol.ApiError; @@ -150,6 +152,11 @@ public CompletableFuture alterTable(AlterTableRequest reques throw new UnsupportedOperationException(); } + @Override + public CompletableFuture renameTable(RenameTableRequest request) { + throw new UnsupportedOperationException(); + } + @Override public CompletableFuture dropTable(DropTableRequest request) { throw new UnsupportedOperationException(); diff --git a/fluss-server/src/test/java/org/apache/fluss/server/lakehouse/TestingPaimonStoragePlugin.java b/fluss-server/src/test/java/org/apache/fluss/server/lakehouse/TestingPaimonStoragePlugin.java index aa030d0256..88da3cc3f3 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/lakehouse/TestingPaimonStoragePlugin.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/lakehouse/TestingPaimonStoragePlugin.java @@ -92,6 +92,16 @@ public void alterTable(TablePath tablePath, List tableChanges, Cont // do nothing } + @Override + public void renameTable( + TablePath fromTablePath, + TablePath toTablePath, + TableDescriptor tableDescriptor, + Context context) + throws TableAlreadyExistException { + // do nothing + } + public TableDescriptor getTable(TablePath tablePath) { return tableByPath.get(tablePath); } diff --git a/fluss-server/src/test/java/org/apache/fluss/server/zk/ZooKeeperClientTest.java b/fluss-server/src/test/java/org/apache/fluss/server/zk/ZooKeeperClientTest.java index f42d381faf..02bbc827e8 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/zk/ZooKeeperClientTest.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/zk/ZooKeeperClientTest.java @@ -364,9 +364,17 @@ void testTable() throws Exception { assertThat(optionalTable1.isPresent()).isTrue(); assertThat(optionalTable1.get()).isEqualTo(tableReg1); - // delete table. - zookeeperClient.deleteTable(tablePath1); + // rename table. + TablePath tablePath3 = TablePath.of("db", "tb3"); + assertThat(zookeeperClient.getTable(tablePath3)).isEmpty(); + + zookeeperClient.renameTable(tablePath1, tablePath3); assertThat(zookeeperClient.getTable(tablePath1)).isEmpty(); + assertThat(zookeeperClient.getTable(tablePath3)).isNotEmpty(); + + // delete table. + zookeeperClient.deleteTable(tablePath3); + assertThat(zookeeperClient.getTable(tablePath3)).isEmpty(); } @Test