Replies: 1 comment
-
I guess you are after the same design then me, where the connection is not passed into a query method, but is a field in the types where the query methods are. This is a bit nasty, but I finally settled with a solution having a Look at the unit tests for an example how to use this. Here, queries are directly added to the Please let me know if you
//! The common interface for application code to execute
//! DB queries.
//!
//! In essence, this crate contains three types:
//! - [`DefaultDb`]: executes queries outside of a transaction, can
//! [start](`DefaultDb::begin_transaction`) a transaction.
//! - [`TransactedDb`]: executes queries inside of a transaction, can
//! [commit](`TransactedDb::commit`) and [roll
//! back](`TransactedDb::rollback`) a transaction.
//! - [`Database`]: a trait implemented by both [`DefaultDb`] and
//! [`TransactedDb`].
//!
//! This crate does not provide any queries by itself. These are meant to be
//! added by other crates via extension traits on [`Database`].
//!
//! ### Registering queries via extension trait
//! Queries should be added as a trait:
//!
//! ```ignore
//! // the trait that contains the queries
//! trait FooQueries : Database {
//! // note that queries functions are not abstract
//! async fn create_foo(&mut self, ...) -> Result<nehws_lib_model::FooId> {
//! sqlx::query!("select ...").fetch_all(self.connection())
//! }
//!
//! // more queries methods here....
//! }
//! ```
//!
//! Make queries available to Database trait
//! via a blanked implementation:
//!
//! ```ignore
//! impl<T: Database> ExampleQueries for T {}
//! ```
#![expect(missing_debug_implementations, async_fn_in_trait)]
use sqlx::{Acquire as _, PgPool, Postgres, Transaction, pool::PoolConnection};
/// We re-export some SQLX types, so these can be used
pub use sqlx::{
Error as NehwsDbError, PgConnection as NehwsDbConnection, PgTransaction as NehwsTransaction,
Result as NehwsDbResult,
};
/// Provides access to a connection.
/// Should be called by extension traits but not from application code.
pub trait Database {
/// Acquires a new connection from the pool.
///
/// [`TransactedDb::commit`]
async fn connection(&mut self) -> sqlx::Result<&mut NehwsDbConnection>;
}
/// Default implementation that works without a transaction
pub struct DefaultDb {
/// db pool
pool: PgPool,
/// when starting a transaction,
acquired: Option<PoolConnection<Postgres>>,
}
impl DefaultDb {
/// Creates a new db instance.
#[must_use]
pub fn new(pool: PgPool) -> DefaultDb {
DefaultDb { pool, acquired: None }
}
/// Run the migrations defined in the `migrations` directory.
pub async fn run_migrations(&self) -> Result<(), sqlx::Error> {
sqlx::migrate!().run(&self.pool).await?;
Ok(())
}
/// If no connection has been acquired yet, a new connection is taken from
/// the pool. Once a transaction has been started, it is the callers'
/// responsibility to make changes permanent by committing it.
pub async fn begin_transaction<'c>(&'c mut self) -> sqlx::Result<TransactedDb<'c>> {
let connection = self.connection().await?;
let transaction = connection.begin().await?;
Ok(TransactedDb { transaction })
}
}
impl Database for DefaultDb {
async fn connection(&mut self) -> sqlx::Result<&mut NehwsDbConnection> {
if self.acquired.is_none() {
let connection = self.pool.acquire().await?;
self.acquired.replace(connection);
}
Ok(self.acquired.as_mut().unwrap_or_else(|| unreachable!()))
}
}
impl Clone for DefaultDb {
fn clone(&self) -> Self {
Self { pool: self.pool.clone(), acquired: None }
}
}
/// DB Queries inside a transaction
pub struct TransactedDb<'c> {
/// what shall I say... the transaction.
transaction: Transaction<'c, Postgres>,
}
/// Operations only valid inside a transaction
impl<'c> TransactedDb<'c> {
/// Attempts to commit the current transaction.
pub async fn commit(self) -> sqlx::Result<()> {
self.transaction.commit().await
}
/// Attempts to rollback the current transaction.
#[allow(dead_code)]
pub async fn rollback(self) -> sqlx::Result<()> {
self.transaction.rollback().await
}
}
impl<'c> Database for TransactedDb<'c> {
async fn connection(&mut self) -> sqlx::Result<&mut NehwsDbConnection> {
self.transaction.acquire().await
}
}
/// Offers access to the underlying transaction.
/// The sole purpose for this is to offer an easier migration path to the new DB
/// layer, to allow code that still uses sqlx' transaction directly continue to
/// work. TODO: remove this when db layer is used throughout the system
impl<'c> AsMut<Transaction<'c, Postgres>> for TransactedDb<'c> {
fn as_mut(&mut self) -> &mut Transaction<'c, Postgres> {
&mut self.transaction
}
}
#[cfg(test)]
mod tests {
use sqlx::{Acquire as _, Row as _};
use super::{Database, NehwsDbResult};
trait ExampleQueries: Database {
async fn find_something(&mut self) -> NehwsDbResult<Option<usize>> {
Ok(sqlx::query!("select 1 as \"peter!\"")
.fetch_optional(self.connection().await?)
.await?
.map(|x| x.peter as _))
}
async fn insert_foo(&mut self, whatever: i32) -> NehwsDbResult<i32> {
// Note: for this test, a new table is created. Since the table is not part of
// our migrations, sqlx cannot check the query at runtime, so we cannot user
// query! macro here.
//
let id: i32 = sqlx::query("insert into foo (whatever) values ($1) returning id;")
.bind(whatever)
.fetch_one(self.connection().await?)
.await?
.get(0);
Ok(id)
}
async fn find_foo_by_id(&mut self, id: i32) -> NehwsDbResult<Option<i32>> {
Ok(sqlx::query("select whatever from foo where id=$1")
.bind(id)
.fetch_optional(self.connection().await?)
.await?
.and_then(|row| row.get(0)))
}
// async fn _connection(&mut self) -> NehwsDbResult<&mut NehwsDbConnection>;
}
/// make out queries available on both `DefaultDb` and `TransactedDb`
impl<T: Database> ExampleQueries for T {}
// These are not tests, but demonstrate how to use this from the app perspectiv
#[sqlx::test]
async fn simple(pool: sqlx::PgPool) -> NehwsDbResult<()> {
let mut db = super::DefaultDb::new(pool);
let query_result = db.find_something().await?;
assert_eq!(query_result, Some(1));
Ok(())
}
#[sqlx::test]
async fn transacted(pool: sqlx::PgPool) -> NehwsDbResult<()> {
create_foo_table(&pool).await?;
let mut db1 = super::DefaultDb::new(pool.clone());
let mut tx = db1.begin_transaction().await?;
let id = tx.insert_foo(123).await?;
assert!(id > 0, "should have assigned an id greater zero");
assert_eq!(
tx.find_foo_by_id(id).await?,
Some(123),
"should have returned new row within same transaction"
);
let mut db2 = super::DefaultDb::new(pool.clone());
assert_eq!(
db2.find_foo_by_id(id).await?,
None,
"uncommitted transaction must not be visible to other connections"
);
tx.commit().await?;
assert_eq!(
db2.find_foo_by_id(id).await?,
Some(123),
"transaction must become visible to other connections after commit"
);
Ok(())
}
#[sqlx::test]
async fn join_transaction(pool: sqlx::PgPool) -> NehwsDbResult<()> {
create_foo_table(&pool).await?;
let mut tx = pool.begin().await?;
let id: i32 = sqlx::query("insert into foo (whatever) values (123) returning id;")
.fetch_one(tx.acquire().await?)
.await?
.get(0);
let db = super::DefaultDb::new(pool.clone());
#[allow(deprecated)]
let mut tx = db.join_transaction(tx);
let foo = tx.find_foo_by_id(id).await?;
assert!(foo.is_some(), "record expected to be visible from within same transaction");
Ok(())
}
async fn create_foo_table(pool: &sqlx::PgPool) -> NehwsDbResult<()> {
sqlx::query("create table foo (id serial primary key, whatever integer)")
.execute(pool)
.await?;
Ok(())
}
} |
Beta Was this translation helpful? Give feedback.
Uh oh!
There was an error while loading. Please reload this page.
-
I currently have a setup where I have a store:
And repositories, for instance, AccountsRepo:
And it's used like:
Is there an idiomatic and neat way to make the repositories ALSO accept transactions (or any
Executor
)? For example, so it can ALSO be used like this:Beta Was this translation helpful? Give feedback.
All reactions