Skip to content

Commit 2268792

Browse files
committed
wip: horizon migration exploration
Signed-off-by: Joseph Livesey <[email protected]>
1 parent 3f409ab commit 2268792

File tree

19 files changed

+330
-180
lines changed

19 files changed

+330
-180
lines changed

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,7 @@ sqlx = { version = "0.8.2", features = [
8383
], default-features = false }
8484
stdext = "0.3.3"
8585
tap_aggregator = { version = "0.5.6", default-features = false }
86-
tap_core = { version = "4.1.3", default-features = false }
86+
tap_core = { version = "4.1.4", default-features = false }
8787
tap_graph = { version = "0.3.4", features = ["v2"] }
8888
tempfile = "3.8.0"
8989
test-log = { version = "0.2.12", default-features = false }

crates/profiler/src/lib.rs

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,17 @@
11
// Copyright 2023-, Edge & Node, GraphOps, and Semiotic Labs.
22
// SPDX-License-Identifier: Apache-2.0
33

4-
use std::fs::{self, File};
5-
use std::io::Write;
6-
use std::path::{Path, PathBuf};
7-
use std::sync::atomic::{AtomicU64, Ordering};
8-
use std::sync::Arc;
9-
use std::thread;
10-
use std::time::{Duration, SystemTime};
4+
use std::{
5+
fs::{self, File},
6+
io::Write,
7+
path::{Path, PathBuf},
8+
sync::{
9+
atomic::{AtomicU64, Ordering},
10+
Arc,
11+
},
12+
thread,
13+
time::{Duration, SystemTime},
14+
};
1115

1216
use chrono::{DateTime, Utc};
1317
use pprof::protos::Message;

crates/service/src/tap/checks/allocation_eligible.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,7 @@ use std::collections::HashMap;
66
use anyhow::anyhow;
77
use indexer_allocation::Allocation;
88
use tap_core::receipt::checks::{Check, CheckError, CheckResult};
9-
use thegraph_core::alloy::primitives::Address;
10-
use thegraph_core::CollectionId;
9+
use thegraph_core::{alloy::primitives::Address, CollectionId};
1110
use tokio::sync::watch::Receiver;
1211

