Skip to content

Commit

Permalink
feat: allow ingestion with custom time partition (#790)
Browse files Browse the repository at this point in the history
This PR updates the time partition logic to check if time partition 
field value is not less than a month old then partition based on
 time partition field also, if static schema is provided
check if static schema has time partition field at the time of log creation

This PR is partly related to #752
  • Loading branch information
nikhilsinhaparseable authored May 5, 2024
1 parent 62aea42 commit ebf2c16
Show file tree
Hide file tree
Showing 14 changed files with 318 additions and 229 deletions.
36 changes: 31 additions & 5 deletions server/src/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ use crate::handlers::http::base_path_without_preceding_slash;
use crate::option::CONFIG;
use crate::{
catalog::manifest::Manifest,
event::DEFAULT_TIMESTAMP_KEY,
query::PartialTimeFilter,
storage::{object_storage::manifest_path, ObjectStorage, ObjectStorageError},
};
Expand Down Expand Up @@ -73,14 +74,17 @@ impl ManifestFile for manifest::File {
}
}

fn get_file_bounds(file: &manifest::File) -> (DateTime<Utc>, DateTime<Utc>) {
fn get_file_bounds(
file: &manifest::File,
partition_column: String,
) -> (DateTime<Utc>, DateTime<Utc>) {
match file
.columns()
.iter()
.find(|col| col.name == "p_timestamp")
.find(|col| col.name == partition_column)
.unwrap()
.stats
.clone()
.as_ref()
.unwrap()
{
column::TypedStatistics::Int(stats) => (
Expand All @@ -98,8 +102,19 @@ pub async fn update_snapshot(
) -> Result<(), ObjectStorageError> {
// get current snapshot
let mut meta = storage.get_object_store_format(stream_name).await?;
let meta_clone = meta.clone();
let manifests = &mut meta.snapshot.manifest_list;
let (lower_bound, _) = get_file_bounds(&change);
let time_partition: Option<String> = meta_clone.time_partition;
let lower_bound = match time_partition {
Some(time_partition) => {
let (lower_bound, _) = get_file_bounds(&change, time_partition);
lower_bound
}
None => {
let (lower_bound, _) = get_file_bounds(&change, DEFAULT_TIMESTAMP_KEY.to_string());
lower_bound
}
};
let pos = manifests.iter().position(|item| {
item.time_lower_bound <= lower_bound && lower_bound < item.time_upper_bound
});
Expand Down Expand Up @@ -242,6 +257,7 @@ pub async fn get_first_event(
// get current snapshot
let mut meta = storage.get_object_store_format(stream_name).await?;
let manifests = &mut meta.snapshot.manifest_list;
let time_partition = meta.time_partition;
if manifests.is_empty() {
log::info!("No manifest found for stream {stream_name}");
return Err(ObjectStorageError::Custom("No manifest found".to_string()));
Expand All @@ -260,7 +276,17 @@ pub async fn get_first_event(
));
};
if let Some(first_event) = manifest.files.first() {
let (lower_bound, _) = get_file_bounds(first_event);
let lower_bound = match time_partition {
Some(time_partition) => {
let (lower_bound, _) = get_file_bounds(first_event, time_partition);
lower_bound
}
None => {
let (lower_bound, _) =
get_file_bounds(first_event, DEFAULT_TIMESTAMP_KEY.to_string());
lower_bound
}
};
first_event_at = lower_bound.with_timezone(&Local).to_rfc3339();
}
}
Expand Down
23 changes: 18 additions & 5 deletions server/src/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ use std::sync::Arc;
use self::error::EventError;
pub use self::writer::STREAM_WRITERS;
use crate::metadata;
use chrono::NaiveDateTime;

pub const DEFAULT_TIMESTAMP_KEY: &str = "p_timestamp";
pub const DEFAULT_TAGS_KEY: &str = "p_tags";
Expand All @@ -41,19 +42,30 @@ pub struct Event {
pub origin_format: &'static str,
pub origin_size: u64,
pub is_first_event: bool,
pub parsed_timestamp: NaiveDateTime,
pub time_partition: Option<String>,
}

// Events holds the schema related to a each event for a single log stream
impl Event {
pub async fn process(self) -> Result<(), EventError> {
let key = get_schema_key(&self.rb.schema().fields);
let num_rows = self.rb.num_rows() as u64;
let mut key = get_schema_key(&self.rb.schema().fields);
if self.time_partition.is_some() {
let parsed_timestamp_to_min = self.parsed_timestamp.format("%Y%m%dT%H%M").to_string();
key = format!("{key}{parsed_timestamp_to_min}");
}

let num_rows = self.rb.num_rows() as u64;
if self.is_first_event {
commit_schema(&self.stream_name, self.rb.schema())?;
}

Self::process_event(&self.stream_name, &key, self.rb.clone())?;
Self::process_event(
&self.stream_name,
&key,
self.rb.clone(),
self.parsed_timestamp,
)?;

metadata::STREAM_INFO.update_stats(
&self.stream_name,
Expand All @@ -80,8 +92,9 @@ impl Event {
stream_name: &str,
schema_key: &str,
rb: RecordBatch,
parsed_timestamp: NaiveDateTime,
) -> Result<(), EventError> {
STREAM_WRITERS.append_to_local(stream_name, schema_key, rb)?;
STREAM_WRITERS.append_to_local(stream_name, schema_key, rb, parsed_timestamp)?;
Ok(())
}
}
Expand All @@ -90,7 +103,7 @@ pub fn get_schema_key(fields: &[Arc<Field>]) -> String {
// Fields must be sorted
let mut hasher = xxhash_rust::xxh3::Xxh3::new();
for field in fields.iter().sorted_by_key(|v| v.name()) {
hasher.update(field.name().as_bytes())
hasher.update(field.name().as_bytes());
}
let hash = hasher.digest();
format!("{hash:x}")
Expand Down
36 changes: 32 additions & 4 deletions server/src/event/format.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,20 +41,20 @@ pub trait EventFormat: Sized {
fn to_data(
self,
schema: HashMap<String, Arc<Field>>,
time_partition: Option<String>,
static_schema_flag: Option<String>,
time_partition: Option<String>,
) -> Result<(Self::Data, EventSchema, bool, Tags, Metadata), AnyError>;
fn decode(data: Self::Data, schema: Arc<Schema>) -> Result<RecordBatch, AnyError>;
fn into_recordbatch(
self,
storage_schema: HashMap<String, Arc<Field>>,
time_partition: Option<String>,
static_schema_flag: Option<String>,
time_partition: Option<String>,
) -> Result<(RecordBatch, bool), AnyError> {
let (data, mut schema, is_first, tags, metadata) = self.to_data(
storage_schema.clone(),
time_partition,
static_schema_flag.clone(),
time_partition.clone(),
)?;

if get_field(&schema, DEFAULT_TAGS_KEY).is_some() {
Expand Down Expand Up @@ -96,10 +96,11 @@ pub trait EventFormat: Sized {
)));

// prepare the record batch and new fields to be added
let new_schema = Arc::new(Schema::new(schema));
let mut new_schema = Arc::new(Schema::new(schema));
if !Self::is_schema_matching(new_schema.clone(), storage_schema, static_schema_flag) {
return Err(anyhow!("Schema mismatch"));
}
new_schema = update_field_type_in_schema(new_schema, time_partition);
let rb = Self::decode(data, new_schema.clone())?;
let tags_arr = StringArray::from_iter_values(std::iter::repeat(&tags).take(rb.num_rows()));
let metadata_arr =
Expand Down Expand Up @@ -143,3 +144,30 @@ pub trait EventFormat: Sized {
true
}
}

pub fn update_field_type_in_schema(
schema: Arc<Schema>,
time_partition: Option<String>,
) -> Arc<Schema> {
if time_partition.is_none() {
return schema;
}
let field_name = time_partition.unwrap();
let new_schema: Vec<Field> = schema
.fields()
.iter()
.map(|field| {
if *field.name() == field_name {
if field.data_type() == &DataType::Utf8 {
let new_data_type = DataType::Timestamp(TimeUnit::Millisecond, None);
Field::new(field.name().clone(), new_data_type, true)
} else {
Field::new(field.name(), field.data_type().clone(), true)
}
} else {
Field::new(field.name(), field.data_type().clone(), true)
}
})
.collect();
Arc::new(Schema::new(new_schema))
}
11 changes: 8 additions & 3 deletions server/src/event/format/json.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,10 @@ impl EventFormat for Event {
fn to_data(
self,
schema: HashMap<String, Arc<Field>>,
time_partition: Option<String>,
static_schema_flag: Option<String>,
time_partition: Option<String>,
) -> Result<(Self::Data, Vec<Arc<Field>>, bool, Tags, Metadata), anyhow::Error> {
let data = flatten_json_body(self.data, time_partition)?;
let data = flatten_json_body(self.data, None, false)?;
let stream_schema = schema;

// incoming event may be a single json or a json array
Expand All @@ -68,7 +68,12 @@ impl EventFormat for Event {
let schema = match derive_arrow_schema(&stream_schema, fields) {
Ok(schema) => schema,
Err(_) => match infer_json_schema_from_iterator(value_arr.iter().map(Ok)) {
Ok(infer_schema) => {
Ok(mut infer_schema) => {
let new_infer_schema = super::super::format::update_field_type_in_schema(
Arc::new(infer_schema),
time_partition,
);
infer_schema = Schema::new(new_infer_schema.fields().clone());
if let Err(err) = Schema::try_merge(vec![
Schema::new(stream_schema.values().cloned().collect::<Fields>()),
infer_schema.clone(),
Expand Down
31 changes: 19 additions & 12 deletions server/src/event/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,11 @@ use std::{
sync::{Arc, Mutex, RwLock},
};

use crate::utils;

use self::{errors::StreamWriterError, file_writer::FileWriter, mem_writer::MemWriter};
use crate::utils;
use arrow_array::{RecordBatch, TimestampMillisecondArray};
use arrow_schema::Schema;
use chrono::NaiveDateTime;
use chrono::Utc;
use derive_more::{Deref, DerefMut};
use once_cell::sync::Lazy;
Expand All @@ -48,6 +48,7 @@ impl Writer {
stream_name: &str,
schema_key: &str,
rb: RecordBatch,
parsed_timestamp: NaiveDateTime,
) -> Result<(), StreamWriterError> {
let rb = utils::arrow::replace_columns(
rb.schema(),
Expand All @@ -56,7 +57,8 @@ impl Writer {
&[Arc::new(get_timestamp_array(rb.num_rows()))],
);

self.disk.push(stream_name, schema_key, &rb)?;
self.disk
.push(stream_name, schema_key, &rb, parsed_timestamp)?;
self.mem.push(schema_key, rb);
Ok(())
}
Expand All @@ -72,29 +74,34 @@ impl WriterTable {
stream_name: &str,
schema_key: &str,
record: RecordBatch,
parsed_timestamp: NaiveDateTime,
) -> Result<(), StreamWriterError> {
let hashmap_guard = self.read().unwrap();

match hashmap_guard.get(stream_name) {
Some(stream_writer) => {
stream_writer
.lock()
.unwrap()
.push(stream_name, schema_key, record)?;
stream_writer.lock().unwrap().push(
stream_name,
schema_key,
record,
parsed_timestamp,
)?;
}
None => {
drop(hashmap_guard);
let mut map = self.write().unwrap();
// check for race condition
// if map contains entry then just
if let Some(writer) = map.get(stream_name) {
writer
.lock()
.unwrap()
.push(stream_name, schema_key, record)?;
writer.lock().unwrap().push(
stream_name,
schema_key,
record,
parsed_timestamp,
)?;
} else {
let mut writer = Writer::default();
writer.push(stream_name, schema_key, record)?;
writer.push(stream_name, schema_key, record, parsed_timestamp)?;
map.insert(stream_name.to_owned(), Mutex::new(writer));
}
}
Expand Down
11 changes: 7 additions & 4 deletions server/src/event/writer/file_writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,9 @@ use std::collections::HashMap;
use std::fs::{File, OpenOptions};
use std::path::PathBuf;

use crate::storage::staging::StorageDir;

use super::errors::StreamWriterError;
use crate::storage::staging::StorageDir;
use chrono::NaiveDateTime;

pub struct ArrowWriter {
pub file_path: PathBuf,
Expand All @@ -43,6 +43,7 @@ impl FileWriter {
stream_name: &str,
schema_key: &str,
record: &RecordBatch,
parsed_timestamp: NaiveDateTime,
) -> Result<(), StreamWriterError> {
match self.get_mut(schema_key) {
Some(writer) => {
Expand All @@ -54,7 +55,8 @@ impl FileWriter {
// entry is not present thus we create it
None => {
// this requires mutable borrow of the map so we drop this read lock and wait for write lock
let (path, writer) = init_new_stream_writer_file(stream_name, schema_key, record)?;
let (path, writer) =
init_new_stream_writer_file(stream_name, schema_key, record, parsed_timestamp)?;
self.insert(
schema_key.to_owned(),
ArrowWriter {
Expand All @@ -79,9 +81,10 @@ fn init_new_stream_writer_file(
stream_name: &str,
schema_key: &str,
record: &RecordBatch,
parsed_timestamp: NaiveDateTime,
) -> Result<(PathBuf, StreamWriter<std::fs::File>), StreamWriterError> {
let dir = StorageDir::new(stream_name);
let path = dir.path_by_current_time(schema_key);
let path = dir.path_by_current_time(schema_key, parsed_timestamp);
std::fs::create_dir_all(dir.data_path)?;

let file = OpenOptions::new().create(true).append(true).open(&path)?;
Expand Down
Loading

0 comments on commit ebf2c16

Please sign in to comment.