Skip to content

Commit

Permalink
fix: shutdown external services with the old version of the shutdown …
Browse files Browse the repository at this point in the history
…command (#1327)
  • Loading branch information
MikeDombo committed Oct 1, 2022
1 parent 628b55c commit aae036f
Show file tree
Hide file tree
Showing 4 changed files with 123 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -345,7 +345,8 @@ void GIVEN_running_service_WHEN_install_config_changes_THEN_service_reinstalls()
}

@Test
void GIVEN_running_service_WHEN_version_config_changes_THEN_service_reinstalls() throws Exception {
void GIVEN_running_service_WHEN_version_config_changes_THEN_service_reinstalls_and_prev_version_shutdown_is_used()
throws Exception {
ConfigPlatformResolver.initKernelWithMultiPlatformConfig(kernel,
getClass().getResource("service_with_dynamic_config.yaml"));
CountDownLatch mainRunning = new CountDownLatch(1);
Expand All @@ -363,13 +364,50 @@ void GIVEN_running_service_WHEN_version_config_changes_THEN_service_reinstalls()

CountDownLatch serviceReinstalled = new CountDownLatch(1);
kernel.getContext().addGlobalStateChangeListener((serviceToListenTo, oldState, newState) -> {
if ("service_with_dynamic_config".equals(serviceToListenTo.getName()) && State.NEW.equals(newState)) {
if ("service_with_dynamic_config".equals(serviceToListenTo.getName()) && State.RUNNING.equals(newState)) {
serviceReinstalled.countDown();
}
});
service.getServiceConfig().find(VERSION_CONFIG_KEY).withValue("1.0.1");

assertTrue(serviceReinstalled.await(60, TimeUnit.SECONDS));
// Validate that when we shutdown it uses the old version's shutdown command, not the new value
CompletableFuture<Void> componentShutdown = new CompletableFuture<>();
try (AutoCloseable a = createCloseableLogListener((m) -> {
if (!m.getLoggerName().equals("service_with_dynamic_config")) {
return;
}
if (m.getMessage().contains("shutdown v1.0.1")) {
componentShutdown.completeExceptionally(
new AssertionError("v1.0.1 shutdown was used instead of v1.0.0"));
} else if (m.getMessage().contains("shutdown v1.0.0")) {
componentShutdown.complete(null);
}
})) {
kernel.getContext().runOnPublishQueueAndWait(() -> {
service.getServiceConfig().find(VERSION_CONFIG_KEY).withValue("1.0.1");
service.getServiceConfig().find(SERVICE_LIFECYCLE_NAMESPACE_TOPIC, Lifecycle.LIFECYCLE_SHUTDOWN_NAMESPACE_TOPIC)
.withValue("echo shutdown v1.0.1");
});

assertTrue(serviceReinstalled.await(60, TimeUnit.SECONDS));
componentShutdown.get(0, TimeUnit.SECONDS);
}

// Now when we shut down it should use the latest version which is 1.0.1
CompletableFuture<Void> componentShutdown2 = new CompletableFuture<>();
try (AutoCloseable a = createCloseableLogListener((m) -> {
if (!m.getLoggerName().equals("service_with_dynamic_config")) {
return;
}
if (m.getMessage().contains("shutdown v1.0.0")) {
componentShutdown2.completeExceptionally(
new AssertionError("v1.0.0 shutdown was used instead of v1.0.1"));
} else if (m.getMessage().contains("shutdown v1.0.1")) {
componentShutdown2.complete(null);
}
})) {
kernel.locate("service_with_dynamic_config").requestStop();
componentShutdown2.get(5, TimeUnit.SECONDS);
}
}

@Test
Expand All @@ -384,7 +422,7 @@ void GIVEN_running_service_WHEN_run_config_changes_THEN_service_restarts() throw
});
kernel.launch();

assertTrue(mainRunning.await(5, TimeUnit.SECONDS));
assertTrue(mainRunning.await(20, TimeUnit.SECONDS));

GenericExternalService service = spy((GenericExternalService) kernel.locate("service_with_dynamic_config"));
assertEquals(State.RUNNING, service.getState());
Expand Down Expand Up @@ -426,7 +464,7 @@ void GIVEN_running_service_WHEN_setenv_config_changes_THEN_service_restarts() th
});
service.getServiceConfig().find(SETENV_CONFIG_NAMESPACE, "my_env_var").withValue("var2");

