Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
267 changes: 259 additions & 8 deletions Cargo.lock

Large diffs are not rendered by default.

2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,8 @@ datafusion = { version = "50.0.0", default-features = false, features = [
"recursive_protection",
] }
datafusion-expr = { version = "50.0.0" }
datafusion-functions-json = { version = "0.50.0" }
datafusion-proto = { version = "50.0.0", default-features = false }
derive_builder = "0.20.0"
derive_more = { version = "2.0.1", features = ["full"] }
dialoguer = { version = "0.11.0" }
Expand Down
3 changes: 2 additions & 1 deletion crates/storage-query-datafusion/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@ bytestring = { workspace = true }
codederror = { workspace = true }
dashmap = { workspace = true }
datafusion = { workspace = true }
datafusion-functions-json = { version = "0.50.0" }
datafusion-functions-json = { workspace = true }
datafusion-proto = { workspace = true }
derive_more = { workspace = true }
enumset = { workspace = true }
futures = { workspace = true }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,11 @@
use datafusion::arrow::datatypes::SchemaRef;
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::common::DataFusionError;
use datafusion::physical_plan::SendableRecordBatchStream;
use datafusion::physical_plan::stream::RecordBatchReceiverStream;
use datafusion::physical_plan::{PhysicalExpr, SendableRecordBatchStream};
use std::fmt::Debug;

Check failure on line 21 in crates/storage-query-datafusion/src/invocation_state/table.rs

View workflow job for this annotation

GitHub Actions / Build and test (warp-ubuntu-latest-x64-16x)

unused import: `std::fmt::Debug`

Check failure on line 21 in crates/storage-query-datafusion/src/invocation_state/table.rs

View workflow job for this annotation

GitHub Actions / Build and test (warp-ubuntu-latest-x64-16x)

the name `Debug` is defined multiple times
use std::ops::RangeInclusive;

Check failure on line 22 in crates/storage-query-datafusion/src/invocation_state/table.rs

View workflow job for this annotation

GitHub Actions / Build and test (warp-ubuntu-latest-x64-16x)

unused import: `std::ops::RangeInclusive`

Check failure on line 22 in crates/storage-query-datafusion/src/invocation_state/table.rs

View workflow job for this annotation

GitHub Actions / Build and test (warp-ubuntu-latest-x64-16x)

the name `RangeInclusive` is defined multiple times
use std::sync::Arc;

Check failure on line 23 in crates/storage-query-datafusion/src/invocation_state/table.rs

View workflow job for this annotation

GitHub Actions / Build and test (warp-ubuntu-latest-x64-16x)

unused import: `std::sync::Arc`

Check failure on line 23 in crates/storage-query-datafusion/src/invocation_state/table.rs

View workflow job for this annotation

GitHub Actions / Build and test (warp-ubuntu-latest-x64-16x)

the name `Arc` is defined multiple times
use tokio::sync::mpsc::Sender;

use restate_invoker_api::{InvocationStatusReport, StatusHandle};
Expand Down Expand Up @@ -97,6 +100,7 @@
partition_id: PartitionId,
_range: RangeInclusive<PartitionKey>,
projection: SchemaRef,
_predicate: Option<Arc<dyn PhysicalExpr>>,
batch_size: usize,
limit: Option<usize>,
) -> anyhow::Result<SendableRecordBatchStream> {
Expand Down
30 changes: 30 additions & 0 deletions crates/storage-query-datafusion/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,12 +38,17 @@
mod table_providers;
mod table_util;

use std::sync::Arc;

pub use context::BuildError;
use datafusion::arrow::datatypes::Schema;
use datafusion::arrow::ipc::convert::IpcSchemaEncoder;
use datafusion::arrow::ipc::writer::DictionaryTracker;
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::error::DataFusionError;
use datafusion::physical_plan::PhysicalExpr;
use datafusion::prelude::SessionContext;
use prost::Message;

#[cfg(test)]
pub(crate) mod mocks;
Expand Down Expand Up @@ -71,6 +76,31 @@
Ok(schema)
}

