Skip to content

Commit

Permalink
init
Browse files Browse the repository at this point in the history
  • Loading branch information
parmesant committed Sep 24, 2024
1 parent 715a546 commit c40ffb2
Show file tree
Hide file tree
Showing 7 changed files with 273 additions and 15 deletions.
2 changes: 1 addition & 1 deletion server/src/handlers/airplane.rs
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ impl FlightService for AirServiceImpl {

let ticket = get_query_from_ticket(&req)?;

log::info!("query requested to airplane: {:?}", ticket);
log::warn!("query requested to airplane: {:?}", ticket);

// get the query session_state
let session_state = QUERY_SESSION.state();
Expand Down
3 changes: 2 additions & 1 deletion server/src/handlers/http/modal/ingest_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,8 @@ impl IngestServer {
.service(Self::get_user_webscope())
.service(Server::get_user_role_webscope())
.service(Server::get_metrics_webscope())
.service(Server::get_readiness_factory()),
.service(Server::get_readiness_factory())
.service(Server::get_query_factory()),
)
.service(Server::get_ingest_otel_factory());
}
Expand Down
207 changes: 201 additions & 6 deletions server/src/handlers/http/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,23 +16,39 @@
*
*/

use actix_web::http::header::ContentType;
use actix_web::http::header::{self, ContentType};
use actix_web::web::{self, Json};
use actix_web::{FromRequest, HttpRequest, Responder};
use anyhow::anyhow;
use arrow_flight::{FlightClient, Ticket};
use arrow_schema::{Schema, SchemaRef};
use chrono::{DateTime, Utc};
use datafusion::common::tree_node::TreeNode;
use datafusion::common::tree_node::{Transformed, TreeNode, TreeNodeRecursion};
use datafusion::datasource::{provider_as_source, source_as_provider, MemTable, TableProvider, TableType};
use datafusion::error::DataFusionError;
use datafusion::execution::context::SessionState;
use datafusion::logical_expr::LogicalPlanBuilder;
use datafusion::physical_plan::insert::{DataSink, DataSinkExec};
use datafusion::physical_plan::ExecutionPlan;
use datafusion::prelude::{DataFrame, Expr, SessionContext};
use futures::{StreamExt, TryStreamExt};
use futures_util::Future;
use http::StatusCode;
use http::{header as http_header, StatusCode, Uri};
use itertools::Itertools;
use serde_json::json;
use tonic::transport::Channel;
use tonic::Status;
use url::Url;
use std::any::Any;
use std::collections::HashMap;
use std::pin::Pin;
use std::sync::Arc;
use std::time::Instant;

use crate::event::error::EventError;
use crate::handlers::http::fetch_schema;
use crate::handlers::http::logstream::error::StreamError;
use crate::utils::arrow::flight::run_do_get_rpc;
use arrow_array::RecordBatch;

use crate::event::commit_schema;
Expand All @@ -51,8 +67,11 @@ use crate::storage::object_storage::commit_schema_to_storage;
use crate::storage::ObjectStorageError;
use crate::utils::actix::extract_session_key_from_req;

use super::cluster::get_ingestor_info;
use super::modal::IngestorMetadata;

