diff --git a/crates/store/re_data_loader/src/loader_rrd.rs b/crates/store/re_data_loader/src/loader_rrd.rs index 8b35851cf91e..4714733c1d06 100644 --- a/crates/store/re_data_loader/src/loader_rrd.rs +++ b/crates/store/re_data_loader/src/loader_rrd.rs @@ -216,7 +216,7 @@ impl RetryableFileReader { mod tests { use re_build_info::CrateVersion; use re_chunk::RowId; - use re_log_encoding::{decoder, encoder::Encoder}; + use re_log_encoding::{decoder, encoder::DroppableEncoder}; use re_log_types::{ ApplicationId, LogMsg, SetStoreInfo, StoreId, StoreInfo, StoreKind, StoreSource, Time, }; @@ -247,7 +247,7 @@ mod tests { .open(rrd_file_path.to_str().unwrap()) .unwrap(); - let mut encoder = Encoder::new( + let mut encoder = DroppableEncoder::new( re_build_info::CrateVersion::LOCAL, re_log_encoding::EncodingOptions::UNCOMPRESSED, rrd_file, diff --git a/crates/store/re_log_encoding/src/encoder.rs b/crates/store/re_log_encoding/src/encoder.rs index d93f7772a144..edc966ce0d19 100644 --- a/crates/store/re_log_encoding/src/encoder.rs +++ b/crates/store/re_log_encoding/src/encoder.rs @@ -48,7 +48,66 @@ pub fn encode_to_bytes<'a>( // ---------------------------------------------------------------------------- +/// An [`Encoder`] that properly closes the stream on drop. +/// +/// When dropped, it will automatically insert an end-of-stream marker, if that wasn't already done manually. +pub struct DroppableEncoder { + encoder: Encoder, + + /// Tracks whether the end-of-stream marker has been written out already. + is_finished: bool, +} + +impl DroppableEncoder { + #[inline] + pub fn new( + version: CrateVersion, + options: EncodingOptions, + write: W, + ) -> Result { + Ok(Self { + encoder: Encoder::new(version, options, write)?, + is_finished: false, + }) + } + + /// Returns the size in bytes of the encoded data. + #[inline] + pub fn append(&mut self, message: &LogMsg) -> Result { + self.encoder.append(message) + } + + #[inline] + pub fn finish(&mut self) -> Result<(), EncodeError> { + if !self.is_finished { + self.encoder.finish()?; + } + + self.is_finished = true; + + Ok(()) + } + + #[inline] + pub fn flush_blocking(&mut self) -> std::io::Result<()> { + self.encoder.flush_blocking() + } +} + +impl std::ops::Drop for DroppableEncoder { + fn drop(&mut self) { + if !self.is_finished { + if let Err(err) = self.finish() { + re_log::warn!("encoder couldn't be finished: {err}"); + } + } + } +} + /// Encode a stream of [`LogMsg`] into an `.rrd` file. +/// +/// Prefer [`DroppableEncoder`] if possible, make sure to call [`Encoder::finish`] when appropriate +/// otherwise. pub struct Encoder { compression: Compression, write: W, @@ -120,15 +179,20 @@ impl Encoder { } } + // NOTE: This cannot be done in a `Drop` implementation because of `Self::into_inner` which + // does a partial move. + #[inline] pub fn finish(&mut self) -> Result<(), EncodeError> { MessageHeader::EndOfStream.encode(&mut self.write)?; Ok(()) } + #[inline] pub fn flush_blocking(&mut self) -> std::io::Result<()> { self.write.flush() } + #[inline] pub fn into_inner(self) -> W { self.write } @@ -142,7 +206,7 @@ pub fn encode( write: &mut impl std::io::Write, ) -> Result { re_tracing::profile_function!(); - let mut encoder = Encoder::new(version, options, write)?; + let mut encoder = DroppableEncoder::new(version, options, write)?; let mut size_bytes = 0; for message in messages { size_bytes += encoder.append(&message?)?; @@ -158,7 +222,7 @@ pub fn encode_ref<'a>( write: &mut impl std::io::Write, ) -> Result { re_tracing::profile_function!(); - let mut encoder = Encoder::new(version, options, write)?; + let mut encoder = DroppableEncoder::new(version, options, write)?; let mut size_bytes = 0; for message in messages { size_bytes += encoder.append(message?)?; @@ -177,11 +241,17 @@ pub fn encode_as_bytes( for message in messages { encoder.append(&message?)?; } + encoder.finish()?; Ok(bytes) } #[inline] -pub fn local_encoder() -> Result>, EncodeError> { +pub fn local_encoder() -> Result>, EncodeError> { + DroppableEncoder::new(CrateVersion::LOCAL, EncodingOptions::COMPRESSED, Vec::new()) +} + +#[inline] +pub fn local_raw_encoder() -> Result>, EncodeError> { Encoder::new(CrateVersion::LOCAL, EncodingOptions::COMPRESSED, Vec::new()) } @@ -189,10 +259,11 @@ pub fn local_encoder() -> Result>, EncodeError> { pub fn encode_as_bytes_local( messages: impl Iterator>, ) -> Result, EncodeError> { - let mut encoder = local_encoder()?; + let mut encoder = local_raw_encoder()?; for message in messages { encoder.append(&message?)?; } + encoder.finish()?; Ok(encoder.into_inner()) } @@ -200,9 +271,10 @@ pub fn encode_as_bytes_local( pub fn encode_ref_as_bytes_local<'a>( messages: impl Iterator>, ) -> Result, EncodeError> { - let mut encoder = local_encoder()?; + let mut encoder = local_raw_encoder()?; for message in messages { encoder.append(message?)?; } + encoder.finish()?; Ok(encoder.into_inner()) } diff --git a/crates/store/re_log_encoding/src/file_sink.rs b/crates/store/re_log_encoding/src/file_sink.rs index 166e4403dd84..cd5e7d2d953f 100644 --- a/crates/store/re_log_encoding/src/file_sink.rs +++ b/crates/store/re_log_encoding/src/file_sink.rs @@ -75,7 +75,7 @@ impl FileSink { let file = std::fs::File::create(&path) .map_err(|err| FileSinkError::CreateFile(path.clone(), err))?; - let encoder = crate::encoder::Encoder::new( + let encoder = crate::encoder::DroppableEncoder::new( re_build_info::CrateVersion::LOCAL, encoding_options, file, @@ -97,7 +97,7 @@ impl FileSink { re_log::debug!("Writing to stdoutā€¦"); - let encoder = crate::encoder::Encoder::new( + let encoder = crate::encoder::DroppableEncoder::new( re_build_info::CrateVersion::LOCAL, encoding_options, std::io::stdout(), @@ -127,7 +127,7 @@ impl FileSink { /// Set `filepath` to `None` to stream to standard output. fn spawn_and_stream( filepath: Option<&std::path::Path>, - mut encoder: crate::encoder::Encoder, + mut encoder: crate::encoder::DroppableEncoder, rx: Receiver>, ) -> Result, FileSinkError> { let (name, target) = if let Some(filepath) = filepath { diff --git a/crates/top/re_sdk/src/binary_stream_sink.rs b/crates/top/re_sdk/src/binary_stream_sink.rs index 6a74c869ece5..a673ea24992a 100644 --- a/crates/top/re_sdk/src/binary_stream_sink.rs +++ b/crates/top/re_sdk/src/binary_stream_sink.rs @@ -133,7 +133,7 @@ impl BinaryStreamSink { let (tx, rx) = std::sync::mpsc::channel(); - let encoder = re_log_encoding::encoder::Encoder::new( + let encoder = re_log_encoding::encoder::DroppableEncoder::new( re_build_info::CrateVersion::LOCAL, encoding_options, storage.inner.clone(), @@ -167,7 +167,7 @@ impl LogSink for BinaryStreamSink { /// Spawn the encoder thread that will write log messages to the binary stream. fn spawn_and_stream( - mut encoder: re_log_encoding::encoder::Encoder, + mut encoder: re_log_encoding::encoder::DroppableEncoder, rx: Receiver>, ) -> Result, BinaryStreamSinkError> { std::thread::Builder::new() diff --git a/crates/top/re_sdk/src/log_sink.rs b/crates/top/re_sdk/src/log_sink.rs index c54001cc3fd6..e89732dcbf2b 100644 --- a/crates/top/re_sdk/src/log_sink.rs +++ b/crates/top/re_sdk/src/log_sink.rs @@ -2,8 +2,8 @@ use std::fmt; use std::sync::Arc; use parking_lot::Mutex; -use re_log_encoding::encoder::EncodeError; -use re_log_encoding::encoder::{encode_as_bytes_local, local_encoder}; +use re_log_encoding::encoder::encode_as_bytes_local; +use re_log_encoding::encoder::{local_raw_encoder, EncodeError}; use re_log_types::{BlueprintActivationCommand, LogMsg, StoreId}; use crate::RecordingStream; @@ -250,7 +250,7 @@ impl MemorySinkStorage { /// This automatically takes care of flushing the underlying [`crate::RecordingStream`]. #[inline] pub fn concat_memory_sinks_as_bytes(sinks: &[&Self]) -> Result, EncodeError> { - let mut encoder = local_encoder()?; + let mut encoder = local_raw_encoder()?; for sink in sinks { // NOTE: It's fine, this is an in-memory sink so by definition there's no I/O involved @@ -264,6 +264,8 @@ impl MemorySinkStorage { } } + encoder.finish()?; + Ok(encoder.into_inner()) } diff --git a/crates/top/rerun/src/commands/entrypoint.rs b/crates/top/rerun/src/commands/entrypoint.rs index 6fe9e16c5b05..c2add380eb5b 100644 --- a/crates/top/rerun/src/commands/entrypoint.rs +++ b/crates/top/rerun/src/commands/entrypoint.rs @@ -950,7 +950,7 @@ fn stream_to_rrd_on_disk( let encoding_options = re_log_encoding::EncodingOptions::COMPRESSED; let file = std::fs::File::create(path).map_err(|err| FileSinkError::CreateFile(path.clone(), err))?; - let mut encoder = re_log_encoding::encoder::Encoder::new( + let mut encoder = re_log_encoding::encoder::DroppableEncoder::new( re_build_info::CrateVersion::LOCAL, encoding_options, file, diff --git a/crates/top/rerun/src/commands/rrd/filter.rs b/crates/top/rerun/src/commands/rrd/filter.rs index 9d4fbb0a38af..fdc2086729f1 100644 --- a/crates/top/rerun/src/commands/rrd/filter.rs +++ b/crates/top/rerun/src/commands/rrd/filter.rs @@ -84,7 +84,7 @@ impl FilterCommand { // TODO(cmc): encoding options & version should match the original. let version = CrateVersion::LOCAL; let options = re_log_encoding::EncodingOptions::COMPRESSED; - re_log_encoding::encoder::Encoder::new(version, options, &mut rrd_out) + re_log_encoding::encoder::DroppableEncoder::new(version, options, &mut rrd_out) .context("couldn't init encoder")? }; @@ -93,6 +93,7 @@ impl FilterCommand { size_bytes += encoder.append(&msg).context("encoding failure")?; } + drop(encoder); rrd_out.flush().context("couldn't flush output")?; Ok(size_bytes)