Skip to content

Commit

Permalink
fix: avoid publish until fss is set up (#1592)
Browse files Browse the repository at this point in the history
(cherry picked from commit 60f18a7)
  • Loading branch information
alter-mage authored and junfuchen99 committed Mar 30, 2024
1 parent 1f85f95 commit 9385510
Show file tree
Hide file tree
Showing 5 changed files with 21 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,7 @@ void before(ExtensionContext context) throws Exception {
// setup fss such that it could send mqtt messages to the mock listener
FleetStatusService fleetStatusService = (FleetStatusService) kernel.locate(FLEET_STATUS_SERVICE_TOPICS);
fleetStatusService.setDeviceConfiguration(deviceConfiguration);
fleetStatusService.getIsLaunchMessageSent().set(true);
fleetStatusService.postInject();

// setup jobs helper such that it could send mqtt messages to the mock listener
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ void GIVEN_three_components_errored_and_recovered_THEN_fss_should_send_only_one_
//Increase this for windows testing
assertTrue(statusChange.await(30, TimeUnit.SECONDS));
// we expect a total of 5 messages: 1 Nucleus launch and 4 component status changes, which include:
// 1 Errored from A with B reovered, 1 Errored B, 1 Errored C, 1 recovery message for the rest of non recovery ones
// 1 Errored from A with B recovered, 1 Errored B, 1 Errored C, 1 recovery message for the rest of non recovery ones
assertEquals(5, fleetStatusDetailsList.get().size());

// the first message should be nucleus launch
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,8 +100,8 @@ void GIVEN_kernel_launches_THEN_thing_details_and_components_terminal_states_upl
assertThat(kernel.locate(FleetStatusService.FLEET_STATUS_SERVICE_TOPICS)::getState, eventuallyEval(is(State.RUNNING)));
assertEquals("ThingName", Coerce.toString(deviceConfiguration.getThingName()));

// we should send two status updates within 4 seconds; 1 nucleus launch and 1 component status change
assertTrue(statusChange.await(FLEET_STATUS_MESSAGE_PUBLISH_MIN_WAIT_TIME_SEC + 1L, TimeUnit.SECONDS));
// we should send two status updates within 6 seconds; 1 nucleus launch and 1 component status change
assertTrue(statusChange.await(FLEET_STATUS_MESSAGE_PUBLISH_MIN_WAIT_TIME_SEC + 4L, TimeUnit.SECONDS));
fleetStatusDetailsList.get().removeIf(f -> Trigger.NETWORK_RECONFIGURE.equals(f.getTrigger()));
assertEquals(2, fleetStatusDetailsList.get().size());
// first message is nucleus launch
Expand All @@ -122,9 +122,6 @@ void GIVEN_kernel_launches_THEN_thing_details_and_components_terminal_states_upl
@Test
void GIVEN_kernel_deployment_WHEN_device_provisioning_completes_after_kernel_has_launched_THEN_thing_details_uploaded_to_cloud()
throws Exception {
kernel.launch();
assertThat(kernel.locate(FleetStatusService.FLEET_STATUS_SERVICE_TOPICS)::getState, eventuallyEval(is(State.RUNNING)));

deviceConfiguration = kernel.getContext().get(DeviceConfiguration.class);
deviceConfiguration.getThingName().withValue("ThingName");
deviceConfiguration.getIotDataEndpoint().withValue("xxxxxx-ats.iot.us-east-1.amazonaws.com");
Expand All @@ -135,14 +132,17 @@ void GIVEN_kernel_deployment_WHEN_device_provisioning_completes_after_kernel_has
deviceConfiguration.getAWSRegion().withValue("us-east-1");
deviceConfiguration.getIotRoleAlias().withValue("roleAliasName");

kernel.launch();
assertThat(kernel.locate(FleetStatusService.FLEET_STATUS_SERVICE_TOPICS)::getState, eventuallyEval(is(State.RUNNING)));

assertEquals("ThingName", Coerce.toString(deviceConfiguration.getThingName()));
assertThat(() -> fleetStatusDetails.get().getThing(), eventuallyEval(is("ThingName"), Duration.ofSeconds(30)));
deviceConfiguration.getIotDataEndpoint().withValue("new-ats.iot.us-east-1.amazonaws.com");
assertEquals("new-ats.iot.us-east-1.amazonaws.com", Coerce.toString(deviceConfiguration.getIotDataEndpoint()));

// Verify there is at least 1 publish request for each of IoTJobs, ShadowDeploymentService, and FSS
ArgumentCaptor<PublishRequest> publishRequestCaptor = ArgumentCaptor.forClass(PublishRequest.class);
verify(mqttClient, timeout(5000).times(3)).publish(publishRequestCaptor.capture());
verify(mqttClient, timeout(5000).atLeast(3)).publish(publishRequestCaptor.capture());
List<PublishRequest> publishRequests = publishRequestCaptor.getAllValues();

String IoTJobsTopic = "$aws/things/ThingName/shadow/name/AWSManagedGreengrassV2Deployment/get";
Expand Down
10 changes: 10 additions & 0 deletions src/main/java/com/aws/greengrass/status/FleetStatusService.java
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,8 @@ public class FleetStatusService extends GreengrassService {
@Getter
private final AtomicBoolean isConnected = new AtomicBoolean(true);
private final AtomicBoolean isFSSSetupComplete = new AtomicBoolean(false);
// Gate for uploadFleetStatusServiceData: while this is false, uploads whose trigger is not
// NUCLEUS_LAUNCH are dropped. It is flipped to true the first time an upload request passes
// that gate, which happens before the MQTT connectivity check.
// Exposed through @Getter; tests pre-set it to true via getIsLaunchMessageSent().set(true).
@Getter
private final AtomicBoolean isLaunchMessageSent = new AtomicBoolean(false);
private final Set<GreengrassService> updatedGreengrassServiceSet =
Collections.newSetFromMap(new ConcurrentHashMap<>());
private final ConcurrentHashMap<GreengrassService, Instant> serviceFssTracksMap = new ConcurrentHashMap<>();
Expand Down Expand Up @@ -512,6 +514,14 @@ private void uploadFleetStatusServiceData(Set<GreengrassService> greengrassServi
OverallStatus overAllStatus,
DeploymentInformation deploymentInformation,
Trigger trigger) {
// Until the Nucleus launch message has been requested, drop every update that is not the launch message itself
// (e.g. component state changes). Publishing while FSS setup is still incomplete may cause a deadlock.
if (!isLaunchMessageSent.get() && !Trigger.NUCLEUS_LAUNCH.equals(trigger)) {
logger.atDebug().log("Not updating fleet status data since FSS is being set up");
return;
}
isLaunchMessageSent.compareAndSet(false, true);

if (!isConnected.get() && !Trigger.isCloudDeploymentTrigger(trigger)) {
logger.atDebug().log("Not updating fleet status data since MQTT connection is interrupted");
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1044,6 +1044,7 @@ private FleetStatusService createFSS() {
mockKernelLifecycle, ses);
fleetStatusService.postInject();
fleetStatusService.setWaitBetweenPublishDisabled(true);
fleetStatusService.getIsLaunchMessageSent().set(true);
return fleetStatusService;
}

Expand All @@ -1054,6 +1055,7 @@ private FleetStatusService createFSSWithMockSes() {
mockKernelLifecycle, mockSes);
fleetStatusService.postInject();
fleetStatusService.setWaitBetweenPublishDisabled(true);
fleetStatusService.getIsLaunchMessageSent().set(true);
return fleetStatusService;
}

Expand All @@ -1064,6 +1066,7 @@ private FleetStatusService createFSS(int periodicUpdateIntervalSec) {
ses, periodicUpdateIntervalSec);
fleetStatusService.postInject();
fleetStatusService.setWaitBetweenPublishDisabled(true);
fleetStatusService.getIsLaunchMessageSent().set(true);
return fleetStatusService;
}
}

0 comments on commit 9385510

Please sign in to comment.