feat: allow historical ingestion only when date column provided in header x-p-time-partition and server time are within the same minute; no change for default ingestions
nikhilsinhaparseable committed Mar 26, 2024
1 parent 72502ad commit f4ae7d8
Showing 10 changed files with 252 additions and 72 deletions.
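The changes below thread the new x-p-time-partition header from stream creation (put_stream → create_stream → STREAM_INFO.add_stream) through to ingestion (push_logs → into_event_batch). The following is a purely illustrative sketch of how a caller might exercise that path, assuming actix-web 4's TestRequest helpers; the stream name "demo" and column name "source_time" are invented for the example.

use actix_web::test::TestRequest;

fn example_requests() {
    // Stream-creation request carrying the new header; put_stream() reads
    // x-p-time-partition and forwards its value to create_stream().
    let _create = TestRequest::put()
        .insert_header(("x-p-time-partition", "source_time"))
        .to_http_request();

    // Ingestion needs no extra header: push_logs() fetches the stream's
    // time_partition from STREAM_INFO and hands it to into_event_batch().
    let _ingest = TestRequest::post()
        .insert_header(("x-p-stream", "demo"))
        .to_http_request();
}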
4 changes: 3 additions & 1 deletion server/src/event/format.rs
@@ -41,13 +41,15 @@ pub trait EventFormat: Sized {
fn to_data(
self,
schema: HashMap<String, Arc<Field>>,
time_partition: Option<String>,
) -> Result<(Self::Data, EventSchema, bool, Tags, Metadata), AnyError>;
fn decode(data: Self::Data, schema: Arc<Schema>) -> Result<RecordBatch, AnyError>;
fn into_recordbatch(
self,
schema: HashMap<String, Arc<Field>>,
time_partition: Option<String>,
) -> Result<(RecordBatch, bool), AnyError> {
let (data, mut schema, is_first, tags, metadata) = self.to_data(schema)?;
let (data, mut schema, is_first, tags, metadata) = self.to_data(schema, time_partition)?;

if get_field(&schema, DEFAULT_TAGS_KEY).is_some() {
return Err(anyhow!("field {} is a reserved field", DEFAULT_TAGS_KEY));
3 changes: 2 additions & 1 deletion server/src/event/format/json.rs
@@ -45,8 +45,9 @@ impl EventFormat for Event {
fn to_data(
self,
schema: HashMap<String, Arc<Field>>,
time_partition: Option<String>,
) -> Result<(Self::Data, Vec<Arc<Field>>, bool, Tags, Metadata), anyhow::Error> {
let data = flatten_json_body(self.data)?;
let data = flatten_json_body(self.data, time_partition)?;
let stream_schema = schema;

// incoming event may be a single json or a json array
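flatten_json_body now receives the stream's time partition and, per the commit message, is presumably where incoming events are checked against server time. The actual change to that function sits in one of the files not rendered on this page, so the following is only a minimal sketch of such a check, assuming the partition column holds an RFC 3339 timestamp and reading "within the same minute" as a 60-second window.

use anyhow::anyhow;
use chrono::{DateTime, Utc};
use serde_json::Value;

// Sketch only; the real validation lives in the (hidden) flatten_json_body change.
fn validate_time_partition(event: &Value, time_partition: &str) -> Result<(), anyhow::Error> {
    let raw = event
        .get(time_partition)
        .and_then(Value::as_str)
        .ok_or_else(|| anyhow!("missing time partition column {time_partition}"))?;
    let event_time: DateTime<Utc> = raw.parse()?;
    // Accept the event only if its timestamp and the server clock are at most
    // a minute apart; otherwise the historical ingestion is rejected.
    if (Utc::now() - event_time).num_seconds().abs() <= 60 {
        Ok(())
    } else {
        Err(anyhow!(
            "value of {time_partition} is not within a minute of server time"
        ))
    }
}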
1 change: 1 addition & 0 deletions server/src/handlers.rs
@@ -23,6 +23,7 @@ const PREFIX_TAGS: &str = "x-p-tag-";
const PREFIX_META: &str = "x-p-meta-";
const STREAM_NAME_HEADER_KEY: &str = "x-p-stream";
const LOG_SOURCE_KEY: &str = "x-p-log-source";
const TIME_PARTITION_KEY: &str = "x-p-time-partition";
const AUTHORIZATION_KEY: &str = "authorization";
const SEPARATOR: char = '^';

70 changes: 53 additions & 17 deletions server/src/handlers/http/ingest.rs
@@ -99,7 +99,14 @@ async fn push_logs(stream_name: String, req: HttpRequest, body: Bytes) -> Result
.ok_or(PostError::StreamNotFound(stream_name.clone()))?
.schema
.clone();
into_event_batch(req, body, schema)?

let time_partition = hash_map
.get(&stream_name)
.ok_or(PostError::StreamNotFound(stream_name.clone()))?
.time_partition
.clone();

into_event_batch(req, body, schema, time_partition)?
};

event::Event {
@@ -119,6 +126,7 @@ fn into_event_batch(
req: HttpRequest,
body: Bytes,
schema: HashMap<String, Arc<Field>>,
time_partition: Option<String>,
) -> Result<(usize, arrow_array::RecordBatch, bool), PostError> {
let tags = collect_labelled_headers(&req, PREFIX_TAGS, SEPARATOR)?;
let metadata = collect_labelled_headers(&req, PREFIX_META, SEPARATOR)?;
@@ -129,7 +137,7 @@ fn into_event_batch(
tags,
metadata,
};
let (rb, is_first) = event.into_recordbatch(schema)?;
let (rb, is_first) = event.into_recordbatch(schema, time_partition)?;
Ok((size, rb, is_first))
}

@@ -138,7 +146,7 @@ pub async fn create_stream_if_not_exists(stream_name: &str) -> Result<(), PostEr
if STREAM_INFO.stream_exists(stream_name) {
return Ok(());
}
super::logstream::create_stream(stream_name.to_string()).await?;
super::logstream::create_stream(stream_name.to_string(), "").await?;
Ok(())
}

@@ -241,6 +249,7 @@ mod tests {
req,
Bytes::from(serde_json::to_vec(&json).unwrap()),
HashMap::default(),
None,
)
.unwrap();

@@ -287,6 +296,7 @@ mod tests {
req,
Bytes::from(serde_json::to_vec(&json).unwrap()),
HashMap::default(),
None,
)
.unwrap();

@@ -320,8 +330,13 @@ mod tests {

let req = TestRequest::default().to_http_request();

let (_, rb, _) =
into_event_batch(req, Bytes::from(serde_json::to_vec(&json).unwrap()), schema).unwrap();
let (_, rb, _) = into_event_batch(
req,
Bytes::from(serde_json::to_vec(&json).unwrap()),
schema,
None,
)
.unwrap();

assert_eq!(rb.num_rows(), 1);
assert_eq!(rb.num_columns(), 5);
@@ -353,10 +368,13 @@ mod tests {

let req = TestRequest::default().to_http_request();

assert!(
into_event_batch(req, Bytes::from(serde_json::to_vec(&json).unwrap()), schema,)
.is_err()
);
assert!(into_event_batch(
req,
Bytes::from(serde_json::to_vec(&json).unwrap()),
schema,
None
)
.is_err());
}

#[test]
@@ -374,8 +392,13 @@ mod tests {

let req = TestRequest::default().to_http_request();

let (_, rb, _) =
into_event_batch(req, Bytes::from(serde_json::to_vec(&json).unwrap()), schema).unwrap();
let (_, rb, _) = into_event_batch(
req,
Bytes::from(serde_json::to_vec(&json).unwrap()),
schema,
None,
)
.unwrap();

assert_eq!(rb.num_rows(), 1);
assert_eq!(rb.num_columns(), 3);
@@ -391,6 +414,7 @@ mod tests {
req,
Bytes::from(serde_json::to_vec(&json).unwrap()),
HashMap::default(),
None
)
.is_err())
}
@@ -419,6 +443,7 @@ mod tests {
req,
Bytes::from(serde_json::to_vec(&json).unwrap()),
HashMap::default(),
None,
)
.unwrap();

@@ -472,6 +497,7 @@ mod tests {
req,
Bytes::from(serde_json::to_vec(&json).unwrap()),
HashMap::default(),
None,
)
.unwrap();

@@ -521,8 +547,13 @@ mod tests {
);
let req = TestRequest::default().to_http_request();

let (_, rb, _) =
into_event_batch(req, Bytes::from(serde_json::to_vec(&json).unwrap()), schema).unwrap();
let (_, rb, _) = into_event_batch(
req,
Bytes::from(serde_json::to_vec(&json).unwrap()),
schema,
None,
)
.unwrap();

assert_eq!(rb.num_rows(), 3);
assert_eq!(rb.num_columns(), 6);
@@ -566,6 +597,7 @@ mod tests {
req,
Bytes::from(serde_json::to_vec(&json).unwrap()),
HashMap::default(),
None,
)
.unwrap();

@@ -612,10 +644,13 @@ mod tests {
.into_iter(),
);

assert!(
into_event_batch(req, Bytes::from(serde_json::to_vec(&json).unwrap()), schema,)
.is_err()
);
assert!(into_event_batch(
req,
Bytes::from(serde_json::to_vec(&json).unwrap()),
schema,
None
)
.is_err());
}

#[test]
Expand Down Expand Up @@ -647,6 +682,7 @@ mod tests {
req,
Bytes::from(serde_json::to_vec(&json).unwrap()),
HashMap::default(),
None,
)
.unwrap();

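Every updated test above passes None for the new parameter, so the suite keeps exercising the default path. A hypothetical companion test with the partition set might look like the sketch below; the column name is invented, and the assertion assumes the flattening logic accepts a timestamp this close to server time.

#[test]
fn basic_object_with_time_partition() {
    let json = serde_json::json!({
        "a": 1,
        "source_time": chrono::Utc::now().to_rfc3339(),
    });

    let req = TestRequest::default().to_http_request();

    let (_, rb, _) = into_event_batch(
        req,
        Bytes::from(serde_json::to_vec(&json).unwrap()),
        HashMap::default(),
        Some("source_time".to_string()),
    )
    .unwrap();

    assert_eq!(rb.num_rows(), 1);
}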
35 changes: 26 additions & 9 deletions server/src/handlers/http/logstream.rs
@@ -18,18 +18,18 @@

use std::fs;

use actix_web::http::StatusCode;
use actix_web::{web, HttpRequest, Responder};
use chrono::Utc;
use serde_json::Value;

use self::error::{CreateStreamError, StreamError};
use crate::alerts::Alerts;
use crate::handlers::TIME_PARTITION_KEY;
use crate::metadata::STREAM_INFO;
use crate::option::CONFIG;
use crate::storage::{retention::Retention, LogStream, StorageDir};
use crate::{catalog, event, stats};
use crate::{metadata, validator};
use actix_web::http::StatusCode;
use actix_web::{web, HttpRequest, Responder};
use chrono::Utc;
use serde_json::Value;

pub async fn delete(req: HttpRequest) -> Result<impl Responder, StreamError> {
let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap();
@@ -108,6 +108,16 @@ pub async fn get_alert(req: HttpRequest) -> Result<impl Responder, StreamError>
}

pub async fn put_stream(req: HttpRequest) -> Result<impl Responder, StreamError> {
let time_partition = if let Some((_, time_partition_name)) = req
.headers()
.iter()
.find(|&(key, _)| key == TIME_PARTITION_KEY)
{
time_partition_name.to_str().unwrap()
} else {
""
};

let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap();

if metadata::STREAM_INFO.stream_exists(&stream_name) {
@@ -119,7 +129,7 @@ pub async fn put_stream(req: HttpRequest) -> Result<impl Responder, StreamError>
status: StatusCode::BAD_REQUEST,
});
} else {
create_stream(stream_name).await?;
create_stream(stream_name, time_partition).await?;
}

Ok(("log stream created", StatusCode::OK))
@@ -326,13 +336,16 @@ fn remove_id_from_alerts(value: &mut Value) {
}
}

pub async fn create_stream(stream_name: String) -> Result<(), CreateStreamError> {
pub async fn create_stream(
stream_name: String,
time_partition: &str,
) -> Result<(), CreateStreamError> {
// fail to proceed if invalid stream name
validator::stream_name(&stream_name)?;

// Proceed to create log stream if it doesn't exist
let storage = CONFIG.storage().get_object_store();
if let Err(err) = storage.create_stream(&stream_name).await {
if let Err(err) = storage.create_stream(&stream_name, time_partition).await {
return Err(CreateStreamError::Storage { stream_name, err });
}

@@ -344,7 +357,11 @@ pub async fn create_stream(stream_name: String) -> Result<(), CreateStreamError>
let stream_meta = stream_meta.unwrap();
let created_at = stream_meta.created_at;

metadata::STREAM_INFO.add_stream(stream_name.to_string(), created_at);
metadata::STREAM_INFO.add_stream(
stream_name.to_string(),
created_at,
time_partition.to_string(),
);

Ok(())
}
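create_stream now always takes a time-partition argument, with the empty string standing in for "no time partition" — the same fallback put_stream uses when the header is absent. Hypothetical call sites, assuming they live in the same module (stream and column names invented):

async fn create_two_streams() -> Result<(), CreateStreamError> {
    // A stream partitioned on the "source_time" column of incoming events.
    create_stream("demo".to_string(), "source_time").await?;
    // The empty string keeps the default behaviour: partition on server ingest time.
    create_stream("plain".to_string(), "").await?;
    Ok(())
}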
14 changes: 13 additions & 1 deletion server/src/metadata.rs
@@ -92,6 +92,13 @@ impl StreamInfo {
.map(|metadata| metadata.cache_enabled)
}

pub fn get_time_partition(&self, stream_name: &str) -> Result<Option<String>, MetadataError> {
let map = self.read().expect(LOCK_EXPECT);
map.get(stream_name)
.ok_or(MetadataError::StreamMetaNotFound(stream_name.to_string()))
.map(|metadata| metadata.time_partition.clone())
}

pub fn set_stream_cache(&self, stream_name: &str, enable: bool) -> Result<(), MetadataError> {
let mut map = self.write().expect(LOCK_EXPECT);
let stream = map
@@ -143,14 +150,19 @@ impl StreamInfo {
})
}

pub fn add_stream(&self, stream_name: String, created_at: String) {
pub fn add_stream(&self, stream_name: String, created_at: String, time_partition: String) {
let mut map = self.write().expect(LOCK_EXPECT);
let metadata = LogStreamMetadata {
created_at: if created_at.is_empty() {
Local::now().to_rfc3339()
} else {
created_at
},
time_partition: if time_partition.is_empty() {
None
} else {
Some(time_partition)
},
..Default::default()
};
map.insert(stream_name, metadata);
16 changes: 14 additions & 2 deletions server/src/storage/object_storage.rs
@@ -106,11 +106,20 @@ pub trait ObjectStorage: Sync + 'static {
Ok(())
}

async fn create_stream(&self, stream_name: &str) -> Result<(), ObjectStorageError> {
async fn create_stream(
&self,
stream_name: &str,
time_partition: &str,
) -> Result<(), ObjectStorageError> {
let mut format = ObjectStoreFormat::default();
format.set_id(CONFIG.parseable.username.clone());
let permission = Permisssion::new(CONFIG.parseable.username.clone());
format.permissions = vec![permission];
if time_partition.is_empty() {
format.time_partition = None;
} else {
format.time_partition = Some(time_partition.to_string());
}
let format_json = to_bytes(&format);
self.put_object(&schema_path(stream_name), to_bytes(&Schema::empty()))
.await?;
@@ -325,8 +334,11 @@ pub trait ObjectStorage: Sync + 'static {
let cache_enabled = STREAM_INFO
.cache_enabled(stream)
.map_err(|err| ObjectStorageError::UnhandledError(Box::new(err)))?;
let time_partition = STREAM_INFO
.get_time_partition(stream)
.map_err(|err| ObjectStorageError::UnhandledError(Box::new(err)))?;
let dir = StorageDir::new(stream);
let schema = convert_disk_files_to_parquet(stream, &dir)
let schema = convert_disk_files_to_parquet(stream, &dir, time_partition)
.map_err(|err| ObjectStorageError::UnhandledError(Box::new(err)))?;

if let Some(schema) = schema {
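For completeness, a sketch of inspecting the metadata that create_stream now persists. It assumes ObjectStoreFormat derives Default and Serialize with its field names unchanged and that time_partition is assignable from the calling module — all assumptions, since only the fragment above is visible.

fn show_stream_metadata() -> Result<(), serde_json::Error> {
    let mut format = ObjectStoreFormat::default();
    format.time_partition = Some("source_time".to_string());
    // With default serde field names, the stored stream.json would now carry a
    // "time_partition": "source_time" entry alongside the owner and permissions.
    println!("{}", serde_json::to_string_pretty(&format)?);
    Ok(())
}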