Skip to content

Commit 72502ad

Browse files
fix: removed time partition logic from ingestion flow (#703)
* removed the X-P-Time-Partition header from the log stream creation API * removed the logic that partitions ingested logs based on the X-P-Time-Partition header value, which was stored in `stream.json` * queries still use this logic so they remain compatible with external tools that ingest based on a time partition
1 parent a04dd4f commit 72502ad

File tree

12 files changed

+190
-354
lines changed

12 files changed

+190
-354
lines changed

server/src/catalog.rs

Lines changed: 27 additions & 67 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@ use relative_path::RelativePathBuf;
2323

2424
use crate::{
2525
catalog::manifest::Manifest,
26-
event::DEFAULT_TIMESTAMP_KEY,
2726
query::PartialTimeFilter,
2827
storage::{ObjectStorage, ObjectStorageError},
2928
};
@@ -70,46 +69,25 @@ impl ManifestFile for manifest::File {
7069
}
7170
}
7271

73-
fn get_file_bounds(
74-
file: &manifest::File,
75-
partition_column: String,
76-
) -> (DateTime<Utc>, DateTime<Utc>) {
77-
if partition_column == DEFAULT_TIMESTAMP_KEY {
78-
match file
79-
.columns()
80-
.iter()
81-
.find(|col| col.name == partition_column)
82-
.unwrap()
83-
.stats
84-
.as_ref()
85-
.unwrap()
86-
{
87-
column::TypedStatistics::Int(stats) => (
88-
NaiveDateTime::from_timestamp_millis(stats.min)
89-
.unwrap()
90-
.and_utc(),
91-
NaiveDateTime::from_timestamp_millis(stats.max)
92-
.unwrap()
93-
.and_utc(),
94-
),
95-
_ => unreachable!(),
96-
}
97-
} else {
98-
match file
99-
.columns()
100-
.iter()
101-
.find(|col| col.name == partition_column)
102-
.unwrap()
103-
.stats
104-
.as_ref()
105-
.unwrap()
106-
{
107-
column::TypedStatistics::String(stats) => (
108-
stats.min.parse::<DateTime<Utc>>().unwrap(),
109-
stats.max.parse::<DateTime<Utc>>().unwrap(),
110-
),
111-
_ => unreachable!(),
112-
}
72+
fn get_file_bounds(file: &manifest::File) -> (DateTime<Utc>, DateTime<Utc>) {
73+
match file
74+
.columns()
75+
.iter()
76+
.find(|col| col.name == "p_timestamp")
77+
.unwrap()
78+
.stats
79+
.clone()
80+
.unwrap()
81+
{
82+
column::TypedStatistics::Int(stats) => (
83+
NaiveDateTime::from_timestamp_millis(stats.min)
84+
.unwrap()
85+
.and_utc(),
86+
NaiveDateTime::from_timestamp_millis(stats.max)
87+
.unwrap()
88+
.and_utc(),
89+
),
90+
_ => unreachable!(),
11391
}
11492
}
11593

@@ -121,17 +99,8 @@ pub async fn update_snapshot(
12199
// get current snapshot
122100
let mut meta = storage.get_object_store_format(stream_name).await?;
123101
let manifests = &mut meta.snapshot.manifest_list;
124-
let time_partition = meta.time_partition;
125-
let lower_bound = match time_partition {
126-
Some(time_partition) => {
127-
let (lower_bound, _) = get_file_bounds(&change, time_partition);
128-
lower_bound
129-
}
130-
None => {
131-
let (lower_bound, _) = get_file_bounds(&change, DEFAULT_TIMESTAMP_KEY.to_string());
132-
lower_bound
133-
}
134-
};
102+
103+
let (lower_bound, _) = get_file_bounds(&change);
135104
let pos = manifests.iter().position(|item| {
136105
item.time_lower_bound <= lower_bound && lower_bound < item.time_upper_bound
137106
});
@@ -140,18 +109,16 @@ pub async fn update_snapshot(
140109
// This updates an existing file so there is no need to create a snapshot entry.
141110
if let Some(pos) = pos {
142111
let info = &mut manifests[pos];
143-
let manifest_path =
144-
partition_path(stream_name, info.time_lower_bound, info.time_upper_bound);
145-
146-
let Some(mut manifest) = storage.get_manifest(&manifest_path).await? else {
112+
let path = partition_path(stream_name, info.time_lower_bound, info.time_upper_bound);
113+
let Some(mut manifest) = storage.get_manifest(&path).await? else {
147114
return Err(ObjectStorageError::UnhandledError(
148115
"Manifest found in snapshot but not in object-storage"
149116
.to_string()
150117
.into(),
151118
));
152119
};
153120
manifest.apply_change(change);
154-
storage.put_manifest(&manifest_path, manifest).await?;
121+
storage.put_manifest(&path, manifest).await?;
155122
} else {
156123
let lower_bound = lower_bound.date_naive().and_time(NaiveTime::MIN).and_utc();
157124
let upper_bound = lower_bound
@@ -210,7 +177,6 @@ pub async fn get_first_event(
210177
// get current snapshot
211178
let mut meta = storage.get_object_store_format(stream_name).await?;
212179
let manifests = &mut meta.snapshot.manifest_list;
213-
let time_partition = meta.time_partition;
214180
if manifests.is_empty() {
215181
log::info!("No manifest found for stream {stream_name}");
216182
return Err(ObjectStorageError::Custom("No manifest found".to_string()));
@@ -232,15 +198,9 @@ pub async fn get_first_event(
232198
};
233199

234200
if let Some(first_event) = manifest.files.first() {
235-
if let Some(time_partition) = time_partition {
236-
let (lower_bound, _) = get_file_bounds(first_event, time_partition);
237-
let first_event_at = lower_bound.with_timezone(&Local).to_rfc3339();
238-
return Ok(Some(first_event_at));
239-
} else {
240-
let (lower_bound, _) = get_file_bounds(first_event, DEFAULT_TIMESTAMP_KEY.to_string());
241-
let first_event_at = lower_bound.with_timezone(&Local).to_rfc3339();
242-
return Ok(Some(first_event_at));
243-
}
201+
let (lower_bound, _) = get_file_bounds(first_event);
202+
let first_event_at = lower_bound.with_timezone(&Local).to_rfc3339();
203+
return Ok(Some(first_event_at));
244204
}
245205
Ok(None)
246206
}

server/src/event.rs

Lines changed: 2 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,6 @@ use std::sync::Arc;
2929
use self::error::EventError;
3030
pub use self::writer::STREAM_WRITERS;
3131
use crate::metadata;
32-
use chrono::NaiveDateTime;
3332

3433
pub const DEFAULT_TIMESTAMP_KEY: &str = "p_timestamp";
3534
pub const DEFAULT_TAGS_KEY: &str = "p_tags";
@@ -42,7 +41,6 @@ pub struct Event {
4241
pub origin_format: &'static str,
4342
pub origin_size: u64,
4443
pub is_first_event: bool,
45-
pub parsed_timestamp: NaiveDateTime,
4644
}
4745

4846
// Events holds the schema related to a each event for a single log stream
@@ -55,12 +53,7 @@ impl Event {
5553
commit_schema(&self.stream_name, self.rb.schema())?;
5654
}
5755

58-
Self::process_event(
59-
&self.stream_name,
60-
&key,
61-
self.rb.clone(),
62-
self.parsed_timestamp,
63-
)?;
56+
Self::process_event(&self.stream_name, &key, self.rb.clone())?;
6457

6558
metadata::STREAM_INFO.update_stats(
6659
&self.stream_name,
@@ -87,9 +80,8 @@ impl Event {
8780
stream_name: &str,
8881
schema_key: &str,
8982
rb: RecordBatch,
90-
parsed_timestamp: NaiveDateTime,
9183
) -> Result<(), EventError> {
92-
STREAM_WRITERS.append_to_local(stream_name, schema_key, rb, parsed_timestamp)?;
84+
STREAM_WRITERS.append_to_local(stream_name, schema_key, rb)?;
9385
Ok(())
9486
}
9587
}

server/src/event/writer.rs

Lines changed: 11 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ use crate::utils;
3030
use self::{errors::StreamWriterError, file_writer::FileWriter, mem_writer::MemWriter};
3131
use arrow_array::{RecordBatch, TimestampMillisecondArray};
3232
use arrow_schema::Schema;
33-
use chrono::{NaiveDateTime, Utc};
33+
use chrono::Utc;
3434
use derive_more::{Deref, DerefMut};
3535
use once_cell::sync::Lazy;
3636

@@ -48,7 +48,6 @@ impl Writer {
4848
stream_name: &str,
4949
schema_key: &str,
5050
rb: RecordBatch,
51-
parsed_timestamp: NaiveDateTime,
5251
) -> Result<(), StreamWriterError> {
5352
let rb = utils::arrow::replace_columns(
5453
rb.schema(),
@@ -57,8 +56,7 @@ impl Writer {
5756
&[Arc::new(get_timestamp_array(rb.num_rows()))],
5857
);
5958

60-
self.disk
61-
.push(stream_name, schema_key, &rb, parsed_timestamp)?;
59+
self.disk.push(stream_name, schema_key, &rb)?;
6260
self.mem.push(schema_key, rb);
6361
Ok(())
6462
}
@@ -74,34 +72,29 @@ impl WriterTable {
7472
stream_name: &str,
7573
schema_key: &str,
7674
record: RecordBatch,
77-
parsed_timestamp: NaiveDateTime,
7875
) -> Result<(), StreamWriterError> {
7976
let hashmap_guard = self.read().unwrap();
8077

8178
match hashmap_guard.get(stream_name) {
8279
Some(stream_writer) => {
83-
stream_writer.lock().unwrap().push(
84-
stream_name,
85-
schema_key,
86-
record,
87-
parsed_timestamp,
88-
)?;
80+
stream_writer
81+
.lock()
82+
.unwrap()
83+
.push(stream_name, schema_key, record)?;
8984
}
9085
None => {
9186
drop(hashmap_guard);
9287
let mut map = self.write().unwrap();
9388
// check for race condition
9489
// if map contains entry then just
9590
if let Some(writer) = map.get(stream_name) {
96-
writer.lock().unwrap().push(
97-
stream_name,
98-
schema_key,
99-
record,
100-
parsed_timestamp,
101-
)?;
91+
writer
92+
.lock()
93+
.unwrap()
94+
.push(stream_name, schema_key, record)?;
10295
} else {
10396
let mut writer = Writer::default();
104-
writer.push(stream_name, schema_key, record, parsed_timestamp)?;
97+
writer.push(stream_name, schema_key, record)?;
10598
map.insert(stream_name.to_owned(), Mutex::new(writer));
10699
}
107100
}

server/src/event/writer/file_writer.rs

Lines changed: 21 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919

2020
use arrow_array::RecordBatch;
2121
use arrow_ipc::writer::StreamWriter;
22-
use chrono::NaiveDateTime;
2322
use derive_more::{Deref, DerefMut};
2423
use std::collections::HashMap;
2524
use std::fs::{File, OpenOptions};
@@ -44,17 +43,27 @@ impl FileWriter {
4443
stream_name: &str,
4544
schema_key: &str,
4645
record: &RecordBatch,
47-
parsed_timestamp: NaiveDateTime,
4846
) -> Result<(), StreamWriterError> {
49-
let (path, writer) =
50-
init_new_stream_writer_file(stream_name, schema_key, record, parsed_timestamp)?;
51-
self.insert(
52-
schema_key.to_owned(),
53-
ArrowWriter {
54-
file_path: path,
55-
writer,
56-
},
57-
);
47+
match self.get_mut(schema_key) {
48+
Some(writer) => {
49+
writer
50+
.writer
51+
.write(record)
52+
.map_err(StreamWriterError::Writer)?;
53+
}
54+
// entry is not present thus we create it
55+
None => {
56+
// this requires mutable borrow of the map so we drop this read lock and wait for write lock
57+
let (path, writer) = init_new_stream_writer_file(stream_name, schema_key, record)?;
58+
self.insert(
59+
schema_key.to_owned(),
60+
ArrowWriter {
61+
file_path: path,
62+
writer,
63+
},
64+
);
65+
}
66+
};
5867

5968
Ok(())
6069
}
@@ -70,10 +79,9 @@ fn init_new_stream_writer_file(
7079
stream_name: &str,
7180
schema_key: &str,
7281
record: &RecordBatch,
73-
parsed_timestamp: NaiveDateTime,
7482
) -> Result<(PathBuf, StreamWriter<std::fs::File>), StreamWriterError> {
7583
let dir = StorageDir::new(stream_name);
76-
let path = dir.path_by_current_time(schema_key, parsed_timestamp);
84+
let path = dir.path_by_current_time(schema_key);
7785
std::fs::create_dir_all(dir.data_path)?;
7886

7987
let file = OpenOptions::new().create(true).append(true).open(&path)?;

server/src/handlers.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@ const PREFIX_TAGS: &str = "x-p-tag-";
2323
const PREFIX_META: &str = "x-p-meta-";
2424
const STREAM_NAME_HEADER_KEY: &str = "x-p-stream";
2525
const LOG_SOURCE_KEY: &str = "x-p-log-source";
26-
const TIME_PARTITION_KEY: &str = "x-p-time-partition";
2726
const AUTHORIZATION_KEY: &str = "authorization";
2827
const SEPARATOR: char = '^';
2928

0 commit comments

Comments (0)