Skip to content

Commit 8302679

Browse files
committed
feat: adds fourfours strategy to glados-audit
1 parent 98984c0 commit 8302679

File tree

8 files changed

+207
-82
lines changed

8 files changed

+207
-82
lines changed

entity/src/content_audit.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,8 @@ pub enum SelectionStrategy {
3737
SelectOldestUnaudited = 3,
3838
/// Perform a single audit for a previously audited content key.
3939
SpecificContentKey = 4,
40+
/// Perform audits of random fourfours data.
41+
FourFours = 5,
4042
}
4143

4244
impl AuditResult {
@@ -157,6 +159,7 @@ impl SelectionStrategy {
157159
SelectionStrategy::Latest => "Latest".to_string(),
158160
SelectionStrategy::Random => "Random".to_string(),
159161
SelectionStrategy::Failed => "Failed".to_string(),
162+
SelectionStrategy::FourFours => "FourFours".to_string(),
160163
SelectionStrategy::SelectOldestUnaudited => "Select Oldest Unaudited".to_string(),
161164
SelectionStrategy::SpecificContentKey => "Specific Content Key".to_string(),
162165
}

glados-audit/Cargo.toml

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ clap = { version = "4.0.24", features = ["derive"] }
1515
entity = { path = "../entity" }
1616
env_logger = "0.9.3"
1717
ethereum-types = "0.14.0"
18+
web3 = "0.18.0"
1819
ethportal-api = "0.2.2"
1920
glados-core = { path = "../glados-core" }
2021
migration = { path = "../migration" }
@@ -24,5 +25,3 @@ serde_json = "1.0.95"
2425
tokio = "1.21.2"
2526
tracing = "0.1.37"
2627
url = "2.3.1"
27-
web3 = "0.18.0"
28-

glados-audit/src/cli.rs

Lines changed: 36 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,13 @@ const DEFAULT_STATS_PERIOD: &str = "300";
99
pub struct Args {
1010
#[arg(short, long, default_value = DEFAULT_DB_URL)]
1111
pub database_url: String,
12+
#[arg(
13+
short,
14+
long,
15+
default_value = "",
16+
help = "web3 api provider url, eg https://mainnet.infura.io/v3/..."
17+
)]
18+
pub provider_url: String,
1219
#[arg(short, long, default_value = "4", help = "number of auditing threads")]
1320
pub concurrency: u8,
1421
#[arg(short, long, action(ArgAction::Append), value_enum, default_value = None, help = "Specific strategy to use. Default is to use all available strategies. May be passed multiple times for multiple strategies (--strategy latest --strategy random). Duplicates are permitted (--strategy random --strategy random).")]
@@ -41,9 +48,15 @@ pub struct Args {
4148
help = "relative weight of the 'random' strategy"
4249
)]
4350
pub random_strategy_weight: u8,
51+
#[arg(
52+
long,
53+
default_value = "1",
54+
help = "relative weight of the 'four_fours' strategy"
55+
)]
56+
pub four_fours_strategy_weight: u8,
4457
#[arg(long, default_value = DEFAULT_STATS_PERIOD, help = "stats recording period (seconds)")]
4558
pub stats_recording_period: u64,
46-
#[arg(short, long, action(ArgAction::Append))]
59+
#[arg(long, action(ArgAction::Append))]
4760
pub portal_client: Vec<String>,
4861
#[command(subcommand)]
4962
pub subcommand: Option<Command>,
@@ -63,11 +76,13 @@ impl Default for Args {
6376
fn default() -> Self {
6477
Self {
6578
database_url: DEFAULT_DB_URL.to_string(),
79+
provider_url: "".to_string(),
6680
concurrency: 4,
6781
latest_strategy_weight: 1,
6882
failed_strategy_weight: 1,
6983
oldest_strategy_weight: 1,
7084
random_strategy_weight: 1,
85+
four_fours_strategy_weight: 1,
7186
strategy: None,
7287
portal_client: vec!["ipc:////tmp/trin-jsonrpc.ipc".to_owned()],
7388
subcommand: None,
@@ -162,6 +177,26 @@ mod test {
162177
};
163178
assert_eq!(result, expected);
164179
}
180+
181+
/// Tests that the provider_url is passed through properly.
182+
#[test]
183+
fn test_provider_url() {
184+
const PORTAL_CLIENT_STRING: &str = "ipc:////path/to/ipc";
185+
const PROVIDER_URL: &str = "https://example.io/key";
186+
let result = Args::parse_from([
187+
"test",
188+
"--provider-url",
189+
PROVIDER_URL,
190+
"--portal-client",
191+
PORTAL_CLIENT_STRING,
192+
]);
193+
let expected = Args {
194+
provider_url: PROVIDER_URL.to_string(),
195+
portal_client: vec![PORTAL_CLIENT_STRING.to_owned()],
196+
..Default::default()
197+
};
198+
assert_eq!(result, expected);
199+
}
165200
}
166201

167202
/// Used by a user to specify the intended form of transport

glados-audit/src/lib.rs

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,8 @@ pub(crate) mod validation;
3737
pub struct AuditConfig {
3838
/// For Glados-related data.
3939
pub database_url: String,
40+
/// For getting on-the-fly block information.
41+
pub provider_url: String,
4042
/// Specific strategies to run.
4143
pub strategies: Vec<SelectionStrategy>,
4244
/// Weight for each strategy.
@@ -65,6 +67,7 @@ impl AuditConfig {
6567
"Selected concurrency set."
6668
)
6769
}
70+
6871
let strategies = match args.strategy {
6972
Some(s) => s,
7073
None => {
@@ -73,6 +76,7 @@ impl AuditConfig {
7376
SelectionStrategy::Random,
7477
SelectionStrategy::Failed,
7578
SelectionStrategy::SelectOldestUnaudited,
79+
SelectionStrategy::FourFours,
7680
]
7781
}
7882
};
@@ -83,11 +87,16 @@ impl AuditConfig {
8387
SelectionStrategy::Random => args.random_strategy_weight,
8488
SelectionStrategy::Failed => args.failed_strategy_weight,
8589
SelectionStrategy::SelectOldestUnaudited => args.oldest_strategy_weight,
90+
SelectionStrategy::FourFours => args.four_fours_strategy_weight,
8691
SelectionStrategy::SpecificContentKey => 0,
8792
};
8893
weights.insert(strat.clone(), weight);
8994
}
90-
95+
if args.provider_url.is_empty() && strategies.contains(&SelectionStrategy::FourFours) {
96+
return Err(anyhow::anyhow!(
97+
"No provider URL provided, required when `four_fours` strategy is enabled."
98+
));
99+
}
91100
let mut portal_clients: Vec<PortalClient> = vec![];
92101
for client_url in args.portal_client {
93102
let client = PortalClient::from(client_url).await?;
@@ -96,6 +105,7 @@ impl AuditConfig {
96105
}
97106
Ok(AuditConfig {
98107
database_url: args.database_url,
108+
provider_url: args.provider_url,
99109
strategies,
100110
weights,
101111
concurrency: args.concurrency,
@@ -160,6 +170,7 @@ pub async fn run_glados_audit(conn: DatabaseConnection, config: AuditConfig) {
160170
strategy.clone(),
161171
tx,
162172
conn.clone(),
173+
config.clone(),
163174
));
164175
}
165176
// Collation of generated tasks, taken proportional to weights.

glados-audit/src/selection.rs

Lines changed: 51 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ use std::collections::HashSet;
22

33
use chrono::{DateTime, TimeZone, Utc};
44
use ethportal_api::HistoryContentKey;
5+
use glados_core::db::store_block_keys;
56
use rand::{thread_rng, Rng};
67
use sea_orm::{
78
ColumnTrait, DatabaseConnection, EntityTrait, PaginatorTrait, QueryFilter, QueryOrder,
@@ -17,17 +18,26 @@ use entity::{
1718
content::{self, Model},
1819
content_audit::{self, SelectionStrategy},
1920
};
21+
use web3::types::{BlockId, BlockNumber};
2022

21-
use crate::AuditTask;
23+
use crate::{AuditConfig, AuditTask};
24+
25+
pub const MERGE_BLOCK_HEIGHT: i32 = 15537393;
2226

2327
pub async fn start_audit_selection_task(
2428
strategy: SelectionStrategy,
2529
tx: mpsc::Sender<AuditTask>,
2630
conn: DatabaseConnection,
31+
config: AuditConfig,
2732
) {
2833
match strategy {
2934
SelectionStrategy::Latest => select_latest_content_for_audit(tx, conn).await,
3035
SelectionStrategy::Random => select_random_content_for_audit(tx, conn).await,
36+
SelectionStrategy::FourFours => {
37+
// Fourfours strategy downloads its own keys rather than waiting on glados-monitor to put them in the DB.
38+
let w3 = web3::Web3::new(web3::transports::Http::new(&config.provider_url).unwrap());
39+
select_fourfours_content_for_audit(tx, conn, w3).await
40+
}
3141
SelectionStrategy::Failed => warn!("Need to implement SelectionStrategy::Failed"),
3242
SelectionStrategy::SelectOldestUnaudited => {
3343
select_oldest_unaudited_content_for_audit(tx, conn).await
@@ -95,6 +105,46 @@ async fn select_latest_content_for_audit(
95105
}
96106
}
97107

108+
/// Finds and sends audit tasks for [SelectionStrategy::FourFours].
109+
///
110+
/// 1. Get a random block number between 1 and MERGE_BLOCK_HEIGHT.
111+
/// 2. Get the block hash for that block.
112+
/// 3. Send content keys for header, body, receipts.
113+
///
114+
async fn select_fourfours_content_for_audit(
115+
tx: mpsc::Sender<AuditTask>,
116+
conn: DatabaseConnection,
117+
w3: web3::Web3<web3::transports::Http>,
118+
) -> ! {
119+
let mut interval = interval(Duration::from_secs(5));
120+
121+
loop {
122+
interval.tick().await;
123+
let block_number = thread_rng().gen_range(1..MERGE_BLOCK_HEIGHT);
124+
debug!(
125+
strategy = "4444s",
126+
"Getting hash for block number {block_number}."
127+
);
128+
let block_hash = w3
129+
.eth()
130+
.block(BlockId::Number(BlockNumber::Number(block_number.into())))
131+
.await
132+
.unwrap()
133+
.unwrap()
134+
.hash
135+
.unwrap();
136+
137+
let items_to_audit =
138+
store_block_keys(block_number, block_hash.as_fixed_bytes(), &conn).await;
139+
debug!(
140+
strategy = "4444s",
141+
item_count = items_to_audit.len(),
142+
"Adding content keys to the audit queue."
143+
);
144+
add_to_queue(tx.clone(), SelectionStrategy::FourFours, items_to_audit).await;
145+
}
146+
}
147+
98148
/// Adds Glados database History sub-protocol search results
99149
/// to a channel for auditing against a Portal Node.
100150
async fn add_to_queue(

glados-core/src/db.rs

Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,98 @@
1+
use anyhow::Error;
2+
use entity::{content, execution_metadata};
3+
use ethportal_api::{
4+
utils::bytes::hex_encode, BlockBodyKey, BlockHeaderKey, BlockReceiptsKey, HistoryContentKey,
5+
OverlayContentKey,
6+
};
7+
use sea_orm::DatabaseConnection;
8+
use tracing::{debug, error};
9+
10+
/// Stores the content keys and block metadata for the given block.
11+
///
12+
/// The metadata included is the block number and hash under the execution
13+
/// header, body and receipts tables.
14+
///
15+
/// Errors are logged.
16+
pub async fn store_block_keys(
17+
block_number: i32,
18+
block_hash: &[u8; 32],
19+
conn: &DatabaseConnection,
20+
) -> Vec<content::Model> {
21+
let header = HistoryContentKey::BlockHeaderWithProof(BlockHeaderKey {
22+
block_hash: *block_hash,
23+
});
24+
let body = HistoryContentKey::BlockBody(BlockBodyKey {
25+
block_hash: *block_hash,
26+
});
27+
let receipts = HistoryContentKey::BlockReceipts(BlockReceiptsKey {
28+
block_hash: *block_hash,
29+
});
30+
31+
let header = store_content_key(&header, "block_header", block_number, conn).await;
32+
let body = store_content_key(&body, "block_body", block_number, conn).await;
33+
let receipts = store_content_key(&receipts, "block_receipts", block_number, conn).await;
34+
35+
let mut returned_values = vec![];
36+
if let Some(header) = header {
37+
returned_values.push(header);
38+
}
39+
if let Some(body) = body {
40+
returned_values.push(body);
41+
}
42+
if let Some(receipts) = receipts {
43+
returned_values.push(receipts);
44+
}
45+
returned_values
46+
}
47+
48+
/// Accepts a ContentKey from the History and attempts to store it.
49+
///
50+
/// Errors are logged.
51+
pub async fn store_content_key<T: OverlayContentKey>(
52+
key: &T,
53+
name: &str,
54+
block_number: i32,
55+
conn: &DatabaseConnection,
56+
) -> Option<content::Model> {
57+
// Store key
58+
match content::get_or_create(key, conn).await {
59+
Ok(content_model) => {
60+
log_record_outcome(key, name, DbOutcome::Success);
61+
// Store metadata
62+
let metadata_str = format!("{name}_metadata");
63+
match execution_metadata::get_or_create(content_model.id, block_number, conn).await {
64+
Ok(_) => log_record_outcome(key, metadata_str.as_str(), DbOutcome::Success),
65+
Err(e) => log_record_outcome(key, metadata_str.as_str(), DbOutcome::Fail(e)),
66+
};
67+
Some(content_model)
68+
}
69+
Err(e) => {
70+
log_record_outcome(key, name, DbOutcome::Fail(e));
71+
None
72+
}
73+
}
74+
}
75+
76+
/// Logs a database record error for the given key.
77+
///
78+
/// Helper function for common error pattern to be logged.
79+
pub fn log_record_outcome<T: OverlayContentKey>(key: &T, name: &str, outcome: DbOutcome) {
80+
match outcome {
81+
DbOutcome::Success => debug!(
82+
content.key = hex_encode(key.to_bytes()),
83+
content.kind = name,
84+
"Imported new record",
85+
),
86+
DbOutcome::Fail(e) => error!(
87+
content.key=hex_encode(key.to_bytes()),
88+
content.kind=name,
89+
err=?e,
90+
"Failed to create database record",
91+
),
92+
}
93+
}
94+
95+
pub enum DbOutcome {
96+
Success,
97+
Fail(Error),
98+
}

glados-core/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,3 @@
1+
pub mod db;
12
pub mod jsonrpc;
23
pub mod stats;

0 commit comments

Comments
 (0)