Skip to content

Commit 877ea86

Browse files
committed
Improve error propagation to sender in SnapshotPartitionTask
1 parent 164a3f3 commit 877ea86

File tree

3 files changed

+71
-44
lines changed

3 files changed

+71
-44
lines changed

crates/partition-store/src/partition_store_manager.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -201,7 +201,10 @@ impl PartitionStoreManager {
201201
snapshot_id: SnapshotId,
202202
snapshot_base_path: PathBuf,
203203
) -> anyhow::Result<LocalPartitionSnapshot> {
204-
let mut partition_store = self.lookup.lock().await
204+
let mut partition_store = self
205+
.lookup
206+
.lock()
207+
.await
205208
.live
206209
.get_mut(&partition_id)
207210
.ok_or(anyhow!("Unknown partition: {}", partition_id))?

crates/worker/src/partition_processor_manager/mod.rs

Lines changed: 5 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -304,7 +304,8 @@ impl PartitionProcessorManager {
304304
}
305305
}
306306

307-
#[instrument(level = "debug", skip_all, fields(partition_id = %event.partition_id, event = %<&'static str as From<&EventKind>>::from(&event.inner)))]
307+
#[instrument(level = "debug", skip_all, fields(partition_id = %event.partition_id, event = %<&'static str as From<&EventKind>>::from(&event.inner)
308+
))]
308309
fn on_asynchronous_event(&mut self, event: AsynchronousEvent) {
309310
let AsynchronousEvent {
310311
partition_id,
@@ -543,7 +544,7 @@ impl PartitionProcessorManager {
543544
status.last_archived_log_lsn = self
544545
.archived_lsn_channels
545546
.get(partition_id)
546-
.and_then(|(_, rx)| rx.borrow().clone());
547+
.and_then(|(_, rx)| *rx.borrow());
547548

548549
Some((*partition_id, status))
549550
} else {
@@ -786,15 +787,8 @@ impl PartitionProcessorManager {
786787
TaskKind::PartitionSnapshotProducer,
787788
"create-snapshot",
788789
Some(partition_id),
789-
async move {
790-
create_snapshot_task
791-
.create_snapshot()
792-
.await
793-
.inspect_err(|err| {
794-
warn!("Unhandled error in create_snapshot task: {}", err)
795-
})
796-
}
797-
.instrument(snapshot_span),
790+
async move { create_snapshot_task.create_snapshot().await }
791+
.instrument(snapshot_span),
798792
);
799793

800794
match spawn_task_result {

crates/worker/src/partition_processor_manager/snapshot_task.rs

Lines changed: 62 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@
1010

1111
use std::path::PathBuf;
1212
use std::time::SystemTime;
13-
1413
use tokio::sync::{oneshot, watch};
1514
use tracing::{debug, warn};
1615

@@ -39,49 +38,80 @@ impl SnapshotPartitionTask {
3938
"Creating partition snapshot"
4039
);
4140

42-
let snapshot_id = SnapshotId::new();
43-
let snapshot = self
44-
.partition_store_manager
45-
.export_partition_snapshot(self.partition_id, snapshot_id, self.snapshot_base_path)
46-
.await?;
47-
48-
let metadata = write_snapshot_metadata_header(
49-
snapshot_id,
41+
let result = create_snapshot_inner(
42+
self.partition_store_manager,
5043
self.cluster_name,
5144
self.node_name,
5245
self.partition_id,
53-
snapshot,
46+
self.snapshot_base_path,
47+
self.archived_lsn_sender,
5448
)
55-
.await?;
49+
.await;
5650

57-
// todo(pavel): SnapshotRepository integration will go in here in a future PR
51+
match result {
52+
Ok(metadata) => {
53+
let _ = self
54+
.result_sender
55+
.send(Ok(metadata.snapshot_id))
56+
.inspect_err(|err| {
57+
warn!(
58+
"Failed to send snapshot acknowledgement after snapshot {} was successfully created: {:?}",
59+
metadata.snapshot_id,
60+
err
61+
)
62+
});
5863

59-
self.archived_lsn_sender
60-
.send(Some(metadata.min_applied_lsn))?;
61-
62-
let _ = self
63-
.result_sender
64-
.send(Ok(metadata.snapshot_id))
65-
.inspect_err(|err| {
64+
debug!(
65+
partition_id = %self.partition_id,
66+
snapshot_id = %metadata.snapshot_id,
67+
archived_lsn = %metadata.min_applied_lsn,
68+
"Partition snapshot created"
69+
);
70+
Ok(metadata.snapshot_id)
71+
}
72+
Err(err) => {
6673
warn!(
67-
"Failed to send snapshot acknowledgement after snapshot {} was successfully created: {:?}",
68-
metadata.snapshot_id,
74+
partition_id = %self.partition_id,
75+
"Failed to create partition snapshot: {}",
6976
err
70-
)
71-
});
72-
73-
debug!(
74-
partition_id = %self.partition_id,
75-
snapshot_id = %metadata.snapshot_id,
76-
archived_lsn = %metadata.min_applied_lsn,
77-
"Partition snapshot created"
78-
);
77+
);
7978

80-
Ok(metadata.snapshot_id)
79+
Err(err)
80+
}
81+
}
8182
}
8283
}
8384

84-
pub async fn write_snapshot_metadata_header(
85+
async fn create_snapshot_inner(
86+
partition_store_manager: PartitionStoreManager,
87+
cluster_name: String,
88+
node_name: String,
89+
partition_id: PartitionId,
90+
snapshot_base_path: PathBuf,
91+
archived_lsn_sender: watch::Sender<Option<Lsn>>,
92+
) -> anyhow::Result<PartitionSnapshotMetadata> {
93+
let snapshot_id = SnapshotId::new();
94+
let snapshot = partition_store_manager
95+
.export_partition_snapshot(partition_id, snapshot_id, snapshot_base_path.clone())
96+
.await?;
97+
98+
let metadata = write_snapshot_metadata_header(
99+
snapshot_id,
100+
cluster_name,
101+
node_name,
102+
partition_id,
103+
snapshot,
104+
)
105+
.await?;
106+
107+
// todo(pavel): SnapshotRepository integration will go in here in a future PR
108+
109+
archived_lsn_sender.send(Some(metadata.min_applied_lsn))?;
110+
111+
Ok(metadata)
112+
}
113+
114+
async fn write_snapshot_metadata_header(
85115
snapshot_id: SnapshotId,
86116
cluster_name: String,
87117
node_name: String,

0 commit comments

Comments
 (0)