Skip to content

Commit c2a5520

Browse files
Add routing header feature to deployment registration.
This lets you use a specific header as "discriminator" when registering a service.
1 parent 1ccd4f2 commit c2a5520

File tree

8 files changed

+267
-25
lines changed

8 files changed

+267
-25
lines changed

cli/src/commands/deployments/register.rs

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ use comfy_table::Table;
1818
use http::{HeaderName, HeaderValue, StatusCode, Uri};
1919
use indicatif::ProgressBar;
2020

21-
use restate_admin_rest_model::deployments::RegisterDeploymentRequest;
21+
use restate_admin_rest_model::deployments::{Header, RegisterDeploymentRequest};
2222
use restate_cli_util::ui::console::{Styled, StyledTable, confirm_or_exit};
2323
use restate_cli_util::ui::stylesheet::Style;
2424
use restate_cli_util::{c_eprintln, c_error, c_indent_table, c_indentln, c_success, c_warn};
@@ -48,11 +48,20 @@ pub struct Register {
4848
assume_role_arn: Option<String>,
4949

5050
/// Additional header that will be sent to the endpoint during the discovery request.
51+
/// You typically want to include here API keys and other tokens required to send requests to deployments.
5152
///
5253
/// Use `--extra-header name=value` format and repeat --extra-header for each additional header.
5354
#[clap(long="extra-header", value_parser = parse_header, action = clap::ArgAction::Append)]
5455
extra_headers: Option<Vec<HeaderKeyValue>>,
5556

57+
/// Header used for routing to a specific deployment.
58+
/// If the load balancer between restate-server and your deployments uses a specific header to route,
59+
/// you should set this as the routing header, as it will be used to distinguish this deployment with other deployments with the same URL.
60+
///
61+
/// Use `--routing-header name=value` format.
62+
#[clap(long="routing-header", value_parser = parse_header)]
63+
routing_header: Option<HeaderKeyValue>,
64+
5665
/// Attempt discovery using a client that defaults to HTTP1.1 instead of a prior-knowledge HTTP2 client.
5766
/// This may be necessary if you see `META0014` discovering local dev servers like `wrangler dev`.
5867
#[clap(long = "use-http1.1")]
@@ -131,6 +140,14 @@ pub async fn run_register(State(env): State<CliEnv>, discover_opts: &Register) -
131140
let headers = discover_opts.extra_headers.as_ref().map(|headers| {
132141
HashMap::from_iter(headers.iter().map(|kv| (kv.key.clone(), kv.value.clone())))
133142
});
143+
let routing_header =
144+
discover_opts
145+
.routing_header
146+
.as_ref()
147+
.map(|HeaderKeyValue { key, value }| Header {
148+
key: key.clone(),
149+
value: value.clone(),
150+
});
134151

135152
// Preparing the discovery request
136153
let client = AdminClient::new(&env).await?;
@@ -189,6 +206,7 @@ pub async fn run_register(State(env): State<CliEnv>, discover_opts: &Register) -
189206
let mk_request_body = |force, dry_run| match &deployment {
190207
DeploymentEndpoint::Uri(uri) => RegisterDeploymentRequest::Http {
191208
uri: uri.clone(),
209+
routing_header: routing_header.clone(),
192210
additional_headers: headers.clone().map(Into::into),
193211
use_http_11: discover_opts.use_http_11,
194212
force,

crates/admin-rest-model/src/deployments.rs

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,8 @@
88
// the Business Source License, use of this software will be governed
99
// by the Apache License, Version 2.0.
1010

11-
use http::Uri;
1211
use http::Version;
12+
use http::{HeaderName, HeaderValue, Uri};
1313
use restate_serde_util::SerdeableHeaderHashMap;
1414
use restate_types::identifiers::ServiceRevision;
1515
use restate_types::identifiers::{DeploymentId, LambdaARN};
@@ -18,6 +18,18 @@ use restate_types::schema::service::ServiceMetadata;
1818
use serde::{Deserialize, Serialize};
1919
use serde_with::serde_as;
2020

21+
#[serde_as]
22+
#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
23+
#[derive(Debug, Clone, Serialize, Deserialize)]
24+
pub struct Header {
25+
#[serde_as(as = "restate_serde_util::HeaderNameSerde")]
26+
#[cfg_attr(feature = "schema", schemars(with = "String"))]
27+
pub key: HeaderName,
28+
#[serde_as(as = "restate_serde_util::HeaderValueSerde")]
29+
#[cfg_attr(feature = "schema", schemars(with = "String"))]
30+
pub value: HeaderValue,
31+
}
32+
2133
// This enum could be a struct with a nested enum to avoid repeating some fields, but serde(flatten) unfortunately breaks the openapi code generation
2234
#[serde_as]
2335
#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
@@ -39,10 +51,18 @@ pub enum RegisterDeploymentRequest {
3951
#[cfg_attr(feature = "schema", schemars(with = "String"))]
4052
uri: Uri,
4153

54+
/// # Routing header
55+
///
56+
/// Header used for routing to a specific deployment.
57+
/// If the load balancer between restate-server and your deployments uses a specific header to route,
58+
/// you should set this as the routing header, as it will be used to distinguish this deployment with other deployments with the same URL.
59+
routing_header: Option<Header>,
60+
4261
/// # Additional headers
4362
///
4463
/// Additional headers added to the discover/invoke requests to the deployment.
4564
///
65+
/// You typically want to include here API keys and other tokens required to send requests to deployments.
4666
additional_headers: Option<SerdeableHeaderHashMap>,
4767

4868
/// # Use http1.1

crates/admin/src/rest_api/deployments.rs

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010

1111
use super::error::*;
1212
use crate::state::AdminServiceState;
13+
use std::collections::HashMap;
1314
use std::time::SystemTime;
1415

1516
use axum::Json;
@@ -30,7 +31,7 @@ use serde::Deserialize;
3031

3132
/// Create deployment and return discovered services.
3233
#[openapi(
33-
summary = "Create deployment",
34+
summary = " pubCreate deployment",
3435
description = "Create deployment. Restate will invoke the endpoint to gather additional information required for registration, such as the services exposed by the deployment. If the deployment is already registered, this method will fail unless `force` is set to `true`.",
3536
operation_id = "create_deployment",
3637
tags = "deployment",
@@ -49,10 +50,11 @@ pub async fn create_deployment<V, IC>(
4950
State(state): State<AdminServiceState<V, IC>>,
5051
#[request_body(required = true)] Json(payload): Json<RegisterDeploymentRequest>,
5152
) -> Result<impl IntoResponse, MetaApiError> {
52-
let (discover_endpoint, force, dry_run) = match payload {
53+
let (discover_endpoint, routing_header, force, dry_run) = match payload {
5354
RegisterDeploymentRequest::Http {
5455
uri,
5556
additional_headers,
57+
routing_header,
5658
use_http_11,
5759
force,
5860
dry_run,
@@ -69,6 +71,13 @@ pub async fn create_deployment<V, IC>(
6971

7072
let is_using_https = uri.scheme().unwrap() == &Scheme::HTTPS;
7173

74+
// Add it as part of the additional headers
75+
let mut additional_headers: HashMap<_, _> =
76+
additional_headers.unwrap_or_default().into();
77+
if let Some(routing_header) = &routing_header {
78+
additional_headers.insert(routing_header.key.clone(), routing_header.value.clone());
79+
}
80+
7281
(
7382
DiscoverEndpoint::new(
7483
Endpoint::Http(
@@ -83,8 +92,9 @@ pub async fn create_deployment<V, IC>(
8392
Some(http::Version::HTTP_2)
8493
},
8594
),
86-
additional_headers.unwrap_or_default().into(),
95+
additional_headers,
8796
),
97+
routing_header.map(|h| (h.key, h.value)),
8898
force,
8999
dry_run,
90100
)
@@ -106,6 +116,7 @@ pub async fn create_deployment<V, IC>(
106116
),
107117
additional_headers.unwrap_or_default().into(),
108118
),
119+
None,
109120
force,
110121
dry_run,
111122
),
@@ -125,7 +136,7 @@ pub async fn create_deployment<V, IC>(
125136

126137
let (deployment, services) = state
127138
.schema_registry
128-
.register_deployment(discover_endpoint, force, apply_mode)
139+
.register_deployment(discover_endpoint, routing_header, force, apply_mode)
129140
.await
130141
.inspect_err(|e| warn_it!(e))?;
131142

crates/admin/src/schema_registry/mod.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ use std::ops::Deref;
1313
use std::sync::Arc;
1414

1515
use anyhow::Context;
16-
use http::{HeaderMap, HeaderValue, Uri, uri::PathAndQuery};
16+
use http::{HeaderMap, HeaderName, HeaderValue, Uri, uri::PathAndQuery};
1717
use tracing::subscriber::NoSubscriber;
1818
use tracing::trace;
1919

@@ -90,6 +90,7 @@ impl<V> SchemaRegistry<V> {
9090
pub async fn register_deployment(
9191
&self,
9292
discover_endpoint: DiscoverEndpoint,
93+
routing_header: Option<(HeaderName, HeaderValue)>,
9394
force: updater::Force,
9495
apply_mode: updater::ApplyMode,
9596
) -> Result<(Deployment, Vec<ServiceMetadata>), SchemaRegistryError> {
@@ -128,6 +129,7 @@ impl<V> SchemaRegistry<V> {
128129
let id = tracing::subscriber::with_default(NoSubscriber::new(), || {
129130
updater.add_deployment(
130131
deployment_metadata,
132+
routing_header,
131133
discovered_metadata.services,
132134
force.force_enabled(),
133135
)
@@ -153,6 +155,7 @@ impl<V> SchemaRegistry<V> {
153155

154156
new_deployment_id = Some(updater.add_deployment(
155157
deployment_metadata.clone(),
158+
routing_header.clone(),
156159
discovered_metadata.services.clone(),
157160
force.force_enabled(),
158161
)?);
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
// Copyright (c) 2023 - 2025 Restate Software, Inc., Restate GmbH.
2+
// All rights reserved.
3+
//
4+
// Use of this software is governed by the Business Source License
5+
// included in the LICENSE file.
6+
//
7+
// As of the Change Date specified in that file, in accordance with
8+
// the Business Source License, use of this software will be governed
9+
// by the Apache License, Version 2.0.
10+
11+
use http::HeaderName;
12+
use serde::Deserialize;
13+
use serde_with::{DeserializeAs, SerializeAs};
14+
use std::str::FromStr;
15+
16+
/// SerializeAs/DeserializeAs to implement ser/de trait for [HeaderName]
17+
/// Use it with `#[serde(with = "serde_with::As::<HeaderNameSerde>")]`.
18+
pub struct HeaderNameSerde;
19+
20+
impl SerializeAs<HeaderName> for HeaderNameSerde {
21+
fn serialize_as<S>(source: &HeaderName, serializer: S) -> Result<S::Ok, S::Error>
22+
where
23+
S: serde::Serializer,
24+
{
25+
serializer.serialize_str(source.as_str())
26+
}
27+
}
28+
29+
impl<'de> DeserializeAs<'de, HeaderName> for HeaderNameSerde {
30+
fn deserialize_as<D>(deserializer: D) -> Result<HeaderName, D::Error>
31+
where
32+
D: serde::Deserializer<'de>,
33+
{
34+
let buf = String::deserialize(deserializer)?;
35+
HeaderName::from_str(&buf).map_err(serde::de::Error::custom)
36+
}
37+
}

crates/serde-util/src/lib.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,12 +15,14 @@ mod proto;
1515

1616
pub mod authority;
1717
pub mod default;
18+
mod header_name;
1819
pub mod header_value;
1920
mod map_as_vec;
2021
mod version;
2122

2223
pub use byte_count::*;
2324
pub use header_map::SerdeableHeaderHashMap;
25+
pub use header_name::HeaderNameSerde;
2426
pub use header_value::HeaderValueSerde;
2527
pub use map_as_vec::{MapAsVec, MapAsVecItem};
2628
#[cfg(feature = "proto")]

crates/types/src/schema/metadata/updater/mod.rs

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ use crate::schema::invocation_target::{
2727
use crate::schema::subscriptions::{
2828
EventInvocationTargetTemplate, Sink, Source, Subscription, SubscriptionValidator,
2929
};
30-
use http::{HeaderValue, Uri};
30+
use http::{HeaderName, HeaderValue, Uri};
3131
use serde_json::Value;
3232
use std::borrow::Borrow;
3333
use std::collections::HashMap;
@@ -345,6 +345,7 @@ impl SchemaUpdater {
345345
pub fn add_deployment(
346346
&mut self,
347347
deployment_metadata: DeploymentMetadata,
348+
routing_header: Option<(HeaderName, HeaderValue)>,
348349
services: Vec<endpoint_manifest::Service>,
349350
force: bool,
350351
) -> Result<DeploymentId, SchemaError> {
@@ -357,6 +358,16 @@ impl SchemaUpdater {
357358
let mut existing_deployments = self.schema.deployments.iter().filter(|(_, schemas)| {
358359
schemas.ty.protocol_type() == deployment_metadata.ty.protocol_type()
359360
&& schemas.ty.normalized_address() == deployment_metadata.ty.normalized_address()
361+
&& (routing_header.as_ref().is_none()
362+
|| routing_header.as_ref().is_some_and(
363+
|(routing_header_key, routing_header_value)| {
364+
schemas
365+
.delivery_options
366+
.additional_headers
367+
.get(routing_header_key)
368+
.is_some_and(|v| v == routing_header_value)
369+
},
370+
))
360371
});
361372

362373
let mut services_to_remove = Vec::default();

0 commit comments

Comments
 (0)