Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make sure to always write out end-of-stream markers during encoding #7796

Merged
merged 3 commits into from
Oct 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions crates/store/re_data_loader/src/loader_rrd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,7 @@ impl RetryableFileReader {
mod tests {
use re_build_info::CrateVersion;
use re_chunk::RowId;
use re_log_encoding::{decoder, encoder::Encoder};
use re_log_encoding::{decoder, encoder::DroppableEncoder};
use re_log_types::{
ApplicationId, LogMsg, SetStoreInfo, StoreId, StoreInfo, StoreKind, StoreSource, Time,
};
Expand Down Expand Up @@ -247,7 +247,7 @@ mod tests {
.open(rrd_file_path.to_str().unwrap())
.unwrap();

let mut encoder = Encoder::new(
let mut encoder = DroppableEncoder::new(
re_build_info::CrateVersion::LOCAL,
re_log_encoding::EncodingOptions::UNCOMPRESSED,
rrd_file,
Expand Down
82 changes: 77 additions & 5 deletions crates/store/re_log_encoding/src/encoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,66 @@ pub fn encode_to_bytes<'a>(

// ----------------------------------------------------------------------------

/// An [`Encoder`] that properly closes the stream on drop.
///
/// When dropped, it will automatically insert an end-of-stream marker, if that wasn't already done manually.
pub struct DroppableEncoder<W: std::io::Write> {
encoder: Encoder<W>,

/// Tracks whether the end-of-stream marker has been written out already.
is_finished: bool,
}

impl<W: std::io::Write> DroppableEncoder<W> {
#[inline]
pub fn new(
version: CrateVersion,
options: EncodingOptions,
write: W,
) -> Result<Self, EncodeError> {
Ok(Self {
encoder: Encoder::new(version, options, write)?,
is_finished: false,
})
}

/// Returns the size in bytes of the encoded data.
#[inline]
pub fn append(&mut self, message: &LogMsg) -> Result<u64, EncodeError> {
self.encoder.append(message)
}

#[inline]
pub fn finish(&mut self) -> Result<(), EncodeError> {
if !self.is_finished {
self.encoder.finish()?;
}

self.is_finished = true;

Ok(())
}

#[inline]
pub fn flush_blocking(&mut self) -> std::io::Result<()> {
self.encoder.flush_blocking()
}
}

impl<W: std::io::Write> std::ops::Drop for DroppableEncoder<W> {
fn drop(&mut self) {
if !self.is_finished {
if let Err(err) = self.finish() {
re_log::warn!("encoder couldn't be finished: {err}");
}
}
}
}

/// Encode a stream of [`LogMsg`] into an `.rrd` file.
///
/// Prefer [`DroppableEncoder`] if possible, make sure to call [`Encoder::finish`] when appropriate
/// otherwise.
pub struct Encoder<W: std::io::Write> {
compression: Compression,
write: W,
Expand Down Expand Up @@ -120,15 +179,20 @@ impl<W: std::io::Write> Encoder<W> {
}
}

// NOTE: This cannot be done in a `Drop` implementation because of `Self::into_inner` which
// does a partial move.
#[inline]
pub fn finish(&mut self) -> Result<(), EncodeError> {
teh-cmc marked this conversation as resolved.
Show resolved Hide resolved
MessageHeader::EndOfStream.encode(&mut self.write)?;
Ok(())
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could warn in Drop if this hasn't been called, at least in debug builds

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure what you mean? We cannot implement Drop for Encoder?

}

#[inline]
pub fn flush_blocking(&mut self) -> std::io::Result<()> {
self.write.flush()
}

#[inline]
pub fn into_inner(self) -> W {
self.write
}
Expand All @@ -142,7 +206,7 @@ pub fn encode(
write: &mut impl std::io::Write,
) -> Result<u64, EncodeError> {
re_tracing::profile_function!();
let mut encoder = Encoder::new(version, options, write)?;
let mut encoder = DroppableEncoder::new(version, options, write)?;
let mut size_bytes = 0;
for message in messages {
size_bytes += encoder.append(&message?)?;
Expand All @@ -158,7 +222,7 @@ pub fn encode_ref<'a>(
write: &mut impl std::io::Write,
) -> Result<u64, EncodeError> {
re_tracing::profile_function!();
let mut encoder = Encoder::new(version, options, write)?;
let mut encoder = DroppableEncoder::new(version, options, write)?;
let mut size_bytes = 0;
for message in messages {
size_bytes += encoder.append(message?)?;
Expand All @@ -177,32 +241,40 @@ pub fn encode_as_bytes(
for message in messages {
encoder.append(&message?)?;
}
encoder.finish()?;
Ok(bytes)
}

#[inline]
pub fn local_encoder() -> Result<Encoder<Vec<u8>>, EncodeError> {
pub fn local_encoder() -> Result<DroppableEncoder<Vec<u8>>, EncodeError> {
DroppableEncoder::new(CrateVersion::LOCAL, EncodingOptions::COMPRESSED, Vec::new())
}

#[inline]
pub fn local_raw_encoder() -> Result<Encoder<Vec<u8>>, EncodeError> {
Encoder::new(CrateVersion::LOCAL, EncodingOptions::COMPRESSED, Vec::new())
}

#[inline]
pub fn encode_as_bytes_local(
messages: impl Iterator<Item = ChunkResult<LogMsg>>,
) -> Result<Vec<u8>, EncodeError> {
let mut encoder = local_encoder()?;
let mut encoder = local_raw_encoder()?;
for message in messages {
encoder.append(&message?)?;
}
encoder.finish()?;
Ok(encoder.into_inner())
}

#[inline]
pub fn encode_ref_as_bytes_local<'a>(
messages: impl Iterator<Item = ChunkResult<&'a LogMsg>>,
) -> Result<Vec<u8>, EncodeError> {
let mut encoder = local_encoder()?;
let mut encoder = local_raw_encoder()?;
for message in messages {
encoder.append(message?)?;
}
encoder.finish()?;
Ok(encoder.into_inner())
}
6 changes: 3 additions & 3 deletions crates/store/re_log_encoding/src/file_sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ impl FileSink {

let file = std::fs::File::create(&path)
.map_err(|err| FileSinkError::CreateFile(path.clone(), err))?;
let encoder = crate::encoder::Encoder::new(
let encoder = crate::encoder::DroppableEncoder::new(
re_build_info::CrateVersion::LOCAL,
encoding_options,
file,
Expand All @@ -97,7 +97,7 @@ impl FileSink {

re_log::debug!("Writing to stdout…");

let encoder = crate::encoder::Encoder::new(
let encoder = crate::encoder::DroppableEncoder::new(
re_build_info::CrateVersion::LOCAL,
encoding_options,
std::io::stdout(),
Expand Down Expand Up @@ -127,7 +127,7 @@ impl FileSink {
/// Set `filepath` to `None` to stream to standard output.
fn spawn_and_stream<W: std::io::Write + Send + 'static>(
filepath: Option<&std::path::Path>,
mut encoder: crate::encoder::Encoder<W>,
mut encoder: crate::encoder::DroppableEncoder<W>,
rx: Receiver<Option<Command>>,
) -> Result<std::thread::JoinHandle<()>, FileSinkError> {
let (name, target) = if let Some(filepath) = filepath {
Expand Down
4 changes: 2 additions & 2 deletions crates/top/re_sdk/src/binary_stream_sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ impl BinaryStreamSink {

let (tx, rx) = std::sync::mpsc::channel();

let encoder = re_log_encoding::encoder::Encoder::new(
let encoder = re_log_encoding::encoder::DroppableEncoder::new(
re_build_info::CrateVersion::LOCAL,
encoding_options,
storage.inner.clone(),
Expand Down Expand Up @@ -167,7 +167,7 @@ impl LogSink for BinaryStreamSink {

/// Spawn the encoder thread that will write log messages to the binary stream.
fn spawn_and_stream<W: std::io::Write + Send + 'static>(
mut encoder: re_log_encoding::encoder::Encoder<W>,
mut encoder: re_log_encoding::encoder::DroppableEncoder<W>,
rx: Receiver<Option<Command>>,
) -> Result<std::thread::JoinHandle<()>, BinaryStreamSinkError> {
std::thread::Builder::new()
Expand Down
8 changes: 5 additions & 3 deletions crates/top/re_sdk/src/log_sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@ use std::fmt;
use std::sync::Arc;

use parking_lot::Mutex;
use re_log_encoding::encoder::EncodeError;
use re_log_encoding::encoder::{encode_as_bytes_local, local_encoder};
use re_log_encoding::encoder::encode_as_bytes_local;
use re_log_encoding::encoder::{local_raw_encoder, EncodeError};
use re_log_types::{BlueprintActivationCommand, LogMsg, StoreId};

use crate::RecordingStream;
Expand Down Expand Up @@ -250,7 +250,7 @@ impl MemorySinkStorage {
/// This automatically takes care of flushing the underlying [`crate::RecordingStream`].
#[inline]
pub fn concat_memory_sinks_as_bytes(sinks: &[&Self]) -> Result<Vec<u8>, EncodeError> {
let mut encoder = local_encoder()?;
let mut encoder = local_raw_encoder()?;

for sink in sinks {
// NOTE: It's fine, this is an in-memory sink so by definition there's no I/O involved
Expand All @@ -264,6 +264,8 @@ impl MemorySinkStorage {
}
}

encoder.finish()?;

Ok(encoder.into_inner())
}

Expand Down
2 changes: 1 addition & 1 deletion crates/top/rerun/src/commands/entrypoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -950,7 +950,7 @@ fn stream_to_rrd_on_disk(
let encoding_options = re_log_encoding::EncodingOptions::COMPRESSED;
let file =
std::fs::File::create(path).map_err(|err| FileSinkError::CreateFile(path.clone(), err))?;
let mut encoder = re_log_encoding::encoder::Encoder::new(
let mut encoder = re_log_encoding::encoder::DroppableEncoder::new(
re_build_info::CrateVersion::LOCAL,
encoding_options,
file,
Expand Down
3 changes: 2 additions & 1 deletion crates/top/rerun/src/commands/rrd/filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ impl FilterCommand {
// TODO(cmc): encoding options & version should match the original.
let version = CrateVersion::LOCAL;
let options = re_log_encoding::EncodingOptions::COMPRESSED;
re_log_encoding::encoder::Encoder::new(version, options, &mut rrd_out)
re_log_encoding::encoder::DroppableEncoder::new(version, options, &mut rrd_out)
.context("couldn't init encoder")?
};

Expand All @@ -93,6 +93,7 @@ impl FilterCommand {
size_bytes += encoder.append(&msg).context("encoding failure")?;
}

drop(encoder);
rrd_out.flush().context("couldn't flush output")?;

Ok(size_bytes)
Expand Down
Loading