Skip to content
This repository was archived by the owner on May 22, 2023. It is now read-only.
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 24 additions & 14 deletions frame/ddc-validator/src/dac.rs
Original file line number Diff line number Diff line change
Expand Up @@ -273,21 +273,24 @@ fn get_timestamps_with_ack(file_requests: &Requests) -> Vec<TimestampInSec> {
timestamps
}

pub fn get_proved_delivered_bytes_sum(file_requests: &Requests) -> u64 {
pub fn get_served_bytes_sum(file_requests: &Requests) -> (u64, u64) {
let ack_timestamps = get_timestamps_with_ack(file_requests);
let mut total_bytes_received = 0u64;
let mut total_bytes_sent = 0u64;

for (_, file_request) in file_requests {
for (_, chunk) in &file_request.chunks {
total_bytes_sent += chunk.log.bytes_sent;

if let Some(ack) = &chunk.ack {
total_bytes_received += &chunk.log.bytes_sent;
total_bytes_received += ack.bytes_received;
} else {
total_bytes_received += get_proved_delivered_bytes(chunk, &ack_timestamps);
}
}
}

total_bytes_received
(total_bytes_sent, total_bytes_received)
}

fn get_proved_delivered_bytes(chunk: &Chunk, ack_timestamps: &Vec<TimestampInSec>) -> u64 {
Expand Down Expand Up @@ -342,9 +345,12 @@ fn get_file_request_url(data_provider_url: &String) -> String {
}

pub(crate) fn fetch_file_request(url: &String) -> Requests {
log::info!("fetch_file_request | url: {:?}", url);
let response: FileRequestWrapper = http_get_json(&url).unwrap();
let value: Value = serde_json::from_str(response.json.as_str()).unwrap();
let map: Requests = serde_json::from_value(value).unwrap();

log::info!("response.json: {:?}", response.json);

map
}
Expand Down Expand Up @@ -409,7 +415,7 @@ fn get_bytes_received_query_url(data_provider_url: &String, era: EraIndex) -> St
format!("{}/FT.AGGREGATE/ddc:dac:searchCommonIndex/@era:[{}%20{}]/GROUPBY/2/@nodePublicKey/@era/REDUCE/SUM/1/@bytesReceived/AS/bytesReceivedSum", data_provider_url, era, era)
}

fn http_get_json<OUT: DeserializeOwned>(url: &str) -> crate::ResultStr<OUT> {
pub(crate) fn http_get_json<OUT: DeserializeOwned>(url: &str) -> crate::ResultStr<OUT> {
let body = http_get_request(url).map_err(|err| {
log::error!("[DAC Validator] Error while getting {}: {:?}", url, err);
"HTTP GET error"
Expand Down Expand Up @@ -540,16 +546,19 @@ pub(crate) fn post_final_decision(
res
}

pub(crate) fn get_final_decision(decisions: Vec<ValidationResult>) -> ValidationDecision {
pub(crate) fn get_final_decision(decisions: Vec<ValidationDecision>) -> ValidationDecision {
let common_decisions = find_largest_group(decisions).unwrap();
let decision_example = common_decisions.get(0).unwrap();

let serialized_decisions = serde_json::to_string(&common_decisions).unwrap();

let final_decision = ValidationDecision {
edge: decision_example.edge.clone(),
result: decision_example.result,
payload: utils::get_hashed(&common_decisions),
payload: utils::hash(&serialized_decisions),
totals: DacTotalAggregates {
received: decision_example.received,
sent: decision_example.sent,
received: decision_example.totals.received,
sent: decision_example.totals.sent,
failed_by_client: 0,
failure_rate: 0,
},
Expand All @@ -558,7 +567,7 @@ pub(crate) fn get_final_decision(decisions: Vec<ValidationResult>) -> Validation
final_decision
}

pub(crate) fn get_validation_results(
pub(crate) fn fetch_validation_results(
data_provider_url: &String,
era: EraIndex,
edge: &String,
Expand All @@ -570,8 +579,8 @@ pub(crate) fn get_validation_results(
Ok(results)
}

fn find_largest_group(decisions: Vec<ValidationResult>) -> Option<Vec<ValidationResult>> {
let mut groups: Vec<Vec<ValidationResult>> = Vec::new();
fn find_largest_group(decisions: Vec<ValidationDecision>) -> Option<Vec<ValidationDecision>> {
let mut groups: Vec<Vec<ValidationDecision>> = Vec::new();
let half = decisions.len() / 2;

for decision in decisions {
Expand All @@ -580,12 +589,12 @@ fn find_largest_group(decisions: Vec<ValidationResult>) -> Option<Vec<Validation
for group in &mut groups {
if group.iter().all(|x| {
x.result == decision.result &&
x.received == decision.received &&
x.sent == decision.sent
x.totals.received == decision.totals.received &&
x.totals.sent == decision.totals.sent
}) {
group.push(decision.clone());
found_group = true;
break
break;
}
}

Expand All @@ -602,3 +611,4 @@ fn find_largest_group(decisions: Vec<ValidationResult>) -> Option<Vec<Validation
None
}
}

Loading