Skip to content

Commit

Permalink
use DroppableEncoder everywhere we can, properly finish() otherwise
Browse files Browse the repository at this point in the history
  • Loading branch information
teh-cmc committed Oct 17, 2024
1 parent 847ff71 commit 7cf97f0
Show file tree
Hide file tree
Showing 7 changed files with 28 additions and 17 deletions.
4 changes: 2 additions & 2 deletions crates/store/re_data_loader/src/loader_rrd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,7 @@ impl RetryableFileReader {
mod tests {
use re_build_info::CrateVersion;
use re_chunk::RowId;
use re_log_encoding::{decoder, encoder::Encoder};
use re_log_encoding::{decoder, encoder::DroppableEncoder};
use re_log_types::{
ApplicationId, LogMsg, SetStoreInfo, StoreId, StoreInfo, StoreKind, StoreSource, Time,
};
Expand Down Expand Up @@ -247,7 +247,7 @@ mod tests {
.open(rrd_file_path.to_str().unwrap())
.unwrap();

let mut encoder = Encoder::new(
let mut encoder = DroppableEncoder::new(
re_build_info::CrateVersion::LOCAL,
re_log_encoding::EncodingOptions::UNCOMPRESSED,
rrd_file,
Expand Down
18 changes: 13 additions & 5 deletions crates/store/re_log_encoding/src/encoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ pub fn encode(
write: &mut impl std::io::Write,
) -> Result<u64, EncodeError> {
re_tracing::profile_function!();
let mut encoder = Encoder::new(version, options, write)?;
let mut encoder = DroppableEncoder::new(version, options, write)?;
let mut size_bytes = 0;
for message in messages {
size_bytes += encoder.append(&message?)?;
Expand All @@ -222,7 +222,7 @@ pub fn encode_ref<'a>(
write: &mut impl std::io::Write,
) -> Result<u64, EncodeError> {
re_tracing::profile_function!();
let mut encoder = Encoder::new(version, options, write)?;
let mut encoder = DroppableEncoder::new(version, options, write)?;
let mut size_bytes = 0;
for message in messages {
size_bytes += encoder.append(message?)?;
Expand All @@ -241,32 +241,40 @@ pub fn encode_as_bytes(
for message in messages {
encoder.append(&message?)?;
}
encoder.finish()?;
Ok(bytes)
}

#[inline]
pub fn local_encoder() -> Result<Encoder<Vec<u8>>, EncodeError> {
pub fn local_encoder() -> Result<DroppableEncoder<Vec<u8>>, EncodeError> {
DroppableEncoder::new(CrateVersion::LOCAL, EncodingOptions::COMPRESSED, Vec::new())
}

#[inline]
pub fn local_raw_encoder() -> Result<Encoder<Vec<u8>>, EncodeError> {
Encoder::new(CrateVersion::LOCAL, EncodingOptions::COMPRESSED, Vec::new())
}

#[inline]
pub fn encode_as_bytes_local(
messages: impl Iterator<Item = ChunkResult<LogMsg>>,
) -> Result<Vec<u8>, EncodeError> {
let mut encoder = local_encoder()?;
let mut encoder = local_raw_encoder()?;
for message in messages {
encoder.append(&message?)?;
}
encoder.finish()?;
Ok(encoder.into_inner())
}

#[inline]
pub fn encode_ref_as_bytes_local<'a>(
messages: impl Iterator<Item = ChunkResult<&'a LogMsg>>,
) -> Result<Vec<u8>, EncodeError> {
let mut encoder = local_encoder()?;
let mut encoder = local_raw_encoder()?;
for message in messages {
encoder.append(message?)?;
}
encoder.finish()?;
Ok(encoder.into_inner())
}
6 changes: 3 additions & 3 deletions crates/store/re_log_encoding/src/file_sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ impl FileSink {

let file = std::fs::File::create(&path)
.map_err(|err| FileSinkError::CreateFile(path.clone(), err))?;
let encoder = crate::encoder::Encoder::new(
let encoder = crate::encoder::DroppableEncoder::new(
re_build_info::CrateVersion::LOCAL,
encoding_options,
file,
Expand All @@ -97,7 +97,7 @@ impl FileSink {

re_log::debug!("Writing to stdout…");

let encoder = crate::encoder::Encoder::new(
let encoder = crate::encoder::DroppableEncoder::new(
re_build_info::CrateVersion::LOCAL,
encoding_options,
std::io::stdout(),
Expand Down Expand Up @@ -127,7 +127,7 @@ impl FileSink {
/// Set `filepath` to `None` to stream to standard output.
fn spawn_and_stream<W: std::io::Write + Send + 'static>(
filepath: Option<&std::path::Path>,
mut encoder: crate::encoder::Encoder<W>,
mut encoder: crate::encoder::DroppableEncoder<W>,
rx: Receiver<Option<Command>>,
) -> Result<std::thread::JoinHandle<()>, FileSinkError> {
let (name, target) = if let Some(filepath) = filepath {
Expand Down
4 changes: 2 additions & 2 deletions crates/top/re_sdk/src/binary_stream_sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ impl BinaryStreamSink {

let (tx, rx) = std::sync::mpsc::channel();

let encoder = re_log_encoding::encoder::Encoder::new(
let encoder = re_log_encoding::encoder::DroppableEncoder::new(
re_build_info::CrateVersion::LOCAL,
encoding_options,
storage.inner.clone(),
Expand Down Expand Up @@ -167,7 +167,7 @@ impl LogSink for BinaryStreamSink {

/// Spawn the encoder thread that will write log messages to the binary stream.
fn spawn_and_stream<W: std::io::Write + Send + 'static>(
mut encoder: re_log_encoding::encoder::Encoder<W>,
mut encoder: re_log_encoding::encoder::DroppableEncoder<W>,
rx: Receiver<Option<Command>>,
) -> Result<std::thread::JoinHandle<()>, BinaryStreamSinkError> {
std::thread::Builder::new()
Expand Down
8 changes: 5 additions & 3 deletions crates/top/re_sdk/src/log_sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@ use std::fmt;
use std::sync::Arc;

use parking_lot::Mutex;
use re_log_encoding::encoder::EncodeError;
use re_log_encoding::encoder::{encode_as_bytes_local, local_encoder};
use re_log_encoding::encoder::encode_as_bytes_local;
use re_log_encoding::encoder::{local_raw_encoder, EncodeError};
use re_log_types::{BlueprintActivationCommand, LogMsg, StoreId};

use crate::RecordingStream;
Expand Down Expand Up @@ -250,7 +250,7 @@ impl MemorySinkStorage {
/// This automatically takes care of flushing the underlying [`crate::RecordingStream`].
#[inline]
pub fn concat_memory_sinks_as_bytes(sinks: &[&Self]) -> Result<Vec<u8>, EncodeError> {
let mut encoder = local_encoder()?;
let mut encoder = local_raw_encoder()?;

for sink in sinks {
// NOTE: It's fine, this is an in-memory sink so by definition there's no I/O involved
Expand All @@ -264,6 +264,8 @@ impl MemorySinkStorage {
}
}

encoder.finish()?;

Ok(encoder.into_inner())
}

Expand Down
2 changes: 1 addition & 1 deletion crates/top/rerun/src/commands/entrypoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -950,7 +950,7 @@ fn stream_to_rrd_on_disk(
let encoding_options = re_log_encoding::EncodingOptions::COMPRESSED;
let file =
std::fs::File::create(path).map_err(|err| FileSinkError::CreateFile(path.clone(), err))?;
let mut encoder = re_log_encoding::encoder::Encoder::new(
let mut encoder = re_log_encoding::encoder::DroppableEncoder::new(
re_build_info::CrateVersion::LOCAL,
encoding_options,
file,
Expand Down
3 changes: 2 additions & 1 deletion crates/top/rerun/src/commands/rrd/filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ impl FilterCommand {
// TODO(cmc): encoding options & version should match the original.
let version = CrateVersion::LOCAL;
let options = re_log_encoding::EncodingOptions::COMPRESSED;
re_log_encoding::encoder::Encoder::new(version, options, &mut rrd_out)
re_log_encoding::encoder::DroppableEncoder::new(version, options, &mut rrd_out)
.context("couldn't init encoder")?
};

Expand All @@ -93,6 +93,7 @@ impl FilterCommand {
size_bytes += encoder.append(&msg).context("encoding failure")?;
}

drop(encoder);
rrd_out.flush().context("couldn't flush output")?;

Ok(size_bytes)
Expand Down

0 comments on commit 7cf97f0

Please sign in to comment.