Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@

import com.google.api.core.InternalApi;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.spanner.v1.BeginTransactionRequest;
import com.google.spanner.v1.CacheUpdate;
import com.google.spanner.v1.CommitRequest;
Expand All @@ -32,14 +31,7 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Predicate;
import javax.annotation.Nullable;
Expand All @@ -52,69 +44,25 @@
@InternalApi
public final class ChannelFinder {
private static final Predicate<String> NO_EXCLUDED_ENDPOINTS = address -> false;
private static final int CACHE_UPDATE_DRAIN_BATCH_SIZE = 64;
private static final int MAX_CACHE_UPDATE_THREADS =
Math.max(2, Runtime.getRuntime().availableProcessors());
private static final ExecutorService CACHE_UPDATE_POOL = createCacheUpdatePool();

private final Object updateLock = new Object();
private final AtomicLong databaseId = new AtomicLong();
private final KeyRecipeCache recipeCache = new KeyRecipeCache();
private final KeyRangeCache rangeCache;
private final ConcurrentLinkedQueue<PendingCacheUpdate> pendingUpdates =
new ConcurrentLinkedQueue<>();
private final AtomicBoolean drainScheduled = new AtomicBoolean();
private volatile java.util.concurrent.CountDownLatch drainingLatch =
new java.util.concurrent.CountDownLatch(0);
@Nullable private final EndpointLifecycleManager lifecycleManager;
@Nullable private final String finderKey;

public ChannelFinder(ChannelEndpointCache endpointCache) {
this(endpointCache, null, null);
this(endpointCache, null);
}

public ChannelFinder(
ChannelEndpointCache endpointCache, @Nullable EndpointLifecycleManager lifecycleManager) {
this(endpointCache, lifecycleManager, null);
}

ChannelFinder(
ChannelEndpointCache endpointCache,
@Nullable EndpointLifecycleManager lifecycleManager,
@Nullable String finderKey) {
this.rangeCache = new KeyRangeCache(Objects.requireNonNull(endpointCache), lifecycleManager);
this.lifecycleManager = lifecycleManager;
this.finderKey = finderKey;
}

void useDeterministicRandom() {
rangeCache.useDeterministicRandom();
}

private static ExecutorService createCacheUpdatePool() {
ThreadPoolExecutor executor =
new ThreadPoolExecutor(
MAX_CACHE_UPDATE_THREADS,
MAX_CACHE_UPDATE_THREADS,
30L,
TimeUnit.SECONDS,
new LinkedBlockingQueue<>(),
new ThreadFactoryBuilder()
.setDaemon(true)
.setNameFormat("spanner-cache-update-%d")
.build());
executor.allowCoreThreadTimeOut(true);
return executor;
}

private static final class PendingCacheUpdate {
private final CacheUpdate update;

private PendingCacheUpdate(CacheUpdate update) {
this.update = update;
}
}

private boolean isMaterialUpdate(CacheUpdate update) {
return update.getGroupCount() > 0
|| update.getRangeCount() > 0
Expand All @@ -130,65 +78,16 @@ private boolean shouldProcessUpdate(CacheUpdate update) {
}

public void update(CacheUpdate update) {
Set<String> currentAddresses;
synchronized (updateLock) {
applyUpdateLocked(update);
currentAddresses = snapshotActiveAddressesLocked();
}
publishLifecycleUpdate(currentAddresses);
}

public void updateAsync(CacheUpdate update) {
if (!shouldProcessUpdate(update)) {
return;
}
pendingUpdates.add(new PendingCacheUpdate(update));
if (drainScheduled.compareAndSet(false, true)) {
java.util.concurrent.CountDownLatch latch = new java.util.concurrent.CountDownLatch(1);
drainingLatch = latch;
CACHE_UPDATE_POOL.execute(
() -> {
try {
drainPendingUpdate();
} finally {
latch.countDown();
}
});
}
}

private void drainPendingUpdate() {
List<PendingCacheUpdate> batch = new ArrayList<>(CACHE_UPDATE_DRAIN_BATCH_SIZE);
while (true) {
drainBatch(batch);
if (!batch.isEmpty()) {
applyBatch(batch);
batch.clear();
}
drainScheduled.set(false);
if (pendingUpdates.isEmpty() || !drainScheduled.compareAndSet(false, true)) {
return;
}
}
}

private void drainBatch(List<PendingCacheUpdate> batch) {
PendingCacheUpdate toApply;
while (batch.size() < CACHE_UPDATE_DRAIN_BATCH_SIZE
&& (toApply = pendingUpdates.poll()) != null) {
batch.add(toApply);
}
}

private void applyBatch(List<PendingCacheUpdate> batch) {
Set<String> currentAddresses;
synchronized (updateLock) {
for (PendingCacheUpdate pendingUpdate : batch) {
applyUpdateLocked(pendingUpdate.update);
}
currentAddresses = snapshotActiveAddressesLocked();
}
publishLifecycleUpdate(currentAddresses);
update(update);
}

private void applyUpdateLocked(CacheUpdate update) {
Expand All @@ -207,42 +106,11 @@ private void applyUpdateLocked(CacheUpdate update) {
rangeCache.addRanges(update);
}

@Nullable
private Set<String> snapshotActiveAddressesLocked() {
if (lifecycleManager == null || finderKey == null) {
return null;
}
return rangeCache.getActiveAddresses();
}

private void publishLifecycleUpdate(@Nullable Set<String> currentAddresses) {
if (currentAddresses == null) {
return;
}
lifecycleManager.updateActiveAddressesAsync(finderKey, currentAddresses);
}

/**
* Test-only hook used by {@link KeyAwareChannel#awaitPendingCacheUpdates()} to wait until the
* async cache update worker has finished applying the latest pending update.
* Test-only hook retained for compatibility with callers that previously waited on async work.
*/
@VisibleForTesting
void awaitPendingUpdates() throws InterruptedException {
long deadline = System.nanoTime() + java.util.concurrent.TimeUnit.SECONDS.toNanos(5);
while (System.nanoTime() < deadline) {
java.util.concurrent.CountDownLatch latch = drainingLatch;
if (latch != null) {
long remainingNanos = deadline - System.nanoTime();
if (remainingNanos <= 0) {
break;
}
latch.await(remainingNanos, java.util.concurrent.TimeUnit.NANOSECONDS);
}
if (pendingUpdates.isEmpty() && !drainScheduled.get()) {
return;
}
}
}
void awaitPendingUpdates() {}

public ChannelEndpoint findServer(ReadRequest.Builder reqBuilder) {
return findServer(reqBuilder, preferLeader(reqBuilder.getTransaction()), NO_EXCLUDED_ENDPOINTS);
Expand Down
Loading
Loading