Skip to content

Commit

Permalink
Remove committing mode in ingest_record
Browse files Browse the repository at this point in the history
  • Loading branch information
ctring committed Nov 30, 2023
1 parent 9355e2f commit 163570f
Show file tree
Hide file tree
Showing 5 changed files with 16 additions and 19 deletions.
6 changes: 4 additions & 2 deletions pageserver/src/import_datadir.rs
Original file line number Diff line number Diff line change
Expand Up @@ -306,8 +306,9 @@ async fn import_wal(
while last_lsn <= endpoint {
if let Some((lsn, recdata)) = waldecoder.poll_decode()? {
walingest
.ingest_record(recdata, lsn, &mut modification, &mut decoded, ctx, true)
.ingest_record(recdata, lsn, &mut modification, &mut decoded, ctx)
.await?;
modification.commit().await?;
last_lsn = lsn;

nrecords += 1;
Expand Down Expand Up @@ -442,8 +443,9 @@ pub async fn import_wal_from_tar(
while last_lsn <= end_lsn {
if let Some((lsn, recdata)) = waldecoder.poll_decode()? {
walingest
.ingest_record(recdata, lsn, &mut modification, &mut decoded, ctx, true)
.ingest_record(recdata, lsn, &mut modification, &mut decoded, ctx)
.await?;
modification.commit().await?;
last_lsn = lsn;

debug!("imported record at {} (end {})", lsn, end_lsn);
Expand Down
6 changes: 6 additions & 0 deletions pageserver/src/pgdatadir_mapping.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1285,6 +1285,12 @@ impl<'a> DatadirModification<'a> {
}
}

/// This struct facilitates accessing either a committed key from the timeline at a
/// specific LSN, or the latest uncommitted key from a pending modification.
/// During WAL ingestion, the records from multiple LSNs may be batched in the same
/// modification before being flushed to the timeline. Hence, the routines in WalIngest
/// need to look up the keys in the modification first before looking them up in the
/// timeline to not miss the latest updates.
#[derive(Clone, Copy)]
pub enum Version<'a> {
Lsn(Lsn),
Expand Down
3 changes: 1 addition & 2 deletions pageserver/src/tenant/timeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1567,7 +1567,6 @@ impl Timeline {
let max_lsn_wal_lag = tenant_conf_guard
.max_lsn_wal_lag
.unwrap_or(self.conf.default_tenant_conf.max_lsn_wal_lag);
let ingest_batch_size = self.conf.ingest_batch_size;
drop(tenant_conf_guard);

let mut guard = self.walreceiver.lock().unwrap();
Expand All @@ -1583,7 +1582,7 @@ impl Timeline {
max_lsn_wal_lag,
auth_token: crate::config::SAFEKEEPER_AUTH_TOKEN.get().cloned(),
availability_zone: self.conf.availability_zone.clone(),
ingest_batch_size,
ingest_batch_size: self.conf.ingest_batch_size,
},
broker_client,
ctx,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -332,14 +332,7 @@ pub(super) async fn handle_walreceiver_connection(

// Ingest the records without immediately committing them.
walingest
.ingest_record(
recdata,
lsn,
&mut modification,
&mut decoded,
&ctx,
false,
)
.ingest_record(recdata, lsn, &mut modification, &mut decoded, &ctx)
.await
.with_context(|| format!("could not ingest record at {lsn}"))?;

Expand Down
11 changes: 4 additions & 7 deletions pageserver/src/walingest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ impl WalIngest {
pub async fn new(
timeline: &Timeline,
startpoint: Lsn,
ctx: &'_ RequestContext,
ctx: &RequestContext,
) -> anyhow::Result<WalIngest> {
// Fetch the latest checkpoint into memory, so that we can compare with it
// quickly in `ingest_record` and update it when it changes.
Expand Down Expand Up @@ -85,7 +85,6 @@ impl WalIngest {
modification: &mut DatadirModification<'_>,
decoded: &mut DecodedWALRecord,
ctx: &RequestContext,
commit: bool,
) -> anyhow::Result<()> {
let pg_version = modification.tline.pg_version;

Expand Down Expand Up @@ -353,11 +352,9 @@ impl WalIngest {
self.checkpoint_modified = false;
}

// Now that this record has been fully handled, including updating the
// checkpoint data, let the repository know that it is up-to-date to this LSN
if commit {
modification.commit().await?;
}
// Note that at this point this record is only cached in the modification
// until commit() is called to flush the data into the repository and update
// the latest LSN.

Ok(())
}
Expand Down

0 comments on commit 163570f

Please sign in to comment.