Skip to content

Commit

Permalink
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Add support for unix domain sockets
Browse files Browse the repository at this point in the history
argerus authored and erikbosch committed Nov 8, 2024
1 parent 0af2e2e commit f8d4388
Showing 5 changed files with 129 additions and 32 deletions.
4 changes: 2 additions & 2 deletions databroker/Cargo.toml
Original file line number Diff line number Diff line change
@@ -59,10 +59,10 @@ glob-match = "0.2.1"
jemallocator = { version = "0.5.0", optional = true }
lazy_static = "1.4.0"
thiserror = "1.0.47"
futures = { version = "0.3.28" }

# VISS
axum = { version = "0.6.20", optional = true, features = ["ws"] }
futures = { version = "0.3.28", optional = true }
chrono = { version = "0.4.31", optional = true, features = ["std"] }
uuid = { version = "1.4.1", optional = true, features = ["v4"] }

@@ -74,7 +74,7 @@ sd-notify = "0.4.1"
default = ["tls"]
tls = ["tonic/tls"]
jemalloc = ["dep:jemallocator"]
viss = ["dep:axum", "dep:chrono", "dep:futures", "dep:uuid"]
viss = ["dep:axum", "dep:chrono", "dep:uuid"]
libtest = []

[build-dependencies]
79 changes: 52 additions & 27 deletions databroker/src/grpc/server.rs
Original file line number Diff line number Diff line change
@@ -13,11 +13,15 @@

use std::{convert::TryFrom, future::Future, time::Duration};

use tokio::net::TcpListener;
use tokio_stream::wrappers::TcpListenerStream;
use tonic::transport::Server;
use futures::Stream;
use tokio::{
io::{AsyncRead, AsyncWrite},
net::{TcpListener, UnixListener},
};
use tokio_stream::wrappers::{TcpListenerStream, UnixListenerStream};
#[cfg(feature = "tls")]
use tonic::transport::ServerTlsConfig;
use tonic::transport::{server::Connected, Server};
use tracing::{debug, info};

use databroker_proto::{kuksa, sdv};
@@ -34,7 +38,7 @@ pub enum ServerTLS {
Enabled { tls_config: ServerTlsConfig },
}

#[derive(PartialEq)]
#[derive(PartialEq, Clone)]
pub enum Api {
KuksaValV1,
SdvDatabrokerV1,
@@ -95,7 +99,7 @@ where
databroker.shutdown().await;
}

