diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index d05767e..1526897 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -72,9 +72,8 @@ jobs: AWS_DEFAULT_REGION: ${{ secrets.MGMT_AWS_DEFAULT_REGION }} AWS_ACCESS_KEY_ID: ${{ secrets.MGMT_AWS_ACCESS_KEY_ID }} AWS_SECRET_ACCESS_KEY: ${{ secrets.MGMT_AWS_SECRET_ACCESS_KEY }} - SUITE_ID: greb3uy2wtq3 - THING_ARN: arn:aws:iot:eu-west-1:274906834921:thing/mqttrust - CERTIFICATE_ARN: arn:aws:iot:eu-west-1:274906834921:cert/e7280d8d316b58da3058037a2c1730d9eb15de50e96f4d47e54ea655266b76db + SUITE_ID: 1gaev57dq6i5 + THING_ARN: arn:aws:iot:eu-west-1:411974994697:thing/mqttrust steps: - name: Checkout uses: actions/checkout@v1 @@ -89,7 +88,7 @@ jobs: - name: Get AWS_HOSTNAME id: hostname run: | - hostname=$(aws iotdeviceadvisor get-endpoint --output text --query endpoint) + hostname=$(aws iotdeviceadvisor get-endpoint --thing-arn ${{ env.THING_ARN }} --output text --query endpoint) ret=$? echo "::set-output name=AWS_HOSTNAME::$hostname" exit $ret @@ -105,7 +104,7 @@ jobs: - name: Start test suite id: test_suite run: | - suite_id=$(aws iotdeviceadvisor start-suite-run --suite-definition-id ${{ env.SUITE_ID }} --suite-run-configuration "primaryDevice={thingArn=${{ env.THING_ARN }},certificateArn=${{ env.CERTIFICATE_ARN }}}" --output text --query suiteRunId) + suite_id=$(aws iotdeviceadvisor start-suite-run --suite-definition-id ${{ env.SUITE_ID }} --suite-run-configuration "primaryDevice={thingArn=${{ env.THING_ARN }}},parallelRun=true" --output text --query suiteRunId) ret=$? echo "::set-output name=SUITE_RUN_ID::$suite_id" exit $ret @@ -121,9 +120,9 @@ jobs: - name: Monitor test run run: | - chmod +x ./.github/scripts/da_monitor.sh + chmod +x ./scripts/da_monitor.sh echo ${{ env.SUITE_ID }} ${{ steps.test_suite.outputs.SUITE_RUN_ID }} ${{ steps.binary.outputs.PID }} - ./.github/scripts/da_monitor.sh ${{ env.SUITE_ID }} ${{ steps.test_suite.outputs.SUITE_RUN_ID }} ${{ steps.binary.outputs.PID }} + ./scripts/da_monitor.sh ${{ env.SUITE_ID }} ${{ steps.test_suite.outputs.SUITE_RUN_ID }} ${{ steps.binary.outputs.PID }} - name: Kill test binary process if: ${{ always() }} diff --git a/mqttrust_core/examples/aws_device_advisor.rs b/mqttrust_core/examples/aws_device_advisor.rs index 0d86e29..8720095 100644 --- a/mqttrust_core/examples/aws_device_advisor.rs +++ b/mqttrust_core/examples/aws_device_advisor.rs @@ -1,5 +1,6 @@ mod common; +use mqttrust::encoding::v4::{Connack, ConnectReturnCode}; use mqttrust::{Mqtt, QoS, SubscribeTopic}; use mqttrust_core::bbqueue::BBBuffer; use mqttrust_core::{EventLoop, MqttOptions, Notification}; @@ -44,10 +45,13 @@ fn main() { .spawn(move || loop { match nb::block!(mqtt_eventloop.connect(&mut network)) { Err(_) => continue, - Ok(true) => { + Ok(Some(Notification::ConnAck(Connack { + session_present, + code: ConnectReturnCode::Accepted, + }))) => { log::info!("Successfully connected to broker"); } - Ok(false) => {} + Ok(_) => {} } match mqtt_eventloop.yield_event(&mut network) { @@ -64,14 +68,14 @@ fn main() { thread::sleep(std::time::Duration::from_millis(5000)); mqtt_client .subscribe(&[SubscribeTopic { - topic_path: format!("{}/device/advisor", thing_name).as_str(), + topic_path: format!("plc/output/{}", thing_name).as_str(), qos: QoS::AtLeastOnce, }]) .unwrap(); mqtt_client .publish( - format!("{}/device/advisor/hello", thing_name).as_str(), + format!("plc/input/{}", thing_name).as_str(), format!("Hello from {}", thing_name).as_bytes(), QoS::AtLeastOnce, ) diff --git a/mqttrust_core/examples/secrets/identity.pfx b/mqttrust_core/examples/secrets/identity.pfx index 625ee6f..254c582 100644 Binary files a/mqttrust_core/examples/secrets/identity.pfx and b/mqttrust_core/examples/secrets/identity.pfx differ diff --git a/mqttrust_core/src/eventloop.rs b/mqttrust_core/src/eventloop.rs index b534663..64e5500 100644 --- a/mqttrust_core/src/eventloop.rs +++ b/mqttrust_core/src/eventloop.rs @@ -7,7 +7,7 @@ use core::convert::Infallible; use core::ops::DerefMut; use core::ops::RangeTo; use embedded_nal::{AddrType, Dns, SocketAddr, TcpClientStack}; -use fugit::ExtU32; +use fugit::{ExtU32, TimerDurationU32}; use heapless::{String, Vec}; use mqttrust::encoding::v4::{decode_slice, encode_slice, Connect, Packet, Protocol}; @@ -24,6 +24,7 @@ where /// Request stream pub(crate) requests: Option>, network_handle: NetworkHandle, + connect_counter: u8, } impl<'a, 'b, S, O, const TIMER_HZ: u32, const L: usize> EventLoop<'a, 'b, S, O, TIMER_HZ, L> @@ -41,6 +42,7 @@ where options, requests: Some(requests), network_handle: NetworkHandle::new(), + connect_counter: 0, } } @@ -54,7 +56,7 @@ where pub fn connect + ?Sized>( &mut self, network: &mut N, - ) -> nb::Result { + ) -> nb::Result, EventError> { // connect to the broker match self.network_handle.is_connected(network) { Ok(false) => { @@ -216,16 +218,23 @@ where } } + fn backoff(&self) -> TimerDurationU32 { + let base_time_ms: u32 = 1000; + let backoff = base_time_ms.saturating_mul(u32::pow(2, self.connect_counter as u32 - 1)); + + core::cmp::min(50.secs(), backoff.millis()) + } + fn mqtt_connect + ?Sized>( &mut self, network: &mut N, - ) -> nb::Result { + ) -> nb::Result, EventError> { match self.state.connection_status { - MqttConnectionStatus::Connected => Ok(false), + MqttConnectionStatus::Connected => Ok(None), MqttConnectionStatus::Disconnected => { - info!("MQTT connecting.."); let now = self.last_outgoing_timer.now(); self.state.last_ping_entry().insert(now); + self.connect_counter += 1; self.state.await_pingresp = false; self.network_handle.rx_buf.init(); @@ -242,6 +251,12 @@ where password, }); + info!( + "MQTT connecting.. Attempt: {}. Backoff time: {}", + self.connect_counter, + self.backoff().to_millis() + ); + // mqtt connection with timeout self.network_handle.send_packet(network, &connect)?; self.state.handle_outgoing_connect(); @@ -250,16 +265,19 @@ where MqttConnectionStatus::Handshake => { let now = self.last_outgoing_timer.now(); + let backoff_time = core::cmp::max(50.secs(), self.backoff()); + if self .state .last_ping_entry() .or_insert(now) - .has_elapsed(&now, 50.secs()) + .has_elapsed(&now, backoff_time) { return Err(nb::Error::Other(EventError::Timeout)); } - self.network_handle + let res = self + .network_handle .receive(network) .map_err(|e| e.map(EventError::Network))? .decode(&mut self.state) @@ -267,8 +285,16 @@ where if n.is_none() && p.is_none() { return Err(nb::Error::WouldBlock); } - Ok(n.map(|n| n == Notification::ConnAck).unwrap_or(false)) - }) + Ok(n) + }); + + match res { + Ok(r) => { + self.connect_counter = 0; + Ok(r) + } + Err(e) => Err(e), + } } } } @@ -417,7 +443,7 @@ impl NetworkHandle { #[derive(Debug)] struct PacketBuffer { range: RangeTo, - buffer: Vec, + buffer: Vec, } impl PacketBuffer { diff --git a/mqttrust_core/src/lib.rs b/mqttrust_core/src/lib.rs index df72d34..285bb57 100644 --- a/mqttrust_core/src/lib.rs +++ b/mqttrust_core/src/lib.rs @@ -18,6 +18,7 @@ pub use client::Client; use core::convert::TryFrom; pub use eventloop::EventLoop; use heapless::{String, Vec}; +use mqttrust::encoding::v4::Connack; pub use mqttrust::encoding::v4::{Pid, Publish, QoS, QosPid, Suback}; pub use mqttrust::*; pub use options::{Broker, MqttOptions}; @@ -39,7 +40,7 @@ pub struct PublishNotification { // #[cfg_attr(feature = "defmt-impl", derive(defmt::Format))] pub enum Notification { /// Incoming connection acknowledge - ConnAck, + ConnAck(Connack), /// Incoming publish from the broker #[cfg(not(feature = "std"))] Publish(heapless::pool::singleton::Box), diff --git a/mqttrust_core/src/state.rs b/mqttrust_core/src/state.rs index f4d1087..441c472 100644 --- a/mqttrust_core/src/state.rs +++ b/mqttrust_core/src/state.rs @@ -145,7 +145,7 @@ impl MqttState { match packet { Packet::Connack(connack) => self .handle_incoming_connack(connack) - .map(|()| (Notification::ConnAck.into(), None)), + .map(|()| (Notification::ConnAck(connack).into(), None)), Packet::Pingresp => self.handle_incoming_pingresp(), Packet::Publish(publish) => self.handle_incoming_publish(publish), Packet::Suback(suback) => self.handle_incoming_suback(suback), @@ -269,12 +269,23 @@ impl MqttState { let qospid = (publish.qos, publish.pid); #[cfg(not(feature = "std"))] - let boxed_publish = BoxedPublish::alloc().unwrap(); - #[cfg(not(feature = "std"))] - let notification = Notification::Publish(boxed_publish.init(publish.try_into().unwrap())); + let notification = if publish.payload.len() > 4096 { + error!( + "Received payload larger the {}! Sending ACK but discarding payload notification!", + 4096 + ); + None + } else { + let boxed_publish = BoxedPublish::alloc().unwrap(); + Some(Notification::Publish( + boxed_publish.init(publish.try_into().unwrap()), + )) + }; #[cfg(feature = "std")] - let notification = Notification::Publish(std::boxed::Box::new(publish.try_into().unwrap())); + let notification = Some(Notification::Publish(std::boxed::Box::new( + publish.try_into().unwrap(), + ))); let request = match qospid { (QoS::AtMostOnce, _) => None, @@ -289,7 +300,7 @@ impl MqttState { } _ => return Err(StateError::InvalidHeader), }; - Ok((Some(notification), request)) + Ok((notification, request)) } fn handle_incoming_pubrel( diff --git a/.github/scripts/da_monitor.sh b/scripts/da_monitor.sh similarity index 100% rename from .github/scripts/da_monitor.sh rename to scripts/da_monitor.sh diff --git a/scripts/rotate_secrets.sh b/scripts/rotate_secrets.sh new file mode 100755 index 0000000..d99bf21 --- /dev/null +++ b/scripts/rotate_secrets.sh @@ -0,0 +1,27 @@ +#!/usr/bin/env bash + +if [[ -z "$DEVICE_ADVISOR_PASSWORD" ]]; then + echo "DEVICE_ADVISOR_PASSWORD environment variable must be set!" + return 1; +fi + +SCRIPT_DIR="$(dirname "$(readlink -f "$0")")" +SECRETS_DIR="$SCRIPT_DIR/../mqttrust_core/examples/secrets" +THING_NAME="mqttrust" + +CERT_PATH="$SECRETS_DIR/cert.pem" +PRIV_KEY_PATH="$SECRETS_DIR/priv.key.pem" + +CERT_ARN=$(aws iot create-keys-and-certificate --set-as-active --certificate-pem-outfile $CERT_PATH --private-key-outfile $PRIV_KEY_PATH | jq -r .certificateArn); +aws iot attach-thing-principal --thing-name $THING_NAME --principal $CERT_ARN > /dev/null 2>&1 +aws iot attach-policy --policy-name Connect --target $CERT_ARN > /dev/null 2>&1 +aws iot attach-policy --policy-name Input --target $CERT_ARN > /dev/null 2>&1 +aws iot attach-policy --policy-name Output --target $CERT_ARN > /dev/null 2>&1 + +rm $SECRETS_DIR/identity.pfx + +# Generate new identity.pfx +openssl pkcs12 -export -passout pass:"$DEVICE_ADVISOR_PASSWORD" -out $SECRETS_DIR/identity.pfx -inkey $PRIV_KEY_PATH -in $CERT_PATH + +rm $CERT_PATH +rm $PRIV_KEY_PATH \ No newline at end of file