Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,16 @@ public interface PagingManager extends ActiveMQComponent, HierarchicalRepository

void deletePageStore(SimpleString storeName) throws Exception;

/**
* Clean up orphaned paging stores for addresses that no longer exist.
* This is useful for removing stale stores that may accumulate over time.
*
* @return the number of stores that were removed
*/
default int cleanupOrphanedPageStores() {
return 0;
}

void processReload() throws Exception;

void disableCleanup();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -422,6 +422,50 @@ public void deletePageStore(final SimpleString storeName) throws Exception {
}
}

@Override
public int cleanupOrphanedPageStores() {
if (server == null) {
logger.warn("Server reference not available, cannot cleanup orphaned page stores");
return 0;
}

logger.debug("Starting orphaned paging store cleanup check. Total stores: {}", stores.size());

int removed = 0;
syncLock.readLock().lock();
try {
// Get a snapshot of current store names
Set<SimpleString> storeNames = new HashSet<>(stores.keySet());

for (SimpleString storeName : storeNames) {
// Check if the address still exists
if (server.getAddressInfo(storeName) == null) {
try {
logger.debug("Removing orphaned paging store for: {}", storeName);
// Remove directly from stores map - we already hold the read lock
PagingStore store = stores.remove(CompositeAddress.extractAddressName(storeName));
if (store != null) {
store.destroy();
removed++;
}
} catch (Exception e) {
logger.warn("Error removing orphaned paging store for {}: {}", storeName, e.getMessage(), e);
}
}
}

if (removed > 0) {
logger.info("Cleaned up {} orphaned paging store(s)", removed);
} else {
logger.debug("Finished orphaned paging store cleanup check. No orphaned stores found.");
}
} finally {
syncLock.readLock().unlock();
}

return removed;
}

/**
* This method creates a new store if not exist.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@
import org.apache.activemq.artemis.core.server.QueueFactory;
import org.apache.activemq.artemis.core.server.RouteContextList;
import org.apache.activemq.artemis.core.server.RoutingContext;
import org.apache.activemq.artemis.core.server.cluster.Bridge;
import org.apache.activemq.artemis.core.server.cluster.RemoteQueueBinding;
import org.apache.activemq.artemis.core.server.group.GroupingHandler;
import org.apache.activemq.artemis.core.server.impl.AckReason;
Expand Down Expand Up @@ -903,6 +904,9 @@ public AddressInfo removeAddressInfo(SimpleString address, boolean force) throws
mirrorControllerSource.deleteAddress(addressInfo);
}

// Clean up duplicate ID caches for this address (including BRIDGE. prefixed caches)
deleteDuplicateCache(address);

removeRetroactiveResources(address);
if (server.hasBrokerAddressPlugins()) {
server.callBrokerAddressPlugins(plugin -> plugin.afterRemoveAddress(address, addressInfo));
Expand Down Expand Up @@ -1512,6 +1516,92 @@ public ConcurrentMap<SimpleString, DuplicateIDCache> getDuplicateIDCaches() {
return duplicateIDCaches;
}

/**
* Clean up orphaned duplicate ID caches for addresses that no longer exist.
* This is useful for removing stale BRIDGE.* and other caches that may accumulate over time.
*
* @return the number of caches that were removed
*/
public int cleanupOrphanedDuplicateIDCaches() {
if (logger.isDebugEnabled()) {
logger.debug("Starting orphaned duplicate ID cache cleanup check. Total caches: {}, Total addresses: {}",
duplicateIDCaches.size(), addressManager.getAddresses().size());
}

// Verify that duplicate detection is enabled on cluster connections before cleaning BRIDGE caches
boolean hasDuplicateDetectionEnabled = server.getClusterManager() != null &&
server.getClusterManager().getClusterConnections().stream()
.anyMatch(cc -> {
try {
// Check if any bridge in this cluster connection has duplicate detection enabled
Bridge[] bridges = cc.getBridges();
if (bridges != null && bridges.length > 0) {
for (Bridge bridge : bridges) {
if (bridge.getConfiguration() != null &&
bridge.getConfiguration().isUseDuplicateDetection()) {
return true;
}
}
}
return false;
} catch (Exception e) {
logger.debug("Unable to check duplicate detection status for cluster connection {}: {}",
cc.getName(), e.getMessage());
return false;
}
});

int removed = 0;
Set<SimpleString> validAddresses = addressManager.getAddresses();

// Create a list of cache keys to remove (to avoid ConcurrentModificationException)
List<SimpleString> toRemove = new ArrayList<>();

for (SimpleString address : duplicateIDCaches.keySet()) {
SimpleString actualAddress = address;

// Check if this is a BRIDGE. prefixed cache
boolean isBridgeCache = address.startsWith(BRIDGE_CACHE_STR);

if (isBridgeCache) {
// Safety check: only process BRIDGE caches if duplicate detection is enabled
if (!hasDuplicateDetectionEnabled) {
logger.warn("Skipping BRIDGE cache cleanup for {} - duplicate detection is not enabled on any cluster connection", address);
continue;
}
// Extract the actual address by removing the BRIDGE. prefix
actualAddress = SimpleString.of(address.toString().substring(BRIDGE_CACHE_STR.length()));
}

// If the address no longer exists, mark the cache for removal
if (!validAddresses.contains(actualAddress)) {
toRemove.add(address);
}
}

// Remove the orphaned caches
for (SimpleString address : toRemove) {
DuplicateIDCache cache = duplicateIDCaches.remove(address);
if (cache != null) {
try {
cache.clear();
removed++;
logger.debug("Removed orphaned duplicate ID cache for: {}", address);
} catch (Exception e) {
logger.warn("Error clearing orphaned duplicate ID cache for {}: {}", address, e.getMessage(), e);
}
}
}

if (removed > 0) {
logger.info("Cleaned up {} orphaned duplicate ID cache(s)", removed);
} else {
logger.debug("Finished orphaned duplicate ID cache cleanup check. No orphaned caches found.");
}

return removed;
}

