diff --git a/integration_tests/Cargo.toml b/integration_tests/Cargo.toml index 2d38e9e63..5f990711c 100644 --- a/integration_tests/Cargo.toml +++ b/integration_tests/Cargo.toml @@ -9,7 +9,7 @@ rust-version.workspace = true version.workspace = true [features] -in-cluster = ["dep:k8s-openapi", "dep:kube", "dep:serde"] +in-cluster = ["dep:k8s-openapi", "dep:kube"] [dependencies] anyhow.workspace = true @@ -17,6 +17,7 @@ backoff = { version = "0.4", features = ["tokio"] } base64 = "0.21.2" futures = "0.3.28" hex = "0.4" +http = "0.2" janus_aggregator = { workspace = true, features = ["test-util"] } janus_aggregator_core = { workspace = true, features = ["test-util"] } janus_client.workspace = true @@ -28,14 +29,13 @@ kube = { workspace = true, optional = true } prio.workspace = true rand = "0.8" reqwest = { version = "0.11", default-features = false, features = ["rustls-tls"] } -serde = { workspace = true, optional = true } +serde.workspace = true serde_json = "1.0.99" testcontainers = "0.14.0" tokio = { version = "1", features = ["full", "tracing"] } url = { version = "2.4.0", features = ["serde"] } [dev-dependencies] -http = "0.2" itertools.workspace = true janus_collector = { workspace = true, features = ["test-util"] } tempfile = "3" diff --git a/integration_tests/src/divviup_api_client.rs b/integration_tests/src/divviup_api_client.rs new file mode 100644 index 000000000..75623b326 --- /dev/null +++ b/integration_tests/src/divviup_api_client.rs @@ -0,0 +1,192 @@ +use anyhow::anyhow; +use http::{ + header::{ACCEPT, CONTENT_TYPE}, + Method, +}; +use janus_core::{task::VdafInstance, test_util::kubernetes::PortForward}; +use serde::{de::DeserializeOwned, Deserialize, Serialize}; +use serde_json::json; +use url::Url; + +/// Representation of a `divviup-api` account. +#[derive(Deserialize)] +pub struct Account { + id: String, +} + +/// Representation of a VDAF in `divviup-api`. +#[derive(Serialize)] +#[serde(rename_all = "snake_case", tag = "type")] +pub enum ApiVdaf { + /// Corresponds to Prio3Count + Count, + Histogram { + buckets: Vec, + }, + Sum { + bits: usize, + }, +} + +impl TryFrom<&VdafInstance> for ApiVdaf { + type Error = anyhow::Error; + + fn try_from(vdaf: &VdafInstance) -> Result { + match vdaf { + VdafInstance::Prio3Count => Ok(ApiVdaf::Count), + VdafInstance::Prio3Sum { bits } => Ok(ApiVdaf::Sum { bits: *bits }), + VdafInstance::Prio3Histogram { buckets } => Ok(ApiVdaf::Histogram { + buckets: buckets.clone(), + }), + _ => Err(anyhow!("unsupported VDAF: {vdaf:?}")), + } + } +} + +#[derive(Serialize)] +pub struct NewTaskRequest { + pub name: String, + pub leader_aggregator_id: String, + pub helper_aggregator_id: String, + pub vdaf: ApiVdaf, + pub min_batch_size: u64, + pub max_batch_size: Option, + pub expiration: String, + pub time_precision_seconds: u64, + pub hpke_config: String, +} + +/// Representation of a DAP task in responses from divviup-api. This application ignores several +/// fields that we never use. +#[derive(Deserialize)] +pub struct DivviUpApiTask { + /// DAP task ID + pub id: String, +} + +/// Request to pair an aggregator with divviup-api +#[derive(Serialize)] +pub struct NewAggregatorRequest { + pub role: String, + pub name: String, + pub api_url: String, + pub dap_url: String, + /// Bearer token for authenticating requests to this aggregator's aggregator API + pub bearer_token: String, +} + +/// Representation of an aggregator in responses from divviup-api. This application ignores several +/// fields that we never use. +#[derive(Deserialize)] +pub struct DivviUpAggregator { + pub id: String, + pub dap_url: Url, +} + +const DIVVIUP_CONTENT_TYPE: &str = "application/vnd.divviup+json;version=0.1"; + +pub struct DivviupApiClient { + port_forward: PortForward, + client: reqwest::Client, +} + +impl DivviupApiClient { + pub fn new(port_forward: PortForward) -> Self { + Self { + port_forward, + client: reqwest::Client::new(), + } + } + + pub async fn make_request( + &self, + method: Method, + path: &str, + body: Option, + request_description: &str, + ) -> R { + let mut builder = self + .client + .request( + method, + format!( + "http://127.0.0.1:{}/api/{path}", + self.port_forward.local_port() + ), + ) + .header(CONTENT_TYPE, DIVVIUP_CONTENT_TYPE) + .header(ACCEPT, DIVVIUP_CONTENT_TYPE); + if let Some(body) = body { + let body_string = serde_json::to_string(&body).unwrap(); + builder = builder.body(body_string); + } + + let resp = builder.send().await.unwrap(); + let status = resp.status(); + if !status.is_success() { + let resp_text = resp.text().await; + panic!("{request_description} request returned status code {status}, {resp_text:?}"); + } + + resp.json().await.unwrap() + } + + pub async fn create_account(&self) -> Account { + self.make_request( + Method::POST, + "accounts", + Some(json!({"name": "Integration test account"})), + "Account creation", + ) + .await + } + + pub async fn pair_global_aggregator( + &self, + request: &NewAggregatorRequest, + ) -> DivviUpAggregator { + self.make_request( + Method::POST, + "aggregators", + Some(request), + "Global aggregator pairing", + ) + .await + } + + pub async fn pair_aggregator( + &self, + account: &Account, + request: &NewAggregatorRequest, + ) -> DivviUpAggregator { + self.make_request( + Method::POST, + &format!("accounts/{}/aggregators", account.id), + Some(request), + "Aggregator pairing", + ) + .await + } + + pub async fn create_task(&self, account: &Account, request: &NewTaskRequest) -> DivviUpApiTask { + self.make_request( + Method::POST, + &format!("accounts/{}/tasks", account.id), + Some(request), + "Task creation", + ) + .await + } + + pub async fn list_collector_auth_tokens(&self, task: &DivviUpApiTask) -> Vec { + // Hack: we must choose some specialization for the B type despite the request having no + // Body + self.make_request::( + Method::GET, + &format!("tasks/{}/collector_auth_tokens", task.id), + None, + "List collector auth tokens", + ) + .await + } +} diff --git a/integration_tests/src/lib.rs b/integration_tests/src/lib.rs index d8259a4f2..2745e8553 100644 --- a/integration_tests/src/lib.rs +++ b/integration_tests/src/lib.rs @@ -2,5 +2,6 @@ pub mod client; pub mod daphne; +pub mod divviup_api_client; pub mod interop_api; pub mod janus; diff --git a/integration_tests/tests/in_cluster.rs b/integration_tests/tests/in_cluster.rs index 767ef6c74..4bd2c110c 100644 --- a/integration_tests/tests/in_cluster.rs +++ b/integration_tests/tests/in_cluster.rs @@ -1,26 +1,27 @@ #![cfg(feature = "in-cluster")] -use anyhow::anyhow; use base64::engine::{ general_purpose::{STANDARD, URL_SAFE_NO_PAD}, Engine, }; use common::{submit_measurements_and_verify_aggregate, test_task_builders}; -use http::header::{ACCEPT, CONTENT_TYPE}; -use janus_aggregator_core::task::{QueryType, Task}; +use janus_aggregator_core::task::{test_util::TaskBuilder, QueryType, Task}; +use janus_collector::AuthenticationToken; use janus_core::{ hpke::HpkePrivateKey, - task::VdafInstance, + task::{DapAuthToken, VdafInstance}, test_util::{ install_test_trace_subscriber, kubernetes::{Cluster, PortForward}, }, }; -use janus_integration_tests::client::ClientBackend; -use janus_messages::{Role, TaskId}; +use janus_integration_tests::{ + client::ClientBackend, + divviup_api_client::{DivviupApiClient, NewAggregatorRequest, NewTaskRequest}, +}; +use janus_messages::TaskId; use prio::codec::Encode; -use serde::{Deserialize, Serialize}; -use std::env; +use std::{env, str::FromStr}; use url::Url; mod common; @@ -47,103 +48,161 @@ impl InClusterJanusPair { /// needed to connect to the cluster. /// - `JANUS_E2E_KUBECTL_CONTEXT_NAME`: The name of a context in the kubeconfig file. /// - `JANUS_E2E_LEADER_NAMESPACE`: The Kubernetes namespace where the DAP leader is deployed. - /// - `JANUS_E2E_LEADER_API_NAMESPACE`: The Kubernetes namespace where the leader's instance of - /// divviup-api is deployed. + /// - `JANUS_E2E_LEADER_AGGREGATOR_API_AUTH_TOKEN`: Credential with which requests to the + /// leader's aggregator API are authenticated. /// - `JANUS_E2E_HELPER_NAMESPACE`: The Kubernetes namespace where the DAP helper is deployed. - /// - `JANUS_E2E_HELPER_API_NAMESPACE`: The Kubernetes namespace where the helper's instance of - /// divviup-api is deployed. + /// - `JANUS_E2E_HELPER_AGGREGATOR_API_AUTH_TOKEN`: Credential with which requests to the + /// helper's aggregator API are authenticated. + /// - `JANUS_E2E_DIVVIUP_API_NAMESPACE`: The Kubernetes namespace where `divviup-api` is + /// deployed. async fn new(vdaf: VdafInstance, query_type: QueryType) -> Self { - let (collector_private_key, mut leader_task, mut helper_task) = - test_task_builders(vdaf, query_type); - leader_task = leader_task.with_min_batch_size(100); - helper_task = helper_task.with_min_batch_size(100); - let ( kubeconfig_path, kubectl_context_name, leader_namespace, - leader_api_namespace, + leader_aggregator_api_auth_token, helper_namespace, - helper_api_namespace, + helper_aggregator_api_auth_token, + divviup_api_namespace, ) = match ( env::var("JANUS_E2E_KUBE_CONFIG_PATH"), env::var("JANUS_E2E_KUBECTL_CONTEXT_NAME"), env::var("JANUS_E2E_LEADER_NAMESPACE"), - env::var("JANUS_E2E_LEADER_API_NAMESPACE"), + env::var("JANUS_E2E_LEADER_AGGREGATOR_API_AUTH_TOKEN"), env::var("JANUS_E2E_HELPER_NAMESPACE"), - env::var("JANUS_E2E_HELPER_API_NAMESPACE"), + env::var("JANUS_E2E_HELPER_AGGREGATOR_API_AUTH_TOKEN"), + env::var("JANUS_E2E_DIVVIUP_API_NAMESPACE"), ) { ( Ok(kubeconfig_path), Ok(kubectl_context_name), Ok(leader_namespace), - Ok(leader_api_namespace), + Ok(leader_aggregator_api_auth_token), Ok(helper_namespace), - Ok(helper_api_namespace), + Ok(helper_aggregator_api_auth_token), + Ok(divviup_api_namespace), ) => ( kubeconfig_path, kubectl_context_name, leader_namespace, - leader_api_namespace, + leader_aggregator_api_auth_token, helper_namespace, - helper_api_namespace, + helper_aggregator_api_auth_token, + divviup_api_namespace, ), _ => panic!("missing or invalid environment variables"), }; let cluster = Cluster::new(&kubeconfig_path, &kubectl_context_name); + let (collector_private_key, task_builder, _) = test_task_builders(vdaf, query_type); + let task = task_builder.with_min_batch_size(100).build(); + // From outside the cluster, the aggregators are reached at a dynamically allocated port on // localhost. When the aggregators talk to each other, they do so in the cluster's network, // so they need the in-cluster DNS name of the other aggregator, and they can use well-known - // service port numbers. The leader uses its view of its own endpoint URL to construct - // collection job URIs, so we will only patch each aggregator's view of its peer's endpoint. - let leader_endpoints = { - let mut endpoints = leader_task.aggregator_endpoints().to_vec(); - endpoints[1] = Self::in_cluster_aggregator_url(&helper_namespace); - endpoints - }; - let leader_task = leader_task - .with_aggregator_endpoints(leader_endpoints) - .build(); - let leader = InClusterJanus::new( - &cluster, - &leader_namespace, - &leader_api_namespace, - &leader_task, - ) - .await; - - let helper_endpoints = { - let mut endpoints = helper_task.aggregator_endpoints().to_vec(); - endpoints[0] = Self::in_cluster_aggregator_url(&leader_namespace); - endpoints + // service port numbers. + + let divviup_api = DivviupApiClient::new( + cluster + .forward_port(&divviup_api_namespace, "divviup-api", 80) + .await, + ); + + // Create an account first. (We should be implicitly logged in as a testing user already, + // assuming divviup-api was built with the integration-testing feature) + let account = divviup_api.create_account().await; + + // Pair the aggregators. The same Janus instances will get paired multiple times across + // multiple tests, but it's to a different divviup-api account each time, so that's + // harmless. The leader aggregator is paired as a *global* aggregator using the admin + // endpoint. We do this for two reasons: + // + // - setting up tasks with one global aggregator and one per-account aggregator is most + // representative of the subscriber use cases Divvi Up supports, + // - pairing a global aggregator implictly marks it as "first-party" in divviup-api, which + // is necessary for the task we later provision to pass a validity check. + let paired_leader_aggregator = divviup_api + .pair_global_aggregator(&NewAggregatorRequest { + role: "Leader".to_string(), + name: "leader".to_string(), + api_url: Self::in_cluster_aggregator_api_url(&leader_namespace).to_string(), + dap_url: Self::in_cluster_aggregator_dap_url(&leader_namespace).to_string(), + bearer_token: leader_aggregator_api_auth_token, + }) + .await; + + let paired_helper_aggregator = divviup_api + .pair_aggregator( + &account, + &NewAggregatorRequest { + role: "Helper".to_string(), + name: "helper".to_string(), + api_url: Self::in_cluster_aggregator_api_url(&helper_namespace).to_string(), + dap_url: Self::in_cluster_aggregator_dap_url(&helper_namespace).to_string(), + bearer_token: helper_aggregator_api_auth_token, + }, + ) + .await; + + let provision_task_request = NewTaskRequest { + name: "Integration test task".to_string(), + leader_aggregator_id: paired_leader_aggregator.id, + helper_aggregator_id: paired_helper_aggregator.id, + vdaf: task.vdaf().try_into().unwrap(), + min_batch_size: task.min_batch_size(), + max_batch_size: match task.query_type() { + QueryType::TimeInterval => None, + QueryType::FixedSize { max_batch_size } => Some(*max_batch_size), + }, + expiration: "3000-01-01T00:00:00Z".to_owned(), + time_precision_seconds: task.time_precision().as_seconds(), + hpke_config: STANDARD.encode(task.collector_hpke_config().get_encoded()), }; - let helper_task = helper_task - .with_aggregator_endpoints(helper_endpoints) + + // Provision the task into both aggregators via divviup-api + let provisioned_task = divviup_api + .create_task(&account, &provision_task_request) + .await; + + let collector_auth_tokens = divviup_api + .list_collector_auth_tokens(&provisioned_task) + .await; + + // Update the task with the ID and collector auth token from divviup-api. + let task = TaskBuilder::from(task) + .with_id(TaskId::from_str(provisioned_task.id.as_ref()).unwrap()) + .with_collector_auth_tokens(Vec::from([AuthenticationToken::DapAuth( + DapAuthToken::try_from( + URL_SAFE_NO_PAD + .decode(collector_auth_tokens[0].clone()) + .unwrap(), + ) + .unwrap(), + )])) .build(); - let helper = InClusterJanus::new( - &cluster, - &helper_namespace, - &helper_api_namespace, - &helper_task, - ) - .await; Self { - leader_task, + leader_task: task, collector_private_key, - leader, - helper, + leader: InClusterJanus::new(&cluster, &leader_namespace).await, + helper: InClusterJanus::new(&cluster, &helper_namespace).await, } } - fn in_cluster_aggregator_url(namespace: &str) -> Url { + fn in_cluster_aggregator_dap_url(namespace: &str) -> Url { Url::parse(&format!( "http://aggregator.{namespace}.svc.cluster.local:80" )) .unwrap() } + + fn in_cluster_aggregator_api_url(namespace: &str) -> Url { + Url::parse(&format!( + "http://aggregator-api.{namespace}.svc.cluster.local:8081" + )) + .unwrap() + } } struct InClusterJanus { @@ -153,16 +212,7 @@ struct InClusterJanus { impl InClusterJanus { /// Set up a port forward to an existing Janus instance in a Kubernetes cluster, and provision a /// DAP task in it via divviup-api. - async fn new( - cluster: &Cluster, - aggregator_namespace: &str, - control_plane_namespace: &str, - task: &Task, - ) -> Self { - let divviup_api_port_forward = cluster - .forward_port(control_plane_namespace, "divviup-api", 80) - .await; - divviup_api_create_task(&divviup_api_port_forward, task).await; + async fn new(cluster: &Cluster, aggregator_namespace: &str) -> Self { let aggregator_port_forward = cluster .forward_port(aggregator_namespace, "aggregator", 80) .await; @@ -176,154 +226,6 @@ impl InClusterJanus { } } -// Serialization/deserialization helper structures for interaction with divviup-api. - -#[derive(Deserialize)] -struct Account { - id: String, -} - -#[derive(Serialize)] -struct Histogram { - buckets: Vec, -} - -#[derive(Serialize)] -struct Sum { - bits: usize, -} - -#[derive(Serialize)] -#[serde(rename_all = "snake_case", tag = "type")] -enum ApiVdaf { - Count, - Histogram(Histogram), - Sum(Sum), -} - -impl TryFrom<&VdafInstance> for ApiVdaf { - type Error = anyhow::Error; - - fn try_from(vdaf: &VdafInstance) -> Result { - match vdaf { - VdafInstance::Prio3Count => Ok(ApiVdaf::Count), - VdafInstance::Prio3Sum { bits } => Ok(ApiVdaf::Sum(Sum { bits: *bits })), - VdafInstance::Prio3Histogram { buckets } => Ok(ApiVdaf::Histogram(Histogram { - buckets: buckets.clone(), - })), - _ => Err(anyhow!("unsupported VDAF: {vdaf:?}")), - } - } -} - -#[derive(Serialize)] -struct NewTaskRequest { - name: String, - id: TaskId, - partner_url: String, - vdaf: ApiVdaf, - min_batch_size: u64, - max_batch_size: Option, - is_leader: bool, - vdaf_verify_key: String, - expiration: String, - time_precision_seconds: u64, - hpke_config: String, - aggregator_auth_token: String, - collector_auth_token: Option, -} - -impl TryFrom<&Task> for NewTaskRequest { - type Error = anyhow::Error; - - fn try_from(task: &Task) -> Result { - let partner_url = match task.role() { - Role::Leader => task.aggregator_endpoints()[Role::Helper.index().unwrap()].to_string(), - Role::Helper => task.aggregator_endpoints()[Role::Leader.index().unwrap()].to_string(), - _ => unreachable!(), - }; - let max_batch_size = match task.query_type() { - QueryType::TimeInterval => None, - QueryType::FixedSize { max_batch_size } => Some(*max_batch_size), - }; - let collector_auth_token = if *task.role() == Role::Leader { - Some(URL_SAFE_NO_PAD.encode(task.primary_collector_auth_token().as_ref())) - } else { - None - }; - Ok(Self { - name: format!("Integration test task: {task:?}"), - id: *task.id(), - partner_url, - max_batch_size, - vdaf: task.vdaf().try_into()?, - min_batch_size: task.min_batch_size(), - is_leader: *task.role() == Role::Leader, - vdaf_verify_key: URL_SAFE_NO_PAD.encode(&task.vdaf_verify_keys()[0]), - expiration: "3000-01-01T00:00:00Z".to_owned(), - time_precision_seconds: task.time_precision().as_seconds(), - hpke_config: STANDARD.encode(task.collector_hpke_config().get_encoded()), - aggregator_auth_token: URL_SAFE_NO_PAD - .encode(task.primary_aggregator_auth_token().as_ref()), - collector_auth_token, - }) - } -} - -const DIVVIUP_CONTENT_TYPE: &str = "application/vnd.divviup+json;version=0.1"; - -async fn divviup_api_create_task(port_forward: &PortForward, task: &Task) { - // TODO(#1528): divviup-api is responsible for provisioning the task into both aggregators. This - // will need to adopt its new task creation message. - - let client = reqwest::Client::new(); - // Create an account first. (We should be implicitly logged in as a testing user already, - // assuming divviup-api was built with the integration-testing feature) - let create_account_resp = client - .post(&format!( - "http://127.0.0.1:{}/api/accounts", - port_forward.local_port() - )) - .header(CONTENT_TYPE, DIVVIUP_CONTENT_TYPE) - .header(ACCEPT, DIVVIUP_CONTENT_TYPE) - .body("{\"name\":\"Integration test account\"}") - .send() - .await - .unwrap(); - let create_account_status = create_account_resp.status(); - if !create_account_status.is_success() { - let response_text_res = create_account_resp.text().await; - panic!( - "Account creation request returned status code {}, {:?}", - create_account_status, response_text_res - ); - } - let account = create_account_resp.json::().await.unwrap(); - - // Create the task within the new account. - let request = NewTaskRequest::try_from(task).unwrap(); - let create_task_resp = client - .post(format!( - "http://127.0.0.1:{}/api/accounts/{}/tasks", - port_forward.local_port(), - account.id - )) - .json(&request) - .header(CONTENT_TYPE, DIVVIUP_CONTENT_TYPE) - .header(ACCEPT, DIVVIUP_CONTENT_TYPE) - .send() - .await - .unwrap(); - let create_task_status = create_task_resp.status(); - if !create_task_status.is_success() { - let response_text_res = create_task_resp.text().await; - panic!( - "Task creation request returned status code {}, {:?}", - create_task_status, response_text_res - ); - } -} - #[tokio::test(flavor = "multi_thread")] async fn in_cluster_count() { install_test_trace_subscriber();