Skip to content

Commit

Permalink
Fix and test clusters without control plane (#5599)
Browse files Browse the repository at this point in the history
* Fix clusters without control plane

* Revert not using the proxied metastore for the search service
  • Loading branch information
rdettai authored Jan 13, 2025
1 parent f9a7e68 commit d8e98b7
Show file tree
Hide file tree
Showing 14 changed files with 281 additions and 143 deletions.
107 changes: 62 additions & 45 deletions quickwit/quickwit-integration-tests/src/test_utils/cluster_sandbox.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@ use reqwest::Url;
use serde_json::Value;
use tempfile::TempDir;
use tokio::net::TcpListener;
use tonic::transport::channel;
use tracing::debug;

use super::shutdown::NodeShutdownHandle;
Expand Down Expand Up @@ -150,7 +149,6 @@ impl ClusterSandboxBuilder {
temp_dir: self.temp_dir,
node_configs: resolved_node_configs,
tcp_listener_resolver,
use_legacy_ingest: self.use_legacy_ingest,
}
}

Expand All @@ -175,7 +173,6 @@ struct ResolvedClusterConfig {
temp_dir: TempDir,
node_configs: Vec<(NodeConfig, HashSet<QuickwitService>)>,
tcp_listener_resolver: TestTcpListenerResolver,
use_legacy_ingest: bool,
}

