diff --git a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/directory/AbstractDirectory.java b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/directory/AbstractDirectory.java index be68810f4aee..ab0b44e3eb10 100644 --- a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/directory/AbstractDirectory.java +++ b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/directory/AbstractDirectory.java @@ -58,7 +58,7 @@ import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.locks.ReentrantLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.stream.Collectors; import static org.apache.dubbo.common.constants.CommonConstants.CONSUMER; @@ -125,7 +125,10 @@ public abstract class AbstractDirectory implements Directory { private volatile ScheduledFuture connectivityCheckFuture; - private final ReentrantLock invokerRefreshLock = new ReentrantLock(); + private final ReentrantReadWriteLock invokerRefreshLock = new ReentrantReadWriteLock(true); + + private final ReentrantReadWriteLock.ReadLock invokerRefreshReadLock = invokerRefreshLock.readLock(); + private final ReentrantReadWriteLock.WriteLock invokerRefreshWriteLock = invokerRefreshLock.writeLock(); /** * The max count of invokers for each reconnect task select to try to reconnect. @@ -208,27 +211,41 @@ public List> list(Invocation invocation) throws RpcException { BitList> availableInvokers; SingleRouterChain singleChain = null; try { + if (routerChain != null) { + routerChain.getLock().readLock().lock(); + } + boolean lockAcquired = false; try { - if (routerChain != null) { - routerChain.getLock().readLock().lock(); + if (!invokerRefreshReadLock.tryLock(LockUtils.DEFAULT_TIMEOUT, TimeUnit.MILLISECONDS)) { + throw new RpcException( + "Failed to acquire read lock on invokerRefreshLock within timeout. " + "Timeout: " + + LockUtils.DEFAULT_TIMEOUT + "ms, " + "Lock state: [readLockHeld=" + + invokerRefreshLock.getReadLockCount() + ", writeLockHeld=" + + invokerRefreshLock.isWriteLocked() + ", writeLockHeldByCurrentThread=" + + invokerRefreshLock.isWriteLockedByCurrentThread() + "], Service: " + + getConsumerUrl().getServiceKey()); } + lockAcquired = true; // use clone to avoid being modified at doList(). if (invokersInitialized) { availableInvokers = validInvokers.clone(); } else { availableInvokers = invokers.clone(); } - - if (routerChain != null) { - singleChain = routerChain.getSingleChain(getConsumerUrl(), availableInvokers, invocation); - singleChain.getLock().readLock().lock(); - } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RpcException( + "Interrupted while acquiring read lock for invoker access, cause: " + e.getMessage(), e); } finally { - if (routerChain != null) { - routerChain.getLock().readLock().unlock(); + if (lockAcquired) { + invokerRefreshReadLock.unlock(); } } + if (routerChain != null) { + singleChain = routerChain.getSingleChain(getConsumerUrl(), availableInvokers, invocation); + singleChain.getLock().readLock().lock(); + } List> routedResult = doList(singleChain, availableInvokers, invocation); if (routedResult.isEmpty()) { // 2-2 - No provider available. @@ -249,6 +266,9 @@ public List> list(Invocation invocation) throws RpcException { if (singleChain != null) { singleChain.getLock().readLock().unlock(); } + if (routerChain != null) { + routerChain.getLock().readLock().unlock(); + } } } @@ -298,7 +318,7 @@ public void discordAddresses() { @Override public void addInvalidateInvoker(Invoker invoker) { - LockUtils.safeLock(invokerRefreshLock, LockUtils.DEFAULT_TIMEOUT, () -> { + LockUtils.safeLock(invokerRefreshWriteLock, LockUtils.DEFAULT_TIMEOUT, () -> { // 1. remove this invoker from validInvokers list, this invoker will not be listed in the next time if (removeValidInvoker(invoker)) { // 2. add this invoker to reconnect list @@ -329,7 +349,7 @@ public void checkConnectivity() { // 1. pick invokers from invokersToReconnect // limit max reconnectTaskTryCount, prevent this task hang up all the connectivityExecutor // for long time - LockUtils.safeLock(invokerRefreshLock, LockUtils.DEFAULT_TIMEOUT, () -> { + LockUtils.safeLock(invokerRefreshWriteLock, LockUtils.DEFAULT_TIMEOUT, () -> { if (invokersToReconnect.size() < reconnectTaskTryCount) { invokersToTry.addAll(invokersToReconnect); } else { @@ -348,7 +368,7 @@ public void checkConnectivity() { // 2. try to check the invoker's status for (Invoker invoker : invokersToTry) { AtomicBoolean invokerExist = new AtomicBoolean(false); - LockUtils.safeLock(invokerRefreshLock, LockUtils.DEFAULT_TIMEOUT, () -> { + LockUtils.safeLock(invokerRefreshWriteLock, LockUtils.DEFAULT_TIMEOUT, () -> { invokerExist.set(invokers.contains(invoker)); }); // Should not lock here, `invoker.isAvailable` may need some time to check @@ -362,7 +382,7 @@ public void checkConnectivity() { } // 3. recover valid invoker - LockUtils.safeLock(invokerRefreshLock, LockUtils.DEFAULT_TIMEOUT, () -> { + LockUtils.safeLock(invokerRefreshWriteLock, LockUtils.DEFAULT_TIMEOUT, () -> { for (Invoker tInvoker : needDeleteList) { if (invokers.contains(tInvoker)) { addValidInvoker(tInvoker); @@ -388,7 +408,7 @@ public void checkConnectivity() { } // 4. submit new task if it has more to recover - LockUtils.safeLock(invokerRefreshLock, LockUtils.DEFAULT_TIMEOUT, () -> { + LockUtils.safeLock(invokerRefreshWriteLock, LockUtils.DEFAULT_TIMEOUT, () -> { if (!invokersToReconnect.isEmpty()) { checkConnectivity(); } @@ -411,7 +431,7 @@ public void checkConnectivity() { * 4. all the invokers disappeared from total invokers should be removed in the disabled invokers list */ public void refreshInvoker() { - LockUtils.safeLock(invokerRefreshLock, LockUtils.DEFAULT_TIMEOUT, () -> { + LockUtils.safeLock(invokerRefreshWriteLock, LockUtils.DEFAULT_TIMEOUT, () -> { if (invokersInitialized) { refreshInvokerInternal(); } @@ -445,7 +465,7 @@ private void refreshInvokers(BitList> targetInvokers, Collection invoker) { - LockUtils.safeLock(invokerRefreshLock, LockUtils.DEFAULT_TIMEOUT, () -> { + LockUtils.safeLock(invokerRefreshWriteLock, LockUtils.DEFAULT_TIMEOUT, () -> { if (invokers.contains(invoker)) { disabledInvokers.add(invoker); removeValidInvoker(invoker); @@ -458,7 +478,7 @@ public void addDisabledInvoker(Invoker invoker) { @Override public void recoverDisabledInvoker(Invoker invoker) { - LockUtils.safeLock(invokerRefreshLock, LockUtils.DEFAULT_TIMEOUT, () -> { + LockUtils.safeLock(invokerRefreshWriteLock, LockUtils.DEFAULT_TIMEOUT, () -> { if (disabledInvokers.remove(invoker)) { try { addValidInvoker(invoker); @@ -526,7 +546,7 @@ public Set> getDisabledInvokers() { } protected void setInvokers(BitList> invokers) { - LockUtils.safeLock(invokerRefreshLock, LockUtils.DEFAULT_TIMEOUT, () -> { + LockUtils.safeLock(invokerRefreshWriteLock, LockUtils.DEFAULT_TIMEOUT, () -> { this.invokers = invokers; refreshInvokerInternal(); this.invokersInitialized = true; @@ -538,7 +558,7 @@ protected void setInvokers(BitList> invokers) { protected void destroyInvokers() { // set empty instead of clearing to support concurrent access. - LockUtils.safeLock(invokerRefreshLock, LockUtils.DEFAULT_TIMEOUT, () -> { + LockUtils.safeLock(invokerRefreshWriteLock, LockUtils.DEFAULT_TIMEOUT, () -> { this.invokers = BitList.emptyList(); this.validInvokers = BitList.emptyList(); this.invokersInitialized = false; @@ -547,7 +567,7 @@ protected void destroyInvokers() { private boolean addValidInvoker(Invoker invoker) { AtomicBoolean result = new AtomicBoolean(false); - LockUtils.safeLock(invokerRefreshLock, LockUtils.DEFAULT_TIMEOUT, () -> { + LockUtils.safeLock(invokerRefreshWriteLock, LockUtils.DEFAULT_TIMEOUT, () -> { result.set(this.validInvokers.add(invoker)); }); MetricsEventBus.publish( @@ -557,7 +577,7 @@ private boolean addValidInvoker(Invoker invoker) { private boolean removeValidInvoker(Invoker invoker) { AtomicBoolean result = new AtomicBoolean(false); - LockUtils.safeLock(invokerRefreshLock, LockUtils.DEFAULT_TIMEOUT, () -> { + LockUtils.safeLock(invokerRefreshWriteLock, LockUtils.DEFAULT_TIMEOUT, () -> { result.set(this.validInvokers.remove(invoker)); }); MetricsEventBus.publish( diff --git a/dubbo-cluster/src/test/java/org/apache/dubbo/rpc/cluster/directory/AbstractDirectoryConcurrencyTest.java b/dubbo-cluster/src/test/java/org/apache/dubbo/rpc/cluster/directory/AbstractDirectoryConcurrencyTest.java new file mode 100644 index 000000000000..0840e1bddb56 --- /dev/null +++ b/dubbo-cluster/src/test/java/org/apache/dubbo/rpc/cluster/directory/AbstractDirectoryConcurrencyTest.java @@ -0,0 +1,276 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dubbo.rpc.cluster.directory; + +import org.apache.dubbo.common.URL; +import org.apache.dubbo.common.utils.NetUtils; +import org.apache.dubbo.rpc.Invocation; +import org.apache.dubbo.rpc.Invoker; +import org.apache.dubbo.rpc.RpcException; +import org.apache.dubbo.rpc.cluster.RouterChain; +import org.apache.dubbo.rpc.cluster.SingleRouterChain; +import org.apache.dubbo.rpc.cluster.router.state.BitList; + +import java.util.Collections; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import static org.mockito.Mockito.mock; + +class AbstractDirectoryConcurrencyTest { + + private TestDirectory directory; + private URL url; + private ExecutorService executor; + + @BeforeEach + void setUp() { + url = URL.valueOf("dubbo://" + NetUtils.getLocalHost() + ":20880/com.foo.BarService"); + directory = new TestDirectory(url); + executor = Executors.newFixedThreadPool(10); + } + + @AfterEach + void tearDown() { + if (directory != null) { + directory.destroy(); + } + if (executor != null) { + executor.shutdownNow(); + } + } + + @Test + void testMultipleReadLocks() throws InterruptedException { + int threadCount = 5; + CountDownLatch latch = new CountDownLatch(1); + CountDownLatch doneLatch = new CountDownLatch(threadCount); + AtomicBoolean failed = new AtomicBoolean(false); + + // Setup the directory with a slow list implementation to simulate work holding the read lock + directory.setListAction(() -> { + try { + // Wait for the latch to ensure all threads are in doList + latch.await(5, TimeUnit.SECONDS); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + }); + + for (int i = 0; i < threadCount; i++) { + executor.submit(() -> { + try { + directory.list(mock(Invocation.class)); + } catch (Exception e) { + e.printStackTrace(); + failed.set(true); + } finally { + doneLatch.countDown(); + } + }); + } + + // Give threads time to start and acquire read lock + Thread.sleep(100); + // Release the latch, letting them proceed + latch.countDown(); + + Assertions.assertTrue(doneLatch.await(5, TimeUnit.SECONDS), "All list calls should complete"); + Assertions.assertFalse(failed.get(), "No exceptions should occur during concurrent reads"); + } + + @Test + void testWriteBlocksRead() throws InterruptedException { + CountDownLatch writeLockAcquiredLatch = new CountDownLatch(1); + CountDownLatch releaseWriteLockLatch = new CountDownLatch(1); + AtomicReference readBlocked = new AtomicReference<>(false); + + // Thread to hold write lock + executor.submit(() -> { + directory.simulateWriteLock(writeLockAcquiredLatch, releaseWriteLockLatch); + }); + + // Wait for write lock to be acquired + Assertions.assertTrue(writeLockAcquiredLatch.await(5, TimeUnit.SECONDS)); + + // Try to read in another thread + Future readFuture = executor.submit(() -> { + long start = System.currentTimeMillis(); + directory.list(mock(Invocation.class)); + long duration = System.currentTimeMillis() - start; + // If duration is > 100ms, we assume it was blocked + readBlocked.set(duration >= 100); + }); + + // Sleep to ensure read thread tries to acquire lock and blocks + Thread.sleep(200); + + // Release write lock + releaseWriteLockLatch.countDown(); + + try { + readFuture.get(5, TimeUnit.SECONDS); + } catch (Exception e) { + Assertions.fail("Read execution failed"); + } + + Assertions.assertTrue(readBlocked.get(), "Read operation should be blocked by write lock"); + } + + @Test + void testConcurrentReadAndWrite() throws InterruptedException { + int readThreads = 10; + int writeThreads = 2; + int iterations = 100; + CountDownLatch doneLatch = new CountDownLatch(readThreads + writeThreads); + AtomicBoolean failed = new AtomicBoolean(false); + + directory.setListAction(() -> { + // Simulate some work + try { + Thread.sleep(1); + } catch (InterruptedException e) { + } + }); + + // Start read threads + for (int i = 0; i < readThreads; i++) { + executor.submit(() -> { + try { + for (int j = 0; j < iterations; j++) { + directory.list(mock(Invocation.class)); + } + } catch (Exception e) { + e.printStackTrace(); + failed.set(true); + } finally { + doneLatch.countDown(); + } + }); + } + + // Start write threads + for (int i = 0; i < writeThreads; i++) { + executor.submit(() -> { + try { + for (int j = 0; j < iterations; j++) { + // Use setInvokers to trigger write lock + directory.setInvokers(new BitList<>(Collections.emptyList())); + Thread.sleep(2); + } + } catch (Exception e) { + e.printStackTrace(); + failed.set(true); + } finally { + doneLatch.countDown(); + } + }); + } + + Assertions.assertTrue(doneLatch.await(30, TimeUnit.SECONDS), "All operations should complete"); + Assertions.assertFalse(failed.get(), "No exceptions should occur during concurrent read/write"); + } + + // Helper class to expose protected methods and hook into list() + static class TestDirectory extends AbstractDirectory { + private Runnable listAction = () -> {}; + + public TestDirectory(URL url) { + super(url); + // Initialize with empty router chain to avoid NPE + setRouterChain(RouterChain.buildChain(Object.class, url)); + } + + public void setListAction(Runnable listAction) { + this.listAction = listAction; + } + + @Override + public Class getInterface() { + return Object.class; + } + + @Override + public List> getAllInvokers() { + return Collections.emptyList(); + } + + @Override + public boolean isAvailable() { + return true; + } + + @Override + protected List> doList( + SingleRouterChain singleRouterChain, BitList> invokers, Invocation invocation) + throws RpcException { + listAction.run(); + return Collections.emptyList(); + } + + // Helper to simulate holding write lock + public void simulateWriteLock(CountDownLatch acquired, CountDownLatch release) { + // We use refreshInvoker to acquire write lock, but we need to inject our blocking logic + // Since we can't easily inject into refreshInvoker without complex mocking, + // we'll use a trick: override setInvokers logic? No, setInvokers uses lock internally. + // But we can use the fact that addRouters/etc might not use the same lock? No. + // We can't access the lock directly. + // However, we can use 'addInvalidateInvoker' or similar if we can hook into it. + + // Actually, we can use a method that holds the lock and calls something we can override? + // AbstractDirectory doesn't call many overridable methods inside the lock. + // refreshInvoker calls refreshInvokerInternal (private). + + // Wait, we can use reflection to get the lock and lock it manually for this test helper. + try { + java.lang.reflect.Field lockField = AbstractDirectory.class.getDeclaredField("invokerRefreshLock"); + lockField.setAccessible(true); + java.util.concurrent.locks.ReadWriteLock lock = + (java.util.concurrent.locks.ReadWriteLock) lockField.get(this); + + lock.writeLock().lock(); + try { + acquired.countDown(); + release.await(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } finally { + lock.writeLock().unlock(); + } + } catch (Exception e) { + e.printStackTrace(); + } + } + + // Expose setInvokers for test + @Override + public void setInvokers(BitList> invokers) { + super.setInvokers(invokers); + } + } +}