diff --git a/Cargo.lock b/Cargo.lock index af9e6052f9..a7f8789ae1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -61,6 +61,21 @@ dependencies = [ "equator", ] +[[package]] +name = "alloc-no-stdlib" +version = "2.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cc7bb162ec39d46ab1ca8c77bf72e890535becd1751bb45f64c597edb4c8c6b3" + +[[package]] +name = "alloc-stdlib" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "94fb8275041c72129eb51b7d0322c29b8387a0386127718b096429201a5d6ece" +dependencies = [ + "alloc-no-stdlib", +] + [[package]] name = "allocator-api2" version = "0.2.21" @@ -295,6 +310,7 @@ dependencies = [ "arrow-select", "flatbuffers", "lz4_flex", + "zstd 0.13.2", ] [[package]] @@ -432,6 +448,23 @@ dependencies = [ "pin-project-lite", ] +[[package]] +name = "async-compression" +version = "0.4.19" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "06575e6a9673580f52661c92107baabffbf41e2141373441cbcdc47cb733003c" +dependencies = [ + "bzip2 0.5.2", + "flate2", + "futures-core", + "memchr", + "pin-project-lite", + "tokio", + "xz2", + "zstd 0.13.2", + "zstd-safe 7.2.1", +] + [[package]] name = "async-stream" version = "0.3.6" @@ -1250,6 +1283,27 @@ version = "0.2.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3eeab4423108c5d7c744f4d234de88d18d636100093ae04caf4825134b9c3a32" +[[package]] +name = "brotli" +version = "8.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4bd8b9603c7aa97359dbd97ecf258968c95f3adddd6db2f7e7a5bef101c84560" +dependencies = [ + "alloc-no-stdlib", + "alloc-stdlib", + "brotli-decompressor", +] + +[[package]] +name = "brotli-decompressor" +version = "5.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "874bb8112abecc98cbd6d81ea4fa7e94fb9449648c93cc89aa40c81c24d7de03" +dependencies = [ + "alloc-no-stdlib", + "alloc-stdlib", +] + [[package]] name = "bs58" version = "0.5.1" @@ -1347,6 +1401,15 @@ dependencies = [ "bzip2-sys", ] +[[package]] +name = "bzip2" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bea8dcd42434048e4f7a304411d9273a411f647446c1234a65ce0554923f4cff" +dependencies = [ + "libbz2-rs-sys", +] + [[package]] name = "bzip2-sys" version = "0.1.13+1.0.8" @@ -2075,6 +2138,7 @@ dependencies = [ "arrow-schema", "async-trait", "bytes", + "bzip2 0.6.0", "chrono", "datafusion-catalog", "datafusion-catalog-listing", @@ -2083,11 +2147,13 @@ dependencies = [ "datafusion-datasource", "datafusion-datasource-csv", "datafusion-datasource-json", + "datafusion-datasource-parquet", "datafusion-execution", "datafusion-expr", "datafusion-expr-common", "datafusion-functions", "datafusion-functions-aggregate", + "datafusion-functions-nested", "datafusion-functions-table", "datafusion-functions-window", "datafusion-optimizer", @@ -2098,11 +2164,13 @@ dependencies = [ "datafusion-physical-plan", "datafusion-session", "datafusion-sql", + "flate2", "futures", "itertools 0.14.0", "log", "object_store", "parking_lot", + "parquet", "rand 0.9.1", "regex", "sqlparser", @@ -2110,6 +2178,8 @@ dependencies = [ "tokio", "url", "uuid", + "xz2", + "zstd 0.13.2", ] [[package]] @@ -2178,6 +2248,7 @@ dependencies = [ "libc", "log", "object_store", + "parquet", "paste", "recursive", "sqlparser", @@ -2203,8 +2274,10 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d855160469020982880fd9bd0962e033d2f4728f56f85a83d8c90785638b6519" dependencies = [ "arrow", + "async-compression", "async-trait", "bytes", + "bzip2 0.6.0", "chrono", "datafusion-common", "datafusion-common-runtime", @@ -2215,14 +2288,20 @@ dependencies = [ "datafusion-physical-expr-common", "datafusion-physical-plan", "datafusion-session", + "flate2", "futures", "glob", "itertools 0.14.0", "log", "object_store", + "parquet", "rand 0.9.1", + "tempfile", "tokio", + "tokio-util", "url", + "xz2", + "zstd 0.13.2", ] [[package]] @@ -2275,6 +2354,39 @@ dependencies = [ "tokio", ] +[[package]] +name = "datafusion-datasource-parquet" +version = "50.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ab3bfb48fb4ff42ac1485a12ea56434eaab53f7da8f00b2443b1a3d35a0b6d10" +dependencies = [ + "arrow", + "async-trait", + "bytes", + "datafusion-catalog", + "datafusion-common", + "datafusion-common-runtime", + "datafusion-datasource", + "datafusion-execution", + "datafusion-expr", + "datafusion-functions-aggregate", + "datafusion-physical-expr", + "datafusion-physical-expr-adapter", + "datafusion-physical-expr-common", + "datafusion-physical-optimizer", + "datafusion-physical-plan", + "datafusion-pruning", + "datafusion-session", + "futures", + "itertools 0.14.0", + "log", + "object_store", + "parking_lot", + "parquet", + "rand 0.9.1", + "tokio", +] + [[package]] name = "datafusion-doc" version = "50.0.0" @@ -2411,6 +2523,28 @@ dependencies = [ "paste", ] +[[package]] +name = "datafusion-functions-nested" +version = "50.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4227782023f4fb68d3d5c5eb190665212f43c9a0b437553e4b938b379aff6cf6" +dependencies = [ + "arrow", + "arrow-ord", + "datafusion-common", + "datafusion-doc", + "datafusion-execution", + "datafusion-expr", + "datafusion-functions", + "datafusion-functions-aggregate", + "datafusion-functions-aggregate-common", + "datafusion-macros", + "datafusion-physical-expr-common", + "itertools 0.14.0", + "log", + "paste", +] + [[package]] name = "datafusion-functions-table" version = "50.0.0" @@ -2589,6 +2723,33 @@ dependencies = [ "tokio", ] +[[package]] +name = "datafusion-proto" +version = "50.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "521cd45740788e751bf59b25d2879d162b157a45fb9b5fa2ec03034923f3658a" +dependencies = [ + "arrow", + "chrono", + "datafusion", + "datafusion-common", + "datafusion-expr", + "datafusion-proto-common", + "object_store", + "prost 0.13.4", +] + +[[package]] +name = "datafusion-proto-common" +version = "50.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c15ede0e0f1e51d5b5bea1a196db35e00aa3cae9e58cc12df3cc900e36328437" +dependencies = [ + "arrow", + "datafusion-common", + "prost 0.13.4", +] + [[package]] name = "datafusion-pruning" version = "50.0.0" @@ -3112,9 +3273,9 @@ dependencies = [ [[package]] name = "flate2" -version = "1.1.1" +version = "1.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7ced92e76e966ca2fd84c8f7aa01a4aea65b0eb6648d72f7c8f3e2764a67fece" +checksum = "4a3d7db9596fecd151c5f638c0ee5d5bd487b6e0ea232e5dc96d5250f6f94b1d" dependencies = [ "crc32fast", "libz-rs-sys", @@ -4004,6 +4165,12 @@ dependencies = [ "generic-array 0.14.7", ] +[[package]] +name = "integer-encoding" +version = "3.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8bb03732005da905c88227371639bf1ad885cc712789c011c31c5fb3ab3ccf02" + [[package]] name = "io-uring" version = "0.7.8" @@ -4419,6 +4586,12 @@ dependencies = [ "static_assertions", ] +[[package]] +name = "libbz2-rs-sys" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2c4a545a15244c7d945065b5d392b2d2d7f21526fba56ce51467b06ed445e8f7" + [[package]] name = "libc" version = "0.2.175" @@ -4454,9 +4627,9 @@ dependencies = [ [[package]] name = "libz-rs-sys" -version = "0.5.0" +version = "0.5.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6489ca9bd760fe9642d7644e827b0c9add07df89857b0416ee15c1cc1a3b8c5a" +checksum = "840db8cf39d9ec4dd794376f38acc40d0fc65eec2a8f484f7fd375b84602becd" dependencies = [ "zlib-rs", ] @@ -4557,7 +4730,7 @@ version = "0.11.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "75761162ae2b0e580d7e7c390558127e5f01b4194debd6221fd8c207fc80e3f5" dependencies = [ - "twox-hash", + "twox-hash 1.6.3", ] [[package]] @@ -4687,7 +4860,7 @@ dependencies = [ "hashbrown 0.15.2", "indexmap 2.11.4", "metrics", - "ordered-float", + "ordered-float 4.6.0", "quanta", "radix_trie", "rand 0.9.1", @@ -5392,6 +5565,15 @@ version = "0.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "04744f49eae99ab78e0d5c0b603ab218f515ea8cfe5a456d7629ad883a3b6e7d" +[[package]] +name = "ordered-float" +version = "2.10.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "68f19d67e5a2795c94e73e0bb1cc1a7edeb2e28efd39e2e1c9b7a40c1108b11c" +dependencies = [ + "num-traits", +] + [[package]] name = "ordered-float" version = "4.6.0" @@ -5436,6 +5618,43 @@ dependencies = [ "windows-targets 0.52.6", ] +[[package]] +name = "parquet" +version = "56.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "89b56b41d1bd36aae415e42f91cae70ee75cf6cba74416b14dce3e958d5990ec" +dependencies = [ + "ahash", + "arrow-array", + "arrow-buffer", + "arrow-cast", + "arrow-data", + "arrow-ipc", + "arrow-schema", + "arrow-select", + "base64 0.22.1", + "brotli", + "bytes", + "chrono", + "flate2", + "futures", + "half", + "hashbrown 0.15.2", + "lz4_flex", + "num", + "num-bigint", + "object_store", + "paste", + "ring", + "seq-macro", + "simdutf8", + "snap", + "thrift", + "tokio", + "twox-hash 2.1.2", + "zstd 0.13.2", +] + [[package]] name = "parse-zoneinfo" version = "0.3.1" @@ -7702,6 +7921,7 @@ dependencies = [ "dashmap", "datafusion", "datafusion-functions-json", + "datafusion-proto", "derive_more", "enumset", "futures", @@ -8090,6 +8310,7 @@ dependencies = [ "libc", "libz-sys", "log", + "lzma-sys", "md-5", "memchr", "metrics-util", @@ -8153,6 +8374,7 @@ dependencies = [ "typenum", "ulid", "uuid", + "xz2", "zerocopy 0.7.35", "zeroize", "zstd 0.13.2", @@ -8546,6 +8768,12 @@ dependencies = [ "serde", ] +[[package]] +name = "seq-macro" +version = "0.3.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1bc711410fbe7399f390ca1c3b60ad0f53f80e95c5eb935e52268a0e2cd49acc" + [[package]] name = "serde" version = "1.0.226" @@ -8919,6 +9147,12 @@ dependencies = [ "syn 2.0.106", ] +[[package]] +name = "snap" +version = "1.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1b6b67fb9a61334225b5b790716f609cd58395f895b3fe8b328786812a40bc3b" + [[package]] name = "socket2" version = "0.5.10" @@ -9284,6 +9518,17 @@ dependencies = [ "num_cpus", ] +[[package]] +name = "thrift" +version = "0.17.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7e54bc85fc7faa8bc175c4bab5b92ba8d9a3ce893d0e9f42cc455c8ab16a9e09" +dependencies = [ + "byteorder", + "integer-encoding", + "ordered-float 2.10.1", +] + [[package]] name = "tikv-jemalloc-ctl" version = "0.6.0" @@ -9915,6 +10160,12 @@ dependencies = [ "static_assertions", ] +[[package]] +name = "twox-hash" +version = "2.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9ea3136b675547379c4bd395ca6b938e5ad3c3d20fad76e7fe85f9e0d011419c" + [[package]] name = "typed-builder" version = "0.21.0" @@ -10965,9 +11216,9 @@ dependencies = [ [[package]] name = "zlib-rs" -version = "0.5.0" +version = "0.5.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "868b928d7949e09af2f6086dfc1e01936064cc7a819253bce650d4e2a2d63ba8" +checksum = "2f06ae92f42f5e5c42443fd094f245eb656abf56dd7cce9b8b263236565e00f2" [[package]] name = "zopfli" diff --git a/Cargo.toml b/Cargo.toml index 93583cdec2..533cc7832e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -128,6 +128,8 @@ datafusion = { version = "50.0.0", default-features = false, features = [ "recursive_protection", ] } datafusion-expr = { version = "50.0.0" } +datafusion-functions-json = { version = "0.50.0" } +datafusion-proto = { version = "50.0.0", default-features = false } derive_builder = "0.20.0" derive_more = { version = "2.0.1", features = ["full"] } dialoguer = { version = "0.11.0" } diff --git a/crates/storage-query-datafusion/Cargo.toml b/crates/storage-query-datafusion/Cargo.toml index 2e6c8b93b6..2bc8050883 100644 --- a/crates/storage-query-datafusion/Cargo.toml +++ b/crates/storage-query-datafusion/Cargo.toml @@ -31,7 +31,8 @@ bytestring = { workspace = true } codederror = { workspace = true } dashmap = { workspace = true } datafusion = { workspace = true } -datafusion-functions-json = { version = "0.50.0" } +datafusion-functions-json = { workspace = true } +datafusion-proto = { workspace = true } derive_more = { workspace = true } enumset = { workspace = true } futures = { workspace = true } diff --git a/crates/storage-query-datafusion/src/invocation_state/table.rs b/crates/storage-query-datafusion/src/invocation_state/table.rs index e5e4c83bf0..d9a79c1e07 100644 --- a/crates/storage-query-datafusion/src/invocation_state/table.rs +++ b/crates/storage-query-datafusion/src/invocation_state/table.rs @@ -16,8 +16,11 @@ use anyhow::anyhow; use datafusion::arrow::datatypes::SchemaRef; use datafusion::arrow::record_batch::RecordBatch; use datafusion::common::DataFusionError; -use datafusion::physical_plan::SendableRecordBatchStream; use datafusion::physical_plan::stream::RecordBatchReceiverStream; +use datafusion::physical_plan::{PhysicalExpr, SendableRecordBatchStream}; +use std::fmt::Debug; +use std::ops::RangeInclusive; +use std::sync::Arc; use tokio::sync::mpsc::Sender; use restate_invoker_api::{InvocationStatusReport, StatusHandle}; @@ -97,6 +100,7 @@ impl ScanPartition for partition_id: PartitionId, _range: RangeInclusive, projection: SchemaRef, + _predicate: Option>, batch_size: usize, limit: Option, ) -> anyhow::Result { diff --git a/crates/storage-query-datafusion/src/lib.rs b/crates/storage-query-datafusion/src/lib.rs index a3b5be7325..ca9610be91 100644 --- a/crates/storage-query-datafusion/src/lib.rs +++ b/crates/storage-query-datafusion/src/lib.rs @@ -38,12 +38,17 @@ mod table_macro; mod table_providers; mod table_util; +use std::sync::Arc; + pub use context::BuildError; use datafusion::arrow::datatypes::Schema; use datafusion::arrow::ipc::convert::IpcSchemaEncoder; use datafusion::arrow::ipc::writer::DictionaryTracker; use datafusion::arrow::record_batch::RecordBatch; use datafusion::error::DataFusionError; +use datafusion::physical_plan::PhysicalExpr; +use datafusion::prelude::SessionContext; +use prost::Message; #[cfg(test)] pub(crate) mod mocks; @@ -71,6 +76,31 @@ pub(crate) fn decode_schema(ipc_bytes: &[u8]) -> anyhow::Result { Ok(schema) } +pub(crate) fn encode_expr(expr: &Arc) -> Result, DataFusionError> { + let node = datafusion_proto::physical_plan::to_proto::serialize_physical_expr( + expr, + &datafusion_proto::physical_plan::DefaultPhysicalExtensionCodec {}, + )?; + + Ok(node.encode_to_vec()) +} + +pub(crate) fn decode_expr( + ctx: &SessionContext, + input_schema: &Schema, + proto_bytes: &[u8], +) -> Result, DataFusionError> { + let node = datafusion_proto::protobuf::PhysicalExprNode::decode(proto_bytes) + .map_err(|err| DataFusionError::External(err.into()))?; + + datafusion_proto::physical_plan::from_proto::parse_physical_expr( + &node, + ctx, + input_schema, + &datafusion_proto::physical_plan::DefaultPhysicalExtensionCodec {}, + ) +} + pub(crate) fn encode_record_batch( schema: &Schema, record_batch: RecordBatch, diff --git a/crates/storage-query-datafusion/src/partition_store_scanner.rs b/crates/storage-query-datafusion/src/partition_store_scanner.rs index 23fe5bf2fc..5c11c72e08 100644 --- a/crates/storage-query-datafusion/src/partition_store_scanner.rs +++ b/crates/storage-query-datafusion/src/partition_store_scanner.rs @@ -16,6 +16,7 @@ use anyhow::anyhow; use datafusion::arrow::datatypes::SchemaRef; use datafusion::error::DataFusionError; use datafusion::execution::SendableRecordBatchStream; +use datafusion::physical_plan::PhysicalExpr; use datafusion::physical_plan::stream::RecordBatchReceiverStream; use restate_partition_store::{PartitionStore, PartitionStoreManager}; @@ -76,6 +77,7 @@ where partition_id: PartitionId, range: RangeInclusive, projection: SchemaRef, + _predicate: Option>, batch_size: usize, mut limit: Option, ) -> anyhow::Result { @@ -142,9 +144,17 @@ where partition_id: PartitionId, range: RangeInclusive, projection: SchemaRef, + predicate: Option>, batch_size: usize, limit: Option, ) -> anyhow::Result { - self.scan_partition(partition_id, range, projection, batch_size, limit) + self.scan_partition( + partition_id, + range, + projection, + predicate, + batch_size, + limit, + ) } } diff --git a/crates/storage-query-datafusion/src/remote_query_scanner_client.rs b/crates/storage-query-datafusion/src/remote_query_scanner_client.rs index 3618784727..4bc2b28252 100644 --- a/crates/storage-query-datafusion/src/remote_query_scanner_client.rs +++ b/crates/storage-query-datafusion/src/remote_query_scanner_client.rs @@ -16,6 +16,7 @@ use async_trait::async_trait; use datafusion::arrow::datatypes::SchemaRef; use datafusion::error::DataFusionError; use datafusion::execution::SendableRecordBatchStream; +use datafusion::physical_plan::PhysicalExpr; use datafusion::physical_plan::stream::RecordBatchReceiverStream; use tracing::debug; @@ -25,10 +26,11 @@ use restate_types::NodeId; use restate_types::identifiers::{PartitionId, PartitionKey}; use restate_types::net::remote_query_scanner::{ RemoteQueryScannerClose, RemoteQueryScannerNext, RemoteQueryScannerNextResult, - RemoteQueryScannerOpen, RemoteQueryScannerOpened, ScannerBatch, ScannerFailure, ScannerId, + RemoteQueryScannerOpen, RemoteQueryScannerOpened, RemoteQueryScannerPredicate, ScannerBatch, + ScannerFailure, ScannerId, }; -use crate::{decode_record_batch, encode_schema}; +use crate::{decode_record_batch, encode_expr, encode_schema}; #[derive(Debug, Clone)] pub struct RemoteScanner { @@ -139,6 +141,7 @@ pub fn remote_scan_as_datafusion_stream( range: RangeInclusive, table_name: String, projection_schema: SchemaRef, + predicate: Option>, batch_size: usize, limit: Option, ) -> SendableRecordBatchStream { @@ -147,6 +150,13 @@ pub fn remote_scan_as_datafusion_stream( let tx = builder.tx(); let task = async move { + let initial_predicate = match &predicate { + Some(predicate) => Some(RemoteQueryScannerPredicate { + serialized_physical_expression: encode_expr(predicate)?, + }), + None => None, + }; + // // get a scanner id // @@ -156,6 +166,7 @@ pub fn remote_scan_as_datafusion_stream( table: table_name, projection_schema_bytes: encode_schema(&projection_schema), limit: limit.map(|limit| u64::try_from(limit).expect("limit to fit in a u64")), + predicate: initial_predicate, batch_size: u64::try_from(batch_size).expect("batch_size to fit in a u64"), }; diff --git a/crates/storage-query-datafusion/src/remote_query_scanner_manager.rs b/crates/storage-query-datafusion/src/remote_query_scanner_manager.rs index b58d2a4d14..b4422aea09 100644 --- a/crates/storage-query-datafusion/src/remote_query_scanner_manager.rs +++ b/crates/storage-query-datafusion/src/remote_query_scanner_manager.rs @@ -17,6 +17,7 @@ use anyhow::{anyhow, bail}; use datafusion::arrow::datatypes::SchemaRef; use datafusion::execution::SendableRecordBatchStream; +use datafusion::physical_plan::PhysicalExpr; use restate_core::Metadata; use restate_core::partitions::PartitionRouting; use restate_types::NodeId; @@ -180,6 +181,7 @@ impl ScanPartition for RemotePartitionsScanner { partition_id: PartitionId, range: RangeInclusive, projection: SchemaRef, + predicate: Option>, batch_size: usize, limit: Option, ) -> anyhow::Result { @@ -188,7 +190,14 @@ impl ScanPartition for RemotePartitionsScanner { let scanner = self.manager.local_partition_scanner(&self.table_name).ok_or_else( ||anyhow!("was expecting a local partition to be present on this node. It could be that this partition is being opened right now.") )?; - Ok(scanner.scan_partition(partition_id, range, projection, batch_size, limit)?) + Ok(scanner.scan_partition( + partition_id, + range, + projection, + predicate, + batch_size, + limit, + )?) } PartitionLocation::Remote { node_id } => Ok(remote_scan_as_datafusion_stream( self.manager.remote_scanner.clone(), @@ -197,6 +206,7 @@ impl ScanPartition for RemotePartitionsScanner { range, self.table_name.clone(), projection, + predicate, batch_size, limit, )), diff --git a/crates/storage-query-datafusion/src/remote_query_scanner_server.rs b/crates/storage-query-datafusion/src/remote_query_scanner_server.rs index 7ed97b51e7..095d98bd5a 100644 --- a/crates/storage-query-datafusion/src/remote_query_scanner_server.rs +++ b/crates/storage-query-datafusion/src/remote_query_scanner_server.rs @@ -134,12 +134,16 @@ impl RemoteQueryScannerServer { // Note: we trust that the task will remove the scanner from the map on Drop and we will not try to // do that again here. If we do, we might end up dead-locking the map because we are holding a // reference into it (scanner). - if let Err(mpsc::error::SendError(reciprocal)) = scanner.send(reciprocal) { + if let Err(mpsc::error::SendError(request)) = + scanner.send(super::scanner_task::NextRequest { reciprocal }) + { tracing::info!( "No such scanner {}. This could be an expired scanner due to a slow scan with no activity.", scanner_id ); - reciprocal.send(RemoteQueryScannerNextResult::NoSuchScanner(scanner_id)); + request + .reciprocal + .send(RemoteQueryScannerNextResult::NoSuchScanner(scanner_id)); } } } diff --git a/crates/storage-query-datafusion/src/scanner_task.rs b/crates/storage-query-datafusion/src/scanner_task.rs index 66f6201cbd..4add652c9e 100644 --- a/crates/storage-query-datafusion/src/scanner_task.rs +++ b/crates/storage-query-datafusion/src/scanner_task.rs @@ -13,6 +13,7 @@ use std::time::Duration; use anyhow::Context; use datafusion::execution::SendableRecordBatchStream; +use datafusion::prelude::SessionContext; use tokio::sync::mpsc; use tokio_stream::StreamExt as TokioStreamExt; use tracing::{debug, warn}; @@ -26,20 +27,22 @@ use restate_types::net::remote_query_scanner::{ use crate::remote_query_scanner_manager::RemoteScannerManager; use crate::remote_query_scanner_server::ScannerMap; -use crate::{decode_schema, encode_record_batch}; +use crate::{decode_expr, decode_schema, encode_record_batch}; const SCANNER_EXPIRATION: Duration = Duration::from_secs(60); -type NextReciprocal = Reciprocal>; +pub(crate) struct NextRequest { + pub reciprocal: Reciprocal>, +} -pub(crate) type ScannerHandle = mpsc::UnboundedSender; +pub(crate) type ScannerHandle = mpsc::UnboundedSender; // Tracks a single scanner's lifecycle running in [`RemoteQueryScannerServer`] pub(crate) struct ScannerTask { peer: GenerationalNodeId, scanner_id: ScannerId, stream: SendableRecordBatchStream, - rx: mpsc::UnboundedReceiver, + rx: mpsc::UnboundedReceiver, scanners: Weak, } @@ -56,10 +59,20 @@ impl ScannerTask { .local_partition_scanner(&request.table) .context("not registered scanner for a table")?; let schema = decode_schema(&request.projection_schema_bytes).context("bad schema bytes")?; + let ctx = SessionContext::new(); + + let predicate = request + .predicate + .map(|predicate| decode_expr(&ctx, &schema, &predicate.serialized_physical_expression)) + .transpose()?; + + let schema = Arc::new(schema); + let stream = scanner.scan_partition( request.partition_id, request.range.clone(), - Arc::new(schema), + schema.clone(), + predicate, usize::try_from(request.batch_size).expect("batch_size to fit in a usize"), request .limit @@ -96,7 +109,7 @@ impl ScannerTask { ); loop { - let reciprocal = tokio::select! { + let request = tokio::select! { _ = &mut watch_fut => { // peer is dead, dispose the scanner debug!("Removing scanner due to peer {} being dead", self.peer); @@ -120,7 +133,7 @@ impl ScannerTask { // connection/request has been closed, don't bother with driving the stream. // The scanner will be dropped because we want to make sure that we don't get supurious // next messages from the client after. - if reciprocal.is_closed() { + if request.reciprocal.is_closed() { return; } @@ -129,29 +142,37 @@ impl ScannerTask { Some(Err(e)) => { warn!("Error while scanning {}: {e}", self.scanner_id); - reciprocal.send(RemoteQueryScannerNextResult::Failure(ScannerFailure { - scanner_id: self.scanner_id, - message: e.to_string(), - })); + request + .reciprocal + .send(RemoteQueryScannerNextResult::Failure(ScannerFailure { + scanner_id: self.scanner_id, + message: e.to_string(), + })); return; } None => { - reciprocal.send(RemoteQueryScannerNextResult::NoMoreRecords(self.scanner_id)); + request + .reciprocal + .send(RemoteQueryScannerNextResult::NoMoreRecords(self.scanner_id)); return; } }; match encode_record_batch(&self.stream.schema(), record_batch) { Ok(record_batch) => { - reciprocal.send(RemoteQueryScannerNextResult::NextBatch(ScannerBatch { - scanner_id: self.scanner_id, - record_batch, - })) + request + .reciprocal + .send(RemoteQueryScannerNextResult::NextBatch(ScannerBatch { + scanner_id: self.scanner_id, + record_batch, + })) } Err(e) => { - reciprocal.send(RemoteQueryScannerNextResult::Failure(ScannerFailure { - scanner_id: self.scanner_id, - message: e.to_string(), - })); + request + .reciprocal + .send(RemoteQueryScannerNextResult::Failure(ScannerFailure { + scanner_id: self.scanner_id, + message: e.to_string(), + })); break; } } diff --git a/crates/storage-query-datafusion/src/table_providers.rs b/crates/storage-query-datafusion/src/table_providers.rs index 17557f71c2..bb0e88f8af 100644 --- a/crates/storage-query-datafusion/src/table_providers.rs +++ b/crates/storage-query-datafusion/src/table_providers.rs @@ -22,7 +22,7 @@ use datafusion::physical_expr::EquivalenceProperties; use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType}; use datafusion::physical_plan::stream::RecordBatchStreamAdapter; use datafusion::physical_plan::{ - DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, PlanProperties, + DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, PhysicalExpr, PlanProperties, SendableRecordBatchStream, }; use futures::stream::{self, StreamExt, TryStreamExt}; @@ -40,6 +40,7 @@ pub trait ScanPartition: Send + Sync + Debug + 'static { partition_id: PartitionId, range: RangeInclusive, projection: SchemaRef, + predicate: Option>, batch_size: usize, limit: Option, ) -> anyhow::Result; @@ -148,6 +149,11 @@ where None => self.schema.clone(), }; + let predicate = datafusion::logical_expr::utils::conjunction(filters.iter().cloned()); + // as report our filter pushdown as inexact, all columns needed for the filters will be in the projection + let predicate = predicate + .map(|p| datafusion::physical_expr::planner::logical2physical(&p, &projected_schema)); + let partition_keys = self .partition_key_extractor .try_extract(filters) @@ -223,6 +229,7 @@ where logical_partitions, projected_schema, limit, + predicate, scanner: self.partition_scanner.clone(), plan, statistics: self.statistics.clone().project(projection), @@ -247,6 +254,7 @@ struct PartitionedExecutionPlan { logical_partitions: Vec, projected_schema: SchemaRef, limit: Option, + predicate: Option>, scanner: T, plan: PlanProperties, statistics: Statistics, @@ -317,6 +325,7 @@ where let scanner = self.scanner.clone(); let schema = self.projected_schema.clone(); let limit = self.limit; + let predicate = self.predicate.clone(); let batch_size = context.session_config().batch_size(); move |(partition_id, partition)| { scanner @@ -324,6 +333,7 @@ where partition_id, partition.key_range, schema.clone(), + predicate.clone(), batch_size, limit, ) diff --git a/crates/types/src/net/remote_query_scanner.rs b/crates/types/src/net/remote_query_scanner.rs index 1b89f4961d..2a60b9402c 100644 --- a/crates/types/src/net/remote_query_scanner.rs +++ b/crates/types/src/net/remote_query_scanner.rs @@ -53,12 +53,24 @@ pub struct RemoteQueryScannerOpen { #[bilrost(tag(6))] #[serde(default = "default_batch_size")] pub batch_size: u64, + #[bilrost(tag(7))] + #[serde(default)] + pub predicate: Option, } fn default_batch_size() -> u64 { 64 } +#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize, bilrost::Message)] +pub struct RemoteQueryScannerPredicate { + // We ship the expression passed to scan() over the wire to filter records before sending + // them back + // see `encode_expr` / `decode_expr` in storage-query-datafusion/src/lib.rs + #[bilrost(tag(1), encoding(plainbytes))] + pub serialized_physical_expression: Vec, +} + #[derive( Debug, Clone, diff --git a/workspace-hack/Cargo.toml b/workspace-hack/Cargo.toml index 431dea589b..c2452ff716 100644 --- a/workspace-hack/Cargo.toml +++ b/workspace-hack/Cargo.toml @@ -20,7 +20,7 @@ ahash = { version = "0.8", features = ["serde"] } arrow = { version = "56", features = ["chrono-tz", "prettyprint"] } arrow-array = { version = "56", default-features = false, features = ["chrono-tz"] } arrow-cast = { version = "56", default-features = false, features = ["prettyprint"] } -arrow-ipc = { version = "56", features = ["lz4"] } +arrow-ipc = { version = "56", features = ["lz4", "zstd"] } arrow-schema = { version = "56", default-features = false, features = ["canonical_extension_types"] } aws-credential-types = { version = "1", default-features = false, features = ["test-util"] } aws-runtime = { version = "1", default-features = false, features = ["event-stream"] } @@ -44,7 +44,7 @@ clap_builder = { version = "4", default-features = false, features = ["color", " comfy-table = { version = "7" } criterion = { version = "0.5", features = ["async_tokio"] } crossbeam-epoch = { version = "0.9" } -datafusion-common = { version = "50", default-features = false, features = ["object_store", "recursive_protection"] } +datafusion-common = { version = "50", default-features = false, features = ["object_store", "parquet", "recursive_protection"] } datafusion-expr = { version = "50", default-features = false, features = ["recursive_protection"] } digest = { version = "0.10", features = ["mac", "std"] } either = { version = "1" } @@ -79,6 +79,7 @@ lexical-util = { version = "1", default-features = false, features = ["format", libc = { version = "0.2", features = ["extra_traits"] } libz-sys = { version = "1", features = ["static"] } log = { version = "0.4", default-features = false, features = ["std"] } +lzma-sys = { version = "0.1", default-features = false, features = ["static"] } md-5 = { version = "0.10" } memchr = { version = "2" } metrics-util = { version = "0.20" } @@ -135,6 +136,7 @@ tracing-subscriber = { version = "0.3", features = ["env-filter", "json", "parki typenum = { version = "1", default-features = false, features = ["const-generics"] } ulid = { version = "1", features = ["serde"] } uuid = { version = "1", features = ["js", "serde", "v4", "v7"] } +xz2 = { version = "0.1", default-features = false, features = ["static"] } zerocopy = { version = "0.7", features = ["derive", "simd"] } zeroize = { version = "1", features = ["zeroize_derive"] } zstd = { version = "0.13" } @@ -146,7 +148,7 @@ ahash = { version = "0.8", features = ["serde"] } arrow = { version = "56", features = ["chrono-tz", "prettyprint"] } arrow-array = { version = "56", default-features = false, features = ["chrono-tz"] } arrow-cast = { version = "56", default-features = false, features = ["prettyprint"] } -arrow-ipc = { version = "56", features = ["lz4"] } +arrow-ipc = { version = "56", features = ["lz4", "zstd"] } arrow-schema = { version = "56", default-features = false, features = ["canonical_extension_types"] } aws-credential-types = { version = "1", default-features = false, features = ["test-util"] } aws-runtime = { version = "1", default-features = false, features = ["event-stream"] } @@ -171,7 +173,7 @@ clap_builder = { version = "4", default-features = false, features = ["color", " comfy-table = { version = "7" } criterion = { version = "0.5", features = ["async_tokio"] } crossbeam-epoch = { version = "0.9" } -datafusion-common = { version = "50", default-features = false, features = ["object_store", "recursive_protection"] } +datafusion-common = { version = "50", default-features = false, features = ["object_store", "parquet", "recursive_protection"] } datafusion-expr = { version = "50", default-features = false, features = ["recursive_protection"] } digest = { version = "0.10", features = ["mac", "std"] } either = { version = "1" } @@ -208,6 +210,7 @@ lexical-util = { version = "1", default-features = false, features = ["format", libc = { version = "0.2", features = ["extra_traits"] } libz-sys = { version = "1", features = ["static"] } log = { version = "0.4", default-features = false, features = ["std"] } +lzma-sys = { version = "0.1", default-features = false, features = ["static"] } md-5 = { version = "0.10" } memchr = { version = "2" } metrics-util = { version = "0.20" } @@ -266,6 +269,7 @@ tracing-subscriber = { version = "0.3", features = ["env-filter", "json", "parki typenum = { version = "1", default-features = false, features = ["const-generics"] } ulid = { version = "1", features = ["serde"] } uuid = { version = "1", features = ["js", "serde", "v4", "v7"] } +xz2 = { version = "0.1", default-features = false, features = ["static"] } zerocopy = { version = "0.7", features = ["derive", "simd"] } zeroize = { version = "1", features = ["zeroize_derive"] } zstd = { version = "0.13" }