From 2bc8ad9c06474e754cc2e0c86e3aaa4a61977842 Mon Sep 17 00:00:00 2001 From: Eno Compton Date: Fri, 27 Mar 2026 19:31:03 -0600 Subject: [PATCH] feat: add support for disabling built-in metrics This commit adds the OpenTelemetry-based wiring to report on internal operations to improve connectivity. To disable this internal metric collection, set enable_builtin_telemetry to False when creating a Connector or AsyncConnector. Fixes #449 --- .../cloud/alloydbconnector/async_connector.py | 117 ++++- google/cloud/alloydbconnector/connector.py | 170 ++++++- google/cloud/alloydbconnector/exceptions.py | 12 + google/cloud/alloydbconnector/instance.py | 20 + .../alloydbconnector/instrumented_socket.py | 114 +++++ google/cloud/alloydbconnector/lazy.py | 21 + google/cloud/alloydbconnector/telemetry.py | 429 ++++++++++++++++++ pyproject.toml | 4 + tests/unit/test_instrumented_socket.py | 219 +++++++++ tests/unit/test_telemetry.py | 122 +++++ 10 files changed, 1206 insertions(+), 22 deletions(-) create mode 100644 google/cloud/alloydbconnector/instrumented_socket.py create mode 100644 google/cloud/alloydbconnector/telemetry.py create mode 100644 tests/unit/test_instrumented_socket.py create mode 100644 tests/unit/test_telemetry.py diff --git a/google/cloud/alloydbconnector/async_connector.py b/google/cloud/alloydbconnector/async_connector.py index d834f262..84c69561 100644 --- a/google/cloud/alloydbconnector/async_connector.py +++ b/google/cloud/alloydbconnector/async_connector.py @@ -16,10 +16,12 @@ import asyncio import logging +import time from types import TracebackType from typing import TYPE_CHECKING from typing import Any from typing import Optional +import uuid import google.auth from google.auth.credentials import with_scopes_if_required @@ -30,10 +32,21 @@ from google.cloud.alloydbconnector.enums import RefreshStrategy from google.cloud.alloydbconnector.exceptions import ClosedConnectorError from google.cloud.alloydbconnector.instance import RefreshAheadCache +from 
google.cloud.alloydbconnector.instance import _parse_instance_uri from google.cloud.alloydbconnector.lazy import LazyRefreshCache +from google.cloud.alloydbconnector.telemetry import DIAL_CACHE_ERROR +from google.cloud.alloydbconnector.telemetry import DIAL_SUCCESS +from google.cloud.alloydbconnector.telemetry import DIAL_TCP_ERROR +from google.cloud.alloydbconnector.telemetry import REFRESH_AHEAD_TYPE +from google.cloud.alloydbconnector.telemetry import REFRESH_LAZY_TYPE +from google.cloud.alloydbconnector.telemetry import MetricRecorderType +from google.cloud.alloydbconnector.telemetry import TelemetryAttributes +from google.cloud.alloydbconnector.telemetry import TelemetryProviderType +from google.cloud.alloydbconnector.telemetry import new_telemetry_provider from google.cloud.alloydbconnector.types import CacheTypes from google.cloud.alloydbconnector.utils import generate_keys from google.cloud.alloydbconnector.utils import strip_http_prefix +from google.cloud.alloydbconnector.version import __version__ if TYPE_CHECKING: from google.auth.credentials import Credentials @@ -72,6 +85,12 @@ class AsyncConnector: of the following: RefreshStrategy.LAZY ("LAZY") or RefreshStrategy.BACKGROUND ("BACKGROUND"). Default: RefreshStrategy.BACKGROUND + enable_builtin_telemetry (bool): Enable built-in telemetry that + reports connector metrics to the + alloydb.googleapis.com/client/connector metric prefix in + Cloud Monitoring. These metrics help AlloyDB improve performance + and identify client connectivity problems. Set to False to + disable. Default: True. 
""" def __init__( @@ -84,6 +103,7 @@ def __init__( ip_type: str | IPTypes = IPTypes.PRIVATE, user_agent: Optional[str] = None, refresh_strategy: str | RefreshStrategy = RefreshStrategy.BACKGROUND, + enable_builtin_telemetry: bool = True, ) -> None: self._cache: dict[str, CacheTypes] = {} # initialize default params @@ -132,6 +152,55 @@ def __init__( pass self._client: Optional[AlloyDBClient] = None self._closed = False + # built-in telemetry + self._enable_builtin_telemetry = enable_builtin_telemetry + self._client_uid = str(uuid.uuid4()) + self._metric_recorders: dict[str, MetricRecorderType] = {} + self._telemetry_provider: Optional[TelemetryProviderType] = None + self._monitoring_client: Optional[object] = None + if self._enable_builtin_telemetry: + try: + from google.cloud.monitoring_v3 import MetricServiceClient + + self._monitoring_client = MetricServiceClient( + credentials=self._credentials + ) + except Exception as e: + logger.debug(f"Built-in metrics exporter failed to initialize: {e}") + + def _get_telemetry_provider(self, project_id: str) -> TelemetryProviderType: + """Get or lazily create the TelemetryProvider on first connect.""" + if self._telemetry_provider is not None: + return self._telemetry_provider + self._telemetry_provider = new_telemetry_provider( + enabled=self._enable_builtin_telemetry, + project_id=project_id, + client_uid=self._client_uid, + version=__version__, + monitoring_client=self._monitoring_client, + ) + return self._telemetry_provider + + def _metric_recorder( + self, + instance_uri: str, + project: str, + region: str, + cluster: str, + name: str, + ) -> MetricRecorderType: + """Get or lazily create a MetricRecorder for the given instance.""" + if instance_uri in self._metric_recorders: + return self._metric_recorders[instance_uri] + provider = self._get_telemetry_provider(project) + mr = provider.create_metric_recorder( + project_id=project, + location=region, + cluster=cluster, + instance=name, + ) + 
self._metric_recorders[instance_uri] = mr + return mr async def connect( self, @@ -175,20 +244,36 @@ async def connect( enable_iam_auth = kwargs.pop("enable_iam_auth", self._enable_iam_auth) + # parse instance URI for telemetry resource labels + project, region, cluster, name = _parse_instance_uri(instance_uri) + mr = self._metric_recorder(instance_uri, project, region, cluster, name) + + attrs = TelemetryAttributes( + iam_authn=enable_iam_auth, + refresh_type=( + REFRESH_LAZY_TYPE + if self._refresh_strategy == RefreshStrategy.LAZY + else REFRESH_AHEAD_TYPE + ), + ) + start_time = time.monotonic() + # use existing connection info if possible - if instance_uri in self._cache: + cache_hit = instance_uri in self._cache + attrs.cache_hit = cache_hit + if cache_hit: cache = self._cache[instance_uri] else: if self._refresh_strategy == RefreshStrategy.LAZY: logger.debug( f"['{instance_uri}']: Refresh strategy is set to lazy refresh" ) - cache = LazyRefreshCache(instance_uri, self._client, self._keys) + cache = LazyRefreshCache(instance_uri, self._client, self._keys, mr) else: logger.debug( f"['{instance_uri}']: Refresh strategy is set to background refresh" ) - cache = RefreshAheadCache(instance_uri, self._client, self._keys) + cache = RefreshAheadCache(instance_uri, self._client, self._keys, mr) self._cache[instance_uri] = cache logger.debug(f"['{instance_uri}']: Connection info added to cache") @@ -218,6 +303,8 @@ async def connect( except Exception: # with an error from AlloyDB API call or IP type, invalidate the # cache and re-raise the error + attrs.dial_status = DIAL_CACHE_ERROR + mr.record_dial_count(attrs) await self._remove_cached(instance_uri) raise logger.debug(f"['{instance_uri}']: Connecting to {ip_address}:5433") @@ -235,14 +322,24 @@ async def get_authentication_token() -> str: if enable_iam_auth: kwargs["password"] = get_authentication_token try: - return await connector( + conn = await connector( ip_address, await conn_info.create_ssl_context(), **kwargs 
) except Exception: - # we attempt a force refresh, then throw the error + # The Async connector doesn't distinguish between TCP, TLS, or MDX + # errors. So treat all errors as TCP errors. + attrs.dial_status = DIAL_TCP_ERROR + mr.record_dial_count(attrs) await cache.force_refresh() raise + # record successful dial metrics + attrs.dial_status = DIAL_SUCCESS + latency_ms = (time.monotonic() - start_time) * 1000 + mr.record_dial_count(attrs) + mr.record_dial_latency(latency_ms) + return conn + async def _remove_cached(self, instance_uri: str) -> None: """Stops all background refreshes and deletes the connection info cache from the map of caches. @@ -269,4 +366,14 @@ async def close(self) -> None: """Helper function to cancel RefreshAheadCaches' tasks and close client.""" await asyncio.gather(*[cache.close() for cache in self._cache.values()]) + # shut down telemetry provider in executor to avoid blocking the + # event loop (shutdown triggers a final gRPC export) + if self._telemetry_provider is not None: + loop = asyncio.get_event_loop() + try: + await loop.run_in_executor( + None, self._telemetry_provider.shutdown + ) + except Exception: + pass self._closed = True diff --git a/google/cloud/alloydbconnector/connector.py b/google/cloud/alloydbconnector/connector.py index 3a269e91..2f30255e 100644 --- a/google/cloud/alloydbconnector/connector.py +++ b/google/cloud/alloydbconnector/connector.py @@ -21,13 +21,16 @@ import io import logging import socket +import ssl import struct from threading import Thread +import time from types import TracebackType from typing import TYPE_CHECKING from typing import Any from typing import Callable from typing import Optional +import uuid from google.auth import default from google.auth.credentials import TokenState @@ -38,18 +41,33 @@ from google.cloud.alloydbconnector.enums import IPTypes from google.cloud.alloydbconnector.enums import RefreshStrategy from google.cloud.alloydbconnector.exceptions import ClosedConnectorError +from 
google.cloud.alloydbconnector.exceptions import MetadataExchangeError +from google.cloud.alloydbconnector.exceptions import TCPConnectionError +from google.cloud.alloydbconnector.exceptions import TLSHandshakeError from google.cloud.alloydbconnector.instance import RefreshAheadCache +from google.cloud.alloydbconnector.instance import _parse_instance_uri +from google.cloud.alloydbconnector.instrumented_socket import InstrumentedSocket from google.cloud.alloydbconnector.lazy import LazyRefreshCache import google.cloud.alloydbconnector.pg8000 as pg8000 import google.cloud.alloydbconnector.psycopg as psycopg from google.cloud.alloydbconnector.static import StaticConnectionInfoCache +from google.cloud.alloydbconnector.telemetry import DIAL_CACHE_ERROR +from google.cloud.alloydbconnector.telemetry import DIAL_MDX_ERROR +from google.cloud.alloydbconnector.telemetry import DIAL_SUCCESS +from google.cloud.alloydbconnector.telemetry import DIAL_TCP_ERROR +from google.cloud.alloydbconnector.telemetry import DIAL_TLS_ERROR +from google.cloud.alloydbconnector.telemetry import REFRESH_AHEAD_TYPE +from google.cloud.alloydbconnector.telemetry import REFRESH_LAZY_TYPE +from google.cloud.alloydbconnector.telemetry import MetricRecorderType +from google.cloud.alloydbconnector.telemetry import TelemetryAttributes +from google.cloud.alloydbconnector.telemetry import TelemetryProviderType +from google.cloud.alloydbconnector.telemetry import new_telemetry_provider from google.cloud.alloydbconnector.types import CacheTypes from google.cloud.alloydbconnector.utils import generate_keys from google.cloud.alloydbconnector.utils import strip_http_prefix +from google.cloud.alloydbconnector.version import __version__ if TYPE_CHECKING: - import ssl - from google.auth.credentials import Credentials logger = logging.getLogger(name=__name__) @@ -95,6 +113,12 @@ class Connector: This is a *dev-only* option and should not be used in production as it will result in failed connections after the client 
certificate expires. + enable_builtin_telemetry (bool): Enable built-in telemetry that + reports connector metrics to the + alloydb.googleapis.com/client/connector metric prefix in + Cloud Monitoring. These metrics help AlloyDB improve performance + and identify client connectivity problems. Set to False to + disable. Default: True. """ def __init__( @@ -108,6 +132,7 @@ def __init__( user_agent: Optional[str] = None, refresh_strategy: str | RefreshStrategy = RefreshStrategy.BACKGROUND, static_conn_info: Optional[io.TextIOBase] = None, + enable_builtin_telemetry: bool = True, ) -> None: # create event loop and start it in background thread self._loop: asyncio.AbstractEventLoop = asyncio.new_event_loop() @@ -151,6 +176,55 @@ def __init__( self._client: Optional[AlloyDBClient] = None self._static_conn_info = static_conn_info self._closed = False + # built-in telemetry + self._enable_builtin_telemetry = enable_builtin_telemetry + self._client_uid = str(uuid.uuid4()) + self._metric_recorders: dict[str, MetricRecorderType] = {} + self._telemetry_provider: Optional[TelemetryProviderType] = None + self._monitoring_client: Optional[object] = None + if self._enable_builtin_telemetry: + try: + from google.cloud.monitoring_v3 import MetricServiceClient + + self._monitoring_client = MetricServiceClient( + credentials=self._credentials + ) + except Exception as e: + logger.debug(f"Built-in metrics exporter failed to initialize: {e}") + + def _get_telemetry_provider(self, project_id: str) -> TelemetryProviderType: + """Get or lazily create the TelemetryProvider on first connect.""" + if self._telemetry_provider is not None: + return self._telemetry_provider + self._telemetry_provider = new_telemetry_provider( + enabled=self._enable_builtin_telemetry, + project_id=project_id, + client_uid=self._client_uid, + version=__version__, + monitoring_client=self._monitoring_client, + ) + return self._telemetry_provider + + def _metric_recorder( + self, + instance_uri: str, + project: str, 
+ region: str, + cluster: str, + name: str, + ) -> MetricRecorderType: + """Get or lazily create a MetricRecorder for the given instance.""" + if instance_uri in self._metric_recorders: + return self._metric_recorders[instance_uri] + provider = self._get_telemetry_provider(project) + mr = provider.create_metric_recorder( + project_id=project, + location=region, + cluster=cluster, + instance=name, + ) + self._metric_recorders[instance_uri] = mr + return mr def connect(self, instance_uri: str, driver: str, **kwargs: Any) -> Any: """ @@ -210,8 +284,25 @@ async def connect_async(self, instance_uri: str, driver: str, **kwargs: Any) -> driver=driver, ) enable_iam_auth = kwargs.pop("enable_iam_auth", self._enable_iam_auth) + + # parse instance URI for telemetry resource labels + project, region, cluster, name = _parse_instance_uri(instance_uri) + mr = self._metric_recorder(instance_uri, project, region, cluster, name) + + attrs = TelemetryAttributes( + iam_authn=enable_iam_auth, + refresh_type=( + REFRESH_LAZY_TYPE + if self._refresh_strategy == RefreshStrategy.LAZY + else REFRESH_AHEAD_TYPE + ), + ) + start_time = time.monotonic() + # use existing connection info if possible - if instance_uri in self._cache: + cache_hit = instance_uri in self._cache + attrs.cache_hit = cache_hit + if cache_hit: cache = self._cache[instance_uri] elif self._static_conn_info: cache = StaticConnectionInfoCache(instance_uri, self._static_conn_info) @@ -220,12 +311,12 @@ async def connect_async(self, instance_uri: str, driver: str, **kwargs: Any) -> logger.debug( f"['{instance_uri}']: Refresh strategy is set to lazy refresh" ) - cache = LazyRefreshCache(instance_uri, self._client, self._keys) + cache = LazyRefreshCache(instance_uri, self._client, self._keys, mr) else: logger.debug( f"['{instance_uri}']: Refresh strategy is set to background refresh" ) - cache = RefreshAheadCache(instance_uri, self._client, self._keys) + cache = RefreshAheadCache(instance_uri, self._client, self._keys, mr) 
self._cache[instance_uri] = cache logger.debug(f"['{instance_uri}']: Connection info added to cache") @@ -256,6 +347,8 @@ async def connect_async(self, instance_uri: str, driver: str, **kwargs: Any) -> except Exception: # with an error from AlloyDB API call or IP type, invalidate the # cache and re-raise the error + attrs.dial_status = DIAL_CACHE_ERROR + mr.record_dial_count(attrs) await self._remove_cached(instance_uri) raise logger.debug(f"['{instance_uri}']: Connecting to {ip_address}:5433") @@ -270,13 +363,39 @@ async def connect_async(self, instance_uri: str, driver: str, **kwargs: Any) -> enable_iam_auth, ) sock = await self._loop.run_in_executor(None, metadata_partial) - connect_partial = partial(connector, sock, **kwargs) - return await self._loop.run_in_executor(None, connect_partial) + except TCPConnectionError: + attrs.dial_status = DIAL_TCP_ERROR + mr.record_dial_count(attrs) + await cache.force_refresh() + raise + except TLSHandshakeError: + attrs.dial_status = DIAL_TLS_ERROR + mr.record_dial_count(attrs) + await cache.force_refresh() + raise + except MetadataExchangeError: + attrs.dial_status = DIAL_MDX_ERROR + mr.record_dial_count(attrs) + await cache.force_refresh() + raise + try: + instrumented_sock = InstrumentedSocket(sock, mr, attrs) + connect_partial = partial(connector, instrumented_sock, **kwargs) + conn = await self._loop.run_in_executor(None, connect_partial) except Exception: - # we attempt a force refresh, then throw the error + attrs.dial_status = DIAL_MDX_ERROR + mr.record_dial_count(attrs) await cache.force_refresh() raise + # record successful dial metrics + attrs.dial_status = DIAL_SUCCESS + latency_ms = (time.monotonic() - start_time) * 1000 + mr.record_dial_count(attrs) + mr.record_dial_latency(latency_ms) + mr.record_open_connection(attrs) + return conn + def metadata_exchange( self, instance_uri: str, @@ -314,12 +433,19 @@ def metadata_exchange( Returns: sock (ssl.SSLSocket): mTLS/SSL socket connected to AlloyDB Proxy server. 
""" - # Create socket and wrap with SSL/TLS context - sock = ctx.wrap_socket( - socket.create_connection((ip_address, SERVER_PROXY_PORT)), - server_hostname=ip_address, - ) - # set auth type for metadata exchange + try: + raw_sock = socket.create_connection((ip_address, SERVER_PROXY_PORT)) + except OSError as e: + raise TCPConnectionError(str(e)) from e + try: + sock = ctx.wrap_socket(raw_sock, server_hostname=ip_address) + except ssl.SSLError as e: + raw_sock.close() + raise TLSHandshakeError(str(e)) from e + except OSError as e: + raw_sock.close() + raise TCPConnectionError(str(e)) from e + auth_type = connectorspb.MetadataExchangeRequest.DB_NATIVE if enable_iam_auth: auth_type = connectorspb.MetadataExchangeRequest.AUTO_IAM @@ -360,7 +486,7 @@ def metadata_exchange( while message_len_buffer_size > 0: chunk = sock.recv(message_len_buffer_size) if not chunk: - raise RuntimeError( + raise MetadataExchangeError( "Connection closed while getting metadata exchange length!" ) message_len_buffer += chunk @@ -373,7 +499,7 @@ def metadata_exchange( while message_len > 0: chunk = sock.recv(message_len) if not chunk: - raise RuntimeError( + raise MetadataExchangeError( "Connection closed while performing metadata exchange!" 
) buffer += chunk @@ -387,7 +513,7 @@ def metadata_exchange( # validate metadata exchange response if resp.response_code != connectorspb.MetadataExchangeResponse.OK: - raise ValueError( + raise MetadataExchangeError( f"Metadata Exchange request has failed with error: {resp.error}" ) @@ -436,3 +562,13 @@ async def close_async(self) -> None: """Helper function to cancel RefreshAheadCaches' tasks and close client.""" await asyncio.gather(*[cache.close() for cache in self._cache.values()]) + # shut down telemetry provider in executor to avoid blocking the + # event loop (shutdown triggers a final gRPC export) + if self._telemetry_provider is not None: + loop = asyncio.get_event_loop() + try: + await loop.run_in_executor( + None, self._telemetry_provider.shutdown + ) + except Exception: + pass diff --git a/google/cloud/alloydbconnector/exceptions.py b/google/cloud/alloydbconnector/exceptions.py index f636a96c..c58f22e7 100644 --- a/google/cloud/alloydbconnector/exceptions.py +++ b/google/cloud/alloydbconnector/exceptions.py @@ -23,3 +23,15 @@ class IPTypeNotFoundError(Exception): class ClosedConnectorError(Exception): pass + + +class TCPConnectionError(Exception): + pass + + +class TLSHandshakeError(Exception): + pass + + +class MetadataExchangeError(Exception): + pass diff --git a/google/cloud/alloydbconnector/instance.py b/google/cloud/alloydbconnector/instance.py index 4cdb84cd..ddaa23bf 100644 --- a/google/cloud/alloydbconnector/instance.py +++ b/google/cloud/alloydbconnector/instance.py @@ -27,6 +27,12 @@ from google.cloud.alloydbconnector.rate_limiter import AsyncRateLimiter from google.cloud.alloydbconnector.refresh_utils import _is_valid from google.cloud.alloydbconnector.refresh_utils import _seconds_until_refresh +from google.cloud.alloydbconnector.telemetry import REFRESH_AHEAD_TYPE +from google.cloud.alloydbconnector.telemetry import REFRESH_FAILURE +from google.cloud.alloydbconnector.telemetry import REFRESH_SUCCESS +from 
google.cloud.alloydbconnector.telemetry import MetricRecorderType +from google.cloud.alloydbconnector.telemetry import NullMetricRecorder +from google.cloud.alloydbconnector.telemetry import TelemetryAttributes if TYPE_CHECKING: from cryptography.hazmat.primitives.asymmetric import rsa @@ -77,6 +83,7 @@ def __init__( instance_uri: str, client: AlloyDBClient, keys: asyncio.Future[tuple[rsa.RSAPrivateKey, str]], + metric_recorder: MetricRecorderType = NullMetricRecorder(), ) -> None: # validate and parse instance_uri self._project, self._region, self._cluster, self._name = _parse_instance_uri( @@ -86,6 +93,7 @@ def __init__( self._instance_uri = instance_uri self._client = client self._keys = keys + self._metric_recorder = metric_recorder self._refresh_rate_limiter = AsyncRateLimiter( max_capacity=2, rate=1 / 30, @@ -127,12 +135,24 @@ async def _perform_refresh(self) -> ConnectionInfo: f"['{self._instance_uri}']: Current certificate expiration = " f"{connection_info.expiration.isoformat()}" ) + self._metric_recorder.record_refresh_count( + TelemetryAttributes( + refresh_status=REFRESH_SUCCESS, + refresh_type=REFRESH_AHEAD_TYPE, + ) + ) except Exception as e: logger.debug( f"['{self._instance_uri}']: Connection info refresh operation" f" failed: {str(e)}" ) + self._metric_recorder.record_refresh_count( + TelemetryAttributes( + refresh_status=REFRESH_FAILURE, + refresh_type=REFRESH_AHEAD_TYPE, + ) + ) raise finally: diff --git a/google/cloud/alloydbconnector/instrumented_socket.py b/google/cloud/alloydbconnector/instrumented_socket.py new file mode 100644 index 00000000..2942c9ba --- /dev/null +++ b/google/cloud/alloydbconnector/instrumented_socket.py @@ -0,0 +1,114 @@ +# Copyright 2026 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from __future__ import annotations + +from typing import Any + +from google.cloud.alloydbconnector.telemetry import MetricRecorderType +from google.cloud.alloydbconnector.telemetry import TelemetryAttributes + + +class InstrumentedSocket: + """A thin socket wrapper that tracks bytes sent/received and records + + a closed connection metric on close. + + Delegates all attribute access to the underlying socket so it can be + used as a drop-in replacement. + """ + + def __init__( + self, + sock: Any, + metric_recorder: MetricRecorderType, + attrs: TelemetryAttributes, + ) -> None: + self._sock = sock + self._mr = metric_recorder + self._attrs = attrs + self._closed = False + + def recv(self, bufsize: int, flags: int = 0) -> bytes: + data = self._sock.recv(bufsize, flags) + if data: + self._mr.record_bytes_rx(len(data)) + return data + + def recv_into(self, buffer: Any, nbytes: int = 0, flags: int = 0) -> int: + n = self._sock.recv_into(buffer, nbytes, flags) + if n > 0: + self._mr.record_bytes_rx(n) + return n + + def send(self, data: bytes, flags: int = 0) -> int: + n = self._sock.send(data, flags) + if n > 0: + self._mr.record_bytes_tx(n) + return n + + def sendall(self, data: bytes, flags: int = 0) -> None: + self._sock.sendall(data, flags) + self._mr.record_bytes_tx(len(data)) + + def read(self, bufsize: int = 0) -> bytes: + data = self._sock.read(bufsize) + if data: + self._mr.record_bytes_rx(len(data)) + return data + + def write(self, data: bytes) -> int: + n = self._sock.write(data) + if n > 0: + self._mr.record_bytes_tx(n) + return n + + def 
makefile( + self, + mode: str = "r", + buffering: Any = None, + *, + encoding: Any = None, + errors: Any = None, + newline: Any = None, + ) -> Any: + import socket + + # Explicitly call the standard library makefile function + # passing self instead of the raw socket, so that all reads and writes will + # call the functions above and allow for gathering telemetry. + return socket.socket.makefile( + self, # type: ignore[call-overload] + mode, + buffering, + encoding=encoding, + errors=errors, + newline=newline, + ) + + def close(self) -> None: + if not self._closed: + self._closed = True + self._mr.record_closed_connection(self._attrs) + self._sock.close() + + def __del__(self) -> None: + try: + if getattr(self, "_closed", True) is False: + self.close() + except Exception: + pass + + def __getattr__(self, name: str) -> Any: + return getattr(self._sock, name) diff --git a/google/cloud/alloydbconnector/lazy.py b/google/cloud/alloydbconnector/lazy.py index b05d5140..1ff65c92 100644 --- a/google/cloud/alloydbconnector/lazy.py +++ b/google/cloud/alloydbconnector/lazy.py @@ -23,6 +23,12 @@ from google.cloud.alloydbconnector.connection_info import ConnectionInfo from google.cloud.alloydbconnector.instance import _parse_instance_uri from google.cloud.alloydbconnector.refresh_utils import _refresh_buffer +from google.cloud.alloydbconnector.telemetry import REFRESH_FAILURE +from google.cloud.alloydbconnector.telemetry import REFRESH_LAZY_TYPE +from google.cloud.alloydbconnector.telemetry import REFRESH_SUCCESS +from google.cloud.alloydbconnector.telemetry import MetricRecorderType +from google.cloud.alloydbconnector.telemetry import NullMetricRecorder +from google.cloud.alloydbconnector.telemetry import TelemetryAttributes logger = logging.getLogger(name=__name__) @@ -41,6 +47,7 @@ def __init__( instance_uri: str, client: AlloyDBClient, keys: asyncio.Future, + metric_recorder: MetricRecorderType = NullMetricRecorder(), ) -> None: """Initializes a LazyRefreshCache instance. 
@@ -50,6 +57,7 @@ def __init__( client (AlloyDBClient): The AlloyDB client instance. keys (asyncio.Future): A future to the client's public-private key pair. + metric_recorder: Recorder for built-in telemetry metrics. """ # validate and parse instance connection name self._project, self._region, self._cluster, self._name = _parse_instance_uri( @@ -59,6 +67,7 @@ def __init__( self._keys = keys self._client = client + self._metric_recorder = metric_recorder self._lock = asyncio.Lock() self._cached: Optional[ConnectionInfo] = None self._needs_refresh = False @@ -106,6 +115,12 @@ async def connect_info(self) -> ConnectionInfo: f"['{self._instance_uri}']: Connection info " f"refresh operation failed: {str(e)}" ) + self._metric_recorder.record_refresh_count( + TelemetryAttributes( + refresh_status=REFRESH_FAILURE, + refresh_type=REFRESH_LAZY_TYPE, + ) + ) raise logger.debug( f"['{self._instance_uri}']: Connection info " @@ -115,6 +130,12 @@ async def connect_info(self) -> ConnectionInfo: f"['{self._instance_uri}']: Current certificate " f"expiration = {str(conn_info.expiration)}" ) + self._metric_recorder.record_refresh_count( + TelemetryAttributes( + refresh_status=REFRESH_SUCCESS, + refresh_type=REFRESH_LAZY_TYPE, + ) + ) self._cached = conn_info self._needs_refresh = False return conn_info diff --git a/google/cloud/alloydbconnector/telemetry.py b/google/cloud/alloydbconnector/telemetry.py new file mode 100644 index 00000000..448952b7 --- /dev/null +++ b/google/cloud/alloydbconnector/telemetry.py @@ -0,0 +1,429 @@ +# Copyright 2026 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Built-in telemetry for the AlloyDB Python Connector. + +This module provides internal metrics collection using OpenTelemetry with a +Cloud Monitoring exporter. Metrics are exported to the +alloydb.googleapis.com/client/connector metric prefix. + +The telemetry is enabled by default and can be disabled via the +``enable_builtin_telemetry`` option on the Connector or AsyncConnector. + +Architecture +------------ +A single ``_TelemetryProvider`` is created per Connector (lazily on first +connect, once the project ID is known). It owns the OTel ``MeterProvider``, +``PeriodicExportingMetricReader``, exporter, and shared instruments. This +means **one background export thread** per Connector regardless of how many +instances it connects to. + +Each instance gets a lightweight ``_MetricRecorder`` that holds pre-built +attribute dicts and a reference to the shared instruments. Instance identity +(project, location, cluster, instance, client_uid) is carried as metric +attributes on every data point. At export time, the custom exporter moves +these from metric labels to the monitored resource labels on each time +series so that Cloud Monitoring associates them with the correct +``alloydb.googleapis.com/InstanceClient`` resource. +""" + +from __future__ import annotations + +from dataclasses import dataclass +import logging +from typing import Any +from typing import Optional +from typing import Union + +logger = logging.getLogger(__name__) + +_PYTHON_CONNECTOR = "python" + +# Meter name — matches Go connector. 
+_METER_NAME = "alloydb.googleapis.com/client/connector" +# Monitored resource type — matches Go connector. +_MONITORED_RESOURCE = "alloydb.googleapis.com/InstanceClient" + +# Metric names. +_DIAL_COUNT = "dial_count" +_DIAL_LATENCY = "dial_latencies" +_OPEN_CONNECTIONS = "open_connections" +_BYTES_SENT = "bytes_sent_count" +_BYTES_RECEIVED = "bytes_received_count" +_REFRESH_COUNT = "refresh_count" + +# Resource attribute keys — used as metric attributes on each data point +# and extracted by the exporter to set per-series monitored resource labels. +_RESOURCE_TYPE_KEY = "gcp.resource_type" +_PROJECT_ID = "project_id" +_LOCATION = "location" +_CLUSTER_ID = "cluster_id" +_INSTANCE_ID = "instance_id" +_CLIENT_UID = "client_uid" + +_RESOURCE_LABEL_KEYS = frozenset( + {_PROJECT_ID, _LOCATION, _CLUSTER_ID, _INSTANCE_ID, _CLIENT_UID} +) + +# Metric attribute keys. +_CONNECTOR_TYPE = "connector_type" +_AUTH_TYPE = "auth_type" +_IS_CACHE_HIT = "is_cache_hit" +_STATUS = "status" +_REFRESH_TYPE = "refresh_type" + +# Dial status values. +DIAL_SUCCESS = "success" +DIAL_USER_ERROR = "user_error" +DIAL_CACHE_ERROR = "cache_error" +DIAL_TCP_ERROR = "tcp_error" +DIAL_TLS_ERROR = "tls_error" +DIAL_MDX_ERROR = "mdx_error" + +# Refresh status values. +REFRESH_SUCCESS = "success" +REFRESH_FAILURE = "failure" + +# Refresh type values. +REFRESH_AHEAD_TYPE = "refresh_ahead" +REFRESH_LAZY_TYPE = "lazy" + +# Default export interval in milliseconds. 
_DEFAULT_EXPORT_INTERVAL_MS = 60_000


@dataclass
class TelemetryAttributes:
    """Holds metadata to attach to a metric recording."""

    # True when the connection uses IAM database authentication.
    iam_authn: bool = False
    # True when the dial was served from the local refresh cache.
    cache_hit: bool = False
    # One of the DIAL_* status constants (empty when not applicable).
    dial_status: str = ""
    # One of the REFRESH_* status constants (empty when not applicable).
    refresh_status: str = ""
    # REFRESH_AHEAD_TYPE or REFRESH_LAZY_TYPE (empty when not applicable).
    refresh_type: str = ""


def _auth_type_value(iam_authn: bool) -> str:
    """Map the IAM-authn flag onto the auth_type metric label value."""
    return "iam" if iam_authn else "built_in"


class NullTelemetryProvider:
    """A no-op TelemetryProvider for when telemetry is disabled."""

    def shutdown(self) -> None:
        pass

    def create_metric_recorder(
        self,
        project_id: str,
        location: str,
        cluster: str,
        instance: str,
    ) -> MetricRecorderType:
        return NullMetricRecorder()


class NullMetricRecorder:
    """A no-op MetricRecorder for when telemetry is disabled."""

    def record_dial_count(self, attrs: TelemetryAttributes) -> None:
        pass

    def record_dial_latency(self, latency_ms: float) -> None:
        pass

    def record_open_connection(self, attrs: TelemetryAttributes) -> None:
        pass

    def record_closed_connection(self, attrs: TelemetryAttributes) -> None:
        pass

    def record_bytes_rx(self, count: int) -> None:
        pass

    def record_bytes_tx(self, count: int) -> None:
        pass

    def record_refresh_count(self, attrs: TelemetryAttributes) -> None:
        pass


class _TelemetryProvider:
    """Owns a single MeterProvider shared across all instances.

    Created once per Connector (lazily on first connect). Holds the OTel
    MeterProvider, PeriodicExportingMetricReader (one background thread),
    and the shared instrument objects (counters, histogram).

    Call ``create_metric_recorder`` to get a lightweight per-instance
    ``_MetricRecorder`` that records to the shared instruments with
    instance-specific attributes.
    """

    def __init__(
        self,
        project_id: str,
        client_uid: str,
        version: str,
        monitoring_client: object,
    ) -> None:
        # Imported lazily so that importing this module does not require
        # the OpenTelemetry SDK/exporter packages at module load time.
        from opentelemetry.exporter.cloud_monitoring import (
            CloudMonitoringMetricsExporter,
        )
        from opentelemetry.sdk.metrics import MeterProvider
        from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader
        from opentelemetry.sdk.resources import Resource

        class _SystemMetricsExporter(CloudMonitoringMetricsExporter):
            """Exporter that extracts instance labels from metric attributes
            and sets them as monitored resource labels on each time series."""

            def _batch_write(self, series: Any) -> None:
                # Because all instances share a single MeterProvider,
                # instance identity (project, location, cluster, instance,
                # client_uid) arrives here as metric labels rather than
                # resource labels. We pop them from metric labels and set
                # them as the monitored resource labels so Cloud Monitoring
                # associates each time series with the correct
                # InstanceClient resource.
                for ts in series:
                    ts.resource.type = _MONITORED_RESOURCE
                    ts.resource.labels.clear()
                    for key in _RESOURCE_LABEL_KEYS:
                        val = ts.metric.labels.pop(key, None)
                        if val is not None:
                            ts.resource.labels[key] = val
                super()._batch_write(series)

            def _get_metric_descriptor(self, metric: Any) -> Any:
                # Build (and cache) a MetricDescriptor whose kind matches
                # the instrument: monotonic sums are CUMULATIVE,
                # non-monotonic sums (the open_connections up/down counter)
                # are GAUGE, everything else (histograms) is CUMULATIVE.
                descriptor_type = f"{self._prefix}/{metric.name}"
                if descriptor_type in self._metric_descriptors:
                    return self._metric_descriptors[descriptor_type]

                from opentelemetry.sdk.metrics.export import Sum

                from google.api import metric_pb2

                data = metric.data
                if isinstance(data, Sum):
                    metric_kind = (
                        metric_pb2.MetricDescriptor.MetricKind.CUMULATIVE
                        if data.is_monotonic
                        else metric_pb2.MetricDescriptor.MetricKind.GAUGE
                    )
                else:
                    metric_kind = metric_pb2.MetricDescriptor.MetricKind.CUMULATIVE

                descriptor = metric_pb2.MetricDescriptor(
                    type=descriptor_type,
                    metric_kind=metric_kind,
                    unit=metric.unit or "",
                )
                self._metric_descriptors[descriptor_type] = descriptor
                return descriptor

        resource = Resource.create(
            {_RESOURCE_TYPE_KEY: _MONITORED_RESOURCE}
        )

        exporter = _SystemMetricsExporter(
            project_id=project_id,
            client=monitoring_client,
            # The metric type prefix is identical to the meter name; reuse
            # the constant so the two cannot drift apart.
            prefix=_METER_NAME,
        )

        # Suppress noisy ERROR logs from the exporter when Cloud Monitoring
        # is not enabled or the caller lacks permissions.
        _exporter_logger = logging.getLogger("opentelemetry.exporter.cloud_monitoring")
        if not logger.isEnabledFor(logging.DEBUG):
            _exporter_logger.setLevel(logging.CRITICAL)

        reader = PeriodicExportingMetricReader(
            exporter,
            export_interval_millis=_DEFAULT_EXPORT_INTERVAL_MS,
        )

        self._provider = MeterProvider(
            resource=resource,
            metric_readers=[reader],
        )

        meter = self._provider.get_meter(
            _METER_NAME,
            version=version,
        )

        # Shared instruments; per-instance identity is carried as metric
        # attributes and remapped by _SystemMetricsExporter._batch_write.
        self._dial_count = meter.create_counter(_DIAL_COUNT)
        self._dial_latency = meter.create_histogram(_DIAL_LATENCY)
        self._open_connections = meter.create_up_down_counter(_OPEN_CONNECTIONS)
        self._bytes_tx = meter.create_counter(_BYTES_SENT)
        self._bytes_rx = meter.create_counter(_BYTES_RECEIVED)
        self._refresh_count = meter.create_counter(_REFRESH_COUNT)

        self._client_uid = client_uid

    def shutdown(self) -> None:
        """Shut down the underlying MeterProvider (stops the export thread)."""
        self._provider.shutdown()

    def create_metric_recorder(
        self,
        project_id: str,
        location: str,
        cluster: str,
        instance: str,
    ) -> _MetricRecorder:
        """Create a lightweight MetricRecorder for a specific instance."""
        return _MetricRecorder(
            provider=self,
            project_id=project_id,
            location=location,
            cluster=cluster,
            instance=instance,
            client_uid=self._client_uid,
        )


class _MetricRecorder:
    """Lightweight per-instance recorder that delegates to shared instruments.

    Created by ``_TelemetryProvider.create_metric_recorder``. Holds pre-built
    attribute dicts that include both metric attributes (connector_type, etc.)
    and resource-identifying labels (project, location, cluster, instance,
    client_uid). The exporter's ``_batch_write`` moves the resource labels
    from metric labels to the monitored resource on each time series.

    Hot-path methods (``record_bytes_rx``, ``record_bytes_tx``) use cached
    dicts to avoid per-call allocation.
    """

    def __init__(
        self,
        provider: _TelemetryProvider,
        project_id: str,
        location: str,
        cluster: str,
        instance: str,
        client_uid: str,
    ) -> None:
        self._p = provider

        # Resource-identifying labels included in every metric data point.
        resource_labels = {
            _PROJECT_ID: project_id,
            _LOCATION: location,
            _CLUSTER_ID: cluster,
            _INSTANCE_ID: instance,
            _CLIENT_UID: client_uid,
        }

        # Pre-build attribute dicts for hot-path methods to avoid
        # allocating a new dict on every socket recv/send call.
        self._bytes_attrs = {
            _CONNECTOR_TYPE: _PYTHON_CONNECTOR,
            **resource_labels,
        }
        self._latency_attrs = {
            _CONNECTOR_TYPE: _PYTHON_CONNECTOR,
            **resource_labels,
        }

        # Base attrs for methods that add dynamic keys per call.
        self._resource_labels = resource_labels

    def record_dial_count(self, attrs: TelemetryAttributes) -> None:
        """Count one dial attempt, labeled with auth type, cache hit, status."""
        self._p._dial_count.add(
            1,
            {
                _CONNECTOR_TYPE: _PYTHON_CONNECTOR,
                _AUTH_TYPE: _auth_type_value(attrs.iam_authn),
                _IS_CACHE_HIT: attrs.cache_hit,
                _STATUS: attrs.dial_status,
                **self._resource_labels,
            },
        )

    def record_dial_latency(self, latency_ms: float) -> None:
        """Record one dial latency observation in milliseconds."""
        self._p._dial_latency.record(latency_ms, self._latency_attrs)

    def record_open_connection(self, attrs: TelemetryAttributes) -> None:
        """Increment the open-connections gauge for this instance."""
        self._p._open_connections.add(
            1,
            {
                _CONNECTOR_TYPE: _PYTHON_CONNECTOR,
                _AUTH_TYPE: _auth_type_value(attrs.iam_authn),
                **self._resource_labels,
            },
        )

    def record_closed_connection(self, attrs: TelemetryAttributes) -> None:
        """Decrement the open-connections gauge for this instance."""
        self._p._open_connections.add(
            -1,
            {
                _CONNECTOR_TYPE: _PYTHON_CONNECTOR,
                _AUTH_TYPE: _auth_type_value(attrs.iam_authn),
                **self._resource_labels,
            },
        )

    def record_bytes_rx(self, count: int) -> None:
        """Hot path: add received-byte count using the cached attribute dict."""
        self._p._bytes_rx.add(count, self._bytes_attrs)

    def record_bytes_tx(self, count: int) -> None:
        """Hot path: add sent-byte count using the cached attribute dict."""
        self._p._bytes_tx.add(count, self._bytes_attrs)

    def record_refresh_count(self, attrs: TelemetryAttributes) -> None:
        """Count one certificate refresh, labeled with status and type."""
        self._p._refresh_count.add(
            1,
            {
                _CONNECTOR_TYPE: _PYTHON_CONNECTOR,
                _STATUS: attrs.refresh_status,
                _REFRESH_TYPE: attrs.refresh_type,
                **self._resource_labels,
            },
        )


# Type alias for use in type annotations.
MetricRecorderType = Union[_MetricRecorder, NullMetricRecorder]
TelemetryProviderType = Union[_TelemetryProvider, NullTelemetryProvider]


def new_telemetry_provider(
    enabled: bool,
    project_id: str,
    client_uid: str,
    version: str,
    monitoring_client: Optional[object] = None,
) -> TelemetryProviderType:
    """Create a new TelemetryProvider.

    Returns a NullTelemetryProvider if telemetry is disabled, if no
    monitoring client was supplied, or if initialization fails.
    """
    if not enabled:
        logger.debug("Disabling built-in metrics")
        return NullTelemetryProvider()
    if monitoring_client is None:
        logger.debug("Metric client is None, disabling built-in metrics")
        return NullTelemetryProvider()
    try:
        return _TelemetryProvider(
            project_id=project_id,
            client_uid=client_uid,
            version=version,
            monitoring_client=monitoring_client,
        )
    except Exception as e:
        # Telemetry must never break connectivity: degrade to a no-op
        # provider on any initialization failure. Lazy %-formatting so the
        # message is only built when DEBUG logging is enabled.
        logger.debug("Built-in metrics exporter failed to initialize: %s", e)
        return NullTelemetryProvider()
# Copyright 2026 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from __future__ import annotations

from unittest.mock import MagicMock

from google.cloud.alloydbconnector.instrumented_socket import InstrumentedSocket
from google.cloud.alloydbconnector.telemetry import NullMetricRecorder
from google.cloud.alloydbconnector.telemetry import TelemetryAttributes


class _RecordingMetricRecorder(NullMetricRecorder):
    # Test double: accumulates byte counts and close events so tests can
    # assert on exactly what InstrumentedSocket reported.
    def __init__(self) -> None:
        self.rx = 0
        self.tx = 0
        self.closed_calls: list[TelemetryAttributes] = []

    def record_bytes_rx(self, count: int) -> None:
        self.rx += count

    def record_bytes_tx(self, count: int) -> None:
        self.tx += count

    def record_closed_connection(self, attrs: TelemetryAttributes) -> None:
        self.closed_calls.append(attrs)


def _make_isock(sock: MagicMock) -> tuple[InstrumentedSocket, _RecordingMetricRecorder]:
    """Wrap *sock* in an InstrumentedSocket with a fresh recording recorder."""
    mr = _RecordingMetricRecorder()
    attrs = TelemetryAttributes()
    return InstrumentedSocket(sock, mr, attrs), mr


def test_recv_records_bytes() -> None:
    sock = MagicMock()
    sock.recv.return_value = b"hello"
    isock, mr = _make_isock(sock)

    assert isock.recv(10) == b"hello"
    assert mr.rx == 5
    # The default flags value (0) is forwarded to the wrapped socket.
    sock.recv.assert_called_once_with(10, 0)


def test_recv_empty_does_not_record() -> None:
    sock = MagicMock()
    sock.recv.return_value = b""
    isock, mr = _make_isock(sock)

    assert isock.recv(10) == b""
    assert mr.rx == 0


def test_recv_into_records_bytes() -> None:
    sock = MagicMock()
    sock.recv_into.return_value = 7
    isock, mr = _make_isock(sock)
    buf = bytearray(10)

    assert isock.recv_into(buf, 10) == 7
    assert mr.rx == 7


def test_recv_into_zero_does_not_record() -> None:
    sock = MagicMock()
    sock.recv_into.return_value = 0
    isock, mr = _make_isock(sock)

    assert isock.recv_into(bytearray(10)) == 0
    assert mr.rx == 0


def test_send_records_bytes() -> None:
    sock = MagicMock()
    sock.send.return_value = 4
    isock, mr = _make_isock(sock)

    assert isock.send(b"data") == 4
    assert mr.tx == 4


def test_send_zero_does_not_record() -> None:
    sock = MagicMock()
    sock.send.return_value = 0
    isock, mr = _make_isock(sock)

    assert isock.send(b"") == 0
    assert mr.tx == 0


def test_sendall_records_len_of_data() -> None:
    sock = MagicMock()
    isock, mr = _make_isock(sock)

    isock.sendall(b"hello world")
    # sendall returns None, so the full payload length is what gets recorded.
    assert mr.tx == 11
    sock.sendall.assert_called_once_with(b"hello world", 0)


def test_read_records_bytes() -> None:
    sock = MagicMock()
    sock.read.return_value = b"abcd"
    isock, mr = _make_isock(sock)

    assert isock.read(4) == b"abcd"
    assert mr.rx == 4


def test_read_empty_does_not_record() -> None:
    sock = MagicMock()
    sock.read.return_value = b""
    isock, mr = _make_isock(sock)

    assert isock.read(4) == b""
    assert mr.rx == 0


def test_write_records_bytes() -> None:
    sock = MagicMock()
    sock.write.return_value = 3
    isock, mr = _make_isock(sock)

    assert isock.write(b"abc") == 3
    assert mr.tx == 3


def test_write_zero_does_not_record() -> None:
    sock = MagicMock()
    sock.write.return_value = 0
    isock, mr = _make_isock(sock)

    assert isock.write(b"") == 0
    assert mr.tx == 0


def test_close_records_closed_connection_once() -> None:
    sock = MagicMock()
    attrs = TelemetryAttributes(dial_status="success")
    mr = _RecordingMetricRecorder()
    isock = InstrumentedSocket(sock, mr, attrs)

    isock.close()
    isock.close()  # Second close should be no-op for metric
    assert len(mr.closed_calls) == 1
    # The exact attrs object passed at construction is reported back.
    assert mr.closed_calls[0] is attrs
    assert sock.close.call_count == 2


def test_getattr_delegates_to_underlying_socket() -> None:
    sock = MagicMock()
    sock.fileno.return_value = 42
    isock, _ = _make_isock(sock)

    assert isock.fileno() == 42


def test_del_closes_if_not_closed() -> None:
    sock = MagicMock()
    mr = _RecordingMetricRecorder()
    isock = InstrumentedSocket(sock, mr, TelemetryAttributes())

    isock.__del__()
    assert len(mr.closed_calls) == 1


def test_del_noop_if_already_closed() -> None:
    sock = MagicMock()
    mr = _RecordingMetricRecorder()
    isock = InstrumentedSocket(sock, mr, TelemetryAttributes())

    isock.close()
    isock.__del__()
    assert len(mr.closed_calls) == 1


def test_del_swallows_exceptions() -> None:
    sock = MagicMock()
    sock.close.side_effect = RuntimeError("boom")
    mr = _RecordingMetricRecorder()
    isock = InstrumentedSocket(sock, mr, TelemetryAttributes())

    # Should not raise.
    isock.__del__()


def test_makefile_uses_instrumented_socket_for_io() -> None:
    import socket as _socket

    # Uses a real socketpair so the file object produced by makefile()
    # exercises the instrumented recv/send path end to end.
    s1, s2 = _socket.socketpair()
    try:
        mr = _RecordingMetricRecorder()
        isock = InstrumentedSocket(s1, mr, TelemetryAttributes())

        f = isock.makefile("rwb", 0)
        try:
            s2.sendall(b"hello\n")
            assert f.read(6) == b"hello\n"
            assert mr.rx == 6

            f.write(b"world")
            f.flush()
            assert s2.recv(5) == b"world"
            assert mr.tx == 5
        finally:
            f.close()
    finally:
        s1.close()
        s2.close()
# Copyright 2026 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from google.cloud.alloydbconnector.telemetry import (
    DIAL_SUCCESS,
    REFRESH_AHEAD_TYPE,
    REFRESH_SUCCESS,
    NullMetricRecorder,
    NullTelemetryProvider,
    TelemetryAttributes,
    _auth_type_value,
    new_telemetry_provider,
)


