Skip to content

Commit bae0c11

Browse files
MoonBoi9001 and claude committed
feat(index-node): add blockForPoi query for dispute investigation
Adds a new index-node GraphQL query that searches a block range to find which block produced a given POI. This supports dispute investigation where an indexer may have submitted a POI for block N that actually corresponds to block X < N. The resolver fetches all poi2$ digest entries in a single DB call, batch fetches block hashes in 50k chunks, and runs ProofOfIndexingFinisher for each block until a match is found. Also adds network_for_deployment to the StatusStore trait and its implementation chain (SubgraphStore, Store) as a supporting method for resolving the chain store from a deployment hash. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1 parent 3056290 commit bae0c11

File tree

8 files changed

+400
-3
lines changed

8 files changed

+400
-3
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

graph/src/components/store/traits.rs

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ use crate::blockchain::{BlockTime, ChainIdentifier, ExtendedBlockPtr};
1010
use crate::components::metrics::stopwatch::StopwatchMetrics;
1111
use crate::components::network_provider::ChainName;
1212
use crate::components::server::index_node::VersionInfo;
13+
use crate::components::subgraph::ProofOfIndexingVersion;
1314
use crate::components::subgraph::SubgraphVersionSwitchingMode;
1415
use crate::components::transaction_receipt;
1516
use crate::components::versions::ApiVersion;
@@ -741,6 +742,28 @@ pub trait QueryStore: Send + Sync {
741742
fn deployment_id(&self) -> DeploymentId;
742743
}
743744

745+
/// A single POI digest entry from the `poi2$` table, representing the
746+
/// accumulated digest for a causality region over a block range.
747+
#[derive(Clone, Debug)]
748+
pub struct PoiDigestEntry {
749+
/// The causality region identifier (the entity id in poi2$)
750+
pub id: Id,
751+
/// The accumulated digest bytes
752+
pub digest: Vec<u8>,
753+
/// Start of the block range (inclusive)
754+
pub start_block: BlockNumber,
755+
/// End of the block range (exclusive, i32::MAX if open-ended)
756+
pub end_block: BlockNumber,
757+
}
758+
759+
/// The full POI digest history for a deployment, containing all digest
760+
/// entries and the POI version needed to compute proofs.
761+
#[derive(Clone, Debug)]
762+
pub struct PoiDigestHistory {
763+
pub entries: Vec<PoiDigestEntry>,
764+
pub poi_version: ProofOfIndexingVersion,
765+
}
766+
744767
/// A view of the store that can provide information about the indexing status
745768
/// of any subgraph and any deployment
746769
#[async_trait]
@@ -790,6 +813,19 @@ pub trait StatusStore: Send + Sync + 'static {
790813
block_number: BlockNumber,
791814
fetch_block_ptr: &dyn BlockPtrForNumber,
792815
) -> Result<Option<(PartialBlockPtr, [u8; 32])>, StoreError>;
816+
817+
/// Retrieve the full POI digest history for a deployment within a block
818+
/// range. Returns all `poi2$` entries whose block ranges overlap the
819+
/// given range, along with the deployment's `ProofOfIndexingVersion`.
820+
/// Returns `None` if the deployment doesn't exist or has no POI data.
821+
async fn get_poi_digest_history(
822+
&self,
823+
subgraph_id: &DeploymentHash,
824+
block_range: std::ops::Range<BlockNumber>,
825+
) -> Result<Option<PoiDigestHistory>, StoreError>;
826+
827+
/// Get the network for a deployment
828+
async fn network_for_deployment(&self, id: &DeploymentHash) -> Result<String, StoreError>;
793829
}
794830

795831
#[async_trait]

server/index-node/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,3 +11,4 @@ graph-graphql = { path = "../../graphql" }
1111
graph-chain-ethereum = { path = "../../chain/ethereum" }
1212
graph-chain-near = { path = "../../chain/near" }
1313
git-testament = "0.2.6"
14+
rayon = "1"

server/index-node/src/resolver.rs

Lines changed: 232 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
1-
use std::collections::BTreeMap;
1+
use std::collections::{BTreeMap, HashMap};
22

33
use async_trait::async_trait;
4+
use graph::components::subgraph::ProofOfIndexingFinisher;
45
use graph::data::query::Trace;
56
use graph::data::store::Id;
67
use graph::prelude::alloy::primitives::Address;
@@ -454,6 +455,235 @@ where
454455
Ok(r::Value::List(public_poi_results))
455456
}
456457

