diff --git a/libs/sdk-core/src/breez_services.rs b/libs/sdk-core/src/breez_services.rs index e61aae281..1ced7d546 100644 --- a/libs/sdk-core/src/breez_services.rs +++ b/libs/sdk-core/src/breez_services.rs @@ -1233,7 +1233,7 @@ impl BreezServices { // update both closed channels and lightning transaction payments let mut payments = closed_channel_payments; payments.extend(new_data.payments.clone()); - self.persister.insert_or_update_payments(&payments, true)?; + self.persister.insert_or_update_payments(&payments)?; let duration = start.elapsed(); info!("Sync duration: {:?}", duration); @@ -1290,40 +1290,37 @@ impl BreezServices { amount_msat: u64, label: Option, ) -> Result<(), SendPaymentError> { - self.persister.insert_or_update_payments( - &[Payment { - id: invoice.payment_hash.clone(), - payment_type: PaymentType::Sent, - payment_time: SystemTime::now().duration_since(UNIX_EPOCH)?.as_secs() as i64, - amount_msat, - fee_msat: 0, - status: PaymentStatus::Pending, - error: None, - description: invoice.description.clone(), - details: PaymentDetails::Ln { - data: LnPaymentDetails { - payment_hash: invoice.payment_hash.clone(), - label: label.unwrap_or_default(), - destination_pubkey: invoice.payee_pubkey.clone(), - payment_preimage: String::new(), - keysend: false, - bolt11: invoice.bolt11.clone(), - lnurl_success_action: None, - lnurl_pay_domain: None, - lnurl_pay_comment: None, - ln_address: None, - lnurl_metadata: None, - lnurl_withdraw_endpoint: None, - swap_info: None, - reverse_swap_info: None, - pending_expiration_block: None, - open_channel_bolt11: None, - }, + self.persister.insert_or_update_payments(&[Payment { + id: invoice.payment_hash.clone(), + payment_type: PaymentType::Sent, + payment_time: SystemTime::now().duration_since(UNIX_EPOCH)?.as_secs() as i64, + amount_msat, + fee_msat: 0, + status: PaymentStatus::Pending, + error: None, + description: invoice.description.clone(), + details: PaymentDetails::Ln { + data: LnPaymentDetails { + payment_hash: invoice.payment_hash.clone(), + label: label.unwrap_or_default(), + destination_pubkey: invoice.payee_pubkey.clone(), + payment_preimage: String::new(), + keysend: false, + bolt11: invoice.bolt11.clone(), + lnurl_success_action: None, + lnurl_pay_domain: None, + lnurl_pay_comment: None, + ln_address: None, + lnurl_metadata: None, + lnurl_withdraw_endpoint: None, + swap_info: None, + reverse_swap_info: None, + pending_expiration_block: None, + open_channel_bolt11: None, }, - metadata: None, - }], - false, - )?; + }, + metadata: None, + }])?; self.persister.insert_payment_external_info( &invoice.payment_hash, @@ -1643,7 +1640,7 @@ impl BreezServices { if let Some(ref p) = payment { let res = cloned .persister - .insert_or_update_payments(&vec![p.clone()], false); + .insert_or_update_payments(&vec![p.clone()]); debug!("paid invoice was added to payments list {res:?}"); if let Ok(Some(mut node_info)) = cloned.persister.get_node_state() { node_info.channels_balance_msat += p.amount_msat; @@ -3111,7 +3108,7 @@ pub(crate) mod tests { let test_config = create_test_config(); let persister = Arc::new(create_test_persister(test_config.clone())); persister.init()?; - persister.insert_or_update_payments(&dummy_transactions, false)?; + persister.insert_or_update_payments(&dummy_transactions)?; persister.insert_payment_external_info( payment_hash_with_lnurl_success_action, PaymentExternalInfo { @@ -3332,7 +3329,7 @@ pub(crate) mod tests { let test_config = create_test_config(); let persister = Arc::new(create_test_persister(test_config.clone())); persister.init()?; - persister.insert_or_update_payments(&known_payments, false)?; + persister.insert_or_update_payments(&known_payments)?; persister.set_lsp(MockBreezServer {}.lsp_id(), None)?; let mut builder = BreezServicesBuilder::new(test_config.clone()); diff --git a/libs/sdk-core/src/greenlight/node_api.rs b/libs/sdk-core/src/greenlight/node_api.rs index 6bc53fdb7..830bb2dd6 100644 --- a/libs/sdk-core/src/greenlight/node_api.rs +++ b/libs/sdk-core/src/greenlight/node_api.rs @@ -31,6 +31,7 @@ use sdk_common::prelude::*; use serde::{Deserialize, Serialize}; use serde_json::{json, Map, Value}; use strum_macros::{Display, EnumString}; +use tokio::join; use tokio::sync::{mpsc, watch, Mutex}; use tokio::time::{sleep, MissedTickBehavior}; use tokio_stream::StreamExt; @@ -58,6 +59,9 @@ const MAX_PAYMENT_AMOUNT_MSAT: u64 = 4294967000; const MAX_INBOUND_LIQUIDITY_MSAT: u64 = 4000000000; const TRAMPOLINE_BASE_FEE_MSAT: u64 = 4000; const TRAMPOLINE_FEE_PPM: u64 = 5000; +const PAYMENT_STATE_PENDING: u8 = 1; +const PAYMENT_STATE_COMPLETE: u8 = 2; +const PAYMENT_STATE_FAILED: u8 = 4; pub(crate) struct Greenlight { sdk_config: Config, @@ -698,34 +702,54 @@ impl Greenlight { sync_state: &SyncState, htlc_list: Vec, ) -> NodeResult<(SyncState, Vec)> { - let mut node_client = self.get_node_client().await?; - let mut new_sync_state = sync_state.clone(); + let (receive_payments_res, send_payments_res) = join!( + self.pull_receive_payments(&sync_state.list_invoices_index), + self.pull_send_payments(&sync_state.send_pays_index, htlc_list), + ); + + let (receive_payments, list_invoices_index) = receive_payments_res?; + let (send_payments, send_pays_index) = send_payments_res?; + let mut new_state = sync_state.clone(); + new_state.list_invoices_index = list_invoices_index; + new_state.send_pays_index = send_pays_index; + + let mut payments: Vec = Vec::new(); + payments.extend(receive_payments); + payments.extend(send_payments); - // list invoices - let created_invoices = node_client + Ok((new_state, payments)) + } + + async fn pull_receive_payments( + &self, + state: &SyncIndex, + ) -> NodeResult<(Vec, SyncIndex)> { + let mut client = self.get_node_client().await?; + let created_invoices = client .list_invoices(cln::ListinvoicesRequest { index: Some(ListinvoicesIndex::Created.into()), - start: Some(sync_state.list_invoices_index.created), + start: Some(state.created), ..Default::default() }) .await? .into_inner(); - if let Some(last) = created_invoices.invoices.last() { - new_sync_state.list_invoices_index.created = last.created_index() - } - let updated_invoices = node_client + let updated_invoices = client .list_invoices(cln::ListinvoicesRequest { index: Some(ListinvoicesIndex::Updated.into()), - start: Some(sync_state.list_invoices_index.updated), + start: Some(state.updated), ..Default::default() }) .await? .into_inner(); + let mut new_state = state.clone(); + if let Some(last) = created_invoices.invoices.last() { + new_state.created = last.created_index() + } if let Some(last) = updated_invoices.invoices.last() { - new_sync_state.list_invoices_index.updated = last.created_index() + new_state.updated = last.updated_index() } - // construct the received transactions by filtering the invoices to those paid and beyond the filter timestamp - let received_transactions: NodeResult> = created_invoices + + let received_payments: NodeResult> = created_invoices .invoices .into_iter() .chain(updated_invoices.invoices.into_iter()) @@ -733,37 +757,41 @@ impl Greenlight { .map(TryInto::try_into) .collect(); - // fetch payments from greenlight - let created_send_pays = node_client + Ok((received_payments?, new_state)) + } + + async fn pull_send_payments( + &self, + state: &SyncIndex, + htlc_list: Vec, + ) -> NodeResult<(Vec, SyncIndex)> { + let mut client = self.get_node_client().await?; + let created_send_pays = client .list_send_pays(cln::ListsendpaysRequest { index: Some(ListsendpaysIndex::Created.into()), - start: Some(sync_state.send_pays_index.created), + start: Some(state.created), ..Default::default() }) .await? .into_inner(); - if let Some(last) = created_send_pays.payments.last() { - new_sync_state.send_pays_index.created = last.created_index() - } - let updated_send_pays = node_client + let updated_send_pays = client .list_send_pays(cln::ListsendpaysRequest { index: Some(ListsendpaysIndex::Updated.into()), - start: Some(sync_state.send_pays_index.updated), + start: Some(state.updated), ..Default::default() }) .await? .into_inner(); + + let mut new_state = state.clone(); + if let Some(last) = created_send_pays.payments.last() { + new_state.created = last.created_index() + } if let Some(last) = updated_send_pays.payments.last() { - new_sync_state.send_pays_index.updated = last.created_index() + new_state.updated = last.updated_index() } - let mut hashes: Vec<_> = created_send_pays - .payments - .iter() - .chain(updated_send_pays.payments.iter()) - .map(|p| p.payment_hash.clone()) - .collect(); - hashes.dedup(); - let hash_groups: HashSet<_> = created_send_pays + + let hash_groups: HashMap<_, _> = created_send_pays .payments .iter() .chain(updated_send_pays.payments.iter()) @@ -771,14 +799,11 @@ impl Greenlight { let mut key = hex::encode(&p.payment_hash); key.push('|'); key.push_str(&p.groupid.to_string()); - key + (key, (p.payment_hash.clone(), p.groupid)) }) .collect(); - trace!( - "list sendpays: created: {:?}, updated: {:?}", - created_send_pays, - updated_send_pays - ); + let hash_group_values: Vec<_> = hash_groups.values().cloned().collect(); + self.persister.insert_send_pays( &created_send_pays .payments @@ -797,7 +822,7 @@ impl Greenlight { // Now all new send_pays are persisted. Retrieve the send_pays for the // payment hashes, to ensure any send_pays belonging to the same payment // that were not fetched in this round are also included. - let send_pays = self.persister.list_send_pays(&hashes)?; + let send_pays = self.persister.list_send_pays(&hash_group_values)?; // Now that all send_pays belonging to all payments are here, aggregate // the send_pays into payments. This is a copy of what core lightning's @@ -807,9 +832,6 @@ impl Greenlight { let mut key = hex::encode(&send_pay.payment_hash); key.push('|'); key.push_str(&send_pay.groupid.to_string()); - if !hash_groups.contains(&key) { - continue; - } let payment = outbound_payments.entry(key).or_insert(SendPayAgg { state: 0, created_at: send_pay.created_at, @@ -820,7 +842,7 @@ impl Greenlight { description: None, preimage: None, amount_sent: 0, - amount: None, + amount: Some(0), num_nonfailed_parts: 0, }); if payment.bolt11.is_none() { @@ -846,15 +868,15 @@ impl Greenlight { SendPayStatus::Pending => { add_amount_sent(payment, send_pay.amount_sent_msat, send_pay.amount_msat); payment.num_nonfailed_parts += 1; - payment.state |= 1; + payment.state |= PAYMENT_STATE_PENDING; } SendPayStatus::Failed => { - payment.state |= 4; + payment.state |= PAYMENT_STATE_FAILED; } SendPayStatus::Complete => { add_amount_sent(payment, send_pay.amount_sent_msat, send_pay.amount_msat); payment.num_nonfailed_parts += 1; - payment.state |= 2; + payment.state |= PAYMENT_STATE_COMPLETE; } } } @@ -864,12 +886,7 @@ impl Greenlight { .map(TryInto::try_into) .collect::, _>>()?; let outbound_payments = update_payment_expirations(outbound_payments, htlc_list)?; - - let mut transactions: Vec = Vec::new(); - transactions.extend(received_transactions?); - transactions.extend(outbound_payments); - - Ok((new_sync_state, transactions)) + Ok((outbound_payments, new_state)) } } @@ -895,106 +912,6 @@ fn add_amount_sent( } } -impl TryFrom for SendPay { - type Error = NodeError; - - fn try_from(value: ListsendpaysPayments) -> std::result::Result { - Ok(SendPay { - created_index: value - .created_index - .ok_or(NodeError::generic("missing created index"))?, - updated_index: value.updated_index, - groupid: value.groupid, - partid: value.partid, - payment_hash: value.payment_hash, - status: value.status.try_into()?, - amount_msat: value.amount_msat.map(|a| a.msat), - destination: value.destination, - created_at: value.created_at, - amount_sent_msat: value.amount_sent_msat.map(|a| a.msat), - label: value.label, - bolt11: value.bolt11, - description: value.description, - bolt12: value.bolt12, - payment_preimage: value.payment_preimage, - erroronion: value.erroronion, - }) - } -} - -impl TryFrom for SendPayStatus { - type Error = NodeError; - - fn try_from(value: i32) -> std::result::Result { - match value { - 0 => Ok(Self::Pending), - 1 => Ok(Self::Failed), - 2 => Ok(Self::Complete), - _ => Err(NodeError::generic("invalid send_pay status")), - } - } -} - -impl TryFrom for Payment { - type Error = NodeError; - - fn try_from(value: SendPayAgg) -> std::result::Result { - // For trampoline payments the amount_msat doesn't match the actual - // amount. If it's a trampoline payment, take the amount from the label. - let (payment_amount, client_label) = - serde_json::from_str::(&value.label.clone().unwrap_or_default()) - .ok() - .and_then(|label| { - label - .trampoline - .then_some((label.amount_msat, label.client_label)) - }) - .unwrap_or((value.amount.unwrap_or_default(), value.label.clone())); - let fee_msat = match value.amount { - Some(amount) => value.amount_sent.saturating_sub(amount), - None => 0, - }; - let status = if value.state & 2 > 0 { - PaymentStatus::Complete - } else if value.state & 1 > 0 { - PaymentStatus::Pending - } else { - PaymentStatus::Failed - }; - Ok(Self { - id: hex::encode(&value.payment_hash), - payment_type: PaymentType::Sent, - payment_time: value.created_at as i64, - amount_msat: payment_amount, - fee_msat, - status, - error: None, - description: value.description, - details: PaymentDetails::Ln { - data: LnPaymentDetails { - payment_hash: hex::encode(&value.payment_hash), - label: client_label.unwrap_or_default(), - destination_pubkey: value.destination.map(hex::encode).unwrap_or_default(), - payment_preimage: value.preimage.map(hex::encode).unwrap_or_default(), - keysend: value.bolt11.is_none(), - bolt11: value.bolt11.unwrap_or_default(), - open_channel_bolt11: None, - lnurl_success_action: None, - lnurl_pay_domain: None, - lnurl_pay_comment: None, - ln_address: None, - lnurl_metadata: None, - lnurl_withdraw_endpoint: None, - swap_info: None, - reverse_swap_info: None, - pending_expiration_block: None, - }, - }, - metadata: None, - }) - } -} - #[derive(Clone, Debug, Default, Deserialize, Serialize)] struct SyncIndex { pub created: u64, @@ -2087,7 +2004,7 @@ enum NodeCommand { } struct SendPayAgg { - state: i32, + state: u8, created_at: u64, payment_hash: Vec, bolt11: Option, @@ -2129,6 +2046,117 @@ fn update_payment_expirations( Ok(payments_res) } +impl TryFrom for SendPay { + type Error = NodeError; + + fn try_from(value: ListsendpaysPayments) -> std::result::Result { + Ok(SendPay { + created_index: value + .created_index + .ok_or(NodeError::generic("missing created index"))?, + updated_index: value.updated_index, + groupid: value.groupid, + partid: value.partid, + payment_hash: value.payment_hash, + status: value.status.try_into()?, + amount_msat: value.amount_msat.map(|a| a.msat), + destination: value.destination, + created_at: value.created_at, + amount_sent_msat: value.amount_sent_msat.map(|a| a.msat), + label: value.label, + bolt11: value.bolt11, + description: value.description, + bolt12: value.bolt12, + payment_preimage: value.payment_preimage, + erroronion: value.erroronion, + }) + } +} + +impl TryFrom for SendPayStatus { + type Error = NodeError; + + fn try_from(value: i32) -> std::result::Result { + match value { + 0 => Ok(Self::Pending), + 1 => Ok(Self::Failed), + 2 => Ok(Self::Complete), + _ => Err(NodeError::generic("invalid send_pay status")), + } + } +} + +impl TryFrom for Payment { + type Error = NodeError; + + fn try_from(value: SendPayAgg) -> std::result::Result { + let ln_invoice = value + .bolt11 + .as_ref() + .ok_or(InvoiceError::generic("No bolt11 invoice")) + .and_then(|b| parse_invoice(b)); + + // For trampoline payments the amount_msat doesn't match the actual + // amount. If it's a trampoline payment, take the amount from the label. + let (payment_amount, client_label) = + serde_json::from_str::(&value.label.clone().unwrap_or_default()) + .ok() + .and_then(|label| { + label + .trampoline + .then_some((label.amount_msat, label.client_label)) + }) + .unwrap_or((value.amount.unwrap_or_default(), value.label)); + let fee_msat = match value.amount { + Some(amount) => value.amount_sent.saturating_sub(amount), + None => 0, + }; + let status = if value.state & PAYMENT_STATE_COMPLETE > 0 { + PaymentStatus::Complete + } else if value.state & PAYMENT_STATE_PENDING > 0 { + PaymentStatus::Pending + } else { + PaymentStatus::Failed + }; + Ok(Self { + id: hex::encode(&value.payment_hash), + payment_type: PaymentType::Sent, + payment_time: value.created_at as i64, + amount_msat: match status { + PaymentStatus::Complete => payment_amount, + _ => ln_invoice + .as_ref() + .map_or(0, |i| i.amount_msat.unwrap_or_default()), + }, + fee_msat, + status, + error: None, + description: ln_invoice.map(|i| i.description).unwrap_or_default(), + details: PaymentDetails::Ln { + data: LnPaymentDetails { + payment_hash: hex::encode(&value.payment_hash), + label: client_label.unwrap_or_default(), + destination_pubkey: value.destination.map(hex::encode).unwrap_or_default(), + payment_preimage: value.preimage.map(hex::encode).unwrap_or_default(), + keysend: value.bolt11.is_none(), + bolt11: value.bolt11.unwrap_or_default(), + open_channel_bolt11: None, + lnurl_success_action: None, + lnurl_pay_domain: None, + lnurl_pay_comment: None, + ln_address: None, + lnurl_metadata: None, + lnurl_withdraw_endpoint: None, + swap_info: None, + reverse_swap_info: None, + pending_expiration_block: None, + }, + }, + metadata: None, + }) + } +} + //pub(crate) fn offchain_payment_to_transaction impl TryFrom for Payment { type Error = NodeError; diff --git a/libs/sdk-core/src/persist/migrations.rs b/libs/sdk-core/src/persist/migrations.rs index a5769a673..75f4d4fc4 100644 --- a/libs/sdk-core/src/persist/migrations.rs +++ b/libs/sdk-core/src/persist/migrations.rs @@ -454,7 +454,9 @@ pub(crate) fn current_migrations() -> Vec<&'static str> { payment_preimage BLOB, erroronion BLOB ) STRICT; - " + ", + "DELETE FROM payments", + "DELETE FROM cached_items WHERE key = 'sync_state'", ] } diff --git a/libs/sdk-core/src/persist/send_pays.rs b/libs/sdk-core/src/persist/send_pays.rs index 523805181..e4055c8e3 100644 --- a/libs/sdk-core/src/persist/send_pays.rs +++ b/libs/sdk-core/src/persist/send_pays.rs @@ -72,7 +72,10 @@ impl SqliteStorage { Ok(()) } - pub(crate) fn list_send_pays(&self, hashes: &[Vec]) -> PersistResult> { + pub(crate) fn list_send_pays( + &self, + hash_groups: &[(Vec, u64)], + ) -> PersistResult> { let conn = self.get_connection()?; let mut stmt = conn.prepare( r#"SELECT created_index, updated_index, groupid, partid, @@ -80,14 +83,16 @@ impl SqliteStorage { amount_sent_msat, label, bolt11, description, bolt12, payment_preimage, erroronion FROM send_pays - WHERE payment_hash = :payment_hash"#, + WHERE payment_hash = :payment_hash AND groupid = :groupid + ORDER BY created_index"#, )?; let mut send_pays = Vec::new(); - for hash in hashes { + for hash_group in hash_groups { let rows: Vec<_> = stmt .query_map( named_params! { - ":payment_hash": hash + ":payment_hash": hash_group.0, + ":groupid": hash_group.1, }, |row| { let status: i32 = row.get("status")?; diff --git a/libs/sdk-core/src/persist/transactions.rs b/libs/sdk-core/src/persist/transactions.rs index 34fc71c66..9d3cbec06 100644 --- a/libs/sdk-core/src/persist/transactions.rs +++ b/libs/sdk-core/src/persist/transactions.rs @@ -20,16 +20,7 @@ impl SqliteStorage { /// Note that, if a payment has details of type [LnPaymentDetails] which contain a [SuccessActionProcessed], /// then the [LnPaymentDetails] will NOT be persisted. In that case, the [SuccessActionProcessed] /// can be inserted separately via [SqliteStorage::insert_payment_external_info]. - pub fn insert_or_update_payments( - &self, - transactions: &[Payment], - delete_pending: bool, - ) -> PersistResult<()> { - if delete_pending { - let deleted = self.delete_pending_lightning_payments()?; - debug!("Deleted {deleted} pending payments"); - } - + pub fn insert_or_update_payments(&self, transactions: &[Payment]) -> PersistResult<()> { let con = self.get_connection()?; let mut prep_statement = con.prepare( " @@ -62,14 +53,6 @@ impl SqliteStorage { Ok(()) } - /// Deletes any pending sent payments and returns the deleted count - fn delete_pending_lightning_payments(&self) -> PersistResult { - Ok(self.get_connection()?.execute( - "DELETE FROM payments WHERE payment_type = ?1 AND status = ?2", - params![PaymentType::Sent.to_string(), PaymentStatus::Pending], - )?) - } - /// Inserts metadata associated with this payment pub fn insert_payment_external_info( &self, @@ -776,8 +759,8 @@ fn test_ln_transactions() -> PersistResult<(), Box> { }]; let storage = SqliteStorage::new(test_utils::create_test_sql_dir()); storage.init()?; - storage.insert_or_update_payments(&txs, false)?; - storage.insert_or_update_payments(&failed_txs, false)?; + storage.insert_or_update_payments(&txs)?; + storage.insert_or_update_payments(&failed_txs)?; storage.insert_payment_external_info( payment_hash_with_lnurl_success_action, PaymentExternalInfo { @@ -868,7 +851,7 @@ fn test_ln_transactions() -> PersistResult<(), Box> { matches!( &retrieve_txs[1].details, PaymentDetails::Ln {data: LnPaymentDetails {swap_info: swap, ..}} if swap == &Some(swap_info)) ); - storage.insert_or_update_payments(&txs, false)?; + storage.insert_or_update_payments(&txs)?; let retrieve_txs = storage.list_payments(ListPaymentsRequest::default())?; assert_eq!(retrieve_txs.len(), 5); assert_eq!(retrieve_txs, txs); diff --git a/libs/sdk-core/src/swap_in/swap.rs b/libs/sdk-core/src/swap_in/swap.rs index bf2342243..f3758677e 100644 --- a/libs/sdk-core/src/swap_in/swap.rs +++ b/libs/sdk-core/src/swap_in/swap.rs @@ -1167,7 +1167,7 @@ mod tests { }, metadata: None, }; - persister.insert_or_update_payments(&vec![payment.clone()], false)?; + persister.insert_or_update_payments(&vec![payment.clone()])?; // We test the case that a confirmed transaction was detected on chain that // sent funds to this address. @@ -1198,7 +1198,7 @@ mod tests { // paid_amount of the swap. let mut payment = payment.clone(); payment.amount_msat = 2_000; - persister.insert_or_update_payments(&vec![payment], false)?; + persister.insert_or_update_payments(&vec![payment])?; swapper .on_event(BreezEvent::InvoicePaid { details: crate::InvoicePaidDetails {