Skip to content

Commit

Permalink
feat: Impl Query Server
Browse files Browse the repository at this point in the history
1. `send_request_to_ingestor` now returns a proper result
2. fix: bug when there are no parquet files in the store
3. Query Server Now gets all the resultant data
  • Loading branch information
Eshanatnight committed Mar 11, 2024
1 parent 9fda908 commit e4a3430
Show file tree
Hide file tree
Showing 6 changed files with 125 additions and 38 deletions.
40 changes: 34 additions & 6 deletions server/src/handlers/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,23 +55,51 @@ pub(crate) fn cross_origin_config() -> Cors {
}
}

pub async fn send_query_request_to_ingestor(query: &Query) -> anyhow::Result<Vec<Value>> {
// send the query request to the ingestor
pub async fn send_schema_request(stream_name: &str) -> anyhow::Result<Vec<arrow_schema::Schema>> {
let mut res = vec![];
let ima = QueryServer::get_ingestor_info().await.unwrap();
let ima = QueryServer::get_ingester_info().await.unwrap();

for im in ima {
let uri = format!("{}{}/{}",im.domain_name, base_path(), "query");
let uri = format!("{}api/v1/logstream/{}/schema", im.domain_name, stream_name);
let reqw = reqwest::Client::new()
.get(uri)
.header(http::header::AUTHORIZATION, im.token.clone())
.header(http::header::CONTENT_TYPE, "application/json")
.send()
.await?;

if reqw.status().is_success() {
let v = serde_json::from_slice(&reqw.bytes().await?)?;
res.push(v);
}
}

Ok(res)
}

pub async fn send_request_to_ingestor(query: &Query) -> anyhow::Result<Vec<Value>> {
// send the query request to the ingestor
let mut res = vec![];
let ima = QueryServer::get_ingester_info().await.unwrap();

for im in ima.iter() {
let uri = format!("{}api/v1/{}", im.domain_name, "query");

let reqw = reqwest::Client::new()
.post(uri)
.json(query)
.basic_auth("admin", Some("admin"))
.header(http::header::AUTHORIZATION, im.token.clone())
.header(http::header::CONTENT_TYPE, "application/json")
.send()
.await?;

if reqw.status().is_success() {
let v: Value = serde_json::from_slice(&reqw.bytes().await?)?;
res.push(v);
if let Some(arr) = v.as_array() {
for val in arr {
res.push(val.clone())
}
}
}
}

Expand Down
6 changes: 4 additions & 2 deletions server/src/handlers/http/modal/query_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ impl QueryServer {

let mut f = Self::get_meta_file().await;
// writer the arr in f
let write_size = f.write(serde_json::to_string(&arr)?.as_bytes()).await?;
let _ = f.write(serde_json::to_string(&arr)?.as_bytes()).await?;
Ok(arr)
}

Expand All @@ -177,9 +177,11 @@ impl QueryServer {
/// initialize the server, run migrations as needed and start the server
async fn initialize(&self) -> anyhow::Result<()> {
migration::run_metadata_migration(&CONFIG).await?;
tokio::fs::File::create(CONFIG.staging_dir().join(".query.json")).await?;

let metadata = storage::resolve_parseable_metadata().await?;
// do not commit the below line
tokio::fs::File::create(CONFIG.staging_dir().join(".query.json")).await?;

banner::print(&CONFIG, &metadata).await;

// initialize the rbac map
Expand Down
81 changes: 63 additions & 18 deletions server/src/handlers/http/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,24 +19,30 @@
use actix_web::http::header::ContentType;
use actix_web::web::{self, Json};
use actix_web::{FromRequest, HttpRequest, Responder};
use chrono::{DateTime, Timelike, Utc};
use arrow_schema::Schema;
use chrono::{DateTime, Utc};
use datafusion::error::DataFusionError;
use datafusion::execution::context::SessionState;
use futures_util::Future;
use http::StatusCode;
use std::collections::HashMap;
use std::pin::Pin;
use std::sync::Arc;
use std::time::Instant;

use crate::event::commit_schema;
use crate::handlers::http::send_schema_request;
use crate::metrics::QUERY_EXECUTE_TIME;
use crate::option::{Mode, CONFIG};
use crate::query::error::ExecuteError;
use crate::query::QUERY_SESSION;
use crate::rbac::role::{Action, Permission};
use crate::rbac::Users;
use crate::response::QueryResponse;
use crate::storage::object_storage::commit_schema_to_storage;
use crate::utils::actix::extract_session_key_from_req;

use super::send_query_request_to_ingestor;
use super::send_request_to_ingestor;

/// Query Request through http endpoint.
#[derive(Debug, serde::Deserialize, serde::Serialize)]
Expand All @@ -54,19 +60,41 @@ pub struct Query {
}

pub async fn query(req: HttpRequest, query_request: Query) -> Result<impl Responder, QueryError> {
// create a new query to send to the ingestors
let mut mmem= vec![];
if let Some(query) = transform_query_for_ingestor(&query_request) {
mmem = send_query_request_to_ingestor(&query)
.await
.map_err(|err| QueryError::Custom(err.to_string()))?;
let session_state = QUERY_SESSION.state();
let mut query = into_query(&query_request, &session_state).await?;

if CONFIG.parseable.mode == Mode::Query {
if let Ok(schs) = send_schema_request(&query.table_name().unwrap()).await {
let new_schema =
Schema::try_merge(schs).map_err(|err| QueryError::Custom(err.to_string()))?;

commit_schema(&query.table_name().unwrap(), Arc::new(new_schema.clone()))
.map_err(|err| QueryError::Custom(format!("Error committing schema: {}", err)))?;

commit_schema_to_storage(&query.table_name().unwrap(), new_schema)
.await
.map_err(|err| {
QueryError::Custom(format!("Error committing schema to storage\nError:{err}"))
})?;
}
}

// let rbj = arrow_json::ReaderBuilder::new();
let mmem = if CONFIG.parseable.mode == Mode::Query {
// create a new query to send to the ingestors
if let Some(que) = transform_query_for_ingestor(&query_request) {
let vals = send_request_to_ingestor(&que)
.await
.map_err(|err| QueryError::Custom(err.to_string()))?;
Some(vals)
} else {
None
}
} else {
None
};

let creds = extract_session_key_from_req(&req).expect("expects basic auth");
let permissions = Users.get_permissions(&creds);
let session_state = QUERY_SESSION.state();
let mut query = into_query(&query_request, &session_state).await?;

// check authorization of this query if it references physical table;
let table_name = query.table_name();
Expand Down Expand Up @@ -105,14 +133,14 @@ pub async fn query(req: HttpRequest, query_request: Query) -> Result<impl Respon

let time = Instant::now();

let (records, fields) = query.execute(Some(mmem)).await?;
let (records, fields) = query.execute().await?;
let response = QueryResponse {
records,
fields,
fill_null: query_request.send_null,
with_fields: query_request.fields,
}
.to_http();
.to_http(mmem);

Check failure on line 143 in server/src/handlers/http/query.rs

View workflow job for this annotation

GitHub Actions / Cargo Clippy check

this method takes 0 arguments but 1 argument was supplied

Check failure on line 143 in server/src/handlers/http/query.rs

View workflow job for this annotation

GitHub Actions / Unit tests

this method takes 0 arguments but 1 argument was supplied

if let Some(table) = table_name {
let time = time.elapsed().as_secs_f64();
Expand Down Expand Up @@ -195,14 +223,31 @@ async fn into_query(
}

fn transform_query_for_ingestor(query: &Query) -> Option<Query> {
let end_time = DateTime::parse_from_rfc3339(&query.end_time).ok()?;
let start_time = end_time - chrono::Duration::minutes(1);
if query.query.is_empty() {
return None;
}

if query.start_time.is_empty() {
return None;
}

dbg!(start_time.minute());
if query.end_time.is_empty() {
return None;
}

let end_time: DateTime<Utc> = if query.end_time == "now" {
Utc::now()
} else {
DateTime::parse_from_rfc3339(&query.end_time)
.ok()?
.with_timezone(&Utc)
};

let start_time = end_time - chrono::Duration::minutes(1);
// when transforming the query, the ingestors are forced to return an array of values
let q = Query {
query: query.query.clone(),
fields: query.fields,
fields: false,
filter_tags: query.filter_tags.clone(),
send_null: query.send_null,
start_time: start_time.to_rfc3339(),
Expand Down Expand Up @@ -236,7 +281,7 @@ pub enum QueryError {
Datafusion(#[from] DataFusionError),
#[error("Execution Error: {0}")]
Execute(#[from] ExecuteError),
#[error("Query Error: {0}")]
#[error("Error: {0}")]
Custom(String),
}

Expand Down
17 changes: 8 additions & 9 deletions server/src/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ use datafusion::logical_expr::{Explain, Filter, LogicalPlan, PlanType, ToStringi
use datafusion::prelude::*;
use itertools::Itertools;
use once_cell::sync::Lazy;
use serde_json::Value;
use std::collections::HashMap;
use std::path::{Path, PathBuf};
use std::sync::Arc;
Expand Down Expand Up @@ -103,18 +102,14 @@ impl Query {
SessionContext::new_with_state(state)
}

pub async fn execute(&self, mem: Option<Vec<Value>>) -> Result<(Vec<RecordBatch>, Vec<String>), ExecuteError> {
pub async fn execute(
&self,
mem: Option<Vec<Value>>,

Check failure on line 107 in server/src/query.rs

View workflow job for this annotation

GitHub Actions / Cargo Clippy check

cannot find type `Value` in this scope

Check failure on line 107 in server/src/query.rs

View workflow job for this annotation

GitHub Actions / Unit tests

cannot find type `Value` in this scope
) -> Result<(Vec<RecordBatch>, Vec<String>), ExecuteError> {
let df = QUERY_SESSION
.execute_logical_plan(self.final_logical_plan())
.await?;

let schema = df.schema();
if let Some(mem) = mem {
let mem = arrow_json::ReaderBuilder::new(schema.clone())
.build(mem.iter().map(|x| serde_json::to_string(x).unwrap()).collect());

}

let fields = df
.schema()
.fields()
Expand All @@ -123,6 +118,10 @@ impl Query {
.cloned()
.collect_vec();

if fields.is_empty() {
return Ok((vec![], fields));
}

let results = df.collect().await?;
Ok((results, fields))
}
Expand Down
7 changes: 6 additions & 1 deletion server/src/response.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,12 @@ impl QueryResponse {
}
}
}
let values = json_records.into_iter().map(Value::Object).collect_vec();
let mut values = json_records.into_iter().map(Value::Object).collect_vec();

if let Some(mut imem) = imem {

Check failure on line 48 in server/src/response.rs

View workflow job for this annotation

GitHub Actions / Cargo Clippy check

cannot find value `imem` in this scope

Check failure on line 48 in server/src/response.rs

View workflow job for this annotation

GitHub Actions / Unit tests

cannot find value `imem` in this scope
values.append(&mut imem);
}

let response = if self.with_fields {
json!({
"fields": self.fields,
Expand Down
12 changes: 10 additions & 2 deletions server/src/storage/object_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -432,7 +432,7 @@ pub trait ObjectStorage: Sync + 'static {
fn get_bucket_name(&self) -> String;
}

async fn commit_schema_to_storage(
pub async fn commit_schema_to_storage(
stream_name: &str,
schema: Schema,
) -> Result<(), ObjectStorageError> {
Expand All @@ -451,7 +451,15 @@ fn to_bytes(any: &(impl ?Sized + serde::Serialize)) -> Bytes {

#[inline(always)]
/// Relative object-store path of the schema file for `stream_name`.
///
/// In `Ingest` mode each node writes its own schema file, prefixed with
/// `ingester.{ip}.{port}`, so concurrent ingesters do not clobber each
/// other's schema; `Query` and `All` modes use the shared file name.
fn schema_path(stream_name: &str) -> RelativePathBuf {
    match CONFIG.parseable.mode {
        Mode::Ingest => {
            // Per-node file name derived from this server's bind address.
            let (ip, port) = get_address();
            let file_name = format!("ingester.{}.{}{}", ip, port, SCHEMA_FILE_NAME);

            RelativePathBuf::from_iter([stream_name, &file_name])
        }
        Mode::All | Mode::Query => RelativePathBuf::from_iter([stream_name, SCHEMA_FILE_NAME]),
    }
}

#[inline(always)]
Expand Down

0 comments on commit e4a3430

Please sign in to comment.