Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
166 changes: 119 additions & 47 deletions fuzz/src/chanmon_consistency.rs
Original file line number Diff line number Diff line change
Expand Up @@ -819,6 +819,7 @@ struct HarnessNode<'a> {
persistence_style: ChannelMonitorUpdateStatus,
deferred: bool,
serialized_manager: Vec<u8>,
serialized_manager_generation: u64,
height: u32,
last_htlc_clear_fee: u32,
}
Expand Down Expand Up @@ -917,6 +918,7 @@ impl<'a> HarnessNode<'a> {
persistence_style,
deferred,
serialized_manager: Vec::new(),
serialized_manager_generation: 0,
height: 0,
last_htlc_clear_fee: 253,
}
Expand Down Expand Up @@ -976,6 +978,7 @@ impl<'a> HarnessNode<'a> {
if self.node.get_and_clear_needs_persistence() {
let pending_monitor_writes = self.monitor.pending_operation_count();
self.serialized_manager = self.node.encode();
self.serialized_manager_generation += 1;
if self.deferred {
self.monitor.flush(pending_monitor_writes, &self.logger);
} else {
Expand All @@ -991,6 +994,7 @@ impl<'a> HarnessNode<'a> {
fn force_checkpoint_manager_persistence(&mut self) {
let pending_monitor_writes = self.monitor.pending_operation_count();
self.serialized_manager = self.node.encode();
self.serialized_manager_generation += 1;
self.node.get_and_clear_needs_persistence();
if self.deferred {
self.monitor.flush(pending_monitor_writes, &self.logger);
Expand All @@ -999,6 +1003,10 @@ impl<'a> HarnessNode<'a> {
}
}

fn next_manager_persistence_generation(&self) -> u64 {
self.serialized_manager_generation + 1
}

fn bump_fee_estimate(&mut self, chan_type: ChanType) {
let mut max_feerate = self.last_htlc_clear_fee;
if matches!(chan_type, ChanType::Legacy) {
Expand Down Expand Up @@ -1098,7 +1106,8 @@ impl<'a> HarnessNode<'a> {

fn reload<Out: Output + MaybeSend + MaybeSync>(
&mut self, use_old_mons: u8, out: &Out, router: &'a FuzzRouter, chan_type: ChanType,
) {
) -> u64 {
let loaded_manager_generation = self.serialized_manager_generation;
let logger = Self::build_logger(self.node_id, out);
let persister = Self::build_persister(self.persistence_style);
let chain_monitor = Self::build_chain_monitor(
Expand Down Expand Up @@ -1170,6 +1179,7 @@ impl<'a> HarnessNode<'a> {
// even if the reloaded ChannelManager does not need persistence. Always checkpoint here so
// those registrations can be flushed against the manager snapshot they belong to.
self.force_checkpoint_manager_persistence();
loaded_manager_generation
}
}

Expand Down Expand Up @@ -1476,15 +1486,79 @@ impl PeerLink {
}
}

struct PendingPayment {
payment_id: PaymentId,
payment_hash: PaymentHash,
first_persisted_manager_generation: u64,
}

struct NodePayments {
pending: Vec<PaymentId>,
pending: Vec<PendingPayment>,
resolved: HashMap<PaymentId, Option<PaymentHash>>,
}

impl NodePayments {
fn new() -> Self {
Self { pending: Vec::new(), resolved: new_hash_map() }
}

fn add_pending(
&mut self, payment_id: PaymentId, payment_hash: PaymentHash,
first_persisted_manager_generation: u64,
) {
self.pending.push(PendingPayment {
payment_id,
payment_hash,
first_persisted_manager_generation,
});
}

fn mark_sent(&mut self, sent_id: PaymentId, payment_hash: PaymentHash) {
let idx_opt = self.pending.iter().position(|pending| pending.payment_id == sent_id);
if let Some(idx) = idx_opt {
self.pending.remove(idx);
self.resolved.insert(sent_id, Some(payment_hash));
} else {
assert!(self.resolved.contains_key(&sent_id));
}
}

fn mark_resolved_without_hash(&mut self, payment_id: PaymentId) {
let idx_opt = self.pending.iter().position(|pending| pending.payment_id == payment_id);
if let Some(idx) = idx_opt {
self.pending.remove(idx);
self.resolved.insert(payment_id, None);
} else if !self.resolved.contains_key(&payment_id) {
// Some resolutions can arrive immediately, before the send helper records
// the payment as pending. Track them so later duplicate events are accepted.
self.resolved.insert(payment_id, None);
}
}

fn mark_successful_probe(&mut self, payment_id: PaymentId) {
let idx_opt = self.pending.iter().position(|pending| pending.payment_id == payment_id);
if let Some(idx) = idx_opt {
self.pending.remove(idx);
self.resolved.insert(payment_id, None);
} else {
assert!(self.resolved.contains_key(&payment_id));
}
}

fn sync_pending_with_manager_generation(
&mut self, loaded_manager_generation: u64,
) -> Vec<PaymentHash> {
let mut rolled_back_payment_hashes = Vec::new();
let pending = mem::take(&mut self.pending);
for pending_payment in pending {
if pending_payment.first_persisted_manager_generation > loaded_manager_generation {
rolled_back_payment_hashes.push(pending_payment.payment_hash);
} else {
self.pending.push(pending_payment);
}
}
rolled_back_payment_hashes
}
}

struct PaymentTracker {
Expand Down Expand Up @@ -1590,7 +1664,11 @@ impl PaymentTracker {
},
};
if succeeded {
self.nodes[source_idx].pending.push(id);
self.nodes[source_idx].add_pending(
id,
hash,
source.next_manager_persistence_generation(),
);
}
succeeded
}
Expand Down Expand Up @@ -1667,7 +1745,11 @@ impl PaymentTracker {
},
};
if succeeded {
self.nodes[source_idx].pending.push(id);
self.nodes[source_idx].add_pending(
id,
hash,
source.next_manager_persistence_generation(),
);
}
}

Expand Down Expand Up @@ -1736,7 +1818,11 @@ impl PaymentTracker {
Ok(()) => Self::check_payment_send_events(source, id),
};
if succeeded {
self.nodes[source_idx].pending.push(id);
self.nodes[source_idx].add_pending(
id,
hash,
source.next_manager_persistence_generation(),
);
}
}

Expand Down Expand Up @@ -1836,7 +1922,11 @@ impl PaymentTracker {
Ok(()) => Self::check_payment_send_events(source, id),
};
if succeeded {
self.nodes[source_idx].pending.push(id);
self.nodes[source_idx].add_pending(
id,
hash,
source.next_manager_persistence_generation(),
);
}
}

Expand All @@ -1853,41 +1943,6 @@ impl PaymentTracker {
}
}

fn mark_sent(&mut self, node_idx: usize, sent_id: PaymentId, payment_hash: PaymentHash) {
let node = &mut self.nodes[node_idx];
let idx_opt = node.pending.iter().position(|id| *id == sent_id);
if let Some(idx) = idx_opt {
node.pending.remove(idx);
node.resolved.insert(sent_id, Some(payment_hash));
} else {
assert!(node.resolved.contains_key(&sent_id));
}
}

fn mark_resolved_without_hash(&mut self, node_idx: usize, payment_id: PaymentId) {
let node = &mut self.nodes[node_idx];
let idx_opt = node.pending.iter().position(|id| *id == payment_id);
if let Some(idx) = idx_opt {
node.pending.remove(idx);
node.resolved.insert(payment_id, None);
} else if !node.resolved.contains_key(&payment_id) {
// Some resolutions can arrive immediately, before the send helper records
// the payment as pending. Track them so later duplicate events are accepted.
node.resolved.insert(payment_id, None);
}
}

fn mark_successful_probe(&mut self, node_idx: usize, payment_id: PaymentId) {
let node = &mut self.nodes[node_idx];
let idx_opt = node.pending.iter().position(|id| *id == payment_id);
if let Some(idx) = idx_opt {
node.pending.remove(idx);
node.resolved.insert(payment_id, None);
} else {
assert!(node.resolved.contains_key(&payment_id));
}
}

fn assert_all_resolved(&self) {
for (idx, node) in self.nodes.iter().enumerate() {
assert!(
Expand Down Expand Up @@ -2725,17 +2780,17 @@ impl<'a, Out: Output + MaybeSend + MaybeSync> Harness<'a, Out> {
}
},
events::Event::PaymentSent { payment_id, payment_hash, .. } => {
payments.mark_sent(node_idx, payment_id.unwrap(), payment_hash);
payments.nodes[node_idx].mark_sent(payment_id.unwrap(), payment_hash);
},
// Even though we don't explicitly send probes, because probes are detected based on
// hashing the payment hash+preimage, it is rather trivial for the fuzzer to build
// payments that accidentally end up looking like probes.
events::Event::ProbeSuccessful { payment_id, .. } => {
payments.mark_successful_probe(node_idx, payment_id);
payments.nodes[node_idx].mark_successful_probe(payment_id);
},
events::Event::PaymentFailed { payment_id, .. }
| events::Event::ProbeFailed { payment_id, .. } => {
payments.mark_resolved_without_hash(node_idx, payment_id);
payments.nodes[node_idx].mark_resolved_without_hash(payment_id);
},
events::Event::PaymentClaimed { .. } => {},
events::Event::PaymentPathSuccessful { .. } => {},
Expand Down Expand Up @@ -2860,6 +2915,9 @@ impl<'a, Out: Output + MaybeSend + MaybeSync> Harness<'a, Out> {
}

fn restart_node(&mut self, node_idx: usize, v: u8, router: &'a FuzzRouter) {
if !self.nodes[node_idx].deferred {
self.nodes[node_idx].checkpoint_manager_persistence();
}
match node_idx {
0 => {
self.ab_link.disconnect_for_reload(0, &self.nodes, &mut self.queues);
Expand All @@ -2873,7 +2931,13 @@ impl<'a, Out: Output + MaybeSend + MaybeSync> Harness<'a, Out> {
},
_ => panic!("invalid node index"),
}
self.nodes[node_idx].reload(v, &self.out, router, self.chan_type);
let loaded_manager_generation =
self.nodes[node_idx].reload(v, &self.out, router, self.chan_type);
let rolled_back_payment_hashes = self.payments.nodes[node_idx]
.sync_pending_with_manager_generation(loaded_manager_generation);
for payment_hash in rolled_back_payment_hashes {
self.payments.claimed_payment_hashes.remove(&payment_hash);
}
}

fn settle_all(&mut self) {
Expand Down Expand Up @@ -3116,6 +3180,16 @@ pub fn do_test<Out: Output + MaybeSend + MaybeSync>(data: &[u8], out: Out) {
0x88 => harness.nodes[2].bump_fee_estimate(harness.chan_type),
0x89 => harness.nodes[2].reset_fee_estimate(),

0x90 => {
harness.nodes[0].checkpoint_manager_persistence();
},
0x91 => {
harness.nodes[1].checkpoint_manager_persistence();
},
0x92 => {
harness.nodes[2].checkpoint_manager_persistence();
},

0xa0 => {
if !cfg!(splicing) {
break 'fuzz_loop;
Expand Down Expand Up @@ -3370,8 +3444,6 @@ pub fn do_test<Out: Output + MaybeSend + MaybeSync>(data: &[u8], out: Out) {
},
_ => break 'fuzz_loop,
}

harness.checkpoint_manager_persistences();
}
harness.finish();
}
Expand Down
Loading