Skip to content

Commit

Permalink
feat: Impl Query Server
Browse files Browse the repository at this point in the history
1. `send_request_to_ingestor` now returns a proper result
2. fix: bug when there are no parquet files in the store
3. Query Server Now gets all the resultant data
  • Loading branch information
Eshanatnight committed Mar 8, 2024
1 parent eaa53b5 commit 0178f27
Show file tree
Hide file tree
Showing 14 changed files with 702 additions and 122 deletions.
43 changes: 34 additions & 9 deletions server/src/handlers/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,6 @@ pub(crate) mod query;
pub(crate) mod rbac;
pub(crate) mod role;

// this needs to be removed from here. It is in modal->mod.rs
// include!(concat!(env!("OUT_DIR"), "/generated.rs"));

pub const MAX_EVENT_PAYLOAD_SIZE: usize = 10485760;
pub const API_BASE_PATH: &str = "/api";
pub const API_VERSION: &str = "v1";
Expand All @@ -58,23 +55,51 @@ pub(crate) fn cross_origin_config() -> Cors {
}
}

pub async fn send_query_request_to_ingestor(query: &Query) -> anyhow::Result<Vec<Value>> {
// send the query request to the ingestor
pub async fn send_schema_request(stream_name: &str) -> anyhow::Result<Vec<arrow_schema::Schema>> {
let mut res = vec![];
let ima = QueryServer::get_ingestor_info().await.unwrap();
let ima = QueryServer::get_ingester_info().await.unwrap();

for im in ima {
let uri = format!("{}{}/{}",im.domain_name, base_path(), "query");
let uri = format!("{}api/v1/logstream/{}/schema", im.domain_name, stream_name);
let reqw = reqwest::Client::new()
.get(uri)
.header(http::header::AUTHORIZATION, im.token.clone())
.header(http::header::CONTENT_TYPE, "application/json")
.send()
.await?;

if reqw.status().is_success() {
let v = serde_json::from_slice(&reqw.bytes().await?)?;
res.push(v);
}
}

Ok(res)
}

pub async fn send_request_to_ingestor(query: &Query) -> anyhow::Result<Vec<Value>> {
// send the query request to the ingestor
let mut res = vec![];
let ima = QueryServer::get_ingester_info().await.unwrap();

for im in ima.iter() {
let uri = format!("{}api/v1/{}", im.domain_name, "query");

let reqw = reqwest::Client::new()
.post(uri)
.json(query)
.basic_auth("admin", Some("admin"))
.header(http::header::AUTHORIZATION, im.token.clone())
.header(http::header::CONTENT_TYPE, "application/json")
.send()
.await?;

if reqw.status().is_success() {
let v: Value = serde_json::from_slice(&reqw.bytes().await?)?;
res.push(v);
if let Some(arr) = v.as_array() {
for val in arr {
res.push(val.clone())
}
}
}
}

Expand Down
14 changes: 13 additions & 1 deletion server/src/handlers/http/ingest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ use crate::handlers::{
STREAM_NAME_HEADER_KEY,
};
use crate::metadata::STREAM_INFO;
use crate::option::{Mode, CONFIG};
use crate::utils::header_parsing::{collect_labelled_headers, ParseHeaderError};

use super::logstream::error::CreateStreamError;
Expand Down Expand Up @@ -140,7 +141,18 @@ pub async fn create_stream_if_not_exists(stream_name: &str) -> Result<(), PostEr
if STREAM_INFO.stream_exists(stream_name) {
return Ok(());
}
super::logstream::create_stream(stream_name.to_string()).await?;

match &CONFIG.parseable.mode {
Mode::All | Mode::Query => {
super::logstream::create_stream(stream_name.to_string()).await?;
}
Mode::Ingest => {
return Err(PostError::Invalid(anyhow::anyhow!(
"Stream {} not found. Has it been created?",
stream_name
)));
}
}
Ok(())
}

Expand Down
74 changes: 55 additions & 19 deletions server/src/handlers/http/logstream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,16 @@ use serde_json::Value;

use crate::alerts::Alerts;
use crate::metadata::STREAM_INFO;
use crate::option::CONFIG;
use crate::option::{Mode, CONFIG};
use crate::storage::retention::Retention;
use crate::storage::{LogStream, StorageDir};
use crate::{catalog, event, stats};
use crate::{metadata, validator};

use self::error::{CreateStreamError, StreamError};

use super::modal::query_server::{self, IngestionStats, QueriedStats, StorageStats};

pub async fn delete(req: HttpRequest) -> Result<impl Responder, StreamError> {
let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap();

Expand Down Expand Up @@ -111,7 +113,6 @@ pub async fn get_alert(req: HttpRequest) -> Result<impl Responder, StreamError>

pub async fn put_stream(req: HttpRequest) -> Result<impl Responder, StreamError> {
let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap();

if metadata::STREAM_INFO.stream_exists(&stream_name) {
// Error if the log stream already exists
return Err(StreamError::Custom {
Expand All @@ -121,7 +122,11 @@ pub async fn put_stream(req: HttpRequest) -> Result<impl Responder, StreamError>
status: StatusCode::BAD_REQUEST,
});
}
create_stream(stream_name).await?;
if CONFIG.parseable.mode == Mode::Query {
query_server::QueryServer::sync_streams_with_ingesters(&stream_name).await?;
}

create_stream(stream_name.clone()).await?;

Ok(("log stream created", StatusCode::OK))
}
Expand Down Expand Up @@ -279,30 +284,62 @@ pub async fn get_stats(req: HttpRequest) -> Result<impl Responder, StreamError>
let stats = stats::get_current_stats(&stream_name, "json")
.ok_or(StreamError::StreamNotFound(stream_name.clone()))?;

if CONFIG.parseable.mode == Mode::Query {
let stats = query_server::QueryServer::fetch_stats_from_ingesters(&stream_name).await?;
let stats = serde_json::to_value(stats).unwrap();
return Ok((web::Json(stats), StatusCode::OK));
}

let hash_map = STREAM_INFO.read().unwrap();
let stream_meta = &hash_map
.get(&stream_name)
.ok_or(StreamError::StreamNotFound(stream_name.clone()))?;

let time = Utc::now();
let qstats = match &stream_meta.first_event_at {
Some(first_event_at) => {
let ingestion_stats = IngestionStats::new(
stats.events,
format!("{} {}", stats.ingestion, "Bytes"),
"json",
);
let storage_stats =
StorageStats::new(format!("{} {}", stats.storage, "Bytes"), "parquet");

QueriedStats::new(
&stream_name,
&stream_meta.created_at,
Some(first_event_at.to_owned()),
time,
ingestion_stats,
storage_stats,
)
}

let stats = serde_json::json!({
"stream": stream_name,
"creation_time": &stream_meta.created_at,
"first_event_at": Some(&stream_meta.first_event_at),
"time": time,
"ingestion": {
"count": stats.events,
"size": format!("{} {}", stats.ingestion, "Bytes"),
"format": "json"
},
"storage": {
"size": format!("{} {}", stats.storage, "Bytes"),
"format": "parquet"
// ? this case should not happen
None => {
let ingestion_stats = IngestionStats::new(
stats.events,
format!("{} {}", stats.ingestion, "Bytes"),
"json",
);
let storage_stats =
StorageStats::new(format!("{} {}", stats.storage, "Bytes"), "parquet");

QueriedStats::new(
&stream_name,
&stream_meta.created_at,
Some('0'.to_string()),
time,
ingestion_stats,
storage_stats,
)
}
});
};

let out_stats = serde_json::to_value(qstats).unwrap();

Ok((web::Json(stats), StatusCode::OK))
Ok((web::Json(out_stats), StatusCode::OK))
}

// Check if the first_event_at is empty
Expand Down Expand Up @@ -345,7 +382,6 @@ pub async fn create_stream(stream_name: String) -> Result<(), CreateStreamError>
let created_at = stream_meta.unwrap().created_at;

metadata::STREAM_INFO.add_stream(stream_name.to_string(), created_at);

Ok(())
}

Expand Down
80 changes: 57 additions & 23 deletions server/src/handlers/http/modal/ingest_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use crate::analytics;
use crate::banner;
use crate::handlers::http::logstream;
use crate::handlers::http::middleware::RouteExt;
use crate::handlers::http::MAX_EVENT_PAYLOAD_SIZE;
use crate::localcache::LocalCacheManager;
use crate::metadata;
use crate::metrics;
Expand All @@ -30,8 +31,6 @@ use crate::storage::ObjectStorageError;
use crate::storage::PARSEABLE_METADATA_FILE_NAME;
use crate::sync;

use std::net::SocketAddr;

use super::server::Server;
use super::ssl_acceptor::get_ssl_acceptor;
use super::IngesterMetadata;
Expand Down Expand Up @@ -63,8 +62,8 @@ impl ParseableServer for IngestServer {
prometheus: PrometheusMetrics,
_oidc_client: Option<crate::oidc::OpenidConfig>,
) -> anyhow::Result<()> {
// set the ingestor metadata
self.set_ingestor_metadata().await?;
// set the ingester metadata
self.set_ingester_metadata().await?;

// get the ssl stuff
let ssl = get_ssl_acceptor(
Expand Down Expand Up @@ -99,11 +98,9 @@ impl ParseableServer for IngestServer {

/// implement the init method will just invoke the initialize method
async fn init(&self) -> anyhow::Result<()> {
// self.validate()?;
self.initialize().await
}

#[allow(unused)]
fn validate(&self) -> anyhow::Result<()> {
if CONFIG.get_storage_mode_string() == "Local drive" {
return Err(anyhow::Error::msg(
Expand All @@ -123,8 +120,9 @@ impl IngestServer {
.service(
// Base path "{url}/api/v1"
web::scope(&base_path())
.service(Server::get_query_factory())
.service(Server::get_ingest_factory()),
.service(Server::get_query_factory())
.service(Server::get_ingest_factory())
.service(Self::logstream_api()),
)
.service(Server::get_liveness_factory())
.service(Server::get_readiness_factory())
Expand Down Expand Up @@ -152,46 +150,82 @@ impl IngestServer {
)
}

#[inline(always)]
fn get_ingestor_address(&self) -> SocketAddr {
// this might cause an issue down the line
// best is to make the Cli Struct better, but thats a chore
(CONFIG.parseable.address.clone())
.parse::<SocketAddr>()
.unwrap()
fn logstream_api() -> Scope {
web::scope("/logstream")
.service(
// GET "/logstream" ==> Get list of all Log Streams on the server
web::resource("")
.route(web::get().to(logstream::list).authorize(Action::ListStream)),
)
.service(
web::scope("/{logstream}")
.service(
web::resource("")
// PUT "/logstream/{logstream}" ==> Create log stream
.route(
web::put()
.to(logstream::put_stream)
.authorize_for_stream(Action::CreateStream),
)
// DELETE "/logstream/{logstream}" ==> Delete log stream
.route(
web::delete()
.to(logstream::delete)
.authorize_for_stream(Action::DeleteStream),
)
.app_data(web::PayloadConfig::default().limit(MAX_EVENT_PAYLOAD_SIZE)),
)
.service(
// GET "/logstream/{logstream}/schema" ==> Get schema for given log stream
web::resource("/schema").route(
web::get()
.to(logstream::schema)
.authorize_for_stream(Action::GetSchema),
),
)
.service(
// GET "/logstream/{logstream}/stats" ==> Get stats for given log stream
web::resource("/stats").route(
web::get()
.to(logstream::get_stats)
.authorize_for_stream(Action::GetStats),
),
),
)
}

// create the ingestor metadata and put the .ingestor.json file in the object store
async fn set_ingestor_metadata(&self) -> anyhow::Result<()> {
// create the ingester metadata and put the .ingester.json file in the object store
async fn set_ingester_metadata(&self) -> anyhow::Result<()> {
let store = CONFIG.storage().get_object_store();

// remove ip adn go with the domain name
let sock = self.get_ingestor_address();
let sock = Server::get_server_address();
let path = RelativePathBuf::from(format!(
"ingestor.{}.{}.json",
"ingester.{}.{}.json",
sock.ip(), // this might be wrong
sock.port()
));

if store.get_object(&path).await.is_ok() {
println!("Ingestor metadata already exists");
println!("Ingester metadata already exists");
return Ok(());
};

let scheme = CONFIG.parseable.get_scheme();
let resource = IngesterMetadata::new(
sock.port().to_string(),
CONFIG
.parseable
.domain_address
.clone()
.unwrap_or_else(|| {
Url::parse(&format!("http://{}:{}", sock.ip(), sock.port())).unwrap()
Url::parse(&format!("{}://{}:{}", scheme, sock.ip(), sock.port())).unwrap()
})
.to_string(),
DEFAULT_VERSION.to_string(),
store.get_bucket_name(),
&CONFIG.parseable.username,
&CONFIG.parseable.password, // is this secure?
&CONFIG.parseable.password,
);

let resource = serde_json::to_string(&resource)
Expand All @@ -205,7 +239,7 @@ impl IngestServer {
}

// check for querier state. Is it there, or was it there in the past
// this should happen before the set the ingestor metadata
// this should happen before the set the ingester metadata
async fn check_querier_state(&self) -> anyhow::Result<(), ObjectStorageError> {
// how do we check for querier state?
// based on the work flow of the system, the querier will always need to start first
Expand Down
Loading

0 comments on commit 0178f27

Please sign in to comment.