Skip to content

Commit

Permalink
update in_cluster for BYOHelper (#1532)
Browse files Browse the repository at this point in the history
Updates the `in_cluster` test harness code to use the divviup-api API
resource for managing aggregators and the automated task provisioning
flow (#1486).

Resolves #1528
  • Loading branch information
tgeoghegan authored Jun 28, 2023
1 parent 38700aa commit f9443a5
Show file tree
Hide file tree
Showing 4 changed files with 316 additions and 221 deletions.
6 changes: 3 additions & 3 deletions integration_tests/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,15 @@ rust-version.workspace = true
version.workspace = true

[features]
in-cluster = ["dep:k8s-openapi", "dep:kube", "dep:serde"]
in-cluster = ["dep:k8s-openapi", "dep:kube"]

[dependencies]
anyhow.workspace = true
backoff = { version = "0.4", features = ["tokio"] }
base64 = "0.21.2"
futures = "0.3.28"
hex = "0.4"
http = "0.2"
janus_aggregator = { workspace = true, features = ["test-util"] }
janus_aggregator_core = { workspace = true, features = ["test-util"] }
janus_client.workspace = true
Expand All @@ -28,14 +29,13 @@ kube = { workspace = true, optional = true }
prio.workspace = true
rand = "0.8"
reqwest = { version = "0.11", default-features = false, features = ["rustls-tls"] }
serde = { workspace = true, optional = true }
serde.workspace = true
serde_json = "1.0.99"
testcontainers = "0.14.0"
tokio = { version = "1", features = ["full", "tracing"] }
url = { version = "2.4.0", features = ["serde"] }

[dev-dependencies]
http = "0.2"
itertools.workspace = true
janus_collector = { workspace = true, features = ["test-util"] }
tempfile = "3"
192 changes: 192 additions & 0 deletions integration_tests/src/divviup_api_client.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,192 @@
use anyhow::anyhow;
use http::{
header::{ACCEPT, CONTENT_TYPE},
Method,
};
use janus_core::{task::VdafInstance, test_util::kubernetes::PortForward};
use serde::{de::DeserializeOwned, Deserialize, Serialize};
use serde_json::json;
use url::Url;

/// Representation of a `divviup-api` account.
#[derive(Deserialize)]
pub struct Account {
id: String,
}

/// Representation of a VDAF in `divviup-api`.
#[derive(Serialize)]
#[serde(rename_all = "snake_case", tag = "type")]
pub enum ApiVdaf {
/// Corresponds to Prio3Count
Count,
Histogram {
buckets: Vec<u64>,
},
Sum {
bits: usize,
},
}

impl TryFrom<&VdafInstance> for ApiVdaf {
type Error = anyhow::Error;

fn try_from(vdaf: &VdafInstance) -> Result<Self, Self::Error> {
match vdaf {
VdafInstance::Prio3Count => Ok(ApiVdaf::Count),
VdafInstance::Prio3Sum { bits } => Ok(ApiVdaf::Sum { bits: *bits }),
VdafInstance::Prio3Histogram { buckets } => Ok(ApiVdaf::Histogram {
buckets: buckets.clone(),
}),
_ => Err(anyhow!("unsupported VDAF: {vdaf:?}")),
}
}
}

#[derive(Serialize)]
pub struct NewTaskRequest {
pub name: String,
pub leader_aggregator_id: String,
pub helper_aggregator_id: String,
pub vdaf: ApiVdaf,
pub min_batch_size: u64,
pub max_batch_size: Option<u64>,
pub expiration: String,
pub time_precision_seconds: u64,
pub hpke_config: String,
}

/// Representation of a DAP task in responses from divviup-api. This application ignores several
/// fields that we never use.
#[derive(Deserialize)]
pub struct DivviUpApiTask {
/// DAP task ID
pub id: String,
}

/// Request to pair an aggregator with divviup-api
#[derive(Serialize)]
pub struct NewAggregatorRequest {
pub role: String,
pub name: String,
pub api_url: String,
pub dap_url: String,
/// Bearer token for authenticating requests to this aggregator's aggregator API
pub bearer_token: String,
}

/// Representation of an aggregator in responses from divviup-api. This application ignores several
/// fields that we never use.
#[derive(Deserialize)]
pub struct DivviUpAggregator {
pub id: String,
pub dap_url: Url,
}

const DIVVIUP_CONTENT_TYPE: &str = "application/vnd.divviup+json;version=0.1";

pub struct DivviupApiClient {
port_forward: PortForward,
client: reqwest::Client,
}

impl DivviupApiClient {
pub fn new(port_forward: PortForward) -> Self {
Self {
port_forward,
client: reqwest::Client::new(),
}
}

pub async fn make_request<B: Serialize, R: DeserializeOwned>(
&self,
method: Method,
path: &str,
body: Option<B>,
request_description: &str,
) -> R {
let mut builder = self
.client
.request(
method,
format!(
"http://127.0.0.1:{}/api/{path}",
self.port_forward.local_port()
),
)
.header(CONTENT_TYPE, DIVVIUP_CONTENT_TYPE)
.header(ACCEPT, DIVVIUP_CONTENT_TYPE);
if let Some(body) = body {
let body_string = serde_json::to_string(&body).unwrap();
builder = builder.body(body_string);
}

let resp = builder.send().await.unwrap();
let status = resp.status();
if !status.is_success() {
let resp_text = resp.text().await;
panic!("{request_description} request returned status code {status}, {resp_text:?}");
}

resp.json().await.unwrap()
}

pub async fn create_account(&self) -> Account {
self.make_request(
Method::POST,
"accounts",
Some(json!({"name": "Integration test account"})),
"Account creation",
)
.await
}

pub async fn pair_global_aggregator(
&self,
request: &NewAggregatorRequest,
) -> DivviUpAggregator {
self.make_request(
Method::POST,
"aggregators",
Some(request),
"Global aggregator pairing",
)
.await
}

pub async fn pair_aggregator(
&self,
account: &Account,
request: &NewAggregatorRequest,
) -> DivviUpAggregator {
self.make_request(
Method::POST,
&format!("accounts/{}/aggregators", account.id),
Some(request),
"Aggregator pairing",
)
.await
}

pub async fn create_task(&self, account: &Account, request: &NewTaskRequest) -> DivviUpApiTask {
self.make_request(
Method::POST,
&format!("accounts/{}/tasks", account.id),
Some(request),
"Task creation",
)
.await
}

pub async fn list_collector_auth_tokens(&self, task: &DivviUpApiTask) -> Vec<String> {
// Hack: we must choose some specialization for the B type despite the request having no
// Body
self.make_request::<String, _>(
Method::GET,
&format!("tasks/{}/collector_auth_tokens", task.id),
None,
"List collector auth tokens",
)
.await
}
}
1 change: 1 addition & 0 deletions integration_tests/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,6 @@

pub mod client;
pub mod daphne;
pub mod divviup_api_client;
pub mod interop_api;
pub mod janus;
Loading

0 comments on commit f9443a5

Please sign in to comment.