Skip to content

Commit

Permalink
Merge pull request #26 from dapper91/dev
Browse files Browse the repository at this point in the history
- asyncpg connection manager added.
  • Loading branch information
dapper91 committed Dec 9, 2023
2 parents 0167e97 + 431f2b4 commit 0561ada
Show file tree
Hide file tree
Showing 15 changed files with 158 additions and 28 deletions.
3 changes: 2 additions & 1 deletion .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -71,5 +71,6 @@ repos:
name: mypy
pass_filenames: false
additional_dependencies:
- types-psycopg2
- types-psycopg2>=2.9.5
- asyncpg-stubs>=0.29.0
args: ["--package", "generic_connection_pool"]
6 changes: 6 additions & 0 deletions CHANGELOG.rst
Original file line number Diff line number Diff line change
@@ -1,6 +1,12 @@
Changelog
=========

0.7.0 (2023-12-09)
------------------

- asyncpg connection manager added.


0.6.1 (2023-10-06)
------------------

Expand Down
10 changes: 5 additions & 5 deletions README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -110,8 +110,8 @@ The following example illustrates how to create https pool:
try:
fetch('https://en.wikipedia.org/wiki/HTTP') # http connection is opened
fetch('https://en.wikipedia.org/wiki/Python_(programming_language)') # http connection is reused
fetch('https://en.wikipedia.org/wiki/HTTP') # http connection opened
fetch('https://en.wikipedia.org/wiki/Python_(programming_language)') # http connection reused
finally:
http_pool.close()
Expand Down Expand Up @@ -148,19 +148,19 @@ The following example illustrates how to create https pool:
)
try:
# connection is opened
# connection opened
with pg_pool.connection(endpoint='master') as conn:
cur = conn.cursor()
cur.execute("SELECT * FROM pg_stats;")
print(cur.fetchone())
# connection is opened
# connection opened
with pg_pool.connection(endpoint='replica-1') as conn:
cur = conn.cursor()
cur.execute("SELECT * FROM pg_stats;")
print(cur.fetchone())
# connection is reused
# connection reused
with pg_pool.connection(endpoint='master') as conn:
cur = conn.cursor()
cur.execute("SELECT * FROM pg_stats;")
Expand Down
9 changes: 6 additions & 3 deletions docs/source/pages/api.rst
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,14 @@ _______
.. automodule:: generic_connection_pool.contrib.socket_async
:members:

.. automodule:: generic_connection_pool.contrib.psycopg2
:members:

.. automodule:: generic_connection_pool.contrib.unix
:members:

.. automodule:: generic_connection_pool.contrib.unix_async
:members:

.. automodule:: generic_connection_pool.contrib.psycopg2
:members:

.. automodule:: generic_connection_pool.contrib.asyncpg
:members:
4 changes: 2 additions & 2 deletions docs/source/pages/quickstart.rst
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ ____________________

After the pool is instantiated a connection can be acquired using :py:meth:`~generic_connection_pool.threading.ConnectionPool.acquire`
and released using :py:meth:`~generic_connection_pool.threading.ConnectionPool.release`.
To get rid of boilerplate code the connection manager supports automated acquiring using
To get rid of boilerplate code the connection pool supports automated connection acquiring using
:py:meth:`~generic_connection_pool.threading.ConnectionPool.connection` returning a context manager:


Expand All @@ -87,7 +87,7 @@ To get rid of boilerplate code the connection manager supports automated acquiri
Connection aliveness checks
___________________________

Connection manager provides an api for connection aliveness checks.
Connection manager provides the api for connection aliveness checks.
To implement that override method :py:meth:`~generic_connection_pool.threading.BaseConnectionManager.check_aliveness`.
The method must return ``True`` if connection is alive and ``False`` otherwise:

Expand Down
51 changes: 51 additions & 0 deletions examples/async_pg_pool.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
import asyncio

import asyncpg

from generic_connection_pool.asyncio import ConnectionPool
from generic_connection_pool.contrib.asyncpg import DbConnectionManager

Endpoint = str
Connection = asyncpg.Connection


async def main() -> None:
dsn_params = dict(database='postgres', user='postgres', password='secret')

