Skip to content

Commit

Permalink
Added opentelemetry instrumentation and reworked signals
Browse files Browse the repository at this point in the history
  • Loading branch information
maximebf committed May 27, 2024
1 parent f1e5154 commit 3735d4f
Show file tree
Hide file tree
Showing 8 changed files with 424 additions and 44 deletions.
17 changes: 17 additions & 0 deletions docs/instrumentation.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
# Instrumentation

SQLORM can be instrumented using OpenTelemetry.

It supports automatic instrumentation or can be enabled manually:

```py
from sqlorm.opentelemetry import SQLORMInstrumentor

SQLORMInstrumentor().instrument(engine=engine)
```

You can optionally configure SQLAlchemy instrumentation to enable sqlcommenter which enriches the query with contextual information.

```py
SQLORMInstrumentor().instrument(enable_commenter=True, commenter_options={})
```
16 changes: 12 additions & 4 deletions docs/signals.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,10 @@ Signals allow you to listen and react to events emitted by sqlorm. The [blinker]

The following signals exist on the `Engine` class. The sender is always the engine instance.

- `connected`: receive `conn` (connection instance) and `from_pool` (bool)
- `disconnected`: receive `conn` (connection instance) and `close_conn` (bool). This signal may not indicate a connection has been closed but only returned to the pool. Use the `close_conn` kwargs to distinguish.
- `connected`: receive `conn` (connection instance)
- `pool_checkout`: connection checked out of the pool, receive `conn`
- `pool_checkin`: connection returned to the pool, receive `conn`
- `disconnected`: receive `conn` (connection instance)

Example:

Expand Down Expand Up @@ -44,8 +46,14 @@ def on_before_commit(session):

The following signals exist on the `Transaction` class. The sender is always the transaction instance.

- `before_execute`: receive `stmt` and `params`. Returning a cursor will stop sqlorm execute() and return the cursor directly
- `before_executemany`: receive `stmt` and `seq_of_parameters`. Returning false will stop sqlorm executemany()
- `before_execute`: receive `stmt`, `params` and `many` (to distinguish between execute and executemany)
- when called from execute: returning a cursor will stop sqlorm execution and return the cursor directly
- when called from executemany: returning False will stop sqlorm execution
- in both case, returning a tuple `(stmt, params)` will override stmt and params
- `after_execute`: receive `cursor`, `stmt`, `params` and `many`
- `handle_error`: receive `cursor`, `stmt`, `params`, `many` and `exc`:
- when handling for execute: return a cursor to prevent raising the exception
- when handling for executemany: return True to prevent raising

## Model

Expand Down
1 change: 1 addition & 0 deletions mkdocs.yml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ nav:
- signals.md
- schema.md
- drivers.md
- instrumentation.md

theme:
name: material
Expand Down
11 changes: 5 additions & 6 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,8 @@ packages = [{include = "sqlorm"}]
[tool.poetry.dependencies]
python = "^3.10"
blinker = "^1.8.2"

[tool.poetry.group.postgresql.dependencies]
psycopg = {extras = ["binary"], version = "^3.1.18"}

[tool.poetry.group.mysql.dependencies]
mysql-connector-python = "^8.3.0"
psycopg = { extras = ["binary"], version = "^3.1.18", optional = true }
mysql-connector-python = { version = "^8.3.0", optional = true }

[tool.poetry.group.dev.dependencies]
pytest = "^8.0.0"
Expand All @@ -25,6 +21,9 @@ ruff = "^0.4.3"
mkdocs-material = "^9.5.24"
mkdocs-callouts = "^1.13.2"

[tool.poetry.plugins."opentelemetry_instrumentor"]
sqlorm = "sqlorm.opentelemetry:SQLORMInstrumentor"

[tool.ruff]
include = ["sqlorm/**/*.py"]
line-length = 100
Expand Down
3 changes: 3 additions & 0 deletions sqlorm/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,3 +16,6 @@
from .model import BaseModel, Model, Column, ColumnExpr, Relationship, flag_dirty_attr, is_dirty
from .types import *
from .schema import create_all, create_table, init_db, migrate


