Skip to content

Commit

Permalink
Rewrite in-cluster integration test with divviup-api (#1383)
Browse files Browse the repository at this point in the history
  • Loading branch information
divergentdave authored May 19, 2023
1 parent 60e0de1 commit a060bcd
Show file tree
Hide file tree
Showing 14 changed files with 455 additions and 262 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ janus_messages = { version = "0.4", path = "messages" }
k8s-openapi = { version = "0.18.0", features = ["v1_24"] } # keep this version in sync with what is referenced by the indirect dependency via `kube`
kube = { version = "0.82.2", default-features = false, features = ["client", "rustls-tls"] }
prio = { version = "0.12.1", features = ["multithreaded"] }
serde = { version = "1.0.163", features = ["derive"] }
rstest = "0.17.0"
trillium = "0.2.9"
trillium-api = { version = "0.2.0-rc.3", default-features = false }
Expand Down
2 changes: 1 addition & 1 deletion aggregator/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ regex = "1"
reqwest = { version = "0.11.17", default-features = false, features = ["rustls-tls", "json"] }
ring = "0.16.20"
routefinder = "0.5.3"
serde = { version = "1.0.163", features = ["derive"] }
serde.workspace = true
serde_json = "1.0.96"
serde_urlencoded = "0.7.1"
serde_yaml = "0.9.21"
Expand Down
2 changes: 1 addition & 1 deletion aggregator_api/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ opentelemetry = "0.18"
querystring = "1.1.0"
rand = { version = "0.8", features = ["min_const_gen"] }
ring = "0.16.20"
serde = { version = "1.0.163", features = ["derive"] }
serde.workspace = true
serde_json = "1.0.96"
serde_test = "1.0.163"
tracing = "0.1.37"
Expand Down
2 changes: 1 addition & 1 deletion aggregator_core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ rand = { version = "0.8", features = ["min_const_gen"] }
regex = "1"
reqwest = { version = "0.11.17", default-features = false, features = ["rustls-tls", "json"] }
ring = "0.16.20"
serde = { version = "1.0.163", features = ["derive"] }
serde.workspace = true
serde_json = "1.0.96"
serde_yaml = "0.9.21"
sqlx = { version = "0.6.3", optional = true, features = ["runtime-tokio-rustls", "migrate", "postgres"] }
Expand Down
2 changes: 1 addition & 1 deletion core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ prio.workspace = true
rand = "0.8"
reqwest = { version = "0.11.17", default-features = false, features = ["rustls-tls", "json"] }
ring = "0.16.20"
serde = { version = "1.0.163", features = ["derive"] }
serde.workspace = true
serde_json = { version = "1.0.96", optional = true }
serde_yaml = "0.9.21"
stopper = { version = "0.2.0", optional = true }
Expand Down
11 changes: 4 additions & 7 deletions integration_tests/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,7 @@ rust-version.workspace = true
version.workspace = true

[features]
# This feature is deprecated, and will be removed in the future, once janus-ops
# no longer depends on it. Rustls is now used unconditionally for Kubernetes
# connections.
kube-openssl = []
in-cluster = ["dep:k8s-openapi", "dep:kube", "dep:serde"]

[dependencies]
anyhow.workspace = true
Expand All @@ -26,15 +23,15 @@ janus_client.workspace = true
janus_core = { workspace = true, features = ["test-util"] }
janus_interop_binaries = { workspace = true, features = ["testcontainer"] }
janus_messages.workspace = true
k8s-openapi.workspace = true
kube.workspace = true
k8s-openapi = { workspace = true, optional = true }
kube = { workspace = true, optional = true }
prio.workspace = true
rand = "0.8"
reqwest = { version = "0.11", default-features = false, features = ["rustls-tls"] }
serde = { workspace = true, optional = true }
serde_json = "1.0.96"
testcontainers = "0.14.0"
tokio = { version = "1", features = ["full", "tracing"] }
tracing = "0.1.37"
url = { version = "2.3.1", features = ["serde"] }

[dev-dependencies]
Expand Down
179 changes: 29 additions & 150 deletions integration_tests/src/janus.rs
Original file line number Diff line number Diff line change
@@ -1,54 +1,27 @@
//! Functionality for tests interacting with Janus (<https://github.com/divviup/janus>).

use crate::interop_api;
use janus_aggregator::{
binary_utils::{database_pool, datastore},
config::DbConfig,
};
use janus_aggregator_core::task::Task;
use janus_core::{
test_util::kubernetes::{Cluster, PortForward},
time::RealClock,
};
use janus_interop_binaries::{
log_export_path, test_util::await_http_server, testcontainer::Aggregator,
};
use janus_messages::Role;
use k8s_openapi::api::core::v1::Secret;
use std::{
path::Path,
process::{Command, Stdio},
thread::panicking,
};
use testcontainers::{clients::Cli, Container, RunnableImage};
use tracing::debug;
use url::Url;

/// Represents a running Janus test instance
#[allow(clippy::large_enum_variant)]
pub enum Janus<'a> {
/// Janus components are spawned in a container, and completely destroyed once the test ends.
Container {
role: Role,
container: Container<'a, Aggregator>,
},

/// Janus components are assumed to already be running in the Kubernetes cluster. Running tests
/// against the cluster will persistently mutate the Janus deployment, for instance by writing
/// new tasks and reports into its datastore.
KubernetesCluster {
aggregator_port_forward: PortForward,
},
/// Represents a running Janus test instance in a container.
pub struct Janus<'a> {
role: Role,
container: Container<'a, Aggregator>,
}

