Skip to content

Commit

Permalink
ChainService: Fallback to next mempool.space endpoint on error (#898)
Browse files Browse the repository at this point in the history
* Fetch and cache mempool.space endpoints on startup

* Update ChainService to accept multiple base_urls

* Update ChainService to fallback to next URL on error

* Add config rustdoc for mempoolspace_url

* Update flutter bridge

* Update RN bindings

* Refactor to use a RedundantChainService model

* Update flutter bridge files

* Add a default mempool URL if fetching list fails

* Move fetch_mempoolspace_urls() call to start()
  • Loading branch information
ok300 authored Mar 27, 2024
1 parent a997c69 commit a7ee484
Show file tree
Hide file tree
Showing 13 changed files with 278 additions and 46 deletions.
2 changes: 1 addition & 1 deletion libs/sdk-bindings/src/breez_sdk.udl
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ interface NodeConfig {
dictionary Config {
string breezserver;
string chainnotifier_url;
string mempoolspace_url;
string? mempoolspace_url;
string working_dir;
Network network;
u32 payment_timeout_sec;
Expand Down
79 changes: 72 additions & 7 deletions libs/sdk-core/src/breez_services.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,10 @@ use tonic::transport::{Channel, Endpoint};
use tonic::{Request, Status};

use crate::backup::{BackupRequest, BackupTransport, BackupWatcher};
use crate::chain::{ChainService, MempoolSpace, Outspend, RecommendedFees};
use crate::chain::{
ChainService, Outspend, RecommendedFees, RedundantChainService, RedundantChainServiceTrait,
DEFAULT_MEMPOOL_SPACE_URL,
};
use crate::error::{
LnUrlAuthError, LnUrlPayError, LnUrlWithdrawError, ReceiveOnchainError, ReceiveOnchainResult,
ReceivePaymentError, SdkError, SdkResult, SendOnchainError, SendPaymentError,
Expand All @@ -38,7 +41,7 @@ use crate::grpc::payment_notifier_client::PaymentNotifierClient;
use crate::grpc::signer_client::SignerClient;
use crate::grpc::support_client::SupportClient;
use crate::grpc::swapper_client::SwapperClient;
use crate::grpc::PaymentInformation;
use crate::grpc::{ChainApiServersRequest, PaymentInformation};
use crate::input_parser::get_reqwest_client;
use crate::invoice::{
add_routing_hints, parse_invoice, validate_network, LNInvoice, RouteHint, RouteHintHop,
Expand Down Expand Up @@ -1376,6 +1379,8 @@ impl BreezServices {
debug!("Received the signal to exit event polling loop");
});

self.init_chainservice_urls().await?;

Ok(())
}

Expand Down Expand Up @@ -1592,6 +1597,29 @@ impl BreezServices {
});
}

async fn init_chainservice_urls(&self) -> Result<()> {
let breez_server = Arc::new(BreezServer::new(
PRODUCTION_BREEZSERVER_URL.to_string(),
None,
)?);
let persister = &self.persister;

let cloned_breez_server = breez_server.clone();
let cloned_persister = persister.clone();
tokio::spawn(async move {
match cloned_breez_server.fetch_mempoolspace_urls().await {
Ok(fresh_urls) => {
if let Err(e) = cloned_persister.set_mempoolspace_base_urls(fresh_urls) {
error!("Failed to cache mempool.space URLs: {e}");
}
}
Err(e) => error!("Failed to fetch mempool.space URLs: {e}"),
}
});

Ok(())
}

/// Configures a global SDK logger that will log to file and will forward log events to
/// an optional application-specific logger.
///
Expand Down Expand Up @@ -1994,11 +2022,6 @@ impl BreezServicesBuilder {
.unwrap_or_else(|| Arc::new(SqliteStorage::new(self.config.working_dir.clone())));
persister.init()?;

// mempool space is used to monitor the chain
let chain_service = Arc::new(MempoolSpace::from_base_url(
self.config.mempoolspace_url.clone(),
));

let mut node_api = self.node_api.clone();
let mut backup_transport = self.backup_transport.clone();
if node_api.is_none() {
Expand Down Expand Up @@ -2073,6 +2096,28 @@ impl BreezServicesBuilder {
persister: persister.clone(),
});

// mempool space is used to monitor the chain
let mempoolspace_urls = match self.config.mempoolspace_url.clone() {
None => {
let cached = persister.get_mempoolspace_base_urls()?;
match cached.len() {
// If we have no cached values, or we cached an empty list, fetch new ones
0 => {
let fresh_urls = breez_server
.fetch_mempoolspace_urls()
.await
.unwrap_or(vec![DEFAULT_MEMPOOL_SPACE_URL.into()]);
persister.set_mempoolspace_base_urls(fresh_urls.clone())?;
fresh_urls
}
// If we already have cached values, return those
_ => cached,
}
}
Some(mempoolspace_url_from_config) => vec![mempoolspace_url_from_config],
};
let chain_service = Arc::new(RedundantChainService::from_base_urls(mempoolspace_urls));

let btc_receive_swapper = Arc::new(BTCReceiveSwap::new(
self.config.network.into(),
unwrapped_node_api.clone(),
Expand Down Expand Up @@ -2207,6 +2252,26 @@ impl BreezServer {
.version;
Ok(response)
}

pub(crate) async fn fetch_mempoolspace_urls(&self) -> SdkResult<Vec<String>> {
let mut client = self.get_information_client().await?;

let chain_api_servers = client
.chain_api_servers(ChainApiServersRequest {})
.await?
.into_inner()
.servers;
trace!("Received chain_api_servers: {chain_api_servers:?}");

let mempoolspace_urls = chain_api_servers
.iter()
.filter(|s| s.server_type == "MEMPOOL_SPACE")
.map(|s| s.server_base_url.clone())
.collect();
trace!("Received mempoolspace_urls: {mempoolspace_urls:?}");

Ok(mempoolspace_urls)
}
}

pub(crate) struct ApiKeyInterceptor {
Expand Down
2 changes: 1 addition & 1 deletion libs/sdk-core/src/bridge_generated.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1189,7 +1189,7 @@ impl support::IntoDart for Config {
vec![
self.breezserver.into_into_dart().into_dart(),
self.chainnotifier_url.into_into_dart().into_dart(),
self.mempoolspace_url.into_into_dart().into_dart(),
self.mempoolspace_url.into_dart(),
self.working_dir.into_into_dart().into_dart(),
self.network.into_into_dart().into_dart(),
self.payment_timeout_sec.into_into_dart().into_dart(),
Expand Down
154 changes: 132 additions & 22 deletions libs/sdk-core/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@ use serde::{Deserialize, Serialize};

use crate::bitcoin::hashes::hex::FromHex;
use crate::bitcoin::{OutPoint, Txid};
use crate::input_parser::{get_parse_and_log_response, get_reqwest_client};
use crate::input_parser::{get_parse_and_log_response, get_reqwest_client, post_and_log_response};

pub const DEFAULT_MEMPOOL_SPACE_URL: &str = "https://mempool.space/api";

#[tonic::async_trait]
pub trait ChainService: Send + Sync {
Expand All @@ -21,6 +23,89 @@ pub trait ChainService: Send + Sync {
async fn broadcast_transaction(&self, tx: Vec<u8>) -> Result<String>;
}

pub trait RedundantChainServiceTrait: ChainService {
fn from_base_urls(base_urls: Vec<String>) -> Self;
}

#[derive(Clone)]
pub struct RedundantChainService {
instances: Vec<MempoolSpace>,
}
impl RedundantChainServiceTrait for RedundantChainService {
fn from_base_urls(base_urls: Vec<String>) -> Self {
Self {
instances: base_urls
.iter()
.map(|url: &String| url.trim_end_matches('/'))
.map(MempoolSpace::from_base_url)
.collect(),
}
}
}

#[tonic::async_trait]
impl ChainService for RedundantChainService {
async fn recommended_fees(&self) -> Result<RecommendedFees> {
for inst in &self.instances {
match inst.recommended_fees().await {
Ok(res) => {
return Ok(res);
}
Err(e) => error!("Call to chain service {} failed: {e}", inst.base_url),
}
}
Err(anyhow!("All chain service instances failed"))
}

async fn address_transactions(&self, address: String) -> Result<Vec<OnchainTx>> {
for inst in &self.instances {
match inst.address_transactions(address.clone()).await {
Ok(res) => {
return Ok(res);
}
Err(e) => error!("Call to chain service {} failed: {e}", inst.base_url),
}
}
Err(anyhow!("All chain service instances failed"))
}

async fn current_tip(&self) -> Result<u32> {
for inst in &self.instances {
match inst.current_tip().await {
Ok(res) => {
return Ok(res);
}
Err(e) => error!("Call to chain service {} failed: {e}", inst.base_url),
}
}
Err(anyhow!("All chain service instances failed"))
}

async fn transaction_outspends(&self, txid: String) -> Result<Vec<Outspend>> {
for inst in &self.instances {
match inst.transaction_outspends(txid.clone()).await {
Ok(res) => {
return Ok(res);
}
Err(e) => error!("Call to chain service {} failed: {e}", inst.base_url),
}
}
Err(anyhow!("All chain service instances failed"))
}

async fn broadcast_transaction(&self, tx: Vec<u8>) -> Result<String> {
for inst in &self.instances {
match inst.broadcast_transaction(tx.clone()).await {
Ok(res) => {
return Ok(res);
}
Err(e) => error!("Call to chain service {} failed: {e}", inst.base_url),
}
}
Err(anyhow!("All chain service instances failed"))
}
}

#[derive(Clone)]
pub struct Utxo {
pub out: OutPoint,
Expand Down Expand Up @@ -225,45 +310,41 @@ pub struct Outspend {
impl Default for MempoolSpace {
fn default() -> Self {
MempoolSpace {
base_url: "https://mempool.space".to_string(),
base_url: DEFAULT_MEMPOOL_SPACE_URL.into(),
}
}
}

impl MempoolSpace {
pub fn from_base_url(base_url: String) -> MempoolSpace {
MempoolSpace { base_url }
pub fn from_base_url(base_url: &str) -> MempoolSpace {
MempoolSpace {
base_url: base_url.into(),
}
}
}

#[tonic::async_trait]
impl ChainService for MempoolSpace {
async fn recommended_fees(&self) -> Result<RecommendedFees> {
get_parse_and_log_response(&format!("{}/api/v1/fees/recommended", self.base_url)).await
get_parse_and_log_response(&format!("{}/v1/fees/recommended", self.base_url)).await
}

async fn address_transactions(&self, address: String) -> Result<Vec<OnchainTx>> {
get_parse_and_log_response(&format!("{}/api/address/{address}/txs", self.base_url)).await
get_parse_and_log_response(&format!("{}/address/{address}/txs", self.base_url)).await
}

async fn current_tip(&self) -> Result<u32> {
get_parse_and_log_response(&format!("{}/api/blocks/tip/height", self.base_url)).await
get_parse_and_log_response(&format!("{}/blocks/tip/height", self.base_url)).await
}

async fn transaction_outspends(&self, txid: String) -> Result<Vec<Outspend>> {
let url = format!("{}/api/tx/{txid}/outspends", self.base_url);
let url = format!("{}/tx/{txid}/outspends", self.base_url);
Ok(get_reqwest_client()?.get(url).send().await?.json().await?)
}

async fn broadcast_transaction(&self, tx: Vec<u8>) -> Result<String> {
let client = get_reqwest_client()?;
let txid_or_error = client
.post(format!("{}/api/tx", self.base_url))
.body(hex::encode(tx))
.send()
.await?
.text()
.await?;
let txid_or_error =
post_and_log_response(&format!("{}/tx", self.base_url), Some(hex::encode(tx))).await?;
match txid_or_error.contains("error") {
true => Err(anyhow!("Error fetching tx: {txid_or_error}")),
false => Ok(txid_or_error),
Expand All @@ -272,17 +353,17 @@ impl ChainService for MempoolSpace {
}
#[cfg(test)]
mod tests {
use crate::chain::{MempoolSpace, OnchainTx};
use crate::chain::{
MempoolSpace, OnchainTx, RedundantChainService, RedundantChainServiceTrait,
};
use anyhow::Result;
use tokio::test;

use super::ChainService;

#[test]
async fn test_recommended_fees() -> Result<()> {
let ms = Box::new(MempoolSpace::from_base_url(
"https://mempool.space".to_string(),
));
let ms = MempoolSpace::default();
let fees = ms.recommended_fees().await?;
assert!(fees.economy_fee > 0);
assert!(fees.fastest_fee > 0);
Expand All @@ -293,9 +374,38 @@ mod tests {
Ok(())
}

#[test]
async fn test_recommended_fees_with_fallback() -> Result<()> {
let ms = RedundantChainService::from_base_urls(vec![
"https://mempool-url-unreachable.space/api/".into(),
]);
assert!(ms.recommended_fees().await.is_err());

let ms = RedundantChainService::from_base_urls(vec![
"https://mempool-url-unreachable.space/api/".into(),
"https://mempool.emzy.de/api/".into(),
]);
assert!(ms.recommended_fees().await.is_ok());

let ms = RedundantChainService::from_base_urls(vec![
"https://mempool-url-unreachable.space/api/".into(),
"https://another-mempool-url-unreachable.space/api/".into(),
]);
assert!(ms.recommended_fees().await.is_err());

let ms = RedundantChainService::from_base_urls(vec![
"https://mempool-url-unreachable.space/api/".into(),
"https://another-mempool-url-unreachable.space/api/".into(),
"https://mempool.emzy.de/api/".into(),
]);
assert!(ms.recommended_fees().await.is_ok());

Ok(())
}

#[test]
async fn test_address_transactions() -> Result<()> {
let ms = MempoolSpace::from_base_url("https://mempool.space".to_string());
let ms = MempoolSpace::default();
let txs = ms
.address_transactions("bc1qvhykeqcpdzu0pdvy99xnh9ckhwzcfskct6h6l2".to_string())
.await?;
Expand All @@ -312,7 +422,7 @@ mod tests {

// #[test]
// async fn test_address_transactions_mempool() {
// let ms = MempoolSpace::from_base_url("https://mempool.space".to_string());
// let ms = MempoolSpace::default();
// let txs = ms
// .address_transactions("1N4f3y3LYJZ2Qd9FyPt3AcHp451qt12paR".to_string())
// .await
Expand Down
10 changes: 10 additions & 0 deletions libs/sdk-core/src/grpc/proto/breez.proto
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ service Information {
rpc BreezAppVersions(BreezAppVersionsRequest)
returns (BreezAppVersionsReply) {}
rpc ReceiverInfo(ReceiverInfoRequest) returns (ReceiverInfoReply) {}
rpc ChainApiServers(ChainApiServersRequest) returns (ChainApiServersReply) {}
}

service ChannelOpener {
Expand Down Expand Up @@ -429,6 +430,15 @@ message BreezStatusReply {
BreezStatus status = 1;
}

message ChainApiServersRequest {}
message ChainApiServersReply {
message ChainAPIServer {
string server_type = 1;
string server_base_url = 2;
}
repeated ChainAPIServer servers = 1;
}

/////////////////////////////////////////////
// From lspd.proto
/////////////////////////////////////////////
Expand Down
Loading

0 comments on commit a7ee484

Please sign in to comment.