Skip to content

Commit

Permalink
fix: Session Context not in sync
Browse files Browse the repository at this point in the history
1. Session Context was not being synced at the time of schema update
2. Make struct TableScanVisitor pub at crate level
  • Loading branch information
Eshanatnight committed Mar 17, 2024
1 parent f6f76cf commit d3066fb
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 7 deletions.
20 changes: 15 additions & 5 deletions server/src/handlers/http/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use actix_web::http::header::ContentType;
use actix_web::web::{self, Json};
use actix_web::{FromRequest, HttpRequest, Responder};
use chrono::{DateTime, Utc};
use datafusion::common::tree_node::TreeNode;
use datafusion::error::DataFusionError;
use datafusion::execution::context::SessionState;
use futures_util::Future;
Expand All @@ -35,7 +36,7 @@ use crate::event::commit_schema;
use crate::metrics::QUERY_EXECUTE_TIME;
use crate::option::{Mode, CONFIG};
use crate::query::error::ExecuteError;
use crate::query::QUERY_SESSION;
use crate::query::{TableScanVisitor, QUERY_SESSION};
use crate::rbac::role::{Action, Permission};
use crate::rbac::Users;
use crate::response::QueryResponse;
Expand All @@ -61,20 +62,29 @@ pub struct Query {

pub async fn query(req: HttpRequest, query_request: Query) -> Result<impl Responder, QueryError> {
let session_state = QUERY_SESSION.state();
let mut query = into_query(&query_request, &session_state).await?;

// get the logical plan and extract the table name
let raw_logical_plan = session_state
.create_logical_plan(&query_request.query)
.await?;
let mut visitor = TableScanVisitor::default();
let _ = raw_logical_plan.visit(&mut visitor);
let table_name = visitor.into_inner().pop().unwrap();

if CONFIG.parseable.mode == Mode::Query {
if let Ok(new_schema) = fetch_schema(&query.table_name().unwrap()).await {
commit_schema_to_storage(&query.table_name().unwrap(), new_schema.clone())
if let Ok(new_schema) = fetch_schema(&table_name).await {
commit_schema_to_storage(&table_name, new_schema.clone())
.await
.map_err(|err| {
QueryError::Custom(format!("Error committing schema to storage\nError:{err}"))
})?;
commit_schema(&query.table_name().unwrap(), Arc::new(new_schema))
commit_schema(&table_name, Arc::new(new_schema))
.map_err(|err| QueryError::Custom(format!("Error committing schema: {}", err)))?;
}
}

let mut query = into_query(&query_request, &session_state).await?;

// ? run this code only if the query start time and now is less than 1 minute + margin
let mmem = if CONFIG.parseable.mode == Mode::Query {
// create a new query to send to the ingesters
Expand Down
4 changes: 2 additions & 2 deletions server/src/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -162,12 +162,12 @@ impl Query {
}

#[derive(Debug, Default)]
struct TableScanVisitor {
pub(crate) struct TableScanVisitor {
tables: Vec<String>,
}

impl TableScanVisitor {
fn into_inner(self) -> Vec<String> {
pub fn into_inner(self) -> Vec<String> {
self.tables
}
}
Expand Down

0 comments on commit d3066fb

Please sign in to comment.