diff --git a/databroker/Cargo.toml b/databroker/Cargo.toml index ddfdd0dc..c0dbf99e 100644 --- a/databroker/Cargo.toml +++ b/databroker/Cargo.toml @@ -60,7 +60,6 @@ glob-match = "0.2.1" jemallocator = { version = "0.5.0", optional = true } lazy_static = "1.4.0" thiserror = "1.0.47" - futures = { version = "0.3.28" } async-trait = "0.1.82" diff --git a/databroker/src/grpc/server.rs b/databroker/src/grpc/server.rs index 58222bb5..c4b95753 100644 --- a/databroker/src/grpc/server.rs +++ b/databroker/src/grpc/server.rs @@ -13,11 +13,15 @@ use std::{convert::TryFrom, future::Future, time::Duration}; -use tokio::net::TcpListener; -use tokio_stream::wrappers::TcpListenerStream; -use tonic::transport::Server; +use futures::Stream; +use tokio::{ + io::{AsyncRead, AsyncWrite}, + net::{TcpListener, UnixListener}, +}; +use tokio_stream::wrappers::{TcpListenerStream, UnixListenerStream}; #[cfg(feature = "tls")] use tonic::transport::ServerTlsConfig; +use tonic::transport::{server::Connected, Server}; use tracing::{debug, info}; use databroker_proto::{kuksa, sdv}; @@ -34,7 +38,7 @@ pub enum ServerTLS { Enabled { tls_config: ServerTlsConfig }, } -#[derive(PartialEq)] +#[derive(PartialEq, Clone)] pub enum Api { KuksaValV1, KuksaValV2, @@ -96,7 +100,7 @@ where databroker.shutdown().await; } -pub async fn serve( +pub async fn serve_tcp( addr: impl Into, broker: broker::DataBroker, #[cfg(feature = "tls")] server_tls: ServerTLS, @@ -110,25 +114,14 @@ where let socket_addr = addr.into(); let listener = TcpListener::bind(socket_addr).await?; - /* On Linux systems try to notify daemon readiness to systemd. - * This function determines whether the a system is using systemd - * or not, so it is safe to use on non-systemd systems as well. - */ - #[cfg(target_os = "linux")] - { - match sd_notify::booted() { - Ok(true) => { - info!("Notifying systemd that the service is ready"); - sd_notify::notify(false, &[sd_notify::NotifyState::Ready])?; - } - _ => { - debug!("System is not using systemd, will not try to notify"); - } - } + if let Ok(addr) = listener.local_addr() { + info!("Listening on {}", addr); } + let incoming = TcpListenerStream::new(listener); + serve_with_incoming_shutdown( - listener, + incoming, broker, #[cfg(feature = "tls")] server_tls, @@ -139,10 +132,9 @@ where .await } -pub async fn serve_with_incoming_shutdown( - listener: TcpListener, +pub async fn serve_uds( + path: impl AsRef, broker: broker::DataBroker, - #[cfg(feature = "tls")] server_tls: ServerTLS, apis: &[Api], authorization: Authorization, signal: F, @@ -150,12 +142,45 @@ pub async fn serve_with_incoming_shutdown( where F: Future, { - broker.start_housekeeping_task(); + let listener = UnixListener::bind(path)?; + if let Ok(addr) = listener.local_addr() { - info!("Listening on {}", addr); + match addr.as_pathname() { + Some(pathname) => info!("Listening on unix socket at {}", pathname.display()), + None => info!("Listening on unix socket (unknown path)"), + } } - let incoming = TcpListenerStream::new(listener); + let incoming = UnixListenerStream::new(listener); + + serve_with_incoming_shutdown( + incoming, + broker, + ServerTLS::Disabled, + apis, + authorization, + signal, + ) + .await +} + +pub async fn serve_with_incoming_shutdown( + incoming: I, + broker: broker::DataBroker, + #[cfg(feature = "tls")] server_tls: ServerTLS, + apis: &[Api], + authorization: Authorization, + signal: F, +) -> Result<(), Box> +where + F: Future, + I: Stream>, + IO: AsyncRead + AsyncWrite + Connected + Unpin + Send + 'static, + IO::ConnectInfo: Clone + Send + Sync + 'static, + IE: Into>, +{ + broker.start_housekeeping_task(); + let mut server = Server::builder() .http2_keepalive_interval(Some(Duration::from_secs(10))) .http2_keepalive_timeout(Some(Duration::from_secs(20))); diff --git a/databroker/src/main.rs b/databroker/src/main.rs index aa1ea985..bc6d5007 100644 --- a/databroker/src/main.rs +++ b/databroker/src/main.rs @@ -15,6 +15,12 @@ #[global_allocator] static ALLOC: jemallocator::Jemalloc = jemallocator::Jemalloc; +static DEFAULT_UNIX_SOCKET_PATH: &str = "/run/kuksa/databroker.sock"; + +use std::io; +use std::os::unix::fs::FileTypeExt; +use std::path::Path; + use databroker::authorization::Authorization; use databroker::broker::RegistrationError; @@ -179,6 +185,15 @@ async fn read_metadata_file<'a, 'b>( Ok(()) } +fn unlink_unix_domain_socket(path: impl AsRef) -> Result<(), io::Error> { + if let Ok(metadata) = std::fs::metadata(&path) { + if metadata.file_type().is_socket() { + std::fs::remove_file(&path)?; + } + }; + Ok(()) +} + fn main() -> Result<(), Box> { let version = option_env!("CARGO_PKG_VERSION").unwrap_or_default(); let commit_sha = option_env!("VERGEN_GIT_SHA").unwrap_or_default(); @@ -228,8 +243,26 @@ fn main() -> Result<(), Box> { .default_value("55555"), ) .arg( - Arg::new("vss-file") + Arg::new("enable-unix-socket") + .display_order(3) + .long("enable-unix-socket") + .help("Listen on unix socket, default /run/kuksa/databroker.sock") + .action(ArgAction::SetTrue) + .env("KUKSA_DATABROKER_ENABLE_UNIX_SOCKET") + ) + .arg( + Arg::new("unix-socket") .display_order(4) + .long("unix-socket") + .help("Listen on unix socket, e.g. /tmp/kuksa/databroker.sock") + .action(ArgAction::Set) + .value_name("PATH") + .required(false) + .env("KUKSA_DATABROKER_UNIX_SOCKET"), + ) + .arg( + Arg::new("vss-file") + .display_order(5) .alias("metadata") .long("vss") .help("Populate data broker with VSS metadata from (comma-separated) list of files") @@ -242,7 +275,7 @@ fn main() -> Result<(), Box> { ) .arg( Arg::new("jwt-public-key") - .display_order(5) + .display_order(6) .long("jwt-public-key") .help("Public key used to verify JWT access tokens") .action(ArgAction::Set) @@ -251,7 +284,7 @@ fn main() -> Result<(), Box> { ) .arg( Arg::new("disable-authorization") - .display_order(6) + .display_order(7) .long("disable-authorization") .help("Disable authorization") .action(ArgAction::SetTrue), @@ -489,7 +522,56 @@ fn main() -> Result<(), Box> { if args.get_flag("enable-databroker-v1") { apis.push(grpc::server::Api::SdvDatabrokerV1); } - grpc::server::serve( + + let unix_socket_path = args.get_one::("unix-socket").cloned().or_else(|| { + // If the --unix-socket PATH is not explicitly set, check whether it + // should be enabled using the default path + if args.get_flag("enable-unix-socket") { + Some(DEFAULT_UNIX_SOCKET_PATH.into()) + } else { + None + } + }); + + if let Some(path) = unix_socket_path { + // We cannot assume that the socket was closed down properly + // so unlink before we recreate it. + unlink_unix_domain_socket(&path)?; + std::fs::create_dir_all(Path::new(&path).parent().unwrap())?; + let broker = broker.clone(); + let authorization = authorization.clone(); + let apis = apis.clone(); + tokio::spawn(async move { + if let Err(err) = + grpc::server::serve_uds(&path, broker, &apis, authorization, shutdown_handler()) + .await + { + error!("{err}"); + } + + info!("Unlinking unix domain socket at {}", path); + unlink_unix_domain_socket(path) + .unwrap_or_else(|_| error!("Failed to unlink unix domain socket")); + }); + } + + // On Linux systems try to notify daemon readiness to systemd. + // This function determines whether the a system is using systemd + // or not, so it is safe to use on non-systemd systems as well. + #[cfg(target_os = "linux")] + { + match sd_notify::booted() { + Ok(true) => { + info!("Notifying systemd that the service is ready"); + sd_notify::notify(false, &[sd_notify::NotifyState::Ready])?; + } + _ => { + debug!("System is not using systemd, will not try to notify"); + } + } + } + + grpc::server::serve_tcp( addr, broker, #[cfg(feature = "tls")] diff --git a/databroker/tests/world/mod.rs b/databroker/tests/world/mod.rs index e33feb72..3630a992 100644 --- a/databroker/tests/world/mod.rs +++ b/databroker/tests/world/mod.rs @@ -32,6 +32,7 @@ use databroker::{ }; use tokio::net::TcpListener; +use tokio_stream::wrappers::TcpListenerStream; use tracing::debug; use lazy_static::lazy_static; @@ -188,6 +189,7 @@ impl DataBrokerWorld { let addr = listener .local_addr() .expect("failed to determine listener's port"); + let incoming = TcpListenerStream::new(listener); tokio::spawn(async move { let commit_sha = option_env!("VERGEN_GIT_SHA").unwrap_or("unknown"); @@ -230,7 +232,7 @@ impl DataBrokerWorld { } grpc::server::serve_with_incoming_shutdown( - listener, + incoming, data_broker, #[cfg(feature = "tls")] CERTS.server_tls_config(), diff --git a/doc/user_guide.md b/doc/user_guide.md index 783ec34e..1fdbedc2 100644 --- a/doc/user_guide.md +++ b/doc/user_guide.md @@ -37,6 +37,8 @@ Usage: databroker [OPTIONS] Options: --address Bind address [env: KUKSA_DATABROKER_ADDR=] [default: 127.0.0.1] --port Bind port [env: KUKSA_DATABROKER_PORT=] [default: 55555] + --enable-unix-socket Listen on unix socket, default /run/kuksa/databroker.sock [env: KUKSA_DATABROKER_ENABLE_UNIX_SOCKET=] + --unix-socket Listen on unix socket, e.g. /tmp/kuksa/databroker.sock [env: KUKSA_DATABROKER_UNIX_SOCKET=] --vss Populate data broker with VSS metadata from (comma-separated) list of files [env: KUKSA_DATABROKER_METADATA_FILE=] --jwt-public-key Public key used to verify JWT access tokens --disable-authorization Disable authorization @@ -145,46 +147,7 @@ docker run --rm -it --network kuksa -v ./certificates:/opt/kuksa ghcr.io/eclipse