@Override
public Object getNotificationLock() {
return notificationLock;
Expand Down Expand Up @@ -2041,6 +2131,7 @@ private static boolean queueWasUsed(Queue queue, AddressSettings settings) {
* To be used by the AddressQueueReaper. It is also exposed for tests through PostOfficeTestAccessor
*/
void reapAddresses(boolean initialCheck) {
logger.debug("starting reap addresses scanner");
getLocalQueues().forEach(queue -> {
AddressSettings settings = addressSettingsRepository.getMatch(queue.getAddress().toString());
if (!queue.isInternalQueue() && queue.isAutoDelete() && QueueManagerImpl.consumerCountCheck(queue) && (initialCheck || QueueManagerImpl.delayCheck(queue, settings)) && QueueManagerImpl.messageCountCheck(queue) && (initialCheck || queueWasUsed(queue, settings))) {
Expand Down Expand Up @@ -2100,6 +2191,23 @@ void reapAddresses(boolean initialCheck) {
}
}
}

// Clean up orphaned resources after address reaping
// This handles cases where BRIDGE.* and other caches/stores may not have been properly cleaned up
if (!initialCheck) {
try {
cleanupOrphanedDuplicateIDCaches();
} catch (Exception e) {
logger.warn("Error during orphaned duplicate ID cache cleanup: {}", e.getMessage(), e);
}

try {
pagingManager.cleanupOrphanedPageStores();
} catch (Exception e) {
logger.warn("Error during orphaned paging store cleanup: {}", e.getMessage(), e);
}
}
logger.debug("reap addresses scanner completed");
}

private Stream<Queue> getLocalQueues() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -404,15 +404,52 @@ private void validateRoutingTypes(SimpleString addressName, EnumSet<RoutingType>
public boolean checkAutoRemoveAddress(AddressInfo addressInfo,
AddressSettings settings,
boolean ignoreDelay) throws Exception {
return settings.isAutoDeleteAddresses() && addressInfo != null && addressInfo.isAutoCreated() && !bindingsFactory.isAddressBound(addressInfo.getName()) && (ignoreDelay || addressWasUsed(addressInfo, settings)) && (ignoreDelay || delayCheck(addressInfo, settings));
if (addressInfo == null) {
return false;
}

boolean autoDelete = settings.isAutoDeleteAddresses();
boolean isAutoCreated = addressInfo.isAutoCreated();
boolean isAddressBound = bindingsFactory.isAddressBound(addressInfo.getName());
boolean wasUsed = ignoreDelay || addressWasUsed(addressInfo, settings);
boolean delayPassed = ignoreDelay || delayCheck(addressInfo, settings);

boolean result = autoDelete && isAutoCreated && !isAddressBound && wasUsed && delayPassed;

if (logger.isDebugEnabled()) {
logger.debug("checkAutoRemoveAddress for {}: autoDelete={}, isAutoCreated={}, isAddressBound={}, wasUsed={}, delayPassed={}, result={}",
addressInfo.getName(), autoDelete, isAutoCreated, isAddressBound, wasUsed, delayPassed, result);
if (addressInfo.getRoutedMessageCount() > 0) {
logger.debug(" Address {} has routed {} messages", addressInfo.getName(), addressInfo.getRoutedMessageCount());
}
if (addressInfo.getBindingRemovedTimestamp() != -1) {
logger.debug(" Address {} had binding removed at timestamp {}", addressInfo.getName(), addressInfo.getBindingRemovedTimestamp());
}
}

return result;
}

private boolean delayCheck(AddressInfo addressInfo, AddressSettings settings) {
return (!settings.isAutoDeleteAddressesSkipUsageCheck() && System.currentTimeMillis() - addressInfo.getBindingRemovedTimestamp() >= settings.getAutoDeleteAddressesDelay()) || (settings.isAutoDeleteAddressesSkipUsageCheck() && System.currentTimeMillis() - addressInfo.getCreatedTimestamp() >= settings.getAutoDeleteAddressesDelay());
}

private boolean addressWasUsed(AddressInfo addressInfo, AddressSettings settings) {
return addressInfo.getBindingRemovedTimestamp() != -1 || settings.isAutoDeleteAddressesSkipUsageCheck();
// An address is considered "used" if:
// 1. A binding was removed from it (traditional check), OR
// 2. Messages were routed through it (for wildcard subscription case), OR
// 3. Skip usage check is enabled
boolean hadBindingRemoved = addressInfo.getBindingRemovedTimestamp() != -1;
boolean hasRoutedMessages = addressInfo.getRoutedMessageCount() > 0;
boolean skipCheck = settings.isAutoDeleteAddressesSkipUsageCheck();

if (logger.isDebugEnabled()) {
logger.debug("addressWasUsed for {}: hadBindingRemoved={}, hasRoutedMessages={} (count={}), skipCheck={}",
addressInfo.getName(), hadBindingRemoved, hasRoutedMessages,
addressInfo.getRoutedMessageCount(), skipCheck);
}

return hadBindingRemoved || hasRoutedMessages || skipCheck;
}

@Override
Expand Down
Loading