diff --git a/docs/reference/cli.md b/docs/reference/cli.md index 851b14c60e9..cddd6b121b4 100644 --- a/docs/reference/cli.md +++ b/docs/reference/cli.md @@ -491,6 +491,27 @@ quickwit source create |-----------------|-------------| | `--index` | ID of the target index | | `--source-config` | Path to source config file. Please, refer to the documentation for more details. | +### source update + +Update an existing source. +`quickwit source update [args]` + +*Synopsis* + +```bash +quickwit source update + --index + --source + --source-config +``` + +*Options* + +| Option | Description | +|-----------------|-------------| +| `--index` | ID of the target index | +| `--source` | ID of the source | +| `--source-config` | Path to source config file. Please, refer to the documentation for more details. | ### source enable Enables a source for an index. diff --git a/quickwit/quickwit-cli/src/source.rs b/quickwit/quickwit-cli/src/source.rs index ee90689ca94..bd7173552fb 100644 --- a/quickwit/quickwit-cli/src/source.rs +++ b/quickwit/quickwit-cli/src/source.rs @@ -50,6 +50,20 @@ pub fn build_source_command() -> Command { .required(true), ]) ) + .subcommand( + Command::new("update") + .about("Update an existing source.") + .args(&[ + arg!(--index "ID of the target index") + .display_order(1) + .required(true), + arg!(--source "ID of the source") + .display_order(2) + .required(true), + arg!(--"source-config" "Path to source config file. Please, refer to the documentation for more details.") + .required(true), + ]) + ) .subcommand( Command::new("enable") .about("Enables a source for an index.") @@ -147,6 +161,14 @@ pub struct CreateSourceArgs { pub source_config_uri: Uri, } +#[derive(Debug, Eq, PartialEq)] +pub struct UpdateSourceArgs { + pub client_args: ClientArgs, + pub index_id: IndexId, + pub source_id: SourceId, + pub source_config_uri: Uri, +} + #[derive(Debug, Eq, PartialEq)] pub struct ToggleSourceArgs { pub client_args: ClientArgs, @@ -187,6 +209,7 @@ pub struct ResetCheckpointArgs { #[derive(Debug, Eq, PartialEq)] pub enum SourceCliCommand { CreateSource(CreateSourceArgs), + UpdateSource(UpdateSourceArgs), ToggleSource(ToggleSourceArgs), DeleteSource(DeleteSourceArgs), DescribeSource(DescribeSourceArgs), @@ -198,6 +221,7 @@ impl SourceCliCommand { pub async fn execute(self) -> anyhow::Result<()> { match self { Self::CreateSource(args) => create_source_cli(args).await, + Self::UpdateSource(args) => update_source_cli(args).await, Self::ToggleSource(args) => toggle_source_cli(args).await, Self::DeleteSource(args) => delete_source_cli(args).await, Self::DescribeSource(args) => describe_source_cli(args).await, @@ -212,6 +236,7 @@ impl SourceCliCommand { .context("failed to parse source subcommand")?; match subcommand.as_str() { "create" => Self::parse_create_args(submatches).map(Self::CreateSource), + "update" => Self::parse_update_args(submatches).map(Self::UpdateSource), "enable" => { Self::parse_toggle_source_args(&subcommand, submatches).map(Self::ToggleSource) } @@ -244,6 +269,26 @@ impl SourceCliCommand { }) } + fn parse_update_args(mut matches: ArgMatches) -> anyhow::Result { + let client_args = ClientArgs::parse(&mut matches)?; + let index_id = matches + .remove_one::("index") + .expect("`index` should be a required arg."); + let source_id = matches + .remove_one::("source") + .expect("`source` should be a required arg."); + let source_config_uri = matches + .remove_one::("source-config") + .map(|uri_str| Uri::from_str(&uri_str)) + .expect("`source-config` should be a required arg.")?; + Ok(UpdateSourceArgs { + client_args, + index_id, + source_id, + source_config_uri, + }) + } + fn parse_toggle_source_args( subcommand: &str, mut matches: ArgMatches, @@ -342,6 +387,23 @@ async fn create_source_cli(args: CreateSourceArgs) -> anyhow::Result<()> { Ok(()) } +async fn update_source_cli(args: UpdateSourceArgs) -> anyhow::Result<()> { + debug!(args=?args, "update-source"); + println!("❯ Updating source..."); + let storage_resolver = StorageResolver::unconfigured(); + let source_config_content = load_file(&storage_resolver, &args.source_config_uri).await?; + let source_config_str: &str = std::str::from_utf8(&source_config_content) + .with_context(|| format!("source config is not utf-8: {}", args.source_config_uri))?; + let config_format = ConfigFormat::sniff_from_uri(&args.source_config_uri)?; + let qw_client = args.client_args.client(); + qw_client + .sources(&args.index_id) + .update(&args.source_id, source_config_str, config_format) + .await?; + println!("{} Source successfully updated.", "✔".color(GREEN_COLOR)); + Ok(()) +} + async fn toggle_source_cli(args: ToggleSourceArgs) -> anyhow::Result<()> { debug!(args=?args, "toggle-source"); println!("❯ Toggling source..."); @@ -604,6 +666,32 @@ mod tests { assert_eq!(command, expected_command); } + #[test] + fn test_parse_update_source_args() { + let app = build_cli().no_binary_name(true); + let matches = app + .try_get_matches_from(vec![ + "source", + "update", + "--index", + "hdfs-logs", + "--source", + "kafka-foo", + "--source-config", + "/source-conf.yaml", + ]) + .unwrap(); + let command = CliCommand::parse_cli_args(matches).unwrap(); + let expected_command = + CliCommand::Source(SourceCliCommand::UpdateSource(UpdateSourceArgs { + client_args: ClientArgs::default(), + index_id: "hdfs-logs".to_string(), + source_id: "kafka-foo".to_string(), + source_config_uri: Uri::from_str("file:///source-conf.yaml").unwrap(), + })); + assert_eq!(command, expected_command); + } + #[test] fn test_parse_toggle_source_args() { { diff --git a/quickwit/quickwit-rest-client/src/rest_client.rs b/quickwit/quickwit-rest-client/src/rest_client.rs index fed15b83226..f89c6ddb6a2 100644 --- a/quickwit/quickwit-rest-client/src/rest_client.rs +++ b/quickwit/quickwit-rest-client/src/rest_client.rs @@ -513,6 +513,30 @@ impl<'a> SourceClient<'a> { Ok(source_config) } + pub async fn update( + &self, + source_id: &str, + source_config_input: impl AsRef<[u8]>, + config_format: ConfigFormat, + ) -> Result { + let header_map = header_from_config_format(config_format); + let source_config_bytes = Bytes::copy_from_slice(source_config_input.as_ref()); + let path = format!("{}/{source_id}", self.sources_root_url()); + let response = self + .transport + .send::<()>( + Method::PUT, + &path, + Some(header_map), + None, + Some(source_config_bytes), + self.timeout, + ) + .await?; + let source_config = response.deserialize().await?; + Ok(source_config) + } + pub async fn get(&self, source_id: &str) -> Result { let path = format!("{}/{source_id}", self.sources_root_url()); let response = self @@ -1083,6 +1107,25 @@ mod test { source_config ); + // PUT update source with yaml + Mock::given(method("PUT")) + .and(path("/api/v1/indexes/my-index/sources/my-source-1")) + .and(header(CONTENT_TYPE.as_str(), "application/yaml")) + .respond_with( + ResponseTemplate::new(StatusCode::OK).set_body_json(source_config.clone()), + ) + .up_to_n_times(1) + .mount(&mock_server) + .await; + assert_eq!( + qw_client + .sources("my-index") + .update("my-source-1", "", ConfigFormat::Yaml) + .await + .unwrap(), + source_config + ); + // GET sources Mock::given(method("GET")) .and(path("/api/v1/indexes/my-index/sources"))