Skip to content

Commit

Permalink
multiple fixes on server:
Browse files Browse the repository at this point in the history
1. fixed banner spacing
2. modified server mode: All to Standalone, Ingest to Distributed (Ingest), Query to Distributed (Query)
3. updated server mode in about API response
4. updated logic for env var P_INGESTOR_URL to use HOSTNAME and PORT from env
5. remvoed put cache api from querier
6. added put cache api to ingester
  • Loading branch information
nikhilsinhaparseable committed Apr 10, 2024
1 parent 32fa2bc commit 70937d4
Show file tree
Hide file tree
Showing 12 changed files with 89 additions and 62 deletions.
6 changes: 3 additions & 3 deletions server/src/about.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ pub fn print_about(
eprint!(
"
{}
Version:\t\t\t\t\t\"v{}\"",
Version: \"v{}\"",
"About:".to_string().bold(),
current_version,
); // " " " "
Expand All @@ -103,8 +103,8 @@ pub fn print_about(

eprintln!(
"
Commit:\t\t\t\t\t\t\"{commit_hash}\"
Docs:\t\t\t\t\t\t\"https://logg.ing/docs\""
Commit: \"{commit_hash}\"
Docs: \"https://logg.ing/docs\""
);
}

Expand Down
18 changes: 9 additions & 9 deletions server/src/banner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,14 +77,14 @@ fn status_info(config: &Config, scheme: &str, id: Uid) {
eprintln!(
"
{}
Address:\t\t\t\t\t{}
Credentials:\t\t\t\t\t{}
Server Mode:\t\t\t\t\t\"{}\"
LLM Status:\t\t\t\t\t\"{}\"",
Address: {}
Credentials: {}
Server Mode: \"{}\"
LLM Status: \"{}\"",
"Server:".to_string().bold(),
address,
credentials,
config.parseable.mode.to_str(),
config.get_server_mode_string(),
llm_status
);
}
Expand All @@ -101,8 +101,8 @@ async fn storage_info(config: &Config) {
eprintln!(
"
{}
Storage Mode:\t\t\t\t\t\"{}\"
Staging Path:\t\t\t\t\t\"{}\"",
Storage Mode: \"{}\"
Staging Path: \"{}\"",
"Storage:".to_string().bold(),
config.get_storage_mode_string(),
config.staging_dir().to_string_lossy(),
Expand All @@ -116,7 +116,7 @@ async fn storage_info(config: &Config) {

eprintln!(
"\
{:8}Cache:\t\t\t\t\t\"{}\", (size: {})",
{:8}Cache: \"{}\", (size: {})",
"",
path.display(),
size
Expand All @@ -125,7 +125,7 @@ async fn storage_info(config: &Config) {

eprintln!(
"\
{:8}Store:\t\t\t\t\t\t\"{}\", (latency: {:?})",
{:8}Store: \"{}\", (latency: {:?})",
"",
storage.get_endpoint(),
latency
Expand Down
6 changes: 3 additions & 3 deletions server/src/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ pub async fn update_snapshot(
let mut ch = false;
for m in manifests.iter() {
let s = get_address();
let p = format!("{}.{}.{}", s.0, s.1, MANIFEST_FILE);
let p = format!("{}.{}.{}", s.ip(), s.port(), MANIFEST_FILE);
if m.manifest_path.contains(&p) {
ch = true;
}
Expand Down Expand Up @@ -152,7 +152,7 @@ pub async fn update_snapshot(
};

let addr = get_address();
let mainfest_file_name = format!("{}.{}.{}", addr.0, addr.1, MANIFEST_FILE);
let mainfest_file_name = format!("{}.{}.{}", addr.ip(), addr.port(), MANIFEST_FILE);
let path =
partition_path(stream_name, lower_bound, upper_bound).join(&mainfest_file_name);
storage
Expand Down Expand Up @@ -186,7 +186,7 @@ pub async fn update_snapshot(
};

let addr = get_address();
let mainfest_file_name = format!("{}.{}.{}", addr.0, addr.1, MANIFEST_FILE);
let mainfest_file_name = format!("{}.{}.{}", addr.ip(), addr.port(), MANIFEST_FILE);
let path = partition_path(stream_name, lower_bound, upper_bound).join(&mainfest_file_name);
storage
.put_object(&path, serde_json::to_vec(&manifest).unwrap().into())
Expand Down
3 changes: 1 addition & 2 deletions server/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -322,7 +322,6 @@ impl Cli {
.env("P_INGESTOR_URL")
.value_name("URL")
.required(false)
.value_parser(validation::socket_addr)
.help("URL to connect to this specific ingestor. Default is the address of the server.")
)
.arg(
Expand Down Expand Up @@ -371,7 +370,7 @@ impl FromArgMatches for Cli {
self.ingestor_url = m
.get_one::<String>(Self::INGESTOR_URL)
.cloned()
.unwrap_or_else(|| self.address.clone());
.unwrap_or_else(String::default);

self.local_staging_path = m
.get_one::<PathBuf>(Self::STAGING)
Expand Down
2 changes: 1 addition & 1 deletion server/src/handlers/http/about.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ pub async fn about() -> Json<serde_json::Value> {
let current_version = format!("v{}", current_release.released_version);
let commit = current_release.commit_hash;
let deployment_id = meta.deployment_id.to_string();
let mode = CONFIG.parseable.mode.to_str();
let mode = CONFIG.get_server_mode_string();
let staging = if CONFIG.parseable.mode == Mode::Query {
"".to_string()
} else {
Expand Down
12 changes: 11 additions & 1 deletion server/src/handlers/http/modal/ingest_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ use super::OpenIdClient;
use super::ParseableServer;
use super::DEFAULT_VERSION;

use crate::utils::get_address;
use actix_web::body::MessageBody;
use actix_web::Scope;
use actix_web::{web, App, HttpServer};
Expand Down Expand Up @@ -196,6 +197,15 @@ impl IngestServer {
.to(logstream::get_stats)
.authorize_for_stream(Action::GetStats),
),
)
.service(
web::resource("/cache")
// PUT "/logstream/{logstream}/cache" ==> Set retention for given logstream
.route(
web::put()
.to(logstream::put_enable_cache)
.authorize_for_stream(Action::PutCacheEnabled),
),
),
)
}
Expand All @@ -204,7 +214,7 @@ impl IngestServer {
async fn set_ingester_metadata(&self) -> anyhow::Result<()> {
let store = CONFIG.storage().get_object_store();

let sock = Server::get_server_address();
let sock = get_address();
let path = ingester_metadata_path(sock.ip().to_string(), sock.port().to_string());

if store.get_object(&path).await.is_ok() {
Expand Down
16 changes: 0 additions & 16 deletions server/src/handlers/http/modal/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ use crate::migration;
use crate::rbac;
use crate::storage;
use crate::sync;
use std::net::SocketAddr;
use std::{fs::File, io::BufReader, sync::Arc};

use actix_web::web::resource;
Expand Down Expand Up @@ -270,12 +269,6 @@ impl Server {
)
.service(
web::resource("/cache")
// PUT "/logstream/{logstream}/cache" ==> Set retention for given logstream
.route(
web::put()
.to(logstream::put_enable_cache)
.authorize_for_stream(Action::PutCacheEnabled),
)
// GET "/logstream/{logstream}/cache" ==> Get retention for given logstream
.route(
web::get()
Expand Down Expand Up @@ -475,13 +468,4 @@ impl Server {
};
}
}

#[inline(always)]
pub fn get_server_address() -> SocketAddr {
// this might cause an issue down the line
// best is to make the Cli Struct better, but thats a chore
(CONFIG.parseable.ingestor_url.clone())
.parse::<SocketAddr>()
.unwrap()
}
}
5 changes: 2 additions & 3 deletions server/src/metrics/prom_utils.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
use crate::utils::get_address;
use prometheus_parse::Sample as PromSample;
use prometheus_parse::Value as PromValue;
use serde::Serialize;
use serde_json::Error as JsonError;
use serde_json::Value as JsonValue;

use crate::handlers::http::modal::server::Server;

#[derive(Debug, Serialize, Clone)]
pub struct Metrics {
address: String,
Expand All @@ -23,7 +22,7 @@ struct StorageMetrics {

impl Default for Metrics {
fn default() -> Self {
let socket = Server::get_server_address();
let socket = get_address();
let address = format!("http://{}:{}", socket.ip(), socket.port());
Metrics {
address,
Expand Down
8 changes: 8 additions & 0 deletions server/src/option.rs
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,14 @@ impl Config {
}
"S3 bucket"
}

pub fn get_server_mode_string(&self) -> &str {
match self.parseable.mode {
Mode::Query => "Distributed (Query)",
Mode::Ingest => "Distributed (Ingest)",
Mode::All => "Standalone",
}
}
}

fn create_parseable_cli_command() -> Command {
Expand Down
22 changes: 16 additions & 6 deletions server/src/storage/object_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -474,7 +474,7 @@ pub trait ObjectStorage: Sync + 'static {
cache_updates
.entry(stream)
.or_default()
.push((absolute_path, file))
.push((absolute_path, file));
} else {
let _ = fs::remove_file(file);
}
Expand Down Expand Up @@ -539,8 +539,13 @@ fn to_bytes(any: &(impl ?Sized + serde::Serialize)) -> Bytes {
fn schema_path(stream_name: &str) -> RelativePathBuf {
match CONFIG.parseable.mode {
Mode::Ingest => {
let (ip, port) = get_address();
let file_name = format!(".ingester.{}.{}{}", ip, port, SCHEMA_FILE_NAME);
let addr = get_address();
let file_name = format!(
".ingester.{}.{}{}",
addr.ip(),
addr.port(),
SCHEMA_FILE_NAME
);

RelativePathBuf::from_iter([stream_name, STREAM_ROOT_DIRECTORY, &file_name])
}
Expand All @@ -554,8 +559,13 @@ fn schema_path(stream_name: &str) -> RelativePathBuf {
pub fn stream_json_path(stream_name: &str) -> RelativePathBuf {
match &CONFIG.parseable.mode {
Mode::Ingest => {
let (ip, port) = get_address();
let file_name = format!(".ingester.{}.{}{}", ip, port, STREAM_METADATA_FILE_NAME);
let addr = get_address();
let file_name = format!(
".ingester.{}.{}{}",
addr.ip(),
addr.port(),
STREAM_METADATA_FILE_NAME
);
RelativePathBuf::from_iter([stream_name, STREAM_ROOT_DIRECTORY, &file_name])
}
Mode::Query | Mode::All => RelativePathBuf::from_iter([
Expand All @@ -580,7 +590,7 @@ fn alert_json_path(stream_name: &str) -> RelativePathBuf {
#[inline(always)]
fn manifest_path(prefix: &str) -> RelativePathBuf {
let addr = get_address();
let mainfest_file_name = format!("{}.{}.{}", addr.0, addr.1, MANIFEST_FILE);
let mainfest_file_name = format!("{}.{}.{}", addr.ip(), addr.port(), MANIFEST_FILE);
RelativePathBuf::from_iter([prefix, &mainfest_file_name])
}

Expand Down
18 changes: 8 additions & 10 deletions server/src/storage/staging.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,13 @@ use std::{
sync::Arc,
};

use crate::{
event::DEFAULT_TIMESTAMP_KEY,
metrics,
option::CONFIG,
storage::OBJECT_STORE_DATA_GRANULARITY,
utils::{self, arrow::merged_reader::MergedReverseRecordReader, get_address},
};
use arrow_schema::{ArrowError, Schema};
use chrono::{NaiveDateTime, Timelike, Utc};
use parquet::{
Expand All @@ -36,15 +43,6 @@ use parquet::{
schema::types::ColumnPath,
};

use super::super::handlers::http::modal::server::Server;
use crate::{
event::DEFAULT_TIMESTAMP_KEY,
metrics,
option::CONFIG,
storage::OBJECT_STORE_DATA_GRANULARITY,
utils::{self, arrow::merged_reader::MergedReverseRecordReader},
};

const ARROW_FILE_EXTENSION: &str = "data.arrows";
const PARQUET_FILE_EXTENSION: &str = "data.parquet";

Expand All @@ -65,7 +63,7 @@ impl StorageDir {
+ &utils::hour_to_prefix(time.hour())
+ &utils::minute_to_prefix(time.minute(), OBJECT_STORE_DATA_GRANULARITY).unwrap();
let local_uri = str::replace(&uri, "/", ".");
let sock = Server::get_server_address();
let sock = get_address();
let ip = sock.ip();
let port = sock.port();
format!("{local_uri}{ip}.{port}.{extention}")
Expand Down
35 changes: 27 additions & 8 deletions server/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,10 @@ pub mod header_parsing;
pub mod json;
pub mod uid;
pub mod update;

use std::net::{IpAddr, SocketAddr};

use chrono::{DateTime, NaiveDate, Timelike, Utc};

use crate::option::CONFIG;
use chrono::{DateTime, NaiveDate, Timelike, Utc};
use std::env;
use std::net::SocketAddr;

#[allow(dead_code)]
pub fn hostname() -> Option<String> {
Expand Down Expand Up @@ -227,9 +225,30 @@ impl TimePeriod {
}

#[inline(always)]
pub fn get_address() -> (IpAddr, u16) {
let addr = CONFIG.parseable.ingestor_url.parse::<SocketAddr>().unwrap();
(addr.ip(), addr.port())
pub fn get_address() -> SocketAddr {
if CONFIG.parseable.ingestor_url.is_empty() {
CONFIG.parseable.address.parse::<SocketAddr>().unwrap()
} else {
let addr_from_env = CONFIG
.parseable
.ingestor_url
.split(':')
.collect::<Vec<&str>>();

let mut hostname = addr_from_env[0].to_string();
let mut port = addr_from_env[1].to_string();
if hostname.starts_with('$') && port.starts_with('$') {
hostname = get_from_env("HOSTNAME");
port = get_from_env("PORT");
let addr = format!("{}:{}", hostname, port);
addr.parse::<SocketAddr>().unwrap()
} else {
CONFIG.parseable.ingestor_url.parse::<SocketAddr>().unwrap()
}
}
}
fn get_from_env(var_to_fetch: &str) -> String {
env::var(var_to_fetch).unwrap_or_else(|_| "".to_string())
}

#[cfg(test)]
Expand Down

0 comments on commit 70937d4

Please sign in to comment.