__version__ = "0.2.0"
111 changes: 77 additions & 34 deletions sqlorm/engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import logging
import inspect
import urllib.parse
import functools
from blinker import Namespace
from .sql import render, ParametrizedStmt
from .resultset import ResultSet, CompositeResultSet, CompositionMap
Expand Down Expand Up @@ -32,6 +33,8 @@ class Engine:
"""

connected = _signals.signal("connected")
pool_checkin = _signals.signal("pool-checkin")
pool_checkout = _signals.signal("pool-checkout")
disconnected = _signals.signal("disconnected")

@classmethod
Expand Down Expand Up @@ -82,46 +85,45 @@ def __init__(

def connect(self, from_pool=True):
if not from_pool or self.pool is False:
if self.logger:
getattr(self.logger, self.logger_level)("New connection established")
return self.connection_factory(self.dbapi)
return self._connect()

if self.pool:
conn = self.pool.pop(0)
if self.logger:
getattr(self.logger, self.logger_level)("Re-using connection from pool")
self.connected.send(self, conn=conn, from_pool=True)
self.pool_checkout.send(self, conn=conn)
elif not self.max_pool_conns or len(self.active_conns) < self.max_pool_conns:
if self.logger:
getattr(self.logger, self.logger_level)("New connection established")
conn = self.connection_factory(self.dbapi)
self.connected.send(self, conn=conn, from_pool=False)
conn = self._connect()
else:
raise EngineError("Max number of connections reached")

self.active_conns.append(conn)
return conn

def _connect(self):
if self.logger:
getattr(self.logger, self.logger_level)("Creating new connection")
conn = self.connection_factory(self.dbapi)
self.connected.send(self, conn=conn)
return conn

def disconnect(self, conn, force=False):
if conn in self.active_conns:
if force or self.pool is False:
self._close(conn)
elif conn in self.active_conns:
self.active_conns.remove(conn)
if force:
if self.logger:
getattr(self.logger, self.logger_level)("Closing connection (forced)")
conn.close()
self.disconnected.send(self, conn=conn, close_conn=True)
else:
if self.logger:
getattr(self.logger, self.logger_level)("Connection returned to pool")
self.pool.append(conn)
self.disconnected.send(self, conn=conn, close_conn=False)
elif self.pool is False or force:
if self.logger:
getattr(self.logger, self.logger_level)("Closing connection")
conn.close()
self.disconnected.send(self, conn=conn, close_conn=True)
getattr(self.logger, self.logger_level)("Returning connection to pool")
self.pool.append(conn)
self.pool_checkin.send(self, conn=conn)
else:
raise EngineError("Cannot close connection which is not part of pool")

def _close(self, conn):
if self.logger:
getattr(self.logger, self.logger_level)("Closing connection")
conn.close()
self.disconnected.send(self, conn=conn)

def disconnect_all(self):
if self.pool is False:
Expand All @@ -130,7 +132,7 @@ def disconnect_all(self):
getattr(self.logger, self.logger_level)("Closing all connections from pool")
for conn in self.pool + self.active_conns:
conn.close()
self.disconnected.send(self, conn=conn, close_conn=True)
self.disconnected.send(self, conn=conn)
self.pool = []
self.active_conns = []

Expand Down Expand Up @@ -375,7 +377,8 @@ class Transaction:
default_composite_separator = "__"

before_execute = _signals.signal("before-execute")
before_executemany = _signals.signal("before-executemany")
after_execute = _signals.signal("after-execute")
handle_error = _signals.signal("handle-error")

def __init__(self, session, virtual=False):
self.session = session
Expand Down Expand Up @@ -403,8 +406,10 @@ def cursor(self, stmt=None, params=None):
return self.session.connect().cursor()
stmt, params = render(stmt, params)

rv = _signal_rv(self.before_execute.send(self, stmt=stmt, params=params))
if rv:
rv = _signal_rv(self.before_execute.send(self, stmt=stmt, params=params, many=False))
if isinstance(rv, tuple):
stmt, params = rv
elif rv:
return rv

if self.session and self.session.logger:
Expand All @@ -413,10 +418,19 @@ def cursor(self, stmt=None, params=None):
)

cur = self.session.connect().cursor()
if params:
cur.execute(stmt, params)
else:
cur.execute(stmt)
try:
# because the default value of params may depend on some engine
if params:
cur.execute(stmt, params)
else:
cur.execute(stmt)
except Exception as e:
rv = _signal_rv(self.handle_error.send(self, cursor=cur, stmt=stmt, params=params, exc=e, many=False))
if rv:
return rv
raise

self.after_execute.send(self, cursor=cur, stmt=stmt, params=params, many=False)
return cur

def execute(self, stmt, params=None):
Expand All @@ -428,9 +442,11 @@ def execute(self, stmt, params=None):

def executemany(self, stmt, seq_of_parameters):
rv = _signal_rv(
self.before_executemany.send(self, stmt=stmt, seq_of_parameters=seq_of_parameters)
self.before_execute.send(self, stmt=stmt, params=seq_of_parameters, many=True)
)
if rv is False:
if isinstance(rv, tuple):
stmt, seq_of_parameters = rv
elif rv is False:
return

if self.session and self.session.logger:
Expand All @@ -439,7 +455,16 @@ def executemany(self, stmt, seq_of_parameters):
)

cur = self.cursor()
cur.executemany(str(stmt), seq_of_parameters)

try:
cur.executemany(str(stmt), seq_of_parameters)
except Exception as e:
if not _signal_rv(
self.handle_error.send(self, cursor=cur, stmt=stmt, params=seq_of_parameters, exc=e, many=True)
):
raise

self.after_execute.send(self, cursor=cur, stmt=stmt, params=seq_of_parameters, many=True)
cur.close()

def fetch(self, stmt, params=None, model=None, obj=None, loader=None):
Expand Down Expand Up @@ -555,3 +580,21 @@ def _signal_rv(signal_rv):
if rv:
final_rv = rv
return final_rv


def connect_via_engine(engine, signal, func=None):
def decorator(func):
@functools.wraps(func)
def wrapper(sender, **kw):
matches = False
if isinstance(sender, Engine):
matches = sender is engine
elif isinstance(sender, Session):
matches = sender.engine is engine
elif isinstance(sender, Transaction):
matches = sender.session.engine is engine
if matches:
return func(sender, **kw)
signal.connect(wrapper, weak=False)
return wrapper
return decorator(func) if func else decorator
107 changes: 107 additions & 0 deletions sqlorm/opentelemetry/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
# Modified from opentelemetry-instrumentation-sqlalchemy
from collections.abc import Sequence
from typing import Collection

from wrapt import wrap_function_wrapper as _w

from opentelemetry.instrumentation.instrumentor import BaseInstrumentor
from opentelemetry.instrumentation.utils import unwrap
from opentelemetry.metrics import get_meter
from opentelemetry.semconv.metrics import MetricInstruments
from opentelemetry.trace import get_tracer

import sqlorm

from .tracer import (
EngineTracer,
_wrap_connect,
_wrap_engine_init,
)


class SQLORMInstrumentor(BaseInstrumentor):
"""An instrumentor for SQLORM
See `BaseInstrumentor`
"""

def _instrument(self, **kwargs):
"""Instruments SQLORM engine creation methods and the engine
if passed as an argument.
Args:
**kwargs: Optional arguments
``engine``: a SQLORM engine instance
``engines``: a list of SQLORM engine instances
``tracer_provider``: a TracerProvider, defaults to global
``meter_provider``: a MeterProvider, defaults to global
``enable_commenter``: bool to enable sqlcommenter, defaults to False
``commenter_options``: dict of sqlcommenter config, defaults to {}
Returns:
An instrumented engine if passed in as an argument or list of instrumented engines, None otherwise.
"""
tracer_provider = kwargs.get("tracer_provider")
tracer = get_tracer(
__name__,
sqlorm.__version__,
tracer_provider,
schema_url="https://opentelemetry.io/schemas/1.11.0",
)

meter_provider = kwargs.get("meter_provider")
meter = get_meter(
__name__,
sqlorm.__version__,
meter_provider,
schema_url="https://opentelemetry.io/schemas/1.11.0",
)

connections_usage = meter.create_up_down_counter(
name=MetricInstruments.DB_CLIENT_CONNECTIONS_USAGE,
unit="connections",
description="The number of connections that are currently in state described by the state attribute.",
)

enable_commenter = kwargs.get("enable_commenter", False)
commenter_options = kwargs.get("commenter_options", {})

_w(
"sqlorm.engine",
"Engine.__init__",
_wrap_engine_init(
tracer, connections_usage, enable_commenter, commenter_options
),
)
_w(
"sqlorm.engine",
"Engine._connect",
_wrap_connect(tracer),
)
if kwargs.get("engine") is not None:
return EngineTracer(
tracer,
kwargs.get("engine"),
connections_usage,
kwargs.get("enable_commenter", False),
kwargs.get("commenter_options", {}),
)
if kwargs.get("engines") is not None and isinstance(
kwargs.get("engines"), Sequence
):
return [
EngineTracer(
tracer,
engine,
connections_usage,
kwargs.get("enable_commenter", False),
kwargs.get("commenter_options", {}),
)
for engine in kwargs.get("engines")
]

return None

def _uninstrument(self, **kwargs):
unwrap(sqlorm.Engine, "__init__")
unwrap(sqlorm.Engine, "_connect")
EngineTracer.remove_all_event_listeners()
Loading

0 comments on commit 3735d4f

Please sign in to comment.