From 0d92ce23377a3541fcf2d7acf5506e0a20da17b3 Mon Sep 17 00:00:00 2001 From: Paul Loyd Date: Sun, 14 May 2023 19:14:34 +0400 Subject: [PATCH] perf(query): improve performance --- CHANGELOG.md | 5 +++++ Cargo.toml | 4 ++++ benches/select_numbers.rs | 36 ++++++++++++++++++++++++++++++ src/buflist.rs | 46 ++++++++++++++++++++++++++++----------- src/cursor.rs | 6 ++++- src/response.rs | 14 ++++++++++-- 6 files changed, 95 insertions(+), 16 deletions(-) create mode 100644 benches/select_numbers.rs diff --git a/CHANGELOG.md b/CHANGELOG.md index 3336a34..6dba32c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,11 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] - ReleaseDate +### Added +- query: `Query::fetch_optional()`. + +### Changed +- query: increase performance up to 40%. ## [0.11.3] - 2023-02-19 ### Added diff --git a/Cargo.toml b/Cargo.toml index c7bc3c3..d92f2f0 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -14,6 +14,10 @@ rust-version = "1.60" all-features = true rustdoc-args = ["--cfg", "docsrs"] +[[bench]] +name = "select_numbers" +harness = false + [[bench]] name = "insert" harness = false diff --git a/benches/select_numbers.rs b/benches/select_numbers.rs new file mode 100644 index 0000000..d08f3ac --- /dev/null +++ b/benches/select_numbers.rs @@ -0,0 +1,36 @@ +use serde::Deserialize; + +use clickhouse::{error::Result, Client, Compression, Row}; + +#[derive(Row, Deserialize)] +struct Data { + no: u64, +} + +async fn bench() -> u64 { + let client = Client::default() + .with_compression(Compression::None) + .with_url("http://localhost:8123"); + + let mut cursor = client + .query("SELECT number FROM system.numbers_mt LIMIT 500000000") + .fetch::() + .unwrap(); + + let mut sum = 0; + while let Some(row) = cursor.next().await.unwrap() { + sum += row.no; + } + + sum +} + +#[tokio::main] +async fn main() -> Result<()> { + println!("Started"); + let start = std::time::Instant::now(); + let sum = tokio::spawn(bench()).await.unwrap(); + let elapsed = start.elapsed(); + println!("Done: elapsed={elapsed:?} sum={sum}"); + Ok(()) +} diff --git a/src/buflist.rs b/src/buflist.rs index 96bfdd3..cd29bc2 100644 --- a/src/buflist.rs +++ b/src/buflist.rs @@ -2,8 +2,9 @@ use std::collections::VecDeque; use bytes::Buf; -#[derive(Default)] +#[derive(Debug, Default)] pub(crate) struct BufList { + next_buf: Option, bufs: VecDeque, rem: usize, cursor: usize, @@ -13,21 +14,28 @@ impl BufList { #[inline] pub(crate) fn push(&mut self, buf: T) { let rem = buf.remaining(); + if rem == 0 { + return; + } - if rem > 0 { + if self.next_buf.is_none() { + self.next_buf = Some(buf); + } else { self.bufs.push_back(buf); - self.rem += rem; } + + self.rem += rem; } #[inline] pub(crate) fn bufs_cnt(&self) -> usize { - self.bufs.len() + self.next_buf.is_some() as usize + self.bufs.len() } + #[inline] pub(crate) fn commit(&mut self) { while self.cursor > 0 { - let front = &mut self.bufs[0]; + let front = self.next_buf.as_mut().unwrap(); let rem = front.remaining(); if rem > self.cursor { @@ -36,7 +44,7 @@ impl BufList { } else { front.advance(rem); self.cursor -= rem; - self.bufs.pop_front(); + self.next_buf = self.bufs.pop_front(); } } } @@ -45,6 +53,21 @@ impl BufList { self.rem += self.cursor; self.cursor = 0; } + + #[cold] + fn chunk_slow(&self) -> &[u8] { + let mut cnt = self.cursor - self.next_buf.as_ref().map_or(0, |b| b.chunk().len()); + + for buf in &self.bufs { + let bytes = buf.chunk(); + if bytes.len() > cnt { + return &bytes[cnt..]; + } + cnt -= bytes.len(); + } + + b"" + } } impl Buf for BufList { @@ -55,17 +78,14 @@ impl Buf for BufList { #[inline] fn chunk(&self) -> &[u8] { - let mut cnt = self.cursor; - - for buf in &self.bufs { + if let Some(buf) = &self.next_buf { let bytes = buf.chunk(); - if bytes.len() > cnt { - return &bytes[cnt..]; + if bytes.len() > self.cursor { + return &bytes[self.cursor..]; } - cnt -= bytes.len(); } - b"" + self.chunk_slow() } #[inline] diff --git a/src/cursor.rs b/src/cursor.rs index 4752317..81dc441 100644 --- a/src/cursor.rs +++ b/src/cursor.rs @@ -33,7 +33,11 @@ impl RawCursor { &mut self, mut f: impl FnMut(&mut BufList) -> ControlFlow, ) -> Result> { - let chunks = self.response.chunks().await?; + let chunks = if let Some(chunks) = self.response.chunks() { + chunks + } else { + self.response.chunks_slow().await? + }; loop { match f(&mut self.pending) { diff --git a/src/response.rs b/src/response.rs index 54498ce..f43e813 100644 --- a/src/response.rs +++ b/src/response.rs @@ -40,7 +40,17 @@ impl Response { })) } - pub(crate) async fn chunks(&mut self) -> Result<&mut Chunks> { + #[inline] + pub(crate) fn chunks(&mut self) -> Option<&mut Chunks> { + match self { + Self::Waiting(_) => None, + Self::Loading(chunks) => Some(chunks), + } + } + + #[cold] + #[inline(never)] + pub(crate) async fn chunks_slow(&mut self) -> Result<&mut Chunks> { loop { match self { Self::Waiting(future) => *self = Self::Loading(future.await?), @@ -50,7 +60,7 @@ impl Response { } pub(crate) async fn finish(&mut self) -> Result<()> { - let chunks = self.chunks().await?; + let chunks = self.chunks_slow().await?; while chunks.try_next().await?.is_some() {} Ok(()) }