fix for time partition limit #792

Merged
2 changes: 1 addition & 1 deletion server/src/event/format/json.rs
@@ -48,7 +48,7 @@ impl EventFormat for Event {
static_schema_flag: Option<String>,
time_partition: Option<String>,
) -> Result<(Self::Data, Vec<Arc<Field>>, bool, Tags, Metadata), anyhow::Error> {
- let data = flatten_json_body(self.data, None, false)?;
+ let data = flatten_json_body(self.data, None, None, false)?;
let stream_schema = schema;

// incoming event may be a single json or a json array
1 change: 1 addition & 0 deletions server/src/handlers.rs
@@ -24,6 +24,7 @@ const PREFIX_META: &str = "x-p-meta-";
const STREAM_NAME_HEADER_KEY: &str = "x-p-stream";
const LOG_SOURCE_KEY: &str = "x-p-log-source";
const TIME_PARTITION_KEY: &str = "x-p-time-partition";
+ const TIME_PARTITION_LIMIT_KEY: &str = "x-p-time-partition-limit";
const STATIC_SCHEMA_FLAG: &str = "x-p-static-schema-flag";
const AUTHORIZATION_KEY: &str = "authorization";
const SEPARATOR: char = '^';
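With the new constant in place, clients opt into the limit at stream-creation time by sending `x-p-time-partition-limit` alongside `x-p-time-partition`. A minimal sketch of such a request — assuming the server's usual `PUT /api/v1/logstream/{name}` endpoint, default port, and default credentials, with `reqwest` standing in for any HTTP client:

```rust
// Sketch only: create a stream partitioned on `source_time` with a 30-day
// limit. Endpoint, port, and credentials are illustrative assumptions.
use reqwest::Client;

async fn create_limited_stream() -> Result<(), reqwest::Error> {
    Client::new()
        .put("http://localhost:8000/api/v1/logstream/demo")
        .header("x-p-time-partition", "source_time")
        .header("x-p-time-partition-limit", "30d") // non-zero day count + 'd' suffix
        .basic_auth("admin", Some("admin"))
        .send()
        .await?
        .error_for_status()?;
    Ok(())
}
```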
8 changes: 7 additions & 1 deletion server/src/handlers/http/ingest.rs
@@ -104,6 +104,7 @@ async fn push_logs(stream_name: String, req: HttpRequest, body: Bytes) -> Result
.map_err(|_err| PostError::StreamNotFound(stream_name.clone()))?;

let time_partition = object_store_format.time_partition;
+ let time_partition_limit = object_store_format.time_partition_limit;
let static_schema_flag = object_store_format.static_schema_flag;
let body_val: Value = serde_json::from_slice(&body)?;
let size: usize = body.len();
@@ -129,7 +130,11 @@
.process()
.await?;
} else {
- let data = convert_array_to_object(body_val.clone(), time_partition.clone())?;
+ let data = convert_array_to_object(
+     body_val.clone(),
+     time_partition.clone(),
+     time_partition_limit,
+ )?;
for value in data {
let body_timestamp = value.get(&time_partition.clone().unwrap().to_string());
parsed_timestamp = body_timestamp
@@ -210,6 +215,7 @@ pub async fn create_stream_if_not_exists(stream_name: &str) -> Result<(), PostError>
stream_name.to_string(),
"",
"",
"",
Arc::new(Schema::empty()),
)
.await?;
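For context on the loop above: each flattened event must carry the configured partition column, which `push_logs` extracts and parses into `parsed_timestamp` (the parsing itself is truncated in this view). A hedged sketch of that step, assuming the column holds an RFC 3339 string — illustrative only, not code from this PR:

```rust
use chrono::{DateTime, NaiveDateTime, Utc};
use serde_json::Value;

// Pull the partition column out of one flattened event and normalize it to
// a naive UTC timestamp, as the loop above appears to do.
fn event_timestamp(event: &Value, column: &str) -> Option<NaiveDateTime> {
    event
        .get(column)?
        .as_str()
        .and_then(|s| DateTime::parse_from_rfc3339(s).ok())
        .map(|dt| dt.with_timezone(&Utc).naive_utc())
}
```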
39 changes: 37 additions & 2 deletions server/src/handlers/http/logstream.rs
@@ -21,7 +21,7 @@ use super::base_path_without_preceding_slash;
use super::cluster::fetch_stats_from_ingestors;
use super::cluster::utils::{merge_quried_stats, IngestionStats, QueriedStats, StorageStats};
use crate::alerts::Alerts;
- use crate::handlers::{STATIC_SCHEMA_FLAG, TIME_PARTITION_KEY};
+ use crate::handlers::{STATIC_SCHEMA_FLAG, TIME_PARTITION_KEY, TIME_PARTITION_LIMIT_KEY};
use crate::metadata::STREAM_INFO;
use crate::option::{Mode, CONFIG};
use crate::static_schema::{convert_static_schema_to_arrow_schema, StaticSchema};
@@ -40,6 +40,7 @@ use itertools::Itertools;
use serde_json::Value;
use std::collections::HashMap;
use std::fs;
+ use std::num::NonZeroU32;
use std::sync::Arc;

pub async fn delete(req: HttpRequest) -> Result<impl Responder, StreamError> {
@@ -191,6 +192,29 @@ pub async fn put_stream(req: HttpRequest, body: Bytes) -> Result<impl Responder, StreamError>
} else {
""
};
+ let mut time_partition_in_days: &str = "";
+ if let Some((_, time_partition_limit_name)) = req
+     .headers()
+     .iter()
+     .find(|&(key, _)| key == TIME_PARTITION_LIMIT_KEY)
+ {
+     let time_partition_limit = time_partition_limit_name.to_str().unwrap();
+     if !time_partition_limit.ends_with('d') {
+         return Err(StreamError::Custom {
+             msg: "missing 'd' suffix for duration value".to_string(),
+             status: StatusCode::BAD_REQUEST,
+         });
+     }
+     let days = &time_partition_limit[0..time_partition_limit.len() - 1];
+     if days.parse::<NonZeroU32>().is_err() {
+         return Err(StreamError::Custom {
+             msg: "could not convert duration to an unsigned number".to_string(),
+             status: StatusCode::BAD_REQUEST,
+         });
+     } else {
+         time_partition_in_days = days;
+     }
+ }
let static_schema_flag = if let Some((_, static_schema_flag)) = req
.headers()
.iter()
Expand Down Expand Up @@ -235,7 +259,14 @@ pub async fn put_stream(req: HttpRequest, body: Bytes) -> Result<impl Responder,
});
}

- create_stream(stream_name, time_partition, static_schema_flag, schema).await?;
+ create_stream(
+     stream_name,
+     time_partition,
+     time_partition_in_days,
+     static_schema_flag,
+     schema,
+ )
+ .await?;

Ok(("log stream created", StatusCode::OK))
}
@@ -516,6 +547,7 @@ fn remove_id_from_alerts(value: &mut Value) {
pub async fn create_stream(
stream_name: String,
time_partition: &str,
+ time_partition_limit: &str,
static_schema_flag: &str,
schema: Arc<Schema>,
) -> Result<(), CreateStreamError> {
@@ -528,6 +560,7 @@ pub async fn create_stream(
.create_stream(
&stream_name,
time_partition,
+ time_partition_limit,
static_schema_flag,
schema.clone(),
)
@@ -557,6 +590,7 @@ pub async fn create_stream(
stream_name.to_string(),
created_at,
time_partition.to_string(),
+ time_partition_limit.to_string(),
static_schema_flag.to_string(),
static_schema,
);
@@ -595,6 +629,7 @@ pub async fn get_stream_info(req: HttpRequest) -> Result<impl Responder, StreamError>
created_at: stream_meta.created_at.clone(),
first_event_at: stream_meta.first_event_at.clone(),
time_partition: stream_meta.time_partition.clone(),
+ time_partition_limit: stream_meta.time_partition_limit.clone(),
cache_enabled: stream_meta.cache_enabled,
static_schema_flag: stream_meta.static_schema_flag.clone(),
};
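The accepted header format is therefore `<N>d`, where `N` is a non-zero unsigned number of days (e.g. `30d`). The same checks, distilled into a standalone helper for illustration — a sketch mirroring the PR's logic, not code from the PR:

```rust
use std::num::NonZeroU32;

// Parse a time-partition-limit value such as "30d" into a day count:
// require a 'd' suffix, then a non-zero unsigned integer.
fn parse_partition_limit(value: &str) -> Result<NonZeroU32, String> {
    let days = value
        .strip_suffix('d')
        .ok_or_else(|| "missing 'd' suffix for duration value".to_string())?;
    days.parse::<NonZeroU32>()
        .map_err(|_| "could not convert duration to an unsigned number".to_string())
}
```

`parse_partition_limit("30d")` yields `Ok(30)`, while `"30"` and `"0d"` map to the two `BAD_REQUEST` branches above.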
11 changes: 9 additions & 2 deletions server/src/metadata.rs
@@ -24,12 +24,11 @@ use once_cell::sync::Lazy;
use std::collections::HashMap;
use std::sync::{Arc, RwLock};

+ use self::error::stream_info::{CheckAlertError, LoadError, MetadataError};
use crate::alerts::Alerts;
use crate::metrics::{EVENTS_INGESTED, EVENTS_INGESTED_SIZE};
use crate::storage::{LogStream, ObjectStorage, StorageDir};
use crate::utils::arrow::MergedRecordReader;

- use self::error::stream_info::{CheckAlertError, LoadError, MetadataError};
use derive_more::{Deref, DerefMut};

// TODO: make return type be of 'static lifetime instead of cloning
@@ -47,6 +46,7 @@ pub struct LogStreamMetadata {
pub created_at: String,
pub first_event_at: Option<String>,
pub time_partition: Option<String>,
+ pub time_partition_limit: Option<String>,
pub static_schema_flag: Option<String>,
}

@@ -166,6 +166,7 @@ impl StreamInfo {
stream_name: String,
created_at: String,
time_partition: String,
+ time_partition_limit: String,
static_schema_flag: String,
static_schema: HashMap<String, Arc<Field>>,
) {
@@ -181,6 +182,11 @@
} else {
Some(time_partition)
},
+ time_partition_limit: if time_partition_limit.is_empty() {
+     None
+ } else {
+     Some(time_partition_limit)
+ },
static_schema_flag: if static_schema_flag != "true" {
None
} else {
@@ -237,6 +243,7 @@
created_at: meta.created_at,
first_event_at: meta.first_event_at,
time_partition: meta.time_partition,
+ time_partition_limit: meta.time_partition_limit,
static_schema_flag: meta.static_schema_flag,
};

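As with `time_partition` and `static_schema_flag`, an empty string serves as the in-band "unset" sentinel and is converted to `Option` at the metadata boundary. If a later cleanup is wanted, the `bool::then_some` idiom expresses the same mapping more compactly — a suggestion, not part of this PR:

```rust
// Equivalent to the if/else above: empty string -> None, otherwise Some(s).
fn non_empty(s: String) -> Option<String> {
    (!s.is_empty()).then_some(s)
}
```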
10 changes: 7 additions & 3 deletions server/src/storage.rs
@@ -30,16 +30,15 @@ mod s3;
pub mod staging;
mod store_metadata;

+ use self::retention::Retention;
+ pub use self::staging::StorageDir;
pub use localfs::FSConfig;
pub use object_storage::{ObjectStorage, ObjectStorageProvider};
pub use s3::S3Config;
pub use store_metadata::{
put_remote_metadata, put_staging_metadata, resolve_parseable_metadata, StorageMetadata,
};

- use self::retention::Retention;
- pub use self::staging::StorageDir;

// metadata file names in a Stream prefix
pub const STREAM_METADATA_FILE_NAME: &str = ".stream.json";
pub const PARSEABLE_METADATA_FILE_NAME: &str = ".parseable.json";
@@ -94,6 +93,8 @@ pub struct ObjectStoreFormat {
#[serde(skip_serializing_if = "Option::is_none")]
pub time_partition: Option<String>,
+ #[serde(skip_serializing_if = "Option::is_none")]
+ pub time_partition_limit: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub static_schema_flag: Option<String>,
}

@@ -109,6 +110,8 @@ pub struct StreamInfo {
#[serde(skip_serializing_if = "Option::is_none")]
pub time_partition: Option<String>,
+ #[serde(skip_serializing_if = "Option::is_none")]
+ pub time_partition_limit: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub static_schema_flag: Option<String>,
}

@@ -155,6 +158,7 @@ impl Default for ObjectStoreFormat {
cache_enabled: false,
retention: None,
time_partition: None,
+ time_partition_limit: None,
static_schema_flag: None,
}
}
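Because the new field is an `Option` with `skip_serializing_if`, manifests for existing streams are unchanged on disk; only streams created with a limit gain the key. An assumed excerpt of such a `.stream.json` (field values illustrative):

```json
{
  "time_partition": "source_time",
  "time_partition_limit": "30d"
}
```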
6 changes: 6 additions & 0 deletions server/src/storage/object_storage.rs
@@ -127,6 +127,7 @@ pub trait ObjectStorage: Sync + 'static {
&self,
stream_name: &str,
time_partition: &str,
+ time_partition_limit: &str,
static_schema_flag: &str,
schema: Arc<Schema>,
) -> Result<(), ObjectStorageError> {
@@ -139,6 +140,11 @@
} else {
format.time_partition = Some(time_partition.to_string());
}
+ if time_partition_limit.is_empty() {
+     format.time_partition_limit = None;
+ } else {
+     format.time_partition_limit = Some(time_partition_limit.to_string());
+ }
if static_schema_flag != "true" {
format.static_schema_flag = None;
} else {
12 changes: 10 additions & 2 deletions server/src/utils/json.rs
@@ -24,16 +24,24 @@ pub mod flatten;
pub fn flatten_json_body(
body: serde_json::Value,
time_partition: Option<String>,
+ time_partition_limit: Option<String>,
validation_required: bool,
) -> Result<Value, anyhow::Error> {
- flatten::flatten(body, "_", time_partition, validation_required)
+ flatten::flatten(
+     body,
+     "_",
+     time_partition,
+     time_partition_limit,
+     validation_required,
+ )
}

pub fn convert_array_to_object(
body: Value,
time_partition: Option<String>,
+ time_partition_limit: Option<String>,
) -> Result<Vec<Value>, anyhow::Error> {
- let data = flatten_json_body(body, time_partition, true)?;
+ let data = flatten_json_body(body, time_partition, time_partition_limit, true)?;
let value_arr = match data {
Value::Array(arr) => arr,
value @ Value::Object(_) => vec![value],
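The limit is only threaded through here; enforcement lives inside `flatten::flatten`, which this diff does not show. Going by the day-based header format, a plausible shape for that check is a recency window over the event's partition timestamp — a hypothetical illustration, not the PR's actual implementation:

```rust
use chrono::{Duration, NaiveDateTime, Utc};

// Hypothetical: accept an event only if its partition timestamp falls within
// the configured window, e.g. limit_days = 30 for a "30d" limit.
fn within_partition_limit(event_time: NaiveDateTime, limit_days: i64) -> bool {
    event_time >= Utc::now().naive_utc() - Duration::days(limit_days)
}
```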