Skip to content

Commit 3b55a37

Browse files
Add routing header feature to deployment registration.
This lets you use a specific header as "discriminator" when registering a service.
1 parent 05e8afc commit 3b55a37

File tree

13 files changed

+480
-67
lines changed

13 files changed

+480
-67
lines changed

cli/src/commands/deployments/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ use cling::prelude::*;
1717

1818
#[derive(Run, Subcommand, Clone)]
1919
#[clap(visible_alias = "dp", alias = "deployment")]
20+
#[allow(clippy::large_enum_variant)]
2021
pub enum Deployments {
2122
/// List the registered deployments
2223
List(list::List),

cli/src/commands/deployments/register.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ use comfy_table::Table;
1818
use http::{HeaderName, HeaderValue, StatusCode, Uri};
1919
use indicatif::ProgressBar;
2020
use indoc::indoc;
21+
2122
use restate_admin_rest_model::deployments::{
2223
DetailedDeploymentResponse, RegisterDeploymentRequest, RegisterDeploymentResponse,
2324
};
@@ -55,6 +56,7 @@ pub struct Register {
5556
assume_role_arn: Option<String>,
5657

5758
/// Additional header that will be sent to the endpoint during the discovery request.
59+
/// You typically want to include here API keys and other tokens required to send requests to deployments.
5860
///
5961
/// Use `--extra-header name=value` format and repeat --extra-header for each additional header.
6062
#[clap(long="extra-header", value_parser = parse_header, action = clap::ArgAction::Append)]

crates/admin-rest-model/src/deployments.rs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,7 @@
88
// the Business Source License, use of this software will be governed
99
// by the Apache License, Version 2.0.
1010

11-
use http::Uri;
12-
use http::Version;
11+
use http::{Uri, Version};
1312
use restate_serde_util::SerdeableHeaderHashMap;
1413
use restate_types::identifiers::ServiceRevision;
1514
use restate_types::identifiers::{DeploymentId, LambdaARN};
@@ -44,7 +43,7 @@ pub enum RegisterDeploymentRequest {
4443
///
4544
/// Additional headers added to every discover/invoke request to the deployment.
4645
///
47-
/// Fill this field with API Tokens and other authorization headers needed to send requests to the deployment.
46+
/// You typically want to include here API keys and other tokens required to send requests to deployments.
4847
additional_headers: Option<SerdeableHeaderHashMap>,
4948

5049
/// # Metadata

crates/admin/src/schema_registry/mod.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -154,8 +154,8 @@ impl SchemaRegistry {
154154
// Verify first if we have the service. If we do, no need to do anything here.
155155
if overwrite == updater::Overwrite::No {
156156
// Verify if we have a service for this endpoint already or not
157-
if let Some((deployment, services)) =
158-
Metadata::with_current(|m| m.schema_ref()).find_deployment(&deployment_address)
157+
if let Some((deployment, services)) = Metadata::with_current(|m| m.schema_ref())
158+
.find_deployment(&deployment_address, &additional_headers)
159159
{
160160
return Ok((RegisterDeploymentResult::Unchanged, deployment, services));
161161
}

crates/invoker-impl/src/lib.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1525,7 +1525,7 @@ mod tests {
15251525
use restate_test_util::{check, let_assert};
15261526
use restate_time_util::FriendlyDuration;
15271527
use restate_types::config::InvokerOptionsBuilder;
1528-
use restate_types::deployment::DeploymentAddress;
1528+
use restate_types::deployment::{DeploymentAddress, Headers};
15291529
use restate_types::errors::{InvocationError, codes};
15301530
use restate_types::identifiers::{LeaderEpoch, PartitionId, ServiceRevision};
15311531
use restate_types::invocation::ServiceType;
@@ -1707,6 +1707,7 @@ mod tests {
17071707
fn find_deployment(
17081708
&self,
17091709
_: &DeploymentAddress,
1710+
_: &Headers,
17101711
) -> Option<(Deployment, Vec<ServiceMetadata>)> {
17111712
None
17121713
}
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
// Copyright (c) 2023 - 2025 Restate Software, Inc., Restate GmbH.
2+
// All rights reserved.
3+
//
4+
// Use of this software is governed by the Business Source License
5+
// included in the LICENSE file.
6+
//
7+
// As of the Change Date specified in that file, in accordance with
8+
// the Business Source License, use of this software will be governed
9+
// by the Apache License, Version 2.0.
10+
11+
use http::HeaderName;
12+
use serde::Deserialize;
13+
use serde_with::{DeserializeAs, SerializeAs};
14+
use std::str::FromStr;
15+
16+
/// SerializeAs/DeserializeAs to implement ser/de trait for [HeaderName]
17+
/// Use it with `#[serde(with = "serde_with::As::<HeaderNameSerde>")]`.
18+
pub struct HeaderNameSerde;
19+
20+
impl SerializeAs<HeaderName> for HeaderNameSerde {
21+
fn serialize_as<S>(source: &HeaderName, serializer: S) -> Result<S::Ok, S::Error>
22+
where
23+
S: serde::Serializer,
24+
{
25+
serializer.serialize_str(source.as_str())
26+
}
27+
}
28+
29+
impl<'de> DeserializeAs<'de, HeaderName> for HeaderNameSerde {
30+
fn deserialize_as<D>(deserializer: D) -> Result<HeaderName, D::Error>
31+
where
32+
D: serde::Deserializer<'de>,
33+
{
34+
let buf = String::deserialize(deserializer)?;
35+
HeaderName::from_str(&buf).map_err(serde::de::Error::custom)
36+
}
37+
}

crates/serde-util/src/lib.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,12 +15,14 @@ mod proto;
1515

1616
pub mod authority;
1717
pub mod default;
18+
mod header_name;
1819
pub mod header_value;
1920
mod map_as_vec;
2021
mod version;
2122

2223
pub use byte_count::*;
2324
pub use header_map::SerdeableHeaderHashMap;
25+
pub use header_name::HeaderNameSerde;
2426
pub use header_value::HeaderValueSerde;
2527
pub use map_as_vec::{MapAsVec, MapAsVecItem};
2628
#[cfg(feature = "proto")]

crates/storage-query-datafusion/src/mocks.rs

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ use restate_partition_store::{PartitionStore, PartitionStoreManager};
3030
use restate_rocksdb::RocksDbManager;
3131
use restate_types::NodeId;
3232
use restate_types::config::QueryEngineOptions;
33-
use restate_types::deployment::DeploymentAddress;
33+
use restate_types::deployment::{DeploymentAddress, Headers};
3434
use restate_types::errors::GenericError;
3535
use restate_types::identifiers::{DeploymentId, PartitionId, PartitionKey, ServiceRevision};
3636
use restate_types::live::Live;
@@ -74,8 +74,13 @@ impl DeploymentResolver for MockSchemas {
7474
self.1.resolve_latest_deployment_for_service(service_name)
7575
}
7676

77-
fn find_deployment(&self, _: &DeploymentAddress) -> Option<(Deployment, Vec<ServiceMetadata>)> {
78-
None
77+
fn find_deployment(
78+
&self,
79+
deployment_address: &DeploymentAddress,
80+
additional_headers: &Headers,
81+
) -> Option<(Deployment, Vec<ServiceMetadata>)> {
82+
self.1
83+
.find_deployment(deployment_address, additional_headers)
7984
}
8085

8186
fn get_deployment(&self, deployment_id: &DeploymentId) -> Option<Deployment> {

crates/types/src/config/admin.rs

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ use std::net::SocketAddr;
1212
use std::num::NonZeroUsize;
1313
use std::path::PathBuf;
1414

15-
use http::Uri;
15+
use http::{HeaderName, Uri};
1616
use serde::{Deserialize, Serialize};
1717
use serde_with::serde_as;
1818
use tokio::sync::Semaphore;
@@ -41,6 +41,19 @@ pub struct AdminOptions {
4141
#[cfg_attr(feature = "schemars", schemars(with = "String", url))]
4242
pub advertised_admin_endpoint: Option<Uri>,
4343

44+
/// # Deployment routing headers
45+
///
46+
/// List of header names considered routing headers.
47+
///
48+
/// These will be used during deployment creation to distinguish between an already existing deployment and a new deployment.
49+
#[serde(
50+
default,
51+
skip_serializing_if = "Vec::is_empty",
52+
with = "serde_with::As::<Vec<restate_serde_util::HeaderNameSerde>>"
53+
)]
54+
#[cfg_attr(feature = "schemars", schemars(with = "Vec<String>"))]
55+
pub deployment_routing_headers: Vec<HeaderName>,
56+
4457
/// # Concurrency limit
4558
///
4659
/// Concurrency limit for the Admin APIs. Default is unlimited.
@@ -113,6 +126,7 @@ impl Default for AdminOptions {
113126
bind_address: "0.0.0.0:9070".parse().unwrap(),
114127
advertised_admin_endpoint: None,
115128
// max is limited by Tower's LoadShedLayer.
129+
deployment_routing_headers: vec![],
116130
concurrent_api_requests_limit: None,
117131
query_engine: Default::default(),
118132
heartbeat_interval: NonZeroFriendlyDuration::from_millis_unchecked(1500),

crates/types/src/schema/deployment.rs

Lines changed: 33 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,10 @@ use std::fmt;
1313
use std::fmt::{Display, Formatter};
1414
use std::ops::RangeInclusive;
1515

16-
use crate::deployment::{DeploymentAddress, HttpDeploymentAddress, LambdaDeploymentAddress};
16+
use crate::config::Configuration;
17+
use crate::deployment::{
18+
DeploymentAddress, Headers, HttpDeploymentAddress, LambdaDeploymentAddress,
19+
};
1720
use crate::identifiers::{DeploymentId, LambdaARN, ServiceRevision};
1821
use crate::schema::service::ServiceMetadata;
1922
use crate::time::MillisSinceEpoch;
@@ -141,15 +144,24 @@ impl DeploymentMetadata {
141144
self.created_at
142145
}
143146

144-
pub fn semantic_eq_with_address_and_headers(&self, other_addess: &DeploymentAddress) -> bool {
147+
pub fn semantic_eq_with_address_and_headers(
148+
&self,
149+
other_addess: &DeploymentAddress,
150+
other_additional_headers: &Headers,
151+
) -> bool {
145152
match (&self.ty, other_addess) {
146153
(
147154
DeploymentType::Http {
148155
address: this_address,
149156
..
150157
},
151158
DeploymentAddress::Http(HttpDeploymentAddress { uri: other_address }),
152-
) => Self::semantic_eq_http(this_address, other_address),
159+
) => Self::semantic_eq_http(
160+
this_address,
161+
other_address,
162+
&self.delivery_options.additional_headers,
163+
other_additional_headers,
164+
),
153165
(
154166
DeploymentType::Lambda { arn: this_arn, .. },
155167
DeploymentAddress::Lambda(LambdaDeploymentAddress { arn: other_arn, .. }),
@@ -162,10 +174,21 @@ impl DeploymentMetadata {
162174
this_arn == other_arn
163175
}
164176

165-
pub(crate) fn semantic_eq_http(this_address: &Uri, other_address: &Uri) -> bool {
177+
pub(crate) fn semantic_eq_http(
178+
this_address: &Uri,
179+
other_address: &Uri,
180+
this_additional_headers: &Headers,
181+
other_additional_headers: &Headers,
182+
) -> bool {
183+
let deployment_routing_headers = &Configuration::pinned().admin.deployment_routing_headers;
184+
166185
this_address.authority().expect("Must have authority")
167186
== other_address.authority().expect("Must have authority")
168187
&& this_address.path() == other_address.path()
188+
&& deployment_routing_headers.iter().all(|routing_header_key| {
189+
this_additional_headers.get(routing_header_key)
190+
== other_additional_headers.get(routing_header_key)
191+
})
169192
}
170193
}
171194

@@ -245,6 +268,7 @@ pub trait DeploymentResolver {
245268
fn find_deployment(
246269
&self,
247270
deployment_address: &DeploymentAddress,
271+
additional_headers: &Headers,
248272
) -> Option<(Deployment, Vec<ServiceMetadata>)>;
249273

250274
fn get_deployment(&self, deployment_id: &DeploymentId) -> Option<Deployment>;
@@ -445,10 +469,13 @@ pub mod test_util {
445469
fn find_deployment(
446470
&self,
447471
deployment_address: &DeploymentAddress,
472+
additional_headers: &Headers,
448473
) -> Option<(Deployment, Vec<ServiceMetadata>)> {
449474
self.deployments
450475
.iter()
451-
.find(|(_, d)| d.semantic_eq_with_address_and_headers(deployment_address))
476+
.find(|(_, d)| {
477+
d.semantic_eq_with_address_and_headers(deployment_address, additional_headers)
478+
})
452479
.and_then(|(dp_id, _)| self.get_deployment_and_services(dp_id))
453480
}
454481

@@ -504,6 +531,7 @@ pub mod test_util {
504531
fn find_deployment(
505532
&self,
506533
_: &DeploymentAddress,
534+
_: &Headers,
507535
) -> Option<(Deployment, Vec<ServiceMetadata>)> {
508536
None
509537
}

0 commit comments

Comments
 (0)