Skip to content

Commit

Permalink
feat: allow bootstrap deployments to cancel while waiting for service…
Browse files Browse the repository at this point in the history
…s to start (#1642)
  • Loading branch information
tiancishen authored Jul 22, 2024
1 parent aa13b92 commit 48a7153
Show file tree
Hide file tree
Showing 10 changed files with 116 additions and 76 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -176,26 +176,6 @@ private boolean validateNucleusConfig(CompletableFuture<DeploymentResult> totall
return true;
}

/**
* Completes the provided future when all the listed services are running.
*
* @param servicesToTrack services to track
* @param mergeTime time the merge was started, used to check if a service is broken due to the merge
* @param kernel kernel
* @throws InterruptedException if the thread is interrupted while waiting here
* @throws ServiceUpdateException if a service could not be updated
*/
// TODO - remove this in follow up CR for cancel deployment, it's being used by kernel update deployment
public static void waitForServicesToStart(Collection<GreengrassService> servicesToTrack, long mergeTime,
Kernel kernel)
throws InterruptedException, ServiceUpdateException {
// Relying on the fact that all service lifecycle steps should have timeouts,
// assuming this loop will not get stuck waiting forever
while (!areAllServiceInDesiredState(servicesToTrack, mergeTime, kernel)) {
Thread.sleep(WAIT_SVC_START_POLL_INTERVAL_MILLISEC); // hardcoded
}
}

/**
* Completes the provided future when all the listed services are running.
* Exits early if the future is cancelled
Expand Down
40 changes: 18 additions & 22 deletions src/main/java/com/aws/greengrass/deployment/DeploymentService.java
Original file line number Diff line number Diff line change
Expand Up @@ -228,8 +228,7 @@ protected void startup() throws InterruptedException {
// Handle IoT Jobs cancellation

if (currentDeploymentTaskMetadata != null && currentDeploymentTaskMetadata.getDeploymentType()
.equals(nextDeployment.getDeploymentType())
&& currentDeploymentTaskMetadata.isCancellable()) {
.equals(nextDeployment.getDeploymentType())) {
// Cancel the current deployment if it's an IoT Jobs deployment
// that is in progress and still cancellable.
logger.atInfo().kv(DEPLOYMENT_ID_LOG_KEY_NAME, currentDeploymentTaskMetadata.getDeploymentId())
Expand All @@ -238,8 +237,7 @@ protected void startup() throws InterruptedException {
.log("Canceling current deployment");
// Send interrupt signal to the deployment task.
cancelCurrentDeployment();
} else if (currentDeploymentTaskMetadata != null
&& !currentDeploymentTaskMetadata.isCancellable()) {
} else if (currentDeploymentTaskMetadata != null) {
// Ignore the cancelling signal if the deployment is NOT cancellable any more.
logger.atInfo().kv(DEPLOYMENT_ID_LOG_KEY_NAME, currentDeploymentTaskMetadata.getDeploymentId())
.kv(GG_DEPLOYMENT_ID_LOG_KEY_NAME,
Expand Down Expand Up @@ -500,22 +498,23 @@ private void cleanupGroupData(Topics deploymentGroupTopics, Topics groupLastDepl
private void cancelCurrentDeployment() {
if (currentDeploymentTaskMetadata.getDeploymentResultFuture() != null && !currentDeploymentTaskMetadata
.getDeploymentResultFuture().isCancelled()) {
if (currentDeploymentTaskMetadata.getDeploymentResultFuture().isDone() || !currentDeploymentTaskMetadata
.isCancellable()) {
if (currentDeploymentTaskMetadata.getDeploymentResultFuture().isDone()) {
logger.atInfo().log("Deployment already finished processing or cannot be cancelled");
} else {
boolean canCancelDeploymentUpdateAction = context.get(UpdateSystemPolicyService.class)
.discardPendingUpdateAction(((DefaultDeploymentTask) currentDeploymentTaskMetadata
.getDeploymentTask()).getDeployment().getGreengrassDeploymentId());
if (!canCancelDeploymentUpdateAction) {
logger.atWarn().kv(DEPLOYMENT_ID_LOG_KEY_NAME, currentDeploymentTaskMetadata.getDeploymentId())
.kv(GG_DEPLOYMENT_ID_LOG_KEY_NAME,
currentDeploymentTaskMetadata.getGreengrassDeploymentId())
.log("Cancelling deployment, changes may already have been applied. "
+ "Make a new deployment to revert the update");
DeploymentTask deploymentTask = currentDeploymentTaskMetadata.getDeploymentTask();
if (deploymentTask instanceof DefaultDeploymentTask) {
DefaultDeploymentTask defaultDeploymentTask = (DefaultDeploymentTask) deploymentTask;
context.get(UpdateSystemPolicyService.class)
.discardPendingUpdateAction(defaultDeploymentTask.getDeployment()
.getGreengrassDeploymentId());
}
logger.atWarn().kv(DEPLOYMENT_ID_LOG_KEY_NAME, currentDeploymentTaskMetadata.getDeploymentId())
.kv(GG_DEPLOYMENT_ID_LOG_KEY_NAME,
currentDeploymentTaskMetadata.getGreengrassDeploymentId())
.log("Cancelling deployment, changes may already have been applied. "
+ "Make a new deployment to revert the update");

currentDeploymentTaskMetadata.getDeploymentResultFuture().cancel(true);
currentDeploymentTaskMetadata.cancel(true);
DeploymentType deploymentType = currentDeploymentTaskMetadata.getDeploymentType();
if (DeploymentType.SHADOW.equals(deploymentType) || DeploymentType.LOCAL.equals(deploymentType)) {
deploymentStatusKeeper.persistAndPublishDeploymentStatus(
Expand All @@ -539,12 +538,10 @@ private void createNewDeployment(Deployment deployment) {
.log("Received deployment in the queue");

DeploymentTask deploymentTask;
boolean cancellable = true;
if (DEFAULT.equals(deployment.getDeploymentStage())) {
deploymentTask = createDefaultNewDeployment(deployment);
} else {
deploymentTask = createKernelUpdateDeployment(deployment);
cancellable = false;
if (DeploymentType.IOT_JOBS.equals(deployment.getDeploymentType())) {
// Keep track of IoT jobs for de-duplication
IotJobsHelper.getLatestQueuedJobs().addProcessedJob(deployment.getId());
Expand Down Expand Up @@ -633,8 +630,7 @@ private void createNewDeployment(Deployment deployment) {
logger.atInfo().kv("deployment", deployment.getId()).log("Started deployment execution");

currentDeploymentTaskMetadata =
new DeploymentTaskMetadata(deployment, deploymentTask, process, new AtomicInteger(1),
cancellable);
new DeploymentTaskMetadata(deployment, deploymentTask, process, new AtomicInteger(1));
}

/*
Expand Down Expand Up @@ -701,7 +697,7 @@ private void updateDeploymentResultAsRejected(Deployment deployment, DeploymentT
CompletableFuture<DeploymentResult> process = CompletableFuture.completedFuture(result);

currentDeploymentTaskMetadata =
new DeploymentTaskMetadata(deployment, deploymentTask, process, new AtomicInteger(1), false);
new DeploymentTaskMetadata(deployment, deploymentTask, process, new AtomicInteger(1));
}

private void updateDeploymentResultAsFailed(Deployment deployment, DeploymentTask deploymentTask,
Expand All @@ -715,7 +711,7 @@ private void updateDeploymentResultAsFailed(Deployment deployment, DeploymentTas
process = CompletableFuture.completedFuture(result);
}
currentDeploymentTaskMetadata =
new DeploymentTaskMetadata(deployment, deploymentTask, process, new AtomicInteger(1), false);
new DeploymentTaskMetadata(deployment, deploymentTask, process, new AtomicInteger(1));
}

private void updateStatusDetailsFromException(Map<String, Object> statusDetails, Throwable failureCause,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -653,7 +653,7 @@ private void evaluateCancellationAndCancelDeploymentIfNeeded() {
// in that case don't add a cancellation deployment because it can't be added to the front of the queue
// we will just have to let current deployment finish
Deployment deployment = new Deployment(DeploymentType.IOT_JOBS, UUID.randomUUID().toString(), true);
if (deploymentQueue.isEmpty() && currentDeployment != null && currentDeployment.isCancellable()
if (deploymentQueue.isEmpty() && currentDeployment != null
&& DeploymentType.IOT_JOBS.equals(currentDeployment.getDeploymentType())
&& deploymentQueue.offer(deployment)) {
logger.atInfo().log("Added cancellation deployment to the queue");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,10 @@
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.stream.Collectors;

import static com.aws.greengrass.deployment.DeploymentConfigMerger.DEPLOYMENT_ID_LOG_KEY;
Expand All @@ -38,13 +42,14 @@ public class KernelUpdateDeploymentTask implements DeploymentTask {
private final Logger logger;
private final Deployment deployment;
private final ComponentManager componentManager;
private final CompletableFuture<DeploymentResult> deploymentResultCompletableFuture;

/**
* Constructor for KernelUpdateDeploymentTask.
*
* @param kernel Kernel instance
* @param logger Logger instance
* @param deployment Deployment instance
* @param kernel Kernel instance
* @param logger Logger instance
* @param deployment Deployment instance
* @param componentManager ComponentManager instance
*/
public KernelUpdateDeploymentTask(Kernel kernel, Logger logger, Deployment deployment,
Expand All @@ -53,23 +58,41 @@ public KernelUpdateDeploymentTask(Kernel kernel, Logger logger, Deployment deplo
this.deployment = deployment;
this.logger = logger.dfltKv(DEPLOYMENT_ID_LOG_KEY, deployment.getGreengrassDeploymentId());
this.componentManager = componentManager;
this.deploymentResultCompletableFuture = new CompletableFuture<>();
}

/**
 * Starts the wait for services on the kernel's executor and blocks until that wait publishes a result.
 *
 * @return the deployment result published by the background wait (may be {@code null}), or {@code null}
 *         when the result future was cancelled, completed exceptionally, or this thread was interrupted
 */
@SuppressWarnings({"PMD.AvoidDuplicateLiterals"})
@Override
public DeploymentResult call() {
    // The polling runs on a separate executor thread; this thread only waits on the future,
    // so cancelling the future releases it immediately.
    kernel.getContext().get(ExecutorService.class).execute(this::waitForServicesToStart);
    DeploymentResult result;
    try {
        result = deploymentResultCompletableFuture.get();
    } catch (InterruptedException e) {
        // Restore the interrupt status instead of silently dropping it, so code further up the
        // stack can still observe that this thread was asked to stop.
        Thread.currentThread().interrupt();
        return null;
    } catch (ExecutionException | CancellationException e) {
        // nothing to report when deployment is cancelled
        return null;
    }
    // Only reached when the wait finished without cancellation or interruption.
    componentManager.cleanupStaleVersions();
    return result;
}

private void waitForServicesToStart() {
Deployment.DeploymentStage stage = deployment.getDeploymentStage();
DeploymentResult result = null;
try {
List<GreengrassService> servicesToTrack =
kernel.orderedDependencies().stream().filter(GreengrassService::shouldAutoStart)
.filter(o -> !kernel.getMain().equals(o)).collect(Collectors.toList());
long mergeTimestamp = kernel.getConfig().lookup("system", "rootpath").getModtime();
logger.atDebug().kv("serviceToTrack", servicesToTrack).kv("mergeTime", mergeTimestamp)
.log("Nucleus update workflow waiting for services to complete update");
DeploymentConfigMerger.waitForServicesToStart(servicesToTrack, mergeTimestamp, kernel);

DeploymentResult result = null;
if (KERNEL_ACTIVATION.equals(stage)) {
logger.atInfo().kv("serviceToTrack", servicesToTrack).kv("mergeTime", mergeTimestamp)
.log("Nucleus update workflow waiting for services to complete update");
DeploymentConfigMerger.waitForServicesToStart(servicesToTrack, mergeTimestamp, kernel,
deploymentResultCompletableFuture);
if (deploymentResultCompletableFuture.isCancelled()) {
logger.atDebug().log("Kernel update deployment is cancelled");
} else if (KERNEL_ACTIVATION.equals(stage)) {
result = new DeploymentResult(DeploymentResult.DeploymentStatus.SUCCESSFUL, null);
} else if (KERNEL_ROLLBACK.equals(stage)) {
result = new DeploymentResult(DeploymentResult.DeploymentStatus.FAILED_ROLLBACK_COMPLETE,
Expand All @@ -78,19 +101,17 @@ public DeploymentResult call() {
result = new DeploymentResult(DeploymentResult.DeploymentStatus.FAILED_UNABLE_TO_ROLLBACK,
getDeploymentStatusDetails());
}

componentManager.cleanupStaleVersions();
return result;
} catch (InterruptedException e) {
logger.atError("deployment-interrupted", e).log();
try {
saveDeploymentStatusDetails(e);
} catch (IOException ioException) {
logger.atError().log("Failed to persist deployment error information", ioException);
if (!deploymentResultCompletableFuture.isCancelled()) {
logger.atError("deployment-interrupted", e).log();
try {
saveDeploymentStatusDetails(e);
} catch (IOException ioException) {
logger.atError().log("Failed to persist deployment error information", ioException);
}
// Interrupted workflow. Shutdown kernel and retry this stage.
kernel.shutdown(30, REQUEST_RESTART);
}
// Interrupted workflow. Shutdown kernel and retry this stage.
kernel.shutdown(30, REQUEST_RESTART);
return null;
} catch (ServiceUpdateException e) {
logger.atError("deployment-errored", e).log();
if (KERNEL_ACTIVATION.equals(stage)) {
Expand All @@ -106,16 +127,15 @@ public DeploymentResult call() {
kernel.shutdown(30, REQUEST_RESTART);
} catch (IOException ioException) {
logger.atError().log("Failed to set up Nucleus rollback directory", ioException);
return new DeploymentResult(DeploymentResult.DeploymentStatus.FAILED_UNABLE_TO_ROLLBACK, e);
result = new DeploymentResult(DeploymentResult.DeploymentStatus.FAILED_UNABLE_TO_ROLLBACK, e);
}
return null;
} else if (KERNEL_ROLLBACK.equals(stage) || ROLLBACK_BOOTSTRAP.equals(stage)) {
logger.atError().log("Nucleus update workflow failed on rollback", e);
return new DeploymentResult(DeploymentResult.DeploymentStatus.FAILED_UNABLE_TO_ROLLBACK,
result = new DeploymentResult(DeploymentResult.DeploymentStatus.FAILED_UNABLE_TO_ROLLBACK,
getDeploymentStatusDetails());
}
return null;
}
deploymentResultCompletableFuture.complete(result);
}

private void saveDeploymentStatusDetails(Throwable failureCause) throws IOException {
Expand All @@ -141,4 +161,9 @@ private DeploymentException getDeploymentStatusDetails() {

return new DeploymentException(deployment.getStageDetails(), errorStack, errorTypes);
}

/**
* Cancels this task by cancelling the future that carries the deployment result.
* A thread blocked in {@code call()} on that future is released with a
* {@link CancellationException}, which {@code call()} handles by returning {@code null}.
*/
@Override
public void cancel() {
// The mayInterruptIfRunning flag has no effect for CompletableFuture (per its Javadoc).
deploymentResultCompletableFuture.cancel(false);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,4 +10,7 @@
public interface DeploymentTask extends Callable<DeploymentResult> {
@Override
DeploymentResult call() throws InterruptedException;

/**
* Requests cancellation of this task.
* The default implementation does nothing; implementations that hold cancellable work
* override it.
*/
default void cancel() {
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,6 @@ public class DeploymentTaskMetadata {
private Future<DeploymentResult> deploymentResultFuture;
@NonNull @Getter
private AtomicInteger deploymentAttemptCount;
@NonNull @Getter
private boolean cancellable;

@Synchronized
public void setDeploymentResultFuture(Future<DeploymentResult> deploymentResultFuture) {
Expand Down Expand Up @@ -61,4 +59,9 @@ public DeploymentDocument getDeploymentDocument() {
public List<String> getRootPackages() {
return this.getDeploymentDocument().getRootPackages();
}

/**
* Cancels the tracked deployment: signals the deployment task first, then cancels the result future.
*
* @param mayInterruptIfRunning passed through to {@code cancel} on the deployment result future
*/
public void cancel(boolean mayInterruptIfRunning) {
// Signal the task itself before cancelling the future that wraps its execution.
deploymentTask.cancel();
// NOTE(review): deploymentResultFuture is assigned through a setter; presumably callers
// check it for null before calling this method - confirm.
deploymentResultFuture.cancel(mayInterruptIfRunning);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -255,10 +255,11 @@ void GIVEN_waitForServicesToStart_WHEN_service_reached_desired_state_THEN_return
doAnswer((invocation) -> mockReachedDesiredState.get()).when(mockService).reachedDesiredState();

CountDownLatch serviceStarted = new CountDownLatch(1);
CompletableFuture<DeploymentResult> future = new CompletableFuture<>();
new Thread(() -> {
try {
DeploymentConfigMerger.waitForServicesToStart(newOrderedSet(mockService), System.currentTimeMillis(),
kernel);
kernel, future);
serviceStarted.countDown();
} catch (ServiceUpdateException | InterruptedException e) {
logger.error("Fail in waitForServicesToStart", e);
Expand All @@ -274,6 +275,7 @@ void GIVEN_waitForServicesToStart_WHEN_service_reached_desired_state_THEN_return

// THEN
assertTrue(serviceStarted.await(3 * WAIT_SVC_START_POLL_INTERVAL_MILLISEC, TimeUnit.MILLISECONDS));
assertFalse(future.isDone());
}

@Test
Expand All @@ -290,15 +292,19 @@ void GIVEN_waitForServicesToStart_WHEN_service_is_broken_after_merge_THEN_throw(
when(brokenService.getState()).thenReturn(State.BROKEN);
when(brokenService.getStateModTime()).thenReturn(stateModTime);

CompletableFuture<DeploymentResult> future1 = new CompletableFuture<>();
assertThrows(ServiceUpdateException.class, () -> {
DeploymentConfigMerger.waitForServicesToStart(newOrderedSet(normalService, brokenService), mergeTime,
kernel);
kernel, future1);
});
assertFalse(future1.isDone());

CompletableFuture<DeploymentResult> future2 = new CompletableFuture<>();
assertThrows(ServiceUpdateException.class, () -> {
DeploymentConfigMerger.waitForServicesToStart(newOrderedSet(brokenService, normalService), mergeTime,
kernel);
kernel, future2);
});
assertFalse(future2.isDone());
}

@Test
Expand Down
Loading

0 comments on commit 48a7153

Please sign in to comment.