Skip to content

Commit

Permalink
[PG14] Feature/replicas (#278)
Browse files Browse the repository at this point in the history
* Recovery requirements:

Add a condition variable for WAL recovery, allowing backends to wait until recovery has replayed up to a given record pointer.

* Fix WAL-related issues around when the LwLsn cache is initialized and when recovery starts.
This fixes some test failures that showed up after updating the Neon code to handle
the request_lsn of a replica's get_page_at_lsn requests more precisely.

---------

Co-authored-by: Matthias van de Meent <[email protected]>
  • Loading branch information
MMeent and Matthias van de Meent committed May 11, 2023
1 parent 50348cb commit 1144aee
Show file tree
Hide file tree
Showing 3 changed files with 80 additions and 5 deletions.
80 changes: 75 additions & 5 deletions src/backend/access/transam/xlog.c
Original file line number Diff line number Diff line change
Expand Up @@ -754,6 +754,7 @@ typedef struct XLogCtlData
TimeLineID lastReplayedTLI;
XLogRecPtr replayEndRecPtr;
TimeLineID replayEndTLI;
ConditionVariable replayProgressCV;
/* timestamp of last COMMIT/ABORT record replayed (or being replayed) */
TimestampTz recoveryLastXTime;

Expand Down Expand Up @@ -5340,9 +5341,67 @@ XLOGShmemInit(void)
SpinLockInit(&XLogCtl->info_lck);
SpinLockInit(&XLogCtl->ulsn_lck);
InitSharedLatch(&XLogCtl->recoveryWakeupLatch);
ConditionVariableInit(&XLogCtl->replayProgressCV);
ConditionVariableInit(&XLogCtl->recoveryNotPausedCV);
}

/*
 * Wait for recovery to complete replaying all WAL up to and including
 * redoEndRecPtr.
 *
 * Returns immediately if the server is not in recovery, or if replay has
 * already reached redoEndRecPtr.  Also returns if recovery ends while we are
 * waiting, because no further replay progress can be made after that point;
 * this mirrors the "not in recovery" fast path at function entry.
 *
 * This gets woken up for every WAL record replayed, so make sure you're not
 * trying to wait for an LSN that is too far in the future.
 */
void
XLogWaitForReplayOf(XLogRecPtr redoEndRecPtr)
{
	/* Backend-local cache of the most recently observed replay position. */
	static XLogRecPtr replayRecPtr = 0;

	/*
	 * Interval between "still waiting" log messages.
	 * ConditionVariableTimedSleep() takes its timeout in milliseconds.
	 */
	const long	logIntervalMs = 10 * 1000;	/* 10 seconds */

	if (!RecoveryInProgress())
		return;

	/*
	 * Check the backend-local variable first, we may be able to skip accessing
	 * shared memory (which requires locking)
	 */
	if (redoEndRecPtr <= replayRecPtr)
		return;

	replayRecPtr = GetXLogReplayRecPtr(NULL);

	/*
	 * Check again if we're going to need to wait, now that we've updated
	 * the local cached variable.
	 */
	if (redoEndRecPtr <= replayRecPtr)
		return;

	/*
	 * We need to wait for the variable, so prepare for that.
	 *
	 * Note: This wakes up every time a WAL record is replayed, so this can
	 * be expensive.
	 */
	ConditionVariablePrepareToSleep(&XLogCtl->replayProgressCV);

	while (redoEndRecPtr > replayRecPtr)
	{
		bool		timeout;

		timeout = ConditionVariableTimedSleep(&XLogCtl->replayProgressCV,
											  logIntervalMs,
											  WAIT_EVENT_RECOVERY_WAL_STREAM);

		/*
		 * Refresh the cached position on every wakeup, including timeouts:
		 * replay may have advanced even if we did not observe a broadcast,
		 * and looping on a stale value could make us wait needlessly.
		 */
		replayRecPtr = GetXLogReplayRecPtr(NULL);

		if (redoEndRecPtr <= replayRecPtr)
			break;

		/*
		 * If recovery has ended, replay will never advance any further, so
		 * stop waiting instead of looping forever.
		 */
		if (!RecoveryInProgress())
			break;

		if (timeout)
			ereport(LOG,
					(errmsg("Waiting for recovery to catch up to %X/%X",
							LSN_FORMAT_ARGS(redoEndRecPtr))));
	}

	ConditionVariableCancelSleep();
}