impl<'a> Janus<'a> {
/// Create & start a new hermetic Janus test instance in the given Docker network, configured
/// Create and start a new hermetic Janus test instance in the given Docker network, configured
/// to service the given task. The aggregator port is also exposed to the host.
pub async fn new_in_container(
container_client: &'a Cli,
network: &str,
task: &Task,
) -> Janus<'a> {
pub async fn new(container_client: &'a Cli, network: &str, task: &Task) -> Janus<'a> {
// Start the Janus interop aggregator container running.
let endpoint = task.aggregator_url(task.role()).unwrap();
let container = container_client.run(
Expand All @@ -64,139 +37,45 @@ impl<'a> Janus<'a> {
// Write the given task to the Janus instance we started.
interop_api::aggregator_add_task(port, task.clone()).await;

Self::Container {
Self {
role: *task.role(),
container,
}
}

/// Returns the port of the aggregator on the host.
pub fn port(&self) -> u16 {
match self {
Janus::Container { container, .. } => {
container.get_host_port_ipv4(Aggregator::INTERNAL_SERVING_PORT)
}
Janus::KubernetesCluster {
aggregator_port_forward,
..
} => aggregator_port_forward.local_port(),
}
}
}

impl Janus<'static> {
/// Set up a test case running in a Kubernetes cluster where Janus components and a datastore
/// are assumed to already be deployed.
pub async fn new_with_kubernetes_cluster<P>(
kubeconfig_path: P,
kubernetes_context_name: &str,
namespace: &str,
task: &Task,
) -> Janus<'static>
where
P: AsRef<Path>,
{
let cluster = Cluster::new(kubeconfig_path, kubernetes_context_name);

// Read the Postgres password and the datastore encryption key from Kubernetes secrets
let secrets_api: kube::Api<Secret> =
kube::Api::namespaced(cluster.client().await, namespace);

let database_password_secret = secrets_api.get("postgresql").await.unwrap();
let database_password = String::from_utf8(
database_password_secret
.data
.unwrap()
.get("postgres-password")
.unwrap()
.0
.clone(),
)
.unwrap();

let datastore_key_secret = secrets_api.get("datastore-key").await.unwrap();
let datastore_key = String::from_utf8(
datastore_key_secret
.data
.unwrap()
.get("datastore_key")
.unwrap()
.0
.clone(),
)
.unwrap();

// Forward database port so we can provision the task. We assume here that there is a
// service named "postgresql" listening on port 5432. We could instead look up the service
// by some label and dynamically discover its port, but being coupled to a label value isn't
// much different than being coupled to a service name.
let datastore_port_forward = cluster.forward_port(namespace, "postgresql", 5432).await;
let local_db_port = datastore_port_forward.local_port();
debug!("forwarded DB port");

let pool = database_pool(
&DbConfig {
url: Url::parse(&format!(
"postgres://postgres:{database_password}@127.0.0.1:{local_db_port}/postgres"
))
.unwrap(),
connection_pool_timeouts_secs: 60,
check_schema_version: true,
},
None,
)
.await
.unwrap();

// Since the Janus components are already running when the task is provisioned, they all
// must be configured to frequently poll the datastore for new tasks, or the test that
// depends on this task being defined will likely time out or otherwise fail.
// This should become more robust in the future when we implement dynamic task provisioning
// (#44).
let datastore = datastore(pool, RealClock::default(), &[datastore_key], true)
.await
.unwrap();
datastore.put_task(task).await.unwrap();

let aggregator_port_forward = cluster.forward_port(namespace, "aggregator", 80).await;

Self::KubernetesCluster {
aggregator_port_forward,
}
self.container
.get_host_port_ipv4(Aggregator::INTERNAL_SERVING_PORT)
}
}

impl<'a> Drop for Janus<'a> {
fn drop(&mut self) {
// We assume that if a Janus value is dropped during a panic, we are in the middle of
// test failure. In this case, export logs if log_export_path() suggests doing so.
//
// (log export is a no-op for non-containers: when running tests against a cluster, we
// gather up logfiles with `kind export logs`)

if let Janus::Container { role, container } = self {
if !panicking() {
return;
}
if let Some(mut destination_path) = log_export_path() {
destination_path.push(format!("{}-{}", role, container.id()));
if let Ok(docker_cp_status) = Command::new("docker")
.args([
"cp",
&format!("{}:logs/", container.id()),
destination_path.as_os_str().to_str().unwrap(),
])
.stdin(Stdio::null())
.stdout(Stdio::null())
.stderr(Stdio::null())
.status()
{
if !docker_cp_status.success() {
println!("`docker cp` failed with status {docker_cp_status:?}");
}
} else {
println!("Failed to execute `docker cp`");
if !panicking() {
return;
}
if let Some(mut destination_path) = log_export_path() {
destination_path.push(format!("{}-{}", self.role, self.container.id()));
if let Ok(docker_cp_status) = Command::new("docker")
.args([
"cp",
&format!("{}:logs/", self.container.id()),
destination_path.as_os_str().to_str().unwrap(),
])
.stdin(Stdio::null())
.stdout(Stdio::null())
.stderr(Stdio::null())
.status()
{
if !docker_cp_status.success() {
println!("`docker cp` failed with status {docker_cp_status:?}");
}
} else {
println!("Failed to execute `docker cp`");
}
}
}
Expand Down
4 changes: 2 additions & 2 deletions integration_tests/tests/daphne.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ async fn daphne_janus() {

let container_client = container_client();
let leader = Daphne::new(&container_client, &network, &leader_task).await;
let helper = Janus::new_in_container(&container_client, &network, &helper_task).await;
let helper = Janus::new(&container_client, &network, &helper_task).await;

// Run the behavioral test.
submit_measurements_and_verify_aggregate(
Expand Down Expand Up @@ -70,7 +70,7 @@ async fn janus_daphne() {
.unwrap();

let container_client = container_client();
let leader = Janus::new_in_container(&container_client, &network, &leader_task).await;
let leader = Janus::new(&container_client, &network, &leader_task).await;
let helper = Daphne::new(&container_client, &network, &helper_task).await;

// Run the behavioral test.
Expand Down
4 changes: 2 additions & 2 deletions integration_tests/tests/divviup_ts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ async fn run_divviup_ts_integration_test(container_client: &Cli, vdaf: VdafInsta
test_task_builders(vdaf, QueryType::TimeInterval);
let leader_task = leader_task.build();
let network = generate_network_name();
let leader = Janus::new_in_container(container_client, &network, &leader_task).await;
let helper = Janus::new_in_container(container_client, &network, &helper_task.build()).await;
let leader = Janus::new(container_client, &network, &leader_task).await;
let helper = Janus::new(container_client, &network, &helper_task.build()).await;

let client_backend = ClientBackend::Container {
container_client,
Expand Down
Loading

0 comments on commit a060bcd

Please sign in to comment.