Skip to content

Commit

Permalink
fix: directpath enabled attribute (#3477)
Browse files Browse the repository at this point in the history
* chore: add directpath_enabled attribute

* Created new grpc wrapper

* Created new grpc wrapper

* review comments
  • Loading branch information
surbhigarg92 authored Nov 20, 2024
1 parent bf350b0 commit ea1ebad
Show file tree
Hide file tree
Showing 6 changed files with 93 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import static com.google.cloud.spanner.BuiltInMetricsConstant.CLIENT_HASH_KEY;
import static com.google.cloud.spanner.BuiltInMetricsConstant.CLIENT_NAME_KEY;
import static com.google.cloud.spanner.BuiltInMetricsConstant.CLIENT_UID_KEY;
import static com.google.cloud.spanner.BuiltInMetricsConstant.DIRECT_PATH_ENABLED_KEY;
import static com.google.cloud.spanner.BuiltInMetricsConstant.INSTANCE_CONFIG_ID_KEY;
import static com.google.cloud.spanner.BuiltInMetricsConstant.LOCATION_ID_KEY;
import static com.google.cloud.spanner.BuiltInMetricsConstant.PROJECT_ID_KEY;
Expand Down Expand Up @@ -83,8 +82,6 @@ Map<String, String> createClientAttributes(String projectId, String client_name)
Map<String, String> clientAttributes = new HashMap<>();
clientAttributes.put(LOCATION_ID_KEY.getKey(), detectClientLocation());
clientAttributes.put(PROJECT_ID_KEY.getKey(), projectId);
// TODO: Replace this with real value.
clientAttributes.put(DIRECT_PATH_ENABLED_KEY.getKey(), "false");
clientAttributes.put(INSTANCE_CONFIG_ID_KEY.getKey(), "unknown");
clientAttributes.put(CLIENT_NAME_KEY.getKey(), client_name);
String clientUid = getDefaultTaskValue();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import com.google.api.gax.grpc.GrpcCallContext;
import com.google.api.gax.grpc.GrpcCallSettings;
import com.google.api.gax.grpc.GrpcStubCallableFactory;
import com.google.api.gax.grpc.GrpcTransportChannel;
import com.google.api.gax.grpc.InstantiatingGrpcChannelProvider;
import com.google.api.gax.longrunning.OperationFuture;
import com.google.api.gax.retrying.ResultRetryAlgorithm;
Expand Down Expand Up @@ -78,13 +79,14 @@
import com.google.cloud.spanner.admin.instance.v1.stub.InstanceAdminStub;
import com.google.cloud.spanner.admin.instance.v1.stub.InstanceAdminStubSettings;
import com.google.cloud.spanner.encryption.EncryptionConfigProtoMapper;
import com.google.cloud.spanner.v1.stub.GrpcSpannerStub;
import com.google.cloud.spanner.v1.stub.SpannerStub;
import com.google.cloud.spanner.v1.stub.SpannerStubSettings;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Function;
import com.google.common.base.MoreObjects;
import com.google.common.base.Preconditions;
import com.google.common.base.Supplier;
import com.google.common.base.Suppliers;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.io.Resources;
Expand Down Expand Up @@ -276,6 +278,8 @@ public class GapicSpannerRpc implements SpannerRpc {
private final int numChannels;
private final boolean isGrpcGcpExtensionEnabled;

private Supplier<Boolean> directPathEnabledSupplier = () -> false;

public static GapicSpannerRpc create(SpannerOptions options) {
return new GapicSpannerRpc(options);
}
Expand Down Expand Up @@ -351,7 +355,9 @@ public GapicSpannerRpc(final SpannerOptions options) {
SpannerInterceptorProvider.create(
MoreObjects.firstNonNull(
options.getInterceptorProvider(),
SpannerInterceptorProvider.createDefault(options.getOpenTelemetry())))
SpannerInterceptorProvider.createDefault(
options.getOpenTelemetry(),
(() -> directPathEnabledSupplier.get()))))
// This sets the trace context headers.
.withTraceContext(endToEndTracingEnabled, options.getOpenTelemetry())
// This sets the response compressor (Server -> Client).
Expand Down Expand Up @@ -396,18 +402,27 @@ public GapicSpannerRpc(final SpannerOptions options) {
final String emulatorHost = System.getenv("SPANNER_EMULATOR_HOST");

try {
SpannerStubSettings spannerStubSettings =
options
.getSpannerStubSettings()
.toBuilder()
.setTransportChannelProvider(channelProvider)
.setCredentialsProvider(credentialsProvider)
.setStreamWatchdogProvider(watchdogProvider)
.setTracerFactory(
options.getApiTracerFactory(
/* isAdminClient = */ false, isEmulatorEnabled(options, emulatorHost)))
.build();
ClientContext clientContext = ClientContext.create(spannerStubSettings);
this.spannerStub =
GrpcSpannerStub.create(
options
.getSpannerStubSettings()
.toBuilder()
.setTransportChannelProvider(channelProvider)
.setCredentialsProvider(credentialsProvider)
.setStreamWatchdogProvider(watchdogProvider)
.setTracerFactory(
options.getApiTracerFactory(
/* isAdminClient = */ false, isEmulatorEnabled(options, emulatorHost)))
.build());
GrpcSpannerStubWithStubSettingsAndClientContext.create(
spannerStubSettings, clientContext);
this.directPathEnabledSupplier =
Suppliers.memoize(
() -> {
return ((GrpcTransportChannel) clientContext.getTransportChannel()).isDirectPath()
&& isAttemptDirectPathXds;
});
this.readRetrySettings =
options.getSpannerStubSettings().streamingReadSettings().getRetrySettings();
this.readRetryableCodes =
Expand Down Expand Up @@ -455,7 +470,8 @@ public GapicSpannerRpc(final SpannerOptions options) {
.getStreamWatchdogProvider()
.withCheckInterval(pdmlSettings.getStreamWatchdogCheckInterval()));
}
this.partitionedDmlStub = GrpcSpannerStub.create(pdmlSettings.build());
this.partitionedDmlStub =
GrpcSpannerStubWithStubSettingsAndClientContext.create(pdmlSettings.build());
this.instanceAdminStubSettings =
options
.getInstanceAdminStubSettings()
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Copyright 2024 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.google.cloud.spanner.spi.v1;

import com.google.api.gax.rpc.ClientContext;
import com.google.cloud.spanner.v1.stub.GrpcSpannerStub;
import com.google.cloud.spanner.v1.stub.SpannerStubSettings;
import java.io.IOException;

/**
* Wrapper around {@link GrpcSpannerStub} to make the constructor available inside this package.
* This makes it possible to create a {@link GrpcSpannerStub} with a {@link SpannerStubSettings} and
* a {@link ClientContext}.
*/
class GrpcSpannerStubWithStubSettingsAndClientContext extends GrpcSpannerStub {

static final GrpcSpannerStubWithStubSettingsAndClientContext create(
SpannerStubSettings settings, ClientContext clientContext) throws IOException {
return new GrpcSpannerStubWithStubSettingsAndClientContext(settings, clientContext);
}

protected GrpcSpannerStubWithStubSettingsAndClientContext(
SpannerStubSettings settings, ClientContext clientContext) throws IOException {
super(settings, clientContext);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import com.google.cloud.spanner.CompositeTracer;
import com.google.cloud.spanner.SpannerExceptionFactory;
import com.google.cloud.spanner.SpannerRpcMetrics;
import com.google.common.base.Supplier;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.spanner.admin.database.v1.DatabaseName;
Expand Down Expand Up @@ -93,8 +94,12 @@ class HeaderInterceptor implements ClientInterceptor {
private static final Level LEVEL = Level.INFO;
private final SpannerRpcMetrics spannerRpcMetrics;

HeaderInterceptor(SpannerRpcMetrics spannerRpcMetrics) {
private final Supplier<Boolean> directPathEnabledSupplier;

HeaderInterceptor(
SpannerRpcMetrics spannerRpcMetrics, Supplier<Boolean> directPathEnabledSupplier) {
this.spannerRpcMetrics = spannerRpcMetrics;
this.directPathEnabledSupplier = directPathEnabledSupplier;
}

@Override
Expand Down Expand Up @@ -228,6 +233,9 @@ private Map<String, String> getBuiltInMetricAttributes(String key, DatabaseName
attributes.put(BuiltInMetricsConstant.DATABASE_KEY.getKey(), databaseName.getDatabase());
attributes.put(
BuiltInMetricsConstant.INSTANCE_ID_KEY.getKey(), databaseName.getInstance());
attributes.put(
BuiltInMetricsConstant.DIRECT_PATH_ENABLED_KEY.getKey(),
String.valueOf(this.directPathEnabledSupplier.get()));
return attributes;
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
import com.google.api.core.ObsoleteApi;
import com.google.api.gax.grpc.GrpcInterceptorProvider;
import com.google.cloud.spanner.SpannerRpcMetrics;
import com.google.common.base.Supplier;
import com.google.common.base.Suppliers;
import com.google.common.collect.ImmutableList;
import io.grpc.ClientInterceptor;
import io.opentelemetry.api.GlobalOpenTelemetry;
Expand Down Expand Up @@ -46,11 +48,22 @@ public static SpannerInterceptorProvider createDefault() {
}

public static SpannerInterceptorProvider createDefault(OpenTelemetry openTelemetry) {
return createDefault(
openTelemetry,
Suppliers.memoize(
() -> {
return false;
}));
}

public static SpannerInterceptorProvider createDefault(
OpenTelemetry openTelemetry, Supplier<Boolean> directPathEnabledSupplier) {
List<ClientInterceptor> defaultInterceptorList = new ArrayList<>();
defaultInterceptorList.add(new SpannerErrorInterceptor());
defaultInterceptorList.add(
new LoggingInterceptor(Logger.getLogger(GapicSpannerRpc.class.getName()), Level.FINER));
defaultInterceptorList.add(new HeaderInterceptor(new SpannerRpcMetrics(openTelemetry)));
defaultInterceptorList.add(
new HeaderInterceptor(new SpannerRpcMetrics(openTelemetry), directPathEnabledSupplier));
return new SpannerInterceptorProvider(ImmutableList.copyOf(defaultInterceptorList));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,6 @@ public static void setup() {
Attributes.builder()
.put(BuiltInMetricsConstant.PROJECT_ID_KEY, "test-project")
.put(BuiltInMetricsConstant.INSTANCE_CONFIG_ID_KEY, "unknown")
.put(BuiltInMetricsConstant.DIRECT_PATH_ENABLED_KEY, "false")
.put(
BuiltInMetricsConstant.LOCATION_ID_KEY,
BuiltInOpenTelemetryMetricsProvider.detectClientLocation())
Expand Down

0 comments on commit ea1ebad

Please sign in to comment.