Skip to content

Commit

Permalink
csv: only flush on explicit flush call
Browse files Browse the repository at this point in the history
This fixes a bug where the CSV writer was erroneously flushing the
underlying writer when its internal buffer was full. Instead, when the
internal buffer is full, it should simply write the contents of the buffer
to the underlying writer. There is no need to flush it. Indeed, flushing here
causes other problems such as those observed in
BurntSushi/xsv#151.

We are careful to ensure that calling `flush` explicitly still calls `flush`
on the underlying writer.

Fixes #173
  • Loading branch information
owst authored and BurntSushi committed Jan 7, 2020
1 parent fcdbea3 commit e2c2468
Showing 1 changed file with 54 additions and 4 deletions.
58 changes: 54 additions & 4 deletions src/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1043,7 +1043,7 @@ impl<W: io::Write> Writer<W> {
self.state.fields_written += 1;
return Ok(());
}
WriteResult::OutputFull => self.flush()?,
WriteResult::OutputFull => self.flush_buf()?,
}
}
}
Expand All @@ -1055,12 +1055,19 @@ impl<W: io::Write> Writer<W> {
///
/// Note that this also flushes the underlying writer.
pub fn flush(&mut self) -> io::Result<()> {
    // First hand everything sitting in the internal buffer to the
    // underlying writer, so the flush below covers all bytes written so far.
    self.flush_buf()?;
    // NOTE(review): `wtr` is unwrapped unconditionally; presumably it is
    // only empty once the writer has been taken out — confirm.
    let inner = self.wtr.as_mut().unwrap();
    inner.flush()
}

/// Write the contents of the internal buffer to the underlying writer and
/// clear the buffer, without flushing the underlying writer.
///
/// This is what the write paths call when the internal buffer fills up:
/// the buffered bytes only need to be handed to the underlying writer, not
/// forced through it. Use `flush` when the underlying writer must be
/// flushed as well.
///
/// # Errors
///
/// Returns any error produced by `write_all` on the underlying writer. In
/// that case the internal buffer is left untouched (it is not cleared).
fn flush_buf(&mut self) -> io::Result<()> {
    // `panicked` is raised only for the duration of the call into the
    // underlying writer and lowered again before the result is inspected,
    // so it stays `true` only if `write_all` itself unwinds.
    // NOTE(review): presumably consulted elsewhere (e.g. on drop) to avoid
    // writing again after a panic — confirm.
    self.state.panicked = true;
    let result = self.wtr.as_mut().unwrap().write_all(self.buf.readable());
    self.state.panicked = false;
    result?;
    // Everything readable has been handed off; make the buffer reusable.
    // Deliberately no `flush()` on the underlying writer here.
    self.buf.clear();
    Ok(())
}

Expand All @@ -1082,7 +1089,7 @@ impl<W: io::Write> Writer<W> {
self.buf.written(nout);
match res {
WriteResult::InputEmpty => return Ok(()),
WriteResult::OutputFull => self.flush()?,
WriteResult::OutputFull => self.flush_buf()?,
}
}
}
Expand All @@ -1098,7 +1105,7 @@ impl<W: io::Write> Writer<W> {
self.state.fields_written = 0;
return Ok(());
}
WriteResult::OutputFull => self.flush()?,
WriteResult::OutputFull => self.flush_buf()?,
}
}
}
Expand Down Expand Up @@ -1179,6 +1186,8 @@ impl Buffer {
mod tests {
use serde::{serde_if_integer128, Serialize};

use std::io::{self, Write};

use crate::byte_record::ByteRecord;
use crate::error::ErrorKind;
use crate::string_record::StringRecord;
Expand Down Expand Up @@ -1306,6 +1315,47 @@ mod tests {
assert_eq!(wtr_as_string(wtr), "a,b,c\na\n");
}

/// Checks that filling the internal buffer only writes to the underlying
/// writer, and that the underlying writer is flushed solely on an explicit
/// `flush` call and when the writer is unwrapped with `into_inner`.
#[test]
fn full_buffer_should_not_flush_underlying() {
    /// Sink that records how it was driven: each `write` call is recorded
    /// as `>data<` and each `flush` call as `!`.
    struct MarkWriteAndFlush(Vec<u8>);

    impl MarkWriteAndFlush {
        /// Consume the sink and return everything it recorded as a string.
        fn into_string(self) -> String {
            String::from_utf8(self.0).unwrap()
        }
    }

    impl Write for MarkWriteAndFlush {
        fn write(&mut self, data: &[u8]) -> io::Result<usize> {
            // The markers must be recorded in full, so use `write_all`
            // rather than discarding the byte count returned by `write`.
            self.0.write_all(b">")?;
            let written = self.0.write(data)?;
            self.0.write_all(b"<")?;

            // Report only the payload bytes, not the markers.
            Ok(written)
        }

        fn flush(&mut self) -> io::Result<()> {
            self.0.write_all(b"!")
        }
    }

    let underlying = MarkWriteAndFlush(vec![]);
    let mut wtr =
        WriterBuilder::new().buffer_capacity(4).from_writer(underlying);

    wtr.write_byte_record(&ByteRecord::from(vec!["a", "b"])).unwrap();
    wtr.write_byte_record(&ByteRecord::from(vec!["c", "d"])).unwrap();
    wtr.flush().unwrap();
    wtr.write_byte_record(&ByteRecord::from(vec!["e", "f"])).unwrap();

    let got = wtr.into_inner().unwrap().into_string();

    // As the buffer size is 4 we should write each record separately, and
    // flush when explicitly called and implicitly in into_inner.
    assert_eq!(got, ">a,b\n<>c,d\n<!>e,f\n<!");
}

#[test]
fn serialize_with_headers() {
#[derive(Serialize)]
Expand Down

0 comments on commit e2c2468

Please sign in to comment.