Skip to content

Commit

Permalink
aggregator API: default to bearer tokens
Browse files Browse the repository at this point in the history
Previously, the aggregator API assumed that any aggregator or collector
auth tokens it was handling were `AuthenticationToken::DapAuth`. Janus
must support those for the interop testing API and to work with Daphne,
but generally, we prefer to use more boring `Authorization: Bearer`
tokens. Certainly any tasks provisioned via the aggregator API and
`divviup-api` should use bearer tokens.

With this PR, we augment the `PostTaskReq` and `PostTaskResp` messages
so that they use `AuthenticationToken` to represent aggregator and
collector tokens, meaning that on the wire, a token now looks like:

```
{
  "type": "Bearer",
  "token": "AAAAAAAAA<etc.>"
}
```

Also, when the aggregator API generates either kind of token, it now
generates a bearer token. I'm not aware of any scenarios where we need
to generate `DapAuth` tokens in the aggregator API so there is no
affordance for requesting that kind of token.

See #472
  • Loading branch information
tgeoghegan committed Jul 26, 2023
1 parent 617f566 commit 7323175
Show file tree
Hide file tree
Showing 5 changed files with 74 additions and 69 deletions.
97 changes: 48 additions & 49 deletions aggregator_api/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,7 @@ use janus_aggregator_core::{
SecretBytes,
};
use janus_core::{
hpke::generate_hpke_config_and_private_key,
http::extract_bearer_token,
task::{AuthenticationToken, DapAuthToken},
time::Clock,
hpke::generate_hpke_config_and_private_key, http::extract_bearer_token, time::Clock,
};
use janus_messages::{Duration, HpkeAeadId, HpkeKdfId, HpkeKemId, Role, TaskId};
use models::{GetTaskMetricsResp, TaskResp};
Expand Down Expand Up @@ -195,6 +192,7 @@ async fn post_task<C: Clock>(
// struct `aggregator_core::task::Task` expects to get two aggregator endpoint URLs, but only
// the one for the peer aggregator is in the incoming request (or for that matter, is ever used
// by Janus), so we insert a fake URL for "self".
// TODO(#1524): clean this up with `aggregator_core::task::Task` changes
// unwrap safety: this fake URL is valid
let fake_aggregator_url = Url::parse("http://never-used.example.com").unwrap();
let aggregator_endpoints = match req.role {
Expand Down Expand Up @@ -230,33 +228,16 @@ async fn post_task<C: Clock>(

let vdaf_verify_keys = Vec::from([SecretBytes::new(vdaf_verify_key_bytes)]);

let aggregator_auth_tokens = match req.role {
let (aggregator_auth_tokens, collector_auth_tokens) = match req.role {
Role::Leader => {
let encoded = req.aggregator_auth_token.as_ref().ok_or_else(|| {
let aggregator_auth_token = req.aggregator_auth_token.ok_or_else(|| {
Error::new(
"aggregator acting in leader role must be provided an aggregator auth token"
.to_string(),
Status::BadRequest,
)
})?;
let token_bytes = URL_SAFE_NO_PAD.decode(encoded).map_err(|err| {
Error::new(
format!("Invalid base64 value for aggregator_auth_token: {err}"),
Status::BadRequest,
)
})?;
Vec::from([
// TODO(#472): Each token in the PostTaskReq should indicate whether it is a bearer
// token or a DAP-Auth-Token. For now, assume the latter.
DapAuthToken::try_from(token_bytes)
.map(AuthenticationToken::DapAuth)
.map_err(|_| {
Error::new(
"Invalid HTTP header value in aggregator_auth_token".to_string(),
Status::BadRequest,
)
})?,
])
(Vec::from([aggregator_auth_token]), Vec::from([random()]))
}

Role::Helper => {
Expand All @@ -267,19 +248,13 @@ async fn post_task<C: Clock>(
Status::BadRequest,
));
}
// TODO(#472): switch to generating bearer tokens by default
Vec::from([AuthenticationToken::DapAuth(random())])

(Vec::from([random()]), Vec::new())
}

_ => unreachable!(),
};

let collector_auth_tokens = match req.role {
// TODO(#472): switch to generating bearer tokens by default
Role::Leader => Vec::from([AuthenticationToken::DapAuth(random())]),
Role::Helper => Vec::new(),
_ => unreachable!(),
};
let hpke_keys = Vec::from([generate_hpke_config_and_private_key(
random(),
HpkeKemId::X25519HkdfSha256,
Expand Down Expand Up @@ -404,7 +379,7 @@ async fn get_task_metrics<C: Clock>(
mod models {
use base64::{engine::general_purpose::URL_SAFE_NO_PAD, Engine};
use janus_aggregator_core::task::{QueryType, Task};
use janus_core::task::VdafInstance;
use janus_core::task::{AuthenticationToken, VdafInstance};
use janus_messages::{Duration, HpkeConfig, Role, TaskId, Time};
use serde::{Deserialize, Serialize};
use url::Url;
Expand Down Expand Up @@ -441,10 +416,10 @@ mod models {
pub(crate) time_precision: Duration,
/// HPKE configuration for the collector.
pub(crate) collector_hpke_config: HpkeConfig,
/// If this aggregator is the leader, this is the bearer token to use to authenticate
/// requests to the helper, as Base64 encoded bytes. If this aggregator is the helper, the
/// value is `None`.
pub(crate) aggregator_auth_token: Option<String>,
/// If this aggregator is the leader, this is the token to use to authenticate requests to
/// the helper, as Base64 encoded bytes. If this aggregator is the helper, the value is
/// `None`.
pub(crate) aggregator_auth_token: Option<AuthenticationToken>,
}

#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
Expand Down Expand Up @@ -484,13 +459,13 @@ mod models {
/// requests from the Leader.
// TODO(#1509): This field will have to change as Janus helpers will only store a salted
// hash of aggregator auth tokens.
pub(crate) aggregator_auth_token: String,
pub(crate) aggregator_auth_token: AuthenticationToken,
/// The authentication token used by the task's Collector to authenticate to the Leader, as
/// Base64 encoded bytes.
/// `Some` if `role` is Leader, `None` otherwise.
// TODO(#1509) This field will have to change as Janus leaders will only store a salted hash
// of collector auth tokens.
pub(crate) collector_auth_token: Option<String>,
pub(crate) collector_auth_token: Option<AuthenticationToken>,
/// HPKE configuration used by the collector to decrypt aggregate shares.
pub(crate) collector_hpke_config: HpkeConfig,
/// HPKE configuration(s) used by this aggregator to decrypt report shares.
Expand Down Expand Up @@ -526,9 +501,8 @@ mod models {
Role::Leader => {
if task.collector_auth_tokens().len() != 1 {
return Err("illegal number of collector auth tokens in task");
} else {
Some(URL_SAFE_NO_PAD.encode(task.collector_auth_tokens()[0].as_ref()))
}
Some(task.collector_auth_tokens()[0].clone())
}
Role::Helper => None,
_ => return Err("illegal aggregator role in task"),
Expand All @@ -554,8 +528,7 @@ mod models {
min_batch_size: task.min_batch_size(),
time_precision: *task.time_precision(),
tolerable_clock_skew: *task.tolerable_clock_skew(),
aggregator_auth_token: URL_SAFE_NO_PAD
.encode(task.aggregator_auth_tokens()[0].as_ref()),
aggregator_auth_token: task.aggregator_auth_tokens()[0].clone(),
collector_auth_token,
collector_hpke_config: task.collector_hpke_config().clone(),
aggregator_hpke_configs,
Expand Down Expand Up @@ -781,7 +754,7 @@ mod tests {
)
.config()
.clone(),
aggregator_auth_token: Some(URL_SAFE_NO_PAD.encode(&aggregator_auth_token)),
aggregator_auth_token: Some(aggregator_auth_token),
};
assert_response!(
post("/tasks")
Expand Down Expand Up @@ -825,7 +798,7 @@ mod tests {
)
.config()
.clone(),
aggregator_auth_token: Some(URL_SAFE_NO_PAD.encode(&aggregator_auth_token)),
aggregator_auth_token: Some(aggregator_auth_token),
};
assert_response!(
post("/tasks")
Expand Down Expand Up @@ -947,7 +920,7 @@ mod tests {
)
.config()
.clone(),
aggregator_auth_token: Some(URL_SAFE_NO_PAD.encode(&aggregator_auth_token)),
aggregator_auth_token: Some(aggregator_auth_token),
};
assert_response!(
post("/tasks")
Expand Down Expand Up @@ -993,7 +966,7 @@ mod tests {
)
.config()
.clone(),
aggregator_auth_token: Some(URL_SAFE_NO_PAD.encode(&aggregator_auth_token)),
aggregator_auth_token: Some(aggregator_auth_token.clone()),
};
let mut conn = post("/tasks")
.with_request_body(serde_json::to_vec(&req).unwrap())
Expand Down Expand Up @@ -1527,7 +1500,9 @@ mod tests {
HpkeAeadId::Aes128Gcm,
HpkePublicKey::from([0u8; 32].to_vec()),
),
aggregator_auth_token: Some("encoded".to_owned()),
aggregator_auth_token: Some(AuthenticationToken::DapAuth(
DapAuthToken::try_from(b"encoded".to_vec()).unwrap(),
)),
},
&[
Token::Struct {
Expand Down Expand Up @@ -1602,7 +1577,15 @@ mod tests {
Token::StructEnd,
Token::Str("aggregator_auth_token"),
Token::Some,
Token::Str("encoded"),
Token::Struct {
name: "AuthenticationToken",
len: 2,
},
Token::Str("type"),
Token::Str("DapAuth"),
Token::Str("token"),
Token::Str("ZW5jb2RlZA"),
Token::StructEnd,
Token::StructEnd,
],
);
Expand Down Expand Up @@ -1704,10 +1687,26 @@ mod tests {
Token::NewtypeStruct { name: "Duration" },
Token::U64(60),
Token::Str("aggregator_auth_token"),
Token::Struct {
name: "AuthenticationToken",
len: 2,
},
Token::Str("type"),
Token::Str("DapAuth"),
Token::Str("token"),
Token::Str("YWdncmVnYXRvci0xMjM0NTY3OA"),
Token::StructEnd,
Token::Str("collector_auth_token"),
Token::Some,
Token::Struct {
name: "AuthenticationToken",
len: 2,
},
Token::Str("type"),
Token::Str("DapAuth"),
Token::Str("token"),
Token::Str("Y29sbGVjdG9yLWFiY2RlZjAw"),
Token::StructEnd,
Token::Str("collector_hpke_config"),
Token::Struct {
name: "HpkeConfig",
Expand Down
2 changes: 1 addition & 1 deletion aggregator_core/src/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1304,7 +1304,7 @@ mod tests {
Token::Str("type"),
Token::Str("Bearer"),
Token::Str("token"),
Token::Str("YWdncmVnYXRvciB0b2tlbg=="),
Token::Str("YWdncmVnYXRvciB0b2tlbg"),
Token::StructEnd,
Token::SeqEnd,
Token::Str("collector_auth_tokens"),
Expand Down
26 changes: 18 additions & 8 deletions core/src/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -553,13 +553,18 @@ macro_rules! vdaf_dispatch {
#[non_exhaustive]
pub enum AuthenticationToken {
/// A bearer token. The value is an opaque byte string. Its Base64 encoding is inserted into
/// HTTP requests as specified in [RFC 6750 section 2.1][1]. The token is not necessarily an
/// OAuth token.
/// HTTP requests as specified in [RFC 6750 section 2.1][1], but in configuration files or
/// aggregator API requests, it is encoded in unpadded, URL-safe Base64.
///
///The token is not necessarily an OAuth token.
///
/// [1]: https://datatracker.ietf.org/doc/html/rfc6750#section-2.1
Bearer(
#[derivative(Debug = "ignore")]
#[serde(serialize_with = "as_base64", deserialize_with = "from_base64")]
#[serde(
serialize_with = "as_base64_url_unpadded",
deserialize_with = "from_base64_url_unpadded"
)]
Vec<u8>,
),

Expand Down Expand Up @@ -620,17 +625,22 @@ impl AsRef<[u8]> for AuthenticationToken {
}

/// Serialize bytes into format suitable for bearer tokens in Janus configuration files.
fn as_base64<S: Serializer, T: AsRef<[u8]>>(key: &T, serializer: S) -> Result<S::Ok, S::Error> {
fn as_base64_url_unpadded<S: Serializer, T: AsRef<[u8]>>(
key: &T,
serializer: S,
) -> Result<S::Ok, S::Error> {
let bytes: &[u8] = key.as_ref();
serializer.serialize_str(&STANDARD.encode(bytes))
serializer.serialize_str(&URL_SAFE_NO_PAD.encode(bytes))
}

/// Deserialize bytes from Janus configuration files into a bearer token.
fn from_base64<'de, D: Deserializer<'de>>(deserializer: D) -> Result<Vec<u8>, D::Error> {
fn from_base64_url_unpadded<'de, D: Deserializer<'de>>(
deserializer: D,
) -> Result<Vec<u8>, D::Error> {
String::deserialize(deserializer).and_then(|s| {
STANDARD
URL_SAFE_NO_PAD
.decode(s)
.map_err(|e| D::Error::custom(format!("cannot decode value from Base64: {e:?}")))
.map_err(|e| D::Error::custom(format!("cannot decode value from Base64URL: {e:?}")))
})
}

Expand Down
6 changes: 3 additions & 3 deletions docs/samples/tasks.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -75,9 +75,9 @@
# to "aggregator-235242f99406c4fd28b820c32eab0f68".
- type: "DapAuth"
token: "YWdncmVnYXRvci0yMzUyNDJmOTk0MDZjNGZkMjhiODIwYzMyZWFiMGY2OA"
# Bearer token values are encoded in base64 with padding.
# Bearer token values are encoded in unpadded base64url.
- type: "Bearer"
token: "YWdncmVnYXRvci04NDc1NjkwZjJmYzQzMDBmYjE0NmJiMjk1NDIzNDk1NA=="
token: "YWdncmVnYXRvci04NDc1NjkwZjJmYzQzMDBmYjE0NmJiMjk1NDIzNDk1NA"

# Authentication tokens shared between the leader and the collector, and used
# to authenticate collector-to-leader requests. For leader tasks, this has the
Expand Down Expand Up @@ -127,7 +127,7 @@
public_key: KHRLcWgfWxli8cdOLPsgsZPttHXh0ho3vLVLrW-63lE
aggregator_auth_tokens:
- type: "Bearer"
token: "YWdncmVnYXRvci1jZmE4NDMyZjdkMzllMjZiYjU3OGUzMzY5Mzk1MWQzNQ=="
token: "YWdncmVnYXRvci1jZmE4NDMyZjdkMzllMjZiYjU3OGUzMzY5Mzk1MWQzNQ"
# Note that this task does not have any collector authentication tokens, since
# it is a helper role task.
collector_auth_tokens: []
Expand Down
12 changes: 4 additions & 8 deletions integration_tests/tests/in_cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@ use base64::engine::{
};
use common::{submit_measurements_and_verify_aggregate, test_task_builders};
use janus_aggregator_core::task::QueryType;
use janus_collector::AuthenticationToken;
use janus_core::{
task::{DapAuthToken, VdafInstance},
task::AuthenticationToken,
task::VdafInstance,
test_util::{
install_test_trace_subscriber,
kubernetes::{Cluster, PortForward},
Expand Down Expand Up @@ -170,14 +170,10 @@ impl InClusterJanusPair {

// Update the task parameters with the ID and collector auth token from divviup-api.
task_parameters.task_id = TaskId::from_str(provisioned_task.id.as_ref()).unwrap();
task_parameters.collector_auth_token = AuthenticationToken::DapAuth(
DapAuthToken::try_from(
task_parameters.collector_auth_token = AuthenticationToken::Bearer(
URL_SAFE_NO_PAD
.decode(collector_auth_tokens[0].clone())
.unwrap(),
)
.unwrap(),
);
.unwrap());

Self {
task_parameters,
Expand Down

0 comments on commit 7323175

Please sign in to comment.