Skip to content

Commit 3a73b2a

Browse files
authored
Propagate incomplete frame data (#295)
## Summary of changes Writes the `complete` flag of each frame into the nexus file, as well as recording the digitisers present as a run log, in the case of `complete == false`. Also modified the type of the time field in the runlogs: `i32` changed to `u64` as `i32` does not have the range for recording `ns` precision. ## Instruction for review/testing General code review. Has been tested with simulated data. Please excuse the use of `anyhow`, will be changed later.
1 parent dcf5ca3 commit 3a73b2a

File tree

6 files changed

+120
-16
lines changed

6 files changed

+120
-16
lines changed

nexus-writer/src/main.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -293,6 +293,7 @@ where
293293
metadata_veto_flags = tracing::field::Empty,
294294
metadata_protons_per_pulse = tracing::field::Empty,
295295
metadata_running = tracing::field::Empty,
296+
frame_is_complete = tracing::field::Empty,
296297
)
297298
)]
298299
fn process_frame_assembled_event_list_message(nexus_engine: &mut NexusEngine, payload: &[u8]) {
@@ -307,6 +308,7 @@ fn process_frame_assembled_event_list_message(nexus_engine: &mut NexusEngine, pa
307308
.try_into()
308309
.map(|metadata: FrameMetadata| {
309310
record_metadata_fields_to_span!(metadata, tracing::Span::current());
311+
tracing::Span::current().record("frame_is_complete", data.complete());
310312
})
311313
.ok();
312314
match nexus_engine.process_event_list(&data) {
@@ -321,6 +323,7 @@ fn process_frame_assembled_event_list_message(nexus_engine: &mut NexusEngine, pa
321323
"metadata_veto_flags" = tracing::field::Empty,
322324
"metadata_protons_per_pulse" = tracing::field::Empty,
323325
"metadata_running" = tracing::field::Empty,
326+
"frame_is_complete" = data.complete(),
324327
);
325328
data.metadata()
326329
.try_into()

nexus-writer/src/nexus/engine.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -210,7 +210,7 @@ impl NexusEngine {
210210
.iter_mut()
211211
.find(|run| run.is_message_timestamp_valid(&timestamp))
212212
{
213-
run.push_message(self.local_path.as_deref(), message)?;
213+
run.push_message(self.local_path.as_deref(), message, &self.nexus_settings)?;
214214
Some(run)
215215
} else {
216216
warn!("No run found for message with timestamp: {timestamp}");

nexus-writer/src/nexus/hdf5_file/run_file.rs

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -316,8 +316,24 @@ impl RunFile {
316316
pub(crate) fn push_message_to_runfile(
317317
&mut self,
318318
message: &FrameAssembledEventListMessage,
319+
nexus_settings: &NexusSettings,
319320
) -> anyhow::Result<()> {
320-
self.contents.lists.push_message_to_event_runfile(message)
321+
self.contents.lists.push_message_to_event_runfile(message)?;
322+
323+
if !message.complete() {
324+
let time_zero = self.contents.lists.get_time_zero(message)?;
325+
326+
self.contents.logs.push_incomplete_frame_log(
327+
time_zero,
328+
message
329+
.digitizers_present()
330+
.unwrap_or_default()
331+
.iter()
332+
.collect(),
333+
nexus_settings,
334+
)?;
335+
}
336+
Ok(())
321337
}
322338

323339
fn try_read_scalar<T: H5Type>(dataset: &Dataset) -> anyhow::Result<T> {

nexus-writer/src/nexus/hdf5_file/run_file_components/event_run_file.rs

Lines changed: 37 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ pub(crate) struct EventRun {
2020
event_time_zero: Dataset,
2121
period_number: Dataset,
2222
frame_number: Dataset,
23+
frame_complete: Dataset,
2324
// Events
2425
event_id: Dataset,
2526
pulse_height: Dataset,
@@ -81,6 +82,13 @@ impl EventRun {
8182
nexus_settings.framelist_chunk_size,
8283
)?;
8384

85+
let frame_complete = create_resizable_dataset::<u64>(
86+
&detector,
87+
"is_frame_complete",
88+
0,
89+
nexus_settings.framelist_chunk_size,
90+
)?;
91+
8492
Ok(Self {
8593
offset: None,
8694
num_events: 0,
@@ -92,6 +100,7 @@ impl EventRun {
92100
event_time_zero,
93101
period_number,
94102
frame_number,
103+
frame_complete,
95104
})
96105
}
97106

@@ -107,6 +116,7 @@ impl EventRun {
107116
let event_time_zero = detector.dataset("event_time_zero")?;
108117
let period_number = detector.dataset("period_number")?;
109118
let frame_number = detector.dataset("frame_number")?;
119+
let frame_complete = detector.dataset("is_frame_complete")?;
110120

111121
let offset: Option<DateTime<Utc>> = {
112122
if let Ok(offset) = event_time_zero.attr("offset") {
@@ -128,6 +138,7 @@ impl EventRun {
128138
event_time_zero,
129139
period_number,
130140
frame_number,
141+
frame_complete,
131142
})
132143
}
133144

@@ -156,19 +167,9 @@ impl EventRun {
156167
self.event_index
157168
.write_slice(&[self.num_events], next_message_slice)?;
158169

159-
let timestamp: DateTime<Utc> = (*message
160-
.metadata()
161-
.timestamp()
162-
.ok_or(anyhow::anyhow!("Message timestamp missing."))?)
163-
.try_into()?;
164-
165170
// Recalculate time_zero of the frame to be relative to the offset value
166171
// (set at the start of the run).
167-
let time_zero = self
168-
.offset
169-
.and_then(|offset| (timestamp - offset).num_nanoseconds())
170-
.ok_or(anyhow::anyhow!("event_time_zero cannot be calculated."))?
171-
as u64;
172+
let time_zero = self.get_time_zero(message)?;
172173

173174
self.event_time_zero.resize(self.num_messages + 1)?;
174175
self.event_time_zero
@@ -182,6 +183,10 @@ impl EventRun {
182183
self.frame_number
183184
.write_slice(&[message.metadata().frame_number()], next_message_slice)?;
184185

186+
self.frame_complete.resize(self.num_messages + 1)?;
187+
self.frame_complete
188+
.write_slice(&[message.complete()], next_message_slice)?;
189+
185190
// Fields Indexed By Event
186191
let num_new_events = message.channel().unwrap_or_default().len();
187192
let total_events = self.num_events + num_new_events;
@@ -223,4 +228,25 @@ impl EventRun {
223228
tracing::Span::current().record("num_events", num_new_events);
224229
Ok(())
225230
}
231+
232+
pub(crate) fn get_time_zero(
233+
&self,
234+
message: &FrameAssembledEventListMessage,
235+
) -> anyhow::Result<u64> {
236+
let timestamp: DateTime<Utc> = (*message
237+
.metadata()
238+
.timestamp()
239+
.ok_or(anyhow::anyhow!("Message timestamp missing."))?)
240+
.try_into()?;
241+
242+
// Recalculate time_zero of the frame to be relative to the offset value
243+
// (set at the start of the run).
244+
let time_zero = self
245+
.offset
246+
.and_then(|offset| (timestamp - offset).num_nanoseconds())
247+
.ok_or(anyhow::anyhow!("event_time_zero cannot be calculated."))?
248+
as u64;
249+
250+
Ok(time_zero)
251+
}
226252
}

nexus-writer/src/nexus/hdf5_file/run_file_components/runlog_file.rs

Lines changed: 60 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,8 @@ use hdf5::{
1010
types::{IntSize, TypeDescriptor},
1111
Group, SimpleExtents,
1212
};
13+
use ndarray::s;
14+
use supermusr_common::DigitizerId;
1315
use supermusr_streaming_types::ecs_f144_logdata_generated::f144_LogData;
1416
use tracing::debug;
1517

@@ -48,7 +50,7 @@ impl RunLog {
4850
let group = add_new_group_to(&self.parent, logdata.source_name(), NX::LOG)
4951
.map_err(|e| e.context(err))?;
5052

51-
let time = create_resizable_dataset::<i32>(
53+
let time = create_resizable_dataset::<u64>(
5254
&group,
5355
"time",
5456
0,
@@ -80,7 +82,7 @@ impl RunLog {
8082
let group =
8183
add_new_group_to(&self.parent, LOG_NAME, NX::LOG).map_err(|e| e.context(err))?;
8284

83-
let _time = create_resizable_dataset::<i32>(
85+
let _time = create_resizable_dataset::<u64>(
8486
&group,
8587
"time",
8688
0,
@@ -100,4 +102,60 @@ impl RunLog {
100102

101103
Ok(())
102104
}
105+
106+
pub(crate) fn push_incomplete_frame_log(
107+
&mut self,
108+
event_time_zero: u64,
109+
digitisers_present: Vec<DigitizerId>,
110+
nexus_settings: &NexusSettings,
111+
) -> anyhow::Result<()> {
112+
const LOG_NAME: &str = "SuperMuSRDataPipeline_DigitisersPresentInIncompleteFrame";
113+
let timeseries = self.parent.group(LOG_NAME).or_else(|err| {
114+
debug!("Cannot find {LOG_NAME}. Creating new group.");
115+
116+
let group =
117+
add_new_group_to(&self.parent, LOG_NAME, NX::LOG).map_err(|e| e.context(err))?;
118+
119+
create_resizable_dataset::<u64>(
120+
&group,
121+
"time",
122+
0,
123+
nexus_settings.runloglist_chunk_size,
124+
)?;
125+
create_resizable_dataset::<hdf5::types::VarLenUnicode>(
126+
&group,
127+
"value",
128+
0,
129+
nexus_settings.runloglist_chunk_size,
130+
)?;
131+
Ok::<_, anyhow::Error>(group)
132+
})?;
133+
let timestamps = timeseries.dataset("time")?;
134+
let values = timeseries.dataset("value")?;
135+
136+
if timestamps.size() != values.size() {
137+
anyhow::bail!(
138+
"time length ({}) and value length ({}) differ",
139+
timestamps.size(),
140+
values.size()
141+
)
142+
}
143+
144+
let current_size = timestamps.size();
145+
let next_slice = s![current_size..(current_size + 1)];
146+
147+
timestamps.resize(current_size + 1)?;
148+
timestamps.write_slice(&[event_time_zero], next_slice)?;
149+
150+
values.resize(current_size + 1)?;
151+
let value = digitisers_present
152+
.iter()
153+
.map(DigitizerId::to_string)
154+
.collect::<Vec<_>>()
155+
.join(",")
156+
.parse::<hdf5::types::VarLenUnicode>()?;
157+
values.write_slice(&[value], next_slice)?;
158+
159+
Ok(())
160+
}
103161
}

nexus-writer/src/nexus/run.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -130,10 +130,11 @@ impl Run {
130130
&mut self,
131131
local_path: Option<&Path>,
132132
message: &FrameAssembledEventListMessage,
133+
nexus_settings: &NexusSettings,
133134
) -> anyhow::Result<()> {
134135
if let Some(local_path) = local_path {
135136
let mut hdf5 = RunFile::open_runfile(local_path, &self.parameters.run_name)?;
136-
hdf5.push_message_to_runfile(message)?;
137+
hdf5.push_message_to_runfile(message, nexus_settings)?;
137138
hdf5.close()?;
138139
}
139140

0 commit comments

Comments
 (0)