Query Server Impl WIP #694

Closed
60 changes: 60 additions & 0 deletions server/src/handlers/http.rs
@@ -17,6 +17,10 @@
*/

use actix_cors::Cors;
use arrow_schema::Schema;
use serde_json::Value;

use self::{modal::query_server::QueryServer, query::Query};

pub(crate) mod about;
pub(crate) mod health_check;
@@ -55,3 +59,59 @@ pub(crate) fn cross_origin_config() -> Cors {
pub fn base_path_without_preceding_slash() -> String {
base_path().trim_start_matches('/').to_string()
}

pub async fn fetch_schema(stream_name: &str) -> anyhow::Result<arrow_schema::Schema> {
    let mut schemas = vec![];
    let ingester_infos = QueryServer::get_ingester_info().await?;

    for info in ingester_infos {
        // TODO: when you rebase the code from the Cluster Info PR, update this uri generation
        let uri = format!("{}api/v1/logstream/{}/schema", info.domain_name, stream_name);
        let reqw = reqwest::Client::new()
            .get(uri)
            .header(http::header::AUTHORIZATION, info.token.clone())
            .header(http::header::CONTENT_TYPE, "application/json")
            .send()
            .await?;

        // an ingester that does not answer successfully is skipped
        // instead of failing the whole merge
        if reqw.status().is_success() {
            let schema: Schema = serde_json::from_slice(&reqw.bytes().await?)?;
            schemas.push(schema);
        }
    }

    // merge the per-ingester schemas into one unified schema for the stream
    let new_schema = Schema::try_merge(schemas)?;

    Ok(new_schema)
}

pub async fn send_query_request_to_ingester(query: &Query) -> anyhow::Result<Vec<Value>> {
    // fan the query request out to every ingester and collect the responses
    let mut res = vec![];
    let ingester_infos = QueryServer::get_ingester_info().await?;

    for info in ingester_infos.iter() {
        // TODO: when you rebase the code from the Cluster Info PR, update this uri generation
        let uri = format!("{}api/v1/{}", info.domain_name, "query");
        let reqw = reqwest::Client::new()
            .post(uri)
            .json(query)
            .header(http::header::AUTHORIZATION, info.token.clone())
            .header(http::header::CONTENT_TYPE, "application/json")
            .send()
            .await?;

        if reqw.status().is_success() {
            let v: Value = serde_json::from_slice(&reqw.bytes().await?)?;
            // each ingester returns an array of json objects,
            // so flatten them all into a single result set
            if let Some(arr) = v.as_array() {
                for val in arr {
                    res.push(val.to_owned());
                }
            }
        }
    }

    Ok(res)
}
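For context, the two helpers above compose into the query node's fan-out path: merge every ingester's schema first, then broadcast the query and flatten the per-ingester arrays. A minimal driver sketch, not part of this diff — the function name, the stream name "app_logs", and the camelCase query JSON are illustrative assumptions based on the Query serde attributes shown later in this PR:

// Hypothetical sketch only; assumes a stream named "app_logs" exists and
// that Query deserializes from exactly the fields visible in this diff.
async fn fan_out_example() -> anyhow::Result<()> {
    // 1. pull and merge the latest schema from every ingester
    let schema = fetch_schema("app_logs").await?;
    println!("merged schema has {} fields", schema.fields().len());

    // 2. broadcast a query and collect the flattened rows
    let query: Query = serde_json::from_value(serde_json::json!({
        "query": "SELECT * FROM app_logs",
        "startTime": "2024-01-01T00:00:00+00:00",
        "endTime": "2024-01-01T00:01:00+00:00",
        "sendNull": false,
        "fields": false,
        "filterTags": null
    }))?;
    let rows: Vec<Value> = send_query_request_to_ingester(&query).await?;
    println!("collected {} rows from ingesters", rows.len());

    Ok(())
}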
32 changes: 9 additions & 23 deletions server/src/handlers/http/modal/ingest_server.rs
@@ -125,29 +125,7 @@ impl IngestServer {
.service(Self::logstream_api()),
)
.service(Server::get_liveness_factory())
.service(Server::get_readiness_factory())
.service(Self::get_metrics_webscope());
}

fn get_metrics_webscope() -> Scope {
web::scope("/logstream").service(
web::scope("/{logstream}")
.service(
// GET "/logstream/{logstream}/schema" ==> Get schema for given log stream
web::resource("/schema").route(
web::get()
.to(logstream::schema)
.authorize_for_stream(Action::GetSchema),
),
)
.service(
web::resource("/stats").route(
web::get()
.to(logstream::get_stats)
.authorize_for_stream(Action::GetStats),
),
),
)
.service(Server::get_readiness_factory());
}

fn logstream_api() -> Scope {
@@ -175,6 +153,14 @@
)
.app_data(web::PayloadConfig::default().limit(MAX_EVENT_PAYLOAD_SIZE)),
)
.service(
// GET "/logstream/{logstream}/schema" ==> Get schema for given log stream
web::resource("/schema").route(
web::get()
.to(logstream::schema)
.authorize_for_stream(Action::GetSchema),
),
)
.service(
// GET "/logstream/{logstream}/stats" ==> Get stats for given log stream
web::resource("/stats").route(
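The net effect of this file's change is that the schema and stats routes move out of the standalone metrics scope and into logstream_api(), so ingesters serve them under the same /logstream/{logstream} tree as ingestion. A condensed sketch of the resulting layout — handler and Action names are as in the diff, everything else (ingestion routes, payload config) is elided for illustration:

// Condensed sketch of the consolidated scope; not a drop-in replacement.
fn logstream_api_sketch() -> Scope {
    web::scope("/{logstream}")
        // PUT/POST ingestion routes elided ...
        .service(
            // GET "/logstream/{logstream}/schema" ==> Get schema for given log stream
            web::resource("/schema").route(
                web::get()
                    .to(logstream::schema)
                    .authorize_for_stream(Action::GetSchema),
            ),
        )
        .service(
            // GET "/logstream/{logstream}/stats" ==> Get stats for given log stream
            web::resource("/stats").route(
                web::get()
                    .to(logstream::get_stats)
                    .authorize_for_stream(Action::GetStats),
            ),
        )
}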
21 changes: 3 additions & 18 deletions server/src/handlers/http/modal/query_server.rs
@@ -35,11 +35,8 @@ use relative_path::RelativePathBuf;
use reqwest::Response;
use serde::{Deserialize, Serialize};
use std::sync::Arc;
use tokio::io::AsyncWriteExt;
use url::Url;

use tokio::fs::File as TokioFile;

use crate::option::CONFIG;

use super::server::Server;
@@ -170,10 +167,6 @@

// TODO: add validation logic here
// validate the ingester metadata

let mut f = Self::get_meta_file().await;
// write the arr into f
let _ = f.write(serde_json::to_string(&arr)?.as_bytes()).await?;
Ok(arr)
}

@@ -224,8 +217,11 @@
/// initialize the server, run migrations as needed and start the server
async fn initialize(&self) -> anyhow::Result<()> {
migration::run_metadata_migration(&CONFIG).await?;

let metadata = storage::resolve_parseable_metadata().await?;
// do not commit the below line
tokio::fs::File::create(CONFIG.staging_dir().join(".query.json")).await?;

banner::print(&CONFIG, &metadata).await;

// initialize the rbac map
@@ -276,17 +272,6 @@
}
}

async fn get_meta_file() -> TokioFile {
let meta_path = CONFIG.staging_dir().join(".query.json");

tokio::fs::OpenOptions::new()
.read(true)
.write(true)
.open(meta_path)
.await
.unwrap()
}

// forward the request to all ingesters to keep them in sync
pub async fn sync_streams_with_ingesters(stream_name: &str) -> Result<(), StreamError> {
let ingester_infos = Self::get_ingester_info().await.map_err(|err| {
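The helpers in http.rs rely on only two fields of the metadata that get_ingester_info() returns. A hypothetical sketch of that shape, inferred purely from the im.domain_name / im.token call sites — the real struct in this PR may be named differently and carry more fields:

// Inferred from usage; the name and field set are assumptions, not the
// actual definition from this PR.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct IngesterMetadata {
    // base url with a trailing slash, e.g. "http://ingester-0:8000/",
    // since the call sites build uris as format!("{}api/v1/...", domain_name)
    pub domain_name: String,
    // ready-to-send Authorization header value for that ingester
    pub token: String,
}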
84 changes: 80 additions & 4 deletions server/src/handlers/http/query.rs
@@ -26,18 +26,26 @@ use futures_util::Future;
use http::StatusCode;
use std::collections::HashMap;
use std::pin::Pin;
use std::sync::Arc;
use std::time::Instant;

use crate::handlers::http::fetch_schema;

use crate::event::commit_schema;
use crate::metrics::QUERY_EXECUTE_TIME;
use crate::option::{Mode, CONFIG};
use crate::query::error::ExecuteError;
use crate::query::QUERY_SESSION;
use crate::rbac::role::{Action, Permission};
use crate::rbac::Users;
use crate::response::QueryResponse;
use crate::storage::object_storage::commit_schema_to_storage;
use crate::utils::actix::extract_session_key_from_req;

use super::send_query_request_to_ingester;

/// Query Request through http endpoint.
#[derive(Debug, serde::Deserialize)]
#[derive(Debug, serde::Deserialize, serde::Serialize)]
#[serde(rename_all = "camelCase")]
pub struct Query {
query: String,
@@ -52,11 +60,42 @@ pub struct Query {
}

pub async fn query(req: HttpRequest, query_request: Query) -> Result<impl Responder, QueryError> {
let creds = extract_session_key_from_req(&req).expect("expects basic auth");
let permissions = Users.get_permissions(&creds);
let session_state = QUERY_SESSION.state();
let mut query = into_query(&query_request, &session_state).await?;

if CONFIG.parseable.mode == Mode::Query {
if let Ok(new_schema) = fetch_schema(&query.table_name().unwrap()).await {
dbg!(new_schema.clone());

commit_schema_to_storage(&query.table_name().unwrap(), new_schema.clone())
.await
.map_err(|err| {
QueryError::Custom(format!("Error committing schema to storage\nError:{err}"))
})?;
commit_schema(&query.table_name().unwrap(), Arc::new(new_schema))
.map_err(|err| QueryError::Custom(format!("Error committing schema: {}", err)))?;
}
}

// TODO: run this code only if the gap between the query start time and now is under 1 minute plus a margin
let mmem = if CONFIG.parseable.mode == Mode::Query {
// create a new query to send to the ingesters
if let Some(que) = transform_query_for_ingester(&query_request) {
let vals = send_query_request_to_ingester(&que)
.await
.map_err(|err| QueryError::Custom(err.to_string()))?;
Some(vals)
} else {
None
}
} else {
None
};

dbg!(&mmem);
let creds = extract_session_key_from_req(&req).expect("expects basic auth");
let permissions = Users.get_permissions(&creds);

// check authorization of this query if it references physical table;
let table_name = query.table_name();
if let Some(ref table) = table_name {
@@ -101,7 +140,7 @@ pub async fn query(req: HttpRequest, query_request: Query) -> Result<impl Respon
fill_null: query_request.send_null,
with_fields: query_request.fields,
}
.to_http();
.to_http(mmem);

if let Some(table) = table_name {
let time = time.elapsed().as_secs_f64();
@@ -183,6 +222,41 @@ async fn into_query(
})
}

fn transform_query_for_ingester(query: &Query) -> Option<Query> {
    // nothing to forward when any required part of the request is missing
    if query.query.is_empty() || query.start_time.is_empty() || query.end_time.is_empty() {
        return None;
    }

let end_time: DateTime<Utc> = if query.end_time == "now" {
Utc::now()
} else {
DateTime::parse_from_rfc3339(&query.end_time)
.ok()?
.with_timezone(&Utc)
};

let start_time = end_time - chrono::Duration::minutes(1);
// when transforming the query, the ingesters are forced to return an array of values
let q = Query {
query: query.query.clone(),
fields: false,
filter_tags: query.filter_tags.clone(),
send_null: query.send_null,
start_time: start_time.to_rfc3339(),
end_time: end_time.to_rfc3339(),
};

Some(q)
}

#[derive(Debug, thiserror::Error)]
pub enum QueryError {
#[error("Query cannot be empty")]
@@ -207,6 +281,8 @@ pub enum QueryError {
Datafusion(#[from] DataFusionError),
#[error("Execution Error: {0}")]
Execute(#[from] ExecuteError),
#[error("Error: {0}")]
Custom(String),
}

impl actix_web::ResponseError for QueryError {
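To make transform_query_for_ingester concrete: whatever window the client asked for, the ingester copy pins the window to the one minute leading up to the end time and forces fields: false, so every ingester answers with a bare JSON array that send_query_request_to_ingester can flatten. A test-style sketch of that contract, assuming Query has exactly the fields visible in this diff (the module and stream names are illustrative):

#[cfg(test)]
mod transform_sketch {
    use super::*;

    #[test]
    fn ingester_query_covers_the_last_minute() {
        // illustrative request; field names assume Query's camelCase serde attrs
        let original: Query = serde_json::from_value(serde_json::json!({
            "query": "SELECT count(*) FROM app_logs",
            "startTime": "2024-01-01T00:00:00+00:00",
            "endTime": "now",
            "sendNull": false,
            "fields": true,
            "filterTags": null
        }))
        .unwrap();

        let ingester_query = transform_query_for_ingester(&original).unwrap();

        // fields is forced off so ingesters return a bare array of rows
        assert!(!ingester_query.fields);

        // the window is exactly the minute leading up to the end time
        let start = DateTime::parse_from_rfc3339(&ingester_query.start_time).unwrap();
        let end = DateTime::parse_from_rfc3339(&ingester_query.end_time).unwrap();
        assert_eq!(end - start, chrono::Duration::minutes(1));
    }
}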
3 changes: 2 additions & 1 deletion server/src/main.rs
@@ -68,7 +68,8 @@ async fn main() -> anyhow::Result<()> {
};

// MODE == Query / Ingest and storage = local-store
server.validate()?;
// server.validate()?;

server.init().await?;

Ok(())