Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added non-atomic SPI connection #1876

Draft
wants to merge 1 commit into
base: develop
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 19 additions & 1 deletion pgrx/src/spi.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ mod client;
mod cursor;
mod query;
mod tuple;
pub use client::SpiClient;
use client::SpiConnection;
pub use client::{SpiClient, SpiTransaction};
pub use cursor::SpiCursor;
pub use query::{OwnedPreparedStatement, PreparedStatement, Query};
pub use tuple::{SpiHeapTupleData, SpiHeapTupleDataEntry, SpiTupleTable};
Expand Down Expand Up @@ -394,6 +394,24 @@ impl Spi {
f(connection.client())
}

/// Execute SPI commands via the provided `SpiClient` on a non-atomic connection.
///
/// While inside the provided closure, code executes under a short-lived "SPI Memory Context",
/// and Postgres will completely free that context when this function is finished.
///
/// pgrx' SPI API endeavors to return Datum values from functions like `::get_one()` that are
/// automatically copied into the into the `CurrentMemoryContext` at the time of this
/// function call.
pub fn connect_non_atomic<R, F>(f: F) -> R
where
F: FnOnce(SpiClient<'_>, SpiTransaction<'_>) -> R,
{
let connection = SpiConnection::connect_non_atomic()
.expect("SPI_connect_ext indicated an unexpected failure");

f(connection.client(), connection.transaction())
}

#[track_caller]
pub fn check_status(status_code: i32) -> std::result::Result<SpiOkCodes, Error> {
match SpiOkCodes::try_from(status_code) {
Expand Down
40 changes: 40 additions & 0 deletions pgrx/src/spi/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,11 @@ impl SpiConnection {
Spi::check_status(unsafe { pg_sys::SPI_connect() })?;
Ok(SpiConnection(PhantomData))
}

pub(super) fn connect_non_atomic() -> SpiResult<Self> {
Spi::check_status(unsafe { pg_sys::SPI_connect_ext(pg_sys::SPI_OPT_NONATOMIC as i32) })?;
Ok(SpiConnection(PhantomData))
}
}

impl Drop for SpiConnection {
Expand All @@ -221,4 +226,39 @@ impl SpiConnection {
pub(super) fn client(&self) -> SpiClient<'_> {
SpiClient { __marker: PhantomData }
}

pub(super) fn transaction(&self) -> SpiTransaction<'_> {
SpiTransaction { _conn: PhantomData }
}
}

/// Represents an SPI transaction.
pub struct SpiTransaction<'conn> {
_conn: PhantomData<&'conn SpiConnection>,
}

impl<'conn> SpiTransaction<'conn> {
/// Commits back the transaction and starts a new `SpiTransaction` with default transaction characteristics.
pub fn commit(self) -> Self {
unsafe { pg_sys::SPI_commit() };
self
}

/// Commits back the transaction and starts a new `SpiTransaction` with the same characteristics as the just finished one.
pub fn commit_and_chain(self) -> Self {
unsafe { pg_sys::SPI_commit_and_chain() };
self
}

/// Rolls back the transaction and starts a new `SpiTransaction` with default transaction characteristics.
pub fn rollback(self) -> Self {
unsafe { pg_sys::SPI_rollback() };
self
}

/// Rolls back the transaction and starts a new `SpiTransaction` with the same characteristics as the just finished one.
pub fn rollback_and_chain(self) -> Self {
unsafe { pg_sys::SPI_rollback_and_chain() };
self
}
}
Loading