pub(crate) fn encode_expr(expr: &Arc<dyn PhysicalExpr>) -> Result<Vec<u8>, DataFusionError> {
let node = datafusion_proto::physical_plan::to_proto::serialize_physical_expr(
expr,
&datafusion_proto::physical_plan::DefaultPhysicalExtensionCodec {},
)?;

Ok(node.encode_to_vec())

Check failure on line 85 in crates/storage-query-datafusion/src/lib.rs

View workflow job for this annotation

GitHub Actions / Build and test (warp-ubuntu-latest-x64-16x)

no method named `encode_to_vec` found for struct `datafusion_proto::protobuf::PhysicalExprNode` in the current scope
}

pub(crate) fn decode_expr(
ctx: &SessionContext,
input_schema: &Schema,
proto_bytes: &[u8],
) -> Result<Arc<dyn PhysicalExpr>, DataFusionError> {
let node = datafusion_proto::protobuf::PhysicalExprNode::decode(proto_bytes)

Check failure on line 93 in crates/storage-query-datafusion/src/lib.rs

View workflow job for this annotation

GitHub Actions / Build and test (warp-ubuntu-latest-x64-16x)

no function or associated item named `decode` found for struct `datafusion_proto::protobuf::PhysicalExprNode` in the current scope
.map_err(|err| DataFusionError::External(err.into()))?;

datafusion_proto::physical_plan::from_proto::parse_physical_expr(
&node,
ctx,
input_schema,
&datafusion_proto::physical_plan::DefaultPhysicalExtensionCodec {},
)
}

pub(crate) fn encode_record_batch(
schema: &Schema,
record_batch: RecordBatch,
Expand Down
12 changes: 11 additions & 1 deletion crates/storage-query-datafusion/src/partition_store_scanner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use anyhow::anyhow;
use datafusion::arrow::datatypes::SchemaRef;
use datafusion::error::DataFusionError;
use datafusion::execution::SendableRecordBatchStream;
use datafusion::physical_plan::PhysicalExpr;
use datafusion::physical_plan::stream::RecordBatchReceiverStream;

use restate_partition_store::{PartitionStore, PartitionStoreManager};
Expand Down Expand Up @@ -76,6 +77,7 @@ where
partition_id: PartitionId,
range: RangeInclusive<PartitionKey>,
projection: SchemaRef,
_predicate: Option<Arc<dyn PhysicalExpr>>,
batch_size: usize,
mut limit: Option<usize>,
) -> anyhow::Result<SendableRecordBatchStream> {
Expand Down Expand Up @@ -142,9 +144,17 @@ where
partition_id: PartitionId,
range: RangeInclusive<PartitionKey>,
projection: SchemaRef,
predicate: Option<Arc<dyn PhysicalExpr>>,
batch_size: usize,
limit: Option<usize>,
) -> anyhow::Result<SendableRecordBatchStream> {
self.scan_partition(partition_id, range, projection, batch_size, limit)
self.scan_partition(
partition_id,
range,
projection,
predicate,
batch_size,
limit,
)
}
}
15 changes: 13 additions & 2 deletions crates/storage-query-datafusion/src/remote_query_scanner_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use async_trait::async_trait;
use datafusion::arrow::datatypes::SchemaRef;
use datafusion::error::DataFusionError;
use datafusion::execution::SendableRecordBatchStream;
use datafusion::physical_plan::PhysicalExpr;
use datafusion::physical_plan::stream::RecordBatchReceiverStream;
use tracing::debug;

Expand All @@ -25,10 +26,11 @@ use restate_types::NodeId;
use restate_types::identifiers::{PartitionId, PartitionKey};
use restate_types::net::remote_query_scanner::{
RemoteQueryScannerClose, RemoteQueryScannerNext, RemoteQueryScannerNextResult,
RemoteQueryScannerOpen, RemoteQueryScannerOpened, ScannerBatch, ScannerFailure, ScannerId,
RemoteQueryScannerOpen, RemoteQueryScannerOpened, RemoteQueryScannerPredicate, ScannerBatch,
ScannerFailure, ScannerId,
};

use crate::{decode_record_batch, encode_schema};
use crate::{decode_record_batch, encode_expr, encode_schema};