class TestAuthTypeValue:
    """_auth_type_value maps the IAM flag onto the auth_type label value."""

    def test_iam(self) -> None:
        assert _auth_type_value(True) == "iam"

    def test_builtin(self) -> None:
        assert _auth_type_value(False) == "built_in"


class TestTelemetryAttributes:
    """Construction behavior of the TelemetryAttributes dataclass."""

    def test_defaults(self) -> None:
        ta = TelemetryAttributes()
        assert ta.iam_authn is False
        assert ta.cache_hit is False
        for string_field in (ta.dial_status, ta.refresh_status, ta.refresh_type):
            assert string_field == ""

    def test_custom_values(self) -> None:
        kwargs = dict(
            iam_authn=True,
            cache_hit=True,
            dial_status=DIAL_SUCCESS,
            refresh_status=REFRESH_SUCCESS,
            refresh_type=REFRESH_AHEAD_TYPE,
        )
        ta = TelemetryAttributes(**kwargs)
        assert ta.iam_authn is True
        assert ta.cache_hit is True
        assert ta.dial_status == DIAL_SUCCESS
        assert ta.refresh_status == REFRESH_SUCCESS
        assert ta.refresh_type == REFRESH_AHEAD_TYPE


class TestNullMetricRecorder:
    """Every NullMetricRecorder method must be a silent no-op."""

    def test_record_dial_count(self) -> None:
        recorder = NullMetricRecorder()
        recorder.record_dial_count(TelemetryAttributes())

    def test_record_dial_latency(self) -> None:
        recorder = NullMetricRecorder()
        recorder.record_dial_latency(100.0)

    def test_record_open_connection(self) -> None:
        recorder = NullMetricRecorder()
        recorder.record_open_connection(TelemetryAttributes())

    def test_record_closed_connection(self) -> None:
        recorder = NullMetricRecorder()
        recorder.record_closed_connection(TelemetryAttributes())

    def test_record_bytes_rx(self) -> None:
        recorder = NullMetricRecorder()
        recorder.record_bytes_rx(1024)

    def test_record_bytes_tx(self) -> None:
        recorder = NullMetricRecorder()
        recorder.record_bytes_tx(1024)

    def test_record_refresh_count(self) -> None:
        recorder = NullMetricRecorder()
        recorder.record_refresh_count(TelemetryAttributes())


