Skip to content

Commit 3d4c962

Browse files
ibarakaievclaude
andcommitted
fix(electric-db-collection): prevent orphan transactions after must-refetch in progressive mode
When a `must-refetch` message is received in progressive mode, it starts a transaction with `begin()` and calls `truncate()`. This resets `hasReceivedUpToDate` to `false`, causing `isBufferingInitialSync()` to return `true`. The bug: subsequent messages after must-refetch were being buffered instead of written to the existing transaction. When `up-to-date` was received, the atomic swap code would create a NEW transaction, leaving the first transaction (from must-refetch) uncommitted forever. This "orphan transaction" caused the collection to become corrupted with undefined values. The fix: Add `&& !transactionStarted` checks to 5 places so that when a transaction is already started (from must-refetch), messages are written directly to it instead of being buffered for atomic swap. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 <[email protected]>
1 parent 2c2420c commit 3d4c962

File tree

2 files changed

+31
-5
lines changed

2 files changed

+31
-5
lines changed
Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
---
2+
'@tanstack/electric-db-collection': patch
3+
---
4+
5+
Fix orphan transactions after `must-refetch` in progressive sync mode
6+
7+
When a `must-refetch` message was received in progressive mode, it started a transaction with `truncate()` but reset `hasReceivedUpToDate`, causing subsequent messages to be buffered instead of written to the existing transaction. On `up-to-date`, the atomic swap code would create a new transaction, leaving the first one uncommitted forever. This caused collections to become corrupted with undefined values.
8+
9+
The fix ensures that when a transaction is already started (e.g., from must-refetch), messages are written directly to it instead of being buffered for atomic swap.

packages/electric-db-collection/src/electric.ts

Lines changed: 22 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1303,7 +1303,12 @@ function createElectricSync<T extends Row<unknown>>(
13031303

13041304
// Check for txids in the message and add them to our store
13051305
// Skip during buffered initial sync in progressive mode (txids will be extracted during atomic swap)
1306-
if (hasTxids(message) && !isBufferingInitialSync()) {
1306+
// EXCEPTION: If a transaction is already started (e.g., from must-refetch), track txids
1307+
// to avoid losing them when messages are written to the existing transaction.
1308+
if (
1309+
hasTxids(message) &&
1310+
(!isBufferingInitialSync() || transactionStarted)
1311+
) {
13071312
message.headers.txids?.forEach((txid) => newTxids.add(txid))
13081313
}
13091314

@@ -1338,7 +1343,9 @@ function createElectricSync<T extends Row<unknown>>(
13381343
}
13391344

13401345
// In buffered initial sync of progressive mode, buffer messages instead of writing
1341-
if (isBufferingInitialSync()) {
1346+
// EXCEPTION: If a transaction is already started (e.g., from must-refetch), write
1347+
// directly to it instead of buffering. This prevents orphan transactions.
1348+
if (isBufferingInitialSync() && !transactionStarted) {
13421349
bufferedMessages.push(message)
13431350
} else {
13441351
// Normal processing: write changes immediately
@@ -1352,7 +1359,9 @@ function createElectricSync<T extends Row<unknown>>(
13521359
} else if (isSnapshotEndMessage(message)) {
13531360
// Track postgres snapshot metadata for resolving awaiting mutations
13541361
// Skip during buffered initial sync (will be extracted during atomic swap)
1355-
if (!isBufferingInitialSync()) {
1362+
// EXCEPTION: If a transaction is already started (e.g., from must-refetch), track snapshots
1363+
// to avoid losing them when messages are written to the existing transaction.
1364+
if (!isBufferingInitialSync() || transactionStarted) {
13561365
newSnapshots.push(parseSnapshotMessage(message))
13571366
}
13581367
} else if (isUpToDateMessage(message)) {
@@ -1365,7 +1374,9 @@ function createElectricSync<T extends Row<unknown>>(
13651374
}
13661375
} else if (isMoveOutMessage(message)) {
13671376
// Handle move-out event: buffer if buffering, otherwise process immediately
1368-
if (isBufferingInitialSync()) {
1377+
// EXCEPTION: If a transaction is already started (e.g., from must-refetch), process
1378+
// immediately to avoid orphan transactions.
1379+
if (isBufferingInitialSync() && !transactionStarted) {
13691380
bufferedMessages.push(message)
13701381
} else {
13711382
// Normal processing: process move-out immediately
@@ -1405,7 +1416,13 @@ function createElectricSync<T extends Row<unknown>>(
14051416

14061417
if (commitPoint !== null) {
14071418
// PROGRESSIVE MODE: Atomic swap on first up-to-date (not subset-end)
1408-
if (isBufferingInitialSync() && commitPoint === `up-to-date`) {
1419+
// EXCEPTION: Skip atomic swap if a transaction is already started (e.g., from must-refetch).
1420+
// In that case, do a normal commit to properly close the existing transaction.
1421+
if (
1422+
isBufferingInitialSync() &&
1423+
commitPoint === `up-to-date` &&
1424+
!transactionStarted
1425+
) {
14091426
debug(
14101427
`${collectionId ? `[${collectionId}] ` : ``}Progressive mode: Performing atomic swap with ${bufferedMessages.length} buffered messages`,
14111428
)

0 commit comments

Comments
 (0)