Skip to content

Commit 8caad44

Browse files
committed
feat: Impl Query Server
Add: Schema API for Ingestion Server RM: Redundant API Endpoints feat: Last min data is Queryable Add: Unit Tests
1 parent a39b9f7 commit 8caad44

File tree

7 files changed

+292
-51
lines changed

7 files changed

+292
-51
lines changed

server/src/handlers/http.rs

Lines changed: 71 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,10 @@
1717
*/
1818

1919
use actix_cors::Cors;
20+
use arrow_schema::Schema;
21+
use serde_json::Value;
22+
23+
use self::{modal::query_server::QueryServer, query::Query};
2024

2125
pub(crate) mod about;
2226
pub(crate) mod health_check;
@@ -33,11 +37,11 @@ pub(crate) mod rbac;
3337
pub(crate) mod role;
3438

3539
pub const MAX_EVENT_PAYLOAD_SIZE: usize = 10485760;
36-
pub const API_BASE_PATH: &str = "/api";
40+
pub const API_BASE_PATH: &str = "api";
3741
pub const API_VERSION: &str = "v1";
3842

3943
pub(crate) fn base_path() -> String {
40-
format!("{API_BASE_PATH}/{API_VERSION}")
44+
format!("/{API_BASE_PATH}/{API_VERSION}")
4145
}
4246

4347
pub fn metrics_path() -> String {
@@ -53,5 +57,69 @@ pub(crate) fn cross_origin_config() -> Cors {
5357
}
5458

5559
pub fn base_path_without_preceding_slash() -> String {
56-
base_path().trim_start_matches('/').to_string()
60+
format!("/{API_BASE_PATH}/{API_VERSION}")
61+
}
62+
63+
pub async fn fetch_schema(stream_name: &str) -> anyhow::Result<arrow_schema::Schema> {
64+
let mut res = vec![];
65+
let ima = QueryServer::get_ingester_info().await.unwrap();
66+
67+
for im in ima {
68+
let uri = format!(
69+
"{}{}/logstream/{}/schema",
70+
im.domain_name,
71+
base_path_without_preceding_slash(),
72+
stream_name
73+
);
74+
let reqw = reqwest::Client::new()
75+
.get(uri)
76+
.header(http::header::AUTHORIZATION, im.token.clone())
77+
.header(http::header::CONTENT_TYPE, "application/json")
78+
.send()
79+
.await?;
80+
81+
if reqw.status().is_success() {
82+
let v = serde_json::from_slice(&reqw.bytes().await?)?;
83+
res.push(v);
84+
}
85+
}
86+
87+
let new_schema = Schema::try_merge(res)?;
88+
89+
Ok(new_schema)
90+
}
91+
92+
pub async fn send_query_request_to_ingester(query: &Query) -> anyhow::Result<Vec<Value>> {
93+
// send the query request to the ingester
94+
let mut res = vec![];
95+
let ima = QueryServer::get_ingester_info().await.unwrap();
96+
97+
for im in ima.iter() {
98+
let uri = format!(
99+
"{}{}/{}",
100+
im.domain_name,
101+
base_path_without_preceding_slash(),
102+
"query"
103+
);
104+
let reqw = reqwest::Client::new()
105+
.post(uri)
106+
.json(query)
107+
.header(http::header::AUTHORIZATION, im.token.clone())
108+
.header(http::header::CONTENT_TYPE, "application/json")
109+
.send()
110+
.await?;
111+
112+
if reqw.status().is_success() {
113+
let v: Value = serde_json::from_slice(&reqw.bytes().await?)?;
114+
// the value returned is an array of json objects
115+
// so it needs to be flattened
116+
if let Some(arr) = v.as_array() {
117+
for val in arr {
118+
res.push(val.to_owned())
119+
}
120+
}
121+
}
122+
}
123+
124+
Ok(res)
57125
}

server/src/handlers/http/modal/ingest_server.rs

Lines changed: 9 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -125,29 +125,7 @@ impl IngestServer {
125125
.service(Self::logstream_api()),
126126
)
127127
.service(Server::get_liveness_factory())
128-
.service(Server::get_readiness_factory())
129-
.service(Self::get_metrics_webscope());
130-
}
131-
132-
fn get_metrics_webscope() -> Scope {
133-
web::scope("/logstream").service(
134-
web::scope("/{logstream}")
135-
.service(
136-
// GET "/logstream/{logstream}/schema" ==> Get schema for given log stream
137-
web::resource("/schema").route(
138-
web::get()
139-
.to(logstream::schema)
140-
.authorize_for_stream(Action::GetSchema),
141-
),
142-
)
143-
.service(
144-
web::resource("/stats").route(
145-
web::get()
146-
.to(logstream::get_stats)
147-
.authorize_for_stream(Action::GetStats),
148-
),
149-
),
150-
)
128+
.service(Server::get_readiness_factory());
151129
}
152130

