Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

avoid buffering the output twice when writing to lepton file #113

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 38 additions & 31 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ simple_logger ="5.0"
unroll = "0.1"
rayon-core = { version = "1", optional = true }
git-version = "0.3"
fixed-capacity-vec = "1.0.1"

[target.'cfg(windows)'.dependencies]
cpu-time = "1.0"
Expand Down
1 change: 1 addition & 0 deletions src/lepton_error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ pub trait AddContext<T> {

impl<T, E: Into<LeptonError>> AddContext<T> for core::result::Result<T, E> {
#[track_caller]
#[inline(always)]
fn context(self) -> Result<T> {
match self {
Ok(x) => Ok(x),
Expand Down
2 changes: 0 additions & 2 deletions src/structs/lepton_encoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,8 +117,6 @@ pub fn lepton_encode_row_range<W: Write>(
features,
)
.context()?;

bool_writer.flush_non_final_data().context()?;
}

if is_last_thread && full_file_compression {
Expand Down
56 changes: 31 additions & 25 deletions src/structs/multiplexer.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
use std::cmp;
use std::collections::VecDeque;
use std::io::{Cursor, Read, Write};
use std::mem::swap;
use std::sync::mpsc::{channel, Receiver, Sender};
use std::sync::{Arc, Mutex};

use byteorder::WriteBytesExt;
use fixed_capacity_vec::FixedCapacityVec;

use crate::lepton_error::{AddContext, ExitCode, LeptonError, Result};
/// Implements a multiplexer that reads and writes blocks to a stream from multiple threads.
Expand All @@ -22,48 +22,54 @@ enum Message {
WriteBlock(usize, Vec<u8>),
}

const WRITE_BUFFER_SIZE: usize = 65536;

pub struct MultiplexWriter {
thread_id: usize,
sender: Sender<Message>,
buffer: Vec<u8>,
buffer: FixedCapacityVec<u8, WRITE_BUFFER_SIZE>,
}

const WRITE_BUFFER_SIZE: usize = 65536;

impl Write for MultiplexWriter {
#[inline(always)]
fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
let mut copy_start = 0;
while copy_start < buf.len() {
let amount_to_copy = cmp::min(
WRITE_BUFFER_SIZE - self.buffer.len(),
buf.len() - copy_start,
);
self.buffer
.extend_from_slice(&buf[copy_start..copy_start + amount_to_copy]);

if self.buffer.len() == WRITE_BUFFER_SIZE {
self.flush()?;
// the caller writes one byte at a time so there's no need to optimize here
for &b in buf {
if self.buffer.try_push_or_discard(b).is_err() {
// send buffer if we are full and include this byte in it
self.send_buffer::<true>(b);
}

copy_start += amount_to_copy;
}

Ok(buf.len())
}

fn flush(&mut self) -> std::io::Result<()> {
if self.buffer.len() > 0 {
let mut new_buffer = Vec::with_capacity(WRITE_BUFFER_SIZE);
swap(&mut new_buffer, &mut self.buffer);

self.sender
.send(Message::WriteBlock(self.thread_id, new_buffer))
.unwrap();
self.send_buffer::<false>(0);
}
Ok(())
}
}

impl MultiplexWriter {
#[cold]
fn send_buffer<const PUSH_NEXT: bool>(&mut self, next_byte: u8) {
let mut new_buffer = FixedCapacityVec::new();
if PUSH_NEXT {
new_buffer.push(next_byte);
}

swap(&mut new_buffer, &mut self.buffer);

// ignore errors here since there's really no need to interrupt our work
// since errors are not expected and this will slow down the processing
// by requiring the callers up the chain to check for errors
let _ = self
.sender
.send(Message::WriteBlock(self.thread_id, new_buffer.into()));
}
}

// if we are using Rayon, these are the primitives to use to spawn thread pool work items
#[cfg(feature = "use_rayon")]
fn my_scope<'scope, OP, R>(op: OP) -> R
Expand Down Expand Up @@ -145,7 +151,7 @@ where
let mut thread_writer = MultiplexWriter {
thread_id: thread_id,
sender: cloned_sender,
buffer: Vec::with_capacity(WRITE_BUFFER_SIZE),
buffer: FixedCapacityVec::new(),
};

let processor_clone = arc_processor.clone();
Expand Down
Loading
Loading