From 732317531540a49292de7ce3e2d8719e55285269 Mon Sep 17 00:00:00 2001 From: Tim Geoghegan Date: Thu, 29 Jun 2023 10:10:03 -0700 Subject: [PATCH] aggregator API: default to bearer tokens Previously, the aggregator API assumed that any aggregator or collector auth tokens it was handling were `AuthenticationToken::DapAuth`. Janus must support those for the interop testing API and to work with Daphne, but generally, we prefer to use more boring `Authorization: Bearer` tokens. Certainly any tasks provisioned via the aggregator API and `divviup-api` should use bearer tokens. With this PR, we augment the `PostTaskReq` and `PostTaskResp` messages so that they use `AuthenticationToken` to represent aggregator and collector tokens, meaning that on the wire, a token now looks like: ``` { "type": "Bearer", "token": "AAAAAAAAA" } ``` Also, when the aggregator API generates either kind of token, it now generates a bearer token. I'm not aware of any scenarios where we need to generate `DapAuth` tokens in the aggregator API so there is no affordance for requesting that kind of token. See #472 --- aggregator_api/src/lib.rs | 97 +++++++++++++-------------- aggregator_core/src/task.rs | 2 +- core/src/task.rs | 26 ++++--- docs/samples/tasks.yaml | 6 +- integration_tests/tests/in_cluster.rs | 12 ++-- 5 files changed, 74 insertions(+), 69 deletions(-) diff --git a/aggregator_api/src/lib.rs b/aggregator_api/src/lib.rs index 8db66239b..e8e408df6 100644 --- a/aggregator_api/src/lib.rs +++ b/aggregator_api/src/lib.rs @@ -10,10 +10,7 @@ use janus_aggregator_core::{ SecretBytes, }; use janus_core::{ - hpke::generate_hpke_config_and_private_key, - http::extract_bearer_token, - task::{AuthenticationToken, DapAuthToken}, - time::Clock, + hpke::generate_hpke_config_and_private_key, http::extract_bearer_token, time::Clock, }; use janus_messages::{Duration, HpkeAeadId, HpkeKdfId, HpkeKemId, Role, TaskId}; use models::{GetTaskMetricsResp, TaskResp}; @@ -195,6 +192,7 @@ async fn post_task( // struct `aggregator_core::task::Task` expects to get two aggregator endpoint URLs, but only // the one for the peer aggregator is in the incoming request (or for that matter, is ever used // by Janus), so we insert a fake URL for "self". + // TODO(#1524): clean this up with `aggregator_core::task::Task` changes // unwrap safety: this fake URL is valid let fake_aggregator_url = Url::parse("http://never-used.example.com").unwrap(); let aggregator_endpoints = match req.role { @@ -230,33 +228,16 @@ async fn post_task( let vdaf_verify_keys = Vec::from([SecretBytes::new(vdaf_verify_key_bytes)]); - let aggregator_auth_tokens = match req.role { + let (aggregator_auth_tokens, collector_auth_tokens) = match req.role { Role::Leader => { - let encoded = req.aggregator_auth_token.as_ref().ok_or_else(|| { + let aggregator_auth_token = req.aggregator_auth_token.ok_or_else(|| { Error::new( "aggregator acting in leader role must be provided an aggregator auth token" .to_string(), Status::BadRequest, ) })?; - let token_bytes = URL_SAFE_NO_PAD.decode(encoded).map_err(|err| { - Error::new( - format!("Invalid base64 value for aggregator_auth_token: {err}"), - Status::BadRequest, - ) - })?; - Vec::from([ - // TODO(#472): Each token in the PostTaskReq should indicate whether it is a bearer - // token or a DAP-Auth-Token. For now, assume the latter. - DapAuthToken::try_from(token_bytes) - .map(AuthenticationToken::DapAuth) - .map_err(|_| { - Error::new( - "Invalid HTTP header value in aggregator_auth_token".to_string(), - Status::BadRequest, - ) - })?, - ]) + (Vec::from([aggregator_auth_token]), Vec::from([random()])) } Role::Helper => { @@ -267,19 +248,13 @@ async fn post_task( Status::BadRequest, )); } - // TODO(#472): switch to generating bearer tokens by default - Vec::from([AuthenticationToken::DapAuth(random())]) + + (Vec::from([random()]), Vec::new()) } _ => unreachable!(), }; - let collector_auth_tokens = match req.role { - // TODO(#472): switch to generating bearer tokens by default - Role::Leader => Vec::from([AuthenticationToken::DapAuth(random())]), - Role::Helper => Vec::new(), - _ => unreachable!(), - }; let hpke_keys = Vec::from([generate_hpke_config_and_private_key( random(), HpkeKemId::X25519HkdfSha256, @@ -404,7 +379,7 @@ async fn get_task_metrics( mod models { use base64::{engine::general_purpose::URL_SAFE_NO_PAD, Engine}; use janus_aggregator_core::task::{QueryType, Task}; - use janus_core::task::VdafInstance; + use janus_core::task::{AuthenticationToken, VdafInstance}; use janus_messages::{Duration, HpkeConfig, Role, TaskId, Time}; use serde::{Deserialize, Serialize}; use url::Url; @@ -441,10 +416,10 @@ mod models { pub(crate) time_precision: Duration, /// HPKE configuration for the collector. pub(crate) collector_hpke_config: HpkeConfig, - /// If this aggregator is the leader, this is the bearer token to use to authenticate - /// requests to the helper, as Base64 encoded bytes. If this aggregator is the helper, the - /// value is `None`. - pub(crate) aggregator_auth_token: Option, + /// If this aggregator is the leader, this is the token to use to authenticate requests to + /// the helper, as Base64 encoded bytes. If this aggregator is the helper, the value is + /// `None`. + pub(crate) aggregator_auth_token: Option, } #[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] @@ -484,13 +459,13 @@ mod models { /// requests from the Leader. // TODO(#1509): This field will have to change as Janus helpers will only store a salted // hash of aggregator auth tokens. - pub(crate) aggregator_auth_token: String, + pub(crate) aggregator_auth_token: AuthenticationToken, /// The authentication token used by the task's Collector to authenticate to the Leader, as /// Base64 encoded bytes. /// `Some` if `role` is Leader, `None` otherwise. // TODO(#1509) This field will have to change as Janus leaders will only store a salted hash // of collector auth tokens. - pub(crate) collector_auth_token: Option, + pub(crate) collector_auth_token: Option, /// HPKE configuration used by the collector to decrypt aggregate shares. pub(crate) collector_hpke_config: HpkeConfig, /// HPKE configuration(s) used by this aggregator to decrypt report shares. @@ -526,9 +501,8 @@ mod models { Role::Leader => { if task.collector_auth_tokens().len() != 1 { return Err("illegal number of collector auth tokens in task"); - } else { - Some(URL_SAFE_NO_PAD.encode(task.collector_auth_tokens()[0].as_ref())) } + Some(task.collector_auth_tokens()[0].clone()) } Role::Helper => None, _ => return Err("illegal aggregator role in task"), @@ -554,8 +528,7 @@ mod models { min_batch_size: task.min_batch_size(), time_precision: *task.time_precision(), tolerable_clock_skew: *task.tolerable_clock_skew(), - aggregator_auth_token: URL_SAFE_NO_PAD - .encode(task.aggregator_auth_tokens()[0].as_ref()), + aggregator_auth_token: task.aggregator_auth_tokens()[0].clone(), collector_auth_token, collector_hpke_config: task.collector_hpke_config().clone(), aggregator_hpke_configs, @@ -781,7 +754,7 @@ mod tests { ) .config() .clone(), - aggregator_auth_token: Some(URL_SAFE_NO_PAD.encode(&aggregator_auth_token)), + aggregator_auth_token: Some(aggregator_auth_token), }; assert_response!( post("/tasks") @@ -825,7 +798,7 @@ mod tests { ) .config() .clone(), - aggregator_auth_token: Some(URL_SAFE_NO_PAD.encode(&aggregator_auth_token)), + aggregator_auth_token: Some(aggregator_auth_token), }; assert_response!( post("/tasks") @@ -947,7 +920,7 @@ mod tests { ) .config() .clone(), - aggregator_auth_token: Some(URL_SAFE_NO_PAD.encode(&aggregator_auth_token)), + aggregator_auth_token: Some(aggregator_auth_token), }; assert_response!( post("/tasks") @@ -993,7 +966,7 @@ mod tests { ) .config() .clone(), - aggregator_auth_token: Some(URL_SAFE_NO_PAD.encode(&aggregator_auth_token)), + aggregator_auth_token: Some(aggregator_auth_token.clone()), }; let mut conn = post("/tasks") .with_request_body(serde_json::to_vec(&req).unwrap()) @@ -1527,7 +1500,9 @@ mod tests { HpkeAeadId::Aes128Gcm, HpkePublicKey::from([0u8; 32].to_vec()), ), - aggregator_auth_token: Some("encoded".to_owned()), + aggregator_auth_token: Some(AuthenticationToken::DapAuth( + DapAuthToken::try_from(b"encoded".to_vec()).unwrap(), + )), }, &[ Token::Struct { @@ -1602,7 +1577,15 @@ mod tests { Token::StructEnd, Token::Str("aggregator_auth_token"), Token::Some, - Token::Str("encoded"), + Token::Struct { + name: "AuthenticationToken", + len: 2, + }, + Token::Str("type"), + Token::Str("DapAuth"), + Token::Str("token"), + Token::Str("ZW5jb2RlZA"), + Token::StructEnd, Token::StructEnd, ], ); @@ -1704,10 +1687,26 @@ mod tests { Token::NewtypeStruct { name: "Duration" }, Token::U64(60), Token::Str("aggregator_auth_token"), + Token::Struct { + name: "AuthenticationToken", + len: 2, + }, + Token::Str("type"), + Token::Str("DapAuth"), + Token::Str("token"), Token::Str("YWdncmVnYXRvci0xMjM0NTY3OA"), + Token::StructEnd, Token::Str("collector_auth_token"), Token::Some, + Token::Struct { + name: "AuthenticationToken", + len: 2, + }, + Token::Str("type"), + Token::Str("DapAuth"), + Token::Str("token"), Token::Str("Y29sbGVjdG9yLWFiY2RlZjAw"), + Token::StructEnd, Token::Str("collector_hpke_config"), Token::Struct { name: "HpkeConfig", diff --git a/aggregator_core/src/task.rs b/aggregator_core/src/task.rs index bfbdb255b..72e1f789f 100644 --- a/aggregator_core/src/task.rs +++ b/aggregator_core/src/task.rs @@ -1304,7 +1304,7 @@ mod tests { Token::Str("type"), Token::Str("Bearer"), Token::Str("token"), - Token::Str("YWdncmVnYXRvciB0b2tlbg=="), + Token::Str("YWdncmVnYXRvciB0b2tlbg"), Token::StructEnd, Token::SeqEnd, Token::Str("collector_auth_tokens"), diff --git a/core/src/task.rs b/core/src/task.rs index 3bb7fa626..410b3dcd3 100644 --- a/core/src/task.rs +++ b/core/src/task.rs @@ -553,13 +553,18 @@ macro_rules! vdaf_dispatch { #[non_exhaustive] pub enum AuthenticationToken { /// A bearer token. The value is an opaque byte string. Its Base64 encoding is inserted into - /// HTTP requests as specified in [RFC 6750 section 2.1][1]. The token is not necessarily an - /// OAuth token. + /// HTTP requests as specified in [RFC 6750 section 2.1][1], but in configuration files or + /// aggregator API requests, it is encoded in unpadded, URL-safe Base64. + /// + ///The token is not necessarily an OAuth token. /// /// [1]: https://datatracker.ietf.org/doc/html/rfc6750#section-2.1 Bearer( #[derivative(Debug = "ignore")] - #[serde(serialize_with = "as_base64", deserialize_with = "from_base64")] + #[serde( + serialize_with = "as_base64_url_unpadded", + deserialize_with = "from_base64_url_unpadded" + )] Vec, ), @@ -620,17 +625,22 @@ impl AsRef<[u8]> for AuthenticationToken { } /// Serialize bytes into format suitable for bearer tokens in Janus configuration files. -fn as_base64>(key: &T, serializer: S) -> Result { +fn as_base64_url_unpadded>( + key: &T, + serializer: S, +) -> Result { let bytes: &[u8] = key.as_ref(); - serializer.serialize_str(&STANDARD.encode(bytes)) + serializer.serialize_str(&URL_SAFE_NO_PAD.encode(bytes)) } /// Deserialize bytes from Janus configuration files into a bearer token. -fn from_base64<'de, D: Deserializer<'de>>(deserializer: D) -> Result, D::Error> { +fn from_base64_url_unpadded<'de, D: Deserializer<'de>>( + deserializer: D, +) -> Result, D::Error> { String::deserialize(deserializer).and_then(|s| { - STANDARD + URL_SAFE_NO_PAD .decode(s) - .map_err(|e| D::Error::custom(format!("cannot decode value from Base64: {e:?}"))) + .map_err(|e| D::Error::custom(format!("cannot decode value from Base64URL: {e:?}"))) }) } diff --git a/docs/samples/tasks.yaml b/docs/samples/tasks.yaml index 1d381cd03..c83958604 100644 --- a/docs/samples/tasks.yaml +++ b/docs/samples/tasks.yaml @@ -75,9 +75,9 @@ # to "aggregator-235242f99406c4fd28b820c32eab0f68". - type: "DapAuth" token: "YWdncmVnYXRvci0yMzUyNDJmOTk0MDZjNGZkMjhiODIwYzMyZWFiMGY2OA" - # Bearer token values are encoded in base64 with padding. + # Bearer token values are encoded in unpadded base64url. - type: "Bearer" - token: "YWdncmVnYXRvci04NDc1NjkwZjJmYzQzMDBmYjE0NmJiMjk1NDIzNDk1NA==" + token: "YWdncmVnYXRvci04NDc1NjkwZjJmYzQzMDBmYjE0NmJiMjk1NDIzNDk1NA" # Authentication tokens shared between the leader and the collector, and used # to authenticate collector-to-leader requests. For leader tasks, this has the @@ -127,7 +127,7 @@ public_key: KHRLcWgfWxli8cdOLPsgsZPttHXh0ho3vLVLrW-63lE aggregator_auth_tokens: - type: "Bearer" - token: "YWdncmVnYXRvci1jZmE4NDMyZjdkMzllMjZiYjU3OGUzMzY5Mzk1MWQzNQ==" + token: "YWdncmVnYXRvci1jZmE4NDMyZjdkMzllMjZiYjU3OGUzMzY5Mzk1MWQzNQ" # Note that this task does not have any collector authentication tokens, since # it is a helper role task. collector_auth_tokens: [] diff --git a/integration_tests/tests/in_cluster.rs b/integration_tests/tests/in_cluster.rs index f84ad96a5..7339645ab 100644 --- a/integration_tests/tests/in_cluster.rs +++ b/integration_tests/tests/in_cluster.rs @@ -6,9 +6,9 @@ use base64::engine::{ }; use common::{submit_measurements_and_verify_aggregate, test_task_builders}; use janus_aggregator_core::task::QueryType; -use janus_collector::AuthenticationToken; use janus_core::{ - task::{DapAuthToken, VdafInstance}, + task::AuthenticationToken, + task::VdafInstance, test_util::{ install_test_trace_subscriber, kubernetes::{Cluster, PortForward}, @@ -170,14 +170,10 @@ impl InClusterJanusPair { // Update the task parameters with the ID and collector auth token from divviup-api. task_parameters.task_id = TaskId::from_str(provisioned_task.id.as_ref()).unwrap(); - task_parameters.collector_auth_token = AuthenticationToken::DapAuth( - DapAuthToken::try_from( + task_parameters.collector_auth_token = AuthenticationToken::Bearer( URL_SAFE_NO_PAD .decode(collector_auth_tokens[0].clone()) - .unwrap(), - ) - .unwrap(), - ); + .unwrap()); Self { task_parameters,