Commit c8405e2

joostjager and claude committed
Defer MonitorUpdatingPersister writes to flush()
Update MonitorUpdatingPersister and MonitorUpdatingPersisterAsync to queue persist operations in memory instead of writing immediately to disk. The Persist trait methods now return ChannelMonitorUpdateStatus::InProgress and the actual writes happen when flush() is called.

This fixes a race condition that could cause channel force closures: previously, if the node crashed after writing channel monitors but before writing the channel manager, the monitors would be ahead of the manager on restart. By deferring monitor writes until after the channel manager is persisted (via flush()), we ensure the manager is always at least as up-to-date as the monitors.

Key changes:
- Add PendingWrite enum to represent queued write/remove operations
- Add pending_writes queue to MonitorUpdatingPersisterAsyncInner
- Add flush() to Persist trait and ChainMonitor
- ChainMonitor::flush() calls channel_monitor_updated for each completed write
- Update Persist impl to queue writes and return InProgress
- Call flush() in background processor after channel manager persistence
- Remove unused event_notifier from AsyncPersister

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
1 parent b4fb555 commit c8405e2
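The queueing machinery itself (the PendingWrite enum and the pending_writes field on MonitorUpdatingPersisterAsyncInner) lives in the third changed file, whose diff is not shown on this page. As a rough sketch of the approach the message describes — the type, variant, and field names below are illustrative stand-ins, not the actual LDK definitions — a deferred-write queue might look like:

use std::io;

// Hypothetical stand-in for the PendingWrite enum described in the commit
// message; the real definition is in the persist module, not shown here.
enum PendingWrite {
	/// A write of serialized data to (namespace, key), deferred until flush().
	Write { namespace: String, key: String, data: Vec<u8> },
	/// A removal of a previously written key, deferred until flush().
	Remove { namespace: String, key: String },
}

struct PendingWrites {
	pending_writes: Vec<PendingWrite>,
}

impl PendingWrites {
	/// Persist calls append here and return immediately (InProgress).
	fn queue(&mut self, op: PendingWrite) {
		self.pending_writes.push(op);
	}

	/// flush() drains the queue and applies each operation to storage.
	/// `apply` stands in for the KVStore write/remove calls. In this sketch
	/// an error drops the remaining drained ops; the real persister's error
	/// semantics are documented in the flush() docs in the diff below.
	fn flush(
		&mut self, mut apply: impl FnMut(&PendingWrite) -> io::Result<()>,
	) -> io::Result<()> {
		let ops: Vec<PendingWrite> = self.pending_writes.drain(..).collect();
		for op in &ops {
			apply(op)?;
		}
		Ok(())
	}
}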

File tree

3 files changed: 202 additions & 194 deletions

lightning-background-processor/src/lib.rs

Lines changed: 22 additions & 0 deletions

@@ -1349,6 +1349,11 @@ where
 		res?;
 	}
 
+	// Flush any pending monitor writes after channel manager persistence.
+	if let Err(e) = chain_monitor.flush() {
+		log_error!(logger, "Failed to flush chain monitor: {}", e);
+	}
+
 	match check_and_reset_sleeper(&mut last_onion_message_handler_call, || {
 		sleeper(ONION_MESSAGE_HANDLER_TIMER)
 	}) {

@@ -1413,6 +1418,12 @@
 			channel_manager.get_cm().encode(),
 		)
 		.await?;
+
+	// Flush any pending monitor writes after final channel manager persistence.
+	if let Err(e) = chain_monitor.flush() {
+		log_error!(logger, "Failed to flush chain monitor: {}", e);
+	}
+
 	if let Some(ref scorer) = scorer {
 		kv_store
 			.write(

@@ -1731,6 +1742,11 @@ impl BackgroundProcessor {
 				channel_manager.get_cm().encode(),
 			))?;
 			log_trace!(logger, "Done persisting ChannelManager.");
+
+			// Flush any pending monitor writes after channel manager persistence.
+			if let Err(e) = chain_monitor.flush() {
+				log_error!(logger, "Failed to flush chain monitor: {}", e);
+			}
 		}
 
 		if let Some(liquidity_manager) = liquidity_manager.as_ref() {

@@ -1853,6 +1869,12 @@
 			CHANNEL_MANAGER_PERSISTENCE_KEY,
 			channel_manager.get_cm().encode(),
 		)?;
+
+		// Flush any pending monitor writes after final channel manager persistence.
+		if let Err(e) = chain_monitor.flush() {
+			log_error!(logger, "Failed to flush chain monitor: {}", e);
+		}
+
 		if let Some(ref scorer) = scorer {
 			kv_store.write(
 				SCORER_PERSISTENCE_PRIMARY_NAMESPACE,
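At each of the four persistence points above the ordering is the same: write the ChannelManager first, then flush the queued monitor writes, so a crash between the two steps can only leave the monitors at or behind the manager on disk. A simplified sketch of that invariant, with closures standing in for the real ChannelManager persistence and ChainMonitor::flush() calls (not the actual background-processor API):

use std::io;

// Sketch of the write ordering enforced by the hunks above; the two closures
// are stand-ins, not real LDK calls.
fn persistence_round(
	mut persist_manager: impl FnMut() -> io::Result<()>,
	mut flush_monitors: impl FnMut() -> io::Result<()>,
) -> io::Result<()> {
	// 1) Persist the ChannelManager while monitor writes are still queued in
	//    memory. A crash here leaves monitors on disk at or behind the
	//    manager, which restart handles without force closures.
	persist_manager()?;
	// 2) Only now write out the queued monitor updates. Mirroring the diff,
	//    a flush failure is logged and does not abort the round.
	if let Err(e) = flush_monitors() {
		eprintln!("Failed to flush chain monitor: {}", e);
	}
	Ok(())
}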

lightning/src/chain/chainmonitor.rs

Lines changed: 34 additions & 8 deletions

@@ -39,6 +39,7 @@ use crate::chain::channelmonitor::{
 use crate::chain::transaction::{OutPoint, TransactionData};
 use crate::chain::{BestBlock, ChannelMonitorUpdateStatus, Filter, WatchedOutput};
 use crate::events::{self, Event, EventHandler, ReplayEvent};
+use crate::io;
 use crate::ln::channel_state::ChannelDetails;
 #[cfg(peer_storage)]
 use crate::ln::msgs::PeerStorage;

@@ -208,6 +209,15 @@ pub trait Persist<ChannelSigner: EcdsaChannelSigner> {
 	fn get_and_clear_completed_updates(&self) -> Vec<(ChannelId, u64)> {
 		Vec::new()
 	}
+
+	/// Flushes any pending writes to the underlying storage.
+	///
+	/// For implementations that queue writes (returning [`ChannelMonitorUpdateStatus::InProgress`]
+	/// from persist methods), this method should write all queued data to storage.
+	///
+	/// Returns the list of completed updates (channel_id, update_id) on success, or an error if
+	/// any write failed.
+	fn flush(&self) -> Result<Vec<(ChannelId, u64)>, io::Error>;
 }
 
 struct MonitorHolder<ChannelSigner: EcdsaChannelSigner> {

@@ -272,7 +282,6 @@ pub struct AsyncPersister<
 	FE::Target: FeeEstimator,
 {
 	persister: MonitorUpdatingPersisterAsync<K, S, L, ES, SP, BI, FE>,
-	event_notifier: Arc<Notifier>,
 }
 
 impl<

@@ -320,17 +329,15 @@
 		&self, monitor_name: MonitorName,
 		monitor: &ChannelMonitor<<SP::Target as SignerProvider>::EcdsaSigner>,
 	) -> ChannelMonitorUpdateStatus {
-		let notifier = Arc::clone(&self.event_notifier);
-		self.persister.spawn_async_persist_new_channel(monitor_name, monitor, notifier);
+		self.persister.queue_new_channel(monitor_name, monitor);
 		ChannelMonitorUpdateStatus::InProgress
 	}
 
 	fn update_persisted_channel(
 		&self, monitor_name: MonitorName, monitor_update: Option<&ChannelMonitorUpdate>,
 		monitor: &ChannelMonitor<<SP::Target as SignerProvider>::EcdsaSigner>,
 	) -> ChannelMonitorUpdateStatus {
-		let notifier = Arc::clone(&self.event_notifier);
-		self.persister.spawn_async_update_channel(monitor_name, monitor_update, monitor, notifier);
+		self.persister.queue_channel_update(monitor_name, monitor_update, monitor);
 		ChannelMonitorUpdateStatus::InProgress
 	}
 

@@ -341,6 +348,10 @@
 	fn get_and_clear_completed_updates(&self) -> Vec<(ChannelId, u64)> {
 		self.persister.get_and_clear_completed_updates()
 	}
+
+	fn flush(&self) -> Result<Vec<(ChannelId, u64)>, io::Error> {
+		crate::util::persist::poll_sync_future(self.persister.flush())
+	}
 }
 
 /// An implementation of [`chain::Watch`] for monitoring channels.

@@ -440,7 +451,6 @@ impl<
 		persister: MonitorUpdatingPersisterAsync<K, S, L, ES, SP, T, F>, _entropy_source: ES,
 		_our_peerstorage_encryption_key: PeerStorageKey,
 	) -> Self {
-		let event_notifier = Arc::new(Notifier::new());
 		Self {
 			monitors: RwLock::new(new_hash_map()),
 			chain_source,

@@ -450,8 +460,8 @@
 			_entropy_source,
 			pending_monitor_events: Mutex::new(Vec::new()),
 			highest_chain_height: AtomicUsize::new(0),
-			event_notifier: Arc::clone(&event_notifier),
-			persister: AsyncPersister { persister, event_notifier },
+			event_notifier: Arc::new(Notifier::new()),
+			persister: AsyncPersister { persister },
 			pending_send_only_events: Mutex::new(Vec::new()),
 			#[cfg(peer_storage)]
 			our_peerstorage_encryption_key: _our_peerstorage_encryption_key,

@@ -742,6 +752,22 @@
 			.collect()
 	}
 
+	/// Flushes any pending writes to the underlying storage.
+	///
+	/// For persisters that queue writes (returning [`ChannelMonitorUpdateStatus::InProgress`]
+	/// from persist methods), this method writes all queued data to storage and signals
+	/// completion to the channel manager via [`Self::channel_monitor_updated`].
+	///
+	/// Returns the list of completed updates (channel_id, update_id) on success, or an error if
+	/// any write failed. Note that even if an error is returned, some writes may have succeeded.
+	pub fn flush(&self) -> Result<Vec<(ChannelId, u64)>, io::Error> {
+		let completed = self.persister.flush()?;
+		for (channel_id, update_id) in &completed {
+			let _ = self.channel_monitor_updated(*channel_id, *update_id);
+		}
+		Ok(completed)
+	}
+
 	#[cfg(any(test, feature = "_test_utils"))]
 	pub fn remove_monitor(&self, channel_id: &ChannelId) -> ChannelMonitor<ChannelSigner> {
 		self.monitors.write().unwrap().remove(channel_id).unwrap().monitor
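The flush() contract added to Persist above — queue on persist calls, return InProgress, report the completed (channel_id, update_id) pairs once flush() actually writes them — can be mocked in isolation. This standalone sketch uses plain u64 ids and an in-memory map as the store; it is not the real trait implementation:

use std::collections::HashMap;
use std::io;
use std::sync::Mutex;

/// Stand-in for ChannelMonitorUpdateStatus::InProgress in the real trait.
#[derive(Debug, PartialEq)]
enum UpdateStatus {
	InProgress,
}

/// Mock deferred persister: persist calls queue data, flush() writes it.
struct QueuedPersister {
	pending: Mutex<Vec<(u64, u64, Vec<u8>)>>, // (channel_id, update_id, data)
	store: Mutex<HashMap<(u64, u64), Vec<u8>>>, // stands in for the KVStore
}

impl QueuedPersister {
	fn update_persisted_channel(
		&self, channel_id: u64, update_id: u64, data: Vec<u8>,
	) -> UpdateStatus {
		// Queue in memory; the actual write happens later, in flush().
		self.pending.lock().unwrap().push((channel_id, update_id, data));
		UpdateStatus::InProgress
	}

	/// Write everything queued, returning the completed (channel_id,
	/// update_id) pairs so the caller can report each update as done.
	fn flush(&self) -> Result<Vec<(u64, u64)>, io::Error> {
		let queued: Vec<_> = self.pending.lock().unwrap().drain(..).collect();
		let mut completed = Vec::with_capacity(queued.len());
		let mut store = self.store.lock().unwrap();
		for (channel_id, update_id, data) in queued {
			store.insert((channel_id, update_id), data);
			completed.push((channel_id, update_id));
		}
		Ok(completed)
	}
}

ChainMonitor::flush() in the last hunk above plays the caller's role: it invokes the persister's flush() and passes each returned pair to channel_monitor_updated, letting the ChannelManager complete the updates it had previously marked in-progress.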
