Skip to content

Commit 7d93bac

Browse files
authored
Merge pull request #95 from boschglobal/fix/use_tokio_broadcast_channel
Use tokio broadcast::channel
2 parents 5604aa9 + a89dc6e commit 7d93bac

File tree

5 files changed

+116
-33
lines changed

5 files changed

+116
-33
lines changed

databroker/src/broker.rs

Lines changed: 51 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,9 @@ use crate::query;
1818
pub use crate::types::{ChangeType, DataType, DataValue, EntryType};
1919

2020
use tokio::sync::{broadcast, mpsc, RwLock};
21+
use tokio_stream::wrappers::BroadcastStream;
2122
use tokio_stream::wrappers::ReceiverStream;
22-
use tokio_stream::Stream;
23+
use tokio_stream::{Stream, StreamExt};
2324

2425
use std::collections::{HashMap, HashSet};
2526
use std::convert::TryFrom;
@@ -33,6 +34,8 @@ use tracing::{debug, info, warn};
3334

3435
use crate::glob;
3536

37+
const MAX_SUBSCRIBE_BUFFER_SIZE: usize = 1000;
38+
3639
#[derive(Debug)]
3740
pub enum ActuationError {
3841
NotFound,
@@ -135,14 +138,14 @@ pub struct QueryField {
135138
pub value: DataValue,
136139
}
137140

138-
#[derive(Debug)]
141+
#[derive(Debug, Clone)]
139142
pub struct ChangeNotification {
140143
pub id: i32,
141144
pub update: EntryUpdate,
142145
pub fields: HashSet<Field>,
143146
}
144147

145-
#[derive(Debug, Default)]
148+
#[derive(Debug, Default, Clone)]
146149
pub struct EntryUpdates {
147150
pub updates: Vec<ChangeNotification>,
148151
}
@@ -157,6 +160,7 @@ pub enum QueryError {
157160
pub enum SubscriptionError {
158161
NotFound,
159162
InvalidInput,
163+
InvalidBufferSize,
160164
InternalError,
161165
}
162166

@@ -198,7 +202,7 @@ pub struct QuerySubscription {
198202

199203
pub struct ChangeSubscription {
200204
entries: HashMap<i32, HashSet<Field>>,
201-
sender: mpsc::Sender<EntryUpdates>,
205+
sender: broadcast::Sender<EntryUpdates>,
202206
permissions: Permissions,
203207
}
204208

@@ -805,7 +809,7 @@ impl Subscriptions {
805809
}
806810
});
807811
self.change_subscriptions.retain(|sub| {
808-
if sub.sender.is_closed() {
812+
if sub.sender.receiver_count() == 0 {
809813
info!("Subscriber gone: removing subscription");
810814
false
811815
} else if sub.permissions.is_expired() {
@@ -898,9 +902,12 @@ impl ChangeSubscription {
898902
if notifications.updates.is_empty() {
899903
Ok(())
900904
} else {
901-
match self.sender.send(notifications).await {
902-
Ok(()) => Ok(()),
903-
Err(_) => Err(NotificationError {}),
905+
match self.sender.send(notifications) {
906+
Ok(_number_of_receivers) => Ok(()),
907+
Err(err) => {
908+
debug!("Send error for entry{}: ", err);
909+
Err(NotificationError {})
910+
}
904911
}
905912
}
906913
} else {
@@ -939,9 +946,12 @@ impl ChangeSubscription {
939946
}
940947
notifications
941948
};
942-
match self.sender.send(notifications).await {
943-
Ok(()) => Ok(()),
944-
Err(_) => Err(NotificationError {}),
949+
match self.sender.send(notifications) {
950+
Ok(_number_of_receivers) => Ok(()),
951+
Err(err) => {
952+
debug!("Send error for entry{}: ", err);
953+
Err(NotificationError {})
954+
}
945955
}
946956
}
947957
}
@@ -1622,12 +1632,22 @@ impl<'a, 'b> AuthorizedAccess<'a, 'b> {
16221632
pub async fn subscribe(
16231633
&self,
16241634
valid_entries: HashMap<i32, HashSet<Field>>,
1635+
buffer_size: Option<usize>,
16251636
) -> Result<impl Stream<Item = EntryUpdates>, SubscriptionError> {
16261637
if valid_entries.is_empty() {
16271638
return Err(SubscriptionError::InvalidInput);
16281639
}
16291640

1630-
let (sender, receiver) = mpsc::channel(10);
1641+
let channel_capacity = if let Some(cap) = buffer_size {
1642+
if cap > MAX_SUBSCRIBE_BUFFER_SIZE {
1643+
return Err(SubscriptionError::InvalidBufferSize);
1644+
}
1645+
cap
1646+
} else {
1647+
1
1648+
};
1649+
1650+
let (sender, receiver) = broadcast::channel(channel_capacity);
16311651
let subscription = ChangeSubscription {
16321652
entries: valid_entries,
16331653
sender,
@@ -1648,7 +1668,13 @@ impl<'a, 'b> AuthorizedAccess<'a, 'b> {
16481668
.await
16491669
.add_change_subscription(subscription);
16501670

1651-
let stream = ReceiverStream::new(receiver);
1671+
let stream = BroadcastStream::new(receiver).filter_map(|result| match result {
1672+
Ok(message) => Some(message),
1673+
Err(err) => {
1674+
debug!("Lagged entries: {}", err);
1675+
None
1676+
}
1677+
});
16521678
Ok(stream)
16531679
}
16541680

@@ -4234,20 +4260,23 @@ pub mod tests {
42344260
.expect("Register datapoint should succeed");
42354261

42364262
let mut stream = broker
4237-
.subscribe(HashMap::from([(id1, HashSet::from([Field::Datapoint]))]))
4263+
.subscribe(
4264+
HashMap::from([(id1, HashSet::from([Field::Datapoint]))]),
4265+
None,
4266+
)
42384267
.await
42394268
.expect("subscription should succeed");
42404269

42414270
// Stream should yield initial notification with current values i.e. NotAvailable
42424271
match stream.next().await {
4243-
Some(next) => {
4244-
assert_eq!(next.updates.len(), 1);
4272+
Some(entry) => {
4273+
assert_eq!(entry.updates.len(), 1);
42454274
assert_eq!(
4246-
next.updates[0].update.path,
4275+
entry.updates[0].update.path,
42474276
Some("test.datapoint1".to_string())
42484277
);
42494278
assert_eq!(
4250-
next.updates[0].update.datapoint.as_ref().unwrap().value,
4279+
entry.updates[0].update.datapoint.as_ref().unwrap().value,
42514280
DataValue::NotAvailable
42524281
);
42534282
}
@@ -4281,14 +4310,14 @@ pub mod tests {
42814310

42824311
// Value has been set, expect the next item in stream to match.
42834312
match stream.next().await {
4284-
Some(next) => {
4285-
assert_eq!(next.updates.len(), 1);
4313+
Some(entry) => {
4314+
assert_eq!(entry.updates.len(), 1);
42864315
assert_eq!(
4287-
next.updates[0].update.path,
4316+
entry.updates[0].update.path,
42884317
Some("test.datapoint1".to_string())
42894318
);
42904319
assert_eq!(
4291-
next.updates[0].update.datapoint.as_ref().unwrap().value,
4320+
entry.updates[0].update.datapoint.as_ref().unwrap().value,
42924321
DataValue::Int32(101)
42934322
);
42944323
}

databroker/src/grpc/kuksa_val_v1/val.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -600,7 +600,7 @@ impl proto::val_server::Val for broker::DataBroker {
600600
}
601601
}
602602

603-
match broker.subscribe(entries).await {
603+
match broker.subscribe(entries, Some(1)).await {
604604
Ok(stream) => {
605605
let stream = convert_to_proto_stream(stream);
606606
Ok(tonic::Response::new(Box::pin(stream)))
@@ -615,6 +615,10 @@ impl proto::val_server::Val for broker::DataBroker {
615615
Err(SubscriptionError::InternalError) => {
616616
Err(tonic::Status::new(tonic::Code::Internal, "Internal Error"))
617617
}
618+
Err(SubscriptionError::InvalidBufferSize) => Err(tonic::Status::new(
619+
tonic::Code::InvalidArgument,
620+
"Subscription buffer_size max allowed value is 1000",
621+
)),
618622
}
619623
}
620624

databroker/src/grpc/kuksa_val_v2/val.rs

Lines changed: 34 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -221,9 +221,15 @@ impl proto::val_server::Val for broker::DataBroker {
221221
None => return Err(tonic::Status::unauthenticated("Unauthenticated")),
222222
};
223223

224-
let broker = self.authorized_access(&permissions);
225-
226224
let request = request.into_inner();
225+
if request.buffer_size == 0 {
226+
return Err(tonic::Status::invalid_argument(format!(
227+
"Provided buffer_size {} should be greater than zero.",
228+
request.buffer_size
229+
)));
230+
}
231+
232+
let broker = self.authorized_access(&permissions);
227233

228234
let signal_paths = request.signal_paths;
229235
let size = signal_paths.len();
@@ -247,7 +253,10 @@ impl proto::val_server::Val for broker::DataBroker {
247253
);
248254
}
249255

250-
match broker.subscribe(valid_requests).await {
256+
match broker
257+
.subscribe(valid_requests, Some(request.buffer_size as usize))
258+
.await
259+
{
251260
Ok(stream) => {
252261
let stream = convert_to_proto_stream(stream, size);
253262
Ok(tonic::Response::new(Box::pin(stream)))
@@ -257,6 +266,10 @@ impl proto::val_server::Val for broker::DataBroker {
257266
Err(tonic::Status::invalid_argument("Invalid Argument"))
258267
}
259268
Err(SubscriptionError::InternalError) => Err(tonic::Status::internal("Internal Error")),
269+
Err(SubscriptionError::InvalidBufferSize) => Err(tonic::Status::new(
270+
tonic::Code::InvalidArgument,
271+
"Subscription buffer_size max allowed value is 1000",
272+
)),
260273
}
261274
}
262275

@@ -287,9 +300,15 @@ impl proto::val_server::Val for broker::DataBroker {
287300
None => return Err(tonic::Status::unauthenticated("Unauthenticated")),
288301
};
289302

290-
let broker = self.authorized_access(&permissions);
291-
292303
let request = request.into_inner();
304+
if request.buffer_size == 0 {
305+
return Err(tonic::Status::invalid_argument(format!(
306+
"Provided lag_buffer_capacity {} should be greater than zero.",
307+
request.buffer_size
308+
)));
309+
}
310+
311+
let broker = self.authorized_access(&permissions);
293312

294313
let signal_ids = request.signal_ids;
295314
let size = signal_ids.len();
@@ -313,7 +332,10 @@ impl proto::val_server::Val for broker::DataBroker {
313332
);
314333
}
315334

316-
match broker.subscribe(valid_requests).await {
335+
match broker
336+
.subscribe(valid_requests, Some(request.buffer_size as usize))
337+
.await
338+
{
317339
Ok(stream) => {
318340
let stream = convert_to_proto_stream_id(stream, size);
319341
Ok(tonic::Response::new(Box::pin(stream)))
@@ -328,6 +350,10 @@ impl proto::val_server::Val for broker::DataBroker {
328350
Err(SubscriptionError::InternalError) => {
329351
Err(tonic::Status::new(tonic::Code::Internal, "Internal Error"))
330352
}
353+
Err(SubscriptionError::InvalidBufferSize) => Err(tonic::Status::new(
354+
tonic::Code::InvalidArgument,
355+
"Subscription buffer_size max allowed value is 1000",
356+
)),
331357
}
332358
}
333359

@@ -1836,6 +1862,7 @@ mod tests {
18361862

18371863
let mut request = tonic::Request::new(proto::SubscribeRequest {
18381864
signal_paths: vec!["test.datapoint1".to_string()],
1865+
buffer_size: 5,
18391866
});
18401867

18411868
request
@@ -1982,6 +2009,7 @@ mod tests {
19822009

19832010
let mut request = tonic::Request::new(proto::SubscribeByIdRequest {
19842011
signal_ids: vec![entry_id],
2012+
buffer_size: 5,
19852013
});
19862014

19872015
request

databroker/src/viss/v2/server.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -271,7 +271,7 @@ impl Viss for Server {
271271
});
272272
};
273273

274-
match broker.subscribe(entries).await {
274+
match broker.subscribe(entries, Some(1)).await {
275275
Ok(stream) => {
276276
let subscription_id = SubscriptionId::new();
277277

@@ -303,6 +303,7 @@ impl Viss for Server {
303303
broker::SubscriptionError::NotFound => Error::NotFoundInvalidPath,
304304
broker::SubscriptionError::InvalidInput => Error::NotFoundInvalidPath,
305305
broker::SubscriptionError::InternalError => Error::InternalServerError,
306+
broker::SubscriptionError::InvalidBufferSize => Error::InternalServerError,
306307
},
307308
ts: SystemTime::now().into(),
308309
}),

proto/kuksa/val/v2/val.proto

Lines changed: 24 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -52,26 +52,39 @@ service VAL {
5252
// NOT_FOUND if any of the signals are non-existant.
5353
// UNAUTHENTICATED if no credentials provided or credentials has expired
5454
// PERMISSION_DENIED if access is denied for any of the signals.
55-
// INVALID_ARGUMENT if the request is empty or provided path is too long
56-
// - MAX_REQUEST_PATH_LENGTH: usize = 1000;
55+
// INVALID_ARGUMENT
56+
// - if the request is empty or provided path is too long
57+
// MAX_REQUEST_PATH_LENGTH: usize = 1000;
58+
// - if buffer_size exceeds the maximum permitted
59+
// MAX_BUFFER_SIZE: usize = 1000;
5760
//
5861
// When subscribing, Databroker shall immediately return the value for all
5962
// subscribed entries.
6063
// If a value isn't available when subscribing to a it, it should return None
6164
//
65+
// If a subscriber is slow to consume signals, messages will be buffered up
66+
// to the specified buffer_size before the oldest messages are dropped.
67+
//
6268
rpc Subscribe(SubscribeRequest) returns (stream SubscribeResponse);
6369

6470
// Subscribe to a set of signals using i32 id parameters
6571
// Returns (GRPC error code):
6672
// NOT_FOUND if any of the signals are non-existant.
6773
// UNAUTHENTICATED if no credentials provided or credentials has expired
6874
// PERMISSION_DENIED if access is denied for any of the signals.
69-
// INVALID_ARGUMENT if the request is empty or provided path is too long
75+
// INVALID_ARGUMENT
76+
// - if the request is empty or provided path is too long
77+
// MAX_REQUEST_PATH_LENGTH: usize = 1000;
78+
// - if buffer_size exceeds the maximum permitted
79+
// MAX_BUFFER_SIZE: usize = 1000;
7080
//
7181
// When subscribing, Databroker shall immediately return the value for all
7282
// subscribed entries.
7383
// If a value isn't available when subscribing to a it, it should return None
7484
//
85+
// If a subscriber is slow to consume signals, messages will be buffered up
86+
// to the specified buffer_size before the oldest messages are dropped.
87+
//
7588
rpc SubscribeById(SubscribeByIdRequest) returns (stream SubscribeByIdResponse);
7689

7790
// Actuate a single actuator
@@ -192,6 +205,10 @@ message GetValuesResponse {
192205

193206
message SubscribeRequest {
194207
repeated string signal_paths = 1;
208+
209+
// Specifies the number of messages that can be buffered for
210+
// slow subscribers before the oldest messages are dropped.
211+
uint32 buffer_size = 2;
195212
}
196213

197214
message SubscribeResponse {
@@ -200,6 +217,10 @@ message SubscribeResponse {
200217

201218
message SubscribeByIdRequest {
202219
repeated int32 signal_ids = 1;
220+
221+
// Specifies the number of messages that can be buffered for
222+
// slow subscribers before the oldest messages are dropped.
223+
uint32 buffer_size = 2;
203224
}
204225

205226
message SubscribeByIdResponse {

0 commit comments

Comments
 (0)