Skip to content

Commit

Permalink
Add CI test for AWS IoT Device Advisor
Browse files Browse the repository at this point in the history
  • Loading branch information
MathiasKoch committed May 2, 2024
1 parent 3b18596 commit 0cb6ffd
Show file tree
Hide file tree
Showing 8 changed files with 96 additions and 28 deletions.
13 changes: 6 additions & 7 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -72,9 +72,8 @@ jobs:
AWS_DEFAULT_REGION: ${{ secrets.MGMT_AWS_DEFAULT_REGION }}
AWS_ACCESS_KEY_ID: ${{ secrets.MGMT_AWS_ACCESS_KEY_ID }}
AWS_SECRET_ACCESS_KEY: ${{ secrets.MGMT_AWS_SECRET_ACCESS_KEY }}
SUITE_ID: greb3uy2wtq3
THING_ARN: arn:aws:iot:eu-west-1:274906834921:thing/mqttrust
CERTIFICATE_ARN: arn:aws:iot:eu-west-1:274906834921:cert/e7280d8d316b58da3058037a2c1730d9eb15de50e96f4d47e54ea655266b76db
SUITE_ID: 1gaev57dq6i5
THING_ARN: arn:aws:iot:eu-west-1:411974994697:thing/mqttrust
steps:
- name: Checkout
uses: actions/checkout@v1
Expand All @@ -89,7 +88,7 @@ jobs:
- name: Get AWS_HOSTNAME
id: hostname
run: |
hostname=$(aws iotdeviceadvisor get-endpoint --output text --query endpoint)
hostname=$(aws iotdeviceadvisor get-endpoint --thing-arn ${{ env.THING_ARN }} --output text --query endpoint)
ret=$?
echo "::set-output name=AWS_HOSTNAME::$hostname"
exit $ret
Expand All @@ -105,7 +104,7 @@ jobs:
- name: Start test suite
id: test_suite
run: |
suite_id=$(aws iotdeviceadvisor start-suite-run --suite-definition-id ${{ env.SUITE_ID }} --suite-run-configuration "primaryDevice={thingArn=${{ env.THING_ARN }},certificateArn=${{ env.CERTIFICATE_ARN }}}" --output text --query suiteRunId)
suite_id=$(aws iotdeviceadvisor start-suite-run --suite-definition-id ${{ env.SUITE_ID }} --suite-run-configuration "primaryDevice={thingArn=${{ env.THING_ARN }}},parallelRun=true" --output text --query suiteRunId)
ret=$?
echo "::set-output name=SUITE_RUN_ID::$suite_id"
exit $ret
Expand All @@ -121,9 +120,9 @@ jobs:
- name: Monitor test run
run: |
chmod +x ./.github/scripts/da_monitor.sh
chmod +x ./scripts/da_monitor.sh
echo ${{ env.SUITE_ID }} ${{ steps.test_suite.outputs.SUITE_RUN_ID }} ${{ steps.binary.outputs.PID }}
./.github/scripts/da_monitor.sh ${{ env.SUITE_ID }} ${{ steps.test_suite.outputs.SUITE_RUN_ID }} ${{ steps.binary.outputs.PID }}
./scripts/da_monitor.sh ${{ env.SUITE_ID }} ${{ steps.test_suite.outputs.SUITE_RUN_ID }} ${{ steps.binary.outputs.PID }}
- name: Kill test binary process
if: ${{ always() }}
Expand Down
12 changes: 8 additions & 4 deletions mqttrust_core/examples/aws_device_advisor.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
mod common;

