Skip to content

Commit

Permalink
perf(query): improve performance
Browse files Browse the repository at this point in the history
  • Loading branch information
loyd committed May 14, 2023
1 parent 3ac194e commit 0d92ce2
Show file tree
Hide file tree
Showing 6 changed files with 95 additions and 16 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,11 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
<!-- next-header -->

## [Unreleased] - ReleaseDate
### Added
- query: `Query::fetch_optional()`.

### Changed
- query: increase performance up to 40%.

## [0.11.3] - 2023-02-19
### Added
Expand Down
4 changes: 4 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,10 @@ rust-version = "1.60"
all-features = true
rustdoc-args = ["--cfg", "docsrs"]

[[bench]]
name = "select_numbers"
harness = false

[[bench]]
name = "insert"
harness = false
Expand Down
36 changes: 36 additions & 0 deletions benches/select_numbers.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
use serde::Deserialize;

use clickhouse::{error::Result, Client, Compression, Row};

#[derive(Row, Deserialize)]
struct Data {
no: u64,
}

async fn bench() -> u64 {
let client = Client::default()
.with_compression(Compression::None)
.with_url("http://localhost:8123");

let mut cursor = client
.query("SELECT number FROM system.numbers_mt LIMIT 500000000")
.fetch::<Data>()
.unwrap();

let mut sum = 0;
while let Some(row) = cursor.next().await.unwrap() {
sum += row.no;
}

sum
}

#[tokio::main]
async fn main() -> Result<()> {
println!("Started");
let start = std::time::Instant::now();
let sum = tokio::spawn(bench()).await.unwrap();
let elapsed = start.elapsed();
println!("Done: elapsed={elapsed:?} sum={sum}");
Ok(())
}
46 changes: 33 additions & 13 deletions src/buflist.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,9 @@ use std::collections::VecDeque;

use bytes::Buf;

#[derive(Default)]
#[derive(Debug, Default)]
pub(crate) struct BufList<T> {
next_buf: Option<T>,
bufs: VecDeque<T>,
rem: usize,
cursor: usize,
Expand All @@ -13,21 +14,28 @@ impl<T: Buf> BufList<T> {
#[inline]
pub(crate) fn push(&mut self, buf: T) {
let rem = buf.remaining();
if rem == 0 {
return;
}

if rem > 0 {
if self.next_buf.is_none() {
self.next_buf = Some(buf);
} else {
self.bufs.push_back(buf);
self.rem += rem;
}

self.rem += rem;
}

#[inline]
pub(crate) fn bufs_cnt(&self) -> usize {
self.bufs.len()
self.next_buf.is_some() as usize + self.bufs.len()
}

#[inline]
pub(crate) fn commit(&mut self) {
while self.cursor > 0 {
let front = &mut self.bufs[0];
let front = self.next_buf.as_mut().unwrap();
let rem = front.remaining();

if rem > self.cursor {
Expand All @@ -36,7 +44,7 @@ impl<T: Buf> BufList<T> {
} else {
front.advance(rem);
self.cursor -= rem;
self.bufs.pop_front();
self.next_buf = self.bufs.pop_front();
}
}
}
Expand All @@ -45,6 +53,21 @@ impl<T: Buf> BufList<T> {
self.rem += self.cursor;
self.cursor = 0;
}

#[cold]
fn chunk_slow(&self) -> &[u8] {
let mut cnt = self.cursor - self.next_buf.as_ref().map_or(0, |b| b.chunk().len());

for buf in &self.bufs {
let bytes = buf.chunk();
if bytes.len() > cnt {
return &bytes[cnt..];
}
cnt -= bytes.len();
}

b""
}
}

impl<T: Buf> Buf for BufList<T> {
Expand All @@ -55,17 +78,14 @@ impl<T: Buf> Buf for BufList<T> {

#[inline]
fn chunk(&self) -> &[u8] {
let mut cnt = self.cursor;

for buf in &self.bufs {
if let Some(buf) = &self.next_buf {
let bytes = buf.chunk();
if bytes.len() > cnt {
return &bytes[cnt..];
if bytes.len() > self.cursor {
return &bytes[self.cursor..];
}
cnt -= bytes.len();
}

b""
self.chunk_slow()
}

#[inline]
Expand Down
6 changes: 5 additions & 1 deletion src/cursor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,11 @@ impl RawCursor {
&mut self,
mut f: impl FnMut(&mut BufList<Bytes>) -> ControlFlow<T>,
) -> Result<Option<T>> {
let chunks = self.response.chunks().await?;
let chunks = if let Some(chunks) = self.response.chunks() {
chunks
} else {
self.response.chunks_slow().await?
};

loop {
match f(&mut self.pending) {
Expand Down
14 changes: 12 additions & 2 deletions src/response.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,17 @@ impl Response {
}))
}

pub(crate) async fn chunks(&mut self) -> Result<&mut Chunks<Body>> {
#[inline]
pub(crate) fn chunks(&mut self) -> Option<&mut Chunks<Body>> {
match self {
Self::Waiting(_) => None,
Self::Loading(chunks) => Some(chunks),
}
}

#[cold]
#[inline(never)]
pub(crate) async fn chunks_slow(&mut self) -> Result<&mut Chunks<Body>> {
loop {
match self {
Self::Waiting(future) => *self = Self::Loading(future.await?),
Expand All @@ -50,7 +60,7 @@ impl Response {
}

pub(crate) async fn finish(&mut self) -> Result<()> {
let chunks = self.chunks().await?;
let chunks = self.chunks_slow().await?;
while chunks.try_next().await?.is_some() {}
Ok(())
}
Expand Down

0 comments on commit 0d92ce2

Please sign in to comment.