Skip to content
This repository was archived by the owner on Oct 31, 2024. It is now read-only.
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
140 changes: 72 additions & 68 deletions crates/topos-tce/src/app_context/api.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use crate::AppContext;
use std::collections::HashMap;
use tokio::spawn;
use topos_core::uci::{Certificate, SubnetId};
use topos_metrics::CERTIFICATE_DELIVERY_LATENCY;
use topos_tce_api::RuntimeError;
Expand All @@ -20,79 +21,82 @@ impl AppContext {
self.delivery_latency
.insert(certificate.id, CERTIFICATE_DELIVERY_LATENCY.start_timer());

_ = match self
.validator_store
.insert_pending_certificate(&certificate)
.await
{
Ok(Some(pending_id)) => {
let certificate_id = certificate.id;
debug!(
"Certificate {} from subnet {} has been inserted into pending pool",
certificate_id, certificate.source_subnet_id
);
let validator_store = self.validator_store.clone();
let double_echo = self.tce_cli.get_double_echo_channel();

if self
.tce_cli
.get_double_echo_channel()
.send(DoubleEchoCommand::Broadcast {
need_gossip: true,
cert: *certificate,
pending_id,
})
.await
.is_err()
{
error!(
"Unable to send DoubleEchoCommand::Broadcast command to double \
echo for {}",
certificate_id
spawn(async move {
_ = match validator_store
.insert_pending_certificate(&certificate)
.await
{
Ok(Some(pending_id)) => {
let certificate_id = certificate.id;
debug!(
"Certificate {} from subnet {} has been inserted into pending pool",
certificate_id, certificate.source_subnet_id
);

sender.send(Err(RuntimeError::CommunicationError(
"Unable to send DoubleEchoCommand::Broadcast command to double \
echo"
.to_string(),
)))
} else {
sender.send(Ok(PendingResult::InPending(pending_id)))
if double_echo
.send(DoubleEchoCommand::Broadcast {
need_gossip: true,
cert: *certificate,
pending_id,
})
.await
.is_err()
{
error!(
"Unable to send DoubleEchoCommand::Broadcast command to \
double echo for {}",
certificate_id
);

sender.send(Err(RuntimeError::CommunicationError(
"Unable to send DoubleEchoCommand::Broadcast command to \
double echo"
.to_string(),
)))
} else {
sender.send(Ok(PendingResult::InPending(pending_id)))
}
}
}
Ok(None) => {
debug!(
"Certificate {} from subnet {} has been inserted into precedence pool \
waiting for {}",
certificate.id, certificate.source_subnet_id, certificate.prev_id
);
sender.send(Ok(PendingResult::AwaitPrecedence))
}
Err(StorageError::InternalStorage(
InternalStorageError::CertificateAlreadyPending,
)) => {
debug!(
"Certificate {} has already been added to the pending pool, skipping",
certificate.id
);
sender.send(Ok(PendingResult::AlreadyPending))
}
Err(StorageError::InternalStorage(
InternalStorageError::CertificateAlreadyExists,
)) => {
debug!(
"Certificate {} has already been delivered, skipping",
certificate.id
);
sender.send(Ok(PendingResult::AlreadyDelivered))
}
Err(error) => {
error!(
"Unable to insert pending certificate {}: {}",
certificate.id, error
);
Ok(None) => {
debug!(
"Certificate {} from subnet {} has been inserted into precedence \
pool waiting for {}",
certificate.id, certificate.source_subnet_id, certificate.prev_id
);
sender.send(Ok(PendingResult::AwaitPrecedence))
}
Err(StorageError::InternalStorage(
InternalStorageError::CertificateAlreadyPending,
)) => {
debug!(
"Certificate {} has already been added to the pending pool, \
skipping",
certificate.id
);
sender.send(Ok(PendingResult::AlreadyPending))
}
Err(StorageError::InternalStorage(
InternalStorageError::CertificateAlreadyExists,
)) => {
debug!(
"Certificate {} has already been delivered, skipping",
certificate.id
);
sender.send(Ok(PendingResult::AlreadyDelivered))
}
Err(error) => {
error!(
"Unable to insert pending certificate {}: {}",
certificate.id, error
);

sender.send(Err(error.into()))
}
};
sender.send(Err(error.into()))
}
};
});
}

ApiEvent::GetSourceHead { subnet_id, sender } => {
Expand Down
116 changes: 59 additions & 57 deletions crates/topos-tce/src/app_context/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,69 +37,71 @@ impl AppContext {
{
entry.insert(CERTIFICATE_DELIVERY_LATENCY.start_timer());
}
info!(
"Received certificate {} from GossipSub from {}",
cert.id, from
);

match self.validator_store.insert_pending_certificate(&cert).await {
Ok(Some(pending_id)) => {
let certificate_id = cert.id;
debug!(
"Certificate {} has been inserted into pending pool",
certificate_id
);
let validator_store = self.validator_store.clone();
let double_echo = self.tce_cli.get_double_echo_channel();
spawn(async move {
info!(
"Received certificate {} from GossipSub from {}",
cert.id, from
);

if self
.tce_cli
.get_double_echo_channel()
.send(DoubleEchoCommand::Broadcast {
need_gossip: false,
cert,
pending_id,
})
.await
.is_err()
{
error!(
"Unable to send DoubleEchoCommand::Broadcast command \
to double echo for {}",
match validator_store.insert_pending_certificate(&cert).await {
Ok(Some(pending_id)) => {
let certificate_id = cert.id;
debug!(
"Certificate {} has been inserted into pending pool",
certificate_id
);

if double_echo
.send(DoubleEchoCommand::Broadcast {
need_gossip: false,
cert,
pending_id,
})
.await
.is_err()
{
error!(
"Unable to send DoubleEchoCommand::Broadcast \
command to double echo for {}",
certificate_id
);
}
}
}

Ok(None) => {
debug!(
"Certificate {} from subnet {} has been inserted into \
precedence pool waiting for {}",
cert.id, cert.source_subnet_id, cert.prev_id
);
}
Err(StorageError::InternalStorage(
InternalStorageError::CertificateAlreadyPending,
)) => {
debug!(
"Certificate {} has been already added to the pending \
pool, skipping",
cert.id
);
}
Err(StorageError::InternalStorage(
InternalStorageError::CertificateAlreadyExists,
)) => {
debug!(
"Certificate {} has been already delivered, skipping",
cert.id
);
}
Err(error) => {
error!(
"Unable to insert pending certificate {}: {}",
cert.id, error
);
Ok(None) => {
debug!(
"Certificate {} from subnet {} has been inserted into \
precedence pool waiting for {}",
cert.id, cert.source_subnet_id, cert.prev_id
);
}
Err(StorageError::InternalStorage(
InternalStorageError::CertificateAlreadyPending,
)) => {
debug!(
"Certificate {} has been already added to the pending \
pool, skipping",
cert.id
);
}
Err(StorageError::InternalStorage(
InternalStorageError::CertificateAlreadyExists,
)) => {
debug!(
"Certificate {} has been already delivered, skipping",
cert.id
);
}
Err(error) => {
error!(
"Unable to insert pending certificate {}: {}",
cert.id, error
);
}
}
}
});
}
Err(e) => {
error!("Failed to parse the received Certificate: {e}");
Expand Down
Loading