458+
async fn resolve_block_for_poi(
459+
&self,
460+
field: &a::Field,
461+
) -> Result<r::Value, QueryExecutionError> {
462+
const CHUNK_SIZE: i32 = 1_000_000;
463+
464+
let deployment_id = field
465+
.get_required::<DeploymentHash>("subgraph")
466+
.expect("Valid subgraph required");
467+
let target_poi_hash = field
468+
.get_required::<BlockHash>("targetPoi")
469+
.expect("Valid targetPoi required");
470+
let start_block = field
471+
.get_required::<BlockNumber>("startBlock")
472+
.expect("Valid startBlock required");
473+
let end_block = field
474+
.get_required::<BlockNumber>("endBlock")
475+
.expect("Valid endBlock required");
476+
477+
let indexer = field
478+
.get_optional::<Address>("indexer")
479+
.expect("Invalid indexer");
480+
481+
if end_block <= start_block {
482+
return Ok(r::Value::Null);
483+
}
484+
485+
let target_bytes: [u8; 32] = match target_poi_hash.as_slice().try_into() {
486+
Ok(bytes) => bytes,
487+
Err(_) => {
488+
error!(
489+
self.logger,
490+
"Invalid targetPoi: expected 32 bytes";
491+
"got_bytes" => target_poi_hash.as_slice().len()
492+
);
493+
return Ok(r::Value::Null);
494+
}
495+
};
496+
497+
// Resolve the network for this deployment
498+
let network = match self.store.network_for_deployment(&deployment_id).await {
499+
Ok(n) => n,
500+
Err(e) => {
501+
error!(
502+
self.logger,
503+
"Failed to resolve network for deployment";
504+
"subgraph" => &deployment_id,
505+
"error" => format!("{:?}", e)
506+
);
507+
return Ok(r::Value::Null);
508+
}
509+
};
510+
511+
// Fetch the full digest history for the block range
512+
let history = match self
513+
.store
514+
.get_poi_digest_history(&deployment_id, start_block..end_block)
515+
.await
516+
{
517+
Ok(Some(h)) => h,
518+
Ok(None) => return Ok(r::Value::Null),
519+
Err(e) => {
520+
error!(
521+
self.logger,
522+
"Failed to fetch POI digest history";
523+
"subgraph" => &deployment_id,
524+
"error" => format!("{:?}", e)
525+
);
526+
return Ok(r::Value::Null);
527+
}
528+
};
529+
530+
let poi_version = history.poi_version;
531+
532+
// Build a lookup structure: for each causality region id, a sorted
533+
// vec of (start_block, end_block, digest) for binary search.
534+
let mut region_entries: HashMap<Id, Vec<(BlockNumber, BlockNumber, Vec<u8>)>> =
535+
HashMap::new();
536+
for entry in history.entries {
537+
region_entries.entry(entry.id).or_default().push((
538+
entry.start_block,
539+
entry.end_block,
540+
entry.digest,
541+
));
542+
}
543+
for entries in region_entries.values_mut() {
544+
entries.sort_by_key(|(start, _, _)| *start);
545+
}
546+
547+
// Share across rayon threads
548+
let region_entries = Arc::new(region_entries);
549+
550+
let chain_store = match self.store.block_store().chain_store(&network).await {
551+
Some(cs) => cs,
552+
None => {
553+
error!(
554+
self.logger,
555+
"Chain store not found for network";
556+
"network" => &network
557+
);
558+
return Ok(r::Value::Null);
559+
}
560+
};
561+
562+
// Search backwards from end_block (the match is likely near the top).
563+
// Pipeline: fetch the next chunk while computing POIs for the current one.
564+
let mut chunk_end = end_block;
565+
let chunk_start = std::cmp::max(chunk_end - CHUNK_SIZE, start_block);
566+
567+
// Fetch first chunk
568+
let block_numbers: Vec<BlockNumber> = (chunk_start..chunk_end).collect();
569+
let mut current_ptrs = match chain_store
570+
.cheap_clone()
571+
.block_ptrs_by_numbers(block_numbers)
572+
.await
573+
{
574+
Ok(ptrs) => ptrs,
575+
Err(e) => {
576+
error!(
577+
self.logger,
578+
"Failed to fetch block hashes";
579+
"range" => format!("{}..{}", chunk_start, chunk_end),
580+
"error" => format!("{:?}", e)
581+
);
582+
return Ok(r::Value::Null);
583+
}
584+
};
585+
chunk_end = chunk_start;
586+
587+
loop {
588+
// Start prefetching the next chunk while we process the current one
589+
let next_chunk_end = chunk_end;
590+
let next_chunk_start = std::cmp::max(next_chunk_end - CHUNK_SIZE, start_block);
591+
let prefetch = if next_chunk_start < next_chunk_end {
592+
let cs = chain_store.cheap_clone();
593+
let numbers: Vec<BlockNumber> = (next_chunk_start..next_chunk_end).collect();
594+
Some(tokio::spawn(async move {
595+
cs.block_ptrs_by_numbers(numbers).await
596+
}))
597+
} else {
598+
None
599+
};
600+
601+
// Collect blocks with unambiguous hashes for parallel search
602+
let blocks_to_check: Vec<(BlockNumber, BlockHash)> = current_ptrs
603+
.iter()
604+
.filter_map(|(num, ptrs)| {
605+
if ptrs.len() == 1 {
606+
Some((*num, ptrs[0].hash.clone()))
607+
} else {
608+
None
609+
}
610+
})
611+
.collect();
612+
613+
// Parallel POI computation across all cores via rayon
614+
let re = region_entries.clone();
615+
let did = deployment_id.clone();
616+
let result = graph::spawn_blocking_allow_panic(move || {
617+
use rayon::prelude::*;
618+
blocks_to_check
619+
.par_iter()
620+
.find_map_any(|(block_num, block_hash)| {
621+
let block_ptr = BlockPtr::new(block_hash.clone(), *block_num);
622+
let mut finisher =
623+
ProofOfIndexingFinisher::new(&block_ptr, &did, &indexer, poi_version);
624+
625+
for (region_id, entries) in re.as_ref() {
626+
let idx = entries.partition_point(|(start, _, _)| *start <= *block_num);
627+
if idx == 0 {
628+
continue;
629+
}
630+
let (start, end, ref digest) = entries[idx - 1];
631+
if *block_num >= start && *block_num < end {
632+
finisher.add_causality_region(region_id, digest);
633+
}
634+
}
635+
636+
let computed = finisher.finish();
637+
if computed == target_bytes {
638+
Some((*block_num, block_hash.clone(), computed))
639+
} else {
640+
None
641+
}
642+
})
643+
})
644+
.await
645+
.map_err(|e| QueryExecutionError::Panic(e.to_string()))?;
646+
647+
if let Some((block_num, block_hash, computed_poi)) = result {
648+
// Found it - abort any in-flight prefetch
649+
if let Some(handle) = prefetch {
650+
handle.abort();
651+
}
652+
return Ok(object! {
653+
__typename: "PoiSearchResult",
654+
deployment: deployment_id.to_string(),
655+
block: object! {
656+
hash: block_hash.hash_hex(),
657+
number: block_num,
658+
},
659+
proofOfIndexing: format!("0x{}", hex::encode(computed_poi)),
660+
});
661+
}
662+
663+
// Move to the next chunk
664+
match prefetch {
665+
Some(handle) => {
666+
current_ptrs = handle
667+
.await
668+
.map_err(|e| QueryExecutionError::Panic(e.to_string()))?
669+
.map_err(|e| {
670+
error!(
671+
self.logger,
672+
"Failed to fetch block hashes";
673+
"range" => format!("{}..{}", next_chunk_start, next_chunk_end),
674+
"error" => format!("{:?}", e)
675+
);
676+
QueryExecutionError::StoreError(e.into())
677+
})?;
678+
chunk_end = next_chunk_start;
679+
}
680+
None => break,
681+
}
682+
}
683+
684+
Ok(r::Value::Null)
685+
}
686+
457687
async fn resolve_indexing_status_for_version(
458688
&self,
459689
field: &a::Field,
@@ -858,6 +1088,7 @@ where
8581088
// The top-level `subgraphVersions` field
8591089
(None, "apiVersions") => self.resolve_api_versions(field),
8601090
(None, "version") => self.version(),
1091+
(None, "blockForPoi") => self.resolve_block_for_poi(field).await,
8611092

8621093
// Resolve fields of `Object` values (e.g. the `latestBlock` field of `EthereumBlock`)
8631094
(value, _) => Ok(value.unwrap_or(r::Value::Null)),

server/index-node/src/schema.graphql

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,18 @@ type Query {
4646
blockHash: Bytes!
4747
): [CachedEthereumCall!]
4848
apiVersions(subgraphId: String!): [ApiVersion!]!
49+
"""
50+
Find the block number that produced a given proof of indexing.
51+
Used for dispute investigation to verify which block an indexer
52+
actually synced to when they submitted a POI.
53+
"""
54+
blockForPoi(
55+
subgraph: String!
56+
targetPoi: Bytes!
57+
startBlock: Int!
58+
endBlock: Int!
59+
indexer: Bytes
60+
): PoiSearchResult
4961
}
5062

5163
type Version {
@@ -203,6 +215,12 @@ type ProofOfIndexingResult {
203215
proofOfIndexing: Bytes
204216
}
205217

218+
type PoiSearchResult {
219+
block: Block!
220+
deployment: String!
221+
proofOfIndexing: Bytes!
222+
}
223+
206224
type ApiVersion {
207225
"""
208226
Version number in SemVer format

0 commit comments

Comments
 (0)