From 2f13e77fbf989fe82f691c87513ba80550868dc9 Mon Sep 17 00:00:00 2001 From: wangwei Date: Fri, 19 Dec 2025 16:08:40 +0800 Subject: [PATCH 1/6] fix(cluster): Changing invokerRefreshLock from ReentrantLock to ReentrantReadWriteLock avoids concurrency issues, and using invokerRefreshReadLock avoids lock blocking during high concurrency reads #15881 --- .../cluster/directory/AbstractDirectory.java | 87 ++++++++++--------- 1 file changed, 48 insertions(+), 39 deletions(-) diff --git a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/directory/AbstractDirectory.java b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/directory/AbstractDirectory.java index be68810f4aee..8c6bbeac4e5b 100644 --- a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/directory/AbstractDirectory.java +++ b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/directory/AbstractDirectory.java @@ -58,7 +58,7 @@ import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.locks.ReentrantLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.stream.Collectors; import static org.apache.dubbo.common.constants.CommonConstants.CONSUMER; @@ -125,7 +125,10 @@ public abstract class AbstractDirectory implements Directory { private volatile ScheduledFuture connectivityCheckFuture; - private final ReentrantLock invokerRefreshLock = new ReentrantLock(); + private final ReentrantReadWriteLock invokerRefreshLock = new ReentrantReadWriteLock(); + + private final ReentrantReadWriteLock.ReadLock invokerRefreshReadLock = invokerRefreshLock.readLock(); + private final ReentrantReadWriteLock.WriteLock invokerRefreshWriteLock = invokerRefreshLock.writeLock(); /** * The max count of invokers for each reconnect task select to try to reconnect. @@ -208,9 +211,12 @@ public List> list(Invocation invocation) throws RpcException { BitList> availableInvokers; SingleRouterChain singleChain = null; try { + if (routerChain != null) { + routerChain.getLock().readLock().lock(); + } try { - if (routerChain != null) { - routerChain.getLock().readLock().lock(); + if (!invokerRefreshReadLock.tryLock(LockUtils.DEFAULT_TIMEOUT, TimeUnit.MILLISECONDS)) { + throw new RpcException("Failed to acquire invokerRefreshLock for reading invokers"); } // use clone to avoid being modified at doList(). if (invokersInitialized) { @@ -218,17 +224,17 @@ public List> list(Invocation invocation) throws RpcException { } else { availableInvokers = invokers.clone(); } - - if (routerChain != null) { - singleChain = routerChain.getSingleChain(getConsumerUrl(), availableInvokers, invocation); - singleChain.getLock().readLock().lock(); - } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RpcException("Failed to refresh invokers, cause: " + e.getMessage(), e); } finally { - if (routerChain != null) { - routerChain.getLock().readLock().unlock(); - } + invokerRefreshReadLock.unlock(); } + if (routerChain != null) { + singleChain = routerChain.getSingleChain(getConsumerUrl(), availableInvokers, invocation); + singleChain.getLock().readLock().lock(); + } List> routedResult = doList(singleChain, availableInvokers, invocation); if (routedResult.isEmpty()) { // 2-2 - No provider available. @@ -249,6 +255,9 @@ public List> list(Invocation invocation) throws RpcException { if (singleChain != null) { singleChain.getLock().readLock().unlock(); } + if (routerChain != null) { + routerChain.getLock().readLock().unlock(); + } } } @@ -298,7 +307,7 @@ public void discordAddresses() { @Override public void addInvalidateInvoker(Invoker invoker) { - LockUtils.safeLock(invokerRefreshLock, LockUtils.DEFAULT_TIMEOUT, () -> { + LockUtils.safeLock(invokerRefreshWriteLock, LockUtils.DEFAULT_TIMEOUT, () -> { // 1. remove this invoker from validInvokers list, this invoker will not be listed in the next time if (removeValidInvoker(invoker)) { // 2. add this invoker to reconnect list @@ -329,7 +338,7 @@ public void checkConnectivity() { // 1. pick invokers from invokersToReconnect // limit max reconnectTaskTryCount, prevent this task hang up all the connectivityExecutor // for long time - LockUtils.safeLock(invokerRefreshLock, LockUtils.DEFAULT_TIMEOUT, () -> { + LockUtils.safeLock(invokerRefreshWriteLock, LockUtils.DEFAULT_TIMEOUT, () -> { if (invokersToReconnect.size() < reconnectTaskTryCount) { invokersToTry.addAll(invokersToReconnect); } else { @@ -348,7 +357,7 @@ public void checkConnectivity() { // 2. try to check the invoker's status for (Invoker invoker : invokersToTry) { AtomicBoolean invokerExist = new AtomicBoolean(false); - LockUtils.safeLock(invokerRefreshLock, LockUtils.DEFAULT_TIMEOUT, () -> { + LockUtils.safeLock(invokerRefreshWriteLock, LockUtils.DEFAULT_TIMEOUT, () -> { invokerExist.set(invokers.contains(invoker)); }); // Should not lock here, `invoker.isAvailable` may need some time to check @@ -362,7 +371,7 @@ public void checkConnectivity() { } // 3. recover valid invoker - LockUtils.safeLock(invokerRefreshLock, LockUtils.DEFAULT_TIMEOUT, () -> { + LockUtils.safeLock(invokerRefreshWriteLock, LockUtils.DEFAULT_TIMEOUT, () -> { for (Invoker tInvoker : needDeleteList) { if (invokers.contains(tInvoker)) { addValidInvoker(tInvoker); @@ -388,7 +397,7 @@ public void checkConnectivity() { } // 4. submit new task if it has more to recover - LockUtils.safeLock(invokerRefreshLock, LockUtils.DEFAULT_TIMEOUT, () -> { + LockUtils.safeLock(invokerRefreshWriteLock, LockUtils.DEFAULT_TIMEOUT, () -> { if (!invokersToReconnect.isEmpty()) { checkConnectivity(); } @@ -411,7 +420,7 @@ public void checkConnectivity() { * 4. all the invokers disappeared from total invokers should be removed in the disabled invokers list */ public void refreshInvoker() { - LockUtils.safeLock(invokerRefreshLock, LockUtils.DEFAULT_TIMEOUT, () -> { + LockUtils.safeLock(invokerRefreshWriteLock, LockUtils.DEFAULT_TIMEOUT, () -> { if (invokersInitialized) { refreshInvokerInternal(); } @@ -445,7 +454,7 @@ private void refreshInvokers(BitList> targetInvokers, Collection invoker) { - LockUtils.safeLock(invokerRefreshLock, LockUtils.DEFAULT_TIMEOUT, () -> { + LockUtils.safeLock(invokerRefreshWriteLock, LockUtils.DEFAULT_TIMEOUT, () -> { if (invokers.contains(invoker)) { disabledInvokers.add(invoker); removeValidInvoker(invoker); @@ -458,7 +467,7 @@ public void addDisabledInvoker(Invoker invoker) { @Override public void recoverDisabledInvoker(Invoker invoker) { - LockUtils.safeLock(invokerRefreshLock, LockUtils.DEFAULT_TIMEOUT, () -> { + LockUtils.safeLock(invokerRefreshWriteLock, LockUtils.DEFAULT_TIMEOUT, () -> { if (disabledInvokers.remove(invoker)) { try { addValidInvoker(invoker); @@ -482,9 +491,9 @@ protected final void refreshRouter(BitList> newlyInvokers, Runnable s "", "Error occurred when refreshing router chain. " + "The addresses from notification: " + newlyInvokers.stream() - .map(Invoker::getUrl) - .map(URL::getAddress) - .collect(Collectors.joining(", ")), + .map(Invoker::getUrl) + .map(URL::getAddress) + .collect(Collectors.joining(", ")), t); throw t; @@ -526,7 +535,7 @@ public Set> getDisabledInvokers() { } protected void setInvokers(BitList> invokers) { - LockUtils.safeLock(invokerRefreshLock, LockUtils.DEFAULT_TIMEOUT, () -> { + LockUtils.safeLock(invokerRefreshWriteLock, LockUtils.DEFAULT_TIMEOUT, () -> { this.invokers = invokers; refreshInvokerInternal(); this.invokersInitialized = true; @@ -538,7 +547,7 @@ protected void setInvokers(BitList> invokers) { protected void destroyInvokers() { // set empty instead of clearing to support concurrent access. - LockUtils.safeLock(invokerRefreshLock, LockUtils.DEFAULT_TIMEOUT, () -> { + LockUtils.safeLock(invokerRefreshWriteLock, LockUtils.DEFAULT_TIMEOUT, () -> { this.invokers = BitList.emptyList(); this.validInvokers = BitList.emptyList(); this.invokersInitialized = false; @@ -547,7 +556,7 @@ protected void destroyInvokers() { private boolean addValidInvoker(Invoker invoker) { AtomicBoolean result = new AtomicBoolean(false); - LockUtils.safeLock(invokerRefreshLock, LockUtils.DEFAULT_TIMEOUT, () -> { + LockUtils.safeLock(invokerRefreshWriteLock, LockUtils.DEFAULT_TIMEOUT, () -> { result.set(this.validInvokers.add(invoker)); }); MetricsEventBus.publish( @@ -557,7 +566,7 @@ private boolean addValidInvoker(Invoker invoker) { private boolean removeValidInvoker(Invoker invoker) { AtomicBoolean result = new AtomicBoolean(false); - LockUtils.safeLock(invokerRefreshLock, LockUtils.DEFAULT_TIMEOUT, () -> { + LockUtils.safeLock(invokerRefreshWriteLock, LockUtils.DEFAULT_TIMEOUT, () -> { result.set(this.validInvokers.remove(invoker)); }); MetricsEventBus.publish( @@ -600,24 +609,24 @@ public String toString() { return "Directory(" + "invokers: " + invokers.size() + "[" + invokers.stream() - .map(Invoker::getUrl) - .map(URL::getAddress) - .limit(3) - .collect(Collectors.joining(", ")) + .map(Invoker::getUrl) + .map(URL::getAddress) + .limit(3) + .collect(Collectors.joining(", ")) + "]" + ", validInvokers: " + validInvokers.size() + "[" + validInvokers.stream() - .map(Invoker::getUrl) - .map(URL::getAddress) - .limit(3) - .collect(Collectors.joining(", ")) + .map(Invoker::getUrl) + .map(URL::getAddress) + .limit(3) + .collect(Collectors.joining(", ")) + "]" + ", invokersToReconnect: " + invokersToReconnect.size() + "[" + invokersToReconnect.stream() - .map(Invoker::getUrl) - .map(URL::getAddress) - .limit(3) - .collect(Collectors.joining(", ")) + .map(Invoker::getUrl) + .map(URL::getAddress) + .limit(3) + .collect(Collectors.joining(", ")) + "]" + ')'; } } From a9fb820b42aee8ae697c8afce166dd07cda7cb0c Mon Sep 17 00:00:00 2001 From: wangwei Date: Fri, 19 Dec 2025 16:28:16 +0800 Subject: [PATCH 2/6] Formatting --- .../cluster/directory/AbstractDirectory.java | 30 +++++++++---------- 1 file changed, 15 insertions(+), 15 deletions(-) diff --git a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/directory/AbstractDirectory.java b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/directory/AbstractDirectory.java index 8c6bbeac4e5b..a78602e82cca 100644 --- a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/directory/AbstractDirectory.java +++ b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/directory/AbstractDirectory.java @@ -491,9 +491,9 @@ protected final void refreshRouter(BitList> newlyInvokers, Runnable s "", "Error occurred when refreshing router chain. " + "The addresses from notification: " + newlyInvokers.stream() - .map(Invoker::getUrl) - .map(URL::getAddress) - .collect(Collectors.joining(", ")), + .map(Invoker::getUrl) + .map(URL::getAddress) + .collect(Collectors.joining(", ")), t); throw t; @@ -609,24 +609,24 @@ public String toString() { return "Directory(" + "invokers: " + invokers.size() + "[" + invokers.stream() - .map(Invoker::getUrl) - .map(URL::getAddress) - .limit(3) - .collect(Collectors.joining(", ")) + .map(Invoker::getUrl) + .map(URL::getAddress) + .limit(3) + .collect(Collectors.joining(", ")) + "]" + ", validInvokers: " + validInvokers.size() + "[" + validInvokers.stream() - .map(Invoker::getUrl) - .map(URL::getAddress) - .limit(3) - .collect(Collectors.joining(", ")) + .map(Invoker::getUrl) + .map(URL::getAddress) + .limit(3) + .collect(Collectors.joining(", ")) + "]" + ", invokersToReconnect: " + invokersToReconnect.size() + "[" + invokersToReconnect.stream() - .map(Invoker::getUrl) - .map(URL::getAddress) - .limit(3) - .collect(Collectors.joining(", ")) + .map(Invoker::getUrl) + .map(URL::getAddress) + .limit(3) + .collect(Collectors.joining(", ")) + "]" + ')'; } } From 2cb32dee9c7344873d259e1c9985301581a27309 Mon Sep 17 00:00:00 2001 From: wangwei Date: Mon, 22 Dec 2025 18:48:53 +0800 Subject: [PATCH 3/6] Fix the issues raised by Copilot --- .../rpc/cluster/directory/AbstractDirectory.java | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/directory/AbstractDirectory.java b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/directory/AbstractDirectory.java index a78602e82cca..78b7ef46f004 100644 --- a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/directory/AbstractDirectory.java +++ b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/directory/AbstractDirectory.java @@ -214,10 +214,12 @@ public List> list(Invocation invocation) throws RpcException { if (routerChain != null) { routerChain.getLock().readLock().lock(); } + boolean lockAcquired = false; try { if (!invokerRefreshReadLock.tryLock(LockUtils.DEFAULT_TIMEOUT, TimeUnit.MILLISECONDS)) { - throw new RpcException("Failed to acquire invokerRefreshLock for reading invokers"); + throw new RpcException("Failed to acquire read lock on invokerRefreshLock within timeout"); } + lockAcquired = true; // use clone to avoid being modified at doList(). if (invokersInitialized) { availableInvokers = validInvokers.clone(); @@ -226,9 +228,12 @@ public List> list(Invocation invocation) throws RpcException { } } catch (InterruptedException e) { Thread.currentThread().interrupt(); - throw new RpcException("Failed to refresh invokers, cause: " + e.getMessage(), e); + throw new RpcException( + "Interrupted while acquiring read lock for invoker access, cause: " + e.getMessage(), e); } finally { - invokerRefreshReadLock.unlock(); + if (lockAcquired) { + invokerRefreshReadLock.unlock(); + } } if (routerChain != null) { From 3a5272e0ece8aed9f73fa258d5079cc1d719748f Mon Sep 17 00:00:00 2001 From: vqianxiao <489305497@qq.com> Date: Mon, 22 Dec 2025 23:53:33 +0800 Subject: [PATCH 4/6] exception message --- .../dubbo/rpc/cluster/directory/AbstractDirectory.java | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/directory/AbstractDirectory.java b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/directory/AbstractDirectory.java index 78b7ef46f004..c16671cb6a47 100644 --- a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/directory/AbstractDirectory.java +++ b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/directory/AbstractDirectory.java @@ -217,7 +217,13 @@ public List> list(Invocation invocation) throws RpcException { boolean lockAcquired = false; try { if (!invokerRefreshReadLock.tryLock(LockUtils.DEFAULT_TIMEOUT, TimeUnit.MILLISECONDS)) { - throw new RpcException("Failed to acquire read lock on invokerRefreshLock within timeout"); + throw new RpcException( + "Failed to acquire read lock on invokerRefreshLock within timeout. " + "Timeout: " + + LockUtils.DEFAULT_TIMEOUT + "ms, " + "Lock state: [readLockHeld=" + + invokerRefreshLock.getReadLockCount() + ", writeLockHeld=" + + invokerRefreshLock.isWriteLocked() + ", writeLockHeldByCurrentThread=" + + invokerRefreshLock.isWriteLockedByCurrentThread() + "], Service: " + + getConsumerUrl().getServiceKey()); } lockAcquired = true; // use clone to avoid being modified at doList(). From a322354daee2ded57f746a93f120539544f0c9c9 Mon Sep 17 00:00:00 2001 From: wangwei Date: Tue, 23 Dec 2025 18:44:03 +0800 Subject: [PATCH 5/6] test case --- .../AbstractDirectoryConcurrencyTest.java | 276 ++++++++++++++++++ 1 file changed, 276 insertions(+) create mode 100644 dubbo-cluster/src/test/java/org/apache/dubbo/rpc/cluster/directory/AbstractDirectoryConcurrencyTest.java diff --git a/dubbo-cluster/src/test/java/org/apache/dubbo/rpc/cluster/directory/AbstractDirectoryConcurrencyTest.java b/dubbo-cluster/src/test/java/org/apache/dubbo/rpc/cluster/directory/AbstractDirectoryConcurrencyTest.java new file mode 100644 index 000000000000..0840e1bddb56 --- /dev/null +++ b/dubbo-cluster/src/test/java/org/apache/dubbo/rpc/cluster/directory/AbstractDirectoryConcurrencyTest.java @@ -0,0 +1,276 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dubbo.rpc.cluster.directory; + +import org.apache.dubbo.common.URL; +import org.apache.dubbo.common.utils.NetUtils; +import org.apache.dubbo.rpc.Invocation; +import org.apache.dubbo.rpc.Invoker; +import org.apache.dubbo.rpc.RpcException; +import org.apache.dubbo.rpc.cluster.RouterChain; +import org.apache.dubbo.rpc.cluster.SingleRouterChain; +import org.apache.dubbo.rpc.cluster.router.state.BitList; + +import java.util.Collections; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import static org.mockito.Mockito.mock; + +class AbstractDirectoryConcurrencyTest { + + private TestDirectory directory; + private URL url; + private ExecutorService executor; + + @BeforeEach + void setUp() { + url = URL.valueOf("dubbo://" + NetUtils.getLocalHost() + ":20880/com.foo.BarService"); + directory = new TestDirectory(url); + executor = Executors.newFixedThreadPool(10); + } + + @AfterEach + void tearDown() { + if (directory != null) { + directory.destroy(); + } + if (executor != null) { + executor.shutdownNow(); + } + } + + @Test + void testMultipleReadLocks() throws InterruptedException { + int threadCount = 5; + CountDownLatch latch = new CountDownLatch(1); + CountDownLatch doneLatch = new CountDownLatch(threadCount); + AtomicBoolean failed = new AtomicBoolean(false); + + // Setup the directory with a slow list implementation to simulate work holding the read lock + directory.setListAction(() -> { + try { + // Wait for the latch to ensure all threads are in doList + latch.await(5, TimeUnit.SECONDS); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + }); + + for (int i = 0; i < threadCount; i++) { + executor.submit(() -> { + try { + directory.list(mock(Invocation.class)); + } catch (Exception e) { + e.printStackTrace(); + failed.set(true); + } finally { + doneLatch.countDown(); + } + }); + } + + // Give threads time to start and acquire read lock + Thread.sleep(100); + // Release the latch, letting them proceed + latch.countDown(); + + Assertions.assertTrue(doneLatch.await(5, TimeUnit.SECONDS), "All list calls should complete"); + Assertions.assertFalse(failed.get(), "No exceptions should occur during concurrent reads"); + } + + @Test + void testWriteBlocksRead() throws InterruptedException { + CountDownLatch writeLockAcquiredLatch = new CountDownLatch(1); + CountDownLatch releaseWriteLockLatch = new CountDownLatch(1); + AtomicReference readBlocked = new AtomicReference<>(false); + + // Thread to hold write lock + executor.submit(() -> { + directory.simulateWriteLock(writeLockAcquiredLatch, releaseWriteLockLatch); + }); + + // Wait for write lock to be acquired + Assertions.assertTrue(writeLockAcquiredLatch.await(5, TimeUnit.SECONDS)); + + // Try to read in another thread + Future readFuture = executor.submit(() -> { + long start = System.currentTimeMillis(); + directory.list(mock(Invocation.class)); + long duration = System.currentTimeMillis() - start; + // If duration is > 100ms, we assume it was blocked + readBlocked.set(duration >= 100); + }); + + // Sleep to ensure read thread tries to acquire lock and blocks + Thread.sleep(200); + + // Release write lock + releaseWriteLockLatch.countDown(); + + try { + readFuture.get(5, TimeUnit.SECONDS); + } catch (Exception e) { + Assertions.fail("Read execution failed"); + } + + Assertions.assertTrue(readBlocked.get(), "Read operation should be blocked by write lock"); + } + + @Test + void testConcurrentReadAndWrite() throws InterruptedException { + int readThreads = 10; + int writeThreads = 2; + int iterations = 100; + CountDownLatch doneLatch = new CountDownLatch(readThreads + writeThreads); + AtomicBoolean failed = new AtomicBoolean(false); + + directory.setListAction(() -> { + // Simulate some work + try { + Thread.sleep(1); + } catch (InterruptedException e) { + } + }); + + // Start read threads + for (int i = 0; i < readThreads; i++) { + executor.submit(() -> { + try { + for (int j = 0; j < iterations; j++) { + directory.list(mock(Invocation.class)); + } + } catch (Exception e) { + e.printStackTrace(); + failed.set(true); + } finally { + doneLatch.countDown(); + } + }); + } + + // Start write threads + for (int i = 0; i < writeThreads; i++) { + executor.submit(() -> { + try { + for (int j = 0; j < iterations; j++) { + // Use setInvokers to trigger write lock + directory.setInvokers(new BitList<>(Collections.emptyList())); + Thread.sleep(2); + } + } catch (Exception e) { + e.printStackTrace(); + failed.set(true); + } finally { + doneLatch.countDown(); + } + }); + } + + Assertions.assertTrue(doneLatch.await(30, TimeUnit.SECONDS), "All operations should complete"); + Assertions.assertFalse(failed.get(), "No exceptions should occur during concurrent read/write"); + } + + // Helper class to expose protected methods and hook into list() + static class TestDirectory extends AbstractDirectory { + private Runnable listAction = () -> {}; + + public TestDirectory(URL url) { + super(url); + // Initialize with empty router chain to avoid NPE + setRouterChain(RouterChain.buildChain(Object.class, url)); + } + + public void setListAction(Runnable listAction) { + this.listAction = listAction; + } + + @Override + public Class getInterface() { + return Object.class; + } + + @Override + public List> getAllInvokers() { + return Collections.emptyList(); + } + + @Override + public boolean isAvailable() { + return true; + } + + @Override + protected List> doList( + SingleRouterChain singleRouterChain, BitList> invokers, Invocation invocation) + throws RpcException { + listAction.run(); + return Collections.emptyList(); + } + + // Helper to simulate holding write lock + public void simulateWriteLock(CountDownLatch acquired, CountDownLatch release) { + // We use refreshInvoker to acquire write lock, but we need to inject our blocking logic + // Since we can't easily inject into refreshInvoker without complex mocking, + // we'll use a trick: override setInvokers logic? No, setInvokers uses lock internally. + // But we can use the fact that addRouters/etc might not use the same lock? No. + // We can't access the lock directly. + // However, we can use 'addInvalidateInvoker' or similar if we can hook into it. + + // Actually, we can use a method that holds the lock and calls something we can override? + // AbstractDirectory doesn't call many overridable methods inside the lock. + // refreshInvoker calls refreshInvokerInternal (private). + + // Wait, we can use reflection to get the lock and lock it manually for this test helper. + try { + java.lang.reflect.Field lockField = AbstractDirectory.class.getDeclaredField("invokerRefreshLock"); + lockField.setAccessible(true); + java.util.concurrent.locks.ReadWriteLock lock = + (java.util.concurrent.locks.ReadWriteLock) lockField.get(this); + + lock.writeLock().lock(); + try { + acquired.countDown(); + release.await(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } finally { + lock.writeLock().unlock(); + } + } catch (Exception e) { + e.printStackTrace(); + } + } + + // Expose setInvokers for test + @Override + public void setInvokers(BitList> invokers) { + super.setInvokers(invokers); + } + } +} From af439eec9eb6e84107f9e6b286fdab736bbb0a62 Mon Sep 17 00:00:00 2001 From: wangwei Date: Wed, 7 Jan 2026 17:45:20 +0800 Subject: [PATCH 6/6] use fair lock --- .../apache/dubbo/rpc/cluster/directory/AbstractDirectory.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/directory/AbstractDirectory.java b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/directory/AbstractDirectory.java index c16671cb6a47..ab0b44e3eb10 100644 --- a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/directory/AbstractDirectory.java +++ b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/directory/AbstractDirectory.java @@ -125,7 +125,7 @@ public abstract class AbstractDirectory implements Directory { private volatile ScheduledFuture connectivityCheckFuture; - private final ReentrantReadWriteLock invokerRefreshLock = new ReentrantReadWriteLock(); + private final ReentrantReadWriteLock invokerRefreshLock = new ReentrantReadWriteLock(true); private final ReentrantReadWriteLock.ReadLock invokerRefreshReadLock = invokerRefreshLock.readLock(); private final ReentrantReadWriteLock.WriteLock invokerRefreshWriteLock = invokerRefreshLock.writeLock();