diff --git a/server/src/handlers/http/query.rs b/server/src/handlers/http/query.rs index 26f29b592..49dcbc869 100644 --- a/server/src/handlers/http/query.rs +++ b/server/src/handlers/http/query.rs @@ -37,6 +37,7 @@ use crate::event::commit_schema; use crate::metrics::QUERY_EXECUTE_TIME; use crate::option::{Mode, CONFIG}; use crate::query::error::ExecuteError; +use crate::query::Query as LogicalQuery; use crate::query::{TableScanVisitor, QUERY_SESSION}; use crate::rbac::role::{Action, Permission}; use crate::rbac::Users; @@ -67,69 +68,37 @@ pub async fn query(req: HttpRequest, query_request: Query) -> Result = Users.get_permissions(&creds); - // check authorization of this query if it references physical table; - let table_name = query.table_name(); - if let Some(ref table) = table_name { - let mut authorized = false; - let mut tags = Vec::new(); - - // in permission check if user can run query on the stream. - // also while iterating add any filter tags for this stream - for permission in permissions { - match permission { - Permission::Stream(Action::All, _) => { - authorized = true; - break; - } - Permission::StreamWithTag(Action::Query, ref stream, tag) - if stream == table || stream == "*" => - { - authorized = true; - if let Some(tag) = tag { - tags.push(tag) - } - } - _ => (), - } - } - - if !authorized { - return Err(QueryError::Unauthorized); - } - - if !tags.is_empty() { - query.filter_tag = Some(tags) - } - } + let table_name = query + .first_table_name() + .ok_or_else(|| QueryError::MalformedQuery("No table name found in query".to_string()))?; + authorize_and_set_filter_tags(&mut query, permissions, &table_name)?; let time = Instant::now(); - let (records, fields) = query.execute(table_name.clone().unwrap()).await?; + let (records, fields) = query.execute(table_name.clone()).await?; let response = QueryResponse { records, fields, @@ -138,16 +107,55 @@ pub async fn query(req: HttpRequest, query_request: Query) -> Result, + table_name: &str, +) -> Result<(), QueryError> { + // check authorization of this query if it references physical table; + let mut authorized = false; + let mut tags = Vec::new(); + + // in permission check if user can run query on the stream. + // also while iterating add any filter tags for this stream + for permission in permissions { + match permission { + Permission::Stream(Action::All, _) => { + authorized = true; + break; + } + Permission::StreamWithTag(Action::Query, ref stream, tag) + if stream == table_name || stream == "*" => + { + authorized = true; + if let Some(tag) = tag { + tags.push(tag) + } + } + _ => (), + } + } + + if !authorized { + return Err(QueryError::Unauthorized); + } + + if !tags.is_empty() { + query.filter_tag = Some(tags) + } + + Ok(()) +} + impl FromRequest for Query { type Error = actix_web::Error; type Future = Pin>>>; @@ -178,7 +186,7 @@ impl FromRequest for Query { async fn into_query( query: &Query, session_state: &SessionState, -) -> Result { +) -> Result { if query.query.is_empty() { return Err(QueryError::EmptyQuery); } diff --git a/server/src/query.rs b/server/src/query.rs index c3abe3d61..a841b7c47 100644 --- a/server/src/query.rs +++ b/server/src/query.rs @@ -167,7 +167,7 @@ impl Query { } } - pub fn table_name(&self) -> Option { + pub fn first_table_name(&self) -> Option { let mut visitor = TableScanVisitor::default(); let _ = self.raw_logical_plan.visit(&mut visitor); visitor.into_inner().pop() @@ -192,7 +192,7 @@ impl TreeNodeVisitor for TableScanVisitor { match node { LogicalPlan::TableScan(table) => { self.tables.push(table.table_name.table().to_string()); - Ok(VisitRecursion::Stop) + Ok(VisitRecursion::Skip) } _ => Ok(VisitRecursion::Continue), } @@ -290,7 +290,7 @@ fn table_contains_any_time_filters( }) .any(|expr| { matches!(&*expr.left, Expr::Column(Column { name, .. }) - if ((time_partition.is_some() && name == time_partition.as_ref().unwrap()) || + if ((time_partition.is_some() && name == time_partition.as_ref().unwrap()) || (!time_partition.is_some() && name == event::DEFAULT_TIMESTAMP_KEY))) }) }