Skip to content

Commit

Permalink
feat(inserter): use quanta::Instant
Browse files Browse the repository at this point in the history
  • Loading branch information
loyd committed Nov 12, 2023
1 parent c26868e commit c8eddbf
Show file tree
Hide file tree
Showing 5 changed files with 36 additions and 14 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,11 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
## [Unreleased] - ReleaseDate
### Added
- derive: support `serde::skip_deserializing` ([#83]).
- the `quanta` feature, enabled by default.

### Changed
- **BREAKING** drop the `wa-37420` feature.
- inserter: increase performance if the `quanta` feature is enabled.
- derive: move to syn v2.

### Fixed
Expand Down
4 changes: 3 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,14 +34,15 @@ required-features = ["test-util"]
debug = true

[features]
default = ["lz4", "tls"]
default = ["lz4", "tls", "quanta"]

test-util = ["hyper/server"]
watch = ["dep:sha-1", "dep:serde_json"]
lz4 = ["dep:lz4", "dep:clickhouse-rs-cityhash-sys"]
uuid = ["dep:uuid"]
time = ["dep:time"]
tls = ["dep:hyper-tls"]
quanta = ["dep:quanta"]

[dependencies]
clickhouse-derive = { version = "0.1.1", path = "derive" }
Expand All @@ -63,6 +64,7 @@ clickhouse-rs-cityhash-sys = { version = "0.1.2", optional = true }
uuid = { version = "1", optional = true }
time = { version = "0.3", optional = true }
bstr = { version = "1.2", default-features = false }
quanta = { version = "0.12", optional = true }

[dev-dependencies]
criterion = "0.5.0"
Expand Down
5 changes: 3 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -209,10 +209,11 @@ See [examples](https://github.com/loyd/clickhouse.rs/tree/master/examples).
## Feature Flags
* `lz4` (enabled by default) — enables `Compression::Lz4` and `Compression::Lz4Hc(_)` variants. If enabled, `Compression::Lz4` is used by default for all queries except for `WATCH`.
* `tls` (enabled by default) — supports urls with the `HTTPS` schema.
* `quanta` (enabled by default) - uses the [`quanta`](https://docs.rs/quanta) crate to speed the inserter up. Not used if `test-util` is enabled (thus, time can be managed by `tokio::time::advance()` in custom tests).
* `test-util` — adds mocks. See [the example](https://github.com/loyd/clickhouse.rs/tree/master/examples/mock.rs). Use it only in `dev-dependencies`.
* `watch` — enables `client.watch` functionality. See the corresponding section for details.
* `uuid` — adds `serde::uuid` to work with [uuid](https://docs.rs/uuid/latest/uuid/) crate.
* `time` — adds `serde::time` to work with [time](https://docs.rs/time/latest/time/) crate.
* `uuid` — adds `serde::uuid` to work with [uuid](https://docs.rs/uuid) crate.
* `time` — adds `serde::time` to work with [time](https://docs.rs/time) crate.

## Data Types
* `(U)Int(8|16|32|64|128)` maps to/from corresponding `(u|i)(8|16|32|64|128)` types or newtypes around them.
Expand Down
18 changes: 12 additions & 6 deletions src/inserter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,15 @@ use futures::{
Future,
};
use serde::Serialize;
use tokio::time::{Duration, Instant};

use crate::{error::Result, insert::Insert, row::Row, ticks::Ticks, Client};
use tokio::time::Duration;

use crate::{
error::Result,
insert::Insert,
row::Row,
ticks::{self, Ticks},
Client,
};

const DEFAULT_MAX_ENTRIES: u64 = 500_000;

Expand Down Expand Up @@ -154,7 +160,7 @@ where
Some(
self.ticks
.next_at()?
.saturating_duration_since(Instant::now()),
.saturating_duration_since(ticks::Instant::now()),
)
}

Expand Down Expand Up @@ -184,7 +190,7 @@ where
self.uncommitted_entries = 0;
}

let now = Instant::now();
let now = ticks::Instant::now();

Ok(if self.is_threshold_reached(now) {
let quantities = mem::replace(&mut self.committed, Quantities::ZERO);
Expand All @@ -207,7 +213,7 @@ where
Ok(self.committed)
}

fn is_threshold_reached(&self, now: Instant) -> bool {
fn is_threshold_reached(&self, now: ticks::Instant) -> bool {
self.committed.entries >= self.max_entries
|| self.ticks.next_at().map_or(false, |next_at| now >= next_at)
}
Expand Down
21 changes: 16 additions & 5 deletions src/ticks.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,18 @@
use tokio::time::{Duration, Instant};
use tokio::time::Duration;

const PERIOD_THRESHOLD: Duration = Duration::from_secs(365 * 24 * 3600);

// === Instant ===

// More efficient `Instant` based on TSC.
#[cfg(all(feature = "quanta", not(feature = "test-util")))]
pub(crate) type Instant = quanta::Instant;

#[cfg(any(not(feature = "quanta"), feature = "test-util"))]
pub(crate) type Instant = tokio::time::Instant;

// === Ticks ===

pub(crate) struct Ticks {
period: Duration,
max_bias: f64,
Expand Down Expand Up @@ -62,9 +73,9 @@ impl Ticks {
}
}

#[tokio::test]
#[cfg(feature = "test-util")] // only with `tokio::time::Instant`
#[tokio::test(start_paused = true)]
async fn it_works() {
tokio::time::pause();
let origin = Instant::now();

// No bias.
Expand Down Expand Up @@ -101,9 +112,9 @@ async fn it_works() {
assert_eq!(ticks.next_at().unwrap() - origin, Duration::from_secs(31));
}

#[tokio::test]
#[cfg(feature = "test-util")] // only with `tokio::time::Instant`
#[tokio::test(start_paused = true)]
async fn it_skips_extra_ticks() {
tokio::time::pause();
let origin = Instant::now();

let mut ticks = Ticks::default();
Expand Down

0 comments on commit c8eddbf

Please sign in to comment.