use mqttrust::encoding::v4::{Connack, ConnectReturnCode};
use mqttrust::{Mqtt, QoS, SubscribeTopic};
use mqttrust_core::bbqueue::BBBuffer;
use mqttrust_core::{EventLoop, MqttOptions, Notification};
Expand Down Expand Up @@ -44,10 +45,13 @@ fn main() {
.spawn(move || loop {
match nb::block!(mqtt_eventloop.connect(&mut network)) {
Err(_) => continue,
Ok(true) => {
Ok(Some(Notification::ConnAck(Connack {
session_present,
code: ConnectReturnCode::Accepted,
}))) => {
log::info!("Successfully connected to broker");
}
Ok(false) => {}
Ok(_) => {}
}

match mqtt_eventloop.yield_event(&mut network) {
Expand All @@ -64,14 +68,14 @@ fn main() {
thread::sleep(std::time::Duration::from_millis(5000));
mqtt_client
.subscribe(&[SubscribeTopic {
topic_path: format!("{}/device/advisor", thing_name).as_str(),
topic_path: format!("plc/output/{}", thing_name).as_str(),
qos: QoS::AtLeastOnce,
}])
.unwrap();

mqtt_client
.publish(
format!("{}/device/advisor/hello", thing_name).as_str(),
format!("plc/input/{}", thing_name).as_str(),
format!("Hello from {}", thing_name).as_bytes(),
QoS::AtLeastOnce,
)
Expand Down
Binary file modified mqttrust_core/examples/secrets/identity.pfx
Binary file not shown.
46 changes: 36 additions & 10 deletions mqttrust_core/src/eventloop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use core::convert::Infallible;
use core::ops::DerefMut;
use core::ops::RangeTo;
use embedded_nal::{AddrType, Dns, SocketAddr, TcpClientStack};
use fugit::ExtU32;
use fugit::{ExtU32, TimerDurationU32};
use heapless::{String, Vec};
use mqttrust::encoding::v4::{decode_slice, encode_slice, Connect, Packet, Protocol};

Expand All @@ -24,6 +24,7 @@ where
/// Request stream
pub(crate) requests: Option<FrameConsumer<'a, L>>,
network_handle: NetworkHandle<S>,
connect_counter: u8,
}

impl<'a, 'b, S, O, const TIMER_HZ: u32, const L: usize> EventLoop<'a, 'b, S, O, TIMER_HZ, L>
Expand All @@ -41,6 +42,7 @@ where
options,
requests: Some(requests),
network_handle: NetworkHandle::new(),
connect_counter: 0,
}
}

Expand All @@ -54,7 +56,7 @@ where
pub fn connect<N: Dns + TcpClientStack<TcpSocket = S> + ?Sized>(
&mut self,
network: &mut N,
) -> nb::Result<bool, EventError> {
) -> nb::Result<Option<Notification>, EventError> {

Check warning on line 59 in mqttrust_core/src/eventloop.rs

View workflow job for this annotation

GitHub Actions / clippy

docs for function returning `Result` missing `# Errors` section

warning: docs for function returning `Result` missing `# Errors` section --> mqttrust_core/src/eventloop.rs:56:5 | 56 | / pub fn connect<N: Dns + TcpClientStack<TcpSocket = S> + ?Sized>( 57 | | &mut self, 58 | | network: &mut N, 59 | | ) -> nb::Result<Option<Notification>, EventError> { | |_____________________________________________________^ | = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#missing_errors_doc = note: `-W clippy::missing-errors-doc` implied by `-W clippy::pedantic` = help: to override `-W clippy::pedantic` add `#[allow(clippy::missing_errors_doc)]`
// connect to the broker
match self.network_handle.is_connected(network) {
Ok(false) => {
Expand Down Expand Up @@ -216,16 +218,23 @@ where
}
}

fn backoff(&self) -> TimerDurationU32<TIMER_HZ> {
let base_time_ms: u32 = 1000;
let backoff = base_time_ms.saturating_mul(u32::pow(2, self.connect_counter as u32 - 1));

Check warning on line 223 in mqttrust_core/src/eventloop.rs

View workflow job for this annotation

GitHub Actions / clippy

casting `u8` to `u32` may become silently lossy if you later change the type

warning: casting `u8` to `u32` may become silently lossy if you later change the type --> mqttrust_core/src/eventloop.rs:223:63 | 223 | let backoff = base_time_ms.saturating_mul(u32::pow(2, self.connect_counter as u32 - 1)); | ^^^^^^^^^^^^^^^^^^^^^^^^^^^ help: try: `u32::from(self.connect_counter)` | = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#cast_lossless = note: `-W clippy::cast-lossless` implied by `-W clippy::pedantic` = help: to override `-W clippy::pedantic` add `#[allow(clippy::cast_lossless)]`

core::cmp::min(50.secs(), backoff.millis())
}

fn mqtt_connect<N: TcpClientStack<TcpSocket = S> + ?Sized>(
&mut self,
network: &mut N,
) -> nb::Result<bool, EventError> {
) -> nb::Result<Option<Notification>, EventError> {
match self.state.connection_status {
MqttConnectionStatus::Connected => Ok(false),
MqttConnectionStatus::Connected => Ok(None),
MqttConnectionStatus::Disconnected => {
info!("MQTT connecting..");
let now = self.last_outgoing_timer.now();
self.state.last_ping_entry().insert(now);
self.connect_counter += 1;

self.state.await_pingresp = false;
self.network_handle.rx_buf.init();
Expand All @@ -242,6 +251,12 @@ where
password,
});

info!(
"MQTT connecting.. Attempt: {}. Backoff time: {}",
self.connect_counter,
self.backoff().to_millis()
);

// mqtt connection with timeout
self.network_handle.send_packet(network, &connect)?;
self.state.handle_outgoing_connect();
Expand All @@ -250,25 +265,36 @@ where
MqttConnectionStatus::Handshake => {
let now = self.last_outgoing_timer.now();

let backoff_time = core::cmp::max(50.secs(), self.backoff());

if self
.state
.last_ping_entry()
.or_insert(now)
.has_elapsed(&now, 50.secs())
.has_elapsed(&now, backoff_time)
{
return Err(nb::Error::Other(EventError::Timeout));
}

self.network_handle
let res = self
.network_handle
.receive(network)
.map_err(|e| e.map(EventError::Network))?
.decode(&mut self.state)
.and_then(|(n, p)| {
if n.is_none() && p.is_none() {
return Err(nb::Error::WouldBlock);
}
Ok(n.map(|n| n == Notification::ConnAck).unwrap_or(false))
})
Ok(n)
});

match res {
Ok(r) => {
self.connect_counter = 0;
Ok(r)
}
Err(e) => Err(e),
}
}
}
}
Expand Down Expand Up @@ -417,7 +443,7 @@ impl<S> NetworkHandle<S> {
#[derive(Debug)]
struct PacketBuffer {
range: RangeTo<usize>,
buffer: Vec<u8, 4096>,
buffer: Vec<u8, { 1024 * 16 }>,
}

impl PacketBuffer {
Expand Down
3 changes: 2 additions & 1 deletion mqttrust_core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ pub use client::Client;
use core::convert::TryFrom;
pub use eventloop::EventLoop;
use heapless::{String, Vec};
use mqttrust::encoding::v4::Connack;
pub use mqttrust::encoding::v4::{Pid, Publish, QoS, QosPid, Suback};
pub use mqttrust::*;
pub use options::{Broker, MqttOptions};
Expand All @@ -39,7 +40,7 @@ pub struct PublishNotification {
// #[cfg_attr(feature = "defmt-impl", derive(defmt::Format))]
pub enum Notification {
/// Incoming connection acknowledge
ConnAck,
ConnAck(Connack),
/// Incoming publish from the broker
#[cfg(not(feature = "std"))]
Publish(heapless::pool::singleton::Box<state::BoxedPublish, heapless::pool::Init>),
Expand Down
23 changes: 17 additions & 6 deletions mqttrust_core/src/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ impl<const TIMER_HZ: u32> MqttState<TIMER_HZ> {
match packet {
Packet::Connack(connack) => self
.handle_incoming_connack(connack)
.map(|()| (Notification::ConnAck.into(), None)),
.map(|()| (Notification::ConnAck(connack).into(), None)),
Packet::Pingresp => self.handle_incoming_pingresp(),
Packet::Publish(publish) => self.handle_incoming_publish(publish),
Packet::Suback(suback) => self.handle_incoming_suback(suback),
Expand Down Expand Up @@ -269,12 +269,23 @@ impl<const TIMER_HZ: u32> MqttState<TIMER_HZ> {
let qospid = (publish.qos, publish.pid);

#[cfg(not(feature = "std"))]
let boxed_publish = BoxedPublish::alloc().unwrap();
#[cfg(not(feature = "std"))]
let notification = Notification::Publish(boxed_publish.init(publish.try_into().unwrap()));
let notification = if publish.payload.len() > 4096 {
error!(
"Received payload larger the {}! Sending ACK but discarding payload notification!",
4096
);
None
} else {
let boxed_publish = BoxedPublish::alloc().unwrap();
Some(Notification::Publish(
boxed_publish.init(publish.try_into().unwrap()),
))
};

#[cfg(feature = "std")]
let notification = Notification::Publish(std::boxed::Box::new(publish.try_into().unwrap()));
let notification = Some(Notification::Publish(std::boxed::Box::new(
publish.try_into().unwrap(),
)));

let request = match qospid {
(QoS::AtMostOnce, _) => None,
Expand All @@ -289,7 +300,7 @@ impl<const TIMER_HZ: u32> MqttState<TIMER_HZ> {
}
_ => return Err(StateError::InvalidHeader),
};
Ok((Some(notification), request))
Ok((notification, request))
}

fn handle_incoming_pubrel(
Expand Down
File renamed without changes.
27 changes: 27 additions & 0 deletions scripts/rotate_secrets.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
#!/usr/bin/env bash

if [[ -z "$DEVICE_ADVISOR_PASSWORD" ]]; then
echo "DEVICE_ADVISOR_PASSWORD environment variable must be set!"
return 1;
fi

SCRIPT_DIR="$(dirname "$(readlink -f "$0")")"
SECRETS_DIR="$SCRIPT_DIR/../mqttrust_core/examples/secrets"
THING_NAME="mqttrust"

CERT_PATH="$SECRETS_DIR/cert.pem"
PRIV_KEY_PATH="$SECRETS_DIR/priv.key.pem"

CERT_ARN=$(aws iot create-keys-and-certificate --set-as-active --certificate-pem-outfile $CERT_PATH --private-key-outfile $PRIV_KEY_PATH | jq -r .certificateArn);
aws iot attach-thing-principal --thing-name $THING_NAME --principal $CERT_ARN > /dev/null 2>&1
aws iot attach-policy --policy-name Connect --target $CERT_ARN > /dev/null 2>&1
aws iot attach-policy --policy-name Input --target $CERT_ARN > /dev/null 2>&1
aws iot attach-policy --policy-name Output --target $CERT_ARN > /dev/null 2>&1

rm $SECRETS_DIR/identity.pfx

# Generate new identity.pfx
openssl pkcs12 -export -passout pass:"$DEVICE_ADVISOR_PASSWORD" -out $SECRETS_DIR/identity.pfx -inkey $PRIV_KEY_PATH -in $CERT_PATH

rm $CERT_PATH
rm $PRIV_KEY_PATH

0 comments on commit 0cb6ffd

Please sign in to comment.