Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;

import static org.apache.dubbo.common.constants.CommonConstants.CONSUMER;
Expand Down Expand Up @@ -125,7 +125,10 @@ public abstract class AbstractDirectory<T> implements Directory<T> {

private volatile ScheduledFuture<?> connectivityCheckFuture;

private final ReentrantLock invokerRefreshLock = new ReentrantLock();
private final ReentrantReadWriteLock invokerRefreshLock = new ReentrantReadWriteLock(true);

private final ReentrantReadWriteLock.ReadLock invokerRefreshReadLock = invokerRefreshLock.readLock();
private final ReentrantReadWriteLock.WriteLock invokerRefreshWriteLock = invokerRefreshLock.writeLock();

/**
* The max count of invokers for each reconnect task select to try to reconnect.
Expand Down Expand Up @@ -208,27 +211,41 @@ public List<Invoker<T>> list(Invocation invocation) throws RpcException {
BitList<Invoker<T>> availableInvokers;
SingleRouterChain<T> singleChain = null;
try {
if (routerChain != null) {
routerChain.getLock().readLock().lock();
}
boolean lockAcquired = false;
try {
if (routerChain != null) {
routerChain.getLock().readLock().lock();
if (!invokerRefreshReadLock.tryLock(LockUtils.DEFAULT_TIMEOUT, TimeUnit.MILLISECONDS)) {
throw new RpcException(
"Failed to acquire read lock on invokerRefreshLock within timeout. " + "Timeout: "
+ LockUtils.DEFAULT_TIMEOUT + "ms, " + "Lock state: [readLockHeld="
+ invokerRefreshLock.getReadLockCount() + ", writeLockHeld="
+ invokerRefreshLock.isWriteLocked() + ", writeLockHeldByCurrentThread="
+ invokerRefreshLock.isWriteLockedByCurrentThread() + "], Service: "
+ getConsumerUrl().getServiceKey());
}
lockAcquired = true;
// use clone to avoid being modified at doList().
if (invokersInitialized) {
availableInvokers = validInvokers.clone();
} else {
availableInvokers = invokers.clone();
}

if (routerChain != null) {
singleChain = routerChain.getSingleChain(getConsumerUrl(), availableInvokers, invocation);
singleChain.getLock().readLock().lock();
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RpcException(
"Interrupted while acquiring read lock for invoker access, cause: " + e.getMessage(), e);
} finally {
if (routerChain != null) {
routerChain.getLock().readLock().unlock();
if (lockAcquired) {
invokerRefreshReadLock.unlock();
}
}

if (routerChain != null) {
singleChain = routerChain.getSingleChain(getConsumerUrl(), availableInvokers, invocation);
singleChain.getLock().readLock().lock();
}
List<Invoker<T>> routedResult = doList(singleChain, availableInvokers, invocation);
if (routedResult.isEmpty()) {
// 2-2 - No provider available.
Expand All @@ -249,6 +266,9 @@ public List<Invoker<T>> list(Invocation invocation) throws RpcException {
if (singleChain != null) {
singleChain.getLock().readLock().unlock();
}
if (routerChain != null) {
routerChain.getLock().readLock().unlock();
}
}
}

Expand Down Expand Up @@ -298,7 +318,7 @@ public void discordAddresses() {

@Override
public void addInvalidateInvoker(Invoker<T> invoker) {
LockUtils.safeLock(invokerRefreshLock, LockUtils.DEFAULT_TIMEOUT, () -> {
LockUtils.safeLock(invokerRefreshWriteLock, LockUtils.DEFAULT_TIMEOUT, () -> {
// 1. remove this invoker from validInvokers list, this invoker will not be listed in the next time
if (removeValidInvoker(invoker)) {
// 2. add this invoker to reconnect list
Expand Down Expand Up @@ -329,7 +349,7 @@ public void checkConnectivity() {
// 1. pick invokers from invokersToReconnect
// limit max reconnectTaskTryCount, prevent this task hang up all the connectivityExecutor
// for long time
LockUtils.safeLock(invokerRefreshLock, LockUtils.DEFAULT_TIMEOUT, () -> {
LockUtils.safeLock(invokerRefreshWriteLock, LockUtils.DEFAULT_TIMEOUT, () -> {
if (invokersToReconnect.size() < reconnectTaskTryCount) {
invokersToTry.addAll(invokersToReconnect);
} else {
Expand All @@ -348,7 +368,7 @@ public void checkConnectivity() {
// 2. try to check the invoker's status
for (Invoker<T> invoker : invokersToTry) {
AtomicBoolean invokerExist = new AtomicBoolean(false);
LockUtils.safeLock(invokerRefreshLock, LockUtils.DEFAULT_TIMEOUT, () -> {
LockUtils.safeLock(invokerRefreshWriteLock, LockUtils.DEFAULT_TIMEOUT, () -> {
invokerExist.set(invokers.contains(invoker));
});
// Should not lock here, `invoker.isAvailable` may need some time to check
Expand All @@ -362,7 +382,7 @@ public void checkConnectivity() {
}

// 3. recover valid invoker
LockUtils.safeLock(invokerRefreshLock, LockUtils.DEFAULT_TIMEOUT, () -> {
LockUtils.safeLock(invokerRefreshWriteLock, LockUtils.DEFAULT_TIMEOUT, () -> {
for (Invoker<T> tInvoker : needDeleteList) {
if (invokers.contains(tInvoker)) {
addValidInvoker(tInvoker);
Expand All @@ -388,7 +408,7 @@ public void checkConnectivity() {
}

// 4. submit new task if it has more to recover
LockUtils.safeLock(invokerRefreshLock, LockUtils.DEFAULT_TIMEOUT, () -> {
LockUtils.safeLock(invokerRefreshWriteLock, LockUtils.DEFAULT_TIMEOUT, () -> {
if (!invokersToReconnect.isEmpty()) {
checkConnectivity();
}
Expand All @@ -411,7 +431,7 @@ public void checkConnectivity() {
* 4. all the invokers disappeared from total invokers should be removed in the disabled invokers list
*/
public void refreshInvoker() {
LockUtils.safeLock(invokerRefreshLock, LockUtils.DEFAULT_TIMEOUT, () -> {
LockUtils.safeLock(invokerRefreshWriteLock, LockUtils.DEFAULT_TIMEOUT, () -> {
if (invokersInitialized) {
refreshInvokerInternal();
}
Expand Down Expand Up @@ -445,7 +465,7 @@ private void refreshInvokers(BitList<Invoker<T>> targetInvokers, Collection<Invo

@Override
public void addDisabledInvoker(Invoker<T> invoker) {
LockUtils.safeLock(invokerRefreshLock, LockUtils.DEFAULT_TIMEOUT, () -> {
LockUtils.safeLock(invokerRefreshWriteLock, LockUtils.DEFAULT_TIMEOUT, () -> {
if (invokers.contains(invoker)) {
disabledInvokers.add(invoker);
removeValidInvoker(invoker);
Expand All @@ -458,7 +478,7 @@ public void addDisabledInvoker(Invoker<T> invoker) {

@Override
public void recoverDisabledInvoker(Invoker<T> invoker) {
LockUtils.safeLock(invokerRefreshLock, LockUtils.DEFAULT_TIMEOUT, () -> {
LockUtils.safeLock(invokerRefreshWriteLock, LockUtils.DEFAULT_TIMEOUT, () -> {
if (disabledInvokers.remove(invoker)) {
try {
addValidInvoker(invoker);
Expand Down Expand Up @@ -526,7 +546,7 @@ public Set<Invoker<T>> getDisabledInvokers() {
}

protected void setInvokers(BitList<Invoker<T>> invokers) {
LockUtils.safeLock(invokerRefreshLock, LockUtils.DEFAULT_TIMEOUT, () -> {
LockUtils.safeLock(invokerRefreshWriteLock, LockUtils.DEFAULT_TIMEOUT, () -> {
this.invokers = invokers;
refreshInvokerInternal();
this.invokersInitialized = true;
Expand All @@ -538,7 +558,7 @@ protected void setInvokers(BitList<Invoker<T>> invokers) {

protected void destroyInvokers() {
// set empty instead of clearing to support concurrent access.
LockUtils.safeLock(invokerRefreshLock, LockUtils.DEFAULT_TIMEOUT, () -> {
LockUtils.safeLock(invokerRefreshWriteLock, LockUtils.DEFAULT_TIMEOUT, () -> {
this.invokers = BitList.emptyList();
this.validInvokers = BitList.emptyList();
this.invokersInitialized = false;
Expand All @@ -547,7 +567,7 @@ protected void destroyInvokers() {

private boolean addValidInvoker(Invoker<T> invoker) {
AtomicBoolean result = new AtomicBoolean(false);
LockUtils.safeLock(invokerRefreshLock, LockUtils.DEFAULT_TIMEOUT, () -> {
LockUtils.safeLock(invokerRefreshWriteLock, LockUtils.DEFAULT_TIMEOUT, () -> {
result.set(this.validInvokers.add(invoker));
});
MetricsEventBus.publish(
Expand All @@ -557,7 +577,7 @@ private boolean addValidInvoker(Invoker<T> invoker) {

private boolean removeValidInvoker(Invoker<T> invoker) {
AtomicBoolean result = new AtomicBoolean(false);
LockUtils.safeLock(invokerRefreshLock, LockUtils.DEFAULT_TIMEOUT, () -> {
LockUtils.safeLock(invokerRefreshWriteLock, LockUtils.DEFAULT_TIMEOUT, () -> {
result.set(this.validInvokers.remove(invoker));
});
MetricsEventBus.publish(
Expand Down
Loading
Loading