Skip to content

Commit

Permalink
Record both prev and last value on finishing write
Browse files Browse the repository at this point in the history
  • Loading branch information
ctring committed Nov 30, 2023
1 parent 163570f commit e935b9a
Show file tree
Hide file tree
Showing 5 changed files with 138 additions and 54 deletions.
39 changes: 28 additions & 11 deletions libs/utils/src/lsn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -243,24 +243,41 @@ impl From<Lsn> for AtomicLsn {
}

/// Pair of LSN's pointing to the end of the last valid record and previous one
#[derive(Debug, Clone, Copy)]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct RecordLsn {
/// LSN at the end of the current record
/// LSN at the end of the current record
pub last: Lsn,
/// LSN at the end of the previous record
pub prev: Lsn,
}

impl Ord for RecordLsn {
fn cmp(&self, other: &Self) -> std::cmp::Ordering {
self.last.cmp(&other.last)
}
}

impl PartialOrd for RecordLsn {
fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
Some(self.cmp(other))
}
}

/// Expose `self.last` as counter to be able to use RecordLsn in SeqWait
impl MonotonicCounter<Lsn> for RecordLsn {
fn cnt_advance(&mut self, lsn: Lsn) {
assert!(self.last <= lsn);
let new_prev = self.last;
self.last = lsn;
self.prev = new_prev;
}
fn cnt_value(&self) -> Lsn {
self.last
impl MonotonicCounter<RecordLsn> for RecordLsn {
fn cnt_advance(&mut self, lsn: RecordLsn) {
assert!(lsn.prev <= lsn.last);
assert!(self.last <= lsn.last);
if lsn.prev == Lsn::INVALID {
self.prev = self.last;
self.last = lsn.last;
} else {
self.prev = lsn.prev;
self.last = lsn.last;
}
}
fn cnt_value(&self) -> RecordLsn {
*self
}
}

Expand Down
7 changes: 5 additions & 2 deletions pageserver/src/import_datadir.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use postgres_ffi::DBState_DB_SHUTDOWNED;
use postgres_ffi::Oid;
use postgres_ffi::XLogFileName;
use postgres_ffi::{BLCKSZ, WAL_SEGMENT_SIZE};
use utils::lsn::Lsn;
use utils::lsn::{Lsn, RecordLsn};

// Returns checkpoint LSN from controlfile
pub fn get_lsn_from_controlfile(path: &Path) -> Result<Lsn> {
Expand Down Expand Up @@ -602,7 +602,10 @@ async fn import_file(
// but it is ok to call `finish_write()`, because final `modification.commit()`
// will update lsn once more to the final one.
let writer = modification.tline.writer().await;
writer.finish_write(prev_lsn);
writer.finish_write(RecordLsn {
last: prev_lsn,
prev: Lsn::INVALID,
});

debug!("imported zenith signal {}", prev_lsn);
} else if file_path.starts_with("pg_tblspc") {
Expand Down
23 changes: 9 additions & 14 deletions pageserver/src/pgdatadir_mapping.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use std::collections::{hash_map, HashMap, HashSet};
use std::ops::Range;
use tokio_util::sync::CancellationToken;
use tracing::{debug, trace, warn};
use utils::lsn::RecordLsn;
use utils::{bin_ser::BeSer, lsn::Lsn};

/// Block number within a relation or SLRU. This matches PostgreSQL's BlockNumber type.
Expand Down Expand Up @@ -90,11 +91,11 @@ impl Timeline {
{
DatadirModification {
tline: self,
pending_lsns: Vec::new(),
pending_updates: HashMap::new(),
pending_deletions: Vec::new(),
pending_nblocks: 0,
lsn,
prev_lsn: Lsn::INVALID,
}
}

Expand Down Expand Up @@ -677,11 +678,11 @@ pub struct DatadirModification<'a> {

/// Current LSN of the modification
lsn: Lsn,
prev_lsn: Lsn,

// The modifications are not applied directly to the underlying key-value store.
// The put-functions add the modifications here, and they are flushed to the
// underlying key-value store by the 'finish' function.
pending_lsns: Vec<Lsn>,
pending_updates: HashMap<Key, Vec<(Lsn, Value)>>,
pending_deletions: Vec<(Range<Key>, Lsn)>,
pending_nblocks: i64,
Expand All @@ -701,10 +702,8 @@ impl<'a> DatadirModification<'a> {
lsn,
self.lsn
);
if lsn > self.lsn {
self.pending_lsns.push(self.lsn);
self.lsn = lsn;
}
self.prev_lsn = self.lsn;
self.lsn = lsn;
Ok(())
}

Expand Down Expand Up @@ -1223,14 +1222,10 @@ impl<'a> DatadirModification<'a> {
writer.delete_batch(&self.pending_deletions).await?;
self.pending_deletions.clear();

self.pending_lsns.push(self.lsn);
for pending_lsn in self.pending_lsns.drain(..) {
// Ideally, we should be able to call writer.finish_write() only once
// with the highest LSN. However, the last_record_lsn variable in the
// timeline keeps track of the latest LSN and the immediate previous LSN
// so we need to record every LSN to not leave a gap between them.
writer.finish_write(pending_lsn);
}
writer.finish_write(RecordLsn {
last: self.lsn,
prev: self.prev_lsn,
});

if pending_nblocks != 0 {
writer.update_current_logical_size(pending_nblocks * i64::from(BLCKSZ));
Expand Down
105 changes: 84 additions & 21 deletions pageserver/src/tenant.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3510,14 +3510,20 @@ mod tests {
writer
.put(*TEST_KEY, Lsn(0x10), &Value::Image(TEST_IMG("foo at 0x10")))
.await?;
writer.finish_write(Lsn(0x10));
writer.finish_write(RecordLsn {
last: Lsn(0x10),
prev: Lsn::INVALID,
});
drop(writer);

let writer = tline.writer().await;
writer
.put(*TEST_KEY, Lsn(0x20), &Value::Image(TEST_IMG("foo at 0x20")))
.await?;
writer.finish_write(Lsn(0x20));
writer.finish_write(RecordLsn {
last: Lsn(0x20),
prev: Lsn::INVALID,
});
drop(writer);

assert_eq!(
Expand Down Expand Up @@ -3609,16 +3615,25 @@ mod tests {
writer
.put(TEST_KEY_B, Lsn(0x20), &test_value("foobar at 0x20"))
.await?;
writer.finish_write(Lsn(0x20));
writer.finish_write(RecordLsn {
last: Lsn(0x20),
prev: Lsn::INVALID,
});

writer
.put(TEST_KEY_A, Lsn(0x30), &test_value("foo at 0x30"))
.await?;
writer.finish_write(Lsn(0x30));
writer.finish_write(RecordLsn {
last: Lsn(0x30),
prev: Lsn::INVALID,
});
writer
.put(TEST_KEY_A, Lsn(0x40), &test_value("foo at 0x40"))
.await?;
writer.finish_write(Lsn(0x40));
writer.finish_write(RecordLsn {
last: Lsn(0x40),
prev: Lsn::INVALID,
});

//assert_current_logical_size(&tline, Lsn(0x40));

Expand All @@ -3633,7 +3648,10 @@ mod tests {
new_writer
.put(TEST_KEY_A, Lsn(0x40), &test_value("bar at 0x40"))
.await?;
new_writer.finish_write(Lsn(0x40));
new_writer.finish_write(RecordLsn {
last: Lsn(0x40),
prev: Lsn::INVALID,
});

// Check page contents on both branches
assert_eq!(
Expand Down Expand Up @@ -3667,7 +3685,10 @@ mod tests {
&Value::Image(TEST_IMG(&format!("foo at {}", lsn))),
)
.await?;
writer.finish_write(lsn);
writer.finish_write(RecordLsn {
last: lsn,
prev: Lsn::INVALID,
});
lsn += 0x10;
writer
.put(
Expand All @@ -3676,7 +3697,10 @@ mod tests {
&Value::Image(TEST_IMG(&format!("foo at {}", lsn))),
)
.await?;
writer.finish_write(lsn);
writer.finish_write(RecordLsn {
last: lsn,
prev: Lsn::INVALID,
});
lsn += 0x10;
}
tline.freeze_and_flush().await?;
Expand All @@ -3689,7 +3713,10 @@ mod tests {
&Value::Image(TEST_IMG(&format!("foo at {}", lsn))),
)
.await?;
writer.finish_write(lsn);
writer.finish_write(RecordLsn {
last: lsn,
prev: Lsn::INVALID,
});
lsn += 0x10;
writer
.put(
Expand All @@ -3698,7 +3725,10 @@ mod tests {
&Value::Image(TEST_IMG(&format!("foo at {}", lsn))),
)
.await?;
writer.finish_write(lsn);
writer.finish_write(RecordLsn {
last: lsn,
prev: Lsn::INVALID,
});
}
tline.freeze_and_flush().await
}
Expand Down Expand Up @@ -4085,7 +4115,10 @@ mod tests {
writer
.put(*TEST_KEY, Lsn(0x10), &Value::Image(TEST_IMG("foo at 0x10")))
.await?;
writer.finish_write(Lsn(0x10));
writer.finish_write(RecordLsn {
last: Lsn(0x10),
prev: Lsn::INVALID,
});
drop(writer);

tline.freeze_and_flush().await?;
Expand All @@ -4095,7 +4128,10 @@ mod tests {
writer
.put(*TEST_KEY, Lsn(0x20), &Value::Image(TEST_IMG("foo at 0x20")))
.await?;
writer.finish_write(Lsn(0x20));
writer.finish_write(RecordLsn {
last: Lsn(0x20),
prev: Lsn::INVALID,
});
drop(writer);

tline.freeze_and_flush().await?;
Expand All @@ -4105,7 +4141,10 @@ mod tests {
writer
.put(*TEST_KEY, Lsn(0x30), &Value::Image(TEST_IMG("foo at 0x30")))
.await?;
writer.finish_write(Lsn(0x30));
writer.finish_write(RecordLsn {
last: Lsn(0x30),
prev: Lsn::INVALID,
});
drop(writer);

tline.freeze_and_flush().await?;
Expand All @@ -4115,7 +4154,10 @@ mod tests {
writer
.put(*TEST_KEY, Lsn(0x40), &Value::Image(TEST_IMG("foo at 0x40")))
.await?;
writer.finish_write(Lsn(0x40));
writer.finish_write(RecordLsn {
last: Lsn(0x40),
prev: Lsn::INVALID,
});
drop(writer);

tline.freeze_and_flush().await?;
Expand Down Expand Up @@ -4179,7 +4221,10 @@ mod tests {
&Value::Image(TEST_IMG(&format!("{} at {}", blknum, lsn))),
)
.await?;
writer.finish_write(lsn);
writer.finish_write(RecordLsn {
last: lsn,
prev: Lsn::INVALID,
});
drop(writer);

keyspace.add_key(test_key);
Expand Down Expand Up @@ -4241,7 +4286,10 @@ mod tests {

let writer = tline.writer().await;
writer.put_batch(&batch).await?;
writer.finish_write(last_lsn);
writer.finish_write(RecordLsn {
last: last_lsn,
prev: Lsn::INVALID,
});
drop(writer);

let cutoff = tline.get_last_record_lsn();
Expand Down Expand Up @@ -4293,7 +4341,10 @@ mod tests {
&Value::Image(TEST_IMG(&format!("{} at {}", blknum, lsn))),
)
.await?;
writer.finish_write(lsn);
writer.finish_write(RecordLsn {
last: lsn,
prev: Lsn::INVALID,
});
updated[blknum] = lsn;
drop(writer);

Expand All @@ -4313,7 +4364,10 @@ mod tests {
&Value::Image(TEST_IMG(&format!("{} at {}", blknum, lsn))),
)
.await?;
writer.finish_write(lsn);
writer.finish_write(RecordLsn {
last: lsn,
prev: Lsn::INVALID,
});
drop(writer);
updated[blknum] = lsn;
}
Expand Down Expand Up @@ -4378,7 +4432,10 @@ mod tests {
&Value::Image(TEST_IMG(&format!("{} at {}", blknum, lsn))),
)
.await?;
writer.finish_write(lsn);
writer.finish_write(RecordLsn {
last: lsn,
prev: Lsn::INVALID,
});
updated[blknum] = lsn;
drop(writer);

Expand Down Expand Up @@ -4407,7 +4464,10 @@ mod tests {
)
.await?;
println!("updating {} at {}", blknum, lsn);
writer.finish_write(lsn);
writer.finish_write(RecordLsn {
last: lsn,
prev: Lsn::INVALID,
});
drop(writer);
updated[blknum] = lsn;
}
Expand Down Expand Up @@ -4481,7 +4541,10 @@ mod tests {
)
.await?;
println!("updating [{}][{}] at {}", idx, blknum, lsn);
writer.finish_write(lsn);
writer.finish_write(RecordLsn {
last: lsn,
prev: Lsn::INVALID,
});
drop(writer);
updated[idx][blknum] = lsn;
}
Expand Down
Loading

0 comments on commit e935b9a

Please sign in to comment.