diff --git a/Cargo.lock b/Cargo.lock index a97698eaba..031004da6f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -40,7 +40,7 @@ dependencies = [ "flate2", "futures-core", "h2", - "http", + "http 0.2.12", "httparse", "httpdate", "itoa", @@ -76,7 +76,7 @@ checksum = "13d324164c51f63867b57e73ba5936ea151b8a41a1d23d1031eeb9f70d0236f8" dependencies = [ "bytestring", "cfg-if", - "http", + "http 0.2.12", "regex", "regex-lite", "serde", @@ -105,7 +105,7 @@ dependencies = [ "futures-core", "futures-util", "mio", - "socket2", + "socket2 0.5.7", "tokio", "tracing", ] @@ -150,7 +150,7 @@ dependencies = [ "bytes", "bytestring", "cfg-if", - "cookie", + "cookie 0.16.2", "derive_more", "encoding_rs", "futures-core", @@ -168,7 +168,7 @@ dependencies = [ "serde_json", "serde_urlencoded", "smallvec", - "socket2", + "socket2 0.5.7", "time", "url", ] @@ -185,15 +185,6 @@ dependencies = [ "syn", ] -[[package]] -name = "addr2line" -version = "0.24.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f5fb1d8e4442bd405fdfd1dacb42792696b0cf9cb15882e5d097b742a676d375" -dependencies = [ - "gimli", -] - [[package]] name = "adler2" version = "2.0.0" @@ -320,6 +311,12 @@ version = "1.0.87" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "10f00e1f6e58a40e807377c75c6a7f97bf9044fab57816f2414e6f5f4499d7b8" +[[package]] +name = "arc-swap" +version = "1.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "69f7f8c3906b62b754cd5326047894316021dcfe5a194c8ea52bdd94934a3457" + [[package]] name = "arrayref" version = "0.3.8" @@ -381,7 +378,7 @@ dependencies = [ "chrono", "chrono-tz", "half", - "hashbrown", + "hashbrown 0.14.5", "num", ] @@ -604,21 +601,6 @@ version = "1.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0c4b4d0bd25bd0b74681c0ad21497610ce1b7c91b1022cd21c80c6fbdd9476b0" -[[package]] -name = "backtrace" -version = "0.3.74" -source = 
"registry+https://github.com/rust-lang/crates.io-index" -checksum = "8d82cb332cdfaed17ae235a638438ac4d4839913cc2af585c3c6746e8f8bee1a" -dependencies = [ - "addr2line", - "cfg-if", - "libc", - "miniz_oxide", - "object", - "rustc-demangle", - "windows-targets", -] - [[package]] name = "base64" version = "0.22.1" @@ -629,11 +611,12 @@ checksum = "72b3254f16251a8381aa12e40e3c4d2f0199f8c6508fbecb9d91f575e0fbb8c6" name = "benchmarks" version = "2.2.0-alpha" dependencies = [ + "anyhow", "bytes", "columnar_storage", "common", "criterion", - "deadpool", + "hotpath", "num_cpus", "pb_types", "prost", @@ -643,8 +626,6 @@ dependencies = [ "remote_write", "serde", "serde_json", - "tikv-jemalloc-ctl", - "tikv-jemallocator", "tokio", "toml", "tracing", @@ -782,10 +763,11 @@ checksum = "37b2a672a2cb129a2e41c10b1224bb368f9f37a2b16b612598138befd7b37eb5" [[package]] name = "cc" -version = "1.1.18" +version = "1.2.43" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b62ac837cdb5cb22e10a256099b4fc502b1dfe560cb282963a974d7abd80e476" +checksum = "739eb0f94557554b3ca9a86d2d37bebd49c5e6d0c1d2bda35ba5bdac830befc2" dependencies = [ + "find-msvc-tools", "jobserver", "libc", "shlex", @@ -806,7 +788,7 @@ dependencies = [ "android-tzdata", "iana-time-zone", "num-traits", - "windows-targets", + "windows-targets 0.52.6", ] [[package]] @@ -903,6 +885,15 @@ version = "1.0.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5b63caa9aa9397e2d9480a9b13673856c78d8ac123288526c37d7839f2a86990" +[[package]] +name = "colored" +version = "3.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fde0e0ec90c9dfb3b4b1a0891a7dcd0e2bffde2f7efed5fe7c9bb00e5bfb915e" +dependencies = [ + "windows-sys 0.59.0", +] + [[package]] name = "columnar_storage" version = "2.2.0-alpha" @@ -993,6 +984,35 @@ dependencies = [ "version_check", ] +[[package]] +name = "cookie" +version = "0.18.1" +source = 
"registry+https://github.com/rust-lang/crates.io-index" +checksum = "4ddef33a339a91ea89fb53151bd0a4689cfce27055c291dfa69945475d22c747" +dependencies = [ + "percent-encoding", + "time", + "version_check", +] + +[[package]] +name = "cookie_store" +version = "0.22.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3fc4bff745c9b4c7fb1e97b25d13153da2bc7796260141df62378998d070207f" +dependencies = [ + "cookie 0.18.1", + "document-features", + "idna 1.1.0", + "indexmap", + "log", + "serde", + "serde_derive", + "serde_json", + "time", + "url", +] + [[package]] name = "core-foundation-sys" version = "0.8.7" @@ -1053,6 +1073,15 @@ dependencies = [ "itertools 0.10.5", ] +[[package]] +name = "crossbeam-channel" +version = "0.5.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "82b8f8f868b36967f9606790d1903570de9ceaf870a7bf9fbbd3016d636a2cb2" +dependencies = [ + "crossbeam-utils", +] + [[package]] name = "crossbeam-deque" version = "0.8.6" @@ -1123,7 +1152,7 @@ checksum = "5041cc499144891f3790297212f32a74fb938e5136a14943f338ef9e0ae276cf" dependencies = [ "cfg-if", "crossbeam-utils", - "hashbrown", + "hashbrown 0.14.5", "lock_api", "once_cell", "parking_lot_core", @@ -1165,7 +1194,7 @@ dependencies = [ "futures", "glob", "half", - "hashbrown", + "hashbrown 0.14.5", "indexmap", "itertools 0.13.0", "log", @@ -1214,7 +1243,7 @@ dependencies = [ "arrow-schema", "chrono", "half", - "hashbrown", + "hashbrown 0.14.5", "indexmap", "instant", "libc", @@ -1248,7 +1277,7 @@ dependencies = [ "datafusion-common", "datafusion-expr", "futures", - "hashbrown", + "hashbrown 0.14.5", "log", "object_store", "parking_lot", @@ -1308,7 +1337,7 @@ dependencies = [ "datafusion-common", "datafusion-execution", "datafusion-expr", - "hashbrown", + "hashbrown 0.14.5", "hex", "itertools 0.13.0", "log", @@ -1415,7 +1444,7 @@ dependencies = [ "datafusion-common", "datafusion-expr", "datafusion-physical-expr", - "hashbrown", + "hashbrown 0.14.5", 
"indexmap", "itertools 0.13.0", "log", @@ -1443,7 +1472,7 @@ dependencies = [ "datafusion-functions-aggregate-common", "datafusion-physical-expr-common", "half", - "hashbrown", + "hashbrown 0.14.5", "indexmap", "itertools 0.13.0", "log", @@ -1461,7 +1490,7 @@ dependencies = [ "arrow", "datafusion-common", "datafusion-expr-common", - "hashbrown", + "hashbrown 0.14.5", "rand", ] @@ -1505,7 +1534,7 @@ dependencies = [ "datafusion-physical-expr-common", "futures", "half", - "hashbrown", + "hashbrown 0.14.5", "indexmap", "itertools 0.13.0", "log", @@ -1534,24 +1563,6 @@ dependencies = [ "strum", ] -[[package]] -name = "deadpool" -version = "0.10.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fb84100978c1c7b37f09ed3ce3e5f843af02c2a2c431bae5b19230dad2c1b490" -dependencies = [ - "async-trait", - "deadpool-runtime", - "num_cpus", - "tokio", -] - -[[package]] -name = "deadpool-runtime" -version = "0.1.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "092966b41edc516079bdf31ec78a2e0588d1d0c08f78b91d8307215928642b2b" - [[package]] name = "deranged" version = "0.3.11" @@ -1585,12 +1596,59 @@ dependencies = [ "subtle", ] +[[package]] +name = "dirs-next" +version = "2.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b98cf8ebf19c3d1b223e151f99a4f9f0690dca41414773390fc824184ac833e1" +dependencies = [ + "cfg-if", + "dirs-sys-next", +] + +[[package]] +name = "dirs-sys-next" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4ebda144c4fe02d1f7ea1a7d9641b6fc6b580adcfa024ae48797ecdeb6825b4d" +dependencies = [ + "libc", + "redox_users", + "winapi", +] + +[[package]] +name = "displaydoc" +version = "0.2.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "97369cbbc041bc366949bc74d34658d6cda5621039731c6310521892a3a20ae0" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = 
"document-features" +version = "0.2.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d4b8a88685455ed29a21542a33abd9cb6510b6b129abadabdcef0f4c55bc8f61" +dependencies = [ + "litrs", +] + [[package]] name = "either" version = "1.13.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "60b1af1c220855b6ceac025d3f6ecdd2b7c4894bfe9cd9bda4fbb4bc7c0d4cf0" +[[package]] +name = "encode_unicode" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "34aa73646ffb006b8f5147f3dc182bd4bcb190227ce861fc4a4844bf8e3cb2c0" + [[package]] name = "encoding_rs" version = "0.8.35" @@ -1637,12 +1695,28 @@ dependencies = [ "windows-sys 0.52.0", ] +[[package]] +name = "eyre" +version = "0.6.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7cd915d99f24784cdc19fd37ef22b97e3ff0ae756c7e492e9fbfe897d61e2aec" +dependencies = [ + "indenter", + "once_cell", +] + [[package]] name = "fastrand" version = "2.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e8c02a5121d4ea3eb16a80748c74f5549a5665e4c21333c6098f283870fbdea6" +[[package]] +name = "find-msvc-tools" +version = "0.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "52051878f80a721bb68ebfbc930e07b65ba72f2da88968ea5c06fd6ca3d3a127" + [[package]] name = "fixedbitset" version = "0.4.2" @@ -1794,12 +1868,6 @@ dependencies = [ "wasi", ] -[[package]] -name = "gimli" -version = "0.31.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "32085ea23f3234fc7846555e85283ba4de91e21016dc0455a16286d87a292d64" - [[package]] name = "glob" version = "0.3.1" @@ -1817,7 +1885,7 @@ dependencies = [ "futures-core", "futures-sink", "futures-util", - "http", + "http 0.2.12", "indexmap", "slab", "tokio", @@ -1846,6 +1914,22 @@ dependencies = [ "allocator-api2", ] +[[package]] +name = "hashbrown" +version = "0.16.0" +source = 
"registry+https://github.com/rust-lang/crates.io-index" +checksum = "5419bdc4f6a9207fbeba6d11b604d481addf78ecd10c11ad51e76c2f6482748d" + +[[package]] +name = "hdrhistogram" +version = "7.5.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "765c9198f173dd59ce26ff9f95ef0aafd0a0fe01fb9d72841bc5066a4c06511d" +dependencies = [ + "byteorder", + "num-traits", +] + [[package]] name = "heck" version = "0.5.0" @@ -1879,6 +1963,39 @@ dependencies = [ "windows-sys 0.59.0", ] +[[package]] +name = "hotpath" +version = "0.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6189994b73d164becf3187cc046e68b3f7e6c29e2ef61c30c9e75601fd148daf" +dependencies = [ + "arc-swap", + "cfg-if", + "clap", + "colored", + "crossbeam-channel", + "eyre", + "hdrhistogram", + "hotpath-macros", + "prettytable-rs", + "quanta", + "serde", + "serde_json", + "tokio", + "ureq", +] + +[[package]] +name = "hotpath-macros" +version = "0.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "15ec66a715a703f2e7b7b84f468b4f0187172ef998baa62fcce8d1174d9edb11" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "http" version = "0.2.12" @@ -1890,6 +2007,17 @@ dependencies = [ "itoa", ] +[[package]] +name = "http" +version = "1.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f4a85d31aea989eead29a3aaf9e1115a180df8282431156e533de47660892565" +dependencies = [ + "bytes", + "fnv", + "itoa", +] + [[package]] name = "httparse" version = "1.9.5" @@ -1931,6 +2059,87 @@ dependencies = [ "cc", ] +[[package]] +name = "icu_collections" +version = "2.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4c6b649701667bbe825c3b7e6388cb521c23d88644678e83c0c4d0a621a34b43" +dependencies = [ + "displaydoc", + "potential_utf", + "yoke", + "zerofrom", + "zerovec", +] + +[[package]] +name = "icu_locale_core" +version = "2.1.1" +source = 
"registry+https://github.com/rust-lang/crates.io-index" +checksum = "edba7861004dd3714265b4db54a3c390e880ab658fec5f7db895fae2046b5bb6" +dependencies = [ + "displaydoc", + "litemap", + "tinystr", + "writeable", + "zerovec", +] + +[[package]] +name = "icu_normalizer" +version = "2.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5f6c8828b67bf8908d82127b2054ea1b4427ff0230ee9141c54251934ab1b599" +dependencies = [ + "icu_collections", + "icu_normalizer_data", + "icu_properties", + "icu_provider", + "smallvec", + "zerovec", +] + +[[package]] +name = "icu_normalizer_data" +version = "2.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7aedcccd01fc5fe81e6b489c15b247b8b0690feb23304303a9e560f37efc560a" + +[[package]] +name = "icu_properties" +version = "2.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e93fcd3157766c0c8da2f8cff6ce651a31f0810eaa1c51ec363ef790bbb5fb99" +dependencies = [ + "icu_collections", + "icu_locale_core", + "icu_properties_data", + "icu_provider", + "zerotrie", + "zerovec", +] + +[[package]] +name = "icu_properties_data" +version = "2.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "02845b3647bb045f1100ecd6480ff52f34c35f82d9880e029d329c21d1054899" + +[[package]] +name = "icu_provider" +version = "2.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "85962cf0ce02e1e0a629cc34e7ca3e373ce20dda4c4d7294bbd0bf1fdb59e614" +dependencies = [ + "displaydoc", + "icu_locale_core", + "writeable", + "yoke", + "zerofrom", + "zerotrie", + "zerovec", +] + [[package]] name = "idna" version = "0.5.0" @@ -1941,20 +2150,47 @@ dependencies = [ "unicode-normalization", ] +[[package]] +name = "idna" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3b0875f23caa03898994f6ddc501886a45c7d3d62d04d2d90788d47be1b1e4de" +dependencies = [ + "idna_adapter", + "smallvec", 
+ "utf8_iter", +] + +[[package]] +name = "idna_adapter" +version = "1.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3acae9609540aa318d1bc588455225fb2085b9ed0c4f6bd0d9d5bcd86f1a0344" +dependencies = [ + "icu_normalizer", + "icu_properties", +] + [[package]] name = "impl-more" version = "0.1.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "aae21c3177a27788957044151cc2800043d127acaa460a47ebb9b84dfa2c6aa0" +[[package]] +name = "indenter" +version = "0.3.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "964de6e86d545b246d84badc0fef527924ace5134f30641c203ef52ba83f58d5" + [[package]] name = "indexmap" -version = "2.5.0" +version = "2.12.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "68b900aa2f7301e21c36462b170ee99994de34dff39a4a6a528e80e7376d07e5" +checksum = "6717a8d2a5a929a1a2eb43a12812498ed141a0bcfb7e8f7844fbdbe4303bba9f" dependencies = [ "equivalent", - "hashbrown", + "hashbrown 0.16.0", ] [[package]] @@ -2118,9 +2354,9 @@ dependencies = [ [[package]] name = "libc" -version = "0.2.158" +version = "0.2.177" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d8adc4bb1803a324070e64a98ae98f38934d91957a99cfb3a43dcbc01bc56439" +checksum = "2874a2af47a2325c2001a6e6fad9b16a53b802102b528163885171cf92b15976" [[package]] name = "libm" @@ -2128,12 +2364,34 @@ version = "0.2.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4ec2a862134d2a7d32d7983ddcdd1c4923530833c9f2ea1a44fc5fa473989058" +[[package]] +name = "libredox" +version = "0.1.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "416f7e718bdb06000964960ffa43b4335ad4012ae8b99060261aa4a8088d5ccb" +dependencies = [ + "bitflags 2.6.0", + "libc", +] + [[package]] name = "linux-raw-sys" version = "0.4.14" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = 
"78b3ae25bc7c8c38cec158d1f2757ee79e9b3740fbc7ccf0e59e4b08d793fa89" +[[package]] +name = "litemap" +version = "0.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6373607a59f0be73a39b6fe456b8192fcc3585f602af20751600e974dd455e77" + +[[package]] +name = "litrs" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "11d3d7f243d5c5a8b9bb5d6dd2b1602c0cb0b9db1621bafc7ed66e35ff9fe092" + [[package]] name = "local-channel" version = "0.1.5" @@ -2163,9 +2421,9 @@ dependencies = [ [[package]] name = "log" -version = "0.4.22" +version = "0.4.28" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a7a70ba024b9dc04c27ea2f0c0548feb474ec5c54bba33a7f72f873a39d07b24" +checksum = "34080505efa8e45a4b816c349525ebe327ceaa8559756f0356cba97ef3bf7432" [[package]] name = "lz4_flex" @@ -2371,12 +2629,12 @@ dependencies = [ ] [[package]] -name = "object" -version = "0.36.4" +name = "object-pool" +version = "0.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "084f1a5821ac4c651660a94a7153d27ac9d8a53736203f58b31945ded098070a" +checksum = "ceffa2e6ccecd71e60a0f06b655df2c66acd1c0c892dafefc96fd49d65f71d53" dependencies = [ - "memchr", + "parking_lot", ] [[package]] @@ -2447,7 +2705,7 @@ dependencies = [ "libc", "redox_syscall", "smallvec", - "windows-targets", + "windows-targets 0.52.6", ] [[package]] @@ -2471,7 +2729,7 @@ dependencies = [ "flate2", "futures", "half", - "hashbrown", + "hashbrown 0.14.5", "lz4_flex", "num", "num-bigint", @@ -2629,6 +2887,15 @@ dependencies = [ "plotters-backend", ] +[[package]] +name = "potential_utf" +version = "0.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b73949432f5e2a09657003c25bca5e19a0e9c84f8058ca374f49e0ebe605af77" +dependencies = [ + "zerovec", +] + [[package]] name = "powerfmt" version = "0.2.0" @@ -2654,6 +2921,19 @@ dependencies = [ "syn", ] +[[package]] +name = "prettytable-rs" 
+version = "0.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "eea25e07510aa6ab6547308ebe3c036016d162b8da920dbb079e3ba8acf3d95a" +dependencies = [ + "encode_unicode", + "is-terminal", + "lazy_static", + "term", + "unicode-width", +] + [[package]] name = "proc-macro2" version = "1.0.86" @@ -2767,6 +3047,21 @@ dependencies = [ "thiserror", ] +[[package]] +name = "quanta" +version = "0.12.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f3ab5a9d756f0d97bdc89019bd2e4ea098cf9cde50ee7564dde6b81ccc8f06c7" +dependencies = [ + "crossbeam-utils", + "libc", + "once_cell", + "raw-cpuid", + "wasi", + "web-sys", + "winapi", +] + [[package]] name = "quick-protobuf" version = "0.8.1" @@ -2815,6 +3110,15 @@ dependencies = [ "getrandom", ] +[[package]] +name = "raw-cpuid" +version = "11.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "498cd0dc59d73224351ee52a95fee0f1a617a2eae0e7d9d720cc622c73a54186" +dependencies = [ + "bitflags 2.6.0", +] + [[package]] name = "rayon" version = "1.10.0" @@ -2844,6 +3148,17 @@ dependencies = [ "bitflags 2.6.0", ] +[[package]] +name = "redox_users" +version = "0.4.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ba009ff324d1fc1b900bd1fdb31564febe58a8ccc8a6fdbb93b543d33b13ca43" +dependencies = [ + "getrandom", + "libredox", + "thiserror", +] + [[package]] name = "regex" version = "1.10.6" @@ -2899,20 +3214,26 @@ name = "remote_write" version = "2.2.0-alpha" dependencies = [ "anyhow", - "async-trait", "bytes", - "deadpool", + "object-pool", "once_cell", "pb_types", "prost", - "tokio", ] [[package]] -name = "rustc-demangle" -version = "0.1.24" +name = "ring" +version = "0.17.14" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "719b953e2095829ee67db738b3bfa9fa368c94900df327b3f07fe6e794d2fe1f" +checksum = "a4689e6c2294d81e88dc6261c768b63bc4fcdb852be6d1352498b114f61383b7" +dependencies = [ + 
"cc", + "cfg-if", + "getrandom", + "libc", + "untrusted", + "windows-sys 0.52.0", +] [[package]] name = "rustc_version" @@ -2936,6 +3257,50 @@ dependencies = [ "windows-sys 0.52.0", ] +[[package]] +name = "rustls" +version = "0.23.34" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6a9586e9ee2b4f8fab52a0048ca7334d7024eef48e2cb9407e3497bb7cab7fa7" +dependencies = [ + "log", + "once_cell", + "ring", + "rustls-pki-types", + "rustls-webpki", + "subtle", + "zeroize", +] + +[[package]] +name = "rustls-pemfile" +version = "2.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dce314e5fee3f39953d46bb63bb8a46d40c2f8fb7cc5a3b6cab2bde9721d6e50" +dependencies = [ + "rustls-pki-types", +] + +[[package]] +name = "rustls-pki-types" +version = "1.13.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "94182ad936a0c91c324cd46c6511b9510ed16af436d7b5bab34beab0afd55f7a" +dependencies = [ + "zeroize", +] + +[[package]] +name = "rustls-webpki" +version = "0.103.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2ffdfa2f5286e2247234e03f680868ac2815974dc39e00ea15adc445d0aafe52" +dependencies = [ + "ring", + "rustls-pki-types", + "untrusted", +] + [[package]] name = "rustversion" version = "1.0.17" @@ -3157,6 +3522,16 @@ dependencies = [ "windows-sys 0.52.0", ] +[[package]] +name = "socket2" +version = "0.6.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "17129e116933cf371d018bb80ae557e889637989d8638274fb25622827b03881" +dependencies = [ + "libc", + "windows-sys 0.60.2", +] + [[package]] name = "sqlparser" version = "0.51.0" @@ -3178,6 +3553,12 @@ dependencies = [ "syn", ] +[[package]] +name = "stable_deref_trait" +version = "1.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6ce2be8dc25455e1f91df71bfa12ad37d7af1092ae736f3a6cd0e37bc7810596" + [[package]] name = "static_assertions" version = "1.1.0" @@ 
-3229,6 +3610,17 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "synstructure" +version = "0.13.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "728a70f3dbaf5bab7f0c4b1ac8d7ae5ea60a4b5549c8a5914361c99147a709d2" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "temp-dir" version = "0.1.14" @@ -3248,6 +3640,17 @@ dependencies = [ "windows-sys 0.59.0", ] +[[package]] +name = "term" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c59df8ac95d96ff9bede18eb7300b0fda5e5d8d90960e76f8e14ae765eedbf1f" +dependencies = [ + "dirs-next", + "rustversion", + "winapi", +] + [[package]] name = "test-log" version = "0.2.16" @@ -3311,37 +3714,6 @@ dependencies = [ "ordered-float", ] -[[package]] -name = "tikv-jemalloc-ctl" -version = "0.5.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "619bfed27d807b54f7f776b9430d4f8060e66ee138a28632ca898584d462c31c" -dependencies = [ - "libc", - "paste", - "tikv-jemalloc-sys", -] - -[[package]] -name = "tikv-jemalloc-sys" -version = "0.5.4+5.3.0-patched" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9402443cb8fd499b6f327e40565234ff34dbda27460c5b47db0db77443dd85d1" -dependencies = [ - "cc", - "libc", -] - -[[package]] -name = "tikv-jemallocator" -version = "0.5.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "965fe0c26be5c56c94e38ba547249074803efd52adfb66de62107d95aab3eaca" -dependencies = [ - "libc", - "tikv-jemalloc-sys", -] - [[package]] name = "time" version = "0.3.37" @@ -3384,6 +3756,16 @@ dependencies = [ "crunchy", ] +[[package]] +name = "tinystr" +version = "0.8.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "42d3e9c45c09de15d06dd8acf5f4e0e399e85927b7f00711024eb7ae10fa4869" +dependencies = [ + "displaydoc", + "zerovec", +] + [[package]] name = "tinytemplate" version = "1.2.1" @@ -3411,27 
+3793,26 @@ checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20" [[package]] name = "tokio" -version = "1.40.0" +version = "1.48.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e2b070231665d27ad9ec9b8df639893f46727666c6767db40317fbe920a5d998" +checksum = "ff360e02eab121e0bc37a2d3b4d4dc622e6eda3a8e5253d5435ecf5bd4c68408" dependencies = [ - "backtrace", "bytes", "libc", "mio", "parking_lot", "pin-project-lite", "signal-hook-registry", - "socket2", + "socket2 0.6.1", "tokio-macros", - "windows-sys 0.52.0", + "windows-sys 0.61.2", ] [[package]] name = "tokio-macros" -version = "2.4.0" +version = "2.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "693d596312e88961bc67d7f1f97af8a70227d9f90c31bba5806eec004978d752" +checksum = "af407857209536a95c8e56f8231ef2c2e2aff839b22e07a1ffcbc617e9db9fa5" dependencies = [ "proc-macro2", "quote", @@ -3597,6 +3978,45 @@ version = "0.1.13" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0336d538f7abc86d282a4189614dfaa90810dfc2c6f6427eaf88e16311dd225d" +[[package]] +name = "untrusted" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8ecb6da28b8a351d773b68d5825ac39017e680750f980f3a1a85cd8dd28a47c1" + +[[package]] +name = "ureq" +version = "3.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "99ba1025f18a4a3fc3e9b48c868e9beb4f24f4b4b1a325bada26bd4119f46537" +dependencies = [ + "base64", + "cookie_store", + "flate2", + "log", + "percent-encoding", + "rustls", + "rustls-pemfile", + "rustls-pki-types", + "serde", + "serde_json", + "ureq-proto", + "utf-8", + "webpki-roots", +] + +[[package]] +name = "ureq-proto" +version = "0.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "60b4531c118335662134346048ddb0e54cc86bd7e81866757873055f0e38f5d2" +dependencies = [ + "base64", + "http 1.3.1", + "httparse", + "log", +] + 
[[package]] name = "url" version = "2.5.2" @@ -3604,10 +4024,22 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "22784dbdf76fdde8af1aeda5622b546b422b6fc585325248a2bf9f5e41e94d6c" dependencies = [ "form_urlencoded", - "idna", + "idna 0.5.0", "percent-encoding", ] +[[package]] +name = "utf-8" +version = "0.7.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "09cc8ee72d2a9becf2f2febe0205bbed8fc6615b7cb429ad062dc7b7ddd036a9" + +[[package]] +name = "utf8_iter" +version = "1.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b6c140620e7ffbb22c2dee59cafe6084a59b5ffc27a8859a5f0d494b5d52b6be" + [[package]] name = "utf8parse" version = "0.2.2" @@ -3716,6 +4148,15 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "webpki-roots" +version = "1.0.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "32b130c0d2d49f8b6889abc456e795e82525204f27c42cf767cf0d7734e089b8" +dependencies = [ + "rustls-pki-types", +] + [[package]] name = "which" version = "4.4.2" @@ -3765,16 +4206,22 @@ version = "0.52.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "33ab640c8d7e35bf8ba19b884ba838ceb4fba93a4e8c65a9059d08afcfc683d9" dependencies = [ - "windows-targets", + "windows-targets 0.52.6", ] +[[package]] +name = "windows-link" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f0805222e57f7521d6a62e36fa9163bc891acd422f971defe97d64e70d0a4fe5" + [[package]] name = "windows-sys" version = "0.52.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "282be5f36a8ce781fad8c8ae18fa3f9beff57ec1b52cb3de0789201425d9a33d" dependencies = [ - "windows-targets", + "windows-targets 0.52.6", ] [[package]] @@ -3783,7 +4230,25 @@ version = "0.59.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = 
"1e38bc4d79ed67fd075bcc251a1c39b32a1776bbe92e5bef1f0bf1f8c531853b" dependencies = [ - "windows-targets", + "windows-targets 0.52.6", +] + +[[package]] +name = "windows-sys" +version = "0.60.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f2f500e4d28234f72040990ec9d39e3a6b950f9f22d3dba18416c35882612bcb" +dependencies = [ + "windows-targets 0.53.5", +] + +[[package]] +name = "windows-sys" +version = "0.61.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ae137229bcbd6cdf0f7b80a31df61766145077ddf49416a728b02cb3921ff3fc" +dependencies = [ + "windows-link", ] [[package]] @@ -3792,14 +4257,31 @@ version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9b724f72796e036ab90c1021d4780d4d3d648aca59e491e6b98e725b84e99973" dependencies = [ - "windows_aarch64_gnullvm", - "windows_aarch64_msvc", - "windows_i686_gnu", - "windows_i686_gnullvm", - "windows_i686_msvc", - "windows_x86_64_gnu", - "windows_x86_64_gnullvm", - "windows_x86_64_msvc", + "windows_aarch64_gnullvm 0.52.6", + "windows_aarch64_msvc 0.52.6", + "windows_i686_gnu 0.52.6", + "windows_i686_gnullvm 0.52.6", + "windows_i686_msvc 0.52.6", + "windows_x86_64_gnu 0.52.6", + "windows_x86_64_gnullvm 0.52.6", + "windows_x86_64_msvc 0.52.6", +] + +[[package]] +name = "windows-targets" +version = "0.53.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4945f9f551b88e0d65f3db0bc25c33b8acea4d9e41163edf90dcd0b19f9069f3" +dependencies = [ + "windows-link", + "windows_aarch64_gnullvm 0.53.1", + "windows_aarch64_msvc 0.53.1", + "windows_i686_gnu 0.53.1", + "windows_i686_gnullvm 0.53.1", + "windows_i686_msvc 0.53.1", + "windows_x86_64_gnu 0.53.1", + "windows_x86_64_gnullvm 0.53.1", + "windows_x86_64_msvc 0.53.1", ] [[package]] @@ -3808,48 +4290,96 @@ version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = 
"32a4622180e7a0ec044bb555404c800bc9fd9ec262ec147edd5989ccd0c02cd3" +[[package]] +name = "windows_aarch64_gnullvm" +version = "0.53.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a9d8416fa8b42f5c947f8482c43e7d89e73a173cead56d044f6a56104a6d1b53" + [[package]] name = "windows_aarch64_msvc" version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "09ec2a7bb152e2252b53fa7803150007879548bc709c039df7627cabbd05d469" +[[package]] +name = "windows_aarch64_msvc" +version = "0.53.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b9d782e804c2f632e395708e99a94275910eb9100b2114651e04744e9b125006" + [[package]] name = "windows_i686_gnu" version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8e9b5ad5ab802e97eb8e295ac6720e509ee4c243f69d781394014ebfe8bbfa0b" +[[package]] +name = "windows_i686_gnu" +version = "0.53.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "960e6da069d81e09becb0ca57a65220ddff016ff2d6af6a223cf372a506593a3" + [[package]] name = "windows_i686_gnullvm" version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0eee52d38c090b3caa76c563b86c3a4bd71ef1a819287c19d586d7334ae8ed66" +[[package]] +name = "windows_i686_gnullvm" +version = "0.53.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fa7359d10048f68ab8b09fa71c3daccfb0e9b559aed648a8f95469c27057180c" + [[package]] name = "windows_i686_msvc" version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "240948bc05c5e7c6dabba28bf89d89ffce3e303022809e73deaefe4f6ec56c66" +[[package]] +name = "windows_i686_msvc" +version = "0.53.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1e7ac75179f18232fe9c285163565a57ef8d3c89254a30685b57d83a38d326c2" + [[package]] name = "windows_x86_64_gnu" version = "0.52.6" source = 
"registry+https://github.com/rust-lang/crates.io-index" checksum = "147a5c80aabfbf0c7d901cb5895d1de30ef2907eb21fbbab29ca94c5b08b1a78" +[[package]] +name = "windows_x86_64_gnu" +version = "0.53.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9c3842cdd74a865a8066ab39c8a7a473c0778a3f29370b5fd6b4b9aa7df4a499" + [[package]] name = "windows_x86_64_gnullvm" version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "24d5b23dc417412679681396f2b49f3de8c1473deb516bd34410872eff51ed0d" +[[package]] +name = "windows_x86_64_gnullvm" +version = "0.53.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0ffa179e2d07eee8ad8f57493436566c7cc30ac536a3379fdf008f47f6bb7ae1" + [[package]] name = "windows_x86_64_msvc" version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "589f6da84c646204747d1270a2a5661ea66ed1cced2631d546fdfb155959f9ec" +[[package]] +name = "windows_x86_64_msvc" +version = "0.53.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d6bbff5f0aada427a1e5a6da5f1f98158182f26556f345ac9e04d36d0ebed650" + [[package]] name = "winnow" version = "0.6.20" @@ -3859,6 +4389,12 @@ dependencies = [ "memchr", ] +[[package]] +name = "writeable" +version = "0.6.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9edde0db4769d2dc68579893f2306b26c6ecfbe0ef499b013d731b7b9247e0b9" + [[package]] name = "xz2" version = "0.1.7" @@ -3868,6 +4404,29 @@ dependencies = [ "lzma-sys", ] +[[package]] +name = "yoke" +version = "0.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "72d6e5c6afb84d73944e5cedb052c4680d5657337201555f9f2a16b7406d4954" +dependencies = [ + "stable_deref_trait", + "yoke-derive", + "zerofrom", +] + +[[package]] +name = "yoke-derive" +version = "0.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = 
"b659052874eb698efe5b9e8cf382204678a0086ebf46982b79d6ca3182927e5d" +dependencies = [ + "proc-macro2", + "quote", + "syn", + "synstructure", +] + [[package]] name = "zerocopy" version = "0.7.35" @@ -3889,6 +4448,66 @@ dependencies = [ "syn", ] +[[package]] +name = "zerofrom" +version = "0.1.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "50cc42e0333e05660c3587f3bf9d0478688e15d870fab3346451ce7f8c9fbea5" +dependencies = [ + "zerofrom-derive", +] + +[[package]] +name = "zerofrom-derive" +version = "0.1.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d71e5d6e06ab090c67b5e44993ec16b72dcbaabc526db883a360057678b48502" +dependencies = [ + "proc-macro2", + "quote", + "syn", + "synstructure", +] + +[[package]] +name = "zeroize" +version = "1.8.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b97154e67e32c85465826e8bcc1c59429aaaf107c1e4a9e53c8d8ccd5eff88d0" + +[[package]] +name = "zerotrie" +version = "0.2.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2a59c17a5562d507e4b54960e8569ebee33bee890c70aa3fe7b97e85a9fd7851" +dependencies = [ + "displaydoc", + "yoke", + "zerofrom", +] + +[[package]] +name = "zerovec" +version = "0.11.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6c28719294829477f525be0186d13efa9a3c602f7ec202ca9e353d310fb9a002" +dependencies = [ + "yoke", + "zerofrom", + "zerovec-derive", +] + +[[package]] +name = "zerovec-derive" +version = "0.11.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "eadce39539ca5cb3985590102671f2567e659fca9666581ad3411d59207951f3" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "zstd" version = "0.13.2" diff --git a/Cargo.toml b/Cargo.toml index b4bab12b6b..2ced2a4071 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -51,11 +51,13 @@ columnar_storage = { path = "src/columnar_storage" } common = { path = 
"src/common" } criterion = "0.5" datafusion = "43" -deadpool = "0.10" futures = "0.3" +hotpath = "0.5.2" itertools = "0.3" lazy_static = "1" metric_engine = { path = "src/metric_engine" } +num_cpus = "1" +object-pool = "0.6" object_store = { version = "0.11" } once_cell = "1" parquet = { version = "53" } diff --git a/docs/assets/remote-write-concurrent-performance.png b/docs/assets/remote-write-concurrent-performance.png index 15d185a11c..55b04fb832 100644 Binary files a/docs/assets/remote-write-concurrent-performance.png and b/docs/assets/remote-write-concurrent-performance.png differ diff --git a/docs/assets/remote-write-memory-performance.png b/docs/assets/remote-write-memory-performance.png index c58598207d..5b113eed3c 100644 Binary files a/docs/assets/remote-write-memory-performance.png and b/docs/assets/remote-write-memory-performance.png differ diff --git a/docs/assets/remote-write-sequential-performance.png b/docs/assets/remote-write-sequential-performance.png index a401c285ca..c5f7942caa 100644 Binary files a/docs/assets/remote-write-sequential-performance.png and b/docs/assets/remote-write-sequential-performance.png differ diff --git a/src/benchmarks/Cargo.toml b/src/benchmarks/Cargo.toml index de738d0123..56cdfa32d8 100644 --- a/src/benchmarks/Cargo.toml +++ b/src/benchmarks/Cargo.toml @@ -29,18 +29,19 @@ description.workspace = true name = "parser_mem" path = "src/bin/parser_mem.rs" -[[bin]] -name = "pool_stats" -path = "src/bin/pool_stats.rs" - [features] +default = ["hotpath", "hotpath-alloc-bytes-total"] +hotpath = [] +hotpath-alloc-bytes-total = ["hotpath", "hotpath/hotpath-alloc-bytes-total"] unsafe-split = ["remote_write/unsafe-split"] [dependencies] +anyhow = { workspace = true } bytes = { workspace = true } columnar_storage = { workspace = true } common = { workspace = true } -deadpool = { workspace = true } +hotpath = { workspace = true } +num_cpus = { workspace = true } pb_types = { workspace = true } prost = { workspace = true } protobuf = 
"3.7" @@ -48,21 +49,18 @@ quick-protobuf = "0.8" remote_write = { workspace = true } serde = { workspace = true } serde_json = { workspace = true } -tikv-jemalloc-ctl = "0.5" tokio = { workspace = true } toml = { workspace = true } tracing = { workspace = true } tracing-subscriber = { workspace = true } [target.'cfg(not(target_env = "msvc"))'.dependencies] -tikv-jemallocator = "0.5" [build-dependencies] protobuf-codegen = "3.7" [dev-dependencies] criterion = { workspace = true } -num_cpus = "1.16" [[bench]] name = "bench" diff --git a/src/benchmarks/benches/bench.rs b/src/benchmarks/benches/bench.rs index b932a44101..13fe083bc6 100644 --- a/src/benchmarks/benches/bench.rs +++ b/src/benchmarks/benches/bench.rs @@ -64,12 +64,6 @@ fn bench_remote_write(c: &mut Criterion) { let concurrent_scales = config.remote_write.concurrent_scales.clone(); let bench = RefCell::new(RemoteWriteBench::new(config.remote_write)); - let rt = tokio::runtime::Builder::new_multi_thread() - .worker_threads(num_cpus::get()) - .enable_all() - .build() - .unwrap(); - // Sequential parse bench. 
let mut group = c.benchmark_group("remote_write_sequential"); @@ -85,10 +79,10 @@ fn bench_remote_write(c: &mut Criterion) { group.bench_with_input( BenchmarkId::new("pooled", n), - &(&bench, &rt, n), - |b, (bench, rt, scale)| { + &(&bench, n), + |b, (bench, scale)| { let bench = bench.borrow(); - b.iter(|| rt.block_on(bench.pooled_parser_sequential(*scale)).unwrap()) + b.iter(|| bench.pooled_parser_sequential(*scale).unwrap()) }, ); @@ -118,43 +112,37 @@ fn bench_remote_write(c: &mut Criterion) { for &scale in &concurrent_scales { group.bench_with_input( BenchmarkId::new("prost", scale), - &(&bench, &rt, scale), - |b, (bench, rt, scale)| { + &(&bench, scale), + |b, (bench, scale)| { let bench = bench.borrow(); - b.iter(|| rt.block_on(bench.prost_parser_concurrent(*scale)).unwrap()) + b.iter(|| bench.prost_parser_concurrent(*scale).unwrap()) }, ); group.bench_with_input( BenchmarkId::new("pooled", scale), - &(&bench, &rt, scale), - |b, (bench, rt, scale)| { + &(&bench, scale), + |b, (bench, scale)| { let bench = bench.borrow(); - b.iter(|| rt.block_on(bench.pooled_parser_concurrent(*scale)).unwrap()) + b.iter(|| bench.pooled_parser_concurrent(*scale).unwrap()) }, ); group.bench_with_input( BenchmarkId::new("quick_protobuf", scale), - &(&bench, &rt, scale), - |b, (bench, rt, scale)| { + &(&bench, scale), + |b, (bench, scale)| { let bench = bench.borrow(); - b.iter(|| { - rt.block_on(bench.quick_protobuf_parser_concurrent(*scale)) - .unwrap() - }) + b.iter(|| bench.quick_protobuf_parser_concurrent(*scale).unwrap()) }, ); group.bench_with_input( BenchmarkId::new("rust_protobuf", scale), - &(&bench, &rt, scale), - |b, (bench, rt, scale)| { + &(&bench, scale), + |b, (bench, scale)| { let bench = bench.borrow(); - b.iter(|| { - rt.block_on(bench.rust_protobuf_parser_concurrent(*scale)) - .unwrap() - }) + b.iter(|| bench.rust_protobuf_parser_concurrent(*scale).unwrap()) }, ); } diff --git a/src/benchmarks/remote_write_memory_bench.py 
b/src/benchmarks/remote_write_memory_bench.py index 42a2e50293..44f257c6a2 100644 --- a/src/benchmarks/remote_write_memory_bench.py +++ b/src/benchmarks/remote_write_memory_bench.py @@ -16,12 +16,9 @@ # specific language governing permissions and limitations # under the License. +import argparse import subprocess -import json import sys -import os -from typing import Dict, Any -import argparse try: from tabulate import tabulate @@ -30,208 +27,95 @@ print("Please install it with: pip3 install tabulate") sys.exit(1) - -class MemoryBenchmark: - def __init__(self, scale, mode, use_unsafe=False): - self.project_path = "." - self.data_path = "../remote_write/tests/workloads/1709380533560664458.data" - self.scale = scale - self.mode = mode - self.use_unsafe = use_unsafe - self.parsers = [ - "pooled", - "prost", - "rust-protobuf", - "quick-protobuf", - ] - - def build_binary(self) -> bool: - features_msg = " with unsafe-split" if self.use_unsafe else "" - print(f"Building binary{features_msg}...") - try: - bin_name = "parser_mem" - build_cmd = ["cargo", "build", "--release", "--bin", bin_name] - if self.use_unsafe: - build_cmd.extend(["--features", "unsafe-split"]) - result = subprocess.run( - build_cmd, - cwd=self.project_path, - check=False, - ) - if result.returncode != 0: - print("Failed to build binary") - return False - return True - except Exception as e: - print(f"Failed to build binary: {e}") - return False - - def run_parser(self, parser: str, mode: str, scale: int, bin_name: str) -> Dict[str, Any]: - binary_path = f"../../target/release/{bin_name}" - - cmd = [binary_path, mode, str(scale), parser] - - try: - result = subprocess.run( - cmd, - cwd=self.project_path, - capture_output=True, - text=True, - timeout=300, # 5 minute timeout - ) - - if result.returncode != 0: - print(f"Error running {parser}: {result.stderr}") - return {} - - return json.loads(result.stdout.strip()) - - except subprocess.TimeoutExpired: - print(f"Timeout running {parser} {mode} {scale}") 
- return {} - except json.JSONDecodeError as e: - print(f"Failed to parse JSON from {parser}: {e}") - print(f"Raw output: {result.stdout}") - return {} - except FileNotFoundError: - print(f"Binary not found: {binary_path}") - print(f"Please run: cargo build --release --bins") - return {} - except Exception as e: - print(f"Exception running {parser}: {e}") - return {} - - def run_benchmarks(self) -> Dict[str, Dict[str, Any]]: - results = {} - successful_count = 0 - total_count = len(self.parsers) - - print(f"\nRunning benchmarks for {total_count} parsers...") - - for i, parser in enumerate(self.parsers, 1): - print(f"\n[{i}/{total_count}] Testing {parser}...") - print(f"Running {self.mode} mode with scale {self.scale}...") - result = self.run_parser( - parser, self.mode, self.scale, "parser_mem") - - if result: - result["parser"] = parser - results[parser] = result - successful_count += 1 - print(f"Success") - else: - print(f"Failed") - - print( - f"\nCompleted: {successful_count}/{total_count} parsers succeeded") - - if successful_count == total_count: - print("All parsers succeeded - generating report...") - return results - else: - print("Some parsers failed - skipping report generation") - return {} - - def analyze_results(self, results: Dict[str, Dict[str, Any]]): - if not results: - print("\nNo results to analyze - all parsers failed or were skipped") - return - - print(f"\n{'='*80}") - print("MEMORY BENCHMARK RESULTS") - print(f"{'='*80}") - print(f"Mode: {self.mode.upper()}, Scale: {self.scale}") - print() - - headers = [ - "Parser", - "ThreadAlloc", - "ThreadDealloc", - "Allocated", - "Active", - "Metadata", - "Mapped", - "Resident", - "Retained", - ] - - table_data = [] - for parser in self.parsers: - if parser in results: - result = results[parser] - memory = result.get("memory", {}) - row = [ - parser, - f"{memory.get('thread_allocated_diff', 0):,}", - f"{memory.get('thread_deallocated_diff', 0):,}", - f"{memory.get('allocated', 0):,}", - 
f"{memory.get('active', 0):,}", - f"{memory.get('metadata', 0):,}", - f"{memory.get('mapped', 0):,}", - f"{memory.get('resident', 0):,}", - f"{memory.get('retained', 0):,}", - ] - table_data.append(row) - - print("SUMMARY TABLE (All values in bytes)") +PARSERS = ["pooled", "prost", "rust-protobuf", "quick-protobuf"] +BINARY_PATH = "../../target/release/parser_mem" + + +def build_binary(use_unsafe: bool): + print(f"Building binary{' with unsafe-split' if use_unsafe else ''}...") + cmd = ["cargo", "build", "--release", "--bin", "parser_mem"] + if use_unsafe: + cmd.extend(["--features", "unsafe-split"]) + result = subprocess.run(cmd, cwd=".") + return result.returncode == 0 + + +def parse_total_alloc(stdout: str): + for line in stdout.splitlines(): + if line.startswith("Total allocated bytes (cumulative):"): + try: + raw = line.split(":", 1)[1].strip().split(" ")[0] + return int(raw) + except Exception: + return None + return None + + +def run_parser(parser: str, mode: str, scale: int): + cmd = [BINARY_PATH, mode, str(scale), parser] + try: + res = subprocess.run( + cmd, cwd=".", capture_output=True, text=True, timeout=300) + except Exception as e: + print(f"[error] run {parser}: {e}", file=sys.stderr) + return None + if res.returncode != 0: + print(f"[error] run {parser} failed: {res.stderr}", file=sys.stderr) + return None + total = parse_total_alloc(res.stdout) + if total is None: print( - tabulate( - table_data, - headers=headers, - tablefmt="grid", - stralign="right", - numalign="right", - ) + f"[error] parse total bytes failed for {parser}. 
Output:\n{res.stdout}", + file=sys.stderr, + ) + return total + + +def print_table(results: dict, mode: str, scale: int): + print(f"\n{'='*60}") + print("MEMORY BENCHMARK RESULTS") + print(f"{'='*60}") + print(f"Mode: {mode}, Scale: {scale}") + headers = ["Parser", "TotalAllocatedBytes"] + rows = [[p, f"{results[p]:,}"] for p in PARSERS if p in results] + print( + tabulate( + rows, + headers=headers, + tablefmt="grid", + stralign="right", + numalign="right", ) - - -def main(): - parser = argparse.ArgumentParser( - description="Memory benchmark for protobuf parsers" - ) - parser.add_argument( - "--unsafe", action="store_true", help="Enable unsafe-split feature" - ) - parser.add_argument( - "--mode", - choices=["sequential", "concurrent"], - default="sequential", - help="Test mode to run (default: sequential)", - ) - parser.add_argument( - "--scale", - type=int, - default=10, - help="Scale value for benchmark (default: 10)", ) - args = parser.parse_args() +def main(): + ap = argparse.ArgumentParser( + description="Memory benchmark for protobuf parsers") + ap.add_argument("--unsafe", action="store_true", + help="Enable unsafe-split feature") + ap.add_argument( + "--mode", choices=["sequential", "concurrent"], default="sequential", help="Run mode") + ap.add_argument("--scale", type=int, default=10, help="Benchmark scale") + args = ap.parse_args() if args.scale <= 0: - print(f"Invalid scale value '{args.scale}', scale must be positive") - sys.exit(1) - - data_path = "../remote_write/tests/workloads/1709380533560664458.data" - if not os.path.exists(data_path): - print(f"Test data file not found at {data_path}") - print("Please ensure test data exists before running benchmarks") - sys.exit(1) - - benchmark = MemoryBenchmark( - scale=args.scale, mode=args.mode, use_unsafe=args.unsafe - ) - - if not benchmark.build_binary(): - sys.exit(1) - - print(f"\nRunning memory benchmarks...") - print(f"Mode: {args.mode}") - print(f"Scale: {args.scale}") - print(f"Unsafe optimization: 
{'enabled' if args.unsafe else 'disabled'}") - - results = benchmark.run_benchmarks() - benchmark.analyze_results(results) + print("scale must be positive", file=sys.stderr) + return 1 + if not build_binary(args.unsafe): + print("build failed", file=sys.stderr) + return 1 + results = {} + failed = False + for parser in PARSERS: + total = run_parser(parser, args.mode, args.scale) + if total is None: + failed = True + else: + results[parser] = total + if results: + print_table(results, args.mode, args.scale) + return 1 if failed else 0 if __name__ == "__main__": - main() + sys.exit(main()) diff --git a/src/benchmarks/src/bin/parser_mem.rs b/src/benchmarks/src/bin/parser_mem.rs index 0028cd5750..a993ba6f51 100644 --- a/src/benchmarks/src/bin/parser_mem.rs +++ b/src/benchmarks/src/bin/parser_mem.rs @@ -15,124 +15,118 @@ // specific language governing permissions and limitations // under the License. -use benchmarks::util::{MemoryBenchConfig, MemoryStats}; +use anyhow::Result; +use benchmarks::util::{human_bytes, run_concurrent_threads, MemoryBenchConfig}; +use bytes::Bytes; +use hotpath::{GuardBuilder, MetricType, MetricsProvider, Reporter}; use pb_types::WriteRequest as ProstWriteRequest; use prost::Message; use protobuf::Message as ProtobufMessage; use quick_protobuf::{BytesReader, MessageRead}; use remote_write::pooled_parser::PooledParser; -use tikv_jemallocator::Jemalloc; -#[global_allocator] -static ALLOC: Jemalloc = Jemalloc; - -#[tokio::main(flavor = "current_thread")] -async fn main() -> Result<(), Box> { +fn main() -> Result<()> { let config = MemoryBenchConfig::from_args(); let args: Vec = std::env::args().collect(); let parser = args.get(3).map(|s| s.as_str()).unwrap_or("pooled"); - let start_stats = MemoryStats::collect()?; + let _guard = GuardBuilder::new("parser_mem") + .reporter(Box::new(TotalAllocReporter)) + .build(); match config.mode.as_str() { "sequential" => match parser { - "pooled" => { - let parser = PooledParser; - for _ in 0..config.scale { - 
let _ = parser.decode_async(config.test_data.clone()).await?; - } - } - "prost" => { - for _ in 0..config.scale { - ProstWriteRequest::decode(config.test_data.clone())?; - } - } - "rust-protobuf" => { - for _ in 0..config.scale { - let _ = benchmarks::rust_protobuf_remote_write::WriteRequest::parse_from_bytes( - &config.test_data, - )?; - } - } - "quick-protobuf" => { - for _ in 0..config.scale { - let mut reader = BytesReader::from_bytes(&config.test_data); - let _ = benchmarks::quick_protobuf_remote_write::WriteRequest::from_reader( - &mut reader, - &config.test_data, - )?; - } - } + "pooled" => pooled_worker(config.test_data.clone(), config.scale), + "prost" => prost_worker(config.test_data.clone(), config.scale), + "rust-protobuf" => rust_protobuf_worker(config.test_data.clone(), config.scale), + "quick-protobuf" => quick_protobuf_worker(config.test_data.clone(), config.scale), other => panic!("unknown parser: {}", other), }, "concurrent" => match parser { - "pooled" => { - let mut handles = Vec::new(); - for _ in 0..config.scale { - let data_clone = config.test_data.clone(); - let handle = tokio::spawn(async move { - let parser = PooledParser; - let _ = parser.decode_async(data_clone).await; - }); - handles.push(handle); - } - for handle in handles { - handle.await?; - } - } - "prost" => { - let mut handles = Vec::new(); - for _ in 0..config.scale { - let data_clone = config.test_data.clone(); - let handle = tokio::spawn(async move { - let _ = ProstWriteRequest::decode(data_clone); - }); - handles.push(handle); - } - for handle in handles { - handle.await?; - } - } - "rust-protobuf" => { - let mut handles = Vec::new(); - for _ in 0..config.scale { - let data_clone = config.test_data.clone(); - let handle = tokio::spawn(async move { - let _ = - benchmarks::rust_protobuf_remote_write::WriteRequest::parse_from_bytes( - &data_clone, - ); - }); - handles.push(handle); - } - for handle in handles { - handle.await?; - } - } - "quick-protobuf" => { - let mut handles = 
Vec::new(); - for _ in 0..config.scale { - let data_clone = config.test_data.clone(); - let handle = tokio::spawn(async move { - let mut reader = BytesReader::from_bytes(&data_clone); - let _ = benchmarks::quick_protobuf_remote_write::WriteRequest::from_reader( - &mut reader, - &data_clone, - ); - }); - handles.push(handle); - } - for handle in handles { - handle.await?; - } - } + "pooled" => run_concurrent_threads(config.scale, move |n| { + pooled_worker(config.test_data.clone(), n); + Ok(()) + }) + .map_err(anyhow::Error::msg)?, + "prost" => run_concurrent_threads(config.scale, move |n| { + prost_worker(config.test_data.clone(), n); + Ok(()) + }) + .map_err(anyhow::Error::msg)?, + "rust-protobuf" => run_concurrent_threads(config.scale, move |n| { + rust_protobuf_worker(config.test_data.clone(), n); + Ok(()) + }) + .map_err(anyhow::Error::msg)?, + "quick-protobuf" => run_concurrent_threads(config.scale, move |n| { + quick_protobuf_worker(config.test_data.clone(), n); + Ok(()) + }) + .map_err(anyhow::Error::msg)?, other => panic!("unknown parser: {}", other), }, _ => panic!("invalid mode"), } - let end_stats = MemoryStats::collect()?; - let memory_diff = start_stats.diff(&end_stats); - config.output_json(&memory_diff); Ok(()) } + +#[cfg_attr(feature = "hotpath", hotpath::measure)] +fn pooled_worker(data: Bytes, iterations: usize) { + let parser = PooledParser; + for _ in 0..iterations { + let _ = parser.decode(data.clone()); + } +} + +#[cfg_attr(feature = "hotpath", hotpath::measure)] +fn prost_worker(data: Bytes, iterations: usize) { + for _ in 0..iterations { + let _ = ProstWriteRequest::decode(data.clone()); + } +} + +#[cfg_attr(feature = "hotpath", hotpath::measure)] +fn rust_protobuf_worker(data: Bytes, iterations: usize) { + for _ in 0..iterations { + let _ = benchmarks::rust_protobuf_remote_write::WriteRequest::parse_from_bytes(&data); + } +} + +#[cfg_attr(feature = "hotpath", hotpath::measure)] +fn quick_protobuf_worker(data: Bytes, iterations: usize) { + 
for _ in 0..iterations { + let mut reader = BytesReader::from_bytes(&data); + let _ = + benchmarks::quick_protobuf_remote_write::WriteRequest::from_reader(&mut reader, &data); + } +} + +struct TotalAllocReporter; + +impl Reporter for TotalAllocReporter { + fn report(&self, metrics: &dyn MetricsProvider<'_>) -> Result<(), Box> { + // In hotpath, each profiled function will output a row of [metrics](https://github.com/pawurb/hotpath/blob/main/hotpath-alloc-report.png): + // [calls, avg, pXX, total, %total], we only care about the `total` + // metric to verify our zero-allocation optimization. Since each function's + // metrics include those of its nested calls, we need to use the + // maximum value of the `total` field across all functions as + // the total memory allocated during decoding. + let mut max_total: u64 = 0; + for values in metrics.metric_data().values() { + if values.len() < 2 { + continue; + } + let total_idx = values.len() - 2; + if let MetricType::AllocBytes(bytes) = values[total_idx] { + max_total = std::cmp::max(max_total, bytes); + } + } + println!( + "Total allocated bytes (cumulative): {} ({})", + max_total, + human_bytes(max_total) + ); + Ok(()) + } +} diff --git a/src/benchmarks/src/bin/pool_stats.rs b/src/benchmarks/src/bin/pool_stats.rs deleted file mode 100644 index 48115075b3..0000000000 --- a/src/benchmarks/src/bin/pool_stats.rs +++ /dev/null @@ -1,82 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. 
You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -//! evaluate the efficiency of the deadpool-backed object pool. - -use std::fs; - -use bytes::Bytes; -use remote_write::{pooled_parser::PooledParser, pooled_types::POOL}; -use tikv_jemallocator::Jemalloc; -use tokio::task::JoinHandle; - -#[global_allocator] -static ALLOC: Jemalloc = Jemalloc; - -async fn run_concurrent_parsing(scale: usize) -> deadpool::Status { - let data = fs::read("../remote_write/tests/workloads/1709380533560664458.data") - .expect("test data load failed"); - let data = Bytes::from(data); - - let handles: Vec> = (0..scale) - .map(|_| { - let data = data.clone(); - tokio::spawn(async move { - let parser = PooledParser; - let _ = parser - .decode_async(data.clone()) - .await - .expect("parse failed"); - }) - }) - .collect(); - - for handle in handles { - handle.await.expect("task completion failed"); - } - - POOL.status() -} - -#[tokio::main] -async fn main() { - let scale_values = [1, 2, 5, 10, 20, 50, 100, 200, 500]; - - println!( - "{:<8} {:<10} {:<10} {:<10} {:<10}", - "Scale", "MaxSize", "PoolSize", "Available", "Waiting" - ); - println!("{}", "=".repeat(50)); - - for &scale in &scale_values { - let status = run_concurrent_parsing(scale).await; - - println!( - "{:<8} {:<10} {:<10} {:<10} {:<10}", - scale, status.max_size, status.size, status.available, status.waiting - ); - - tokio::time::sleep(tokio::time::Duration::from_millis(100)).await; - } - - println!("=== Final Pool Status ==="); - let final_status = POOL.status(); - println!("Max Pool Size: {}", final_status.max_size); - println!("Current Pool Size: {}", 
final_status.size); - println!("Available Objects: {}", final_status.available); - println!("Waiting Requests: {}", final_status.waiting); -} diff --git a/src/benchmarks/src/remote_write_bench.rs b/src/benchmarks/src/remote_write_bench.rs index b5372da5b6..c6f8ca6576 100644 --- a/src/benchmarks/src/remote_write_bench.rs +++ b/src/benchmarks/src/remote_write_bench.rs @@ -17,7 +17,7 @@ //! remote write parser bench. -use std::{fs, path::PathBuf}; +use std::{fs, path::PathBuf, sync::Arc}; use bytes::Bytes; use pb_types::WriteRequest as ProstWriteRequest; @@ -25,12 +25,12 @@ use prost::Message; use protobuf::Message as ProtobufMessage; use quick_protobuf::{BytesReader, MessageRead}; use remote_write::pooled_parser::PooledParser; -use tokio::task::JoinHandle; use crate::{ config::RemoteWriteConfig, quick_protobuf_remote_write::WriteRequest as QuickProtobufWriteRequest, rust_protobuf_remote_write::WriteRequest as RustProtobufWriteRequest, + util::run_concurrent_threads, }; pub struct RemoteWriteBench { @@ -59,14 +59,13 @@ impl RemoteWriteBench { } // Hand-written pooled parser sequential bench. - pub async fn pooled_parser_sequential(&self, scale: usize) -> Result<(), String> { + pub fn pooled_parser_sequential(&self, scale: usize) -> Result<(), String> { let parser = PooledParser; for _ in 0..scale { let data = Bytes::from(self.raw_data.clone()); let _ = parser - .decode_async(data.clone()) - .await - .map_err(|e| format!("pooled sequential parse failed: {:?}", e))?; + .decode(data.clone()) + .map_err(|e| format!("pooled sequential parse failed: {e:?}"))?; } Ok(()) } @@ -91,85 +90,55 @@ impl RemoteWriteBench { } // prost parser concurrent bench. 
- pub async fn prost_parser_concurrent(&self, scale: usize) -> Result<(), String> { - let join_handles: Vec>> = (0..scale) - .map(|_| { - let raw_data = self.raw_data.clone(); - tokio::spawn(async move { - let data = Bytes::from(raw_data); - ProstWriteRequest::decode(data) - .map_err(|e| format!("prost concurrent parse failed: {}", e))?; - Ok(()) - }) - }) - .collect(); - - for join_handle in join_handles { - join_handle.await.unwrap()?; - } - Ok(()) + pub fn prost_parser_concurrent(&self, scale: usize) -> Result<(), String> { + let raw = Arc::new(self.raw_data.clone()); + run_concurrent_threads(scale, move |n| { + for _ in 0..n { + let data = Bytes::from((*raw).clone()); + ProstWriteRequest::decode(data) + .map_err(|e| format!("prost concurrent parse failed: {}", e))?; + } + Ok(()) + }) } // Hand-written pooled parser concurrent bench. - pub async fn pooled_parser_concurrent(&self, scale: usize) -> Result<(), String> { - let parser = PooledParser; - let join_handles: Vec>> = (0..scale) - .map(|_| { - let parser = parser.clone(); - let raw_data = self.raw_data.clone(); - tokio::spawn(async move { - let data = Bytes::from(raw_data); - let _ = parser - .decode_async(data.clone()) - .await - .map_err(|e| format!("pooled concurrent parse failed: {:?}", e))?; - Ok(()) - }) - }) - .collect(); - - for join_handle in join_handles { - join_handle.await.unwrap()?; - } - Ok(()) + pub fn pooled_parser_concurrent(&self, scale: usize) -> Result<(), String> { + let raw = Arc::new(self.raw_data.clone()); + run_concurrent_threads(scale, move |n| { + let parser = PooledParser; + for _ in 0..n { + let data = Bytes::from((*raw).clone()); + let _ = parser + .decode(data.clone()) + .map_err(|e| format!("pooled concurrent parse failed: {e:?}"))?; + } + Ok(()) + }) } // quick-protobuf parser concurrent bench. 
- pub async fn quick_protobuf_parser_concurrent(&self, scale: usize) -> Result<(), String> { - let join_handles: Vec>> = (0..scale) - .map(|_| { - let data = self.raw_data.clone(); - tokio::spawn(async move { - let mut reader = BytesReader::from_bytes(&data); - QuickProtobufWriteRequest::from_reader(&mut reader, &data) - .map_err(|e| format!("quick-protobuf concurrent parse failed: {}", e))?; - Ok(()) - }) - }) - .collect(); - - for join_handle in join_handles { - join_handle.await.unwrap()?; - } - Ok(()) + pub fn quick_protobuf_parser_concurrent(&self, scale: usize) -> Result<(), String> { + let raw = Arc::new(self.raw_data.clone()); + run_concurrent_threads(scale, move |n| { + for _ in 0..n { + let mut reader: BytesReader = BytesReader::from_bytes(&raw); + QuickProtobufWriteRequest::from_reader(&mut reader, &raw) + .map_err(|e| format!("quick-protobuf concurrent parse failed: {}", e))?; + } + Ok(()) + }) } // rust-protobuf parser concurrent bench. - pub async fn rust_protobuf_parser_concurrent(&self, scale: usize) -> Result<(), String> { - let join_handles: Vec>> = (0..scale) - .map(|_| { - let data = self.raw_data.clone(); - tokio::spawn(async move { - RustProtobufWriteRequest::parse_from_bytes(&data) - .map_err(|e| format!("rust-protobuf concurrent parse failed: {}", e))?; - Ok(()) - }) - }) - .collect(); - - for join_handle in join_handles { - join_handle.await.unwrap()?; - } - Ok(()) + pub fn rust_protobuf_parser_concurrent(&self, scale: usize) -> Result<(), String> { + let raw = Arc::new(self.raw_data.clone()); + run_concurrent_threads(scale, move |n| { + for _ in 0..n { + RustProtobufWriteRequest::parse_from_bytes(&raw) + .map_err(|e| format!("rust-protobuf concurrent parse failed: {}", e))?; + } + Ok(()) + }) } } diff --git a/src/benchmarks/src/util.rs b/src/benchmarks/src/util.rs index 4d74f78558..92f321213b 100644 --- a/src/benchmarks/src/util.rs +++ b/src/benchmarks/src/util.rs @@ -30,8 +30,6 @@ use serde::{ de::{self, Visitor}, Deserialize, 
Deserializer, Serialize, Serializer, }; -use serde_json::json; -use tikv_jemalloc_ctl::{epoch, stats, thread}; #[derive(Clone, Copy, Debug, PartialEq, Eq, Ord, PartialOrd, Default)] pub struct ReadableDuration(pub Duration); @@ -174,84 +172,6 @@ impl<'de> Deserialize<'de> for ReadableDuration { } } -// Memory bench utilities. -#[derive(Debug, Clone)] -pub struct MemoryStats { - pub thread_allocated: u64, - pub thread_deallocated: u64, - pub allocated: u64, - pub active: u64, - pub metadata: u64, - pub mapped: u64, - pub resident: u64, - pub retained: u64, -} - -#[derive(Debug, Clone)] -pub struct MemoryStatsDiff { - pub thread_allocated_diff: i64, - pub thread_deallocated_diff: i64, - pub allocated: i64, - pub active: i64, - pub metadata: i64, - pub mapped: i64, - pub resident: i64, - pub retained: i64, -} - -impl MemoryStats { - pub fn collect() -> Result { - epoch::advance().map_err(|e| format!("failed to advance jemalloc epoch: {}", e))?; - - Ok(MemoryStats { - thread_allocated: thread::allocatedp::read() - .map_err(|e| format!("failed to read thread.allocatedp: {}", e))? - .get(), - thread_deallocated: thread::deallocatedp::read() - .map_err(|e| format!("failed to read thread.deallocatedp: {}", e))? - .get(), - allocated: stats::allocated::read() - .map_err(|e| format!("failed to read allocated: {}", e))? - .try_into() - .unwrap(), - active: stats::active::read() - .map_err(|e| format!("failed to read active: {}", e))? - .try_into() - .unwrap(), - metadata: stats::metadata::read() - .map_err(|e| format!("failed to read metadata: {}", e))? - .try_into() - .unwrap(), - mapped: stats::mapped::read() - .map_err(|e| format!("failed to read mapped: {}", e))? - .try_into() - .unwrap(), - resident: stats::resident::read() - .map_err(|e| format!("failed to read resident: {}", e))? - .try_into() - .unwrap(), - retained: stats::retained::read() - .map_err(|e| format!("failed to read retained: {}", e))? 
- .try_into() - .unwrap(), - }) - } - - pub fn diff(&self, other: &MemoryStats) -> MemoryStatsDiff { - MemoryStatsDiff { - thread_allocated_diff: other.thread_allocated as i64 - self.thread_allocated as i64, - thread_deallocated_diff: other.thread_deallocated as i64 - - self.thread_deallocated as i64, - allocated: other.allocated as i64 - self.allocated as i64, - active: other.active as i64 - self.active as i64, - metadata: other.metadata as i64 - self.metadata as i64, - mapped: other.mapped as i64 - self.mapped as i64, - resident: other.resident as i64 - self.resident as i64, - retained: other.retained as i64 - self.retained as i64, - } - } -} - pub struct MemoryBenchConfig { pub test_data: Bytes, pub scale: usize, @@ -274,22 +194,50 @@ impl MemoryBenchConfig { mode, } } +} - pub fn output_json(&self, memory_diff: &MemoryStatsDiff) { - let result = json!({ - "mode": self.mode, - "scale": self.scale, - "memory": { - "thread_allocated_diff": memory_diff.thread_allocated_diff, - "thread_deallocated_diff": memory_diff.thread_deallocated_diff, - "allocated": memory_diff.allocated, - "active": memory_diff.active, - "metadata": memory_diff.metadata, - "mapped": memory_diff.mapped, - "resident": memory_diff.resident, - "retained": memory_diff.retained - } - }); - println!("{}", result); +/// Run a workload concurrently with up to CPU core count threads. 
+pub fn run_concurrent_threads(scale: usize, worker: F) -> Result<(), String> +where + F: Fn(usize) -> Result<(), String> + Send + Sync + 'static, +{ + let threads = std::cmp::min(scale, num_cpus::get()); + if threads == 0 { + return Ok(()); + } + let base = scale / threads; + let extra = scale % threads; + let worker = std::sync::Arc::new(worker); + let mut handles = Vec::with_capacity(threads); + for i in 0..threads { + let n = base + if i < extra { 1 } else { 0 }; + if n == 0 { + continue; + } + let w = std::sync::Arc::clone(&worker); + handles.push(std::thread::spawn(move || (w)(n))); + } + for h in handles { + h.join().map_err(|_| "thread panicked".to_string())??; + } + Ok(()) +} + +/// Convert bytes number to a human-readable string. +pub fn human_bytes(bytes: u64) -> String { + const UNITS: &[&str] = &["B", "KB", "MB", "GB", "TB"]; + if bytes == 0 { + return "0 B".to_string(); + } + let mut size = bytes as f64; + let mut unit = 0usize; + while size >= 1024.0 && unit < UNITS.len() - 1 { + size /= 1024.0; + unit += 1; + } + if unit == 0 { + format!("{} {}", bytes, UNITS[unit]) + } else { + format!("{:.1} {}", size, UNITS[unit]) } } diff --git a/src/remote_write/Cargo.toml b/src/remote_write/Cargo.toml index 344308c4ed..e0295cad59 100644 --- a/src/remote_write/Cargo.toml +++ b/src/remote_write/Cargo.toml @@ -30,10 +30,8 @@ unsafe-split = [] [dependencies] anyhow = { workspace = true } -async-trait = { workspace = true } bytes = { workspace = true } -deadpool = { workspace = true } +object-pool = { workspace = true } once_cell = { workspace = true } pb_types = { workspace = true } prost = { workspace = true } -tokio = { workspace = true } diff --git a/src/remote_write/README.md b/src/remote_write/README.md index f41a00c5b6..173d6794b6 100644 --- a/src/remote_write/README.md +++ b/src/remote_write/README.md @@ -6,7 +6,7 @@ A hand-written [Prometheus Remote Write Request (V1)](https://prometheus.io/docs Key optimization techniques: -- Object pooling backed by 
deadpool. +- Object pooling backed by [object-pool](https://github.com/CJP10/object-pool). - `RepeatedField` data structures. @@ -29,6 +29,12 @@ pip3 install tabulate matplotlib ### CPU Time +#### Benchmark Logic + +We benchmarked both sequential and concurrent parsing scenarios. In the sequential case, we simply executed the parsing task `scale` times within a for loop. In the concurrent case, we spawned a number of system-level threads equal to the number of CPU cores and evenly distributed the `scale` parsing tasks among them, with each thread performing sequential parsing in a for loop. + +**Note**: Since Go cannot directly create a system-level thread like `std::thread::spawn` in Rust, we ignored the concurrent benchmark logic for Go. + #### Steps Navigate to the benchmarks directory: @@ -58,7 +64,7 @@ cd VictoriaMetrics/lib/prompb vim prom_decode_bench_test.go ``` -and add the following code (please change the path of 1709380533560664458.data): +and add the following code (please change the path of `1709380533560664458.data`): ```go package prompb @@ -142,29 +148,6 @@ func benchDecoderSequential(decoder Decoder, data []byte, n int) error { return nil } -// Concurrent benchmark. -func benchDecoderConcurrent(decoder Decoder, data []byte, workers int) error { - results := make(chan error, workers) - - // Spawn workers (similar to tokio::spawn in Rust). - for w := 0; w < workers; w++ { - go func() { - clonedDecoder := decoder.Clone() - clonedDecoder.Reset() - err := clonedDecoder.Parse(data) - results <- err - }() - } - - // Wait for all workers to complete (similar to join_handle.await). 
- for w := 0; w < workers; w++ { - if err := <-results; err != nil { - return err - } - } - return nil -} - func BenchmarkSequentialParse(b *testing.B) { data, err := getTestDataPath() if err != nil { @@ -191,33 +174,6 @@ func BenchmarkSequentialParse(b *testing.B) { } } } - -func BenchmarkConcurrentParse(b *testing.B) { - data, err := getTestDataPath() - if err != nil { - b.Skipf("test data file not found: %v", err) - } - - decoders := map[string]Decoder{ - "pooled": NewPooledDecoder(), - "nopool": NewNoPoolDecoder(), - } - - workers := []int{1, 5, 10, 20, 100} - - for decoderName, decoder := range decoders { - for _, w := range workers { - b.Run(fmt.Sprintf("%s/%d", decoderName, w), func(b *testing.B) { - b.ResetTimer() - for i := 0; i < b.N; i++ { - if err := benchDecoderConcurrent(decoder, data, w); err != nil { - b.Fatalf("failed to parse: %v", err) - } - } - }) - } - } -} ``` Execute the Go benchmarks: @@ -232,16 +188,18 @@ Test results are as follows: ![Sequential Performance](../../docs/assets/remote-write-sequential-performance.png) -In sequential parsing scenarios, the hand-written pooled parsers (with and without unsafe optimization) achieve the best performance across all scales compared to other Rust parsers. The unsafe optimization provides nearly 50% performance improvement. - ![Concurrent Performance](../../docs/assets/remote-write-concurrent-performance.png) -**Note**: Due to the nature of concurrent execution, concurrent parsing benchmark results may vary (sometimes dramatically) across different runs. However, we can still draw an overall conclusion. - -In concurrent parsing scenarios, from an overall perspective, the hand-written pooled parsers (with and without unsafe optimization) still achieve the best performance compared to other Rust parsers. The unsafe optimization continues to provide performance improvements. 
+In both scenarios, the hand-written pooled parsers (with and without unsafe optimization) achieve the best performance across all scales compared to other Rust parsers. The unsafe optimization provides nearly 50% performance improvement. ### Memory Allocation +#### Benchmark Logic + +We use [hotpath](https://github.com/pawurb/hotpath) to measure the memory allocation performance of different parsers. + +#### Steps + Navigate to the benchmarks directory: ```shell @@ -251,40 +209,22 @@ cd src/benchmarks Run memory allocation benchmarks: ```shell -python3 remote_write_memory_bench.py --mode sequential --scale 10 +python3 remote_write_memory_bench.py --mode sequential --scale 64 ``` Or enable unsafe optimization: ```shell -python3 remote_write_memory_bench.py --mode concurrent --scale 10 --unsafe +python3 remote_write_memory_bench.py --mode concurrent --scale 64 --unsafe ``` -**Note**: Sequential and concurrent mode results are similar due to the enforced `#[tokio::main(flavor = "current_thread")]` configuration. This constraint is necessary because Jemalloc's `thread::allocatedp` and `thread::deallocatedp` statistics can only track single-threaded allocations accurately. - -We focus on the [allocatedp](https://docs.rs/tikv-jemalloc-ctl/0.6.0/tikv_jemalloc_ctl/thread/struct.allocatedp.html) value to verify our zero-allocation parsing efforts, since it represents the number of bytes that **have ever been** allocated by the thread. +#### Results The results are as follows: ![Memory](../../docs/assets/remote-write-memory-performance.png) -The hand-written pooled parser allocates minimal memory compared to other Rust parsers. Note that the difference between `ThreadAlloc` and `ThreadDealloc` in the pooled decoder is expected since we gather statistics right before the program terminates and objects remain in the pool (not freed) at that time.
- -### Object Pool Efficiency - -Navigate to the benchmarks directory: - -```shell -cd src/benchmarks -``` - -Analyze pool utilization: - -```shell -cargo run --bin pool_stats --release -``` - -Our testing finds that only 8 objects in the pool are sufficient to handle 500 concurrent parsing operations. +The hand-written pooled parser performs the least memory allocation in total compared to other Rust parsers, demonstrating the effectiveness of our zero-allocation optimization. ## Acknowledgements diff --git a/src/remote_write/src/pooled_parser.rs b/src/remote_write/src/pooled_parser.rs index 8b2c62fb1c..3b90b75143 100644 --- a/src/remote_write/src/pooled_parser.rs +++ b/src/remote_write/src/pooled_parser.rs @@ -25,10 +25,11 @@ use anyhow::Result; use bytes::Bytes; +use object_pool::ReusableOwned; use crate::{ pb_reader::read_write_request, - pooled_types::{WriteRequest, WriteRequestManager, POOL}, + pooled_types::{WriteRequest, POOL}, repeated_field::Clear, }; @@ -40,26 +41,13 @@ impl PooledParser { Self } - /// Decode a [`WriteRequest`] from the buffer and return it. - pub fn decode(&self, buf: Bytes) -> Result { - // Cannot get a WriteRequest instance from the pool in sync functions. - let mut request = WriteRequest::default(); - read_write_request(buf, &mut request)?; - Ok(request) - } - - /// Decode a [`WriteRequest`] from the buffer and return a pooled object. + /// Decode a [`WriteRequest`] from the buffer. /// /// This method will reuse a [`WriteRequest`] instance from the object /// pool. After the returned object is dropped, it will be returned to the - pub async fn decode_async( - &self, - buf: Bytes, - ) -> Result> { - let mut pooled_request = POOL - .get() - .await - .map_err(|e| anyhow::anyhow!("failed to get object from pool: {e:?}"))?; + /// pool.
+ pub fn decode(&self, buf: Bytes) -> Result> { + let mut pooled_request = POOL.pull_owned(WriteRequest::default); pooled_request.clear(); read_write_request(buf, &mut pooled_request)?; Ok(pooled_request) diff --git a/src/remote_write/src/pooled_types.rs b/src/remote_write/src/pooled_types.rs index 427eb60788..b0ba6e2dfb 100644 --- a/src/remote_write/src/pooled_types.rs +++ b/src/remote_write/src/pooled_types.rs @@ -15,9 +15,10 @@ // specific language governing permissions and limitations // under the License. -use async_trait::async_trait; +use std::sync::Arc; + use bytes::Bytes; -use deadpool::managed::{Manager, Metrics, Pool, RecycleResult}; +use object_pool::Pool; use once_cell::sync::Lazy; use crate::repeated_field::{Clear, RepeatedField}; @@ -160,33 +161,8 @@ impl Clear for WriteRequest { } } -/// A deadpool manager for PooledWriteRequest. -pub struct WriteRequestManager; - -#[async_trait] -impl Manager for WriteRequestManager { - type Error = (); - type Type = WriteRequest; - - async fn create(&self) -> Result { - Ok(WriteRequest::default()) - } - - async fn recycle( - &self, - _obj: &mut Self::Type, - _metrics: &Metrics, - ) -> RecycleResult { - // We will reset the object after acquiring it. - Ok(()) - } -} - -const POOL_SIZE: usize = 64; // Maximum number of objects in the pool. +const POOL_SIZE: usize = 16; // Maximum number of objects in the pool. -pub static POOL: Lazy> = Lazy::new(|| { - Pool::builder(WriteRequestManager) - .max_size(POOL_SIZE) - .build() - .unwrap() -}); +/// Global thread-safe object pool for `WriteRequest`. +pub static POOL: Lazy>> = + Lazy::new(|| Arc::new(Pool::new(POOL_SIZE, WriteRequest::default))); diff --git a/src/remote_write/tests/equivalence_test.rs b/src/remote_write/tests/equivalence_test.rs index e545ecd44b..96b2c7bd38 100644 --- a/src/remote_write/tests/equivalence_test.rs +++ b/src/remote_write/tests/equivalence_test.rs @@ -21,13 +21,12 @@ //! //! Test with `--features unsafe-split` to enable the unsafe optimization. 
-use std::{fs, sync::Arc}; +use std::{fs, sync::Arc, thread}; use bytes::Bytes; use pb_types::{Exemplar, Label, MetricMetadata, Sample, TimeSeries, WriteRequest}; use prost::Message; use remote_write::pooled_parser::PooledParser; -use tokio::task::JoinHandle; const ITERATIONS: usize = 50; @@ -47,13 +46,10 @@ fn parse_with_prost(data: &Bytes) -> WriteRequest { WriteRequest::decode(data.clone()).expect("prost decode failed") } -async fn parse_with_pooled(data: &Bytes) -> WriteRequest { +fn parse_with_pooled(data: &Bytes) -> WriteRequest { let data_copy = data.clone(); let parser = PooledParser; - let pooled_request = parser - .decode_async(data_copy) - .await - .expect("pooled decode failed"); + let pooled_request = parser.decode(data_copy).expect("pooled decode failed"); // Convert pooled types to pb_types to compare with prost. let mut write_request = WriteRequest { @@ -118,8 +114,8 @@ async fn parse_with_pooled(data: &Bytes) -> WriteRequest { write_request } -#[tokio::test] -async fn test_sequential_correctness() { +#[test] +fn test_sequential_correctness() { let (data1, data2) = load_test_data(); let datasets = [&data1, &data2]; @@ -128,7 +124,7 @@ async fn test_sequential_correctness() { let data = datasets[data_index]; let prost_result = parse_with_prost(data); - let pooled_result = parse_with_pooled(data).await; + let pooled_result = parse_with_pooled(data); assert_eq!( &prost_result, &pooled_result, @@ -138,19 +134,19 @@ async fn test_sequential_correctness() { } } -#[tokio::test] -async fn test_concurrent_correctness() { +#[test] +fn test_concurrent_correctness() { let (data1, data2) = load_test_data(); let data1 = Arc::new(data1); let data2 = Arc::new(data2); - let mut handles: Vec> = Vec::new(); + let mut handles = Vec::new(); for iteration in 0..ITERATIONS { let data1_clone = Arc::clone(&data1); let data2_clone = Arc::clone(&data2); - let handle = tokio::spawn(async move { + let handle = thread::spawn(move || { let data_index = iteration % 2; let data = 
if data_index == 0 { &*data1_clone @@ -159,7 +155,7 @@ async fn test_concurrent_correctness() { }; let prost_result = parse_with_prost(data); - let pooled_result = parse_with_pooled(data).await; + let pooled_result = parse_with_pooled(data); assert_eq!( &prost_result, &pooled_result, @@ -172,6 +168,6 @@ async fn test_concurrent_correctness() { } for handle in handles { - handle.await.expect("task completion failed"); + handle.join().expect("thread completion failed"); } }