Skip to content

Commit

Permalink
Revert lag error propagation
Browse files Browse the repository at this point in the history
  • Loading branch information
rafaeling committed Nov 13, 2024
1 parent c8240d6 commit 8e02443
Show file tree
Hide file tree
Showing 5 changed files with 101 additions and 145 deletions.
76 changes: 32 additions & 44 deletions databroker/src/broker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ use crate::query;
pub use crate::types::{ChangeType, DataType, DataValue, EntryType};

use tokio::sync::{broadcast, mpsc, RwLock};
use tokio_stream::wrappers::errors::BroadcastStreamRecvError;
use tokio_stream::wrappers::BroadcastStream;
use tokio_stream::wrappers::ReceiverStream;
use tokio_stream::{Stream, StreamExt};
Expand All @@ -35,7 +34,7 @@ use tracing::{debug, info, warn};

use crate::glob;

const MAX_BROADCAST_CHANNEL_CAPACITY: usize = 1000;
const MAX_SUBSCRIBE_BUFFER_SIZE: usize = 1000;

#[derive(Debug)]
pub enum ActuationError {
Expand Down Expand Up @@ -161,7 +160,7 @@ pub enum QueryError {
pub enum SubscriptionError {
NotFound,
InvalidInput,
InvalidInputBroadChannel,
InvalidSubscibeBufferSize,
InternalError,
}

Expand Down Expand Up @@ -1633,16 +1632,15 @@ impl<'a, 'b> AuthorizedAccess<'a, 'b> {
pub async fn subscribe(
&self,
valid_entries: HashMap<i32, HashSet<Field>>,
lag_buffer_capacity: Option<usize>,
) -> Result<impl Stream<Item = Result<EntryUpdates, BroadcastStreamRecvError>>, SubscriptionError>
{
buffer_size: Option<usize>,
) -> Result<impl Stream<Item = EntryUpdates>, SubscriptionError> {
if valid_entries.is_empty() {
return Err(SubscriptionError::InvalidInput);
}

let channel_capacity = if let Some(cap) = lag_buffer_capacity {
if cap > MAX_BROADCAST_CHANNEL_CAPACITY {
return Err(SubscriptionError::InvalidInputBroadChannel);
let channel_capacity = if let Some(cap) = buffer_size {
if cap > MAX_SUBSCRIBE_BUFFER_SIZE {
return Err(SubscriptionError::InvalidSubscibeBufferSize);
}
cap
} else {
Expand Down Expand Up @@ -1670,11 +1668,11 @@ impl<'a, 'b> AuthorizedAccess<'a, 'b> {
.await
.add_change_subscription(subscription);

let stream = BroadcastStream::new(receiver).map(|result| match result {
Ok(message) => Ok(message),
let stream = BroadcastStream::new(receiver).filter_map(|result| match result {
Ok(message) => Some(message),
Err(err) => {
debug!("Lagged entries: {}", err);
Err(err)
None
}
});
Ok(stream)
Expand Down Expand Up @@ -4271,22 +4269,17 @@ pub mod tests {

// Stream should yield initial notification with current values i.e. NotAvailable
match stream.next().await {
Some(next) => match next {
Ok(entry) => {
assert_eq!(entry.updates.len(), 1);
assert_eq!(
entry.updates[0].update.path,
Some("test.datapoint1".to_string())
);
assert_eq!(
entry.updates[0].update.datapoint.as_ref().unwrap().value,
DataValue::NotAvailable
);
}
Err(_) => {
panic!("did not expect stream end")
}
},
Some(entry) => {
assert_eq!(entry.updates.len(), 1);
assert_eq!(
entry.updates[0].update.path,
Some("test.datapoint1".to_string())
);
assert_eq!(
entry.updates[0].update.datapoint.as_ref().unwrap().value,
DataValue::NotAvailable
);
}
None => {
panic!("did not expect stream end")
}
Expand Down Expand Up @@ -4317,22 +4310,17 @@ pub mod tests {

// Value has been set, expect the next item in stream to match.
match stream.next().await {
Some(next) => match next {
Ok(entry) => {
assert_eq!(entry.updates.len(), 1);
assert_eq!(
entry.updates[0].update.path,
Some("test.datapoint1".to_string())
);
assert_eq!(
entry.updates[0].update.datapoint.as_ref().unwrap().value,
DataValue::Int32(101)
);
}
Err(_) => {
panic!("did not expect stream end")
}
},
Some(entry) => {
assert_eq!(entry.updates.len(), 1);
assert_eq!(
entry.updates[0].update.path,
Some("test.datapoint1".to_string())
);
assert_eq!(
entry.updates[0].update.datapoint.as_ref().unwrap().value,
DataValue::Int32(101)
);
}
None => {
panic!("did not expect stream end")
}
Expand Down
37 changes: 15 additions & 22 deletions databroker/src/grpc/kuksa_val_v1/val.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ use tokio::sync::mpsc;

use databroker_proto::kuksa::val::v1 as proto;
use databroker_proto::kuksa::val::v1::{DataEntryError, EntryUpdate};
use tokio_stream::wrappers::errors::BroadcastStreamRecvError;
use tokio_stream::wrappers::ReceiverStream;
use tokio_stream::Stream;
use tokio_stream::StreamExt;
Expand Down Expand Up @@ -616,7 +615,7 @@ impl proto::val_server::Val for broker::DataBroker {
Err(SubscriptionError::InternalError) => {
Err(tonic::Status::new(tonic::Code::Internal, "Internal Error"))
}
Err(SubscriptionError::InvalidInputBroadChannel) => Err(tonic::Status::new(
Err(SubscriptionError::InvalidSubscibeBufferSize) => Err(tonic::Status::new(
tonic::Code::InvalidArgument,
"Subscription lag_buffer_capacity max allowed value is 1000",
)),
Expand Down Expand Up @@ -737,28 +736,22 @@ fn convert_to_data_entry_error(path: &String, error: &broker::UpdateError) -> Da
}

fn convert_to_proto_stream(
input: impl Stream<Item = Result<broker::EntryUpdates, BroadcastStreamRecvError>>,
input: impl Stream<Item = broker::EntryUpdates>,
) -> impl Stream<Item = Result<proto::SubscribeResponse, tonic::Status>> {
input.map(move |item| match item {
Ok(item) => {
let mut updates = Vec::new();
for update in item.updates {
updates.push(proto::EntryUpdate {
entry: Some(proto::DataEntry::from(update.update)),
fields: update
.fields
.iter()
.map(|field| proto::Field::from(field) as i32)
.collect(),
});
}
let response = proto::SubscribeResponse { updates };
Ok(response)
input.map(move |item| {
let mut updates = Vec::new();
for update in item.updates {
updates.push(proto::EntryUpdate {
entry: Some(proto::DataEntry::from(update.update)),
fields: update
.fields
.iter()
.map(|field| proto::Field::from(field) as i32)
.collect(),
});
}
Err(err) => Err(tonic::Status::resource_exhausted(format!(
"Lagged entries: {}",
err
))),
let response = proto::SubscribeResponse { updates };
Ok(response)
})
}

Expand Down
81 changes: 33 additions & 48 deletions databroker/src/grpc/kuksa_val_v2/val.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,7 @@ use kuksa::proto::v2::{
};
use std::collections::HashSet;
use tokio::{select, sync::mpsc};
use tokio_stream::{
wrappers::{errors::BroadcastStreamRecvError, ReceiverStream},
Stream, StreamExt,
};
use tokio_stream::{wrappers::ReceiverStream, Stream, StreamExt};
use tracing::debug;

const MAX_REQUEST_PATH_LENGTH: usize = 1000;
Expand Down Expand Up @@ -269,7 +266,7 @@ impl proto::val_server::Val for broker::DataBroker {
Err(tonic::Status::invalid_argument("Invalid Argument"))
}
Err(SubscriptionError::InternalError) => Err(tonic::Status::internal("Internal Error")),
Err(SubscriptionError::InvalidInputBroadChannel) => Err(tonic::Status::new(
Err(SubscriptionError::InvalidSubscibeBufferSize) => Err(tonic::Status::new(
tonic::Code::InvalidArgument,
"Subscription lag_buffer_capacity max allowed value is 1000",
)),
Expand Down Expand Up @@ -353,7 +350,7 @@ impl proto::val_server::Val for broker::DataBroker {
Err(SubscriptionError::InternalError) => {
Err(tonic::Status::new(tonic::Code::Internal, "Internal Error"))
}
Err(SubscriptionError::InvalidInputBroadChannel) => Err(tonic::Status::new(
Err(SubscriptionError::InvalidSubscibeBufferSize) => Err(tonic::Status::new(
tonic::Code::InvalidArgument,
"Subscription lag_buffer_capacity max allowed value is 1000",
)),
Expand Down Expand Up @@ -876,60 +873,48 @@ async fn get_signal(
}

fn convert_to_proto_stream(
input: impl Stream<Item = Result<broker::EntryUpdates, BroadcastStreamRecvError>>,
input: impl Stream<Item = broker::EntryUpdates>,
size: usize,
) -> impl Stream<Item = Result<proto::SubscribeResponse, tonic::Status>> {
input.map(move |item| match item {
Ok(item) => {
let mut entries: HashMap<String, proto::Datapoint> = HashMap::with_capacity(size);
for update in item.updates {
let update_datapoint: Option<proto::Datapoint> = match update.update.datapoint {
Some(datapoint) => datapoint.into(),
None => None,
};
if let Some(dp) = update_datapoint {
entries.insert(
update
.update
.path
.expect("Something wrong with update path of subscriptions!"),
dp,
);
}
input.map(move |item| {
let mut entries: HashMap<String, proto::Datapoint> = HashMap::with_capacity(size);
for update in item.updates {
let update_datapoint: Option<proto::Datapoint> = match update.update.datapoint {
Some(datapoint) => datapoint.into(),
None => None,
};
if let Some(dp) = update_datapoint {
entries.insert(
update
.update
.path
.expect("Something wrong with update path of subscriptions!"),
dp,
);
}
let response = proto::SubscribeResponse { entries };
Ok(response)
}
Err(err) => Err(tonic::Status::resource_exhausted(format!(
"Lagged entries: {}",
err
))),
let response = proto::SubscribeResponse { entries };
Ok(response)
})
}

fn convert_to_proto_stream_id(
input: impl Stream<Item = Result<broker::EntryUpdates, BroadcastStreamRecvError>>,
input: impl Stream<Item = broker::EntryUpdates>,
size: usize,
) -> impl Stream<Item = Result<proto::SubscribeByIdResponse, tonic::Status>> {
input.map(move |item| match item {
Ok(item) => {
let mut entries: HashMap<i32, proto::Datapoint> = HashMap::with_capacity(size);
for update in item.updates {
let update_datapoint: Option<proto::Datapoint> = match update.update.datapoint {
Some(datapoint) => datapoint.into(),
None => None,
};
if let Some(dp) = update_datapoint {
entries.insert(update.id, dp);
}
input.map(move |item| {
let mut entries: HashMap<i32, proto::Datapoint> = HashMap::with_capacity(size);
for update in item.updates {
let update_datapoint: Option<proto::Datapoint> = match update.update.datapoint {
Some(datapoint) => datapoint.into(),
None => None,
};
if let Some(dp) = update_datapoint {
entries.insert(update.id, dp);
}
let response = proto::SubscribeByIdResponse { entries };
Ok(response)
}
Err(err) => Err(tonic::Status::resource_exhausted(format!(
"Lagged entries: {}",
err
))),
let response = proto::SubscribeByIdResponse { entries };
Ok(response)
})
}

Expand Down
50 changes: 21 additions & 29 deletions databroker/src/viss/v2/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ use futures::{
Stream, StreamExt,
};
use tokio::sync::RwLock;
use tokio_stream::wrappers::errors::BroadcastStreamRecvError;
use tracing::warn;

use crate::{
Expand Down Expand Up @@ -304,7 +303,7 @@ impl Viss for Server {
broker::SubscriptionError::NotFound => Error::NotFoundInvalidPath,
broker::SubscriptionError::InvalidInput => Error::NotFoundInvalidPath,
broker::SubscriptionError::InternalError => Error::InternalServerError,
broker::SubscriptionError::InvalidInputBroadChannel => {
broker::SubscriptionError::InvalidSubscibeBufferSize => {
Error::InternalServerError
}
},
Expand Down Expand Up @@ -340,40 +339,33 @@ impl Viss for Server {

fn convert_to_viss_stream(
subscription_id: SubscriptionId,
stream: impl Stream<Item = Result<broker::EntryUpdates, BroadcastStreamRecvError>>,
stream: impl Stream<Item = broker::EntryUpdates>,
) -> impl Stream<Item = Result<SubscriptionEvent, SubscriptionErrorEvent>> {
stream.map(move |item| match item {
Ok(mut item) => {
let ts = SystemTime::now().into();
let subscription_id = subscription_id.clone();
match item.updates.pop() {
Some(item) => match (item.update.path, item.update.datapoint) {
(Some(path), Some(datapoint)) => Ok(SubscriptionEvent {
subscription_id,
data: Data::Object(DataObject {
path: path.into(),
dp: datapoint.into(),
}),
ts,
}),
(_, _) => Err(SubscriptionErrorEvent {
subscription_id,
error: Error::InternalServerError,
ts,
stream.map(move |mut item| {
let ts = SystemTime::now().into();
let subscription_id = subscription_id.clone();
match item.updates.pop() {
Some(item) => match (item.update.path, item.update.datapoint) {
(Some(path), Some(datapoint)) => Ok(SubscriptionEvent {
subscription_id,
data: Data::Object(DataObject {
path: path.into(),
dp: datapoint.into(),
}),
},
None => Err(SubscriptionErrorEvent {
ts,
}),
(_, _) => Err(SubscriptionErrorEvent {
subscription_id,
error: Error::InternalServerError,
ts,
}),
}
},
None => Err(SubscriptionErrorEvent {
subscription_id,
error: Error::InternalServerError,
ts,
}),
}
Err(_err) => Err(SubscriptionErrorEvent {
subscription_id: subscription_id.clone(),
error: Error::InternalServerError,
ts: SystemTime::now().into(),
}),
})
}

Expand Down
Loading

0 comments on commit 8e02443

Please sign in to comment.