Skip to content

Commit

Permalink
feat(spanner): implement custom tracer_provider injection for opentel…
Browse files Browse the repository at this point in the history
…emetry traces (#1229)

* all: implement custom tracer_provider injection

An important feature for observability is to allow the injection
of a custom tracer_provider instead of always using the global
tracer_provider by sending in

    observability_options=dict(
        tracer_provider=tracer_provider,
        enable_extended_tracing=True,
    )

* Address review feedback by attaching observability_options to Client only

* Attach observability_options directly before trace_call

* More reverts for formatting

* Plumb observability_options into _restart_on_unavailable

* completely decouple observability_options from session

* apply SPANNER_ENABLE_EXTENDED_TRACING but in inverse due to compatibility

* Document SPANNER_ENABLE_EXTENDED_TRACING in environment

* Revert a vestige of mock

* tests: add unit test for propagating TracerProvider

* Add preliminary end-to-end test to check for injection of observability_options

* Document default enable_extended_tracing value

* Carve out observability_options test

* Ensure that observability_options test sets up and deletes database

* Ensure instance.create() is invoked in system tests

* Use getattr for mock _Client

* Update with code review suggestions

* Deal with mock.Mock false positives failing tests

* Address review feedback
  • Loading branch information
odeke-em authored Nov 15, 2024
1 parent 3079bdd commit 6869ed6
Show file tree
Hide file tree
Showing 10 changed files with 339 additions and 27 deletions.
25 changes: 22 additions & 3 deletions docs/opentelemetry-tracing.rst
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,21 @@ We also need to tell OpenTelemetry which exporter to use. To export Spanner trac
# Create and export one trace every 1000 requests
sampler = TraceIdRatioBased(1/1000)
# Use the default tracer provider
trace.set_tracer_provider(TracerProvider(sampler=sampler))
trace.get_tracer_provider().add_span_processor(
tracer_provider = TracerProvider(sampler=sampler)
tracer_provider.add_span_processor(
# Initialize the cloud tracing exporter
BatchSpanProcessor(CloudTraceSpanExporter())
)
observability_options = dict(
tracer_provider=tracer_provider,
# enable_extended_tracing defaults to True to preserve
# legacy behavior and avoid breaking changes. Set it to
# False here, or set the environment variable
# SPANNER_ENABLE_EXTENDED_TRACING=false to disable it globally.
enable_extended_tracing=False,
)
spanner_client = spanner.Client(project_id, observability_options=observability_options)
To get more fine-grained traces from gRPC, you can enable the gRPC instrumentation by the following
Expand All @@ -52,3 +61,13 @@ Generated spanner traces should now be available on `Cloud Trace <https://consol

Tracing is most effective when many libraries are instrumented to provide insight over the entire lifespan of a request.
For a list of libraries that can be instrumented, see the `OpenTelemetry Integrations` section of the `OpenTelemetry Python docs <https://opentelemetry-python.readthedocs.io/en/stable/>`_

Annotating spans with SQL
~~~~~~~~~~~~~~~~~~~~~~~~~

By default, your spans are annotated with SQL statements where appropriate, but this can leak PII (Personally Identifiable Information).
To stay compatible with legacy behavior, this annotation cannot simply be turned off by default. However, you can control it by setting the environment variable

SPANNER_ENABLE_EXTENDED_TRACING=false

to turn it off globally. To turn it off for an individual client instead, pass ``observability_options=dict(enable_extended_tracing=False)`` when creating the ``spanner.Client``.
11 changes: 7 additions & 4 deletions examples/trace.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,15 +32,18 @@ def main():
tracer_provider = TracerProvider(sampler=ALWAYS_ON)
trace_exporter = CloudTraceSpanExporter(project_id=project_id)
tracer_provider.add_span_processor(BatchSpanProcessor(trace_exporter))
trace.set_tracer_provider(tracer_provider)
# Retrieve a tracer from the global tracer provider.
tracer = tracer_provider.get_tracer('MyApp')

# Setup the Cloud Spanner Client.
spanner_client = spanner.Client(project_id)
spanner_client = spanner.Client(
project_id,
observability_options=dict(tracer_provider=tracer_provider, enable_extended_tracing=True),
)
instance = spanner_client.instance('test-instance')
database = instance.database('test-db')

# Retrieve a tracer from our custom tracer provider.
tracer = tracer_provider.get_tracer('MyApp')

# Now run our queries
with tracer.start_as_current_span('QueryInformationSchema'):
with database.snapshot() as snapshot:
Expand Down
27 changes: 25 additions & 2 deletions google/cloud/spanner_v1/_opentelemetry_tracing.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
"""Manages OpenTelemetry trace creation and handling"""

from contextlib import contextmanager
import os

from google.cloud.spanner_v1 import SpannerClient
from google.cloud.spanner_v1 import gapic_version
Expand All @@ -33,6 +34,9 @@

TRACER_NAME = "cloud.google.com/python/spanner"
TRACER_VERSION = gapic_version.__version__
extended_tracing_globally_disabled = (
os.getenv("SPANNER_ENABLE_EXTENDED_TRACING", "").lower() == "false"
)


def get_tracer(tracer_provider=None):
Expand All @@ -51,13 +55,26 @@ def get_tracer(tracer_provider=None):


@contextmanager
def trace_call(name, session, extra_attributes=None):
def trace_call(name, session, extra_attributes=None, observability_options=None):
if not HAS_OPENTELEMETRY_INSTALLED or not session:
# Empty context manager. Users will have to check if the generated value is None or a span
yield None
return

tracer = get_tracer()
tracer_provider = None

# enable_extended_tracing defaults to True in order to preserve
# legacy behavior and minimize breaking changes.
enable_extended_tracing = True

if isinstance(observability_options, dict): # Avoid false positives with mock.Mock
tracer_provider = observability_options.get("tracer_provider", None)
enable_extended_tracing = observability_options.get(
"enable_extended_tracing", enable_extended_tracing
)

tracer = get_tracer(tracer_provider)

# Set base attributes that we know for every trace created
attributes = {
Expand All @@ -72,6 +89,12 @@ def trace_call(name, session, extra_attributes=None):
if extra_attributes:
attributes.update(extra_attributes)

if extended_tracing_globally_disabled:
enable_extended_tracing = False

if not enable_extended_tracing:
attributes.pop("db.statement", False)

with tracer.start_as_current_span(
name, kind=trace.SpanKind.CLIENT, attributes=attributes
) as span:
Expand Down
16 changes: 14 additions & 2 deletions google/cloud/spanner_v1/batch.py
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,13 @@ def commit(
max_commit_delay=max_commit_delay,
request_options=request_options,
)
with trace_call("CloudSpanner.Commit", self._session, trace_attributes):
observability_options = getattr(database, "observability_options", None)
with trace_call(
"CloudSpanner.Commit",
self._session,
trace_attributes,
observability_options=observability_options,
):
method = functools.partial(
api.commit,
request=request,
Expand Down Expand Up @@ -318,7 +324,13 @@ def batch_write(self, request_options=None, exclude_txn_from_change_streams=Fals
request_options=request_options,
exclude_txn_from_change_streams=exclude_txn_from_change_streams,
)
with trace_call("CloudSpanner.BatchWrite", self._session, trace_attributes):
observability_options = getattr(database, "observability_options", None)
with trace_call(
"CloudSpanner.BatchWrite",
self._session,
trace_attributes,
observability_options=observability_options,
):
method = functools.partial(
api.batch_write,
request=request,
Expand Down
21 changes: 21 additions & 0 deletions google/cloud/spanner_v1/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,16 @@ class Client(ClientWithProject):
for all ReadRequests and ExecuteSqlRequests that indicates which replicas
or regions should be used for non-transactional reads or queries.
:type observability_options: dict (str -> any) or None
:param observability_options: (Optional) the configuration to control
the tracer's behavior. Recognized keys:
``tracer_provider``: the tracer provider to use instead of the global one.
``enable_extended_tracing``: (bool) when ``True``, spans that issue SQL
statements are annotated with the SQL text. Defaults to ``True``; set it
to ``False`` to turn it off for this client, or set the environment
variable ``SPANNER_ENABLE_EXTENDED_TRACING=false`` to turn it off
globally.
:raises: :class:`ValueError <exceptions.ValueError>` if both ``read_only``
and ``admin`` are :data:`True`
"""
Expand All @@ -146,6 +156,7 @@ def __init__(
query_options=None,
route_to_leader_enabled=True,
directed_read_options=None,
observability_options=None,
):
self._emulator_host = _get_spanner_emulator_host()

Expand Down Expand Up @@ -187,6 +198,7 @@ def __init__(

self._route_to_leader_enabled = route_to_leader_enabled
self._directed_read_options = directed_read_options
self._observability_options = observability_options

@property
def credentials(self):
Expand Down Expand Up @@ -268,6 +280,15 @@ def route_to_leader_enabled(self):
"""
return self._route_to_leader_enabled

@property
def observability_options(self):
"""Getter for observability_options.
:rtype: dict or None
:returns: The observability_options passed to the constructor, or None if none were provided.
"""
return self._observability_options

@property
def directed_read_options(self):
"""Getter for directed_read_options.
Expand Down
12 changes: 12 additions & 0 deletions google/cloud/spanner_v1/database.py
Original file line number Diff line number Diff line change
Expand Up @@ -718,6 +718,7 @@ def execute_pdml():
method=method,
request=request,
transaction_selector=txn_selector,
observability_options=self.observability_options,
)

result_set = StreamedResultSet(iterator)
Expand Down Expand Up @@ -1106,6 +1107,17 @@ def set_iam_policy(self, policy):
response = api.set_iam_policy(request=request, metadata=metadata)
return response

@property
def observability_options(self):
"""
Returns the observability options that you set when creating
the SpannerClient.
"""
if not (self._instance and self._instance._client):
return None

return getattr(self._instance._client, "observability_options", None)


class BatchCheckout(object):
"""Context manager for using a batch from a database.
Expand Down
20 changes: 17 additions & 3 deletions google/cloud/spanner_v1/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,13 @@ def create(self):
if self._labels:
request.session.labels = self._labels

with trace_call("CloudSpanner.CreateSession", self, self._labels):
observability_options = getattr(self._database, "observability_options", None)
with trace_call(
"CloudSpanner.CreateSession",
self,
self._labels,
observability_options=observability_options,
):
session_pb = api.create_session(
request=request,
metadata=metadata,
Expand All @@ -169,7 +175,10 @@ def exists(self):
)
)

with trace_call("CloudSpanner.GetSession", self) as span:
observability_options = getattr(self._database, "observability_options", None)
with trace_call(
"CloudSpanner.GetSession", self, observability_options=observability_options
) as span:
try:
api.get_session(name=self.name, metadata=metadata)
if span:
Expand All @@ -194,7 +203,12 @@ def delete(self):
raise ValueError("Session ID not set by back-end")
api = self._database.spanner_api
metadata = _metadata_with_prefix(self._database.name)
with trace_call("CloudSpanner.DeleteSession", self):
observability_options = getattr(self._database, "observability_options", None)
with trace_call(
"CloudSpanner.DeleteSession",
self,
observability_options=observability_options,
):
api.delete_session(name=self.name, metadata=metadata)

def ping(self):
Expand Down
Loading

0 comments on commit 6869ed6

Please sign in to comment.