(back to top)

-## APIs supported by Databroker - -Kuksa Databroker provides [gRPC](https://grpc.io/) based API endpoints which can be used by -clients to interact with the server. - -gRPC services are specified by means of `.proto` files which define the services and the data -exchanged between server and client. - -[Tooling](https://grpc.io/docs/languages/) is available for most popular programming languages to create -client stubs for invoking the services. - -The Databroker uses gRPC's default HTTP/2 transport and [protocol buffers](https://developers.google.com/protocol-buffers) for message serialization. -The same `.proto` file can be used to generate server skeleton and client stubs for other transports and serialization formats as well. - -HTTP/2 is a binary replacement for HTTP/1.1 used for handling connections, multiplexing (channels) and providing a standardized way to add headers for authorization and TLS for encryption/authentication. -It also supports bi-directional streaming between client and server. - -Kuksa Databroker implements the following service interfaces: - -- Enabled on Databroker by default [kuksa.val.v2.VAL](../proto/kuksa/val/v2/val.proto) (recommended to use but still not supported by databroker-cli) -- Enabled on Databroker by default [kuksa.val.v1.VAL](../proto/kuksa/val/v1/val.proto) -- Disabled on Databroker by default, use `--enable-databroker-v1` to enable [sdv.databroker.v1.Broker](../proto/sdv/databroker/v1/broker.proto) -- Disabled on Databroker by default, use `--enable-databroker-v1` to enable [sdv.databroker.v1.Collector](../proto/sdv/databroker/v1/collector.proto) - -

