Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 12 additions & 1 deletion core/rs/core/src/c.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ pub struct crsql_ExtData {
pub mergeEqualValues: ::core::ffi::c_int,
pub timestamp: ::core::ffi::c_ulonglong,
pub ordinalMap: *mut ::core::ffi::c_void,
pub forceUpdateMode: ::core::ffi::c_int,
}

#[repr(C)]
Expand Down Expand Up @@ -271,7 +272,7 @@ fn bindgen_test_layout_crsql_ExtData() {
let ptr = UNINIT.as_ptr();
assert_eq!(
::core::mem::size_of::<crsql_ExtData>(),
168usize,
176usize,
concat!("Size of: ", stringify!(crsql_ExtData))
);
assert_eq!(
Expand Down Expand Up @@ -511,4 +512,14 @@ fn bindgen_test_layout_crsql_ExtData() {
stringify!(ordinalMap)
)
);
assert_eq!(
unsafe { ::core::ptr::addr_of!((*ptr).forceUpdateMode) as usize - ptr as usize },
168usize,
concat!(
"Offset of field: ",
stringify!(crsql_ExtData),
"::",
stringify!(forceUpdateMode)
)
);
}
2 changes: 2 additions & 0 deletions core/rs/core/src/commit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ pub unsafe fn commit_or_rollback_reset(ext_data: *mut crsql_ExtData) {
(*ext_data).seq = 0;
(*ext_data).timestamp = 0;
(*ext_data).updatedTableInfosThisTx = 0;
// Disable force update mode on transaction end
(*ext_data).forceUpdateMode = 0;

let mut ordinals: mem::ManuallyDrop<Box<BTreeMap<Vec<u8>, i64>>> = mem::ManuallyDrop::new(
Box::from_raw((*ext_data).ordinalMap as *mut BTreeMap<Vec<u8>, i64>),
Expand Down
79 changes: 79 additions & 0 deletions core/rs/core/src/force_update.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
use alloc::string::String;
use crate::c::crsql_ExtData;
use crate::tableinfo::TableInfo;
use sqlite_nostd as sqlite;

/// Check if force update mode is enabled
pub fn is_force_update_mode_enabled(ext_data: *mut crsql_ExtData) -> bool {
unsafe { (*ext_data).forceUpdateMode != 0 }
}

/// Get the forced CL for a row in force update mode.
/// This returns the current CL + 2 to ensure the forced update wins.
/// If current CL is odd (alive), we return CL + 2 (next alive state).
/// If current CL is even (deleted), we return CL + 1 (resurrect).
/// If no CL exists (0), we return 1 (initial insert).
pub fn get_forced_cl(current_cl: i64) -> i64 {
if current_cl == 0 {
// No existing CL, start at 1
1
} else if current_cl % 2 == 0 {
// Current CL is even (deleted), resurrect with odd CL
current_cl + 1
} else {
// Current CL is odd (alive), force to next alive state (skip delete)
current_cl + 2
}
}

/// For force update mode, we need to ensure all operations result in a higher CL.
/// This function calculates what the "delete" CL should be for a force delete operation.
pub fn get_forced_delete_cl(current_cl: i64) -> i64 {
if current_cl == 0 {
// No existing CL, delete at CL 2
2
} else if current_cl % 2 == 0 {
// Already deleted, bump to next delete state
current_cl + 2
} else {
// Currently alive, delete at next even CL
current_cl + 1
}
}

/// Get the current CL for a key from the table info cache or database
pub fn get_current_cl_for_key(
db: *mut sqlite::sqlite3,
tbl_info: &TableInfo,
key: sqlite::int64,
) -> Result<i64, String> {
// First check the cache
if let Some(&cl) = tbl_info.get_cl(key) {
return Ok(cl);
}

// If not in cache, query from database
let local_cl_stmt_ref = tbl_info
.get_local_cl_stmt(db)
.map_err(|_| "failed to get local_cl_stmt")?;
let local_cl_stmt = local_cl_stmt_ref
.as_ref()
.ok_or("Failed to deref local_cl_stmt")?;

local_cl_stmt
.bind_int64(1, key)
.and_then(|_| local_cl_stmt.bind_int64(2, key))
.map_err(|_| "failed to bind to local_cl_stmt")?;

let cl = if local_cl_stmt.step().map_err(|_| "failed to step local_cl_stmt")? == sqlite::ResultCode::ROW {
local_cl_stmt.column_int64(0)
} else {
0
};

local_cl_stmt
.reset()
.map_err(|_| "failed to reset local_cl_stmt")?;

Ok(cl)
}
79 changes: 79 additions & 0 deletions core/rs/core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ pub mod db_version;
mod db_version;
mod debug;
mod ext_data;
mod force_update;
mod is_crr;
mod local_writes;
#[cfg(feature = "test")]
Expand Down Expand Up @@ -686,6 +687,40 @@ pub extern "C" fn sqlite3_crsqlcore_init(
return null_mut();
}

let rc = db
.create_function_v2(
"crsql_enable_force_update_mode",
0,
sqlite::UTF8,
Some(ext_data as *mut c_void),
Some(x_crsql_enable_force_update_mode),
None,
None,
None,
)
.unwrap_or(sqlite::ResultCode::ERROR);
if rc != ResultCode::OK {
unsafe { crsql_freeExtData(ext_data) };
return null_mut();
}

let rc = db
.create_function_v2(
"crsql_disable_force_update_mode",
0,
sqlite::UTF8,
Some(ext_data as *mut c_void),
Some(x_crsql_disable_force_update_mode),
None,
None,
None,
)
.unwrap_or(sqlite::ResultCode::ERROR);
if rc != ResultCode::OK {
unsafe { crsql_freeExtData(ext_data) };
return null_mut();
}

return ext_data as *mut c_void;
}

Expand Down Expand Up @@ -1235,6 +1270,50 @@ unsafe extern "C" fn x_crsql_sync_bit(
ctx.result_int(*sync_bit_ptr);
}

/**
* Enable force update mode. All writes within the transaction will be treated
* as delete+recreate with higher CL values to force the update to win.
*
* MUST be called within a transaction. Will fail if called outside a transaction.
* Force update mode is automatically disabled when the transaction commits or rolls back.
*
* `select crsql_enable_force_update_mode()`
*/
unsafe extern "C" fn x_crsql_enable_force_update_mode(
ctx: *mut sqlite::context,
_argc: i32,
_argv: *mut *mut sqlite::value,
) {
let db = ctx.db_handle();

// Check if we're in a transaction
let in_transaction = sqlite_nostd::get_autocommit(db) == 0;

if !in_transaction {
ctx.result_error("force update mode can only be enabled within a transaction. Use BEGIN before enabling.");
return;
}

let ext_data = ctx.user_data() as *mut c::crsql_ExtData;
(*ext_data).forceUpdateMode = 1;
ctx.result_text_static("force update mode enabled");
}

/**
* Disable force update mode. Writes will behave normally.
*
* `select crsql_disable_force_update_mode()`
*/
unsafe extern "C" fn x_crsql_disable_force_update_mode(
ctx: *mut sqlite::context,
_argc: i32,
_argv: *mut *mut sqlite::value,
) {
let ext_data = ctx.user_data() as *mut c::crsql_ExtData;
(*ext_data).forceUpdateMode = 0;
ctx.result_text_static("force update mode disabled");
}

#[no_mangle]
#[allow(clippy::not_unsafe_ptr_arg_deref)]
pub extern "C" fn crsql_is_crr(db: *mut sqlite::sqlite3, table: *const c_char) -> c_int {
Expand Down
76 changes: 76 additions & 0 deletions core/rs/core/src/local_writes/after_update.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use sqlite_nostd as sqlite;

use crate::compare_values::crsql_compare_sqlite_values;
use crate::{c::crsql_ExtData, tableinfo::TableInfo};
use crate::force_update;

use super::trigger_fn_preamble;

Expand Down Expand Up @@ -78,6 +79,20 @@ fn after_update(
.get_or_create_key_via_raw_values(db, pks_new)
.map_err(|_| "failed getting or creating lookaside key")?;

// Force update mode: treat all updates as delete + recreate
if force_update::is_force_update_mode_enabled(ext_data) {
return handle_force_update(
db,
ext_data,
tbl_info,
pks_new,
pks_old,
new_key,
next_db_version,
&ts,
);
}

let mut changed = false;
// Changing a primary key column to a new value is the same thing as deleting the row
// previously identified by the primary key.
Expand Down Expand Up @@ -149,6 +164,67 @@ fn after_update(
Ok(ResultCode::OK)
}

/// Handle update in force update mode by treating it as delete + recreate
fn handle_force_update(
db: *mut sqlite3,
ext_data: *mut crsql_ExtData,
tbl_info: &mut TableInfo,
pks_new: &[*mut value],
pks_old: &[*mut value],
new_key: sqlite::int64,
next_db_version: i64,
ts: &str,
) -> Result<ResultCode, String> {
// Get current CL for the row
let current_cl = force_update::get_current_cl_for_key(db, tbl_info, new_key)?;

// Check if primary key changed
let pk_changed = crate::compare_values::any_value_changed(pks_new, pks_old)?;

if pk_changed {
// Handle PK change: delete old, create new
let old_key = tbl_info
.get_or_create_key_via_raw_values(db, pks_old)
.map_err(|_| "failed getting or creating lookaside key for old pks")?;

let old_cl = force_update::get_current_cl_for_key(db, tbl_info, old_key)?;
let delete_cl = force_update::get_forced_delete_cl(old_cl);

let next_seq = super::bump_seq(ext_data);
let cl = super::mark_locally_deleted(db, tbl_info, old_key, next_db_version, next_seq, ts)?;
tbl_info.set_cl(old_key, cl);

// Create new row with forced CL
let new_cl = force_update::get_forced_cl(0); // New row starts at CL 1
let next_seq = super::bump_seq(ext_data);
let cl = super::mark_new_pk_row_created(db, tbl_info, new_key, next_db_version, next_seq, ts)?;
tbl_info.set_cl(new_key, cl);

// Mark all non-pk columns as inserted
super::mark_locally_inserted(db, ext_data, tbl_info, new_key, next_db_version, ts)?;
} else {
// No PK change: force delete + recreate at same key
let delete_cl = force_update::get_forced_delete_cl(current_cl);
let next_seq = super::bump_seq(ext_data);
let cl = super::mark_locally_deleted(db, tbl_info, new_key, next_db_version, next_seq, ts)?;
tbl_info.set_cl(new_key, cl);

// Recreate with forced CL
let create_cl = force_update::get_forced_cl(delete_cl);
let next_seq = super::bump_seq(ext_data);
let cl = super::mark_new_pk_row_created(db, tbl_info, new_key, next_db_version, next_seq, ts)?;
tbl_info.set_cl(new_key, cl);

// Mark all non-pk columns as inserted
super::mark_locally_inserted(db, ext_data, tbl_info, new_key, next_db_version, ts)?;
}

// Actually bump the db_version since we made changes
crate::db_version::next_db_version(db, ext_data)?;

Ok(ResultCode::OK)
}

#[allow(non_snake_case)]
fn after_update__move_non_pk_col(
db: *mut sqlite3,
Expand Down
2 changes: 2 additions & 0 deletions core/rs/integration_check/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ pub extern "C" fn crsql_integration_check() {
t::test_cl_set_vtab::run_suite().expect("test cl set vtab suite");
println!("Running db_version");
t::test_db_version::run_suite().expect("test db version suite");
println!("Running force_update_mode");
t::force_update_mode::run_suite().expect("force update mode suite");
}

pub fn opendb() -> Result<CRConnection, ResultCode> {
Expand Down
Loading
Loading