Skip to content

Commit

Permalink
feat!(inserter): limit by size and make write() sync
Browse files Browse the repository at this point in the history
  • Loading branch information
loyd committed Nov 15, 2023
1 parent a99e304 commit 8c1cf9f
Show file tree
Hide file tree
Showing 12 changed files with 432 additions and 202 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,15 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Added
- derive: support `serde::skip_deserializing` ([#83]).
- the `quanta` feature, enabled by default.
- inserter: can be limited by size, see `Inserter::with_max_bytes()`.

### Changed
- **BREAKING** inserter: `Inserter::write` is synchronous now.
- **BREAKING** inserter: rename `entries` to `rows`.
- **BREAKING** drop the `wa-37420` feature.
- **BREAKING** remove deprecated items.
- inserter: increase performance if the `quanta` feature is enabled.
- inserter: increase performance if the time limit isn't used.
- derive: move to syn v2.

### Fixed
Expand Down
16 changes: 9 additions & 7 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -143,22 +143,24 @@ insert.end().await?;
```rust,ignore
let mut inserter = client.inserter("some")?
.with_timeouts(Some(Duration::from_secs(5)), Some(Duration::from_secs(20)))
.with_max_entries(750_000)
.with_max_bytes(50_000_000)
.with_max_rows(750_000)
.with_period(Some(Duration::from_secs(15)));
inserter.write(&MyRow { no: 0, name: "foo".into() }).await?;
inserter.write(&MyRow { no: 1, name: "bar".into() }).await?;
inserter.write(&MyRow { no: 0, name: "foo".into() })?;
inserter.write(&MyRow { no: 1, name: "bar".into() })?;
let stats = inserter.commit().await?;
if stats.entries > 0 {
if stats.rows > 0 {
println!(
"{} entries ({} transactions) have been inserted",
stats.entries, stats.transactions,
"{} bytes, {} rows, {} transactions have been inserted",
stats.bytes, stats.rows, stats.transactions,
);
}
```

* `Inserter` ends an active insert in `commit()` if thresholds (`max_entries`, `period`) are reached.
* `Inserter` ends an active insert in `commit()` if thresholds (`max_bytes`, `max_rows`, `period`) are reached.
* The interval between ending active `INSERT`s can be biased by using `with_period_bias` to avoid load spikes by parallel inserters.
* `Inserter::time_left()` can be used to detect when the current period ends. Call `Inserter::commit()` again to check limits.
* All rows between `commit()` calls are inserted in the same `INSERT` statement.
* Do not forget to flush if you want to terminate inserting:
```rust,ignore
Expand Down
16 changes: 7 additions & 9 deletions benches/insert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,17 +68,15 @@ async fn run_insert(client: Client, iters: u64) -> Result<Duration> {

async fn run_inserter(client: Client, iters: u64) -> Result<Duration> {
let start = Instant::now();
let mut inserter = client.inserter("table")?.with_max_entries(iters);
let mut inserter = client.inserter("table")?.with_max_rows(iters);

for _ in 0..iters {
inserter
.write(&black_box(SomeRow {
a: 42,
b: 42,
c: 42,
d: 42,
}))
.await?;
inserter.write(&black_box(SomeRow {
a: 42,
b: 42,
c: 42,
d: 42,
}))?;
inserter.commit().await?;
}

Expand Down
6 changes: 3 additions & 3 deletions examples/usage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,15 +42,15 @@ async fn insert(client: &Client) -> Result<()> {
async fn inserter(client: &Client) -> Result<()> {
let mut inserter = client
.inserter("some")?
.with_max_entries(100_000)
.with_max_rows(100_000)
.with_period(Some(Duration::from_secs(15)));

for i in 0..1000 {
if i == 500 {
inserter.set_max_entries(300);
inserter.set_max_rows(300);
}

inserter.write(&MyRow { no: i, name: "foo" }).await?;
inserter.write(&MyRow { no: i, name: "foo" })?;
inserter.commit().await?;
}

Expand Down
2 changes: 1 addition & 1 deletion rustfmt.toml
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
edition = "2018"
edition = "2021"

merge_derives = false
51 changes: 40 additions & 11 deletions src/insert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,10 @@ use crate::{
const BUFFER_SIZE: usize = 128 * 1024;
const MIN_CHUNK_SIZE: usize = BUFFER_SIZE - 1024; // slightly less to avoid extra reallocations

/// Performs only one `INSERT`.
/// Performs one `INSERT`.
///
/// The [`Insert::end`] must be called to finalize the `INSERT`.
/// Otherwise, the whole `INSERT` will be aborted.
///
/// Rows are being sent progressively to spread network load.
#[must_use]
Expand Down Expand Up @@ -143,20 +146,26 @@ impl<T> Insert<T> {
self.end_timeout = end_timeout;
}

/// Serializes and writes to the socket a provided row.
/// Serializes the provided row into an internal buffer.
/// Once the buffer is full, it's sent to a background task writing to the socket.
///
/// Close to:
/// ```ignore
/// async fn write<T>(&self, row: &T) -> Result<usize>;
/// ```
///
/// A returned future doesn't depend on the row's lifetime.
///
/// Returns an error if the row cannot be serialized or the background task failed.
/// Once failed, the whole `INSERT` is aborted and cannot be used anymore.
///
/// # Panics
/// If called after previous call returned an error.
pub fn write<'a>(&'a mut self, row: &T) -> impl Future<Output = Result<()>> + 'a + Send
where
T: Serialize,
{
assert!(self.sender.is_some(), "write() after error");

let result = rowbinary::serialize_into(&mut self.buffer, row);
if result.is_err() {
self.abort();
}
let result = self.do_write(row);

async move {
result?;
Expand All @@ -167,10 +176,30 @@ impl<T> Insert<T> {
}
}

/// Ends `INSERT`.
/// Succeeds if the server returns 200.
#[inline(always)]
pub(crate) fn do_write(&mut self, row: &T) -> Result<usize>
where
T: Serialize,
{
assert!(self.sender.is_some(), "write() after error");

let old_buf_size = self.buffer.len();
let result = rowbinary::serialize_into(&mut self.buffer, row);
let written = self.buffer.len() - old_buf_size;

if result.is_err() {
self.abort();
}

result.and(Ok(written))
}

/// Ends `INSERT`, the server starts processing the data.
///
/// Succeeds if the server returns 200, that means the `INSERT` was handled successfully,
/// including all materialized views and quorum writes.
///
/// If it isn't called, the whole `INSERT` is aborted.
/// NOTE: If it isn't called, the whole `INSERT` is aborted.
pub async fn end(mut self) -> Result<()> {
if !self.buffer.is_empty() {
self.send_chunk().await?;
Expand Down
Loading

0 comments on commit 8c1cf9f

Please sign in to comment.