Skip to content

Commit 88c89be

Browse files
committed
feat(edda, forklift, pinga, rebaser, veritech): impl endpoint service
1 parent 81a50aa commit 88c89be

File tree

22 files changed

+271
-19
lines changed

22 files changed

+271
-19
lines changed

bin/edda/src/main.rs

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,8 @@ fn main() -> Result<()> {
2828
async fn async_main() -> Result<()> {
2929
let main_tracker = TaskTracker::new();
3030
let main_token = CancellationToken::new();
31+
let endpoints_tracker = TaskTracker::new();
32+
let endpoints_token = CancellationToken::new();
3133
let layer_db_tracker = TaskTracker::new();
3234
let layer_db_token = CancellationToken::new();
3335
let telemetry_tracker = TaskTracker::new();
@@ -70,6 +72,17 @@ async fn async_main() -> Result<()> {
7072
let config = load_config_with_provider(args, provider).await?;
7173
debug!(?config, "computed configuration");
7274

75+
let endpoints_server = if config.service_endpoints().enabled {
76+
let endpoints = edda_server::DefaultServiceEndpoints::from_config("edda", &config)?;
77+
Some(edda_server::EndpointsServer::new(
78+
std::sync::Arc::new(endpoints),
79+
config.service_endpoints().clone(),
80+
endpoints_token.clone(),
81+
))
82+
} else {
83+
None
84+
};
85+
7386
let server = Server::from_config(
7487
config,
7588
main_token.clone(),
@@ -83,8 +96,17 @@ async fn async_main() -> Result<()> {
8396
server.run().await
8497
});
8598

99+
if let Some(endpoints_server) = endpoints_server {
100+
endpoints_tracker.spawn(async move {
101+
if let Err(err) = endpoints_server.run().await {
102+
error!(error = ?err, "error running edda endpoints server");
103+
}
104+
});
105+
}
106+
86107
shutdown::graceful()
87108
.group(main_tracker, main_token)
109+
.group(endpoints_tracker, endpoints_token)
88110
.group(layer_db_tracker, layer_db_token)
89111
.group(telemetry_tracker, telemetry_token)
90112
.telemetry_guard(telemetry_shutdown.into_future())

bin/forklift/src/main.rs

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,8 @@ fn main() -> Result<()> {
3030
async fn async_main() -> Result<()> {
3131
let main_tracker = TaskTracker::new();
3232
let main_token = CancellationToken::new();
33+
let endpoints_tracker = TaskTracker::new();
34+
let endpoints_token = CancellationToken::new();
3335
let telemetry_tracker = TaskTracker::new();
3436
let telemetry_token = CancellationToken::new();
3537

@@ -70,15 +72,35 @@ async fn async_main() -> Result<()> {
7072
let config = load_config_with_provider(args, provider).await?;
7173
debug!(?config, "computed configuration");
7274

75+
let endpoints_server = if config.service_endpoints().enabled {
76+
let endpoints = forklift_server::DefaultServiceEndpoints::from_config("forklift", &config)?;
77+
Some(forklift_server::EndpointsServer::new(
78+
std::sync::Arc::new(endpoints),
79+
config.service_endpoints().clone(),
80+
endpoints_token.clone(),
81+
))
82+
} else {
83+
None
84+
};
85+
7386
let server = Server::from_config(config, main_token.clone()).await?;
7487

7588
main_tracker.spawn(async move {
7689
info!("ready to receive messages");
7790
server.run().await
7891
});
7992

93+
if let Some(endpoints_server) = endpoints_server {
94+
endpoints_tracker.spawn(async move {
95+
if let Err(err) = endpoints_server.run().await {
96+
error!(error = ?err, "error running forklift endpoints server");
97+
}
98+
});
99+
}
100+
80101
shutdown::graceful()
81102
.group(main_tracker, main_token)
103+
.group(endpoints_tracker, endpoints_token)
82104
.group(telemetry_tracker, telemetry_token)
83105
.telemetry_guard(telemetry_shutdown.into_future())
84106
.timeout(GRACEFUL_SHUTDOWN_TIMEOUT)

bin/pinga/src/main.rs

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,8 @@ fn main() -> Result<()> {
3030
async fn async_main() -> Result<()> {
3131
let main_tracker = TaskTracker::new();
3232
let main_token = CancellationToken::new();
33+
let endpoints_tracker = TaskTracker::new();
34+
let endpoints_token = CancellationToken::new();
3335
let layer_db_tracker = TaskTracker::new();
3436
let layer_db_token = CancellationToken::new();
3537
let telemetry_tracker = TaskTracker::new();
@@ -80,6 +82,18 @@ async fn async_main() -> Result<()> {
8082
let config = load_config_with_provider(args, provider).await?;
8183
debug!(?config, "computed configuration");
8284

85+
// Create optional HTTP endpoints server before Server takes ownership of config
86+
let endpoints_server = if config.service_endpoints().enabled {
87+
let endpoints = pinga_server::DefaultServiceEndpoints::from_config("pinga", &config)?;
88+
Some(pinga_server::EndpointsServer::new(
89+
std::sync::Arc::new(endpoints),
90+
config.service_endpoints().clone(),
91+
endpoints_token.clone(),
92+
))
93+
} else {
94+
None
95+
};
96+
8397
let server = Server::from_config(
8498
config,
8599
main_token.clone(),
@@ -93,8 +107,17 @@ async fn async_main() -> Result<()> {
93107
server.run().await
94108
});
95109

110+
if let Some(endpoints_server) = endpoints_server {
111+
endpoints_tracker.spawn(async move {
112+
if let Err(err) = endpoints_server.run().await {
113+
error!(error = ?err, "error running pinga endpoints server");
114+
}
115+
});
116+
}
117+
96118
shutdown::graceful()
97119
.group(main_tracker, main_token)
120+
.group(endpoints_tracker, endpoints_token)
98121
.group(layer_db_tracker, layer_db_token)
99122
.group(telemetry_tracker, telemetry_token)
100123
.telemetry_guard(telemetry_shutdown.into_future())

bin/rebaser/src/main.rs

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,8 @@ fn main() -> Result<()> {
3030
async fn async_main() -> Result<()> {
3131
let main_tracker = TaskTracker::new();
3232
let main_token = CancellationToken::new();
33+
let endpoints_tracker = TaskTracker::new();
34+
let endpoints_token = CancellationToken::new();
3335
let layer_db_tracker = TaskTracker::new();
3436
let layer_db_token = CancellationToken::new();
3537
let telemetry_tracker = TaskTracker::new();
@@ -80,6 +82,17 @@ async fn async_main() -> Result<()> {
8082
let config = load_config_with_provider(args, provider).await?;
8183
debug!(?config, "computed configuration");
8284

85+
let endpoints_server = if config.service_endpoints().enabled {
86+
let endpoints = rebaser_server::DefaultServiceEndpoints::from_config("rebaser", &config)?;
87+
Some(rebaser_server::EndpointsServer::new(
88+
std::sync::Arc::new(endpoints),
89+
config.service_endpoints().clone(),
90+
endpoints_token.clone(),
91+
))
92+
} else {
93+
None
94+
};
95+
8396
let server = Server::from_config(
8497
config,
8598
main_token.clone(),
@@ -93,8 +106,17 @@ async fn async_main() -> Result<()> {
93106
server.run().await
94107
});
95108

109+
if let Some(endpoints_server) = endpoints_server {
110+
endpoints_tracker.spawn(async move {
111+
if let Err(err) = endpoints_server.run().await {
112+
error!(error = ?err, "error running rebaser endpoints server");
113+
}
114+
});
115+
}
116+
96117
shutdown::graceful()
97118
.group(main_tracker, main_token)
119+
.group(endpoints_tracker, endpoints_token)
98120
.group(layer_db_tracker, layer_db_token)
99121
.group(telemetry_tracker, telemetry_token)
100122
.telemetry_guard(telemetry_shutdown.into_future())

bin/veritech/src/main.rs

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,8 @@ fn main() -> Result<()> {
2929
async fn async_main(args: args::Args) -> Result<()> {
3030
let main_tracker = TaskTracker::new();
3131
let main_token = CancellationToken::new();
32+
let endpoints_tracker = TaskTracker::new();
33+
let endpoints_token = CancellationToken::new();
3234
let telemetry_tracker = TaskTracker::new();
3335
let telemetry_token = CancellationToken::new();
3436

@@ -71,6 +73,17 @@ async fn async_main(args: args::Args) -> Result<()> {
7173
let config = load_config_with_provider(args, provider).await?;
7274
debug!(?config, "computed configuration");
7375

76+
let endpoints_server = if config.service_endpoints().enabled {
77+
let endpoints = veritech_server::DefaultServiceEndpoints::from_config("veritech", &config)?;
78+
Some(veritech_server::EndpointsServer::new(
79+
std::sync::Arc::new(endpoints),
80+
config.service_endpoints().clone(),
81+
endpoints_token.clone(),
82+
))
83+
} else {
84+
None
85+
};
86+
7487
let (server, maybe_heartbeat_app) = Server::from_config(config, main_token.clone()).await?;
7588

7689
if let Some(mut heartbeat_app) = maybe_heartbeat_app {
@@ -81,8 +94,17 @@ async fn async_main(args: args::Args) -> Result<()> {
8194
server.run().await
8295
});
8396

97+
if let Some(endpoints_server) = endpoints_server {
98+
endpoints_tracker.spawn(async move {
99+
if let Err(err) = endpoints_server.run().await {
100+
error!(error = ?err, "error running veritech endpoints server");
101+
}
102+
});
103+
}
104+
84105
shutdown::graceful()
85106
.group(main_tracker, main_token)
107+
.group(endpoints_tracker, endpoints_token)
86108
.group(telemetry_tracker, telemetry_token)
87109
.telemetry_guard(telemetry_shutdown.into_future())
88110
.timeout(graceful_shutdown_timeout)

lib/edda-server/BUCK

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ rust_library(
1919
"//lib/si-frontend-mv-types-rs:si-frontend-mv-types",
2020
"//lib/si-id:si-id",
2121
"//lib/si-layer-cache:si-layer-cache",
22+
"//lib/si-service-endpoints:si-service-endpoints",
2223
"//lib/si-settings:si-settings",
2324
"//lib/si-std:si-std",
2425
"//lib/si-tls:si-tls",

lib/edda-server/src/config.rs

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ use si_crypto::{
1818
use si_data_nats::NatsConfig;
1919
use si_data_pg::PgPoolConfig;
2020
use si_layer_cache::db::LayerDbConfig;
21+
use si_service_endpoints::ServiceEndpointsConfig;
2122
pub(crate) use si_settings::StandardConfig;
2223
pub use si_settings::StandardConfigFile;
2324
use si_std::CanonicalFileError;
@@ -56,7 +57,7 @@ impl ConfigError {
5657
type Result<T> = std::result::Result<T, ConfigError>;
5758

5859
/// The config for the forklift server.
59-
#[derive(Debug, Builder)]
60+
#[derive(Debug, Builder, Serialize)]
6061
pub struct Config {
6162
#[builder(default = "random_instance_id()")]
6263
instance_id: String,
@@ -87,6 +88,9 @@ pub struct Config {
8788

8889
#[builder(default = "default_quiescent_period()")]
8990
quiescent_period: Duration,
91+
92+
#[builder(default = "default_service_endpoints_config()")]
93+
service_endpoints: ServiceEndpointsConfig,
9094
}
9195

9296
impl StandardConfig for Config {
@@ -152,6 +156,12 @@ impl Config {
152156
pub fn quiescent_period(&self) -> Duration {
153157
self.quiescent_period
154158
}
159+
160+
/// Gets a reference to the config's service endpoints configuration.
161+
#[must_use]
162+
pub fn service_endpoints(&self) -> &ServiceEndpointsConfig {
163+
&self.service_endpoints
164+
}
155165
}
156166

157167
#[derive(Clone, Debug, Deserialize, Serialize)]
@@ -179,6 +189,8 @@ pub struct ConfigFile {
179189
layer_db_config: LayerDbConfig,
180190
#[serde(default = "default_quiescent_period_secs")]
181191
quiescent_period_secs: u64,
192+
#[serde(default = "default_service_endpoints_config")]
193+
service_endpoints: ServiceEndpointsConfig,
182194
}
183195

184196
impl Default for ConfigFile {
@@ -194,6 +206,7 @@ impl Default for ConfigFile {
194206
symmetric_crypto_service: default_symmetric_crypto_config(),
195207
layer_db_config: default_layer_db_config(),
196208
quiescent_period_secs: default_quiescent_period_secs(),
209+
service_endpoints: default_service_endpoints_config(),
197210
}
198211
}
199212
}
@@ -219,6 +232,7 @@ impl TryFrom<ConfigFile> for Config {
219232
config.parallel_build_limit(value.edda_parallel_build_limit);
220233
config.instance_id(value.instance_id);
221234
config.quiescent_period(Duration::from_secs(value.quiescent_period_secs));
235+
config.service_endpoints(value.service_endpoints);
222236
config.build().map_err(Into::into)
223237
}
224238
}
@@ -259,6 +273,10 @@ fn default_quiescent_period_secs() -> u64 {
259273
DEFAULT_QUIESCENT_PERIOD_SECS
260274
}
261275

276+
fn default_service_endpoints_config() -> ServiceEndpointsConfig {
277+
ServiceEndpointsConfig::new(0)
278+
}
279+
262280
#[allow(clippy::disallowed_methods)] // Used to determine if running in development
263281
pub fn detect_and_configure_development(config: &mut ConfigFile) -> Result<()> {
264282
if env::var("BUCK_RUN_BUILD_ID").is_ok() || env::var("BUCK_BUILD_ID").is_ok() {

lib/edda-server/src/lib.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,11 @@ mod deployment_processor_task;
1111
mod handlers;
1212
mod local_message;
1313
mod server;
14+
pub use si_service_endpoints::{
15+
DefaultServiceEndpoints,
16+
ServiceEndpointsConfig,
17+
server::EndpointsServer,
18+
};
1419
pub use si_settings::{
1520
ConfigMap,
1621
ParameterProvider,

lib/forklift-server/BUCK

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ rust_library(
1212
"//lib/naxum:naxum",
1313
"//lib/si-data-nats:si-data-nats",
1414
"//lib/si-events-rs:si-events",
15+
"//lib/si-service-endpoints:si-service-endpoints",
1516
"//lib/si-settings:si-settings",
1617
"//lib/si-std:si-std",
1718
"//lib/si-tls:si-tls",

0 commit comments

Comments
 (0)