Skip to content

Commit

Permalink
fixup! clean up
Browse files Browse the repository at this point in the history
  • Loading branch information
mfussenegger committed Sep 9, 2024
1 parent e341947 commit 08ce745
Showing 1 changed file with 27 additions and 20 deletions.
47 changes: 27 additions & 20 deletions cr8/clients.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
import time
from urllib.parse import urlparse, parse_qs, urlunparse
from datetime import datetime, date
from typing import List, Union, Iterable
from typing import List, Union, Iterable, Dict
from decimal import Decimal
from cr8.aio import asyncio # import via aio for uvloop setup

Expand Down Expand Up @@ -321,58 +321,62 @@ def __init__(self, hosts, conn_pool_limit=25, session_settings=None):
self.urls = itertools.cycle(list(map(_append_sql, hosts)))
self.conn_pool_limit = conn_pool_limit
self.is_cratedb = True
self._pool = []
self._pools = {}
self.session_settings = session_settings or {}

@property
async def _session(self):
if not self._pool:
self._connector_params = {
async def _session(self, url):
pool = self._pools.get(url)
if not pool:
pool = asyncio.Queue(maxsize=self.conn_pool_limit)
self._pools[url] = pool
_connector_params = {
'limit': 1,
'verify_ssl': _verify_ssl_from_first(self.hosts)
}
for n in range(0, self.conn_pool_limit):
tcp_connector = aiohttp.TCPConnector(**self._connector_params)
tcp_connector = aiohttp.TCPConnector(**_connector_params)
session = aiohttp.ClientSession(connector=tcp_connector)
for setting, value in self.session_settings.items():
payload = {'stmt': f'set {setting}={value}'}
await _exec(
session,
next(self.urls),
url,
dumps(payload, cls=CrateJsonEncoder)
)
self._pool.append(session)
pool.put_nowait(session)

return self._pool.pop()
return await pool.get()


async def execute(self, stmt, args=None):
payload = {'stmt': _plain_or_callable(stmt)}
if args:
payload['args'] = _plain_or_callable(args)
session = await self._session
url = next(self.urls)
session = await self._session(url)
result = await _exec(
session,
next(self.urls),
url,
dumps(payload, cls=CrateJsonEncoder)
)
self._pool.append(session)
await self._pools[url].put(session)
return result

async def execute_many(self, stmt, bulk_args):
data = dumps(dict(
stmt=_plain_or_callable(stmt),
bulk_args=_plain_or_callable(bulk_args)
), cls=CrateJsonEncoder)
session = await self._session
result = await _exec(session, next(self.urls), data)
self._pool.append(session)
url = next(self.urls)
session = await self._session(url)
result = await _exec(session, url, data)
await self._pools[url].put(session)
return result

async def get_server_version(self):
session = await self._session
urlparts = urlparse(self.hosts[0])
url = urlunparse((urlparts.scheme, urlparts.netloc, '/', '', '', ''))
session = await self._session(url)
async with session.get(url) as resp:
r = await resp.json()
version = r['version']
Expand All @@ -381,12 +385,15 @@ async def get_server_version(self):
'number': version['number'],
'date': _date_or_none(version['build_timestamp'][:10])
}
self._pool.append(session)
await self._pools[url].put(session)
return result

async def _close(self):
for session in self._pool:
await session.close()
for url, pool in self._pools.items():
while not pool.empty():
session = await pool.get()
await session.close()
self._pools = {}

def close(self):
asyncio.get_event_loop().run_until_complete(self._close())
Expand Down

0 comments on commit 08ce745

Please sign in to comment.