Commit ef8eb34
fix: data type mismatch in the schema (#887)
This PR includes changes to:

1. update the data type of the time partition field from Utf8 to Timestamp
2. fetch stats from ingestors and combine them with querier stats

The data type of time partition fields in versions < 1.2.0 is Utf8, but in versions >= 1.2.0 it is Timestamp(TimeUnit::Millisecond, None). So at server start, when a stream has a time partition and the schema records that field as Utf8, we update its data type to Timestamp(TimeUnit::Millisecond, None). This change can eventually be removed as users move to the latest versions.
1 parent 0df41a7 commit ef8eb34
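
Concretely, the mismatch is between two Arrow definitions of the same time partition column. A minimal illustration (the field name `event_time` is made up for the example; only `arrow_schema` is needed):

```rust
use arrow_schema::{DataType, Field, TimeUnit};

fn main() {
    // What a pre-1.2.0 server recorded for a time partition field.
    let old = Field::new("event_time", DataType::Utf8, true);
    // What servers >= 1.2.0 expect for the same field.
    let new = Field::new(
        "event_time",
        DataType::Timestamp(TimeUnit::Millisecond, None),
        true,
    );
    // Same column name, different data types: schemas written by the
    // two versions disagree until the stored schema is rewritten.
    assert_ne!(old.data_type(), new.data_type());
}
```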

2 files changed (+56 −34 lines)

server/src/handlers/http/cluster/mod.rs

Lines changed: 12 additions & 32 deletions

@@ -28,8 +28,8 @@ use crate::option::CONFIG;
 use crate::metrics::prom_utils::Metrics;
 use crate::stats::Stats;
 use crate::storage::object_storage::ingestor_metadata_path;
-use crate::storage::PARSEABLE_ROOT_DIRECTORY;
 use crate::storage::{ObjectStorageError, STREAM_ROOT_DIRECTORY};
+use crate::storage::{ObjectStoreFormat, PARSEABLE_ROOT_DIRECTORY};
 use actix_web::http::header::{self, HeaderMap};
 use actix_web::{HttpRequest, Responder};
 use bytes::Bytes;
@@ -237,38 +237,18 @@ pub async fn fetch_stats_from_ingestors(
     let mut deleted_storage_size = 0u64;
     let mut deleted_count = 0u64;
     for ob in obs {
-        let stream_metadata: serde_json::Value =
+        let stream_metadata: ObjectStoreFormat =
             serde_json::from_slice(&ob).expect("stream.json is valid json");
-        let version = stream_metadata
-            .as_object()
-            .and_then(|meta| meta.get("version"))
-            .and_then(|version| version.as_str());
-        let stats = stream_metadata.get("stats").unwrap();
-        if matches!(version, Some("v4")) {
-            let current_stats = stats.get("current_stats").unwrap().clone();
-            let lifetime_stats = stats.get("lifetime_stats").unwrap().clone();
-            let deleted_stats = stats.get("deleted_stats").unwrap().clone();
-
-            count += current_stats.get("events").unwrap().as_u64().unwrap();
-            ingestion_size += current_stats.get("ingestion").unwrap().as_u64().unwrap();
-            storage_size += current_stats.get("storage").unwrap().as_u64().unwrap();
-            lifetime_count += lifetime_stats.get("events").unwrap().as_u64().unwrap();
-            lifetime_ingestion_size += lifetime_stats.get("ingestion").unwrap().as_u64().unwrap();
-            lifetime_storage_size += lifetime_stats.get("storage").unwrap().as_u64().unwrap();
-            deleted_count += deleted_stats.get("events").unwrap().as_u64().unwrap();
-            deleted_ingestion_size += deleted_stats.get("ingestion").unwrap().as_u64().unwrap();
-            deleted_storage_size += deleted_stats.get("storage").unwrap().as_u64().unwrap();
-        } else {
-            count += stats.get("events").unwrap().as_u64().unwrap();
-            ingestion_size += stats.get("ingestion").unwrap().as_u64().unwrap();
-            storage_size += stats.get("storage").unwrap().as_u64().unwrap();
-            lifetime_count += stats.get("events").unwrap().as_u64().unwrap();
-            lifetime_ingestion_size += stats.get("ingestion").unwrap().as_u64().unwrap();
-            lifetime_storage_size += stats.get("storage").unwrap().as_u64().unwrap();
-            deleted_count += 0;
-            deleted_ingestion_size += 0;
-            deleted_storage_size += 0;
-        }
+
+        count += stream_metadata.stats.current_stats.events;
+        ingestion_size += stream_metadata.stats.current_stats.ingestion;
+        storage_size += stream_metadata.stats.current_stats.storage;
+        lifetime_count += stream_metadata.stats.lifetime_stats.events;
+        lifetime_ingestion_size += stream_metadata.stats.lifetime_stats.ingestion;
+        lifetime_storage_size += stream_metadata.stats.lifetime_stats.storage;
+        deleted_count += stream_metadata.stats.deleted_stats.events;
+        deleted_ingestion_size += stream_metadata.stats.deleted_stats.ingestion;
+        deleted_storage_size += stream_metadata.stats.deleted_stats.storage;
     }

     let qs = QueriedStats::new(
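
The rewrite above drops the hand-rolled `serde_json::Value` traversal, with its per-version branching and chains of `unwrap()`, in favor of deserializing straight into the typed `ObjectStoreFormat`. A sketch of the pattern; the structs below are trimmed-down assumptions for illustration, not Parseable's actual definitions:

```rust
use serde::Deserialize;

// Simplified stand-ins for Parseable's metadata types; the real
// ObjectStoreFormat carries many more fields.
#[derive(Deserialize, Default)]
struct Stats {
    events: u64,
    ingestion: u64,
    storage: u64,
}

#[derive(Deserialize, Default)]
struct FullStats {
    current_stats: Stats,
    // `default` lets older stream.json files that lack these
    // sections deserialize to zeroed stats instead of erroring.
    #[serde(default)]
    lifetime_stats: Stats,
    #[serde(default)]
    deleted_stats: Stats,
}

#[derive(Deserialize)]
struct ObjectStoreFormat {
    version: String,
    #[serde(default)]
    stats: FullStats,
}

fn main() {
    // A v4-era stream.json that happens to omit deleted_stats.
    let bytes = br#"{
        "version": "v4",
        "stats": {
            "current_stats": {"events": 10, "ingestion": 1024, "storage": 512},
            "lifetime_stats": {"events": 12, "ingestion": 2048, "storage": 640}
        }
    }"#;
    let meta: ObjectStoreFormat =
        serde_json::from_slice(bytes).expect("stream.json is valid json");
    // Missing sections fall back to Default (all zeros) rather than panicking.
    assert_eq!(meta.stats.deleted_stats.events, 0);
    println!("{}: {} events", meta.version, meta.stats.current_stats.events);
}
```

Plain field access like `meta.stats.current_stats.events` then replaces each `stats.get("events").unwrap().as_u64().unwrap()` chain, and version handling moves into the deserializer instead of the accumulation loop.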

server/src/metadata.rs

Lines changed: 44 additions & 2 deletions

@@ -17,7 +17,7 @@
 */

 use arrow_array::RecordBatch;
-use arrow_schema::{Field, Fields, Schema};
+use arrow_schema::{DataType, Field, Fields, Schema, TimeUnit};
 use chrono::{Local, NaiveDateTime};
 use itertools::Itertools;
 use once_cell::sync::Lazy;
@@ -417,6 +417,45 @@ fn update_schema_from_staging(stream_name: &str, current_schema: Schema) -> Sche
     Schema::try_merge(vec![schema, current_schema]).unwrap()
 }

+/// this function updates the data type of time partition field
+/// from utf-8 to timestamp if it is not already timestamp
+/// and updates the schema in the storage
+/// required only when migrating from version 1.2.0 and below
+/// this function will be removed in the future
+pub async fn update_data_type_time_partition(
+    storage: &(impl ObjectStorage + ?Sized),
+    stream_name: &str,
+    schema: Schema,
+    meta: ObjectStoreFormat,
+) -> anyhow::Result<Schema> {
+    let mut schema = schema.clone();
+    if meta.time_partition.is_some() {
+        let time_partition = meta.time_partition.unwrap();
+        let time_partition_data_type = schema
+            .field_with_name(&time_partition)
+            .unwrap()
+            .data_type()
+            .clone();
+        if time_partition_data_type != DataType::Timestamp(TimeUnit::Millisecond, None) {
+            let mut fields = schema
+                .fields()
+                .iter()
+                .filter(|field| *field.name() != time_partition)
+                .cloned()
+                .collect::<Vec<Arc<Field>>>();
+            let time_partition_field = Arc::new(Field::new(
+                time_partition,
+                DataType::Timestamp(TimeUnit::Millisecond, None),
+                true,
+            ));
+            fields.push(time_partition_field);
+            schema = Schema::new(fields);
+            storage.put_schema(stream_name, &schema).await?;
+        }
+    }
+    Ok(schema)
+}
+
 pub async fn load_stream_metadata_on_server_start(
     storage: &(impl ObjectStorage + ?Sized),
     stream_name: &str,
@@ -428,7 +467,8 @@ pub async fn load_stream_metadata_on_server_start(
         meta =
             serde_json::from_slice(&serde_json::to_vec(&stream_metadata_value).unwrap()).unwrap();
     }
-
+    let schema =
+        update_data_type_time_partition(storage, stream_name, schema, meta.clone()).await?;
     let mut retention = meta.retention.clone();
     let mut time_partition = meta.time_partition.clone();
     let mut time_partition_limit = meta.time_partition_limit.clone();
@@ -546,6 +586,8 @@ pub mod error {
     pub enum LoadError {
         #[error("Error while loading from object storage: {0}")]
         ObjectStorage(#[from] ObjectStorageError),
+        #[error(" Error: {0}")]
+        Anyhow(#[from] anyhow::Error),
     }
 }
 }
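
In isolation, the schema rewrite inside `update_data_type_time_partition` amounts to swapping one field's data type and rebuilding the `Schema`. A self-contained sketch of that step (field names are illustrative; unlike the commit, which filters the field out and appends the replacement at the end, this variant keeps the original field order):

```rust
use std::sync::Arc;

use arrow_schema::{DataType, Field, Schema, TimeUnit};

/// Rebuild `schema` so the `time_partition` field has the
/// Timestamp(Millisecond) data type; leave other fields untouched.
fn coerce_time_partition(schema: &Schema, time_partition: &str) -> Schema {
    let target = DataType::Timestamp(TimeUnit::Millisecond, None);
    let fields: Vec<Arc<Field>> = schema
        .fields()
        .iter()
        .map(|field| {
            if field.name() == time_partition && field.data_type() != &target {
                // Replace the mismatched field, keeping its position.
                Arc::new(Field::new(time_partition, target.clone(), true))
            } else {
                Arc::clone(field)
            }
        })
        .collect();
    Schema::new(fields)
}

fn main() {
    // A pre-1.2.0 style schema: the time partition field is Utf8.
    let schema = Schema::new(vec![
        Field::new("event_time", DataType::Utf8, true),
        Field::new("message", DataType::Utf8, true),
    ]);
    let fixed = coerce_time_partition(&schema, "event_time");
    assert_eq!(
        fixed.field_with_name("event_time").unwrap().data_type(),
        &DataType::Timestamp(TimeUnit::Millisecond, None)
    );
}
```

In the commit itself, the rebuilt schema is also persisted via `storage.put_schema(...)`, so the one-time migration survives restarts.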
