From 1ccd4f2a52252a3cc66226aca4b680c5c417401f Mon Sep 17 00:00:00 2001 From: slinkydeveloper Date: Mon, 6 Oct 2025 18:11:43 +0200 Subject: [PATCH 1/5] Update cargo lock --- Cargo.lock | 102 ++++++++++++++++++++++++++--------------------------- 1 file changed, 51 insertions(+), 51 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index e4b3b8de8d..1a00bf476e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1000,7 +1000,7 @@ checksum = "8c3c1a368f70d6cf7302d78f8f7093da241fb8e8807c05cc9e51a125895a6d5b" [[package]] name = "bifrost-benchpress" -version = "1.5.0" +version = "1.5.1" dependencies = [ "anyhow", "bytes", @@ -1585,7 +1585,7 @@ dependencies = [ [[package]] name = "codederror" -version = "1.5.0" +version = "1.5.1" dependencies = [ "codederror-derive", "restate-workspace-hack", @@ -4708,7 +4708,7 @@ dependencies = [ [[package]] name = "mock-service-endpoint" -version = "1.5.0" +version = "1.5.1" dependencies = [ "assert2", "async-stream", @@ -6434,7 +6434,7 @@ dependencies = [ [[package]] name = "restate-admin" -version = "1.5.0" +version = "1.5.1" dependencies = [ "ahash", "anyhow", @@ -6491,7 +6491,7 @@ dependencies = [ [[package]] name = "restate-admin-rest-model" -version = "1.5.0" +version = "1.5.1" dependencies = [ "bytes", "derive_more", @@ -6511,7 +6511,7 @@ dependencies = [ [[package]] name = "restate-base64-util" -version = "1.5.0" +version = "1.5.1" dependencies = [ "base64 0.22.1", "restate-workspace-hack", @@ -6519,7 +6519,7 @@ dependencies = [ [[package]] name = "restate-benchmarks" -version = "1.5.0" +version = "1.5.1" dependencies = [ "anyhow", "criterion", @@ -6545,7 +6545,7 @@ dependencies = [ [[package]] name = "restate-bifrost" -version = "1.5.0" +version = "1.5.1" dependencies = [ "ahash", "anyhow", @@ -6599,7 +6599,7 @@ dependencies = [ [[package]] name = "restate-cli" -version = "1.5.0" +version = "1.5.1" dependencies = [ "anyhow", "arc-swap", @@ -6660,7 +6660,7 @@ dependencies = [ [[package]] name = "restate-cli-util" -version = "1.5.0" +version = "1.5.1" dependencies = [ "anyhow", "arc-swap", @@ -6689,7 +6689,7 @@ dependencies = [ [[package]] name = "restate-cloud-tunnel-client" -version = "1.5.0" +version = "1.5.1" dependencies = [ "anyhow", "bs58", @@ -6712,7 +6712,7 @@ dependencies = [ [[package]] name = "restate-core" -version = "1.5.0" +version = "1.5.1" dependencies = [ "ahash", "anyhow", @@ -6782,7 +6782,7 @@ dependencies = [ [[package]] name = "restate-encoding" -version = "1.5.0" +version = "1.5.1" dependencies = [ "bilrost", "bytes", @@ -6804,7 +6804,7 @@ dependencies = [ [[package]] name = "restate-errors" -version = "1.5.0" +version = "1.5.1" dependencies = [ "codederror", "paste", @@ -6817,7 +6817,7 @@ dependencies = [ [[package]] name = "restate-fs-util" -version = "1.5.0" +version = "1.5.1" dependencies = [ "restate-workspace-hack", "tokio", @@ -6827,7 +6827,7 @@ dependencies = [ [[package]] name = "restate-futures-util" -version = "1.5.0" +version = "1.5.1" dependencies = [ "anyhow", "futures", @@ -6845,7 +6845,7 @@ dependencies = [ [[package]] name = "restate-ingress-http" -version = "1.5.0" +version = "1.5.1" dependencies = [ "anyhow", "bytes", @@ -6891,7 +6891,7 @@ dependencies = [ [[package]] name = "restate-ingress-kafka" -version = "1.5.0" +version = "1.5.1" dependencies = [ "anyhow", "base64 0.22.1", @@ -6920,7 +6920,7 @@ dependencies = [ [[package]] name = "restate-invoker-api" -version = "1.5.0" +version = "1.5.1" dependencies = [ "anyhow", "bytes", @@ -6938,7 +6938,7 @@ dependencies = [ [[package]] name = "restate-invoker-impl" -version = "1.5.0" +version = "1.5.1" dependencies = [ "anyhow", "bytes", @@ -6982,7 +6982,7 @@ dependencies = [ [[package]] name = "restate-local-cluster-runner" -version = "1.5.0" +version = "1.5.1" dependencies = [ "anyhow", "arc-swap", @@ -7019,7 +7019,7 @@ dependencies = [ [[package]] name = "restate-log-server" -version = "1.5.0" +version = "1.5.1" dependencies = [ "ahash", "anyhow", @@ -7057,7 +7057,7 @@ dependencies = [ [[package]] name = "restate-log-server-grpc" -version = "1.5.0" +version = "1.5.1" dependencies = [ "prost", "restate-types", @@ -7068,7 +7068,7 @@ dependencies = [ [[package]] name = "restate-metadata-providers" -version = "1.5.0" +version = "1.5.1" dependencies = [ "anyhow", "async-trait", @@ -7101,7 +7101,7 @@ dependencies = [ [[package]] name = "restate-metadata-server" -version = "1.5.0" +version = "1.5.1" dependencies = [ "anyhow", "arc-swap", @@ -7149,7 +7149,7 @@ dependencies = [ [[package]] name = "restate-metadata-server-grpc" -version = "1.5.0" +version = "1.5.1" dependencies = [ "bytes", "bytestring", @@ -7169,7 +7169,7 @@ dependencies = [ [[package]] name = "restate-metadata-store" -version = "1.5.0" +version = "1.5.1" dependencies = [ "async-trait", "bytes", @@ -7189,7 +7189,7 @@ dependencies = [ [[package]] name = "restate-node" -version = "1.5.0" +version = "1.5.1" dependencies = [ "ahash", "anyhow", @@ -7244,7 +7244,7 @@ dependencies = [ [[package]] name = "restate-object-store-util" -version = "1.5.0" +version = "1.5.1" dependencies = [ "anyhow", "aws-config", @@ -7262,7 +7262,7 @@ dependencies = [ [[package]] name = "restate-partition-store" -version = "1.5.0" +version = "1.5.1" dependencies = [ "ahash", "anyhow", @@ -7310,7 +7310,7 @@ dependencies = [ [[package]] name = "restate-queue" -version = "1.5.0" +version = "1.5.1" dependencies = [ "bincode", "criterion", @@ -7326,7 +7326,7 @@ dependencies = [ [[package]] name = "restate-rocksdb" -version = "1.5.0" +version = "1.5.1" dependencies = [ "anyhow", "bytes", @@ -7355,7 +7355,7 @@ dependencies = [ [[package]] name = "restate-serde-util" -version = "1.5.0" +version = "1.5.1" dependencies = [ "bytes", "bytesize", @@ -7372,7 +7372,7 @@ dependencies = [ [[package]] name = "restate-server" -version = "1.5.0" +version = "1.5.1" dependencies = [ "anyhow", "async-trait", @@ -7427,7 +7427,7 @@ dependencies = [ [[package]] name = "restate-service-client" -version = "1.5.0" +version = "1.5.1" dependencies = [ "arc-swap", "aws-config", @@ -7467,7 +7467,7 @@ dependencies = [ [[package]] name = "restate-service-protocol" -version = "1.5.0" +version = "1.5.1" dependencies = [ "bytes", "bytes-utils", @@ -7497,7 +7497,7 @@ dependencies = [ [[package]] name = "restate-service-protocol-v4" -version = "1.5.0" +version = "1.5.1" dependencies = [ "assert2", "bytes", @@ -7523,7 +7523,7 @@ dependencies = [ [[package]] name = "restate-storage-api" -version = "1.5.0" +version = "1.5.1" dependencies = [ "anyhow", "bytes", @@ -7545,7 +7545,7 @@ dependencies = [ [[package]] name = "restate-storage-query-datafusion" -version = "1.5.0" +version = "1.5.1" dependencies = [ "ahash", "anyhow", @@ -7582,7 +7582,7 @@ dependencies = [ [[package]] name = "restate-test-util" -version = "1.5.0" +version = "1.5.1" dependencies = [ "assert2", "bytes", @@ -7597,7 +7597,7 @@ dependencies = [ [[package]] name = "restate-time-util" -version = "1.5.0" +version = "1.5.1" dependencies = [ "jiff", "restate-workspace-hack", @@ -7610,7 +7610,7 @@ dependencies = [ [[package]] name = "restate-timer" -version = "1.5.0" +version = "1.5.1" dependencies = [ "ahash", "futures-util", @@ -7629,7 +7629,7 @@ dependencies = [ [[package]] name = "restate-timer-queue" -version = "1.5.0" +version = "1.5.1" dependencies = [ "futures", "restate-workspace-hack", @@ -7638,7 +7638,7 @@ dependencies = [ [[package]] name = "restate-tracing-instrumentation" -version = "1.5.0" +version = "1.5.1" dependencies = [ "arc-swap", "console-subscriber", @@ -7671,7 +7671,7 @@ dependencies = [ [[package]] name = "restate-types" -version = "1.5.0" +version = "1.5.1" dependencies = [ "ahash", "anyhow", @@ -7764,7 +7764,7 @@ dependencies = [ [[package]] name = "restate-utoipa" -version = "1.5.0" +version = "1.5.1" dependencies = [ "assert-json-diff", "indexmap 2.10.0", @@ -7776,7 +7776,7 @@ dependencies = [ [[package]] name = "restate-wal-protocol" -version = "1.5.0" +version = "1.5.1" dependencies = [ "anyhow", "bilrost", @@ -7806,7 +7806,7 @@ dependencies = [ [[package]] name = "restate-worker" -version = "1.5.0" +version = "1.5.1" dependencies = [ "ahash", "anyhow", @@ -8012,7 +8012,7 @@ dependencies = [ [[package]] name = "restatectl" -version = "1.5.0" +version = "1.5.1" dependencies = [ "anyhow", "arrow", @@ -8515,7 +8515,7 @@ dependencies = [ [[package]] name = "service-protocol-wireshark-dissector" -version = "1.5.0" +version = "1.5.1" dependencies = [ "bytes", "mlua", @@ -10488,7 +10488,7 @@ checksum = "66fee0b777b0f5ac1c69bb06d361268faafa61cd4682ae064a171c16c433e9e4" [[package]] name = "xtask" -version = "1.5.0" +version = "1.5.1" dependencies = [ "anyhow", "reqwest", From 57ec55122d09ddf3f8cadab2c042159162328bc9 Mon Sep 17 00:00:00 2001 From: slinkydeveloper Date: Mon, 6 Oct 2025 18:50:50 +0200 Subject: [PATCH 2/5] Add routing header feature to deployment registration. This lets you use a specific header as "discriminator" when registering a service. --- cli/src/commands/deployments/register.rs | 20 +- crates/admin-rest-model/src/deployments.rs | 22 ++- crates/admin/src/rest_api/deployments.rs | 19 +- crates/admin/src/schema_registry/mod.rs | 5 +- crates/serde-util/src/header_name.rs | 37 ++++ crates/serde-util/src/lib.rs | 2 + .../types/src/schema/metadata/updater/mod.rs | 20 +- .../src/schema/metadata/updater/tests.rs | 173 ++++++++++++++++-- 8 files changed, 272 insertions(+), 26 deletions(-) create mode 100644 crates/serde-util/src/header_name.rs diff --git a/cli/src/commands/deployments/register.rs b/cli/src/commands/deployments/register.rs index 5ccdc420ec..561083516a 100644 --- a/cli/src/commands/deployments/register.rs +++ b/cli/src/commands/deployments/register.rs @@ -18,7 +18,7 @@ use comfy_table::Table; use http::{HeaderName, HeaderValue, StatusCode, Uri}; use indicatif::ProgressBar; -use restate_admin_rest_model::deployments::RegisterDeploymentRequest; +use restate_admin_rest_model::deployments::{Header, RegisterDeploymentRequest}; use restate_cli_util::ui::console::{Styled, StyledTable, confirm_or_exit}; use restate_cli_util::ui::stylesheet::Style; use restate_cli_util::{c_eprintln, c_error, c_indent_table, c_indentln, c_success, c_warn}; @@ -48,11 +48,20 @@ pub struct Register { assume_role_arn: Option, /// Additional header that will be sent to the endpoint during the discovery request. + /// You typically want to include here API keys and other tokens required to send requests to deployments. /// /// Use `--extra-header name=value` format and repeat --extra-header for each additional header. #[clap(long="extra-header", value_parser = parse_header, action = clap::ArgAction::Append)] extra_headers: Option>, + /// Header used for routing to a specific deployment. + /// If the load balancer between restate-server and your deployments uses a specific header to route, + /// you should set this as the routing header, as it will be used to distinguish this deployment with other deployments with the same URL. + /// + /// Use `--routing-header name=value` format. + #[clap(long="routing-header", value_parser = parse_header)] + routing_header: Option, + /// Attempt discovery using a client that defaults to HTTP1.1 instead of a prior-knowledge HTTP2 client. /// This may be necessary if you see `META0014` discovering local dev servers like `wrangler dev`. #[clap(long = "use-http1.1")] @@ -131,6 +140,14 @@ pub async fn run_register(State(env): State, discover_opts: &Register) - let headers = discover_opts.extra_headers.as_ref().map(|headers| { HashMap::from_iter(headers.iter().map(|kv| (kv.key.clone(), kv.value.clone()))) }); + let routing_header = + discover_opts + .routing_header + .as_ref() + .map(|HeaderKeyValue { key, value }| Header { + key: key.clone(), + value: value.clone(), + }); // Preparing the discovery request let client = AdminClient::new(&env).await?; @@ -189,6 +206,7 @@ pub async fn run_register(State(env): State, discover_opts: &Register) - let mk_request_body = |force, dry_run| match &deployment { DeploymentEndpoint::Uri(uri) => RegisterDeploymentRequest::Http { uri: uri.clone(), + routing_header: routing_header.clone(), additional_headers: headers.clone().map(Into::into), use_http_11: discover_opts.use_http_11, force, diff --git a/crates/admin-rest-model/src/deployments.rs b/crates/admin-rest-model/src/deployments.rs index 58353f4538..3911130fc7 100644 --- a/crates/admin-rest-model/src/deployments.rs +++ b/crates/admin-rest-model/src/deployments.rs @@ -8,8 +8,8 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. -use http::Uri; use http::Version; +use http::{HeaderName, HeaderValue, Uri}; use restate_serde_util::SerdeableHeaderHashMap; use restate_types::identifiers::ServiceRevision; use restate_types::identifiers::{DeploymentId, LambdaARN}; @@ -18,6 +18,18 @@ use restate_types::schema::service::ServiceMetadata; use serde::{Deserialize, Serialize}; use serde_with::serde_as; +#[serde_as] +#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))] +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct Header { + #[serde_as(as = "restate_serde_util::HeaderNameSerde")] + #[cfg_attr(feature = "schema", schemars(with = "String"))] + pub key: HeaderName, + #[serde_as(as = "restate_serde_util::HeaderValueSerde")] + #[cfg_attr(feature = "schema", schemars(with = "String"))] + pub value: HeaderValue, +} + // This enum could be a struct with a nested enum to avoid repeating some fields, but serde(flatten) unfortunately breaks the openapi code generation #[serde_as] #[cfg_attr(feature = "schema", derive(schemars::JsonSchema))] @@ -39,10 +51,18 @@ pub enum RegisterDeploymentRequest { #[cfg_attr(feature = "schema", schemars(with = "String"))] uri: Uri, + /// # Routing header + /// + /// Header used for routing to a specific deployment. + /// If the load balancer between restate-server and your deployments uses a specific header to route, + /// you should set this as the routing header, as it will be used to distinguish this deployment with other deployments with the same URL. + routing_header: Option
, + /// # Additional headers /// /// Additional headers added to the discover/invoke requests to the deployment. /// + /// You typically want to include here API keys and other tokens required to send requests to deployments. additional_headers: Option, /// # Use http1.1 diff --git a/crates/admin/src/rest_api/deployments.rs b/crates/admin/src/rest_api/deployments.rs index 5af610a17d..e0cbb4110c 100644 --- a/crates/admin/src/rest_api/deployments.rs +++ b/crates/admin/src/rest_api/deployments.rs @@ -10,6 +10,7 @@ use super::error::*; use crate::state::AdminServiceState; +use std::collections::HashMap; use std::time::SystemTime; use axum::Json; @@ -30,7 +31,7 @@ use serde::Deserialize; /// Create deployment and return discovered services. #[openapi( - summary = "Create deployment", + summary = " pubCreate deployment", description = "Create deployment. Restate will invoke the endpoint to gather additional information required for registration, such as the services exposed by the deployment. If the deployment is already registered, this method will fail unless `force` is set to `true`.", operation_id = "create_deployment", tags = "deployment", @@ -49,10 +50,11 @@ pub async fn create_deployment( State(state): State>, #[request_body(required = true)] Json(payload): Json, ) -> Result { - let (discover_endpoint, force, dry_run) = match payload { + let (discover_endpoint, routing_header, force, dry_run) = match payload { RegisterDeploymentRequest::Http { uri, additional_headers, + routing_header, use_http_11, force, dry_run, @@ -69,6 +71,13 @@ pub async fn create_deployment( let is_using_https = uri.scheme().unwrap() == &Scheme::HTTPS; + // Add it as part of the additional headers + let mut additional_headers: HashMap<_, _> = + additional_headers.unwrap_or_default().into(); + if let Some(routing_header) = &routing_header { + additional_headers.insert(routing_header.key.clone(), routing_header.value.clone()); + } + ( DiscoverEndpoint::new( Endpoint::Http( @@ -83,8 +92,9 @@ pub async fn create_deployment( Some(http::Version::HTTP_2) }, ), - additional_headers.unwrap_or_default().into(), + additional_headers, ), + routing_header.map(|h| (h.key, h.value)), force, dry_run, ) @@ -106,6 +116,7 @@ pub async fn create_deployment( ), additional_headers.unwrap_or_default().into(), ), + None, force, dry_run, ), @@ -125,7 +136,7 @@ pub async fn create_deployment( let (deployment, services) = state .schema_registry - .register_deployment(discover_endpoint, force, apply_mode) + .register_deployment(discover_endpoint, routing_header, force, apply_mode) .await .inspect_err(|e| warn_it!(e))?; diff --git a/crates/admin/src/schema_registry/mod.rs b/crates/admin/src/schema_registry/mod.rs index 93753f4f4d..37c9358f83 100644 --- a/crates/admin/src/schema_registry/mod.rs +++ b/crates/admin/src/schema_registry/mod.rs @@ -13,7 +13,7 @@ use std::ops::Deref; use std::sync::Arc; use anyhow::Context; -use http::{HeaderMap, HeaderValue, Uri, uri::PathAndQuery}; +use http::{HeaderMap, HeaderName, HeaderValue, Uri, uri::PathAndQuery}; use tracing::subscriber::NoSubscriber; use tracing::trace; @@ -90,6 +90,7 @@ impl SchemaRegistry { pub async fn register_deployment( &self, discover_endpoint: DiscoverEndpoint, + routing_header: Option<(HeaderName, HeaderValue)>, force: updater::Force, apply_mode: updater::ApplyMode, ) -> Result<(Deployment, Vec), SchemaRegistryError> { @@ -128,6 +129,7 @@ impl SchemaRegistry { let id = tracing::subscriber::with_default(NoSubscriber::new(), || { updater.add_deployment( deployment_metadata, + routing_header, discovered_metadata.services, force.force_enabled(), ) @@ -153,6 +155,7 @@ impl SchemaRegistry { new_deployment_id = Some(updater.add_deployment( deployment_metadata.clone(), + routing_header.clone(), discovered_metadata.services.clone(), force.force_enabled(), )?); diff --git a/crates/serde-util/src/header_name.rs b/crates/serde-util/src/header_name.rs new file mode 100644 index 0000000000..9ad96a1aa5 --- /dev/null +++ b/crates/serde-util/src/header_name.rs @@ -0,0 +1,37 @@ +// Copyright (c) 2023 - 2025 Restate Software, Inc., Restate GmbH. +// All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. + +use http::HeaderName; +use serde::Deserialize; +use serde_with::{DeserializeAs, SerializeAs}; +use std::str::FromStr; + +/// SerializeAs/DeserializeAs to implement ser/de trait for [HeaderName] +/// Use it with `#[serde(with = "serde_with::As::")]`. +pub struct HeaderNameSerde; + +impl SerializeAs for HeaderNameSerde { + fn serialize_as(source: &HeaderName, serializer: S) -> Result + where + S: serde::Serializer, + { + serializer.serialize_str(source.as_str()) + } +} + +impl<'de> DeserializeAs<'de, HeaderName> for HeaderNameSerde { + fn deserialize_as(deserializer: D) -> Result + where + D: serde::Deserializer<'de>, + { + let buf = String::deserialize(deserializer)?; + HeaderName::from_str(&buf).map_err(serde::de::Error::custom) + } +} diff --git a/crates/serde-util/src/lib.rs b/crates/serde-util/src/lib.rs index 41b3c9b492..102b556a1c 100644 --- a/crates/serde-util/src/lib.rs +++ b/crates/serde-util/src/lib.rs @@ -15,12 +15,14 @@ mod proto; pub mod authority; pub mod default; +mod header_name; pub mod header_value; mod map_as_vec; mod version; pub use byte_count::*; pub use header_map::SerdeableHeaderHashMap; +pub use header_name::HeaderNameSerde; pub use header_value::HeaderValueSerde; pub use map_as_vec::{MapAsVec, MapAsVecItem}; #[cfg(feature = "proto")] diff --git a/crates/types/src/schema/metadata/updater/mod.rs b/crates/types/src/schema/metadata/updater/mod.rs index 2ad40ee43d..3c887965e6 100644 --- a/crates/types/src/schema/metadata/updater/mod.rs +++ b/crates/types/src/schema/metadata/updater/mod.rs @@ -27,7 +27,7 @@ use crate::schema::invocation_target::{ use crate::schema::subscriptions::{ EventInvocationTargetTemplate, Sink, Source, Subscription, SubscriptionValidator, }; -use http::{HeaderValue, Uri}; +use http::{HeaderName, HeaderValue, Uri}; use serde_json::Value; use std::borrow::Borrow; use std::collections::HashMap; @@ -344,7 +344,8 @@ impl SchemaUpdater { pub fn add_deployment( &mut self, - deployment_metadata: DeploymentMetadata, + mut deployment_metadata: DeploymentMetadata, + routing_header: Option<(HeaderName, HeaderValue)>, services: Vec, force: bool, ) -> Result { @@ -357,6 +358,15 @@ impl SchemaUpdater { let mut existing_deployments = self.schema.deployments.iter().filter(|(_, schemas)| { schemas.ty.protocol_type() == deployment_metadata.ty.protocol_type() && schemas.ty.normalized_address() == deployment_metadata.ty.normalized_address() + && routing_header.as_ref().is_none_or( + |(routing_header_key, routing_header_value)| { + schemas + .delivery_options + .additional_headers + .get(routing_header_key) + .is_some_and(|v| v == routing_header_value) + }, + ) }); let mut services_to_remove = Vec::default(); @@ -433,6 +443,12 @@ impl SchemaUpdater { drop(existing_deployments); + if let Some((key, value)) = routing_header { + deployment_metadata + .delivery_options + .additional_headers + .insert(key, value); + } self.schema.deployments.insert( deployment_id, Deployment { diff --git a/crates/types/src/schema/metadata/updater/tests.rs b/crates/types/src/schema/metadata/updater/tests.rs index 4d52159f62..f24a9deac0 100644 --- a/crates/types/src/schema/metadata/updater/tests.rs +++ b/crates/types/src/schema/metadata/updater/tests.rs @@ -201,7 +201,12 @@ fn register_new_deployment() { let mut deployment = Deployment::mock(); deployment.id = updater - .add_deployment(deployment.metadata.clone(), vec![greeter_service()], false) + .add_deployment( + deployment.metadata.clone(), + None, + vec![greeter_service()], + false, + ) .unwrap(); let schema = updater.into_inner(); @@ -223,6 +228,7 @@ fn register_new_deployment_add_unregistered_service() { deployment_1.id = updater .add_deployment( deployment_1.metadata.clone(), + None, vec![greeter_service()], false, ) @@ -241,6 +247,7 @@ fn register_new_deployment_add_unregistered_service() { deployment_2.id = updater .add_deployment( deployment_2.metadata.clone(), + None, vec![greeter_service(), another_greeter_service()], false, ) @@ -253,14 +260,79 @@ fn register_new_deployment_add_unregistered_service() { schemas.assert_service_revision(ANOTHER_GREETER_SERVICE_NAME, 1); } +#[test] +fn register_new_deployment_discriminate_on_routing_header() { + let deployment = Deployment::mock_with_uri("http://localhost:9080"); + + // Register first deployment + let (deployment_id_1, schema) = + SchemaUpdater::update_and_return(Schema::default(), |updater| { + updater.add_deployment( + deployment.metadata.clone(), + Some(( + HeaderName::from_static("x-routing"), + HeaderValue::from_static("1"), + )), + vec![greeter_service()], + false, + ) + }) + .unwrap(); + schema.assert_service_deployment(GREETER_SERVICE_NAME, deployment_id_1); + + // Update providing another routing header + let (deployment_id_2, schema) = SchemaUpdater::update_and_return(schema, |updater| { + updater.add_deployment( + deployment.metadata.clone(), + Some(( + HeaderName::from_static("x-routing"), + HeaderValue::from_static("2"), + )), + vec![greeter_service()], + false, + ) + }) + .unwrap(); + schema.assert_service_deployment(GREETER_SERVICE_NAME, deployment_id_2); + + // Update providing no routing header -> conflict + let update_result = SchemaUpdater::update_and_return(schema.clone(), |updater| { + updater.add_deployment( + deployment.metadata.clone(), + None, + vec![greeter_service()], + false, + ) + }); + assert!(let SchemaError::Override(_) = update_result.unwrap_err()); + + // Update providing the same routing header-> conflict + let update_result = SchemaUpdater::update_and_return(schema, |updater| { + updater.add_deployment( + deployment.metadata.clone(), + Some(( + HeaderName::from_static("x-routing"), + HeaderValue::from_static("2"), + )), + vec![greeter_service()], + false, + ) + }); + assert!(let SchemaError::Override(_) = update_result.unwrap_err()); +} + /// This test case ensures that https://github.com/restatedev/restate/issues/1205 works #[test] fn force_deploy_private_service() -> Result<(), SchemaError> { let mut updater = SchemaUpdater::default(); let mut deployment = Deployment::mock(); - deployment.id = - updater.add_deployment(deployment.metadata.clone(), vec![greeter_service()], false)?; + deployment.id = updater.add_deployment( + deployment.metadata.clone(), + None, + vec![greeter_service()], + false, + )?; let schemas = updater.into_inner(); @@ -288,8 +360,12 @@ fn force_deploy_private_service() -> Result<(), SchemaError> { ); updater = SchemaUpdater::new(schemas); - deployment.id = - updater.add_deployment(deployment.metadata.clone(), vec![greeter_service()], true)?; + deployment.id = updater.add_deployment( + deployment.metadata.clone(), + None, + vec![greeter_service()], + true, + )?; let schemas = updater.into_inner(); assert!(schemas.assert_service(GREETER_SERVICE_NAME).public); @@ -318,6 +394,7 @@ mod change_service_type { deployment_1.id = updater .add_deployment( deployment_1.metadata.clone(), + None, vec![greeter_service()], false, ) @@ -328,6 +405,7 @@ mod change_service_type { let compute_result = SchemaUpdater::new(schemas).add_deployment( deployment_2.metadata, + None, vec![greeter_virtual_object()], false, ); @@ -347,6 +425,7 @@ mod change_service_type { deployment_1.id = updater .add_deployment( deployment_1.metadata.clone(), + None, vec![greeter_service()], false, ) @@ -356,7 +435,12 @@ mod change_service_type { updater = SchemaUpdater::new(schemas); deployment_2.id = updater - .add_deployment(deployment_2.metadata, vec![greeter_virtual_object()], true) + .add_deployment( + deployment_2.metadata, + None, + vec![greeter_virtual_object()], + true, + ) .unwrap(); let schemas = updater.into_inner(); schemas.assert_service_deployment(GREETER_SERVICE_NAME, deployment_2.id); @@ -377,6 +461,7 @@ mod change_service_type { deployment_1.id = updater .add_deployment( deployment_1.metadata.clone(), + None, vec![greeter_service()], false, ) @@ -386,7 +471,7 @@ mod change_service_type { updater = SchemaUpdater::new(schemas); deployment_2.id = updater - .add_deployment(deployment_2.metadata, vec![greeter_workflow()], true) + .add_deployment(deployment_2.metadata, None, vec![greeter_workflow()], true) .unwrap(); let schemas = updater.into_inner(); schemas.assert_service_deployment(GREETER_SERVICE_NAME, deployment_2.id); @@ -406,6 +491,7 @@ fn override_existing_deployment_removing_a_service() { let (deployment_id, schemas) = SchemaUpdater::update_and_return(Schema::default(), |updater| { updater.add_deployment( deployment.metadata.clone(), + None, vec![greeter_service(), another_greeter_service()], false, ) @@ -417,7 +503,12 @@ fn override_existing_deployment_removing_a_service() { schemas.assert_service_deployment(ANOTHER_GREETER_SERVICE_NAME, deployment.id); let (deployment_id, schemas) = SchemaUpdater::update_and_return(schemas, |updater| { - updater.add_deployment(deployment.metadata.clone(), vec![greeter_service()], true) + updater.add_deployment( + deployment.metadata.clone(), + None, + vec![greeter_service()], + true, + ) }) .unwrap(); assert_eq!(deployment_id, deployment.id); @@ -436,11 +527,17 @@ fn cannot_override_existing_deployment_endpoint_conflict() { let mut deployment = Deployment::mock(); deployment.id = updater - .add_deployment(deployment.metadata.clone(), vec![greeter_service()], false) + .add_deployment( + deployment.metadata.clone(), + None, + vec![greeter_service()], + false, + ) .unwrap(); assert!(let SchemaError::Override(_) = updater.add_deployment( deployment.metadata, + None, vec![greeter_service()], false).unwrap_err() ); @@ -455,6 +552,7 @@ fn register_two_deployments_then_remove_first() { SchemaUpdater::update_and_return(Schema::default(), |updater| { updater.add_deployment( deployment_1.metadata.clone(), + None, vec![greeter_service(), another_greeter_service()], false, ) @@ -465,6 +563,7 @@ fn register_two_deployments_then_remove_first() { let (deployment_id_2, schemas) = SchemaUpdater::update_and_return(schemas, |updater| { updater.add_deployment( deployment_2.metadata.clone(), + None, vec![greeter_service()], false, ) @@ -504,6 +603,7 @@ fn register_two_deployments_then_remove_second() { SchemaUpdater::update_and_return(Schema::default(), |updater| { updater.add_deployment( deployment_1.metadata.clone(), + None, vec![greeter_service(), another_greeter_service()], false, ) @@ -514,6 +614,7 @@ fn register_two_deployments_then_remove_second() { let (deployment_id_2, schemas) = SchemaUpdater::update_and_return(schemas, |updater| { updater.add_deployment( deployment_2.metadata.clone(), + None, vec![greeter_service()], false, ) @@ -657,14 +758,24 @@ mod remove_handler { let deployment_2 = Deployment::mock_with_uri("http://localhost:9081"); deployment_1.id = updater - .add_deployment(deployment_1.metadata, vec![greeter_v1_service()], false) + .add_deployment( + deployment_1.metadata, + None, + vec![greeter_v1_service()], + false, + ) .unwrap(); let schemas = updater.into_inner(); schemas.assert_service_revision(GREETER_SERVICE_NAME, 1); updater = SchemaUpdater::new(schemas); let rejection = updater - .add_deployment(deployment_2.metadata, vec![greeter_v2_service()], false) + .add_deployment( + deployment_2.metadata, + None, + vec![greeter_v2_service()], + false, + ) .unwrap_err(); let schemas = updater.into_inner(); @@ -688,6 +799,7 @@ fn update_latest_deployment() { deployment_1.id = updater .add_deployment( deployment_1.metadata.clone(), + None, vec![greeter_virtual_object()], false, ) @@ -774,6 +886,7 @@ fn update_draining_deployment() { deployment_1.id = updater .add_deployment( deployment_1.metadata.clone(), + None, vec![greeter_service()], false, ) @@ -782,6 +895,7 @@ fn update_draining_deployment() { deployment_2.id = updater .add_deployment( deployment_2.metadata.clone(), + None, vec![greeter_service(), another_greeter_service()], false, ) @@ -829,6 +943,7 @@ fn update_deployment_same_uri() { SchemaUpdater::update_and_return(Schema::default(), |updater| { updater.add_deployment( deployment_1.metadata.clone(), + None, vec![greeter_service()], false, ) @@ -840,6 +955,7 @@ fn update_deployment_same_uri() { let (deployment_id_2, schemas) = SchemaUpdater::update_and_return(schemas, |updater| { updater.add_deployment( deployment_2.metadata.clone(), + None, vec![greeter_service()], false, ) @@ -863,7 +979,7 @@ fn update_deployment_same_uri() { assert!(let &SchemaError::Deployment( DeploymentError::MultipleExistingDeployments(_) ) = SchemaUpdater::update(schemas.clone(), |updater| updater.add_deployment( - deployment_1.metadata.clone(), + deployment_1.metadata.clone(),None, vec![greeter_service(), greeter_virtual_object()], true, ).map(|_| ())).unwrap_err()); @@ -871,7 +987,7 @@ fn update_deployment_same_uri() { assert!(let &SchemaError::Deployment( DeploymentError::MultipleExistingDeployments(_) ) = SchemaUpdater::update(schemas.clone(), |updater| updater.add_deployment( - deployment_2.metadata.clone(), + deployment_2.metadata.clone(),None, vec![greeter_service(), greeter_virtual_object()], true, ).map(|_| ())).unwrap_err()); @@ -899,6 +1015,7 @@ fn update_deployment_same_uri() { deployment_2.id, SchemaUpdater::update_and_return(schemas, |updater| updater.add_deployment( deployment_2.metadata.clone(), + None, vec![greeter_service(), greeter_virtual_object()], true, )) @@ -916,6 +1033,7 @@ fn update_latest_deployment_add_handler() { deployment_1.id = updater .add_deployment( deployment_1.metadata.clone(), + None, vec![greeter_service()], false, ) @@ -986,6 +1104,7 @@ fn update_draining_deployment_add_handler() { deployment_1.id = updater .add_deployment( deployment_1.metadata.clone(), + None, vec![greeter_service()], false, ) @@ -994,6 +1113,7 @@ fn update_draining_deployment_add_handler() { deployment_2.id = updater .add_deployment( deployment_2.metadata.clone(), + None, vec![greeter_service()], false, ) @@ -1078,6 +1198,7 @@ fn update_latest_deployment_add_service() { deployment_1.id = updater .add_deployment( deployment_1.metadata.clone(), + None, vec![greeter_service()], false, ) @@ -1115,6 +1236,7 @@ fn update_draining_deployment_add_service() { deployment_1.id = updater .add_deployment( deployment_1.metadata.clone(), + None, vec![greeter_service()], false, ) @@ -1123,6 +1245,7 @@ fn update_draining_deployment_add_service() { deployment_2.id = updater .add_deployment( deployment_2.metadata.clone(), + None, vec![greeter_service()], false, ) @@ -1155,8 +1278,12 @@ fn update_deployment_with_private_service() -> Result<(), SchemaError> { let mut updater = SchemaUpdater::default(); let mut deployment = Deployment::mock(); - deployment.id = - updater.add_deployment(deployment.metadata.clone(), vec![greeter_service()], false)?; + deployment.id = updater.add_deployment( + deployment.metadata.clone(), + None, + vec![greeter_service()], + false, + )?; let schemas = updater.into_inner(); @@ -1224,7 +1351,7 @@ mod endpoint_manifest_options_propagation { let mut deployment = Deployment::mock(); let (deployment_id, schema) = SchemaUpdater::update_and_return(Schema::default(), move |updater| { - updater.add_deployment(deployment.metadata.clone(), vec![svc], false) + updater.add_deployment(deployment.metadata.clone(), None, vec![svc], false) }) .unwrap(); deployment.id = deployment_id; @@ -1254,6 +1381,7 @@ mod endpoint_manifest_options_propagation { updater .add_deployment( Deployment::mock().metadata, + None, vec![endpoint_manifest::Service { // Mock two handlers, one explicitly private, the other just default settings handlers: vec![ @@ -1292,6 +1420,7 @@ mod endpoint_manifest_options_propagation { assert_that!( updater.add_deployment( deployment.metadata.clone(), + None, vec![endpoint_manifest::Service { ingress_private: Some(true), handlers: vec![endpoint_manifest::Handler { @@ -1315,6 +1444,7 @@ mod endpoint_manifest_options_propagation { SchemaUpdater::update_and_return(Schema::default(), move |updater| { updater.add_deployment( Deployment::mock().metadata.clone(), + None, vec![endpoint_manifest::Service { handlers: vec![endpoint_manifest::Handler { workflow_completion_retention: Some(30 * 1000), @@ -1555,6 +1685,7 @@ mod endpoint_manifest_options_propagation { SchemaUpdater::update_and_return(Schema::default(), move |updater| { updater.add_deployment( deployment.metadata.clone(), + None, vec![endpoint_manifest::Service { journal_retention: Some(60 * 1000), // A = 60 seconds ..greeter_service() @@ -1616,6 +1747,7 @@ mod endpoint_manifest_options_propagation { let (deployment_id, schema) = SchemaUpdater::update_and_return(schema, move |updater| { updater.add_deployment( deployment.metadata.clone(), + None, vec![endpoint_manifest::Service { journal_retention: Some(60 * 1000), // A = 60 seconds again ..greeter_service() @@ -1693,6 +1825,7 @@ mod endpoint_manifest_options_propagation { let (deployment_id, schema) = SchemaUpdater::update_and_return(schema, move |updater| { updater.add_deployment( deployment.metadata.clone(), + None, vec![endpoint_manifest::Service { journal_retention: None, ..greeter_service() @@ -1789,6 +1922,7 @@ mod endpoint_manifest_options_propagation { SchemaUpdater::update_and_return(Schema::default(), move |updater| { updater.add_deployment( deployment.metadata.clone(), + None, vec![endpoint_manifest::Service { journal_retention: Some(120 * 1000), // Service sets 120 seconds handlers: vec![endpoint_manifest::Handler { @@ -1943,7 +2077,7 @@ mod endpoint_manifest_options_propagation { let mut deployment = Deployment::mock(); deployment.id = updater - .add_deployment(deployment.metadata.clone(), vec![svc], false) + .add_deployment(deployment.metadata.clone(), None, vec![svc], false) .unwrap(); let schema = updater.into_inner(); @@ -2026,6 +2160,7 @@ mod modify_service { SchemaUpdater::update_and_return(Schema::default(), move |updater| { updater.add_deployment( Deployment::mock().metadata.clone(), + None, vec![greeter_workflow()], false, ) @@ -2093,6 +2228,7 @@ mod modify_service { SchemaUpdater::update_and_return(Schema::default(), move |updater| { updater.add_deployment( Deployment::mock().metadata.clone(), + None, vec![greeter_service()], false, ) @@ -2151,6 +2287,7 @@ mod modify_service { SchemaUpdater::update_and_return(schema, move |updater| { updater.add_deployment( deployment_2.metadata.clone(), + None, vec![greeter_service()], // identical service definition false, ) @@ -2188,6 +2325,7 @@ mod modify_service { SchemaUpdater::update_and_return(Schema::default(), move |updater| { updater.add_deployment( Deployment::mock().metadata.clone(), + None, vec![greeter_workflow()], false, ) @@ -2270,6 +2408,7 @@ mod modify_service { SchemaUpdater::update_and_return(schema, move |updater| { updater.add_deployment( deployment_2.metadata.clone(), + None, vec![greeter_workflow()], // identical service definition false, ) From 47a923a2ffd01ba403f0d34781c79988ec7591a1 Mon Sep 17 00:00:00 2001 From: slinkydeveloper Date: Tue, 7 Oct 2025 12:40:43 +0200 Subject: [PATCH 3/5] Remove multiple deployments check for existing_deployment. This indeed was problematic, more in the code comment. --- .../types/src/schema/metadata/updater/mod.rs | 55 +++++++++---------- .../src/schema/metadata/updater/tests.rs | 39 +++++++------ 2 files changed, 48 insertions(+), 46 deletions(-) diff --git a/crates/types/src/schema/metadata/updater/mod.rs b/crates/types/src/schema/metadata/updater/mod.rs index 3c887965e6..7190a2f688 100644 --- a/crates/types/src/schema/metadata/updater/mod.rs +++ b/crates/types/src/schema/metadata/updater/mod.rs @@ -355,38 +355,39 @@ impl SchemaUpdater { .collect::, _>>()?; // Did we find an existing deployment with a conflicting endpoint url? - let mut existing_deployments = self.schema.deployments.iter().filter(|(_, schemas)| { - schemas.ty.protocol_type() == deployment_metadata.ty.protocol_type() - && schemas.ty.normalized_address() == deployment_metadata.ty.normalized_address() - && routing_header.as_ref().is_none_or( - |(routing_header_key, routing_header_value)| { - schemas - .delivery_options - .additional_headers - .get(routing_header_key) - .is_some_and(|v| v == routing_header_value) - }, - ) - }); + let existing_deployment = self + .schema + .deployments + .iter() + .filter(|(_, schemas)| { + schemas.ty.protocol_type() == deployment_metadata.ty.protocol_type() + && schemas.ty.normalized_address() + == deployment_metadata.ty.normalized_address() + && routing_header.as_ref().is_none_or( + |(routing_header_key, routing_header_value)| { + schemas + .delivery_options + .additional_headers + .get(routing_header_key) + .is_some_and(|v| v == routing_header_value) + }, + ) + }) + // There are few situations where we might have multiple deployments for the same endpoint: + // * If the user specified in the at least two previous registrations the routing-header, + // but then it didn't specify it in this one. In this case, multiple deployments will match the above filter + // * If update_deployment was used on at least one deployment, pointing to the same address of another deployment, + // resulting in having two deployments pointing at the same address. + // + // We pick max_by created_at, because with force the user wants to override the last deployment version, and not old ones. + .max_by(|(_, x), (_, y)| x.created_at.cmp(&y.created_at)); let mut services_to_remove = Vec::default(); let deployment_id = if let Some((existing_deployment_id, existing_deployment)) = - existing_deployments.next() + existing_deployment { if force { - // Even under force we will only accept exactly one existing deployment with this endpoint - if let Some((another_existing_deployment_id, _)) = existing_deployments.next() { - let mut existing_deployment_ids = - vec![*existing_deployment_id, *another_existing_deployment_id]; - existing_deployment_ids - .extend(existing_deployments.map(|(deployment_id, _)| *deployment_id)); - - return Err(SchemaError::Deployment( - DeploymentError::MultipleExistingDeployments(existing_deployment_ids), - )); - } - for service in existing_deployment.services.values() { // If a service is not available anymore in the new deployment, we need to remove it if !proposed_services.contains_key(&service.name) { @@ -441,8 +442,6 @@ impl SchemaUpdater { computed_services.insert(service_name.to_string(), Arc::new(new_service_revision)); } - drop(existing_deployments); - if let Some((key, value)) = routing_header { deployment_metadata .delivery_options diff --git a/crates/types/src/schema/metadata/updater/tests.rs b/crates/types/src/schema/metadata/updater/tests.rs index f24a9deac0..fbfe9a62c9 100644 --- a/crates/types/src/schema/metadata/updater/tests.rs +++ b/crates/types/src/schema/metadata/updater/tests.rs @@ -307,7 +307,7 @@ fn register_new_deployment_discriminate_on_routing_header() { assert!(let SchemaError::Override(_) = update_result.unwrap_err()); // Update providing the same routing header-> conflict - let update_result = SchemaUpdater::update_and_return(schema, |updater| { + let update_result = SchemaUpdater::update_and_return(schema.clone(), |updater| { updater.add_deployment( deployment.metadata.clone(), Some(( @@ -319,6 +319,26 @@ fn register_new_deployment_discriminate_on_routing_header() { ) }); assert!(let SchemaError::Override(_) = update_result.unwrap_err()); + + // Force with same routing header -> all good + let (expected_dp_id_2, schema) = SchemaUpdater::update_and_return(schema, |updater| { + updater.add_deployment( + deployment.metadata.clone(), + Some(( + HeaderName::from_static("x-routing"), + HeaderValue::from_static("2"), + )), + vec![greeter_virtual_object()], + true, + ) + }) + .unwrap(); + assert_eq!(expected_dp_id_2, deployment_id_2); + schema.assert_service_deployment(GREETER_SERVICE_NAME, deployment_id_2); + assert_eq!( + schema.assert_service(GREETER_SERVICE_NAME).ty, + ServiceType::VirtualObject + ); } /// This test case ensures that https://github.com/restatedev/restate/issues/1205 works @@ -975,23 +995,6 @@ fn update_deployment_same_uri() { }) .unwrap(); - // there are now two deployment IDs pointing to :9081, so we shouldn't be able to force either of them - assert!(let &SchemaError::Deployment( - DeploymentError::MultipleExistingDeployments(_) - ) = SchemaUpdater::update(schemas.clone(), |updater| updater.add_deployment( - deployment_1.metadata.clone(),None, - vec![greeter_service(), greeter_virtual_object()], - true, - ).map(|_| ())).unwrap_err()); - - assert!(let &SchemaError::Deployment( - DeploymentError::MultipleExistingDeployments(_) - ) = SchemaUpdater::update(schemas.clone(), |updater| updater.add_deployment( - deployment_2.metadata.clone(),None, - vec![greeter_service(), greeter_virtual_object()], - true, - ).map(|_| ())).unwrap_err()); - // Latest should remain deployment_2 schemas.assert_service_deployment(GREETER_SERVICE_NAME, deployment_2.id); schemas.assert_service_revision(GREETER_SERVICE_NAME, 2); From 4680ec4fc28b2ec7220db50d6947cd8995287471 Mon Sep 17 00:00:00 2001 From: slinkydeveloper Date: Tue, 7 Oct 2025 15:53:50 +0200 Subject: [PATCH 4/5] Change routing header feature. Accept the key of the routing header via configuration. This makes the behavior more consistent. --- cli/src/commands/deployments/register.rs | 20 +- crates/admin-rest-model/src/deployments.rs | 21 +- crates/admin/src/rest_api/deployments.rs | 19 +- crates/admin/src/schema_registry/mod.rs | 5 +- crates/types/src/config/admin.rs | 16 +- .../types/src/schema/metadata/updater/mod.rs | 26 +- .../src/schema/metadata/updater/tests.rs | 458 +++++++++++------- 7 files changed, 325 insertions(+), 240 deletions(-) diff --git a/cli/src/commands/deployments/register.rs b/cli/src/commands/deployments/register.rs index 561083516a..5ccdc420ec 100644 --- a/cli/src/commands/deployments/register.rs +++ b/cli/src/commands/deployments/register.rs @@ -18,7 +18,7 @@ use comfy_table::Table; use http::{HeaderName, HeaderValue, StatusCode, Uri}; use indicatif::ProgressBar; -use restate_admin_rest_model::deployments::{Header, RegisterDeploymentRequest}; +use restate_admin_rest_model::deployments::RegisterDeploymentRequest; use restate_cli_util::ui::console::{Styled, StyledTable, confirm_or_exit}; use restate_cli_util::ui::stylesheet::Style; use restate_cli_util::{c_eprintln, c_error, c_indent_table, c_indentln, c_success, c_warn}; @@ -48,20 +48,11 @@ pub struct Register { assume_role_arn: Option, /// Additional header that will be sent to the endpoint during the discovery request. - /// You typically want to include here API keys and other tokens required to send requests to deployments. /// /// Use `--extra-header name=value` format and repeat --extra-header for each additional header. #[clap(long="extra-header", value_parser = parse_header, action = clap::ArgAction::Append)] extra_headers: Option>, - /// Header used for routing to a specific deployment. - /// If the load balancer between restate-server and your deployments uses a specific header to route, - /// you should set this as the routing header, as it will be used to distinguish this deployment with other deployments with the same URL. - /// - /// Use `--routing-header name=value` format. - #[clap(long="routing-header", value_parser = parse_header)] - routing_header: Option, - /// Attempt discovery using a client that defaults to HTTP1.1 instead of a prior-knowledge HTTP2 client. /// This may be necessary if you see `META0014` discovering local dev servers like `wrangler dev`. #[clap(long = "use-http1.1")] @@ -140,14 +131,6 @@ pub async fn run_register(State(env): State, discover_opts: &Register) - let headers = discover_opts.extra_headers.as_ref().map(|headers| { HashMap::from_iter(headers.iter().map(|kv| (kv.key.clone(), kv.value.clone()))) }); - let routing_header = - discover_opts - .routing_header - .as_ref() - .map(|HeaderKeyValue { key, value }| Header { - key: key.clone(), - value: value.clone(), - }); // Preparing the discovery request let client = AdminClient::new(&env).await?; @@ -206,7 +189,6 @@ pub async fn run_register(State(env): State, discover_opts: &Register) - let mk_request_body = |force, dry_run| match &deployment { DeploymentEndpoint::Uri(uri) => RegisterDeploymentRequest::Http { uri: uri.clone(), - routing_header: routing_header.clone(), additional_headers: headers.clone().map(Into::into), use_http_11: discover_opts.use_http_11, force, diff --git a/crates/admin-rest-model/src/deployments.rs b/crates/admin-rest-model/src/deployments.rs index 3911130fc7..204c3d238f 100644 --- a/crates/admin-rest-model/src/deployments.rs +++ b/crates/admin-rest-model/src/deployments.rs @@ -8,8 +8,8 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. +use http::Uri; use http::Version; -use http::{HeaderName, HeaderValue, Uri}; use restate_serde_util::SerdeableHeaderHashMap; use restate_types::identifiers::ServiceRevision; use restate_types::identifiers::{DeploymentId, LambdaARN}; @@ -18,18 +18,6 @@ use restate_types::schema::service::ServiceMetadata; use serde::{Deserialize, Serialize}; use serde_with::serde_as; -#[serde_as] -#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))] -#[derive(Debug, Clone, Serialize, Deserialize)] -pub struct Header { - #[serde_as(as = "restate_serde_util::HeaderNameSerde")] - #[cfg_attr(feature = "schema", schemars(with = "String"))] - pub key: HeaderName, - #[serde_as(as = "restate_serde_util::HeaderValueSerde")] - #[cfg_attr(feature = "schema", schemars(with = "String"))] - pub value: HeaderValue, -} - // This enum could be a struct with a nested enum to avoid repeating some fields, but serde(flatten) unfortunately breaks the openapi code generation #[serde_as] #[cfg_attr(feature = "schema", derive(schemars::JsonSchema))] @@ -51,13 +39,6 @@ pub enum RegisterDeploymentRequest { #[cfg_attr(feature = "schema", schemars(with = "String"))] uri: Uri, - /// # Routing header - /// - /// Header used for routing to a specific deployment. - /// If the load balancer between restate-server and your deployments uses a specific header to route, - /// you should set this as the routing header, as it will be used to distinguish this deployment with other deployments with the same URL. - routing_header: Option
, - /// # Additional headers /// /// Additional headers added to the discover/invoke requests to the deployment. diff --git a/crates/admin/src/rest_api/deployments.rs b/crates/admin/src/rest_api/deployments.rs index e0cbb4110c..5af610a17d 100644 --- a/crates/admin/src/rest_api/deployments.rs +++ b/crates/admin/src/rest_api/deployments.rs @@ -10,7 +10,6 @@ use super::error::*; use crate::state::AdminServiceState; -use std::collections::HashMap; use std::time::SystemTime; use axum::Json; @@ -31,7 +30,7 @@ use serde::Deserialize; /// Create deployment and return discovered services. #[openapi( - summary = " pubCreate deployment", + summary = "Create deployment", description = "Create deployment. Restate will invoke the endpoint to gather additional information required for registration, such as the services exposed by the deployment. If the deployment is already registered, this method will fail unless `force` is set to `true`.", operation_id = "create_deployment", tags = "deployment", @@ -50,11 +49,10 @@ pub async fn create_deployment( State(state): State>, #[request_body(required = true)] Json(payload): Json, ) -> Result { - let (discover_endpoint, routing_header, force, dry_run) = match payload { + let (discover_endpoint, force, dry_run) = match payload { RegisterDeploymentRequest::Http { uri, additional_headers, - routing_header, use_http_11, force, dry_run, @@ -71,13 +69,6 @@ pub async fn create_deployment( let is_using_https = uri.scheme().unwrap() == &Scheme::HTTPS; - // Add it as part of the additional headers - let mut additional_headers: HashMap<_, _> = - additional_headers.unwrap_or_default().into(); - if let Some(routing_header) = &routing_header { - additional_headers.insert(routing_header.key.clone(), routing_header.value.clone()); - } - ( DiscoverEndpoint::new( Endpoint::Http( @@ -92,9 +83,8 @@ pub async fn create_deployment( Some(http::Version::HTTP_2) }, ), - additional_headers, + additional_headers.unwrap_or_default().into(), ), - routing_header.map(|h| (h.key, h.value)), force, dry_run, ) @@ -116,7 +106,6 @@ pub async fn create_deployment( ), additional_headers.unwrap_or_default().into(), ), - None, force, dry_run, ), @@ -136,7 +125,7 @@ pub async fn create_deployment( let (deployment, services) = state .schema_registry - .register_deployment(discover_endpoint, routing_header, force, apply_mode) + .register_deployment(discover_endpoint, force, apply_mode) .await .inspect_err(|e| warn_it!(e))?; diff --git a/crates/admin/src/schema_registry/mod.rs b/crates/admin/src/schema_registry/mod.rs index 37c9358f83..93753f4f4d 100644 --- a/crates/admin/src/schema_registry/mod.rs +++ b/crates/admin/src/schema_registry/mod.rs @@ -13,7 +13,7 @@ use std::ops::Deref; use std::sync::Arc; use anyhow::Context; -use http::{HeaderMap, HeaderName, HeaderValue, Uri, uri::PathAndQuery}; +use http::{HeaderMap, HeaderValue, Uri, uri::PathAndQuery}; use tracing::subscriber::NoSubscriber; use tracing::trace; @@ -90,7 +90,6 @@ impl SchemaRegistry { pub async fn register_deployment( &self, discover_endpoint: DiscoverEndpoint, - routing_header: Option<(HeaderName, HeaderValue)>, force: updater::Force, apply_mode: updater::ApplyMode, ) -> Result<(Deployment, Vec), SchemaRegistryError> { @@ -129,7 +128,6 @@ impl SchemaRegistry { let id = tracing::subscriber::with_default(NoSubscriber::new(), || { updater.add_deployment( deployment_metadata, - routing_header, discovered_metadata.services, force.force_enabled(), ) @@ -155,7 +153,6 @@ impl SchemaRegistry { new_deployment_id = Some(updater.add_deployment( deployment_metadata.clone(), - routing_header.clone(), discovered_metadata.services.clone(), force.force_enabled(), )?); diff --git a/crates/types/src/config/admin.rs b/crates/types/src/config/admin.rs index 3cf8f6bbe6..af9084145b 100644 --- a/crates/types/src/config/admin.rs +++ b/crates/types/src/config/admin.rs @@ -13,7 +13,7 @@ use std::num::NonZeroUsize; use std::path::PathBuf; use std::time::Duration; -use http::Uri; +use http::{HeaderName, Uri}; use serde::{Deserialize, Serialize}; use serde_with::serde_as; use tokio::sync::Semaphore; @@ -42,6 +42,19 @@ pub struct AdminOptions { #[cfg_attr(feature = "schemars", schemars(with = "String", url))] pub advertised_admin_endpoint: Option, + /// # Deployment routing headers + /// + /// List of header names considered routing headers. + /// + /// These will be used during deployment creation to distinguish between an already existing deployment and a new deployment. + #[serde( + default, + skip_serializing_if = "Vec::is_empty", + with = "serde_with::As::>" + )] + #[cfg_attr(feature = "schemars", schemars(with = "Vec"))] + pub deployment_routing_headers: Vec, + /// # Concurrency limit /// /// Concurrency limit for the Admin APIs. Default is unlimited. @@ -130,6 +143,7 @@ impl Default for AdminOptions { bind_address: "0.0.0.0:9070".parse().unwrap(), advertised_admin_endpoint: None, // max is limited by Tower's LoadShedLayer. + deployment_routing_headers: vec![], concurrent_api_requests_limit: None, query_engine: Default::default(), heartbeat_interval: NonZeroFriendlyDuration::from_millis_unchecked(1500), diff --git a/crates/types/src/schema/metadata/updater/mod.rs b/crates/types/src/schema/metadata/updater/mod.rs index 7190a2f688..de631e3cd9 100644 --- a/crates/types/src/schema/metadata/updater/mod.rs +++ b/crates/types/src/schema/metadata/updater/mod.rs @@ -27,7 +27,7 @@ use crate::schema::invocation_target::{ use crate::schema::subscriptions::{ EventInvocationTargetTemplate, Sink, Source, Subscription, SubscriptionValidator, }; -use http::{HeaderName, HeaderValue, Uri}; +use http::{HeaderValue, Uri}; use serde_json::Value; use std::borrow::Borrow; use std::collections::HashMap; @@ -344,8 +344,7 @@ impl SchemaUpdater { pub fn add_deployment( &mut self, - mut deployment_metadata: DeploymentMetadata, - routing_header: Option<(HeaderName, HeaderValue)>, + deployment_metadata: DeploymentMetadata, services: Vec, force: bool, ) -> Result { @@ -354,6 +353,8 @@ impl SchemaUpdater { .map(|c| ServiceName::try_from(c.name.to_string()).map(|name| (name, c))) .collect::, _>>()?; + let deployment_routing_headers = &Configuration::pinned().admin.deployment_routing_headers; + // Did we find an existing deployment with a conflicting endpoint url? let existing_deployment = self .schema @@ -363,15 +364,16 @@ impl SchemaUpdater { schemas.ty.protocol_type() == deployment_metadata.ty.protocol_type() && schemas.ty.normalized_address() == deployment_metadata.ty.normalized_address() - && routing_header.as_ref().is_none_or( - |(routing_header_key, routing_header_value)| { - schemas + && deployment_routing_headers.iter().all(|routing_header_key| { + deployment_metadata + .delivery_options + .additional_headers + .get(routing_header_key) + == schemas .delivery_options .additional_headers .get(routing_header_key) - .is_some_and(|v| v == routing_header_value) - }, - ) + }) }) // There are few situations where we might have multiple deployments for the same endpoint: // * If the user specified in the at least two previous registrations the routing-header, @@ -442,12 +444,6 @@ impl SchemaUpdater { computed_services.insert(service_name.to_string(), Arc::new(new_service_revision)); } - if let Some((key, value)) = routing_header { - deployment_metadata - .delivery_options - .additional_headers - .insert(key, value); - } self.schema.deployments.insert( deployment_id, Deployment { diff --git a/crates/types/src/schema/metadata/updater/tests.rs b/crates/types/src/schema/metadata/updater/tests.rs index fbfe9a62c9..3da518dffe 100644 --- a/crates/types/src/schema/metadata/updater/tests.rs +++ b/crates/types/src/schema/metadata/updater/tests.rs @@ -11,15 +11,14 @@ use super::*; use std::convert::Infallible; -use http::HeaderName; -use restate_test_util::{assert, assert_eq}; -use test_log::test; - use crate::Versioned; -use crate::schema::deployment::Deployment; use crate::schema::deployment::DeploymentResolver; +use crate::schema::deployment::{DeliveryOptions, Deployment}; use crate::schema::invocation_target::InvocationTargetResolver; use crate::schema::service::ServiceMetadataResolver; +use http::HeaderName; +use restate_test_util::{assert, assert_eq}; +use test_log::test; const GREETER_SERVICE_NAME: &str = "greeter.Greeter"; const GREET_HANDLER_NAME: &str = "greet"; @@ -201,12 +200,7 @@ fn register_new_deployment() { let mut deployment = Deployment::mock(); deployment.id = updater - .add_deployment( - deployment.metadata.clone(), - None, - vec![greeter_service()], - false, - ) + .add_deployment(deployment.metadata.clone(), vec![greeter_service()], false) .unwrap(); let schema = updater.into_inner(); @@ -228,7 +222,6 @@ fn register_new_deployment_add_unregistered_service() { deployment_1.id = updater .add_deployment( deployment_1.metadata.clone(), - None, vec![greeter_service()], false, ) @@ -247,7 +240,6 @@ fn register_new_deployment_add_unregistered_service() { deployment_2.id = updater .add_deployment( deployment_2.metadata.clone(), - None, vec![greeter_service(), another_greeter_service()], false, ) @@ -260,85 +252,290 @@ fn register_new_deployment_add_unregistered_service() { schemas.assert_service_revision(ANOTHER_GREETER_SERVICE_NAME, 1); } -#[test] -fn register_new_deployment_discriminate_on_routing_header() { - let deployment = Deployment::mock_with_uri("http://localhost:9080"); +mod routing_header { + use super::*; - // Register first deployment - let (deployment_id_1, schema) = - SchemaUpdater::update_and_return(Schema::default(), |updater| { + use restate_test_util::{assert, assert_eq}; + use test_log::test; + + #[test] + fn register_new_deployment_with_routing_header_first() { + let mut config = Configuration::default(); + config.admin.deployment_routing_headers = vec![HeaderName::from_static("x-routing")]; + crate::config::set_current_config(config); + + let deployment = Deployment::mock_with_uri("http://localhost:9080"); + + // Register first deployment + let (deployment_id_1, schema) = + SchemaUpdater::update_and_return(Schema::default(), |updater| { + updater.add_deployment( + DeploymentMetadata { + delivery_options: DeliveryOptions::new( + [( + HeaderName::from_static("x-routing"), + HeaderValue::from_static("1"), + )] + .into(), + ), + ..deployment.metadata.clone() + }, + vec![greeter_service()], + false, + ) + }) + .unwrap(); + schema.assert_service_deployment(GREETER_SERVICE_NAME, deployment_id_1); + + // Update providing another routing header + let (deployment_id_2, schema) = SchemaUpdater::update_and_return(schema, |updater| { updater.add_deployment( - deployment.metadata.clone(), - Some(( - HeaderName::from_static("x-routing"), - HeaderValue::from_static("1"), - )), + DeploymentMetadata { + delivery_options: DeliveryOptions::new( + [( + HeaderName::from_static("x-routing"), + HeaderValue::from_static("2"), + )] + .into(), + ), + created_at: (deployment.metadata.created_at.as_u64() + 1).into(), + ..deployment.metadata.clone() + }, vec![greeter_service()], false, ) }) .unwrap(); - schema.assert_service_deployment(GREETER_SERVICE_NAME, deployment_id_1); + schema.assert_service_deployment(GREETER_SERVICE_NAME, deployment_id_2); - // Update providing another routing header - let (deployment_id_2, schema) = SchemaUpdater::update_and_return(schema, |updater| { - updater.add_deployment( - deployment.metadata.clone(), - Some(( - HeaderName::from_static("x-routing"), - HeaderValue::from_static("2"), - )), - vec![greeter_service()], - false, - ) - }) - .unwrap(); - schema.assert_service_deployment(GREETER_SERVICE_NAME, deployment_id_2); + // Update not providing routing_header -> new deployment here + let (deployment_id_3, schema) = SchemaUpdater::update_and_return(schema, |updater| { + updater.add_deployment( + DeploymentMetadata { + delivery_options: DeliveryOptions::new(Default::default()), + created_at: (deployment.metadata.created_at.as_u64() + 2).into(), + ..deployment.metadata.clone() + }, + vec![greeter_service()], + false, + ) + }) + .unwrap(); + schema.assert_service_deployment(GREETER_SERVICE_NAME, deployment_id_3); - // Update providing no routing header -> conflict - let update_result = SchemaUpdater::update_and_return(schema.clone(), |updater| { - updater.add_deployment( - deployment.metadata.clone(), - None, - vec![greeter_service()], - false, - ) - }); - assert!(let SchemaError::Override(_) = update_result.unwrap_err()); + // Update providing the same routing header-> conflict + let update_result = SchemaUpdater::update_and_return(schema.clone(), |updater| { + updater.add_deployment( + DeploymentMetadata { + delivery_options: DeliveryOptions::new( + [( + HeaderName::from_static("x-routing"), + HeaderValue::from_static("2"), + )] + .into(), + ), + ..deployment.metadata.clone() + }, + vec![greeter_service()], + false, + ) + }); + assert!(let SchemaError::Override(_) = update_result.unwrap_err()); - // Update providing the same routing header-> conflict - let update_result = SchemaUpdater::update_and_return(schema.clone(), |updater| { - updater.add_deployment( - deployment.metadata.clone(), - Some(( - HeaderName::from_static("x-routing"), - HeaderValue::from_static("2"), - )), - vec![greeter_service()], - false, - ) - }); - assert!(let SchemaError::Override(_) = update_result.unwrap_err()); + // Force with same routing header -> all good + let (expected_dp_id_2, schema) = SchemaUpdater::update_and_return(schema, |updater| { + updater.add_deployment( + DeploymentMetadata { + delivery_options: DeliveryOptions::new( + [( + HeaderName::from_static("x-routing"), + HeaderValue::from_static("2"), + )] + .into(), + ), + ..deployment.metadata.clone() + }, + vec![greeter_virtual_object()], + true, + ) + }) + .unwrap(); + assert_eq!(expected_dp_id_2, deployment_id_2); + schema.assert_service_deployment(GREETER_SERVICE_NAME, deployment_id_2); + assert_eq!( + schema.assert_service(GREETER_SERVICE_NAME).ty, + ServiceType::VirtualObject + ); + } - // Force with same routing header -> all good - let (expected_dp_id_2, schema) = SchemaUpdater::update_and_return(schema, |updater| { - updater.add_deployment( - deployment.metadata.clone(), - Some(( - HeaderName::from_static("x-routing"), - HeaderValue::from_static("2"), - )), - vec![greeter_virtual_object()], - true, - ) - }) - .unwrap(); - assert_eq!(expected_dp_id_2, deployment_id_2); - schema.assert_service_deployment(GREETER_SERVICE_NAME, deployment_id_2); - assert_eq!( - schema.assert_service(GREETER_SERVICE_NAME).ty, - ServiceType::VirtualObject - ); + #[test] + fn register_new_deployment_without_routing_header_first() { + let mut config = Configuration::default(); + config.admin.deployment_routing_headers = vec![HeaderName::from_static("x-routing")]; + crate::config::set_current_config(config); + + let deployment = Deployment::mock_with_uri("http://localhost:9080"); + + // Register first deployment with routing header + let (deployment_id_1, schema) = + SchemaUpdater::update_and_return(Schema::default(), |updater| { + updater.add_deployment( + DeploymentMetadata { + delivery_options: DeliveryOptions::new( + [( + HeaderName::from_static("x-routing"), + HeaderValue::from_static("1"), + )] + .into(), + ), + ..deployment.metadata.clone() + }, + vec![greeter_service()], + false, + ) + }) + .unwrap(); + schema.assert_service_deployment(GREETER_SERVICE_NAME, deployment_id_1); + + // Update without routing header -> new deployment + let (deployment_id_2, schema) = SchemaUpdater::update_and_return(schema, |updater| { + updater.add_deployment( + DeploymentMetadata { + delivery_options: DeliveryOptions::new(Default::default()), + ..deployment.metadata.clone() + }, + vec![greeter_service()], + false, + ) + }) + .unwrap(); + schema.assert_service_deployment(GREETER_SERVICE_NAME, deployment_id_2); + } + + #[test] + fn with_multiple_configured_routing_headers() { + let mut config = Configuration::default(); + config.admin.deployment_routing_headers = vec![ + HeaderName::from_static("x-restate-routing"), + HeaderName::from_static("x-my-routing"), + ]; + crate::config::set_current_config(config); + + let deployment = Deployment::mock_with_uri("http://localhost:9080"); + + // Register first deployment with routing header + let (deployment_id_1, schema) = + SchemaUpdater::update_and_return(Schema::default(), |updater| { + updater.add_deployment( + DeploymentMetadata { + delivery_options: DeliveryOptions::new( + [( + HeaderName::from_static("x-restate-routing"), + HeaderValue::from_static("1"), + )] + .into(), + ), + ..deployment.metadata.clone() + }, + vec![greeter_service()], + false, + ) + }) + .unwrap(); + schema.assert_service_deployment(GREETER_SERVICE_NAME, deployment_id_1); + + // Update with different routing header -> new deployment + let (deployment_id_2, schema) = SchemaUpdater::update_and_return(schema, |updater| { + updater.add_deployment( + DeploymentMetadata { + delivery_options: DeliveryOptions::new( + [( + HeaderName::from_static("x-my-routing"), + HeaderValue::from_static("1"), + )] + .into(), + ), + ..deployment.metadata.clone() + }, + vec![greeter_service()], + false, + ) + }) + .unwrap(); + schema.assert_service_deployment(GREETER_SERVICE_NAME, deployment_id_2); + + // No routing header -> new deployment + let (deployment_id_3, schema) = SchemaUpdater::update_and_return(schema, |updater| { + updater.add_deployment( + DeploymentMetadata { + delivery_options: DeliveryOptions::new(Default::default()), + ..deployment.metadata.clone() + }, + vec![greeter_service()], + false, + ) + }) + .unwrap(); + schema.assert_service_deployment(GREETER_SERVICE_NAME, deployment_id_3); + + // Update with both routing header -> new deployment + let (deployment_id_4, schema) = SchemaUpdater::update_and_return(schema, |updater| { + updater.add_deployment( + DeploymentMetadata { + delivery_options: DeliveryOptions::new( + [ + ( + HeaderName::from_static("x-restate-routing"), + HeaderValue::from_static("1"), + ), + ( + HeaderName::from_static("x-my-routing"), + HeaderValue::from_static("1"), + ), + ] + .into(), + ), + ..deployment.metadata.clone() + }, + vec![greeter_service()], + false, + ) + }) + .unwrap(); + schema.assert_service_deployment(GREETER_SERVICE_NAME, deployment_id_4); + + // Update with same header -> fails + let update_result = SchemaUpdater::update_and_return(schema.clone(), |updater| { + updater.add_deployment( + DeploymentMetadata { + delivery_options: DeliveryOptions::new( + [( + HeaderName::from_static("x-routing"), + HeaderValue::from_static("1"), + )] + .into(), + ), + ..deployment.metadata.clone() + }, + vec![greeter_service()], + false, + ) + }); + assert!(let SchemaError::Override(_) = update_result.unwrap_err()); + + // Update without header -> fails + let update_result = SchemaUpdater::update_and_return(schema.clone(), |updater| { + updater.add_deployment( + DeploymentMetadata { + delivery_options: DeliveryOptions::new(Default::default()), + ..deployment.metadata.clone() + }, + vec![greeter_service()], + false, + ) + }); + assert!(let SchemaError::Override(_) = update_result.unwrap_err()); + } } /// This test case ensures that https://github.com/restatedev/restate/issues/1205 works @@ -347,12 +544,8 @@ fn force_deploy_private_service() -> Result<(), SchemaError> { let mut updater = SchemaUpdater::default(); let mut deployment = Deployment::mock(); - deployment.id = updater.add_deployment( - deployment.metadata.clone(), - None, - vec![greeter_service()], - false, - )?; + deployment.id = + updater.add_deployment(deployment.metadata.clone(), vec![greeter_service()], false)?; let schemas = updater.into_inner(); @@ -380,12 +573,8 @@ fn force_deploy_private_service() -> Result<(), SchemaError> { ); updater = SchemaUpdater::new(schemas); - deployment.id = updater.add_deployment( - deployment.metadata.clone(), - None, - vec![greeter_service()], - true, - )?; + deployment.id = + updater.add_deployment(deployment.metadata.clone(), vec![greeter_service()], true)?; let schemas = updater.into_inner(); assert!(schemas.assert_service(GREETER_SERVICE_NAME).public); @@ -414,7 +603,6 @@ mod change_service_type { deployment_1.id = updater .add_deployment( deployment_1.metadata.clone(), - None, vec![greeter_service()], false, ) @@ -425,7 +613,6 @@ mod change_service_type { let compute_result = SchemaUpdater::new(schemas).add_deployment( deployment_2.metadata, - None, vec![greeter_virtual_object()], false, ); @@ -445,7 +632,6 @@ mod change_service_type { deployment_1.id = updater .add_deployment( deployment_1.metadata.clone(), - None, vec![greeter_service()], false, ) @@ -455,12 +641,7 @@ mod change_service_type { updater = SchemaUpdater::new(schemas); deployment_2.id = updater - .add_deployment( - deployment_2.metadata, - None, - vec![greeter_virtual_object()], - true, - ) + .add_deployment(deployment_2.metadata, vec![greeter_virtual_object()], true) .unwrap(); let schemas = updater.into_inner(); schemas.assert_service_deployment(GREETER_SERVICE_NAME, deployment_2.id); @@ -481,7 +662,6 @@ mod change_service_type { deployment_1.id = updater .add_deployment( deployment_1.metadata.clone(), - None, vec![greeter_service()], false, ) @@ -491,7 +671,7 @@ mod change_service_type { updater = SchemaUpdater::new(schemas); deployment_2.id = updater - .add_deployment(deployment_2.metadata, None, vec![greeter_workflow()], true) + .add_deployment(deployment_2.metadata, vec![greeter_workflow()], true) .unwrap(); let schemas = updater.into_inner(); schemas.assert_service_deployment(GREETER_SERVICE_NAME, deployment_2.id); @@ -511,7 +691,6 @@ fn override_existing_deployment_removing_a_service() { let (deployment_id, schemas) = SchemaUpdater::update_and_return(Schema::default(), |updater| { updater.add_deployment( deployment.metadata.clone(), - None, vec![greeter_service(), another_greeter_service()], false, ) @@ -523,12 +702,7 @@ fn override_existing_deployment_removing_a_service() { schemas.assert_service_deployment(ANOTHER_GREETER_SERVICE_NAME, deployment.id); let (deployment_id, schemas) = SchemaUpdater::update_and_return(schemas, |updater| { - updater.add_deployment( - deployment.metadata.clone(), - None, - vec![greeter_service()], - true, - ) + updater.add_deployment(deployment.metadata.clone(), vec![greeter_service()], true) }) .unwrap(); assert_eq!(deployment_id, deployment.id); @@ -547,17 +721,11 @@ fn cannot_override_existing_deployment_endpoint_conflict() { let mut deployment = Deployment::mock(); deployment.id = updater - .add_deployment( - deployment.metadata.clone(), - None, - vec![greeter_service()], - false, - ) + .add_deployment(deployment.metadata.clone(), vec![greeter_service()], false) .unwrap(); assert!(let SchemaError::Override(_) = updater.add_deployment( deployment.metadata, - None, vec![greeter_service()], false).unwrap_err() ); @@ -572,7 +740,6 @@ fn register_two_deployments_then_remove_first() { SchemaUpdater::update_and_return(Schema::default(), |updater| { updater.add_deployment( deployment_1.metadata.clone(), - None, vec![greeter_service(), another_greeter_service()], false, ) @@ -583,7 +750,6 @@ fn register_two_deployments_then_remove_first() { let (deployment_id_2, schemas) = SchemaUpdater::update_and_return(schemas, |updater| { updater.add_deployment( deployment_2.metadata.clone(), - None, vec![greeter_service()], false, ) @@ -623,7 +789,6 @@ fn register_two_deployments_then_remove_second() { SchemaUpdater::update_and_return(Schema::default(), |updater| { updater.add_deployment( deployment_1.metadata.clone(), - None, vec![greeter_service(), another_greeter_service()], false, ) @@ -634,7 +799,6 @@ fn register_two_deployments_then_remove_second() { let (deployment_id_2, schemas) = SchemaUpdater::update_and_return(schemas, |updater| { updater.add_deployment( deployment_2.metadata.clone(), - None, vec![greeter_service()], false, ) @@ -778,24 +942,14 @@ mod remove_handler { let deployment_2 = Deployment::mock_with_uri("http://localhost:9081"); deployment_1.id = updater - .add_deployment( - deployment_1.metadata, - None, - vec![greeter_v1_service()], - false, - ) + .add_deployment(deployment_1.metadata, vec![greeter_v1_service()], false) .unwrap(); let schemas = updater.into_inner(); schemas.assert_service_revision(GREETER_SERVICE_NAME, 1); updater = SchemaUpdater::new(schemas); let rejection = updater - .add_deployment( - deployment_2.metadata, - None, - vec![greeter_v2_service()], - false, - ) + .add_deployment(deployment_2.metadata, vec![greeter_v2_service()], false) .unwrap_err(); let schemas = updater.into_inner(); @@ -819,7 +973,6 @@ fn update_latest_deployment() { deployment_1.id = updater .add_deployment( deployment_1.metadata.clone(), - None, vec![greeter_virtual_object()], false, ) @@ -906,7 +1059,6 @@ fn update_draining_deployment() { deployment_1.id = updater .add_deployment( deployment_1.metadata.clone(), - None, vec![greeter_service()], false, ) @@ -915,7 +1067,6 @@ fn update_draining_deployment() { deployment_2.id = updater .add_deployment( deployment_2.metadata.clone(), - None, vec![greeter_service(), another_greeter_service()], false, ) @@ -963,7 +1114,6 @@ fn update_deployment_same_uri() { SchemaUpdater::update_and_return(Schema::default(), |updater| { updater.add_deployment( deployment_1.metadata.clone(), - None, vec![greeter_service()], false, ) @@ -975,7 +1125,6 @@ fn update_deployment_same_uri() { let (deployment_id_2, schemas) = SchemaUpdater::update_and_return(schemas, |updater| { updater.add_deployment( deployment_2.metadata.clone(), - None, vec![greeter_service()], false, ) @@ -1018,7 +1167,6 @@ fn update_deployment_same_uri() { deployment_2.id, SchemaUpdater::update_and_return(schemas, |updater| updater.add_deployment( deployment_2.metadata.clone(), - None, vec![greeter_service(), greeter_virtual_object()], true, )) @@ -1036,7 +1184,6 @@ fn update_latest_deployment_add_handler() { deployment_1.id = updater .add_deployment( deployment_1.metadata.clone(), - None, vec![greeter_service()], false, ) @@ -1107,7 +1254,6 @@ fn update_draining_deployment_add_handler() { deployment_1.id = updater .add_deployment( deployment_1.metadata.clone(), - None, vec![greeter_service()], false, ) @@ -1116,7 +1262,6 @@ fn update_draining_deployment_add_handler() { deployment_2.id = updater .add_deployment( deployment_2.metadata.clone(), - None, vec![greeter_service()], false, ) @@ -1201,7 +1346,6 @@ fn update_latest_deployment_add_service() { deployment_1.id = updater .add_deployment( deployment_1.metadata.clone(), - None, vec![greeter_service()], false, ) @@ -1239,7 +1383,6 @@ fn update_draining_deployment_add_service() { deployment_1.id = updater .add_deployment( deployment_1.metadata.clone(), - None, vec![greeter_service()], false, ) @@ -1248,7 +1391,6 @@ fn update_draining_deployment_add_service() { deployment_2.id = updater .add_deployment( deployment_2.metadata.clone(), - None, vec![greeter_service()], false, ) @@ -1281,12 +1423,8 @@ fn update_deployment_with_private_service() -> Result<(), SchemaError> { let mut updater = SchemaUpdater::default(); let mut deployment = Deployment::mock(); - deployment.id = updater.add_deployment( - deployment.metadata.clone(), - None, - vec![greeter_service()], - false, - )?; + deployment.id = + updater.add_deployment(deployment.metadata.clone(), vec![greeter_service()], false)?; let schemas = updater.into_inner(); @@ -1354,7 +1492,7 @@ mod endpoint_manifest_options_propagation { let mut deployment = Deployment::mock(); let (deployment_id, schema) = SchemaUpdater::update_and_return(Schema::default(), move |updater| { - updater.add_deployment(deployment.metadata.clone(), None, vec![svc], false) + updater.add_deployment(deployment.metadata.clone(), vec![svc], false) }) .unwrap(); deployment.id = deployment_id; @@ -1384,7 +1522,6 @@ mod endpoint_manifest_options_propagation { updater .add_deployment( Deployment::mock().metadata, - None, vec![endpoint_manifest::Service { // Mock two handlers, one explicitly private, the other just default settings handlers: vec![ @@ -1423,7 +1560,6 @@ mod endpoint_manifest_options_propagation { assert_that!( updater.add_deployment( deployment.metadata.clone(), - None, vec![endpoint_manifest::Service { ingress_private: Some(true), handlers: vec![endpoint_manifest::Handler { @@ -1447,7 +1583,6 @@ mod endpoint_manifest_options_propagation { SchemaUpdater::update_and_return(Schema::default(), move |updater| { updater.add_deployment( Deployment::mock().metadata.clone(), - None, vec![endpoint_manifest::Service { handlers: vec![endpoint_manifest::Handler { workflow_completion_retention: Some(30 * 1000), @@ -1688,7 +1823,6 @@ mod endpoint_manifest_options_propagation { SchemaUpdater::update_and_return(Schema::default(), move |updater| { updater.add_deployment( deployment.metadata.clone(), - None, vec![endpoint_manifest::Service { journal_retention: Some(60 * 1000), // A = 60 seconds ..greeter_service() @@ -1750,7 +1884,6 @@ mod endpoint_manifest_options_propagation { let (deployment_id, schema) = SchemaUpdater::update_and_return(schema, move |updater| { updater.add_deployment( deployment.metadata.clone(), - None, vec![endpoint_manifest::Service { journal_retention: Some(60 * 1000), // A = 60 seconds again ..greeter_service() @@ -1828,7 +1961,6 @@ mod endpoint_manifest_options_propagation { let (deployment_id, schema) = SchemaUpdater::update_and_return(schema, move |updater| { updater.add_deployment( deployment.metadata.clone(), - None, vec![endpoint_manifest::Service { journal_retention: None, ..greeter_service() @@ -1925,7 +2057,6 @@ mod endpoint_manifest_options_propagation { SchemaUpdater::update_and_return(Schema::default(), move |updater| { updater.add_deployment( deployment.metadata.clone(), - None, vec![endpoint_manifest::Service { journal_retention: Some(120 * 1000), // Service sets 120 seconds handlers: vec![endpoint_manifest::Handler { @@ -2080,7 +2211,7 @@ mod endpoint_manifest_options_propagation { let mut deployment = Deployment::mock(); deployment.id = updater - .add_deployment(deployment.metadata.clone(), None, vec![svc], false) + .add_deployment(deployment.metadata.clone(), vec![svc], false) .unwrap(); let schema = updater.into_inner(); @@ -2163,7 +2294,6 @@ mod modify_service { SchemaUpdater::update_and_return(Schema::default(), move |updater| { updater.add_deployment( Deployment::mock().metadata.clone(), - None, vec![greeter_workflow()], false, ) @@ -2231,7 +2361,6 @@ mod modify_service { SchemaUpdater::update_and_return(Schema::default(), move |updater| { updater.add_deployment( Deployment::mock().metadata.clone(), - None, vec![greeter_service()], false, ) @@ -2290,7 +2419,6 @@ mod modify_service { SchemaUpdater::update_and_return(schema, move |updater| { updater.add_deployment( deployment_2.metadata.clone(), - None, vec![greeter_service()], // identical service definition false, ) @@ -2328,7 +2456,6 @@ mod modify_service { SchemaUpdater::update_and_return(Schema::default(), move |updater| { updater.add_deployment( Deployment::mock().metadata.clone(), - None, vec![greeter_workflow()], false, ) @@ -2411,7 +2538,6 @@ mod modify_service { SchemaUpdater::update_and_return(schema, move |updater| { updater.add_deployment( deployment_2.metadata.clone(), - None, vec![greeter_workflow()], // identical service definition false, ) From a5b0ae2aa18f6b777ad12c6fdf9279263fa96ee6 Mon Sep 17 00:00:00 2001 From: slinkydeveloper Date: Tue, 7 Oct 2025 17:08:19 +0200 Subject: [PATCH 5/5] Update comment --- crates/types/src/schema/metadata/updater/mod.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/crates/types/src/schema/metadata/updater/mod.rs b/crates/types/src/schema/metadata/updater/mod.rs index de631e3cd9..aca2d3ebc6 100644 --- a/crates/types/src/schema/metadata/updater/mod.rs +++ b/crates/types/src/schema/metadata/updater/mod.rs @@ -376,8 +376,8 @@ impl SchemaUpdater { }) }) // There are few situations where we might have multiple deployments for the same endpoint: - // * If the user specified in the at least two previous registrations the routing-header, - // but then it didn't specify it in this one. In this case, multiple deployments will match the above filter + // * If there is some different configuration of the Configuration.admin.deployment_routing_headers between nodes, + // and some registration was previously accepted. // * If update_deployment was used on at least one deployment, pointing to the same address of another deployment, // resulting in having two deployments pointing at the same address. //