(back to top)

- - -## Current and target value concept vs data value concept. -For some of the APIs (`sdv.databroker.v1` and `kuksa.val.v1`), the concepts of `current_value` and `target_value` were introduced to differentiate between the expected or desired value for an actuator and the current value published by the provider (both stored in the Databroker’s database). - -This concept has been removed in `kuksa.val.v2`. Now, there is only a single `data_value` for sensors and actuators, meaning that desired actuator values are simply forwarded from the Signal Consumer to the Databroker and then to the Provider. The Provider is responsible for updating on Databroker the `data_value` received from the vehicle network. - -**Kuksa does not guarantee that the desired actuator value will be fully updated on the vehicle network; it only forwards actuator values from the Signal Consumer to the vehicle network.** - -**Do not mix different versions of APIs for providers and clients, as this will cause issues; kuksa.val.v2 is not backward compatible with sdv.databroker.v1 and kuksa.val.v1** - -

(back to top)

- - -## sdv.databroker.v1 Query Syntax, disabled by default, use `--enable-databroker-v1` to enable it +## Query Syntax Clients can subscribe to updates of data entries of interest using an SQL-based [query syntax](./QUERY.md). @@ -268,17 +231,8 @@ Vehicle.Cabin.Door.Row1.Left.IsOpen: description: Is door open or closed ``` -#### For kuksa.val.v1: - The change types currently apply on _current_ values, when subscribing to a _target value_, as an actuation provider would do, any set on the target value is propagated just like in `continuous` mode, even if a datapoint (and thus its current value behavior) is set to `onchange` or `static`. The idea here is, that a "set" by an application is the intent to actuate something (maybe a retry even), and should thus always be forwarded to the provider. -#### For kuksa.val.v2: -The concept of _current value_ and _target value_ does not exist in `kuksa.val.v2`, there are just simply _data value_ for `sensor` and `actuator` which are registered by default as `continuous`. -The change types apply to the _data value_, meaning that if `x-kuksa-changetype` is not specified (`continuous` by default), subscribers will be notified whenever the provider publishes a new value, whether there has been a change or not. Notifications for changes will only occur if the type is set to `onchange`. - -

