diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingManager.java index 2eab3889436..69df4d7e36f 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingManager.java @@ -71,6 +71,16 @@ public interface PagingManager extends ActiveMQComponent, HierarchicalRepository void deletePageStore(SimpleString storeName) throws Exception; + /** + * Clean up orphaned paging stores for addresses that no longer exist. + * This is useful for removing stale stores that may accumulate over time. + * + * @return the number of stores that were removed + */ + default int cleanupOrphanedPageStores() { + return 0; + } + void processReload() throws Exception; void disableCleanup(); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingManagerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingManagerImpl.java index 4ce1cb3b3fc..22cf45933ce 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingManagerImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingManagerImpl.java @@ -422,6 +422,50 @@ public void deletePageStore(final SimpleString storeName) throws Exception { } } + @Override + public int cleanupOrphanedPageStores() { + if (server == null) { + logger.warn("Server reference not available, cannot cleanup orphaned page stores"); + return 0; + } + + logger.debug("Starting orphaned paging store cleanup check. Total stores: {}", stores.size()); + + int removed = 0; + syncLock.readLock().lock(); + try { + // Get a snapshot of current store names + Set storeNames = new HashSet<>(stores.keySet()); + + for (SimpleString storeName : storeNames) { + // Check if the address still exists + if (server.getAddressInfo(storeName) == null) { + try { + logger.debug("Removing orphaned paging store for: {}", storeName); + // Remove directly from stores map - we already hold the read lock + PagingStore store = stores.remove(CompositeAddress.extractAddressName(storeName)); + if (store != null) { + store.destroy(); + removed++; + } + } catch (Exception e) { + logger.warn("Error removing orphaned paging store for {}: {}", storeName, e.getMessage(), e); + } + } + } + + if (removed > 0) { + logger.info("Cleaned up {} orphaned paging store(s)", removed); + } else { + logger.debug("Finished orphaned paging store cleanup check. No orphaned stores found."); + } + } finally { + syncLock.readLock().unlock(); + } + + return removed; + } + /** * This method creates a new store if not exist. */ diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java index 17a055bc396..79ec46d2dde 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java @@ -83,6 +83,7 @@ import org.apache.activemq.artemis.core.server.QueueFactory; import org.apache.activemq.artemis.core.server.RouteContextList; import org.apache.activemq.artemis.core.server.RoutingContext; +import org.apache.activemq.artemis.core.server.cluster.Bridge; import org.apache.activemq.artemis.core.server.cluster.RemoteQueueBinding; import org.apache.activemq.artemis.core.server.group.GroupingHandler; import org.apache.activemq.artemis.core.server.impl.AckReason; @@ -903,6 +904,9 @@ public AddressInfo removeAddressInfo(SimpleString address, boolean force) throws mirrorControllerSource.deleteAddress(addressInfo); } + // Clean up duplicate ID caches for this address (including BRIDGE. prefixed caches) + deleteDuplicateCache(address); + removeRetroactiveResources(address); if (server.hasBrokerAddressPlugins()) { server.callBrokerAddressPlugins(plugin -> plugin.afterRemoveAddress(address, addressInfo)); @@ -1512,6 +1516,92 @@ public ConcurrentMap getDuplicateIDCaches() { return duplicateIDCaches; } + /** + * Clean up orphaned duplicate ID caches for addresses that no longer exist. + * This is useful for removing stale BRIDGE.* and other caches that may accumulate over time. + * + * @return the number of caches that were removed + */ + public int cleanupOrphanedDuplicateIDCaches() { + if (logger.isDebugEnabled()) { + logger.debug("Starting orphaned duplicate ID cache cleanup check. Total caches: {}, Total addresses: {}", + duplicateIDCaches.size(), addressManager.getAddresses().size()); + } + + // Verify that duplicate detection is enabled on cluster connections before cleaning BRIDGE caches + boolean hasDuplicateDetectionEnabled = server.getClusterManager() != null && + server.getClusterManager().getClusterConnections().stream() + .anyMatch(cc -> { + try { + // Check if any bridge in this cluster connection has duplicate detection enabled + Bridge[] bridges = cc.getBridges(); + if (bridges != null && bridges.length > 0) { + for (Bridge bridge : bridges) { + if (bridge.getConfiguration() != null && + bridge.getConfiguration().isUseDuplicateDetection()) { + return true; + } + } + } + return false; + } catch (Exception e) { + logger.debug("Unable to check duplicate detection status for cluster connection {}: {}", + cc.getName(), e.getMessage()); + return false; + } + }); + + int removed = 0; + Set validAddresses = addressManager.getAddresses(); + + // Create a list of cache keys to remove (to avoid ConcurrentModificationException) + List toRemove = new ArrayList<>(); + + for (SimpleString address : duplicateIDCaches.keySet()) { + SimpleString actualAddress = address; + + // Check if this is a BRIDGE. prefixed cache + boolean isBridgeCache = address.startsWith(BRIDGE_CACHE_STR); + + if (isBridgeCache) { + // Safety check: only process BRIDGE caches if duplicate detection is enabled + if (!hasDuplicateDetectionEnabled) { + logger.warn("Skipping BRIDGE cache cleanup for {} - duplicate detection is not enabled on any cluster connection", address); + continue; + } + // Extract the actual address by removing the BRIDGE. prefix + actualAddress = SimpleString.of(address.toString().substring(BRIDGE_CACHE_STR.length())); + } + + // If the address no longer exists, mark the cache for removal + if (!validAddresses.contains(actualAddress)) { + toRemove.add(address); + } + } + + // Remove the orphaned caches + for (SimpleString address : toRemove) { + DuplicateIDCache cache = duplicateIDCaches.remove(address); + if (cache != null) { + try { + cache.clear(); + removed++; + logger.debug("Removed orphaned duplicate ID cache for: {}", address); + } catch (Exception e) { + logger.warn("Error clearing orphaned duplicate ID cache for {}: {}", address, e.getMessage(), e); + } + } + } + + if (removed > 0) { + logger.info("Cleaned up {} orphaned duplicate ID cache(s)", removed); + } else { + logger.debug("Finished orphaned duplicate ID cache cleanup check. No orphaned caches found."); + } + + return removed; + } + @Override public Object getNotificationLock() { return notificationLock; @@ -2041,6 +2131,7 @@ private static boolean queueWasUsed(Queue queue, AddressSettings settings) { * To be used by the AddressQueueReaper. It is also exposed for tests through PostOfficeTestAccessor */ void reapAddresses(boolean initialCheck) { + logger.debug("starting reap addresses scanner"); getLocalQueues().forEach(queue -> { AddressSettings settings = addressSettingsRepository.getMatch(queue.getAddress().toString()); if (!queue.isInternalQueue() && queue.isAutoDelete() && QueueManagerImpl.consumerCountCheck(queue) && (initialCheck || QueueManagerImpl.delayCheck(queue, settings)) && QueueManagerImpl.messageCountCheck(queue) && (initialCheck || queueWasUsed(queue, settings))) { @@ -2100,6 +2191,23 @@ void reapAddresses(boolean initialCheck) { } } } + + // Clean up orphaned resources after address reaping + // This handles cases where BRIDGE.* and other caches/stores may not have been properly cleaned up + if (!initialCheck) { + try { + cleanupOrphanedDuplicateIDCaches(); + } catch (Exception e) { + logger.warn("Error during orphaned duplicate ID cache cleanup: {}", e.getMessage(), e); + } + + try { + pagingManager.cleanupOrphanedPageStores(); + } catch (Exception e) { + logger.warn("Error during orphaned paging store cleanup: {}", e.getMessage(), e); + } + } + logger.debug("reap addresses scanner completed"); } private Stream getLocalQueues() { diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/SimpleAddressManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/SimpleAddressManager.java index 2a595a930f3..351c1e48d05 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/SimpleAddressManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/SimpleAddressManager.java @@ -404,7 +404,30 @@ private void validateRoutingTypes(SimpleString addressName, EnumSet public boolean checkAutoRemoveAddress(AddressInfo addressInfo, AddressSettings settings, boolean ignoreDelay) throws Exception { - return settings.isAutoDeleteAddresses() && addressInfo != null && addressInfo.isAutoCreated() && !bindingsFactory.isAddressBound(addressInfo.getName()) && (ignoreDelay || addressWasUsed(addressInfo, settings)) && (ignoreDelay || delayCheck(addressInfo, settings)); + if (addressInfo == null) { + return false; + } + + boolean autoDelete = settings.isAutoDeleteAddresses(); + boolean isAutoCreated = addressInfo.isAutoCreated(); + boolean isAddressBound = bindingsFactory.isAddressBound(addressInfo.getName()); + boolean wasUsed = ignoreDelay || addressWasUsed(addressInfo, settings); + boolean delayPassed = ignoreDelay || delayCheck(addressInfo, settings); + + boolean result = autoDelete && isAutoCreated && !isAddressBound && wasUsed && delayPassed; + + if (logger.isDebugEnabled()) { + logger.debug("checkAutoRemoveAddress for {}: autoDelete={}, isAutoCreated={}, isAddressBound={}, wasUsed={}, delayPassed={}, result={}", + addressInfo.getName(), autoDelete, isAutoCreated, isAddressBound, wasUsed, delayPassed, result); + if (addressInfo.getRoutedMessageCount() > 0) { + logger.debug(" Address {} has routed {} messages", addressInfo.getName(), addressInfo.getRoutedMessageCount()); + } + if (addressInfo.getBindingRemovedTimestamp() != -1) { + logger.debug(" Address {} had binding removed at timestamp {}", addressInfo.getName(), addressInfo.getBindingRemovedTimestamp()); + } + } + + return result; } private boolean delayCheck(AddressInfo addressInfo, AddressSettings settings) { @@ -412,7 +435,21 @@ private boolean delayCheck(AddressInfo addressInfo, AddressSettings settings) { } private boolean addressWasUsed(AddressInfo addressInfo, AddressSettings settings) { - return addressInfo.getBindingRemovedTimestamp() != -1 || settings.isAutoDeleteAddressesSkipUsageCheck(); + // An address is considered "used" if: + // 1. A binding was removed from it (traditional check), OR + // 2. Messages were routed through it (for wildcard subscription case), OR + // 3. Skip usage check is enabled + boolean hadBindingRemoved = addressInfo.getBindingRemovedTimestamp() != -1; + boolean hasRoutedMessages = addressInfo.getRoutedMessageCount() > 0; + boolean skipCheck = settings.isAutoDeleteAddressesSkipUsageCheck(); + + if (logger.isDebugEnabled()) { + logger.debug("addressWasUsed for {}: hadBindingRemoved={}, hasRoutedMessages={} (count={}), skipCheck={}", + addressInfo.getName(), hadBindingRemoved, hasRoutedMessages, + addressInfo.getRoutedMessageCount(), skipCheck); + } + + return hadBindingRemoved || hasRoutedMessages || skipCheck; } @Override diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreCleanupTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreCleanupTest.java new file mode 100644 index 00000000000..2866ce891b2 --- /dev/null +++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreCleanupTest.java @@ -0,0 +1,148 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.core.paging.impl; + +import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.core.paging.PagingStore; +import org.apache.activemq.artemis.core.server.ActiveMQServer; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; + +import java.lang.reflect.Field; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +/** + * Test to verify that paging stores are properly cleaned up when addresses are removed. + * This test reproduces and verifies the fix for the issue where paging stores for + * removed addresses (including BRIDGE.* addresses) were not being cleaned up properly. + */ +public class PagingStoreCleanupTest { + + @Mock + private ActiveMQServer server; + + @Mock + private PagingStore pagingStore1; + + @Mock + private PagingStore pagingStore2; + + @Mock + private PagingStore pagingStore3; + + private PagingManagerImpl pagingManager; + private ConcurrentMap stores; + + @BeforeEach + public void setUp() throws Exception { + MockitoAnnotations.openMocks(this); + + // Create a minimal PagingManagerImpl instance + pagingManager = mock(PagingManagerImpl.class); + + // Access the stores field via reflection + Field storesField = PagingManagerImpl.class.getDeclaredField("stores"); + storesField.setAccessible(true); + stores = new ConcurrentHashMap<>(); + storesField.set(pagingManager, stores); + + // Access the server field via reflection + Field serverField = PagingManagerImpl.class.getDeclaredField("server"); + serverField.setAccessible(true); + serverField.set(pagingManager, server); + + // Access the syncLock field via reflection + Field syncLockField = PagingManagerImpl.class.getDeclaredField("syncLock"); + syncLockField.setAccessible(true); + syncLockField.set(pagingManager, new ReentrantReadWriteLock()); + + // Mock deletePageStore to just remove from the stores map + doAnswer(invocation -> { + SimpleString address = invocation.getArgument(0); + stores.remove(address); + return null; + }).when(pagingManager).deletePageStore(any(SimpleString.class)); + + // Call the real method for cleanupOrphanedPageStores + when(pagingManager.cleanupOrphanedPageStores()).thenCallRealMethod(); + } + + @Test + public void testOrphanedPagingStoresAreCleanedUp() throws Exception { + // Arrange: Create paging stores for addresses + SimpleString address1 = SimpleString.of("test.address.1"); + SimpleString address2 = SimpleString.of("test.address.2"); + SimpleString bridgeAddress = SimpleString.of("BRIDGE.test.address.3"); + + stores.put(address1, pagingStore1); + stores.put(address2, pagingStore2); + stores.put(bridgeAddress, pagingStore3); + + // Mock server.getAddressInfo() to simulate that only address1 still exists + when(server.getAddressInfo(address1)).thenReturn(mock(org.apache.activemq.artemis.core.server.impl.AddressInfo.class)); + when(server.getAddressInfo(address2)).thenReturn(null); // address2 was removed + when(server.getAddressInfo(bridgeAddress)).thenReturn(null); // BRIDGE address was removed + + // Act: Call the cleanup method + int removed = pagingManager.cleanupOrphanedPageStores(); + + // Assert: Should remove 2 orphaned stores (address2 and bridgeAddress) + assertEquals(2, removed, "Should remove 2 orphaned paging stores"); + + // Verify the stores map state - address1 should remain, others removed + assertTrue(stores.containsKey(address1), "address1 store should still exist"); + assertFalse(stores.containsKey(address2), "address2 store should be removed"); + assertFalse(stores.containsKey(bridgeAddress), "BRIDGE address store should be removed"); + } + + @Test + public void testMultipleBridgeStoresCleanup() throws Exception { + // Arrange: Create multiple BRIDGE.* paging stores + SimpleString bridgeAddress1 = SimpleString.of("BRIDGE.cluster.address.1"); + SimpleString bridgeAddress2 = SimpleString.of("BRIDGE.cluster.address.2"); + SimpleString bridgeAddress3 = SimpleString.of("BRIDGE.cluster.address.3"); + + stores.put(bridgeAddress1, pagingStore1); + stores.put(bridgeAddress2, pagingStore2); + stores.put(bridgeAddress3, pagingStore3); + + // Mock server.getAddressInfo() to simulate all BRIDGE addresses were removed + when(server.getAddressInfo(any())).thenReturn(null); + + // Act: Call the cleanup method + int removed = pagingManager.cleanupOrphanedPageStores(); + + // Assert: Should remove all 3 BRIDGE stores + assertEquals(3, removed, "Should remove all 3 orphaned BRIDGE paging stores"); + + // Verify all BRIDGE stores are removed + assertTrue(stores.isEmpty(), "All BRIDGE stores should be removed"); + } +} + diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/postoffice/impl/DuplicateIDCacheCleanupTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/postoffice/impl/DuplicateIDCacheCleanupTest.java new file mode 100644 index 00000000000..8202ea24ab7 --- /dev/null +++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/postoffice/impl/DuplicateIDCacheCleanupTest.java @@ -0,0 +1,209 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.core.postoffice.impl; + +import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.core.config.BridgeConfiguration; +import org.apache.activemq.artemis.core.config.WildcardConfiguration; +import org.apache.activemq.artemis.core.paging.PagingManager; +import org.apache.activemq.artemis.core.postoffice.AddressManager; +import org.apache.activemq.artemis.core.postoffice.DuplicateIDCache; +import org.apache.activemq.artemis.core.server.ActiveMQServer; +import org.apache.activemq.artemis.core.server.QueueFactory; +import org.apache.activemq.artemis.core.server.cluster.Bridge; +import org.apache.activemq.artemis.core.server.cluster.ClusterConnection; +import org.apache.activemq.artemis.core.server.cluster.ClusterManager; +import org.apache.activemq.artemis.core.server.management.ManagementService; +import org.apache.activemq.artemis.core.settings.HierarchicalRepository; +import org.apache.activemq.artemis.core.settings.impl.AddressSettings; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; + +import java.util.Collections; +import java.util.HashSet; +import java.util.Set; +import java.util.concurrent.ConcurrentMap; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +/** + * Test to verify that duplicate ID caches (including BRIDGE.* caches) are properly cleaned up + * when addresses are removed. This test reproduces and verifies the fix for the issue where + * BRIDGE.* endpoint caches were not being cleaned up properly. + */ +public class DuplicateIDCacheCleanupTest { + + @Mock + private ActiveMQServer server; + + @Mock + private AddressManager addressManager; + + @Mock + private PagingManager pagingManager; + + @Mock + private QueueFactory queueFactory; + + @Mock + private ManagementService managementService; + + @Mock + private HierarchicalRepository addressSettingsRepository; + + @Mock + private ClusterManager clusterManager; + + @Mock + private ClusterConnection clusterConnection; + + @Mock + private Bridge bridge; + + @Mock + private BridgeConfiguration bridgeConfiguration; + + private PostOfficeImpl postOffice; + + @BeforeEach + public void setUp() { + MockitoAnnotations.openMocks(this); + when(addressSettingsRepository.getMatch(anyString())).thenReturn(new AddressSettings()); + + // Mock cluster manager and bridge setup to enable duplicate detection + when(server.getClusterManager()).thenReturn(clusterManager); + when(clusterManager.getClusterConnections()).thenReturn(Collections.singleton(clusterConnection)); + when(clusterConnection.getBridges()).thenReturn(new Bridge[]{bridge}); + when(bridge.getConfiguration()).thenReturn(bridgeConfiguration); + when(bridgeConfiguration.isUseDuplicateDetection()).thenReturn(true); + + WildcardConfiguration wildcardConfiguration = new WildcardConfiguration(); + + postOffice = new PostOfficeImpl( + server, + null, // storageManager + pagingManager, + queueFactory, + managementService, + 1000, // expiryReaperPeriod + 30000, // addressQueueReaperPeriod + wildcardConfiguration, + 100, // idCacheSize + false, // persistIDCache + addressSettingsRepository + ); + } + + /** + * This test reproduces the issue: BRIDGE.* caches remain in memory after + * their corresponding addresses are removed. + */ + @Test + public void testBridgeCachesOrphanedWhenAddressRemoved() throws Exception { + SimpleString address = SimpleString.of("bridge.endpoint"); + SimpleString bridgeCache = PostOfficeImpl.BRIDGE_CACHE_STR.concat(address.toString()); + + // Initially, the address exists + Set addresses = new HashSet<>(); + addresses.add(address); + when(addressManager.getAddresses()).thenReturn(addresses); + + // Create caches for the address + ConcurrentMap caches = postOffice.getDuplicateIDCaches(); + caches.put(address, mock(DuplicateIDCache.class)); + caches.put(bridgeCache, mock(DuplicateIDCache.class)); + + assertEquals(2, caches.size(), "Should have 2 caches (regular + BRIDGE)"); + + // Simulate address being removed - now address no longer exists + addresses.clear(); + when(addressManager.getAddresses()).thenReturn(addresses); + + // BEFORE THE FIX: Caches would remain orphaned in memory + // WITH THE FIX: cleanupOrphanedDuplicateIDCaches() removes them + int removed = postOffice.cleanupOrphanedDuplicateIDCaches(); + + // Verify both caches are cleaned up + assertEquals(2, removed, "Should remove both regular and BRIDGE caches"); + assertEquals(0, caches.size(), "Should have 0 caches remaining"); + assertFalse(caches.containsKey(address), "Regular cache should be removed"); + assertFalse(caches.containsKey(bridgeCache), "BRIDGE cache should be removed"); + } + + + /** + * This test verifies the BRIDGE. prefix extraction logic + */ + @Test + public void testBridgePrefixHandling() throws Exception { + SimpleString address = SimpleString.of("my.address"); + SimpleString bridgeAddress = PostOfficeImpl.BRIDGE_CACHE_STR.concat(address.toString()); + + // Address doesn't exist + Set addresses = new HashSet<>(); + when(addressManager.getAddresses()).thenReturn(addresses); + + // Add BRIDGE cache + ConcurrentMap caches = postOffice.getDuplicateIDCaches(); + caches.put(bridgeAddress, mock(DuplicateIDCache.class)); + + // Cleanup should remove it since the base address doesn't exist + int removed = postOffice.cleanupOrphanedDuplicateIDCaches(); + + assertEquals(1, removed, "Should remove BRIDGE cache"); + assertFalse(caches.containsKey(bridgeAddress), "BRIDGE cache should be removed"); + } + + /** + * This test simulates the real-world scenario: multiple bridge endpoints + * being created and removed over time + */ + @Test + public void testMultipleBridgeEndpointsCleanup() throws Exception { + // Simulate 5 bridge endpoints + Set addresses = new HashSet<>(); + ConcurrentMap caches = postOffice.getDuplicateIDCaches(); + + for (int i = 0; i < 5; i++) { + SimpleString address = SimpleString.of("BRIDGE.endpoint." + i); + SimpleString bridgeCache = PostOfficeImpl.BRIDGE_CACHE_STR.concat(address.toString()); + + addresses.add(address); + caches.put(address, mock(DuplicateIDCache.class)); + caches.put(bridgeCache, mock(DuplicateIDCache.class)); + } + + when(addressManager.getAddresses()).thenReturn(addresses); + assertEquals(10, caches.size(), "Should have 10 caches (5 regular + 5 BRIDGE)"); + + // Now remove all addresses (simulating bridge shutdown) + addresses.clear(); + when(addressManager.getAddresses()).thenReturn(addresses); + + // Cleanup should remove all orphaned caches + int removed = postOffice.cleanupOrphanedDuplicateIDCaches(); + + assertEquals(10, removed, "Should remove all 10 caches"); + assertEquals(0, caches.size(), "Should have 0 caches remaining"); + } +} diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/AutoDeleteAddressTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/AutoDeleteAddressTest.java index c90ecd57da4..c2f184dfb12 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/AutoDeleteAddressTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/AutoDeleteAddressTest.java @@ -146,4 +146,74 @@ public void testAutoDeleteAddressWithWildcardAddress() throws Exception { consumerSession.close(); } + + /** + * Test that addresses with only wildcard subscriptions are auto-deleted when messages + * are routed through them. This tests the fix for ARTEMIS-5773 where addresses would + * accumulate when: + * 1. A wildcard queue exists (e.g., publish/#) + * 2. Messages are sent to specific addresses (e.g., publish/uuid1/uuid2) + * 3. No direct queue binding is created on those addresses + * 4. The address should still be considered "used" due to routed messages + * + * Without the fix, these addresses would never be deleted because getRoutedMessageCount() + * was not checked in addressWasUsed(). + */ + @Test + public void testAutoDeleteAddressWithWildcardSubscriptionAndUsageCheck() throws Exception { + String prefix = "publish"; + // Enable auto-delete but WITHOUT skipUsageCheck - this is the key difference + server.getAddressSettingsRepository().addMatch(prefix + ".#", new AddressSettings().setAutoDeleteAddresses(true).setAutoDeleteAddressesSkipUsageCheck(false).setAutoDeleteAddressesDelay(0)); + String wildcardAddress = prefix + ".#"; + String queue = RandomUtil.randomUUIDString(); + final int MESSAGE_COUNT = 10; + final CountDownLatch latch = new CountDownLatch(MESSAGE_COUNT); + + // Create a wildcard queue + server.createQueue(QueueConfiguration.of(queue).setAddress(wildcardAddress).setRoutingType(RoutingType.MULTICAST).setAutoCreated(true)); + + ClientSession consumerSession = cf.createSession(); + ClientConsumer consumer = consumerSession.createConsumer(queue); + consumer.setMessageHandler(message -> { + try { + message.acknowledge(); + } catch (Exception e) { + e.printStackTrace(); + } + latch.countDown(); + }); + consumerSession.start(); + + ClientSession producerSession = cf.createSession(); + ClientProducer producer = producerSession.createProducer(); + + List addresses = new ArrayList<>(); + for (int i = 0; i < MESSAGE_COUNT; i++) { + String address = prefix + "." + RandomUtil.randomUUIDString() + "." + RandomUtil.randomUUIDString(); + addresses.add(address); + // Auto-create the address when sending the message + producer.send(address, producerSession.createMessage(false)); + } + producerSession.close(); + + assertTrue(latch.await(2, TimeUnit.SECONDS)); + + // Verify addresses were created and have paging stores + for (String address : addresses) { + assertNotNull(server.getAddressInfo(SimpleString.of(address)), "Address should exist: " + address); + Wait.assertTrue(() -> Arrays.asList(server.getPagingManager().getStoreNames()).contains(SimpleString.of(address)), 2000, 100); + } + + // Run the reaper - this would fail without the fix because addressWasUsed() would return false + PostOfficeTestAccessor.sweepAndReapAddresses((PostOfficeImpl) server.getPostOffice()); + + // Verify addresses are deleted + for (String address : addresses) { + String finalAddress = address; // for lambda + Wait.assertTrue("Address should be deleted: " + address, () -> server.getAddressInfo(SimpleString.of(finalAddress)) == null, 2000, 100); + Wait.assertFalse(() -> Arrays.asList(server.getPagingManager().getStoreNames()).contains(SimpleString.of(finalAddress)), 2000, 100); + } + + consumerSession.close(); + } } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/WildCardRoutingTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/WildCardRoutingTest.java index fd1d0326043..cc0f086c1a1 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/WildCardRoutingTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/WildCardRoutingTest.java @@ -755,6 +755,51 @@ public void testLargeWildcardRouting() throws Exception { assertEquals(0, server.getPostOffice().getBindingsForAddress(address).getBindings().size()); } + /** + * Test that routing messages through wildcard subscriptions increments the + * routedMessageCount on the target address. This is critical for auto-delete + * functionality (ARTEMIS-5773) where addresses should be considered "used" + * if messages were routed through them, even if no direct queue binding exists. + */ + @Test + public void testWildcardRoutingIncrementsMessageCount() throws Exception { + SimpleString wildcardAddress = SimpleString.of("test.#"); + SimpleString specificAddress1 = SimpleString.of("test.foo.bar"); + SimpleString specificAddress2 = SimpleString.of("test.baz.qux"); + SimpleString queueName = SimpleString.of("wildcardQueue"); + + // Create a wildcard queue + clientSession.createQueue(QueueConfiguration.of(queueName).setAddress(wildcardAddress).setDurable(false)); + + ClientProducer producer = clientSession.createProducer(); + ClientConsumer consumer = clientSession.createConsumer(queueName); + clientSession.start(); + + // Send messages to specific addresses that match the wildcard + producer.send(specificAddress1, createTextMessage(clientSession, "message1")); + producer.send(specificAddress2, createTextMessage(clientSession, "message2")); + + // Consume messages to ensure routing happened + ClientMessage m = consumer.receive(500); + assertNotNull(m); + m.acknowledge(); + m = consumer.receive(500); + assertNotNull(m); + m.acknowledge(); + + // Verify that the specific addresses were created and have their routed message count incremented + assertNotNull(server.getAddressInfo(specificAddress1), "Address should be auto-created"); + assertNotNull(server.getAddressInfo(specificAddress2), "Address should be auto-created"); + + // The routed message count should be > 0 for addresses that had messages routed through them + assertEquals(1, server.getAddressInfo(specificAddress1).getRoutedMessageCount(), + "Address should have routed message count incremented"); + assertEquals(1, server.getAddressInfo(specificAddress2).getRoutedMessageCount(), + "Address should have routed message count incremented"); + + consumer.close(); + } + @Override @BeforeEach public void setUp() throws Exception {