From c40ffb2b4ec9222aeff835a78d12a38ce6fcb5e2 Mon Sep 17 00:00:00 2001 From: anant Date: Tue, 24 Sep 2024 17:55:09 +0530 Subject: [PATCH] init --- server/src/handlers/airplane.rs | 2 +- .../src/handlers/http/modal/ingest_server.rs | 3 +- server/src/handlers/http/query.rs | 207 +++++++++++++++++- server/src/query.rs | 2 +- server/src/query/stream_schema_provider.rs | 67 +++++- server/src/response.rs | 1 + server/src/utils/arrow/flight.rs | 6 +- 7 files changed, 273 insertions(+), 15 deletions(-) diff --git a/server/src/handlers/airplane.rs b/server/src/handlers/airplane.rs index c803ba194..2a83f76d2 100644 --- a/server/src/handlers/airplane.rs +++ b/server/src/handlers/airplane.rs @@ -135,7 +135,7 @@ impl FlightService for AirServiceImpl { let ticket = get_query_from_ticket(&req)?; - log::info!("query requested to airplane: {:?}", ticket); + log::warn!("query requested to airplane: {:?}", ticket); // get the query session_state let session_state = QUERY_SESSION.state(); diff --git a/server/src/handlers/http/modal/ingest_server.rs b/server/src/handlers/http/modal/ingest_server.rs index 895fb8b8a..5c2f42cc1 100644 --- a/server/src/handlers/http/modal/ingest_server.rs +++ b/server/src/handlers/http/modal/ingest_server.rs @@ -187,7 +187,8 @@ impl IngestServer { .service(Self::get_user_webscope()) .service(Server::get_user_role_webscope()) .service(Server::get_metrics_webscope()) - .service(Server::get_readiness_factory()), + .service(Server::get_readiness_factory()) + .service(Server::get_query_factory()), ) .service(Server::get_ingest_otel_factory()); } diff --git a/server/src/handlers/http/query.rs b/server/src/handlers/http/query.rs index 8fe9c8229..a30b7e3ee 100644 --- a/server/src/handlers/http/query.rs +++ b/server/src/handlers/http/query.rs @@ -16,16 +16,30 @@ * */ -use actix_web::http::header::ContentType; +use actix_web::http::header::{self, ContentType}; use actix_web::web::{self, Json}; use actix_web::{FromRequest, HttpRequest, Responder}; use anyhow::anyhow; +use arrow_flight::{FlightClient, Ticket}; +use arrow_schema::{Schema, SchemaRef}; use chrono::{DateTime, Utc}; -use datafusion::common::tree_node::TreeNode; +use datafusion::common::tree_node::{Transformed, TreeNode, TreeNodeRecursion}; +use datafusion::datasource::{provider_as_source, source_as_provider, MemTable, TableProvider, TableType}; use datafusion::error::DataFusionError; use datafusion::execution::context::SessionState; +use datafusion::logical_expr::LogicalPlanBuilder; +use datafusion::physical_plan::insert::{DataSink, DataSinkExec}; +use datafusion::physical_plan::ExecutionPlan; +use datafusion::prelude::{DataFrame, Expr, SessionContext}; +use futures::{StreamExt, TryStreamExt}; use futures_util::Future; -use http::StatusCode; +use http::{header as http_header, StatusCode, Uri}; +use itertools::Itertools; +use serde_json::json; +use tonic::transport::Channel; +use tonic::Status; +use url::Url; +use std::any::Any; use std::collections::HashMap; use std::pin::Pin; use std::sync::Arc; @@ -33,6 +47,8 @@ use std::time::Instant; use crate::event::error::EventError; use crate::handlers::http::fetch_schema; +use crate::handlers::http::logstream::error::StreamError; +use crate::utils::arrow::flight::run_do_get_rpc; use arrow_array::RecordBatch; use crate::event::commit_schema; @@ -51,8 +67,11 @@ use crate::storage::object_storage::commit_schema_to_storage; use crate::storage::ObjectStorageError; use crate::utils::actix::extract_session_key_from_req; +use super::cluster::get_ingestor_info; +use super::modal::IngestorMetadata; + /// Query Request through http endpoint. -#[derive(Debug, serde::Deserialize, serde::Serialize)] +#[derive(Debug, serde::Deserialize, serde::Serialize, Clone)] #[serde(rename_all = "camelCase")] pub struct Query { pub query: String, @@ -130,13 +149,149 @@ pub async fn query(req: HttpRequest, query_request: Query) -> Result { + // lpb = lpb.clone().project(projection.expr.clone())?; + // }, + // datafusion::logical_expr::LogicalPlan::Filter(filter) => { + // lpb = lpb.clone().filter(filter.predicate.clone())?; + // }, + // datafusion::logical_expr::LogicalPlan::Window(window) => { + // lpb = lpb.clone().window(window.window_expr.clone())?; + // }, + // datafusion::logical_expr::LogicalPlan::Aggregate(aggregate) => { + // lpb = lpb.clone().aggregate(aggregate.group_expr.clone(), aggregate.aggr_expr.clone())?; + // }, + // datafusion::logical_expr::LogicalPlan::Sort(sort) => { + // lpb = lpb.clone().sort(sort.expr.clone())?; + // }, + // datafusion::logical_expr::LogicalPlan::Join(join) => todo!(), + // datafusion::logical_expr::LogicalPlan::CrossJoin(cross_join) => todo!(), + // datafusion::logical_expr::LogicalPlan::Repartition(repartition) => todo!(), + // datafusion::logical_expr::LogicalPlan::Union(union) => todo!(), + // datafusion::logical_expr::LogicalPlan::TableScan(table_scan) => {}, + // datafusion::logical_expr::LogicalPlan::EmptyRelation(empty_relation) => todo!(), + // datafusion::logical_expr::LogicalPlan::Subquery(subquery) => todo!(), + // datafusion::logical_expr::LogicalPlan::SubqueryAlias(subquery_alias) => todo!(), + // datafusion::logical_expr::LogicalPlan::Limit(limit) => { + // lpb = lpb.clone().limit(limit.skip.clone(), limit.fetch.clone())?; + // }, + // datafusion::logical_expr::LogicalPlan::Statement(statement) => todo!(), + // datafusion::logical_expr::LogicalPlan::Values(values) => todo!(), + // datafusion::logical_expr::LogicalPlan::Explain(explain) => todo!(), + // datafusion::logical_expr::LogicalPlan::Analyze(analyze) => todo!(), + // datafusion::logical_expr::LogicalPlan::Extension(extension) => todo!(), + // datafusion::logical_expr::LogicalPlan::Distinct(distinct) => todo!(), + // datafusion::logical_expr::LogicalPlan::Prepare(prepare) => todo!(), + // datafusion::logical_expr::LogicalPlan::Dml(dml_statement) => todo!(), + // datafusion::logical_expr::LogicalPlan::Ddl(ddl_statement) => todo!(), + // datafusion::logical_expr::LogicalPlan::Copy(copy_to) => todo!(), + // datafusion::logical_expr::LogicalPlan::DescribeTable(describe_table) => todo!(), + // datafusion::logical_expr::LogicalPlan::Unnest(unnest) => todo!(), + // datafusion::logical_expr::LogicalPlan::RecursiveQuery(recursive_query) => todo!(), + // } + // Ok(TreeNodeRecursion::Continue) + // })?; + + // log::warn!("modified logical plan builder-\n{lpb:?}\n"); + // // let view = datafusion::datasource::ViewTable::try_new(temp_logical_plan, None)?; + + let df = DataFrame::new( + new_state, + temp_logical_plan + ); + ingestor_records = df.collect().await.unwrap(); + + } // deal with cache saving if let Err(err) = put_results_in_cache( cache_results, user_id, query_cache_manager, &table_name, - &records, + &ingestor_records, query.start.to_rfc3339(), query.end.to_rfc3339(), query_request.query, @@ -147,7 +302,7 @@ pub async fn query(req: HttpRequest, query_request: Query) -> Result Result Result, Status> { + + // // get the ingestor gRPC url + // let url = im + // .domain_name + // .rsplit_once(':') + // .ok_or(Status::failed_precondition( + // "Ingester metadata is courupted", + // )).unwrap() + // .0; + // let url = format!("{}:{}", url, im.flight_port); + // let uri = url.clone() + // .parse::() + // .map_err(|_| Status::failed_precondition("Ingester metadata is courupted")).unwrap(); + + // // make a client + // let mut client = FlightClient::new( + // Channel::builder(uri) + // .connect() + // .await + // .unwrap() + // ); + // client.add_header("authorization", &im.token).unwrap(); + + // let response: Vec = client + // .do_get(Ticket { + // ticket: ticket.into(), + // }) + // .await + // .unwrap() + // .try_collect() + // .await + // .unwrap(); + // log::warn!("ingestor result- \n{response:?}\n"); + + let response = run_do_get_rpc(im.clone(), ticket.into()).await?; + + Ok(response) +} + pub async fn update_schema_when_distributed(tables: Vec) -> Result<(), QueryError> { if CONFIG.parseable.mode == Mode::Query { for table in tables { diff --git a/server/src/query.rs b/server/src/query.rs index 4e345148a..a8bca737f 100644 --- a/server/src/query.rs +++ b/server/src/query.rs @@ -83,7 +83,7 @@ impl Query { let config = SessionConfig::default() .with_parquet_pruning(true) - .with_prefer_existing_sort(true) + .with_prefer_existing_sort(false) .with_round_robin_repartition(true); let state = SessionState::new_with_config_rt(config, runtime); diff --git a/server/src/query/stream_schema_provider.rs b/server/src/query/stream_schema_provider.rs index e5d6055d5..1165b5a33 100644 --- a/server/src/query/stream_schema_provider.rs +++ b/server/src/query/stream_schema_provider.rs @@ -18,6 +18,7 @@ use crate::catalog::manifest::File; use crate::hottier::HotTierManager; +use crate::metadata::LogStreamMetadata; use crate::Mode; use crate::{ catalog::snapshot::{self, Snapshot}, @@ -83,6 +84,21 @@ impl SchemaProvider for GlobalSchemaProvider { self } + #[allow(unused_variables)] + fn register_table( + &self, + name: String, + table: Arc, + ) -> Result>, DataFusionError> { + if self.table_exist(&name) { + return Err(DataFusionError::External("Table already exists".into())) + } + let mut map = STREAM_INFO.write().unwrap(); + map.insert(name, LogStreamMetadata::default()); + Ok(None) + // exec_err!("schema provider does not support registering tables") + } + fn table_names(&self) -> Vec { STREAM_INFO.list_streams() } @@ -216,8 +232,9 @@ async fn collect_from_snapshot( fn partitioned_files( manifest_files: Vec, table_schema: &Schema, - target_partition: usize, + // target_partition: usize, ) -> (Vec>, datafusion::common::Statistics) { + let target_partition = num_cpus::get(); let mut partitioned_files = Vec::from_iter((0..target_partition).map(|_| Vec::new())); let mut column_statistics = HashMap::>::new(); let mut count = 0; @@ -286,6 +303,48 @@ impl TableProvider for StandardTableProvider { TableType::Base } + // fn supports_filters_pushdown(&self, filters: &[&Expr]) -> Result> { + // let support: Vec<_> = filters.iter().map(|expr| { + // match expr { + // Expr::Alias(alias) => todo!(), + // Expr::Column(column) => todo!(), + // Expr::ScalarVariable(data_type, vec) => todo!(), + // Expr::Literal(scalar_value) => todo!(), + // Expr::BinaryExpr(binary_expr) => todo!(), + // Expr::Like(like) => todo!(), + // Expr::SimilarTo(like) => todo!(), + // Expr::Not(expr) => todo!(), + // Expr::IsNotNull(expr) => todo!(), + // Expr::IsNull(expr) => todo!(), + // Expr::IsTrue(expr) => todo!(), + // Expr::IsFalse(expr) => todo!(), + // Expr::IsUnknown(expr) => todo!(), + // Expr::IsNotTrue(expr) => todo!(), + // Expr::IsNotFalse(expr) => todo!(), + // Expr::IsNotUnknown(expr) => todo!(), + // Expr::Negative(expr) => todo!(), + // Expr::Between(between) => todo!(), + // Expr::Case(case) => todo!(), + // Expr::Cast(cast) => todo!(), + // Expr::TryCast(try_cast) => todo!(), + // Expr::Sort(sort) => todo!(), + // Expr::ScalarFunction(scalar_function) => todo!(), + // Expr::AggregateFunction(aggregate_function) => todo!(), + // Expr::WindowFunction(window_function) => todo!(), + // Expr::InList(in_list) => todo!(), + // Expr::Exists(exists) => todo!(), + // Expr::InSubquery(in_subquery) => todo!(), + // Expr::ScalarSubquery(subquery) => todo!(), + // Expr::Wildcard { qualifier } => todo!(), + // Expr::GroupingSet(grouping_set) => todo!(), + // Expr::Placeholder(placeholder) => todo!(), + // Expr::OuterReferenceColumn(data_type, column) => todo!(), + // Expr::Unnest(unnest) => todo!(), + // } + // }).collect(); + // Ok(support) + // } + async fn scan( &self, state: &SessionState, @@ -435,7 +494,7 @@ impl TableProvider for StandardTableProvider { ); } - let (partitioned_files, statistics) = partitioned_files(manifest_files, &self.schema, 1); + let (partitioned_files, statistics) = partitioned_files(manifest_files, &self.schema); let remote_exec = create_parquet_physical_plan( ObjectStoreUrl::parse(glob_storage.store_url()).unwrap(), partitioned_files, @@ -519,7 +578,7 @@ async fn get_cache_exectuion_plan( }) .collect(); - let (partitioned_files, statistics) = partitioned_files(cached, &schema, 1); + let (partitioned_files, statistics) = partitioned_files(cached, &schema); let plan = create_parquet_physical_plan( ObjectStoreUrl::parse("file:///").unwrap(), partitioned_files, @@ -570,7 +629,7 @@ async fn get_hottier_exectuion_plan( }) .collect(); - let (partitioned_files, statistics) = partitioned_files(hot_tier_files, &schema, 1); + let (partitioned_files, statistics) = partitioned_files(hot_tier_files, &schema); let plan = create_parquet_physical_plan( ObjectStoreUrl::parse("file:///").unwrap(), partitioned_files, diff --git a/server/src/response.rs b/server/src/response.rs index e2abfa2d2..1223c630b 100644 --- a/server/src/response.rs +++ b/server/src/response.rs @@ -29,6 +29,7 @@ use itertools::Itertools; use serde_json::{json, Value}; use tonic::{Response, Status}; +#[derive(Debug)] pub struct QueryResponse { pub records: Vec, pub fields: Vec, diff --git a/server/src/utils/arrow/flight.rs b/server/src/utils/arrow/flight.rs index 3b0b17eba..367c7560d 100644 --- a/server/src/utils/arrow/flight.rs +++ b/server/src/utils/arrow/flight.rs @@ -61,6 +61,7 @@ pub async fn run_do_get_rpc( ))? .0; let url = format!("{}:{}", url, im.flight_port); + log::warn!("ingestor gRPC url-\n{url}\n"); let url = url .parse::() .map_err(|_| Status::failed_precondition("Ingester metadata is courupted"))?; @@ -68,8 +69,9 @@ pub async fn run_do_get_rpc( .connect() .await .map_err(|err| Status::failed_precondition(err.to_string()))?; - + log::warn!("built channel"); let client = FlightClient::new(channel); + log::warn!("built flightclient"); let inn = client .into_inner() .accept_compressed(tonic::codec::CompressionEncoding::Gzip) @@ -77,7 +79,7 @@ pub async fn run_do_get_rpc( .max_encoding_message_size(usize::MAX); let mut client = FlightClient::new_from_inner(inn); - + log::warn!("built channel with inner"); client.add_header("authorization", &im.token)?; let response = client