Skip to content

Commit 4680ec4

Browse files
Change routing header feature.
Accept the key of the routing header via configuration. This makes the behavior more consistent.
1 parent 47a923a commit 4680ec4

File tree

7 files changed

+325
-240
lines changed

7 files changed

+325
-240
lines changed

cli/src/commands/deployments/register.rs

Lines changed: 1 addition & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ use comfy_table::Table;
1818
use http::{HeaderName, HeaderValue, StatusCode, Uri};
1919
use indicatif::ProgressBar;
2020

21-
use restate_admin_rest_model::deployments::{Header, RegisterDeploymentRequest};
21+
use restate_admin_rest_model::deployments::RegisterDeploymentRequest;
2222
use restate_cli_util::ui::console::{Styled, StyledTable, confirm_or_exit};
2323
use restate_cli_util::ui::stylesheet::Style;
2424
use restate_cli_util::{c_eprintln, c_error, c_indent_table, c_indentln, c_success, c_warn};
@@ -48,20 +48,11 @@ pub struct Register {
4848
assume_role_arn: Option<String>,
4949

5050
/// Additional header that will be sent to the endpoint during the discovery request.
51-
/// You typically want to include here API keys and other tokens required to send requests to deployments.
5251
///
5352
/// Use `--extra-header name=value` format and repeat --extra-header for each additional header.
5453
#[clap(long="extra-header", value_parser = parse_header, action = clap::ArgAction::Append)]
5554
extra_headers: Option<Vec<HeaderKeyValue>>,
5655

57-
/// Header used for routing to a specific deployment.
58-
/// If the load balancer between restate-server and your deployments uses a specific header to route,
59-
/// you should set this as the routing header, as it will be used to distinguish this deployment with other deployments with the same URL.
60-
///
61-
/// Use `--routing-header name=value` format.
62-
#[clap(long="routing-header", value_parser = parse_header)]
63-
routing_header: Option<HeaderKeyValue>,
64-
6556
/// Attempt discovery using a client that defaults to HTTP1.1 instead of a prior-knowledge HTTP2 client.
6657
/// This may be necessary if you see `META0014` discovering local dev servers like `wrangler dev`.
6758
#[clap(long = "use-http1.1")]
@@ -140,14 +131,6 @@ pub async fn run_register(State(env): State<CliEnv>, discover_opts: &Register) -
140131
let headers = discover_opts.extra_headers.as_ref().map(|headers| {
141132
HashMap::from_iter(headers.iter().map(|kv| (kv.key.clone(), kv.value.clone())))
142133
});
143-
let routing_header =
144-
discover_opts
145-
.routing_header
146-
.as_ref()
147-
.map(|HeaderKeyValue { key, value }| Header {
148-
key: key.clone(),
149-
value: value.clone(),
150-
});
151134

152135
// Preparing the discovery request
153136
let client = AdminClient::new(&env).await?;
@@ -206,7 +189,6 @@ pub async fn run_register(State(env): State<CliEnv>, discover_opts: &Register) -
206189
let mk_request_body = |force, dry_run| match &deployment {
207190
DeploymentEndpoint::Uri(uri) => RegisterDeploymentRequest::Http {
208191
uri: uri.clone(),
209-
routing_header: routing_header.clone(),
210192
additional_headers: headers.clone().map(Into::into),
211193
use_http_11: discover_opts.use_http_11,
212194
force,

crates/admin-rest-model/src/deployments.rs

Lines changed: 1 addition & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,8 @@
88
// the Business Source License, use of this software will be governed
99
// by the Apache License, Version 2.0.
1010

11+
use http::Uri;
1112
use http::Version;
12-
use http::{HeaderName, HeaderValue, Uri};
1313
use restate_serde_util::SerdeableHeaderHashMap;
1414
use restate_types::identifiers::ServiceRevision;
1515
use restate_types::identifiers::{DeploymentId, LambdaARN};
@@ -18,18 +18,6 @@ use restate_types::schema::service::ServiceMetadata;
1818
use serde::{Deserialize, Serialize};
1919
use serde_with::serde_as;
2020

21-
#[serde_as]
22-
#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
23-
#[derive(Debug, Clone, Serialize, Deserialize)]
24-
pub struct Header {
25-
#[serde_as(as = "restate_serde_util::HeaderNameSerde")]
26-
#[cfg_attr(feature = "schema", schemars(with = "String"))]
27-
pub key: HeaderName,
28-
#[serde_as(as = "restate_serde_util::HeaderValueSerde")]
29-
#[cfg_attr(feature = "schema", schemars(with = "String"))]
30-
pub value: HeaderValue,
31-
}
32-
3321
// This enum could be a struct with a nested enum to avoid repeating some fields, but serde(flatten) unfortunately breaks the openapi code generation
3422
#[serde_as]
3523
#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
@@ -51,13 +39,6 @@ pub enum RegisterDeploymentRequest {
5139
#[cfg_attr(feature = "schema", schemars(with = "String"))]
5240
uri: Uri,
5341

54-
/// # Routing header
55-
///
56-
/// Header used for routing to a specific deployment.
57-
/// If the load balancer between restate-server and your deployments uses a specific header to route,
58-
/// you should set this as the routing header, as it will be used to distinguish this deployment with other deployments with the same URL.
59-
routing_header: Option<Header>,
60-
6142
/// # Additional headers
6243
///
6344
/// Additional headers added to the discover/invoke requests to the deployment.

crates/admin/src/rest_api/deployments.rs

Lines changed: 4 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@
1010

1111
use super::error::*;
1212
use crate::state::AdminServiceState;
13-
use std::collections::HashMap;
1413
use std::time::SystemTime;
1514

1615
use axum::Json;
@@ -31,7 +30,7 @@ use serde::Deserialize;
3130

3231
/// Create deployment and return discovered services.
3332
#[openapi(
34-
summary = " pubCreate deployment",
33+
summary = "Create deployment",
3534
description = "Create deployment. Restate will invoke the endpoint to gather additional information required for registration, such as the services exposed by the deployment. If the deployment is already registered, this method will fail unless `force` is set to `true`.",
3635
operation_id = "create_deployment",
3736
tags = "deployment",
@@ -50,11 +49,10 @@ pub async fn create_deployment<V, IC>(
5049
State(state): State<AdminServiceState<V, IC>>,
5150
#[request_body(required = true)] Json(payload): Json<RegisterDeploymentRequest>,
5251
) -> Result<impl IntoResponse, MetaApiError> {
53-
let (discover_endpoint, routing_header, force, dry_run) = match payload {
52+
let (discover_endpoint, force, dry_run) = match payload {
5453
RegisterDeploymentRequest::Http {
5554
uri,
5655
additional_headers,
57-
routing_header,
5856
use_http_11,
5957
force,
6058
dry_run,
@@ -71,13 +69,6 @@ pub async fn create_deployment<V, IC>(
7169

7270
let is_using_https = uri.scheme().unwrap() == &Scheme::HTTPS;
7371

74-
// Add it as part of the additional headers
75-
let mut additional_headers: HashMap<_, _> =
76-
additional_headers.unwrap_or_default().into();
77-
if let Some(routing_header) = &routing_header {
78-
additional_headers.insert(routing_header.key.clone(), routing_header.value.clone());
79-
}
80-
8172
(
8273
DiscoverEndpoint::new(
8374
Endpoint::Http(
@@ -92,9 +83,8 @@ pub async fn create_deployment<V, IC>(
9283
Some(http::Version::HTTP_2)
9384
},
9485
),
95-
additional_headers,
86+
additional_headers.unwrap_or_default().into(),
9687
),
97-
routing_header.map(|h| (h.key, h.value)),
9888
force,
9989
dry_run,
10090
)
@@ -116,7 +106,6 @@ pub async fn create_deployment<V, IC>(
116106
),
117107
additional_headers.unwrap_or_default().into(),
118108
),
119-
None,
120109
force,
121110
dry_run,
122111
),
@@ -136,7 +125,7 @@ pub async fn create_deployment<V, IC>(
136125

137126
let (deployment, services) = state
138127
.schema_registry
139-
.register_deployment(discover_endpoint, routing_header, force, apply_mode)
128+
.register_deployment(discover_endpoint, force, apply_mode)
140129
.await
141130
.inspect_err(|e| warn_it!(e))?;
142131

crates/admin/src/schema_registry/mod.rs

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ use std::ops::Deref;
1313
use std::sync::Arc;
1414

1515
use anyhow::Context;
16-
use http::{HeaderMap, HeaderName, HeaderValue, Uri, uri::PathAndQuery};
16+
use http::{HeaderMap, HeaderValue, Uri, uri::PathAndQuery};
1717
use tracing::subscriber::NoSubscriber;
1818
use tracing::trace;
1919

@@ -90,7 +90,6 @@ impl<V> SchemaRegistry<V> {
9090
pub async fn register_deployment(
9191
&self,
9292
discover_endpoint: DiscoverEndpoint,
93-
routing_header: Option<(HeaderName, HeaderValue)>,
9493
force: updater::Force,
9594
apply_mode: updater::ApplyMode,
9695
) -> Result<(Deployment, Vec<ServiceMetadata>), SchemaRegistryError> {
@@ -129,7 +128,6 @@ impl<V> SchemaRegistry<V> {
129128
let id = tracing::subscriber::with_default(NoSubscriber::new(), || {
130129
updater.add_deployment(
131130
deployment_metadata,
132-
routing_header,
133131
discovered_metadata.services,
134132
force.force_enabled(),
135133
)
@@ -155,7 +153,6 @@ impl<V> SchemaRegistry<V> {
155153

156154
new_deployment_id = Some(updater.add_deployment(
157155
deployment_metadata.clone(),
158-
routing_header.clone(),
159156
discovered_metadata.services.clone(),
160157
force.force_enabled(),
161158
)?);

crates/types/src/config/admin.rs

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ use std::num::NonZeroUsize;
1313
use std::path::PathBuf;
1414
use std::time::Duration;
1515

16-
use http::Uri;
16+
use http::{HeaderName, Uri};
1717
use serde::{Deserialize, Serialize};
1818
use serde_with::serde_as;
1919
use tokio::sync::Semaphore;
@@ -42,6 +42,19 @@ pub struct AdminOptions {
4242
#[cfg_attr(feature = "schemars", schemars(with = "String", url))]
4343
pub advertised_admin_endpoint: Option<Uri>,
4444

45+
/// # Deployment routing headers
46+
///
47+
/// List of header names considered routing headers.
48+
///
49+
/// These will be used during deployment creation to distinguish between an already existing deployment and a new deployment.
50+
#[serde(
51+
default,
52+
skip_serializing_if = "Vec::is_empty",
53+
with = "serde_with::As::<Vec<restate_serde_util::HeaderNameSerde>>"
54+
)]
55+
#[cfg_attr(feature = "schemars", schemars(with = "Vec<String>"))]
56+
pub deployment_routing_headers: Vec<HeaderName>,
57+
4558
/// # Concurrency limit
4659
///
4760
/// Concurrency limit for the Admin APIs. Default is unlimited.
@@ -130,6 +143,7 @@ impl Default for AdminOptions {
130143
bind_address: "0.0.0.0:9070".parse().unwrap(),
131144
advertised_admin_endpoint: None,
132145
// max is limited by Tower's LoadShedLayer.
146+
deployment_routing_headers: vec![],
133147
concurrent_api_requests_limit: None,
134148
query_engine: Default::default(),
135149
heartbeat_interval: NonZeroFriendlyDuration::from_millis_unchecked(1500),

crates/types/src/schema/metadata/updater/mod.rs

Lines changed: 11 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ use crate::schema::invocation_target::{
2727
use crate::schema::subscriptions::{
2828
EventInvocationTargetTemplate, Sink, Source, Subscription, SubscriptionValidator,
2929
};
30-
use http::{HeaderName, HeaderValue, Uri};
30+
use http::{HeaderValue, Uri};
3131
use serde_json::Value;
3232
use std::borrow::Borrow;
3333
use std::collections::HashMap;
@@ -344,8 +344,7 @@ impl SchemaUpdater {
344344

345345
pub fn add_deployment(
346346
&mut self,
347-
mut deployment_metadata: DeploymentMetadata,
348-
routing_header: Option<(HeaderName, HeaderValue)>,
347+
deployment_metadata: DeploymentMetadata,
349348
services: Vec<endpoint_manifest::Service>,
350349
force: bool,
351350
) -> Result<DeploymentId, SchemaError> {
@@ -354,6 +353,8 @@ impl SchemaUpdater {
354353
.map(|c| ServiceName::try_from(c.name.to_string()).map(|name| (name, c)))
355354
.collect::<Result<HashMap<_, _>, _>>()?;
356355

356+
let deployment_routing_headers = &Configuration::pinned().admin.deployment_routing_headers;
357+
357358
// Did we find an existing deployment with a conflicting endpoint url?
358359
let existing_deployment = self
359360
.schema
@@ -363,15 +364,16 @@ impl SchemaUpdater {
363364
schemas.ty.protocol_type() == deployment_metadata.ty.protocol_type()
364365
&& schemas.ty.normalized_address()
365366
== deployment_metadata.ty.normalized_address()
366-
&& routing_header.as_ref().is_none_or(
367-
|(routing_header_key, routing_header_value)| {
368-
schemas
367+
&& deployment_routing_headers.iter().all(|routing_header_key| {
368+
deployment_metadata
369+
.delivery_options
370+
.additional_headers
371+
.get(routing_header_key)
372+
== schemas
369373
.delivery_options
370374
.additional_headers
371375
.get(routing_header_key)
372-
.is_some_and(|v| v == routing_header_value)
373-
},
374-
)
376+
})
375377
})
376378
// There are few situations where we might have multiple deployments for the same endpoint:
377379
// * If the user specified in the at least two previous registrations the routing-header,
@@ -442,12 +444,6 @@ impl SchemaUpdater {
442444
computed_services.insert(service_name.to_string(), Arc::new(new_service_revision));
443445
}
444446

445-
if let Some((key, value)) = routing_header {
446-
deployment_metadata
447-
.delivery_options
448-
.additional_headers
449-
.insert(key, value);
450-
}
451447
self.schema.deployments.insert(
452448
deployment_id,
453449
Deployment {

0 commit comments

Comments
 (0)