assertTrue(serviceRestarted.await(5, TimeUnit.SECONDS));
assertTrue(serviceRestarted.await(35, TimeUnit.SECONDS));
}

@Test
Expand Down Expand Up @@ -609,7 +647,7 @@ void GIVEN_service_starts_up_WHEN_service_breaks_THEN_status_details_persisted_f
AtomicReference<Long> timestamp = new AtomicReference<>(System.currentTimeMillis());
kernel.launch();

assertTrue(serviceBrokenLatch.await(15, TimeUnit.SECONDS));
assertTrue(serviceBrokenLatch.await(55, TimeUnit.SECONDS));
assertTrue(serviceErroredLatch.await(15, TimeUnit.SECONDS));
componentStatus.forEach(status -> {
assertThat(status.getRight(), greaterThan(timestamp.getAndSet(status.getRight())));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,12 @@ services:
lifecycle:
install:
posix: |-
echo "install as `id -u -n`"
rm -f ./install-file
rm -f ./run-file
touch ./install-file
echo "install as `id -u -n`"
run:
posix: |-
echo "run as `id -u -n`"
touch ./run-file
echo "run as `id -u -n`"
version: 1.0.0
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@ services:
echo "Running service_with_dynamic_config" && sleep 1000
windows:
powershell -command echo \"Running service_with_dynamic_config\"; sleep 1000
shutdown:
all:
echo shutdown v1.0.0
version: 1.0.0
setenv:
my_env_var: var1
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import com.aws.greengrass.logging.api.LogEventBuilder;
import com.aws.greengrass.logging.api.Logger;
import com.aws.greengrass.util.Coerce;
import com.aws.greengrass.util.CrashableSupplier;
import com.aws.greengrass.util.Exec;
import com.aws.greengrass.util.Utils;
import com.aws.greengrass.util.platforms.Platform;
Expand Down Expand Up @@ -71,6 +72,7 @@ public class GenericExternalService extends GreengrassService {
@Inject
protected RunWithPathOwnershipHandler ownershipHandler;
protected RunWith runWith;
protected volatile RunResult shutdownExecCache = null;

private final AtomicBoolean paused = new AtomicBoolean();

Expand Down Expand Up @@ -353,6 +355,13 @@ protected synchronized void install() throws InterruptedException {
// to operate properly
@Override
protected synchronized void startup() throws InterruptedException {
// Cache the proper shutdown command right when we're starting up (if desired).
// This guarantees that the shutdown command that we eventually run will be the same
// shutdown command which *should* be applied to *this* startup.
// If the component version changes, we will restart by shutting down. Without caching
// the shutdown command, we'd end up shutting down this component using the *new*
// shutdown command, and not the one which is associated with the current startup.
cacheShutdownExec();
stopAllLifecycleProcesses();

long startingStateGeneration = getStateGeneration();
Expand Down Expand Up @@ -390,6 +399,20 @@ protected synchronized void startup() throws InterruptedException {
}
}

@SuppressWarnings("PMD.NullAssignment")
protected void cacheShutdownExec() throws InterruptedException {
if (!shouldCacheShutdownExec()) {
shutdownExecCache = null;
return;
}
// Cache the Exec without calling it
shutdownExecCache = run(Lifecycle.LIFECYCLE_SHUTDOWN_NAMESPACE_TOPIC, null, lifecycleProcesses, false);
}

protected boolean shouldCacheShutdownExec() {
return true;
}

/**
* Pause a running component.
*
Expand Down Expand Up @@ -534,7 +557,13 @@ protected synchronized void shutdown() {
}

try {
run(Lifecycle.LIFECYCLE_SHUTDOWN_NAMESPACE_TOPIC, null, lifecycleProcesses);
RunResult cached = shutdownExecCache;
if (shouldCacheShutdownExec() && cached != null && cached.getDoExec() != null) {
logger.atDebug().log("Using cached shutdown command");
cached.getDoExec().apply();
} else {
run(Lifecycle.LIFECYCLE_SHUTDOWN_NAMESPACE_TOPIC, null, lifecycleProcesses);
}
} catch (InterruptedException ex) {
logger.atWarn("generic-service-shutdown").log("Thread interrupted while shutting down service");
} finally {
Expand Down Expand Up @@ -631,34 +660,41 @@ protected boolean updateComponentPathOwner() {
}
}

protected RunResult run(String name, IntConsumer background, List<Exec> trackingList)
throws InterruptedException {
return run(name, background, trackingList, true);
}

/**
* Run one of the commands defined in the config on the command line.
*
* @param name name of the command to run ("run", "install", "startup", "bootstrap").
* @param background IntConsumer to and run the command as background process and receive the exit code. If null,
* the command will run as a foreground process and blocks indefinitely.
* @param trackingList List used to track running processes.
* @param name name of the command to run ("run", "install", "startup", "bootstrap").
* @param background IntConsumer to and run the command as background process and receive the exit code. If
* null, the command will run as a foreground process and blocks indefinitely.
* @param trackingList List used to track running processes.
* @param runImmediately True if the command should be run immediately, false to construct without running
* @return the status of the run and the Exec.
*/
protected RunResult run(String name, IntConsumer background, List<Exec> trackingList)
protected RunResult run(String name, IntConsumer background, List<Exec> trackingList, boolean runImmediately)
throws InterruptedException {
Node n = (getLifecycleTopic() == null) ? null : getLifecycleTopic().getChild(name);
if (n == null) {
return new RunResult(RunStatus.NothingDone, null, null);
}

if (n instanceof Topic) {
return run(name, (Topic) n, Coerce.toString(n), background, trackingList, isPrivilegeRequired(name));
return run(name, (Topic) n, Coerce.toString(n), background,
trackingList, isPrivilegeRequired(name), runImmediately);
}
if (n instanceof Topics) {
return run(name, (Topics) n, background, trackingList, isPrivilegeRequired(name));
return run(name, (Topics) n, background, trackingList, isPrivilegeRequired(name), runImmediately);
}
return new RunResult(RunStatus.NothingDone, null, null);
}

@SuppressWarnings("PMD.CloseResource")
protected RunResult run(String name, Topic t, String cmd, IntConsumer background, List<Exec> trackingList,
boolean requiresPrivilege) throws InterruptedException {
boolean requiresPrivilege, boolean runImmediately) throws InterruptedException {
if (runWith == null) {
Optional<RunWith> opt = computeRunWithConfiguration();
if (!opt.isPresent()) {
Expand Down Expand Up @@ -698,19 +734,28 @@ protected RunResult run(String name, Topic t, String cmd, IntConsumer background
exec = addShell(exec);

addEnv(exec, t.parent);
logger.atDebug().setEventType("generic-service-run").log();

// Track all running processes that we fork
if (exec.isRunning()) {
trackingList.add(exec);
Exec finalExec = exec;
CrashableSupplier<RunStatus, InterruptedException> doRun = () -> {
logger.atDebug().setEventType("generic-service-run").log();
// Track all running processes that we fork
if (finalExec.isRunning()) {
trackingList.add(finalExec);
}
return shellRunner.successful(finalExec, t.getFullName(),
background, this) ? RunStatus.OK : RunStatus.Errored;
};

if (runImmediately) {
RunStatus ret = doRun.apply();
return new RunResult(ret, exec, null);
} else {
return new RunResult(null, exec, null, doRun);
}
RunStatus ret =
shellRunner.successful(exec, t.getFullName(), background, this) ? RunStatus.OK : RunStatus.Errored;
return new RunResult(ret, exec, null);
}

protected RunResult run(String name, Topics t, IntConsumer background, List<Exec> trackingList,
boolean requiresPrivilege)
boolean requiresPrivilege, boolean runImmediately)
throws InterruptedException {
try {
if (shouldSkip(t)) {
Expand All @@ -723,7 +768,8 @@ protected RunResult run(String name, Topics t, IntConsumer background, List<Exec

Node script = t.getChild("script");
if (script instanceof Topic) {
return run(name, (Topic) script, Coerce.toString(script), background, trackingList, requiresPrivilege);
return run(name, (Topic) script, Coerce.toString(script), background, trackingList, requiresPrivilege,
runImmediately);
} else {
logger.atError().setEventType("generic-service-invalid-config").addKeyValue(CONFIG_NODE, t.getFullName())
.log("Missing script");
Expand Down Expand Up @@ -839,5 +885,12 @@ private static class RunResult {
private RunStatus runStatus;
private Exec exec;
private ComponentStatusCode statusCode;
private CrashableSupplier<RunStatus, InterruptedException> doExec;

RunResult(RunStatus runStatus, Exec exec, ComponentStatusCode statusCode) {
this.runStatus = runStatus;
this.exec = exec;
this.statusCode = statusCode;
}
}
}

0 comments on commit aae036f

Please sign in to comment.