#[derive(Debug, Clone)]
pub struct RemoteScanner {
Expand Down Expand Up @@ -139,6 +141,7 @@ pub fn remote_scan_as_datafusion_stream(
range: RangeInclusive<PartitionKey>,
table_name: String,
projection_schema: SchemaRef,
predicate: Option<Arc<dyn PhysicalExpr>>,
batch_size: usize,
limit: Option<usize>,
) -> SendableRecordBatchStream {
Expand All @@ -147,6 +150,13 @@ pub fn remote_scan_as_datafusion_stream(
let tx = builder.tx();

let task = async move {
let initial_predicate = match &predicate {
Some(predicate) => Some(RemoteQueryScannerPredicate {
serialized_physical_expression: encode_expr(predicate)?,
}),
None => None,
};

//
// get a scanner id
//
Expand All @@ -156,6 +166,7 @@ pub fn remote_scan_as_datafusion_stream(
table: table_name,
projection_schema_bytes: encode_schema(&projection_schema),
limit: limit.map(|limit| u64::try_from(limit).expect("limit to fit in a u64")),
predicate: initial_predicate,
batch_size: u64::try_from(batch_size).expect("batch_size to fit in a u64"),
};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use anyhow::{anyhow, bail};
use datafusion::arrow::datatypes::SchemaRef;
use datafusion::execution::SendableRecordBatchStream;

use datafusion::physical_plan::PhysicalExpr;
use restate_core::Metadata;
use restate_core::partitions::PartitionRouting;
use restate_types::NodeId;
Expand Down Expand Up @@ -180,6 +181,7 @@ impl ScanPartition for RemotePartitionsScanner {
partition_id: PartitionId,
range: RangeInclusive<PartitionKey>,
projection: SchemaRef,
predicate: Option<Arc<dyn PhysicalExpr>>,
batch_size: usize,
limit: Option<usize>,
) -> anyhow::Result<SendableRecordBatchStream> {
Expand All @@ -188,7 +190,14 @@ impl ScanPartition for RemotePartitionsScanner {
let scanner = self.manager.local_partition_scanner(&self.table_name).ok_or_else(
||anyhow!("was expecting a local partition to be present on this node. It could be that this partition is being opened right now.")
)?;
Ok(scanner.scan_partition(partition_id, range, projection, batch_size, limit)?)
Ok(scanner.scan_partition(
partition_id,
range,
projection,
predicate,
batch_size,
limit,
)?)
}
PartitionLocation::Remote { node_id } => Ok(remote_scan_as_datafusion_stream(
self.manager.remote_scanner.clone(),
Expand All @@ -197,6 +206,7 @@ impl ScanPartition for RemotePartitionsScanner {
range,
self.table_name.clone(),
projection,
predicate,
batch_size,
limit,
)),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,12 +134,16 @@ impl RemoteQueryScannerServer {
// Note: we trust that the task will remove the scanner from the map on Drop and we will not try to
// do that again here. If we do, we might end up dead-locking the map because we are holding a
// reference into it (scanner).
if let Err(mpsc::error::SendError(reciprocal)) = scanner.send(reciprocal) {
if let Err(mpsc::error::SendError(request)) =
scanner.send(super::scanner_task::NextRequest { reciprocal })
{
tracing::info!(
"No such scanner {}. This could be an expired scanner due to a slow scan with no activity.",
scanner_id
);
reciprocal.send(RemoteQueryScannerNextResult::NoSuchScanner(scanner_id));
request
.reciprocal
.send(RemoteQueryScannerNextResult::NoSuchScanner(scanner_id));
}
}
}
61 changes: 41 additions & 20 deletions crates/storage-query-datafusion/src/scanner_task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use std::time::Duration;

use anyhow::Context;
use datafusion::execution::SendableRecordBatchStream;
use datafusion::prelude::SessionContext;
use tokio::sync::mpsc;
use tokio_stream::StreamExt as TokioStreamExt;
use tracing::{debug, warn};
Expand All @@ -26,20 +27,22 @@ use restate_types::net::remote_query_scanner::{

use crate::remote_query_scanner_manager::RemoteScannerManager;
use crate::remote_query_scanner_server::ScannerMap;
use crate::{decode_schema, encode_record_batch};
use crate::{decode_expr, decode_schema, encode_record_batch};

const SCANNER_EXPIRATION: Duration = Duration::from_secs(60);

type NextReciprocal = Reciprocal<Oneshot<RemoteQueryScannerNextResult>>;
pub(crate) struct NextRequest {
pub reciprocal: Reciprocal<Oneshot<RemoteQueryScannerNextResult>>,
}

pub(crate) type ScannerHandle = mpsc::UnboundedSender<NextReciprocal>;
pub(crate) type ScannerHandle = mpsc::UnboundedSender<NextRequest>;

// Tracks a single scanner's lifecycle running in [`RemoteQueryScannerServer`]
pub(crate) struct ScannerTask {
peer: GenerationalNodeId,
scanner_id: ScannerId,
stream: SendableRecordBatchStream,
rx: mpsc::UnboundedReceiver<NextReciprocal>,
rx: mpsc::UnboundedReceiver<NextRequest>,
scanners: Weak<ScannerMap>,
}

Expand All @@ -56,10 +59,20 @@ impl ScannerTask {
.local_partition_scanner(&request.table)
.context("not registered scanner for a table")?;
let schema = decode_schema(&request.projection_schema_bytes).context("bad schema bytes")?;
let ctx = SessionContext::new();

let predicate = request
.predicate
.map(|predicate| decode_expr(&ctx, &schema, &predicate.serialized_physical_expression))
.transpose()?;

let schema = Arc::new(schema);

let stream = scanner.scan_partition(
request.partition_id,
request.range.clone(),
Arc::new(schema),
schema.clone(),
predicate,
usize::try_from(request.batch_size).expect("batch_size to fit in a usize"),
request
.limit
Expand Down Expand Up @@ -96,7 +109,7 @@ impl ScannerTask {
);

loop {
let reciprocal = tokio::select! {
let request = tokio::select! {
_ = &mut watch_fut => {
// peer is dead, dispose the scanner
debug!("Removing scanner due to peer {} being dead", self.peer);
Expand All @@ -120,7 +133,7 @@ impl ScannerTask {
// connection/request has been closed, don't bother with driving the stream.
// The scanner will be dropped because we want to make sure that we don't get supurious
// next messages from the client after.
if reciprocal.is_closed() {
if request.reciprocal.is_closed() {
return;
}

Expand All @@ -129,29 +142,37 @@ impl ScannerTask {
Some(Err(e)) => {
warn!("Error while scanning {}: {e}", self.scanner_id);

reciprocal.send(RemoteQueryScannerNextResult::Failure(ScannerFailure {
scanner_id: self.scanner_id,
message: e.to_string(),
}));
request
.reciprocal
.send(RemoteQueryScannerNextResult::Failure(ScannerFailure {
scanner_id: self.scanner_id,
message: e.to_string(),
}));
return;
}
None => {
reciprocal.send(RemoteQueryScannerNextResult::NoMoreRecords(self.scanner_id));
request
.reciprocal
.send(RemoteQueryScannerNextResult::NoMoreRecords(self.scanner_id));
return;
}
};
match encode_record_batch(&self.stream.schema(), record_batch) {
Ok(record_batch) => {
reciprocal.send(RemoteQueryScannerNextResult::NextBatch(ScannerBatch {
scanner_id: self.scanner_id,
record_batch,
}))
request
.reciprocal
.send(RemoteQueryScannerNextResult::NextBatch(ScannerBatch {
scanner_id: self.scanner_id,
record_batch,
}))
}
Err(e) => {
reciprocal.send(RemoteQueryScannerNextResult::Failure(ScannerFailure {
scanner_id: self.scanner_id,
message: e.to_string(),
}));
request
.reciprocal
.send(RemoteQueryScannerNextResult::Failure(ScannerFailure {
scanner_id: self.scanner_id,
message: e.to_string(),
}));
break;
}
}
Expand Down
Loading
Loading