class TestNullTelemetryProvider:
    """The null provider is inert and hands out null recorders."""

    def test_shutdown(self) -> None:
        NullTelemetryProvider().shutdown()

    def test_create_metric_recorder_returns_null(self) -> None:
        recorder = NullTelemetryProvider().create_metric_recorder(
            "proj", "loc", "clust", "inst"
        )
        assert isinstance(recorder, NullMetricRecorder)


class TestNewTelemetryProvider:
    """new_telemetry_provider degrades to the null provider safely."""

    def test_disabled_returns_null(self) -> None:
        result = new_telemetry_provider(
            enabled=False,
            project_id="proj",
            client_uid="uid",
            version="1.0.0",
        )
        assert isinstance(result, NullTelemetryProvider)

    def test_none_client_returns_null(self) -> None:
        result = new_telemetry_provider(
            enabled=True,
            project_id="proj",
            client_uid="uid",
            version="1.0.0",
            monitoring_client=None,
        )
        assert isinstance(result, NullTelemetryProvider)

    def test_invalid_client_returns_null(self) -> None:
        """If the exporter fails to initialize, return NullTelemetryProvider."""
        # A bogus client must not raise; initialization errors degrade to
        # the no-op provider.
        result = new_telemetry_provider(
            enabled=True,
            project_id="proj",
            client_uid="uid",
            version="1.0.0",
            monitoring_client="not-a-real-client",
        )
        assert isinstance(result, NullTelemetryProvider)