-
Notifications
You must be signed in to change notification settings - Fork 1k
Add IPFS usage metrics / extend logging / extend supported content path formats #6058
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -4,7 +4,11 @@ use anyhow::{anyhow, Context, Error}; | |
use graph::{ | ||
blockchain, | ||
cheap_clone::CheapClone, | ||
components::{link_resolver::LinkResolver, subgraph::InstanceDSTemplateInfo}, | ||
components::{ | ||
link_resolver::{LinkResolver, LinkResolverContext}, | ||
subgraph::InstanceDSTemplateInfo, | ||
}, | ||
data::subgraph::DeploymentHash, | ||
prelude::{async_trait, BlockNumber, Link}, | ||
slog::Logger, | ||
}; | ||
|
@@ -184,11 +188,17 @@ pub struct UnresolvedMapping { | |
impl blockchain::UnresolvedDataSource<Chain> for UnresolvedDataSource { | ||
async fn resolve( | ||
self, | ||
deployment_hash: &DeploymentHash, | ||
resolver: &Arc<dyn LinkResolver>, | ||
logger: &Logger, | ||
_manifest_idx: u32, | ||
) -> Result<DataSource, Error> { | ||
let content = resolver.cat(logger, &self.source.package.file).await?; | ||
let content = resolver | ||
.cat( | ||
LinkResolverContext::new(deployment_hash, logger), | ||
&self.source.package.file, | ||
) | ||
.await?; | ||
|
||
let mut package = graph::substreams::Package::decode(content.as_ref())?; | ||
|
||
|
@@ -234,7 +244,7 @@ impl blockchain::UnresolvedDataSource<Chain> for UnresolvedDataSource { | |
let handler = match (self.mapping.handler, self.mapping.file) { | ||
(Some(handler), Some(file)) => { | ||
let module_bytes = resolver | ||
.cat(logger, &file) | ||
.cat(LinkResolverContext::new(deployment_hash, logger), &file) | ||
.await | ||
.with_context(|| format!("failed to resolve mapping {}", file.link))?; | ||
|
||
|
@@ -314,6 +324,7 @@ impl blockchain::DataSourceTemplate<Chain> for NoopDataSourceTemplate { | |
impl blockchain::UnresolvedDataSourceTemplate<Chain> for NoopDataSourceTemplate { | ||
async fn resolve( | ||
self, | ||
_deployment_hash: &DeploymentHash, | ||
_resolver: &Arc<dyn LinkResolver>, | ||
_logger: &Logger, | ||
_manifest_idx: u32, | ||
|
@@ -329,7 +340,7 @@ mod test { | |
use anyhow::Error; | ||
use graph::{ | ||
blockchain::{DataSource as _, UnresolvedDataSource as _}, | ||
components::link_resolver::LinkResolver, | ||
components::link_resolver::{LinkResolver, LinkResolverContext}, | ||
data::subgraph::LATEST_VERSION, | ||
prelude::{async_trait, serde_yaml, JsonValueStream, Link}, | ||
slog::{o, Discard, Logger}, | ||
|
@@ -433,7 +444,10 @@ mod test { | |
let ds: UnresolvedDataSource = serde_yaml::from_str(TEMPLATE_DATA_SOURCE).unwrap(); | ||
let link_resolver: Arc<dyn LinkResolver> = Arc::new(NoopLinkResolver {}); | ||
let logger = Logger::root(Discard, o!()); | ||
let ds: DataSource = ds.resolve(&link_resolver, &logger, 0).await.unwrap(); | ||
let ds: DataSource = ds | ||
.resolve(&Default::default(), &link_resolver, &logger, 0) | ||
.await | ||
.unwrap(); | ||
let expected = DataSource { | ||
kind: SUBSTREAMS_KIND.into(), | ||
network: Some("mainnet".into()), | ||
|
@@ -470,7 +484,10 @@ mod test { | |
serde_yaml::from_str(TEMPLATE_DATA_SOURCE_WITH_PARAMS).unwrap(); | ||
let link_resolver: Arc<dyn LinkResolver> = Arc::new(NoopLinkResolver {}); | ||
let logger = Logger::root(Discard, o!()); | ||
let ds: DataSource = ds.resolve(&link_resolver, &logger, 0).await.unwrap(); | ||
let ds: DataSource = ds | ||
.resolve(&Default::default(), &link_resolver, &logger, 0) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Same comment about |
||
.await | ||
.unwrap(); | ||
let expected = DataSource { | ||
kind: SUBSTREAMS_KIND.into(), | ||
network: Some("mainnet".into()), | ||
|
@@ -705,17 +722,21 @@ mod test { | |
unimplemented!() | ||
} | ||
|
||
async fn cat(&self, _logger: &Logger, _link: &Link) -> Result<Vec<u8>, Error> { | ||
async fn cat(&self, _ctx: LinkResolverContext, _link: &Link) -> Result<Vec<u8>, Error> { | ||
Ok(gen_package().encode_to_vec()) | ||
} | ||
|
||
async fn get_block(&self, _logger: &Logger, _link: &Link) -> Result<Vec<u8>, Error> { | ||
async fn get_block( | ||
&self, | ||
_ctx: LinkResolverContext, | ||
_link: &Link, | ||
) -> Result<Vec<u8>, Error> { | ||
unimplemented!() | ||
} | ||
|
||
async fn json_stream( | ||
&self, | ||
_logger: &Logger, | ||
_ctx: LinkResolverContext, | ||
_link: &Link, | ||
) -> Result<JsonValueStream, Error> { | ||
unimplemented!() | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -5,13 +5,17 @@ use anyhow::anyhow; | |
use anyhow::Error; | ||
use bytes::Bytes; | ||
use graph::futures03::future::BoxFuture; | ||
use graph::ipfs::ContentPath; | ||
use graph::ipfs::IpfsClient; | ||
use graph::ipfs::RetryPolicy; | ||
use graph::ipfs::{ContentPath, IpfsClient, IpfsContext, RetryPolicy}; | ||
use graph::{derive::CheapClone, prelude::CheapClone}; | ||
use tower::{buffer::Buffer, ServiceBuilder, ServiceExt}; | ||
|
||
pub type IpfsService = Buffer<ContentPath, BoxFuture<'static, Result<Option<Bytes>, Error>>>; | ||
pub type IpfsService = Buffer<IpfsRequest, BoxFuture<'static, Result<Option<Bytes>, Error>>>; | ||
|
||
#[derive(Clone, Debug)] | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I got a little hung up on why this needs to be 'Clone'; I think it ultimately boils down to the fact that when we make the request, we want it returned back and we do that in In any event, this would definitely be something for a different PR. |
||
pub struct IpfsRequest { | ||
pub ctx: IpfsContext, | ||
pub path: ContentPath, | ||
} | ||
|
||
pub fn ipfs_service( | ||
client: Arc<dyn IpfsClient>, | ||
|
@@ -43,7 +47,10 @@ struct IpfsServiceInner { | |
} | ||
|
||
impl IpfsServiceInner { | ||
async fn call_inner(self, path: ContentPath) -> Result<Option<Bytes>, Error> { | ||
async fn call_inner( | ||
self, | ||
IpfsRequest { ctx, path }: IpfsRequest, | ||
) -> Result<Option<Bytes>, Error> { | ||
let multihash = path.cid().hash().code(); | ||
if !SAFE_MULTIHASHES.contains(&multihash) { | ||
return Err(anyhow!("CID multihash {} is not allowed", multihash)); | ||
|
@@ -52,6 +59,7 @@ impl IpfsServiceInner { | |
let res = self | ||
.client | ||
.cat( | ||
ctx, | ||
&path, | ||
self.max_file_size, | ||
Some(self.timeout), | ||
|
@@ -126,14 +134,24 @@ mod test { | |
|
||
let dir_cid = add_resp.into_iter().find(|x| x.name == "dir").unwrap().hash; | ||
|
||
let client = | ||
IpfsRpcClient::new_unchecked(ServerAddress::local_rpc_api(), &graph::log::discard()) | ||
.unwrap(); | ||
let client = IpfsRpcClient::new_unchecked( | ||
ServerAddress::local_rpc_api(), | ||
Default::default(), | ||
&graph::log::discard(), | ||
) | ||
.unwrap(); | ||
|
||
let svc = ipfs_service(Arc::new(client), 100000, Duration::from_secs(30), 10); | ||
|
||
let path = ContentPath::new(format!("{dir_cid}/file.txt")).unwrap(); | ||
let content = svc.oneshot(path).await.unwrap().unwrap(); | ||
let content = svc | ||
.oneshot(IpfsRequest { | ||
ctx: Default::default(), | ||
path, | ||
}) | ||
.await | ||
.unwrap() | ||
.unwrap(); | ||
|
||
assert_eq!(content.to_vec(), random_bytes); | ||
} | ||
|
@@ -157,7 +175,8 @@ mod test { | |
const CID: &str = "QmUNLLsPACCz1vLxQVkXqqLX5R1X345qqfHbsf67hvA3Nn"; | ||
|
||
let server = MockServer::start().await; | ||
let ipfs_client = IpfsRpcClient::new_unchecked(server.uri(), &discard()).unwrap(); | ||
let ipfs_client = | ||
IpfsRpcClient::new_unchecked(server.uri(), Default::default(), &discard()).unwrap(); | ||
let ipfs_service = ipfs_service(Arc::new(ipfs_client), 10, Duration::from_secs(1), 1); | ||
let path = ContentPath::new(CID).unwrap(); | ||
|
||
|
@@ -179,6 +198,12 @@ mod test { | |
.await; | ||
|
||
// This means that we never reached the successful response. | ||
ipfs_service.oneshot(path).await.unwrap_err(); | ||
ipfs_service | ||
.oneshot(IpfsRequest { | ||
ctx: Default::default(), | ||
path, | ||
}) | ||
.await | ||
.unwrap_err(); | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I kinda dislike using
Default::default
because from just reading the code, I have no idea what gets passed there. Could you write that asLinkResolverContext::default()
?