From 90589e58cb926e880f8b1880286e294a8991daeb Mon Sep 17 00:00:00 2001 From: Jaromir Hamala Date: Tue, 20 May 2025 17:05:35 +0200 Subject: [PATCH] QuestDB support Address type introspection failures with QuestDB due to its lack of recursive CTE support. This primarily impacts types not built into asyncpg, such as float8[]. This commit: - Automatically registers float8[] when connected to QuestDB. - Disables certain features like advisory locks, which are not supported. - Notes that JIT, while used by QuestDB, cannot be disabled via its SQL API. --- asyncpg/connection.py | 31 ++++++++++++++++++++++++++++++- 1 file changed, 30 insertions(+), 1 deletion(-) diff --git a/asyncpg/connection.py b/asyncpg/connection.py index e9354109..c23e40e4 100644 --- a/asyncpg/connection.py +++ b/asyncpg/connection.py @@ -98,6 +98,8 @@ def __init__(self, protocol, transport, loop, self._server_caps = _detect_server_capabilities( self._server_version, settings) + self._protocol.get_settings().register_data_types(self._server_caps.extra_types) + if self._server_version < (14, 0): self._intro_query = introspection.INTRO_LOOKUP_TYPES_13 else: @@ -2654,7 +2656,7 @@ class _ConnectionProxy: ServerCapabilities = collections.namedtuple( 'ServerCapabilities', ['advisory_locks', 'notifications', 'plpgsql', 'sql_reset', - 'sql_close_all', 'sql_copy_from_where', 'jit']) + 'sql_close_all', 'sql_copy_from_where', 'jit', 'extra_types']) ServerCapabilities.__doc__ = 'PostgreSQL server capabilities.' @@ -2668,6 +2670,7 @@ def _detect_server_capabilities(server_version, connection_settings): sql_close_all = False jit = False sql_copy_from_where = False + extra_types = [] elif hasattr(connection_settings, 'crdb_version'): # CockroachDB detected. advisory_locks = False @@ -2677,6 +2680,7 @@ def _detect_server_capabilities(server_version, connection_settings): sql_close_all = False jit = False sql_copy_from_where = False + extra_types = [] elif hasattr(connection_settings, 'crate_version'): # CrateDB detected. advisory_locks = False @@ -2686,6 +2690,29 @@ def _detect_server_capabilities(server_version, connection_settings): sql_close_all = False jit = False sql_copy_from_where = False + extra_types = [] + elif hasattr(connection_settings, 'questdb_version'): + # QuestDB detected. + advisory_locks = False + notifications = False + plpgsql = False + sql_reset = False + sql_close_all = False + jit = False + sql_copy_from_where = False + extra_types = [{ + 'oid': 1022, + 'elemtype': 701, + 'kind': 'b', + 'name': '_float8', + 'elemtype_name': 'float8', + 'ns': 'pg_catalog', + 'elemdelim': ',', + 'depth': 0, + 'range_subtype': None, + 'attrtypoids': None, + 'basetype': None + }] else: # Standard PostgreSQL server assumed. advisory_locks = True @@ -2695,6 +2722,7 @@ def _detect_server_capabilities(server_version, connection_settings): sql_close_all = True jit = server_version >= (11, 0) sql_copy_from_where = server_version.major >= 12 + extra_types = [] return ServerCapabilities( advisory_locks=advisory_locks, @@ -2704,6 +2732,7 @@ def _detect_server_capabilities(server_version, connection_settings): sql_close_all=sql_close_all, sql_copy_from_where=sql_copy_from_where, jit=jit, + extra_types=extra_types )