diff --git a/libs/sdk-core/src/breez_services.rs b/libs/sdk-core/src/breez_services.rs index 06f6d1d74..05d4d5331 100644 --- a/libs/sdk-core/src/breez_services.rs +++ b/libs/sdk-core/src/breez_services.rs @@ -1188,16 +1188,16 @@ impl BreezServices { let node_pubkey = self.node_api.start().await?; self.connect_lsp_peer(node_pubkey).await?; - // First query the changes since last sync time. - let since_timestamp = self.persister.get_last_sync_time()?.unwrap_or(0); + // First query the changes since last sync state. + let sync_state = self.persister.get_sync_state()?; let new_data = &self .node_api - .pull_changed(since_timestamp, match_local_balance) + .pull_changed(sync_state.clone(), match_local_balance) .await?; debug!( - "pull changed time={:?} {:?}", - since_timestamp, new_data.payments + "pull changed old state={:?} new state={:?}", + sync_state, new_data.sync_state ); // update node state and channels state @@ -1232,11 +1232,8 @@ impl BreezServices { let duration = start.elapsed(); info!("Sync duration: {:?}", duration); - // update the cached last sync time - if let Ok(last_payment_timestamp) = self.persister.last_payment_timestamp() { - self.persister.set_last_sync_time(last_payment_timestamp)?; - } - + // update the cached sync state + self.persister.set_sync_state(&new_data.sync_state)?; self.notify_event_listeners(BreezEvent::Synced).await?; Ok(()) } diff --git a/libs/sdk-core/src/greenlight/node_api.rs b/libs/sdk-core/src/greenlight/node_api.rs index 31d8a60c5..64146ad3c 100644 --- a/libs/sdk-core/src/greenlight/node_api.rs +++ b/libs/sdk-core/src/greenlight/node_api.rs @@ -14,11 +14,13 @@ use gl_client::credentials::{Device, Nobody}; use gl_client::node; use gl_client::node::ClnClient; use gl_client::pb::cln::listinvoices_invoices::ListinvoicesInvoicesStatus; +use gl_client::pb::cln::listinvoices_request::ListinvoicesIndex; use gl_client::pb::cln::listpays_pays::ListpaysPaysStatus; use gl_client::pb::cln::listpeerchannels_channels::ListpeerchannelsChannelsState::*; +use gl_client::pb::cln::listsendpays_request::ListsendpaysIndex; use gl_client::pb::cln::{ self, Amount, GetrouteRequest, GetrouteRoute, ListchannelsRequest, - ListclosedchannelsClosedchannels, ListpaysPays, ListpeerchannelsChannels, + ListclosedchannelsClosedchannels, ListpaysPays, ListpeerchannelsChannels, ListsendpaysPayments, PreapproveinvoiceRequest, SendpayRequest, SendpayRoute, WaitsendpayRequest, }; use gl_client::pb::{OffChainPayment, PayStatus, TrampolinePayRequest}; @@ -27,6 +29,7 @@ use gl_client::signer::model::greenlight::{amount, scheduler}; use gl_client::signer::Signer; use sdk_common::prelude::*; use serde::{Deserialize, Serialize}; +use serde_json::Value; use strum_macros::{Display, EnumString}; use tokio::sync::{mpsc, watch, Mutex}; use tokio::time::{sleep, MissedTickBehavior}; @@ -47,6 +50,7 @@ use crate::lightning::util::message_signing::verify; use crate::lightning_invoice::{RawBolt11Invoice, SignedRawBolt11Invoice}; use crate::node_api::{CreateInvoiceRequest, FetchBolt11Result, NodeAPI, NodeError, NodeResult}; use crate::persist::db::SqliteStorage; +use crate::persist::send_pays::{SendPay, SendPayStatus}; use crate::{models::*, LspInformation}; use crate::{NodeConfig, PrepareRedeemOnchainFundsRequest, PrepareRedeemOnchainFundsResponse}; @@ -684,6 +688,321 @@ impl Greenlight { _ = self.inprogress_payments.fetch_sub(1, Ordering::Relaxed); res } + + // pulls transactions from greenlight based on last sync timestamp. + // greenlight gives us the payments via API and for received payments we are looking for settled invoices. + async fn pull_transactions( + &self, + sync_state: &SyncState, + htlc_list: Vec, + ) -> NodeResult<(SyncState, Vec)> { + let mut node_client = self.get_node_client().await?; + let mut new_sync_state = sync_state.clone(); + + // list invoices + let created_invoices = node_client + .list_invoices(cln::ListinvoicesRequest { + index: Some(ListinvoicesIndex::Created.into()), + start: Some(sync_state.list_invoices_index.created), + ..Default::default() + }) + .await? + .into_inner(); + if let Some(last) = created_invoices.invoices.last() { + new_sync_state.list_invoices_index.created = last.created_index() + } + let updated_invoices = node_client + .list_invoices(cln::ListinvoicesRequest { + index: Some(ListinvoicesIndex::Updated.into()), + start: Some(sync_state.list_invoices_index.updated), + ..Default::default() + }) + .await? + .into_inner(); + if let Some(last) = updated_invoices.invoices.last() { + new_sync_state.list_invoices_index.updated = last.created_index() + } + // construct the received transactions by filtering the invoices to those paid and beyond the filter timestamp + let received_transactions: NodeResult> = created_invoices + .invoices + .into_iter() + .chain(updated_invoices.invoices.into_iter()) + .filter(|i| i.status() == ListinvoicesInvoicesStatus::Paid) + .map(TryInto::try_into) + .collect(); + + // fetch payments from greenlight + let created_send_pays = node_client + .list_send_pays(cln::ListsendpaysRequest { + index: Some(ListsendpaysIndex::Created.into()), + start: Some(sync_state.send_pays_index.created), + ..Default::default() + }) + .await? + .into_inner(); + if let Some(last) = created_send_pays.payments.last() { + new_sync_state.send_pays_index.created = last.created_index() + } + let updated_send_pays = node_client + .list_send_pays(cln::ListsendpaysRequest { + index: Some(ListsendpaysIndex::Updated.into()), + start: Some(sync_state.send_pays_index.updated), + ..Default::default() + }) + .await? + .into_inner(); + if let Some(last) = updated_send_pays.payments.last() { + new_sync_state.send_pays_index.updated = last.created_index() + } + let mut hashes: Vec<_> = created_send_pays + .payments + .iter() + .chain(updated_send_pays.payments.iter()) + .map(|p| p.payment_hash.clone()) + .collect(); + hashes.dedup(); + let hash_groups: HashSet<_> = created_send_pays + .payments + .iter() + .chain(updated_send_pays.payments.iter()) + .map(|p| { + let mut key = hex::encode(&p.payment_hash); + key.push('|'); + key.push_str(&p.groupid.to_string()); + key + }) + .collect(); + trace!( + "list sendpays: created: {:?}, updated: {:?}", + created_send_pays, + updated_send_pays + ); + self.persister.insert_send_pays( + &created_send_pays + .payments + .into_iter() + .map(TryInto::try_into) + .collect::, _>>()?, + )?; + self.persister.insert_send_pays( + &updated_send_pays + .payments + .into_iter() + .map(TryInto::try_into) + .collect::, _>>()?, + )?; + + // Now all new send_pays are persisted. Retrieve the send_pays for the + // payment hashes, to ensure any send_pays belonging to the same payment + // that were not fetched in this round are also included. + let send_pays = self.persister.list_send_pays(&hashes)?; + + // Now that all send_pays belonging to all payments are here, aggregate + // the send_pays into payments. This is a copy of what core lightning's + // listpays function does under the hood. + let mut outbound_payments: HashMap = HashMap::new(); + for send_pay in send_pays { + let mut key = hex::encode(&send_pay.payment_hash); + key.push('|'); + key.push_str(&send_pay.groupid.to_string()); + if !hash_groups.contains(&key) { + continue; + } + let payment = outbound_payments.entry(key).or_insert(SendPayAgg { + state: 0, + created_at: send_pay.created_at, + payment_hash: send_pay.payment_hash, + bolt11: None, + destination: None, + label: None, + description: None, + preimage: None, + amount_sent: 0, + amount: None, + num_nonfailed_parts: 0, + }); + if payment.bolt11.is_none() { + payment.bolt11 = send_pay.bolt11; + } + if payment.destination.is_none() { + payment.destination = send_pay.destination; + } + if payment.description.is_none() { + payment.description = send_pay.description; + } + if payment.label.is_none() { + payment.label = send_pay.label; + } + if payment.preimage.is_none() { + payment.preimage = send_pay.payment_preimage; + } + if send_pay.created_at < payment.created_at { + payment.created_at = send_pay.created_at; + } + + match send_pay.status { + SendPayStatus::Pending => { + add_amount_sent(payment, send_pay.amount_sent_msat, send_pay.amount_msat); + payment.num_nonfailed_parts += 1; + payment.state |= 1; + } + SendPayStatus::Failed => { + payment.state |= 4; + } + SendPayStatus::Complete => { + add_amount_sent(payment, send_pay.amount_sent_msat, send_pay.amount_msat); + payment.num_nonfailed_parts += 1; + payment.state |= 2; + } + } + } + + let outbound_payments: Vec = outbound_payments + .into_values() + .map(TryInto::try_into) + .collect::, _>>()?; + let outbound_payments = update_payment_expirations(outbound_payments, htlc_list)?; + + let mut transactions: Vec = Vec::new(); + transactions.extend(received_transactions?); + transactions.extend(outbound_payments); + + Ok((new_sync_state, transactions)) + } +} + +fn add_amount_sent( + agg: &mut SendPayAgg, + send_pay_amount_sent_msat: Option, + send_pay_amount_msat: Option, +) { + if let Some(amount_sent_msat) = send_pay_amount_sent_msat { + agg.amount_sent += amount_sent_msat; + } + + let amount_msat = match send_pay_amount_msat { + Some(amount_msat) => amount_msat, + None => { + agg.amount = None; + return; + } + }; + + if let Some(amount) = agg.amount { + agg.amount = Some(amount + amount_msat); + } +} + +impl TryFrom for SendPay { + type Error = NodeError; + + fn try_from(value: ListsendpaysPayments) -> std::result::Result { + Ok(SendPay { + created_index: value + .created_index + .ok_or(NodeError::generic("missing created index"))?, + updated_index: value.updated_index, + groupid: value.groupid, + partid: value.partid, + payment_hash: value.payment_hash, + status: value.status.try_into()?, + amount_msat: value.amount_msat.map(|a| a.msat), + destination: value.destination, + created_at: value.created_at, + amount_sent_msat: value.amount_sent_msat.map(|a| a.msat), + label: value.label, + bolt11: value.bolt11, + description: value.description, + bolt12: value.bolt12, + payment_preimage: value.payment_preimage, + erroronion: value.erroronion, + }) + } +} + +impl TryFrom for SendPayStatus { + type Error = NodeError; + + fn try_from(value: i32) -> std::result::Result { + match value { + 0 => Ok(Self::Pending), + 1 => Ok(Self::Failed), + 2 => Ok(Self::Complete), + _ => Err(NodeError::generic("invalid send_pay status")), + } + } +} + +impl TryFrom for Payment { + type Error = NodeError; + + fn try_from(value: SendPayAgg) -> std::result::Result { + // For trampoline payments the amount_msat doesn't match the actual + // amount. If it's a trampoline payment, take the amount from the label. + let (payment_amount, client_label) = + serde_json::from_str::(&value.label.clone().unwrap_or_default()) + .ok() + .and_then(|label| { + label + .trampoline + .then_some((label.amount_msat, label.client_label)) + }) + .unwrap_or((value.amount.unwrap_or_default(), value.label.clone())); + let fee_msat = match value.amount { + Some(amount) => value.amount_sent.saturating_sub(amount), + None => 0, + }; + let status = if value.state & 2 > 0 { + PaymentStatus::Complete + } else if value.state & 1 > 0 { + PaymentStatus::Pending + } else { + PaymentStatus::Failed + }; + Ok(Self { + id: hex::encode(&value.payment_hash), + payment_type: PaymentType::Sent, + payment_time: value.created_at as i64, + amount_msat: payment_amount, + fee_msat, + status, + error: None, + description: value.description, + details: PaymentDetails::Ln { + data: LnPaymentDetails { + payment_hash: hex::encode(&value.payment_hash), + label: client_label.unwrap_or_default(), + destination_pubkey: value.destination.map(hex::encode).unwrap_or_default(), + payment_preimage: value.preimage.map(hex::encode).unwrap_or_default(), + keysend: value.bolt11.is_none(), + bolt11: value.bolt11.unwrap_or_default(), + open_channel_bolt11: None, + lnurl_success_action: None, + lnurl_pay_domain: None, + lnurl_pay_comment: None, + ln_address: None, + lnurl_metadata: None, + lnurl_withdraw_endpoint: None, + swap_info: None, + reverse_swap_info: None, + pending_expiration_block: None, + }, + }, + metadata: None, + }) + } +} + +#[derive(Clone, Debug, Default, Deserialize, Serialize)] +struct SyncIndex { + pub created: u64, + pub updated: u64, +} + +#[derive(Clone, Debug, Default, Deserialize, Serialize)] +struct SyncState { + pub send_pays_index: SyncIndex, + pub list_invoices_index: SyncIndex, } #[tonic::async_trait] @@ -782,10 +1101,14 @@ impl NodeAPI for Greenlight { // implement pull changes from greenlight async fn pull_changed( &self, - since_timestamp: u64, + sync_state: Option, match_local_balance: bool, ) -> NodeResult { - info!("pull changed since {}", since_timestamp); + let sync_state: SyncState = match sync_state { + Some(sync_state) => serde_json::from_value(sync_state)?, + None => SyncState::default(), + }; + let node_client = self.get_node_client().await?; // get node info @@ -885,9 +1208,12 @@ impl NodeAPI for Greenlight { htlc_list.extend(channel.htlcs); } + let (new_sync_state, payments) = self.pull_transactions(&sync_state, htlc_list).await?; + Ok(SyncResponse { + sync_state: serde_json::to_value(new_sync_state)?, node_state, - payments: pull_transactions(since_timestamp, node_client.clone(), htlc_list).await?, + payments, channels: all_channel_models, }) } @@ -1754,59 +2080,18 @@ enum NodeCommand { Stop, } -// pulls transactions from greenlight based on last sync timestamp. -// greenlight gives us the payments via API and for received payments we are looking for settled invoices. -async fn pull_transactions( - since_timestamp: u64, - client: node::ClnClient, - htlc_list: Vec, -) -> NodeResult> { - let mut c = client.clone(); - - // list invoices - let invoices = c - .list_invoices(cln::ListinvoicesRequest::default()) - .await? - .into_inner(); - // construct the received transactions by filtering the invoices to those paid and beyond the filter timestamp - let received_transactions: NodeResult> = invoices - .invoices - .into_iter() - .filter(|i| { - i.paid_at.unwrap_or_default() > since_timestamp - && i.status() == ListinvoicesInvoicesStatus::Paid - }) - .map(TryInto::try_into) - .collect(); - - // fetch payments from greenlight - let payments = c - .list_pays(cln::ListpaysRequest::default()) - .await? - .into_inner(); - trace!("list payments (unfiltered): {:?}", payments); - // construct the payment transactions (pending and complete) - let outbound_transactions: NodeResult> = payments - .pays - .into_iter() - .filter(|p| { - p.created_at > since_timestamp - || match p.completed_at { - None => true, - Some(completed_at) => completed_at > since_timestamp, - } - }) - .map(TryInto::try_into) - .collect(); - - let outbound_transactions: NodeResult> = - update_payment_expirations(outbound_transactions?, htlc_list); - - let mut transactions: Vec = Vec::new(); - transactions.extend(received_transactions?); - transactions.extend(outbound_transactions?); - - Ok(transactions) +struct SendPayAgg { + state: i32, + created_at: u64, + payment_hash: Vec, + bolt11: Option, + destination: Option>, + label: Option, + description: Option, + preimage: Option>, + amount_sent: u64, + amount: Option, + num_nonfailed_parts: u64, } fn update_payment_expirations( diff --git a/libs/sdk-core/src/models.rs b/libs/sdk-core/src/models.rs index 86b866c41..4b9965e8f 100644 --- a/libs/sdk-core/src/models.rs +++ b/libs/sdk-core/src/models.rs @@ -12,6 +12,7 @@ use sdk_common::grpc; use sdk_common::prelude::Network::*; use sdk_common::prelude::*; use serde::{Deserialize, Serialize}; +use serde_json::Value; use strum_macros::{Display, EnumString}; use crate::bitcoin::blockdata::opcodes; @@ -639,6 +640,7 @@ pub struct NodeState { /// Internal response to a [crate::node_api::NodeAPI::pull_changed] call pub struct SyncResponse { + pub sync_state: Value, pub node_state: NodeState, pub payments: Vec, pub channels: Vec, diff --git a/libs/sdk-core/src/node_api.rs b/libs/sdk-core/src/node_api.rs index 171a935e1..645bf6041 100644 --- a/libs/sdk-core/src/node_api.rs +++ b/libs/sdk-core/src/node_api.rs @@ -2,6 +2,7 @@ use std::collections::HashSet; use std::pin::Pin; use anyhow::Result; +use serde_json::Value; use tokio::sync::{mpsc, watch}; use tokio_stream::Stream; use tonic::Streaming; @@ -120,7 +121,7 @@ pub trait NodeAPI: Send + Sync { async fn fetch_bolt11(&self, payment_hash: Vec) -> NodeResult>; async fn pull_changed( &self, - since_timestamp: u64, + sync_state: Option, match_local_balance: bool, ) -> NodeResult; /// As per the `pb::PayRequest` docs, `amount_msat` is only needed when the invoice doesn't specify an amount diff --git a/libs/sdk-core/src/persist/cache.rs b/libs/sdk-core/src/persist/cache.rs index 61238ca5a..cd6c3cae8 100644 --- a/libs/sdk-core/src/persist/cache.rs +++ b/libs/sdk-core/src/persist/cache.rs @@ -1,10 +1,12 @@ +use serde_json::Value; + use crate::models::NodeState; use super::{db::SqliteStorage, error::PersistResult}; const KEY_GL_CREDENTIALS: &str = "gl_credentials"; const KEY_LAST_BACKUP_TIME: &str = "last_backup_time"; -const KEY_LAST_SYNC_TIME: &str = "last_sync_time"; +const KEY_SYNC_STATE: &str = "sync_state"; const KEY_NODE_STATE: &str = "node_state"; const KEY_STATIC_BACKUP: &str = "static_backup"; const KEY_WEBHOOK_URL: &str = "webhook_url"; @@ -60,14 +62,14 @@ impl SqliteStorage { }) } - pub fn set_last_sync_time(&self, t: u64) -> PersistResult<()> { - self.update_cached_item(KEY_LAST_SYNC_TIME, t.to_string()) + pub fn set_sync_state(&self, t: &Value) -> PersistResult<()> { + self.update_cached_item(KEY_SYNC_STATE, t.to_string()) } - pub fn get_last_sync_time(&self) -> PersistResult> { - let state_str = self.get_cached_item(KEY_LAST_SYNC_TIME)?; + pub fn get_sync_state(&self) -> PersistResult> { + let state_str = self.get_cached_item(KEY_SYNC_STATE)?; Ok(match state_str { - Some(str) => str.as_str().parse::().ok(), + Some(str) => serde_json::from_str(&str)?, None => None, }) } diff --git a/libs/sdk-core/src/persist/migrations.rs b/libs/sdk-core/src/persist/migrations.rs index 73feb37dd..a5769a673 100644 --- a/libs/sdk-core/src/persist/migrations.rs +++ b/libs/sdk-core/src/persist/migrations.rs @@ -434,7 +434,27 @@ pub(crate) fn current_migrations() -> Vec<&'static str> { ", "DELETE FROM cached_items WHERE key = 'gl_credentials'", "DELETE FROM cached_items WHERE key = 'last_sync_time'", - "DELETE FROM cached_items WHERE key = 'node_state'" + "DELETE FROM cached_items WHERE key = 'node_state'", + " + CREATE TABLE IF NOT EXISTS send_pays ( + created_index INTEGER PRIMARY KEY NOT NULL, + updated_index INTEGER, + groupid INTEGER NOT NULL, + partid INTEGER, + payment_hash BLOB NOT NULL, + status INTEGER NOT NULL, + amount_msat INTEGER, + destination BLOB, + created_at INTEGER NOT NULL, + amount_sent_msat INTEGER, + label TEXT, + bolt11 TEXT, + description TEXT, + bolt12 TEXT, + payment_preimage BLOB, + erroronion BLOB + ) STRICT; + " ] } diff --git a/libs/sdk-core/src/persist/mod.rs b/libs/sdk-core/src/persist/mod.rs index ba9f4fbc5..896beb37a 100644 --- a/libs/sdk-core/src/persist/mod.rs +++ b/libs/sdk-core/src/persist/mod.rs @@ -4,6 +4,7 @@ pub(crate) mod db; pub(crate) mod error; pub(crate) mod migrations; pub(crate) mod reverseswap; +pub(crate) mod send_pays; pub(crate) mod settings; pub(crate) mod swap; pub(crate) mod sync; diff --git a/libs/sdk-core/src/persist/send_pays.rs b/libs/sdk-core/src/persist/send_pays.rs new file mode 100644 index 000000000..523805181 --- /dev/null +++ b/libs/sdk-core/src/persist/send_pays.rs @@ -0,0 +1,122 @@ +use rusqlite::named_params; +use strum_macros::FromRepr; + +use super::{db::SqliteStorage, error::PersistResult}; + +#[derive(FromRepr)] +#[repr(i32)] +pub(crate) enum SendPayStatus { + Pending = 0, + Failed = 1, + Complete = 2, +} + +pub(crate) struct SendPay { + pub created_index: u64, + pub updated_index: Option, + pub groupid: u64, + pub partid: Option, + pub payment_hash: Vec, + pub status: SendPayStatus, + pub amount_msat: Option, + pub destination: Option>, + pub created_at: u64, + pub amount_sent_msat: Option, + pub label: Option, + pub bolt11: Option, + pub description: Option, + pub bolt12: Option, + pub payment_preimage: Option>, + pub erroronion: Option>, +} + +impl SqliteStorage { + pub(crate) fn insert_send_pays(&self, send_pays: &[SendPay]) -> PersistResult<()> { + let conn = self.get_connection()?; + let mut stmt = conn.prepare( + r#"INSERT OR REPLACE INTO send_pays (created_index, updated_index, + groupid, partid, payment_hash, status, amount_msat, + destination, created_at, amount_sent_msat, label, bolt11, + description, bolt12, payment_preimage, erroronion) + VALUES (:created_index, :updated_index, + :groupid, :partid, :payment_hash, :status, :amount_msat, + :destination, :created_at, :amount_sent_msat, :label, :bolt11, + :description, :bolt12, :payment_preimage, :erroronion)"#, + )?; + for send_pay in send_pays { + let status: i32 = match send_pay.status { + SendPayStatus::Pending => 0, + SendPayStatus::Failed => 1, + SendPayStatus::Complete => 2, + }; + stmt.execute(named_params! { + ":created_index": send_pay.created_index, + ":updated_index": send_pay.updated_index, + ":groupid": send_pay.groupid, + ":partid": send_pay.partid, + ":payment_hash": send_pay.payment_hash, + ":status": status, + ":amount_msat": send_pay.amount_msat, + ":destination": send_pay.destination, + ":created_at": send_pay.created_at, + ":amount_sent_msat": send_pay.amount_sent_msat, + ":label": send_pay.label, + ":bolt11": send_pay.bolt11, + ":description": send_pay.description, + ":bolt12": send_pay.bolt12, + ":payment_preimage": send_pay.payment_preimage, + ":erroronion": send_pay.erroronion, + })?; + } + + Ok(()) + } + + pub(crate) fn list_send_pays(&self, hashes: &[Vec]) -> PersistResult> { + let conn = self.get_connection()?; + let mut stmt = conn.prepare( + r#"SELECT created_index, updated_index, groupid, partid, + payment_hash, status, amount_msat, destination, created_at, + amount_sent_msat, label, bolt11, description, bolt12, + payment_preimage, erroronion + FROM send_pays + WHERE payment_hash = :payment_hash"#, + )?; + let mut send_pays = Vec::new(); + for hash in hashes { + let rows: Vec<_> = stmt + .query_map( + named_params! { + ":payment_hash": hash + }, + |row| { + let status: i32 = row.get("status")?; + Ok(SendPay { + amount_msat: row.get("amount_msat")?, + amount_sent_msat: row.get("amount_sent_msat")?, + created_index: row.get("created_index")?, + updated_index: row.get("updated_index")?, + groupid: row.get("groupid")?, + partid: row.get("partid")?, + payment_hash: row.get("payment_hash")?, + status: SendPayStatus::from_repr(status) + .ok_or(rusqlite::Error::IntegralValueOutOfRange(5, 2))?, + destination: row.get("destination")?, + created_at: row.get("created_at")?, + label: row.get("label")?, + bolt11: row.get("bolt11")?, + description: row.get("description")?, + bolt12: row.get("bolt12")?, + payment_preimage: row.get("payment_preimage")?, + erroronion: row.get("erroronion")?, + }) + }, + )? + .collect::, _>>()?; + for row in rows { + send_pays.push(row); + } + } + Ok(send_pays) + } +} diff --git a/libs/sdk-core/src/persist/transactions.rs b/libs/sdk-core/src/persist/transactions.rs index 2d735a0da..34fc71c66 100644 --- a/libs/sdk-core/src/persist/transactions.rs +++ b/libs/sdk-core/src/persist/transactions.rs @@ -194,14 +194,6 @@ impl SqliteStorage { Ok(()) } - pub fn last_payment_timestamp(&self) -> PersistResult { - Ok(self.get_connection()?.query_row( - "SELECT max(payment_time) FROM payments where status != ?1", - params![PaymentStatus::Pending], - |row| row.get(0), - )?) - } - /// Constructs [Payment] by joining data in the `payment` and `payments_external_info` tables /// /// This queries all payments. To query a single payment, see [Self::get_payment_by_hash] @@ -876,9 +868,6 @@ fn test_ln_transactions() -> PersistResult<(), Box> { matches!( &retrieve_txs[1].details, PaymentDetails::Ln {data: LnPaymentDetails {swap_info: swap, ..}} if swap == &Some(swap_info)) ); - let max_ts = storage.last_payment_timestamp()?; - assert_eq!(max_ts, 2000); - storage.insert_or_update_payments(&txs, false)?; let retrieve_txs = storage.list_payments(ListPaymentsRequest::default())?; assert_eq!(retrieve_txs.len(), 5); diff --git a/libs/sdk-core/src/test_utils.rs b/libs/sdk-core/src/test_utils.rs index 28ed1f8cb..fa2d014dd 100644 --- a/libs/sdk-core/src/test_utils.rs +++ b/libs/sdk-core/src/test_utils.rs @@ -13,6 +13,7 @@ use rand::rngs::OsRng; use rand::{random, Rng}; use sdk_common::grpc; use sdk_common::prelude::{FiatAPI, FiatCurrency, Rate}; +use serde_json::Value; use tokio::sync::{mpsc, watch, Mutex}; use tokio::time::sleep; use tokio_stream::Stream; @@ -349,10 +350,11 @@ impl NodeAPI for MockNodeAPI { async fn pull_changed( &self, - _since_timestamp: u64, - _balance_changed: bool, + _sync_state: Option, + _match_local_balance: bool, ) -> NodeResult { Ok(SyncResponse { + sync_state: Value::Null, node_state: self.node_state.clone(), payments: self .cloud_payments