feat: allow historical data ingestion based on user defined time (#683)
This PR adds an enhancement that partitions ingested logs on a user-provided
timestamp instead of the server time.

To use it, the user adds the optional custom header X-P-Time-Partition to the
stream creation API call; ingestion and query then use the named timestamp
column from the log data instead of the server-time column p_timestamp.
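
A hypothetical example of setting the header at stream creation (a sketch
only; the endpoint path, port, and credentials below are assumptions, and the
column name "timestamp" is a placeholder, none of which this PR confirms):

use reqwest::blocking::Client;

// Sketch: create a stream whose time partition is the event's "timestamp"
// column. Endpoint, port, and credentials here are assumptions, not part
// of this PR.
fn create_stream_with_time_partition() -> reqwest::Result<()> {
    Client::new()
        .put("http://localhost:8000/api/v1/logstream/demo")
        .basic_auth("admin", Some("admin"))
        .header("X-P-Time-Partition", "timestamp")
        .send()?
        .error_for_status()?;
    Ok(())
}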

The time_partition field name is stored in stream.json and in the in-memory
STREAM_INFO used by the ingest API. On ingestion, the server checks that the
timestamp column exists in the log event and that its value can be parsed
into a datetime; if either check fails, it throws an exception. The arrow
file name takes its date, hour, and minute from the timestamp field (if
defined for the stream); otherwise it takes them from the server time. The
parquet file name gets a random number appended, because many log events can
share the same date, hour, and minute in the timestamp field; without the
random number, parquet files would be overwritten. In the console, a query's
from and to dates are matched against the value of the timestamp column of
the log data (if defined for the stream); otherwise they are matched against
the p_timestamp column.

Fixes #671 
Fixes #685
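
A minimal sketch of the two ingestion checks described above, assuming the
event arrives as JSON; the function name and error strings are hypothetical,
not code from this PR:

use chrono::NaiveDateTime;
use serde_json::Value;

// Hypothetical sketch: look up the user-configured time-partition column in
// a JSON log event, then parse its value. Either failure mirrors the
// exceptions described in the commit message.
fn parse_event_timestamp(event: &Value, time_partition: &str) -> Result<NaiveDateTime, String> {
    let raw = event
        .get(time_partition)
        .and_then(Value::as_str)
        .ok_or_else(|| format!("column {time_partition} is missing from the event"))?;
    // NaiveDateTime's FromStr accepts RFC 3339-style values such as
    // "2024-03-07T10:15:00".
    raw.parse::<NaiveDateTime>()
        .map_err(|err| format!("column {time_partition} is not a datetime: {err}"))
}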
nikhilsinhaparseable authored Mar 7, 2024
1 parent 3f9a2c5 commit bc730ee
Showing 17 changed files with 436 additions and 257 deletions.
99 changes: 66 additions & 33 deletions server/src/catalog.rs
@@ -23,6 +23,7 @@ use relative_path::RelativePathBuf;

use crate::{
catalog::manifest::Manifest,
event::DEFAULT_TIMESTAMP_KEY,
query::PartialTimeFilter,
storage::{ObjectStorage, ObjectStorageError},
};
@@ -69,25 +70,46 @@ impl ManifestFile for manifest::File {
}
}

fn get_file_bounds(file: &manifest::File) -> (DateTime<Utc>, DateTime<Utc>) {
match file
.columns()
.iter()
.find(|col| col.name == "p_timestamp")
.unwrap()
.stats
.clone()
.unwrap()
{
column::TypedStatistics::Int(stats) => (
NaiveDateTime::from_timestamp_millis(stats.min)
.unwrap()
.and_utc(),
NaiveDateTime::from_timestamp_millis(stats.max)
.unwrap()
.and_utc(),
),
_ => unreachable!(),
fn get_file_bounds(
file: &manifest::File,
partition_column: String,
) -> (DateTime<Utc>, DateTime<Utc>) {
if partition_column == DEFAULT_TIMESTAMP_KEY {
match file
.columns()
.iter()
.find(|col| col.name == partition_column)
.unwrap()
.stats
.as_ref()
.unwrap()
{
column::TypedStatistics::Int(stats) => (
NaiveDateTime::from_timestamp_millis(stats.min)
.unwrap()
.and_utc(),
NaiveDateTime::from_timestamp_millis(stats.max)
.unwrap()
.and_utc(),
),
_ => unreachable!(),
}
} else {
match file
.columns()
.iter()
.find(|col| col.name == partition_column)
.unwrap()
.stats
.as_ref()
.unwrap()
{
column::TypedStatistics::String(stats) => (
stats.min.parse::<DateTime<Utc>>().unwrap(),
stats.max.parse::<DateTime<Utc>>().unwrap(),
),
_ => unreachable!(),
}
}
}
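
For reference, the String branch of the new get_file_bounds leans on chrono's
FromStr for DateTime<Utc>, which accepts RFC 3339 strings; a standalone
illustration (not part of the diff):

use chrono::{DateTime, Utc};

// Illustration only: column min/max statistics stored as RFC 3339 strings
// parse directly into DateTime<Utc>, as the String branch above relies on.
fn bounds_from_string_stats(min: &str, max: &str) -> (DateTime<Utc>, DateTime<Utc>) {
    (
        min.parse::<DateTime<Utc>>().expect("min stat is RFC 3339"),
        max.parse::<DateTime<Utc>>().expect("max stat is RFC 3339"),
    )
}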

@@ -97,10 +119,19 @@ pub async fn update_snapshot(
change: manifest::File,
) -> Result<(), ObjectStorageError> {
// get current snapshot
let mut meta = storage.get_snapshot(stream_name).await?;
let manifests = &mut meta.manifest_list;

let (lower_bound, _) = get_file_bounds(&change);
let mut meta = storage.get_object_store_format(stream_name).await?;
let manifests = &mut meta.snapshot.manifest_list;
let time_partition = meta.time_partition;
let lower_bound = match time_partition {
Some(time_partition) => {
let (lower_bound, _) = get_file_bounds(&change, time_partition);
lower_bound
}
None => {
let (lower_bound, _) = get_file_bounds(&change, DEFAULT_TIMESTAMP_KEY.to_string());
lower_bound
}
};
let pos = manifests.iter().position(|item| {
item.time_lower_bound <= lower_bound && lower_bound < item.time_upper_bound
});
@@ -109,16 +140,18 @@ pub async fn update_snapshot(
// This updates an existing file so there is no need to create a snapshot entry.
if let Some(pos) = pos {
let info = &mut manifests[pos];
let path = partition_path(stream_name, info.time_lower_bound, info.time_upper_bound);
let Some(mut manifest) = storage.get_manifest(&path).await? else {
let manifest_path =
partition_path(stream_name, info.time_lower_bound, info.time_upper_bound);

let Some(mut manifest) = storage.get_manifest(&manifest_path).await? else {
return Err(ObjectStorageError::UnhandledError(
"Manifest found in snapshot but not in object-storage"
.to_string()
.into(),
));
};
manifest.apply_change(change);
storage.put_manifest(&path, manifest).await?;
storage.put_manifest(&manifest_path, manifest).await?;
} else {
let lower_bound = lower_bound.date_naive().and_time(NaiveTime::MIN).and_utc();
let upper_bound = lower_bound
@@ -148,7 +181,7 @@ pub async fn update_snapshot(
time_upper_bound: upper_bound,
};
manifests.push(new_snapshot_entriy);
storage.put_snapshot(stream_name, meta).await?;
storage.put_snapshot(stream_name, meta.snapshot).await?;
}

Ok(())
@@ -160,13 +193,13 @@ pub async fn remove_manifest_from_snapshot(
dates: Vec<String>,
) -> Result<(), ObjectStorageError> {
// get current snapshot
let mut meta = storage.get_snapshot(stream_name).await?;
let manifests = &mut meta.manifest_list;
let mut meta = storage.get_object_store_format(stream_name).await?;
let manifests = &mut meta.snapshot.manifest_list;

// Filter out items whose manifest_path contains any of the dates_to_delete
manifests.retain(|item| !dates.iter().any(|date| item.manifest_path.contains(date)));

storage.put_snapshot(stream_name, meta).await?;
storage.put_snapshot(stream_name, meta.snapshot).await?;
Ok(())
}

@@ -175,8 +208,8 @@ pub async fn get_first_event(
stream_name: &str,
) -> Result<Option<String>, ObjectStorageError> {
// get current snapshot
let mut meta = storage.get_snapshot(stream_name).await?;
let manifests = &mut meta.manifest_list;
let mut meta = storage.get_object_store_format(stream_name).await?;
let manifests = &mut meta.snapshot.manifest_list;

if manifests.is_empty() {
log::info!("No manifest found for stream {stream_name}");
@@ -199,7 +232,7 @@ };
};

if let Some(first_event) = manifest.files.first() {
let (lower_bound, _) = get_file_bounds(first_event);
let (lower_bound, _) = get_file_bounds(first_event, DEFAULT_TIMESTAMP_KEY.to_string());
let first_event_at = lower_bound.with_timezone(&Local).to_rfc3339();
return Ok(Some(first_event_at));
}
3 changes: 1 addition & 2 deletions server/src/catalog/manifest.rs
@@ -112,7 +112,6 @@ pub fn create_from_parquet_file(
let columns = column_statistics(row_groups);
manifest_file.columns = columns.into_values().collect();
let mut sort_orders = sort_order(row_groups);

if let Some(last_sort_order) = sort_orders.pop() {
if sort_orders
.into_iter()
@@ -155,7 +154,7 @@ fn sort_order(
})
.collect_vec();

sort_orders.push(sort_order)
sort_orders.push(sort_order);
}
sort_orders
}
15 changes: 11 additions & 4 deletions server/src/event.rs
@@ -26,10 +26,10 @@ use itertools::Itertools;

use std::sync::Arc;

use crate::metadata;

use self::error::EventError;
pub use self::writer::STREAM_WRITERS;
use crate::metadata;
use chrono::NaiveDateTime;

pub const DEFAULT_TIMESTAMP_KEY: &str = "p_timestamp";
pub const DEFAULT_TAGS_KEY: &str = "p_tags";
@@ -42,6 +42,7 @@ pub struct Event {
pub origin_format: &'static str,
pub origin_size: u64,
pub is_first_event: bool,
pub parsed_timestamp: NaiveDateTime,
}

// Events holds the schema related to a each event for a single log stream
@@ -54,7 +55,12 @@ impl Event {
commit_schema(&self.stream_name, self.rb.schema())?;
}

Self::process_event(&self.stream_name, &key, self.rb.clone())?;
Self::process_event(
&self.stream_name,
&key,
self.rb.clone(),
self.parsed_timestamp,
)?;

metadata::STREAM_INFO.update_stats(
&self.stream_name,
@@ -81,8 +87,9 @@ impl Event {
stream_name: &str,
schema_key: &str,
rb: RecordBatch,
parsed_timestamp: NaiveDateTime,
) -> Result<(), EventError> {
STREAM_WRITERS.append_to_local(stream_name, schema_key, rb)?;
STREAM_WRITERS.append_to_local(stream_name, schema_key, rb, parsed_timestamp)?;
Ok(())
}
}
4 changes: 2 additions & 2 deletions server/src/event/format.rs
@@ -53,14 +53,14 @@ pub trait EventFormat: Sized {
return Err(anyhow!("field {} is a reserved field", DEFAULT_TAGS_KEY));
};

if get_field(&schema, DEFAULT_TAGS_KEY).is_some() {
if get_field(&schema, DEFAULT_METADATA_KEY).is_some() {
return Err(anyhow!(
"field {} is a reserved field",
DEFAULT_METADATA_KEY
));
};

if get_field(&schema, DEFAULT_TAGS_KEY).is_some() {
if get_field(&schema, DEFAULT_TIMESTAMP_KEY).is_some() {
return Err(anyhow!(
"field {} is a reserved field",
DEFAULT_TIMESTAMP_KEY
29 changes: 18 additions & 11 deletions server/src/event/writer.rs
@@ -30,7 +30,7 @@ use crate::utils;
use self::{errors::StreamWriterError, file_writer::FileWriter, mem_writer::MemWriter};
use arrow_array::{RecordBatch, TimestampMillisecondArray};
use arrow_schema::Schema;
use chrono::Utc;
use chrono::{NaiveDateTime, Utc};
use derive_more::{Deref, DerefMut};
use once_cell::sync::Lazy;

@@ -48,6 +48,7 @@ impl Writer {
stream_name: &str,
schema_key: &str,
rb: RecordBatch,
parsed_timestamp: NaiveDateTime,
) -> Result<(), StreamWriterError> {
let rb = utils::arrow::replace_columns(
rb.schema(),
Expand All @@ -56,7 +57,8 @@ impl Writer {
&[Arc::new(get_timestamp_array(rb.num_rows()))],
);

self.disk.push(stream_name, schema_key, &rb)?;
self.disk
.push(stream_name, schema_key, &rb, parsed_timestamp)?;
self.mem.push(schema_key, rb);
Ok(())
}
@@ -72,29 +74,34 @@ impl WriterTable {
stream_name: &str,
schema_key: &str,
record: RecordBatch,
parsed_timestamp: NaiveDateTime,
) -> Result<(), StreamWriterError> {
let hashmap_guard = self.read().unwrap();

match hashmap_guard.get(stream_name) {
Some(stream_writer) => {
stream_writer
.lock()
.unwrap()
.push(stream_name, schema_key, record)?;
stream_writer.lock().unwrap().push(
stream_name,
schema_key,
record,
parsed_timestamp,
)?;
}
None => {
drop(hashmap_guard);
let mut map = self.write().unwrap();
// check for race condition
// if map contains entry then just
if let Some(writer) = map.get(stream_name) {
writer
.lock()
.unwrap()
.push(stream_name, schema_key, record)?;
writer.lock().unwrap().push(
stream_name,
schema_key,
record,
parsed_timestamp,
)?;
} else {
let mut writer = Writer::default();
writer.push(stream_name, schema_key, record)?;
writer.push(stream_name, schema_key, record, parsed_timestamp)?;
map.insert(stream_name.to_owned(), Mutex::new(writer));
}
}
43 changes: 16 additions & 27 deletions server/src/event/writer/file_writer.rs
@@ -17,13 +17,13 @@
*
*/

use std::collections::HashMap;
use std::fs::{File, OpenOptions};
use std::path::PathBuf;

use arrow_array::RecordBatch;
use arrow_ipc::writer::StreamWriter;
use chrono::NaiveDateTime;
use derive_more::{Deref, DerefMut};
use std::collections::HashMap;
use std::fs::{File, OpenOptions};
use std::path::PathBuf;

use crate::storage::staging::StorageDir;

@@ -44,27 +44,17 @@ impl FileWriter {
stream_name: &str,
schema_key: &str,
record: &RecordBatch,
parsed_timestamp: NaiveDateTime,
) -> Result<(), StreamWriterError> {
match self.get_mut(schema_key) {
Some(writer) => {
writer
.writer
.write(record)
.map_err(StreamWriterError::Writer)?;
}
// entry is not present thus we create it
None => {
// this requires mutable borrow of the map so we drop this read lock and wait for write lock
let (path, writer) = init_new_stream_writer_file(stream_name, schema_key, record)?;
self.insert(
schema_key.to_owned(),
ArrowWriter {
file_path: path,
writer,
},
);
}
};
let (path, writer) =
init_new_stream_writer_file(stream_name, schema_key, record, parsed_timestamp)?;
self.insert(
schema_key.to_owned(),
ArrowWriter {
file_path: path,
writer,
},
);

Ok(())
}
@@ -80,10 +70,10 @@ fn init_new_stream_writer_file(
stream_name: &str,
schema_key: &str,
record: &RecordBatch,
parsed_timestamp: NaiveDateTime,
) -> Result<(PathBuf, StreamWriter<std::fs::File>), StreamWriterError> {
let dir = StorageDir::new(stream_name);
let path = dir.path_by_current_time(schema_key);

let path = dir.path_by_current_time(schema_key, parsed_timestamp);
std::fs::create_dir_all(dir.data_path)?;

let file = OpenOptions::new().create(true).append(true).open(&path)?;
Expand All @@ -94,6 +84,5 @@ fn init_new_stream_writer_file(
stream_writer
.write(record)
.map_err(StreamWriterError::Writer)?;

Ok((path, stream_writer))
}
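
The commit message says the arrow file name takes its date, hour, and minute
from the parsed timestamp when a time partition is defined; a hypothetical
sketch of such a naming scheme (the actual format produced by
StorageDir::path_by_current_time is not shown in this diff and may differ):

use chrono::NaiveDateTime;

// Hypothetical naming sketch: derive the staging file name from the parsed
// event timestamp instead of server time. The exact format used by
// path_by_current_time may differ.
fn arrow_file_name(stream: &str, schema_key: &str, ts: NaiveDateTime) -> String {
    format!(
        "{stream}.{schema_key}.{}.arrows",
        ts.format("date=%Y-%m-%d.hour=%H.minute=%M")
    )
}

The random suffix described for parquet files would then be appended at
conversion time, so events sharing the same minute do not overwrite one
another.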