Skip to content

Commit 5f7a022

Browse files
committed
Fix issues blocking RAP integration
1 parent fbfb769 commit 5f7a022

13 files changed

+372
-22
lines changed

Cargo.toml

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ serde = {version = "1.0.215", features = ["derive"]}
2626
serde_json = "1.0.133"
2727
# mqtt client
2828
# Disable the default dependency on SSL to avoid a build dependency on OpenSSL
29-
paho-mqtt = {version = "0.12.5", features=["bundled"]}
29+
paho-mqtt = {version = "0.12.5", default-features=false, features=["bundled"]}
3030
async-trait = "0.1.51"
3131
async-once-cell = "0.5.4"
3232

@@ -39,3 +39,7 @@ criterion = { version = "0.5.1", features = ["async_tokio", "html_reports"] }
3939
[[bench]]
4040
name = "simple_add"
4141
harness = false
42+
43+
[[bench]]
44+
name = "maple_sequence"
45+
harness = false

benches/simple_add.rs

Lines changed: 8 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -139,16 +139,14 @@ fn from_elem(c: &mut Criterion) {
139139
group.measurement_time(std::time::Duration::from_secs(30));
140140

141141
for size in sizes {
142-
if size <= 10000 {
143-
group.bench_with_input(
144-
BenchmarkId::new("simple_add_constraints", size),
145-
&size,
146-
|b, &size| {
147-
b.to_async(&tokio_rt)
148-
.iter(|| monitor_outputs_untyped_constraints(size))
149-
},
150-
);
151-
}
142+
group.bench_with_input(
143+
BenchmarkId::new("simple_add_constraints", size),
144+
&size,
145+
|b, &size| {
146+
b.to_async(&tokio_rt)
147+
.iter(|| monitor_outputs_untyped_constraints(size))
148+
},
149+
);
152150
group.bench_with_input(
153151
BenchmarkId::new("simple_add_untyped_async", size),
154152
&size,
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
0: stage = "m"
2+
1: stage = "a"
3+
2: stage = "p"
4+
3: stage = "l"
5+
4: stage = "e"

examples/maple_sequence_true.input

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
0: m = true
2+
a = false
3+
p = false
4+
l = false
5+
e = false
6+
1: m = false
7+
a = true
8+
p = false
9+
l = false
10+
e = false
11+
2: m = false
12+
a = false
13+
p = true
14+
l = false
15+
e = false
16+
3: m = false
17+
a = false
18+
p = false
19+
l = true
20+
e = false
21+
4: m = false
22+
a = false
23+
p = false
24+
l = false
25+
e = true

examples/maple_simple_seq.lola

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
in m : Bool
2+
in a : Bool
3+
in p : Bool
4+
in l : Bool
5+
in e : Bool
6+
out mout : Bool
7+
out aout : Bool
8+
out pout : Bool
9+
out lout : Bool
10+
out eout : Bool
11+
out maple : Bool
12+
out globallymaple : Bool
13+
mout = m && !a && !p && !l && !e && eout[-1, true]
14+
aout = !m && a && !p && !l && !e && mout[-1, false]
15+
pout = !m && !a && p && !l && !e && aout[-1, false]
16+
lout = !m && !a && !p && l && !e && pout[-1, false]
17+
eout = !m && !a && !p && !l && e && lout[-1, false]
18+
maple = mout || aout || pout || lout || eout
19+
globallymaple = maple && maple[-1, true]
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
in stage : Str
2+
out m: Bool
3+
out a: Bool
4+
out p: Bool
5+
out l: Bool
6+
out e: Bool
7+
out maple : Bool
8+
m = (stage == "m") && e[-1, true]
9+
a = (stage == "a") && m[-1, false]
10+
p = (stage == "p") && a[-1, false]
11+
l = (stage == "l") && p[-1, false]
12+
e = (stage == "e") && l[-1, false]
13+
maple = m || a || p || l || e

src/async_runtime.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -114,7 +114,8 @@ async fn manage_var<V: StreamData>(
114114
data = input_stream.next() => {
115115
if let Some(data) = data {
116116
// println!("sending data {:?}", data);
117-
assert!(!senders.is_empty());
117+
// Senders can be empty if an input is not actually used
118+
// assert!(!senders.is_empty());
118119
let send_futs = senders.iter().map(|sender| sender.send(data.clone()));
119120
for res in join_all(send_futs).await {
120121
if let Err(_) = res {
@@ -484,7 +485,8 @@ where
484485
.collect();
485486
let mut async_streams = vec![];
486487
for (var, _) in to_launch_out.iter() {
487-
let expr = model.var_expr(&var).unwrap();
488+
let expr = model.var_expr(&var)
489+
.expect(format!("Failed to find expression for var {}", var.0.as_str()).as_str());
488490
let stream = Box::pin(S::to_async_stream(expr, &var_exchange));
489491
// let stream = SS::transform_stream(
490492
// DropGuardStream {

src/file_handling.rs

Lines changed: 87 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,10 @@ impl Display for FileParseError {
2525

2626
impl Error for FileParseError {}
2727

28-
pub async fn parse_file<O: Clone + Debug, E: Debug>(
28+
pub async fn parse_file<
29+
O: Clone + Debug,
30+
E: Debug + Display + for<'a> winnow::error::ParserError<&'a str>,
31+
>(
2932
// The for<'a> syntax is a higher-ranked trait bound which is
3033
// necessary to specify that the lifetime of the string passed
3134
// into the parser does not need to outlive this function call
@@ -42,7 +45,7 @@ pub async fn parse_file<O: Clone + Debug, E: Debug>(
4245
// parser.parse_next(&mut contents.as_str().into()).unwrap()
4346
// );
4447
parser
45-
.parse_next(&mut contents.as_str().into())
48+
.parse(contents.as_str().into())
4649
.map_err(|e| Box::new(FileParseError::new(e.to_string())) as Box<dyn Error>)
4750
}
4851

@@ -67,4 +70,86 @@ mod tests {
6770
.await;
6871
assert_eq!(x_vals, vec![Value::Int(1), Value::Int(3)]);
6972
}
73+
74+
#[tokio::test]
75+
async fn test_parse_boolean_file() {
76+
let parser = crate::parser::lola_input_file;
77+
let file = "examples/maple_sequence_true.input";
78+
let mut data = parse_file(parser, file).await.unwrap();
79+
let m_vals = data
80+
.input_stream(&VarName("m".into()))
81+
.unwrap()
82+
.collect::<Vec<_>>()
83+
.await;
84+
assert_eq!(
85+
m_vals,
86+
vec![
87+
Value::Bool(true),
88+
Value::Bool(false),
89+
Value::Bool(false),
90+
Value::Bool(false),
91+
Value::Bool(false)
92+
],
93+
);
94+
let a_vals = data
95+
.input_stream(&VarName("a".into()))
96+
.unwrap()
97+
.collect::<Vec<_>>()
98+
.await;
99+
assert_eq!(
100+
a_vals,
101+
vec![
102+
Value::Bool(false),
103+
Value::Bool(true),
104+
Value::Bool(false),
105+
Value::Bool(false),
106+
Value::Bool(false)
107+
],
108+
);
109+
let p_vals = data
110+
.input_stream(&VarName("p".into()))
111+
.unwrap()
112+
.collect::<Vec<_>>()
113+
.await;
114+
assert_eq!(
115+
p_vals,
116+
vec![
117+
Value::Bool(false),
118+
Value::Bool(false),
119+
Value::Bool(true),
120+
Value::Bool(false),
121+
Value::Bool(false)
122+
],
123+
);
124+
let l_vals = data
125+
.input_stream(&VarName("l".into()))
126+
.unwrap()
127+
.collect::<Vec<_>>()
128+
.await;
129+
assert_eq!(
130+
l_vals,
131+
vec![
132+
Value::Bool(false),
133+
Value::Bool(false),
134+
Value::Bool(false),
135+
Value::Bool(true),
136+
Value::Bool(false)
137+
],
138+
);
139+
let e_vals = data
140+
.input_stream(&VarName("e".into()))
141+
.unwrap()
142+
.collect::<Vec<_>>()
143+
.await;
144+
assert_eq!(
145+
e_vals,
146+
vec![
147+
Value::Bool(false),
148+
Value::Bool(false),
149+
Value::Bool(false),
150+
Value::Bool(false),
151+
Value::Bool(true)
152+
],
153+
);
154+
}
70155
}

src/main.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,7 @@ async fn main() {
7777
let model = parse_file(model_parser, cli.model.as_str())
7878
.await
7979
.expect("Model file could not be parsed");
80+
println!("Model: {:?}", model);
8081

8182
let output_handler: Box<dyn OutputHandler<Value>> = match cli.output_mode {
8283
trustworthiness_checker::commandline_args::OutputMode {

src/mqtt_input_provider.rs

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ use tokio_stream::wrappers::ReceiverStream;
1313
use crate::{core::VarName, mqtt_client::provide_mqtt_client, InputProvider, OutputStream, Value};
1414
// use async_stream::stream;
1515

16-
const QOS: &[i32] = &[1, 1];
16+
const QOS: i32 = 1;
1717

1818
pub struct VarData {
1919
pub variable: VarName,
@@ -74,13 +74,14 @@ impl MQTTInputProvider {
7474
let client = provide_mqtt_client(host.clone()).await.unwrap();
7575
let mut stream = client.clone().get_stream(10);
7676
// println!("Connected to MQTT broker");
77+
let qos = topics.iter().map(|_| QOS).collect::<Vec<_>>();
7778
loop {
78-
match client.subscribe_many(&topics, QOS).await {
79+
match client.subscribe_many(&topics, &qos).await {
7980
Ok(_) => break,
8081
Err(e) => {
8182
println!(
82-
"Failed to subscribe to topics with error {:?}, retrying in 100ms",
83-
e
83+
"Failed to subscribe to topics {:?} with error {:?}, retrying in 100ms",
84+
topics, e
8485
);
8586
tokio::time::sleep(Duration::from_millis(100)).await;
8687
println!("Reconnecting");

0 commit comments

Comments
 (0)