Optionally serve aggregator API alongside DAP API #1547

Merged · 2 commits · Jun 29, 2023 · Changes shown from 1 commit
235 changes: 185 additions & 50 deletions aggregator/src/bin/aggregator.rs
Contributor:
docs/samples/advanced_config/aggregator.yaml and any other files with the old settings should be updated as well.
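For reference, the stanza that replaces the removed `aggregator_api_listen_address` key could be validated with a test in the style of the ones added below. This is only a sketch: `Config` and `AggregatorApi` are the types defined in this file, the `aggregator_api` stanza matches the new format from this diff, and every other key (database URL, batch settings) is an illustrative placeholder rather than the actual contents of docs/samples/advanced_config/aggregator.yaml.

```rust
// Sketch: parse an updated sample config that uses the new `aggregator_api`
// stanza (path-prefix form) instead of the old top-level
// `aggregator_api_listen_address` key, and check the expected variant.
#[test]
fn updated_sample_config_parses() {
    let sample = r#"---
listen_address: "0.0.0.0:8080"
aggregator_api:
  path_prefix: "aggregator-api"
database:
  url: "postgres://postgres:postgres@localhost:5432/postgres"
max_upload_batch_size: 100
max_upload_batch_write_delay_ms: 250
batch_aggregation_shard_count: 32
"#;
    let config: Config = serde_yaml::from_str(sample).unwrap();
    assert_eq!(
        config.aggregator_api,
        Some(AggregatorApi::PathPrefix {
            path_prefix: "aggregator-api".to_string()
        })
    );
}
```

If a sample should keep serving the aggregator API on its own port, the drop-in equivalent of the old key is the other variant: `aggregator_api:` with a nested `listen_address: "0.0.0.0:8081"`.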

@@ -17,6 +17,7 @@ use std::{iter::Iterator, net::SocketAddr, sync::Arc, time::Duration};
use tokio::join;
use tracing::info;
use trillium::Headers;
use trillium_router::router;
use trillium_tokio::Stopper;

#[tokio::main]
@@ -31,62 +32,84 @@ async fn main() -> Result<()> {
.response_headers()
.context("failed to parse response headers")?;

let aggregator_handler = aggregator_handler(
Arc::clone(&datastore),
ctx.clock,
ctx.config.aggregator_config(),
)?;
let mut handlers = (
aggregator_handler(
Arc::clone(&datastore),
ctx.clock,
ctx.config.aggregator_config(),
)?,
None,
);

let (aggregator_bound_address, aggregator_server) = setup_server(
ctx.config.listen_address,
response_headers.clone(),
stopper.clone(),
aggregator_handler,
)
.await
.context("failed to create aggregator server")?;
let aggregator_api_auth_tokens = ctx
.options
.aggregator_api_auth_tokens
.iter()
.filter(|token| !token.is_empty())
.map(|token| {
let token_bytes = STANDARD
.decode(token)
.context("couldn't base64-decode aggregator API auth token")?;

info!(?aggregator_bound_address, "Running aggregator");
Ok(SecretBytes::new(token_bytes))
})
.collect::<Result<Vec<_>>>()?;

let aggregator_api_server =
if let Some(aggregator_api_listen_address) = ctx.config.aggregator_api_listen_address {
let auth_tokens = ctx
.options
.aggregator_api_auth_tokens
.iter()
.filter(|token| !token.is_empty())
.map(|token| {
let token_bytes = STANDARD
.decode(token)
.context("couldn't base64-decode aggregator API auth token")?;

Ok(SecretBytes::new(token_bytes))
})
.collect::<Result<Vec<_>>>()?;

let aggregator_api_handler = aggregator_api_handler(
Arc::clone(&datastore),
janus_aggregator_api::Config { auth_tokens },
);
let inner_aggregator_api_handler = aggregator_api_handler(
Arc::clone(&datastore),
janus_aggregator_api::Config {
auth_tokens: aggregator_api_auth_tokens,
},
);

// No-op future to unconditionally pass to tokio::join! below
let mut aggregator_api_future = Box::pin(async {}) as Pin<Box<dyn Future<Output = ()>>>;

match ctx.config.aggregator_api {
Some(AggregatorApi::ListenAddress { listen_address }) => {
// Bind the requested address and spawn a future that serves the aggregator API on
// it, which we'll `tokio::join!` on below
let (aggregator_api_bound_address, aggregator_api_server) = setup_server(
aggregator_api_listen_address,
response_headers,
listen_address,
response_headers.clone(),
stopper.clone(),
aggregator_api_handler,
inner_aggregator_api_handler,
)
.await
.context("failed to create aggregator API server")?;

info!(?aggregator_api_bound_address, "Running aggregator API");

Box::pin(aggregator_api_server) as Pin<Box<dyn Future<Output = ()>>>
} else {
// No-op closure to unconditionally pass to tokio::join!
Box::pin(async {}) as Pin<Box<dyn Future<Output = ()>>>
};
aggregator_api_future =
Box::pin(aggregator_api_server) as Pin<Box<dyn Future<Output = ()>>>
}
Some(AggregatorApi::PathPrefix { path_prefix }) => {
// Create a Trillium handler under the requested path prefix, which we'll add to the
// DAP API handler in the setup_server call below
info!(
aggregator_bound_address = ?ctx.config.listen_address,
?path_prefix,
"Serving aggregator API relative to DAP API"
);
// Append wildcard so that this handler will match anything under the prefix
let path_prefix = format!("{path_prefix}/*");
handlers.1 = Some(router().all(path_prefix, inner_aggregator_api_handler));
}
None => { /* Do nothing */ }
}

let (aggregator_bound_address, aggregator_server) = setup_server(
ctx.config.listen_address,
response_headers,
stopper.clone(),
handlers,
)
.await
.context("failed to create aggregator server")?;

info!(?aggregator_bound_address, "Running aggregator");

join!(aggregator_server, aggregator_api_server);
join!(aggregator_server, aggregator_api_future);
Ok(())
})
.await
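The structural change in the hunk above: the DAP handler and an optional aggregator-API handler are now combined into a single trillium handler tuple passed to one `setup_server` call, and a separate server future is only spawned for the `ListenAddress` case. Below is a minimal sketch of that tuple/`Option` composition outside of Janus, assuming trillium's `Handler` implementations for tuples, closures, and `Option`, and trillium-router's wildcard routes; the paths, response bodies, and the `serve_admin_api` flag are made up for illustration.

```rust
use trillium::Conn;
use trillium_router::router;

fn main() {
    // Stand-in for "the config selected AggregatorApi::PathPrefix".
    let serve_admin_api = true;

    let handlers = (
        // Primary handler (analogous to the DAP handler): answers only its own
        // route and passes every other request through untouched.
        |conn: Conn| async move {
            if conn.path() == "/hello" {
                conn.ok("primary API")
            } else {
                conn
            }
        },
        // Optional second handler (analogous to the aggregator API router),
        // mounted under a wildcard so anything below the prefix matches.
        serve_admin_api.then(|| {
            router().all("/admin-api/*", |conn: Conn| async move {
                conn.ok("admin API")
            })
        }),
    );

    // Both handlers are served from the same listener; a `None` second element
    // is simply a no-op.
    trillium_tokio::run(handlers);
}
```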
@@ -128,15 +151,55 @@ pub struct HeaderEntry {
value: String,
}

/// Options for serving the aggregator API.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
#[serde(untagged)]
pub enum AggregatorApi {
/// Address on which this server should listen for connections to the Janus aggregator API and
/// serve its API endpoints, independently from the address on which the DAP API is served.
ListenAddress { listen_address: SocketAddr },
/// The Janus aggregator API will be served on the same address as the DAP API, but relative to
/// the provided prefix. e.g., if `path_prefix` is `aggregator-api`, then the DAP API's uploads
/// endpoint would be `{listen-address}/tasks/{task-id}/reports`, while task IDs could be
/// obtained from the aggregator API at `{listen-address}/aggregator-api/task_ids`.
PathPrefix { path_prefix: String },
}
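
Because the enum is `#[serde(untagged)]`, the variant is selected purely by which key appears under `aggregator_api`: serde tries each variant in order and rejects a mapping that fits neither. A small sketch of that selection, assuming it runs where `AggregatorApi` is in scope (e.g., this file's tests module); the exact error text is serde's and may vary by version.

```rust
// Key `listen_address` selects the standalone-listener variant.
let by_address: AggregatorApi =
    serde_yaml::from_str(r#"{ listen_address: "0.0.0.0:8081" }"#).unwrap();
assert!(matches!(by_address, AggregatorApi::ListenAddress { .. }));

// Key `path_prefix` selects the path-prefix variant.
let by_prefix: AggregatorApi =
    serde_yaml::from_str(r#"{ path_prefix: "aggregator-api" }"#).unwrap();
assert!(matches!(by_prefix, AggregatorApi::PathPrefix { .. }));

// A mapping matching neither variant fails to deserialize.
assert!(serde_yaml::from_str::<AggregatorApi>(r#"{ port: 8081 }"#).is_err());
```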

/// Non-secret configuration options for a Janus aggregator, deserialized from YAML.
///
/// # Examples
///
/// Configuration serving the aggregator API on its own port, distinct from the DAP API:
///
/// ```
/// let yaml_config = r#"
/// ---
/// listen_address: "0.0.0.0:8080"
/// aggregator_api:
/// listen_address: "0.0.0.0:8081"
/// response_headers:
/// - name: "Example"
/// value: "header value"
/// database:
/// url: "postgres://postgres:postgres@localhost:5432/postgres"
/// logging_config: # logging_config is optional
/// force_json_output: true
/// max_upload_batch_size: 100
/// max_upload_batch_write_delay_ms: 250
/// batch_aggregation_shard_count: 32
/// "#;
///
/// let _decoded: Config = serde_yaml::from_str(yaml_config).unwrap();
/// ```
///
/// Configuration serving the aggregator API relative to the DAP API:
///
/// ```
/// let yaml_config = r#"
/// ---
/// listen_address: "0.0.0.0:8080"
/// aggregator_api_listen_address: "0.0.0.0:8081"
/// aggregator_api:
/// path_prefix: "aggregator-api"
/// response_headers:
/// - name: "Example"
/// value: "header value"
@@ -161,9 +224,8 @@ struct Config {
// TODO(#232): options for terminating TLS, unless that gets handled in a load balancer?
listen_address: SocketAddr,

/// Address on which this server should listen for connections to the Janus aggregator API and
/// serve its API endpoints. If not set, the aggregator API is not served.
aggregator_api_listen_address: Option<SocketAddr>,
/// How to serve the Janus aggregator API. If not set, the aggregator API is not served.
aggregator_api: Option<AggregatorApi>,

/// Additional headers that will be added to all responses.
#[serde(default)]
@@ -219,7 +281,7 @@ impl BinaryConfig for Config {

#[cfg(test)]
mod tests {
use super::{Config, HeaderEntry, Options};
use super::{AggregatorApi, Config, HeaderEntry, Options};
use clap::CommandFactory;
use janus_aggregator::{
aggregator,
@@ -244,11 +306,16 @@ mod tests {
Options::command().debug_assert()
}

#[rstest::rstest]
#[case::listen_address(AggregatorApi::ListenAddress {
listen_address: SocketAddr::from((Ipv4Addr::UNSPECIFIED, 8081)),
})]
#[case::path_prefix(AggregatorApi::PathPrefix { path_prefix: "prefix".to_string() })]
#[test]
fn roundtrip_config() {
fn roundtrip_config(#[case] aggregator_api: AggregatorApi) {
roundtrip_encoding(Config {
listen_address: SocketAddr::from((Ipv4Addr::UNSPECIFIED, 8080)),
aggregator_api_listen_address: Some(SocketAddr::from((Ipv4Addr::UNSPECIFIED, 8081))),
aggregator_api: Some(aggregator_api),
common_config: CommonConfig {
database: generate_db_config(),
logging_config: generate_trace_config(),
@@ -265,6 +332,74 @@ mod tests {
})
}

#[test]
fn config_no_aggregator_api() {
assert_eq!(
serde_yaml::from_str::<Config>(
r#"---
listen_address: "0.0.0.0:8080"
database:
url: "postgres://postgres:postgres@localhost:5432/postgres"
connection_pool_timeouts_secs: 60
max_upload_batch_size: 100
max_upload_batch_write_delay_ms: 250
batch_aggregation_shard_count: 32
"#
)
.unwrap()
.aggregator_api,
None
);
}

#[test]
fn config_aggregator_api_listen_address() {
assert_eq!(
serde_yaml::from_str::<Config>(
r#"---
listen_address: "0.0.0.0:8080"
database:
url: "postgres://postgres:postgres@localhost:5432/postgres"
connection_pool_timeouts_secs: 60
max_upload_batch_size: 100
max_upload_batch_write_delay_ms: 250
batch_aggregation_shard_count: 32
aggregator_api:
listen_address: "0.0.0.0:8081"
"#
)
.unwrap()
.aggregator_api,
Some(AggregatorApi::ListenAddress {
listen_address: SocketAddr::from((Ipv4Addr::UNSPECIFIED, 8081))
})
);
}

#[test]
fn config_aggregator_api_path_prefix() {
assert_eq!(
serde_yaml::from_str::<Config>(
r#"---
listen_address: "0.0.0.0:8080"
database:
url: "postgres://postgres:postgres@localhost:5432/postgres"
connection_pool_timeouts_secs: 60
max_upload_batch_size: 100
max_upload_batch_write_delay_ms: 250
batch_aggregation_shard_count: 32
aggregator_api:
path_prefix: "aggregator-api"
"#
)
.unwrap()
.aggregator_api,
Some(AggregatorApi::PathPrefix {
path_prefix: "aggregator-api".to_string()
})
);
}

/// Check that configuration fragments in the README and other documentation can be parsed
/// correctly.
#[test]