Skip to content

Conversation

clokep
Copy link
Contributor

@clokep clokep commented Sep 30, 2025

This adds support for psycopg (aka psycopg3) as a PostgreSQL driver in addition to psycopg2. This requires adding some abstractions between psycopg2 and psycopg although most of the queries can be identical (since it is just PostgreSQL underneath).

This is a branch that @realtyem and myself have been working on for several years.

The bulk of this PR is splitting the PostgresEngine into a PsycopgEngine and Psycopg2Engine which use the correct underlying driver. We then need to update some if-statements in the code to differentiate between these.

A bunch of configuration code also needs to be adjusted to handle having multiple engines for postgres.

This also updates the test matrix for trial to run against psycopg2 and psycopg.

This is step 1 of #14586.

clokep and others added 25 commits October 9, 2024 16:31
Use the proper isolation level on psycopg, where it is no longer an int.
…tch() instead of execute_values()

execute_values() from psycopg2 can do insertions with fetch=False, but
psycopg is better off inserting with execute_batch() instead(because of
pipelining support)
@github-actions github-actions bot deployed to PR Documentation Preview September 30, 2025 19:19 Active
@github-actions github-actions bot deployed to PR Documentation Preview September 30, 2025 20:08 Active
@github-actions github-actions bot deployed to PR Documentation Preview October 6, 2025 14:08 Active
@clokep clokep marked this pull request as ready for review October 6, 2025 20:15
@clokep clokep requested a review from a team as a code owner October 6, 2025 20:15
@clokep
Copy link
Contributor Author

clokep commented Oct 6, 2025

I think this is ready for a review, I'm sure it'll need a few changes, but overall I think it is sound.

Copy link
Contributor

@reivilibre reivilibre left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks both of you, looks like a tremendous amount of work has gone into this!

From my perspective, this looks virtually ready, just the really minor things I've pointed out, I'd say:

  • as said, I would like more eyes on the COPY-based replacement for batch execution. It is probably fine, though.
  • It looks like some optimisations have been walked back compared to Psycopg2. I personally think this is fine, though it would be ideal to have a tracking issue containing a ticklist of these, so we can track their re-implementation as a prerequisite to considering Psycopg support 'complete'/first-class/ready to replace Psycopg2.

Other than that, it all looks clear and reasonable to me!


def set_statement_timeout(self, cursor: Cursor, statement_timeout: int) -> None:
"""Configure the current cursor's statement timeout."""
cursor.execute("SET statement_timeout TO ?", (statement_timeout,))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

slight preference to using the explicit SET SESSION form. But this is only cosmetic, no semantic change

Generic[ConnectionType, CursorType, IsolationLevelType],
BaseDatabaseEngine[ConnectionType, CursorType, IsolationLevelType],
metaclass=abc.ABCMeta,
):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

a mini docstring pointing out this is (now) an abstract class, with Pyscopg2Engine and PsycopgEngine as concrete implementations, might be nice

Comment on lines 436 to 456
# We use fetch = False to mean a writable query. You *might* be able
# to morph that into a COPY (...) FROM STDIN, but it isn't worth the
# effort for the few places we set fetch = False.
assert fetch is True

# execute_values requires a single replacement, but we need to expand it
# for COPY. This assumes all inner sequences are the same length.
value_str = "(" + ", ".join("?" for _ in next(iter(values))) + ")"
sql = sql.replace("?", ", ".join(value_str for _ in values))

# Wrap the SQL in the COPY statement.
sql = f"COPY ({sql}) TO STDOUT"

def f(
the_sql: str, the_args: Sequence[Sequence[Any]]
) -> Iterable[Tuple[Any, ...]]:
with self.txn.copy(the_sql, the_args) as copy:
yield from copy.rows()

# Flatten the values.
return self._do_execute(f, sql, list(itertools.chain.from_iterable(values)))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not saying there's anything wrong with this, but this looks non-trivial I'd like to get some extra pair of eyes on this.

Slightly suspicious about next(iter(values)) — would that advance an iterator such as a generator expression, in a way that would drop the first row?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Slightly suspicious about next(iter(values)) — would that advance an iterator such as a generator expression, in a way that would drop the first row?

This exact concern is obsoleted by d49827d (now accepts a Collection, which shouldn't have this risk).

Still think I'd like to take a second look at this aspect with a fresh mind and/or get some extra eyes, but in principle it seems fine

lambda the_sql: execute_batch(self.txn, the_sql, args), sql
)

# TODO Can psycopg3 do anything better?
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just opening this as a thread for discussion to see whether we care enough to either do this now, or open a ticket for it, or something else.


txn.execute_values(sql, args, fetch=False)

# TODO Maybe improve for psycopg.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just opening this as a thread for discussion to see whether we care enough to either do this now, or open a ticket for it, or something else.

Comment on lines 506 to 507
if isinstance(sql, psycopg.sql.Composed):
return sql.as_string(None)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

would appreciate a comment to explain why we handle Composed instances here (seems to go against the method signature?)

But more concretely: do we need to also strip out any potential newlines from the stringification here?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Obsoleted by f5b6429 which takes this out again

"""

if isinstance(txn.database_engine, PostgresEngine):
if isinstance(txn.database_engine, Psycopg2Engine):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe we open a ticket about this as well. A solution might be to pass in an array and rely on UNNEST, but that is something I'm more than happy to defer to later, it just would be nice to remember to do it

else
export PASS_SYNAPSE_COMPLEMENT_USE_WORKERS=
if [[ -n "$POSTGRES" ]]; then
if [[ "$POSTGRES" = "psycopg" ]]; then
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

judging from the CI yaml this needs to be case-insensitive (to match Psycopg from the CI matrix.database); or am I wrong?

reivilibre

This comment was marked as duplicate.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants