Skip to content

Add IPFS usage metrics / extend logging / extend supported content path formats #6058

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 13 additions & 4 deletions chain/ethereum/src/data_source.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
use anyhow::{anyhow, Error};
use anyhow::{ensure, Context};
use graph::blockchain::{BlockPtr, TriggerWithHandler};
use graph::components::link_resolver::LinkResolverContext;
use graph::components::metrics::subgraph::SubgraphInstanceMetrics;
use graph::components::store::{EthereumCallCache, StoredDynamicDataSource};
use graph::components::subgraph::{HostMetrics, InstanceDSTemplateInfo, MappingError};
use graph::components::trigger_processor::RunnableTriggers;
use graph::data::subgraph::DeploymentHash;
use graph::data_source::common::{
CallDecls, DeclaredCall, FindMappingABI, MappingABI, UnresolvedMappingABI,
};
Expand Down Expand Up @@ -1197,6 +1199,7 @@ pub struct UnresolvedDataSource {
impl blockchain::UnresolvedDataSource<Chain> for UnresolvedDataSource {
async fn resolve(
self,
deployment_hash: &DeploymentHash,
resolver: &Arc<dyn LinkResolver>,
logger: &Logger,
manifest_idx: u32,
Expand All @@ -1210,7 +1213,7 @@ impl blockchain::UnresolvedDataSource<Chain> for UnresolvedDataSource {
context,
} = self;

let mapping = mapping.resolve(resolver, logger).await.with_context(|| {
let mapping = mapping.resolve(deployment_hash, resolver, logger).await.with_context(|| {
format!(
"failed to resolve data source {} with source_address {:?} and source_start_block {}",
name, source.address, source.start_block
Expand Down Expand Up @@ -1244,6 +1247,7 @@ pub struct DataSourceTemplate {
impl blockchain::UnresolvedDataSourceTemplate<Chain> for UnresolvedDataSourceTemplate {
async fn resolve(
self,
deployment_hash: &DeploymentHash,
resolver: &Arc<dyn LinkResolver>,
logger: &Logger,
manifest_idx: u32,
Expand All @@ -1257,7 +1261,7 @@ impl blockchain::UnresolvedDataSourceTemplate<Chain> for UnresolvedDataSourceTem
} = self;

let mapping = mapping
.resolve(resolver, logger)
.resolve(deployment_hash, resolver, logger)
.await
.with_context(|| format!("failed to resolve data source template {}", name))?;

Expand Down Expand Up @@ -1355,6 +1359,7 @@ impl FindMappingABI for Mapping {
impl UnresolvedMapping {
pub async fn resolve(
self,
deployment_hash: &DeploymentHash,
resolver: &Arc<dyn LinkResolver>,
logger: &Logger,
) -> Result<Mapping, anyhow::Error> {
Expand All @@ -1377,13 +1382,17 @@ impl UnresolvedMapping {
abis.into_iter()
.map(|unresolved_abi| async {
Result::<_, Error>::Ok(Arc::new(
unresolved_abi.resolve(resolver, logger).await?,
unresolved_abi
.resolve(deployment_hash, resolver, logger)
.await?,
))
})
.collect::<FuturesOrdered<_>>()
.try_collect::<Vec<_>>(),
async {
let module_bytes = resolver.cat(logger, &link).await?;
let module_bytes = resolver
.cat(LinkResolverContext::new(deployment_hash, logger), &link)
.await?;
Ok(Arc::new(module_bytes))
},
)
Expand Down
12 changes: 8 additions & 4 deletions chain/near/src/data_source.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
use graph::anyhow::Context;
use graph::blockchain::{Block, TriggerWithHandler};
use graph::components::link_resolver::LinkResolverContext;
use graph::components::store::StoredDynamicDataSource;
use graph::components::subgraph::InstanceDSTemplateInfo;
use graph::data::subgraph::DataSourceContext;
use graph::data::subgraph::{DataSourceContext, DeploymentHash};
use graph::prelude::SubgraphManifestValidationError;
use graph::{
anyhow::{anyhow, Error},
Expand Down Expand Up @@ -330,6 +331,7 @@ pub struct UnresolvedDataSource {
impl blockchain::UnresolvedDataSource<Chain> for UnresolvedDataSource {
async fn resolve(
self,
deployment_hash: &DeploymentHash,
resolver: &Arc<dyn LinkResolver>,
logger: &Logger,
_manifest_idx: u32,
Expand All @@ -343,7 +345,7 @@ impl blockchain::UnresolvedDataSource<Chain> for UnresolvedDataSource {
context,
} = self;

let mapping = mapping.resolve(resolver, logger).await.with_context(|| {
let mapping = mapping.resolve(deployment_hash, resolver, logger).await.with_context(|| {
format!(
"failed to resolve data source {} with source_account {:?} and source_start_block {}",
name, source.account, source.start_block
Expand All @@ -369,6 +371,7 @@ pub type DataSourceTemplate = BaseDataSourceTemplate<Mapping>;
impl blockchain::UnresolvedDataSourceTemplate<Chain> for UnresolvedDataSourceTemplate {
async fn resolve(
self,
deployment_hash: &DeploymentHash,
resolver: &Arc<dyn LinkResolver>,
logger: &Logger,
_manifest_idx: u32,
Expand All @@ -381,7 +384,7 @@ impl blockchain::UnresolvedDataSourceTemplate<Chain> for UnresolvedDataSourceTem
} = self;

let mapping = mapping
.resolve(resolver, logger)
.resolve(deployment_hash, resolver, logger)
.await
.with_context(|| format!("failed to resolve data source template {}", name))?;

Expand Down Expand Up @@ -432,6 +435,7 @@ pub struct UnresolvedMapping {
impl UnresolvedMapping {
pub async fn resolve(
self,
deployment_hash: &DeploymentHash,
resolver: &Arc<dyn LinkResolver>,
logger: &Logger,
) -> Result<Mapping, Error> {
Expand All @@ -447,7 +451,7 @@ impl UnresolvedMapping {
let api_version = semver::Version::parse(&api_version)?;

let module_bytes = resolver
.cat(logger, &link)
.cat(LinkResolverContext::new(deployment_hash, logger), &link)
.await
.with_context(|| format!("failed to resolve mapping {}", link.link))?;

Expand Down
39 changes: 30 additions & 9 deletions chain/substreams/src/data_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,11 @@ use anyhow::{anyhow, Context, Error};
use graph::{
blockchain,
cheap_clone::CheapClone,
components::{link_resolver::LinkResolver, subgraph::InstanceDSTemplateInfo},
components::{
link_resolver::{LinkResolver, LinkResolverContext},
subgraph::InstanceDSTemplateInfo,
},
data::subgraph::DeploymentHash,
prelude::{async_trait, BlockNumber, Link},
slog::Logger,
};
Expand Down Expand Up @@ -184,11 +188,17 @@ pub struct UnresolvedMapping {
impl blockchain::UnresolvedDataSource<Chain> for UnresolvedDataSource {
async fn resolve(
self,
deployment_hash: &DeploymentHash,
resolver: &Arc<dyn LinkResolver>,
logger: &Logger,
_manifest_idx: u32,
) -> Result<DataSource, Error> {
let content = resolver.cat(logger, &self.source.package.file).await?;
let content = resolver
.cat(
LinkResolverContext::new(deployment_hash, logger),
&self.source.package.file,
)
.await?;

let mut package = graph::substreams::Package::decode(content.as_ref())?;

Expand Down Expand Up @@ -234,7 +244,7 @@ impl blockchain::UnresolvedDataSource<Chain> for UnresolvedDataSource {
let handler = match (self.mapping.handler, self.mapping.file) {
(Some(handler), Some(file)) => {
let module_bytes = resolver
.cat(logger, &file)
.cat(LinkResolverContext::new(deployment_hash, logger), &file)
.await
.with_context(|| format!("failed to resolve mapping {}", file.link))?;

Expand Down Expand Up @@ -314,6 +324,7 @@ impl blockchain::DataSourceTemplate<Chain> for NoopDataSourceTemplate {
impl blockchain::UnresolvedDataSourceTemplate<Chain> for NoopDataSourceTemplate {
async fn resolve(
self,
_deployment_hash: &DeploymentHash,
_resolver: &Arc<dyn LinkResolver>,
_logger: &Logger,
_manifest_idx: u32,
Expand All @@ -329,7 +340,7 @@ mod test {
use anyhow::Error;
use graph::{
blockchain::{DataSource as _, UnresolvedDataSource as _},
components::link_resolver::LinkResolver,
components::link_resolver::{LinkResolver, LinkResolverContext},
data::subgraph::LATEST_VERSION,
prelude::{async_trait, serde_yaml, JsonValueStream, Link},
slog::{o, Discard, Logger},
Expand Down Expand Up @@ -433,7 +444,10 @@ mod test {
let ds: UnresolvedDataSource = serde_yaml::from_str(TEMPLATE_DATA_SOURCE).unwrap();
let link_resolver: Arc<dyn LinkResolver> = Arc::new(NoopLinkResolver {});
let logger = Logger::root(Discard, o!());
let ds: DataSource = ds.resolve(&link_resolver, &logger, 0).await.unwrap();
let ds: DataSource = ds
.resolve(&Default::default(), &link_resolver, &logger, 0)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I kinda dislike using Default::default() because, from just reading the code, I have no idea what gets passed there. Could you write that as LinkResolverContext::default()?

.await
.unwrap();
let expected = DataSource {
kind: SUBSTREAMS_KIND.into(),
network: Some("mainnet".into()),
Expand Down Expand Up @@ -470,7 +484,10 @@ mod test {
serde_yaml::from_str(TEMPLATE_DATA_SOURCE_WITH_PARAMS).unwrap();
let link_resolver: Arc<dyn LinkResolver> = Arc::new(NoopLinkResolver {});
let logger = Logger::root(Discard, o!());
let ds: DataSource = ds.resolve(&link_resolver, &logger, 0).await.unwrap();
let ds: DataSource = ds
.resolve(&Default::default(), &link_resolver, &logger, 0)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same comment about Default::default() here

.await
.unwrap();
let expected = DataSource {
kind: SUBSTREAMS_KIND.into(),
network: Some("mainnet".into()),
Expand Down Expand Up @@ -705,17 +722,21 @@ mod test {
unimplemented!()
}

async fn cat(&self, _logger: &Logger, _link: &Link) -> Result<Vec<u8>, Error> {
async fn cat(&self, _ctx: LinkResolverContext, _link: &Link) -> Result<Vec<u8>, Error> {
Ok(gen_package().encode_to_vec())
}

async fn get_block(&self, _logger: &Logger, _link: &Link) -> Result<Vec<u8>, Error> {
async fn get_block(
&self,
_ctx: LinkResolverContext,
_link: &Link,
) -> Result<Vec<u8>, Error> {
unimplemented!()
}

async fn json_stream(
&self,
_logger: &Logger,
_ctx: LinkResolverContext,
_link: &Link,
) -> Result<JsonValueStream, Error> {
unimplemented!()
Expand Down
47 changes: 36 additions & 11 deletions core/src/polling_monitor/ipfs_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,17 @@ use anyhow::anyhow;
use anyhow::Error;
use bytes::Bytes;
use graph::futures03::future::BoxFuture;
use graph::ipfs::ContentPath;
use graph::ipfs::IpfsClient;
use graph::ipfs::RetryPolicy;
use graph::ipfs::{ContentPath, IpfsClient, IpfsContext, RetryPolicy};
use graph::{derive::CheapClone, prelude::CheapClone};
use tower::{buffer::Buffer, ServiceBuilder, ServiceExt};

pub type IpfsService = Buffer<ContentPath, BoxFuture<'static, Result<Option<Bytes>, Error>>>;
pub type IpfsService = Buffer<IpfsRequest, BoxFuture<'static, Result<Option<Bytes>, Error>>>;

#[derive(Clone, Debug)]
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I got a little hung up on why this needs to be 'Clone'; I think it ultimately boils down to the fact that when we make the request, we want it returned back, and we do that in ReturnRequest::call. It seems we could avoid cloning if we changed what IpfsServiceInner::call_inner returns and had it always include the request in its return value, basically moving ownership of the request through call_inner. That said, I am also not sure how important it is to save on cloning here, though we do clone on every request.

In any event, this would definitely be something for a different PR.

pub struct IpfsRequest {
pub ctx: IpfsContext,
pub path: ContentPath,
}

pub fn ipfs_service(
client: Arc<dyn IpfsClient>,
Expand Down Expand Up @@ -43,7 +47,10 @@ struct IpfsServiceInner {
}

impl IpfsServiceInner {
async fn call_inner(self, path: ContentPath) -> Result<Option<Bytes>, Error> {
async fn call_inner(
self,
IpfsRequest { ctx, path }: IpfsRequest,
) -> Result<Option<Bytes>, Error> {
let multihash = path.cid().hash().code();
if !SAFE_MULTIHASHES.contains(&multihash) {
return Err(anyhow!("CID multihash {} is not allowed", multihash));
Expand All @@ -52,6 +59,7 @@ impl IpfsServiceInner {
let res = self
.client
.cat(
ctx,
&path,
self.max_file_size,
Some(self.timeout),
Expand Down Expand Up @@ -126,14 +134,24 @@ mod test {

let dir_cid = add_resp.into_iter().find(|x| x.name == "dir").unwrap().hash;

let client =
IpfsRpcClient::new_unchecked(ServerAddress::local_rpc_api(), &graph::log::discard())
.unwrap();
let client = IpfsRpcClient::new_unchecked(
ServerAddress::local_rpc_api(),
Default::default(),
&graph::log::discard(),
)
.unwrap();

let svc = ipfs_service(Arc::new(client), 100000, Duration::from_secs(30), 10);

let path = ContentPath::new(format!("{dir_cid}/file.txt")).unwrap();
let content = svc.oneshot(path).await.unwrap().unwrap();
let content = svc
.oneshot(IpfsRequest {
ctx: Default::default(),
path,
})
.await
.unwrap()
.unwrap();

assert_eq!(content.to_vec(), random_bytes);
}
Expand All @@ -157,7 +175,8 @@ mod test {
const CID: &str = "QmUNLLsPACCz1vLxQVkXqqLX5R1X345qqfHbsf67hvA3Nn";

let server = MockServer::start().await;
let ipfs_client = IpfsRpcClient::new_unchecked(server.uri(), &discard()).unwrap();
let ipfs_client =
IpfsRpcClient::new_unchecked(server.uri(), Default::default(), &discard()).unwrap();
let ipfs_service = ipfs_service(Arc::new(ipfs_client), 10, Duration::from_secs(1), 1);
let path = ContentPath::new(CID).unwrap();

Expand All @@ -179,6 +198,12 @@ mod test {
.await;

// This means that we never reached the successful response.
ipfs_service.oneshot(path).await.unwrap_err();
ipfs_service
.oneshot(IpfsRequest {
ctx: Default::default(),
path,
})
.await
.unwrap_err();
}
}
Loading