pg_pool = ConnectionPool[Endpoint, 'Connection[asyncpg.Record]'](
DbConnectionManager[asyncpg.Record](
dsn_params={
'master': dict(dsn_params, host='db-master.local'),
'replica-1': dict(dsn_params, host='db-replica-1.local'),
'replica-2': dict(dsn_params, host='db-replica-2.local'),
},
),
acquire_timeout=2.0,
idle_timeout=60.0,
max_lifetime=600.0,
min_idle=3,
max_size=10,
total_max_size=15,
background_collector=True,
)

try:
# connection opened
async with pg_pool.connection(endpoint='master') as conn:
result = await conn.fetchval("SELECT inet_server_addr()")
print(result)

# connection opened
async with pg_pool.connection(endpoint='replica-1') as conn:
result = await conn.fetchval("SELECT inet_server_addr()")
print(result)

# connection reused
async with pg_pool.connection(endpoint='master') as conn:
result = await conn.fetchval("SELECT inet_server_addr()")
print(result)

finally:
await pg_pool.close()

asyncio.run(main())
7 changes: 5 additions & 2 deletions examples/async_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@ async def main() -> None:

async def fetch(url: str, timeout: float = 5.0) -> None:
url = urllib.parse.urlsplit(url)
if url.hostname is None:
raise ValueError

port = url.port or 443 if url.scheme == 'https' else 80

async with http_pool.connection(endpoint=(url.hostname, port), timeout=timeout) as (reader, writer):
Expand Down Expand Up @@ -56,8 +59,8 @@ async def fetch(url: str, timeout: float = 5.0) -> None:
print(b''.join(chunks))

try:
await fetch('https://en.wikipedia.org/wiki/HTTP') # http connection is opened
await fetch('https://en.wikipedia.org/wiki/Python_(programming_language)') # http connection is reused
await fetch('https://en.wikipedia.org/wiki/HTTP') # http connection opened
await fetch('https://en.wikipedia.org/wiki/Python_(programming_language)') # http connection reused
finally:
await http_pool.close()

Expand Down
4 changes: 2 additions & 2 deletions examples/custom_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ def command(addr: IPv4Address, port: int, cmd: str) -> None:


try:
command(IPv4Address('127.0.0.1'), 6379, 'CLIENT ID') # tcp connection is opened
command(IPv4Address('127.0.0.1'), 6379, 'CLIENT INFO') # tcp connection is reused
command(IPv4Address('127.0.0.1'), 6379, 'CLIENT ID') # tcp connection opened
command(IPv4Address('127.0.0.1'), 6379, 'CLIENT INFO') # tcp connection reused
finally:
redis_pool.close()
12 changes: 6 additions & 6 deletions examples/pg_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,22 +27,22 @@
)

try:
# connection is opened
# connection opened
with pg_pool.connection(endpoint='master') as conn:
cur = conn.cursor()
cur.execute("SELECT * FROM pg_stats;")
cur.execute("SELECT inet_server_addr()")
print(cur.fetchone())

# connection is opened
# connection opened
with pg_pool.connection(endpoint='replica-1') as conn:
cur = conn.cursor()
cur.execute("SELECT * FROM pg_stats;")
cur.execute("SELECT inet_server_addr()")
print(cur.fetchone())

# connection is reused
# connection reused
with pg_pool.connection(endpoint='master') as conn:
cur = conn.cursor()
cur.execute("SELECT * FROM pg_stats;")
cur.execute("SELECT inet_server_addr()")
print(cur.fetchone())

finally:
Expand Down
7 changes: 5 additions & 2 deletions examples/ssl_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@

def fetch(url: str, timeout: float = 5.0) -> None:
url = urllib.parse.urlsplit(url)
if url.hostname is None:
raise ValueError

port = url.port or 443 if url.scheme == 'https' else 80

with http_pool.connection(endpoint=(url.hostname, port), timeout=timeout) as sock:
Expand All @@ -46,7 +49,7 @@ def fetch(url: str, timeout: float = 5.0) -> None:


try:
fetch('https://en.wikipedia.org/wiki/HTTP') # http connection is opened
fetch('https://en.wikipedia.org/wiki/Python_(programming_language)') # http connection is reused
fetch('https://en.wikipedia.org/wiki/HTTP') # http connection opened
fetch('https://en.wikipedia.org/wiki/Python_(programming_language)') # http connection reused
finally:
http_pool.close()
4 changes: 2 additions & 2 deletions examples/tcp_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ def command(addr: IpAddress, port: int, cmd: str) -> None:


