Skip to content

Commit

Permalink
Merge pull request #22637 from benesch/warn-heartbeats
Browse files Browse the repository at this point in the history
persist-client: warn if heartbeat task exits with live read handle
  • Loading branch information
benesch authored Oct 25, 2023
2 parents a661b24 + 8cb70e9 commit e0ea2f1
Show file tree
Hide file tree
Showing 3 changed files with 50 additions and 16 deletions.
13 changes: 13 additions & 0 deletions src/persist-client/src/internal/machine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1074,6 +1074,19 @@ where
}

if !existed {
// If the read handle was intentionally expired, this task
// *should* be aborted before it observes the expiration. So if
// we get here, this task somehow failed to keep the read lease
// alive. Warn loudly, because there's now a live read handle to
// an expired shard that will panic if used, but don't panic,
// just in case there is some edge case that results in this
// task observing the intentional expiration of a read handle.
warn!(
"heartbeat task for reader ({}) of shard ({}) exiting due to expired lease \
while read handle is live",
reader_id,
machine.shard_id(),
);
return;
}
}
Expand Down
9 changes: 5 additions & 4 deletions src/persist-client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -783,6 +783,7 @@ impl Schema<ShardId> for ShardIdSchema {
#[cfg(test)]
mod tests {
use std::future::Future;
use std::mem;
use std::pin::Pin;
use std::str::FromStr;
use std::task::Context;
Expand Down Expand Up @@ -1985,12 +1986,12 @@ mod tests {
.expect("client construction failed")
.expect_open::<(), (), u64, i64>(ShardId::new())
.await;
let read_heartbeat_tasks = read
.heartbeat_tasks
let mut read_unexpired_state = read
.unexpired_state
.take()
.expect("handle should have heartbeat task");
.expect("handle should have unexpired state");
read.expire().await;
for read_heartbeat_task in read_heartbeat_tasks {
for read_heartbeat_task in mem::take(&mut read_unexpired_state.heartbeat_tasks) {
let () = read_heartbeat_task
.await
.expect("task should shutdown cleanly");
Expand Down
44 changes: 32 additions & 12 deletions src/persist-client/src/read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -542,10 +542,8 @@ where

since: Antichain<T>,
pub(crate) last_heartbeat: EpochMillis,
explicitly_expired: bool,
lease_returner: SubscriptionLeaseReturner,

pub(crate) heartbeat_tasks: Option<Vec<JoinHandle<()>>>,
pub(crate) unexpired_state: Option<UnexpiredReadHandleState>,
}

impl<K, V, T, D> ReadHandle<K, V, T, D>
Expand Down Expand Up @@ -576,13 +574,14 @@ where
schemas,
since,
last_heartbeat,
explicitly_expired: false,
lease_returner: SubscriptionLeaseReturner {
leased_seqnos: Arc::new(Mutex::new(BTreeMap::new())),
reader_id: reader_id.clone(),
metrics,
},
heartbeat_tasks: Some(machine.start_reader_heartbeat_tasks(reader_id, gc).await),
unexpired_state: Some(UnexpiredReadHandleState {
heartbeat_tasks: machine.start_reader_heartbeat_tasks(reader_id, gc).await,
}),
}
}

Expand Down Expand Up @@ -914,9 +913,14 @@ where
/// happens.
#[instrument(level = "debug", skip_all, fields(shard = %self.machine.shard_id()))]
pub async fn expire(mut self) {
// We drop the unexpired state before expiring the reader to ensure the
// heartbeat tasks can never observe the expired state. This doesn't
// matter for correctness, but avoids confusing log output if the
// heartbeat task were to discover that its lease has been expired.
self.unexpired_state = None;

let (_, maintenance) = self.machine.expire_leased_reader(&self.reader_id).await;
maintenance.start_performing(&self.machine, &self.gc);
self.explicitly_expired = true;
}

/// Test helper for a [Self::listen] call that is expected to succeed.
Expand All @@ -929,6 +933,20 @@ where
}
}

/// State for a read handle that has not been explicitly expired.
#[derive(Debug)]
pub(crate) struct UnexpiredReadHandleState {
pub(crate) heartbeat_tasks: Vec<JoinHandle<()>>,
}

impl Drop for UnexpiredReadHandleState {
fn drop(&mut self) {
for heartbeat_task in &self.heartbeat_tasks {
heartbeat_task.abort();
}
}
}

/// An incremental cursor through a particular shard, returned from [ReadHandle::snapshot_cursor].
///
/// To read an entire dataset, the
Expand Down Expand Up @@ -1188,14 +1206,16 @@ where
D: Semigroup + Codec64 + Send + Sync,
{
fn drop(&mut self) {
if let Some(heartbeat_tasks) = self.heartbeat_tasks.take() {
for heartbeat_task in heartbeat_tasks {
heartbeat_task.abort();
}
}
if self.explicitly_expired {
if self.unexpired_state.is_none() {
return;
}

// We drop the unexpired state before expiring the reader to ensure the
// heartbeat tasks can never observe the expired state. This doesn't
// matter for correctness, but avoids confusing log output if the
// heartbeat task were to discover that its lease has been expired.
self.unexpired_state = None;

let handle = match Handle::try_current() {
Ok(x) => x,
Err(_) => {
Expand Down

0 comments on commit e0ea2f1

Please sign in to comment.