Skip to content

Commit

Permalink
fix(mqtt): backoff and retry spooler connection failure (#1548)
Browse files Browse the repository at this point in the history
  • Loading branch information
MikeDombo authored Oct 17, 2023
1 parent eed08e2 commit 4db3bfb
Show file tree
Hide file tree
Showing 3 changed files with 22 additions and 8 deletions.
24 changes: 17 additions & 7 deletions src/main/java/com/aws/greengrass/mqttclient/MqttClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@
import static com.aws.greengrass.deployment.DeviceConfiguration.DEVICE_PARAM_ROOT_CA_PATH;
import static com.aws.greengrass.deployment.DeviceConfiguration.DEVICE_PARAM_THING_NAME;
import static com.aws.greengrass.mqttclient.AwsIotMqttClient.TOPIC_KEY;
import static com.aws.greengrass.util.RetryUtils.RANDOM;

@SuppressWarnings({"PMD.AvoidDuplicateLiterals"})
public class MqttClient implements Closeable {
Expand Down Expand Up @@ -813,7 +814,21 @@ protected void runSpooler() {
Set<CompletableFuture<?>> publishRequests = ConcurrentHashMap.newKeySet();
while (!Thread.currentThread().isInterrupted()) {
try {
getConnection(false).connect().get();
try {
getConnection(false).connect().get();
} catch (ExecutionException e) {
if (Utils.getUltimateCause(e) instanceof InterruptedException) {
logger.atWarn().log("Shutting down spooler task");
Thread.currentThread().interrupt();
break;
} else {
logger.atError().log("Error when publishing from spooler", e.getCause());
// Do not retry connecting immediately, most likely it would fail. Backoff and retry later.
// It fails when PKCS11 is not available, for example, so retrying that immediately
// will just spam the logs.
Thread.sleep(Duration.ofMinutes(2).toMillis() + RANDOM.nextInt(10_000));
}
}
while (mqttOnline.get()) {
synchronized (publishRequests) {
// Wait for number of outstanding requests to decrease
Expand Down Expand Up @@ -851,14 +866,9 @@ protected void runSpooler() {
});
}
break;
} catch (ExecutionException e) {
logger.atError().log("Error when publishing from spooler", e);
if (Utils.getUltimateCause(e) instanceof InterruptedException) {
logger.atWarn().log("Shutting down spooler task");
break;
}
} catch (InterruptedException e) {
logger.atWarn().log("Shutting down spooler task");
Thread.currentThread().interrupt();
break;
} catch (Throwable ex) {
logger.atError().log("Unchecked error when publishing from spooler", ex);
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/com/aws/greengrass/util/RetryUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

public class RetryUtils {

private static final Random RANDOM = new Random();
public static final Random RANDOM = new Random();
private static final int LOG_ON_FAILURE_COUNT = 20;

// Need this to make spotbug check happy
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -684,6 +684,7 @@ void GIVEN_keep_qos_0_when_offline_is_false_and_mqtt_is_online_WHEN_publish_THEN
assertEquals(0, future.get());
verify(spool, times(1)).addMessage(request.toPublish());
verify(spool, never()).getSpoolConfig();
Thread.interrupted(); // Clear interrupt flag set by throwing InterruptedException
}

@Test
Expand Down Expand Up @@ -887,6 +888,7 @@ void GIVEN_spool_pop_id_interrupted_WHEN_spool_message_THEN_stop_spooling_messag
// The 3rd call is to trigger Interrupted Exception and exit the loop
verify(spool, times(2)).popId();
verify(client, times(2)).publishSingleSpoolerMessage(awsIotMqttClient);
Thread.interrupted(); // Clear interrupt flag set by throwing InterruptedException
}

@Test
Expand Down Expand Up @@ -922,6 +924,7 @@ void GIVEN_publish_request_execution_exception_WHEN_spool_message_THEN_continue_
// The 3rd call is to trigger Interrupted Exception and exit the loop
verify(spool, times(3)).popId();
verify(client, times(3)).publishSingleSpoolerMessage(awsIotMqttClient);
Thread.interrupted(); // Clear interrupt flag set by throwing InterruptedException
}


Expand Down Expand Up @@ -958,6 +961,7 @@ void GIVEN_connection_resumed_WHEN_callback_THEN_start_spool_messages(ExtensionC

verify(spool).getSpoolConfig();
verify(spool).popOutMessagesWithQosZero();
Thread.interrupted(); // Clear interrupt flag set by throwing InterruptedException
}

@Test
Expand Down

0 comments on commit 4db3bfb

Please sign in to comment.