From e935b9a339b53e587b2a6e98519461484d1bb9f7 Mon Sep 17 00:00:00 2001 From: Cuong Nguyen Date: Thu, 30 Nov 2023 10:03:48 -0500 Subject: [PATCH] Record both prev and last value on finishing write --- libs/utils/src/lsn.rs | 39 ++++++++--- pageserver/src/import_datadir.rs | 7 +- pageserver/src/pgdatadir_mapping.rs | 23 +++--- pageserver/src/tenant.rs | 105 ++++++++++++++++++++++------ pageserver/src/tenant/timeline.rs | 18 +++-- 5 files changed, 138 insertions(+), 54 deletions(-) diff --git a/libs/utils/src/lsn.rs b/libs/utils/src/lsn.rs index 0493d43088d2..928699fd0269 100644 --- a/libs/utils/src/lsn.rs +++ b/libs/utils/src/lsn.rs @@ -243,24 +243,41 @@ impl From for AtomicLsn { } /// Pair of LSN's pointing to the end of the last valid record and previous one -#[derive(Debug, Clone, Copy)] +#[derive(Debug, Clone, Copy, PartialEq, Eq)] pub struct RecordLsn { - /// LSN at the end of the current record + /// LSN at the end of the current record pub last: Lsn, /// LSN at the end of the previous record pub prev: Lsn, } +impl Ord for RecordLsn { + fn cmp(&self, other: &Self) -> std::cmp::Ordering { + self.last.cmp(&other.last) + } +} + +impl PartialOrd for RecordLsn { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) + } +} + /// Expose `self.last` as counter to be able to use RecordLsn in SeqWait -impl MonotonicCounter for RecordLsn { - fn cnt_advance(&mut self, lsn: Lsn) { - assert!(self.last <= lsn); - let new_prev = self.last; - self.last = lsn; - self.prev = new_prev; - } - fn cnt_value(&self) -> Lsn { - self.last +impl MonotonicCounter for RecordLsn { + fn cnt_advance(&mut self, lsn: RecordLsn) { + assert!(lsn.prev <= lsn.last); + assert!(self.last <= lsn.last); + if lsn.prev == Lsn::INVALID { + self.prev = self.last; + self.last = lsn.last; + } else { + self.prev = lsn.prev; + self.last = lsn.last; + } + } + fn cnt_value(&self) -> RecordLsn { + *self } } diff --git a/pageserver/src/import_datadir.rs b/pageserver/src/import_datadir.rs index 72f3cb8cf532..b063f6a2ad2d 100644 --- a/pageserver/src/import_datadir.rs +++ b/pageserver/src/import_datadir.rs @@ -26,7 +26,7 @@ use postgres_ffi::DBState_DB_SHUTDOWNED; use postgres_ffi::Oid; use postgres_ffi::XLogFileName; use postgres_ffi::{BLCKSZ, WAL_SEGMENT_SIZE}; -use utils::lsn::Lsn; +use utils::lsn::{Lsn, RecordLsn}; // Returns checkpoint LSN from controlfile pub fn get_lsn_from_controlfile(path: &Path) -> Result { @@ -602,7 +602,10 @@ async fn import_file( // but it is ok to call `finish_write()`, because final `modification.commit()` // will update lsn once more to the final one. let writer = modification.tline.writer().await; - writer.finish_write(prev_lsn); + writer.finish_write(RecordLsn { + last: prev_lsn, + prev: Lsn::INVALID, + }); debug!("imported zenith signal {}", prev_lsn); } else if file_path.starts_with("pg_tblspc") { diff --git a/pageserver/src/pgdatadir_mapping.rs b/pageserver/src/pgdatadir_mapping.rs index 35f646d5b83d..6033742fd039 100644 --- a/pageserver/src/pgdatadir_mapping.rs +++ b/pageserver/src/pgdatadir_mapping.rs @@ -22,6 +22,7 @@ use std::collections::{hash_map, HashMap, HashSet}; use std::ops::Range; use tokio_util::sync::CancellationToken; use tracing::{debug, trace, warn}; +use utils::lsn::RecordLsn; use utils::{bin_ser::BeSer, lsn::Lsn}; /// Block number within a relation or SLRU. This matches PostgreSQL's BlockNumber type. @@ -90,11 +91,11 @@ impl Timeline { { DatadirModification { tline: self, - pending_lsns: Vec::new(), pending_updates: HashMap::new(), pending_deletions: Vec::new(), pending_nblocks: 0, lsn, + prev_lsn: Lsn::INVALID, } } @@ -677,11 +678,11 @@ pub struct DatadirModification<'a> { /// Current LSN of the modification lsn: Lsn, + prev_lsn: Lsn, // The modifications are not applied directly to the underlying key-value store. // The put-functions add the modifications here, and they are flushed to the // underlying key-value store by the 'finish' function. - pending_lsns: Vec, pending_updates: HashMap>, pending_deletions: Vec<(Range, Lsn)>, pending_nblocks: i64, @@ -701,10 +702,8 @@ impl<'a> DatadirModification<'a> { lsn, self.lsn ); - if lsn > self.lsn { - self.pending_lsns.push(self.lsn); - self.lsn = lsn; - } + self.prev_lsn = self.lsn; + self.lsn = lsn; Ok(()) } @@ -1223,14 +1222,10 @@ impl<'a> DatadirModification<'a> { writer.delete_batch(&self.pending_deletions).await?; self.pending_deletions.clear(); - self.pending_lsns.push(self.lsn); - for pending_lsn in self.pending_lsns.drain(..) { - // Ideally, we should be able to call writer.finish_write() only once - // with the highest LSN. However, the last_record_lsn variable in the - // timeline keeps track of the latest LSN and the immediate previous LSN - // so we need to record every LSN to not leave a gap between them. - writer.finish_write(pending_lsn); - } + writer.finish_write(RecordLsn { + last: self.lsn, + prev: self.prev_lsn, + }); if pending_nblocks != 0 { writer.update_current_logical_size(pending_nblocks * i64::from(BLCKSZ)); diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs index 32d827fbac52..1d5944fa33d6 100644 --- a/pageserver/src/tenant.rs +++ b/pageserver/src/tenant.rs @@ -3510,14 +3510,20 @@ mod tests { writer .put(*TEST_KEY, Lsn(0x10), &Value::Image(TEST_IMG("foo at 0x10"))) .await?; - writer.finish_write(Lsn(0x10)); + writer.finish_write(RecordLsn { + last: Lsn(0x10), + prev: Lsn::INVALID, + }); drop(writer); let writer = tline.writer().await; writer .put(*TEST_KEY, Lsn(0x20), &Value::Image(TEST_IMG("foo at 0x20"))) .await?; - writer.finish_write(Lsn(0x20)); + writer.finish_write(RecordLsn { + last: Lsn(0x20), + prev: Lsn::INVALID, + }); drop(writer); assert_eq!( @@ -3609,16 +3615,25 @@ mod tests { writer .put(TEST_KEY_B, Lsn(0x20), &test_value("foobar at 0x20")) .await?; - writer.finish_write(Lsn(0x20)); + writer.finish_write(RecordLsn { + last: Lsn(0x20), + prev: Lsn::INVALID, + }); writer .put(TEST_KEY_A, Lsn(0x30), &test_value("foo at 0x30")) .await?; - writer.finish_write(Lsn(0x30)); + writer.finish_write(RecordLsn { + last: Lsn(0x30), + prev: Lsn::INVALID, + }); writer .put(TEST_KEY_A, Lsn(0x40), &test_value("foo at 0x40")) .await?; - writer.finish_write(Lsn(0x40)); + writer.finish_write(RecordLsn { + last: Lsn(0x40), + prev: Lsn::INVALID, + }); //assert_current_logical_size(&tline, Lsn(0x40)); @@ -3633,7 +3648,10 @@ mod tests { new_writer .put(TEST_KEY_A, Lsn(0x40), &test_value("bar at 0x40")) .await?; - new_writer.finish_write(Lsn(0x40)); + new_writer.finish_write(RecordLsn { + last: Lsn(0x40), + prev: Lsn::INVALID, + }); // Check page contents on both branches assert_eq!( @@ -3667,7 +3685,10 @@ mod tests { &Value::Image(TEST_IMG(&format!("foo at {}", lsn))), ) .await?; - writer.finish_write(lsn); + writer.finish_write(RecordLsn { + last: lsn, + prev: Lsn::INVALID, + }); lsn += 0x10; writer .put( @@ -3676,7 +3697,10 @@ mod tests { &Value::Image(TEST_IMG(&format!("foo at {}", lsn))), ) .await?; - writer.finish_write(lsn); + writer.finish_write(RecordLsn { + last: lsn, + prev: Lsn::INVALID, + }); lsn += 0x10; } tline.freeze_and_flush().await?; @@ -3689,7 +3713,10 @@ mod tests { &Value::Image(TEST_IMG(&format!("foo at {}", lsn))), ) .await?; - writer.finish_write(lsn); + writer.finish_write(RecordLsn { + last: lsn, + prev: Lsn::INVALID, + }); lsn += 0x10; writer .put( @@ -3698,7 +3725,10 @@ mod tests { &Value::Image(TEST_IMG(&format!("foo at {}", lsn))), ) .await?; - writer.finish_write(lsn); + writer.finish_write(RecordLsn { + last: lsn, + prev: Lsn::INVALID, + }); } tline.freeze_and_flush().await } @@ -4085,7 +4115,10 @@ mod tests { writer .put(*TEST_KEY, Lsn(0x10), &Value::Image(TEST_IMG("foo at 0x10"))) .await?; - writer.finish_write(Lsn(0x10)); + writer.finish_write(RecordLsn { + last: Lsn(0x10), + prev: Lsn::INVALID, + }); drop(writer); tline.freeze_and_flush().await?; @@ -4095,7 +4128,10 @@ mod tests { writer .put(*TEST_KEY, Lsn(0x20), &Value::Image(TEST_IMG("foo at 0x20"))) .await?; - writer.finish_write(Lsn(0x20)); + writer.finish_write(RecordLsn { + last: Lsn(0x20), + prev: Lsn::INVALID, + }); drop(writer); tline.freeze_and_flush().await?; @@ -4105,7 +4141,10 @@ mod tests { writer .put(*TEST_KEY, Lsn(0x30), &Value::Image(TEST_IMG("foo at 0x30"))) .await?; - writer.finish_write(Lsn(0x30)); + writer.finish_write(RecordLsn { + last: Lsn(0x30), + prev: Lsn::INVALID, + }); drop(writer); tline.freeze_and_flush().await?; @@ -4115,7 +4154,10 @@ mod tests { writer .put(*TEST_KEY, Lsn(0x40), &Value::Image(TEST_IMG("foo at 0x40"))) .await?; - writer.finish_write(Lsn(0x40)); + writer.finish_write(RecordLsn { + last: Lsn(0x40), + prev: Lsn::INVALID, + }); drop(writer); tline.freeze_and_flush().await?; @@ -4179,7 +4221,10 @@ mod tests { &Value::Image(TEST_IMG(&format!("{} at {}", blknum, lsn))), ) .await?; - writer.finish_write(lsn); + writer.finish_write(RecordLsn { + last: lsn, + prev: Lsn::INVALID, + }); drop(writer); keyspace.add_key(test_key); @@ -4241,7 +4286,10 @@ mod tests { let writer = tline.writer().await; writer.put_batch(&batch).await?; - writer.finish_write(last_lsn); + writer.finish_write(RecordLsn { + last: last_lsn, + prev: Lsn::INVALID, + }); drop(writer); let cutoff = tline.get_last_record_lsn(); @@ -4293,7 +4341,10 @@ mod tests { &Value::Image(TEST_IMG(&format!("{} at {}", blknum, lsn))), ) .await?; - writer.finish_write(lsn); + writer.finish_write(RecordLsn { + last: lsn, + prev: Lsn::INVALID, + }); updated[blknum] = lsn; drop(writer); @@ -4313,7 +4364,10 @@ mod tests { &Value::Image(TEST_IMG(&format!("{} at {}", blknum, lsn))), ) .await?; - writer.finish_write(lsn); + writer.finish_write(RecordLsn { + last: lsn, + prev: Lsn::INVALID, + }); drop(writer); updated[blknum] = lsn; } @@ -4378,7 +4432,10 @@ mod tests { &Value::Image(TEST_IMG(&format!("{} at {}", blknum, lsn))), ) .await?; - writer.finish_write(lsn); + writer.finish_write(RecordLsn { + last: lsn, + prev: Lsn::INVALID, + }); updated[blknum] = lsn; drop(writer); @@ -4407,7 +4464,10 @@ mod tests { ) .await?; println!("updating {} at {}", blknum, lsn); - writer.finish_write(lsn); + writer.finish_write(RecordLsn { + last: lsn, + prev: Lsn::INVALID, + }); drop(writer); updated[blknum] = lsn; } @@ -4481,7 +4541,10 @@ mod tests { ) .await?; println!("updating [{}][{}] at {}", idx, blknum, lsn); - writer.finish_write(lsn); + writer.finish_write(RecordLsn { + last: lsn, + prev: Lsn::INVALID, + }); drop(writer); updated[idx][blknum] = lsn; } diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index eece5d6b249c..fb2fb6fe81df 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -204,7 +204,7 @@ pub struct Timeline { // 'last_record_lsn.load().prev'. It's used to set the xl_prev pointer of the // first WAL record when the node is started up. But here, we just // keep track of it. - last_record_lsn: SeqWait, + last_record_lsn: SeqWait, // All WAL records have been processed and stored durably on files on // local disk, up to this LSN. On crash and restart, we need to re-process @@ -570,7 +570,13 @@ impl Timeline { match self .last_record_lsn - .wait_for_timeout(lsn, self.conf.wait_lsn_timeout) + .wait_for_timeout( + RecordLsn { + last: lsn, + prev: Lsn::INVALID, // We only use the last value so it does not matter what we put here + }, + self.conf.wait_lsn_timeout, + ) .await { Ok(()) => Ok(()), @@ -2691,10 +2697,10 @@ impl Timeline { Ok(()) } - fn finish_write(&self, new_lsn: Lsn) { - assert!(new_lsn.is_aligned()); + fn finish_write(&self, new_lsn: RecordLsn) { + assert!(new_lsn.last.is_aligned()); - self.metrics.last_record_gauge.set(new_lsn.0 as i64); + self.metrics.last_record_gauge.set(new_lsn.last.0 as i64); self.last_record_lsn.advance(new_lsn); } @@ -4826,7 +4832,7 @@ impl<'a> TimelineWriter<'a> { /// 'lsn' must be aligned. This wakes up any wait_lsn() callers waiting for /// the 'lsn' or anything older. The previous last record LSN is stored alongside /// the latest and can be read. - pub fn finish_write(&self, new_lsn: Lsn) { + pub fn finish_write(&self, new_lsn: RecordLsn) { self.tl.finish_write(new_lsn); }