From ab961c211690b241730a62f5776955bcb580f191 Mon Sep 17 00:00:00 2001 From: shidayang <530847445@qq.com> Date: Fri, 3 Nov 2023 15:04:12 +0800 Subject: [PATCH] [Hotfix]Delete sync tables when drop external catalogs (#2235) --- .../arctic/server/catalog/ServerCatalog.java | 12 ++++++ .../server/table/DefaultTableService.java | 41 ++++++++++++++----- 2 files changed, 42 insertions(+), 11 deletions(-) diff --git a/ams/server/src/main/java/com/netease/arctic/server/catalog/ServerCatalog.java b/ams/server/src/main/java/com/netease/arctic/server/catalog/ServerCatalog.java index d5a2d63d70..6214d41d8e 100644 --- a/ams/server/src/main/java/com/netease/arctic/server/catalog/ServerCatalog.java +++ b/ams/server/src/main/java/com/netease/arctic/server/catalog/ServerCatalog.java @@ -3,6 +3,7 @@ import com.netease.arctic.AmoroTable; import com.netease.arctic.TableIDWithFormat; import com.netease.arctic.ams.api.CatalogMeta; +import com.netease.arctic.server.exception.IllegalMetadataException; import com.netease.arctic.server.persistence.PersistentBase; import com.netease.arctic.server.persistence.mapper.CatalogMetaMapper; @@ -40,4 +41,15 @@ public void updateMetadata(CatalogMeta metadata) { public abstract List listTables(String database); public abstract AmoroTable loadTable(String database, String tableName); + + public void dispose() { + doAsTransaction( + () -> + doAsExisted( + CatalogMetaMapper.class, + mapper -> mapper.deleteCatalog(name()), + () -> + new IllegalMetadataException( + "Catalog " + name() + " has more than one database or table"))); + } } diff --git a/ams/server/src/main/java/com/netease/arctic/server/table/DefaultTableService.java b/ams/server/src/main/java/com/netease/arctic/server/table/DefaultTableService.java index 7c1ffd7d8f..39ab34bc0d 100644 --- a/ams/server/src/main/java/com/netease/arctic/server/table/DefaultTableService.java +++ b/ams/server/src/main/java/com/netease/arctic/server/table/DefaultTableService.java @@ -140,12 +140,13 @@ private void initServerCatalog(CatalogMeta catalogMeta) { @Override public void dropCatalog(String catalogName) { checkStarted(); - doAsExisted( - CatalogMetaMapper.class, - mapper -> mapper.deleteCatalog(catalogName), - () -> - new IllegalMetadataException( - "Catalog " + catalogName + " has more than one database or table")); + ServerCatalog serverCatalog = getServerCatalog(catalogName); + if (serverCatalog == null) { + throw new ObjectNotExistsException("Catalog " + catalogName); + } + + // TableRuntime cleanup is responsibility by exploreExternalCatalog method + serverCatalog.dispose(); internalCatalogMap.remove(catalogName); externalCatalogMap.remove(catalogName); } @@ -491,8 +492,7 @@ void exploreExternalCatalog() { CompletableFuture.runAsync( () -> { try { - disposeTable( - externalCatalog, serverTableIdentifiers.get(tableIdentity)); + disposeTable(serverTableIdentifiers.get(tableIdentity)); } catch (Exception e) { LOG.error( "TableExplorer dispose table {} error", @@ -511,6 +511,20 @@ void exploreExternalCatalog() { LOG.error("TableExplorer error", e); } } + + // Clear TableRuntime objects that do not correspond to a catalog. + // This scenario is mainly due to the fact that TableRuntime objects were not cleaned up in a + // timely manner during the process of dropping the catalog due to concurrency considerations. + // It is permissible to have some erroneous states in the middle, as long as the final data is + // consistent. + Set catalogNames = + listCatalogMetas().stream().map(CatalogMeta::getCatalogName).collect(Collectors.toSet()); + for (TableRuntime tableRuntime : tableRuntimeMap.values()) { + if (!catalogNames.contains(tableRuntime.getTableIdentifier().getCatalog())) { + disposeTable(tableRuntime.getTableIdentifier()); + } + } + long end = System.currentTimeMillis(); LOG.info("Syncing external catalogs took {} ms.", end - start); } @@ -600,9 +614,14 @@ private void revertTableRuntimeAdded( } } - private void disposeTable( - ExternalCatalog externalCatalog, ServerTableIdentifier tableIdentifier) { - externalCatalog.disposeTable(tableIdentifier.getDatabase(), tableIdentifier.getTableName()); + private void disposeTable(ServerTableIdentifier tableIdentifier) { + doAs( + TableMetaMapper.class, + mapper -> + mapper.deleteTableIdByName( + tableIdentifier.getCatalog(), + tableIdentifier.getDatabase(), + tableIdentifier.getTableName())); Optional.ofNullable(tableRuntimeMap.remove(tableIdentifier)) .ifPresent( tableRuntime -> {