try:
command(IPv4Address('127.0.0.1'), 6379, 'CLIENT ID') # tcp connection is opened
command(IPv4Address('127.0.0.1'), 6379, 'INFO') # tcp connection is reused
command(IPv4Address('127.0.0.1'), 6379, 'CLIENT ID') # tcp connection opened
command(IPv4Address('127.0.0.1'), 6379, 'INFO') # tcp connection reused
finally:
redis_pool.close()
59 changes: 59 additions & 0 deletions generic_connection_pool/contrib/asyncpg.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
"""
Postgres asyncpg connection manager implementation.
"""

from typing import Generic, Mapping, Optional, TypeVar

import asyncpg

from generic_connection_pool.asyncio import BaseConnectionManager

DbEndpoint = str
Connection = asyncpg.Connection
DsnParameters = Mapping[DbEndpoint, Mapping[str, str]]

RecordT = TypeVar('RecordT', bound=asyncpg.Record)


class DbConnectionManager(BaseConnectionManager[DbEndpoint, 'Connection[RecordT]'], Generic[RecordT]):
"""
Psycopg2 based postgres connection manager.
:param dsn_params: databases dsn parameters
"""

def __init__(self, dsn_params: DsnParameters):
self._dsn_params = dsn_params

async def create(
self,
endpoint: DbEndpoint,
timeout: Optional[float] = None,
) -> 'Connection[RecordT]':
return await asyncpg.connect(**self._dsn_params[endpoint]) # type: ignore[call-overload]

async def dispose(
self,
endpoint: DbEndpoint,
conn: 'Connection[RecordT]',
timeout: Optional[float] = None,
) -> None:
await conn.close(timeout=timeout)

async def check_aliveness(
self,
endpoint: DbEndpoint,
conn: 'Connection[RecordT]',
timeout: Optional[float] = None,
) -> bool:
return conn.is_closed()

async def on_acquire(self, endpoint: DbEndpoint, conn: 'Connection[RecordT]') -> None:
await self._rollback_uncommitted(conn)

async def on_release(self, endpoint: DbEndpoint, conn: 'Connection[RecordT]') -> None:
await self._rollback_uncommitted(conn)

async def _rollback_uncommitted(self, conn: 'Connection[RecordT]') -> None:
if conn.is_in_transaction():
await conn.execute('ROLLBACK')
2 changes: 1 addition & 1 deletion generic_connection_pool/contrib/unix.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
from .socket import socket_timeout

if sys.platform not in ('linux', 'darwin', 'freebsd'):
raise AssertionError('this module is supported only on unix platforms')
raise AssertionError('this module is only supported by unix platforms')


UnixSocketEndpoint = pathlib.Path
Expand Down
2 changes: 1 addition & 1 deletion generic_connection_pool/contrib/unix_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
from generic_connection_pool.contrib.socket_async import StreamAlivenessCheckingMixin

if sys.platform not in ('linux', 'darwin', 'freebsd'):
raise AssertionError('this module is supported only on unix platforms')
raise AssertionError('this module is only supported by unix platforms')


UnixSocketEndpoint = pathlib.Path
Expand Down
6 changes: 5 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "generic-connection-pool"
version = "0.6.1"
version = "0.7.0"
description = "generic connection pool"
authors = ["Dmitry Pershin <[email protected]>"]
license = "Unlicense"
Expand Down Expand Up @@ -30,6 +30,7 @@ classifiers = [
[tool.poetry.dependencies]
python = ">=3.9"
psycopg2 = {version = ">=2.9.5", optional = true}
asyncpg = {version = "^0.29.0", optional = true}

furo = {version = "^2023.7.26", optional = true}
Sphinx = {version = "^6.2.1", optional = true}
Expand All @@ -39,6 +40,7 @@ sphinx-copybutton = {version = "^0.5.2", optional = true}

[tool.poetry.extras]
psycopg2 = ["psycopg2"]
asyncpg = ["asyncpg"]
docs = ['Sphinx', 'sphinx_design', 'sphinx-copybutton', 'furo', 'toml']

[tool.poetry.dev-dependencies]
Expand All @@ -50,6 +52,8 @@ pytest-mock = "^3.11.1"
pytest-cov = "^4.1.0"
types-psycopg2 = "^2.9.21"
pytest-timeout = "^2.1.0"
asyncpg-stubs = {version = "^0.29.0", python = ">=3.9,<4.0"}


[build-system]
requires = ["poetry-core>=1.0.0"]
Expand Down

0 comments on commit 0561ada

Please sign in to comment.