1312
use crate::tap::{CheckingReceipt, TapReceipt};
@@ -32,7 +31,9 @@ impl Check<TapReceipt> for AllocationEligible {
3231
) -> CheckResult {
3332
let allocation_id = match receipt.signed_receipt() {
3433
TapReceipt::V1(receipt) => receipt.message.allocation_id,
35-
TapReceipt::V2(receipt) => CollectionId::from(receipt.message.collection_id).as_address(),
34+
TapReceipt::V2(receipt) => {
35+
CollectionId::from(receipt.message.collection_id).as_address()
36+
}
3637
};
3738
if !self
3839
.indexer_allocations

crates/service/src/tap/receipt_store.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,10 @@ use itertools::{Either, Itertools};
77
use sqlx::{types::BigDecimal, PgPool};
88
use tap_core::{manager::adapters::ReceiptStore, receipt::WithValueAndTimestamp};
99
use thegraph_core::alloy::{hex::ToHexExt, sol_types::Eip712Domain};
10-
use tokio::{sync::mpsc::Receiver, sync::oneshot::Sender as OneShotSender, task::JoinHandle};
10+
use tokio::{
11+
sync::{mpsc::Receiver, oneshot::Sender as OneShotSender},
12+
task::JoinHandle,
13+
};
1114
use tokio_util::sync::CancellationToken;
1215

1316
use super::{AdapterError, CheckingReceipt, IndexerTapContext, TapReceipt};

crates/tap-agent/src/agent/sender_account.rs

Lines changed: 56 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -25,12 +25,14 @@ use tap_aggregator::grpc::{
2525
v1::tap_aggregator_client::TapAggregatorClient as AggregatorV1,
2626
v2::tap_aggregator_client::TapAggregatorClient as AggregatorV2,
2727
};
28-
use thegraph_core::alloy::{
29-
hex::ToHexExt,
30-
primitives::{Address, U256},
31-
sol_types::Eip712Domain,
28+
use thegraph_core::{
29+
alloy::{
30+
hex::ToHexExt,
31+
primitives::{Address, U256},
32+
sol_types::Eip712Domain,
33+
},
34+
AllocationId as AllocationIdCore, CollectionId,
3235
};
33-
use thegraph_core::{AllocationId as AllocationIdCore, CollectionId};
3436
use tokio::{sync::watch::Receiver, task::JoinHandle};
3537
use tonic::transport::{Channel, Endpoint};
3638
use tracing::Level;
@@ -145,7 +147,8 @@ impl From<tap_graph::SignedRav> for RavInformation {
145147
impl From<&tap_graph::v2::SignedRav> for RavInformation {
146148
fn from(value: &tap_graph::v2::SignedRav) -> Self {
147149
RavInformation {
148-
allocation_id: AllocationIdCore::from(CollectionId::from(value.message.collectionId)).into_inner(),
150+
allocation_id: AllocationIdCore::from(CollectionId::from(value.message.collectionId))
151+
.into_inner(),
149152
value_aggregate: value.message.valueAggregate,
150153
}
151154
}
@@ -214,9 +217,9 @@ pub enum SenderAccountMessage {
214217
/// as well as requesting the underlaying allocation rav request
215218
///
216219
/// Custom behavior is defined in [ReceiptFees]
217-
UpdateReceiptFees(Address, ReceiptFees),
220+
UpdateReceiptFees(AllocationId, ReceiptFees),
218221
/// Updates the counter for invalid receipts and verify to deny sender
219-
UpdateInvalidReceiptFees(Address, UnaggregatedReceipts),
222+
UpdateInvalidReceiptFees(AllocationId, UnaggregatedReceipts),
220223
/// Update rav tracker
221224
UpdateRav(RavInformation),
222225
#[cfg(test)]
@@ -475,7 +478,7 @@ impl State {
475478
.build();
476479

477480
SenderAllocation::<Horizon>::spawn_linked(
478-
Some(self.format_sender_allocation(&id)),
481+
Some(self.format_sender_allocation(&id.as_address())),
479482
SenderAllocation::default(),
480483
args,
481484
sender_account_ref.get_cell(),
@@ -1061,7 +1064,7 @@ impl Actor for SenderAccount {
10611064

10621065
state
10631066
.invalid_receipts_tracker
1064-
.update(allocation_id, unaggregated_fees.value);
1067+
.update(allocation_id.address(), unaggregated_fees.value);
10651068

10661069
// invalid receipts can't go down
10671070
let should_deny = !state.denied && state.deny_condition_reached();
@@ -1098,7 +1101,7 @@ impl Actor for SenderAccount {
10981101
// add new value
10991102
state
11001103
.sender_fee_tracker
1101-
.add(allocation_id, value, timestamp_ns);
1104+
.add(allocation_id.address(), value, timestamp_ns);
11021105

11031106
SENDER_FEE_TRACKER
11041107
.with_label_values(&[&state.sender.to_string()])
@@ -1111,16 +1114,16 @@ impl Actor for SenderAccount {
11111114
.set(
11121115
state
11131116
.sender_fee_tracker
1114-
.get_total_fee_for_allocation(&allocation_id)
1117+
.get_total_fee_for_allocation(&allocation_id.address())
11151118
.map(|fee| fee.value)
11161119
.unwrap_or_default() as f64,
11171120
);
11181121
}
11191122
ReceiptFees::RavRequestResponse(fees, rav_result) => {
1120-
state.finalize_rav_request(allocation_id, (fees, rav_result));
1123+
state.finalize_rav_request(allocation_id.address(), (fees, rav_result));
11211124
}
11221125
ReceiptFees::UpdateValue(unaggregated_fees) => {
1123-
state.update_sender_fee(allocation_id, unaggregated_fees);
1126+
state.update_sender_fee(allocation_id.address(), unaggregated_fees);
11241127
}
11251128
ReceiptFees::Retry => {}
11261129
}
@@ -1138,8 +1141,10 @@ impl Actor for SenderAccount {
11381141
let total_fee_outside_buffer = state.sender_fee_tracker.get_ravable_total_fee();
11391142
let total_counter_for_allocation = state
11401143
.sender_fee_tracker
1141-
.get_count_outside_buffer_for_allocation(&allocation_id);
1142-
let can_trigger_rav = state.sender_fee_tracker.can_trigger_rav(allocation_id);
1144+
.get_count_outside_buffer_for_allocation(&allocation_id.address());
1145+
let can_trigger_rav = state
1146+
.sender_fee_tracker
1147+
.can_trigger_rav(allocation_id.address());
11431148
let counter_greater_receipt_limit = total_counter_for_allocation
11441149
>= state.config.rav_request_receipt_limit
11451150
&& can_trigger_rav;
@@ -1159,7 +1164,9 @@ impl Actor for SenderAccount {
11591164
%allocation_id,
11601165
"Total counter greater than the receipt limit per rav. Triggering RAV request"
11611166
);
1162-
state.rav_request_for_allocation(allocation_id).await
1167+
state
1168+
.rav_request_for_allocation(allocation_id.address())
1169+
.await
11631170
} else {
11641171
Ok(())
11651172
};
@@ -1368,7 +1375,7 @@ impl Actor for SenderAccount {
13681375

13691376
// check for deny conditions
13701377
let _ = myself.cast(SenderAccountMessage::UpdateReceiptFees(
1371-
allocation_id,
1378+
AllocationId::Legacy(AllocationIdCore::from(allocation_id)),
13721379
ReceiptFees::Retry,
13731380
));
13741381

@@ -1472,7 +1479,10 @@ pub mod tests {
14721479
flush_messages, pgpool, ALLOCATION_ID_0, ALLOCATION_ID_1, TAP_SENDER as SENDER,
14731480
TAP_SIGNER as SIGNER,
14741481
};
1475-
use thegraph_core::alloy::{hex::ToHexExt, primitives::U256};
1482+
use thegraph_core::{
1483+
alloy::{hex::ToHexExt, primitives::U256},
1484+
AllocationId as AllocationIdCore,
1485+
};
14761486
use tokio::sync::mpsc;
14771487
use wiremock::{
14781488
matchers::{body_string_contains, method},
@@ -1566,7 +1576,9 @@ pub mod tests {
15661576
.call()
15671577
.await;
15681578

1569-
let allocation_ids = HashSet::from_iter([AllocationId::Legacy(ALLOCATION_ID_0)]);
1579+
let allocation_ids = HashSet::from_iter([AllocationId::Legacy(AllocationIdCore::from(
1580+
ALLOCATION_ID_0,
1581+
))]);
15701582
// we expect it to create a sender allocation
15711583
sender_account
15721584
.cast(SenderAccountMessage::UpdateAllocationIds(
@@ -1660,7 +1672,7 @@ pub mod tests {
16601672
// we expect it to create a sender allocation
16611673
sender_account
16621674
.cast(SenderAccountMessage::NewAllocationId(AllocationId::Legacy(
1663-
ALLOCATION_ID_0,
1675+
AllocationIdCore::from(ALLOCATION_ID_0),
16641676
)))
16651677
.unwrap();
16661678

@@ -1674,9 +1686,11 @@ pub mod tests {
16741686
// nothing should change because we already created
16751687
sender_account
16761688
.cast(SenderAccountMessage::UpdateAllocationIds(
1677-
vec![AllocationId::Legacy(ALLOCATION_ID_0)]
1678-
.into_iter()
1679-
.collect(),
1689+
vec![AllocationId::Legacy(AllocationIdCore::from(
1690+
ALLOCATION_ID_0,
1691+
))]
1692+
.into_iter()
1693+
.collect(),
16801694
))
16811695
.unwrap();
16821696

@@ -1754,7 +1768,7 @@ pub mod tests {
17541768
basic_sender_account
17551769
.sender_account
17561770
.cast(SenderAccountMessage::UpdateReceiptFees(
1757-
ALLOCATION_ID_0,
1771+
AllocationId::Legacy(AllocationIdCore::from(ALLOCATION_ID_0)),
17581772
ReceiptFees::NewReceipt(TRIGGER_VALUE - 1, get_current_timestamp_u64_ns()),
17591773
))
17601774
.unwrap();
@@ -1782,7 +1796,7 @@ pub mod tests {
17821796
basic_sender_account
17831797
.sender_account
17841798
.cast(SenderAccountMessage::UpdateReceiptFees(
1785-
ALLOCATION_ID_0,
1799+
AllocationId::Legacy(AllocationIdCore::from(ALLOCATION_ID_0)),
17861800
ReceiptFees::NewReceipt(TRIGGER_VALUE, get_current_timestamp_u64_ns()),
17871801
))
17881802
.unwrap();
@@ -1796,7 +1810,7 @@ pub mod tests {
17961810
basic_sender_account
17971811
.sender_account
17981812
.cast(SenderAccountMessage::UpdateReceiptFees(
1799-
ALLOCATION_ID_0,
1813+
AllocationId::Legacy(AllocationIdCore::from(ALLOCATION_ID_0)),
18001814
ReceiptFees::Retry,
18011815
))
18021816
.unwrap();
@@ -1825,7 +1839,7 @@ pub mod tests {
18251839

18261840
sender_account
18271841
.cast(SenderAccountMessage::UpdateReceiptFees(
1828-
ALLOCATION_ID_0,
1842+
AllocationId::Legacy(AllocationIdCore::from(ALLOCATION_ID_0)),
18291843
ReceiptFees::NewReceipt(1, get_current_timestamp_u64_ns()),
18301844
))
18311845
.unwrap();
@@ -1835,7 +1849,7 @@ pub mod tests {
18351849

18361850
sender_account
18371851
.cast(SenderAccountMessage::UpdateReceiptFees(
1838-
ALLOCATION_ID_0,
1852+
AllocationId::Legacy(AllocationIdCore::from(ALLOCATION_ID_0)),
18391853
ReceiptFees::NewReceipt(1, get_current_timestamp_u64_ns()),
18401854
))
18411855
.unwrap();
@@ -1846,7 +1860,7 @@ pub mod tests {
18461860

18471861
sender_account
18481862
.cast(SenderAccountMessage::UpdateReceiptFees(
1849-
ALLOCATION_ID_0,
1863+
AllocationId::Legacy(AllocationIdCore::from(ALLOCATION_ID_0)),
18501864
ReceiptFees::Retry,
18511865
))
18521866
.unwrap();
@@ -1865,9 +1879,11 @@ pub mod tests {
18651879
let (sender_account, _, prefix, _) = create_sender_account()
18661880
.pgpool(pgpool)
18671881
.initial_allocation(
1868-
vec![AllocationId::Legacy(ALLOCATION_ID_0)]
1869-
.into_iter()
1870-
.collect(),
1882+
vec![AllocationId::Legacy(AllocationIdCore::from(
1883+
ALLOCATION_ID_0,
1884+
))]
1885+
.into_iter()
1886+
.collect(),
18711887
)
18721888
.escrow_subgraph_endpoint(&mock_escrow_subgraph.uri())
18731889
.call()
@@ -1952,7 +1968,7 @@ pub mod tests {
19521968

19531969
sender_account
19541970
.cast(SenderAccountMessage::UpdateReceiptFees(
1955-
ALLOCATION_ID_0,
1971+
AllocationId::Legacy(AllocationIdCore::from(ALLOCATION_ID_0)),
19561972
ReceiptFees::NewReceipt(TRIGGER_VALUE, get_current_timestamp_u64_ns()),
19571973
))
19581974
.unwrap();
@@ -1988,7 +2004,7 @@ pub mod tests {
19882004
($value:expr) => {
19892005
sender_account
19902006
.cast(SenderAccountMessage::UpdateReceiptFees(
1991-
ALLOCATION_ID_0,
2007+
AllocationId::Legacy(AllocationIdCore::from(ALLOCATION_ID_0)),
19922008
ReceiptFees::UpdateValue(UnaggregatedReceipts {
19932009
value: $value,
19942010
last_id: 11,
@@ -2005,7 +2021,7 @@ pub mod tests {
20052021
($value:expr) => {
20062022
sender_account
20072023
.cast(SenderAccountMessage::UpdateInvalidReceiptFees(
2008-
ALLOCATION_ID_0,
2024+
AllocationId::Legacy(AllocationIdCore::from(ALLOCATION_ID_0)),
20092025
UnaggregatedReceipts {
20102026
value: $value,
20112027
last_id: 11,
@@ -2141,7 +2157,7 @@ pub mod tests {
21412157
($value:expr) => {
21422158
sender_account
21432159
.cast(SenderAccountMessage::UpdateReceiptFees(
2144-
ALLOCATION_ID_0,
2160+
AllocationId::Legacy(AllocationIdCore::from(ALLOCATION_ID_0)),
21452161
ReceiptFees::UpdateValue(UnaggregatedReceipts {
21462162
value: $value,
21472163
last_id: 11,
@@ -2431,17 +2447,17 @@ pub mod tests {
24312447
// set retry
24322448
sender_account
24332449
.cast(SenderAccountMessage::UpdateReceiptFees(
2434-
ALLOCATION_ID_0,
2450+
AllocationId::Legacy(AllocationIdCore::from(ALLOCATION_ID_0)),
24352451
ReceiptFees::NewReceipt(TRIGGER_VALUE, get_current_timestamp_u64_ns()),
24362452
))
24372453
.unwrap();
24382454
let msg = msg_receiver.recv().await.expect("Channel failed");
24392455
assert!(matches!(
24402456
msg,
24412457
SenderAccountMessage::UpdateReceiptFees(
2442-
ALLOCATION_ID_0,
2458+
AllocationId::Legacy(allocation_id),
24432459
ReceiptFees::NewReceipt(TRIGGER_VALUE, _)
2444-
)
2460+
) if allocation_id == AllocationIdCore::from(ALLOCATION_ID_0)
24452461
));
24462462

24472463
let deny = call!(sender_account, SenderAccountMessage::GetDeny).unwrap();

0 commit comments

Comments
 (0)