/*
* This func must be called ONCE on system install. It creates pg_control
* and the initial XLOG segment.
Expand Down Expand Up @@ -7265,6 +7324,14 @@ StartupXLOG(void)
abortedRecPtr = InvalidXLogRecPtr;
missingContrecPtr = InvalidXLogRecPtr;

/*
* Setup last written lsn cache, max written LSN.
* Starting from here, we could be modifying pages through REDO, which requires
* the existence of maxLwLsn + LwLsn LRU.
*/
XLogCtl->maxLastWrittenLsn = RedoRecPtr;
dlist_init(&XLogCtl->lastWrittenLsnLRU);

/* REDO */
if (InRecovery)
{
Expand Down Expand Up @@ -7772,6 +7839,8 @@ StartupXLOG(void)
WalSndWakeup();
}

ConditionVariableBroadcast(&XLogCtl->replayProgressCV);

/* Exit loop if we reached inclusive recovery target */
if (recoveryStopsAfter(xlogreader))
{
Expand Down Expand Up @@ -8167,8 +8236,6 @@ StartupXLOG(void)

XLogCtl->LogwrtRqst.Write = EndOfLog;
XLogCtl->LogwrtRqst.Flush = EndOfLog;
XLogCtl->maxLastWrittenLsn = EndOfLog;
dlist_init(&XLogCtl->lastWrittenLsnLRU);

LocalSetXLogInsertAllowed();

Expand Down Expand Up @@ -10974,11 +11041,14 @@ xlog_redo(XLogReaderState *record)
XLogRedoAction result;

result = XLogReadBufferForRedo(record, block_id, &buffer);
if (result == BLK_DONE && !IsUnderPostmaster)
if (result == BLK_DONE && (!IsUnderPostmaster || StandbyMode))
{
/*
* In the special WAL process, blocks that are being ignored
* return BLK_DONE. Accept that.
* NEON: In the special WAL redo process, blocks that are being
* ignored return BLK_DONE. Accept that.
* Additionally, in standby mode, blocks that are not present
* in shared buffers are ignored during replay, so we also
* ignore those blocks.
*/
}
else if (result != BLK_RESTORED)
Expand Down
1 change: 1 addition & 0 deletions src/include/access/xlog.h
Original file line number Diff line number Diff line change
Expand Up @@ -323,6 +323,7 @@ extern bool HotStandbyActive(void);
extern bool HotStandbyActiveInReplay(void);
extern bool XLogInsertAllowed(void);
extern void GetXLogReceiptTime(TimestampTz *rtime, bool *fromStream);
extern void XLogWaitForReplayOf(XLogRecPtr redoEndRecPtr);
extern XLogRecPtr GetXLogReplayRecPtr(TimeLineID *replayTLI);
extern XLogRecPtr GetXLogInsertRecPtr(void);
extern XLogRecPtr GetXLogWriteRecPtr(void);
Expand Down
4 changes: 4 additions & 0 deletions src/include/access/xlogutils.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,10 @@ typedef enum
* need to be replayed) */
} XLogRedoAction;

/*
* Returns true if we shouldn't do REDO on that block in record indicated by
* block_id; false otherwise.
*/
extern bool (*redo_read_buffer_filter) (XLogReaderState *record, uint8 block_id);

extern XLogRedoAction XLogReadBufferForRedo(XLogReaderState *record,
Expand Down

0 comments on commit 1144aee

Please sign in to comment.