Skip to content
This repository was archived by the owner on Oct 31, 2024. It is now read-only.

Commit d695440

Browse files
committed
feat: switch tce-lib action to spawn tasks
Signed-off-by: Simon Paitrault <[email protected]>
1 parent b8cd730 commit d695440

File tree

3 files changed

+131
-115
lines changed

3 files changed

+131
-115
lines changed

crates/topos-tce/src/app_context/api.rs

Lines changed: 72 additions & 68 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
use crate::AppContext;
22
use std::collections::HashMap;
3+
use tokio::spawn;
34
use topos_core::uci::{Certificate, SubnetId};
45
use topos_metrics::CERTIFICATE_DELIVERY_LATENCY;
56
use topos_tce_api::RuntimeError;
@@ -20,79 +21,82 @@ impl AppContext {
2021
self.delivery_latency
2122
.insert(certificate.id, CERTIFICATE_DELIVERY_LATENCY.start_timer());
2223

23-
_ = match self
24-
.validator_store
25-
.insert_pending_certificate(&certificate)
26-
.await
27-
{
28-
Ok(Some(pending_id)) => {
29-
let certificate_id = certificate.id;
30-
debug!(
31-
"Certificate {} from subnet {} has been inserted into pending pool",
32-
certificate_id, certificate.source_subnet_id
33-
);
24+
let validator_store = self.validator_store.clone();
25+
let double_echo = self.tce_cli.get_double_echo_channel();
3426

35-
if self
36-
.tce_cli
37-
.get_double_echo_channel()
38-
.send(DoubleEchoCommand::Broadcast {
39-
need_gossip: true,
40-
cert: *certificate,
41-
pending_id,
42-
})
43-
.await
44-
.is_err()
45-
{
46-
error!(
47-
"Unable to send DoubleEchoCommand::Broadcast command to double \
48-
echo for {}",
49-
certificate_id
27+
spawn(async move {
28+
_ = match validator_store
29+
.insert_pending_certificate(&certificate)
30+
.await
31+
{
32+
Ok(Some(pending_id)) => {
33+
let certificate_id = certificate.id;
34+
debug!(
35+
"Certificate {} from subnet {} has been inserted into pending pool",
36+
certificate_id, certificate.source_subnet_id
5037
);
5138

52-
sender.send(Err(RuntimeError::CommunicationError(
53-
"Unable to send DoubleEchoCommand::Broadcast command to double \
54-
echo"
55-
.to_string(),
56-
)))
57-
} else {
58-
sender.send(Ok(PendingResult::InPending(pending_id)))
39+
if double_echo
40+
.send(DoubleEchoCommand::Broadcast {
41+
need_gossip: true,
42+
cert: *certificate,
43+
pending_id,
44+
})
45+
.await
46+
.is_err()
47+
{
48+
error!(
49+
"Unable to send DoubleEchoCommand::Broadcast command to \
50+
double echo for {}",
51+
certificate_id
52+
);
53+
54+
sender.send(Err(RuntimeError::CommunicationError(
55+
"Unable to send DoubleEchoCommand::Broadcast command to \
56+
double echo"
57+
.to_string(),
58+
)))
59+
} else {
60+
sender.send(Ok(PendingResult::InPending(pending_id)))
61+
}
5962
}
60-
}
61-
Ok(None) => {
62-
debug!(
63-
"Certificate {} from subnet {} has been inserted into precedence pool \
64-
waiting for {}",
65-
certificate.id, certificate.source_subnet_id, certificate.prev_id
66-
);
67-
sender.send(Ok(PendingResult::AwaitPrecedence))
68-
}
69-
Err(StorageError::InternalStorage(
70-
InternalStorageError::CertificateAlreadyPending,
71-
)) => {
72-
debug!(
73-
"Certificate {} has already been added to the pending pool, skipping",
74-
certificate.id
75-
);
76-
sender.send(Ok(PendingResult::AlreadyPending))
77-
}
78-
Err(StorageError::InternalStorage(
79-
InternalStorageError::CertificateAlreadyExists,
80-
)) => {
81-
debug!(
82-
"Certificate {} has already been delivered, skipping",
83-
certificate.id
84-
);
85-
sender.send(Ok(PendingResult::AlreadyDelivered))
86-
}
87-
Err(error) => {
88-
error!(
89-
"Unable to insert pending certificate {}: {}",
90-
certificate.id, error
91-
);
63+
Ok(None) => {
64+
debug!(
65+
"Certificate {} from subnet {} has been inserted into precedence \
66+
pool waiting for {}",
67+
certificate.id, certificate.source_subnet_id, certificate.prev_id
68+
);
69+
sender.send(Ok(PendingResult::AwaitPrecedence))
70+
}
71+
Err(StorageError::InternalStorage(
72+
InternalStorageError::CertificateAlreadyPending,
73+
)) => {
74+
debug!(
75+
"Certificate {} has already been added to the pending pool, \
76+
skipping",
77+
certificate.id
78+
);
79+
sender.send(Ok(PendingResult::AlreadyPending))
80+
}
81+
Err(StorageError::InternalStorage(
82+
InternalStorageError::CertificateAlreadyExists,
83+
)) => {
84+
debug!(
85+
"Certificate {} has already been delivered, skipping",
86+
certificate.id
87+
);
88+
sender.send(Ok(PendingResult::AlreadyDelivered))
89+
}
90+
Err(error) => {
91+
error!(
92+
"Unable to insert pending certificate {}: {}",
93+
certificate.id, error
94+
);
9295

93-
sender.send(Err(error.into()))
94-
}
95-
};
96+
sender.send(Err(error.into()))
97+
}
98+
};
99+
});
96100
}
97101

98102
ApiEvent::GetSourceHead { subnet_id, sender } => {

crates/topos-tce/src/app_context/protocol.rs

Lines changed: 46 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
use tokio::spawn;
12
use topos_core::api::grpc::tce::v1::{double_echo_request, DoubleEchoRequest, Echo, Gossip, Ready};
23
use topos_tce_broadcast::event::ProtocolEvents;
34
use tracing::{error, info, warn};
@@ -14,65 +15,68 @@ impl AppContext {
1415
ProtocolEvents::Gossip { cert } => {
1516
let cert_id = cert.id;
1617

17-
let request = DoubleEchoRequest {
18-
request: Some(double_echo_request::Request::Gossip(Gossip {
19-
certificate: Some(cert.into()),
20-
})),
21-
};
18+
let network_client = self.network_client.clone();
19+
spawn(async move {
20+
let request = DoubleEchoRequest {
21+
request: Some(double_echo_request::Request::Gossip(Gossip {
22+
certificate: Some(cert.into()),
23+
})),
24+
};
2225

23-
info!("Sending Gossip for certificate {}", cert_id);
24-
if let Err(e) = self
25-
.network_client
26-
.publish(topos_p2p::TOPOS_GOSSIP, request)
27-
.await
28-
{
29-
error!("Unable to send Gossip: {e}");
30-
}
26+
info!("Sending Gossip for certificate {}", cert_id);
27+
if let Err(e) = network_client
28+
.publish(topos_p2p::TOPOS_GOSSIP, request)
29+
.await
30+
{
31+
error!("Unable to send Gossip: {e}");
32+
}
33+
});
3134
}
3235

3336
ProtocolEvents::Echo {
3437
certificate_id,
3538
signature,
3639
validator_id,
3740
} if self.is_validator => {
38-
// Send echo message
39-
let request = DoubleEchoRequest {
40-
request: Some(double_echo_request::Request::Echo(Echo {
41-
certificate_id: Some(certificate_id.into()),
42-
signature: Some(signature.into()),
43-
validator_id: Some(validator_id.into()),
44-
})),
45-
};
41+
let network_client = self.network_client.clone();
42+
spawn(async move {
43+
// Send echo message
44+
let request = DoubleEchoRequest {
45+
request: Some(double_echo_request::Request::Echo(Echo {
46+
certificate_id: Some(certificate_id.into()),
47+
signature: Some(signature.into()),
48+
validator_id: Some(validator_id.into()),
49+
})),
50+
};
4651

47-
if let Err(e) = self
48-
.network_client
49-
.publish(topos_p2p::TOPOS_ECHO, request)
50-
.await
51-
{
52-
error!("Unable to send Echo: {e}");
53-
}
52+
if let Err(e) = network_client.publish(topos_p2p::TOPOS_ECHO, request).await {
53+
error!("Unable to send Echo: {e}");
54+
}
55+
});
5456
}
5557

5658
ProtocolEvents::Ready {
5759
certificate_id,
5860
signature,
5961
validator_id,
6062
} if self.is_validator => {
61-
let request = DoubleEchoRequest {
62-
request: Some(double_echo_request::Request::Ready(Ready {
63-
certificate_id: Some(certificate_id.into()),
64-
signature: Some(signature.into()),
65-
validator_id: Some(validator_id.into()),
66-
})),
67-
};
63+
let network_client = self.network_client.clone();
64+
spawn(async move {
65+
let request = DoubleEchoRequest {
66+
request: Some(double_echo_request::Request::Ready(Ready {
67+
certificate_id: Some(certificate_id.into()),
68+
signature: Some(signature.into()),
69+
validator_id: Some(validator_id.into()),
70+
})),
71+
};
6872

69-
if let Err(e) = self
70-
.network_client
71-
.publish(topos_p2p::TOPOS_READY, request)
72-
.await
73-
{
74-
error!("Unable to send Ready: {e}");
75-
}
73+
if let Err(e) = network_client
74+
.publish(topos_p2p::TOPOS_READY, request)
75+
.await
76+
{
77+
error!("Unable to send Ready: {e}");
78+
}
79+
});
7680
}
7781
ProtocolEvents::BroadcastFailed { certificate_id } => {
7882
warn!("Broadcast failed for certificate {certificate_id}")

crates/topos-tce/src/tests/mod.rs

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
use libp2p::PeerId;
22
use rstest::{fixture, rstest};
3-
use std::{collections::HashSet, future::IntoFuture, sync::Arc};
3+
use std::{collections::HashSet, future::IntoFuture, sync::Arc, time::Duration};
44
use tokio_stream::Stream;
55
use topos_tce_api::RuntimeEvent;
66
use topos_tce_broadcast::event::ProtocolEvents;
@@ -41,8 +41,8 @@ async fn non_validator_publish_gossip(
4141
.await;
4242

4343
assert!(matches!(
44-
p2p_receiver.try_recv(),
45-
Ok(topos_p2p::Command::Gossip { topic, .. }) if topic == "topos_gossip"
44+
p2p_receiver.recv().await,
45+
Some(topos_p2p::Command::Gossip { topic, .. }) if topic == "topos_gossip"
4646
));
4747
}
4848

@@ -64,7 +64,11 @@ async fn non_validator_do_not_publish_echo(
6464
})
6565
.await;
6666

67-
assert!(p2p_receiver.try_recv().is_err(),);
67+
assert!(
68+
tokio::time::timeout(Duration::from_millis(10), p2p_receiver.recv())
69+
.await
70+
.is_err()
71+
);
6872
}
6973

7074
#[rstest]
@@ -85,7 +89,11 @@ async fn non_validator_do_not_publish_ready(
8589
})
8690
.await;
8791

88-
assert!(p2p_receiver.try_recv().is_err(),);
92+
assert!(
93+
tokio::time::timeout(Duration::from_millis(10), p2p_receiver.recv())
94+
.await
95+
.is_err()
96+
);
8997
}
9098

9199
#[fixture]

0 commit comments

Comments
 (0)