Commit
rm dead code
Eshanatnight committed Mar 13, 2024
1 parent aa6927f commit 337dae8
Showing 3 changed files with 18 additions and 54 deletions.
6 changes: 4 additions & 2 deletions server/src/handlers/http.rs
@@ -59,13 +59,12 @@ pub fn base_path_without_preceding_slash() -> String {
     base_path().trim_start_matches('/').to_string()
 }
 
-
 pub async fn send_schema_request(stream_name: &str) -> anyhow::Result<Vec<arrow_schema::Schema>> {
     let mut res = vec![];
     let ima = QueryServer::get_ingester_info().await.unwrap();
 
     for im in ima {
-        // todo:
+        // TODO: when you rebase the code from the Cluster Info PR, update this URI generation
         let uri = format!("{}api/v1/logstream/{}/schema", im.domain_name, stream_name);
         let reqw = reqwest::Client::new()
             .get(uri)
@@ -89,6 +88,7 @@ pub async fn send_query_request_to_ingestor(query: &Query) -> anyhow::Result<Vec
     let ima = QueryServer::get_ingester_info().await.unwrap();
 
     for im in ima.iter() {
+        // TODO: when you rebase the code from the Cluster Info PR, update this URI generation
         let uri = format!("{}api/v1/{}", im.domain_name, "query");
         let reqw = reqwest::Client::new()
             .post(uri)
@@ -100,6 +100,8 @@
 
         if reqw.status().is_success() {
             let v: Value = serde_json::from_slice(&reqw.bytes().await?)?;
+            // the value returned is an array of JSON objects,
+            // so it needs to be flattened
             if let Some(arr) = v.as_array() {
                 for val in arr {
                     res.push(val.clone())
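For context, the flattening described by the new comment in `send_query_request_to_ingestor` can be shown in isolation. Below is a minimal sketch using only serde_json; the function name and sample records are hypothetical illustrations, not part of this commit:

```rust
use serde_json::{json, Value};

// Each ingester responds with a JSON array of records; the query node
// collects the individual objects into one flat Vec<Value> across all
// responses (hypothetical helper, mirroring the loop in the diff above).
fn flatten_responses(responses: Vec<Value>) -> Vec<Value> {
    let mut res = Vec::new();
    for v in responses {
        if let Some(arr) = v.as_array() {
            // each element is one record; clone it into the combined result
            res.extend(arr.iter().cloned());
        }
    }
    res
}

fn main() {
    // two hypothetical ingester responses, each a JSON array of log records
    let a = json!([{"level": "info"}, {"level": "warn"}]);
    let b = json!([{"level": "error"}]);
    assert_eq!(flatten_responses(vec![a, b]).len(), 3);
}
```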
24 changes: 1 addition & 23 deletions server/src/handlers/http/modal/ingest_server.rs
@@ -125,29 +125,7 @@ impl IngestServer {
                 .service(Self::logstream_api()),
         )
         .service(Server::get_liveness_factory())
-        .service(Server::get_readiness_factory())
-        .service(Self::get_metrics_webscope());
-    }
-
-    fn get_metrics_webscope() -> Scope {
-        web::scope("/logstream").service(
-            web::scope("/{logstream}")
-                .service(
-                    // GET "/logstream/{logstream}/schema" ==> Get schema for given log stream
-                    web::resource("/schema").route(
-                        web::get()
-                            .to(logstream::schema)
-                            .authorize_for_stream(Action::GetSchema),
-                    ),
-                )
-                .service(
-                    web::resource("/stats").route(
-                        web::get()
-                            .to(logstream::get_stats)
-                            .authorize_for_stream(Action::GetStats),
-                    ),
-                ),
-        )
+        .service(Server::get_readiness_factory());
     }
 
     fn logstream_api() -> Scope {
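For readers unfamiliar with the routing setup: the removed `get_metrics_webscope()` registered `/logstream/{logstream}/schema` and `/stats` routes that appear to duplicate what `logstream_api()` already mounts, which is presumably why it could be dropped as dead code. Below is a minimal sketch of the same nested-scope pattern in plain actix-web, without Parseable's `authorize_for_stream` extension; the handler names are hypothetical:

```rust
use actix_web::{web, App, HttpResponse, HttpServer, Scope};

// Nested scopes: "/logstream" wraps "/{logstream}", which exposes the
// per-stream endpoints. This mirrors the structure of the deleted function.
fn logstream_scope() -> Scope {
    web::scope("/logstream").service(
        web::scope("/{logstream}")
            // GET /logstream/{logstream}/schema
            .service(web::resource("/schema").route(web::get().to(schema_handler)))
            // GET /logstream/{logstream}/stats
            .service(web::resource("/stats").route(web::get().to(stats_handler))),
    )
}

async fn schema_handler(path: web::Path<String>) -> HttpResponse {
    HttpResponse::Ok().body(format!("schema for stream {}", path.into_inner()))
}

async fn stats_handler(path: web::Path<String>) -> HttpResponse {
    HttpResponse::Ok().body(format!("stats for stream {}", path.into_inner()))
}

#[actix_web::main]
async fn main() -> std::io::Result<()> {
    HttpServer::new(|| App::new().service(logstream_scope()))
        .bind(("127.0.0.1", 8080))?
        .run()
        .await
}
```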
42 changes: 13 additions & 29 deletions server/src/handlers/http/query.rs
@@ -19,7 +19,6 @@
 use actix_web::http::header::ContentType;
 use actix_web::web::{self, Json};
 use actix_web::{FromRequest, HttpRequest, Responder};
-use arrow_json::reader::infer_json_schema_from_iterator;
 use chrono::{DateTime, Utc};
 use datafusion::error::DataFusionError;
 use datafusion::execution::context::SessionState;
@@ -30,10 +29,7 @@ use std::pin::Pin;
 use std::sync::Arc;
 use std::time::Instant;
 
-// Eshan's Code Under test
-#[allow(unused_imports)]
 use arrow_schema::Schema;
-#[allow(unused_imports)]
 use crate::handlers::http::send_schema_request;
 
 use crate::event::commit_schema;
@@ -68,40 +64,28 @@ pub async fn query(req: HttpRequest, query_request: Query) -> Result<impl Respon
     let session_state = QUERY_SESSION.state();
     let mut query = into_query(&query_request, &session_state).await?;
 
-    // if CONFIG.parseable.mode == Mode::Query {
-    //     if let Ok(schs) = send_schema_request(&query.table_name().unwrap()).await {
-    //         let new_schema =
-    //             Schema::try_merge(schs).map_err(|err| QueryError::Custom(err.to_string()))?;
+    if CONFIG.parseable.mode == Mode::Query {
+        if let Ok(schs) = send_schema_request(&query.table_name().unwrap()).await {
+            let new_schema =
+                Schema::try_merge(schs).map_err(|err| QueryError::Custom(err.to_string()))?;
 
-    //         commit_schema(&query.table_name().unwrap(), Arc::new(new_schema.clone()))
-    //             .map_err(|err| QueryError::Custom(format!("Error committing schema: {}", err)))?;
+            commit_schema(&query.table_name().unwrap(), Arc::new(new_schema.clone()))
+                .map_err(|err| QueryError::Custom(format!("Error committing schema: {}", err)))?;
 
-    //         commit_schema_to_storage(&query.table_name().unwrap(), new_schema)
-    //             .await
-    //             .map_err(|err| {
-    //                 QueryError::Custom(format!("Error committing schema to storage\nError:{err}"))
-    //             })?;
-    //     }
-    // }
+            commit_schema_to_storage(&query.table_name().unwrap(), new_schema)
+                .await
+                .map_err(|err| {
+                    QueryError::Custom(format!("Error committing schema to storage\nError:{err}"))
+                })?;
+        }
+    }
 
     let mmem = if CONFIG.parseable.mode == Mode::Query {
         // create a new query to send to the ingestors
         if let Some(que) = transform_query_for_ingestor(&query_request) {
             let vals = send_request_to_ingestor(&que)
                 .await
                 .map_err(|err| QueryError::Custom(err.to_string()))?;
-            let infered_schema = infer_json_schema_from_iterator(vals.iter().map(Ok)).map_err(|err| {
-                QueryError::Custom(format!("Error inferring schema from iterator\nError:{err}"))
-            })?;
-
-            commit_schema(&query.table_name().unwrap(), Arc::new(infered_schema.clone()))
-                .map_err(|err| QueryError::Custom(format!("Error committing schema: {}", err)))?;
-
-            commit_schema_to_storage(&query.table_name().unwrap(), infered_schema)
-                .await
-                .map_err(|err| {
-                    QueryError::Custom(format!("Error committing schema to storage\nError:{err}"))
-                })?;
 
             Some(vals)
         } else {
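The key step in the re-enabled block is `Schema::try_merge`, which unions the field lists of the per-ingester schemas returned by `send_schema_request` and fails if a field appears with incompatible types. A minimal sketch with hypothetical field names:

```rust
use arrow_schema::{ArrowError, DataType, Field, Schema};

fn main() -> Result<(), ArrowError> {
    // schema reported by one ingester
    let s1 = Schema::new(vec![
        Field::new("timestamp", DataType::Utf8, true),
        Field::new("level", DataType::Utf8, true),
    ]);
    // schema reported by another ingester, with an extra column
    let s2 = Schema::new(vec![
        Field::new("timestamp", DataType::Utf8, true),
        Field::new("message", DataType::Utf8, true),
    ]);

    // Schema::try_merge unions the fields across schemas, erroring on
    // incompatible types; the merged schema is then committed in-memory
    // and to storage, as in the diff above.
    let merged = Schema::try_merge(vec![s1, s2])?;
    assert_eq!(merged.fields().len(), 3);
    Ok(())
}
```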
