Skip to content

Commit

Permalink
fix: query result when running complex queries (with sub-quries)
Browse files Browse the repository at this point in the history
Refactor authorization and filter tag logic in query handler
fix: schema getting updated for only one when query refers multiple
tables
  • Loading branch information
Eshanatnight committed Apr 23, 2024
1 parent 09b9e40 commit 47e7d1b
Show file tree
Hide file tree
Showing 2 changed files with 68 additions and 60 deletions.
122 changes: 65 additions & 57 deletions server/src/handlers/http/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ use crate::event::commit_schema;
use crate::metrics::QUERY_EXECUTE_TIME;
use crate::option::{Mode, CONFIG};
use crate::query::error::ExecuteError;
use crate::query::Query as LogicalQuery;
use crate::query::{TableScanVisitor, QUERY_SESSION};
use crate::rbac::role::{Action, Permission};
use crate::rbac::Users;
Expand Down Expand Up @@ -67,69 +68,37 @@ pub async fn query(req: HttpRequest, query_request: Query) -> Result<impl Respon
let raw_logical_plan = session_state
.create_logical_plan(&query_request.query)
.await?;

// create a visitor to extract the table name
let mut visitor = TableScanVisitor::default();
let _ = raw_logical_plan.visit(&mut visitor);
let table_name = visitor
.into_inner()
.pop()
.ok_or(QueryError::MalformedQuery(
"No table found from sql".to_string(),
))?;

let tables = visitor.into_inner();

if CONFIG.parseable.mode == Mode::Query {
if let Ok(new_schema) = fetch_schema(&table_name).await {
// commit schema merges the schema internally and updates the schema in storage.
commit_schema_to_storage(&table_name, new_schema.clone())
.await
.map_err(QueryError::ObjectStorage)?;
commit_schema(&table_name, Arc::new(new_schema)).map_err(QueryError::EventError)?;
for table in tables {
if let Ok(new_schema) = fetch_schema(&table).await {
// commit schema merges the schema internally and updates the schema in storage.
commit_schema_to_storage(&table, new_schema.clone())
.await
.map_err(QueryError::ObjectStorage)?;
commit_schema(&table, Arc::new(new_schema)).map_err(QueryError::EventError)?;
}
}
}

let mut query = into_query(&query_request, &session_state).await?;
let mut query: LogicalQuery = into_query(&query_request, &session_state).await?;

let creds = extract_session_key_from_req(&req).expect("expects basic auth");
let permissions = Users.get_permissions(&creds);
let permissions: Vec<Permission> = Users.get_permissions(&creds);

// check authorization of this query if it references physical table;
let table_name = query.table_name();
if let Some(ref table) = table_name {
let mut authorized = false;
let mut tags = Vec::new();

// in permission check if user can run query on the stream.
// also while iterating add any filter tags for this stream
for permission in permissions {
match permission {
Permission::Stream(Action::All, _) => {
authorized = true;
break;
}
Permission::StreamWithTag(Action::Query, ref stream, tag)
if stream == table || stream == "*" =>
{
authorized = true;
if let Some(tag) = tag {
tags.push(tag)
}
}
_ => (),
}
}

if !authorized {
return Err(QueryError::Unauthorized);
}

if !tags.is_empty() {
query.filter_tag = Some(tags)
}
}
let table_name = query
.first_table_name()
.ok_or_else(|| QueryError::MalformedQuery("No table name found in query".to_string()))?;
authorize_and_set_filter_tags(&mut query, permissions, &table_name)?;

let time = Instant::now();

let (records, fields) = query.execute(table_name.clone().unwrap()).await?;
let (records, fields) = query.execute(table_name.clone()).await?;
let response = QueryResponse {
records,
fields,
Expand All @@ -138,16 +107,55 @@ pub async fn query(req: HttpRequest, query_request: Query) -> Result<impl Respon
}
.to_http();

if let Some(table) = table_name {
let time = time.elapsed().as_secs_f64();
QUERY_EXECUTE_TIME
.with_label_values(&[&table])
.observe(time);
}
let time = time.elapsed().as_secs_f64();

QUERY_EXECUTE_TIME
.with_label_values(&[&table_name])
.observe(time);

Ok(response)
}

fn authorize_and_set_filter_tags(
query: &mut LogicalQuery,
permissions: Vec<Permission>,
table_name: &str,
) -> Result<(), QueryError> {
// check authorization of this query if it references physical table;
let mut authorized = false;
let mut tags = Vec::new();

// in permission check if user can run query on the stream.
// also while iterating add any filter tags for this stream
for permission in permissions {
match permission {
Permission::Stream(Action::All, _) => {
authorized = true;
break;
}
Permission::StreamWithTag(Action::Query, ref stream, tag)
if stream == table_name || stream == "*" =>
{
authorized = true;
if let Some(tag) = tag {
tags.push(tag)
}
}
_ => (),
}
}

if !authorized {
return Err(QueryError::Unauthorized);
}

if !tags.is_empty() {
query.filter_tag = Some(tags)
}

Ok(())
}

impl FromRequest for Query {
type Error = actix_web::Error;
type Future = Pin<Box<dyn Future<Output = Result<Self, Self::Error>>>>;
Expand Down Expand Up @@ -178,7 +186,7 @@ impl FromRequest for Query {
async fn into_query(
query: &Query,
session_state: &SessionState,
) -> Result<crate::query::Query, QueryError> {
) -> Result<LogicalQuery, QueryError> {
if query.query.is_empty() {
return Err(QueryError::EmptyQuery);
}
Expand Down
6 changes: 3 additions & 3 deletions server/src/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ impl Query {
}
}

pub fn table_name(&self) -> Option<String> {
pub fn first_table_name(&self) -> Option<String> {
let mut visitor = TableScanVisitor::default();
let _ = self.raw_logical_plan.visit(&mut visitor);
visitor.into_inner().pop()
Expand All @@ -192,7 +192,7 @@ impl TreeNodeVisitor for TableScanVisitor {
match node {
LogicalPlan::TableScan(table) => {
self.tables.push(table.table_name.table().to_string());
Ok(VisitRecursion::Stop)
Ok(VisitRecursion::Skip)
}
_ => Ok(VisitRecursion::Continue),
}
Expand Down Expand Up @@ -290,7 +290,7 @@ fn table_contains_any_time_filters(
})
.any(|expr| {
matches!(&*expr.left, Expr::Column(Column { name, .. })
if ((time_partition.is_some() && name == time_partition.as_ref().unwrap()) ||
if ((time_partition.is_some() && name == time_partition.as_ref().unwrap()) ||
(!time_partition.is_some() && name == event::DEFAULT_TIMESTAMP_KEY)))
})
}
Expand Down

0 comments on commit 47e7d1b

Please sign in to comment.