153131
fn logstream_api() -> Scope {
@@ -175,6 +153,14 @@ impl IngestServer {
175153
)
176154
.app_data(web::PayloadConfig::default().limit(MAX_EVENT_PAYLOAD_SIZE)),
177155
)
156+
.service(
157+
// GET "/logstream/{logstream}/schema" ==> Get schema for given log stream
158+
web::resource("/schema").route(
159+
web::get()
160+
.to(logstream::schema)
161+
.authorize_for_stream(Action::GetSchema),
162+
),
163+
)
178164
.service(
179165
// GET "/logstream/{logstream}/stats" ==> Get stats for given log stream
180166
web::resource("/stats").route(

server/src/handlers/http/modal/query_server.rs

Lines changed: 3 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -35,11 +35,8 @@ use relative_path::RelativePathBuf;
3535
use reqwest::Response;
3636
use serde::{Deserialize, Serialize};
3737
use std::sync::Arc;
38-
use tokio::io::AsyncWriteExt;
3938
use url::Url;
4039

41-
use tokio::fs::File as TokioFile;
42-
4340
use crate::option::CONFIG;
4441

4542
use super::server::Server;
@@ -170,10 +167,6 @@ impl QueryServer {
170167

171168
// TODO: add validation logic here
172169
// validate the ingester metadata
173-
174-
let mut f = Self::get_meta_file().await;
175-
// writer the arr in f
176-
let _ = f.write(serde_json::to_string(&arr)?.as_bytes()).await?;
177170
Ok(arr)
178171
}
179172

@@ -224,8 +217,11 @@ impl QueryServer {
224217
/// initialize the server, run migrations as needed and start the server
225218
async fn initialize(&self) -> anyhow::Result<()> {
226219
migration::run_metadata_migration(&CONFIG).await?;
220+
227221
let metadata = storage::resolve_parseable_metadata().await?;
222+
// do not commit the below line
228223
tokio::fs::File::create(CONFIG.staging_dir().join(".query.json")).await?;
224+
229225
banner::print(&CONFIG, &metadata).await;
230226

231227
// initialize the rbac map
@@ -276,17 +272,6 @@ impl QueryServer {
276272
}
277273
}
278274

279-
async fn get_meta_file() -> TokioFile {
280-
let meta_path = CONFIG.staging_dir().join(".query.json");
281-
282-
tokio::fs::OpenOptions::new()
283-
.read(true)
284-
.write(true)
285-
.open(meta_path)
286-
.await
287-
.unwrap()
288-
}
289-
290275
// forward the request to all ingesters to keep them in sync
291276
pub async fn sync_streams_with_ingesters(stream_name: &str) -> Result<(), StreamError> {
292277
let ingester_infos = Self::get_ingester_info().await.map_err(|err| {

server/src/handlers/http/query.rs

Lines changed: 77 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -26,18 +26,26 @@ use futures_util::Future;
2626
use http::StatusCode;
2727
use std::collections::HashMap;
2828
use std::pin::Pin;
29+
use std::sync::Arc;
2930
use std::time::Instant;
3031

32+
use crate::handlers::http::fetch_schema;
33+
34+
use crate::event::commit_schema;
3135
use crate::metrics::QUERY_EXECUTE_TIME;
36+
use crate::option::{Mode, CONFIG};
3237
use crate::query::error::ExecuteError;
3338
use crate::query::QUERY_SESSION;
3439
use crate::rbac::role::{Action, Permission};
3540
use crate::rbac::Users;
3641
use crate::response::QueryResponse;
42+
use crate::storage::object_storage::commit_schema_to_storage;
3743
use crate::utils::actix::extract_session_key_from_req;
3844

45+
use super::send_query_request_to_ingester;
46+
3947
/// Query Request through http endpoint.
40-
#[derive(Debug, serde::Deserialize)]
48+
#[derive(Debug, serde::Deserialize, serde::Serialize)]
4149
#[serde(rename_all = "camelCase")]
4250
pub struct Query {
4351
query: String,
@@ -52,11 +60,39 @@ pub struct Query {
5260
}
5361

5462
pub async fn query(req: HttpRequest, query_request: Query) -> Result<impl Responder, QueryError> {
55-
let creds = extract_session_key_from_req(&req).expect("expects basic auth");
56-
let permissions = Users.get_permissions(&creds);
5763
let session_state = QUERY_SESSION.state();
5864
let mut query = into_query(&query_request, &session_state).await?;
5965

66+
if CONFIG.parseable.mode == Mode::Query {
67+
if let Ok(new_schema) = fetch_schema(&query.table_name().unwrap()).await {
68+
commit_schema_to_storage(&query.table_name().unwrap(), new_schema.clone())
69+
.await
70+
.map_err(|err| {
71+
QueryError::Custom(format!("Error committing schema to storage\nError:{err}"))
72+
})?;
73+
commit_schema(&query.table_name().unwrap(), Arc::new(new_schema))
74+
.map_err(|err| QueryError::Custom(format!("Error committing schema: {}", err)))?;
75+
}
76+
}
77+
78+
// ? run this code only if the query start time and now is less than 1 minute + margin
79+
let mmem = if CONFIG.parseable.mode == Mode::Query {
80+
// create a new query to send to the ingesters
81+
if let Some(que) = transform_query_for_ingester(&query_request) {
82+
let vals = send_query_request_to_ingester(&que)
83+
.await
84+
.map_err(|err| QueryError::Custom(err.to_string()))?;
85+
Some(vals)
86+
} else {
87+
None
88+
}
89+
} else {
90+
None
91+
};
92+
93+
let creds = extract_session_key_from_req(&req).expect("expects basic auth");
94+
let permissions = Users.get_permissions(&creds);
95+
6096
// check authorization of this query if it references physical table;
6197
let table_name = query.table_name();
6298
if let Some(ref table) = table_name {
@@ -101,7 +137,7 @@ pub async fn query(req: HttpRequest, query_request: Query) -> Result<impl Respon
101137
fill_null: query_request.send_null,
102138
with_fields: query_request.fields,
103139
}
104-
.to_http();
140+
.to_http(mmem);
105141

106142
if let Some(table) = table_name {
107143
let time = time.elapsed().as_secs_f64();
@@ -183,6 +219,41 @@ async fn into_query(
183219
})
184220
}
185221

222+
fn transform_query_for_ingester(query: &Query) -> Option<Query> {
223+
if query.query.is_empty() {
224+
return None;
225+
}
226+
227+
if query.start_time.is_empty() {
228+
return None;
229+
}
230+
231+
if query.end_time.is_empty() {
232+
return None;
233+
}
234+
235+
let end_time: DateTime<Utc> = if query.end_time == "now" {
236+
Utc::now()
237+
} else {
238+
DateTime::parse_from_rfc3339(&query.end_time)
239+
.ok()?
240+
.with_timezone(&Utc)
241+
};
242+
243+
let start_time = end_time - chrono::Duration::minutes(1);
244+
// when transforming the query, the ingesters are forced to return an array of values
245+
let q = Query {
246+
query: query.query.clone(),
247+
fields: false,
248+
filter_tags: query.filter_tags.clone(),
249+
send_null: query.send_null,
250+
start_time: start_time.to_rfc3339(),
251+
end_time: end_time.to_rfc3339(),
252+
};
253+
254+
Some(q)
255+
}
256+
186257
#[derive(Debug, thiserror::Error)]
187258
pub enum QueryError {
188259
#[error("Query cannot be empty")]
@@ -207,6 +278,8 @@ pub enum QueryError {
207278
Datafusion(#[from] DataFusionError),
208279
#[error("Execution Error: {0}")]
209280
Execute(#[from] ExecuteError),
281+
#[error("Error: {0}")]
282+
Custom(String),
210283
}
211284

212285
impl actix_web::ResponseError for QueryError {

0 commit comments

Comments
 (0)