(back to top)

- - ## Configuration Reference The default configuration can be overridden by means of setting the corresponding environment variables and/or providing options on the command line as illustrated in the previous sections. @@ -288,6 +242,8 @@ The default configuration can be overridden by means of setting the correspondin | `--vss`,
`--metadata` | `KUKSA_DATABROKER_METADATA_FILE` | | Populate data broker with metadata from file | | `--address` | `KUKSA_DATABROKER_ADDR` | `127.0.0.1` | Listen for rpc calls | | `--port` | `KUKSA_DATABROKER_PORT` | `55555` | Listen for rpc calls | +| `--enable-unix-socket` | `KUKSA_DATABROKER_ENABLE_UNIX_SOCKET` | | Listen on unix socket, default `/run/kuksa/databroker.sock` | +| `--unix-socket` | `KUKSA_DATABROKER_UNIX_SOCKET` | | Listen on unix socket, e.g. `/tmp/kuksa/databroker.sockcalls` | | `--jwt-public-key` | | | Public key used to verify JWT access tokens | | `--tls-cert` | | | TLS certificate file (.pem) | | `--tls-private-key` | | | TLS private key file (.key) | @@ -298,6 +254,30 @@ The default configuration can be overridden by means of setting the correspondin

(back to top)

+## API + +Kuksa Databroker provides [gRPC](https://grpc.io/) based API endpoints which can be used by +clients to interact with the server. + +gRPC services are specified by means of `.proto` files which define the services and the data +exchanged between server and client. + +[Tooling](https://grpc.io/docs/languages/) is available for most popular programming languages to create +client stubs for invoking the services. + +The Databroker uses gRPC's default HTTP/2 transport and [protocol buffers](https://developers.google.com/protocol-buffers) for message serialization. +The same `.proto` file can be used to generate server skeleton and client stubs for other transports and serialization formats as well. + +HTTP/2 is a binary replacement for HTTP/1.1 used for handling connections, multiplexing (channels) and providing a standardized way to add headers for authorization and TLS for encryption/authentication. +It also supports bi-directional streaming between client and server. + +Kuksa Databroker implements the following service interfaces: + +- [kuksa.val.v1.VAL](../proto/kuksa/val/v1/val.proto) +- [sdv.databroker.v1.Broker](../proto/sdv/databroker/v1/broker.proto) +- [sdv.databroker.v1.Collector](../proto/sdv/databroker/v1/collector.proto) + +

(back to top)

## Troubleshooting