diff --git a/quickwit/quickwit-index-management/src/garbage_collection.rs b/quickwit/quickwit-index-management/src/garbage_collection.rs index 4a24aedc012..9440872da87 100644 --- a/quickwit/quickwit-index-management/src/garbage_collection.rs +++ b/quickwit/quickwit-index-management/src/garbage_collection.rs @@ -44,6 +44,16 @@ use tracing::{error, instrument}; /// The maximum number of splits that the GC should delete per attempt. const DELETE_SPLITS_BATCH_SIZE: usize = 10_000; +pub trait RecordGcMetrics: Sync { + fn record(&self, num_delete_splits: usize, num_deleted_bytes: u64, num_failed_splits: usize); +} + +pub(crate) struct DoNotRecordGcMetrics; + +impl RecordGcMetrics for DoNotRecordGcMetrics { + fn record(&self, _num_deleted_splits: usize, _num_deleted_bytes: u64, _num_failed_splits: usize) {} +} + /// [`DeleteSplitsError`] describes the errors that occurred during the deletion of splits from /// storage and metastore. #[derive(Error, Debug)] @@ -94,6 +104,7 @@ pub async fn run_garbage_collect( deletion_grace_period: Duration, dry_run: bool, progress_opt: Option<&Progress>, + metrics: &dyn RecordGcMetrics, ) -> anyhow::Result { let grace_period_timestamp = OffsetDateTime::now_utc().unix_timestamp() - staged_grace_period.as_secs() as i64; @@ -170,6 +181,7 @@ pub async fn run_garbage_collect( metastore, indexes, progress_opt, + metrics, ) .await) } @@ -179,6 +191,7 @@ async fn delete_splits( storages: &HashMap>, metastore: MetastoreServiceClient, progress_opt: Option<&Progress>, + metrics: &dyn RecordGcMetrics, split_removal_info: &mut SplitRemovalInfo, ) -> Result<(), ()> { let mut delete_split_from_index_res_stream = @@ -219,9 +232,26 @@ async fn delete_splits( while let Some(delete_split_result) = delete_split_from_index_res_stream.next().await { match delete_split_result { Ok(entries) => { + let deleted_bytes = entries + .iter() + .map(|entry| entry.file_size_bytes.as_u64()) + .sum::(); + let deleted_splits_count = entries.len(); + + metrics.record(deleted_splits_count, deleted_bytes, 0); split_removal_info.removed_split_entries.extend(entries); } Err(delete_split_error) => { + let deleted_bytes = delete_split_error + .successes + .iter() + .map(|entry| entry.file_size_bytes.as_u64()) + .sum::(); + let deleted_splits_count = delete_split_error.successes.len(); + let failed_splits_count = delete_split_error.storage_failures.len() + + delete_split_error.metastore_failures.len(); + + metrics.record(deleted_splits_count, deleted_bytes, failed_splits_count); split_removal_info .removed_split_entries .extend(delete_split_error.successes); @@ -265,13 +295,14 @@ async fn list_splits_metadata( /// /// The aim of this is to spread the load out across a longer period /// rather than short, heavy bursts on the metastore and storage system itself. -#[instrument(skip(index_uids, storages, metastore, progress_opt), fields(num_indexes=%index_uids.len()))] +#[instrument(skip(index_uids, storages, metastore, progress_opt, metrics), fields(num_indexes=%index_uids.len()))] async fn delete_splits_marked_for_deletion_several_indexes( index_uids: Vec, updated_before_timestamp: i64, metastore: MetastoreServiceClient, storages: HashMap>, progress_opt: Option<&Progress>, + metrics: &dyn RecordGcMetrics, ) -> SplitRemovalInfo { let mut split_removal_info = SplitRemovalInfo::default(); @@ -280,7 +311,7 @@ async fn delete_splits_marked_for_deletion_several_indexes( return split_removal_info; }; - let list_splits_query = list_splits_query + let mut list_splits_query = list_splits_query .with_split_state(SplitState::MarkedForDeletion) .with_update_timestamp_lte(updated_before_timestamp) .with_limit(DELETE_SPLITS_BATCH_SIZE) @@ -300,11 +331,13 @@ async fn delete_splits_marked_for_deletion_several_indexes( } }; - let num_splits_to_delete = splits_metadata_to_delete.len(); - - if num_splits_to_delete == 0 { + // set split after which to search for the next loop + let Some(last_split_metadata) = splits_metadata_to_delete.last() else { break; - } + }; + list_splits_query = list_splits_query.after_split(last_split_metadata); + + let num_splits_to_delete = splits_metadata_to_delete.len(); let splits_metadata_to_delete_per_index: HashMap> = splits_metadata_to_delete @@ -312,18 +345,20 @@ async fn delete_splits_marked_for_deletion_several_indexes( .map(|meta| (meta.index_uid.clone(), meta)) .into_group_map(); - let delete_split_res = delete_splits( + // ignore return we continue either way + let _: Result<(), ()> = delete_splits( splits_metadata_to_delete_per_index, &storages, metastore.clone(), progress_opt, + metrics, &mut split_removal_info, ) .await; - if num_splits_to_delete < DELETE_SPLITS_BATCH_SIZE || delete_split_res.is_err() { - // stop the gc if this was the last batch or we encountered an error - // (otherwise we might try deleting the same splits in an endless loop) + if num_splits_to_delete < DELETE_SPLITS_BATCH_SIZE { + // stop the gc if this was the last batch + // we are guaranteed to make progress due to .after_split() break; } } @@ -345,7 +380,7 @@ pub async fn delete_splits_from_storage_and_metastore( metastore: MetastoreServiceClient, splits: Vec, progress_opt: Option<&Progress>, -) -> anyhow::Result, DeleteSplitsError> { +) -> Result, DeleteSplitsError> { let mut split_infos: HashMap = HashMap::with_capacity(splits.len()); for split in splits { @@ -511,6 +546,7 @@ mod tests { Duration::from_secs(30), false, None, + &DoNotRecordGcMetrics, ) .await .unwrap(); @@ -538,6 +574,7 @@ mod tests { Duration::from_secs(30), false, None, + &DoNotRecordGcMetrics, ) .await .unwrap(); @@ -615,6 +652,7 @@ mod tests { Duration::from_secs(30), false, None, + &DoNotRecordGcMetrics, ) .await .unwrap(); @@ -642,6 +680,7 @@ mod tests { Duration::from_secs(0), false, None, + None, ) .await .unwrap(); @@ -680,6 +719,7 @@ mod tests { Duration::from_secs(30), false, None, + None, ) .await .unwrap(); diff --git a/quickwit/quickwit-index-management/src/index.rs b/quickwit/quickwit-index-management/src/index.rs index 5d4dc5ec149..86f03844358 100644 --- a/quickwit/quickwit-index-management/src/index.rs +++ b/quickwit/quickwit-index-management/src/index.rs @@ -49,6 +49,7 @@ use crate::garbage_collection::{ delete_splits_from_storage_and_metastore, run_garbage_collect, DeleteSplitsError, SplitRemovalInfo, }; +use crate::DoNotRecordGcMetrics; #[derive(Error, Debug)] pub enum IndexServiceError { @@ -373,6 +374,7 @@ impl IndexService { Duration::ZERO, dry_run, None, + &DoNotRecordGcMetrics, ) .await?; diff --git a/quickwit/quickwit-index-management/src/lib.rs b/quickwit/quickwit-index-management/src/lib.rs index 93b6ee6d1c3..c5b6b1934b9 100644 --- a/quickwit/quickwit-index-management/src/lib.rs +++ b/quickwit/quickwit-index-management/src/lib.rs @@ -21,4 +21,7 @@ mod garbage_collection; mod index; pub use garbage_collection::run_garbage_collect; +pub use garbage_collection::RecordGcMetrics; pub use index::{clear_cache_directory, validate_storage_uri, IndexService, IndexServiceError}; + +use garbage_collection::DoNotRecordGcMetrics; diff --git a/quickwit/quickwit-janitor/src/actors/garbage_collector.rs b/quickwit/quickwit-janitor/src/actors/garbage_collector.rs index fbfdeb2b1e1..fd23f5824cd 100644 --- a/quickwit/quickwit-janitor/src/actors/garbage_collector.rs +++ b/quickwit/quickwit-janitor/src/actors/garbage_collector.rs @@ -20,7 +20,7 @@ use std::collections::{HashMap, HashSet}; use std::path::Path; use std::sync::Arc; -use std::time::Duration; +use std::time::{Duration, Instant}; use async_trait::async_trait; use futures::{stream, StreamExt}; @@ -36,6 +36,8 @@ use quickwit_storage::{Storage, StorageResolver}; use serde::Serialize; use tracing::{debug, error, info}; +use crate::metrics::JANITOR_METRICS; + const RUN_INTERVAL: Duration = Duration::from_secs(10 * 60); // 10 minutes /// Staged files needs to be deleted if there was a failure. @@ -51,10 +53,10 @@ pub struct GarbageCollectorCounters { pub num_deleted_files: usize, /// The number of bytes deleted. pub num_deleted_bytes: usize, - /// The number of failed garbage collection run on an index. - pub num_failed_gc_run_on_index: usize, - /// The number of successful garbage collection run on an index. - pub num_successful_gc_run_on_index: usize, + /// The number of failed garbage collection run. + pub num_failed_gc_run: usize, + /// The number of successful garbage collection run. + pub num_successful_gc_run: usize, /// The number or failed storage resolution. pub num_failed_storage_resolution: usize, /// The number of splits that were unable to be removed. @@ -86,6 +88,8 @@ impl GarbageCollector { debug!("loading indexes from the metastore"); self.counters.num_passes += 1; + let start = Instant::now(); + let response = match self .metastore .list_indexes_metadata(ListIndexesMetadataRequest::all()) @@ -137,23 +141,39 @@ impl GarbageCollector { split_deletion_grace_period(), false, Some(ctx.progress()), + &JANITOR_METRICS.gc_metrics, ) .await; + let run_duration = start.elapsed().as_secs(); + JANITOR_METRICS.gc_seconds_total.inc_by(run_duration); + let deleted_file_entries = match gc_res { Ok(removal_info) => { - self.counters.num_successful_gc_run_on_index += 1; + self.counters.num_successful_gc_run += 1; + JANITOR_METRICS + .gc_run_count + .with_label_values(["success"]) + .inc(); self.counters.num_failed_splits += removal_info.failed_splits.len(); removal_info.removed_split_entries } Err(error) => { - self.counters.num_failed_gc_run_on_index += 1; + self.counters.num_failed_gc_run += 1; + JANITOR_METRICS + .gc_run_count + .with_label_values(["error"]) + .inc(); error!(error=?error, "failed to run garbage collection"); return; } }; if !deleted_file_entries.is_empty() { let num_deleted_splits = deleted_file_entries.len(); + let num_deleted_bytes = deleted_file_entries + .iter() + .map(|entry| entry.file_size_bytes.as_u64() as usize) + .sum::(); let deleted_files: HashSet<&Path> = deleted_file_entries .iter() .map(|deleted_entry| deleted_entry.file_name.as_path()) @@ -163,11 +183,8 @@ impl GarbageCollector { num_deleted_splits = num_deleted_splits, "Janitor deleted {:?} and {} other splits.", deleted_files, num_deleted_splits, ); - self.counters.num_deleted_files += deleted_file_entries.len(); - self.counters.num_deleted_bytes += deleted_file_entries - .iter() - .map(|entry| entry.file_size_bytes.as_u64() as usize) - .sum::(); + self.counters.num_deleted_files += num_deleted_splits; + self.counters.num_deleted_bytes += num_deleted_bytes; } } } @@ -348,6 +365,7 @@ mod tests { split_deletion_grace_period(), false, None, + None, ) .await; assert!(result.is_ok()); @@ -497,9 +515,9 @@ mod tests { assert_eq!(counters.num_passes, 1); assert_eq!(counters.num_deleted_files, 2); assert_eq!(counters.num_deleted_bytes, 40); - assert_eq!(counters.num_successful_gc_run_on_index, 1); + assert_eq!(counters.num_successful_gc_run, 1); assert_eq!(counters.num_failed_storage_resolution, 0); - assert_eq!(counters.num_failed_gc_run_on_index, 0); + assert_eq!(counters.num_failed_gc_run, 0); assert_eq!(counters.num_failed_splits, 0); // 30 secs later @@ -508,9 +526,9 @@ mod tests { assert_eq!(counters.num_passes, 1); assert_eq!(counters.num_deleted_files, 2); assert_eq!(counters.num_deleted_bytes, 40); - assert_eq!(counters.num_successful_gc_run_on_index, 1); + assert_eq!(counters.num_successful_gc_run, 1); assert_eq!(counters.num_failed_storage_resolution, 0); - assert_eq!(counters.num_failed_gc_run_on_index, 0); + assert_eq!(counters.num_failed_gc_run, 0); assert_eq!(counters.num_failed_splits, 0); // 60 secs later @@ -519,9 +537,9 @@ mod tests { assert_eq!(counters.num_passes, 2); assert_eq!(counters.num_deleted_files, 4); assert_eq!(counters.num_deleted_bytes, 80); - assert_eq!(counters.num_successful_gc_run_on_index, 2); + assert_eq!(counters.num_successful_gc_run, 2); assert_eq!(counters.num_failed_storage_resolution, 0); - assert_eq!(counters.num_failed_gc_run_on_index, 0); + assert_eq!(counters.num_failed_gc_run, 0); assert_eq!(counters.num_failed_splits, 0); universe.assert_quit().await; } @@ -585,9 +603,9 @@ mod tests { assert_eq!(counters.num_passes, 1); assert_eq!(counters.num_deleted_files, 0); assert_eq!(counters.num_deleted_bytes, 0); - assert_eq!(counters.num_successful_gc_run_on_index, 0); + assert_eq!(counters.num_successful_gc_run, 0); assert_eq!(counters.num_failed_storage_resolution, 1); - assert_eq!(counters.num_failed_gc_run_on_index, 0); + assert_eq!(counters.num_failed_gc_run, 0); assert_eq!(counters.num_failed_splits, 0); universe.assert_quit().await; } @@ -608,7 +626,7 @@ mod tests { }); mock_metastore .expect_list_splits() - .times(2) + .times(3) .returning(|list_splits_request| { let query = list_splits_request.deserialize_list_splits_query().unwrap(); assert_eq!(query.index_uids.len(), 2); @@ -616,24 +634,40 @@ mod tests { .contains(&query.index_uids[0].index_id.as_ref())); assert!(["test-index-1", "test-index-2"] .contains(&query.index_uids[1].index_id.as_ref())); - let splits = match query.split_states[0] { + let splits_ids_string: Vec = + (0..8000).map(|seq| format!("split-{seq:04}")).collect(); + let splits_ids: Vec<&str> = splits_ids_string + .iter() + .map(|string| string.as_str()) + .collect(); + let mut splits = match query.split_states[0] { SplitState::Staged => { let mut splits = make_splits("test-index-1", &["a"], SplitState::Staged); splits.append(&mut make_splits("test-index-2", &["a"], SplitState::Staged)); splits } SplitState::MarkedForDeletion => { + assert_eq!(query.limit, Some(10_000)); let mut splits = - make_splits("test-index-1", &["a", "b"], SplitState::MarkedForDeletion); + make_splits("test-index-1", &splits_ids, SplitState::MarkedForDeletion); splits.append(&mut make_splits( "test-index-2", - &["a", "b"], + &splits_ids, SplitState::MarkedForDeletion, )); splits } _ => panic!("only Staged and MarkedForDeletion expected."), }; + if let Some((index_uid, split_id)) = query.after_split { + splits.retain(|split| { + ( + &split.split_metadata.index_uid, + &split.split_metadata.split_id, + ) > (&index_uid, &split_id) + }); + } + splits.truncate(10_000); let splits = ListSplitsResponse::try_from_splits(splits).unwrap(); Ok(ServiceStream::from(vec![Ok(splits)])) }); @@ -648,7 +682,7 @@ mod tests { }); mock_metastore .expect_delete_splits() - .times(2) + .times(3) .returning(|delete_splits_request| { let index_uid: IndexUid = delete_splits_request.index_uid().clone(); let split_ids = HashSet::<&str>::from_iter( @@ -657,14 +691,30 @@ mod tests { .iter() .map(|split_id| split_id.as_str()), ); - let expected_split_ids = HashSet::<&str>::from_iter(["a", "b"]); - - assert_eq!(split_ids, expected_split_ids); + if index_uid.index_id == "test-index-1" { + assert_eq!(split_ids.len(), 8000); + for seq in 0..8000 { + let split_id = format!("split-{seq:04}"); + assert!(split_ids.contains(&*split_id)); + } + } else if split_ids.len() == 2000 { + for seq in 0..2000 { + let split_id = format!("split-{seq:04}"); + assert!(split_ids.contains(&*split_id)); + } + } else if split_ids.len() == 6000 { + for seq in 2000..8000 { + let split_id = format!("split-{seq:04}"); + assert!(split_ids.contains(&*split_id)); + } + } else { + panic!(); + } // This should not cause the whole run to fail and return an error, // instead this should simply get logged and return the list of splits // which have successfully been deleted. - if index_uid.index_id == "test-index-2" { + if index_uid.index_id == "test-index-2" && split_ids.len() == 2000 { Err(MetastoreError::Db { message: "fail to delete".to_string(), }) @@ -682,12 +732,12 @@ mod tests { let counters = handle.process_pending_and_observe().await.state; assert_eq!(counters.num_passes, 1); - assert_eq!(counters.num_deleted_files, 2); - assert_eq!(counters.num_deleted_bytes, 40); - assert_eq!(counters.num_successful_gc_run_on_index, 1); + assert_eq!(counters.num_deleted_files, 14000); + assert_eq!(counters.num_deleted_bytes, 20 * 14000); + assert_eq!(counters.num_successful_gc_run, 1); assert_eq!(counters.num_failed_storage_resolution, 0); - assert_eq!(counters.num_failed_gc_run_on_index, 0); - assert_eq!(counters.num_failed_splits, 2); + assert_eq!(counters.num_failed_gc_run, 0); + assert_eq!(counters.num_failed_splits, 2000); universe.assert_quit().await; } } diff --git a/quickwit/quickwit-janitor/src/metrics.rs b/quickwit/quickwit-janitor/src/metrics.rs index d3392af7b3f..560ae08c26a 100644 --- a/quickwit/quickwit-janitor/src/metrics.rs +++ b/quickwit/quickwit-janitor/src/metrics.rs @@ -18,10 +18,57 @@ // along with this program. If not, see . use once_cell::sync::Lazy; -use quickwit_common::metrics::{new_gauge_vec, IntGaugeVec}; +use quickwit_common::metrics::{ + new_counter, new_counter_vec, new_gauge_vec, IntCounter, IntCounterVec, IntGaugeVec +}; +use quickwit_index_management::RecordGcMetrics; + + +pub struct GcMetrics { + pub deleted_splits: IntCounter, + pub deleted_bytes: IntCounter, + pub failed_splits: IntCounter, +} + +impl RecordGcMetrics for GcMetrics { + fn record(&self, num_deleted_splits: usize, num_deleted_bytes: u64, num_failed_splits: usize) { + self.deleted_splits.inc_by(num_deleted_splits as u64); + self.deleted_bytes.inc_by(num_deleted_bytes); + self.failed_splits.inc_by(num_failed_splits as u64); + } +} + +impl Default for GcMetrics { + fn default() -> GcMetrics { + let deleted_splits = new_counter_vec( + "gc_deleted_splits_count", + "Total number of splits deleted by the garbage collector.", + "quickwit_janitor", + &[], + ["result"], + ); + let deleted_bytes = new_counter( + "gc_deleted_bytes_total", + "Total number of bytes deleted by the garbage collector.", + "quickwit_janitor", + &[], + ); + GcMetrics { + deleted_splits: deleted_splits.with_label_values(["success"]), + deleted_bytes, + failed_splits: deleted_splits.with_label_values(["failure"]) + } + } + +} pub struct JanitorMetrics { pub ongoing_num_delete_operations_total: IntGaugeVec<1>, + pub gc_run_count: IntCounterVec<1>, + pub gc_metrics: GcMetrics, + pub gc_seconds_total: IntCounter, + // TODO having a current run duration which is 0|undefined out of run, and returns `now - + // start_time` during a run would be nice } impl Default for JanitorMetrics { @@ -34,6 +81,20 @@ impl Default for JanitorMetrics { &[], ["index"], ), + gc_run_count: new_counter_vec( + "gc_run_total", + "Total number of garbage collector execition.", + "quickwit_janitor", + &[], + ["result"], + ), + gc_seconds_total: new_counter( + "gc_seconds_total", + "Total time spent running the garbage collector", + "quickwit_janitor", + &[], + ), + gc_metrics: GcMetrics::default(), } } } diff --git a/quickwit/quickwit-metastore/src/metastore/file_backed/file_backed_index/mod.rs b/quickwit/quickwit-metastore/src/metastore/file_backed/file_backed_index/mod.rs index 426a91530b5..4a8dc28ac61 100644 --- a/quickwit/quickwit-metastore/src/metastore/file_backed/file_backed_index/mod.rs +++ b/quickwit/quickwit-metastore/src/metastore/file_backed/file_backed_index/mod.rs @@ -452,7 +452,18 @@ impl FileBackedIndex { .splits .values() .filter(|split| split_query_predicate(split, query)) - .sorted_unstable_by_key(|split| &split.split_metadata.index_uid) + .sorted_unstable_by(|left_split, right_split| { + left_split + .split_metadata + .index_uid + .cmp(&right_split.split_metadata.index_uid) + .then_with(|| { + left_split + .split_metadata + .split_id + .cmp(&right_split.split_metadata.split_id) + }) + }) .skip(offset) .take(limit) .cloned() @@ -763,6 +774,17 @@ fn split_query_predicate(split: &&Split, query: &ListSplitsQuery) -> bool { } } + if let Some((index_uid, split_id)) = &query.after_split { + if *index_uid > split.split_metadata.index_uid { + return false; + } + if *index_uid == split.split_metadata.index_uid + && *split_id >= split.split_metadata.split_id + { + return false; + } + } + true } diff --git a/quickwit/quickwit-metastore/src/metastore/mod.rs b/quickwit/quickwit-metastore/src/metastore/mod.rs index 06211e1f63a..2b0bb86d9ca 100644 --- a/quickwit/quickwit-metastore/src/metastore/mod.rs +++ b/quickwit/quickwit-metastore/src/metastore/mod.rs @@ -632,6 +632,9 @@ pub struct ListSplitsQuery { /// Sorts the splits by staleness, i.e. by delete opstamp and publish timestamp in ascending /// order. pub sort_by: SortBy, + + /// Only return splits whose (index_uid, split_id) are lexicographically after this split + pub after_split: Option<(IndexUid, SplitId)>, } #[derive(Debug, Clone, PartialEq, serde::Serialize, serde::Deserialize)] @@ -658,6 +661,7 @@ impl ListSplitsQuery { create_timestamp: Default::default(), mature: Bound::Unbounded, sort_by: SortBy::None, + after_split: None, } } @@ -680,6 +684,7 @@ impl ListSplitsQuery { create_timestamp: Default::default(), mature: Bound::Unbounded, sort_by: SortBy::None, + after_split: None, }) } @@ -850,11 +855,18 @@ impl ListSplitsQuery { self } - /// Sorts the splits by index_uid. + /// Sorts the splits by index_uid and split_id. pub fn sort_by_index_uid(mut self) -> Self { self.sort_by = SortBy::IndexUid; self } + + /// Only return splits whose (index_uid, split_id) are lexicographically after this split. + /// This is only useful if results are sorted by index_uid and split_id. + pub fn after_split(mut self, split_meta: &SplitMetadata) -> Self { + self.after_split = Some((split_meta.index_uid.clone(), split_meta.split_id.clone())); + self + } } #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] diff --git a/quickwit/quickwit-metastore/src/metastore/postgres/metastore.rs b/quickwit/quickwit-metastore/src/metastore/postgres/metastore.rs index 32331650b68..698e5a460d4 100644 --- a/quickwit/quickwit-metastore/src/metastore/postgres/metastore.rs +++ b/quickwit/quickwit-metastore/src/metastore/postgres/metastore.rs @@ -2085,7 +2085,25 @@ mod tests { assert_eq!( sql.to_string(PostgresQueryBuilder), format!( - r#"SELECT * FROM "splits" WHERE "index_uid" IN ('{index_uid}') ORDER BY "index_uid" ASC"# + r#"SELECT * FROM "splits" WHERE "index_uid" IN ('{index_uid}') ORDER BY "index_uid" ASC, "split_id" ASC"# + ) + ); + + let mut select_statement = Query::select(); + let sql = select_statement.column(Asterisk).from(Splits::Table); + + let query = + ListSplitsQuery::for_index(index_uid.clone()).after_split(&crate::SplitMetadata { + index_uid: index_uid.clone(), + split_id: "my_split".to_string(), + ..Default::default() + }); + append_query_filters(sql, &query); + + assert_eq!( + sql.to_string(PostgresQueryBuilder), + format!( + r#"SELECT * FROM "splits" WHERE "index_uid" IN ('{index_uid}') AND ("index_uid", "split_id") > ('{index_uid}', 'my_split')"# ) ); } diff --git a/quickwit/quickwit-metastore/src/metastore/postgres/utils.rs b/quickwit/quickwit-metastore/src/metastore/postgres/utils.rs index 6e850ae0fed..fdc1bc2bb07 100644 --- a/quickwit/quickwit-metastore/src/metastore/postgres/utils.rs +++ b/quickwit/quickwit-metastore/src/metastore/postgres/utils.rs @@ -187,6 +187,16 @@ pub(super) fn append_query_filters(sql: &mut SelectStatement, query: &ListSplits Expr::expr(val) }); + if let Some((index_uid, split_id)) = &query.after_split { + sql.cond_where( + Expr::tuple([ + Expr::col(Splits::IndexUid).into(), + Expr::col(Splits::SplitId).into(), + ]) + .gt(Expr::tuple([Expr::value(index_uid), Expr::value(split_id)])), + ); + } + match query.sort_by { SortBy::Staleness => { sql.order_by( @@ -195,7 +205,8 @@ pub(super) fn append_query_filters(sql: &mut SelectStatement, query: &ListSplits ); } SortBy::IndexUid => { - sql.order_by(Splits::IndexUid, Order::Asc); + sql.order_by(Splits::IndexUid, Order::Asc) + .order_by(Splits::SplitId, Order::Asc); } SortBy::None => (), }