Skip to content

Commit b3044af

Browse files
committed
feat(core, graph): add Amp subgraph metrics
1 parent 3a28f67 commit b3044af

File tree

12 files changed

+381
-26
lines changed

12 files changed

+381
-26
lines changed

Cargo.lock

Lines changed: 11 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -100,13 +100,16 @@ substreams-entity-change = "2"
100100
substreams-near-core = "=0.10.2"
101101
rand = { version = "0.9.1", features = ["os_rng"] }
102102

103+
prometheus = "0.13.4"
104+
103105
# Dependencies related to Amp subgraphs
104106
ahash = "0.8.11"
105107
alloy = { version = "1.0.12", default-features = false, features = ["json-abi", "serde"] }
106108
arrow = { version = "=55.0.0" }
107109
arrow-flight = { version = "=55.0.0", features = ["flight-sql-experimental"] }
108110
futures = "0.3.31"
109111
half = "2.7.1"
112+
indoc = "2.0.7"
110113
lazy-regex = "3.4.1"
111114
parking_lot = "0.12.4"
112115
sqlparser-latest = { version = "0.57.0", package = "sqlparser", features = ["visitor"] }

core/Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,10 @@ alloy.workspace = true
2424
arrow.workspace = true
2525
chrono.workspace = true
2626
futures.workspace = true
27+
indoc.workspace = true
2728
itertools.workspace = true
2829
parking_lot.workspace = true
30+
prometheus.workspace = true
2931
slog.workspace = true
3032
tokio-util.workspace = true
3133
tokio.workspace = true

core/src/amp_subgraph/metrics.rs

Lines changed: 230 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,26 @@
1-
use std::sync::Arc;
1+
use std::{sync::Arc, time::Duration};
22

3+
use alloy::primitives::BlockNumber;
34
use graph::{
45
cheap_clone::CheapClone,
56
components::{
67
metrics::{stopwatch::StopwatchMetrics, MetricsRegistry},
78
store::WritableStore,
89
},
9-
data::subgraph::DeploymentHash,
10+
prelude::DeploymentHash,
1011
};
12+
use indoc::indoc;
13+
use prometheus::{IntCounter, IntGauge};
1114
use slog::Logger;
1215

13-
/// Contains deployment specific metrics.
16+
/// Contains metrics specific to a deployment.
1417
pub(super) struct Metrics {
18+
pub(super) deployment_status: DeploymentStatus,
19+
pub(super) deployment_head: DeploymentHead,
20+
pub(super) deployment_target: DeploymentTarget,
21+
pub(super) deployment_synced: DeploymentSynced,
22+
pub(super) indexing_duration: IndexingDuration,
23+
pub(super) blocks_processed: BlocksProcessed,
1524
pub(super) stopwatch: StopwatchMetrics,
1625
}
1726

