diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableService.java b/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableService.java index a7ae6d96dc..41bcffa5ce 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableService.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableService.java @@ -99,13 +99,22 @@ public void onTableCreated(InternalCatalog catalog, ServerTableIdentifier identi @Override public void onTableDropped(InternalCatalog catalog, ServerTableIdentifier identifier) { - Optional.ofNullable(tableRuntimeMap.remove(identifier.getId())) + Optional.ofNullable(tableRuntimeMap.get(identifier.getId())) .ifPresent( tableRuntime -> { - if (headHandler != null) { - headHandler.fireTableRemoved(tableRuntime); + try { + if (headHandler != null) { + headHandler.fireTableRemoved(tableRuntime); + } + tableRuntime.dispose(); + tableRuntimeMap.remove( + identifier.getId()); // remove only after successful operation + } catch (Exception e) { + LOG.error( + "Error occurred while removing tableRuntime of table {}", + identifier.getId(), + e); } - tableRuntime.dispose(); }); } @@ -255,7 +264,8 @@ void exploreTableRuntimes() { LOG.info("Syncing external catalogs took {} ms.", end - start); } - private void exploreExternalCatalog(ExternalCatalog externalCatalog) { + @VisibleForTesting + public void exploreExternalCatalog(ExternalCatalog externalCatalog) { final List>> tableIdentifiersFutures = Lists.newArrayList(); externalCatalog @@ -458,6 +468,23 @@ private void revertTableRuntimeAdded( } private void disposeTable(ServerTableIdentifier tableIdentifier) { + // Here, we first remove the tableRuntime before removing the tableIdentifier. This follows the + // reverse process of the syncTable() method, where we first add the tableIdentifier and then + // add the tableRuntime. 
+ Optional.ofNullable(tableRuntimeMap.get(tableIdentifier.getId())) + .ifPresent( + tableRuntime -> { + try { + if (headHandler != null) { + headHandler.fireTableRemoved(tableRuntime); + } + tableRuntime.dispose(); + tableRuntimeMap.remove( + tableIdentifier.getId()); // remove only after successful operation + } catch (Exception e) { + LOG.error("Error occurred while disposing table {}", tableIdentifier, e); + } + }); doAs( TableMetaMapper.class, mapper -> @@ -465,14 +492,6 @@ private void disposeTable(ServerTableIdentifier tableIdentifier) { tableIdentifier.getCatalog(), tableIdentifier.getDatabase(), tableIdentifier.getTableName())); - Optional.ofNullable(tableRuntimeMap.remove(tableIdentifier.getId())) - .ifPresent( - tableRuntime -> { - if (headHandler != null) { - headHandler.fireTableRemoved(tableRuntime); - } - tableRuntime.dispose(); - }); } private static class TableIdentity { diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/table/TableRuntime.java b/amoro-ams/src/main/java/org/apache/amoro/server/table/TableRuntime.java index f6733af838..2bd5ae30cc 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/table/TableRuntime.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/table/TableRuntime.java @@ -174,6 +174,9 @@ public void registerMetric(MetricRegistry metricRegistry) { } public void dispose() { + tableSummaryMetrics.unregister(); + orphanFilesCleaningMetrics.unregister(); + optimizingMetrics.unregister(); tableLock.lock(); try { doAsTransaction( @@ -185,9 +188,6 @@ public void dispose() { } finally { tableLock.unlock(); } - optimizingMetrics.unregister(); - orphanFilesCleaningMetrics.unregister(); - tableSummaryMetrics.unregister(); } public void beginPlanning() { diff --git a/amoro-ams/src/test/java/org/apache/amoro/server/AMSManagerTestBase.java b/amoro-ams/src/test/java/org/apache/amoro/server/AMSManagerTestBase.java index e89567c9c1..d12ba1607a 100644 --- 
a/amoro-ams/src/test/java/org/apache/amoro/server/AMSManagerTestBase.java +++ b/amoro-ams/src/test/java/org/apache/amoro/server/AMSManagerTestBase.java @@ -59,6 +59,10 @@ public static void disposeTableService() { EventsManager.dispose(); } + protected DefaultCatalogManager catalogManager() { + return CATALOG_MANAGER; + } + protected TableManager tableManager() { return TABLE_MANAGER; } diff --git a/amoro-ams/src/test/java/org/apache/amoro/server/table/AMSTableTestBase.java b/amoro-ams/src/test/java/org/apache/amoro/server/table/AMSTableTestBase.java index 262f0176f2..352f0280e0 100644 --- a/amoro-ams/src/test/java/org/apache/amoro/server/table/AMSTableTestBase.java +++ b/amoro-ams/src/test/java/org/apache/amoro/server/table/AMSTableTestBase.java @@ -249,6 +249,17 @@ protected void dropTable() { tableService().exploreTableRuntimes(); } + protected void dropTableOnly() { + if (externalCatalog == null) { + mixedTables.dropTableByMeta(tableMeta, true); + tableManager().dropTableMetadata(tableMeta.getTableIdentifier(), true); + } else { + String database = tableTestHelper.id().getDatabase(); + String table = tableTestHelper.id().getTableName(); + externalCatalog.dropTable(database, table, true); + } + } + protected CatalogTestHelper catalogTestHelper() { return catalogTestHelper; } diff --git a/amoro-ams/src/test/java/org/apache/amoro/server/table/TestSyncTableOfExternalCatalog.java b/amoro-ams/src/test/java/org/apache/amoro/server/table/TestSyncTableOfExternalCatalog.java new file mode 100644 index 0000000000..a5e8b10532 --- /dev/null +++ b/amoro-ams/src/test/java/org/apache/amoro/server/table/TestSyncTableOfExternalCatalog.java @@ -0,0 +1,274 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.amoro.server.table; + +import static org.apache.amoro.TableTestHelper.TEST_DB_NAME; +import static org.apache.amoro.TableTestHelper.TEST_TABLE_NAME; +import static org.apache.amoro.catalog.CatalogTestHelper.TEST_CATALOG_NAME; + +import org.apache.amoro.BasicTableTestHelper; +import org.apache.amoro.ServerTableIdentifier; +import org.apache.amoro.TableFormat; +import org.apache.amoro.TableIDWithFormat; +import org.apache.amoro.TableTestHelper; +import org.apache.amoro.TestedCatalogs; +import org.apache.amoro.api.TableIdentifier; +import org.apache.amoro.catalog.CatalogTestHelper; +import org.apache.amoro.hive.catalog.HiveCatalogTestHelper; +import org.apache.amoro.hive.catalog.HiveTableTestHelper; +import org.apache.amoro.server.catalog.ExternalCatalog; +import org.apache.amoro.server.catalog.ServerCatalog; +import org.apache.amoro.server.manager.MetricManager; +import org.apache.amoro.server.metrics.MetricRegistry; +import org.apache.amoro.server.persistence.PersistentBase; +import org.apache.amoro.server.persistence.TableRuntimeMeta; +import org.apache.amoro.server.persistence.mapper.TableMetaMapper; +import org.junit.Assert; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import java.util.List; + +@RunWith(Parameterized.class) +public class TestSyncTableOfExternalCatalog extends AMSTableTestBase { + + 
@Parameterized.Parameters(name = "{0}, {1}") + public static Object[] parameters() { + return new Object[][] { + {TestedCatalogs.hadoopCatalog(TableFormat.ICEBERG), new BasicTableTestHelper(true, true)}, + { + new HiveCatalogTestHelper(TableFormat.ICEBERG, TEST_HMS.getHiveConf()), + new HiveTableTestHelper(true, true) + } + }; + } + + public TestSyncTableOfExternalCatalog( + CatalogTestHelper catalogTestHelper, TableTestHelper tableTestHelper) { + super(catalogTestHelper, tableTestHelper); + } + + private final Persistency persistency = new Persistency(); + + private ExternalCatalog initExternalCatalog() { + ServerCatalog serverCatalog = catalogManager().getServerCatalog(TEST_CATALOG_NAME); + return catalogTestHelper().isInternalCatalog() ? null : (ExternalCatalog) serverCatalog; + } + + /** Test synchronization after creating a table. */ + @Test + public void testSynchronizationAfterCreateTable() { + // create table and sync table to Amoro server catalog + createTable(); + + // test list tables + List tableRuntimeMetaListAfterAddTable = persistency.getTableRuntimeMetas(); + List tableRuntimeMetaListForOptimizerGroupAfterAddTable = + persistency.getTableRuntimeMetasForOptimizerGroup(defaultResourceGroup().getName()); + Assert.assertEquals( + tableRuntimeMetaListForOptimizerGroupAfterAddTable.size(), + tableRuntimeMetaListAfterAddTable.size()); + + dropTable(); + dropDatabase(); + } + + /** Test synchronization after dropping a table. 
*/ + @Test + public void testSynchronizationAfterDropTable() { + createTable(); + // drop table and sync table to Amoro server catalog + dropTable(); + + // test list tables + List tableRuntimeMetaListAfterDropTable = persistency.getTableRuntimeMetas(); + List tableRuntimeMetaListForOptimizerGroupAfterDropTable = + persistency.getTableRuntimeMetasForOptimizerGroup(defaultResourceGroup().getName()); + Assert.assertEquals( + tableRuntimeMetaListForOptimizerGroupAfterDropTable.size(), + tableRuntimeMetaListAfterDropTable.size()); + + dropDatabase(); + } + + /** + * Test synchronization in the anomaly scenario where the tableIdentifier remains in + * persistence but the Iceberg table does not exist. + */ + @Test + public void testSynchronizationWithLegacyTableIdentifierAndNonExistingIcebergTable() { + // Simulate the anomaly scenario by only adding the table identifier in the persistent table + createDatabase(); + ExternalCatalog externalCatalog = initExternalCatalog(); + + persistency.addTableIdentifier( + TEST_CATALOG_NAME, TEST_DB_NAME, TEST_TABLE_NAME, TableFormat.ICEBERG); + + // Verify that the table does not exist + List tableIdentifierList = + catalogManager().getServerCatalog(TEST_CATALOG_NAME).listTables(TEST_DB_NAME); + Assert.assertEquals(0, tableIdentifierList.size()); + + // Verify that the tables do not match exactly in tableIdentifier and tableRuntime + List tableRuntimeMetaList = persistency.getTableRuntimeMetas(); + List serverTableIdentifierList = tableManager().listManagedTables(); + Assert.assertEquals(0, tableRuntimeMetaList.size()); + Assert.assertEquals(1, serverTableIdentifierList.size()); + + // Verify that the synchronization works by firing exploreExternalCatalog + tableService().exploreExternalCatalog(externalCatalog); + + List tableRuntimeMetaListAfterSync = persistency.getTableRuntimeMetas(); + List serverTableIdentifierListAfterSync = + tableManager().listManagedTables(); + Assert.assertEquals(0, tableRuntimeMetaListAfterSync.size()); +
Assert.assertEquals(0, serverTableIdentifierListAfterSync.size()); + + dropDatabase(); + } + + /** + * Test synchronization in the anomaly scenario where the tableRuntime remains but the Iceberg + * table does not exist. + * + *

NOTE: exploreExternalCatalog cannot clean up the leftover tableRuntime in this scenario. + */ + @Test + public void testSynchronizationWithLegacyTableRuntimeAndNonExistingIcebergTable() { + ExternalCatalog externalCatalog = initExternalCatalog(); + // Simulate the anomaly scenario by only removing the table identifier from the persistent + // table after dropping the table + createTable(); + dropTableOnly(); + persistency.deleteTableIdentifier( + serverTableIdentifier().getIdentifier().buildTableIdentifier()); + + // Verify that the table is dropped + List tableIdentifierList = + catalogManager().getServerCatalog(TEST_CATALOG_NAME).listTables(TEST_DB_NAME); + Assert.assertEquals(0, tableIdentifierList.size()); + + // Verify that the tables do not match exactly in tableIdentifier and tableRuntime + List serverTableIdentifierList = tableManager().listManagedTables(); + List tableRuntimeMetaListForOptimizerGroup = + persistency.getTableRuntimeMetasForOptimizerGroup(defaultResourceGroup().getName()); + Assert.assertEquals(0, serverTableIdentifierList.size()); + Assert.assertEquals(1, tableRuntimeMetaListForOptimizerGroup.size()); + + // Verify that the synchronization does not work by firing exploreExternalCatalog + tableService().exploreExternalCatalog(externalCatalog); + + List serverTableIdentifierListAfterSync = + tableManager().listManagedTables(); + List tableRuntimeMetaListForOptimizerGroupAfterSync = + persistency.getTableRuntimeMetasForOptimizerGroup(defaultResourceGroup().getName()); + Assert.assertEquals(0, serverTableIdentifierListAfterSync.size()); + Assert.assertEquals(1, tableRuntimeMetaListForOptimizerGroupAfterSync.size()); + + // The leftover tableRuntime will prevent the table from being added again + Assert.assertThrows(IndexOutOfBoundsException.class, this::createTable); + + MetricRegistry globalRegistry = MetricManager.getInstance().getGlobalRegistry(); + globalRegistry.getMetrics().keySet().forEach(globalRegistry::unregister); + persistency.deleteTableRuntime(serverTableIdentifier().getId()); +
dropTable(); + dropDatabase(); + } + + /** + * Test synchronization in the anomaly scenario where the tableIdentifier remains in + * persistence and the tableRuntime remains in memory but the Iceberg table does not exist. + */ + @Test + public void + testSynchronizationWithLegacyTableIdentifierAndLegacyTableRuntimeAndNonExistingIcebergTable() { + ExternalCatalog externalCatalog = initExternalCatalog(); + // Simulate the anomaly scenario by only removing the table runtime from the persistent table + // after dropping the table + createTable(); + dropTableOnly(); + List tableRuntimeMetaListForOptimizerGroup = + persistency.getTableRuntimeMetasForOptimizerGroup(defaultResourceGroup().getName()); + persistency.deleteTableRuntime(tableRuntimeMetaListForOptimizerGroup.get(0).getTableId()); + + // Verify that the table is dropped + List tableIdentifierList = + catalogManager().getServerCatalog(TEST_CATALOG_NAME).listTables(TEST_DB_NAME); + Assert.assertEquals(0, tableIdentifierList.size()); + + // Verify that the tables do not match exactly in tableIdentifier and tableRuntime + List tableRuntimeMetaList = persistency.getTableRuntimeMetas(); + List serverTableIdentifierList = tableManager().listManagedTables(); + Assert.assertEquals(1, serverTableIdentifierList.size()); + Assert.assertEquals(0, tableRuntimeMetaList.size()); + + // Verify that the synchronization works by firing exploreExternalCatalog + tableService().exploreExternalCatalog(externalCatalog); + + List tableRuntimeMetaListAfterSync = persistency.getTableRuntimeMetas(); + List serverTableIdentifierListAfterSync = + tableManager().listManagedTables(); + Assert.assertEquals(0, serverTableIdentifierListAfterSync.size()); + Assert.assertEquals(0, tableRuntimeMetaListAfterSync.size()); + + // Verify that recreating the table works + createTable(); + tableRuntimeMetaListAfterSync = persistency.getTableRuntimeMetas(); + serverTableIdentifierListAfterSync = tableManager().listManagedTables(); + Assert.assertEquals(1,
serverTableIdentifierListAfterSync.size()); + Assert.assertEquals(1, tableRuntimeMetaListAfterSync.size()); + + dropTable(); + dropDatabase(); + } + + private static class Persistency extends PersistentBase { + public void addTableIdentifier( + String catalog, String database, String tableName, TableFormat format) { + ServerTableIdentifier tableIdentifier = + ServerTableIdentifier.of(catalog, database, tableName, format); + doAs(TableMetaMapper.class, mapper -> mapper.insertTable(tableIdentifier)); + } + + public void deleteTableIdentifier(TableIdentifier tableIdentifier) { + doAs( + TableMetaMapper.class, + mapper -> + mapper.deleteTableIdByName( + tableIdentifier.getCatalog(), + tableIdentifier.getDatabase(), + tableIdentifier.getTableName())); + } + + public void deleteTableRuntime(Long tableId) { + doAs(TableMetaMapper.class, mapper -> mapper.deleteOptimizingRuntime(tableId)); + } + + public List getTableRuntimeMetasForOptimizerGroup(String optimizerGroup) { + return getAs( + TableMetaMapper.class, + mapper -> mapper.selectTableRuntimesForOptimizerGroup(optimizerGroup, null, null, null)); + } + + public List getTableRuntimeMetas() { + return getAs(TableMetaMapper.class, TableMetaMapper::selectTableRuntimeMetas); + } + } +}