Skip to content

Commit

Permalink
feat(inserter): start new insert only when the first row is provided (#…
Browse files Browse the repository at this point in the history
…68)

feat(inserter): start new insert only when the first row is provided
  • Loading branch information
laplab authored Jun 9, 2023
1 parent 51c47da commit bd0c381
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 10 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
<!-- next-header -->

## [Unreleased] - ReleaseDate
### Changed
- inserter: start new insert only when the first row is provided ([#68]).

[#68]: https://github.com/loyd/clickhouse.rs/pull/68

## [0.11.4] - 2023-05-14
### Added
Expand Down
42 changes: 32 additions & 10 deletions src/inserter.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
use std::{future::Future, mem};
use std::mem;

use futures::{
future::{self, Either},
Future,
};
use serde::Serialize;
use tokio::time::{Duration, Instant};

Expand All @@ -17,7 +21,7 @@ pub struct Inserter<T> {
max_entries: u64,
send_timeout: Option<Duration>,
end_timeout: Option<Duration>,
insert: Insert<T>,
insert: Option<Insert<T>>,
ticks: Ticks,
committed: Quantities,
uncommitted_entries: u64,
Expand Down Expand Up @@ -51,7 +55,7 @@ where
max_entries: DEFAULT_MAX_ENTRIES,
send_timeout: None,
end_timeout: None,
insert: client.insert(table)?,
insert: None,
ticks: Ticks::default(),
committed: Quantities::ZERO,
uncommitted_entries: 0,
Expand Down Expand Up @@ -116,7 +120,9 @@ where
pub fn set_timeouts(&mut self, send_timeout: Option<Duration>, end_timeout: Option<Duration>) {
self.send_timeout = send_timeout;
self.end_timeout = end_timeout;
self.insert.set_timeouts(send_timeout, end_timeout);
if let Some(insert) = &mut self.insert {
insert.set_timeouts(self.send_timeout, self.end_timeout);
}
}

/// See [`Inserter::with_max_entries()`].
Expand Down Expand Up @@ -162,7 +168,12 @@ where
T: Serialize,
{
self.uncommitted_entries += 1;
self.insert.write(row)
if self.insert.is_none() {
if let Err(e) = self.init_insert() {
return Either::Right(future::ready(Result::<()>::Err(e)));
}
}
Either::Left(self.insert.as_mut().unwrap().write(row))
}

/// Checks limits and ends a current `INSERT` if they are reached.
Expand All @@ -189,8 +200,10 @@ where
/// Ends a current `INSERT` and whole `Inserter` unconditionally.
///
/// If it isn't called, the current `INSERT` is aborted.
pub async fn end(self) -> Result<Quantities> {
self.insert.end().await?;
pub async fn end(mut self) -> Result<Quantities> {
if let Some(insert) = self.insert.take() {
insert.end().await?;
}
Ok(self.committed)
}

Expand All @@ -200,9 +213,18 @@ where
}

async fn insert(&mut self) -> Result<()> {
let mut new_insert = self.client.insert(&self.table)?; // Actually it mustn't fail.
if let Some(insert) = self.insert.take() {
insert.end().await?;
}
Ok(())
}

#[cold]
#[inline(never)]
fn init_insert(&mut self) -> Result<()> {
debug_assert!(self.insert.is_none());
let mut new_insert: Insert<T> = self.client.insert(&self.table)?;
new_insert.set_timeouts(self.send_timeout, self.end_timeout);
let insert = mem::replace(&mut self.insert, new_insert);
insert.end().await
Ok(())
}
}

0 comments on commit bd0c381

Please sign in to comment.