diff --git a/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/ChannelEndpoint.java b/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/ChannelEndpoint.java index cd6b386dc8a7..fc82c530fc6f 100644 --- a/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/ChannelEndpoint.java +++ b/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/ChannelEndpoint.java @@ -41,19 +41,27 @@ public interface ChannelEndpoint { String getAddress(); /** - * Returns whether this server is ready to accept RPCs. + * Returns whether this server's channel is in {@code READY} state and can accept location-aware + * RPCs. * - *

<p>A server is considered unhealthy if: + *

<p>Only endpoints in {@code READY} state are eligible for location-aware routing. Endpoints in + * {@code IDLE}, {@code CONNECTING}, {@code TRANSIENT_FAILURE}, or {@code SHUTDOWN} are not + * considered healthy for location-aware routing purposes. * - *

- * - * @return true if the server is healthy and ready to accept RPCs + * @return true if the channel is in READY state */ boolean isHealthy(); + /** + * Returns whether this server's channel is in {@code TRANSIENT_FAILURE} state. + * + *

When an endpoint is in transient failure, it should be reported as a skipped tablet in + * routing hints so the server can refresh the client cache. + * + * @return true if the channel is in TRANSIENT_FAILURE state + */ + boolean isTransientFailure(); + /** * Returns the gRPC channel for making RPCs to this server. * diff --git a/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/ChannelEndpointCache.java b/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/ChannelEndpointCache.java index 879ed546f2c2..db2af4902f84 100644 --- a/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/ChannelEndpointCache.java +++ b/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/ChannelEndpointCache.java @@ -54,6 +54,19 @@ public interface ChannelEndpointCache { */ ChannelEndpoint get(String address); + /** + * Returns a cached channel for the given address without creating it. + * + *

Unlike {@link #get(String)}, this method does not create a new endpoint if one does not + * already exist in the cache. This is used by location-aware routing to avoid foreground endpoint + * creation on the request path. + * + * @param address the server address in "host:port" format + * @return the cached channel instance, or null if no endpoint exists for this address + */ + @javax.annotation.Nullable + ChannelEndpoint getIfPresent(String address); + /** * Evicts a server connection from the cache and gracefully shuts down its channel. * diff --git a/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/ChannelFinder.java b/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/ChannelFinder.java index 4fd95b305b5c..2f229dddee44 100644 --- a/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/ChannelFinder.java +++ b/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/ChannelFinder.java @@ -22,17 +22,22 @@ import com.google.spanner.v1.CommitRequest; import com.google.spanner.v1.DirectedReadOptions; import com.google.spanner.v1.ExecuteSqlRequest; +import com.google.spanner.v1.Group; import com.google.spanner.v1.Mutation; import com.google.spanner.v1.ReadRequest; import com.google.spanner.v1.RoutingHint; +import com.google.spanner.v1.Tablet; import com.google.spanner.v1.TransactionOptions; import com.google.spanner.v1.TransactionSelector; import java.util.ArrayList; +import java.util.HashSet; import java.util.List; import java.util.Objects; +import java.util.Set; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.atomic.AtomicLong; import java.util.function.Predicate; +import javax.annotation.Nullable; /** * Finds a server for a request using location-aware routing metadata. 
@@ -47,9 +52,37 @@ public final class ChannelFinder {
   private final AtomicLong databaseId = new AtomicLong();
   private final KeyRecipeCache recipeCache = new KeyRecipeCache();
   private final KeyRangeCache rangeCache;
+  @Nullable private final EndpointLifecycleManager lifecycleManager;
+  @Nullable private final String finderKey;
 
   public ChannelFinder(ChannelEndpointCache endpointCache) {
-    this.rangeCache = new KeyRangeCache(Objects.requireNonNull(endpointCache));
+    this(endpointCache, null, null);
+  }
+
+  /**
+   * Creates a finder that passes {@code lifecycleManager} to its range cache but has no finder
+   * key, so {@link #update} does not report active addresses to the manager.
+   */
+  public ChannelFinder(
+      ChannelEndpointCache endpointCache, @Nullable EndpointLifecycleManager lifecycleManager) {
+    this(endpointCache, lifecycleManager, null);
+  }
+
+  /**
+   * Creates a finder with an optional lifecycle manager and finder key. {@link #update} reports
+   * active addresses to the manager only when both are non-null.
+   *
+   * @param endpointCache endpoint cache handed to the range cache; must not be null
+   * @param lifecycleManager lifecycle manager to notify, or null
+   * @param finderKey key under which this finder reports its addresses, or null
+   */
+  ChannelFinder(
+      ChannelEndpointCache endpointCache,
+      @Nullable EndpointLifecycleManager lifecycleManager,
+      @Nullable String finderKey) {
+    this.rangeCache = new KeyRangeCache(Objects.requireNonNull(endpointCache), lifecycleManager);
+    this.lifecycleManager = lifecycleManager;
+    this.finderKey = finderKey;
   }
 
   void useDeterministicRandom() {
@@ -70,6 +103,24 @@ public void update(CacheUpdate update) {
         recipeCache.addRecipes(update.getKeyRecipes());
       }
       rangeCache.addRanges(update);
+
+      // Notify the lifecycle manager about server addresses so it can create endpoints
+      // in the background and start probing, and evict stale endpoints atomically.
+      if (lifecycleManager != null && finderKey != null) {
+        Set<String> currentAddresses = new HashSet<>();
+        for (Group group : update.getGroupList()) {
+          for (Tablet tablet : group.getTabletsList()) {
+            String addr = tablet.getServerAddress();
+            if (!addr.isEmpty()) {
+              currentAddresses.add(addr);
+            }
+          }
+        }
+        // Also include addresses from existing cached tablets not in this update.
+        currentAddresses.addAll(rangeCache.getActiveAddresses());
+        // Atomically ensure endpoints exist and evict stale ones.
+ lifecycleManager.updateActiveAddresses(finderKey, currentAddresses); + } } } diff --git a/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/EndpointLifecycleManager.java b/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/EndpointLifecycleManager.java new file mode 100644 index 000000000000..0cc593c7a86b --- /dev/null +++ b/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/EndpointLifecycleManager.java @@ -0,0 +1,572 @@ +/* + * Copyright 2026 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.google.cloud.spanner.spi.v1; + +import com.google.api.core.InternalApi; +import com.google.common.annotations.VisibleForTesting; +import io.grpc.ConnectivityState; +import io.grpc.ManagedChannel; +import java.time.Clock; +import java.time.Duration; +import java.time.Instant; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.logging.Level; +import java.util.logging.Logger; + +/** + * Manages the lifecycle of location-aware routing endpoints including background probing, traffic + * tracking, and idle eviction. + * + *

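<p>This manager is the only component that proactively creates routed replica endpoints. It:
+ *
+ * <ul>
+ *   <li>creates endpoints in the background for newly reported server addresses;
+ *   <li>probes each endpoint periodically and warms up IDLE channels;
+ *   <li>records real traffic per endpoint to drive idle eviction;
+ *   <li>evicts endpoints that are idle, no longer referenced by any finder, shut down, or
+ *       repeatedly in TRANSIENT_FAILURE.
+ * </ul>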
+ */
+@InternalApi
+class EndpointLifecycleManager {
+
+  private static final Logger logger = Logger.getLogger(EndpointLifecycleManager.class.getName());
+
+  /** Default probe interval: 60 seconds. Keeps channels from drifting into IDLE. */
+  @VisibleForTesting static final long DEFAULT_PROBE_INTERVAL_SECONDS = 60;
+
+  /** Default idle eviction threshold: 30 minutes without real traffic. */
+  @VisibleForTesting static final Duration DEFAULT_IDLE_EVICTION_DURATION = Duration.ofMinutes(30);
+
+  /** Interval for checking idle eviction: every 5 minutes. */
+  private static final long EVICTION_CHECK_INTERVAL_SECONDS = 300;
+
+  /**
+   * Maximum consecutive TRANSIENT_FAILURE probes before evicting an endpoint. Gives the channel
+   * time to recover from transient network issues before we tear it down and recreate.
+   */
+  private static final int MAX_TRANSIENT_FAILURE_COUNT = 3;
+
+  /** Per-endpoint lifecycle state. */
+  static final class EndpointState {
+    final String address;
+    volatile Instant lastProbeAt;
+    volatile Instant lastRealTrafficAt;
+    volatile Instant lastReadyAt;
+    volatile int consecutiveTransientFailures;
+
+    // Guarded by synchronizing on this EndpointState instance.
+    ScheduledFuture<?> probeFuture;
+
+    /**
+     * Creates state for {@code address}. The idle-eviction timer starts at {@code now}; the probe
+     * and ready timestamps stay null until the first probe records them.
+     */
+    EndpointState(String address, Instant now) {
+      this.address = address;
+      this.lastRealTrafficAt = now;
+      this.lastProbeAt = null;
+      this.lastReadyAt = null;
+      this.consecutiveTransientFailures = 0;
+    }
+  }
+
+  private final ChannelEndpointCache endpointCache;
+
+  /** Managed (non-default) endpoints, keyed by server address. */
+  private final Map<String, EndpointState> endpoints = new ConcurrentHashMap<>();
+
+  /**
+   * Active addresses reported by each ChannelFinder, keyed by database id.
+   *
+   * <p>ChannelFinder instances are held via SoftReference in KeyAwareChannel, so this map uses a
+   * stable database-id key instead of a strong ChannelFinder reference. KeyAwareChannel unregisters
+   * stale entries when a finder is cleared.
+   *
+   * <p>All reads and writes to this map, and stale-endpoint eviction based on it, are synchronized
+   * on {@link #activeAddressLock}.
+   */
+  private final Map<String, Set<String>> activeAddressesPerFinder = new ConcurrentHashMap<>();
+
+  private final Object activeAddressLock = new Object();
+
+  private final ScheduledExecutorService scheduler;
+  private final AtomicBoolean isShutdown = new AtomicBoolean(false);
+  private final long probeIntervalSeconds;
+  private final Duration idleEvictionDuration;
+  private final Clock clock;
+  private final String defaultEndpointAddress;
+
+  // Assigned exactly once in the constructor; final so shutdown() can read it from any thread.
+  private final ScheduledFuture<?> evictionFuture;
+
+  /** Creates a manager with the default probe interval, idle threshold and the system UTC clock. */
+  EndpointLifecycleManager(ChannelEndpointCache endpointCache) {
+    this(
+        endpointCache,
+        DEFAULT_PROBE_INTERVAL_SECONDS,
+        DEFAULT_IDLE_EVICTION_DURATION,
+        Clock.systemUTC());
+  }
+
+  /**
+   * Creates a manager with explicit timing parameters and starts the periodic idle-eviction check.
+   *
+   * @param endpointCache cache that owns the endpoints' channels
+   * @param probeIntervalSeconds seconds between probes of each endpoint
+   * @param idleEvictionDuration time without real traffic after which an endpoint is evicted
+   * @param clock time source for all timestamps
+   */
+  @VisibleForTesting
+  EndpointLifecycleManager(
+      ChannelEndpointCache endpointCache,
+      long probeIntervalSeconds,
+      Duration idleEvictionDuration,
+      Clock clock) {
+    this.endpointCache = endpointCache;
+    this.probeIntervalSeconds = probeIntervalSeconds;
+    this.idleEvictionDuration = idleEvictionDuration;
+    this.clock = clock;
+    this.defaultEndpointAddress = endpointCache.defaultChannel().getAddress();
+    this.scheduler =
+        Executors.newScheduledThreadPool(
+            2,
+            r -> {
+              Thread t = new Thread(r, "spanner-endpoint-lifecycle");
+              t.setDaemon(true);
+              return t;
+            });
+
+    // Start periodic eviction checks.
+    this.evictionFuture =
+        scheduler.scheduleAtFixedRate(
+            this::checkIdleEviction,
+            EVICTION_CHECK_INTERVAL_SECONDS,
+            EVICTION_CHECK_INTERVAL_SECONDS,
+            TimeUnit.SECONDS);
+  }
+
+  /**
+   * Ensures an endpoint state exists for the given address.
+   *
+   *

<p>This is only called from {@link #updateActiveAddresses} under {@link #activeAddressLock} to
+   * guarantee that newly created endpoints are registered as active before any stale-eviction check
+   * can see them. Background creation tasks are scheduled by the caller after {@code
+   * computeIfAbsent} returns, so the entry is visible in the map before the scheduler thread checks
+   * it.
+   *
+   * @return true if a new endpoint state was created (caller should schedule background creation)
+   */
+  private boolean ensureEndpointExists(String address) {
+    if (isShutdown.get() || address == null || address.isEmpty()) {
+      return false;
+    }
+    // Don't manage the default endpoint.
+    if (defaultEndpointAddress.equals(address)) {
+      return false;
+    }
+
+    // One-element array so the lambda can report whether it actually ran.
+    boolean[] created = {false};
+    endpoints.computeIfAbsent(
+        address,
+        addr -> {
+          logger.log(Level.FINE, "Creating endpoint state for address: {0}", addr);
+          created[0] = true;
+          return new EndpointState(addr, clock.instant());
+        });
+    return created[0];
+  }
+
+  /**
+   * Records that real (non-probe) traffic was routed to an endpoint. This refreshes the idle
+   * eviction timer for this endpoint.
+   */
+  void recordRealTraffic(String address) {
+    if (address == null || defaultEndpointAddress.equals(address)) {
+      return;
+    }
+    EndpointState state = endpoints.get(address);
+    if (state != null) {
+      state.lastRealTrafficAt = clock.instant();
+    }
+  }
+
+  /**
+   * Atomically ensures endpoints exist for all active addresses and evicts any managed endpoints
+   * that are no longer referenced by any finder. This handles the case where a tablet's server
+   * address changes (e.g. from server1:15000 to server2:15000) — the old endpoint is shut down
+   * promptly instead of lingering until idle eviction.
+   *
+   *

<p>Both endpoint creation and stale-eviction are performed under the same lock to prevent a
+   * race condition where a newly created endpoint could be evicted by a concurrent call from
+   * another finder before it is registered as active.
+   *
+   * @param finderKey stable identifier of the ChannelFinder reporting its active addresses
+   * @param activeAddresses server addresses currently referenced by tablets in this finder
+   */
+  void updateActiveAddresses(String finderKey, Set<String> activeAddresses) {
+    if (isShutdown.get() || finderKey == null || finderKey.isEmpty()) {
+      return;
+    }
+    // Snapshot the caller's set: it is stored in activeAddressesPerFinder and read later under
+    // the lock, so it must not change if the caller keeps mutating its own instance.
+    Set<String> snapshot = new HashSet<>(activeAddresses);
+    List<String> newlyCreated = new ArrayList<>();
+    synchronized (activeAddressLock) {
+      // Ensure endpoints exist for all active addresses while holding the lock.
+      // This guarantees the addresses are in the endpoints map before we compute stale entries.
+      for (String address : snapshot) {
+        if (ensureEndpointExists(address)) {
+          newlyCreated.add(address);
+        }
+      }
+
+      activeAddressesPerFinder.put(finderKey, snapshot);
+
+      // Evict managed endpoints not referenced by any finder.
+      for (String address : findUnreferencedAddresses()) {
+        logger.log(
+            Level.FINE, "Evicting stale endpoint {0}: no longer referenced by any tablet", address);
+        evictEndpoint(address);
+      }
+    }
+
+    // Schedule background creation tasks AFTER computeIfAbsent has returned and the entries
+    // are visible to other threads. Submitting from inside computeIfAbsent creates a race
+    // where the scheduler thread can run before the entry is published in the map.
+    for (String address : newlyCreated) {
+      scheduler.submit(() -> createAndStartProbing(address));
+    }
+  }
+
+  /**
+   * Unregisters a finder and evicts any managed endpoints that are no longer referenced by the
+   * remaining finders.
+   */
+  void unregisterFinder(String finderKey) {
+    if (isShutdown.get() || finderKey == null || finderKey.isEmpty()) {
+      return;
+    }
+    synchronized (activeAddressLock) {
+      if (activeAddressesPerFinder.remove(finderKey) == null) {
+        return;
+      }
+
+      for (String address : findUnreferencedAddresses()) {
+        logger.log(
+            Level.FINE,
+            "Evicting stale endpoint {0}: finder {1} was unregistered",
+            new Object[] {address, finderKey});
+        evictEndpoint(address);
+      }
+    }
+  }
+
+  /**
+   * Returns the managed addresses that no registered finder references any more. The caller must
+   * hold {@link #activeAddressLock}.
+   */
+  private List<String> findUnreferencedAddresses() {
+    // Union of the active addresses across all finders.
+    Set<String> allActive = new HashSet<>();
+    for (Set<String> addresses : activeAddressesPerFinder.values()) {
+      allActive.addAll(addresses);
+    }
+
+    List<String> stale = new ArrayList<>();
+    for (String address : endpoints.keySet()) {
+      if (!allActive.contains(address)) {
+        stale.add(address);
+      }
+    }
+    return stale;
+  }
+
+  /** Creates an endpoint and starts probing. Runs on the scheduler thread. */
+  private void createAndStartProbing(String address) {
+    if (isShutdown.get() || !endpoints.containsKey(address)) {
+      return;
+    }
+    try {
+      endpointCache.get(address);
+      logger.log(Level.FINE, "Background endpoint creation completed for: {0}", address);
+
+      // If the endpoint was evicted between the containsKey check above and channel creation,
+      // the channel would leak in the endpoint cache without lifecycle tracking. Clean it up.
+      if (!endpoints.containsKey(address)) {
+        logger.log(
+            Level.FINE,
+            "Endpoint {0} was evicted during channel creation, cleaning up leaked channel",
+            address);
+        endpointCache.evict(address);
+        return;
+      }
+
+      startProbing(address);
+    } catch (Exception e) {
+      logger.log(
+          Level.FINE, "Failed to create endpoint for address: " + address + ", will retry", e);
+      // Schedule a retry after one probe interval, but only if still managed.
+      if (!isShutdown.get() && endpoints.containsKey(address)) {
+        scheduler.schedule(
+            () -> createAndStartProbing(address), probeIntervalSeconds, TimeUnit.SECONDS);
+      }
+    }
+  }
+
+  /** Starts periodic probing for an endpoint. */
+  private void startProbing(String address) {
+    EndpointState state = endpoints.get(address);
+    if (state == null || isShutdown.get()) {
+      return;
+    }
+
+    synchronized (state) {
+      // Re-check after acquiring lock — the state may have been evicted concurrently. Compare by
+      // identity: if the address was evicted and re-registered, the map holds a different
+      // EndpointState, and a prober stored on this old instance would never be cancelled.
+      if (endpoints.get(address) != state) {
+        return;
+      }
+
+      // Cancel any existing probe schedule.
+      if (state.probeFuture != null) {
+        state.probeFuture.cancel(false);
+      }
+
+      state.probeFuture =
+          scheduler.scheduleAtFixedRate(
+              () -> probe(address), 0, probeIntervalSeconds, TimeUnit.SECONDS);
+    }
+    logger.log(
+        Level.FINE,
+        "Prober started for endpoint {0} with interval {1}s",
+        new Object[] {address, probeIntervalSeconds});
+  }
+
+  /** Stops probing for an endpoint. */
+  private void stopProbing(String address) {
+    EndpointState state = endpoints.get(address);
+    if (state == null) {
+      return;
+    }
+    synchronized (state) {
+      if (state.probeFuture != null) {
+        state.probeFuture.cancel(false);
+        state.probeFuture = null;
+        logger.log(Level.FINE, "Prober stopped for endpoint: {0}", address);
+      }
+    }
+  }
+
+  /**
+   * Probes the endpoint by checking channel connectivity state and warming up IDLE channels.
+   *
+   *

<p>Uses {@code getState(true)} to request a connection attempt on IDLE channels instead of + * sending a GetSession RPC. This is lighter weight and avoids routing application-level RPCs + * through the endpoint's channel pool. + *

<p>If the channel is in TRANSIENT_FAILURE, increments a consecutive failure counter. After + * {@link #MAX_TRANSIENT_FAILURE_COUNT} consecutive failures, the endpoint is evicted and shut + * down so it can be recreated fresh when needed again. + *

<p>All exceptions are caught to prevent {@link ScheduledExecutorService} from cancelling future
+   * runs of this task.
+   */
+  private void probe(String address) {
+    try {
+      if (isShutdown.get()) {
+        return;
+      }
+
+      ChannelEndpoint endpoint = endpointCache.getIfPresent(address);
+      if (endpoint == null) {
+        logger.log(Level.FINE, "Probe skipped for {0}: endpoint not in cache", address);
+        return;
+      }
+
+      EndpointState state = endpoints.get(address);
+      if (state == null) {
+        return;
+      }
+
+      ManagedChannel channel = endpoint.getChannel();
+      state.lastProbeAt = clock.instant();
+
+      // getState(false) reads current state without triggering a connection.
+      ConnectivityState channelState = channel.getState(false);
+      logger.log(
+          Level.FINE, "Probe for {0}: channel state is {1}", new Object[] {address, channelState});
+
+      switch (channelState) {
+        case READY:
+          state.lastReadyAt = clock.instant();
+          state.consecutiveTransientFailures = 0;
+          break;
+
+        case IDLE:
+          // Warm up the channel by requesting a connection attempt.
+          logger.log(
+              Level.FINE, "Probe for {0}: channel IDLE, requesting connection (warmup)", address);
+          channel.getState(true);
+          state.consecutiveTransientFailures = 0;
+          break;
+
+        case CONNECTING:
+          state.consecutiveTransientFailures = 0;
+          break;
+
+        case TRANSIENT_FAILURE:
+          state.consecutiveTransientFailures++;
+          logger.log(
+              Level.FINE,
+              "Probe for {0}: channel in TRANSIENT_FAILURE ({1}/{2})",
+              new Object[] {
+                address, state.consecutiveTransientFailures, MAX_TRANSIENT_FAILURE_COUNT
+              });
+          if (state.consecutiveTransientFailures >= MAX_TRANSIENT_FAILURE_COUNT) {
+            logger.log(
+                Level.FINE,
+                "Evicting endpoint {0}: {1} consecutive TRANSIENT_FAILURE probes",
+                new Object[] {address, state.consecutiveTransientFailures});
+            evictEndpoint(address);
+          }
+          break;
+
+        case SHUTDOWN:
+          logger.log(Level.FINE, "Probe for {0}: channel SHUTDOWN, evicting endpoint", address);
+          evictEndpoint(address);
+          break;
+
+        default:
+          break;
+      }
+    } catch (Exception e) {
+      logger.log(Level.FINE, "Probe failed for endpoint " + address, e);
+    }
+  }
+
+  /**
+   * Checks all managed endpoints for idle eviction.
+   *
+   * <p>An endpoint is evicted when the time since its last real traffic exceeds the idle eviction
+   * duration. A failure while evicting one endpoint is logged and does not stop the remaining
+   * evictions; when this method runs as a fixed-rate task, an exception escaping it would
+   * suppress every later run.
+   */
+  @VisibleForTesting
+  void checkIdleEviction() {
+    if (isShutdown.get()) {
+      return;
+    }
+
+    Instant now = clock.instant();
+    List<String> toEvict = new ArrayList<>();
+
+    for (Map.Entry<String, EndpointState> entry : endpoints.entrySet()) {
+      String address = entry.getKey();
+      EndpointState state = entry.getValue();
+
+      // Never evict the default endpoint.
+      if (defaultEndpointAddress.equals(address)) {
+        continue;
+      }
+
+      Duration sinceLastRealTraffic = Duration.between(state.lastRealTrafficAt, now);
+      if (sinceLastRealTraffic.compareTo(idleEvictionDuration) > 0) {
+        toEvict.add(address);
+      }
+    }
+
+    for (String address : toEvict) {
+      try {
+        evictEndpoint(address);
+      } catch (RuntimeException e) {
+        logger.log(Level.FINE, "Idle eviction failed for endpoint " + address, e);
+      }
+    }
+  }
+
+  /** Evicts an endpoint: stops probing, removes from tracking, shuts down the channel.
*/
+  private void evictEndpoint(String address) {
+    logger.log(Level.FINE, "Evicting endpoint {0}", address);
+
+    // Remove the state from tracking BEFORE cancelling its prober. startProbing() re-checks the
+    // map while holding the state's monitor, so once the entry is gone it cannot schedule a new
+    // prober, and any prober it scheduled just before the removal is cancelled below. Cancelling
+    // first would leave a window in which a freshly scheduled prober is never cancelled.
+    EndpointState removed = endpoints.remove(address);
+    if (removed != null) {
+      synchronized (removed) {
+        if (removed.probeFuture != null) {
+          removed.probeFuture.cancel(false);
+          removed.probeFuture = null;
+          logger.log(Level.FINE, "Prober stopped for endpoint: {0}", address);
+        }
+      }
+    }
+    endpointCache.evict(address);
+  }
+
+  /**
+   * Requests that an evicted endpoint be recreated. The endpoint is created in the background and
+   * probing starts immediately. The endpoint will only become eligible for location-aware routing
+   * once it reaches READY state.
+   */
+  void requestEndpointRecreation(String address) {
+    if (isShutdown.get() || address == null || address.isEmpty()) {
+      return;
+    }
+    if (defaultEndpointAddress.equals(address)) {
+      return;
+    }
+
+    // Only recreate if not already managed.
+    if (endpoints.containsKey(address)) {
+      return;
+    }
+
+    logger.log(Level.FINE, "Recreating previously evicted endpoint for address: {0}", address);
+    EndpointState state = new EndpointState(address, clock.instant());
+    if (endpoints.putIfAbsent(address, state) == null) {
+      // Schedule after putIfAbsent returns so the entry is visible to the scheduler thread.
+      scheduler.submit(() -> createAndStartProbing(address));
+    }
+  }
+
+  /** Returns whether an endpoint is being actively managed. */
+  boolean isManaged(String address) {
+    return endpoints.containsKey(address);
+  }
+
+  /** Returns the endpoint state for testing. */
+  @VisibleForTesting
+  EndpointState getEndpointState(String address) {
+    return endpoints.get(address);
+  }
+
+  /** Returns the number of managed endpoints. */
+  @VisibleForTesting
+  int managedEndpointCount() {
+    return endpoints.size();
+  }
+
+  /** Shuts down the lifecycle manager and all probing.
*/
+  void shutdown() {
+    // compareAndSet makes shutdown idempotent: only the first caller proceeds.
+    if (!isShutdown.compareAndSet(false, true)) {
+      return;
+    }
+
+    logger.log(Level.FINE, "Shutting down endpoint lifecycle manager");
+
+    if (evictionFuture != null) {
+      evictionFuture.cancel(false);
+    }
+
+    // Cancel every prober under its state's monitor, which guards probeFuture.
+    for (EndpointState state : endpoints.values()) {
+      synchronized (state) {
+        if (state.probeFuture != null) {
+          state.probeFuture.cancel(false);
+        }
+      }
+    }
+    // NOTE(review): this only drops lifecycle tracking; the channels are not evicted from
+    // endpointCache here. Presumably the cache's owner shuts them down — confirm.
+    endpoints.clear();
+
+    // Let running tasks finish for up to 5 seconds, then interrupt whatever is left.
+    scheduler.shutdown();
+    try {
+      if (!scheduler.awaitTermination(5, TimeUnit.SECONDS)) {
+        scheduler.shutdownNow();
+      }
+    } catch (InterruptedException e) {
+      scheduler.shutdownNow();
+      // Restore the interrupt flag for the caller.
+      Thread.currentThread().interrupt();
+    }
+  }
+}
diff --git a/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GrpcChannelEndpointCache.java b/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GrpcChannelEndpointCache.java
index 3ee4d789592e..98e7f83b094f 100644
--- a/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GrpcChannelEndpointCache.java
+++ b/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GrpcChannelEndpointCache.java
@@ -17,10 +17,8 @@
 package com.google.cloud.spanner.spi.v1;
 
 import com.google.api.core.InternalApi;
-import com.google.api.gax.grpc.GrpcTransportChannel;
 import com.google.api.gax.grpc.InstantiatingGrpcChannelProvider;
 import com.google.api.gax.grpc.InstantiatingGrpcChannelProvider.Builder;
-import com.google.api.gax.rpc.TransportChannelProvider;
 import com.google.cloud.spanner.ErrorCode;
 import com.google.cloud.spanner.SpannerExceptionFactory;
 import com.google.common.annotations.VisibleForTesting;
@@ -33,6 +31,9 @@
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+import javax.annotation.Nullable;
 
 /**
  * gRPC implementation of {@link ChannelEndpointCache}.
@@ -44,6 +45,8 @@ @InternalApi class GrpcChannelEndpointCache implements ChannelEndpointCache { + private static final Logger logger = Logger.getLogger(GrpcChannelEndpointCache.class.getName()); + /** Timeout for graceful channel shutdown. */ private static final long SHUTDOWN_TIMEOUT_SECONDS = 5; @@ -87,16 +90,27 @@ public ChannelEndpoint get(String address) { try { // Create a new provider with the same config but different endpoint. // This is thread-safe as withEndpoint() returns a new provider instance. - TransportChannelProvider newProvider = createProviderWithAuthorityOverride(addr); - return new GrpcChannelEndpoint(addr, newProvider); + InstantiatingGrpcChannelProvider newProvider = + createProviderWithAuthorityOverride(addr); + GrpcChannelEndpoint endpoint = new GrpcChannelEndpoint(addr, newProvider); + logger.log(Level.FINE, "Location-aware endpoint created for address: {0}", addr); + return endpoint; } catch (IOException e) { + logger.log( + Level.FINE, "Failed to create location-aware endpoint for address: " + addr, e); throw SpannerExceptionFactory.newSpannerException( ErrorCode.INTERNAL, "Failed to create channel for address: " + addr, e); } }); } - private TransportChannelProvider createProviderWithAuthorityOverride(String address) { + @Override + @Nullable + public ChannelEndpoint getIfPresent(String address) { + return servers.get(address); + } + + private InstantiatingGrpcChannelProvider createProviderWithAuthorityOverride(String address) { InstantiatingGrpcChannelProvider endpointProvider = (InstantiatingGrpcChannelProvider) baseProvider.withEndpoint(address); if (Objects.equals(defaultAuthority, address)) { @@ -176,15 +190,19 @@ static class GrpcChannelEndpoint implements ChannelEndpoint { * @param provider the channel provider (must be a gRPC provider) * @throws IOException if the channel cannot be created */ - GrpcChannelEndpoint(String address, TransportChannelProvider provider) throws IOException { + GrpcChannelEndpoint(String address, 
InstantiatingGrpcChannelProvider provider) + throws IOException { this.address = address; - TransportChannelProvider readyProvider = provider; + // Build a raw ManagedChannel directly instead of going through getTransportChannel(), + // which wraps the channel in a ChannelPool that does not support getState(). + // Location-aware routing needs getState() to check channel connectivity. + InstantiatingGrpcChannelProvider readyProvider = provider; if (provider.needsHeaders()) { - readyProvider = provider.withHeaders(java.util.Collections.emptyMap()); + readyProvider = + (InstantiatingGrpcChannelProvider) + provider.withHeaders(java.util.Collections.emptyMap()); } - GrpcTransportChannel transportChannel = - (GrpcTransportChannel) readyProvider.getTransportChannel(); - this.channel = (ManagedChannel) transportChannel.getChannel(); + this.channel = readyProvider.createDecoratedChannelBuilder().build(); } /** @@ -210,13 +228,38 @@ public boolean isHealthy() { return false; } // Check connectivity state without triggering a connection attempt. + // Only READY channels are considered healthy for location-aware routing. // Some channel implementations don't support getState(), in which case - // we assume the channel is healthy if it's not shutdown/terminated. + // we treat the endpoint as not ready for location-aware routing (defensive). 
try { ConnectivityState state = channel.getState(false); - return state != ConnectivityState.SHUTDOWN && state != ConnectivityState.TRANSIENT_FAILURE; - } catch (UnsupportedOperationException ignore) { - return true; + boolean ready = state == ConnectivityState.READY; + if (!ready) { + logger.log( + Level.FINE, + "Location-aware endpoint {0} is not ready for location-aware routing, state: {1}", + new Object[] {address, state}); + } + return ready; + } catch (UnsupportedOperationException e) { + logger.log( + Level.FINE, + "getState(false) unsupported for location-aware endpoint {0}, treating as not ready", + address); + return false; + } + } + + @Override + public boolean isTransientFailure() { + if (channel.isShutdown() || channel.isTerminated()) { + return false; + } + try { + ConnectivityState state = channel.getState(false); + return state == ConnectivityState.TRANSIENT_FAILURE; + } catch (UnsupportedOperationException e) { + return false; } } diff --git a/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/KeyAwareChannel.java b/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/KeyAwareChannel.java index 9205cf0eace0..b6c22ad4e3f8 100644 --- a/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/KeyAwareChannel.java +++ b/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/KeyAwareChannel.java @@ -44,6 +44,7 @@ import io.opentelemetry.api.common.Attributes; import io.opentelemetry.api.trace.Span; import java.io.IOException; +import java.lang.ref.ReferenceQueue; import java.lang.ref.SoftReference; import java.util.HashSet; import java.util.Map; @@ -51,6 +52,8 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; import java.util.function.Predicate; +import java.util.logging.Level; +import java.util.logging.Logger; import javax.annotation.Nullable; /** @@ -63,6 +66,9 @@ */ @InternalApi final class KeyAwareChannel 
extends ManagedChannel { + + private static final Logger logger = Logger.getLogger(KeyAwareChannel.class.getName()); + private static final long MAX_TRACKED_READ_ONLY_TRANSACTIONS = 100_000L; private static final long MAX_TRACKED_EXCLUDED_LOGICAL_REQUESTS = 100_000L; private static final long EXCLUDED_LOGICAL_REQUEST_TTL_MINUTES = 10L; @@ -77,10 +83,11 @@ final class KeyAwareChannel extends ManagedChannel { private final ManagedChannel defaultChannel; private final ChannelEndpointCache endpointCache; + @Nullable private final EndpointLifecycleManager lifecycleManager; private final String authority; private final String defaultEndpointAddress; - private final Map> channelFinders = - new ConcurrentHashMap<>(); + private final ReferenceQueue channelFinderReferenceQueue = new ReferenceQueue<>(); + private final Map channelFinders = new ConcurrentHashMap<>(); private final Map transactionAffinities = new ConcurrentHashMap<>(); // Maps read-only transaction IDs to their preferLeader value. // Strong reads → true (prefer leader), Stale reads → false (any replica). @@ -108,6 +115,11 @@ private KeyAwareChannel( this.defaultChannel = endpointCache.defaultChannel().getChannel(); this.defaultEndpointAddress = endpointCache.defaultChannel().getAddress(); this.authority = this.defaultChannel.authority(); + // Only create lifecycle manager for production (non-factory) path. + // Factory path is used by tests with custom caches where background probing + // would interfere with test assertions. + this.lifecycleManager = + (endpointCacheFactory == null) ? 
new EndpointLifecycleManager(endpointCache) : null; } static KeyAwareChannel create( @@ -117,6 +129,18 @@ static KeyAwareChannel create( return new KeyAwareChannel(channelProvider, endpointCacheFactory); } + private static final class ChannelFinderReference extends SoftReference { + final String databaseId; + + ChannelFinderReference( + String databaseId, + ChannelFinder referent, + ReferenceQueue referenceQueue) { + super(referent, referenceQueue); + this.databaseId = databaseId; + } + } + private String extractDatabaseIdFromSession(String session) { if (session == null || session.isEmpty()) { return null; @@ -128,30 +152,64 @@ private String extractDatabaseIdFromSession(String session) { return session.substring(0, sessionsIndex); } + private void cleanupStaleChannelFinders() { + ChannelFinderReference reference; + while ((reference = (ChannelFinderReference) channelFinderReferenceQueue.poll()) != null) { + if (channelFinders.remove(reference.databaseId, reference) && lifecycleManager != null) { + lifecycleManager.unregisterFinder(reference.databaseId); + } + } + } + private ChannelFinder getOrCreateChannelFinder(String databaseId) { - SoftReference ref = channelFinders.get(databaseId); + cleanupStaleChannelFinders(); + ChannelFinderReference ref = channelFinders.get(databaseId); ChannelFinder finder = (ref != null) ? ref.get() : null; if (finder == null) { synchronized (channelFinders) { + cleanupStaleChannelFinders(); ref = channelFinders.get(databaseId); finder = (ref != null) ? ref.get() : null; if (finder == null) { - finder = new ChannelFinder(endpointCache); - channelFinders.put(databaseId, new SoftReference<>(finder)); + finder = + lifecycleManager != null + ? 
new ChannelFinder(endpointCache, lifecycleManager, databaseId) + : new ChannelFinder(endpointCache); + channelFinders.put( + databaseId, + new ChannelFinderReference(databaseId, finder, channelFinderReferenceQueue)); } } } return finder; } + /** Records real traffic to the selected endpoint for idle eviction tracking. */ + private void onRequestRouted(@Nullable ChannelEndpoint selectedEndpoint) { + if (lifecycleManager == null) { + return; + } + if (selectedEndpoint != null && !defaultEndpointAddress.equals(selectedEndpoint.getAddress())) { + lifecycleManager.recordRealTraffic(selectedEndpoint.getAddress()); + } + } + @Override public ManagedChannel shutdown() { + cleanupStaleChannelFinders(); + if (lifecycleManager != null) { + lifecycleManager.shutdown(); + } endpointCache.shutdown(); return this; } @Override public ManagedChannel shutdownNow() { + cleanupStaleChannelFinders(); + if (lifecycleManager != null) { + lifecycleManager.shutdown(); + } endpointCache.shutdown(); return this; } @@ -205,7 +263,23 @@ private ChannelEndpoint affinityEndpoint( if (address == null || excludedEndpoints.test(address)) { return null; } - return endpointCache.get(address); + // Use non-creating lookup and require READY state for location-aware routing. + ChannelEndpoint endpoint = endpointCache.getIfPresent(address); + if (endpoint == null) { + logger.log( + Level.FINE, + "Affinity endpoint for address {0} not present in cache, falling back to default", + address); + return null; + } + if (!endpoint.isHealthy()) { + logger.log( + Level.FINE, + "Affinity endpoint for address {0} not READY, falling back to default", + address); + return null; + } + return endpoint; } private void clearAffinity(ByteString transactionId) { @@ -495,6 +569,10 @@ public void sendMessage(RequestT message) { } selectedEndpoint = endpoint; this.channelFinder = finder; + + // Record real traffic for idle eviction tracking. 
+ parentChannel.onRequestRouted(endpoint); + recordRouteSelectionTrace( methodDescriptor, endpoint.getAddress(), diff --git a/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/KeyRangeCache.java b/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/KeyRangeCache.java index 09ecf19625c6..3a4e49d7148c 100644 --- a/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/KeyRangeCache.java +++ b/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/KeyRangeCache.java @@ -30,14 +30,18 @@ import java.util.Arrays; import java.util.Comparator; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.NavigableMap; import java.util.Objects; +import java.util.Set; import java.util.TreeMap; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.atomic.AtomicLong; import java.util.function.Predicate; +import java.util.logging.Level; +import java.util.logging.Logger; import java.util.stream.IntStream; /** Cache for routing information used by location-aware routing. 
*/ @@ -45,6 +49,8 @@ public final class KeyRangeCache { private static final Predicate NO_EXCLUDED_ENDPOINTS = address -> false; + private static final Logger logger = Logger.getLogger(KeyRangeCache.class.getName()); + private static final int MAX_LOCAL_REPLICA_DISTANCE = 5; private static final int DEFAULT_MIN_ENTRIES_FOR_RANDOM_PICK = 1000; @@ -57,6 +63,7 @@ public enum RangeMode { } private final ChannelEndpointCache endpointCache; + @javax.annotation.Nullable private final EndpointLifecycleManager lifecycleManager; private final NavigableMap ranges = new TreeMap<>(ByteString.unsignedLexicographicalComparator()); private final Map groups = new HashMap<>(); @@ -67,7 +74,14 @@ public enum RangeMode { private volatile int minCacheEntriesForRandomPick = DEFAULT_MIN_ENTRIES_FOR_RANDOM_PICK; public KeyRangeCache(ChannelEndpointCache endpointCache) { + this(endpointCache, null); + } + + public KeyRangeCache( + ChannelEndpointCache endpointCache, + @javax.annotation.Nullable EndpointLifecycleManager lifecycleManager) { this.endpointCache = Objects.requireNonNull(endpointCache); + this.lifecycleManager = lifecycleManager; } @VisibleForTesting @@ -137,6 +151,23 @@ public ChannelEndpoint fillRoutingHint( preferLeader, directedReadOptions, hintBuilder, excludedEndpoints); } + /** Returns all server addresses currently referenced by cached tablets. */ + Set getActiveAddresses() { + Set addresses = new HashSet<>(); + synchronized (lock) { + for (CachedGroup group : groups.values()) { + synchronized (group) { + for (CachedTablet tablet : group.tablets) { + if (!tablet.serverAddress.isEmpty()) { + addresses.add(tablet.serverAddress); + } + } + } + } + } + return addresses; + } + public void clear() { synchronized (lock) { for (CachedRange range : ranges.values()) { @@ -482,24 +513,88 @@ private boolean matches(DirectedReadOptions.ReplicaSelection selection) { } } + /** + * Evaluates whether this tablet should be skipped for location-aware routing. + * + *

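<p>State-aware skip logic:
+     *
+     * <ul>
+     *   <li>Server-marked skip, empty server address, or excluded endpoint: skipped and reported
+     *       as a skipped tablet in the routing hint.
+     *   <li>Cached endpoint whose channel is shut down: the stale reference is dropped and the
+     *       endpoint is looked up again in the cache without creating it.
+     *   <li>No endpoint present in the cache: skipped without being reported; background
+     *       recreation is requested when a lifecycle manager is configured.
+     *   <li>Endpoint is healthy (READY): not skipped.
+     *   <li>Endpoint in TRANSIENT_FAILURE: skipped and reported as a skipped tablet.
+     *   <li>Any other case (IDLE, CONNECTING, SHUTDOWN, or state unsupported): skipped without
+     *       being reported.
+     * </ul>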
+ */ boolean shouldSkip(RoutingHint.Builder hintBuilder, Predicate excludedEndpoints) { - if (skip - || serverAddress.isEmpty() - || excludedEndpoints.test(serverAddress) - || (endpoint != null && !endpoint.isHealthy())) { - RoutingHint.SkippedTablet.Builder skipped = hintBuilder.addSkippedTabletUidBuilder(); - skipped.setTabletUid(tabletUid); - skipped.setIncarnation(incarnation); + // Server-marked skip, no address, or excluded endpoint: always report. + if (skip || serverAddress.isEmpty() || excludedEndpoints.test(serverAddress)) { + addSkippedTablet(hintBuilder); + return true; + } + + // If the cached endpoint's channel has been shut down (e.g. after idle eviction), + // discard the stale reference so we re-lookup from the cache below. + if (endpoint != null && endpoint.getChannel().isShutdown()) { + logger.log( + Level.FINE, + "Tablet {0} at {1}: cached endpoint is shutdown, clearing stale reference", + new Object[] {tabletUid, serverAddress}); + endpoint = null; + } + + // Lookup without creating: location-aware routing should not trigger foreground endpoint + // creation. + if (endpoint == null) { + endpoint = endpointCache.getIfPresent(serverAddress); + } + + // No endpoint exists yet - skip silently, request background recreation so the + // endpoint becomes available for future requests. + if (endpoint == null) { + logger.log( + Level.FINE, + "Tablet {0} at {1}: no endpoint present, skipping silently", + new Object[] {tabletUid, serverAddress}); + if (lifecycleManager != null) { + lifecycleManager.requestEndpointRecreation(serverAddress); + } return true; } - return false; + + // READY - usable for location-aware routing. + if (endpoint.isHealthy()) { + return false; + } + + // TRANSIENT_FAILURE - skip and report so server can refresh client cache. 
+ if (endpoint.isTransientFailure()) { + logger.log( + Level.FINE, + "Tablet {0} at {1}: endpoint in TRANSIENT_FAILURE, adding to skipped_tablets", + new Object[] {tabletUid, serverAddress}); + addSkippedTablet(hintBuilder); + return true; + } + + // IDLE, CONNECTING, SHUTDOWN, or unsupported - skip silently. + logger.log( + Level.FINE, + "Tablet {0} at {1}: endpoint not ready, skipping silently", + new Object[] {tabletUid, serverAddress}); + return true; + } + + private void addSkippedTablet(RoutingHint.Builder hintBuilder) { + RoutingHint.SkippedTablet.Builder skipped = hintBuilder.addSkippedTabletUidBuilder(); + skipped.setTabletUid(tabletUid); + skipped.setIncarnation(incarnation); } ChannelEndpoint pick(RoutingHint.Builder hintBuilder) { hintBuilder.setTabletUid(tabletUid); - if (endpoint == null && !serverAddress.isEmpty()) { - endpoint = endpointCache.get(serverAddress); - } + // Endpoint must already exist and be READY if shouldSkip returned false. return endpoint; } @@ -584,13 +679,12 @@ ChannelEndpoint fillRoutingHint( directedReadOptions.getReplicasCase() != DirectedReadOptions.ReplicasCase.REPLICAS_NOT_SET; - // Fast path: pick a tablet while holding the lock. If the endpoint is already - // cached on the tablet, return it immediately without releasing the lock. - // If the endpoint needs to be created (blocking network dial), release the - // lock first so other threads are not blocked during channel creation. - CachedTablet selected; + // Select a tablet while holding the lock. With state-aware routing, only READY + // endpoints pass shouldSkip(), so the selected tablet always has a cached + // endpoint. No foreground endpoint creation is needed — the lifecycle manager + // creates endpoints in the background. 
synchronized (this) { - selected = + CachedTablet selected = selectTabletLocked( preferLeader, hasDirectedReadOptions, @@ -600,25 +694,7 @@ ChannelEndpoint fillRoutingHint( if (selected == null) { return null; } - if (selected.endpoint != null || selected.serverAddress.isEmpty()) { - return selected.pick(hintBuilder); - } - // Slow path: endpoint not yet created. Capture the address and release the - // lock before calling endpointCache.get(), which may block on network dial. - hintBuilder.setTabletUid(selected.tabletUid); - } - - String serverAddress = selected.serverAddress; - ChannelEndpoint endpoint = endpointCache.get(serverAddress); - - synchronized (this) { - // Only update if the tablet's address hasn't changed since we released the lock. - if (selected.endpoint == null && selected.serverAddress.equals(serverAddress)) { - selected.endpoint = endpoint; - } - // Re-set tabletUid with the latest value in case update() ran concurrently. - hintBuilder.setTabletUid(selected.tabletUid); - return selected.endpoint; + return selected.pick(hintBuilder); } } diff --git a/java-spanner/google-cloud-spanner/src/test/java/com/google/cloud/spanner/spi/v1/ChannelFinderGoldenTest.java b/java-spanner/google-cloud-spanner/src/test/java/com/google/cloud/spanner/spi/v1/ChannelFinderGoldenTest.java index 525313f1ab4e..7e9d2476346b 100644 --- a/java-spanner/google-cloud-spanner/src/test/java/com/google/cloud/spanner/spi/v1/ChannelFinderGoldenTest.java +++ b/java-spanner/google-cloud-spanner/src/test/java/com/google/cloud/spanner/spi/v1/ChannelFinderGoldenTest.java @@ -131,6 +131,12 @@ public ChannelEndpoint get(String address) { return endpoints.computeIfAbsent(address, FakeEndpoint::new); } + @Override + public ChannelEndpoint getIfPresent(String address) { + // Auto-create for golden tests — simulates lifecycle manager having pre-created endpoints. 
+ return endpoints.computeIfAbsent(address, FakeEndpoint::new); + } + @Override public void evict(String address) { endpoints.remove(address); @@ -158,6 +164,11 @@ public boolean isHealthy() { return !unhealthyServers.contains(address); } + @Override + public boolean isTransientFailure() { + return unhealthyServers.contains(address); + } + @Override public ManagedChannel getChannel() { return new ManagedChannel() { diff --git a/java-spanner/google-cloud-spanner/src/test/java/com/google/cloud/spanner/spi/v1/EndpointLifecycleManagerTest.java b/java-spanner/google-cloud-spanner/src/test/java/com/google/cloud/spanner/spi/v1/EndpointLifecycleManagerTest.java new file mode 100644 index 000000000000..6c45114435fb --- /dev/null +++ b/java-spanner/google-cloud-spanner/src/test/java/com/google/cloud/spanner/spi/v1/EndpointLifecycleManagerTest.java @@ -0,0 +1,372 @@ +/* + * Copyright 2026 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.google.cloud.spanner.spi.v1; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; + +import java.time.Clock; +import java.time.Duration; +import java.time.Instant; +import java.time.ZoneId; +import java.util.Collections; +import java.util.HashSet; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.LockSupport; +import java.util.function.BooleanSupplier; +import org.junit.After; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +@RunWith(JUnit4.class) +public class EndpointLifecycleManagerTest { + + private EndpointLifecycleManager manager; + + /** Counter for generating unique finder keys in tests. */ + private static final AtomicLong TEST_FINDER_ID = new AtomicLong(1000); + + @After + public void tearDown() { + if (manager != null) { + manager.shutdown(); + } + } + + /** + * Registers addresses with the lifecycle manager via updateActiveAddresses, which atomically + * creates endpoints and registers them as active. This mirrors how ChannelFinder.update() works. + */ + private static String registerAddresses(EndpointLifecycleManager mgr, String... 
addresses) { + String finderId = "finder-" + TEST_FINDER_ID.incrementAndGet(); + Set addressSet = new HashSet<>(); + Collections.addAll(addressSet, addresses); + mgr.updateActiveAddresses(finderId, addressSet); + return finderId; + } + + private static void awaitCondition(String message, BooleanSupplier condition) { + long deadlineNanos = System.nanoTime() + TimeUnit.SECONDS.toNanos(5); + while (System.nanoTime() < deadlineNanos) { + if (condition.getAsBoolean()) { + return; + } + LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(10)); + } + assertTrue(message, condition.getAsBoolean()); + } + + @Test + public void endpointCreationStartsProbing() throws Exception { + KeyRangeCacheTest.FakeEndpointCache cache = new KeyRangeCacheTest.FakeEndpointCache(); + manager = + new EndpointLifecycleManager( + cache, /* probeIntervalSeconds= */ 1, Duration.ofMinutes(30), Clock.systemUTC()); + + registerAddresses(manager, "server1"); + awaitCondition( + "endpoint should be created in background", () -> cache.getIfPresent("server1") != null); + + // Endpoint should be created in the cache. + assertNotNull(cache.getIfPresent("server1")); + + // Should be managed. + assertTrue(manager.isManaged("server1")); + assertNotNull(manager.getEndpointState("server1")); + assertEquals(1, manager.managedEndpointCount()); + } + + @Test + public void duplicateRegistrationIsNoop() throws Exception { + KeyRangeCacheTest.FakeEndpointCache cache = new KeyRangeCacheTest.FakeEndpointCache(); + manager = + new EndpointLifecycleManager( + cache, /* probeIntervalSeconds= */ 60, Duration.ofMinutes(30), Clock.systemUTC()); + + String finderId = registerAddresses(manager, "server1"); + // Re-register with the same finder ID — should not create duplicate state. 
+ manager.updateActiveAddresses(finderId, Collections.singleton("server1")); + manager.updateActiveAddresses(finderId, Collections.singleton("server1")); + + assertEquals(1, manager.managedEndpointCount()); + } + + @Test + public void defaultEndpointIsNotManaged() { + KeyRangeCacheTest.FakeEndpointCache cache = new KeyRangeCacheTest.FakeEndpointCache(); + manager = + new EndpointLifecycleManager( + cache, /* probeIntervalSeconds= */ 60, Duration.ofMinutes(30), Clock.systemUTC()); + + registerAddresses(manager, "default"); + + assertFalse(manager.isManaged("default")); + assertEquals(0, manager.managedEndpointCount()); + } + + @Test + public void probeTrafficDoesNotUpdateLastRealTrafficAt() throws Exception { + KeyRangeCacheTest.FakeEndpointCache cache = new KeyRangeCacheTest.FakeEndpointCache(); + TestClock clock = new TestClock(Instant.now()); + manager = + new EndpointLifecycleManager( + cache, /* probeIntervalSeconds= */ 1, Duration.ofMinutes(30), clock); + + Instant creationTime = clock.instant(); + registerAddresses(manager, "server1"); + awaitCondition( + "probe should run after background endpoint creation", + () -> { + EndpointLifecycleManager.EndpointState state = manager.getEndpointState("server1"); + return state != null && state.lastProbeAt != null; + }); + + // Probe traffic should not change lastRealTrafficAt. 
+ EndpointLifecycleManager.EndpointState state = manager.getEndpointState("server1"); + assertNotNull(state); + assertEquals(creationTime, state.lastRealTrafficAt); + } + + @Test + public void realRoutedTrafficUpdatesLastRealTrafficAt() throws Exception { + KeyRangeCacheTest.FakeEndpointCache cache = new KeyRangeCacheTest.FakeEndpointCache(); + TestClock clock = new TestClock(Instant.now()); + manager = + new EndpointLifecycleManager( + cache, /* probeIntervalSeconds= */ 60, Duration.ofMinutes(30), clock); + + registerAddresses(manager, "server1"); + + Instant before = clock.instant(); + clock.advance(Duration.ofMinutes(5)); + manager.recordRealTraffic("server1"); + + EndpointLifecycleManager.EndpointState state = manager.getEndpointState("server1"); + assertNotNull(state); + assertTrue(state.lastRealTrafficAt.isAfter(before)); + } + + @Test + public void endpointWithOnlyProbeTrafficIsEvictedAfterIdleDuration() throws Exception { + KeyRangeCacheTest.FakeEndpointCache cache = new KeyRangeCacheTest.FakeEndpointCache(); + TestClock clock = new TestClock(Instant.now()); + Duration idleDuration = Duration.ofMinutes(30); + manager = + new EndpointLifecycleManager(cache, /* probeIntervalSeconds= */ 60, idleDuration, clock); + + registerAddresses(manager, "server1"); + awaitCondition( + "endpoint should be created in background", () -> cache.getIfPresent("server1") != null); + + assertTrue(manager.isManaged("server1")); + + // Advance past idle threshold. + clock.advance(Duration.ofMinutes(31)); + + // Trigger eviction check manually. + manager.checkIdleEviction(); + + // Endpoint should be evicted. 
+ assertFalse(manager.isManaged("server1")); + assertNull(cache.getIfPresent("server1")); + assertEquals(0, manager.managedEndpointCount()); + } + + @Test + public void endpointWithRecentRealTrafficIsNotEvicted() throws Exception { + KeyRangeCacheTest.FakeEndpointCache cache = new KeyRangeCacheTest.FakeEndpointCache(); + TestClock clock = new TestClock(Instant.now()); + Duration idleDuration = Duration.ofMinutes(30); + manager = + new EndpointLifecycleManager(cache, /* probeIntervalSeconds= */ 60, idleDuration, clock); + + registerAddresses(manager, "server1"); + + // Record real traffic at 20 minutes. + clock.advance(Duration.ofMinutes(20)); + manager.recordRealTraffic("server1"); + + // Advance to 31 minutes (only 11 minutes since last real traffic). + clock.advance(Duration.ofMinutes(11)); + manager.checkIdleEviction(); + + // Should NOT be evicted because last real traffic was 11 minutes ago. + assertTrue(manager.isManaged("server1")); + } + + @Test + public void evictedEndpointIsRecreatedOnDemand() throws Exception { + KeyRangeCacheTest.FakeEndpointCache cache = new KeyRangeCacheTest.FakeEndpointCache(); + TestClock clock = new TestClock(Instant.now()); + Duration idleDuration = Duration.ofMinutes(30); + manager = + new EndpointLifecycleManager(cache, /* probeIntervalSeconds= */ 60, idleDuration, clock); + + registerAddresses(manager, "server1"); + awaitCondition( + "endpoint should be created in background", () -> cache.getIfPresent("server1") != null); + + // Evict. + clock.advance(Duration.ofMinutes(31)); + manager.checkIdleEviction(); + assertFalse(manager.isManaged("server1")); + + // Recreate. 
+ manager.requestEndpointRecreation("server1"); + awaitCondition( + "endpoint should be recreated in background", () -> cache.getIfPresent("server1") != null); + + assertTrue(manager.isManaged("server1")); + assertNotNull(cache.getIfPresent("server1")); + } + + @Test + public void shutdownStopsAllProbing() throws Exception { + KeyRangeCacheTest.FakeEndpointCache cache = new KeyRangeCacheTest.FakeEndpointCache(); + manager = + new EndpointLifecycleManager( + cache, /* probeIntervalSeconds= */ 1, Duration.ofMinutes(30), Clock.systemUTC()); + + registerAddresses(manager, "server1", "server2"); + + assertEquals(2, manager.managedEndpointCount()); + + manager.shutdown(); + + assertEquals(0, manager.managedEndpointCount()); + } + + @Test + public void emptyOrNullAddressIsIgnored() { + KeyRangeCacheTest.FakeEndpointCache cache = new KeyRangeCacheTest.FakeEndpointCache(); + manager = + new EndpointLifecycleManager( + cache, /* probeIntervalSeconds= */ 60, Duration.ofMinutes(30), Clock.systemUTC()); + + manager.updateActiveAddresses("finder-1", Collections.singleton("")); + manager.updateActiveAddresses("finder-2", Collections.emptySet()); + + assertEquals(0, manager.managedEndpointCount()); + } + + @Test + public void recordRealTrafficForDefaultEndpointIsIgnored() { + KeyRangeCacheTest.FakeEndpointCache cache = new KeyRangeCacheTest.FakeEndpointCache(); + manager = + new EndpointLifecycleManager( + cache, /* probeIntervalSeconds= */ 60, Duration.ofMinutes(30), Clock.systemUTC()); + + // Should not throw or create state. 
+ manager.recordRealTraffic("default"); + manager.recordRealTraffic(null); + assertEquals(0, manager.managedEndpointCount()); + } + + @Test + public void staleEndpointEvictedWhenNoLongerActive() throws Exception { + KeyRangeCacheTest.FakeEndpointCache cache = new KeyRangeCacheTest.FakeEndpointCache(); + manager = + new EndpointLifecycleManager( + cache, /* probeIntervalSeconds= */ 60, Duration.ofMinutes(30), Clock.systemUTC()); + + // Finder 1 reports server1 and server2. + String finder1 = registerAddresses(manager, "server1", "server2"); + assertEquals(2, manager.managedEndpointCount()); + + // Finder 1 updates: server1 is gone, only server2 remains. + manager.updateActiveAddresses(finder1, Collections.singleton("server2")); + + // server1 should be evicted since no finder references it. + assertFalse(manager.isManaged("server1")); + assertTrue(manager.isManaged("server2")); + assertEquals(1, manager.managedEndpointCount()); + } + + @Test + public void endpointKeptIfReferencedByAnotherFinder() throws Exception { + KeyRangeCacheTest.FakeEndpointCache cache = new KeyRangeCacheTest.FakeEndpointCache(); + manager = + new EndpointLifecycleManager( + cache, /* probeIntervalSeconds= */ 60, Duration.ofMinutes(30), Clock.systemUTC()); + + // Finder 1 reports server1. + String finder1 = registerAddresses(manager, "server1"); + // Finder 2 also reports server1. + registerAddresses(manager, "server1"); + + // Finder 1 drops server1, but finder 2 still references it. 
+    manager.updateActiveAddresses(finder1, Collections.emptySet());
+
+    assertTrue(manager.isManaged("server1"));
+    assertEquals(1, manager.managedEndpointCount());
+  }
+
+  @Test
+  public void unregisterFinderEvictsEndpointsNoLongerReferenced() throws Exception {
+    KeyRangeCacheTest.FakeEndpointCache cache = new KeyRangeCacheTest.FakeEndpointCache();
+    manager =
+        new EndpointLifecycleManager(
+            cache, /* probeIntervalSeconds= */ 60, Duration.ofMinutes(30), Clock.systemUTC());
+
+    String finder1 = registerAddresses(manager, "server1");
+    String finder2 = registerAddresses(manager, "server2");
+
+    manager.unregisterFinder(finder1);
+
+    assertFalse(manager.isManaged("server1"));
+    assertTrue(manager.isManaged("server2"));
+    assertEquals(1, manager.managedEndpointCount());
+
+    manager.unregisterFinder(finder2);
+
+    assertEquals(0, manager.managedEndpointCount());
+  }
+
+  /**
+   * Test clock that can be advanced manually.
+   *
+   * <p>The current instant is held in a {@code volatile} field: the test thread moves it with
+   * {@link #advance(Duration)}, and the manager under test is handed this clock and presumably
+   * reads it from its background scheduler threads as well (TODO confirm against
+   * EndpointLifecycleManager). Only one thread is expected to call {@code advance}; the
+   * read-then-write there is not atomic.
+   */
+  private static final class TestClock extends Clock {
+    private volatile Instant now;
+
+    TestClock(Instant now) {
+      this.now = now;
+    }
+
+    /** Moves the clock forward by {@code duration}. */
+    void advance(Duration duration) {
+      now = now.plus(duration);
+    }
+
+    @Override
+    public Instant instant() {
+      return now;
+    }
+
+    @Override
+    public ZoneId getZone() {
+      return ZoneId.of("UTC");
+    }
+
+    /** Returns this same clock; the requested zone is ignored. */
+    @Override
+    public Clock withZone(ZoneId zone) {
+      return this;
+    }
+  }
+}
diff --git a/java-spanner/google-cloud-spanner/src/test/java/com/google/cloud/spanner/spi/v1/GrpcChannelEndpointCacheTest.java b/java-spanner/google-cloud-spanner/src/test/java/com/google/cloud/spanner/spi/v1/GrpcChannelEndpointCacheTest.java
index 56e6d3cfc2bc..74afec18bfc3 100644
--- a/java-spanner/google-cloud-spanner/src/test/java/com/google/cloud/spanner/spi/v1/GrpcChannelEndpointCacheTest.java
+++ b/java-spanner/google-cloud-spanner/src/test/java/com/google/cloud/spanner/spi/v1/GrpcChannelEndpointCacheTest.java
@@ -114,10 +114,13 @@ public void healthReflectsChannelShutdown() throws Exception {
     GrpcChannelEndpointCache cache = new
GrpcChannelEndpointCache(createProvider("localhost:1234")); try { ChannelEndpoint server = cache.get("localhost:1111"); - assertThat(server.isHealthy()).isTrue(); + // Newly created channel is not READY (likely IDLE), so isHealthy is false for location aware. + // isHealthy now requires READY state for location aware routing. + assertThat(server.isHealthy()).isFalse(); server.getChannel().shutdownNow(); assertThat(server.isHealthy()).isFalse(); + assertThat(server.isTransientFailure()).isFalse(); } finally { cache.shutdown(); } diff --git a/java-spanner/google-cloud-spanner/src/test/java/com/google/cloud/spanner/spi/v1/KeyAwareChannelTest.java b/java-spanner/google-cloud-spanner/src/test/java/com/google/cloud/spanner/spi/v1/KeyAwareChannelTest.java index 542d6e27ab6d..fdaeab3914bf 100644 --- a/java-spanner/google-cloud-spanner/src/test/java/com/google/cloud/spanner/spi/v1/KeyAwareChannelTest.java +++ b/java-spanner/google-cloud-spanner/src/test/java/com/google/cloud/spanner/spi/v1/KeyAwareChannelTest.java @@ -197,7 +197,8 @@ public void timeoutOnCommitClearsTransactionAffinity() throws Exception { commitCall.sendMessage( CommitRequest.newBuilder().setSession(SESSION).setTransactionId(transactionId).build()); - assertThat(harness.endpointCache.getCount(DEFAULT_ADDRESS)).isEqualTo(1); + // affinityEndpoint now uses getIfPresent (non-creating), so getCount stays at 0. + assertThat(harness.endpointCache.getCount(DEFAULT_ADDRESS)).isEqualTo(0); @SuppressWarnings("unchecked") RecordingClientCall commitDelegate = @@ -211,7 +212,8 @@ public void timeoutOnCommitClearsTransactionAffinity() throws Exception { rollbackCall.sendMessage( RollbackRequest.newBuilder().setSession(SESSION).setTransactionId(transactionId).build()); - assertThat(harness.endpointCache.getCount(DEFAULT_ADDRESS)).isEqualTo(1); + // Rollback also uses getIfPresent for affinity, so getCount remains 0. 
+ assertThat(harness.endpointCache.getCount(DEFAULT_ADDRESS)).isEqualTo(0); } @Test @@ -1283,6 +1285,16 @@ public ChannelEndpoint get(String address) { return endpoints.computeIfAbsent(address, FakeEndpoint::new); } + @Override + public ChannelEndpoint getIfPresent(String address) { + if (defaultAddress.equals(address)) { + return defaultEndpoint; + } + // Auto-create for integration tests — simulates lifecycle manager having pre-created + // endpoints. + return endpoints.computeIfAbsent(address, FakeEndpoint::new); + } + @Override public void evict(String address) { endpoints.remove(address); @@ -1344,6 +1356,11 @@ public boolean isHealthy() { return true; } + @Override + public boolean isTransientFailure() { + return false; + } + @Override public ManagedChannel getChannel() { return channel; diff --git a/java-spanner/google-cloud-spanner/src/test/java/com/google/cloud/spanner/spi/v1/KeyRangeCacheGoldenTest.java b/java-spanner/google-cloud-spanner/src/test/java/com/google/cloud/spanner/spi/v1/KeyRangeCacheGoldenTest.java index 763a36dbfd64..7fa2874ada5b 100644 --- a/java-spanner/google-cloud-spanner/src/test/java/com/google/cloud/spanner/spi/v1/KeyRangeCacheGoldenTest.java +++ b/java-spanner/google-cloud-spanner/src/test/java/com/google/cloud/spanner/spi/v1/KeyRangeCacheGoldenTest.java @@ -125,6 +125,12 @@ public ChannelEndpoint get(String address) { return endpoints.computeIfAbsent(address, FakeEndpoint::new); } + @Override + public ChannelEndpoint getIfPresent(String address) { + // Auto-create for golden tests — simulates lifecycle manager having pre-created endpoints. 
+ return endpoints.computeIfAbsent(address, FakeEndpoint::new); + } + @Override public void evict(String address) { endpoints.remove(address); @@ -154,6 +160,11 @@ public boolean isHealthy() { return true; } + @Override + public boolean isTransientFailure() { + return false; + } + @Override public ManagedChannel getChannel() { return channel; diff --git a/java-spanner/google-cloud-spanner/src/test/java/com/google/cloud/spanner/spi/v1/KeyRangeCacheTest.java b/java-spanner/google-cloud-spanner/src/test/java/com/google/cloud/spanner/spi/v1/KeyRangeCacheTest.java index 027994a6f2fe..c3b5a0ca16db 100644 --- a/java-spanner/google-cloud-spanner/src/test/java/com/google/cloud/spanner/spi/v1/KeyRangeCacheTest.java +++ b/java-spanner/google-cloud-spanner/src/test/java/com/google/cloud/spanner/spi/v1/KeyRangeCacheTest.java @@ -18,6 +18,8 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; import com.google.protobuf.ByteString; import com.google.spanner.v1.CacheUpdate; @@ -33,6 +35,7 @@ import java.util.HashMap; import java.util.Map; import java.util.concurrent.TimeUnit; +import javax.annotation.Nullable; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; @@ -41,53 +44,33 @@ public class KeyRangeCacheTest { @Test - public void skipsUnhealthyTabletAfterItIsCached() { + public void skipsTransientFailureTabletWithSkippedTablet() { FakeEndpointCache endpointCache = new FakeEndpointCache(); KeyRangeCache cache = new KeyRangeCache(endpointCache); - cache.addRanges( - CacheUpdate.newBuilder() - .addRange( - Range.newBuilder() - .setStartKey(bytes("a")) - .setLimitKey(bytes("z")) - .setGroupUid(5) - .setSplitId(1) - .setGeneration(bytes("1"))) - .addGroup( - Group.newBuilder() - .setGroupUid(5) - .setGeneration(bytes("1")) - .setLeaderIndex(0) - .addTablets( - Tablet.newBuilder() - .setTabletUid(1) - 
.setServerAddress("server1") - .setIncarnation(bytes("1")) - .setDistance(0)) - .addTablets( - Tablet.newBuilder() - .setTabletUid(2) - .setServerAddress("server2") - .setIncarnation(bytes("1")) - .setDistance(0))) - .build()); + cache.addRanges(twoReplicaUpdate()); + + // Pre-create endpoints. + endpointCache.get("server1"); + endpointCache.get("server2"); + // Initial routing works. RoutingHint.Builder initialHint = RoutingHint.newBuilder().setKey(bytes("a")); ChannelEndpoint initialServer = cache.fillRoutingHint( - /* preferLeader= */ false, + false, KeyRangeCache.RangeMode.COVERING_SPLIT, DirectedReadOptions.getDefaultInstance(), initialHint); assertNotNull(initialServer); - endpointCache.setHealthy("server1", false); + // Mark server1 as TRANSIENT_FAILURE. + endpointCache.setState("server1", EndpointHealthState.TRANSIENT_FAILURE); RoutingHint.Builder hint = RoutingHint.newBuilder().setKey(bytes("a")); ChannelEndpoint server = cache.fillRoutingHint( - /* preferLeader= */ false, + false, KeyRangeCache.RangeMode.COVERING_SPLIT, DirectedReadOptions.getDefaultInstance(), hint); @@ -131,6 +114,10 @@ public void skipsExplicitlyExcludedTablet() { .setDistance(0))) .build()); + // Pre-create endpoints so getIfPresent() returns them. + endpointCache.get("server1"); + endpointCache.get("server2"); + RoutingHint.Builder hint = RoutingHint.newBuilder().setKey(bytes("a")); ChannelEndpoint server = cache.fillRoutingHint( @@ -173,6 +160,8 @@ public void shrinkToEvictsRanges() { .setIncarnation(bytes("1")))) .build(); cache.addRanges(update); + // Pre-create endpoint so READY state check passes in shouldSkip. 
+ endpointCache.get("server" + i); } checkContents(cache, numRanges, numRanges); @@ -188,6 +177,425 @@ public void shrinkToEvictsRanges() { checkContents(cache, 0, numRanges); } + @Test + public void readyEndpointIsUsableForLocationAware() { + FakeEndpointCache endpointCache = new FakeEndpointCache(); + KeyRangeCache cache = new KeyRangeCache(endpointCache); + cache.addRanges(singleReplicaUpdate("server1")); + + // Pre-create endpoint so getIfPresent finds it. Default state is READY. + endpointCache.get("server1"); + + RoutingHint.Builder hint = RoutingHint.newBuilder().setKey(bytes("a")); + ChannelEndpoint server = + cache.fillRoutingHint( + false, + KeyRangeCache.RangeMode.COVERING_SPLIT, + DirectedReadOptions.getDefaultInstance(), + hint); + + assertNotNull(server); + assertEquals("server1", server.getAddress()); + assertEquals(0, hint.getSkippedTabletUidCount()); + } + + @Test + public void idleEndpointIsNotUsableForLocationAware() { + FakeEndpointCache endpointCache = new FakeEndpointCache(); + KeyRangeCache cache = new KeyRangeCache(endpointCache); + cache.addRanges(singleReplicaUpdate("server1")); + + // Ensure endpoint exists in cache first. + endpointCache.get("server1"); + endpointCache.setState("server1", EndpointHealthState.IDLE); + + RoutingHint.Builder hint = RoutingHint.newBuilder().setKey(bytes("a")); + ChannelEndpoint server = + cache.fillRoutingHint( + false, + KeyRangeCache.RangeMode.COVERING_SPLIT, + DirectedReadOptions.getDefaultInstance(), + hint); + + // IDLE causes silent skip — falls back to null (default host), no skipped_tablets. 
+ assertNull(server); + assertEquals(0, hint.getSkippedTabletUidCount()); + } + + @Test + public void connectingEndpointIsNotUsableForLocationAware() { + FakeEndpointCache endpointCache = new FakeEndpointCache(); + KeyRangeCache cache = new KeyRangeCache(endpointCache); + cache.addRanges(singleReplicaUpdate("server1")); + + endpointCache.get("server1"); + endpointCache.setState("server1", EndpointHealthState.CONNECTING); + + RoutingHint.Builder hint = RoutingHint.newBuilder().setKey(bytes("a")); + ChannelEndpoint server = + cache.fillRoutingHint( + false, + KeyRangeCache.RangeMode.COVERING_SPLIT, + DirectedReadOptions.getDefaultInstance(), + hint); + + assertNull(server); + assertEquals(0, hint.getSkippedTabletUidCount()); + } + + @Test + public void transientFailureEndpointIsNotUsable() { + FakeEndpointCache endpointCache = new FakeEndpointCache(); + KeyRangeCache cache = new KeyRangeCache(endpointCache); + cache.addRanges(singleReplicaUpdate("server1")); + + endpointCache.get("server1"); + endpointCache.setState("server1", EndpointHealthState.TRANSIENT_FAILURE); + + RoutingHint.Builder hint = RoutingHint.newBuilder().setKey(bytes("a")); + ChannelEndpoint server = + cache.fillRoutingHint( + false, + KeyRangeCache.RangeMode.COVERING_SPLIT, + DirectedReadOptions.getDefaultInstance(), + hint); + + // TRANSIENT_FAILURE: skip with skipped_tablets. 
+ assertNull(server); + assertEquals(1, hint.getSkippedTabletUidCount()); + assertEquals(1L, hint.getSkippedTabletUid(0).getTabletUid()); + } + + @Test + public void unsupportedGetStateTreatedAsNotReady() { + FakeEndpointCache endpointCache = new FakeEndpointCache(); + KeyRangeCache cache = new KeyRangeCache(endpointCache); + cache.addRanges(singleReplicaUpdate("server1")); + + endpointCache.get("server1"); + endpointCache.setState("server1", EndpointHealthState.UNSUPPORTED); + + RoutingHint.Builder hint = RoutingHint.newBuilder().setKey(bytes("a")); + ChannelEndpoint server = + cache.fillRoutingHint( + false, + KeyRangeCache.RangeMode.COVERING_SPLIT, + DirectedReadOptions.getDefaultInstance(), + hint); + + // Unsupported state: skip silently, no skipped_tablets. + assertNull(server); + assertEquals(0, hint.getSkippedTabletUidCount()); + } + + @Test + public void missingEndpointCausesDefaultHostFallbackWithoutSkippedTablet() { + // Endpoint not in cache at all — getIfPresent returns null. 
+ FakeEndpointCache endpointCache = new FakeEndpointCache(); + endpointCache.setCreateOnGet(false); + KeyRangeCache cache = new KeyRangeCache(endpointCache); + cache.addRanges(singleReplicaUpdate("server1")); + + RoutingHint.Builder hint = RoutingHint.newBuilder().setKey(bytes("a")); + ChannelEndpoint server = + cache.fillRoutingHint( + false, + KeyRangeCache.RangeMode.COVERING_SPLIT, + DirectedReadOptions.getDefaultInstance(), + hint); + + assertNull(server); + assertEquals(0, hint.getSkippedTabletUidCount()); + } + + @Test + public void idleEndpointCausesDefaultHostFallbackWithoutSkippedTablet() { + FakeEndpointCache endpointCache = new FakeEndpointCache(); + KeyRangeCache cache = new KeyRangeCache(endpointCache); + cache.addRanges(singleReplicaUpdate("server1")); + + endpointCache.get("server1"); + endpointCache.setState("server1", EndpointHealthState.IDLE); + + RoutingHint.Builder hint = RoutingHint.newBuilder().setKey(bytes("a")); + ChannelEndpoint server = + cache.fillRoutingHint( + false, + KeyRangeCache.RangeMode.COVERING_SPLIT, + DirectedReadOptions.getDefaultInstance(), + hint); + + assertNull(server); + assertEquals(0, hint.getSkippedTabletUidCount()); + } + + @Test + public void connectingEndpointCausesDefaultHostFallbackWithoutSkippedTablet() { + FakeEndpointCache endpointCache = new FakeEndpointCache(); + KeyRangeCache cache = new KeyRangeCache(endpointCache); + cache.addRanges(singleReplicaUpdate("server1")); + + endpointCache.get("server1"); + endpointCache.setState("server1", EndpointHealthState.CONNECTING); + + RoutingHint.Builder hint = RoutingHint.newBuilder().setKey(bytes("a")); + ChannelEndpoint server = + cache.fillRoutingHint( + false, + KeyRangeCache.RangeMode.COVERING_SPLIT, + DirectedReadOptions.getDefaultInstance(), + hint); + + assertNull(server); + assertEquals(0, hint.getSkippedTabletUidCount()); + } + + @Test + public void transientFailureEndpointCausesSkippedTabletPlusDefaultHostFallback() { + FakeEndpointCache endpointCache = new 
FakeEndpointCache(); + KeyRangeCache cache = new KeyRangeCache(endpointCache); + cache.addRanges(singleReplicaUpdate("server1")); + + endpointCache.get("server1"); + endpointCache.setState("server1", EndpointHealthState.TRANSIENT_FAILURE); + + RoutingHint.Builder hint = RoutingHint.newBuilder().setKey(bytes("a")); + ChannelEndpoint server = + cache.fillRoutingHint( + false, + KeyRangeCache.RangeMode.COVERING_SPLIT, + DirectedReadOptions.getDefaultInstance(), + hint); + + assertNull(server); + assertEquals(1, hint.getSkippedTabletUidCount()); + assertEquals(1L, hint.getSkippedTabletUid(0).getTabletUid()); + } + + @Test + public void oneUnusableReplicaAndOneReadyReplicaUsesReady() { + FakeEndpointCache endpointCache = new FakeEndpointCache(); + KeyRangeCache cache = new KeyRangeCache(endpointCache); + cache.addRanges(twoReplicaUpdate()); + + // Make both endpoints present. + endpointCache.get("server1"); + endpointCache.get("server2"); + + // server1 is IDLE (not ready), server2 is READY. + endpointCache.setState("server1", EndpointHealthState.IDLE); + endpointCache.setState("server2", EndpointHealthState.READY); + + RoutingHint.Builder hint = RoutingHint.newBuilder().setKey(bytes("a")); + ChannelEndpoint server = + cache.fillRoutingHint( + false, + KeyRangeCache.RangeMode.COVERING_SPLIT, + DirectedReadOptions.getDefaultInstance(), + hint); + + assertNotNull(server); + assertEquals("server2", server.getAddress()); + // server1 was IDLE, so no skipped_tablets for it. 
+ assertEquals(0, hint.getSkippedTabletUidCount()); + } + + @Test + public void readyEndpointIsUsedForLocationAware() { + FakeEndpointCache endpointCache = new FakeEndpointCache(); + KeyRangeCache cache = new KeyRangeCache(endpointCache); + cache.addRanges(singleReplicaUpdate("server1")); + + endpointCache.get("server1"); + endpointCache.setState("server1", EndpointHealthState.READY); + + RoutingHint.Builder hint = RoutingHint.newBuilder().setKey(bytes("a")); + ChannelEndpoint server = + cache.fillRoutingHint( + false, + KeyRangeCache.RangeMode.COVERING_SPLIT, + DirectedReadOptions.getDefaultInstance(), + hint); + + assertNotNull(server); + assertEquals("server1", server.getAddress()); + assertEquals(0, hint.getSkippedTabletUidCount()); + } + + @Test + public void transientFailureReplicaSkippedAndReadyReplicaSelected() { + FakeEndpointCache endpointCache = new FakeEndpointCache(); + KeyRangeCache cache = new KeyRangeCache(endpointCache); + cache.addRanges(twoReplicaUpdate()); + + endpointCache.get("server1"); + endpointCache.get("server2"); + + endpointCache.setState("server1", EndpointHealthState.TRANSIENT_FAILURE); + endpointCache.setState("server2", EndpointHealthState.READY); + + RoutingHint.Builder hint = RoutingHint.newBuilder().setKey(bytes("a")); + ChannelEndpoint server = + cache.fillRoutingHint( + false, + KeyRangeCache.RangeMode.COVERING_SPLIT, + DirectedReadOptions.getDefaultInstance(), + hint); + + assertNotNull(server); + assertEquals("server2", server.getAddress()); + // server1 was TRANSIENT_FAILURE, so it should be in skipped_tablets. + assertEquals(1, hint.getSkippedTabletUidCount()); + assertEquals(1L, hint.getSkippedTabletUid(0).getTabletUid()); + } + + // --- Eviction and recreation tests --- + + @Test + public void staleShutdownEndpointIsClearedAndRelookedUp() { + // Bug 1 regression: after idle eviction shuts down a channel, the tablet's cached + // endpoint reference becomes stale. 
shouldSkip must detect the shutdown channel, + // discard it, and re-lookup from the cache. + FakeEndpointCache endpointCache = new FakeEndpointCache(); + KeyRangeCache cache = new KeyRangeCache(endpointCache); + cache.addRanges(singleReplicaUpdate("server1")); + + // Route once so the tablet caches the endpoint reference. + endpointCache.get("server1"); + RoutingHint.Builder hint1 = RoutingHint.newBuilder().setKey(bytes("a")); + ChannelEndpoint first = + cache.fillRoutingHint( + false, + KeyRangeCache.RangeMode.COVERING_SPLIT, + DirectedReadOptions.getDefaultInstance(), + hint1); + assertNotNull(first); + assertEquals("server1", first.getAddress()); + + // Simulate idle eviction: shut down the channel and evict from cache. + first.getChannel().shutdownNow(); + endpointCache.evict("server1"); + + // Without the fix, the tablet would keep using the stale shutdown endpoint forever. + // With the fix, shouldSkip detects the shutdown, clears it, and re-lookups from cache. + + // Re-create the endpoint (simulating lifecycle manager recreation). + endpointCache.get("server1"); + endpointCache.setState("server1", EndpointHealthState.READY); + + RoutingHint.Builder hint2 = RoutingHint.newBuilder().setKey(bytes("a")); + ChannelEndpoint second = + cache.fillRoutingHint( + false, + KeyRangeCache.RangeMode.COVERING_SPLIT, + DirectedReadOptions.getDefaultInstance(), + hint2); + + // Should find the new READY endpoint. + assertNotNull(second); + assertEquals("server1", second.getAddress()); + assertEquals(0, hint2.getSkippedTabletUidCount()); + } + + @Test + public void missingEndpointTriggersRecreationViaLifecycleManager() { + // Bug 2 regression: when a routing lookup finds no endpoint, it should call + // requestEndpointRecreation so the endpoint becomes available for future requests. 
+ FakeEndpointCache endpointCache = new FakeEndpointCache(); + TrackingLifecycleManager tracking = new TrackingLifecycleManager(endpointCache); + try { + KeyRangeCache cache = new KeyRangeCache(endpointCache, tracking); + cache.addRanges(singleReplicaUpdate("server1")); + + // No endpoint exists in cache. + RoutingHint.Builder hint = RoutingHint.newBuilder().setKey(bytes("a")); + ChannelEndpoint server = + cache.fillRoutingHint( + false, + KeyRangeCache.RangeMode.COVERING_SPLIT, + DirectedReadOptions.getDefaultInstance(), + hint); + + // Should fall back to default. + assertNull(server); + + // Lifecycle manager should have been asked to recreate the endpoint. + assertTrue( + "requestEndpointRecreation should have been called for server1", + tracking.recreationRequested.contains("server1")); + } finally { + tracking.shutdown(); + } + } + + /** Minimal lifecycle manager stub that records recreation requests. */ + private static final class TrackingLifecycleManager extends EndpointLifecycleManager { + final java.util.Set recreationRequested = new java.util.HashSet<>(); + + TrackingLifecycleManager(ChannelEndpointCache cache) { + super(cache); + } + + @Override + void requestEndpointRecreation(String address) { + recreationRequested.add(address); + } + } + + // --- Helper methods --- + + private static CacheUpdate singleReplicaUpdate(String serverAddress) { + return CacheUpdate.newBuilder() + .addRange( + Range.newBuilder() + .setStartKey(bytes("a")) + .setLimitKey(bytes("z")) + .setGroupUid(5) + .setSplitId(1) + .setGeneration(bytes("1"))) + .addGroup( + Group.newBuilder() + .setGroupUid(5) + .setGeneration(bytes("1")) + .setLeaderIndex(0) + .addTablets( + Tablet.newBuilder() + .setTabletUid(1) + .setServerAddress(serverAddress) + .setIncarnation(bytes("1")) + .setDistance(0))) + .build(); + } + + private static CacheUpdate twoReplicaUpdate() { + return CacheUpdate.newBuilder() + .addRange( + Range.newBuilder() + .setStartKey(bytes("a")) + .setLimitKey(bytes("z")) + 
.setGroupUid(5) + .setSplitId(1) + .setGeneration(bytes("1"))) + .addGroup( + Group.newBuilder() + .setGroupUid(5) + .setGeneration(bytes("1")) + .setLeaderIndex(0) + .addTablets( + Tablet.newBuilder() + .setTabletUid(1) + .setServerAddress("server1") + .setIncarnation(bytes("1")) + .setDistance(0)) + .addTablets( + Tablet.newBuilder() + .setTabletUid(2) + .setServerAddress("server2") + .setIncarnation(bytes("1")) + .setDistance(0))) + .build(); + } + private static void checkContents(KeyRangeCache cache, int expectedSize, int mustBeInCache) { assertEquals(expectedSize, cache.size()); int hitCount = 0; @@ -195,7 +603,7 @@ private static void checkContents(KeyRangeCache cache, int expectedSize, int mus RoutingHint.Builder hint = RoutingHint.newBuilder().setKey(bytes(String.format("%04d", i))); ChannelEndpoint server = cache.fillRoutingHint( - /* preferLeader= */ false, + false, KeyRangeCache.RangeMode.COVERING_SPLIT, DirectedReadOptions.getDefaultInstance(), hint); @@ -214,9 +622,23 @@ private static ByteString bytes(String value) { return ByteString.copyFromUtf8(value); } - private static final class FakeEndpointCache implements ChannelEndpointCache { + // --- Health state for testing --- + + enum EndpointHealthState { + READY, + IDLE, + CONNECTING, + TRANSIENT_FAILURE, + SHUTDOWN, + UNSUPPORTED + } + + // --- Test doubles --- + + static final class FakeEndpointCache implements ChannelEndpointCache { private final Map endpoints = new HashMap<>(); private final FakeEndpoint defaultEndpoint = new FakeEndpoint("default"); + private boolean createOnGet = true; @Override public ChannelEndpoint defaultChannel() { @@ -228,6 +650,12 @@ public ChannelEndpoint get(String address) { return endpoints.computeIfAbsent(address, FakeEndpoint::new); } + @Override + @Nullable + public ChannelEndpoint getIfPresent(String address) { + return endpoints.get(address); + } + @Override public void evict(String address) { endpoints.remove(address); @@ -238,18 +666,28 @@ public void 
shutdown() { endpoints.clear(); } - void setHealthy(String address, boolean healthy) { + void setCreateOnGet(boolean createOnGet) { + this.createOnGet = createOnGet; + } + + void setState(String address, EndpointHealthState state) { FakeEndpoint endpoint = endpoints.get(address); if (endpoint != null) { - endpoint.setHealthy(healthy); + endpoint.setState(state); } } + + @Deprecated + void setHealthy(String address, boolean healthy) { + setState( + address, healthy ? EndpointHealthState.READY : EndpointHealthState.TRANSIENT_FAILURE); + } } - private static final class FakeEndpoint implements ChannelEndpoint { + static final class FakeEndpoint implements ChannelEndpoint { private final String address; private final ManagedChannel channel = new FakeManagedChannel(); - private boolean healthy = true; + private EndpointHealthState state = EndpointHealthState.READY; FakeEndpoint(String address) { this.address = address; @@ -262,7 +700,12 @@ public String getAddress() { @Override public boolean isHealthy() { - return healthy; + return state == EndpointHealthState.READY; + } + + @Override + public boolean isTransientFailure() { + return state == EndpointHealthState.TRANSIENT_FAILURE; } @Override @@ -270,8 +713,8 @@ public ManagedChannel getChannel() { return channel; } - void setHealthy(boolean healthy) { - this.healthy = healthy; + void setState(EndpointHealthState state) { + this.state = state; } }