diff --git a/chain/ethereum/src/data_source.rs b/chain/ethereum/src/data_source.rs
index a2da3e6cb4e..f13094d51fc 100644
--- a/chain/ethereum/src/data_source.rs
+++ b/chain/ethereum/src/data_source.rs
@@ -1,10 +1,12 @@
 use anyhow::{anyhow, Error};
 use anyhow::{ensure, Context};
 use graph::blockchain::{BlockPtr, TriggerWithHandler};
+use graph::components::link_resolver::LinkResolverContext;
 use graph::components::metrics::subgraph::SubgraphInstanceMetrics;
 use graph::components::store::{EthereumCallCache, StoredDynamicDataSource};
 use graph::components::subgraph::{HostMetrics, InstanceDSTemplateInfo, MappingError};
 use graph::components::trigger_processor::RunnableTriggers;
+use graph::data::subgraph::DeploymentHash;
 use graph::data_source::common::{
     CallDecls, DeclaredCall, FindMappingABI, MappingABI, UnresolvedMappingABI,
 };
@@ -1197,6 +1199,7 @@ pub struct UnresolvedDataSource {
 impl blockchain::UnresolvedDataSource<Chain> for UnresolvedDataSource {
     async fn resolve(
         self,
+        deployment_hash: &DeploymentHash,
         resolver: &Arc<dyn LinkResolver>,
         logger: &Logger,
         manifest_idx: u32,
@@ -1210,7 +1213,7 @@ impl blockchain::UnresolvedDataSource<Chain> for UnresolvedDataSource {
             context,
         } = self;

-        let mapping = mapping.resolve(resolver, logger).await.with_context(|| {
+        let mapping = mapping.resolve(deployment_hash, resolver, logger).await.with_context(|| {
             format!(
                 "failed to resolve data source {} with source_address {:?} and source_start_block {}",
                 name, source.address, source.start_block
@@ -1244,6 +1247,7 @@ pub struct DataSourceTemplate {
 impl blockchain::UnresolvedDataSourceTemplate<Chain> for UnresolvedDataSourceTemplate {
     async fn resolve(
         self,
+        deployment_hash: &DeploymentHash,
         resolver: &Arc<dyn LinkResolver>,
         logger: &Logger,
         manifest_idx: u32,
@@ -1257,7 +1261,7 @@ impl blockchain::UnresolvedDataSourceTemplate<Chain> for UnresolvedDataSourceTem
         } = self;

         let mapping = mapping
-            .resolve(resolver, logger)
+            .resolve(deployment_hash, resolver, logger)
             .await
             .with_context(|| format!("failed to resolve data source template {}", name))?;

@@ -1355,6 +1359,7 @@ impl FindMappingABI for Mapping {
 impl UnresolvedMapping {
     pub async fn resolve(
         self,
+        deployment_hash: &DeploymentHash,
         resolver: &Arc<dyn LinkResolver>,
         logger: &Logger,
     ) -> Result<Mapping, Error> {
@@ -1377,13 +1382,17 @@ impl UnresolvedMapping {
             abis.into_iter()
                 .map(|unresolved_abi| async {
                     Result::<_, Error>::Ok(Arc::new(
-                        unresolved_abi.resolve(resolver, logger).await?,
+                        unresolved_abi
+                            .resolve(deployment_hash, resolver, logger)
+                            .await?,
                     ))
                 })
                 .collect::<FuturesOrdered<_>>()
                 .try_collect::<Vec<_>>(),
             async {
-                let module_bytes = resolver.cat(logger, &link).await?;
+                let module_bytes = resolver
+                    .cat(LinkResolverContext::new(deployment_hash, logger), &link)
+                    .await?;
                 Ok(Arc::new(module_bytes))
             },
         )
diff --git a/chain/near/src/data_source.rs b/chain/near/src/data_source.rs
index ea54c31d157..a00cea2ea83 100644
--- a/chain/near/src/data_source.rs
+++ b/chain/near/src/data_source.rs
@@ -1,8 +1,9 @@
 use graph::anyhow::Context;
 use graph::blockchain::{Block, TriggerWithHandler};
+use graph::components::link_resolver::LinkResolverContext;
 use graph::components::store::StoredDynamicDataSource;
 use graph::components::subgraph::InstanceDSTemplateInfo;
-use graph::data::subgraph::DataSourceContext;
+use graph::data::subgraph::{DataSourceContext, DeploymentHash};
 use graph::prelude::SubgraphManifestValidationError;
 use graph::{
     anyhow::{anyhow, Error},
@@ -330,6 +331,7 @@ pub struct UnresolvedDataSource {
 impl blockchain::UnresolvedDataSource<Chain> for UnresolvedDataSource {
     async fn resolve(
         self,
+        deployment_hash: &DeploymentHash,
         resolver: &Arc<dyn LinkResolver>,
         logger: &Logger,
         _manifest_idx: u32,
@@ -343,7 +345,7 @@ impl blockchain::UnresolvedDataSource<Chain> for UnresolvedDataSource {
             context,
         } = self;

-        let mapping = mapping.resolve(resolver, logger).await.with_context(|| {
+        let mapping = mapping.resolve(deployment_hash, resolver, logger).await.with_context(|| {
             format!(
                 "failed to resolve data source {} with source_account {:?} and source_start_block {}",
                 name, source.account, source.start_block
@@ -369,6 +371,7 @@ pub type DataSourceTemplate = BaseDataSourceTemplate<Mapping>;
 impl blockchain::UnresolvedDataSourceTemplate<Chain> for UnresolvedDataSourceTemplate {
     async fn resolve(
         self,
+        deployment_hash: &DeploymentHash,
         resolver: &Arc<dyn LinkResolver>,
         logger: &Logger,
         _manifest_idx: u32,
@@ -381,7 +384,7 @@ impl blockchain::UnresolvedDataSourceTemplate<Chain> for UnresolvedDataSourceTem
         } = self;

         let mapping = mapping
-            .resolve(resolver, logger)
+            .resolve(deployment_hash, resolver, logger)
             .await
             .with_context(|| format!("failed to resolve data source template {}", name))?;

@@ -432,6 +435,7 @@ pub struct UnresolvedMapping {
 impl UnresolvedMapping {
     pub async fn resolve(
         self,
+        deployment_hash: &DeploymentHash,
         resolver: &Arc<dyn LinkResolver>,
         logger: &Logger,
     ) -> Result<Mapping, Error> {
@@ -447,7 +451,7 @@ impl UnresolvedMapping {
         let api_version = semver::Version::parse(&api_version)?;

         let module_bytes = resolver
-            .cat(logger, &link)
+            .cat(LinkResolverContext::new(deployment_hash, logger), &link)
             .await
             .with_context(|| format!("failed to resolve mapping {}", link.link))?;
diff --git a/chain/substreams/src/data_source.rs b/chain/substreams/src/data_source.rs
index dff2cfa31c4..da284ecf569 100644
--- a/chain/substreams/src/data_source.rs
+++ b/chain/substreams/src/data_source.rs
@@ -4,7 +4,11 @@ use anyhow::{anyhow, Context, Error};
 use graph::{
     blockchain,
     cheap_clone::CheapClone,
-    components::{link_resolver::LinkResolver, subgraph::InstanceDSTemplateInfo},
+    components::{
+        link_resolver::{LinkResolver, LinkResolverContext},
+        subgraph::InstanceDSTemplateInfo,
+    },
+    data::subgraph::DeploymentHash,
     prelude::{async_trait, BlockNumber, Link},
     slog::Logger,
 };
@@ -184,11 +188,17 @@ pub struct UnresolvedMapping {
 impl blockchain::UnresolvedDataSource<Chain> for UnresolvedDataSource {
     async fn resolve(
         self,
+        deployment_hash: &DeploymentHash,
         resolver: &Arc<dyn LinkResolver>,
         logger: &Logger,
         _manifest_idx: u32,
     ) -> Result<DataSource, Error> {
-        let content = resolver.cat(logger, &self.source.package.file).await?;
+        let content = resolver
+            .cat(
+                LinkResolverContext::new(deployment_hash, logger),
+                &self.source.package.file,
+            )
+            .await?;

         let mut package = graph::substreams::Package::decode(content.as_ref())?;
@@ -234,7 +244,7 @@ impl blockchain::UnresolvedDataSource<Chain> for UnresolvedDataSource {
         let handler = match (self.mapping.handler, self.mapping.file) {
             (Some(handler), Some(file)) => {
                 let module_bytes = resolver
-                    .cat(logger, &file)
+                    .cat(LinkResolverContext::new(deployment_hash, logger), &file)
                     .await
                     .with_context(|| format!("failed to resolve mapping {}", file.link))?;
@@ -314,6 +324,7 @@ impl blockchain::DataSourceTemplate<Chain> for NoopDataSourceTemplate {
 impl blockchain::UnresolvedDataSourceTemplate<Chain> for NoopDataSourceTemplate {
     async fn resolve(
         self,
+        _deployment_hash: &DeploymentHash,
         _resolver: &Arc<dyn LinkResolver>,
         _logger: &Logger,
         _manifest_idx: u32,
@@ -329,7 +340,7 @@ mod test {
     use anyhow::Error;
     use graph::{
         blockchain::{DataSource as _, UnresolvedDataSource as _},
-        components::link_resolver::LinkResolver,
+        components::link_resolver::{LinkResolver, LinkResolverContext},
         data::subgraph::LATEST_VERSION,
         prelude::{async_trait, serde_yaml, JsonValueStream, Link},
         slog::{o, Discard, Logger},
@@ -433,7 +444,10 @@ mod test {
         let ds: UnresolvedDataSource = serde_yaml::from_str(TEMPLATE_DATA_SOURCE).unwrap();
         let link_resolver: Arc<dyn LinkResolver> = Arc::new(NoopLinkResolver {});
         let logger = Logger::root(Discard, o!());
-        let ds: DataSource = ds.resolve(&link_resolver, &logger, 0).await.unwrap();
+        let ds: DataSource = ds
+            .resolve(&Default::default(), &link_resolver, &logger, 0)
+            .await
+            .unwrap();
         let expected = DataSource {
             kind: SUBSTREAMS_KIND.into(),
             network: Some("mainnet".into()),
@@ -470,7 +484,10 @@ mod test {
             serde_yaml::from_str(TEMPLATE_DATA_SOURCE_WITH_PARAMS).unwrap();
         let link_resolver: Arc<dyn LinkResolver> = Arc::new(NoopLinkResolver {});
         let logger = Logger::root(Discard, o!());
-        let ds: DataSource = ds.resolve(&link_resolver, &logger, 0).await.unwrap();
+        let ds: DataSource = ds
+            .resolve(&Default::default(), &link_resolver, &logger, 0)
+            .await
+            .unwrap();
         let expected = DataSource {
             kind: SUBSTREAMS_KIND.into(),
             network: Some("mainnet".into()),
@@ -705,17 +722,21 @@ mod test {
             unimplemented!()
         }

-        async fn cat(&self, _logger: &Logger, _link: &Link) -> Result<Vec<u8>, Error> {
+        async fn cat(&self, _ctx: LinkResolverContext, _link: &Link) -> Result<Vec<u8>, Error> {
             Ok(gen_package().encode_to_vec())
         }

-        async fn get_block(&self, _logger: &Logger, _link: &Link) -> Result<Vec<u8>, Error> {
+        async fn get_block(
+            &self,
+            _ctx: LinkResolverContext,
+            _link: &Link,
+        ) -> Result<Vec<u8>, Error> {
             unimplemented!()
         }

         async fn json_stream(
             &self,
-            _logger: &Logger,
+            _ctx: LinkResolverContext,
             _link: &Link,
         ) -> Result<JsonValueStream, Error> {
             unimplemented!()
diff --git a/core/src/polling_monitor/ipfs_service.rs b/core/src/polling_monitor/ipfs_service.rs
index 86a5feef0ab..76730b02215 100644
--- a/core/src/polling_monitor/ipfs_service.rs
+++ b/core/src/polling_monitor/ipfs_service.rs
@@ -5,13 +5,17 @@ use anyhow::anyhow;
 use anyhow::Error;
 use bytes::Bytes;
 use graph::futures03::future::BoxFuture;
-use graph::ipfs::ContentPath;
-use graph::ipfs::IpfsClient;
-use graph::ipfs::RetryPolicy;
+use graph::ipfs::{ContentPath, IpfsClient, IpfsContext, RetryPolicy};
 use graph::{derive::CheapClone, prelude::CheapClone};
 use tower::{buffer::Buffer, ServiceBuilder, ServiceExt};

-pub type IpfsService = Buffer<ContentPath, BoxFuture<'static, Result<Option<Bytes>, Error>>>;
+pub type IpfsService = Buffer<IpfsRequest, BoxFuture<'static, Result<Option<Bytes>, Error>>>;
+
+#[derive(Clone, Debug)]
+pub struct IpfsRequest {
+    pub ctx: IpfsContext,
+    pub path: ContentPath,
+}

 pub fn ipfs_service(
     client: Arc<dyn IpfsClient>,
@@ -43,7 +47,10 @@ struct IpfsServiceInner {
 }

 impl IpfsServiceInner {
-    async fn call_inner(self, path: ContentPath) -> Result<Option<Bytes>, Error> {
+    async fn call_inner(
+        self,
+        IpfsRequest { ctx, path }: IpfsRequest,
+    ) -> Result<Option<Bytes>, Error> {
         let multihash = path.cid().hash().code();
         if !SAFE_MULTIHASHES.contains(&multihash) {
             return Err(anyhow!("CID multihash {} is not allowed", multihash));
@@ -52,6 +59,7 @@ impl IpfsServiceInner {
         let res = self
             .client
             .cat(
+                ctx,
                 &path,
                 self.max_file_size,
                 Some(self.timeout),
@@ -126,14 +134,24 @@ mod test {
         let dir_cid = add_resp.into_iter().find(|x| x.name == "dir").unwrap().hash;

-        let client =
-            IpfsRpcClient::new_unchecked(ServerAddress::local_rpc_api(), &graph::log::discard())
-                .unwrap();
+        let client = IpfsRpcClient::new_unchecked(
+            ServerAddress::local_rpc_api(),
+            Default::default(),
+            &graph::log::discard(),
+        )
+        .unwrap();

         let svc = ipfs_service(Arc::new(client), 100000, Duration::from_secs(30), 10);

         let path = ContentPath::new(format!("{dir_cid}/file.txt")).unwrap();
-        let content = svc.oneshot(path).await.unwrap().unwrap();
+        let content = svc
+            .oneshot(IpfsRequest {
+                ctx: Default::default(),
+                path,
+            })
+            .await
+            .unwrap()
+            .unwrap();

         assert_eq!(content.to_vec(), random_bytes);
     }

@@ -157,7 +175,8 @@ mod test {
         const CID: &str = "QmUNLLsPACCz1vLxQVkXqqLX5R1X345qqfHbsf67hvA3Nn";

         let server = MockServer::start().await;
-        let ipfs_client = IpfsRpcClient::new_unchecked(server.uri(), &discard()).unwrap();
+        let ipfs_client =
+            IpfsRpcClient::new_unchecked(server.uri(), Default::default(), &discard()).unwrap();
         let ipfs_service = ipfs_service(Arc::new(ipfs_client), 10, Duration::from_secs(1), 1);
         let path = ContentPath::new(CID).unwrap();
@@ -179,6 +198,12 @@ mod test {
             .await;

         // This means that we never reached the successful response.
-        ipfs_service.oneshot(path).await.unwrap_err();
+        ipfs_service
+            .oneshot(IpfsRequest {
+                ctx: Default::default(),
+                path,
+            })
+            .await
+            .unwrap_err();
     }
 }
diff --git a/core/src/polling_monitor/mod.rs b/core/src/polling_monitor/mod.rs
index ffa36f63b09..7bf4726e7c3 100644
--- a/core/src/polling_monitor/mod.rs
+++ b/core/src/polling_monitor/mod.rs
@@ -1,6 +1,7 @@
 mod arweave_service;
 mod ipfs_service;
 mod metrics;
+mod request;

 use std::collections::HashMap;
 use std::fmt::Display;
@@ -24,9 +25,11 @@ use tower::retry::backoff::{Backoff, ExponentialBackoff, ExponentialBackoffMaker
 use tower::util::rng::HasherRng;
 use tower::{Service, ServiceExt};

+use self::request::RequestId;
+
 pub use self::metrics::PollingMonitorMetrics;
 pub use arweave_service::{arweave_service, ArweaveService};
-pub use ipfs_service::{ipfs_service, IpfsService};
+pub use ipfs_service::{ipfs_service, IpfsRequest, IpfsService};

 const MIN_BACKOFF: Duration = Duration::from_secs(5);
@@ -97,15 +100,15 @@ impl<T> Queue<T> {
 ///
 /// The service returns the request ID along with errors or responses. The response is an
 /// `Option`, to represent the object not being found.
-pub fn spawn_monitor<ID, S, E, Res>(
+pub fn spawn_monitor<Req, S, E, Res>(
     service: S,
-    response_sender: mpsc::UnboundedSender<(ID, Res)>,
+    response_sender: mpsc::UnboundedSender<(Req, Res)>,
     logger: Logger,
     metrics: Arc<PollingMonitorMetrics>,
-) -> PollingMonitor<ID>
+) -> PollingMonitor<Req>
 where
-    S: Service<ID, Response = Option<Res>, Error = E> + Send + 'static,
-    ID: Display + Clone + Default + Eq + Send + Sync + Hash + 'static,
+    S: Service<Req, Response = Option<Res>, Error = E> + Send + 'static,
+    Req: RequestId + Clone + Send + Sync + 'static,
     E: Display + Send + 'static,
     S::Future: Send,
 {
@@ -125,9 +128,9 @@ where
                     break None;
                 }

-                let id = queue.pop_front();
-                match id {
-                    Some(id) => break Some((id, ())),
+                let req = queue.pop_front();
+                match req {
+                    Some(req) => break Some((req, ())),

                     // Nothing on the queue, wait for a queue wake up or cancellation.
                     None => {
@@ -154,36 +157,39 @@ where
             // the `CallAll` from being polled. This can cause starvation as those requests may
             // be holding on to resources such as slots for concurrent calls.
             match response {
-                Ok((id, Some(response))) => {
-                    backoffs.remove(&id);
-                    let send_result = response_sender.send((id, response));
+                Ok((req, Some(response))) => {
+                    backoffs.remove(req.request_id());
+                    let send_result = response_sender.send((req, response));
                     if send_result.is_err() {
                         // The receiver has been dropped, cancel this task.
                         break;
                     }
                 }

-                // Object not found, push the id to the back of the queue.
-                Ok((id, None)) => {
-                    debug!(logger, "not found on polling"; "object_id" => id.to_string());
-
+                // Object not found, push the request to the back of the queue.
+                Ok((req, None)) => {
+                    debug!(logger, "not found on polling"; "object_id" => req.request_id().to_string());
                     metrics.not_found.inc();

                     // We'll try again after a backoff.
-                    backoff(id, &queue, &mut backoffs);
+                    backoff(req, &queue, &mut backoffs);
                 }

-                // Error polling, log it and push the id to the back of the queue.
-                Err((id, e)) => {
-                    debug!(logger, "error polling";
-                        "error" => format!("{:#}", e),
-                        "object_id" => id.to_string());
+                // Error polling, log it and push the request to the back of the queue.
+                Err((Some(req), e)) => {
+                    debug!(logger, "error polling"; "error" => format!("{:#}", e), "object_id" => req.request_id().to_string());
                     metrics.errors.inc();

                     // Requests that return errors could mean there is a permanent issue with
                     // fetching the given item, or could signal the endpoint is overloaded.
                     // Either way a backoff makes sense.
-                    backoff(id, &queue, &mut backoffs);
+                    backoff(req, &queue, &mut backoffs);
+                }
+
+                // poll_ready call failure
+                Err((None, e)) => {
+                    debug!(logger, "error polling"; "error" => format!("{:#}", e));
+                    metrics.errors.inc();
                 }
             }
         }
@@ -193,28 +199,28 @@ where
     PollingMonitor { queue }
 }

-fn backoff<ID>(id: ID, queue: &Arc<Queue<ID>>, backoffs: &mut Backoffs<ID>)
+fn backoff<Req>(req: Req, queue: &Arc<Queue<Req>>, backoffs: &mut Backoffs<Req::Id>)
 where
-    ID: Eq + Hash + Clone + Send + 'static,
+    Req: RequestId + Send + Sync + 'static,
 {
     let queue = queue.cheap_clone();
-    let backoff = backoffs.next_backoff(id.clone());
+    let backoff = backoffs.next_backoff(req.request_id().clone());
     graph::spawn(async move {
         backoff.await;
-        queue.push_back(id);
+        queue.push_back(req);
     });
 }

 /// Handle for adding objects to be monitored.
-pub struct PollingMonitor<ID> {
-    queue: Arc<Queue<ID>>,
+pub struct PollingMonitor<Req> {
+    queue: Arc<Queue<Req>>,
 }

-impl<ID> PollingMonitor<ID> {
-    /// Add an object id to the polling queue. New requests have priority and are pushed to the
+impl<Req> PollingMonitor<Req> {
+    /// Add a request to the polling queue. New requests have priority and are pushed to the
     /// front of the queue.
-    pub fn monitor(&self, id: ID) {
-        self.queue.push_front(id);
+    pub fn monitor(&self, req: Req) {
+        self.queue.push_front(req);
     }
 }

@@ -225,17 +231,16 @@ struct ReturnRequest<S> {

 impl<S, Req> Service<Req> for ReturnRequest<S>
 where
     S: Service<Req>,
-    Req: Clone + Default + Send + Sync + 'static,
+    Req: Clone + Send + Sync + 'static,
     S::Error: Send,
     S::Future: Send + 'static,
 {
     type Response = (Req, S::Response);
-    type Error = (Req, S::Error);
+    type Error = (Option<Req>, S::Error);
     type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;

     fn poll_ready(&mut self, cx: &mut std::task::Context<'_>) -> Poll<Result<(), Self::Error>> {
-        // `Req::default` is a value that won't be used since if `poll_ready` errors, the service is shot anyways.
-        self.service.poll_ready(cx).map_err(|e| (Req::default(), e))
+        self.service.poll_ready(cx).map_err(|e| (None, e))
     }

     fn call(&mut self, req: Req) -> Self::Future {
@@ -243,7 +248,7 @@ where
         self.service
             .call(req.clone())
             .map_ok(move |x| (req, x))
-            .map_err(move |e| (req1, e))
+            .map_err(move |e| (Some(req1), e))
             .boxed()
     }
 }
diff --git a/core/src/polling_monitor/request.rs b/core/src/polling_monitor/request.rs
new file mode 100644
index 00000000000..42375fb38fb
--- /dev/null
+++ b/core/src/polling_monitor/request.rs
@@ -0,0 +1,39 @@
+use std::fmt::Display;
+use std::hash::Hash;
+
+use graph::{data_source::offchain::Base64, ipfs::ContentPath};
+
+use crate::polling_monitor::ipfs_service::IpfsRequest;
+
+/// Request ID is used to create backoffs on request failures.
+pub trait RequestId {
+    type Id: Clone + Display + Eq + Hash + Send + Sync + 'static;
+
+    /// Returns the ID of the request.
+    fn request_id(&self) -> &Self::Id;
+}
+
+impl RequestId for IpfsRequest {
+    type Id = ContentPath;
+
+    fn request_id(&self) -> &ContentPath {
+        &self.path
+    }
+}
+
+impl RequestId for Base64 {
+    type Id = Base64;
+
+    fn request_id(&self) -> &Base64 {
+        self
+    }
+}
+
+#[cfg(debug_assertions)]
+impl RequestId for &'static str {
+    type Id = &'static str;
+
+    fn request_id(&self) -> &Self::Id {
+        self
+    }
+}
diff --git a/core/src/subgraph/context/mod.rs b/core/src/subgraph/context/mod.rs
index 3f35d570a7d..78a3c1d83c3 100644
--- a/core/src/subgraph/context/mod.rs
+++ b/core/src/subgraph/context/mod.rs
@@ -1,7 +1,7 @@
 mod instance;

 use crate::polling_monitor::{
-    spawn_monitor, ArweaveService, IpfsService, PollingMonitor, PollingMonitorMetrics,
+    spawn_monitor, ArweaveService, IpfsRequest, IpfsService, PollingMonitor, PollingMonitorMetrics,
 };
 use anyhow::{self, Error};
 use bytes::Bytes;
@@ -18,7 +18,7 @@ use graph::{
         CausalityRegion, DataSource, DataSourceTemplate,
     },
     derive::CheapClone,
-    ipfs::ContentPath,
+    ipfs::IpfsContext,
     prelude::{
         BlockNumber, BlockPtr, BlockState, CancelGuard, CheapClone, DeploymentHash,
         MetricsRegistry, RuntimeHostBuilder, SubgraphCountMetric, SubgraphInstanceMetrics,
@@ -31,7 +31,6 @@ use std::sync::{Arc, RwLock};
 use std::{collections::HashMap, time::Instant};

 use self::instance::SubgraphInstance;
-
 use super::Decoder;

 #[derive(Clone, CheapClone, Debug)]
@@ -224,10 +223,12 @@ impl<C: Blockchain, T: RuntimeHostBuilder<C>> IndexingContext<C, T> {
 }

 pub struct OffchainMonitor {
-    ipfs_monitor: PollingMonitor<ContentPath>,
-    ipfs_monitor_rx: mpsc::UnboundedReceiver<(ContentPath, Bytes)>,
+    ipfs_monitor: PollingMonitor<IpfsRequest>,
+    ipfs_monitor_rx: mpsc::UnboundedReceiver<(IpfsRequest, Bytes)>,
     arweave_monitor: PollingMonitor<Base64>,
     arweave_monitor_rx: mpsc::UnboundedReceiver<(Base64, Bytes)>,
+    deployment_hash: DeploymentHash,
+    logger: Logger,
 }

 impl OffchainMonitor {
@@ -251,18 +252,29 @@ impl OffchainMonitor {
             metrics.cheap_clone(),
         );

-        let arweave_monitor = spawn_monitor(arweave_service, arweave_monitor_tx, logger, metrics);
+        let arweave_monitor = spawn_monitor(
+            arweave_service,
+            arweave_monitor_tx,
+            logger.cheap_clone(),
+            metrics,
+        );
+
         Self {
             ipfs_monitor,
             ipfs_monitor_rx,
             arweave_monitor,
             arweave_monitor_rx,
+            deployment_hash: subgraph_hash.to_owned(),
+            logger,
         }
     }

     fn add_source(&mut self, source: offchain::Source) -> Result<(), Error> {
         match source {
-            offchain::Source::Ipfs(cid_file) => self.ipfs_monitor.monitor(cid_file),
+            offchain::Source::Ipfs(path) => self.ipfs_monitor.monitor(IpfsRequest {
+                ctx: IpfsContext::new(&self.deployment_hash, &self.logger),
+                path,
+            }),
             offchain::Source::Arweave(base64) => self.arweave_monitor.monitor(base64),
         };
         Ok(())
@@ -274,8 +286,8 @@ impl OffchainMonitor {
         let mut triggers = vec![];
         loop {
             match self.ipfs_monitor_rx.try_recv() {
-                Ok((cid_file, data)) => triggers.push(offchain::TriggerData {
-                    source: offchain::Source::Ipfs(cid_file),
+                Ok((req, data)) => triggers.push(offchain::TriggerData {
+                    source: offchain::Source::Ipfs(req.path),
                     data: Arc::new(data),
                 }),
                 Err(TryRecvError::Disconnected) => {
diff --git a/core/src/subgraph/instance_manager.rs b/core/src/subgraph/instance_manager.rs
index 9ca3430a5fb..1618a427c22 100644
--- a/core/src/subgraph/instance_manager.rs
+++ b/core/src/subgraph/instance_manager.rs
@@ -11,6 +11,7 @@ use std::collections::BTreeSet;
 use crate::subgraph::runner::SubgraphRunner;
 use graph::blockchain::block_stream::{BlockStreamMetrics, TriggersAdapterWrapper};
 use graph::blockchain::{Blockchain, BlockchainKind, DataSource, NodeCapabilities};
+use graph::components::link_resolver::LinkResolverContext;
 use graph::components::metrics::gas::GasMetrics;
 use graph::components::metrics::subgraph::DeploymentStatusMetric;
 use graph::components::store::SourceableStore;
@@ -282,7 +283,10 @@ impl<S: SubgraphStore> SubgraphInstanceManager<S> {
             if self.subgraph_store.is_deployed(&graft.base)? {
                 let file_bytes = self
                     .link_resolver
-                    .cat(&logger, &graft.base.to_ipfs_link())
+                    .cat(
+                        LinkResolverContext::new(&deployment.hash, &logger),
+                        &graft.base.to_ipfs_link(),
+                    )
                     .await?;

                 let yaml = String::from_utf8(file_bytes)?;
diff --git a/core/src/subgraph/provider.rs b/core/src/subgraph/provider.rs
index 00d379db01f..54013ca1ece 100644
--- a/core/src/subgraph/provider.rs
+++ b/core/src/subgraph/provider.rs
@@ -4,6 +4,7 @@ use std::sync::Mutex;
 use async_trait::async_trait;

 use graph::{
+    components::link_resolver::LinkResolverContext,
     components::store::{DeploymentId, DeploymentLocator},
     prelude::{SubgraphAssignmentProvider as SubgraphAssignmentProviderTrait, *},
 };
@@ -88,7 +89,10 @@ impl<I: SubgraphInstanceManager> SubgraphAssignmentProviderTrait for SubgraphAss

         let file_bytes = self
             .link_resolver
-            .cat(&logger, &loc.hash.to_ipfs_link())
+            .cat(
+                LinkResolverContext::new(&loc.hash, &logger),
+                &loc.hash.to_ipfs_link(),
+            )
             .await
             .map_err(SubgraphAssignmentProviderError::ResolveError)?;
diff --git a/core/src/subgraph/registrar.rs b/core/src/subgraph/registrar.rs
index 10f46c4891f..22ea48a09dd 100644
--- a/core/src/subgraph/registrar.rs
+++ b/core/src/subgraph/registrar.rs
@@ -5,6 +5,7 @@ use async_trait::async_trait;
 use graph::blockchain::Blockchain;
 use graph::blockchain::BlockchainKind;
 use graph::blockchain::BlockchainMap;
+use graph::components::link_resolver::LinkResolverContext;
 use graph::components::store::{DeploymentId, DeploymentLocator, SubscriptionManager};
 use graph::components::subgraph::Settings;
 use graph::data::subgraph::schema::DeploymentCreate;
@@ -289,7 +290,10 @@ where
         let raw: serde_yaml::Mapping = {
             let file_bytes = self
                 .resolver
-                .cat(&logger, &hash.to_ipfs_link())
+                .cat(
+                    LinkResolverContext::new(&hash, &logger),
+                    &hash.to_ipfs_link(),
+                )
                 .await
                 .map_err(|e| {
                     SubgraphRegistrarError::ResolveError(
diff --git a/graph/src/blockchain/mock.rs b/graph/src/blockchain/mock.rs
index 01487c42113..50c9f90a9e6 100644
--- a/graph/src/blockchain/mock.rs
+++ b/graph/src/blockchain/mock.rs
@@ -9,7 +9,7 @@ use crate::{
         },
         subgraph::InstanceDSTemplateInfo,
     },
-    data::subgraph::UnifiedMappingApiVersion,
+    data::subgraph::{DeploymentHash, UnifiedMappingApiVersion},
     data_source,
     prelude::{
         transaction_receipt::LightTransactionReceipt, BlockHash, ChainStore,
@@ -190,6 +190,7 @@ pub struct MockUnresolvedDataSource;
 impl UnresolvedDataSource<MockBlockchain> for MockUnresolvedDataSource {
     async fn resolve(
         self,
+        _deployment_hash: &DeploymentHash,
         _resolver: &Arc<dyn LinkResolver>,
         _logger: &slog::Logger,
         _manifest_idx: u32,
@@ -240,6 +241,7 @@ pub struct MockUnresolvedDataSourceTemplate;
 impl UnresolvedDataSourceTemplate<MockBlockchain> for MockUnresolvedDataSourceTemplate {
     async fn resolve(
         self,
+        _deployment_hash: &DeploymentHash,
         _resolver: &Arc<dyn LinkResolver>,
         _logger: &slog::Logger,
         _manifest_idx: u32,
diff --git a/graph/src/blockchain/mod.rs b/graph/src/blockchain/mod.rs
index 00a9ac33e4e..9910ee084b5 100644
--- a/graph/src/blockchain/mod.rs
+++ b/graph/src/blockchain/mod.rs
@@ -375,6 +375,7 @@ pub trait UnresolvedDataSourceTemplate<C: Blockchain>:
 {
     async fn resolve(
         self,
+        deployment_hash: &DeploymentHash,
         resolver: &Arc<dyn LinkResolver>,
         logger: &Logger,
         manifest_idx: u32,
@@ -404,6 +405,7 @@ pub trait UnresolvedDataSource<C: Blockchain>:
 {
     async fn resolve(
         self,
+        deployment_hash: &DeploymentHash,
         resolver: &Arc<dyn LinkResolver>,
         logger: &Logger,
         manifest_idx: u32,
diff --git a/graph/src/components/link_resolver/ipfs.rs b/graph/src/components/link_resolver/ipfs.rs
index 9f7ded84a67..64172b7afe6 100644
--- a/graph/src/components/link_resolver/ipfs.rs
+++ b/graph/src/components/link_resolver/ipfs.rs
@@ -17,10 +17,10 @@ use crate::futures01::stream::Stream;
 use crate::futures01::try_ready;
 use crate::futures01::Async;
 use crate::futures01::Poll;
-use crate::ipfs::ContentPath;
-use crate::ipfs::IpfsClient;
-use crate::ipfs::RetryPolicy;
-use crate::prelude::{LinkResolver as LinkResolverTrait, *};
+use crate::ipfs::{ContentPath, IpfsClient, IpfsContext, RetryPolicy};
+use crate::prelude::*;
+
+use super::{LinkResolver, LinkResolverContext};

 #[derive(Clone, CheapClone, Derivative)]
 #[derivative(Debug)]
@@ -51,20 +51,25 @@ impl IpfsResolver {
 }

 #[async_trait]
-impl LinkResolverTrait for IpfsResolver {
-    fn with_timeout(&self, timeout: Duration) -> Box<dyn LinkResolverTrait> {
+impl LinkResolver for IpfsResolver {
+    fn with_timeout(&self, timeout: Duration) -> Box<dyn LinkResolver> {
         let mut s = self.cheap_clone();
         s.timeout = timeout;
         Box::new(s)
     }

-    fn with_retries(&self) -> Box<dyn LinkResolverTrait> {
+    fn with_retries(&self) -> Box<dyn LinkResolver> {
         let mut s = self.cheap_clone();
         s.retry = true;
         Box::new(s)
     }

-    async fn cat(&self, _logger: &Logger, link: &Link) -> Result<Vec<u8>, Error> {
+    async fn cat(&self, ctx: LinkResolverContext, link: &Link) -> Result<Vec<u8>, Error> {
+        let LinkResolverContext {
+            deployment_hash,
+            logger,
+        } = ctx;
+
         let path = ContentPath::new(&link.link)?;
         let timeout = self.timeout;
         let max_file_size = self.max_file_size;
@@ -75,17 +80,26 @@ impl LinkResolverTrait for IpfsResolver {
             (Some(timeout), RetryPolicy::Networking)
         };

+        let ctx = IpfsContext {
+            deployment_hash,
+            logger,
+        };
         let data = self
             .client
             .clone()
-            .cat(&path, max_file_size, timeout, retry_policy)
+            .cat(ctx, &path, max_file_size, timeout, retry_policy)
             .await?
             .to_vec();

         Ok(data)
     }

-    async fn get_block(&self, logger: &Logger, link: &Link) -> Result<Vec<u8>, Error> {
+    async fn get_block(&self, ctx: LinkResolverContext, link: &Link) -> Result<Vec<u8>, Error> {
+        let LinkResolverContext {
+            deployment_hash,
+            logger,
+        } = ctx;
+
         let path = ContentPath::new(&link.link)?;
         let timeout = self.timeout;
@@ -97,17 +111,30 @@ impl LinkResolverTrait for IpfsResolver {
             (Some(timeout), RetryPolicy::Networking)
         };

+        let ctx = IpfsContext {
+            deployment_hash,
+            logger,
+        };
         let data = self
             .client
             .clone()
-            .get_block(&path, timeout, retry_policy)
+            .get_block(ctx, &path, timeout, retry_policy)
             .await?
             .to_vec();

         Ok(data)
     }

-    async fn json_stream(&self, logger: &Logger, link: &Link) -> Result<JsonValueStream, Error> {
+    async fn json_stream(
+        &self,
+        ctx: LinkResolverContext,
+        link: &Link,
+    ) -> Result<JsonValueStream, Error> {
+        let LinkResolverContext {
+            deployment_hash,
+            logger,
+        } = ctx;
+
         let path = ContentPath::new(&link.link)?;
         let max_map_file_size = self.max_map_file_size;
         let timeout = self.timeout;
@@ -120,10 +147,14 @@ impl LinkResolverTrait for IpfsResolver {
             (Some(timeout), RetryPolicy::Networking)
         };

+        let ctx = IpfsContext {
+            deployment_hash,
+            logger,
+        };
         let mut stream = self
             .client
             .clone()
-            .cat_stream(&path, timeout, retry_policy)
+            .cat_stream(ctx, &path, timeout, retry_policy)
             .await?
             .fuse()
             .boxed()
@@ -227,10 +258,15 @@ mod tests {

         let logger = crate::log::discard();

-        let client = IpfsRpcClient::new_unchecked(ServerAddress::local_rpc_api(), &logger).unwrap();
+        let client = IpfsRpcClient::new_unchecked(
+            ServerAddress::local_rpc_api(),
+            Default::default(),
+            &logger,
+        )
+        .unwrap();
         let resolver = IpfsResolver::new(Arc::new(client), Arc::new(env_vars));

-        let err = IpfsResolver::cat(&resolver, &logger, &Link { link: cid.clone() })
+        let err = IpfsResolver::cat(&resolver, Default::default(), &Link { link: cid.clone() })
             .await
             .unwrap_err();
@@ -246,10 +282,15 @@ mod tests {
             .to_owned();

         let logger = crate::log::discard();
-        let client = IpfsRpcClient::new_unchecked(ServerAddress::local_rpc_api(), &logger)?;
+        let client = IpfsRpcClient::new_unchecked(
+            ServerAddress::local_rpc_api(),
+            Default::default(),
+            &logger,
+        )?;
         let resolver = IpfsResolver::new(Arc::new(client), Arc::new(env_vars));

-        let stream = IpfsResolver::json_stream(&resolver, &logger, &Link { link: cid }).await?;
+        let stream =
+            IpfsResolver::json_stream(&resolver, Default::default(), &Link { link: cid }).await?;
         stream.map_ok(|sv| sv.value).try_collect().await
     }
diff --git a/graph/src/components/link_resolver/mod.rs b/graph/src/components/link_resolver/mod.rs
index 1115b59cdc3..8843eabf079 100644
--- a/graph/src/components/link_resolver/mod.rs
+++ b/graph/src/components/link_resolver/mod.rs
@@ -1,16 +1,19 @@
+mod arweave;
+mod ipfs;
+
+use std::fmt::Debug;
+use std::sync::Arc;
 use std::time::Duration;

+use async_trait::async_trait;
 use slog::Logger;

-use crate::data::subgraph::Link;
+use crate::cheap_clone::CheapClone;
+use crate::data::subgraph::{DeploymentHash, Link};
+use crate::derive::CheapClone;
 use crate::prelude::Error;
-use std::fmt::Debug;
-
-mod arweave;
-mod ipfs;

 pub use arweave::*;
-use async_trait::async_trait;
 pub use ipfs::*;

 /// Resolves links to subgraph manifests and resources referenced by them.
@@ -23,14 +26,43 @@ pub trait LinkResolver: Send + Sync + 'static + Debug {
     fn with_retries(&self) -> Box<dyn LinkResolver>;

     /// Fetches the link contents as bytes.
-    async fn cat(&self, logger: &Logger, link: &Link) -> Result<Vec<u8>, Error>;
+    async fn cat(&self, ctx: LinkResolverContext, link: &Link) -> Result<Vec<u8>, Error>;

     /// Fetches the IPLD block contents as bytes.
-    async fn get_block(&self, logger: &Logger, link: &Link) -> Result<Vec<u8>, Error>;
+    async fn get_block(&self, ctx: LinkResolverContext, link: &Link) -> Result<Vec<u8>, Error>;

     /// Read the contents of `link` and deserialize them into a stream of JSON
     /// values. The values must each be on a single line; newlines are significant
     /// as they are used to split the file contents and each line is deserialized
     /// separately.
-    async fn json_stream(&self, logger: &Logger, link: &Link) -> Result<JsonValueStream, Error>;
+    async fn json_stream(
+        &self,
+        ctx: LinkResolverContext,
+        link: &Link,
+    ) -> Result<JsonValueStream, Error>;
+}
+
+#[derive(Clone, Debug, CheapClone)]
+pub struct LinkResolverContext {
+    pub deployment_hash: Arc<str>,
+    pub logger: Logger,
+}
+
+impl LinkResolverContext {
+    pub fn new(deployment_hash: &DeploymentHash, logger: &Logger) -> Self {
+        Self {
+            deployment_hash: deployment_hash.as_str().into(),
+            logger: logger.cheap_clone(),
+        }
+    }
+}
+
+#[cfg(debug_assertions)]
+impl Default for LinkResolverContext {
+    fn default() -> Self {
+        Self {
+            deployment_hash: "test".into(),
+            logger: crate::log::discard(),
+        }
+    }
 }
diff --git a/graph/src/data/subgraph/mod.rs b/graph/src/data/subgraph/mod.rs
index 77c8ba67d36..36e8080a818 100644
--- a/graph/src/data/subgraph/mod.rs
+++ b/graph/src/data/subgraph/mod.rs
@@ -35,7 +35,7 @@ use crate::{
     bail,
     blockchain::{BlockPtr, Blockchain},
     components::{
-        link_resolver::LinkResolver,
+        link_resolver::{LinkResolver, LinkResolverContext},
         store::{StoreError, SubgraphStore},
     },
     data::{
@@ -419,13 +419,17 @@ pub struct UnresolvedSchema {
 impl UnresolvedSchema {
     pub async fn resolve(
         self,
+        deployment_hash: &DeploymentHash,
         spec_version: &Version,
         id: DeploymentHash,
         resolver: &Arc<dyn LinkResolver>,
         logger: &Logger,
     ) -> Result<InputSchema, Error> {
         let schema_bytes = resolver
-            .cat(logger, &self.file)
+            .cat(
+                LinkResolverContext::new(deployment_hash, logger),
+                &self.file,
+            )
             .await
             .with_context(|| format!("failed to resolve schema {}", &self.file.link))?;
         InputSchema::parse(spec_version, &String::from_utf8(schema_bytes)?, id)
@@ -1011,21 +1015,21 @@ impl<C: Blockchain> UnresolvedSubgraphManifest<C> {
         }

         let schema = schema
-            .resolve(&spec_version, id.clone(), resolver, logger)
+            .resolve(&id, &spec_version, id.clone(), resolver, logger)
             .await?;

         let (data_sources, templates) = try_join(
             data_sources
                 .into_iter()
                 .enumerate()
-                .map(|(idx, ds)| ds.resolve(resolver, logger, idx as u32))
+                .map(|(idx, ds)| ds.resolve(&id, resolver, logger, idx as u32))
                 .collect::<FuturesOrdered<_>>()
                 .try_collect::<Vec<_>>(),
             templates
                 .into_iter()
                 .enumerate()
                 .map(|(idx, template)| {
-                    template.resolve(resolver, &schema, logger, ds_count as u32 + idx as u32)
+                    template.resolve(&id, resolver, &schema, logger, ds_count as u32 + idx as u32)
                 })
                 .collect::<FuturesOrdered<_>>()
                 .try_collect::<Vec<_>>(),
diff --git a/graph/src/data_source/common.rs b/graph/src/data_source/common.rs
index 57781815f5f..7e59984b8f7 100644
--- a/graph/src/data_source/common.rs
+++ b/graph/src/data_source/common.rs
@@ -1,6 +1,11 @@
 use crate::blockchain::block_stream::EntitySourceOperation;
 use crate::prelude::{BlockPtr, Value};
-use crate::{components::link_resolver::LinkResolver, data::value::Word, prelude::Link};
+use crate::{
+    components::link_resolver::{LinkResolver, LinkResolverContext},
+    data::subgraph::DeploymentHash,
+    data::value::Word,
+    prelude::Link,
+};
 use anyhow::{anyhow, Context, Error};
 use ethabi::{Address, Contract, Function, LogParam, ParamType, Token};
 use graph_derive::CheapClone;
@@ -72,15 +77,22 @@ pub struct UnresolvedMappingABI {
 impl UnresolvedMappingABI {
     pub async fn resolve(
         self,
+        deployment_hash: &DeploymentHash,
         resolver: &Arc<dyn LinkResolver>,
         logger: &Logger,
     ) -> Result<MappingABI, Error> {
-        let contract_bytes = resolver.cat(logger, &self.file).await.with_context(|| {
-            format!(
-                "failed to resolve ABI {} from {}",
-                self.name, self.file.link
+        let contract_bytes = resolver
+            .cat(
+                LinkResolverContext::new(deployment_hash, logger),
+                &self.file,
             )
-        })?;
+            .await
+            .with_context(|| {
+                format!(
+                    "failed to resolve ABI {} from {}",
+                    self.name, self.file.link
+                )
+            })?;
         let contract = Contract::load(&*contract_bytes)
             .with_context(|| format!("failed to load ABI {}", self.name))?;
         Ok(MappingABI {
diff --git a/graph/src/data_source/mod.rs b/graph/src/data_source/mod.rs
index 4c56e99ea9b..386b5323a64 100644
--- a/graph/src/data_source/mod.rs
+++ b/graph/src/data_source/mod.rs
@@ -3,6 +3,8 @@ pub mod common;
 pub mod offchain;
 pub mod subgraph;

+use crate::data::subgraph::DeploymentHash;
+
 pub use self::DataSource as DataSourceEnum;
 pub use causality_region::CausalityRegion;
@@ -329,17 +331,18 @@ pub enum UnresolvedDataSource<C: Blockchain> {
 impl<C: Blockchain> UnresolvedDataSource<C> {
     pub async fn resolve(
         self,
+        deployment_hash: &DeploymentHash,
         resolver: &Arc<dyn LinkResolver>,
         logger: &Logger,
         manifest_idx: u32,
     ) -> Result<DataSource<C>, anyhow::Error> {
         match self {
             Self::Onchain(unresolved) => unresolved
-                .resolve(resolver, logger, manifest_idx)
+                .resolve(deployment_hash, resolver, logger, manifest_idx)
                 .await
                 .map(DataSource::Onchain),
             Self::Subgraph(unresolved) => unresolved
-                .resolve::<C>(resolver, logger, manifest_idx)
+                .resolve::<C>(deployment_hash, resolver, logger, manifest_idx)
                 .await
                 .map(DataSource::Subgraph),
             Self::Offchain(_unresolved) => {
@@ -458,6 +461,7 @@ impl<C: Blockchain> Default for UnresolvedDataSourceTemplate<C> {
 impl<C: Blockchain> UnresolvedDataSourceTemplate<C> {
     pub async fn resolve(
         self,
+        deployment_hash: &DeploymentHash,
         resolver: &Arc<dyn LinkResolver>,
         schema: &InputSchema,
         logger: &Logger,
@@ -465,15 +469,15 @@ impl<C: Blockchain> UnresolvedDataSourceTemplate<C> {
     ) -> Result<DataSourceTemplate<C>, Error> {
         match self {
             Self::Onchain(ds) => ds
-                .resolve(resolver, logger, manifest_idx)
+                .resolve(deployment_hash, resolver, logger, manifest_idx)
                 .await
                 .map(|ti| DataSourceTemplate::Onchain(ti)),
             Self::Offchain(ds) => ds
-                .resolve(resolver, logger, manifest_idx, schema)
+                .resolve(deployment_hash, resolver, logger, manifest_idx, schema)
                 .await
                 .map(DataSourceTemplate::Offchain),
             Self::Subgraph(ds) => ds
-                .resolve(resolver, logger, manifest_idx)
+                .resolve(deployment_hash, resolver, logger, manifest_idx)
                 .await
                 .map(DataSourceTemplate::Subgraph),
         }
diff --git a/graph/src/data_source/offchain.rs b/graph/src/data_source/offchain.rs
index 46a77e8ba32..1ebdbc6d340 100644
--- a/graph/src/data_source/offchain.rs
+++ b/graph/src/data_source/offchain.rs
@@ -2,11 +2,15 @@ use crate::{
     bail,
     blockchain::{BlockPtr, BlockTime, Blockchain},
     components::{
-        link_resolver::LinkResolver,
+        link_resolver::{LinkResolver, LinkResolverContext},
         store::{BlockNumber, StoredDynamicDataSource},
         subgraph::{InstanceDSTemplate, InstanceDSTemplateInfo},
     },
-    data::{store::scalar::Bytes, subgraph::SPEC_VERSION_0_0_7, value::Word},
+    data::{
+        store::scalar::Bytes,
+        subgraph::{DeploymentHash, SPEC_VERSION_0_0_7},
+        value::Word,
+    },
     data_source,
     ipfs::ContentPath,
     prelude::{DataSourceContext, Link},
@@ -378,6 +382,7 @@ impl UnresolvedDataSource {
     #[allow(dead_code)]
     pub(super) async fn resolve(
         self,
+        deployment_hash: &DeploymentHash,
         resolver: &Arc<dyn LinkResolver>,
         logger: &Logger,
         manifest_idx: u32,
@@ -398,7 +403,10 @@ impl UnresolvedDataSource {
             kind,
             name: self.name,
             source,
-            mapping: self.mapping.resolve(resolver, schema, logger).await?,
+            mapping: self
+                .mapping
+                .resolve(deployment_hash, resolver, schema, logger)
+                .await?,
             context: Arc::new(None),
             creation_block: None,
             done_at: Arc::new(AtomicI32::new(NOT_DONE_VALUE)),
@@ -410,6 +418,7 @@ impl UnresolvedDataSource {
 impl UnresolvedMapping {
     pub async fn resolve(
         self,
+        deployment_hash: &DeploymentHash,
         resolver: &Arc<dyn LinkResolver>,
         schema: &InputSchema,
         logger: &Logger,
@@ -433,7 +442,14 @@ impl UnresolvedMapping {
             api_version: semver::Version::parse(&self.api_version)?,
             entities,
             handler: self.handler,
-            runtime: Arc::new(resolver.cat(logger, &self.file).await?),
+            runtime: Arc::new(
+                resolver
+                    .cat(
+                        LinkResolverContext::new(deployment_hash, logger),
+                        &self.file,
+                    )
+                    .await?,
+            ),
             link: self.file,
         })
     }
@@ -479,6 +495,7 @@ impl Into<InstanceDSTemplate> for DataSourceTemplate {
 impl UnresolvedDataSourceTemplate {
     pub async fn resolve(
         self,
+        deployment_hash: &DeploymentHash,
         resolver: &Arc<dyn LinkResolver>,
         logger: &Logger,
         manifest_idx: u32,
@@ -488,7 +505,7 @@ impl UnresolvedDataSourceTemplate {

         let mapping = self
             .mapping
-            .resolve(resolver, schema, logger)
+            .resolve(deployment_hash, resolver, schema, logger)
             .await
             .with_context(|| format!("failed to resolve data source template {}", self.name))?;
diff --git a/graph/src/data_source/subgraph.rs b/graph/src/data_source/subgraph.rs
index 87b44e66174..e84c75f85fa 100644
--- a/graph/src/data_source/subgraph.rs
+++ b/graph/src/data_source/subgraph.rs
@@ -1,6 +1,9 @@
 use crate::{
     blockchain::{block_stream::EntitySourceOperation, Block, Blockchain},
-    components::{link_resolver::LinkResolver, store::BlockNumber},
+    components::{
+        link_resolver::{LinkResolver, LinkResolverContext},
+        store::BlockNumber,
+    },
     data::{
         subgraph::{
             calls_host_fn, SubgraphManifest, UnresolvedSubgraphManifest, LATEST_VERSION,
@@ -256,11 +259,15 @@ impl UnresolvedDataSource {

     async fn resolve_source_manifest<C: Blockchain>(
         &self,
+        deployment_hash: &DeploymentHash,
         resolver: &Arc<dyn LinkResolver>,
         logger: &Logger,
     ) -> Result<Arc<SubgraphManifest<C>>, Error> {
         let source_raw = resolver
-            .cat(logger, &self.source.address.to_ipfs_link())
+            .cat(
+                LinkResolverContext::new(deployment_hash, logger),
+                &self.source.address.to_ipfs_link(),
+            )
             .await
             .context(format!(
                 "Failed to resolve source subgraph [{}] manifest",
@@ -314,7 +321,10 @@ impl UnresolvedDataSource {
         // If there's a graft, recursively verify it
         if let Some(graft) = &manifest.graft {
             let graft_raw = resolver
-                .cat(logger, &graft.base.to_ipfs_link())
+                .cat(
+                    LinkResolverContext::new(&manifest.id, logger),
+                    &graft.base.to_ipfs_link(),
+                )
                 .await
                 .context("Failed to resolve graft base manifest")?;
@@ -343,6 +353,7 @@ impl UnresolvedDataSource {
     #[allow(dead_code)]
     pub(super) async fn resolve<C: Blockchain>(
         self,
+        deployment_hash: &DeploymentHash,
         resolver: &Arc<dyn LinkResolver>,
         logger: &Logger,
         manifest_idx: u32,
@@ -354,7 +365,9 @@ impl UnresolvedDataSource {
         );

         let kind = self.kind.clone();
-        let source_manifest = self.resolve_source_manifest::<C>(resolver, logger).await?;
+        let source_manifest = self
+            .resolve_source_manifest::<C>(deployment_hash, resolver, logger)
+            .await?;
         let source_spec_version = &source_manifest.spec_version;
         if source_spec_version < &SPEC_VERSION_1_3_0 {
             return Err(anyhow!(
@@ -406,7 +419,10 @@ impl UnresolvedDataSource {
             name: self.name,
             network: self.network,
             source,
-            mapping: self.mapping.resolve(resolver, logger).await?,
+            mapping: self
+                .mapping
+                .resolve(deployment_hash, resolver, logger)
+                .await?,
             context: Arc::new(self.context),
             creation_block: None,
         })
@@ -416,6 +432,7 @@ impl UnresolvedDataSource {
 impl UnresolvedMapping {
     pub async fn resolve(
         self,
+        deployment_hash: &DeploymentHash,
         resolver: &Arc<dyn LinkResolver>,
         logger: &Logger,
     ) -> Result<Mapping, Error> {
@@ -429,7 +446,9 @@ impl UnresolvedMapping {
                 let resolver = Arc::clone(resolver);
                 let logger = logger.clone();
                 async move {
-                    let resolved_abi = unresolved_abi.resolve(&resolver, &logger).await?;
+                    let resolved_abi = unresolved_abi
+                        .resolve(deployment_hash, &resolver, &logger)
+                        .await?;
                     Ok::<_, Error>(Arc::new(resolved_abi))
                 }
             })
@@ -446,7 +465,14 @@ impl UnresolvedMapping {
             entities: self.entities,
             handlers: self.handlers,
             abis,
-            runtime: Arc::new(resolver.cat(logger, &self.file).await?),
+            runtime: Arc::new(
+                resolver
+                    .cat(
+                        LinkResolverContext::new(deployment_hash, logger),
+                        &self.file,
+                    )
+                    .await?,
+            ),
             link: self.file,
         })
     }
@@ -492,6 +518,7 @@ impl Into<InstanceDSTemplate> for DataSourceTemplate {
 impl UnresolvedDataSourceTemplate {
     pub async fn resolve(
         self,
+        deployment_hash: &DeploymentHash,
         resolver: &Arc<dyn LinkResolver>,
         logger: &Logger,
         manifest_idx: u32,
@@ -500,7 +527,7 @@ impl UnresolvedDataSourceTemplate {

         let mapping = self
             .mapping
-            .resolve(resolver, logger)
+            .resolve(deployment_hash, resolver, logger)
             .await
             .with_context(|| format!("failed to resolve data source template {}", self.name))?;
diff --git a/graph/src/ipfs/cache.rs b/graph/src/ipfs/cache.rs
index 4c15e2cbc3d..57dd82a34c9 100644
--- a/graph/src/ipfs/cache.rs
+++ b/graph/src/ipfs/cache.rs
@@ -20,7 +20,8 @@ use tokio::sync::Mutex as AsyncMutex;
 use crate::{env::ENV_VARS, prelude::CheapClone};

 use super::{
-    ContentPath, IpfsClient, IpfsError, IpfsRequest, IpfsResponse, IpfsResult, RetryPolicy,
+    ContentPath, IpfsClient, IpfsContext, IpfsError, IpfsMetrics, IpfsRequest, IpfsResponse,
+    IpfsResult, RetryPolicy,
 };

 struct RedisClient {
@@ -217,39 +218,38 @@ pub struct CachingClient {
 }

 impl CachingClient {
-    pub async fn new(client: Arc<dyn IpfsClient>) -> IpfsResult<Self> {
+    pub async fn new(client: Arc<dyn IpfsClient>, logger: &Logger) -> IpfsResult<Self> {
         let env = &ENV_VARS.mappings;
         let cache = Cache::new(
-            client.logger(),
+            logger,
             env.max_ipfs_cache_size as usize,
             env.max_ipfs_cache_file_size,
             env.ipfs_cache_location.clone(),
         )
         .await?;
+
         Ok(CachingClient { client, cache })
     }

-    async fn with_cache<F>(&self, path: &ContentPath, f: F) -> IpfsResult<Bytes>
+    async fn with_cache<F>(&self, logger: Logger, path: &ContentPath, f: F) -> IpfsResult<Bytes>
     where
         F: AsyncFnOnce() -> IpfsResult<Bytes>,
     {
-        if let Some(data) = self.cache.find(self.logger(), path).await {
+        if let Some(data) = self.cache.find(&logger, path).await {
             return Ok(data);
         }

         let data = f().await?;
-        self.cache
-            .insert(self.logger(), path.clone(), data.clone())
-            .await;
+        self.cache.insert(&logger, path.clone(), data.clone()).await;
         Ok(data)
     }
 }

 #[async_trait]
 impl IpfsClient for CachingClient {
-    fn logger(&self) -> &Logger {
-        self.client.logger()
+    fn metrics(&self) -> &IpfsMetrics {
+        self.client.metrics()
     }

     async fn call(self: Arc<Self>, req: IpfsRequest) -> IpfsResult<IpfsResponse> {
@@ -258,16 +258,17 @@ impl IpfsClient for CachingClient {

     async fn cat(
         self: Arc<Self>,
+        ctx: IpfsContext,
         path: &ContentPath,
         max_size: usize,
         timeout: Option<Duration>,
         retry_policy: RetryPolicy,
     ) -> IpfsResult<Bytes> {
-        self.with_cache(path, async || {
+        self.with_cache(ctx.logger(path), path, async || {
             {
                 self.client
                     .cheap_clone()
-                    .cat(path, max_size, timeout, retry_policy)
+                    .cat(ctx, path, max_size, timeout, retry_policy)
                     .await
             }
         })
@@ -276,14 +277,15 @@ impl IpfsClient for CachingClient {

     async fn get_block(
         self: Arc<Self>,
+        ctx: IpfsContext,
         path: &ContentPath,
         timeout: Option<Duration>,
         retry_policy: RetryPolicy,
     ) -> IpfsResult<Bytes> {
-        self.with_cache(path, async || {
+        self.with_cache(ctx.logger(path), path, async || {
             self.client
                 .cheap_clone()
-                .get_block(path, timeout, retry_policy)
+                .get_block(ctx, path, timeout, retry_policy)
                 .await
         })
         .await
diff --git a/graph/src/ipfs/client.rs b/graph/src/ipfs/client.rs
index 90da991152a..252d80383b0 100644
--- a/graph/src/ipfs/client.rs
+++ b/graph/src/ipfs/client.rs
@@ -1,6 +1,6 @@
 use std::future::Future;
 use std::sync::Arc;
-use std::time::Duration;
+use std::time::{Duration, Instant};

 use async_trait::async_trait;
 use bytes::Bytes;
@@ -10,16 +10,16 @@ use futures03::StreamExt;
 use futures03::TryStreamExt;
 use slog::Logger;

-use crate::ipfs::ContentPath;
-use crate::ipfs::IpfsError;
-use crate::ipfs::IpfsResult;
-use crate::ipfs::RetryPolicy;
+use crate::cheap_clone::CheapClone as _;
+use crate::data::subgraph::DeploymentHash;
+use crate::derive::CheapClone;
+use crate::ipfs::{ContentPath, IpfsError, IpfsMetrics, IpfsResult, RetryPolicy};

 /// A read-only connection to an IPFS server.
 #[async_trait]
 pub trait IpfsClient: Send + Sync + 'static {
-    /// Returns the logger associated with the client.
-    fn logger(&self) -> &Logger;
+    /// Returns the metrics associated with the IPFS client.
+    fn metrics(&self) -> &IpfsMetrics;

     /// Sends a request to the IPFS server and returns a raw response.
     async fn call(self: Arc<Self>, req: IpfsRequest) -> IpfsResult<IpfsResponse>;
@@ -32,21 +32,31 @@ pub trait IpfsClient: Send + Sync + 'static {
     /// The timeout is not propagated to the resulting stream.
     async fn cat_stream(
         self: Arc<Self>,
+        ctx: IpfsContext,
         path: &ContentPath,
         timeout: Option<Duration>,
         retry_policy: RetryPolicy,
     ) -> IpfsResult<BoxStream<'static, IpfsResult<Bytes>>> {
         let fut = retry_policy
-            .create("IPFS.cat_stream", self.logger())
+            .create("IPFS.cat_stream", &ctx.logger(path))
             .no_timeout()
             .run({
                 let path = path.to_owned();

                 move || {
-                    let path = path.clone();
                     let client = self.clone();
+                    let metrics = self.metrics().clone();
+                    let deployment_hash = ctx.deployment_hash();
+                    let path = path.clone();

-                    async move { client.call(IpfsRequest::Cat(path)).await }
+                    async move {
+                        run_with_metrics(
+                            client.call(IpfsRequest::Cat(path)),
+                            deployment_hash,
+                            metrics,
+                        )
+                        .await
+                    }
                 }
             });
@@ -61,27 +71,33 @@ pub trait IpfsClient: Send + Sync + 'static {
     /// does not return a response within the specified amount of time.
     async fn cat(
         self: Arc<Self>,
+        ctx: IpfsContext,
         path: &ContentPath,
         max_size: usize,
         timeout: Option<Duration>,
         retry_policy: RetryPolicy,
     ) -> IpfsResult<Bytes> {
         let fut = retry_policy
-            .create("IPFS.cat", self.logger())
+            .create("IPFS.cat", &ctx.logger(path))
             .no_timeout()
             .run({
                 let path = path.to_owned();

                 move || {
-                    let path = path.clone();
                     let client = self.clone();
+                    let metrics = self.metrics().clone();
+                    let deployment_hash = ctx.deployment_hash();
+                    let path = path.clone();

                     async move {
-                        client
-                            .call(IpfsRequest::Cat(path))
-                            .await?
-                            .bytes(Some(max_size))
-                            .await
+                        run_with_metrics(
+                            client.call(IpfsRequest::Cat(path)),
+                            deployment_hash,
+                            metrics,
+                        )
+                        .await?
+                        .bytes(Some(max_size))
+                        .await
                     }
                 }
             });
@@ -95,26 +111,32 @@ pub trait IpfsClient: Send + Sync + 'static {
     /// does not return a response within the specified amount of time.
     async fn get_block(
         self: Arc<Self>,
+        ctx: IpfsContext,
         path: &ContentPath,
         timeout: Option<Duration>,
         retry_policy: RetryPolicy,
     ) -> IpfsResult<Bytes> {
         let fut = retry_policy
-            .create("IPFS.get_block", self.logger())
+            .create("IPFS.get_block", &ctx.logger(path))
             .no_timeout()
             .run({
                 let path = path.to_owned();

                 move || {
-                    let path = path.clone();
                     let client = self.clone();
+                    let metrics = self.metrics().clone();
+                    let deployment_hash = ctx.deployment_hash();
+                    let path = path.clone();

                     async move {
-                        client
-                            .call(IpfsRequest::GetBlock(path))
-                            .await?
-                            .bytes(None)
-                            .await
+                        run_with_metrics(
+                            client.call(IpfsRequest::GetBlock(path)),
+                            deployment_hash,
+                            metrics,
+                        )
+                        .await?
+                        .bytes(None)
+                        .await
                     }
                 }
             });
@@ -123,6 +145,41 @@ pub trait IpfsClient: Send + Sync + 'static {
     }
 }

+#[derive(Clone, Debug, CheapClone)]
+pub struct IpfsContext {
+    pub deployment_hash: Arc<str>,
+    pub logger: Logger,
+}
+
+impl IpfsContext {
+    pub fn new(deployment_hash: &DeploymentHash, logger: &Logger) -> Self {
+        Self {
+            deployment_hash: deployment_hash.as_str().into(),
+            logger: logger.cheap_clone(),
+        }
+    }
+
+    pub(super) fn deployment_hash(&self) -> Arc<str> {
+        self.deployment_hash.clone()
+    }
+
+    pub(super) fn logger(&self, path: &ContentPath) -> Logger {
+        self.logger.new(
+            slog::o!("deployment" => self.deployment_hash.to_string(), "path" => path.to_string()),
+        )
+    }
+}
+
+#[cfg(debug_assertions)]
+impl Default for IpfsContext {
+    fn default() -> Self {
+        Self {
+            deployment_hash: "test".into(),
+            logger: crate::log::discard(),
+        }
+    }
+}
+
 /// Describes a request to an IPFS server.
 #[derive(Clone, Debug)]
 pub enum IpfsRequest {
@@ -193,3 +250,27 @@ where
         None => fut.await,
     }
 }
+
+async fn run_with_metrics<F, T>(
+    fut: F,
+    deployment_hash: Arc<str>,
+    metrics: IpfsMetrics,
+) -> IpfsResult<T>
+where
+    F: Future<Output = IpfsResult<T>>,
+{
+    let timer = Instant::now();
+    metrics.add_request(&deployment_hash);
+
+    fut.await
+        .inspect(|_resp| {
+            metrics.observe_request_duration(&deployment_hash, timer.elapsed().as_secs_f64())
+        })
+        .inspect_err(|err| {
+            if err.is_timeout() {
+                metrics.add_not_found(&deployment_hash)
+            } else {
+                metrics.add_error(&deployment_hash)
+            }
+        })
+}
diff --git a/graph/src/ipfs/content_path.rs b/graph/src/ipfs/content_path.rs
index 2032526b6ae..ab752fe6d49 100644
--- a/graph/src/ipfs/content_path.rs
+++ b/graph/src/ipfs/content_path.rs
@@ -1,5 +1,6 @@
 use anyhow::anyhow;
 use cid::Cid;
+use url::Url;

 use crate::ipfs::IpfsError;
 use crate::ipfs::IpfsResult;
@@ -13,39 +14,79 @@ pub struct ContentPath {
 }

 impl ContentPath {
     /// Creates a new [ContentPath] from the specified input.
+    ///
+    /// Supports the following formats:
+    /// - <cid>[/<path>]
+    /// - /ipfs/<cid>[/<path>]
+    /// - ipfs://<cid>[/<path>]
+    /// - http[s]://.../ipfs/<cid>[/<path>]
+    /// - http[s]://.../api/v0/cat?arg=<cid>[/<path>]
     pub fn new(input: impl AsRef<str>) -> IpfsResult<Self> {
-        let input = input.as_ref();
+        let input = input.as_ref().trim();

         if input.is_empty() {
             return Err(IpfsError::InvalidContentPath {
-                input: "".to_owned(),
-                source: anyhow!("path is empty"),
+                input: "".to_string(),
+                source: anyhow!("content path is empty"),
             });
         }

-        let (cid, path) = input
-            .strip_prefix("/ipfs/")
-            .unwrap_or(input)
-            .split_once('/')
-            .unwrap_or((input, ""));
+        if input.starts_with("http://") || input.starts_with("https://") {
+            return Self::parse_from_url(input);
+        }
+
+        Self::parse_from_cid_and_path(input)
+    }
+
+    fn parse_from_url(input: &str) -> IpfsResult<Self> {
+        let url = Url::parse(input).map_err(|_err| IpfsError::InvalidContentPath {
+            input: input.to_string(),
+            source: anyhow!("input is not a valid URL"),
+        })?;
+
+        if let Some((_, x)) = url.query_pairs().find(|(key, _)| key == "arg") {
+            return Self::parse_from_cid_and_path(&x);
+        }
+
+        if let Some((_, x)) = url.path().split_once("/ipfs/") {
+            return Self::parse_from_cid_and_path(x);
+        }
+
+        Self::parse_from_cid_and_path(url.path())
+    }
+
+    fn parse_from_cid_and_path(mut input: &str) -> IpfsResult<Self> {
+        input = input.trim_matches('/');
+
+        for prefix in ["ipfs/", "ipfs://"] {
+            if let Some(input_without_prefix) = input.strip_prefix(prefix) {
+                input = input_without_prefix
+            }
+        }
+
+        let (cid, path) = input.split_once('/').unwrap_or((input, ""));

         let cid = cid
             .parse::<Cid>()
             .map_err(|err| IpfsError::InvalidContentPath {
-                input: input.to_owned(),
+                input: input.to_string(),
                 source: anyhow::Error::from(err).context("invalid CID"),
             })?;

         if path.contains('?') {
             return Err(IpfsError::InvalidContentPath {
-                input: input.to_owned(),
+                input: input.to_string(),
                 source: anyhow!("query parameters not allowed"),
             });
         }

         Ok(Self {
             cid,
-            path: (!path.is_empty()).then_some(path.to_owned()),
+            path: if path.is_empty() {
+                None
+            } else {
+                Some(path.to_string())
+            },
         })
     }
@@ -97,13 +138,20 @@ mod tests {
     const CID_V0: &str = "QmUNLLsPACCz1vLxQVkXqqLX5R1X345qqfHbsf67hvA3Nn";
     const CID_V1: &str = "bafybeiczsscdsbs7ffqz55asqdf3smv6klcw3gofszvwlyarci47bgf354";

+    fn make_path(cid: &str, path: Option<&str>) -> ContentPath {
+        ContentPath {
+            cid: cid.parse().unwrap(),
+            path: path.map(ToOwned::to_owned),
+        }
+    }
+
     #[test]
     fn fails_on_empty_input() {
         let err = ContentPath::new("").unwrap_err();

         assert_eq!(
             err.to_string(),
-            "'' is not a valid IPFS content path: path is empty",
+            "'' is not a valid IPFS content path: content path is empty",
         );
     }
@@ -119,75 +167,37 @@ mod tests {
     #[test]
     fn accepts_a_valid_cid_v0() {
         let path = ContentPath::new(CID_V0).unwrap();
-
-        assert_eq!(
-            path,
-            ContentPath {
-                cid: CID_V0.parse().unwrap(),
-                path: None,
-            }
-        );
+        assert_eq!(path, make_path(CID_V0, None));
     }

     #[test]
     fn accepts_a_valid_cid_v1() {
         let path = ContentPath::new(CID_V1).unwrap();
-
-        assert_eq!(
-            path,
-            ContentPath {
-                cid: CID_V1.parse().unwrap(),
-                path: None,
-            }
-        );
+        assert_eq!(path, make_path(CID_V1, None));
     }

     #[test]
-    fn fails_on_a_leading_slash_followed_by_a_valid_cid() {
-        let err = ContentPath::new(format!("/{CID_V0}")).unwrap_err();
+    fn accepts_and_removes_leading_slashes() {
+        let path = ContentPath::new(format!("/{CID_V0}")).unwrap();
+        assert_eq!(path, make_path(CID_V0, None));

-        assert!(err.to_string().starts_with(&format!(
-            "'/{CID_V0}' is not a valid IPFS content path: invalid CID: "
-        )));
+        let path = ContentPath::new(format!("///////{CID_V0}")).unwrap();
+        assert_eq!(path, make_path(CID_V0, None));
     }

     #[test]
-    fn ignores_the_first_slash_after_the_cid() {
+    fn accepts_and_removes_trailing_slashes() {
         let path = ContentPath::new(format!("{CID_V0}/")).unwrap();
+        assert_eq!(path, make_path(CID_V0, None));

-        assert_eq!(
-            path,
-            ContentPath {
-                cid: CID_V0.parse().unwrap(),
-                path: None,
-            }
-        );
+        let path = ContentPath::new(format!("{CID_V0}///////")).unwrap();
+        assert_eq!(path, make_path(CID_V0, None));
     }

     #[test]
     fn accepts_a_path_after_the_cid() {
         let path = ContentPath::new(format!("{CID_V0}/readme.md")).unwrap();
-
-        assert_eq!(
-            path,
-            ContentPath {
-                cid: CID_V0.parse().unwrap(),
-                path: Some("readme.md".to_owned()),
-            }
-        );
-    }
-
-    #[test]
-    fn accepts_multiple_consecutive_slashes_after_the_cid() {
-        let path = ContentPath::new(format!("{CID_V0}//")).unwrap();
-
-        assert_eq!(
-            path,
-            ContentPath {
-                cid: CID_V0.parse().unwrap(),
-                path: Some("/".to_owned()),
-            }
-        );
+        assert_eq!(path, make_path(CID_V0, Some("readme.md")));
     }

     #[test]
@@ -214,23 +224,67 @@ mod tests {
     #[test]
     fn accepts_and_removes_the_ipfs_prefix() {
         let path = ContentPath::new(format!("/ipfs/{CID_V0}")).unwrap();
-
-        assert_eq!(
-            path,
-            ContentPath {
-                cid: CID_V0.parse().unwrap(),
-                path: None,
-            }
-        );
+        assert_eq!(path, make_path(CID_V0, None));

         let path = ContentPath::new(format!("/ipfs/{CID_V0}/readme.md")).unwrap();
+        assert_eq!(path, make_path(CID_V0, Some("readme.md")));
+    }

-        assert_eq!(
-            path,
-            ContentPath {
-                cid: CID_V0.parse().unwrap(),
-                path: Some("readme.md".to_owned()),
-            }
-        );
+    #[test]
+    fn accepts_and_removes_the_ipfs_schema() {
+        let path = ContentPath::new(format!("ipfs://{CID_V0}")).unwrap();
+        assert_eq!(path, make_path(CID_V0, None));
+
+        let path = ContentPath::new(format!("ipfs://{CID_V0}/readme.md")).unwrap();
+        assert_eq!(path, make_path(CID_V0, Some("readme.md")));
+    }
+
+    #[test]
+    fn accepts_and_parses_ipfs_rpc_urls() {
+        let path = ContentPath::new(format!("http://ipfs.com/api/v0/cat?arg={CID_V0}")).unwrap();
+        assert_eq!(path, make_path(CID_V0, None));
+
+        let path =
+            ContentPath::new(format!("http://ipfs.com/api/v0/cat?arg={CID_V0}/readme.md")).unwrap();
+        assert_eq!(path, make_path(CID_V0, Some("readme.md")));
+
+        let path = ContentPath::new(format!("https://ipfs.com/api/v0/cat?arg={CID_V0}")).unwrap();
+        assert_eq!(path, make_path(CID_V0, None));
+
+        let path = ContentPath::new(format!(
+            "https://ipfs.com/api/v0/cat?arg={CID_V0}/readme.md"
+        ))
+        .unwrap();
+        assert_eq!(path, make_path(CID_V0, Some("readme.md")));
+    }
+
+    #[test]
+    fn accepts_and_parses_ipfs_gateway_urls() {
+        let path = ContentPath::new(format!("http://ipfs.com/ipfs/{CID_V0}")).unwrap();
+        assert_eq!(path, make_path(CID_V0, None));
+
+        let path = ContentPath::new(format!("http://ipfs.com/ipfs/{CID_V0}/readme.md")).unwrap();
+        assert_eq!(path, make_path(CID_V0, Some("readme.md")));
+
+        let path = ContentPath::new(format!("https://ipfs.com/ipfs/{CID_V0}")).unwrap();
+        assert_eq!(path, make_path(CID_V0, None));
+
+        let path = ContentPath::new(format!("https://ipfs.com/ipfs/{CID_V0}/readme.md")).unwrap();
+        assert_eq!(path, make_path(CID_V0, Some("readme.md")));
+    }
+
+    #[test]
+    fn accepts_and_parses_paths_from_urls() {
+        let path = ContentPath::new(format!("http://ipfs.com/{CID_V0}")).unwrap();
+        assert_eq!(path, make_path(CID_V0, None));
+
+        let path = ContentPath::new(format!("http://ipfs.com/{CID_V0}/readme.md")).unwrap();
+        assert_eq!(path, make_path(CID_V0, Some("readme.md")));
+
+        let path = ContentPath::new(format!("https://ipfs.com/{CID_V0}")).unwrap();
ContentPath::new(format!("https://ipfs.com/{CID_V0}")).unwrap(); + assert_eq!(path, make_path(CID_V0, None)); + + let path = ContentPath::new(format!("https://ipfs.com/{CID_V0}/readme.md")).unwrap(); + assert_eq!(path, make_path(CID_V0, Some("readme.md"))); } } diff --git a/graph/src/ipfs/error.rs b/graph/src/ipfs/error.rs index 1722b02f467..6553813628b 100644 --- a/graph/src/ipfs/error.rs +++ b/graph/src/ipfs/error.rs @@ -50,7 +50,7 @@ pub enum IpfsError { #[error(transparent)] RequestFailed(RequestError), - #[error("Invalid cache configuration: {source}")] + #[error("Invalid cache configuration: {source:#}")] InvalidCacheConfig { source: anyhow::Error }, } diff --git a/graph/src/ipfs/gateway_client.rs b/graph/src/ipfs/gateway_client.rs index d2ac9f0c8b1..53cc4fc5cbb 100644 --- a/graph/src/ipfs/gateway_client.rs +++ b/graph/src/ipfs/gateway_client.rs @@ -5,17 +5,14 @@ use async_trait::async_trait; use derivative::Derivative; use http::header::ACCEPT; use http::header::CACHE_CONTROL; -use reqwest::StatusCode; +use reqwest::{redirect::Policy as RedirectPolicy, StatusCode}; use slog::Logger; use crate::env::ENV_VARS; -use crate::ipfs::IpfsClient; -use crate::ipfs::IpfsError; -use crate::ipfs::IpfsRequest; -use crate::ipfs::IpfsResponse; -use crate::ipfs::IpfsResult; -use crate::ipfs::RetryPolicy; -use crate::ipfs::ServerAddress; +use crate::ipfs::{ + IpfsClient, IpfsError, IpfsMetrics, IpfsRequest, IpfsResponse, IpfsResult, RetryPolicy, + ServerAddress, +}; /// A client that connects to an IPFS gateway. /// @@ -28,14 +25,19 @@ pub struct IpfsGatewayClient { #[derivative(Debug = "ignore")] http_client: reqwest::Client, + metrics: IpfsMetrics, logger: Logger, } impl IpfsGatewayClient { /// Creates a new [IpfsGatewayClient] with the specified server address. /// Verifies that the server is responding to IPFS gateway requests. - pub(crate) async fn new(server_address: impl AsRef, logger: &Logger) -> IpfsResult { - let client = Self::new_unchecked(server_address, logger)?; + pub(crate) async fn new( + server_address: impl AsRef, + metrics: IpfsMetrics, + logger: &Logger, + ) -> IpfsResult { + let client = Self::new_unchecked(server_address, metrics, logger)?; client .send_test_request() @@ -50,10 +52,20 @@ impl IpfsGatewayClient { /// Creates a new [IpfsGatewayClient] with the specified server address. /// Does not verify that the server is responding to IPFS gateway requests. - pub fn new_unchecked(server_address: impl AsRef, logger: &Logger) -> IpfsResult { + pub fn new_unchecked( + server_address: impl AsRef, + metrics: IpfsMetrics, + logger: &Logger, + ) -> IpfsResult { Ok(Self { server_address: ServerAddress::new(server_address)?, - http_client: reqwest::Client::new(), + http_client: reqwest::Client::builder() + // IPFS gateways allow requests to directory CIDs. + // However, they sometimes redirect before displaying the directory listing. + // This policy permits that behavior. 
+ .redirect(RedirectPolicy::limited(1)) + .build()?, + metrics, logger: logger.to_owned(), }) } @@ -113,8 +125,8 @@ impl IpfsGatewayClient { #[async_trait] impl IpfsClient for IpfsGatewayClient { - fn logger(&self) -> &Logger { - &self.logger + fn metrics(&self) -> &IpfsMetrics { + &self.metrics } async fn call(self: Arc, req: IpfsRequest) -> IpfsResult { @@ -189,7 +201,8 @@ mod tests { async fn make_client() -> (MockServer, Arc) { let server = mock_server().await; - let client = IpfsGatewayClient::new_unchecked(server.uri(), &discard()).unwrap(); + let client = + IpfsGatewayClient::new_unchecked(server.uri(), Default::default(), &discard()).unwrap(); (server, Arc::new(client)) } @@ -206,7 +219,7 @@ mod tests { async fn new_fails_to_create_the_client_if_gateway_is_not_accessible() { let server = mock_server().await; - IpfsGatewayClient::new(server.uri(), &discard()) + IpfsGatewayClient::new(server.uri(), Default::default(), &discard()) .await .unwrap_err(); } @@ -222,7 +235,7 @@ mod tests { .mount(&server) .await; - IpfsGatewayClient::new(server.uri(), &discard()) + IpfsGatewayClient::new(server.uri(), Default::default(), &discard()) .await .unwrap(); @@ -232,7 +245,7 @@ mod tests { .mount(&server) .await; - IpfsGatewayClient::new(server.uri(), &discard()) + IpfsGatewayClient::new(server.uri(), Default::default(), &discard()) .await .unwrap(); } @@ -252,7 +265,7 @@ mod tests { .mount(&server) .await; - IpfsGatewayClient::new(server.uri(), &discard()) + IpfsGatewayClient::new(server.uri(), Default::default(), &discard()) .await .unwrap(); } @@ -261,7 +274,7 @@ mod tests { async fn new_unchecked_creates_the_client_without_checking_the_gateway() { let server = mock_server().await; - IpfsGatewayClient::new_unchecked(server.uri(), &discard()).unwrap(); + IpfsGatewayClient::new_unchecked(server.uri(), Default::default(), &discard()).unwrap(); } #[tokio::test] @@ -275,7 +288,7 @@ mod tests { .await; let bytes = client - .cat_stream(&make_path(), None, RetryPolicy::None) + .cat_stream(Default::default(), &make_path(), None, RetryPolicy::None) .await .unwrap() .try_fold(BytesMut::new(), |mut acc, chunk| async { @@ -300,7 +313,12 @@ mod tests { .await; let result = client - .cat_stream(&make_path(), Some(ms(300)), RetryPolicy::None) + .cat_stream( + Default::default(), + &make_path(), + Some(ms(300)), + RetryPolicy::None, + ) .await; assert!(matches!(result, Err(_))); @@ -324,7 +342,12 @@ mod tests { .await; let _stream = client - .cat_stream(&make_path(), None, RetryPolicy::NonDeterministic) + .cat_stream( + Default::default(), + &make_path(), + None, + RetryPolicy::NonDeterministic, + ) .await .unwrap(); } @@ -340,7 +363,13 @@ mod tests { .await; let bytes = client - .cat(&make_path(), usize::MAX, None, RetryPolicy::None) + .cat( + Default::default(), + &make_path(), + usize::MAX, + None, + RetryPolicy::None, + ) .await .unwrap(); @@ -360,7 +389,13 @@ mod tests { .await; let bytes = client - .cat(&make_path(), data.len(), None, RetryPolicy::None) + .cat( + Default::default(), + &make_path(), + data.len(), + None, + RetryPolicy::None, + ) .await .unwrap(); @@ -380,7 +415,13 @@ mod tests { .await; client - .cat(&make_path(), data.len() - 1, None, RetryPolicy::None) + .cat( + Default::default(), + &make_path(), + data.len() - 1, + None, + RetryPolicy::None, + ) .await .unwrap_err(); } @@ -396,7 +437,13 @@ mod tests { .await; client - .cat(&make_path(), usize::MAX, Some(ms(300)), RetryPolicy::None) + .cat( + Default::default(), + &make_path(), + usize::MAX, + Some(ms(300)), + RetryPolicy::None, + ) 
            .await
            .unwrap_err();
    }
@@ -420,6 +467,7 @@ mod tests {
 
        let bytes = client
            .cat(
+                Default::default(),
                &make_path(),
                usize::MAX,
                None,
@@ -442,7 +490,7 @@ mod tests {
        .await;
 
        let bytes = client
-            .get_block(&make_path(), None, RetryPolicy::None)
+            .get_block(Default::default(), &make_path(), None, RetryPolicy::None)
            .await
            .unwrap();
@@ -460,7 +508,12 @@ mod tests {
        .await;
 
        client
-            .get_block(&make_path(), Some(ms(300)), RetryPolicy::None)
+            .get_block(
+                Default::default(),
+                &make_path(),
+                Some(ms(300)),
+                RetryPolicy::None,
+            )
            .await
            .unwrap_err();
    }
@@ -483,7 +536,12 @@ mod tests {
        .await;
 
        let bytes = client
-            .get_block(&make_path(), None, RetryPolicy::NonDeterministic)
+            .get_block(
+                Default::default(),
+                &make_path(),
+                None,
+                RetryPolicy::NonDeterministic,
+            )
            .await
            .unwrap();
diff --git a/graph/src/ipfs/metrics.rs b/graph/src/ipfs/metrics.rs
new file mode 100644
index 00000000000..67bd684aa7f
--- /dev/null
+++ b/graph/src/ipfs/metrics.rs
@@ -0,0 +1,87 @@
+use prometheus::{HistogramVec, IntCounterVec};
+
+use crate::components::metrics::MetricsRegistry;
+
+#[derive(Clone, Debug)]
+pub struct IpfsMetrics {
+    request_count: Box<IntCounterVec>,
+    error_count: Box<IntCounterVec>,
+    not_found_count: Box<IntCounterVec>,
+    request_duration: Box<HistogramVec>,
+}
+
+impl IpfsMetrics {
+    pub fn new(registry: &MetricsRegistry) -> Self {
+        let request_count = registry
+            .new_int_counter_vec(
+                "ipfs_request_count",
+                "The total number of IPFS requests.",
+                &["deployment"],
+            )
+            .unwrap();
+
+        let error_count = registry
+            .new_int_counter_vec(
+                "ipfs_error_count",
+                "The total number of failed IPFS requests.",
+                &["deployment"],
+            )
+            .unwrap();
+
+        let not_found_count = registry
+            .new_int_counter_vec(
+                "ipfs_not_found_count",
+                "The total number of IPFS requests for content that could not be found.",
+                &["deployment"],
+            )
+            .unwrap();
+
+        let request_duration = registry
+            .new_histogram_vec(
+                "ipfs_request_duration",
+                "The duration of successful IPFS requests.\n\
+                 The time it takes to download the response body is not included.",
+                vec!["deployment".to_owned()],
+                vec![
+                    0.2, 0.5, 1.0, 5.0, 10.0, 20.0, 30.0, 60.0, 90.0, 120.0, 180.0, 240.0,
+                ],
+            )
+            .unwrap();
+
+        Self {
+            request_count,
+            error_count,
+            not_found_count,
+            request_duration,
+        }
+    }
+
+    pub(super) fn add_request(&self, deployment_hash: &str) {
+        self.request_count
+            .with_label_values(&[deployment_hash])
+            .inc()
+    }
+
+    pub(super) fn add_error(&self, deployment_hash: &str) {
+        self.error_count.with_label_values(&[deployment_hash]).inc()
+    }
+
+    pub(super) fn add_not_found(&self, deployment_hash: &str) {
+        self.not_found_count
+            .with_label_values(&[deployment_hash])
+            .inc()
+    }
+
+    pub(super) fn observe_request_duration(&self, deployment_hash: &str, duration_secs: f64) {
+        self.request_duration
+            .with_label_values(&[deployment_hash])
+            .observe(duration_secs.clamp(0.2, 240.0));
+    }
+}
+
+#[cfg(debug_assertions)]
+impl Default for IpfsMetrics {
+    fn default() -> Self {
+        Self::new(&MetricsRegistry::mock())
+    }
+}
diff --git a/graph/src/ipfs/mod.rs b/graph/src/ipfs/mod.rs
index 3a5fe211d26..403cbf614cd 100644
--- a/graph/src/ipfs/mod.rs
+++ b/graph/src/ipfs/mod.rs
@@ -8,6 +8,7 @@ use futures03::stream::StreamExt;
 use slog::info;
 use slog::Logger;
 
+use crate::components::metrics::MetricsRegistry;
 use crate::util::security::SafeDisplay;
 
 mod cache;
@@ -15,6 +16,7 @@ mod client;
 mod content_path;
 mod error;
 mod gateway_client;
+mod metrics;
 mod pool;
 mod retry_policy;
 mod rpc_client;
@@ -22,13 +24,12 @@ mod server_address;
 
 pub mod test_utils;
 
-pub use self::client::IpfsClient;
-pub use self::client::IpfsRequest;
-pub use self::client::IpfsResponse;
+pub use self::client::{IpfsClient, IpfsContext, IpfsRequest, IpfsResponse};
 pub use self::content_path::ContentPath;
 pub use self::error::IpfsError;
 pub use self::error::RequestError;
 pub use self::gateway_client::IpfsGatewayClient;
+pub use self::metrics::IpfsMetrics;
 pub use self::pool::IpfsClientPool;
 pub use self::retry_policy::RetryPolicy;
 pub use self::rpc_client::IpfsRpcClient;
@@ -45,12 +46,14 @@ pub type IpfsResult<T> = Result<T, IpfsError>;
 
 /// All clients are set up to cache results
 pub async fn new_ipfs_client<I, S>(
     server_addresses: I,
+    registry: &MetricsRegistry,
     logger: &Logger,
 ) -> IpfsResult<Arc<dyn IpfsClient>>
 where
     I: IntoIterator<Item = S>,
     S: AsRef<str>,
 {
+    let metrics = IpfsMetrics::new(registry);
     let mut clients: Vec<Arc<dyn IpfsClient>> = Vec::new();
 
     for server_address in server_addresses {
@@ -62,8 +65,8 @@ where
            SafeDisplay(server_address)
        );
 
-        let client = use_first_valid_api(server_address, logger).await?;
-        let client = Arc::new(CachingClient::new(client).await?);
+        let client = use_first_valid_api(server_address, metrics.clone(), logger).await?;
+        let client = Arc::new(CachingClient::new(client, logger).await?);
        clients.push(client);
    }
@@ -76,8 +79,7 @@ where
        n => {
            info!(logger, "Creating a pool of {} IPFS clients", n);
 
-            let pool = IpfsClientPool::new(clients, logger);
-
+            let pool = IpfsClientPool::new(clients);
            Ok(Arc::new(pool))
        }
    }
@@ -85,11 +87,12 @@ where
 
 async fn use_first_valid_api(
     server_address: &str,
+    metrics: IpfsMetrics,
     logger: &Logger,
 ) -> IpfsResult<Arc<dyn IpfsClient>> {
     let supported_apis: Vec<BoxFuture<'_, IpfsResult<Arc<dyn IpfsClient>>>> = vec![
         Box::pin(async {
-            IpfsGatewayClient::new(server_address, logger)
+            IpfsGatewayClient::new(server_address, metrics.clone(), logger)
                .await
                .map(|client| {
                    info!(
@@ -102,7 +105,7 @@ async fn use_first_valid_api(
            })
        }),
        Box::pin(async {
-            IpfsRpcClient::new(server_address, logger)
+            IpfsRpcClient::new(server_address, metrics.clone(), logger)
                .await
                .map(|client| {
                    info!(
diff --git a/graph/src/ipfs/pool.rs b/graph/src/ipfs/pool.rs
index 80abd7ca3e8..f1e0e9a4f7a 100644
--- a/graph/src/ipfs/pool.rs
+++ b/graph/src/ipfs/pool.rs
@@ -4,13 +4,8 @@ use anyhow::anyhow;
 use async_trait::async_trait;
 use futures03::stream::FuturesUnordered;
 use futures03::stream::StreamExt;
-use slog::Logger;
 
-use crate::ipfs::IpfsClient;
-use crate::ipfs::IpfsError;
-use crate::ipfs::IpfsRequest;
-use crate::ipfs::IpfsResponse;
-use crate::ipfs::IpfsResult;
+use crate::ipfs::{IpfsClient, IpfsError, IpfsMetrics, IpfsRequest, IpfsResponse, IpfsResult};
 
 /// Contains a list of IPFS clients and, for each read request, selects the fastest IPFS client
 /// that can provide the content and streams the response from that client.
@@ -19,23 +14,21 @@ use crate::ipfs::IpfsResult;
 /// as some of them may already have the content cached.
 pub struct IpfsClientPool {
     clients: Vec<Arc<dyn IpfsClient>>,
-    logger: Logger,
 }
 
 impl IpfsClientPool {
     /// Creates a new IPFS client pool from the specified clients.
-    pub fn new(clients: Vec<Arc<dyn IpfsClient>>, logger: &Logger) -> Self {
-        Self {
-            clients,
-            logger: logger.to_owned(),
-        }
+    pub fn new(clients: Vec<Arc<dyn IpfsClient>>) -> Self {
+        assert!(!clients.is_empty());
+        Self { clients }
     }
 }
 
 #[async_trait]
 impl IpfsClient for IpfsClientPool {
-    fn logger(&self) -> &Logger {
-        &self.logger
+    fn metrics(&self) -> &IpfsMetrics {
+        // All clients are expected to share the same metrics.
+        self.clients[0].metrics()
     }
 
     async fn call(self: Arc<Self>, req: IpfsRequest) -> IpfsResult<IpfsResponse> {
@@ -82,9 +75,7 @@ mod tests {
     use wiremock::ResponseTemplate;
 
     use super::*;
-    use crate::ipfs::ContentPath;
-    use crate::ipfs::IpfsGatewayClient;
-    use crate::ipfs::RetryPolicy;
+    use crate::ipfs::{ContentPath, IpfsGatewayClient, RetryPolicy};
     use crate::log::discard;
 
     const PATH: &str = "/ipfs/QmUNLLsPACCz1vLxQVkXqqLX5R1X345qqfHbsf67hvA3Nn";
@@ -95,7 +86,8 @@
     async fn make_client() -> (MockServer, Arc<IpfsGatewayClient>) {
         let server = MockServer::start().await;
-        let client = IpfsGatewayClient::new_unchecked(server.uri(), &discard()).unwrap();
+        let client =
+            IpfsGatewayClient::new_unchecked(server.uri(), Default::default(), &discard()).unwrap();
 
         (server, Arc::new(client))
     }
@@ -145,10 +137,10 @@
        .await;
 
        let clients: Vec<Arc<dyn IpfsClient>> = vec![client_1, client_2, client_3];
-        let pool = Arc::new(IpfsClientPool::new(clients, &discard()));
+        let pool = Arc::new(IpfsClientPool::new(clients));
 
        let bytes = pool
-            .cat_stream(&make_path(), None, RetryPolicy::None)
+            .cat_stream(Default::default(), &make_path(), None, RetryPolicy::None)
            .await
            .unwrap()
            .try_fold(BytesMut::new(), |mut acc, chunk| async {
@@ -198,10 +190,16 @@
        .await;
 
        let clients: Vec<Arc<dyn IpfsClient>> = vec![client_1, client_2, client_3];
-        let pool = Arc::new(IpfsClientPool::new(clients, &discard()));
+        let pool = Arc::new(IpfsClientPool::new(clients));
 
        let bytes = pool
-            .cat(&make_path(), usize::MAX, None, RetryPolicy::None)
+            .cat(
+                Default::default(),
+                &make_path(),
+                usize::MAX,
+                None,
+                RetryPolicy::None,
+            )
            .await
            .unwrap();
@@ -245,10 +243,10 @@
        .await;
 
        let clients: Vec<Arc<dyn IpfsClient>> = vec![client_1, client_2, client_3];
-        let pool = Arc::new(IpfsClientPool::new(clients, &discard()));
+        let pool = Arc::new(IpfsClientPool::new(clients));
 
        let bytes = pool
-            .get_block(&make_path(), None, RetryPolicy::None)
+            .get_block(Default::default(), &make_path(), None, RetryPolicy::None)
            .await
            .unwrap();
diff --git a/graph/src/ipfs/rpc_client.rs b/graph/src/ipfs/rpc_client.rs
index 16976537044..77c48b3e9ab 100644
--- a/graph/src/ipfs/rpc_client.rs
+++ b/graph/src/ipfs/rpc_client.rs
@@ -10,13 +10,10 @@ use reqwest::StatusCode;
 use slog::Logger;
 
 use crate::env::ENV_VARS;
-use crate::ipfs::IpfsClient;
-use crate::ipfs::IpfsError;
-use crate::ipfs::IpfsRequest;
-use crate::ipfs::IpfsResponse;
-use crate::ipfs::IpfsResult;
-use crate::ipfs::RetryPolicy;
-use crate::ipfs::ServerAddress;
+use crate::ipfs::{
+    IpfsClient, IpfsError, IpfsMetrics, IpfsRequest, IpfsResponse, IpfsResult, RetryPolicy,
+    ServerAddress,
+};
 
 /// A client that connects to an IPFS RPC API.
 ///
@@ -29,6 +26,7 @@ pub struct IpfsRpcClient {
     #[derivative(Debug = "ignore")]
     http_client: reqwest::Client,
 
+    metrics: IpfsMetrics,
     logger: Logger,
     test_request_timeout: Duration,
 }
@@ -36,8 +34,12 @@ pub struct IpfsRpcClient {
 impl IpfsRpcClient {
     /// Creates a new [IpfsRpcClient] with the specified server address.
     /// Verifies that the server is responding to IPFS RPC API requests.
-    pub async fn new(server_address: impl AsRef<str>, logger: &Logger) -> IpfsResult<Self> {
-        let client = Self::new_unchecked(server_address, logger)?;
+    pub async fn new(
+        server_address: impl AsRef<str>,
+        metrics: IpfsMetrics,
+        logger: &Logger,
+    ) -> IpfsResult<Self> {
+        let client = Self::new_unchecked(server_address, metrics, logger)?;
 
         client
             .send_test_request()
@@ -52,10 +54,15 @@ impl IpfsRpcClient {
 
     /// Creates a new [IpfsRpcClient] with the specified server address.
     /// Does not verify that the server is responding to IPFS RPC API requests.
-    pub fn new_unchecked(server_address: impl AsRef<str>, logger: &Logger) -> IpfsResult<Self> {
+    pub fn new_unchecked(
+        server_address: impl AsRef<str>,
+        metrics: IpfsMetrics,
+        logger: &Logger,
+    ) -> IpfsResult<Self> {
         Ok(Self {
             server_address: ServerAddress::new(server_address)?,
             http_client: reqwest::Client::new(),
+            metrics,
             logger: logger.to_owned(),
             test_request_timeout: ENV_VARS.ipfs_request_timeout,
         })
     }
@@ -113,8 +120,8 @@ impl IpfsRpcClient {
 
 #[async_trait]
 impl IpfsClient for IpfsRpcClient {
-    fn logger(&self) -> &Logger {
-        &self.logger
+    fn metrics(&self) -> &IpfsMetrics {
+        &self.metrics
     }
 
     async fn call(self: Arc<Self>, req: IpfsRequest) -> IpfsResult<IpfsResponse> {
@@ -165,7 +172,8 @@
     async fn make_client() -> (MockServer, Arc<IpfsRpcClient>) {
         let server = mock_server().await;
-        let client = IpfsRpcClient::new_unchecked(server.uri(), &discard()).unwrap();
+        let client =
+            IpfsRpcClient::new_unchecked(server.uri(), Default::default(), &discard()).unwrap();
 
         (server, Arc::new(client))
     }
@@ -182,7 +190,7 @@
    async fn new_fails_to_create_the_client_if_rpc_api_is_not_accessible() {
        let server = mock_server().await;
 
-        IpfsRpcClient::new(server.uri(), &discard())
+        IpfsRpcClient::new(server.uri(), Default::default(), &discard())
            .await
            .unwrap_err();
    }
@@ -197,7 +205,9 @@
            .mount(&server)
            .await;
 
-        IpfsRpcClient::new(server.uri(), &discard()).await.unwrap();
+        IpfsRpcClient::new(server.uri(), Default::default(), &discard())
+            .await
+            .unwrap();
    }
@@ -217,14 +227,16 @@
            .mount(&server)
            .await;
 
-        IpfsRpcClient::new(server.uri(), &discard()).await.unwrap();
+        IpfsRpcClient::new(server.uri(), Default::default(), &discard())
+            .await
+            .unwrap();
    }
 
    #[tokio::test]
    async fn new_unchecked_creates_the_client_without_checking_the_rpc_api() {
        let server = mock_server().await;
 
-        IpfsRpcClient::new_unchecked(server.uri(), &discard()).unwrap();
+        IpfsRpcClient::new_unchecked(server.uri(), Default::default(), &discard()).unwrap();
    }
@@ -238,7 +250,7 @@
        .await;
 
        let bytes = client
-            .cat_stream(&make_path(), None, RetryPolicy::None)
+            .cat_stream(Default::default(), &make_path(), None, RetryPolicy::None)
            .await
            .unwrap()
            .try_fold(BytesMut::new(), |mut acc, chunk| async {
@@ -263,7 +275,12 @@
        .await;
 
        let result = client
-            .cat_stream(&make_path(), Some(ms(300)), RetryPolicy::None)
+            .cat_stream(
+                Default::default(),
+                &make_path(),
+                Some(ms(300)),
+                RetryPolicy::None,
+            )
            .await;
 
        assert!(matches!(result, Err(_)));
@@ -287,7 +304,12 @@
        .await;
 
        let _stream = client
-            .cat_stream(&make_path(), None, RetryPolicy::NonDeterministic)
+            .cat_stream(
+                Default::default(),
+                &make_path(),
+                None,
+                RetryPolicy::NonDeterministic,
+            )
            .await
            .unwrap();
    }
@@ -303,7 +325,13 @@
        .await;
 
        let bytes = client
-            .cat(&make_path(), usize::MAX, None, RetryPolicy::None)
+            .cat(
+                Default::default(),
+                &make_path(),
+                usize::MAX,
+                None,
+                RetryPolicy::None,
+            )
            .await
            .unwrap();
@@ -323,7 +351,13 @@
        .await;
 
        let bytes = client
-            .cat(&make_path(), data.len(), None, RetryPolicy::None)
+            .cat(
+                Default::default(),
+                &make_path(),
+                data.len(),
+                None,
+                RetryPolicy::None,
+            )
            .await
            .unwrap();
@@ -343,7 +377,13 @@
        .await;
 
        client
-            .cat(&make_path(), data.len() - 1, None, RetryPolicy::None)
+            .cat(
+                Default::default(),
+                &make_path(),
+                data.len() - 1,
+                None,
+                RetryPolicy::None,
+            )
            .await
            .unwrap_err();
    }
@@ -359,7 +399,13 @@
        .await;
 
        client
-            .cat(&make_path(), usize::MAX, Some(ms(300)), RetryPolicy::None)
+            .cat(
+                Default::default(),
+                &make_path(),
+                usize::MAX,
+                Some(ms(300)),
+                RetryPolicy::None,
+            )
            .await
            .unwrap_err();
    }
@@ -383,6 +429,7 @@
 
        let bytes = client
            .cat(
+                Default::default(),
                &make_path(),
                usize::MAX,
                None,
@@ -405,7 +452,7 @@
        .await;
 
        let bytes = client
-            .get_block(&make_path(), None, RetryPolicy::None)
+            .get_block(Default::default(), &make_path(), None, RetryPolicy::None)
            .await
            .unwrap();
@@ -423,7 +470,12 @@
        .await;
 
        client
-            .get_block(&make_path(), Some(ms(300)), RetryPolicy::None)
+            .get_block(
+                Default::default(),
+                &make_path(),
+                Some(ms(300)),
+                RetryPolicy::None,
+            )
            .await
            .unwrap_err();
    }
@@ -446,7 +498,12 @@
        .await;
 
        let bytes = client
-            .get_block(&make_path(), None, RetryPolicy::NonDeterministic)
+            .get_block(
+                Default::default(),
+                &make_path(),
+                None,
+                RetryPolicy::NonDeterministic,
+            )
            .await
            .unwrap();
diff --git a/node/src/main.rs b/node/src/main.rs
index 0c5744513bb..2e91a0ae4d0 100644
--- a/node/src/main.rs
+++ b/node/src/main.rs
@@ -211,7 +211,7 @@ async fn main_inner() {
     let logger_factory =
         LoggerFactory::new(logger.clone(), elastic_config, metrics_registry.clone());
 
-    let ipfs_client = graph::ipfs::new_ipfs_client(&opt.ipfs, &logger)
+    let ipfs_client = graph::ipfs::new_ipfs_client(&opt.ipfs, &metrics_registry, &logger)
         .await
         .unwrap_or_else(|err| panic!("Failed to create IPFS client: {err:#}"));
diff --git a/node/src/manager/commands/run.rs b/node/src/manager/commands/run.rs
index f79a0497477..22a001e99cf 100644
--- a/node/src/manager/commands/run.rs
+++ b/node/src/manager/commands/run.rs
@@ -58,7 +58,7 @@ pub async fn run(
     let logger_factory = LoggerFactory::new(logger.clone(), None, metrics_ctx.registry.clone());
 
     // FIXME: Hard-coded IPFS config, take it from config file instead?
-    let ipfs_client = graph::ipfs::new_ipfs_client(&ipfs_url, &logger).await?;
+    let ipfs_client = graph::ipfs::new_ipfs_client(&ipfs_url, &metrics_registry, &logger).await?;
 
     let ipfs_service = ipfs_service(
         ipfs_client.cheap_clone(),
diff --git a/runtime/test/src/common.rs b/runtime/test/src/common.rs
index 461d4a08256..0b1bc9dbc94 100644
--- a/runtime/test/src/common.rs
+++ b/runtime/test/src/common.rs
@@ -65,7 +65,9 @@ fn mock_host_exports(
         Arc::new(templates.iter().map(|t| t.into()).collect()),
     );
 
-    let client = IpfsRpcClient::new_unchecked(ServerAddress::local_rpc_api(), &LOGGER).unwrap();
+    let client =
+        IpfsRpcClient::new_unchecked(ServerAddress::local_rpc_api(), Default::default(), &LOGGER)
+            .unwrap();
 
     HostExports::new(
         subgraph_id,
diff --git a/runtime/wasm/src/host_exports.rs b/runtime/wasm/src/host_exports.rs
index 12099c55b7e..94004b82c48 100644
--- a/runtime/wasm/src/host_exports.rs
+++ b/runtime/wasm/src/host_exports.rs
@@ -13,6 +13,7 @@ use web3::types::H160;
 
 use graph::blockchain::BlockTime;
 use graph::blockchain::Blockchain;
+use graph::components::link_resolver::LinkResolverContext;
 use graph::components::store::{EnsLookup, GetScope, LoadRelatedRequest};
 use graph::components::subgraph::{
     InstanceDSTemplate, PoICausalityRegion, ProofOfIndexingEvent, SharedProofOfIndexing,
@@ -479,7 +480,10 @@ impl HostExports {
         // Does not consume gas because this is not a part of the deterministic feature set.
         // Ideally this would first consume gas for fetching the file stats, and then again
         // for the bytes of the file.
-        graph::block_on(self.link_resolver.cat(logger, &Link { link }))
+        graph::block_on(self.link_resolver.cat(
+            LinkResolverContext::new(&self.subgraph_id, logger),
+            &Link { link },
+        ))
     }
 
     pub(crate) fn ipfs_get_block(
@@ -490,7 +494,10 @@
         // Does not consume gas because this is not a part of the deterministic feature set.
         // Ideally this would first consume gas for fetching the file stats, and then again
         // for the bytes of the file.
-        graph::block_on(self.link_resolver.get_block(logger, &Link { link }))
+        graph::block_on(self.link_resolver.get_block(
+            LinkResolverContext::new(&self.subgraph_id, logger),
+            &Link { link },
+        ))
     }
 
     // Read the IPFS file `link`, split it into JSON objects, and invoke the
@@ -501,7 +508,7 @@
     // of the callback must be `callback(JSONValue, Value)`, and the `userData`
     // parameter is passed to the callback without any changes
     pub(crate) fn ipfs_map(
-        link_resolver: &Arc<dyn LinkResolver>,
+        &self,
         wasm_ctx: &WasmInstanceData,
         link: String,
         callback: &str,
@@ -533,8 +540,10 @@
         let logger = ctx.logger.new(o!("ipfs_map" => link.clone()));
 
         let result = {
-            let mut stream: JsonValueStream =
-                graph::block_on(link_resolver.json_stream(&logger, &Link { link }))?;
+            let mut stream: JsonValueStream = graph::block_on(self.link_resolver.json_stream(
+                LinkResolverContext::new(&self.subgraph_id, &logger),
+                &Link { link },
+            ))?;
             let mut v = Vec::new();
             while let Some(sv) = graph::block_on(stream.next()) {
                 let sv = sv?;
diff --git a/runtime/wasm/src/module/context.rs b/runtime/wasm/src/module/context.rs
index 03cbf244c23..15f765e2030 100644
--- a/runtime/wasm/src/module/context.rs
+++ b/runtime/wasm/src/module/context.rs
@@ -609,14 +609,9 @@ impl WasmInstanceContext<'_> {
 
         // Pause the timeout while running ipfs_map, and resume it when done.
         self.suspend_timeout();
         let start_time = Instant::now();
-        let output_states = HostExports::ipfs_map(
-            &self.as_ref().ctx.host_exports.link_resolver.cheap_clone(),
-            self.as_ref(),
-            link.clone(),
-            &callback,
-            user_data,
-            flags,
-        )?;
+        let host_exports = self.as_ref().ctx.host_exports.cheap_clone();
+        let output_states =
+            host_exports.ipfs_map(self.as_ref(), link.clone(), &callback, user_data, flags)?;
         self.start_timeout();
 
         debug!(
diff --git a/server/index-node/src/resolver.rs b/server/index-node/src/resolver.rs
index 61a273e353a..8b72671e26e 100644
--- a/server/index-node/src/resolver.rs
+++ b/server/index-node/src/resolver.rs
@@ -8,6 +8,7 @@ use web3::types::Address;
 use git_testament::{git_testament, CommitKind};
 
 use graph::blockchain::{Blockchain, BlockchainKind, BlockchainMap};
+use graph::components::link_resolver::LinkResolverContext;
 use graph::components::store::{BlockPtrForNumber, BlockStore, QueryPermit, Store};
 use graph::components::versions::VERSIONS;
 use graph::data::graphql::{object, IntoValue, ObjectOrInterface, ValueMap};
@@ -492,7 +493,10 @@ impl IndexNodeResolver {
         let raw_yaml: serde_yaml::Mapping = {
             let file_bytes = self
                 .link_resolver
-                .cat(&self.logger, &deployment_hash.to_ipfs_link())
+                .cat(
+                    LinkResolverContext::new(deployment_hash, &self.logger),
+                    &deployment_hash.to_ipfs_link(),
+                )
                 .await
                 .map_err(SubgraphManifestResolveError::ResolveError)?;
diff --git a/store/test-store/tests/chain/ethereum/manifest.rs b/store/test-store/tests/chain/ethereum/manifest.rs
index f025be2e626..8cca2f3fa8b 100644
--- a/store/test-store/tests/chain/ethereum/manifest.rs
+++ b/store/test-store/tests/chain/ethereum/manifest.rs
@@ -19,13 +19,13 @@ use graph::entity;
 use graph::env::ENV_VARS;
 use graph::prelude::web3::types::H256;
 use graph::prelude::{
-    anyhow, async_trait, serde_yaml, tokio, BigDecimal, BigInt, DeploymentHash, Link, Logger,
+    anyhow, async_trait, serde_yaml, tokio, BigDecimal, BigInt, DeploymentHash, Link,
     SubgraphManifest, SubgraphManifestResolveError, SubgraphManifestValidationError,
     SubgraphStore, UnvalidatedSubgraphManifest,
 };
 use graph::{
     blockchain::NodeCapabilities as _,
-    components::link_resolver::{JsonValueStream, LinkResolver as LinkResolverTrait},
+    components::link_resolver::{JsonValueStream, LinkResolver, LinkResolverContext},
     data::subgraph::SubgraphFeature,
 };
 
@@ -82,29 +82,33 @@ impl TextResolver {
 }
 
 #[async_trait]
-impl LinkResolverTrait for TextResolver {
-    fn with_timeout(&self, _timeout: Duration) -> Box<dyn LinkResolverTrait> {
+impl LinkResolver for TextResolver {
+    fn with_timeout(&self, _timeout: Duration) -> Box<dyn LinkResolver> {
         Box::new(self.clone())
     }
 
-    fn with_retries(&self) -> Box<dyn LinkResolverTrait> {
+    fn with_retries(&self) -> Box<dyn LinkResolver> {
         Box::new(self.clone())
     }
 
-    async fn cat(&self, _logger: &Logger, link: &Link) -> Result<Vec<u8>, anyhow::Error> {
+    async fn cat(&self, _ctx: LinkResolverContext, link: &Link) -> Result<Vec<u8>, anyhow::Error> {
         self.texts
             .get(&link.link)
             .ok_or(anyhow!("No text for {}", &link.link))
             .map(Clone::clone)
     }
 
-    async fn get_block(&self, _logger: &Logger, _link: &Link) -> Result<Vec<u8>, anyhow::Error> {
+    async fn get_block(
+        &self,
+        _ctx: LinkResolverContext,
+        _link: &Link,
+    ) -> Result<Vec<u8>, anyhow::Error> {
         unimplemented!()
     }
 
     async fn json_stream(
         &self,
-        _logger: &Logger,
+        _ctx: LinkResolverContext,
         _link: &Link,
     ) -> Result<JsonValueStream, anyhow::Error> {
         unimplemented!()
@@ -127,7 +131,7 @@ async fn try_resolve_manifest(
     resolver.add("/ipfs/QmSourceSchema", &SOURCE_SUBGRAPH_SCHEMA);
     resolver.add(FILE_CID, &FILE);
 
-    let resolver: Arc<dyn LinkResolverTrait> = Arc::new(resolver);
+    let resolver: Arc<dyn LinkResolver> = Arc::new(resolver);
 
    let raw = serde_yaml::from_str(text)?;
 
    Ok(SubgraphManifest::resolve_from_raw(id, raw, &resolver, &LOGGER, max_spec_version).await?)
 }
@@ -149,7 +153,7 @@ async fn resolve_unvalidated(text: &str) -> UnvalidatedSubgraphManifest<Chain> {
     resolver.add(id.as_str(), &text);
     resolver.add("/ipfs/Qmschema", &GQL_SCHEMA);
 
-    let resolver: Arc<dyn LinkResolverTrait> = Arc::new(resolver);
+    let resolver: Arc<dyn LinkResolver> = Arc::new(resolver);
 
     let raw = serde_yaml::from_str(text).unwrap();
     UnvalidatedSubgraphManifest::resolve(id, raw, &resolver, &LOGGER, SPEC_VERSION_0_0_4.clone())
@@ -221,7 +225,7 @@ dataSources:
     entities:
       - Gravatar
     network: mainnet
-    source: 
+    source:
      address: 'QmSource'
      startBlock: 9562480
    mapping:
@@ -264,7 +268,7 @@ dataSources:
     entities:
       - Gravatar
     network: mainnet
-    source: 
+    source:
      address: 'QmSource'
      startBlock: 9562480
    mapping:
@@ -300,7 +304,7 @@ dataSources:
     entities:
       - Gravatar
     network: mainnet
-    source: 
+    source:
      address: 'QmSource'
      startBlock: 9562480
    mapping:
@@ -1298,7 +1302,7 @@ schema:
     resolver.add("/ipfs/Qmabi", &ABI);
     resolver.add("/ipfs/Qmschema", &GQL_SCHEMA_FULLTEXT);
 
-    let resolver: Arc<dyn LinkResolverTrait> = Arc::new(resolver);
+    let resolver: Arc<dyn LinkResolver> = Arc::new(resolver);
 
     let raw = serde_yaml::from_str(YAML).unwrap();
     UnvalidatedSubgraphManifest::resolve(
@@ -1350,7 +1354,7 @@ schema:
     resolver.add("/ipfs/Qmabi", &ABI);
     resolver.add("/ipfs/Qmschema", &GQL_SCHEMA_FULLTEXT);
 
-    let resolver: Arc<dyn LinkResolverTrait> = Arc::new(resolver);
+    let resolver: Arc<dyn LinkResolver> = Arc::new(resolver);
 
     let raw = serde_yaml::from_str(YAML).unwrap();
     UnvalidatedSubgraphManifest::resolve(
@@ -1426,7 +1430,7 @@ dataSources:
     resolver.add("/ipfs/Qmschema", &GQL_SCHEMA);
     resolver.add("/ipfs/Qmmapping", &MAPPING_WITH_IPFS_FUNC_WASM);
 
-    let resolver: Arc<dyn LinkResolverTrait> = Arc::new(resolver);
+    let resolver: Arc<dyn LinkResolver> = Arc::new(resolver);
 
     let raw = serde_yaml::from_str(YAML).unwrap();
     UnvalidatedSubgraphManifest::resolve(
@@ -1504,7 +1508,7 @@ dataSources:
     resolver.add("/ipfs/Qmschema", &GQL_SCHEMA);
     resolver.add("/ipfs/Qmmapping", &MAPPING_WITH_IPFS_FUNC_WASM);
 
-    let resolver: Arc<dyn LinkResolverTrait> = Arc::new(resolver);
+    let resolver: Arc<dyn LinkResolver> = Arc::new(resolver);
 
     let raw = serde_yaml::from_str(YAML).unwrap();
     UnvalidatedSubgraphManifest::resolve(
@@ -1613,7 +1617,7 @@ dataSources:
     resolver.add("/ipfs/Qmschema", &GQL_SCHEMA);
     resolver.add("/ipfs/Qmmapping", &MAPPING_WITH_IPFS_FUNC_WASM);
 
-    let resolver: Arc<dyn LinkResolverTrait> = Arc::new(resolver);
+    let resolver: Arc<dyn LinkResolver> = Arc::new(resolver);
 
     let raw = serde_yaml::from_str(YAML).unwrap();
     UnvalidatedSubgraphManifest::resolve(
@@ -1651,7 +1655,7 @@ dataSources:
     entities:
       - Gravatar
     network: mainnet
-    source: 
+    source:
      address: 'QmSource'
      startBlock: 9562480
    mapping:
@@ -1686,7 +1690,7 @@ dataSources:
     resolver.add("/ipfs/QmSource", &SOURCE_SUBGRAPH_MANIFEST);
     resolver.add("/ipfs/QmSourceSchema", &SOURCE_SUBGRAPH_SCHEMA);
 
-    let resolver: Arc<dyn LinkResolverTrait> = Arc::new(resolver);
+    let resolver: Arc<dyn LinkResolver> = Arc::new(resolver);
 
     let raw = serde_yaml::from_str(YAML).unwrap();
     UnvalidatedSubgraphManifest::resolve(
@@ -1721,7 +1725,7 @@ dataSources:
     entities:
       - User
     network: mainnet
-    source: 
+    source:
      address: 'QmSource'
      startBlock: 9562480
    mapping:
@@ -1780,7 +1784,7 @@ dataSources:
     entities:
       - User
     network: mainnet
-    source: 
+    source:
      address: 'QmNestedSource'
      startBlock: 9562480
    mapping:
@@ -1834,7 +1838,7 @@ specVersion: 1.3.0
     resolver.add("/ipfs/QmSource", &SOURCE_SUBGRAPH_MANIFEST);
     resolver.add("/ipfs/QmSourceSchema", &SOURCE_SUBGRAPH_SCHEMA);
 
-    let resolver: Arc<dyn LinkResolverTrait> = Arc::new(resolver);
+    let resolver: Arc<dyn LinkResolver> = Arc::new(resolver);
 
     let raw = serde_yaml::from_str(yaml).unwrap();
 
     test_store::run_test_sequentially(|_| async move {
@@ -1873,7 +1877,7 @@ dataSources:
     entities:
       - Gravatar
     network: mainnet
-    source: 
+    source:
      address: 'QmSource'
      startBlock: 9562480
    mapping:
@@ -1909,7 +1913,7 @@ dataSources:
     entities:
       - Gravatar
     network: mainnet
-    source: 
+    source:
      address: 'QmSource'
      startBlock: 9562480
    mapping:
diff --git a/tests/src/fixture/mod.rs b/tests/src/fixture/mod.rs
index cc99e406c1c..bc3add6ca82 100644
--- a/tests/src/fixture/mod.rs
+++ b/tests/src/fixture/mod.rs
@@ -17,7 +17,9 @@ use graph::blockchain::{
     TriggerFilterWrapper, TriggersAdapter, TriggersAdapterSelector,
 };
 use graph::cheap_clone::CheapClone;
-use graph::components::link_resolver::{ArweaveClient, ArweaveResolver, FileSizeLimit};
+use graph::components::link_resolver::{
+    ArweaveClient, ArweaveResolver, FileSizeLimit, LinkResolverContext,
+};
 use graph::components::metrics::MetricsRegistry;
 use graph::components::network_provider::ChainName;
 use graph::components::store::{DeploymentLocator, EthereumCallCache, SourceableStore};
@@ -33,7 +35,7 @@ use graph::futures03::{Stream, StreamExt};
 use graph::http_body_util::Full;
 use graph::hyper::body::Bytes;
 use graph::hyper::Request;
-use graph::ipfs::IpfsClient;
+use graph::ipfs::{IpfsClient, IpfsMetrics};
 use graph::prelude::ethabi::ethereum_types::H256;
 use graph::prelude::serde_json::{self, json};
 use graph::prelude::{
@@ -265,7 +267,10 @@ impl TestContext {
         // Stolen from the IPFS provider, there's prolly a nicer way to re-use it
         let file_bytes = self
             .link_resolver
-            .cat(&logger, &deployment.hash.to_ipfs_link())
+            .cat(
+                LinkResolverContext::new(&deployment.hash, &logger),
+                &deployment.hash.to_ipfs_link(),
+            )
             .await
             .unwrap();
 
@@ -475,6 +480,7 @@ pub async fn setup(
     let ipfs_client: Arc<dyn IpfsClient> = Arc::new(
         graph::ipfs::IpfsRpcClient::new_unchecked(
             graph::ipfs::ServerAddress::local_rpc_api(),
+            IpfsMetrics::new(&mock_registry),
             &logger,
         )
         .unwrap(),
diff --git a/tests/tests/runner_tests.rs b/tests/tests/runner_tests.rs
index 261c886dfea..77e1d14e496 100644
--- a/tests/tests/runner_tests.rs
+++ b/tests/tests/runner_tests.rs
@@ -1295,9 +1295,13 @@ async fn build_subgraph_with_yarn_cmd_and_arg(
     arg: Option<&str>,
 ) -> DeploymentHash {
     // Test that IPFS is up.
-    ipfs::IpfsRpcClient::new(ipfs::ServerAddress::local_rpc_api(), &graph::log::discard())
-        .await
-        .expect("Could not connect to IPFS, make sure it's running at port 5001");
+    ipfs::IpfsRpcClient::new(
+        ipfs::ServerAddress::local_rpc_api(),
+        Default::default(),
+        &graph::log::discard(),
+    )
+    .await
+    .expect("Could not connect to IPFS, make sure it's running at port 5001");
 
     // Make sure dependencies are present.
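
Taken together, the changes above thread a deployment-scoped context from the link resolver down to every IPFS client, and record per-deployment request, error, not-found, and duration metrics under the new `ipfs_*` Prometheus series. The sketch below shows one way the pieces could compose end to end. It is illustrative only, not part of the change set: it assumes a tokio runtime, a reachable public gateway, and a test build where `MetricsRegistry::mock()` and the `Default` impl for the request context are available, as in the unit tests in the hunks above.

use graph::components::metrics::MetricsRegistry;
use graph::ipfs::{ContentPath, RetryPolicy};

// Hypothetical driver; the CID is the one used in the pool tests above.
async fn demo() -> Result<(), Box<dyn std::error::Error>> {
    let logger = graph::log::discard();
    // One registry backs all clients, so gateway, RPC, and pooled clients
    // share a single IpfsMetrics instance (see IpfsClientPool::metrics).
    let registry = MetricsRegistry::mock();

    // Probes each address as a gateway first, then as an RPC API, wraps the
    // winner in the caching client, and pools the results.
    let client = graph::ipfs::new_ipfs_client(["https://ipfs.io"], &registry, &logger).await?;

    let path = ContentPath::new("QmUNLLsPACCz1vLxQVkXqqLX5R1X345qqfHbsf67hvA3Nn")?;

    // Requests now take a context as their first argument; Default::default()
    // stands in for a context carrying a real deployment hash, which becomes
    // the "deployment" label on the ipfs_* metrics.
    let bytes = client
        .cat(Default::default(), &path, usize::MAX, None, RetryPolicy::None)
        .await?;
    println!("fetched {} bytes", bytes.len());
    Ok(())
}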