Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,25 @@ CompletableFuture<Void> createTable(
CompletableFuture<Void> alterTable(
TablePath tablePath, List<TableChange> tableChanges, boolean ignoreIfNotExists);

/**
* Rename the table with the given table path asynchronously.
*
* <p>The following exceptions can be anticipated when calling {@code get()} on returned future.
*
* <ul>
* <li>{@link TableNotExistException} if the source table does not exist and {@code
* ignoreIfNotExists} is false.
* <li>{@link TableAlreadyExistException} if the target table already exists.
* </ul>
*
* @param fromTablePath The source table path of the table.
* @param toTablePath The target table path of the table.
* @param ignoreIfNotExists Flag to specify behavior when a table with the given name does not
* exist: if set to false, throw a TableNotExistException, if set to true, do nothing.
*/
CompletableFuture<Void> renameTable(
TablePath fromTablePath, TablePath toTablePath, boolean ignoreIfNotExists);

/**
* List all partitions in the given table in fluss cluster asynchronously.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@
import org.apache.fluss.rpc.messages.PbListOffsetsRespForBucket;
import org.apache.fluss.rpc.messages.PbPartitionSpec;
import org.apache.fluss.rpc.messages.PbTablePath;
import org.apache.fluss.rpc.messages.RenameTableRequest;
import org.apache.fluss.rpc.messages.TableExistsRequest;
import org.apache.fluss.rpc.messages.TableExistsResponse;
import org.apache.fluss.rpc.protocol.ApiError;
Expand Down Expand Up @@ -253,6 +254,21 @@ public CompletableFuture<Void> alterTable(
return gateway.alterTable(request).thenApply(r -> null);
}

@Override
public CompletableFuture<Void> renameTable(
TablePath fromTablePath, TablePath toTablePath, boolean ignoreIfNotExists) {
RenameTableRequest request = new RenameTableRequest();
request.setIgnoreIfNotExists(ignoreIfNotExists)
.setFromTablePath(
new PbTablePath()
.setDatabaseName(fromTablePath.getDatabaseName())
.setTableName(fromTablePath.getTableName()))
.setToTablePath()
.setDatabaseName(toTablePath.getDatabaseName())
.setTableName(toTablePath.getTableName());
return gateway.renameTable(request).thenApply(r -> null);
}

@Override
public CompletableFuture<TableInfo> getTableInfo(TablePath tablePath) {
GetTableInfoRequest request = new GetTableInfoRequest();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,22 @@ void createTable(TablePath tablePath, TableDescriptor tableDescriptor, Context c
void alterTable(TablePath tablePath, List<TableChange> tableChanges, Context context)
throws TableNotExistException;

/**
* Rename a table in lake.
*
* @param fromTablePath path of the table to be renamed
* @param toTablePath new path of the table to be renamed
* @param tableDescriptor The descriptor of the table to be renamed
* @param context contextual information needed for rename table
* @throws TableAlreadyExistException if the target table already exists
*/
void renameTable(
TablePath fromTablePath,
TablePath toTablePath,
TableDescriptor tableDescriptor,
Context context)
throws TableAlreadyExistException;

@Override
default void close() throws Exception {
// default do nothing
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,18 @@ public void alterTable(TablePath tablePath, List<TableChange> tableChanges, Cont
}
}

@Override
public void renameTable(
TablePath fromTablePath,
TablePath toTablePath,
TableDescriptor tableDescriptor,
Context context)
throws TableAlreadyExistException {
try (TemporaryClassLoaderContext ignored = TemporaryClassLoaderContext.of(loader)) {
inner.renameTable(fromTablePath, toTablePath, tableDescriptor, context);
}
}

@Override
public void close() throws Exception {
try (TemporaryClassLoaderContext ignored = TemporaryClassLoaderContext.of(loader)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,5 +153,13 @@ public void createTable(
@Override
public void alterTable(TablePath tablePath, List<TableChange> tableChanges, Context context)
throws TableNotExistException {}

@Override
public void renameTable(
TablePath fromTablePath,
TablePath toTablePath,
TableDescriptor tableDescriptor,
Context context)
throws TableAlreadyExistException {}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -376,9 +376,27 @@ public void dropTable(ObjectPath objectPath, boolean ignoreIfNotExists)
}

@Override
public void renameTable(ObjectPath objectPath, String s, boolean b)
public void renameTable(ObjectPath objectPath, String newTableName, boolean ignoreIfNotExists)
throws TableNotExistException, TableAlreadyExistException, CatalogException {
throw new UnsupportedOperationException();
TablePath fromTablePath = toTablePath(objectPath);
ObjectPath toObjectPath = new ObjectPath(objectPath.getDatabaseName(), newTableName);
TablePath toTablePath = toTablePath(toObjectPath);
try {
admin.renameTable(fromTablePath, toTablePath, ignoreIfNotExists).get();
} catch (Exception e) {
Throwable t = ExceptionUtils.stripExecutionException(e);
if (isTableNotExist(t)) {
throw new TableNotExistException(getName(), objectPath);
} else if (CatalogExceptionUtils.isTableAlreadyExist(t)) {
throw new TableAlreadyExistException(getName(), toObjectPath);
} else {
throw new CatalogException(
String.format(
"Failed to rename table %s to %s in %s",
objectPath, toObjectPath, getName()),
t);
}
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -285,9 +285,6 @@ void testCreateTable() throws Exception {

checkEqualsRespectSchema((CatalogTable) tableCreated, expectedTable);

assertThatThrownBy(() -> catalog.renameTable(this.tableInDefaultDb, "newName", false))
.isInstanceOf(UnsupportedOperationException.class);

// Test lake table handling - should throw TableNotExistException for non-existent lake
// table
ObjectPath lakePath = new ObjectPath(DEFAULT_DB, "regularTable$lake");
Expand Down Expand Up @@ -892,6 +889,29 @@ void testStatisticsOperations() throws Exception {
catalog.dropTable(tablePath, false);
}

@Test
void testRenameTable() throws Exception {
Map<String, String> options = new HashMap<>();
CatalogTable table = this.newCatalogTable(options);
catalog.createTable(this.tableInDefaultDb, table, false);
assertThat(catalog.tableExists(this.tableInDefaultDb)).isTrue();
// rename table name from 't1' to 't2'.
catalog.renameTable(this.tableInDefaultDb, "t2", false);
ObjectPath t2InDefaultDb = new ObjectPath(DEFAULT_DB, "t2");
assertThat(catalog.tableExists(t2InDefaultDb)).isTrue();
assertThat(catalog.tableExists(this.tableInDefaultDb)).isFalse();

// rename an existing table from 't2' to 't3', should throw exception.
ObjectPath t3InDefaultDb = new ObjectPath(DEFAULT_DB, "t3");
catalog.createTable(t3InDefaultDb, table, false);
assertThatThrownBy(() -> catalog.renameTable(t2InDefaultDb, "t3", false))
.isInstanceOf(TableAlreadyExistException.class)
.hasMessage(
String.format(
"Table (or view) %s already exists in Catalog %s.",
t3InDefaultDb, CATALOG_NAME));
}

@Test
void testViewsAndFunctions() throws Exception {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,17 @@ public void alterTable(TablePath tablePath, List<TableChange> tableChanges, Cont
"Alter table is not supported for Iceberg at the moment");
}

@Override
public void renameTable(
TablePath fromTablePath,
TablePath toTablePath,
TableDescriptor tableDescriptor,
Context context)
throws TableAlreadyExistException {
throw new UnsupportedOperationException(
"Rename table is not supported for Iceberg at the moment");
}

private TableIdentifier toIcebergTableIdentifier(TablePath tablePath) {
return TableIdentifier.of(tablePath.getDatabaseName(), tablePath.getTableName());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import org.apache.fluss.config.Configuration;
import org.apache.fluss.exception.InvalidTableException;
import org.apache.fluss.exception.TableAlreadyExistException;
import org.apache.fluss.exception.TableNotExistException;
import org.apache.fluss.lake.lakestorage.LakeCatalog;
import org.apache.fluss.lake.lance.utils.LanceArrowUtils;
Expand Down Expand Up @@ -77,6 +78,17 @@ public void alterTable(TablePath tablePath, List<TableChange> tableChanges, Cont
"Alter table is not supported for Lance at the moment");
}

@Override
public void renameTable(
TablePath fromTablePath,
TablePath toTablePath,
TableDescriptor tableDescriptor,
Context context)
throws TableAlreadyExistException {
throw new UnsupportedOperationException(
"Rename table is not supported for Lance at the moment");
}

@Override
public void close() throws Exception {
LakeCatalog.super.close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ public void createTable(TablePath tablePath, TableDescriptor tableDescriptor, Co
// shouldn't happen in normal cases
throw new RuntimeException(
String.format(
"Fail to create table %s in Paimon, because "
"Failed to create table %s in Paimon, because "
+ "Database %s still doesn't exist although create database "
+ "successfully, please try again.",
tablePath, tablePath.getDatabaseName()));
Expand All @@ -116,32 +116,35 @@ public void alterTable(TablePath tablePath, List<TableChange> tableChanges, Cont
}
}

@Override
public void renameTable(
TablePath fromTablePath,
TablePath toTablePath,
TableDescriptor tableDescriptor,
Context context)
throws TableAlreadyExistException {
Identifier fromPaimonPath = toPaimon(fromTablePath);
Identifier toPaimonPath = toPaimon(toTablePath);
try {
paimonCatalog.renameTable(fromPaimonPath, toPaimonPath, false);
} catch (Catalog.TableNotExistException e) {
// if the source table not exists, we need to create the target table directly.
createTable(toTablePath, tableDescriptor, context);
} catch (Catalog.TableAlreadyExistException e) {
// if the target table already exists, we need to validate the schema compatibility.
Schema paimonSchema = toPaimonSchema(tableDescriptor);
validateExistingTable(toTablePath, toPaimonPath, paimonSchema, false);
}
}

private void createTable(TablePath tablePath, Schema schema, boolean isCreatingFlussTable)
throws Catalog.DatabaseNotExistException {
Identifier paimonPath = toPaimon(tablePath);
try {
// not ignore if table exists
paimonCatalog.createTable(paimonPath, schema, false);
} catch (Catalog.TableAlreadyExistException e) {
try {
Table table = paimonCatalog.getTable(paimonPath);
FileStoreTable fileStoreTable = (FileStoreTable) table;
validatePaimonSchemaCompatible(
paimonPath, fileStoreTable.schema().toSchema(), schema);
// if creating a new fluss table, we should ensure the lake table is empty
if (isCreatingFlussTable) {
checkTableIsEmpty(tablePath, fileStoreTable);
}
} catch (Catalog.TableNotExistException tableNotExistException) {
// shouldn't happen in normal cases
throw new RuntimeException(
String.format(
"Failed to create table %s in Paimon. The table already existed "
+ "during the initial creation attempt, but subsequently "
+ "could not be found when trying to get it. "
+ "Please check whether the Paimon table was manually deleted, and try again.",
tablePath));
}
validateExistingTable(tablePath, paimonPath, schema, isCreatingFlussTable);
}
}

Expand All @@ -163,6 +166,31 @@ private void alterTable(TablePath tablePath, List<SchemaChange> tableChanges)
}
}

private void validateExistingTable(
TablePath tablePath,
Identifier paimonPath,
Schema schema,
boolean isCreatingFlussTable) {
try {
Table table = paimonCatalog.getTable(paimonPath);
FileStoreTable fileStoreTable = (FileStoreTable) table;
validatePaimonSchemaCompatible(paimonPath, fileStoreTable.schema().toSchema(), schema);
// if creating a new fluss table, we should ensure the lake table is empty
if (isCreatingFlussTable) {
checkTableIsEmpty(tablePath, fileStoreTable);
}
} catch (Catalog.TableNotExistException tableNotExistException) {
// shouldn't happen in normal cases
throw new RuntimeException(
String.format(
"Failed to create table %s in Paimon. The table already existed "
+ "during the initial creation attempt, but subsequently "
+ "could not be found when trying to get it. "
+ "Please check whether the Paimon table was manually deleted, and try again.",
tablePath));
}
}

@Override
public void close() {
IOUtils.closeQuietly(paimonCatalog, "paimon catalog");
Expand Down
Loading