Skip to content

Commit 0af2e2e

Browse files
authored
Build custom runtime for configurable worker threads (#91)
* Build custom runtime for configurable worker threads
1 parent 706a633 commit 0af2e2e

File tree

3 files changed

+176
-146
lines changed

3 files changed

+176
-146
lines changed

databroker/src/main.rs

Lines changed: 161 additions & 137 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ use databroker::broker::RegistrationError;
2121
#[cfg(feature = "tls")]
2222
use databroker::grpc::server::ServerTLS;
2323

24+
use std::thread::available_parallelism;
2425
use tokio::select;
2526
use tokio::signal::unix::{signal, SignalKind};
2627
#[cfg(feature = "tls")]
@@ -170,8 +171,7 @@ async fn read_metadata_file<'a, 'b>(
170171
Ok(())
171172
}
172173

173-
#[tokio::main]
174-
async fn main() -> Result<(), Box<dyn std::error::Error>> {
174+
fn main() -> Result<(), Box<dyn std::error::Error>> {
175175
let version = option_env!("CARGO_PKG_VERSION").unwrap_or_default();
176176

177177
let about = format!(
@@ -249,10 +249,20 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
249249
)
250250
.arg(
251251
Arg::new("enable-databroker-v1")
252-
.display_order(30)
252+
.display_order(33)
253253
.long("enable-databroker-v1")
254254
.help("Enable sdv.databroker.v1 (GRPC) service")
255255
.action(ArgAction::SetTrue),
256+
)
257+
.arg(
258+
Arg::new("worker-threads")
259+
.display_order(34)
260+
.long("worker-threads")
261+
.help("How many worker threads will be spawned by the tokio runtime. Default is as many cores are detected on the system")
262+
.value_name("WORKER_THREADS")
263+
.required(false)
264+
.env("KUKSA_WORKER_THREADS")
265+
.value_parser(clap::value_parser!(usize))
256266
);
257267

258268
#[cfg(feature = "tls")]
@@ -321,152 +331,166 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
321331

322332
let args = parser.get_matches();
323333

324-
// install global collector configured based on RUST_LOG env var.
325-
databroker::init_logging();
326-
327-
info!("Starting Kuksa Databroker {}", version);
328-
329-
let ip_addr = args.get_one::<String>("address").unwrap().parse()?;
330-
let port = args
331-
.get_one::<u16>("port")
332-
.expect("port should be a number");
333-
let addr = std::net::SocketAddr::new(ip_addr, *port);
334-
335-
let broker = broker::DataBroker::new(version);
336-
let database = broker.authorized_access(&permissions::ALLOW_ALL);
337-
338-
add_kuksa_attribute(
339-
&database,
340-
"Kuksa.Databroker.GitVersion".to_owned(),
341-
option_env!("VERGEN_GIT_SEMVER_LIGHTWEIGHT")
342-
.unwrap_or("N/A")
343-
.to_owned(),
344-
"Databroker version as reported by GIT".to_owned(),
345-
)
346-
.await;
347-
348-
add_kuksa_attribute(
349-
&database,
350-
"Kuksa.Databroker.CargoVersion".to_owned(),
351-
option_env!("CARGO_PKG_VERSION").unwrap_or("N/A").to_owned(),
352-
"Databroker version as reported by GIT".to_owned(),
353-
)
354-
.await;
355-
356-
add_kuksa_attribute(
357-
&database,
358-
"Kuksa.Databroker.CommitSha".to_owned(),
359-
option_env!("VERGEN_GIT_SHA").unwrap_or("N/A").to_owned(),
360-
"Commit SHA of current version".to_owned(),
361-
)
362-
.await;
363-
364-
if let Some(metadata_filenames) = args.get_many::<String>("vss-file") {
365-
for filename in metadata_filenames {
366-
read_metadata_file(&database, filename).await?;
367-
}
368-
}
334+
let cores = available_parallelism().unwrap().get();
335+
let worker_threads: &usize = args.get_one::<usize>("worker-threads").unwrap_or(&cores);
369336

370-
#[cfg(feature = "tls")]
371-
let tls_config = if args.get_flag("insecure") {
372-
ServerTLS::Disabled
373-
} else {
374-
let cert_file = args.get_one::<String>("tls-cert");
375-
let key_file = args.get_one::<String>("tls-private-key");
376-
match (cert_file, key_file) {
377-
(Some(cert_file), Some(key_file)) => {
378-
let cert = std::fs::read(cert_file)?;
379-
let key = std::fs::read(key_file)?;
380-
let identity = tonic::transport::Identity::from_pem(cert, key);
381-
ServerTLS::Enabled {
382-
tls_config: tonic::transport::ServerTlsConfig::new().identity(identity),
383-
}
384-
}
385-
(Some(_), None) => {
386-
return Err(
387-
"TLS private key (--tls-private-key) must be set if --tls-cert is.".into(),
388-
);
389-
}
390-
(None, Some(_)) => {
391-
return Err(
392-
"TLS certificate (--tls-cert) must be set if --tls-private-key is.".into(),
393-
);
394-
}
395-
(None, None) => {
396-
warn!(
397-
"TLS is not enabled. Default behavior of accepting insecure connections \
398-
when TLS is not configured may change in the future! \
399-
Please use --insecure to explicitly enable this behavior."
400-
);
401-
ServerTLS::Disabled
402-
}
403-
}
404-
};
337+
let runtime = tokio::runtime::Builder::new_multi_thread()
338+
.worker_threads(*worker_threads)
339+
.enable_all()
340+
.build()
341+
.unwrap();
405342

406-
let enable_authorization = !args.get_flag("disable-authorization");
407-
let jwt_public_key = match args.get_one::<String>("jwt-public-key") {
408-
Some(pub_key_filename) => match std::fs::read_to_string(pub_key_filename) {
409-
Ok(pub_key) => {
410-
info!("Using '{pub_key_filename}' to authenticate access tokens");
411-
Ok(Some(pub_key))
412-
}
413-
Err(err) => {
414-
error!("Failed to open file {:?}: {}", pub_key_filename, err);
415-
Err(err)
343+
runtime.block_on(async {
344+
// install global collector configured based on RUST_LOG env var.
345+
databroker::init_logging();
346+
347+
info!("Starting Kuksa Databroker {}", version);
348+
info!(
349+
"Using {} threads with {} cores available on the system",
350+
worker_threads, cores
351+
);
352+
353+
let ip_addr = args.get_one::<String>("address").unwrap().parse()?;
354+
let port = args
355+
.get_one::<u16>("port")
356+
.expect("port should be a number");
357+
let addr = std::net::SocketAddr::new(ip_addr, *port);
358+
359+
let broker = broker::DataBroker::new(version);
360+
let database = broker.authorized_access(&permissions::ALLOW_ALL);
361+
362+
add_kuksa_attribute(
363+
&database,
364+
"Kuksa.Databroker.GitVersion".to_owned(),
365+
option_env!("VERGEN_GIT_SEMVER_LIGHTWEIGHT")
366+
.unwrap_or("N/A")
367+
.to_owned(),
368+
"Databroker version as reported by GIT".to_owned(),
369+
)
370+
.await;
371+
372+
add_kuksa_attribute(
373+
&database,
374+
"Kuksa.Databroker.CargoVersion".to_owned(),
375+
option_env!("CARGO_PKG_VERSION").unwrap_or("N/A").to_owned(),
376+
"Databroker version as reported by GIT".to_owned(),
377+
)
378+
.await;
379+
380+
add_kuksa_attribute(
381+
&database,
382+
"Kuksa.Databroker.CommitSha".to_owned(),
383+
option_env!("VERGEN_GIT_SHA").unwrap_or("N/A").to_owned(),
384+
"Commit SHA of current version".to_owned(),
385+
)
386+
.await;
387+
388+
if let Some(metadata_filenames) = args.get_many::<String>("vss-file") {
389+
for filename in metadata_filenames {
390+
read_metadata_file(&database, filename).await?;
416391
}
417-
},
418-
None => Ok(None),
419-
}?;
420-
421-
let authorization = match (enable_authorization, jwt_public_key) {
422-
(true, Some(pub_key)) => Authorization::new(pub_key)?,
423-
(true, None) => {
424-
warn!("Authorization is not enabled.");
425-
Authorization::Disabled
426392
}
427-
(false, _) => Authorization::Disabled,
428-
};
429393

430-
#[cfg(feature = "viss")]
431-
{
432-
let viss_bind_addr = if args.contains_id("viss-address") {
433-
args.get_one::<String>("viss-address").unwrap().parse()?
394+
#[cfg(feature = "tls")]
395+
let tls_config = if args.get_flag("insecure") {
396+
ServerTLS::Disabled
434397
} else {
435-
args.get_one::<String>("address").unwrap().parse()?
398+
let cert_file = args.get_one::<String>("tls-cert");
399+
let key_file = args.get_one::<String>("tls-private-key");
400+
match (cert_file, key_file) {
401+
(Some(cert_file), Some(key_file)) => {
402+
let cert = std::fs::read(cert_file)?;
403+
let key = std::fs::read(key_file)?;
404+
let identity = tonic::transport::Identity::from_pem(cert, key);
405+
ServerTLS::Enabled {
406+
tls_config: tonic::transport::ServerTlsConfig::new().identity(identity),
407+
}
408+
}
409+
(Some(_), None) => {
410+
return Err(
411+
"TLS private key (--tls-private-key) must be set if --tls-cert is.".into(),
412+
);
413+
}
414+
(None, Some(_)) => {
415+
return Err(
416+
"TLS certificate (--tls-cert) must be set if --tls-private-key is.".into(),
417+
);
418+
}
419+
(None, None) => {
420+
warn!(
421+
"TLS is not enabled. Default behavior of accepting insecure connections \
422+
when TLS is not configured may change in the future! \
423+
Please use --insecure to explicitly enable this behavior."
424+
);
425+
ServerTLS::Disabled
426+
}
427+
}
436428
};
437429

438-
let viss_port = args
439-
.get_one::<u16>("viss-port")
440-
.expect("port should be a number");
441-
let viss_addr = std::net::SocketAddr::new(viss_bind_addr, *viss_port);
442-
443-
if args.get_flag("enable-viss") {
444-
let broker = broker.clone();
445-
let authorization = authorization.clone();
446-
tokio::spawn(async move {
447-
if let Err(err) = viss::server::serve(viss_addr, broker, authorization).await {
448-
error!("{err}");
430+
let enable_authorization = !args.get_flag("disable-authorization");
431+
let jwt_public_key = match args.get_one::<String>("jwt-public-key") {
432+
Some(pub_key_filename) => match std::fs::read_to_string(pub_key_filename) {
433+
Ok(pub_key) => {
434+
info!("Using '{pub_key_filename}' to authenticate access tokens");
435+
Ok(Some(pub_key))
449436
}
450-
});
451-
}
452-
}
437+
Err(err) => {
438+
error!("Failed to open file {:?}: {}", pub_key_filename, err);
439+
Err(err)
440+
}
441+
},
442+
None => Ok(None),
443+
}?;
444+
445+
let authorization = match (enable_authorization, jwt_public_key) {
446+
(true, Some(pub_key)) => Authorization::new(pub_key)?,
447+
(true, None) => {
448+
warn!("Authorization is not enabled.");
449+
Authorization::Disabled
450+
}
451+
(false, _) => Authorization::Disabled,
452+
};
453453

454-
let mut apis = vec![grpc::server::Api::KuksaValV1];
454+
#[cfg(feature = "viss")]
455+
{
456+
let viss_bind_addr = if args.contains_id("viss-address") {
457+
args.get_one::<String>("viss-address").unwrap().parse()?
458+
} else {
459+
args.get_one::<String>("address").unwrap().parse()?
460+
};
461+
462+
let viss_port = args
463+
.get_one::<u16>("viss-port")
464+
.expect("port should be a number");
465+
let viss_addr = std::net::SocketAddr::new(viss_bind_addr, *viss_port);
466+
467+
if args.get_flag("enable-viss") {
468+
let broker = broker.clone();
469+
let authorization = authorization.clone();
470+
tokio::spawn(async move {
471+
if let Err(err) = viss::server::serve(viss_addr, broker, authorization).await {
472+
error!("{err}");
473+
}
474+
});
475+
}
476+
}
455477

456-
if args.get_flag("enable-databroker-v1") {
457-
apis.push(grpc::server::Api::SdvDatabrokerV1);
458-
}
478+
let mut apis = vec![grpc::server::Api::KuksaValV1];
459479

460-
grpc::server::serve(
461-
addr,
462-
broker,
463-
#[cfg(feature = "tls")]
464-
tls_config,
465-
&apis,
466-
authorization,
467-
shutdown_handler(),
468-
)
469-
.await?;
480+
if args.get_flag("enable-databroker-v1") {
481+
apis.push(grpc::server::Api::SdvDatabrokerV1);
482+
}
483+
grpc::server::serve(
484+
addr,
485+
broker,
486+
#[cfg(feature = "tls")]
487+
tls_config,
488+
&apis,
489+
authorization,
490+
shutdown_handler(),
491+
)
492+
.await
493+
})?;
470494

471495
Ok(())
472496
}

doc/behavior.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,3 +2,6 @@
22
The implementation of KUKSA databroker shall represent the latest value of a ```Datapoint```. Therefore the databroker always sets a ```timestamp``` for a ```Datapoint```. This means if a new value comes in it overwrites the older value. We opted for this behavior because a actuator/provider/application can have no access to a system time. For some use cases it could be interesting to provide a timestamp set by the actuator/provider/application. For this we added a so called source timestamp (short ```source_ts```) to the ```Datapoint``` class. This source timestamp is optional and per default set to None.
33

44
If an attacker gets an authorized connection to the databroker he can set the source_timestamp and overwrite the value with a new one. But for this he/she needs read and write access through JWT tokens. If a provider decides to work with ```source_ts``` of a ```Datapoint``` than it should be clear that they can be false/outdated.
5+
6+
# Tokio runtime behavior
7+
If you do not specify anything tokio will spawn as many threads as cores (virtual and physical) are detected on the system. If you want to optimize cpu load you can specify the threads spawned as workers by the tokio runtime. Therfore use the runtime option `--worker-threads` and specify how many threads you want to be spawned.

doc/user_guide.md

Lines changed: 12 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -235,15 +235,18 @@ The change types currently apply on _current_ values, when subscribing to a _tar
235235

236236
The default configuration can be overridden by means of setting the corresponding environment variables and/or providing options on the command line as illustrated in the previous sections.
237237

238-
| CLI option | Environment Variable | Default Value | Description |
239-
| ------------------------ | -------------------------------- | ------------- | ----------------------------------------------------------------------------------------------------- |
240-
| `--vss`,<br>`--metadata` | `KUKSA_DATABROKER_METADATA_FILE` | | Populate data broker with metadata from file |
241-
| `--address` | `KUKSA_DATABROKER_ADDR` | `127.0.0.1` | Listen for rpc calls |
242-
| `--port` | `KUKSA_DATABROKER_PORT` | `55555` | Listen for rpc calls |
243-
| `--jwt-public-key` | | | Public key used to verify JWT access tokens |
244-
| `--tls-cert` | | | TLS certificate file (.pem) |
245-
| `--tls-private-key` | | | TLS private key file (.key) |
246-
| `--insecure` | | | Allow insecure connections (default unless `--tls-cert` and `--tls-private-key` options are provided) |
238+
| CLI option | Environment Variable | Default Value | Description |
239+
| ------------------------- | -------------------------------- | --------------------------------------------------- | ----------------------------------------------------------------------------------------------------- |
240+
| `--vss`,<br>`--metadata` | `KUKSA_DATABROKER_METADATA_FILE` | | Populate data broker with metadata from file |
241+
| `--address` | `KUKSA_DATABROKER_ADDR` | `127.0.0.1` | Listen for rpc calls |
242+
| `--port` | `KUKSA_DATABROKER_PORT` | `55555` | Listen for rpc calls |
243+
| `--jwt-public-key` | | | Public key used to verify JWT access tokens |
244+
| `--tls-cert` | | | TLS certificate file (.pem) |
245+
| `--tls-private-key` | | | TLS private key file (.key) |
246+
| `--disable-authorization` | | `true` | Disable authorization |
247+
| `--insecure` | | | Allow insecure connections (default unless `--tls-cert` and `--tls-private-key` options are provided) |
248+
| `--worker-threads` | `KUKSA_WORKER_THREADS` | as many threads as cores are detected on the system | How many worker threads will be spawned by the tokio runtime. |
249+
| `--enable-databroker-v1` | | `false` | Enable sdv.databroker.v1 (GRPC) service |
247250

248251
<p align="right">(<a href="#top">back to top</a>)</p>
249252

0 commit comments

Comments
 (0)