Skip to content

Commit

Permalink
Add unit tests for metastore and control plane
Browse files Browse the repository at this point in the history
Also include some comment and naming improvments.
  • Loading branch information
rdettai committed Jan 21, 2025
1 parent d008f8e commit 39a8506
Show file tree
Hide file tree
Showing 12 changed files with 238 additions and 22 deletions.
3 changes: 2 additions & 1 deletion quickwit/quickwit-config/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -288,7 +288,8 @@ pub trait TestableForRegression: Serialize + DeserializeOwned {
fn assert_equality(&self, other: &Self);
}

pub fn indexing_params_fingerprint(
/// Return a fingerprint of all parameters that should trigger an indexing pipeline restart.
pub fn indexing_pipeline_params_fingerprint(
index_config: &IndexConfig,
source_config: &SourceConfig,
) -> u64 {
Expand Down
73 changes: 71 additions & 2 deletions quickwit/quickwit-control-plane/src/control_plane.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1124,7 +1124,9 @@ mod tests {
use mockall::Sequence;
use quickwit_actors::{AskError, Observe, SupervisorMetrics};
use quickwit_cluster::ClusterChangeStreamFactoryForTest;
use quickwit_config::{IndexConfig, SourceParams, CLI_SOURCE_ID, INGEST_V2_SOURCE_ID};
use quickwit_config::{
IndexConfig, SourceParams, TransformConfig, CLI_SOURCE_ID, INGEST_V2_SOURCE_ID,
};
use quickwit_indexing::IndexingService;
use quickwit_metastore::{
CreateIndexRequestExt, IndexMetadata, ListIndexesMetadataResponseExt,
Expand Down Expand Up @@ -1273,7 +1275,8 @@ mod tests {
assert_eq!(source_config.source_type(), SourceType::Void);
true
})
.returning(|_| Ok(EmptyResponse {}));
.return_once(|_| Ok(EmptyResponse {}));
// the list_indexes_metadata and list_shards calls are made when the control plane starts
mock_metastore
.expect_list_indexes_metadata()
.return_once(move |_| {
Expand Down Expand Up @@ -1312,6 +1315,72 @@ mod tests {
universe.assert_quit().await;
}

#[tokio::test]
async fn test_control_plane_update_source() {
let universe = Universe::with_accelerated_time();
let self_node_id: NodeId = "test-node".into();
let indexer_pool = IndexerPool::default();
let ingester_pool = IngesterPool::default();

let mut index_metadata = IndexMetadata::for_test("test-index", "ram://tata");
index_metadata
.add_source(SourceConfig::ingest_v2())
.unwrap();

let test_source_config = SourceConfig::for_test("test-source", SourceParams::void());
index_metadata.add_source(test_source_config).unwrap();

let mut mock_metastore = MockMetastoreService::new();
mock_metastore
.expect_update_source()
.withf(|update_source_request| {
let source_config: SourceConfig =
serde_json::from_str(&update_source_request.source_config_json).unwrap();
assert_eq!(source_config.source_id, "test-source");
assert_eq!(source_config.source_type(), SourceType::Void);
assert!(!source_config.transform_config.is_none());
true
})
.return_once(|_| Ok(EmptyResponse {}));
// the list_indexes_metadata and list_shards calls are made when the control plane starts
mock_metastore
.expect_list_indexes_metadata()
.return_once(move |_| {
Ok(ListIndexesMetadataResponse::for_test(vec![
index_metadata.clone()
]))
});
mock_metastore
.expect_list_shards()
.return_once(move |_| Ok(ListShardsResponse::default()));

let cluster_config = ClusterConfig::for_test();
let cluster_change_stream_factory = ClusterChangeStreamFactoryForTest::default();
let (control_plane_mailbox, _control_plane_handle, _readiness_rx) = ControlPlane::spawn(
&universe,
cluster_config,
self_node_id,
cluster_change_stream_factory,
indexer_pool,
ingester_pool,
MetastoreServiceClient::from_mock(mock_metastore),
);
let index_uid: IndexUid = IndexUid::for_test("test-index", 0);
let mut updated_source_config = SourceConfig::for_test("test-source", SourceParams::void());
updated_source_config.transform_config =
Some(TransformConfig::new("del(.username)".to_string(), None));
let update_source_request = UpdateSourceRequest {
index_uid: Some(index_uid),
source_config_json: serde_json::to_string(&updated_source_config).unwrap(),
};
control_plane_mailbox
.ask_for_res(update_source_request)
.await
.unwrap();

universe.assert_quit().await;
}

#[tokio::test]
async fn test_control_plane_toggle_source() {
let universe = Universe::with_accelerated_time();
Expand Down
4 changes: 2 additions & 2 deletions quickwit/quickwit-control-plane/src/indexing_scheduler/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ use fnv::{FnvHashMap, FnvHashSet};
use itertools::Itertools;
use once_cell::sync::OnceCell;
use quickwit_common::pretty::PrettySample;
use quickwit_config::{indexing_params_fingerprint, FileSourceParams, SourceParams};
use quickwit_config::{indexing_pipeline_params_fingerprint, FileSourceParams, SourceParams};
use quickwit_proto::indexing::{
ApplyIndexingPlanRequest, CpuCapacity, IndexingService, IndexingTask, PIPELINE_FULL_CAPACITY,
PIPELINE_THROUGHPUT,
Expand Down Expand Up @@ -170,7 +170,7 @@ fn get_sources_to_schedule(model: &ControlPlaneModel) -> Vec<SourceToSchedule> {
}
let params_fingerprint = model
.index_metadata(&source_uid.index_uid)
.map(|index_meta| indexing_params_fingerprint(&index_meta.index_config, source_config))
.map(|index_meta| indexing_pipeline_params_fingerprint(&index_meta.index_config, source_config))
.unwrap_or_default();
match source_config.source_params {
SourceParams::File(FileSourceParams::Filepath(_))
Expand Down
31 changes: 30 additions & 1 deletion quickwit/quickwit-control-plane/src/model/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -568,7 +568,7 @@ impl ControlPlaneModel {
#[cfg(test)]
mod tests {
use metastore::EmptyResponse;
use quickwit_config::{SourceConfig, SourceParams, INGEST_V2_SOURCE_ID};
use quickwit_config::{SourceConfig, SourceParams, TransformConfig, INGEST_V2_SOURCE_ID};
use quickwit_metastore::IndexMetadata;
use quickwit_proto::ingest::{Shard, ShardState};
use quickwit_proto::metastore::{ListIndexesMetadataResponse, MockMetastoreService};
Expand Down Expand Up @@ -772,6 +772,35 @@ mod tests {
);
}

#[test]
fn test_control_plane_model_update_sources() {
let mut model = ControlPlaneModel::default();
let mut index_metadata = IndexMetadata::for_test("test-index", "ram:///indexes");
let mut my_source = SourceConfig::for_test("my-source", SourceParams::void());
index_metadata.add_source(my_source.clone()).unwrap();
index_metadata
.add_source(SourceConfig::ingest_v2())
.unwrap();
let index_uid = index_metadata.index_uid.clone();
model.add_index(index_metadata.clone());

// Update a source
my_source.transform_config = Some(TransformConfig::new("del(.username)".to_string(), None));
model.update_source(&index_uid, my_source.clone()).unwrap();

assert_eq!(model.index_table.len(), 1);
assert_eq!(
model
.index_table
.get(&index_uid)
.unwrap()
.sources
.get("my-source")
.unwrap(),
&my_source
);
}

#[test]
fn test_control_plane_model_delete_index() {
let mut model = ControlPlaneModel::default();
Expand Down
7 changes: 4 additions & 3 deletions quickwit/quickwit-indexing/src/actors/indexing_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,8 @@ use quickwit_common::io::Limiter;
use quickwit_common::pubsub::EventBroker;
use quickwit_common::{io, temp_dir};
use quickwit_config::{
build_doc_mapper, indexing_params_fingerprint, IndexConfig, IndexerConfig, SourceConfig,
INGEST_API_SOURCE_ID,
build_doc_mapper, indexing_pipeline_params_fingerprint, IndexConfig, IndexerConfig,
SourceConfig, INGEST_API_SOURCE_ID,
};
use quickwit_ingest::{
DropQueueRequest, GetPartitionId, IngestApiService, IngesterPool, ListQueuesRequest,
Expand Down Expand Up @@ -324,7 +324,8 @@ impl IndexingService {
let max_concurrent_split_uploads_merge =
(self.max_concurrent_split_uploads - max_concurrent_split_uploads_index).max(1);

let params_fingerprint = indexing_params_fingerprint(&index_config, &source_config);
let params_fingerprint =
indexing_pipeline_params_fingerprint(&index_config, &source_config);
if let Some(expected_params_fingerprint) = expected_params_fingerprint {
if params_fingerprint != expected_params_fingerprint {
warn!(
Expand Down
2 changes: 1 addition & 1 deletion quickwit/quickwit-metastore/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ pub use metastore::{
IndexMetadataResponseExt, IndexesMetadataResponseExt, ListIndexesMetadataResponseExt,
ListSplitsQuery, ListSplitsRequestExt, ListSplitsResponseExt, MetastoreServiceExt,
MetastoreServiceStreamSplitsExt, PublishSplitsRequestExt, StageSplitsRequestExt,
UpdateIndexRequestExt,
UpdateIndexRequestExt, UpdateSourceRequestExt,
};
pub use metastore_factory::{MetastoreFactory, UnsupportedMetastore};
pub use metastore_resolver::MetastoreResolver;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ impl IndexMetadata {
if entry.get() == &source_config {
return Ok(false);
}
entry.insert(source_config.clone());
entry.insert(source_config);
Ok(true)
}
Entry::Vacant(_) => Err(MetastoreError::NotFound(EntityKind::Source {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1086,7 +1086,7 @@ impl MetastoreService for PostgresqlMetastore {
async fn update_source(&self, request: UpdateSourceRequest) -> MetastoreResult<EmptyResponse> {
let source_config = request.deserialize_source_config()?;
let index_uid: IndexUid = request.index_uid().clone();
run_with_tx!(self.connection_pool, tx, "add source", {
run_with_tx!(self.connection_pool, tx, "update source", {
mutate_index_metadata::<MetastoreError, _>(tx, index_uid, |index_metadata| {
let mutation_occurred = index_metadata.update_source(source_config)?;
Ok(MutationOccurred::from(mutation_occurred))
Expand Down
7 changes: 7 additions & 0 deletions quickwit/quickwit-metastore/src/tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -375,6 +375,13 @@ macro_rules! metastore_test_suite {
$crate::tests::source::test_metastore_add_source::<$metastore_type>().await;
}

#[tokio::test]
#[serial_test::file_serial]
async fn test_metastore_update_source() {
let _ = tracing_subscriber::fmt::try_init();
$crate::tests::source::test_metastore_update_source::<$metastore_type>().await;
}

#[tokio::test]
#[serial_test::file_serial]
async fn test_metastore_toggle_source() {
Expand Down
113 changes: 111 additions & 2 deletions quickwit/quickwit-metastore/src/tests/source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,19 @@
use std::num::NonZeroUsize;

use quickwit_common::rand::append_random_suffix;
use quickwit_config::{IndexConfig, SourceConfig, SourceInputFormat, SourceParams};
use quickwit_config::{
IndexConfig, SourceConfig, SourceInputFormat, SourceParams, TransformConfig,
};
use quickwit_proto::metastore::{
AddSourceRequest, CreateIndexRequest, DeleteSourceRequest, EntityKind, IndexMetadataRequest,
MetastoreError, PublishSplitsRequest, ResetSourceCheckpointRequest, SourceType,
StageSplitsRequest, ToggleSourceRequest,
StageSplitsRequest, ToggleSourceRequest, UpdateSourceRequest,
};
use quickwit_proto::types::IndexUid;

use super::DefaultForTest;
use crate::checkpoint::SourceCheckpoint;
use crate::metastore::UpdateSourceRequestExt;
use crate::tests::cleanup_index;
use crate::{
AddSourceRequestExt, CreateIndexRequestExt, IndexMetadataResponseExt, MetastoreServiceExt,
Expand Down Expand Up @@ -136,6 +139,112 @@ pub async fn test_metastore_add_source<MetastoreToTest: MetastoreServiceExt + De
cleanup_index(&mut metastore, index_uid).await;
}

pub async fn test_metastore_update_source<MetastoreToTest: MetastoreServiceExt + DefaultForTest>() {
let mut metastore = MetastoreToTest::default_for_test().await;

let index_id = append_random_suffix("test-add-source");
let index_uri = format!("ram:///indexes/{index_id}");
let index_config = IndexConfig::for_test(&index_id, &index_uri);

let create_index_request = CreateIndexRequest::try_from_index_config(&index_config).unwrap();
let index_uid: IndexUid = metastore
.create_index(create_index_request)
.await
.unwrap()
.index_uid()
.clone();

let source_id = format!("{index_id}--source");

let mut source = SourceConfig {
source_id: source_id.to_string(),
num_pipelines: NonZeroUsize::new(1).unwrap(),
enabled: true,
source_params: SourceParams::void(),
transform_config: None,
input_format: SourceInputFormat::Json,
};

assert_eq!(
metastore
.index_metadata(IndexMetadataRequest::for_index_id(index_id.to_string()))
.await
.unwrap()
.deserialize_index_metadata()
.unwrap()
.checkpoint
.source_checkpoint(&source_id),
None
);

let add_source_request =
AddSourceRequest::try_from_source_config(index_uid.clone(), &source).unwrap();
metastore.add_source(add_source_request).await.unwrap();

source.transform_config = Some(TransformConfig::new("del(.username)".to_string(), None));

// Update the source twice with the same value to validate indempotency
for _ in 0..2 {
let update_source_request =
UpdateSourceRequest::try_from_source_config(index_uid.clone(), &source).unwrap();
metastore
.update_source(update_source_request)
.await
.unwrap();

let index_metadata = metastore
.index_metadata(IndexMetadataRequest::for_index_id(index_id.to_string()))
.await
.unwrap()
.deserialize_index_metadata()
.unwrap();

let sources = &index_metadata.sources;
assert_eq!(sources.len(), 1);
assert!(sources.contains_key(&source_id));
assert_eq!(sources.get(&source_id).unwrap().source_id, source_id);
assert_eq!(
sources.get(&source_id).unwrap().source_type(),
SourceType::Void
);
assert_eq!(
sources.get(&source_id).unwrap().transform_config,
Some(TransformConfig::new("del(.username)".to_string(), None))
);
assert_eq!(
index_metadata.checkpoint.source_checkpoint(&source_id),
Some(&SourceCheckpoint::default())
);
}

source.source_id = "unknown-src-id".to_string();
assert!(matches!(
metastore
.update_source(
UpdateSourceRequest::try_from_source_config(index_uid.clone(), &source).unwrap()
)
.await
.unwrap_err(),
MetastoreError::NotFound(EntityKind::Source { .. })
));
source.source_id = source_id;
assert!(matches!(
metastore
.add_source(
AddSourceRequest::try_from_source_config(
IndexUid::new_with_random_ulid("index-not-found"),
&source
)
.unwrap()
)
.await
.unwrap_err(),
MetastoreError::NotFound(EntityKind::Index { .. })
));

cleanup_index(&mut metastore, index_uid).await;
}

pub async fn test_metastore_toggle_source<MetastoreToTest: MetastoreServiceExt + DefaultForTest>() {
let mut metastore = MetastoreToTest::default_for_test().await;

Expand Down
4 changes: 2 additions & 2 deletions quickwit/quickwit-proto/protos/quickwit/metastore.proto
Original file line number Diff line number Diff line change
Expand Up @@ -130,10 +130,10 @@ service MetastoreService {
// Adds a source.
rpc AddSource(AddSourceRequest) returns (EmptyResponse);

// Update a source.
// Updates a source.
rpc UpdateSource(UpdateSourceRequest) returns (EmptyResponse);

// Toggles source.
// Toggles (turns on or off) source.
rpc ToggleSource(ToggleSourceRequest) returns (EmptyResponse);

// Removes source.
Expand Down
Loading

0 comments on commit 39a8506

Please sign in to comment.