Skip to content

Commit

Permalink
Use tokio broadcast::channel
Browse files Browse the repository at this point in the history
  • Loading branch information
rafaeling committed Nov 13, 2024
1 parent f9f7f6f commit cb68f84
Show file tree
Hide file tree
Showing 5 changed files with 116 additions and 33 deletions.
73 changes: 51 additions & 22 deletions databroker/src/broker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,9 @@ use crate::query;
pub use crate::types::{ChangeType, DataType, DataValue, EntryType};

use tokio::sync::{broadcast, mpsc, RwLock};
use tokio_stream::wrappers::BroadcastStream;
use tokio_stream::wrappers::ReceiverStream;
use tokio_stream::Stream;
use tokio_stream::{Stream, StreamExt};

use std::collections::{HashMap, HashSet};
use std::convert::TryFrom;
Expand All @@ -33,6 +34,8 @@ use tracing::{debug, info, warn};

use crate::glob;

const MAX_SUBSCRIBE_BUFFER_SIZE: usize = 1000;

#[derive(Debug)]
pub enum ActuationError {
NotFound,
Expand Down Expand Up @@ -135,14 +138,14 @@ pub struct QueryField {
pub value: DataValue,
}

#[derive(Debug)]
#[derive(Debug, Clone)]
pub struct ChangeNotification {
pub id: i32,
pub update: EntryUpdate,
pub fields: HashSet<Field>,
}

#[derive(Debug, Default)]
#[derive(Debug, Default, Clone)]
pub struct EntryUpdates {
pub updates: Vec<ChangeNotification>,
}
Expand All @@ -157,6 +160,7 @@ pub enum QueryError {
pub enum SubscriptionError {
NotFound,
InvalidInput,
InvalidBufferSize,
InternalError,
}

Expand Down Expand Up @@ -198,7 +202,7 @@ pub struct QuerySubscription {

pub struct ChangeSubscription {
entries: HashMap<i32, HashSet<Field>>,
sender: mpsc::Sender<EntryUpdates>,
sender: broadcast::Sender<EntryUpdates>,
permissions: Permissions,
}

Expand Down Expand Up @@ -805,7 +809,7 @@ impl Subscriptions {
}
});
self.change_subscriptions.retain(|sub| {
if sub.sender.is_closed() {
if sub.sender.receiver_count() == 0 {
info!("Subscriber gone: removing subscription");
false
} else if sub.permissions.is_expired() {
Expand Down Expand Up @@ -898,9 +902,12 @@ impl ChangeSubscription {
if notifications.updates.is_empty() {
Ok(())
} else {
match self.sender.send(notifications).await {
Ok(()) => Ok(()),
Err(_) => Err(NotificationError {}),
match self.sender.send(notifications) {
Ok(_number_of_receivers) => Ok(()),
Err(err) => {
debug!("Send error for entry{}: ", err);
Err(NotificationError {})
}
}
}
} else {
Expand Down Expand Up @@ -939,9 +946,12 @@ impl ChangeSubscription {
}
notifications
};
match self.sender.send(notifications).await {
Ok(()) => Ok(()),
Err(_) => Err(NotificationError {}),
match self.sender.send(notifications) {
Ok(_number_of_receivers) => Ok(()),
Err(err) => {
debug!("Send error for entry{}: ", err);
Err(NotificationError {})
}
}
}
}
Expand Down Expand Up @@ -1622,12 +1632,22 @@ impl<'a, 'b> AuthorizedAccess<'a, 'b> {
pub async fn subscribe(
&self,
valid_entries: HashMap<i32, HashSet<Field>>,
buffer_size: Option<usize>,
) -> Result<impl Stream<Item = EntryUpdates>, SubscriptionError> {
if valid_entries.is_empty() {
return Err(SubscriptionError::InvalidInput);
}

let (sender, receiver) = mpsc::channel(10);
let channel_capacity = if let Some(cap) = buffer_size {
if cap > MAX_SUBSCRIBE_BUFFER_SIZE {
return Err(SubscriptionError::InvalidBufferSize);
}
cap
} else {
1
};

let (sender, receiver) = broadcast::channel(channel_capacity);
let subscription = ChangeSubscription {
entries: valid_entries,
sender,
Expand All @@ -1648,7 +1668,13 @@ impl<'a, 'b> AuthorizedAccess<'a, 'b> {
.await
.add_change_subscription(subscription);

let stream = ReceiverStream::new(receiver);
let stream = BroadcastStream::new(receiver).filter_map(|result| match result {
Ok(message) => Some(message),
Err(err) => {
debug!("Lagged entries: {}", err);
None
}
});
Ok(stream)
}

Expand Down Expand Up @@ -4234,20 +4260,23 @@ pub mod tests {
.expect("Register datapoint should succeed");

let mut stream = broker
.subscribe(HashMap::from([(id1, HashSet::from([Field::Datapoint]))]))
.subscribe(
HashMap::from([(id1, HashSet::from([Field::Datapoint]))]),
None,
)
.await
.expect("subscription should succeed");

// Stream should yield initial notification with current values i.e. NotAvailable
match stream.next().await {
Some(next) => {
assert_eq!(next.updates.len(), 1);
Some(entry) => {
assert_eq!(entry.updates.len(), 1);
assert_eq!(
next.updates[0].update.path,
entry.updates[0].update.path,
Some("test.datapoint1".to_string())
);
assert_eq!(
next.updates[0].update.datapoint.as_ref().unwrap().value,
entry.updates[0].update.datapoint.as_ref().unwrap().value,
DataValue::NotAvailable
);
}
Expand Down Expand Up @@ -4281,14 +4310,14 @@ pub mod tests {

// Value has been set, expect the next item in stream to match.
match stream.next().await {
Some(next) => {
assert_eq!(next.updates.len(), 1);
Some(entry) => {
assert_eq!(entry.updates.len(), 1);
assert_eq!(
next.updates[0].update.path,
entry.updates[0].update.path,
Some("test.datapoint1".to_string())
);
assert_eq!(
next.updates[0].update.datapoint.as_ref().unwrap().value,
entry.updates[0].update.datapoint.as_ref().unwrap().value,
DataValue::Int32(101)
);
}
Expand Down
6 changes: 5 additions & 1 deletion databroker/src/grpc/kuksa_val_v1/val.rs
Original file line number Diff line number Diff line change
Expand Up @@ -600,7 +600,7 @@ impl proto::val_server::Val for broker::DataBroker {
}
}

match broker.subscribe(entries).await {
match broker.subscribe(entries, Some(1)).await {
Ok(stream) => {
let stream = convert_to_proto_stream(stream);
Ok(tonic::Response::new(Box::pin(stream)))
Expand All @@ -615,6 +615,10 @@ impl proto::val_server::Val for broker::DataBroker {
Err(SubscriptionError::InternalError) => {
Err(tonic::Status::new(tonic::Code::Internal, "Internal Error"))
}
Err(SubscriptionError::InvalidBufferSize) => Err(tonic::Status::new(
tonic::Code::InvalidArgument,
"Subscription lag_buffer_capacity max allowed value is 1000",
)),
}
}

Expand Down
40 changes: 34 additions & 6 deletions databroker/src/grpc/kuksa_val_v2/val.rs
Original file line number Diff line number Diff line change
Expand Up @@ -221,9 +221,15 @@ impl proto::val_server::Val for broker::DataBroker {
None => return Err(tonic::Status::unauthenticated("Unauthenticated")),
};

let broker = self.authorized_access(&permissions);

let request = request.into_inner();
if request.lag_buffer_capacity == 0 {
return Err(tonic::Status::invalid_argument(format!(
"Provided lag_buffer_capacity {} should be greater than zero.",
request.lag_buffer_capacity
)));
}

let broker = self.authorized_access(&permissions);

let signal_paths = request.signal_paths;
let size = signal_paths.len();
Expand All @@ -247,7 +253,10 @@ impl proto::val_server::Val for broker::DataBroker {
);
}

match broker.subscribe(valid_requests).await {
match broker
.subscribe(valid_requests, Some(request.lag_buffer_capacity as usize))
.await
{
Ok(stream) => {
let stream = convert_to_proto_stream(stream, size);
Ok(tonic::Response::new(Box::pin(stream)))
Expand All @@ -257,6 +266,10 @@ impl proto::val_server::Val for broker::DataBroker {
Err(tonic::Status::invalid_argument("Invalid Argument"))
}
Err(SubscriptionError::InternalError) => Err(tonic::Status::internal("Internal Error")),
Err(SubscriptionError::InvalidBufferSize) => Err(tonic::Status::new(
tonic::Code::InvalidArgument,
"Subscription lag_buffer_capacity max allowed value is 1000",
)),
}
}

Expand Down Expand Up @@ -287,9 +300,15 @@ impl proto::val_server::Val for broker::DataBroker {
None => return Err(tonic::Status::unauthenticated("Unauthenticated")),
};

let broker = self.authorized_access(&permissions);

let request = request.into_inner();
if request.lag_buffer_capacity == 0 {
return Err(tonic::Status::invalid_argument(format!(
"Provided lag_buffer_capacity {} should be greater than zero.",
request.lag_buffer_capacity
)));
}

let broker = self.authorized_access(&permissions);

let signal_ids = request.signal_ids;
let size = signal_ids.len();
Expand All @@ -313,7 +332,10 @@ impl proto::val_server::Val for broker::DataBroker {
);
}

match broker.subscribe(valid_requests).await {
match broker
.subscribe(valid_requests, Some(request.lag_buffer_capacity as usize))
.await
{
Ok(stream) => {
let stream = convert_to_proto_stream_id(stream, size);
Ok(tonic::Response::new(Box::pin(stream)))
Expand All @@ -328,6 +350,10 @@ impl proto::val_server::Val for broker::DataBroker {
Err(SubscriptionError::InternalError) => {
Err(tonic::Status::new(tonic::Code::Internal, "Internal Error"))
}
Err(SubscriptionError::InvalidBufferSize) => Err(tonic::Status::new(
tonic::Code::InvalidArgument,
"Subscription lag_buffer_capacity max allowed value is 1000",
)),
}
}

Expand Down Expand Up @@ -1844,6 +1870,7 @@ mod tests {

let mut request = tonic::Request::new(proto::SubscribeRequest {
signal_paths: vec!["test.datapoint1".to_string()],
lag_buffer_capacity: 5,
});

request
Expand Down Expand Up @@ -1990,6 +2017,7 @@ mod tests {

let mut request = tonic::Request::new(proto::SubscribeByIdRequest {
signal_ids: vec![entry_id],
lag_buffer_capacity: 5,
});

request
Expand Down
3 changes: 2 additions & 1 deletion databroker/src/viss/v2/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,7 @@ impl Viss for Server {
});
};

match broker.subscribe(entries).await {
match broker.subscribe(entries, Some(1)).await {
Ok(stream) => {
let subscription_id = SubscriptionId::new();

Expand Down Expand Up @@ -303,6 +303,7 @@ impl Viss for Server {
broker::SubscriptionError::NotFound => Error::NotFoundInvalidPath,
broker::SubscriptionError::InvalidInput => Error::NotFoundInvalidPath,
broker::SubscriptionError::InternalError => Error::InternalServerError,
broker::SubscriptionError::InvalidBufferSize => Error::InternalServerError,
},
ts: SystemTime::now().into(),
}),
Expand Down
27 changes: 24 additions & 3 deletions proto/kuksa/val/v2/val.proto
Original file line number Diff line number Diff line change
Expand Up @@ -51,26 +51,39 @@ service VAL {
// NOT_FOUND if any of the signals are non-existant.
// UNAUTHENTICATED if no credentials provided or credentials has expired
// PERMISSION_DENIED if access is denied for any of the signals.
// INVALID_ARGUMENT if the request is empty or provided path is too long
// - MAX_REQUEST_PATH_LENGTH: usize = 1000;
// INVALID_ARGUMENT
// - if the request is empty or provided path is too long
// MAX_REQUEST_PATH_LENGTH: usize = 1000;
// - if lag_buffer_capacity exceeds the maximum permitted
// MAX_BUFFER_SIZE: usize = 1000;
//
// When subscribing, Databroker shall immediately return the value for all
// subscribed entries.
// If a value isn't available when subscribing to a it, it should return None
//
// If a subscriber is slow to consume signals, messages will be buffered up
// to the specified buffer_size before the oldest messages are dropped.
//
rpc Subscribe(SubscribeRequest) returns (stream SubscribeResponse);

// Subscribe to a set of signals using i32 id parameters
// Returns (GRPC error code):
// NOT_FOUND if any of the signals are non-existant.
// UNAUTHENTICATED if no credentials provided or credentials has expired
// PERMISSION_DENIED if access is denied for any of the signals.
// INVALID_ARGUMENT if the request is empty or provided path is too long
// INVALID_ARGUMENT
// - if the request is empty or provided path is too long
// MAX_REQUEST_PATH_LENGTH: usize = 1000;
// - if lag_buffer_capacity exceeds the maximum permitted
// MAX_BUFFER_SIZE: usize = 1000;
//
// When subscribing, Databroker shall immediately return the value for all
// subscribed entries.
// If a value isn't available when subscribing to a it, it should return None
//
// If a subscriber is slow to consume signals, messages will be buffered up
// to the specified buffer_size before the oldest messages are dropped.
//
rpc SubscribeById(SubscribeByIdRequest) returns (stream SubscribeByIdResponse);

// Actuate a single actuator
Expand Down Expand Up @@ -191,6 +204,10 @@ message GetValuesResponse {

message SubscribeRequest {
repeated string signal_paths = 1;

// Specifies the number of messages that can be buffered for
// slow subscribers before the oldest messages are dropped.
uint32 buffer_size = 2;
}

message SubscribeResponse {
Expand All @@ -199,6 +216,10 @@ message SubscribeResponse {

message SubscribeByIdRequest {
repeated int32 signal_ids = 1;

// Specifies the number of messages that can be buffered for
// slow subscribers before the oldest messages are dropped.
uint32 buffer_size = 2;
}

message SubscribeByIdResponse {
Expand Down

0 comments on commit cb68f84

Please sign in to comment.