diff --git a/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/ChannelEndpoint.java b/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/ChannelEndpoint.java
index cd6b386dc8a7..fc82c530fc6f 100644
--- a/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/ChannelEndpoint.java
+++ b/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/ChannelEndpoint.java
@@ -41,19 +41,27 @@ public interface ChannelEndpoint {
String getAddress();
/**
- * Returns whether this server is ready to accept RPCs.
+ * Returns whether this server's channel is in {@code READY} state and can accept location-aware
+ * RPCs.
*
- *
A server is considered unhealthy if:
+ * <p>Only endpoints in {@code READY} state are eligible for location-aware routing. Endpoints in
+ * {@code IDLE}, {@code CONNECTING}, {@code TRANSIENT_FAILURE}, or {@code SHUTDOWN} are not
+ * considered healthy for location-aware routing purposes.
*
- *
- * - The underlying channel is shutdown or terminated
- *
- The channel is in a transient failure state
- *
- *
- * @return true if the server is healthy and ready to accept RPCs
+ * @return true if the channel is in READY state
*/
boolean isHealthy();
+ /**
+ * Returns whether this server's channel is in {@code TRANSIENT_FAILURE} state.
+ *
+ * <p>When an endpoint is in transient failure, it should be reported as a skipped tablet in
+ * routing hints so the server can refresh the client cache.
+ *
+ * @return {@code true} if the channel is in {@code TRANSIENT_FAILURE} state
+ */
+ boolean isTransientFailure();
+
/**
* Returns the gRPC channel for making RPCs to this server.
*
diff --git a/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/ChannelEndpointCache.java b/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/ChannelEndpointCache.java
index 879ed546f2c2..db2af4902f84 100644
--- a/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/ChannelEndpointCache.java
+++ b/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/ChannelEndpointCache.java
@@ -54,6 +54,19 @@ public interface ChannelEndpointCache {
*/
ChannelEndpoint get(String address);
+ /**
+ * Returns a cached channel for the given address without creating it.
+ *
+ * <p>Unlike {@link #get(String)}, this method does not create a new endpoint if one does not
+ * already exist in the cache. This is used by location-aware routing to avoid foreground endpoint
+ * creation on the request path.
+ *
+ * @param address the server address in "host:port" format
+ * @return the cached channel instance, or {@code null} if no endpoint exists for this address
+ */
+ @javax.annotation.Nullable
+ ChannelEndpoint getIfPresent(String address);
+
/**
* Evicts a server connection from the cache and gracefully shuts down its channel.
*
diff --git a/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/ChannelFinder.java b/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/ChannelFinder.java
index 4fd95b305b5c..2f229dddee44 100644
--- a/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/ChannelFinder.java
+++ b/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/ChannelFinder.java
@@ -22,17 +22,22 @@
import com.google.spanner.v1.CommitRequest;
import com.google.spanner.v1.DirectedReadOptions;
import com.google.spanner.v1.ExecuteSqlRequest;
+import com.google.spanner.v1.Group;
import com.google.spanner.v1.Mutation;
import com.google.spanner.v1.ReadRequest;
import com.google.spanner.v1.RoutingHint;
+import com.google.spanner.v1.Tablet;
import com.google.spanner.v1.TransactionOptions;
import com.google.spanner.v1.TransactionSelector;
import java.util.ArrayList;
+import java.util.HashSet;
import java.util.List;
import java.util.Objects;
+import java.util.Set;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Predicate;
+import javax.annotation.Nullable;
/**
* Finds a server for a request using location-aware routing metadata.
@@ -47,9 +52,25 @@ public final class ChannelFinder {
private final AtomicLong databaseId = new AtomicLong();
private final KeyRecipeCache recipeCache = new KeyRecipeCache();
private final KeyRangeCache rangeCache;
+ @Nullable private final EndpointLifecycleManager lifecycleManager;
+ @Nullable private final String finderKey;
public ChannelFinder(ChannelEndpointCache endpointCache) {
- this.rangeCache = new KeyRangeCache(Objects.requireNonNull(endpointCache));
+ this(endpointCache, null, null);
+ }
+
+ /**
+ * Creates a finder that passes the given lifecycle manager to its range cache but has no finder
+ * key.
+ *
+ * <p>Because the finder key is {@code null}, {@code update} does not report active addresses to
+ * the lifecycle manager for finders built through this constructor.
+ *
+ * @param endpointCache cache used to look up endpoints; must not be null
+ * @param lifecycleManager optional lifecycle manager, may be null
+ */
+ public ChannelFinder(
+ ChannelEndpointCache endpointCache, @Nullable EndpointLifecycleManager lifecycleManager) {
+ this(endpointCache, lifecycleManager, null);
+ }
+
+ /**
+ * Creates a finder with an optional lifecycle manager and an optional stable finder key.
+ *
+ * @param endpointCache cache used to look up endpoints; a null value fails fast with a {@link
+ *     NullPointerException}
+ * @param lifecycleManager optional lifecycle manager, also handed to the range cache
+ * @param finderKey key under which this finder reports its active addresses to the lifecycle
+ *     manager; reporting only happens when both this and {@code lifecycleManager} are non-null
+ */
+ ChannelFinder(
+ ChannelEndpointCache endpointCache,
+ @Nullable EndpointLifecycleManager lifecycleManager,
+ @Nullable String finderKey) {
+ this.rangeCache = new KeyRangeCache(Objects.requireNonNull(endpointCache), lifecycleManager);
+ this.lifecycleManager = lifecycleManager;
+ this.finderKey = finderKey;
}
void useDeterministicRandom() {
@@ -70,6 +91,24 @@ public void update(CacheUpdate update) {
recipeCache.addRecipes(update.getKeyRecipes());
}
rangeCache.addRanges(update);
+
+ // Notify the lifecycle manager about server addresses so it can create endpoints
+ // in the background and start probing, and evict stale endpoints atomically.
+ if (lifecycleManager != null && finderKey != null) {
+ Set currentAddresses = new HashSet<>();
+ for (Group group : update.getGroupList()) {
+ for (Tablet tablet : group.getTabletsList()) {
+ String addr = tablet.getServerAddress();
+ if (!addr.isEmpty()) {
+ currentAddresses.add(addr);
+ }
+ }
+ }
+ // Also include addresses from existing cached tablets not in this update.
+ currentAddresses.addAll(rangeCache.getActiveAddresses());
+ // Atomically ensure endpoints exist and evict stale ones.
+ lifecycleManager.updateActiveAddresses(finderKey, currentAddresses);
+ }
}
}
diff --git a/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/EndpointLifecycleManager.java b/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/EndpointLifecycleManager.java
new file mode 100644
index 000000000000..0cc593c7a86b
--- /dev/null
+++ b/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/EndpointLifecycleManager.java
@@ -0,0 +1,572 @@
+/*
+ * Copyright 2026 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.google.cloud.spanner.spi.v1;
+
+import com.google.api.core.InternalApi;
+import com.google.common.annotations.VisibleForTesting;
+import io.grpc.ConnectivityState;
+import io.grpc.ManagedChannel;
+import java.time.Clock;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Manages the lifecycle of location-aware routing endpoints including background probing, traffic
+ * tracking, and idle eviction.
+ *
+ * <p>This manager is the only component that proactively creates routed replica endpoints. It:
+ *
+ * <ul>
+ *   <li>Creates endpoints in the background when new server addresses appear in cache updates.
+ *   <li>Periodically checks channel state and uses {@code getState(true)} to warm up IDLE channels
+ *       without sending application RPCs.
+ *   <li>Tracks real traffic per endpoint.
+ *   <li>Evicts endpoints that have had no real traffic for the configured idle duration, or that
+ *       are in TRANSIENT_FAILURE state.
+ *   <li>Recreates and reprobes endpoints when they are needed again after eviction.
+ * </ul>
+ */
+@InternalApi
+class EndpointLifecycleManager {
+
+ private static final Logger logger = Logger.getLogger(EndpointLifecycleManager.class.getName());
+
+ /** Default probe interval: 60 seconds. Keeps channels from drifting into IDLE. */
+ @VisibleForTesting static final long DEFAULT_PROBE_INTERVAL_SECONDS = 60;
+
+ /** Default idle eviction threshold: 30 minutes without real traffic. */
+ @VisibleForTesting static final Duration DEFAULT_IDLE_EVICTION_DURATION = Duration.ofMinutes(30);
+
+ /** Interval for checking idle eviction: every 5 minutes. */
+ private static final long EVICTION_CHECK_INTERVAL_SECONDS = 300;
+
+ /**
+ * Maximum consecutive TRANSIENT_FAILURE probes before evicting an endpoint. Gives the channel
+ * time to recover from transient network issues before we tear it down and recreate.
+ */
+ private static final int MAX_TRANSIENT_FAILURE_COUNT = 3;
+
+  /**
+   * Per-endpoint lifecycle state.
+   *
+   * <p>The timestamp and counter fields are {@code volatile} because they are written by scheduler
+   * tasks and by request threads. {@link #probeFuture} is instead guarded by synchronizing on the
+   * {@code EndpointState} instance.
+   */
+  static final class EndpointState {
+    /** Server address this state belongs to. */
+    final String address;
+
+    /** Time of the most recent probe, or {@code null} if the endpoint has not been probed yet. */
+    volatile Instant lastProbeAt;
+
+    /** Time of the most recent real traffic; initialized to the creation time. */
+    volatile Instant lastRealTrafficAt;
+
+    /** Time a probe last observed the channel in READY state, or {@code null} if never. */
+    volatile Instant lastReadyAt;
+
+    /** Number of consecutive probes that observed TRANSIENT_FAILURE. */
+    volatile int consecutiveTransientFailures;
+
+    // Guarded by synchronizing on this EndpointState instance.
+    ScheduledFuture<?> probeFuture;
+
+    /**
+     * Creates state for a newly tracked endpoint.
+     *
+     * @param address server address of the endpoint
+     * @param now creation time, used as the initial value of the idle-eviction timer
+     */
+    EndpointState(String address, Instant now) {
+      this.address = address;
+      this.lastRealTrafficAt = now;
+      this.lastProbeAt = null;
+      this.lastReadyAt = null;
+      this.consecutiveTransientFailures = 0;
+    }
+  }
+
+  /** Cache that owns the actual channels; this manager only tracks their lifecycle. */
+  private final ChannelEndpointCache endpointCache;
+
+  /** Lifecycle state of every managed endpoint, keyed by server address. */
+  private final Map<String, EndpointState> endpoints = new ConcurrentHashMap<>();
+
+  /**
+   * Active addresses reported by each ChannelFinder, keyed by database id.
+   *
+   * <p>ChannelFinder instances are held via SoftReference in KeyAwareChannel, so this map uses a
+   * stable database-id key instead of a strong ChannelFinder reference. KeyAwareChannel unregisters
+   * stale entries when a finder is cleared.
+   *
+   * <p>All reads and writes to this map, and stale-endpoint eviction based on it, are synchronized
+   * on {@link #activeAddressLock}.
+   */
+  private final Map<String, Set<String>> activeAddressesPerFinder = new ConcurrentHashMap<>();
+
+  private final Object activeAddressLock = new Object();
+
+  private final ScheduledExecutorService scheduler;
+  private final AtomicBoolean isShutdown = new AtomicBoolean(false);
+  private final long probeIntervalSeconds;
+  private final Duration idleEvictionDuration;
+  private final Clock clock;
+  private final String defaultEndpointAddress;
+
+  /** Handle of the periodic idle-eviction task; assigned once in the constructor. */
+  private final ScheduledFuture<?> evictionFuture;
+
+ /**
+ * Creates a lifecycle manager with the default probe interval, the default idle eviction
+ * duration, and the system UTC clock.
+ *
+ * @param endpointCache cache that creates and evicts the underlying endpoints
+ */
+ EndpointLifecycleManager(ChannelEndpointCache endpointCache) {
+ this(
+ endpointCache,
+ DEFAULT_PROBE_INTERVAL_SECONDS,
+ DEFAULT_IDLE_EVICTION_DURATION,
+ Clock.systemUTC());
+ }
+
+  /**
+   * Creates a lifecycle manager with explicit timing settings and clock.
+   *
+   * <p>Construction starts a two-thread daemon scheduler and schedules the periodic idle-eviction
+   * check on it.
+   *
+   * @param endpointCache cache that creates and evicts the underlying endpoints
+   * @param probeIntervalSeconds interval between probes of one endpoint, in seconds
+   * @param idleEvictionDuration time without real traffic after which an endpoint is evicted
+   * @param clock time source for all timestamps recorded by this manager
+   */
+  @VisibleForTesting
+  EndpointLifecycleManager(
+      ChannelEndpointCache endpointCache,
+      long probeIntervalSeconds,
+      Duration idleEvictionDuration,
+      Clock clock) {
+    this.clock = clock;
+    this.idleEvictionDuration = idleEvictionDuration;
+    this.probeIntervalSeconds = probeIntervalSeconds;
+    this.endpointCache = endpointCache;
+    // Resolved before any thread is created so a failure here leaves no scheduler behind.
+    this.defaultEndpointAddress = endpointCache.defaultChannel().getAddress();
+
+    // Daemon workers so the pool never keeps the JVM alive on its own.
+    this.scheduler =
+        Executors.newScheduledThreadPool(
+            2,
+            task -> {
+              Thread worker = new Thread(task, "spanner-endpoint-lifecycle");
+              worker.setDaemon(true);
+              return worker;
+            });
+
+    // Start periodic eviction checks; initial delay and period are the same interval.
+    long evictionPeriodSeconds = EVICTION_CHECK_INTERVAL_SECONDS;
+    this.evictionFuture =
+        scheduler.scheduleAtFixedRate(
+            this::checkIdleEviction,
+            evictionPeriodSeconds,
+            evictionPeriodSeconds,
+            TimeUnit.SECONDS);
+  }
+
+  /**
+   * Registers lifecycle state for {@code address} if none is tracked yet.
+   *
+   * <p>This is only called from {@link #updateActiveAddresses} under {@link #activeAddressLock} to
+   * guarantee that newly created endpoints are registered as active before any stale-eviction check
+   * can see them. The caller schedules the background creation task after this method returns, so
+   * the map entry is already visible when the scheduler thread looks it up.
+   *
+   * @param address server address to track; null, empty and the default endpoint address are
+   *     ignored
+   * @return true if a new endpoint state was created (caller should schedule background creation)
+   */
+  private boolean ensureEndpointExists(String address) {
+    if (isShutdown.get()) {
+      return false;
+    }
+    if (address == null || address.isEmpty()) {
+      return false;
+    }
+    // Don't manage the default endpoint.
+    if (defaultEndpointAddress.equals(address)) {
+      return false;
+    }
+
+    // Flipped only when the mapping function runs, i.e. when this call inserted the entry.
+    AtomicBoolean inserted = new AtomicBoolean(false);
+    endpoints.computeIfAbsent(
+        address,
+        key -> {
+          logger.log(Level.FINE, "Creating endpoint state for address: {0}", key);
+          inserted.set(true);
+          return new EndpointState(key, clock.instant());
+        });
+    return inserted.get();
+  }
+
+  /**
+   * Notes that a real (non-probe) request was routed to {@code address}, which restarts the idle
+   * eviction timer of that endpoint.
+   *
+   * <p>Calls for a null address, the default endpoint, or an address that is not currently managed
+   * are ignored.
+   *
+   * @param address server address that received the traffic
+   */
+  void recordRealTraffic(String address) {
+    boolean trackable = address != null && !defaultEndpointAddress.equals(address);
+    if (!trackable) {
+      return;
+    }
+    EndpointState tracked = endpoints.get(address);
+    if (tracked == null) {
+      return;
+    }
+    tracked.lastRealTrafficAt = clock.instant();
+  }
+
+  /**
+   * Atomically ensures endpoints exist for all active addresses and evicts any managed endpoints
+   * that are no longer referenced by any finder. This handles the case where a tablet's server
+   * address changes (e.g. from server1:15000 to server2:15000) — the old endpoint is shut down
+   * promptly instead of lingering until idle eviction.
+   *
+   * <p>Both endpoint creation and stale-eviction are performed under the same lock to prevent a
+   * race condition where a newly created endpoint could be evicted by a concurrent call from
+   * another finder before it is registered as active.
+   *
+   * <p>The given set is copied before it is stored, so later changes the caller makes to it do not
+   * affect this manager.
+   *
+   * @param finderKey stable identifier of the ChannelFinder reporting its active addresses; the
+   *     call is ignored when this is null or empty
+   * @param activeAddresses server addresses currently referenced by tablets in this finder
+   */
+  void updateActiveAddresses(String finderKey, Set<String> activeAddresses) {
+    if (isShutdown.get() || finderKey == null || finderKey.isEmpty()) {
+      return;
+    }
+    // Private copy: the stored set is read under activeAddressLock by other finders' updates, so
+    // it must not alias a collection the caller can still mutate.
+    Set<String> snapshot = new HashSet<>(activeAddresses);
+    List<String> newlyCreated = new ArrayList<>();
+    synchronized (activeAddressLock) {
+      // Ensure endpoints exist for all active addresses while holding the lock.
+      // This guarantees the addresses are in the endpoints map before we compute stale entries.
+      for (String address : snapshot) {
+        if (ensureEndpointExists(address)) {
+          newlyCreated.add(address);
+        }
+      }
+
+      activeAddressesPerFinder.put(finderKey, snapshot);
+
+      // Compute the union of all active addresses across all finders.
+      Set<String> allActive = new HashSet<>();
+      for (Set<String> addresses : activeAddressesPerFinder.values()) {
+        allActive.addAll(addresses);
+      }
+
+      // Collect first, evict afterwards, so the map is not modified while it is being iterated.
+      List<String> stale = new ArrayList<>();
+      for (String address : endpoints.keySet()) {
+        if (!allActive.contains(address)) {
+          stale.add(address);
+        }
+      }
+
+      for (String address : stale) {
+        logger.log(
+            Level.FINE, "Evicting stale endpoint {0}: no longer referenced by any tablet", address);
+        evictEndpoint(address);
+      }
+    }
+
+    // Schedule background creation tasks AFTER computeIfAbsent has returned and the entries
+    // are visible to other threads. Submitting from inside computeIfAbsent creates a race
+    // where the scheduler thread can run before the entry is published in the map.
+    for (String address : newlyCreated) {
+      scheduler.submit(() -> createAndStartProbing(address));
+    }
+  }
+
+  /**
+   * Unregisters a finder and evicts any managed endpoints that are no longer referenced by the
+   * remaining finders.
+   *
+   * <p>Does nothing when the manager is shut down, when the key is null or empty, or when no
+   * addresses were registered under the key.
+   *
+   * @param finderKey stable identifier of the finder to remove
+   */
+  void unregisterFinder(String finderKey) {
+    if (isShutdown.get() || finderKey == null || finderKey.isEmpty()) {
+      return;
+    }
+    synchronized (activeAddressLock) {
+      // Nothing was registered under this key, so no endpoint can have become stale.
+      if (activeAddressesPerFinder.remove(finderKey) == null) {
+        return;
+      }
+
+      // Union of the addresses still referenced by the remaining finders.
+      Set<String> allActive = new HashSet<>();
+      for (Set<String> addresses : activeAddressesPerFinder.values()) {
+        allActive.addAll(addresses);
+      }
+
+      // Collect first, evict afterwards, so the map is not modified while it is being iterated.
+      List<String> stale = new ArrayList<>();
+      for (String address : endpoints.keySet()) {
+        if (!allActive.contains(address)) {
+          stale.add(address);
+        }
+      }
+
+      for (String address : stale) {
+        logger.log(
+            Level.FINE,
+            "Evicting stale endpoint {0}: finder {1} was unregistered",
+            new Object[] {address, finderKey});
+        evictEndpoint(address);
+      }
+    }
+  }
+
+ /**
+ * Creates the endpoint for {@code address} in the endpoint cache and starts probing it. Runs on
+ * the scheduler thread.
+ *
+ * <p>Does nothing if the manager is shut down or the address is no longer managed. If creation
+ * throws, a retry is scheduled after one probe interval, provided the address is still managed.
+ *
+ * @param address server address whose endpoint should be created
+ */
+ private void createAndStartProbing(String address) {
+ if (isShutdown.get() || !endpoints.containsKey(address)) {
+ return;
+ }
+ try {
+ // The returned endpoint is not used here; the call is made for its cache-populating effect.
+ endpointCache.get(address);
+ logger.log(Level.FINE, "Background endpoint creation completed for: {0}", address);
+
+ // If the endpoint was evicted between the containsKey check above and channel creation,
+ // the channel would leak in the endpoint cache without lifecycle tracking. Clean it up.
+ if (!endpoints.containsKey(address)) {
+ logger.log(
+ Level.FINE,
+ "Endpoint {0} was evicted during channel creation, cleaning up leaked channel",
+ address);
+ endpointCache.evict(address);
+ return;
+ }
+
+ startProbing(address);
+ } catch (Exception e) {
+ logger.log(
+ Level.FINE, "Failed to create endpoint for address: " + address + ", will retry", e);
+ // Schedule a retry after one probe interval, but only if still managed.
+ if (!isShutdown.get() && endpoints.containsKey(address)) {
+ scheduler.schedule(
+ () -> createAndStartProbing(address), probeIntervalSeconds, TimeUnit.SECONDS);
+ }
+ }
+ }
+
+ /**
+ * Starts periodic probing for an endpoint, replacing any probe schedule that already exists.
+ *
+ * <p>The first probe is scheduled with no initial delay; later probes run every {@code
+ * probeIntervalSeconds}. Does nothing if the address is not managed or the manager is shut down.
+ *
+ * @param address server address of the endpoint to probe
+ */
+ private void startProbing(String address) {
+ EndpointState state = endpoints.get(address);
+ if (state == null || isShutdown.get()) {
+ return;
+ }
+
+ // probeFuture is guarded by the state's monitor.
+ synchronized (state) {
+ // Re-check after acquiring lock — state may have been evicted concurrently.
+ if (!endpoints.containsKey(address)) {
+ return;
+ }
+
+ // Cancel any existing probe schedule.
+ if (state.probeFuture != null) {
+ state.probeFuture.cancel(false);
+ }
+
+ state.probeFuture =
+ scheduler.scheduleAtFixedRate(
+ () -> probe(address), 0, probeIntervalSeconds, TimeUnit.SECONDS);
+ }
+ logger.log(
+ Level.FINE,
+ "Prober started for endpoint {0} with interval {1}s",
+ new Object[] {address, probeIntervalSeconds});
+ }
+
+  /**
+   * Cancels the periodic probe of an endpoint, if one is scheduled.
+   *
+   * <p>The state is looked up in the endpoints map, so this must run while the address is still
+   * managed; for an unknown address it does nothing.
+   *
+   * @param address server address of the endpoint whose probing should stop
+   */
+  private void stopProbing(String address) {
+    EndpointState tracked = endpoints.get(address);
+    if (tracked != null) {
+      // probeFuture is guarded by the state's monitor.
+      synchronized (tracked) {
+        if (tracked.probeFuture == null) {
+          return;
+        }
+        // cancel(false): a probe that is already running is allowed to finish.
+        tracked.probeFuture.cancel(false);
+        tracked.probeFuture = null;
+        logger.log(Level.FINE, "Prober stopped for endpoint: {0}", address);
+      }
+    }
+  }
+
+ /**
+ * Probes the endpoint by checking channel connectivity state and warming up IDLE channels.
+ *
+ * <p>Uses {@code getState(true)} to request a connection attempt on IDLE channels instead of
+ * sending a GetSession RPC. This is lighter weight and avoids routing application-level RPCs
+ * through the endpoint's channel pool.
+ *
+ * <p>If the channel is in TRANSIENT_FAILURE, increments a consecutive failure counter. After
+ * {@link #MAX_TRANSIENT_FAILURE_COUNT} consecutive failures, the endpoint is evicted and shut
+ * down so it can be recreated fresh when needed again.
+ *
+ * <p>All exceptions are caught to prevent {@link ScheduledExecutorService} from cancelling future
+ * runs of this task.
+ *
+ * @param address server address of the endpoint to probe
+ */
+ private void probe(String address) {
+ try {
+ if (isShutdown.get()) {
+ return;
+ }
+
+ // Probing never creates an endpoint; it only inspects one that is already cached.
+ ChannelEndpoint endpoint = endpointCache.getIfPresent(address);
+ if (endpoint == null) {
+ logger.log(Level.FINE, "Probe skipped for {0}: endpoint not in cache", address);
+ return;
+ }
+
+ // The endpoint is no longer managed, so there is no state to update.
+ EndpointState state = endpoints.get(address);
+ if (state == null) {
+ return;
+ }
+
+ ManagedChannel channel = endpoint.getChannel();
+ state.lastProbeAt = clock.instant();
+
+ // getState(false) reads current state without triggering a connection.
+ ConnectivityState channelState = channel.getState(false);
+ logger.log(
+ Level.FINE, "Probe for {0}: channel state is {1}", new Object[] {address, channelState});
+
+ switch (channelState) {
+ case READY:
+ state.lastReadyAt = clock.instant();
+ state.consecutiveTransientFailures = 0;
+ break;
+
+ case IDLE:
+ // Warm up the channel by requesting a connection attempt.
+ logger.log(
+ Level.FINE, "Probe for {0}: channel IDLE, requesting connection (warmup)", address);
+ channel.getState(true);
+ state.consecutiveTransientFailures = 0;
+ break;
+
+ case CONNECTING:
+ // A connection attempt is in progress; only the failure streak is reset.
+ state.consecutiveTransientFailures = 0;
+ break;
+
+ case TRANSIENT_FAILURE:
+ state.consecutiveTransientFailures++;
+ logger.log(
+ Level.FINE,
+ "Probe for {0}: channel in TRANSIENT_FAILURE ({1}/{2})",
+ new Object[] {
+ address, state.consecutiveTransientFailures, MAX_TRANSIENT_FAILURE_COUNT
+ });
+ if (state.consecutiveTransientFailures >= MAX_TRANSIENT_FAILURE_COUNT) {
+ logger.log(
+ Level.FINE,
+ "Evicting endpoint {0}: {1} consecutive TRANSIENT_FAILURE probes",
+ new Object[] {address, state.consecutiveTransientFailures});
+ evictEndpoint(address);
+ }
+ break;
+
+ case SHUTDOWN:
+ logger.log(Level.FINE, "Probe for {0}: channel SHUTDOWN, evicting endpoint", address);
+ evictEndpoint(address);
+ break;
+
+ default:
+ break;
+ }
+ } catch (Exception e) {
+ // Swallowed on purpose: a throwing task would stop all later runs of this probe.
+ logger.log(Level.FINE, "Probe failed for endpoint " + address, e);
+ }
+ }
+
+  /**
+   * Checks all managed endpoints for idle eviction.
+   *
+   * <p>An endpoint is evicted when the time since its last real traffic is strictly greater than
+   * the configured idle eviction duration.
+   *
+   * <p>This method runs as a fixed-rate task, and {@link ScheduledExecutorService} suppresses all
+   * later runs of a task that throws. Failures are therefore logged instead of propagated, and a
+   * failure to evict one endpoint does not prevent the remaining ones from being evicted.
+   */
+  @VisibleForTesting
+  void checkIdleEviction() {
+    if (isShutdown.get()) {
+      return;
+    }
+
+    try {
+      Instant now = clock.instant();
+      // Collect first, evict afterwards, so the map is not modified while it is being iterated.
+      List<String> toEvict = new ArrayList<>();
+
+      for (Map.Entry<String, EndpointState> entry : endpoints.entrySet()) {
+        String address = entry.getKey();
+        EndpointState state = entry.getValue();
+
+        // Never evict the default endpoint.
+        if (defaultEndpointAddress.equals(address)) {
+          continue;
+        }
+
+        Duration sinceLastRealTraffic = Duration.between(state.lastRealTrafficAt, now);
+        if (sinceLastRealTraffic.compareTo(idleEvictionDuration) > 0) {
+          toEvict.add(address);
+        }
+      }
+
+      for (String address : toEvict) {
+        try {
+          evictEndpoint(address);
+        } catch (RuntimeException e) {
+          logger.log(Level.FINE, "Idle eviction failed for endpoint " + address, e);
+        }
+      }
+    } catch (RuntimeException e) {
+      logger.log(Level.FINE, "Idle eviction check failed", e);
+    }
+  }
+
+ /**
+ * Evicts an endpoint: stops probing, removes from tracking, shuts down the channel.
+ *
+ * @param address server address of the endpoint to evict
+ */
+ private void evictEndpoint(String address) {
+ logger.log(Level.FINE, "Evicting endpoint {0}", address);
+
+ // Order matters: stopProbing looks the state up in the endpoints map, so it has to run
+ // before the entry is removed.
+ stopProbing(address);
+ endpoints.remove(address);
+ endpointCache.evict(address);
+ }
+
+  /**
+   * Asks for a previously evicted endpoint to be brought back. Creation happens in the background
+   * and probing starts right after it; the endpoint only becomes eligible for location-aware
+   * routing once it reaches READY state.
+   *
+   * <p>The request is ignored when the manager is shut down, when the address is null, empty or the
+   * default endpoint, and when the address is already managed.
+   *
+   * @param address server address of the endpoint to recreate
+   */
+  void requestEndpointRecreation(String address) {
+    boolean eligible =
+        !isShutdown.get()
+            && address != null
+            && !address.isEmpty()
+            && !defaultEndpointAddress.equals(address);
+    // Only recreate if not already managed.
+    if (!eligible || endpoints.containsKey(address)) {
+      return;
+    }
+
+    logger.log(Level.FINE, "Recreating previously evicted endpoint for address: {0}", address);
+    EndpointState fresh = new EndpointState(address, clock.instant());
+    boolean inserted = endpoints.putIfAbsent(address, fresh) == null;
+    if (!inserted) {
+      // Another thread registered the address first and is responsible for creating it.
+      return;
+    }
+    // Schedule after putIfAbsent returns so the entry is visible to the scheduler thread.
+    scheduler.submit(() -> createAndStartProbing(address));
+  }
+
+ /**
+ * Returns whether an endpoint is being actively managed.
+ *
+ * @param address server address to look up
+ * @return true if lifecycle state is currently tracked for the address
+ */
+ boolean isManaged(String address) {
+ return endpoints.containsKey(address);
+ }
+
+ /**
+ * Returns the endpoint state for testing.
+ *
+ * @param address server address to look up
+ * @return the tracked state, or {@code null} if the address is not managed
+ */
+ @VisibleForTesting
+ EndpointState getEndpointState(String address) {
+ return endpoints.get(address);
+ }
+
+ /**
+ * Returns the number of managed endpoints.
+ *
+ * @return current size of the endpoint tracking map
+ */
+ @VisibleForTesting
+ int managedEndpointCount() {
+ return endpoints.size();
+ }
+
+ /**
+ * Shuts down the lifecycle manager and all probing.
+ *
+ * <p>Only the first call has an effect. It cancels the idle-eviction task and every probe task,
+ * clears the tracking map, and stops the scheduler, waiting up to 5 seconds for running tasks
+ * before forcing termination. This method does not evict anything from the endpoint cache.
+ */
+ void shutdown() {
+ // compareAndSet lets exactly one caller perform the shutdown.
+ if (!isShutdown.compareAndSet(false, true)) {
+ return;
+ }
+
+ logger.log(Level.FINE, "Shutting down endpoint lifecycle manager");
+
+ if (evictionFuture != null) {
+ evictionFuture.cancel(false);
+ }
+
+ for (EndpointState state : endpoints.values()) {
+ // probeFuture is guarded by the state's monitor.
+ synchronized (state) {
+ if (state.probeFuture != null) {
+ state.probeFuture.cancel(false);
+ }
+ }
+ }
+ endpoints.clear();
+
+ scheduler.shutdown();
+ try {
+ if (!scheduler.awaitTermination(5, TimeUnit.SECONDS)) {
+ scheduler.shutdownNow();
+ }
+ } catch (InterruptedException e) {
+ scheduler.shutdownNow();
+ // Restore the interrupt flag so callers can still observe the interruption.
+ Thread.currentThread().interrupt();
+ }
+ }
+}
diff --git a/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GrpcChannelEndpointCache.java b/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GrpcChannelEndpointCache.java
index 3ee4d789592e..98e7f83b094f 100644
--- a/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GrpcChannelEndpointCache.java
+++ b/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GrpcChannelEndpointCache.java
@@ -17,10 +17,8 @@
package com.google.cloud.spanner.spi.v1;
import com.google.api.core.InternalApi;
-import com.google.api.gax.grpc.GrpcTransportChannel;
import com.google.api.gax.grpc.InstantiatingGrpcChannelProvider;
import com.google.api.gax.grpc.InstantiatingGrpcChannelProvider.Builder;
-import com.google.api.gax.rpc.TransportChannelProvider;
import com.google.cloud.spanner.ErrorCode;
import com.google.cloud.spanner.SpannerExceptionFactory;
import com.google.common.annotations.VisibleForTesting;
@@ -33,6 +31,9 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+import javax.annotation.Nullable;
/**
* gRPC implementation of {@link ChannelEndpointCache}.
@@ -44,6 +45,8 @@
@InternalApi
class GrpcChannelEndpointCache implements ChannelEndpointCache {
+ private static final Logger logger = Logger.getLogger(GrpcChannelEndpointCache.class.getName());
+
/** Timeout for graceful channel shutdown. */
private static final long SHUTDOWN_TIMEOUT_SECONDS = 5;
@@ -87,16 +90,27 @@ public ChannelEndpoint get(String address) {
try {
// Create a new provider with the same config but different endpoint.
// This is thread-safe as withEndpoint() returns a new provider instance.
- TransportChannelProvider newProvider = createProviderWithAuthorityOverride(addr);
- return new GrpcChannelEndpoint(addr, newProvider);
+ InstantiatingGrpcChannelProvider newProvider =
+ createProviderWithAuthorityOverride(addr);
+ GrpcChannelEndpoint endpoint = new GrpcChannelEndpoint(addr, newProvider);
+ logger.log(Level.FINE, "Location-aware endpoint created for address: {0}", addr);
+ return endpoint;
} catch (IOException e) {
+ logger.log(
+ Level.FINE, "Failed to create location-aware endpoint for address: " + addr, e);
throw SpannerExceptionFactory.newSpannerException(
ErrorCode.INTERNAL, "Failed to create channel for address: " + addr, e);
}
});
}
- private TransportChannelProvider createProviderWithAuthorityOverride(String address) {
+ /**
+ * Returns the endpoint already cached for {@code address}, without creating one.
+ *
+ * @param address the server address to look up
+ * @return the cached endpoint, or {@code null} if none is cached for this address
+ */
+ @Override
+ @Nullable
+ public ChannelEndpoint getIfPresent(String address) {
+ // Plain lookup in the servers map; presumably the same map that get(String) populates —
+ // TODO confirm against the field declaration, which is outside this hunk.
+ return servers.get(address);
+ }
+
+ private InstantiatingGrpcChannelProvider createProviderWithAuthorityOverride(String address) {
InstantiatingGrpcChannelProvider endpointProvider =
(InstantiatingGrpcChannelProvider) baseProvider.withEndpoint(address);
if (Objects.equals(defaultAuthority, address)) {
@@ -176,15 +190,19 @@ static class GrpcChannelEndpoint implements ChannelEndpoint {
* @param provider the channel provider (must be a gRPC provider)
* @throws IOException if the channel cannot be created
*/
- GrpcChannelEndpoint(String address, TransportChannelProvider provider) throws IOException {
+ GrpcChannelEndpoint(String address, InstantiatingGrpcChannelProvider provider)
+ throws IOException {
this.address = address;
- TransportChannelProvider readyProvider = provider;
+ // Build a raw ManagedChannel directly instead of going through getTransportChannel(),
+ // which wraps the channel in a ChannelPool that does not support getState().
+ // Location-aware routing needs getState() to check channel connectivity.
+ InstantiatingGrpcChannelProvider readyProvider = provider;
if (provider.needsHeaders()) {
- readyProvider = provider.withHeaders(java.util.Collections.emptyMap());
+ readyProvider =
+ (InstantiatingGrpcChannelProvider)
+ provider.withHeaders(java.util.Collections.emptyMap());
}
- GrpcTransportChannel transportChannel =
- (GrpcTransportChannel) readyProvider.getTransportChannel();
- this.channel = (ManagedChannel) transportChannel.getChannel();
+ this.channel = readyProvider.createDecoratedChannelBuilder().build();
}
/**
@@ -210,13 +228,38 @@ public boolean isHealthy() {
return false;
}
// Check connectivity state without triggering a connection attempt.
+ // Only READY channels are considered healthy for location-aware routing.
// Some channel implementations don't support getState(), in which case
- // we assume the channel is healthy if it's not shutdown/terminated.
+ // we treat the endpoint as not ready for location-aware routing (defensive).
try {
ConnectivityState state = channel.getState(false);
- return state != ConnectivityState.SHUTDOWN && state != ConnectivityState.TRANSIENT_FAILURE;
- } catch (UnsupportedOperationException ignore) {
- return true;
+ boolean ready = state == ConnectivityState.READY;
+ if (!ready) {
+ logger.log(
+ Level.FINE,
+ "Location-aware endpoint {0} is not ready for location-aware routing, state: {1}",
+ new Object[] {address, state});
+ }
+ return ready;
+ } catch (UnsupportedOperationException e) {
+ logger.log(
+ Level.FINE,
+ "getState(false) unsupported for location-aware endpoint {0}, treating as not ready",
+ address);
+ return false;
+ }
+ }
+
+ @Override
+ public boolean isTransientFailure() {
+ if (channel.isShutdown() || channel.isTerminated()) {
+ return false;
+ }
+ try {
+ ConnectivityState state = channel.getState(false);
+ return state == ConnectivityState.TRANSIENT_FAILURE;
+ } catch (UnsupportedOperationException e) {
+ return false;
}
}
diff --git a/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/KeyAwareChannel.java b/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/KeyAwareChannel.java
index 9205cf0eace0..b6c22ad4e3f8 100644
--- a/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/KeyAwareChannel.java
+++ b/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/KeyAwareChannel.java
@@ -44,6 +44,7 @@
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.trace.Span;
import java.io.IOException;
+import java.lang.ref.ReferenceQueue;
import java.lang.ref.SoftReference;
import java.util.HashSet;
import java.util.Map;
@@ -51,6 +52,8 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;
+import java.util.logging.Level;
+import java.util.logging.Logger;
import javax.annotation.Nullable;
/**
@@ -63,6 +66,9 @@
*/
@InternalApi
final class KeyAwareChannel extends ManagedChannel {
+
+ private static final Logger logger = Logger.getLogger(KeyAwareChannel.class.getName());
+
private static final long MAX_TRACKED_READ_ONLY_TRANSACTIONS = 100_000L;
private static final long MAX_TRACKED_EXCLUDED_LOGICAL_REQUESTS = 100_000L;
private static final long EXCLUDED_LOGICAL_REQUEST_TTL_MINUTES = 10L;
@@ -77,10 +83,11 @@ final class KeyAwareChannel extends ManagedChannel {
private final ManagedChannel defaultChannel;
private final ChannelEndpointCache endpointCache;
+ @Nullable private final EndpointLifecycleManager lifecycleManager;
private final String authority;
private final String defaultEndpointAddress;
-  private final Map<String, SoftReference<ChannelFinder>> channelFinders =
-      new ConcurrentHashMap<>();
+  // Queue that receives the soft references whose ChannelFinder has been reclaimed.
+  private final ReferenceQueue<ChannelFinder> channelFinderReferenceQueue = new ReferenceQueue<>();
+  // Finder per database ID; values are soft references that also carry their own database ID.
+  private final Map<String, ChannelFinderReference> channelFinders = new ConcurrentHashMap<>();
private final Map transactionAffinities = new ConcurrentHashMap<>();
// Maps read-only transaction IDs to their preferLeader value.
// Strong reads → true (prefer leader), Stale reads → false (any replica).
@@ -108,6 +115,11 @@ private KeyAwareChannel(
this.defaultChannel = endpointCache.defaultChannel().getChannel();
this.defaultEndpointAddress = endpointCache.defaultChannel().getAddress();
this.authority = this.defaultChannel.authority();
+ // Only create lifecycle manager for production (non-factory) path.
+ // Factory path is used by tests with custom caches where background probing
+ // would interfere with test assertions.
+ this.lifecycleManager =
+ (endpointCacheFactory == null) ? new EndpointLifecycleManager(endpointCache) : null;
}
static KeyAwareChannel create(
@@ -117,6 +129,18 @@ static KeyAwareChannel create(
return new KeyAwareChannel(channelProvider, endpointCacheFactory);
}
+  /**
+   * Soft reference to a {@link ChannelFinder} that remembers the database ID it was stored under,
+   * so the matching map entry can still be located once the referent has been cleared.
+   */
+  private static final class ChannelFinderReference extends SoftReference<ChannelFinder> {
+    final String databaseId;
+
+    /**
+     * @param databaseId the map key this reference is stored under
+     * @param referent the finder to hold softly
+     * @param referenceQueue the queue this reference is registered with
+     */
+    ChannelFinderReference(
+        String databaseId,
+        ChannelFinder referent,
+        ReferenceQueue<? super ChannelFinder> referenceQueue) {
+      super(referent, referenceQueue);
+      this.databaseId = databaseId;
+    }
+  }
+
private String extractDatabaseIdFromSession(String session) {
if (session == null || session.isEmpty()) {
return null;
@@ -128,30 +152,64 @@ private String extractDatabaseIdFromSession(String session) {
return session.substring(0, sessionsIndex);
}
+  /**
+   * Drains the reference queue and removes the map entry of every finder whose soft reference was
+   * enqueued; when a lifecycle manager is present the finder is also unregistered from it.
+   *
+   * <p>The removal and the unregistration run while holding the {@code channelFinders} monitor,
+   * the same monitor {@code getOrCreateChannelFinder} holds while installing a replacement. This
+   * keeps a cleanup that has already removed the stale entry from unregistering the database ID
+   * after another thread has installed a new finder under that ID (assumes a finder registers
+   * with the lifecycle manager under its database ID; confirm against ChannelFinder). The queue
+   * itself is polled without the monitor, so an empty queue takes no lock.
+   */
+  private void cleanupStaleChannelFinders() {
+    ChannelFinderReference reference;
+    while ((reference = (ChannelFinderReference) channelFinderReferenceQueue.poll()) != null) {
+      synchronized (channelFinders) {
+        // Two-argument remove: the entry is dropped only if it still maps to this exact reference.
+        if (channelFinders.remove(reference.databaseId, reference) && lifecycleManager != null) {
+          lifecycleManager.unregisterFinder(reference.databaseId);
+        }
+      }
+    }
+  }
+
+  /**
+   * Returns the {@link ChannelFinder} for the given database, creating one and caching it behind
+   * a soft reference when none is cached or the cached one has been reclaimed.
+   *
+   * <p>The first lookup is lock-free; creation is guarded by the {@code channelFinders} monitor
+   * and re-checks the map so that concurrent callers share a single finder.
+   */
   private ChannelFinder getOrCreateChannelFinder(String databaseId) {
-    SoftReference<ChannelFinder> ref = channelFinders.get(databaseId);
+    cleanupStaleChannelFinders();
+    ChannelFinderReference ref = channelFinders.get(databaseId);
     ChannelFinder finder = (ref != null) ? ref.get() : null;
     if (finder == null) {
       synchronized (channelFinders) {
+        cleanupStaleChannelFinders();
         ref = channelFinders.get(databaseId);
         finder = (ref != null) ? ref.get() : null;
         if (finder == null) {
-          finder = new ChannelFinder(endpointCache);
-          channelFinders.put(databaseId, new SoftReference<>(finder));
+          finder =
+              lifecycleManager != null
+                  ? new ChannelFinder(endpointCache, lifecycleManager, databaseId)
+                  : new ChannelFinder(endpointCache);
+          channelFinders.put(
+              databaseId,
+              new ChannelFinderReference(databaseId, finder, channelFinderReferenceQueue));
         }
       }
     }
     return finder;
   }
+  /**
+   * Records real traffic to the selected endpoint for idle eviction tracking.
+   *
+   * <p>Does nothing when there is no lifecycle manager, when no endpoint was selected, or when
+   * the selected endpoint is the default endpoint.
+   */
+  private void onRequestRouted(@Nullable ChannelEndpoint selectedEndpoint) {
+    if (lifecycleManager == null || selectedEndpoint == null) {
+      return;
+    }
+    if (defaultEndpointAddress.equals(selectedEndpoint.getAddress())) {
+      return;
+    }
+    lifecycleManager.recordRealTraffic(selectedEndpoint.getAddress());
+  }
+
@Override
public ManagedChannel shutdown() {
+ cleanupStaleChannelFinders(); // Drop finder entries whose soft references were already enqueued.
+ if (lifecycleManager != null) { // Null when an endpoint cache factory was supplied.
+ lifecycleManager.shutdown();
+ }
endpointCache.shutdown();
return this;
}
@Override
public ManagedChannel shutdownNow() {
+ cleanupStaleChannelFinders(); // Drop finder entries whose soft references were already enqueued.
+ if (lifecycleManager != null) { // Null when an endpoint cache factory was supplied.
+ lifecycleManager.shutdown();
+ }
endpointCache.shutdown();
return this;
}
@@ -205,7 +263,23 @@ private ChannelEndpoint affinityEndpoint(
if (address == null || excludedEndpoints.test(address)) {
return null;
}
- return endpointCache.get(address);
+ // Use non-creating lookup and require READY state for location-aware routing.
+ ChannelEndpoint endpoint = endpointCache.getIfPresent(address);
+ if (endpoint == null) {
+ logger.log(
+ Level.FINE,
+ "Affinity endpoint for address {0} not present in cache, falling back to default",
+ address);
+ return null;
+ }
+ if (!endpoint.isHealthy()) {
+ logger.log(
+ Level.FINE,
+ "Affinity endpoint for address {0} not READY, falling back to default",
+ address);
+ return null;
+ }
+ return endpoint;
}
private void clearAffinity(ByteString transactionId) {
@@ -495,6 +569,10 @@ public void sendMessage(RequestT message) {
}
selectedEndpoint = endpoint;
this.channelFinder = finder;
+
+ // Record real traffic for idle eviction tracking.
+ parentChannel.onRequestRouted(endpoint);
+
recordRouteSelectionTrace(
methodDescriptor,
endpoint.getAddress(),
diff --git a/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/KeyRangeCache.java b/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/KeyRangeCache.java
index 09ecf19625c6..3a4e49d7148c 100644
--- a/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/KeyRangeCache.java
+++ b/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/KeyRangeCache.java
@@ -30,14 +30,18 @@
import java.util.Arrays;
import java.util.Comparator;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Objects;
+import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Predicate;
+import java.util.logging.Level;
+import java.util.logging.Logger;
import java.util.stream.IntStream;
/** Cache for routing information used by location-aware routing. */
@@ -45,6 +49,8 @@
public final class KeyRangeCache {
private static final Predicate<String> NO_EXCLUDED_ENDPOINTS = address -> false;
+ private static final Logger logger = Logger.getLogger(KeyRangeCache.class.getName());
+
private static final int MAX_LOCAL_REPLICA_DISTANCE = 5;
private static final int DEFAULT_MIN_ENTRIES_FOR_RANDOM_PICK = 1000;
@@ -57,6 +63,7 @@ public enum RangeMode {
}
private final ChannelEndpointCache endpointCache;
+ @javax.annotation.Nullable private final EndpointLifecycleManager lifecycleManager;
private final NavigableMap ranges =
new TreeMap<>(ByteString.unsignedLexicographicalComparator());
private final Map groups = new HashMap<>();
@@ -67,7 +74,14 @@ public enum RangeMode {
private volatile int minCacheEntriesForRandomPick = DEFAULT_MIN_ENTRIES_FOR_RANDOM_PICK;
public KeyRangeCache(ChannelEndpointCache endpointCache) {
+ this(endpointCache, null); // No lifecycle manager.
+ }
+
+ public KeyRangeCache(
+ ChannelEndpointCache endpointCache,
+ @javax.annotation.Nullable EndpointLifecycleManager lifecycleManager) {
this.endpointCache = Objects.requireNonNull(endpointCache);
+ this.lifecycleManager = lifecycleManager; // May be null; users of this field null-check it.
}
@VisibleForTesting
@@ -137,6 +151,23 @@ public ChannelEndpoint fillRoutingHint(
preferLeader, directedReadOptions, hintBuilder, excludedEndpoints);
}
+  /**
+   * Returns all server addresses currently referenced by cached tablets.
+   *
+   * @return a newly allocated set holding the non-empty server address of every tablet in every
+   *     cached group; empty when no group is cached
+   */
+  Set<String> getActiveAddresses() {
+    Set<String> addresses = new HashSet<>();
+    synchronized (lock) {
+      for (CachedGroup group : groups.values()) {
+        // Each group's tablet list is read under that group's own monitor as well.
+        synchronized (group) {
+          for (CachedTablet tablet : group.tablets) {
+            if (!tablet.serverAddress.isEmpty()) {
+              addresses.add(tablet.serverAddress);
+            }
+          }
+        }
+      }
+    }
+    return addresses;
+  }
+
public void clear() {
synchronized (lock) {
for (CachedRange range : ranges.values()) {
@@ -482,24 +513,88 @@ private boolean matches(DirectedReadOptions.ReplicaSelection selection) {
}
}
+    /**
+     * Evaluates whether this tablet should be skipped for location-aware routing.
+     *
+     * <p>State-aware skip logic:
+     *
+     * <ul>
+     *   <li>Server-marked skip, empty address, or excluded endpoint: skip and report in
+     *       skipped_tablets.
+     *   <li>Endpoint exists and READY: usable, do not skip.
+     *   <li>Endpoint exists and TRANSIENT_FAILURE: skip and report in skipped_tablets.
+     *   <li>Endpoint absent, IDLE, CONNECTING, SHUTDOWN, or unsupported: skip silently (no
+     *       skipped_tablets).
+     * </ul>
+     *
+     * <p>As a side effect this may replace the tablet's cached {@code endpoint} reference and, when
+     * no endpoint is present and a lifecycle manager exists, request its recreation.
+     *
+     * @param hintBuilder routing hint that receives a skipped-tablet entry in the reporting cases
+     * @param excludedEndpoints predicate over server addresses that must not be used
+     * @return {@code true} if this tablet must not be used for the request
+     */
     boolean shouldSkip(RoutingHint.Builder hintBuilder, Predicate<String> excludedEndpoints) {
-      if (skip
-          || serverAddress.isEmpty()
-          || excludedEndpoints.test(serverAddress)
-          || (endpoint != null && !endpoint.isHealthy())) {
-        RoutingHint.SkippedTablet.Builder skipped = hintBuilder.addSkippedTabletUidBuilder();
-        skipped.setTabletUid(tabletUid);
-        skipped.setIncarnation(incarnation);
+      // Server-marked skip, no address, or excluded endpoint: always report.
+      if (skip || serverAddress.isEmpty() || excludedEndpoints.test(serverAddress)) {
+        addSkippedTablet(hintBuilder);
+        return true;
+      }
+
+      // If the cached endpoint's channel has been shut down (e.g. after idle eviction),
+      // discard the stale reference so we re-lookup from the cache below.
+      if (endpoint != null && endpoint.getChannel().isShutdown()) {
+        logger.log(
+            Level.FINE,
+            "Tablet {0} at {1}: cached endpoint is shutdown, clearing stale reference",
+            new Object[] {tabletUid, serverAddress});
+        endpoint = null;
+      }
+
+      // Lookup without creating: location-aware routing should not trigger foreground endpoint
+      // creation.
+      if (endpoint == null) {
+        endpoint = endpointCache.getIfPresent(serverAddress);
+      }
+
+      // No endpoint exists yet - skip silently, request background recreation so the
+      // endpoint becomes available for future requests.
+      if (endpoint == null) {
+        logger.log(
+            Level.FINE,
+            "Tablet {0} at {1}: no endpoint present, skipping silently",
+            new Object[] {tabletUid, serverAddress});
+        if (lifecycleManager != null) {
+          lifecycleManager.requestEndpointRecreation(serverAddress);
+        }
         return true;
       }
-      return false;
+
+      // READY - usable for location-aware routing.
+      if (endpoint.isHealthy()) {
+        return false;
+      }
+
+      // TRANSIENT_FAILURE - skip and report so server can refresh client cache.
+      if (endpoint.isTransientFailure()) {
+        logger.log(
+            Level.FINE,
+            "Tablet {0} at {1}: endpoint in TRANSIENT_FAILURE, adding to skipped_tablets",
+            new Object[] {tabletUid, serverAddress});
+        addSkippedTablet(hintBuilder);
+        return true;
+      }
+
+      // IDLE, CONNECTING, SHUTDOWN, or unsupported - skip silently.
+      logger.log(
+          Level.FINE,
+          "Tablet {0} at {1}: endpoint not ready, skipping silently",
+          new Object[] {tabletUid, serverAddress});
+      return true;
+    }
+
+    /** Appends this tablet's UID and incarnation to the hint's skipped-tablet entries. */
+    private void addSkippedTablet(RoutingHint.Builder hintBuilder) {
+      hintBuilder
+          .addSkippedTabletUidBuilder()
+          .setTabletUid(tabletUid)
+          .setIncarnation(incarnation);
     }
ChannelEndpoint pick(RoutingHint.Builder hintBuilder) {
hintBuilder.setTabletUid(tabletUid); // Record the chosen tablet in the routing hint.
- if (endpoint == null && !serverAddress.isEmpty()) {
- endpoint = endpointCache.get(serverAddress);
- }
+ // Endpoint must already exist and be READY if shouldSkip returned false; nothing is looked up here.
return endpoint;
}
@@ -584,13 +679,12 @@ ChannelEndpoint fillRoutingHint(
directedReadOptions.getReplicasCase()
!= DirectedReadOptions.ReplicasCase.REPLICAS_NOT_SET;
- // Fast path: pick a tablet while holding the lock. If the endpoint is already
- // cached on the tablet, return it immediately without releasing the lock.
- // If the endpoint needs to be created (blocking network dial), release the
- // lock first so other threads are not blocked during channel creation.
- CachedTablet selected;
+ // Select a tablet while holding the lock. With state-aware routing, only READY
+ // endpoints pass shouldSkip(), so the selected tablet always has a cached
+ // endpoint. No foreground endpoint creation is needed — the lifecycle manager
+ // creates endpoints in the background.
synchronized (this) {
- selected =
+ CachedTablet selected =
selectTabletLocked(
preferLeader,
hasDirectedReadOptions,
@@ -600,25 +694,7 @@ ChannelEndpoint fillRoutingHint(
if (selected == null) {
return null;
}
- if (selected.endpoint != null || selected.serverAddress.isEmpty()) {
- return selected.pick(hintBuilder);
- }
- // Slow path: endpoint not yet created. Capture the address and release the
- // lock before calling endpointCache.get(), which may block on network dial.
- hintBuilder.setTabletUid(selected.tabletUid);
- }
-
- String serverAddress = selected.serverAddress;
- ChannelEndpoint endpoint = endpointCache.get(serverAddress);
-
- synchronized (this) {
- // Only update if the tablet's address hasn't changed since we released the lock.
- if (selected.endpoint == null && selected.serverAddress.equals(serverAddress)) {
- selected.endpoint = endpoint;
- }
- // Re-set tabletUid with the latest value in case update() ran concurrently.
- hintBuilder.setTabletUid(selected.tabletUid);
- return selected.endpoint;
+ return selected.pick(hintBuilder);
}
}
diff --git a/java-spanner/google-cloud-spanner/src/test/java/com/google/cloud/spanner/spi/v1/ChannelFinderGoldenTest.java b/java-spanner/google-cloud-spanner/src/test/java/com/google/cloud/spanner/spi/v1/ChannelFinderGoldenTest.java
index 525313f1ab4e..7e9d2476346b 100644
--- a/java-spanner/google-cloud-spanner/src/test/java/com/google/cloud/spanner/spi/v1/ChannelFinderGoldenTest.java
+++ b/java-spanner/google-cloud-spanner/src/test/java/com/google/cloud/spanner/spi/v1/ChannelFinderGoldenTest.java
@@ -131,6 +131,12 @@ public ChannelEndpoint get(String address) {
return endpoints.computeIfAbsent(address, FakeEndpoint::new);
}
+    /**
+     * Looks up the endpoint for {@code address}. In these golden tests a missing endpoint is
+     * created on the spot, which simulates the lifecycle manager having pre-created it.
+     */
+    @Override
+    public ChannelEndpoint getIfPresent(String address) {
+      ChannelEndpoint existingOrNew =
+          endpoints.computeIfAbsent(address, key -> new FakeEndpoint(key));
+      return existingOrNew;
+    }
+
@Override
public void evict(String address) {
endpoints.remove(address);
@@ -158,6 +164,11 @@ public boolean isHealthy() {
return !unhealthyServers.contains(address);
}
+ @Override
+ public boolean isTransientFailure() {
+ return unhealthyServers.contains(address); // Exact complement of isHealthy() in this fake.
+ }
+
@Override
public ManagedChannel getChannel() {
return new ManagedChannel() {
diff --git a/java-spanner/google-cloud-spanner/src/test/java/com/google/cloud/spanner/spi/v1/EndpointLifecycleManagerTest.java b/java-spanner/google-cloud-spanner/src/test/java/com/google/cloud/spanner/spi/v1/EndpointLifecycleManagerTest.java
new file mode 100644
index 000000000000..6c45114435fb
--- /dev/null
+++ b/java-spanner/google-cloud-spanner/src/test/java/com/google/cloud/spanner/spi/v1/EndpointLifecycleManagerTest.java
@@ -0,0 +1,372 @@
+/*
+ * Copyright 2026 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.google.cloud.spanner.spi.v1;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+import java.time.Clock;
+import java.time.Duration;
+import java.time.Instant;
+import java.time.ZoneId;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.LockSupport;
+import java.util.function.BooleanSupplier;
+import org.junit.After;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+@RunWith(JUnit4.class)
+public class EndpointLifecycleManagerTest {
+
+ private EndpointLifecycleManager manager;
+
+ /** Counter for generating unique finder keys in tests. */
+ private static final AtomicLong TEST_FINDER_ID = new AtomicLong(1000);
+
+  /** Shuts down the manager created by the test, if one was created. */
+  @After
+  public void tearDown() {
+    if (manager == null) {
+      return;
+    }
+    manager.shutdown();
+  }
+
+  /**
+   * Registers addresses with the lifecycle manager via updateActiveAddresses under a freshly
+   * generated finder ID. This mirrors how ChannelFinder.update() works.
+   *
+   * <p>The tests in this class rely on the addresses counting as managed as soon as this call
+   * returns, while the endpoint itself may show up in the cache later, in the background.
+   *
+   * @param mgr the manager to register with
+   * @param addresses the server addresses to report as active
+   * @return the finder ID the addresses were registered under
+   */
+  private static String registerAddresses(EndpointLifecycleManager mgr, String... addresses) {
+    String finderId = "finder-" + TEST_FINDER_ID.incrementAndGet();
+    Set<String> addressSet = new HashSet<>();
+    Collections.addAll(addressSet, addresses);
+    mgr.updateActiveAddresses(finderId, addressSet);
+    return finderId;
+  }
+
+  /**
+   * Polls {@code condition} about every 10 ms for up to 5 seconds and fails with {@code message}
+   * if it has still not become true after the deadline.
+   *
+   * @param message the assertion message used when the condition never holds
+   * @param condition the condition to wait for
+   */
+  private static void awaitCondition(String message, BooleanSupplier condition) {
+    long deadlineNanos = System.nanoTime() + TimeUnit.SECONDS.toNanos(5);
+    // Compare by subtraction: nanoTime values can wrap, so "now < deadline" is not reliable.
+    while (System.nanoTime() - deadlineNanos < 0) {
+      if (condition.getAsBoolean()) {
+        return;
+      }
+      LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(10));
+    }
+    // One last check after the deadline so a failure carries the caller's message.
+    assertTrue(message, condition.getAsBoolean());
+  }
+
+  /** Registering an address creates its endpoint in the cache and puts it under management. */
+  @Test
+  public void endpointCreationStartsProbing() throws Exception {
+    final String address = "server1";
+    KeyRangeCacheTest.FakeEndpointCache fakeCache = new KeyRangeCacheTest.FakeEndpointCache();
+    Duration idleTimeout = Duration.ofMinutes(30);
+    manager =
+        new EndpointLifecycleManager(
+            fakeCache, /* probeIntervalSeconds= */ 1, idleTimeout, Clock.systemUTC());
+
+    registerAddresses(manager, address);
+    awaitCondition(
+        "endpoint should be created in background", () -> fakeCache.getIfPresent(address) != null);
+
+    // The cache holds the endpoint once background creation has finished.
+    assertNotNull(fakeCache.getIfPresent(address));
+
+    // The manager tracks it, with state, as its only managed endpoint.
+    assertTrue(manager.isManaged(address));
+    assertNotNull(manager.getEndpointState(address));
+    assertEquals(1, manager.managedEndpointCount());
+  }
+
+  /** Reporting the same address again under the same finder ID keeps a single managed endpoint. */
+  @Test
+  public void duplicateRegistrationIsNoop() throws Exception {
+    final String address = "server1";
+    KeyRangeCacheTest.FakeEndpointCache fakeCache = new KeyRangeCacheTest.FakeEndpointCache();
+    manager =
+        new EndpointLifecycleManager(
+            fakeCache, /* probeIntervalSeconds= */ 60, Duration.ofMinutes(30), Clock.systemUTC());
+
+    String finderId = registerAddresses(manager, address);
+    // Report the identical address set twice more under the same finder ID.
+    for (int repeat = 0; repeat < 2; repeat++) {
+      manager.updateActiveAddresses(finderId, Collections.singleton(address));
+    }
+
+    assertEquals(1, manager.managedEndpointCount());
+  }
+
+  /** Registering the address {@code "default"} must not produce a managed endpoint. */
+  @Test
+  public void defaultEndpointIsNotManaged() {
+    final String defaultAddress = "default";
+    KeyRangeCacheTest.FakeEndpointCache fakeCache = new KeyRangeCacheTest.FakeEndpointCache();
+    manager =
+        new EndpointLifecycleManager(
+            fakeCache, /* probeIntervalSeconds= */ 60, Duration.ofMinutes(30), Clock.systemUTC());
+
+    registerAddresses(manager, defaultAddress);
+
+    assertFalse(manager.isManaged(defaultAddress));
+    assertEquals(0, manager.managedEndpointCount());
+  }
+
+  /** A completed probe must leave {@code lastRealTrafficAt} at the instant of registration. */
+  @Test
+  public void probeTrafficDoesNotUpdateLastRealTrafficAt() throws Exception {
+    final String address = "server1";
+    KeyRangeCacheTest.FakeEndpointCache fakeCache = new KeyRangeCacheTest.FakeEndpointCache();
+    TestClock testClock = new TestClock(Instant.now());
+    manager =
+        new EndpointLifecycleManager(
+            fakeCache, /* probeIntervalSeconds= */ 1, Duration.ofMinutes(30), testClock);
+
+    // The clock is never advanced in this test, so this is also the time of registration.
+    Instant creationTime = testClock.instant();
+    registerAddresses(manager, address);
+    awaitCondition(
+        "probe should run after background endpoint creation",
+        () -> {
+          EndpointLifecycleManager.EndpointState probed = manager.getEndpointState(address);
+          return probed != null && probed.lastProbeAt != null;
+        });
+
+    // Only real traffic may move lastRealTrafficAt; the probe must not have touched it.
+    EndpointLifecycleManager.EndpointState afterProbe = manager.getEndpointState(address);
+    assertNotNull(afterProbe);
+    assertEquals(creationTime, afterProbe.lastRealTrafficAt);
+  }
+
+  /** Recording real traffic after the clock has advanced moves {@code lastRealTrafficAt}. */
+  @Test
+  public void realRoutedTrafficUpdatesLastRealTrafficAt() throws Exception {
+    final String address = "server1";
+    KeyRangeCacheTest.FakeEndpointCache fakeCache = new KeyRangeCacheTest.FakeEndpointCache();
+    TestClock testClock = new TestClock(Instant.now());
+    manager =
+        new EndpointLifecycleManager(
+            fakeCache, /* probeIntervalSeconds= */ 60, Duration.ofMinutes(30), testClock);
+
+    registerAddresses(manager, address);
+
+    Instant beforeTraffic = testClock.instant();
+    testClock.advance(Duration.ofMinutes(5));
+    manager.recordRealTraffic(address);
+
+    EndpointLifecycleManager.EndpointState afterTraffic = manager.getEndpointState(address);
+    assertNotNull(afterTraffic);
+    assertTrue(afterTraffic.lastRealTrafficAt.isAfter(beforeTraffic));
+  }
+
+  /** An endpoint without real traffic is evicted once the idle duration has elapsed. */
+  @Test
+  public void endpointWithOnlyProbeTrafficIsEvictedAfterIdleDuration() throws Exception {
+    final String address = "server1";
+    KeyRangeCacheTest.FakeEndpointCache fakeCache = new KeyRangeCacheTest.FakeEndpointCache();
+    TestClock testClock = new TestClock(Instant.now());
+    Duration idleTimeout = Duration.ofMinutes(30);
+    manager =
+        new EndpointLifecycleManager(
+            fakeCache, /* probeIntervalSeconds= */ 60, idleTimeout, testClock);
+
+    registerAddresses(manager, address);
+    awaitCondition(
+        "endpoint should be created in background", () -> fakeCache.getIfPresent(address) != null);
+
+    assertTrue(manager.isManaged(address));
+
+    // Move one minute past the idle threshold, then run the eviction check by hand.
+    testClock.advance(Duration.ofMinutes(31));
+    manager.checkIdleEviction();
+
+    // The endpoint is gone from both the manager and the cache.
+    assertFalse(manager.isManaged(address));
+    assertNull(fakeCache.getIfPresent(address));
+    assertEquals(0, manager.managedEndpointCount());
+  }
+
+  /** Real traffic inside the idle window keeps the endpoint from being evicted. */
+  @Test
+  public void endpointWithRecentRealTrafficIsNotEvicted() throws Exception {
+    final String address = "server1";
+    KeyRangeCacheTest.FakeEndpointCache fakeCache = new KeyRangeCacheTest.FakeEndpointCache();
+    TestClock testClock = new TestClock(Instant.now());
+    Duration idleTimeout = Duration.ofMinutes(30);
+    manager =
+        new EndpointLifecycleManager(
+            fakeCache, /* probeIntervalSeconds= */ 60, idleTimeout, testClock);
+
+    registerAddresses(manager, address);
+
+    // Real traffic arrives 20 minutes after registration.
+    testClock.advance(Duration.ofMinutes(20));
+    manager.recordRealTraffic(address);
+
+    // At minute 31 the last real traffic is only 11 minutes old.
+    testClock.advance(Duration.ofMinutes(11));
+    manager.checkIdleEviction();
+
+    // Still managed: the idle window is measured from the last real traffic.
+    assertTrue(manager.isManaged(address));
+  }
+
+  /** After an idle eviction, a recreation request brings the endpoint back under management. */
+  @Test
+  public void evictedEndpointIsRecreatedOnDemand() throws Exception {
+    final String address = "server1";
+    KeyRangeCacheTest.FakeEndpointCache fakeCache = new KeyRangeCacheTest.FakeEndpointCache();
+    TestClock testClock = new TestClock(Instant.now());
+    Duration idleTimeout = Duration.ofMinutes(30);
+    manager =
+        new EndpointLifecycleManager(
+            fakeCache, /* probeIntervalSeconds= */ 60, idleTimeout, testClock);
+
+    registerAddresses(manager, address);
+    awaitCondition(
+        "endpoint should be created in background", () -> fakeCache.getIfPresent(address) != null);
+
+    // Step past the idle threshold and evict.
+    testClock.advance(Duration.ofMinutes(31));
+    manager.checkIdleEviction();
+    assertFalse(manager.isManaged(address));
+
+    // Ask for the endpoint again and wait for it to reappear in the cache.
+    manager.requestEndpointRecreation(address);
+    awaitCondition(
+        "endpoint should be recreated in background",
+        () -> fakeCache.getIfPresent(address) != null);
+
+    assertTrue(manager.isManaged(address));
+    assertNotNull(fakeCache.getIfPresent(address));
+  }
+
+  /** Shutting the manager down leaves it with no managed endpoints. */
+  @Test
+  public void shutdownStopsAllProbing() throws Exception {
+    KeyRangeCacheTest.FakeEndpointCache fakeCache = new KeyRangeCacheTest.FakeEndpointCache();
+    Duration idleTimeout = Duration.ofMinutes(30);
+    manager =
+        new EndpointLifecycleManager(
+            fakeCache, /* probeIntervalSeconds= */ 1, idleTimeout, Clock.systemUTC());
+
+    registerAddresses(manager, "server1", "server2");
+    int managedBeforeShutdown = manager.managedEndpointCount();
+    assertEquals(2, managedBeforeShutdown);
+
+    manager.shutdown();
+
+    int managedAfterShutdown = manager.managedEndpointCount();
+    assertEquals(0, managedAfterShutdown);
+  }
+
+  /** An empty address string and an empty address set both leave the manager without endpoints. */
+  @Test
+  public void emptyOrNullAddressIsIgnored() {
+    KeyRangeCacheTest.FakeEndpointCache fakeCache = new KeyRangeCacheTest.FakeEndpointCache();
+    Duration idleTimeout = Duration.ofMinutes(30);
+    manager =
+        new EndpointLifecycleManager(
+            fakeCache, /* probeIntervalSeconds= */ 60, idleTimeout, Clock.systemUTC());
+
+    // First finder reports a single empty address, second finder reports no addresses at all.
+    manager.updateActiveAddresses("finder-1", Collections.singleton(""));
+    manager.updateActiveAddresses("finder-2", Collections.emptySet());
+
+    assertEquals(0, manager.managedEndpointCount());
+  }
+
+  /** Recording traffic for {@code "default"} or {@code null} neither throws nor creates state. */
+  @Test
+  public void recordRealTrafficForDefaultEndpointIsIgnored() {
+    KeyRangeCacheTest.FakeEndpointCache fakeCache = new KeyRangeCacheTest.FakeEndpointCache();
+    Duration idleTimeout = Duration.ofMinutes(30);
+    manager =
+        new EndpointLifecycleManager(
+            fakeCache, /* probeIntervalSeconds= */ 60, idleTimeout, Clock.systemUTC());
+
+    // Neither call may throw.
+    manager.recordRealTraffic("default");
+    manager.recordRealTraffic(null);
+
+    // And neither call may have created endpoint state.
+    assertEquals(0, manager.managedEndpointCount());
+  }
+
+  /** An address that a finder stops reporting, and that nobody else reports, is evicted. */
+  @Test
+  public void staleEndpointEvictedWhenNoLongerActive() throws Exception {
+    final String dropped = "server1";
+    final String kept = "server2";
+    KeyRangeCacheTest.FakeEndpointCache fakeCache = new KeyRangeCacheTest.FakeEndpointCache();
+    manager =
+        new EndpointLifecycleManager(
+            fakeCache, /* probeIntervalSeconds= */ 60, Duration.ofMinutes(30), Clock.systemUTC());
+
+    // The finder starts out reporting both addresses.
+    String finderId = registerAddresses(manager, dropped, kept);
+    assertEquals(2, manager.managedEndpointCount());
+
+    // Its next update reports only the second address.
+    manager.updateActiveAddresses(finderId, Collections.singleton(kept));
+
+    // The first address has no remaining finder and is evicted; the second stays.
+    assertFalse(manager.isManaged(dropped));
+    assertTrue(manager.isManaged(kept));
+    assertEquals(1, manager.managedEndpointCount());
+  }
+
+  /** An address dropped by one finder stays managed while another finder still reports it. */
+  @Test
+  public void endpointKeptIfReferencedByAnotherFinder() throws Exception {
+    final String shared = "server1";
+    KeyRangeCacheTest.FakeEndpointCache fakeCache = new KeyRangeCacheTest.FakeEndpointCache();
+    manager =
+        new EndpointLifecycleManager(
+            fakeCache, /* probeIntervalSeconds= */ 60, Duration.ofMinutes(30), Clock.systemUTC());
+
+    // Two distinct finders report the same address; only the first ID is needed later.
+    String firstFinder = registerAddresses(manager, shared);
+    registerAddresses(manager, shared);
+
+    // The first finder now reports nothing, the second one still references the address.
+    manager.updateActiveAddresses(firstFinder, Collections.emptySet());
+
+    assertTrue(manager.isManaged(shared));
+    assertEquals(1, manager.managedEndpointCount());
+  }
+
+  /** Unregistering a finder evicts exactly the endpoints that only that finder referenced. */
+  @Test
+  public void unregisterFinderEvictsEndpointsNoLongerReferenced() throws Exception {
+    final String firstAddress = "server1";
+    final String secondAddress = "server2";
+    KeyRangeCacheTest.FakeEndpointCache fakeCache = new KeyRangeCacheTest.FakeEndpointCache();
+    manager =
+        new EndpointLifecycleManager(
+            fakeCache, /* probeIntervalSeconds= */ 60, Duration.ofMinutes(30), Clock.systemUTC());
+
+    String firstFinder = registerAddresses(manager, firstAddress);
+    String secondFinder = registerAddresses(manager, secondAddress);
+
+    // Removing the first finder takes only its address with it.
+    manager.unregisterFinder(firstFinder);
+
+    assertFalse(manager.isManaged(firstAddress));
+    assertTrue(manager.isManaged(secondAddress));
+    assertEquals(1, manager.managedEndpointCount());
+
+    // Removing the second finder leaves nothing managed.
+    manager.unregisterFinder(secondFinder);
+
+    assertEquals(0, manager.managedEndpointCount());
+  }
+
+  /**
+   * Test clock that can be advanced manually.
+   *
+   * <p>The current instant is held in a {@code volatile} field because the clock is handed to the
+   * {@link EndpointLifecycleManager}, which may read it from a thread other than the test thread.
+   */
+  private static final class TestClock extends Clock {
+    private volatile Instant now;
+
+    TestClock(Instant now) {
+      this.now = now;
+    }
+
+    /** Moves the clock forward by {@code duration}; meant to be called by the test thread only. */
+    void advance(Duration duration) {
+      now = now.plus(duration);
+    }
+
+    @Override
+    public Instant instant() {
+      return now;
+    }
+
+    @Override
+    public ZoneId getZone() {
+      return ZoneId.of("UTC");
+    }
+
+    /** Returns this same clock; the requested zone is ignored. */
+    @Override
+    public Clock withZone(ZoneId zone) {
+      return this;
+    }
+  }
+}
diff --git a/java-spanner/google-cloud-spanner/src/test/java/com/google/cloud/spanner/spi/v1/GrpcChannelEndpointCacheTest.java b/java-spanner/google-cloud-spanner/src/test/java/com/google/cloud/spanner/spi/v1/GrpcChannelEndpointCacheTest.java
index 56e6d3cfc2bc..74afec18bfc3 100644
--- a/java-spanner/google-cloud-spanner/src/test/java/com/google/cloud/spanner/spi/v1/GrpcChannelEndpointCacheTest.java
+++ b/java-spanner/google-cloud-spanner/src/test/java/com/google/cloud/spanner/spi/v1/GrpcChannelEndpointCacheTest.java
@@ -114,10 +114,13 @@ public void healthReflectsChannelShutdown() throws Exception {
GrpcChannelEndpointCache cache = new GrpcChannelEndpointCache(createProvider("localhost:1234"));
try {
ChannelEndpoint server = cache.get("localhost:1111");
- assertThat(server.isHealthy()).isTrue();
+ // Newly created channel is not READY (likely IDLE), so isHealthy is false for location aware.
+ // isHealthy now requires READY state for location aware routing.
+ assertThat(server.isHealthy()).isFalse();
server.getChannel().shutdownNow();
assertThat(server.isHealthy()).isFalse();
+ assertThat(server.isTransientFailure()).isFalse();
} finally {
cache.shutdown();
}
diff --git a/java-spanner/google-cloud-spanner/src/test/java/com/google/cloud/spanner/spi/v1/KeyAwareChannelTest.java b/java-spanner/google-cloud-spanner/src/test/java/com/google/cloud/spanner/spi/v1/KeyAwareChannelTest.java
index 542d6e27ab6d..fdaeab3914bf 100644
--- a/java-spanner/google-cloud-spanner/src/test/java/com/google/cloud/spanner/spi/v1/KeyAwareChannelTest.java
+++ b/java-spanner/google-cloud-spanner/src/test/java/com/google/cloud/spanner/spi/v1/KeyAwareChannelTest.java
@@ -197,7 +197,8 @@ public void timeoutOnCommitClearsTransactionAffinity() throws Exception {
commitCall.sendMessage(
CommitRequest.newBuilder().setSession(SESSION).setTransactionId(transactionId).build());
- assertThat(harness.endpointCache.getCount(DEFAULT_ADDRESS)).isEqualTo(1);
+ // affinityEndpoint now uses getIfPresent (non-creating), so getCount stays at 0.
+ assertThat(harness.endpointCache.getCount(DEFAULT_ADDRESS)).isEqualTo(0);
@SuppressWarnings("unchecked")
RecordingClientCall commitDelegate =
@@ -211,7 +212,8 @@ public void timeoutOnCommitClearsTransactionAffinity() throws Exception {
rollbackCall.sendMessage(
RollbackRequest.newBuilder().setSession(SESSION).setTransactionId(transactionId).build());
- assertThat(harness.endpointCache.getCount(DEFAULT_ADDRESS)).isEqualTo(1);
+ // Rollback also uses getIfPresent for affinity, so getCount remains 0.
+ assertThat(harness.endpointCache.getCount(DEFAULT_ADDRESS)).isEqualTo(0);
}
@Test
@@ -1283,6 +1285,16 @@ public ChannelEndpoint get(String address) {
return endpoints.computeIfAbsent(address, FakeEndpoint::new);
}
+    /**
+     * Returns the default endpoint for the default address. For any other address a missing
+     * endpoint is created on the spot, which simulates the lifecycle manager having pre-created
+     * it.
+     */
+    @Override
+    public ChannelEndpoint getIfPresent(String address) {
+      if (!defaultAddress.equals(address)) {
+        return endpoints.computeIfAbsent(address, key -> new FakeEndpoint(key));
+      }
+      return defaultEndpoint;
+    }
+
@Override
public void evict(String address) {
endpoints.remove(address);
@@ -1344,6 +1356,11 @@ public boolean isHealthy() {
return true;
}
+ @Override
+ public boolean isTransientFailure() {
+ return false; // This fake always reports healthy, so it never reports TRANSIENT_FAILURE.
+ }
+
@Override
public ManagedChannel getChannel() {
return channel;
diff --git a/java-spanner/google-cloud-spanner/src/test/java/com/google/cloud/spanner/spi/v1/KeyRangeCacheGoldenTest.java b/java-spanner/google-cloud-spanner/src/test/java/com/google/cloud/spanner/spi/v1/KeyRangeCacheGoldenTest.java
index 763a36dbfd64..7fa2874ada5b 100644
--- a/java-spanner/google-cloud-spanner/src/test/java/com/google/cloud/spanner/spi/v1/KeyRangeCacheGoldenTest.java
+++ b/java-spanner/google-cloud-spanner/src/test/java/com/google/cloud/spanner/spi/v1/KeyRangeCacheGoldenTest.java
@@ -125,6 +125,12 @@ public ChannelEndpoint get(String address) {
return endpoints.computeIfAbsent(address, FakeEndpoint::new);
}
+    /**
+     * Looks up the endpoint for {@code address}. In these golden tests a missing endpoint is
+     * created on the spot, which simulates the lifecycle manager having pre-created it.
+     */
+    @Override
+    public ChannelEndpoint getIfPresent(String address) {
+      ChannelEndpoint existingOrNew =
+          endpoints.computeIfAbsent(address, key -> new FakeEndpoint(key));
+      return existingOrNew;
+    }
+
@Override
public void evict(String address) {
endpoints.remove(address);
@@ -154,6 +160,11 @@ public boolean isHealthy() {
return true;
}
+ @Override
+ public boolean isTransientFailure() {
+ return false; // This fake always reports healthy, so it never reports TRANSIENT_FAILURE.
+ }
+
@Override
public ManagedChannel getChannel() {
return channel;
diff --git a/java-spanner/google-cloud-spanner/src/test/java/com/google/cloud/spanner/spi/v1/KeyRangeCacheTest.java b/java-spanner/google-cloud-spanner/src/test/java/com/google/cloud/spanner/spi/v1/KeyRangeCacheTest.java
index 027994a6f2fe..c3b5a0ca16db 100644
--- a/java-spanner/google-cloud-spanner/src/test/java/com/google/cloud/spanner/spi/v1/KeyRangeCacheTest.java
+++ b/java-spanner/google-cloud-spanner/src/test/java/com/google/cloud/spanner/spi/v1/KeyRangeCacheTest.java
@@ -18,6 +18,8 @@
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
import com.google.protobuf.ByteString;
import com.google.spanner.v1.CacheUpdate;
@@ -33,6 +35,7 @@
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
+import javax.annotation.Nullable;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
@@ -41,53 +44,33 @@
public class KeyRangeCacheTest {
@Test
- public void skipsUnhealthyTabletAfterItIsCached() {
+ public void skipsTransientFailureTabletWithSkippedTablet() {
FakeEndpointCache endpointCache = new FakeEndpointCache();
KeyRangeCache cache = new KeyRangeCache(endpointCache);
- cache.addRanges(
- CacheUpdate.newBuilder()
- .addRange(
- Range.newBuilder()
- .setStartKey(bytes("a"))
- .setLimitKey(bytes("z"))
- .setGroupUid(5)
- .setSplitId(1)
- .setGeneration(bytes("1")))
- .addGroup(
- Group.newBuilder()
- .setGroupUid(5)
- .setGeneration(bytes("1"))
- .setLeaderIndex(0)
- .addTablets(
- Tablet.newBuilder()
- .setTabletUid(1)
- .setServerAddress("server1")
- .setIncarnation(bytes("1"))
- .setDistance(0))
- .addTablets(
- Tablet.newBuilder()
- .setTabletUid(2)
- .setServerAddress("server2")
- .setIncarnation(bytes("1"))
- .setDistance(0)))
- .build());
+ cache.addRanges(twoReplicaUpdate());
+
+ // Pre-create endpoints.
+ endpointCache.get("server1");
+ endpointCache.get("server2");
+ // Initial routing works.
RoutingHint.Builder initialHint = RoutingHint.newBuilder().setKey(bytes("a"));
ChannelEndpoint initialServer =
cache.fillRoutingHint(
- /* preferLeader= */ false,
+ false,
KeyRangeCache.RangeMode.COVERING_SPLIT,
DirectedReadOptions.getDefaultInstance(),
initialHint);
assertNotNull(initialServer);
- endpointCache.setHealthy("server1", false);
+ // Mark server1 as TRANSIENT_FAILURE.
+ endpointCache.setState("server1", EndpointHealthState.TRANSIENT_FAILURE);
RoutingHint.Builder hint = RoutingHint.newBuilder().setKey(bytes("a"));
ChannelEndpoint server =
cache.fillRoutingHint(
- /* preferLeader= */ false,
+ false,
KeyRangeCache.RangeMode.COVERING_SPLIT,
DirectedReadOptions.getDefaultInstance(),
hint);
@@ -131,6 +114,10 @@ public void skipsExplicitlyExcludedTablet() {
.setDistance(0)))
.build());
+ // Pre-create endpoints so getIfPresent() returns them.
+ endpointCache.get("server1");
+ endpointCache.get("server2");
+
RoutingHint.Builder hint = RoutingHint.newBuilder().setKey(bytes("a"));
ChannelEndpoint server =
cache.fillRoutingHint(
@@ -173,6 +160,8 @@ public void shrinkToEvictsRanges() {
.setIncarnation(bytes("1"))))
.build();
cache.addRanges(update);
+ // Pre-create endpoint so READY state check passes in shouldSkip.
+ endpointCache.get("server" + i);
}
checkContents(cache, numRanges, numRanges);
@@ -188,6 +177,425 @@ public void shrinkToEvictsRanges() {
checkContents(cache, 0, numRanges);
}
+ @Test
+ public void readyEndpointIsUsableForLocationAware() {
+ FakeEndpointCache fakeEndpoints = new FakeEndpointCache();
+ KeyRangeCache rangeCache = new KeyRangeCache(fakeEndpoints);
+ rangeCache.addRanges(singleReplicaUpdate("server1"));
+
+ // Register server1 so getIfPresent can find it; a new FakeEndpoint defaults to READY.
+ fakeEndpoints.get("server1");
+
+ RoutingHint.Builder routingHint = RoutingHint.newBuilder().setKey(bytes("a"));
+ ChannelEndpoint routed =
+ rangeCache.fillRoutingHint(
+ false,
+ KeyRangeCache.RangeMode.COVERING_SPLIT,
+ DirectedReadOptions.getDefaultInstance(),
+ routingHint);
+ // The READY replica is expected to be picked, with nothing reported as skipped.
+ assertNotNull(routed);
+ assertEquals("server1", routed.getAddress());
+ assertEquals(0, routingHint.getSkippedTabletUidCount());
+ }
+
+ @Test
+ public void idleEndpointIsNotUsableForLocationAware() {
+ FakeEndpointCache fakeEndpoints = new FakeEndpointCache();
+ KeyRangeCache rangeCache = new KeyRangeCache(fakeEndpoints);
+ rangeCache.addRanges(singleReplicaUpdate("server1"));
+
+ // setState ignores unknown addresses, so create the endpoint before marking it IDLE.
+ fakeEndpoints.get("server1");
+ fakeEndpoints.setState("server1", EndpointHealthState.IDLE);
+
+ RoutingHint.Builder routingHint = RoutingHint.newBuilder().setKey(bytes("a"));
+ ChannelEndpoint routed =
+ rangeCache.fillRoutingHint(
+ false,
+ KeyRangeCache.RangeMode.COVERING_SPLIT,
+ DirectedReadOptions.getDefaultInstance(),
+ routingHint);
+
+ // An IDLE replica is passed over quietly: null (default host) and no skipped_tablets.
+ assertNull(routed);
+ assertEquals(0, routingHint.getSkippedTabletUidCount());
+ }
+
+ @Test
+ public void connectingEndpointIsNotUsableForLocationAware() {
+ FakeEndpointCache fakeEndpoints = new FakeEndpointCache();
+ KeyRangeCache rangeCache = new KeyRangeCache(fakeEndpoints);
+ rangeCache.addRanges(singleReplicaUpdate("server1"));
+ // The only replica exists in the endpoint cache but is still CONNECTING.
+ fakeEndpoints.get("server1");
+ fakeEndpoints.setState("server1", EndpointHealthState.CONNECTING);
+
+ RoutingHint.Builder routingHint = RoutingHint.newBuilder().setKey(bytes("a"));
+ ChannelEndpoint routed =
+ rangeCache.fillRoutingHint(
+ false,
+ KeyRangeCache.RangeMode.COVERING_SPLIT,
+ DirectedReadOptions.getDefaultInstance(),
+ routingHint);
+ // Expect no routed endpoint and no skipped tablet entry.
+ assertNull(routed);
+ assertEquals(0, routingHint.getSkippedTabletUidCount());
+ }
+
+ @Test
+ public void transientFailureEndpointIsNotUsable() {
+ FakeEndpointCache fakeEndpoints = new FakeEndpointCache();
+ KeyRangeCache rangeCache = new KeyRangeCache(fakeEndpoints);
+ rangeCache.addRanges(singleReplicaUpdate("server1"));
+ // The only replica exists in the endpoint cache but is in TRANSIENT_FAILURE.
+ fakeEndpoints.get("server1");
+ fakeEndpoints.setState("server1", EndpointHealthState.TRANSIENT_FAILURE);
+
+ RoutingHint.Builder routingHint = RoutingHint.newBuilder().setKey(bytes("a"));
+ ChannelEndpoint routed =
+ rangeCache.fillRoutingHint(
+ false,
+ KeyRangeCache.RangeMode.COVERING_SPLIT,
+ DirectedReadOptions.getDefaultInstance(),
+ routingHint);
+
+ // A TRANSIENT_FAILURE replica is skipped and its tablet uid is listed in skipped_tablets.
+ assertNull(routed);
+ assertEquals(1, routingHint.getSkippedTabletUidCount());
+ assertEquals(1L, routingHint.getSkippedTabletUid(0).getTabletUid());
+ }
+
+ @Test
+ public void unsupportedGetStateTreatedAsNotReady() {
+ FakeEndpointCache fakeEndpoints = new FakeEndpointCache();
+ KeyRangeCache rangeCache = new KeyRangeCache(fakeEndpoints);
+ rangeCache.addRanges(singleReplicaUpdate("server1"));
+ // UNSUPPORTED makes the fake report neither healthy nor transient-failure.
+ fakeEndpoints.get("server1");
+ fakeEndpoints.setState("server1", EndpointHealthState.UNSUPPORTED);
+
+ RoutingHint.Builder routingHint = RoutingHint.newBuilder().setKey(bytes("a"));
+ ChannelEndpoint routed =
+ rangeCache.fillRoutingHint(
+ false,
+ KeyRangeCache.RangeMode.COVERING_SPLIT,
+ DirectedReadOptions.getDefaultInstance(),
+ routingHint);
+
+ // Such an endpoint is skipped silently: no routed endpoint and no skipped_tablets.
+ assertNull(routed);
+ assertEquals(0, routingHint.getSkippedTabletUidCount());
+ }
+
+ @Test
+ public void missingEndpointCausesDefaultHostFallbackWithoutSkippedTablet() {
+ // No endpoint is ever registered for server1, so getIfPresent returns null for it.
+ FakeEndpointCache endpointCache = new FakeEndpointCache();
+ KeyRangeCache cache = new KeyRangeCache(endpointCache);
+ cache.addRanges(singleReplicaUpdate("server1"));
+ assertNull("precondition: server1 must be absent", endpointCache.getIfPresent("server1"));
+
+ RoutingHint.Builder hint = RoutingHint.newBuilder().setKey(bytes("a"));
+ ChannelEndpoint server =
+ cache.fillRoutingHint(
+ false,
+ KeyRangeCache.RangeMode.COVERING_SPLIT,
+ DirectedReadOptions.getDefaultInstance(),
+ hint);
+
+ assertNull(server);
+ assertEquals(0, hint.getSkippedTabletUidCount());
+ }
+
+ @Test
+ public void idleEndpointCausesDefaultHostFallbackWithoutSkippedTablet() {
+ FakeEndpointCache fakeEndpoints = new FakeEndpointCache();
+ KeyRangeCache rangeCache = new KeyRangeCache(fakeEndpoints);
+ rangeCache.addRanges(singleReplicaUpdate("server1"));
+ // The single replica is present in the endpoint cache but IDLE.
+ fakeEndpoints.get("server1");
+ fakeEndpoints.setState("server1", EndpointHealthState.IDLE);
+
+ RoutingHint.Builder routingHint = RoutingHint.newBuilder().setKey(bytes("a"));
+ ChannelEndpoint routed =
+ rangeCache.fillRoutingHint(
+ false,
+ KeyRangeCache.RangeMode.COVERING_SPLIT,
+ DirectedReadOptions.getDefaultInstance(),
+ routingHint);
+ // Null means no location-aware endpoint was chosen; nothing is reported as skipped.
+ assertNull(routed);
+ assertEquals(0, routingHint.getSkippedTabletUidCount());
+ }
+
+ @Test
+ public void connectingEndpointCausesDefaultHostFallbackWithoutSkippedTablet() {
+ FakeEndpointCache fakeEndpoints = new FakeEndpointCache();
+ KeyRangeCache rangeCache = new KeyRangeCache(fakeEndpoints);
+ rangeCache.addRanges(singleReplicaUpdate("server1"));
+ // The single replica is present in the endpoint cache but CONNECTING.
+ fakeEndpoints.get("server1");
+ fakeEndpoints.setState("server1", EndpointHealthState.CONNECTING);
+
+ RoutingHint.Builder routingHint = RoutingHint.newBuilder().setKey(bytes("a"));
+ ChannelEndpoint routed =
+ rangeCache.fillRoutingHint(
+ false,
+ KeyRangeCache.RangeMode.COVERING_SPLIT,
+ DirectedReadOptions.getDefaultInstance(),
+ routingHint);
+ // Null means no location-aware endpoint was chosen; nothing is reported as skipped.
+ assertNull(routed);
+ assertEquals(0, routingHint.getSkippedTabletUidCount());
+ }
+
+ @Test
+ public void transientFailureEndpointCausesSkippedTabletPlusDefaultHostFallback() {
+ FakeEndpointCache fakeEndpoints = new FakeEndpointCache();
+ KeyRangeCache rangeCache = new KeyRangeCache(fakeEndpoints);
+ rangeCache.addRanges(singleReplicaUpdate("server1"));
+ // The single replica is present in the endpoint cache but in TRANSIENT_FAILURE.
+ fakeEndpoints.get("server1");
+ fakeEndpoints.setState("server1", EndpointHealthState.TRANSIENT_FAILURE);
+
+ RoutingHint.Builder routingHint = RoutingHint.newBuilder().setKey(bytes("a"));
+ ChannelEndpoint routed =
+ rangeCache.fillRoutingHint(
+ false,
+ KeyRangeCache.RangeMode.COVERING_SPLIT,
+ DirectedReadOptions.getDefaultInstance(),
+ routingHint);
+ // No endpoint is chosen, and tablet 1 must be reported as skipped.
+ assertNull(routed);
+ assertEquals(1, routingHint.getSkippedTabletUidCount());
+ assertEquals(1L, routingHint.getSkippedTabletUid(0).getTabletUid());
+ }
+
+ @Test
+ public void oneUnusableReplicaAndOneReadyReplicaUsesReady() {
+ FakeEndpointCache fakeEndpoints = new FakeEndpointCache();
+ KeyRangeCache rangeCache = new KeyRangeCache(fakeEndpoints);
+ rangeCache.addRanges(twoReplicaUpdate());
+
+ // Both replicas must exist in the endpoint cache before their state can be set.
+ fakeEndpoints.get("server1");
+ fakeEndpoints.get("server2");
+
+ // Replica 1 is IDLE (unusable); replica 2 is explicitly READY.
+ fakeEndpoints.setState("server1", EndpointHealthState.IDLE);
+ fakeEndpoints.setState("server2", EndpointHealthState.READY);
+
+ RoutingHint.Builder routingHint = RoutingHint.newBuilder().setKey(bytes("a"));
+ ChannelEndpoint routed =
+ rangeCache.fillRoutingHint(
+ false,
+ KeyRangeCache.RangeMode.COVERING_SPLIT,
+ DirectedReadOptions.getDefaultInstance(),
+ routingHint);
+
+ assertNotNull(routed);
+ assertEquals("server2", routed.getAddress());
+ // An IDLE replica is skipped without being added to skipped_tablets.
+ assertEquals(0, routingHint.getSkippedTabletUidCount());
+ }
+
+ @Test
+ public void readyEndpointIsUsedForLocationAware() {
+ FakeEndpointCache fakeEndpoints = new FakeEndpointCache();
+ KeyRangeCache rangeCache = new KeyRangeCache(fakeEndpoints);
+ rangeCache.addRanges(singleReplicaUpdate("server1"));
+ // Create the endpoint and set READY explicitly rather than relying on the default.
+ fakeEndpoints.get("server1");
+ fakeEndpoints.setState("server1", EndpointHealthState.READY);
+
+ RoutingHint.Builder routingHint = RoutingHint.newBuilder().setKey(bytes("a"));
+ ChannelEndpoint routed =
+ rangeCache.fillRoutingHint(
+ false,
+ KeyRangeCache.RangeMode.COVERING_SPLIT,
+ DirectedReadOptions.getDefaultInstance(),
+ routingHint);
+ // The READY replica is expected to be picked, with nothing reported as skipped.
+ assertNotNull(routed);
+ assertEquals("server1", routed.getAddress());
+ assertEquals(0, routingHint.getSkippedTabletUidCount());
+ }
+
+ @Test
+ public void transientFailureReplicaSkippedAndReadyReplicaSelected() {
+ FakeEndpointCache fakeEndpoints = new FakeEndpointCache();
+ KeyRangeCache rangeCache = new KeyRangeCache(fakeEndpoints);
+ rangeCache.addRanges(twoReplicaUpdate());
+ // Both replicas must exist in the endpoint cache before their state can be set.
+ fakeEndpoints.get("server1");
+ fakeEndpoints.get("server2");
+ // Replica 1 is in TRANSIENT_FAILURE; replica 2 is READY.
+ fakeEndpoints.setState("server1", EndpointHealthState.TRANSIENT_FAILURE);
+ fakeEndpoints.setState("server2", EndpointHealthState.READY);
+
+ RoutingHint.Builder routingHint = RoutingHint.newBuilder().setKey(bytes("a"));
+ ChannelEndpoint routed =
+ rangeCache.fillRoutingHint(
+ false,
+ KeyRangeCache.RangeMode.COVERING_SPLIT,
+ DirectedReadOptions.getDefaultInstance(),
+ routingHint);
+
+ assertNotNull(routed);
+ assertEquals("server2", routed.getAddress());
+ // The TRANSIENT_FAILURE replica (tablet 1) must show up in skipped_tablets.
+ assertEquals(1, routingHint.getSkippedTabletUidCount());
+ assertEquals(1L, routingHint.getSkippedTabletUid(0).getTabletUid());
+ }
+
+ // --- Eviction and recreation tests ---
+
+ @Test
+ public void staleShutdownEndpointIsClearedAndRelookedUp() {
+ // Bug 1 regression: after idle eviction shuts down a channel, the tablet's cached
+ // endpoint reference becomes stale. shouldSkip must detect the shut-down channel,
+ // discard it, and look the address up again in the endpoint cache.
+ FakeEndpointCache endpointCache = new FakeEndpointCache();
+ KeyRangeCache cache = new KeyRangeCache(endpointCache);
+ cache.addRanges(singleReplicaUpdate("server1"));
+
+ // Route once so the tablet caches the endpoint reference.
+ endpointCache.get("server1");
+ RoutingHint.Builder hint1 = RoutingHint.newBuilder().setKey(bytes("a"));
+ ChannelEndpoint first =
+ cache.fillRoutingHint(
+ false,
+ KeyRangeCache.RangeMode.COVERING_SPLIT,
+ DirectedReadOptions.getDefaultInstance(),
+ hint1);
+ assertNotNull(first);
+ assertEquals("server1", first.getAddress());
+
+ // Simulate idle eviction: shut down the channel, then evict the endpoint from the cache.
+ first.getChannel().shutdownNow();
+ endpointCache.evict("server1");
+
+ // Without the fix, the tablet would keep using the stale, shut-down endpoint forever.
+ // With the fix, shouldSkip detects the shutdown, drops it, and looks the address up again.
+
+ // Re-create the endpoint (simulating lifecycle manager recreation).
+ endpointCache.get("server1");
+ endpointCache.setState("server1", EndpointHealthState.READY);
+
+ RoutingHint.Builder hint2 = RoutingHint.newBuilder().setKey(bytes("a"));
+ ChannelEndpoint second =
+ cache.fillRoutingHint(
+ false,
+ KeyRangeCache.RangeMode.COVERING_SPLIT,
+ DirectedReadOptions.getDefaultInstance(),
+ hint2);
+ // Should find the new READY endpoint. NOTE(review): both instances report "server1", so
+ // these asserts cannot tell stale from recreated; consider also asserting second != first.
+ assertNotNull(second);
+ assertEquals("server1", second.getAddress());
+ assertEquals(0, hint2.getSkippedTabletUidCount());
+ }
+
+ @Test
+ public void missingEndpointTriggersRecreationViaLifecycleManager() {
+ // Bug 2 regression: a routing lookup that finds no endpoint is expected to call
+ // requestEndpointRecreation, so that later requests can find the endpoint.
+ FakeEndpointCache fakeEndpoints = new FakeEndpointCache();
+ TrackingLifecycleManager recorder = new TrackingLifecycleManager(fakeEndpoints);
+ try {
+ KeyRangeCache rangeCache = new KeyRangeCache(fakeEndpoints, recorder);
+ rangeCache.addRanges(singleReplicaUpdate("server1"));
+
+ // server1 was never registered in the endpoint cache.
+ RoutingHint.Builder routingHint = RoutingHint.newBuilder().setKey(bytes("a"));
+ ChannelEndpoint routed =
+ rangeCache.fillRoutingHint(
+ false,
+ KeyRangeCache.RangeMode.COVERING_SPLIT,
+ DirectedReadOptions.getDefaultInstance(),
+ routingHint);
+
+ // Routing falls back to the default host.
+ assertNull(routed);
+
+ // The lifecycle manager must have received a recreation request for the address.
+ // The stub records requested addresses instead of recreating anything.
+ boolean requested = recorder.recreationRequested.contains("server1");
+ assertTrue("requestEndpointRecreation should have been called for server1", requested);
+ } finally {
+ recorder.shutdown();
+ }
+ }
+
+ /** Lifecycle manager stub that records each requested address instead of recreating it. */
+ private static final class TrackingLifecycleManager extends EndpointLifecycleManager {
+ final java.util.Set<String> recreationRequested = new java.util.HashSet<>();
+
+ TrackingLifecycleManager(ChannelEndpointCache cache) {
+ super(cache);
+ }
+
+ @Override
+ void requestEndpointRecreation(String address) {
+ recreationRequested.add(address);
+ }
+ }
+
+ // --- Helper methods ---
+
+ private static CacheUpdate singleReplicaUpdate(String serverAddress) {
+ return CacheUpdate.newBuilder()
+ .addRange(
+ Range.newBuilder()
+ .setStartKey(bytes("a"))
+ .setLimitKey(bytes("z"))
+ .setGroupUid(5)
+ .setSplitId(1)
+ .setGeneration(bytes("1")))
+ .addGroup(
+ Group.newBuilder()
+ .setGroupUid(5)
+ .setGeneration(bytes("1"))
+ .setLeaderIndex(0)
+ .addTablets(
+ Tablet.newBuilder()
+ .setTabletUid(1)
+ .setServerAddress(serverAddress)
+ .setIncarnation(bytes("1"))
+ .setDistance(0)))
+ .build();
+ }
+
+ private static CacheUpdate twoReplicaUpdate() {
+ return CacheUpdate.newBuilder()
+ .addRange(
+ Range.newBuilder()
+ .setStartKey(bytes("a"))
+ .setLimitKey(bytes("z"))
+ .setGroupUid(5)
+ .setSplitId(1)
+ .setGeneration(bytes("1")))
+ .addGroup(
+ Group.newBuilder()
+ .setGroupUid(5)
+ .setGeneration(bytes("1"))
+ .setLeaderIndex(0)
+ .addTablets(
+ Tablet.newBuilder()
+ .setTabletUid(1)
+ .setServerAddress("server1")
+ .setIncarnation(bytes("1"))
+ .setDistance(0))
+ .addTablets(
+ Tablet.newBuilder()
+ .setTabletUid(2)
+ .setServerAddress("server2")
+ .setIncarnation(bytes("1"))
+ .setDistance(0)))
+ .build();
+ }
+
private static void checkContents(KeyRangeCache cache, int expectedSize, int mustBeInCache) {
assertEquals(expectedSize, cache.size());
int hitCount = 0;
@@ -195,7 +603,7 @@ private static void checkContents(KeyRangeCache cache, int expectedSize, int mus
RoutingHint.Builder hint = RoutingHint.newBuilder().setKey(bytes(String.format("%04d", i)));
ChannelEndpoint server =
cache.fillRoutingHint(
- /* preferLeader= */ false,
+ false,
KeyRangeCache.RangeMode.COVERING_SPLIT,
DirectedReadOptions.getDefaultInstance(),
hint);
@@ -214,9 +622,23 @@ private static ByteString bytes(String value) {
return ByteString.copyFromUtf8(value);
}
- private static final class FakeEndpointCache implements ChannelEndpointCache {
+ // --- Health state for testing ---
+
+ enum EndpointHealthState {
+ READY, // the only state in which FakeEndpoint.isHealthy() returns true
+ IDLE,
+ CONNECTING,
+ TRANSIENT_FAILURE, // makes FakeEndpoint.isTransientFailure() return true
+ SHUTDOWN,
+ UNSUPPORTED
+ }
+
+ // --- Test doubles ---
+
+ static final class FakeEndpointCache implements ChannelEndpointCache {
private final Map<String, FakeEndpoint> endpoints = new HashMap<>();
private final FakeEndpoint defaultEndpoint = new FakeEndpoint("default");
+ private boolean createOnGet = true;
@Override
public ChannelEndpoint defaultChannel() {
@@ -228,6 +650,12 @@ public ChannelEndpoint get(String address) {
return endpoints.computeIfAbsent(address, FakeEndpoint::new);
}
+ @Override
+ @Nullable
+ public ChannelEndpoint getIfPresent(String address) {
+ return endpoints.get(address);
+ }
+
@Override
public void evict(String address) {
endpoints.remove(address);
@@ -238,18 +666,28 @@ public void shutdown() {
endpoints.clear();
}
- void setHealthy(String address, boolean healthy) {
+ void setCreateOnGet(boolean createOnGet) {
+ this.createOnGet = createOnGet;
+ }
+
+ void setState(String address, EndpointHealthState state) {
FakeEndpoint endpoint = endpoints.get(address);
if (endpoint != null) {
- endpoint.setHealthy(healthy);
+ endpoint.setState(state);
}
}
+
+ @Deprecated
+ void setHealthy(String address, boolean healthy) {
+ setState(
+ address, healthy ? EndpointHealthState.READY : EndpointHealthState.TRANSIENT_FAILURE);
+ }
}
- private static final class FakeEndpoint implements ChannelEndpoint {
+ static final class FakeEndpoint implements ChannelEndpoint {
private final String address;
private final ManagedChannel channel = new FakeManagedChannel();
- private boolean healthy = true;
+ private EndpointHealthState state = EndpointHealthState.READY;
FakeEndpoint(String address) {
this.address = address;
@@ -262,7 +700,12 @@ public String getAddress() {
@Override
public boolean isHealthy() {
- return healthy;
+ return state == EndpointHealthState.READY;
+ }
+
+ @Override
+ public boolean isTransientFailure() {
+ return state == EndpointHealthState.TRANSIENT_FAILURE;
}
@Override
@@ -270,8 +713,8 @@ public ManagedChannel getChannel() {
return channel;
}
- void setHealthy(boolean healthy) {
- this.healthy = healthy;
+ void setState(EndpointHealthState state) {
+ this.state = state;
}
}