pub async fn serve<F>(
pub async fn serve_tcp<F>(
addr: impl Into<std::net::SocketAddr>,
broker: broker::DataBroker,
#[cfg(feature = "tls")] server_tls: ServerTLS,
@@ -109,25 +113,14 @@ where
let socket_addr = addr.into();
let listener = TcpListener::bind(socket_addr).await?;

/* On Linux systems try to notify daemon readiness to systemd.
* This function determines whether the a system is using systemd
* or not, so it is safe to use on non-systemd systems as well.
*/
#[cfg(target_os = "linux")]
{
match sd_notify::booted() {
Ok(true) => {
info!("Notifying systemd that the service is ready");
sd_notify::notify(false, &[sd_notify::NotifyState::Ready])?;
}
_ => {
debug!("System is not using systemd, will not try to notify");
}
}
if let Ok(addr) = listener.local_addr() {
info!("Listening on {}", addr);
}

let incoming = TcpListenerStream::new(listener);

serve_with_incoming_shutdown(
listener,
incoming,
broker,
#[cfg(feature = "tls")]
server_tls,
@@ -138,23 +131,55 @@ where
.await
}

pub async fn serve_with_incoming_shutdown<F>(
listener: TcpListener,
pub async fn serve_unix_domain_socket<F>(
path: impl AsRef<std::path::Path>,
broker: broker::DataBroker,
#[cfg(feature = "tls")] server_tls: ServerTLS,
apis: &[Api],
authorization: Authorization,
signal: F,
) -> Result<(), Box<dyn std::error::Error>>
where
F: Future<Output = ()>,
{
broker.start_housekeeping_task();
let listener = UnixListener::bind(path)?;

if let Ok(addr) = listener.local_addr() {
info!("Listening on {}", addr);
match addr.as_pathname() {
Some(pathname) => info!("Listening on unix socket at {}", pathname.display()),
None => info!("Listening on unix socket (unknown path)"),
}
}

let incoming = TcpListenerStream::new(listener);
let incoming = UnixListenerStream::new(listener);

serve_with_incoming_shutdown(
incoming,
broker,
ServerTLS::Disabled,
apis,
authorization,
signal,
)
.await
}

pub async fn serve_with_incoming_shutdown<F, I, IO, IE>(
incoming: I,
broker: broker::DataBroker,
#[cfg(feature = "tls")] server_tls: ServerTLS,
apis: &[Api],
authorization: Authorization,
signal: F,
) -> Result<(), Box<dyn std::error::Error>>
where
F: Future<Output = ()>,
I: Stream<Item = Result<IO, IE>>,
IO: AsyncRead + AsyncWrite + Connected + Unpin + Send + 'static,
IO::ConnectInfo: Clone + Send + Sync + 'static,
IE: Into<Box<dyn std::error::Error + Send + Sync>>,
{
broker.start_housekeeping_task();

let mut server = Server::builder()
.http2_keepalive_interval(Some(Duration::from_secs(10)))
.http2_keepalive_timeout(Some(Duration::from_secs(20)));
72 changes: 70 additions & 2 deletions databroker/src/main.rs
Original file line number Diff line number Diff line change
@@ -15,6 +15,10 @@
#[global_allocator]
static ALLOC: jemallocator::Jemalloc = jemallocator::Jemalloc;

use std::io;
use std::os::unix::fs::FileTypeExt;
use std::path::Path;

use databroker::authorization::Authorization;
use databroker::broker::RegistrationError;

@@ -171,6 +175,15 @@ async fn read_metadata_file<'a, 'b>(
Ok(())
}

fn unlink_unix_domain_socket(path: impl AsRef<Path>) -> Result<(), io::Error> {
if let Ok(metadata) = std::fs::metadata(&path) {
if metadata.file_type().is_socket() {
std::fs::remove_file(&path)?;
}
};
Ok(())
}

fn main() -> Result<(), Box<dyn std::error::Error>> {
let version = option_env!("CARGO_PKG_VERSION").unwrap_or_default();

@@ -218,6 +231,16 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
.value_parser(clap::value_parser!(u16))
.default_value("55555"),
)
.arg(
Arg::new("unix-socket")
.display_order(3)
.long("unix-socket")
.help("Listen on unix socket, e.g. /tmp/kuksa/databroker.sock")
.action(ArgAction::Set)
.value_name("PATH")
.required(false)
.env("KUKSA_DATABROKER_UNIX_SOCKET"),
)
.arg(
Arg::new("vss-file")
.display_order(4)
@@ -480,7 +503,53 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
if args.get_flag("enable-databroker-v1") {
apis.push(grpc::server::Api::SdvDatabrokerV1);
}
grpc::server::serve(

let unix_socket = args.get_one::<String>("unix-socket").cloned();
if let Some(path) = unix_socket {
// We cannot assume that the socket was closed down properly
// so unlink before we recreate it.
unlink_unix_domain_socket(&path)?;
std::fs::create_dir_all(Path::new(&path).parent().unwrap())?;
let broker = broker.clone();
let authorization = authorization.clone();
let apis = apis.clone();
tokio::spawn(async move {
if let Err(err) = grpc::server::serve_unix_domain_socket(
&path,
broker,
&apis,
authorization,
shutdown_handler(),
)
.await
{
error!("{err}");
}

info!("Unlinking unix domain socket at {}", path);
unlink_unix_domain_socket(path)
.unwrap_or_else(|_| error!("Failed to unlink unix domain socket"));
});
}

/* On Linux systems try to notify daemon readiness to systemd.
* This function determines whether the a system is using systemd
* or not, so it is safe to use on non-systemd systems as well.
*/
#[cfg(target_os = "linux")]
{
match sd_notify::booted() {
Ok(true) => {
info!("Notifying systemd that the service is ready");
sd_notify::notify(false, &[sd_notify::NotifyState::Ready])?;
}
_ => {
debug!("System is not using systemd, will not try to notify");
}
}
}

grpc::server::serve_tcp(
addr,
broker,
#[cfg(feature = "tls")]
@@ -491,6 +560,5 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
)
.await
})?;

Ok(())
}
4 changes: 3 additions & 1 deletion databroker/tests/world/mod.rs
Original file line number Diff line number Diff line change
@@ -32,6 +32,7 @@ use databroker::{
};

use tokio::net::TcpListener;
use tokio_stream::wrappers::TcpListenerStream;
use tracing::debug;

use lazy_static::lazy_static;
@@ -188,6 +189,7 @@ impl DataBrokerWorld {
let addr = listener
.local_addr()
.expect("failed to determine listener's port");
let incoming = TcpListenerStream::new(listener);

tokio::spawn(async move {
let version = option_env!("VERGEN_GIT_SEMVER_LIGHTWEIGHT")
@@ -228,7 +230,7 @@ impl DataBrokerWorld {
}

grpc::server::serve_with_incoming_shutdown(
listener,
incoming,
data_broker,
#[cfg(feature = "tls")]
CERTS.server_tls_config(),
2 changes: 2 additions & 0 deletions doc/user_guide.md
Original file line number Diff line number Diff line change
@@ -37,6 +37,7 @@ Usage: databroker [OPTIONS]
Options:
--address <IP> Bind address [env: KUKSA_DATABROKER_ADDR=] [default: 127.0.0.1]
--port <PORT> Bind port [env: KUKSA_DATABROKER_PORT=] [default: 55555]
--unix-socket <PATH> Listen on unix socket, e.g. /tmp/kuksa/databroker.sock [env: KUKSA_DATABROKER_UNIX_SOCKET=]
--vss <FILE> Populate data broker with VSS metadata from (comma-separated) list of files [env: KUKSA_DATABROKER_METADATA_FILE=]
--jwt-public-key <FILE> Public key used to verify JWT access tokens
--disable-authorization Disable authorization
@@ -240,6 +241,7 @@ The default configuration can be overridden by means of setting the correspondin
| `--vss`,<br>`--metadata` | `KUKSA_DATABROKER_METADATA_FILE` | | Populate data broker with metadata from file |
| `--address` | `KUKSA_DATABROKER_ADDR` | `127.0.0.1` | Listen for rpc calls |
| `--port` | `KUKSA_DATABROKER_PORT` | `55555` | Listen for rpc calls |
| `--unix-socket` | `KUKSA_DATABROKER_UNIX_SOCKET` | | Listen on unix socket, e.g. `/tmp/kuksa/databroker.sockcalls` |
| `--jwt-public-key` | | | Public key used to verify JWT access tokens |
| `--tls-cert` | | | TLS certificate file (.pem) |
| `--tls-private-key` | | | TLS private key file (.key) |

0 comments on commit f8d4388

Please sign in to comment.