impl ResolvedClusterConfig {
Expand Down Expand Up @@ -216,41 +213,9 @@ impl ResolvedClusterConfig {
shutdown_handler.set_node_join_handle(join_handle);
node_shutdown_handles.push(shutdown_handler);
}
let searcher_config = self
.node_configs
.iter()
.find(|node_config| node_config.1.contains(&QuickwitService::Searcher))
.cloned()
.unwrap();
let indexer_config = self
.node_configs
.iter()
.find(|node_config| node_config.1.contains(&QuickwitService::Indexer))
.cloned()
.unwrap();
let indexer_channel =
channel::Endpoint::from_str(&format!("http://{}", indexer_config.0.grpc_listen_addr))
.unwrap()
.connect_lazy();
let searcher_channel =
channel::Endpoint::from_str(&format!("http://{}", searcher_config.0.grpc_listen_addr))
.unwrap()
.connect_lazy();

let sandbox = ClusterSandbox {
node_configs: self.node_configs,
searcher_rest_client: QuickwitClientBuilder::new(transport_url(
searcher_config.0.rest_config.listen_addr,
))
.build(),
indexer_rest_client: QuickwitClientBuilder::new(transport_url(
indexer_config.0.rest_config.listen_addr,
))
.use_legacy_ingest(self.use_legacy_ingest)
.build(),
trace_client: TraceServiceClient::new(indexer_channel.clone()),
logs_client: LogsServiceClient::new(indexer_channel),
jaeger_client: SpanReaderPluginClient::new(searcher_channel),
_temp_dir: self.temp_dir,
node_shutdown_handles,
};
Expand Down Expand Up @@ -292,23 +257,68 @@ pub(crate) async fn ingest(
/// or REST clients to test it.
pub struct ClusterSandbox {
pub node_configs: Vec<(NodeConfig, HashSet<QuickwitService>)>,
pub searcher_rest_client: QuickwitClient,
pub indexer_rest_client: QuickwitClient,
pub trace_client: TraceServiceClient<tonic::transport::Channel>,
pub logs_client: LogsServiceClient<tonic::transport::Channel>,
pub jaeger_client: SpanReaderPluginClient<tonic::transport::Channel>,
_temp_dir: TempDir,
node_shutdown_handles: Vec<NodeShutdownHandle>,
}

impl ClusterSandbox {
fn find_node_for_service(&self, service: QuickwitService) -> NodeConfig {
self.node_configs
.iter()
.find(|config| config.1.contains(&service))
.unwrap_or_else(|| panic!("No {:?} node", service))
.0
.clone()
}

fn channel(&self, service: QuickwitService) -> tonic::transport::Channel {
let node_config = self.find_node_for_service(service);
let endpoint = format!("http://{}", node_config.grpc_listen_addr);
tonic::transport::Channel::from_shared(endpoint)
.unwrap()
.connect_lazy()
}

/// Returns a client to one of the nodes that runs the specified service
pub fn rest_client(&self, service: QuickwitService) -> QuickwitClient {
let node_config = self.find_node_for_service(service);

QuickwitClientBuilder::new(transport_url(node_config.rest_config.listen_addr)).build()
}

// TODO(#5604)
pub fn rest_client_legacy_indexer(&self) -> QuickwitClient {
let node_config = self.find_node_for_service(QuickwitService::Indexer);

QuickwitClientBuilder::new(transport_url(node_config.rest_config.listen_addr))
.use_legacy_ingest(true)
.build()
}

pub fn jaeger_client(&self) -> SpanReaderPluginClient<tonic::transport::Channel> {
SpanReaderPluginClient::new(self.channel(QuickwitService::Searcher))
}

pub fn logs_client(&self) -> LogsServiceClient<tonic::transport::Channel> {
LogsServiceClient::new(self.channel(QuickwitService::Indexer))
}

pub fn trace_client(&self) -> TraceServiceClient<tonic::transport::Channel> {
TraceServiceClient::new(self.channel(QuickwitService::Indexer))
}

async fn wait_for_cluster_num_ready_nodes(
&self,
expected_num_ready_nodes: usize,
) -> anyhow::Result<()> {
wait_until_predicate(
|| async move {
match self.indexer_rest_client.cluster().snapshot().await {
match self
.rest_client(QuickwitService::Metastore)
.cluster()
.snapshot()
.await
{
Ok(result) => {
if result.ready_nodes.len() != expected_num_ready_nodes {
debug!(
Expand All @@ -334,14 +344,21 @@ impl ClusterSandbox {
Ok(())
}

// Waits for the needed number of indexing pipeline to start.
/// Waits for the needed number of indexing pipeline to start.
///
/// WARNING! does not work if multiple indexers are running
pub async fn wait_for_indexing_pipelines(
&self,
required_pipeline_num: usize,
) -> anyhow::Result<()> {
wait_until_predicate(
|| async move {
match self.indexer_rest_client.node_stats().indexing().await {
match self
.rest_client(QuickwitService::Indexer)
.node_stats()
.indexing()
.await
{
Ok(result) => {
if result.num_running_pipelines != required_pipeline_num {
debug!(
Expand Down Expand Up @@ -381,7 +398,7 @@ impl ClusterSandbox {
};
async move {
match self
.indexer_rest_client
.rest_client(QuickwitService::Metastore)
.splits(index_id)
.list(splits_query_params)
.await
Expand Down Expand Up @@ -458,7 +475,7 @@ impl ClusterSandbox {

pub async fn assert_hit_count(&self, index_id: &str, query: &str, expected_num_hits: u64) {
let search_response = self
.searcher_rest_client
.rest_client(QuickwitService::Searcher)
.search(
index_id,
SearchRequestQueryString {
Expand Down
12 changes: 6 additions & 6 deletions quickwit/quickwit-integration-tests/src/tests/basic_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ async fn test_standalone_server() {
{
// The indexing service should be running.
let counters = sandbox
.indexer_rest_client
.rest_client(QuickwitService::Indexer)
.node_stats()
.indexing()
.await
Expand All @@ -67,7 +67,7 @@ async fn test_standalone_server() {
{
// Create an dynamic index.
sandbox
.indexer_rest_client
.rest_client(QuickwitService::Indexer)
.indexes()
.create(
r#"
Expand All @@ -87,7 +87,7 @@ async fn test_standalone_server() {
// Index should be searchable
assert_eq!(
sandbox
.indexer_rest_client
.rest_client(QuickwitService::Indexer)
.search(
"my-new-index",
SearchRequestQueryString {
Expand Down Expand Up @@ -119,7 +119,7 @@ async fn test_multi_nodes_cluster() {
.await;

sandbox
.indexer_rest_client
.rest_client(QuickwitService::Indexer)
.indexes()
.create(
r#"
Expand All @@ -139,7 +139,7 @@ async fn test_multi_nodes_cluster() {
.unwrap();

assert!(sandbox
.indexer_rest_client
.rest_client(QuickwitService::Indexer)
.node_health()
.is_live()
.await
Expand All @@ -150,7 +150,7 @@ async fn test_multi_nodes_cluster() {

// Check that search is working
let search_response_empty = sandbox
.searcher_rest_client
.rest_client(QuickwitService::Searcher)
.search(
"my-new-multi-node-index",
SearchRequestQueryString {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,15 +56,15 @@ async fn test_ingest_v1_happy_path() {
commit_timeout_secs: 1
"#
);
sandbox
.indexer_rest_client
let indexer_client = sandbox.rest_client_legacy_indexer();
indexer_client
.indexes()
.create(index_config, ConfigFormat::Yaml, false)
.await
.unwrap();

ingest(
&sandbox.indexer_rest_client,
&indexer_client,
index_id,
ingest_json!({"body": "my-doc"}),
CommitType::Auto,
Expand All @@ -80,8 +80,7 @@ async fn test_ingest_v1_happy_path() {
sandbox.assert_hit_count(index_id, "*", 1).await;

// Delete the index to avoid potential hanging on shutdown #5068
sandbox
.indexer_rest_client
indexer_client
.indexes()
.delete(index_id, false)
.await
Expand Down
Loading

0 comments on commit d8e98b7

Please sign in to comment.