-
Notifications
You must be signed in to change notification settings - Fork 396
Support psycopg as a PostgreSQL driver #18999
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: develop
Are you sure you want to change the base?
Conversation
Use the proper isolation level on psycopg, where it is no longer an int.
…tch() instead of execute_values() execute_values() from psycopg2 can do insertions with fetch=False, but psycopg is better off inserting with execute_batch() instead(because of pipelining support)
I think this is ready for a review, I'm sure it'll need a few changes, but overall I think it is sound. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks both of you, looks like a tremendous amount of work has gone into this!
From my perspective, this looks virtually ready, just the really minor things I've pointed out, I'd say:
- as said, I would like more eyes on the
COPY
-based replacement for batch execution. It is probably fine, though. - It looks like some optimisations have been walked back compared to Psycopg2. I personally think this is fine, though it would be ideal to have a tracking issue containing a ticklist of these, so we can track their re-implementation as a prerequisite to considering Psycopg support 'complete'/first-class/ready to replace Psycopg2.
Other than that, it all looks clear and reasonable to me!
synapse/storage/engines/postgres.py
Outdated
|
||
def set_statement_timeout(self, cursor: Cursor, statement_timeout: int) -> None: | ||
"""Configure the current cursor's statement timeout.""" | ||
cursor.execute("SET statement_timeout TO ?", (statement_timeout,)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
slight preference to using the explicit SET SESSION
form. But this is only cosmetic, no semantic change
Generic[ConnectionType, CursorType, IsolationLevelType], | ||
BaseDatabaseEngine[ConnectionType, CursorType, IsolationLevelType], | ||
metaclass=abc.ABCMeta, | ||
): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
a mini docstring pointing out this is (now) an abstract class, with Pyscopg2Engine and PsycopgEngine as concrete implementations, might be nice
# We use fetch = False to mean a writable query. You *might* be able | ||
# to morph that into a COPY (...) FROM STDIN, but it isn't worth the | ||
# effort for the few places we set fetch = False. | ||
assert fetch is True | ||
|
||
# execute_values requires a single replacement, but we need to expand it | ||
# for COPY. This assumes all inner sequences are the same length. | ||
value_str = "(" + ", ".join("?" for _ in next(iter(values))) + ")" | ||
sql = sql.replace("?", ", ".join(value_str for _ in values)) | ||
|
||
# Wrap the SQL in the COPY statement. | ||
sql = f"COPY ({sql}) TO STDOUT" | ||
|
||
def f( | ||
the_sql: str, the_args: Sequence[Sequence[Any]] | ||
) -> Iterable[Tuple[Any, ...]]: | ||
with self.txn.copy(the_sql, the_args) as copy: | ||
yield from copy.rows() | ||
|
||
# Flatten the values. | ||
return self._do_execute(f, sql, list(itertools.chain.from_iterable(values))) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not saying there's anything wrong with this, but this looks non-trivial I'd like to get some extra pair of eyes on this.
Slightly suspicious about next(iter(values))
— would that advance an iterator such as a generator expression, in a way that would drop the first row?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Slightly suspicious about next(iter(values)) — would that advance an iterator such as a generator expression, in a way that would drop the first row?
This exact concern is obsoleted by d49827d (now accepts a Collection
, which shouldn't have this risk).
Still think I'd like to take a second look at this aspect with a fresh mind and/or get some extra eyes, but in principle it seems fine
lambda the_sql: execute_batch(self.txn, the_sql, args), sql | ||
) | ||
|
||
# TODO Can psycopg3 do anything better? |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just opening this as a thread for discussion to see whether we care enough to either do this now, or open a ticket for it, or something else.
|
||
txn.execute_values(sql, args, fetch=False) | ||
|
||
# TODO Maybe improve for psycopg. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just opening this as a thread for discussion to see whether we care enough to either do this now, or open a ticket for it, or something else.
synapse/storage/database.py
Outdated
if isinstance(sql, psycopg.sql.Composed): | ||
return sql.as_string(None) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
would appreciate a comment to explain why we handle Composed
instances here (seems to go against the method signature?)
But more concretely: do we need to also strip out any potential newlines from the stringification here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Obsoleted by f5b6429 which takes this out again
""" | ||
|
||
if isinstance(txn.database_engine, PostgresEngine): | ||
if isinstance(txn.database_engine, Psycopg2Engine): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
maybe we open a ticket about this as well. A solution might be to pass in an array and rely on UNNEST
, but that is something I'm more than happy to defer to later, it just would be nice to remember to do it
else | ||
export PASS_SYNAPSE_COMPLEMENT_USE_WORKERS= | ||
if [[ -n "$POSTGRES" ]]; then | ||
if [[ "$POSTGRES" = "psycopg" ]]; then |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
judging from the CI yaml this needs to be case-insensitive (to match Psycopg
from the CI matrix.database
); or am I wrong?
This adds support for psycopg (aka psycopg3) as a PostgreSQL driver in addition to psycopg2. This requires adding some abstractions between psycopg2 and psycopg although most of the queries can be identical (since it is just PostgreSQL underneath).
This is a branch that @realtyem and myself have been working on for several years.
The bulk of this PR is splitting the
PostgresEngine
into aPsycopgEngine
andPsycopg2Engine
which use the correct underlying driver. We then need to update some if-statements in the code to differentiate between these.A bunch of configuration code also needs to be adjusted to handle having multiple engines for postgres.
This also updates the test matrix for trial to run against psycopg2 and psycopg.
This is step 1 of #14586.