/// Query Request through http endpoint.
#[derive(Debug, serde::Deserialize, serde::Serialize)]
#[derive(Debug, serde::Deserialize, serde::Serialize, Clone)]
#[serde(rename_all = "camelCase")]
pub struct Query {
pub query: String,
Expand Down Expand Up @@ -130,13 +149,149 @@ pub async fn query(req: HttpRequest, query_request: Query) -> Result<impl Respon

let time = Instant::now();
let (records, fields) = query.execute(table_name.clone()).await?;

let mut ingestor_records = records;
log::warn!("num records with querier- {}", ingestor_records.len());
log::warn!("records with querier- \n{:?}\n", ingestor_records);

// also get results from staging of ingestors
if CONFIG.get_server_mode_string().eq("Distributed (Query)") {
log::warn!("Entered ingestor query section");
let ingestor_infos = get_ingestor_info().await.map_err(|e| {
log::error!("{}",e.to_string());
}).unwrap();

let mut reqwest_headers = http_header::HeaderMap::new();

for (key, value) in req.headers().iter() {
reqwest_headers.insert(key.clone(), value.clone());
}

for ingestor in ingestor_infos.into_iter() {
log::warn!("ingestor metadata- {ingestor:?}");

let obtained_records = do_get(
&ingestor,
json!({
"query": query_request.query,
"startTime": query_request.start_time,
"endTime": query_request.end_time
}).to_string()
).await.unwrap();

log::warn!("got {} records from ingestor. records- \n{:?}\n",obtained_records.len(), obtained_records);

ingestor_records.extend(
obtained_records
);
}

// now that we have combined results from our querier and ingester, run the combined LogicalPlan on it
// to do so, create a new in memory table from RecordBatches
let mut batches = ingestor_records.clone().into_iter().peekable();
let schema = if let Some(batch) = batches.peek() {
batch.schema().clone()
} else {
Arc::new(Schema::empty())
};
let provider = MemTable::try_new(schema.clone(), vec![batches.clone().collect()])?;

let new_ctx = SessionContext::new();

log::warn!("created new context");

let mod_query = query_request.query.replace(&table_name, "temptable");
let mod_query = format!("select count(*) from temptable");
// let mut lpb = LogicalPlanBuilder::scan(
// "temptable",
// provider_as_source(Arc::new(provider)),
// None,
// )?;

// let new_p = MemTable::try_new(schema, vec![batches.collect()])?;

// log::warn!("Created new LogicalPlanBuilder-\n{lpb:?}\n");
new_ctx.register_table("temptable", source_as_provider(&provider_as_source(Arc::new(provider))).unwrap()).unwrap();
let new_state = new_ctx.state();
let dialect = &new_state.config().options().sql_parser.dialect;
let stmt = new_state.sql_to_statement(&mod_query, dialect)?;
log::warn!("created stmt-\n{stmt:?}\n");
// new_state.catalog_list()
// .catalog("datafusion")
// .unwrap()
// .schema("public")
// .unwrap()
// .register_table(name, table)

let temp_logical_plan = new_state.statement_to_plan(stmt).await?;
// let temp_logical_plan = new_state
// .create_logical_plan(&mod_query)
// .await?;

log::warn!("created new Logical Plan from modified query- \n{temp_logical_plan:?}\n");

// temp_logical_plan.apply(|node| {
// match node {
// datafusion::logical_expr::LogicalPlan::Projection(projection) => {
// lpb = lpb.clone().project(projection.expr.clone())?;
// },
// datafusion::logical_expr::LogicalPlan::Filter(filter) => {
// lpb = lpb.clone().filter(filter.predicate.clone())?;
// },
// datafusion::logical_expr::LogicalPlan::Window(window) => {
// lpb = lpb.clone().window(window.window_expr.clone())?;
// },
// datafusion::logical_expr::LogicalPlan::Aggregate(aggregate) => {
// lpb = lpb.clone().aggregate(aggregate.group_expr.clone(), aggregate.aggr_expr.clone())?;
// },
// datafusion::logical_expr::LogicalPlan::Sort(sort) => {
// lpb = lpb.clone().sort(sort.expr.clone())?;
// },
// datafusion::logical_expr::LogicalPlan::Join(join) => todo!(),
// datafusion::logical_expr::LogicalPlan::CrossJoin(cross_join) => todo!(),
// datafusion::logical_expr::LogicalPlan::Repartition(repartition) => todo!(),
// datafusion::logical_expr::LogicalPlan::Union(union) => todo!(),
// datafusion::logical_expr::LogicalPlan::TableScan(table_scan) => {},
// datafusion::logical_expr::LogicalPlan::EmptyRelation(empty_relation) => todo!(),
// datafusion::logical_expr::LogicalPlan::Subquery(subquery) => todo!(),
// datafusion::logical_expr::LogicalPlan::SubqueryAlias(subquery_alias) => todo!(),
// datafusion::logical_expr::LogicalPlan::Limit(limit) => {
// lpb = lpb.clone().limit(limit.skip.clone(), limit.fetch.clone())?;
// },
// datafusion::logical_expr::LogicalPlan::Statement(statement) => todo!(),
// datafusion::logical_expr::LogicalPlan::Values(values) => todo!(),
// datafusion::logical_expr::LogicalPlan::Explain(explain) => todo!(),
// datafusion::logical_expr::LogicalPlan::Analyze(analyze) => todo!(),
// datafusion::logical_expr::LogicalPlan::Extension(extension) => todo!(),
// datafusion::logical_expr::LogicalPlan::Distinct(distinct) => todo!(),
// datafusion::logical_expr::LogicalPlan::Prepare(prepare) => todo!(),
// datafusion::logical_expr::LogicalPlan::Dml(dml_statement) => todo!(),
// datafusion::logical_expr::LogicalPlan::Ddl(ddl_statement) => todo!(),
// datafusion::logical_expr::LogicalPlan::Copy(copy_to) => todo!(),
// datafusion::logical_expr::LogicalPlan::DescribeTable(describe_table) => todo!(),
// datafusion::logical_expr::LogicalPlan::Unnest(unnest) => todo!(),
// datafusion::logical_expr::LogicalPlan::RecursiveQuery(recursive_query) => todo!(),
// }
// Ok(TreeNodeRecursion::Continue)
// })?;

// log::warn!("modified logical plan builder-\n{lpb:?}\n");
// // let view = datafusion::datasource::ViewTable::try_new(temp_logical_plan, None)?;

let df = DataFrame::new(
new_state,
temp_logical_plan
);
ingestor_records = df.collect().await.unwrap();

}
// deal with cache saving
if let Err(err) = put_results_in_cache(
cache_results,
user_id,
query_cache_manager,
&table_name,
&records,
&ingestor_records,
query.start.to_rfc3339(),
query.end.to_rfc3339(),
query_request.query,
Expand All @@ -147,7 +302,7 @@ pub async fn query(req: HttpRequest, query_request: Query) -> Result<impl Respon
};

let response = QueryResponse {
records,
records: ingestor_records,
fields,
fill_null: query_request.send_null,
with_fields: query_request.fields,
Expand All @@ -163,6 +318,46 @@ pub async fn query(req: HttpRequest, query_request: Query) -> Result<impl Respon
Ok(response)
}

pub async fn do_get(im: &IngestorMetadata, ticket: String) -> Result<Vec<RecordBatch>, Status> {

// // get the ingestor gRPC url
// let url = im
// .domain_name
// .rsplit_once(':')
// .ok_or(Status::failed_precondition(
// "Ingester metadata is courupted",
// )).unwrap()
// .0;
// let url = format!("{}:{}", url, im.flight_port);
// let uri = url.clone()
// .parse::<Uri>()
// .map_err(|_| Status::failed_precondition("Ingester metadata is courupted")).unwrap();

// // make a client
// let mut client = FlightClient::new(
// Channel::builder(uri)
// .connect()
// .await
// .unwrap()
// );
// client.add_header("authorization", &im.token).unwrap();

// let response: Vec<RecordBatch> = client
// .do_get(Ticket {
// ticket: ticket.into(),
// })
// .await
// .unwrap()
// .try_collect()
// .await
// .unwrap();
// log::warn!("ingestor result- \n{response:?}\n");

let response = run_do_get_rpc(im.clone(), ticket.into()).await?;

Ok(response)
}

pub async fn update_schema_when_distributed(tables: Vec<String>) -> Result<(), QueryError> {
if CONFIG.parseable.mode == Mode::Query {
for table in tables {
Expand Down
2 changes: 1 addition & 1 deletion server/src/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ impl Query {

let config = SessionConfig::default()
.with_parquet_pruning(true)
.with_prefer_existing_sort(true)
.with_prefer_existing_sort(false)
.with_round_robin_repartition(true);

let state = SessionState::new_with_config_rt(config, runtime);
Expand Down
Loading

0 comments on commit c40ffb2

Please sign in to comment.