diff --git a/src/alerts/mod.rs b/src/alerts/mod.rs
deleted file mode 100644
index 8fce4bcec..000000000
--- a/src/alerts/mod.rs
+++ /dev/null
@@ -1,281 +0,0 @@
-/*
- * Parseable Server (C) 2022 - 2024 Parseable, Inc.
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the GNU Affero General Public License as
- * published by the Free Software Foundation, either version 3 of the
- * License, or (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU Affero General Public License for more details.
- *
- * You should have received a copy of the GNU Affero General Public License
- * along with this program. If not, see .
- *
- */
-
-use arrow_array::cast::as_string_array;
-use arrow_array::RecordBatch;
-use arrow_schema::DataType;
-use async_trait::async_trait;
-use datafusion::arrow::compute::kernels::cast;
-use datafusion::arrow::datatypes::Schema;
-use regex::Regex;
-use serde::{Deserialize, Serialize};
-use std::fmt;
-
-pub mod parser;
-pub mod rule;
-pub mod target;
-
-use crate::metrics::ALERTS_STATES;
-use crate::option::CONFIG;
-use crate::utils::arrow::get_field;
-use crate::utils::uid;
-use crate::{storage, utils};
-
-pub use self::rule::Rule;
-use self::target::Target;
-
-#[derive(Default, Debug, serde::Serialize, serde::Deserialize)]
-#[serde(rename_all = "camelCase")]
-pub struct Alerts {
- pub version: AlertVerison,
- pub alerts: Vec,
-}
-
-#[derive(Default, Debug, serde::Serialize, serde::Deserialize)]
-#[serde(rename_all = "lowercase")]
-pub enum AlertVerison {
- #[default]
- V1,
-}
-
-#[derive(Debug, serde::Serialize, serde::Deserialize)]
-#[serde(rename_all = "camelCase")]
-pub struct Alert {
- #[serde(default = "crate::utils::uid::gen")]
- pub id: uid::Uid,
- pub name: String,
- #[serde(flatten)]
- pub message: Message,
- pub rule: Rule,
- pub targets: Vec,
-}
-
-impl Alert {
- pub fn check_alert(&self, stream_name: &str, events: RecordBatch) {
- let resolves = self.rule.resolves(events.clone());
-
- for (index, state) in resolves.into_iter().enumerate() {
- match state {
- AlertState::Listening | AlertState::Firing => (),
- alert_state @ (AlertState::SetToFiring | AlertState::Resolved) => {
- let context = self.get_context(
- stream_name.to_owned(),
- alert_state,
- &self.rule,
- events.slice(index, 1),
- );
- ALERTS_STATES
- .with_label_values(&[
- context.stream.as_str(),
- context.alert_info.alert_name.as_str(),
- context.alert_info.alert_state.to_string().as_str(),
- ])
- .inc();
- for target in &self.targets {
- target.call(context.clone());
- }
- }
- }
- }
- }
-
- fn get_context(
- &self,
- stream_name: String,
- alert_state: AlertState,
- rule: &Rule,
- event_row: RecordBatch,
- ) -> Context {
- let deployment_instance = format!(
- "{}://{}",
- CONFIG.parseable.get_scheme(),
- CONFIG.parseable.address
- );
- let deployment_id = storage::StorageMetadata::global().deployment_id;
- let deployment_mode = storage::StorageMetadata::global().mode.to_string();
- let additional_labels =
- serde_json::to_value(rule).expect("rule is perfectly deserializable");
- let flatten_additional_labels =
- utils::json::flatten::flatten_with_parent_prefix(additional_labels, "rule", "_")
- .expect("can be flattened");
- Context::new(
- stream_name,
- AlertInfo::new(
- self.name.clone(),
- self.message.get(event_row),
- rule.trigger_reason(),
- alert_state,
- ),
- DeploymentInfo::new(deployment_instance, deployment_id, deployment_mode),
- flatten_additional_labels,
- )
- }
-}
-
-#[derive(Debug, Serialize, Deserialize, Clone)]
-#[serde(rename_all = "camelCase")]
-pub struct Message {
- pub message: String,
-}
-
-impl Message {
- // checks if message (with a column name) is valid (i.e. the column name is present in the schema)
- pub fn valid(&self, schema: &Schema, column: &str) -> bool {
- get_field(&schema.fields, column).is_some()
- }
-
- pub fn extract_column_names(&self) -> Vec<&str> {
- // the message can have either no column name ({column_name} not present) or any number of {column_name} present
- Regex::new(r"\{(.*?)\}")
- .unwrap()
- .captures_iter(self.message.as_str())
- .map(|cap| cap.get(1).unwrap().as_str())
- .collect()
- }
-
- /// Returns the message with the column names replaced with the values in the column.
- fn get(&self, event: RecordBatch) -> String {
- let mut replace_message = self.message.clone();
- for column in self.extract_column_names() {
- if let Some(value) = event.column_by_name(column) {
- let arr = cast(value, &DataType::Utf8).unwrap();
- let value = as_string_array(&arr).value(0);
-
- replace_message =
- replace_message.replace(&format!("{{{column}}}"), value.to_string().as_str());
- }
- }
- replace_message
- }
-}
-
-#[async_trait]
-pub trait CallableTarget {
- async fn call(&self, payload: &Context);
-}
-
-#[derive(Debug, Clone)]
-pub struct Context {
- stream: String,
- alert_info: AlertInfo,
- deployment_info: DeploymentInfo,
- additional_labels: serde_json::Value,
-}
-
-impl Context {
- pub fn new(
- stream: String,
- alert_info: AlertInfo,
- deployment_info: DeploymentInfo,
- additional_labels: serde_json::Value,
- ) -> Self {
- Self {
- stream,
- alert_info,
- deployment_info,
- additional_labels,
- }
- }
-
- fn default_alert_string(&self) -> String {
- format!(
- "{} triggered on {}\nMessage: {}\nFailing Condition: {}",
- self.alert_info.alert_name,
- self.stream,
- self.alert_info.message,
- self.alert_info.reason
- )
- }
-
- fn default_resolved_string(&self) -> String {
- format!(
- "{} on {} is now resolved ",
- self.alert_info.alert_name, self.stream
- )
- }
-}
-
-#[derive(Debug, Clone)]
-pub struct AlertInfo {
- alert_name: String,
- message: String,
- reason: String,
- alert_state: AlertState,
-}
-
-impl AlertInfo {
- pub fn new(
- alert_name: String,
- message: String,
- reason: String,
- alert_state: AlertState,
- ) -> Self {
- Self {
- alert_name,
- message,
- reason,
- alert_state,
- }
- }
-}
-
-#[derive(Debug, Clone)]
-pub struct DeploymentInfo {
- deployment_instance: String,
- deployment_id: uid::Uid,
- deployment_mode: String,
-}
-
-impl DeploymentInfo {
- pub fn new(
- deployment_instance: String,
- deployment_id: uid::Uid,
- deployment_mode: String,
- ) -> Self {
- Self {
- deployment_instance,
- deployment_id,
- deployment_mode,
- }
- }
-}
-
-#[derive(Debug, PartialEq, Eq, Clone, Copy)]
-pub enum AlertState {
- Listening,
- SetToFiring,
- Firing,
- Resolved,
-}
-
-impl Default for AlertState {
- fn default() -> Self {
- Self::Listening
- }
-}
-
-impl fmt::Display for AlertState {
- fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
- match *self {
- AlertState::Listening => write!(f, "Listening"),
- AlertState::SetToFiring => write!(f, "SetToFiring"),
- AlertState::Firing => write!(f, "Firing"),
- AlertState::Resolved => write!(f, "Resolved"),
- }
- }
-}
diff --git a/src/event/mod.rs b/src/event/mod.rs
index 42773ed12..2af7f082b 100644
--- a/src/event/mod.rs
+++ b/src/event/mod.rs
@@ -24,7 +24,6 @@ use arrow_array::RecordBatch;
use arrow_schema::{Field, Fields, Schema};
use itertools::Itertools;
use std::sync::Arc;
-use tracing::error;
use self::error::EventError;
pub use self::writer::STREAM_WRITERS;
@@ -90,13 +89,6 @@ impl Event {
crate::livetail::LIVETAIL.process(&self.stream_name, &self.rb);
- if let Err(e) = metadata::STREAM_INFO
- .check_alerts(&self.stream_name, &self.rb)
- .await
- {
- error!("Error checking for alerts. {:?}", e);
- }
-
Ok(())
}
diff --git a/src/handlers/airplane.rs b/src/handlers/airplane.rs
index e910c7035..65c4cf400 100644
--- a/src/handlers/airplane.rs
+++ b/src/handlers/airplane.rs
@@ -157,7 +157,7 @@ impl FlightService for AirServiceImpl {
.ok_or_else(|| Status::aborted("Malformed SQL Provided, Table Name Not Found"))?
.to_owned();
- update_schema_when_distributed(streams)
+ update_schema_when_distributed(&streams)
.await
.map_err(|err| Status::internal(err.to_string()))?;
@@ -212,7 +212,7 @@ impl FlightService for AirServiceImpl {
let permissions = Users.get_permissions(&key);
- authorize_and_set_filter_tags(&mut query, permissions, &stream_name).map_err(|_| {
+ authorize_and_set_filter_tags(&mut query, permissions, &streams).map_err(|_| {
Status::permission_denied("User Does not have permission to access this")
})?;
let time = Instant::now();
diff --git a/src/handlers/http/alerts/alerts_utils.rs b/src/handlers/http/alerts/alerts_utils.rs
new file mode 100644
index 000000000..245da6329
--- /dev/null
+++ b/src/handlers/http/alerts/alerts_utils.rs
@@ -0,0 +1,160 @@
+/*
+ * Parseable Server (C) 2022 - 2024 Parseable, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as
+ * published by the Free Software Foundation, either version 3 of the
+ * License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see .
+ *
+ */
+
+use datafusion::{
+ common::tree_node::TreeNode,
+ prelude::{col, lit, Expr},
+};
+use tracing::trace;
+
+use crate::{
+ handlers::http::alerts::{AlertState, ALERTS},
+ query::{TableScanVisitor, QUERY_SESSION},
+ rbac::{
+ map::SessionKey,
+ role::{Action, Permission},
+ Users,
+ },
+ utils::time::TimeRange,
+};
+
+use super::{AlertConfig, AlertError};
+
+async fn get_tables_from_query(query: &str) -> Result {
+ let session_state = QUERY_SESSION.state();
+ let raw_logical_plan = session_state.create_logical_plan(query).await?;
+
+ let mut visitor = TableScanVisitor::default();
+ let _ = raw_logical_plan.visit(&mut visitor);
+ Ok(visitor)
+}
+
+pub async fn user_auth_for_query(session_key: &SessionKey, query: &str) -> Result<(), AlertError> {
+ let tables = get_tables_from_query(query).await?;
+ let permissions = Users.get_permissions(session_key);
+
+ for table_name in tables.into_inner().iter() {
+ let mut authorized = false;
+
+ // in permission check if user can run query on the stream.
+ // also while iterating add any filter tags for this stream
+ for permission in permissions.iter() {
+ match permission {
+ Permission::Stream(Action::All, _) => {
+ authorized = true;
+ break;
+ }
+ Permission::StreamWithTag(Action::Query, ref stream, _)
+ if stream == table_name || stream == "*" =>
+ {
+ authorized = true;
+ }
+ _ => (),
+ }
+ }
+
+ if !authorized {
+ return Err(AlertError::Unauthorized);
+ }
+ }
+
+ Ok(())
+}
+
+/// This function contains the logic to run the alert evaluation task
+pub async fn evaluate_alert(alert: AlertConfig) -> Result<(), AlertError> {
+ println!("RUNNING EVAL TASK FOR- {alert:?}");
+
+ let (start_time, end_time) = match &alert.eval_type {
+ super::EvalConfig::RollingWindow(rolling_window) => {
+ (&rolling_window.eval_start, &rolling_window.eval_end)
+ }
+ };
+
+ let session_state = QUERY_SESSION.state();
+ let raw_logical_plan = session_state
+ .create_logical_plan(&alert.query)
+ .await
+ .unwrap();
+
+ // TODO: Filter tags should be taken care of!!!
+ let time_range = TimeRange::parse_human_time(start_time, end_time).unwrap();
+ let query = crate::query::Query {
+ raw_logical_plan,
+ time_range,
+ filter_tag: None,
+ };
+
+ // for now proceed in a similar fashion as we do in query
+ // TODO: in case of multiple table query does the selection of time partition make a difference? (especially when the tables don't have overlapping data)
+ let stream_name = query.first_table_name().unwrap();
+
+ let df = query.get_dataframe(stream_name).await.unwrap();
+
+ // let df = DataFrame::new(session_state, raw_logical_plan);
+
+ let mut expr = Expr::Literal(datafusion::scalar::ScalarValue::Boolean(Some(true)));
+ for threshold in &alert.thresholds {
+ let res = match threshold.operator {
+ crate::handlers::http::alerts::AlertOperator::GreaterThan => {
+ col(&threshold.column).gt(lit(threshold.value))
+ }
+ crate::handlers::http::alerts::AlertOperator::LessThan => {
+ col(&threshold.column).lt(lit(threshold.value))
+ }
+ crate::handlers::http::alerts::AlertOperator::EqualTo => {
+ col(&threshold.column).eq(lit(threshold.value))
+ }
+ crate::handlers::http::alerts::AlertOperator::NotEqualTo => {
+ col(&threshold.column).not_eq(lit(threshold.value))
+ }
+ crate::handlers::http::alerts::AlertOperator::GreaterThanEqualTo => {
+ col(&threshold.column).gt_eq(lit(threshold.value))
+ }
+ crate::handlers::http::alerts::AlertOperator::LessThanEqualTo => {
+ col(&threshold.column).lt_eq(lit(threshold.value))
+ }
+ crate::handlers::http::alerts::AlertOperator::Like => {
+ col(&threshold.column).like(lit(threshold.value))
+ }
+ crate::handlers::http::alerts::AlertOperator::NotLike => {
+ col(&threshold.column).not_like(lit(threshold.value))
+ }
+ };
+
+ expr = expr.and(res);
+ }
+
+ let nrows = df.clone().filter(expr).unwrap().count().await.unwrap();
+ trace!("dataframe-\n{:?}", df.collect().await);
+
+ if nrows > 0 {
+ trace!("ALERT!!!!!!");
+
+ // update state
+ ALERTS
+ .update_state(&alert.id.to_string(), AlertState::Triggered, true)
+ .await?;
+ } else {
+ ALERTS
+ .update_state(&alert.id.to_string(), AlertState::Resolved, false)
+ .await?;
+ }
+
+ Ok(())
+}
diff --git a/src/handlers/http/alerts/http_handlers.rs b/src/handlers/http/alerts/http_handlers.rs
new file mode 100644
index 000000000..28afd3fb6
--- /dev/null
+++ b/src/handlers/http/alerts/http_handlers.rs
@@ -0,0 +1,183 @@
+/*
+ * Parseable Server (C) 2022 - 2024 Parseable, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as
+ * published by the Free Software Foundation, either version 3 of the
+ * License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see .
+ *
+ */
+
+use crate::{
+ option::CONFIG,
+ storage::object_storage::alert_json_path,
+ sync::schedule_alert_task,
+ utils::{actix::extract_session_key_from_req, uid::Uid},
+};
+use actix_web::{web, HttpRequest, Responder};
+use bytes::Bytes;
+use tracing::warn;
+
+use super::{alerts_utils::user_auth_for_query, AlertConfig, AlertError, AlertState, ALERTS};
+
+// GET /alerts
+/// User needs at least a read access to the stream(s) that is being referenced in an alert
+/// Read all alerts then return alerts which satisfy the condition
+pub async fn list(req: HttpRequest) -> Result {
+ let session_key = extract_session_key_from_req(&req)?;
+ let alerts = ALERTS.list_alerts_for_user(session_key).await?;
+
+ Ok(web::Json(alerts))
+}
+
+// POST /alerts
+pub async fn post(req: HttpRequest, alert: AlertConfig) -> Result {
+ // validate the incoming alert query
+ // does the user have access to these tables or not?
+ let session_key = extract_session_key_from_req(&req).unwrap();
+ user_auth_for_query(&session_key, &alert.query).await?;
+
+ // now that we've validated that the user can run this query
+ // move on to saving the alert in ObjectStore
+ ALERTS.update(&alert).await;
+
+ let path = alert_json_path(&alert.id.to_string());
+
+ let store = CONFIG.storage().get_object_store();
+ let alert_bytes = serde_json::to_vec(&alert)?;
+ store.put_object(&path, Bytes::from(alert_bytes)).await?;
+
+ // create scheduled tasks
+ let (handle, rx, tx) = schedule_alert_task(alert.get_eval_frequency(), alert.clone()).await?;
+
+ ALERTS.update_task(alert.id, handle, rx, tx).await;
+
+ Ok(format!("alert created with ID- {}", alert.id))
+}
+
+// GET /alerts/{alert_id}
+pub async fn get(req: HttpRequest) -> Result {
+ let session_key = extract_session_key_from_req(&req)?;
+ let id = req
+ .match_info()
+ .get("alert_id")
+ .ok_or(AlertError::Metadata("No alert ID Provided"))?;
+
+ let alert = ALERTS.get_alert_by_id(session_key, id).await?;
+ Ok(web::Json(alert))
+}
+
+// DELETE /alerts/{alert_id}
+/// Deletion should happen from disk, sheduled tasks, then memory
+pub async fn delete(req: HttpRequest) -> Result {
+ let alert_id = req
+ .match_info()
+ .get("alert_id")
+ .ok_or(AlertError::Metadata("No alert ID Provided"))?;
+
+ // delete from disk and memory
+ ALERTS.delete(alert_id).await?;
+
+ // delete the scheduled task
+ ALERTS.delete_task(alert_id).await?;
+
+ Ok(format!("Deleted alert with ID- {alert_id}"))
+}
+
+// PUT /alerts/{alert_id}
+/// first save on disk, then in memory
+/// then modify scheduled task
+pub async fn modify(
+ req: HttpRequest,
+ mut alert: AlertConfig,
+) -> Result {
+ let session_key = extract_session_key_from_req(&req)?;
+ let alert_id = req
+ .match_info()
+ .get("alert_id")
+ .ok_or(AlertError::Metadata("No alert ID Provided"))?;
+
+ // ensure that the user doesn't unknowingly change the ID
+ if alert_id != alert.id.to_string() {
+ warn!("Alert modify request is trying to change Alert ID, reverting ID");
+ alert.id = Uid::from_string(alert_id)
+ .map_err(|_| AlertError::CustomError("Unable to get Uid from String".to_owned()))?;
+ }
+
+ // validate that the user has access to the tables mentioned
+ user_auth_for_query(&session_key, &alert.query).await?;
+
+ // // fetch the alert from this ID to get AlertState
+ // let state = ALERTS.get_alert_by_id(session_key, alert_id).await?.state;
+
+ let store = CONFIG.storage().get_object_store();
+
+ // modify on disk
+ store.put_alert(&alert.id.to_string(), &alert).await?;
+
+ // modify in memory
+ ALERTS.update(&alert).await;
+
+ // modify task
+ let (handle, rx, tx) = schedule_alert_task(alert.get_eval_frequency(), alert.clone()).await?;
+
+ ALERTS.update_task(alert.id, handle, rx, tx).await;
+
+ Ok(format!("Modified alert {}", alert.id))
+}
+
+// PUT /alerts/{alert_id}/update_state
+pub async fn update_state(req: HttpRequest, state: String) -> Result {
+ let alert_id = req
+ .match_info()
+ .get("alert_id")
+ .ok_or(AlertError::Metadata("No alert ID Provided"))?;
+
+ // get current state
+ let current_state = ALERTS.get_state(alert_id).await?;
+
+ let new_state: AlertState = serde_json::from_str(&state)?;
+
+ match current_state {
+ AlertState::Triggered => {
+ match new_state {
+ AlertState::Triggered => {
+ let msg = format!("Not allowed to manually go from Triggered to {new_state}");
+ return Err(AlertError::InvalidStateChange(msg));
+ }
+ _ => {
+ // update state on disk and in memory
+ ALERTS.update_state(alert_id, new_state, true).await?;
+ }
+ }
+ }
+ AlertState::Silenced => {
+ // from here, the user can only go to Resolved
+ match new_state {
+ AlertState::Resolved => {
+ // update state on disk and in memory
+ ALERTS.update_state(alert_id, new_state, true).await?;
+ }
+ _ => {
+ let msg = format!("Not allowed to manually go from Silenced to {new_state}");
+ return Err(AlertError::InvalidStateChange(msg));
+ }
+ }
+ }
+ AlertState::Resolved => {
+ // user shouldn't logically be changing states if current state is Resolved
+ let msg = format!("Not allowed to go manually from Resolved to {new_state}");
+ return Err(AlertError::InvalidStateChange(msg));
+ }
+ }
+
+ Ok("")
+}
diff --git a/src/handlers/http/alerts/mod.rs b/src/handlers/http/alerts/mod.rs
new file mode 100644
index 000000000..2a01d2bd9
--- /dev/null
+++ b/src/handlers/http/alerts/mod.rs
@@ -0,0 +1,538 @@
+/*
+ * Parseable Server (C) 2022 - 2024 Parseable, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as
+ * published by the Free Software Foundation, either version 3 of the
+ * License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see .
+ *
+ */
+
+use actix_web::http::header::ContentType;
+use actix_web::web::Json;
+use actix_web::{FromRequest, HttpRequest};
+use alerts_utils::user_auth_for_query;
+use async_trait::async_trait;
+use http::StatusCode;
+use once_cell::sync::Lazy;
+use serde_json::Error as SerdeError;
+use std::collections::HashMap;
+use std::fmt::{self, Display};
+use std::future::Future;
+use std::pin::Pin;
+use tokio::sync::oneshot::{Receiver, Sender};
+use tokio::sync::RwLock;
+use tokio::task::JoinHandle;
+use tracing::{trace, warn};
+use ulid::Ulid;
+
+pub mod alerts_utils;
+pub mod http_handlers;
+pub mod target;
+
+use crate::option::CONFIG;
+use crate::rbac::map::SessionKey;
+use crate::storage;
+use crate::storage::object_storage::alert_json_path;
+use crate::storage::ObjectStorageError;
+use crate::sync::schedule_alert_task;
+use crate::utils::uid;
+use crate::utils::uid::Uid;
+
+use self::target::Target;
+
+// these types describe the scheduled task for an alert
+pub type ScheduledTaskHandlers = (JoinHandle<()>, Receiver<()>, Sender<()>);
+pub type ScheduledTasks = RwLock>;
+
+pub static ALERTS: Lazy = Lazy::new(Alerts::default);
+
+#[derive(Debug, Default)]
+pub struct Alerts {
+ pub alerts: RwLock>,
+ pub scheduled_tasks: ScheduledTasks,
+}
+
+#[derive(Default, Debug, serde::Serialize, serde::Deserialize, Clone)]
+#[serde(rename_all = "lowercase")]
+pub enum AlertVerison {
+ #[default]
+ V1,
+}
+
+#[async_trait]
+pub trait CallableTarget {
+ async fn call(&self, payload: &Context);
+}
+
+#[derive(Debug, Clone)]
+pub struct Context {
+ alert_info: AlertInfo,
+ deployment_info: DeploymentInfo,
+}
+
+impl Context {
+ pub fn new(alert_info: AlertInfo, deployment_info: DeploymentInfo) -> Self {
+ Self {
+ alert_info,
+ deployment_info,
+ }
+ }
+
+ fn default_alert_string(&self) -> String {
+ format!(
+ "triggered on {}",
+ self.alert_info.alert_name,
+ // self.alert_info.message,
+ // self.alert_info.reason
+ )
+ }
+
+ fn default_resolved_string(&self) -> String {
+ format!("{} is now resolved ", self.alert_info.alert_name)
+ }
+
+ fn default_silenced_string(&self) -> String {
+ format!(
+ "Notifications for {} have been silenced ",
+ self.alert_info.alert_name
+ )
+ }
+}
+
+#[derive(Debug, Clone)]
+pub struct AlertInfo {
+ alert_id: String,
+ alert_name: String,
+ // message: String,
+ // reason: String,
+ alert_state: AlertState,
+}
+
+impl AlertInfo {
+ pub fn new(alert_id: String, alert_name: String, alert_state: AlertState) -> Self {
+ Self {
+ alert_id,
+ alert_name,
+ alert_state,
+ }
+ }
+}
+
+#[derive(Debug, Clone)]
+pub struct DeploymentInfo {
+ deployment_instance: String,
+ deployment_id: uid::Uid,
+ deployment_mode: String,
+}
+
+impl DeploymentInfo {
+ pub fn new(
+ deployment_instance: String,
+ deployment_id: uid::Uid,
+ deployment_mode: String,
+ ) -> Self {
+ Self {
+ deployment_instance,
+ deployment_id,
+ deployment_mode,
+ }
+ }
+}
+
+#[derive(Debug, serde::Serialize, serde::Deserialize, Clone)]
+#[serde(rename_all = "camelCase")]
+pub enum AlertType {
+ Threshold,
+}
+
+#[derive(Debug, serde::Serialize, serde::Deserialize, Clone)]
+#[serde(rename_all = "camelCase")]
+pub enum AlertOperator {
+ GreaterThan,
+ LessThan,
+ EqualTo,
+ NotEqualTo,
+ GreaterThanEqualTo,
+ LessThanEqualTo,
+ Like,
+ NotLike,
+}
+
+#[derive(Debug, serde::Serialize, serde::Deserialize, Clone)]
+#[serde(rename_all = "camelCase")]
+pub enum Aggregate {
+ Avg,
+ Count,
+ Min,
+ Max,
+ Sum,
+}
+
+#[derive(Debug, serde::Serialize, serde::Deserialize, Clone)]
+pub struct ThresholdConfig {
+ pub agg: Aggregate,
+ pub column: String,
+ pub operator: AlertOperator,
+ pub value: f32,
+}
+
+#[derive(Debug, serde::Serialize, serde::Deserialize, Clone)]
+#[serde(rename_all = "camelCase")]
+pub struct RollingWindow {
+ // x minutes (25m)
+ pub eval_start: String,
+ // should always be "now"
+ pub eval_end: String,
+ // x minutes (5m)
+ pub eval_frequency: u32,
+}
+
+#[derive(Debug, serde::Serialize, serde::Deserialize, Clone)]
+#[serde(rename_all = "camelCase")]
+pub enum EvalConfig {
+ RollingWindow(RollingWindow),
+}
+
+#[derive(Debug, serde::Serialize, serde::Deserialize, Clone)]
+#[serde(rename_all = "camelCase")]
+pub struct AlertEval {}
+
+#[derive(Debug, serde::Serialize, serde::Deserialize, Clone, Copy, PartialEq, Default)]
+#[serde(rename_all = "camelCase")]
+pub enum AlertState {
+ Triggered,
+ Silenced,
+ #[default]
+ Resolved,
+}
+
+impl Display for AlertState {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ match self {
+ AlertState::Triggered => write!(f, "Triggered"),
+ AlertState::Silenced => write!(f, "Silenced"),
+ AlertState::Resolved => write!(f, "Resolved"),
+ }
+ }
+}
+
+#[derive(Debug, serde::Serialize, serde::Deserialize, Clone)]
+#[serde(rename_all = "camelCase")]
+pub struct AlertConfig {
+ pub version: AlertVerison,
+ #[serde(default = "crate::utils::uid::gen")]
+ pub id: uid::Uid,
+ pub title: String,
+ pub query: String,
+ pub alert_type: AlertType,
+ pub thresholds: Vec,
+ pub eval_type: EvalConfig,
+ pub targets: Vec,
+ // for new alerts, state should be resolved
+ #[serde(default = "AlertState::default")]
+ pub state: AlertState,
+}
+
+impl FromRequest for AlertConfig {
+ type Error = actix_web::Error;
+ type Future = Pin>>>;
+
+ fn from_request(req: &HttpRequest, payload: &mut actix_web::dev::Payload) -> Self::Future {
+ let body = Json::::from_request(req, payload);
+ let fut = async move {
+ let body = body.await?.into_inner();
+ Ok(body)
+ };
+
+ Box::pin(fut)
+ }
+}
+
+impl AlertConfig {
+ pub fn get_eval_frequency(&self) -> u32 {
+ match &self.eval_type {
+ EvalConfig::RollingWindow(rolling_window) => rolling_window.eval_frequency,
+ }
+ }
+
+ fn get_context(&self, alert_state: AlertState) -> Context {
+ let deployment_instance = format!(
+ "{}://{}",
+ CONFIG.parseable.get_scheme(),
+ CONFIG.parseable.address
+ );
+ let deployment_id = storage::StorageMetadata::global().deployment_id;
+ let deployment_mode = storage::StorageMetadata::global().mode.to_string();
+
+ // let additional_labels =
+ // serde_json::to_value(rule).expect("rule is perfectly deserializable");
+ // let flatten_additional_labels =
+ // utils::json::flatten::flatten_with_parent_prefix(additional_labels, "rule", "_")
+ // .expect("can be flattened");
+
+ Context::new(
+ AlertInfo::new(self.id.to_string(), self.title.clone(), alert_state),
+ DeploymentInfo::new(deployment_instance, deployment_id, deployment_mode),
+ )
+ }
+
+ pub async fn trigger_notifications(&self) -> Result<(), AlertError> {
+ let context = self.get_context(self.state);
+ for target in &self.targets {
+ target.call(context.clone());
+ }
+ Ok(())
+ }
+}
+
+#[derive(Debug, thiserror::Error)]
+pub enum AlertError {
+ #[error("Storage Error: {0}")]
+ ObjectStorage(#[from] ObjectStorageError),
+ #[error("Serde Error: {0}")]
+ Serde(#[from] SerdeError),
+ #[error("Cannot perform this operation: {0}")]
+ Metadata(&'static str),
+ #[error("User is not authorized to run this query")]
+ Unauthorized,
+ #[error("ActixError: {0}")]
+ Error(#[from] actix_web::Error),
+ #[error("DataFusion Error: {0}")]
+ DatafusionError(#[from] datafusion::error::DataFusionError),
+ #[error("Error: {0}")]
+ CustomError(String),
+ #[error("Invalid State Change: {0}")]
+ InvalidStateChange(String),
+}
+
+impl actix_web::ResponseError for AlertError {
+ fn status_code(&self) -> StatusCode {
+ match self {
+ Self::ObjectStorage(_) => StatusCode::INTERNAL_SERVER_ERROR,
+ Self::Serde(_) => StatusCode::BAD_REQUEST,
+ Self::Metadata(_) => StatusCode::BAD_REQUEST,
+ Self::Unauthorized => StatusCode::BAD_REQUEST,
+ Self::Error(_) => StatusCode::INTERNAL_SERVER_ERROR,
+ Self::DatafusionError(_) => StatusCode::INTERNAL_SERVER_ERROR,
+ Self::CustomError(_) => StatusCode::BAD_REQUEST,
+ Self::InvalidStateChange(_) => StatusCode::BAD_REQUEST,
+ }
+ }
+
+ fn error_response(&self) -> actix_web::HttpResponse {
+ actix_web::HttpResponse::build(self.status_code())
+ .insert_header(ContentType::plaintext())
+ .body(self.to_string())
+ }
+}
+
+impl Alerts {
+ /// Loads alerts from disk
+ /// spawn scheduled tasks
+ pub async fn load(&self) -> Result<(), AlertError> {
+ let mut this = vec![];
+ let store = CONFIG.storage().get_object_store();
+ let all_alerts = store.get_alerts().await.unwrap_or_default();
+
+ for alert in all_alerts {
+ if alert.is_empty() {
+ continue;
+ }
+
+ let alert: AlertConfig = serde_json::from_slice(&alert)?;
+
+ let (handle, rx, tx) =
+ schedule_alert_task(alert.get_eval_frequency(), alert.clone()).await?;
+
+ self.update_task(alert.id, handle, rx, tx).await;
+
+ this.push(alert);
+ }
+
+ let mut s = self.alerts.write().await;
+ s.append(&mut this);
+
+ Ok(())
+ }
+
+ /// Returns a list of alerts that the user has access to (based on query auth)
+ pub async fn list_alerts_for_user(
+ &self,
+ session: SessionKey,
+ ) -> Result, AlertError> {
+ let mut alerts: Vec = Vec::new();
+ for alert in self.alerts.read().await.iter() {
+ // filter based on whether the user can execute this query or not
+ let query = &alert.query;
+ if user_auth_for_query(&session, query).await.is_ok() {
+ alerts.push(alert.to_owned());
+ }
+ }
+
+ Ok(alerts)
+ }
+
+ /// Returns a sigle alert that the user has access to (based on query auth)
+ pub async fn get_alert_by_id(
+ &self,
+ session: SessionKey,
+ id: &str,
+ ) -> Result {
+ let mut alert = None;
+ for a in self.alerts.read().await.iter() {
+ if a.id.to_string() == id {
+ let query = &a.query;
+ match user_auth_for_query(&session, query).await {
+ Ok(_) => {
+ alert = Some(a.clone());
+ break;
+ }
+ Err(err) => return Err(err),
+ }
+ }
+ }
+
+ if let Some(alert) = alert {
+ Ok(alert.clone())
+ } else {
+ Err(AlertError::CustomError(format!(
+ "No alert found for the given ID- {id}"
+ )))
+ }
+ }
+
+ /// Update the in-mem vector of alerts
+ pub async fn update(&self, alert: &AlertConfig) {
+ let mut s = self.alerts.write().await;
+ s.retain(|a| a.id != alert.id);
+ s.push(alert.clone());
+ }
+
+ /// Update the state of alert
+ pub async fn update_state(
+ &self,
+ alert_id: &str,
+ new_state: AlertState,
+ trigger_notif: bool,
+ ) -> Result<(), AlertError> {
+ let store = CONFIG.storage().get_object_store();
+ let alert_path = alert_json_path(alert_id);
+
+ // read and modify alert
+ let mut alert: AlertConfig = serde_json::from_slice(&store.get_object(&alert_path).await?)?;
+ alert.state = new_state;
+
+ // save to disk
+ store.put_alert(alert_id, &alert).await?;
+
+ // modify in memory
+ self.alerts.write().await.iter_mut().for_each(|alert| {
+ if alert.id.to_string() == alert_id {
+ alert.state = new_state;
+ }
+ });
+
+ if trigger_notif {
+ alert.trigger_notifications().await?;
+ }
+
+ Ok(())
+ }
+
+ /// Remove alert and scheduled task from disk and memory
+ pub async fn delete(&self, alert_id: &str) -> Result<(), AlertError> {
+ let store = CONFIG.storage().get_object_store();
+ let alert_path = alert_json_path(alert_id);
+
+ // delete from disk
+ store
+ .delete_object(&alert_path)
+ .await
+ .map_err(AlertError::ObjectStorage)?;
+ trace!("Deleted from disk");
+
+ // now delete from memory
+ let read_access = self.alerts.read().await;
+
+ let index = read_access
+ .iter()
+ .enumerate()
+ .find(|(_, alert)| alert.id.to_string() == alert_id)
+ .to_owned();
+
+ if let Some((index, _)) = index {
+ // drop the read access in order to get exclusive write access
+ drop(read_access);
+ self.alerts.write().await.remove(index);
+ trace!("removed alert from memory");
+ } else {
+ warn!("Alert ID- {alert_id} not found in memory!");
+ }
+ Ok(())
+ }
+
+ /// Get state of alert using alert_id
+ pub async fn get_state(&self, alert_id: &str) -> Result {
+ let read_access = self.alerts.read().await;
+ let alert = read_access.iter().find(|a| a.id.to_string() == alert_id);
+
+ if let Some(alert) = alert {
+ Ok(alert.state)
+ } else {
+ let msg = format!("No alert present for ID- {alert_id}");
+ Err(AlertError::CustomError(msg))
+ }
+ }
+
+ /// Update the scheduled alert tasks in-memory map
+ pub async fn update_task(
+ &self,
+ id: Uid,
+ handle: JoinHandle<()>,
+ rx: Receiver<()>,
+ tx: Sender<()>,
+ ) {
+ let mut s = self.scheduled_tasks.write().await;
+ s.insert(id, (handle, rx, tx));
+ }
+
+ /// Remove a scheduled alert task
+ pub async fn delete_task(&self, alert_id: &str) -> Result<(), AlertError> {
+ let read_access = self.scheduled_tasks.read().await;
+
+ let hashed_object = read_access
+ .iter()
+ .find(|(id, _)| id.to_string() == alert_id);
+
+ if hashed_object.is_some() {
+ // drop the read access in order to get exclusive write access
+ drop(read_access);
+
+ // now delete from hashmap
+ let removed =
+ self.scheduled_tasks
+ .write()
+ .await
+ .remove(&Ulid::from_string(alert_id).map_err(|_| {
+ AlertError::CustomError("Unable to decode Ulid".to_owned())
+ })?);
+
+ if removed.is_none() {
+ trace!("Unable to remove alert task {alert_id} from hashmap");
+ }
+ } else {
+ trace!("Alert task {alert_id} not found in hashmap");
+ }
+
+ Ok(())
+ }
+}
diff --git a/src/alerts/parser.rs b/src/handlers/http/alerts/parser.rs
similarity index 99%
rename from src/alerts/parser.rs
rename to src/handlers/http/alerts/parser.rs
index 562c14b07..98f9ca871 100644
--- a/src/alerts/parser.rs
+++ b/src/handlers/http/alerts/parser.rs
@@ -229,7 +229,7 @@ impl FromStr for CompositeRule {
mod tests {
use std::str::FromStr;
- use crate::alerts::rule::{
+ use crate::handlers::http::alerts::rule::{
base::{
ops::{NumericOperator, StringOperator},
NumericRule, StringRule,
@@ -237,6 +237,8 @@ mod tests {
CompositeRule,
};
+
+
#[test]
fn test_and_or_not() {
let input = r#"key=500 and key="value" or !(key=300)"#;
diff --git a/src/alerts/rule.rs b/src/handlers/http/alerts/rule.rs
similarity index 100%
rename from src/alerts/rule.rs
rename to src/handlers/http/alerts/rule.rs
diff --git a/src/alerts/target.rs b/src/handlers/http/alerts/target.rs
similarity index 72%
rename from src/alerts/target.rs
rename to src/handlers/http/alerts/target.rs
index c7e2c7586..3ab7014c7 100644
--- a/src/alerts/target.rs
+++ b/src/handlers/http/alerts/target.rs
@@ -28,9 +28,9 @@ use chrono::Utc;
use http::{header::AUTHORIZATION, HeaderMap, HeaderValue};
use humantime_serde::re::humantime;
use reqwest::ClientBuilder;
-use tracing::error;
+use tracing::{error, trace};
-use crate::utils::json;
+use crate::handlers::http::alerts::ALERTS;
use super::{AlertState, CallableTarget, Context};
@@ -42,7 +42,13 @@ pub enum Retry {
Finite(usize),
}
-#[derive(Debug, serde::Serialize, serde::Deserialize)]
+impl Default for Retry {
+ fn default() -> Self {
+ Retry::Finite(1)
+ }
+}
+
+#[derive(Debug, serde::Serialize, serde::Deserialize, Clone)]
#[serde(rename_all = "lowercase")]
#[serde(try_from = "TargetVerifier")]
pub struct Target {
@@ -57,11 +63,12 @@ impl Target {
let timeout = &self.timeout;
let resolves = context.alert_info.alert_state;
let mut state = timeout.state.lock().unwrap();
+ state.alert_state = resolves;
match resolves {
- AlertState::SetToFiring => {
- state.alert_state = AlertState::Firing;
+ AlertState::Triggered => {
if !state.timed_out {
+ trace!("state not timed out- {state:?}");
// set state
state.timed_out = true;
state.awaiting_resolve = true;
@@ -70,8 +77,8 @@ impl Target {
call_target(self.target.clone(), context)
}
}
- AlertState::Resolved => {
- state.alert_state = AlertState::Listening;
+ alert_state @ (AlertState::Resolved | AlertState::Silenced) => {
+ state.alert_state = alert_state;
if state.timed_out {
// if in timeout and resolve came in, only process if it's the first one ( awaiting resolve )
if state.awaiting_resolve {
@@ -84,42 +91,51 @@ impl Target {
call_target(self.target.clone(), context);
}
- _ => unreachable!(),
}
}
- fn spawn_timeout_task(&self, repeat: &Timeout, alert_context: Context) {
- let state = Arc::clone(&repeat.state);
- let retry = repeat.times;
- let timeout = repeat.interval;
+ fn spawn_timeout_task(&self, target_timeout: &Timeout, alert_context: Context) {
+ trace!("repeat-\n{target_timeout:?}\ncontext-\n{alert_context:?}");
+ let state = Arc::clone(&target_timeout.state);
+ let retry = target_timeout.times;
+ let timeout = target_timeout.interval;
let target = self.target.clone();
+ let alert_id = alert_context.alert_info.alert_id.clone();
- let sleep_and_check_if_call = move |timeout_state: Arc>| {
- async move {
- tokio::time::sleep(timeout).await;
- let mut state = timeout_state.lock().unwrap();
- if state.alert_state == AlertState::Firing {
- // it is still firing .. sleep more and come back
- state.awaiting_resolve = true;
- true
- } else {
- state.timed_out = false;
- false
+ let sleep_and_check_if_call =
+ move |timeout_state: Arc>, current_state: AlertState| {
+ async move {
+ tokio::time::sleep(timeout).await;
+
+ let mut state = timeout_state.lock().unwrap();
+
+ if current_state == AlertState::Triggered {
+ // it is still firing .. sleep more and come back
+ state.awaiting_resolve = true;
+ true
+ } else {
+ state.timed_out = false;
+ false
+ }
}
- }
- };
+ };
- actix_web::rt::spawn(async move {
+ trace!("Spawning retry task");
+ tokio::spawn(async move {
match retry {
Retry::Infinite => loop {
- let should_call = sleep_and_check_if_call(Arc::clone(&state)).await;
+ let current_state = ALERTS.get_state(&alert_id).await.unwrap();
+ let should_call =
+ sleep_and_check_if_call(Arc::clone(&state), current_state).await;
if should_call {
call_target(target.clone(), alert_context.clone())
}
},
Retry::Finite(times) => {
for _ in 0..times {
- let should_call = sleep_and_check_if_call(Arc::clone(&state)).await;
+ let current_state = ALERTS.get_state(&alert_id).await.unwrap();
+ let should_call =
+ sleep_and_check_if_call(Arc::clone(&state), current_state).await;
if should_call {
call_target(target.clone(), alert_context.clone())
}
@@ -128,9 +144,9 @@ impl Target {
// Stream might be dead and sending too many alerts is not great
// Send and alert stating that this alert will only work once it has seen a RESOLVE
state.lock().unwrap().timed_out = false;
- let mut context = alert_context;
- context.alert_info.message = format!(
- "Triggering alert did not resolve itself after {times} retries, This alert is paused until it resolves");
+ let context = alert_context;
+ // context.alert_info.message = format!(
+ // "Triggering alert did not resolve itself after {times} retries, This alert is paused until it resolves");
// Send and exit this task.
call_target(target, context);
}
@@ -140,7 +156,7 @@ impl Target {
}
fn call_target(target: TargetType, context: Context) {
- actix_web::rt::spawn(async move { target.call(&context).await });
+ tokio::spawn(async move { target.call(&context).await });
}
#[derive(Debug, serde::Deserialize)]
@@ -230,13 +246,15 @@ impl CallableTarget for SlackWebHook {
.expect("Client can be constructed on this system");
let alert = match payload.alert_info.alert_state {
- AlertState::SetToFiring => {
+ AlertState::Triggered => {
serde_json::json!({ "text": payload.default_alert_string() })
}
AlertState::Resolved => {
serde_json::json!({ "text": payload.default_resolved_string() })
}
- _ => unreachable!(),
+ AlertState::Silenced => {
+ serde_json::json!({ "text": payload.default_silenced_string() })
+ }
};
if let Err(e) = client.post(&self.endpoint).json(&alert).send().await {
@@ -268,9 +286,9 @@ impl CallableTarget for OtherWebHook {
.expect("Client can be constructed on this system");
let alert = match payload.alert_info.alert_state {
- AlertState::SetToFiring => payload.default_alert_string(),
+ AlertState::Triggered => payload.default_alert_string(),
AlertState::Resolved => payload.default_resolved_string(),
- _ => unreachable!(),
+ AlertState::Silenced => payload.default_silenced_string(),
};
let request = client
@@ -318,33 +336,33 @@ impl CallableTarget for AlertManager {
let mut alerts = serde_json::json!([{
"labels": {
"alertname": payload.alert_info.alert_name,
- "stream": payload.stream,
+ // "stream": payload.stream,
"deployment_instance": payload.deployment_info.deployment_instance,
"deployment_id": payload.deployment_info.deployment_id,
"deployment_mode": payload.deployment_info.deployment_mode
},
"annotations": {
- "message": payload.alert_info.message,
- "reason": payload.alert_info.reason
+ "message": "MESSAGE",
+ "reason": "REASON"
}
}]);
let alert = &mut alerts[0];
- alert["labels"].as_object_mut().expect("is object").extend(
- payload
- .additional_labels
- .as_object()
- .expect("is object")
- .iter()
- // filter non null values for alertmanager and only pass strings
- .filter(|(_, value)| !value.is_null())
- .map(|(k, value)| (k.to_owned(), json::convert_to_string(value))),
- );
+ // alert["labels"].as_object_mut().expect("is object").extend(
+ // payload
+ // .additional_labels
+ // .as_object()
+ // .expect("is object")
+ // .iter()
+ // // filter non null values for alertmanager and only pass strings
+ // .filter(|(_, value)| !value.is_null())
+ // .map(|(k, value)| (k.to_owned(), json::convert_to_string(value))),
+ // );
// fill in status label accordingly
match payload.alert_info.alert_state {
- AlertState::SetToFiring => alert["labels"]["status"] = "firing".into(),
+ AlertState::Triggered => alert["labels"]["status"] = "triggered".into(),
AlertState::Resolved => {
alert["labels"]["status"] = "resolved".into();
alert["annotations"]["reason"] =
@@ -353,7 +371,14 @@ impl CallableTarget for AlertManager {
.to_rfc3339_opts(chrono::SecondsFormat::Millis, true)
.into();
}
- _ => unreachable!(),
+ AlertState::Silenced => {
+ alert["labels"]["status"] = "silenced".into();
+ alert["annotations"]["reason"] =
+ serde_json::Value::String(payload.default_silenced_string());
+ // alert["endsAt"] = Utc::now()
+ // .to_rfc3339_opts(chrono::SecondsFormat::Millis, true)
+ // .into();
+ }
};
if let Err(e) = client.post(&self.endpoint).json(&alerts).send().await {
@@ -362,10 +387,11 @@ impl CallableTarget for AlertManager {
}
}
-#[derive(Debug, serde::Serialize, serde::Deserialize)]
+#[derive(Debug, serde::Serialize, serde::Deserialize, Clone)]
pub struct Timeout {
#[serde(with = "humantime_serde")]
pub interval: Duration,
+ #[serde(default = "Retry::default")]
pub times: Retry,
#[serde(skip)]
pub state: Arc>,
@@ -374,8 +400,8 @@ pub struct Timeout {
impl Default for Timeout {
fn default() -> Self {
Self {
- interval: Duration::from_secs(200),
- times: Retry::Finite(5),
+ interval: Duration::from_secs(60),
+ times: Retry::default(),
state: Arc::>::default(),
}
}
diff --git a/src/handlers/http/logstream.rs b/src/handlers/http/logstream.rs
index 4321e42e9..483cc21f0 100644
--- a/src/handlers/http/logstream.rs
+++ b/src/handlers/http/logstream.rs
@@ -24,7 +24,6 @@ use super::modal::utils::logstream_utils::{
create_stream_and_schema_from_storage, create_update_stream,
};
use super::query::update_schema_when_distributed;
-use crate::alerts::Alerts;
use crate::catalog::get_first_event;
use crate::event::format::update_data_type_to_datetime;
use crate::handlers::STREAM_TYPE_KEY;
@@ -129,7 +128,7 @@ pub async fn schema(req: HttpRequest) -> Result {
}
Err(err) => return Err(StreamError::from(err)),
};
- match update_schema_when_distributed(vec![stream_name.clone()]).await {
+ match update_schema_when_distributed(&vec![stream_name.clone()]).await {
Ok(_) => {
let schema = STREAM_INFO.schema(&stream_name)?;
Ok((web::Json(schema), StatusCode::OK))
@@ -141,39 +140,6 @@ pub async fn schema(req: HttpRequest) -> Result {
}
}
-pub async fn get_alert(req: HttpRequest) -> Result {
- let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap();
-
- let alerts = metadata::STREAM_INFO
- .read()
- .expect(metadata::LOCK_EXPECT)
- .get(&stream_name)
- .map(|metadata| {
- serde_json::to_value(&metadata.alerts).expect("alerts can serialize to valid json")
- });
-
- let mut alerts = match alerts {
- Some(alerts) => alerts,
- None => {
- let alerts = CONFIG
- .storage()
- .get_object_store()
- .get_alerts(&stream_name)
- .await?;
-
- if alerts.alerts.is_empty() {
- return Err(StreamError::NoAlertsSet);
- }
-
- serde_json::to_value(alerts).expect("alerts can serialize to valid json")
- }
- };
-
- remove_id_from_alerts(&mut alerts);
-
- Ok((web::Json(alerts), StatusCode::OK))
-}
-
pub async fn put_stream(req: HttpRequest, body: Bytes) -> Result {
let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap();
@@ -182,73 +148,6 @@ pub async fn put_stream(req: HttpRequest, body: Bytes) -> Result,
-) -> Result {
- let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap();
-
- let mut body = body.into_inner();
- remove_id_from_alerts(&mut body);
-
- let alerts: Alerts = match serde_json::from_value(body) {
- Ok(alerts) => alerts,
- Err(err) => {
- return Err(StreamError::BadAlertJson {
- stream: stream_name,
- err,
- })
- }
- };
-
- validator::alert(&alerts)?;
-
- if !STREAM_INFO.stream_initialized(&stream_name)? {
- // For query mode, if the stream not found in memory map,
- //check if it exists in the storage
- //create stream and schema from storage
- if CONFIG.parseable.mode == Mode::Query {
- match create_stream_and_schema_from_storage(&stream_name).await {
- Ok(true) => {}
- Ok(false) | Err(_) => return Err(StreamError::StreamNotFound(stream_name.clone())),
- }
- } else {
- return Err(StreamError::UninitializedLogstream);
- }
- }
-
- let schema = STREAM_INFO.schema(&stream_name)?;
- for alert in &alerts.alerts {
- for column in alert.message.extract_column_names() {
- let is_valid = alert.message.valid(&schema, column);
- if !is_valid {
- return Err(StreamError::InvalidAlertMessage(
- alert.name.to_owned(),
- column.to_string(),
- ));
- }
- if !alert.rule.valid_for_schema(&schema) {
- return Err(StreamError::InvalidAlert(alert.name.to_owned()));
- }
- }
- }
-
- CONFIG
- .storage()
- .get_object_store()
- .put_alerts(&stream_name, &alerts)
- .await?;
-
- metadata::STREAM_INFO
- .set_alert(&stream_name, alerts)
- .expect("alerts set on existing stream");
-
- Ok((
- format!("set alert configuration for log stream {stream_name}"),
- StatusCode::OK,
- ))
-}
-
pub async fn get_retention(req: HttpRequest) -> Result {
let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap();
if !STREAM_INFO.stream_exists(&stream_name) {
@@ -457,17 +356,6 @@ pub fn first_event_at_empty(stream_name: &str) -> bool {
true
}
-fn remove_id_from_alerts(value: &mut Value) {
- if let Some(Value::Array(alerts)) = value.get_mut("alerts") {
- alerts
- .iter_mut()
- .map_while(|alert| alert.as_object_mut())
- .for_each(|map| {
- map.remove("id");
- });
- }
-}
-
pub async fn create_stream(
stream_name: String,
time_partition: &str,
diff --git a/src/handlers/http/mod.rs b/src/handlers/http/mod.rs
index f627b613a..217f588ba 100644
--- a/src/handlers/http/mod.rs
+++ b/src/handlers/http/mod.rs
@@ -28,6 +28,7 @@ use crate::{option::CONFIG, storage::STREAM_ROOT_DIRECTORY};
use self::{cluster::get_ingestor_info, query::Query};
pub mod about;
+pub mod alerts;
pub mod cluster;
pub mod health_check;
pub mod ingest;
diff --git a/src/handlers/http/modal/query_server.rs b/src/handlers/http/modal/query_server.rs
index 792bb6571..f3e1e369c 100644
--- a/src/handlers/http/modal/query_server.rs
+++ b/src/handlers/http/modal/query_server.rs
@@ -17,11 +17,13 @@
*/
use crate::handlers::airplane;
+use crate::handlers::http::alerts::ALERTS;
+use crate::handlers::http::base_path;
+use crate::handlers::http::caching_removed;
use crate::handlers::http::cluster::{self, init_cluster_metrics_schedular};
use crate::handlers::http::logstream::create_internal_stream_if_not_exists;
use crate::handlers::http::middleware::{DisAllowRootUser, RouteExt};
use crate::handlers::http::{self, role};
-use crate::handlers::http::{base_path, caching_removed};
use crate::handlers::http::{logstream, MAX_EVENT_PAYLOAD_SIZE};
use crate::hottier::HotTierManager;
use crate::rbac::role::Action;
@@ -94,8 +96,18 @@ impl ParseableServer for QueryServer {
//create internal stream at server start
create_internal_stream_if_not_exists().await?;
- FILTERS.load().await?;
- DASHBOARDS.load().await?;
+ if let Err(err) = FILTERS.load().await {
+ error!("{err}")
+ };
+
+ if let Err(err) = DASHBOARDS.load().await {
+ error!("{err}")
+ };
+
+ if let Err(err) = ALERTS.load().await {
+ error!("{err}")
+ };
+
// track all parquet files already in the data directory
storage::retention::load_retention_from_global();
@@ -280,21 +292,21 @@ impl QueryServer {
.authorize_for_stream(Action::GetStreamInfo),
),
)
- .service(
- web::resource("/alert")
- // PUT "/logstream/{logstream}/alert" ==> Set alert for given log stream
- .route(
- web::put()
- .to(logstream::put_alert)
- .authorize_for_stream(Action::PutAlert),
- )
- // GET "/logstream/{logstream}/alert" ==> Get alert for given log stream
- .route(
- web::get()
- .to(logstream::get_alert)
- .authorize_for_stream(Action::GetAlert),
- ),
- )
+ // .service(
+ // web::resource("/alert")
+ // // PUT "/logstream/{logstream}/alert" ==> Set alert for given log stream
+ // .route(
+ // web::put()
+ // .to(logstream::put_alert)
+ // .authorize_for_stream(Action::PutAlert),
+ // )
+ // // GET "/logstream/{logstream}/alert" ==> Get alert for given log stream
+ // .route(
+ // web::get()
+ // .to(logstream::get_alert)
+ // .authorize_for_stream(Action::GetAlert),
+ // ),
+ // )
.service(
// GET "/logstream/{logstream}/schema" ==> Get schema for given log stream
web::resource("/schema").route(
diff --git a/src/handlers/http/modal/server.rs b/src/handlers/http/modal/server.rs
index 6c0ec9fd8..37d2775a8 100644
--- a/src/handlers/http/modal/server.rs
+++ b/src/handlers/http/modal/server.rs
@@ -19,6 +19,8 @@
use crate::analytics;
use crate::handlers;
use crate::handlers::http::about;
+use crate::handlers::http::alerts;
+use crate::handlers::http::alerts::ALERTS;
use crate::handlers::http::base_path;
use crate::handlers::http::caching_removed;
use crate::handlers::http::health_check;
@@ -81,7 +83,8 @@ impl ParseableServer for Server {
.service(Self::get_llm_webscope())
.service(Self::get_oauth_webscope(oidc_client))
.service(Self::get_user_role_webscope())
- .service(Self::get_metrics_webscope()),
+ .service(Self::get_metrics_webscope())
+ .service(Self::get_alerts_webscope()),
)
.service(Self::get_ingest_otel_factory())
.service(Self::get_generated());
@@ -102,8 +105,17 @@ impl ParseableServer for Server {
migration::run_migration(&CONFIG).await?;
- FILTERS.load().await?;
- DASHBOARDS.load().await?;
+ if let Err(err) = FILTERS.load().await {
+ error!("{err}")
+ };
+
+ if let Err(err) = DASHBOARDS.load().await {
+ error!("{err}")
+ };
+
+ if let Err(err) = ALERTS.load().await {
+ error!("{err}")
+ };
storage::retention::load_retention_from_global();
@@ -172,6 +184,48 @@ impl Server {
)
}
+ pub fn get_alerts_webscope() -> Scope {
+ web::scope("/alerts")
+ .service(
+ web::resource("")
+ .route(
+ web::get()
+ .to(alerts::http_handlers::list)
+ .authorize(Action::GetAlert),
+ )
+ .route(
+ web::post()
+ .to(alerts::http_handlers::post)
+ .authorize(Action::PutAlert),
+ ),
+ )
+ .service(
+ web::resource("/{alert_id}")
+ .route(
+ web::get()
+ .to(alerts::http_handlers::get)
+ .authorize(Action::GetAlert),
+ )
+ .route(
+ web::put()
+ .to(alerts::http_handlers::modify)
+ .authorize(Action::PutAlert),
+ )
+ .route(
+ web::delete()
+ .to(alerts::http_handlers::delete)
+ .authorize(Action::DeleteAlert),
+ ),
+ )
+ .service(
+ web::resource("/{alert_id}/update_state").route(
+ web::put()
+ .to(alerts::http_handlers::update_state)
+ .authorize(Action::PutAlert),
+ ),
+ )
+ }
+
// get the dashboards web scope
pub fn get_dashboards_webscope() -> Scope {
web::scope("/dashboards")
@@ -305,21 +359,6 @@ impl Server {
.authorize_for_stream(Action::GetStreamInfo),
),
)
- .service(
- web::resource("/alert")
- // PUT "/logstream/{logstream}/alert" ==> Set alert for given log stream
- .route(
- web::put()
- .to(logstream::put_alert)
- .authorize_for_stream(Action::PutAlert),
- )
- // GET "/logstream/{logstream}/alert" ==> Get alert for given log stream
- .route(
- web::get()
- .to(logstream::get_alert)
- .authorize_for_stream(Action::GetAlert),
- ),
- )
.service(
// GET "/logstream/{logstream}/schema" ==> Get schema for given log stream
web::resource("/schema").route(
diff --git a/src/handlers/http/query.rs b/src/handlers/http/query.rs
index 27414b9d0..64c8244a7 100644
--- a/src/handlers/http/query.rs
+++ b/src/handlers/http/query.rs
@@ -29,7 +29,7 @@ use std::collections::HashMap;
use std::pin::Pin;
use std::sync::Arc;
use std::time::Instant;
-use tracing::error;
+use tracing::{error, trace};
use crate::event::error::EventError;
use crate::handlers::http::fetch_schema;
@@ -85,12 +85,12 @@ pub async fn query(req: HttpRequest, query_request: Query) -> Result Result Result) -> Result<(), QueryError> {
+pub async fn update_schema_when_distributed(tables: &Vec) -> Result<(), QueryError> {
if CONFIG.parseable.mode == Mode::Query {
for table in tables {
- if let Ok(new_schema) = fetch_schema(&table).await {
+ if let Ok(new_schema) = fetch_schema(table).await {
// commit schema merges the schema internally and updates the schema in storage.
- commit_schema_to_storage(&table, new_schema.clone()).await?;
+ commit_schema_to_storage(table, new_schema.clone()).await?;
- commit_schema(&table, Arc::new(new_schema))?;
+ commit_schema(table, Arc::new(new_schema))?;
}
}
}
@@ -156,38 +156,42 @@ pub async fn create_streams_for_querier() {
pub fn authorize_and_set_filter_tags(
query: &mut LogicalQuery,
permissions: Vec,
- table_name: &str,
+ tables: &Vec,
) -> Result<(), QueryError> {
// check authorization of this query if it references physical table;
let mut authorized = false;
- let mut tags = Vec::new();
-
- // in permission check if user can run query on the stream.
- // also while iterating add any filter tags for this stream
- for permission in permissions {
- match permission {
- Permission::Stream(Action::All, _) => {
- authorized = true;
- break;
- }
- Permission::StreamWithTag(Action::Query, ref stream, tag)
- if stream == table_name || stream == "*" =>
- {
- authorized = true;
- if let Some(tag) = tag {
- tags.push(tag)
+
+ trace!("table names in auth- {tables:?}");
+ for table_name in tables.iter() {
+ let mut tags = Vec::new();
+
+ // in permission check if user can run query on the stream.
+ // also while iterating add any filter tags for this stream
+ for permission in &permissions {
+ match permission {
+ Permission::Stream(Action::All, _) => {
+ authorized = true;
+ break;
}
+ Permission::StreamWithTag(Action::Query, ref stream, tag)
+ if stream == table_name || stream == "*" =>
+ {
+ authorized = true;
+ if let Some(tag) = tag {
+ tags.push(tag.clone())
+ }
+ }
+ _ => (),
}
- _ => (),
}
- }
- if !authorized {
- return Err(QueryError::Unauthorized);
- }
+ if !authorized {
+ return Err(QueryError::Unauthorized);
+ }
- if !tags.is_empty() {
- query.filter_tag = Some(tags)
+ if !tags.is_empty() {
+ query.filter_tag = Some(tags)
+ }
}
Ok(())
diff --git a/src/lib.rs b/src/lib.rs
index ddb7f8244..a3e2110ac 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -17,7 +17,6 @@
*/
mod about;
-mod alerts;
pub mod analytics;
pub mod banner;
mod catalog;
diff --git a/src/metadata.rs b/src/metadata.rs
index 1fe01034c..4fee6cb4d 100644
--- a/src/metadata.rs
+++ b/src/metadata.rs
@@ -16,7 +16,6 @@
*
*/
-use arrow_array::RecordBatch;
use arrow_schema::{DataType, Field, Fields, Schema, TimeUnit};
use chrono::{Local, NaiveDateTime};
use itertools::Itertools;
@@ -25,8 +24,7 @@ use serde_json::Value;
use std::collections::HashMap;
use std::sync::{Arc, RwLock};
-use self::error::stream_info::{CheckAlertError, LoadError, MetadataError};
-use crate::alerts::Alerts;
+use self::error::stream_info::{LoadError, MetadataError};
use crate::metrics::{
fetch_stats_from_storage, EVENTS_INGESTED, EVENTS_INGESTED_DATE, EVENTS_INGESTED_SIZE,
EVENTS_INGESTED_SIZE_DATE, EVENTS_STORAGE_SIZE_DATE, LIFETIME_EVENTS_INGESTED,
@@ -47,7 +45,6 @@ pub struct StreamInfo(RwLock>);
#[derive(Debug, Default)]
pub struct LogStreamMetadata {
pub schema: HashMap>,
- pub alerts: Alerts,
pub retention: Option,
pub cache_enabled: bool,
pub created_at: String,
@@ -70,32 +67,11 @@ pub const LOCK_EXPECT: &str = "no method in metadata should panic while holding
// 4. When first event is sent to stream (update the schema)
// 5. When set alert API is called (update the alert)
impl StreamInfo {
- pub async fn check_alerts(
- &self,
- stream_name: &str,
- rb: &RecordBatch,
- ) -> Result<(), CheckAlertError> {
- let map = self.read().expect(LOCK_EXPECT);
- let meta = map
- .get(stream_name)
- .ok_or(MetadataError::StreamMetaNotFound(stream_name.to_owned()))?;
-
- for alert in &meta.alerts.alerts {
- alert.check_alert(stream_name, rb.clone())
- }
-
- Ok(())
- }
-
pub fn stream_exists(&self, stream_name: &str) -> bool {
let map = self.read().expect(LOCK_EXPECT);
map.contains_key(stream_name)
}
- pub fn stream_initialized(&self, stream_name: &str) -> Result {
- Ok(!self.schema(stream_name)?.fields.is_empty())
- }
-
pub fn get_first_event(&self, stream_name: &str) -> Result