@@ -25,12 +34,227 @@ impl Metrics {
2534
) -> Self {
2635
let stopwatch = StopwatchMetrics::new(
2736
logger.cheap_clone(),
28-
deployment,
37+
deployment.cheap_clone(),
2938
"amp-process",
30-
metrics_registry,
39+
metrics_registry.cheap_clone(),
3140
store.shard().to_string(),
3241
);
3342

34-
Self { stopwatch }
43+
let const_labels = [("deployment", &deployment)];
44+
45+
Self {
46+
deployment_status: DeploymentStatus::new(&metrics_registry, const_labels.clone()),
47+
deployment_head: DeploymentHead::new(&metrics_registry, const_labels.clone()),
48+
deployment_target: DeploymentTarget::new(&metrics_registry, const_labels.clone()),
49+
deployment_synced: DeploymentSynced::new(&metrics_registry, const_labels.clone()),
50+
indexing_duration: IndexingDuration::new(&metrics_registry, const_labels.clone()),
51+
blocks_processed: BlocksProcessed::new(&metrics_registry, const_labels.clone()),
52+
stopwatch,
53+
}
54+
}
55+
}
56+
57+
/// Reports the current indexing status of a deployment.
58+
pub(super) struct DeploymentStatus(IntGauge);
59+
60+
impl DeploymentStatus {
61+
const STATUS_STARTING: i64 = 1;
62+
const STATUS_RUNNING: i64 = 2;
63+
const STATUS_STOPPED: i64 = 3;
64+
const STATUS_FAILED: i64 = 4;
65+
66+
fn new(
67+
metrics_registry: &MetricsRegistry,
68+
const_labels: impl IntoIterator<Item = (impl ToString, impl ToString)>,
69+
) -> Self {
70+
let int_gauge = metrics_registry
71+
.new_int_gauge(
72+
"amp_deployment_status",
73+
indoc!(
74+
"
75+
Indicates the current indexing status of a deployment.
76+
Possible values:
77+
1 - graph-node is preparing to start indexing;
78+
2 - deployment is being indexed;
79+
3 - indexing is stopped by request;
80+
4 - indexing failed;
81+
"
82+
),
83+
const_labels,
84+
)
85+
.expect("failed to register `amp_deployment_status` gauge");
86+
87+
Self(int_gauge)
88+
}
89+
90+
/// Records that the graph-node is preparing to start indexing.
91+
pub fn starting(&self) {
92+
self.0.set(Self::STATUS_STARTING);
93+
}
94+
95+
/// Records that the deployment is being indexed.
96+
pub fn running(&self) {
97+
self.0.set(Self::STATUS_RUNNING);
98+
}
99+
100+
/// Records that the indexing stopped by request.
101+
pub fn stopped(&self) {
102+
self.0.set(Self::STATUS_STOPPED);
103+
}
104+
105+
/// Records that the indexing failed.
106+
pub fn failed(&self) {
107+
self.0.set(Self::STATUS_FAILED);
108+
}
109+
}
110+
111+
/// Tracks the most recent block number processed by a deployment.
112+
pub(super) struct DeploymentHead(IntGauge);
113+
114+
impl DeploymentHead {
115+
fn new(
116+
metrics_registry: &MetricsRegistry,
117+
const_labels: impl IntoIterator<Item = (impl ToString, impl ToString)>,
118+
) -> Self {
119+
let int_gauge = metrics_registry
120+
.new_int_gauge(
121+
"amp_deployment_head",
122+
"Tracks the most recent block number processed by a deployment",
123+
const_labels,
124+
)
125+
.expect("failed to register `amp_deployment_head` gauge");
126+
127+
Self(int_gauge)
128+
}
129+
130+
/// Updates the most recent block number processed by this deployment.
131+
pub(super) fn update(&self, new_most_recent_block_number: BlockNumber) {
132+
self.0.set(
133+
i64::try_from(new_most_recent_block_number)
134+
.expect("new most recent block number does not fit into `i64`"),
135+
);
136+
}
137+
}
138+
139+
/// Tracks the target block number of a deployment.
140+
pub(super) struct DeploymentTarget(IntGauge);
141+
142+
impl DeploymentTarget {
143+
fn new(
144+
metrics_registry: &MetricsRegistry,
145+
const_labels: impl IntoIterator<Item = (impl ToString, impl ToString)>,
146+
) -> Self {
147+
let int_gauge = metrics_registry
148+
.new_int_gauge(
149+
"amp_deployment_target",
150+
"Tracks the target block number of a deployment",
151+
const_labels,
152+
)
153+
.expect("failed to register `amp_deployment_target` gauge");
154+
155+
Self(int_gauge)
156+
}
157+
158+
/// Updates the target block number of this deployment.
159+
pub(super) fn update(&self, new_target_block_number: BlockNumber) {
160+
self.0.set(
161+
i64::try_from(new_target_block_number)
162+
.expect("new target block number does not fit into `i64`"),
163+
);
164+
}
165+
}
166+
167+
/// Indicates whether a deployment has reached the chain head or the end block since it was deployed.
168+
pub(super) struct DeploymentSynced(IntGauge);
169+
170+
impl DeploymentSynced {
171+
const NOT_SYNCED: i64 = 0;
172+
const SYNCED: i64 = 1;
173+
174+
pub fn new(
175+
metrics_registry: &MetricsRegistry,
176+
const_labels: impl IntoIterator<Item = (impl ToString, impl ToString)>,
177+
) -> Self {
178+
let int_gauge = metrics_registry
179+
.new_int_gauge(
180+
"amp_deployment_synced",
181+
indoc!(
182+
"
183+
Indicates whether a deployment has reached the chain head or the end block since it was deployed.
184+
Possible values:
185+
0 - deployment is not synced;
186+
1 - deployment is synced;
187+
"
188+
),
189+
const_labels,
190+
)
191+
.expect("failed to register `amp_deployment_synced` gauge");
192+
193+
Self(int_gauge)
194+
}
195+
196+
/// Records the current sync status of this deployment.
197+
pub fn record(&self, synced: bool) {
198+
self.0.set(if synced {
199+
Self::SYNCED
200+
} else {
201+
Self::NOT_SYNCED
202+
});
203+
}
204+
}
205+
206+
/// Tracks the total duration in seconds of deployment indexing.
207+
#[derive(Clone)]
208+
pub(super) struct IndexingDuration(IntCounter);
209+
210+
impl IndexingDuration {
211+
fn new(
212+
metrics_registry: &MetricsRegistry,
213+
const_labels: impl IntoIterator<Item = (impl ToString, impl ToString)>,
214+
) -> Self {
215+
let int_counter = metrics_registry
216+
.new_int_counter(
217+
"amp_deployment_indexing_duration_seconds",
218+
"Tracks the total duration in seconds of deployment indexing",
219+
const_labels,
220+
)
221+
.expect("failed to register `amp_deployment_indexing_duration_seconds` counter");
222+
223+
Self(int_counter)
224+
}
225+
226+
/// Records a new indexing duration of this deployment.
227+
pub(super) fn record(&self, duration: Duration) {
228+
self.0.inc_by(duration.as_secs())
229+
}
230+
}
231+
232+
/// Tracks the total number of blocks processed by a deployment.
233+
pub(super) struct BlocksProcessed(IntCounter);
234+
235+
impl BlocksProcessed {
236+
fn new(
237+
metrics_registry: &MetricsRegistry,
238+
const_labels: impl IntoIterator<Item = (impl ToString, impl ToString)>,
239+
) -> Self {
240+
let int_counter = metrics_registry
241+
.new_int_counter(
242+
"amp_deployment_blocks_processed_count",
243+
"Tracks the total number of blocks processed by a deployment",
244+
const_labels,
245+
)
246+
.expect("failed to register `amp_deployment_blocks_processed_count` counter");
247+
248+
Self(int_counter)
249+
}
250+
251+
/// Records a new processed block.
252+
pub(super) fn record_one(&self) {
253+
self.record(1);
254+
}
255+
256+
/// Records the new processed blocks.
257+
pub(super) fn record(&self, number_of_blocks_processed: usize) {
258+
self.0.inc_by(number_of_blocks_processed as u64);
35259
}
36260
}

core/src/amp_subgraph/runner/context.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -93,4 +93,13 @@ impl<AC> Context<AC> {
9393
.min()
9494
.unwrap()
9595
}
96+
97+
pub(super) fn max_end_block(&self) -> BlockNumber {
98+
self.manifest
99+
.data_sources
100+
.iter()
101+
.map(|data_source| data_source.source.end_block)
102+
.max()
103+
.unwrap()
104+
}
96105
}

0 commit comments

Comments
 (0)