Skip to content

Commit d335f78

Browse files
authored
Use unnamed statement in pg when not persistent (#3863)
This will automatically close the prepared statement when another query is run, avoiding a memory leak.
1 parent 760b395 commit d335f78

File tree

3 files changed

+48
-6
lines changed

3 files changed

+48
-6
lines changed

sqlx-postgres/src/connection/executor.rs

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -25,9 +25,15 @@ async fn prepare(
2525
sql: &str,
2626
parameters: &[PgTypeInfo],
2727
metadata: Option<Arc<PgStatementMetadata>>,
28+
persistent: bool,
2829
) -> Result<(StatementId, Arc<PgStatementMetadata>), Error> {
29-
let id = conn.inner.next_statement_id;
30-
conn.inner.next_statement_id = id.next();
30+
let id = if persistent {
31+
let id = conn.inner.next_statement_id;
32+
conn.inner.next_statement_id = id.next();
33+
id
34+
} else {
35+
StatementId::UNNAMED
36+
};
3137

3238
// build a list of type OIDs to send to the database in the PARSE command
3339
// we have not yet started the query sequence, so we are *safe* to cleanly make
@@ -163,8 +169,7 @@ impl PgConnection {
163169
&mut self,
164170
sql: &str,
165171
parameters: &[PgTypeInfo],
166-
// should we store the result of this prepare to the cache
167-
store_to_cache: bool,
172+
persistent: bool,
168173
// optional metadata that was provided by the user, this means they are reusing
169174
// a statement object
170175
metadata: Option<Arc<PgStatementMetadata>>,
@@ -173,9 +178,9 @@ impl PgConnection {
173178
return Ok((*statement).clone());
174179
}
175180

176-
let statement = prepare(self, sql, parameters, metadata).await?;
181+
let statement = prepare(self, sql, parameters, metadata, persistent).await?;
177182

178-
if store_to_cache && self.inner.cache_statement.is_enabled() {
183+
if persistent && self.inner.cache_statement.is_enabled() {
179184
if let Some((id, _)) = self.inner.cache_statement.insert(sql, statement.clone()) {
180185
self.inner.stream.write_msg(Close::Statement(id))?;
181186
self.write_sync();

sqlx-postgres/src/message/parse.rs

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,3 +77,19 @@ fn test_encode_parse() {
7777

7878
assert_eq!(buf, EXPECTED);
7979
}
80+
81+
#[test]
82+
fn test_encode_parse_unnamed_statement() {
83+
const EXPECTED: &[u8] = b"P\0\0\0\x15\0SELECT $1\0\0\x01\0\0\0\x19";
84+
85+
let mut buf = Vec::new();
86+
let m = Parse {
87+
statement: StatementId::UNNAMED,
88+
query: "SELECT $1",
89+
param_types: &[Oid(25)],
90+
};
91+
92+
m.encode_msg(&mut buf).unwrap();
93+
94+
assert_eq!(buf, EXPECTED);
95+
}

tests/postgres/postgres.rs

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -817,6 +817,27 @@ async fn it_closes_statement_from_cache_issue_470() -> anyhow::Result<()> {
817817
Ok(())
818818
}
819819

820+
#[sqlx_macros::test]
821+
async fn it_closes_statements_when_not_persistent_issue_3850() -> anyhow::Result<()> {
822+
let mut conn = new::<Postgres>().await?;
823+
824+
let _row = sqlx::query("SELECT $1 AS val")
825+
.bind(Oid(1))
826+
.persistent(false)
827+
.fetch_one(&mut conn)
828+
.await?;
829+
830+
let row = sqlx::query("SELECT count(*) AS num_prepared_statements FROM pg_prepared_statements")
831+
.persistent(false)
832+
.fetch_one(&mut conn)
833+
.await?;
834+
835+
let n: i64 = row.get("num_prepared_statements");
836+
assert_eq!(0, n, "no prepared statements should be open");
837+
838+
Ok(())
839+
}
840+
820841
#[sqlx_macros::test]
821842
async fn it_sets_application_name() -> anyhow::Result<()> {
822843
sqlx_test::setup_if_needed();

0 commit comments

Comments
 (0)