Rm staging query result (#740)
* remove staging query from the query result (for distributed)

* Refactor get_schema method to handle missing schema in object storage
Eshanatnight authored Apr 5, 2024
1 parent d7fcf01 commit a54f2d6
Showing 5 changed files with 32 additions and 34 deletions.
2 changes: 2 additions & 0 deletions server/src/handlers/http.rs
@@ -90,6 +90,8 @@ pub async fn fetch_schema(stream_name: &str) -> anyhow::Result<arrow_schema::Sch
Ok(new_schema)
}

/// unused for now, might need it later
#[allow(unused)]
pub async fn send_query_request_to_ingester(query: &Query) -> anyhow::Result<Vec<Value>> {
// send the query request to the ingester
let mut res = vec![];
23 changes: 3 additions & 20 deletions server/src/handlers/http/query.rs
@@ -45,8 +45,6 @@ use crate::storage::object_storage::commit_schema_to_storage;
use crate::storage::ObjectStorageError;
use crate::utils::actix::extract_session_key_from_req;

use super::send_query_request_to_ingester;

/// Query Request through http endpoint.
#[derive(Debug, serde::Deserialize, serde::Serialize)]
#[serde(rename_all = "camelCase")]
@@ -85,21 +83,6 @@ pub async fn query(req: HttpRequest, query_request: Query) -> Result<impl Respon

let mut query = into_query(&query_request, &session_state).await?;

// ? run this code only if the query start time and now is less than 1 minute + margin
let mmem = if CONFIG.parseable.mode == Mode::Query {
// create a new query to send to the ingesters
if let Some(que) = transform_query_for_ingester(&query_request) {
let vals = send_query_request_to_ingester(&que)
.await
.map_err(|err| QueryError::Custom(err.to_string()))?;
Some(vals)
} else {
None
}
} else {
None
};

let creds = extract_session_key_from_req(&req).expect("expects basic auth");
let permissions = Users.get_permissions(&creds);

@@ -147,7 +130,7 @@ pub async fn query(req: HttpRequest, query_request: Query) -> Result<impl Respon
fill_null: query_request.send_null,
with_fields: query_request.fields,
}
.to_http(mmem);
.to_http();

if let Some(table) = table_name {
let time = time.elapsed().as_secs_f64();
@@ -229,6 +212,8 @@ async fn into_query(
})
}

/// unused for now, might need it in the future
#[allow(unused)]
fn transform_query_for_ingester(query: &Query) -> Option<Query> {
if query.query.is_empty() {
return None;
@@ -288,8 +273,6 @@ pub enum QueryError {
Datafusion(#[from] DataFusionError),
#[error("Execution Error: {0}")]
Execute(#[from] ExecuteError),
#[error("Error: {0}")]
Custom(String),
#[error("ObjectStorage Error: {0}")]
ObjectStorage(#[from] ObjectStorageError),
#[error("Event Error: {0}")]
2 changes: 2 additions & 0 deletions server/src/query.rs
@@ -336,6 +336,8 @@ fn time_from_path(path: &Path) -> DateTime<Utc> {
.unwrap()
}

/// unused for now, might need it later
#[allow(unused)]
pub fn flatten_objects_for_count(objects: Vec<Value>) -> Vec<Value> {
if objects.is_empty() {
return objects;
12 changes: 2 additions & 10 deletions server/src/response.rs
@@ -22,8 +22,6 @@ use datafusion::arrow::record_batch::RecordBatch;
use itertools::Itertools;
use serde_json::{json, Value};

use crate::query::flatten_objects_for_count;

pub struct QueryResponse {
pub records: Vec<RecordBatch>,
pub fields: Vec<String>,
@@ -32,7 +30,7 @@ pub struct QueryResponse {
}

impl QueryResponse {
pub fn to_http(&self, imem: Option<Vec<Value>>) -> impl Responder {
pub fn to_http(&self) -> impl Responder {
log::info!("{}", "Returning query results");
let records: Vec<&RecordBatch> = self.records.iter().collect();
let mut json_records = record_batches_to_json_rows(&records).unwrap();
@@ -45,13 +43,7 @@ impl QueryResponse {
}
}
}
let mut values = json_records.into_iter().map(Value::Object).collect_vec();

if let Some(mut imem) = imem {
values.append(&mut imem);
}

let values = flatten_objects_for_count(values);
let values = json_records.into_iter().map(Value::Object).collect_vec();

let response = if self.with_fields {
json!({
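With the ingester results removed from response.rs, to_http now simply turns the locally executed record batches into JSON rows. The following self-contained sketch walks that path end to end; the sample schema and data are made up for illustration, and the arrow_array / arrow_json / arrow_schema crate paths are assumptions (the server reaches these APIs through datafusion's arrow re-export, and record_batches_to_json_rows is deprecated in newer arrow releases):

use std::sync::Arc;

use arrow_array::{ArrayRef, Int64Array, RecordBatch, StringArray};
use arrow_json::writer::record_batches_to_json_rows;
use arrow_schema::{DataType, Field, Schema};
use itertools::Itertools;
use serde_json::Value;

fn main() {
    // A tiny batch standing in for the locally executed query result.
    let schema = Arc::new(Schema::new(vec![
        Field::new("level", DataType::Utf8, false),
        Field::new("count", DataType::Int64, false),
    ]));
    let columns: Vec<ArrayRef> = vec![
        Arc::new(StringArray::from(vec!["info", "error"])),
        Arc::new(Int64Array::from(vec![10i64, 2])),
    ];
    let batch = RecordBatch::try_new(schema, columns).unwrap();

    // Same shape as the simplified to_http: batches -> JSON rows -> Value::Object,
    // with nothing appended from ingesters and no flatten_objects_for_count pass.
    let records: Vec<&RecordBatch> = vec![&batch];
    let json_records = record_batches_to_json_rows(&records).unwrap();
    let values = json_records.into_iter().map(Value::Object).collect_vec();

    println!("{}", serde_json::to_string_pretty(&values).unwrap());
}

Compared to the removed code path, there is no extra Vec<Value> merged in from the ingesters and no flattening pass over the combined rows before the response is built.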
27 changes: 23 additions & 4 deletions server/src/storage/object_storage.rs
@@ -204,10 +204,29 @@ pub trait ObjectStorage: Sync + 'static {
&self,
stream_name: &str,
) -> Result<Schema, ObjectStorageError> {
let schema_path =
RelativePathBuf::from_iter([stream_name, STREAM_ROOT_DIRECTORY, SCHEMA_FILE_NAME]);
let schema_map = self.get_object(&schema_path).await?;
Ok(serde_json::from_slice(&schema_map)?)
// Try to fetch this node's schema first; if that fails, fall back to the
// base schema and persist it to storage so the next lookup succeeds.
let schema_path = schema_path(stream_name);
let byte_data = match self.get_object(&schema_path).await {
Ok(bytes) => bytes,
Err(err) => {
log::info!("{:?}", err);
// base schema path
let schema_path = RelativePathBuf::from_iter([
stream_name,
STREAM_ROOT_DIRECTORY,
SCHEMA_FILE_NAME,
]);
let data = self.get_object(&schema_path).await?;
// schema was not found in store, so it needs to be placed
self.put_schema(stream_name, &serde_json::from_slice(&data)?)
.await?;

data
}
};
Ok(serde_json::from_slice(&byte_data)?)
}

async fn get_schema(&self, stream_name: &str) -> Result<Schema, ObjectStorageError> {
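A note on the refactored schema lookup above: it calls a schema_path(stream_name) helper that is not part of this diff, and on failure falls back to the base object at {stream_name}/{STREAM_ROOT_DIRECTORY}/{SCHEMA_FILE_NAME} before writing that schema back via put_schema. The sketch below shows a hypothetical shape for that helper, for orientation only; the real helper and constants are defined elsewhere in server/src/storage and may resolve a node-specific location in distributed mode:

use relative_path::RelativePathBuf;

// Assumed values for illustration only; the repository defines its own constants.
const STREAM_ROOT_DIRECTORY: &str = ".stream";
const SCHEMA_FILE_NAME: &str = ".schema";

// Hypothetical helper: resolves where this node's schema object lives. A
// single-node deployment can simply use the base schema path; a distributed
// deployment would likely point at a node-scoped schema object instead.
fn schema_path(stream_name: &str) -> RelativePathBuf {
    RelativePathBuf::from_iter([stream_name, STREAM_ROOT_DIRECTORY, SCHEMA_FILE_NAME])
}

fn main() {
    println!("{}", schema_path("backend-logs").as_str());
}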
