Skip to content

Commit dcf5ca3

Browse files
authored
Fixed Aggregator Bug (#294)
## Summary of changes Changed the behaviour of the aggregator to block when the message send queue is full rather than terminating. The previous behaviour prevented the aggregator from restarting after a brief time offline, due to the backlog building up from the trace-to-events component. Closes #293. ## Instructions for review/testing General code review. Tested with HiFi data.
1 parent 0e0d2bd commit dcf5ca3

File tree

1 file changed

+12
-19
lines changed

1 file changed

+12
-19
lines changed

digitiser-aggregator/src/main.rs

Lines changed: 12 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -35,15 +35,15 @@ use supermusr_streaming_types::{
3535
use tokio::{
3636
select,
3737
signal::unix::{signal, Signal, SignalKind},
38-
sync::mpsc::{error::TrySendError, Receiver, Sender},
38+
sync::mpsc::{error::SendError, Receiver, Sender},
3939
task::JoinHandle,
4040
};
4141
use tracing::{debug, error, info, info_span, instrument, level_filters::LevelFilter, warn};
4242

4343
const PRODUCER_TIMEOUT: Timeout = Timeout::After(Duration::from_millis(100));
4444

4545
type AggregatedFrameToBufferSender = Sender<AggregatedFrame<EventData>>;
46-
type TrySendAggregatedFrameError = TrySendError<AggregatedFrame<EventData>>;
46+
type SendAggregatedFrameError = SendError<AggregatedFrame<EventData>>;
4747

4848
#[derive(Debug, Parser)]
4949
#[clap(author, version, about)]
@@ -212,7 +212,7 @@ async fn process_kafka_message(
212212
channel_send: &AggregatedFrameToBufferSender,
213213
cache: &mut FrameCache<EventData>,
214214
msg: &BorrowedMessage<'_>,
215-
) -> Result<(), TrySendAggregatedFrameError> {
215+
) -> Result<(), SendAggregatedFrameError> {
216216
msg.headers().conditional_extract_to_current_span(use_otel);
217217

218218
if let Some(payload) = msg.payload() {
@@ -263,7 +263,7 @@ async fn process_digitiser_event_list_message(
263263
channel_send: &AggregatedFrameToBufferSender,
264264
cache: &mut FrameCache<EventData>,
265265
msg: DigitizerEventListMessage<'_>,
266-
) -> Result<(), TrySendAggregatedFrameError> {
266+
) -> Result<(), SendAggregatedFrameError> {
267267
match msg.metadata().try_into() {
268268
Ok(metadata) => {
269269
debug!("Event packet: metadata: {:?}", msg.metadata());
@@ -291,7 +291,7 @@ async fn process_digitiser_event_list_message(
291291
async fn cache_poll(
292292
channel_send: &AggregatedFrameToBufferSender,
293293
cache: &mut FrameCache<EventData>,
294-
) -> Result<(), TrySendAggregatedFrameError> {
294+
) -> Result<(), SendAggregatedFrameError> {
295295
while let Some(frame) = cache.poll() {
296296
let span = info_span!(target: "otel", "Frame Completed");
297297
span.follows_from(
@@ -302,21 +302,14 @@ async fn cache_poll(
302302
);
303303
let _guard = span.enter();
304304

305-
// For each frame that is ready to send,
306-
// `try_send` appends it to the channel queue
307-
// without blocking
308-
if let Err(e) = channel_send.try_send(frame) {
309-
// If the queue is full (or another error occurs),
310-
// then we emit a fatal error and close the program.
311-
match &e {
312-
TrySendError::Closed(_) => {
313-
error!("Send-Frame Channel Closed");
314-
}
315-
TrySendError::Full(_) => {
316-
error!("Send-Frame Buffer Full");
317-
}
305+
// Reserves space in the message queue if it is available
306+
// Or waits for space if none is available.
307+
match channel_send.reserve().await {
308+
Ok(permit) => permit.send(frame),
309+
Err(_) => {
310+
error!("Send-Frame Error");
311+
return Err(SendError(frame));
318312
}
319-
return Err(e);
320313
}
321314
}
322315
Ok(())

0 commit comments

Comments
 (0)