Skip to content

Commit

Permalink
chore: refactor component update event handlers (#1526)
Browse files Browse the repository at this point in the history
  • Loading branch information
shaguptashaikh authored Oct 2, 2023
1 parent 07502bd commit 8a08a8a
Showing 1 changed file with 133 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import javax.inject.Inject;

import static com.aws.greengrass.ipc.IPCEventStreamService.DEFAULT_STREAM_MESSAGE_TIMEOUT_SECONDS;
Expand All @@ -68,10 +69,15 @@ public class LifecycleIPCEventStreamAgent {
private static final String COMPONENT_NAME = "componentName";
private static final Logger log = LogManager.getLogger(LifecycleIPCEventStreamAgent.class);

// Listeners registered from generic external components (through IPC client)
@Getter(AccessLevel.PACKAGE)
private final ConcurrentHashMap<String, Set<StreamEventPublisher<ComponentUpdatePolicyEvents>>>
componentUpdateListeners = new ConcurrentHashMap<>();

// Listeners registered from plugins
private final ConcurrentHashMap<String, Set<Consumer<ComponentUpdatePolicyEvents>>>
componentUpdateListenersInternal = new ConcurrentHashMap<>();

// When a PreComponentUpdateEvent is pushed to components, a future is created for each component. When the
// component responds with DeferComponentUpdateRequest the future is marked as complete. The caller of
// sendPreComponentUpdateEvent will have reference to the set of futures.
Expand Down Expand Up @@ -181,19 +187,20 @@ protected void onStreamClosed() {
public SubscribeToComponentUpdatesResponse handleRequest(SubscribeToComponentUpdatesRequest request) {
return translateExceptions(() -> {
try {
kernel.locate(serviceName);
subscribeToComponentUpdate(serviceName, () -> {
componentUpdateListeners.putIfAbsent(serviceName, new HashSet<>());
componentUpdateListeners.get(serviceName).add(this);
});
} catch (ServiceLoadException e) {
log.atWarn().kv(COMPONENT_NAME, serviceName).log("Got subscribe to component update request from a "
+ "component that is not found in Greengrass");
log.atWarn().kv(COMPONENT_NAME, serviceName)
.log("Got subscribe to component update request from a component that is"
+ " not found in Greengrass");
ResourceNotFoundError rnf = new ResourceNotFoundError();
rnf.setMessage("Component with given name not found currently in Greengrass");
rnf.setResourceType("Component");
rnf.setResourceName(serviceName);
throw rnf;
}
componentUpdateListeners.putIfAbsent(serviceName, new HashSet<>());
componentUpdateListeners.get(serviceName).add(this);
log.atDebug().log("{} subscribed to component update", serviceName);
return SubscribeToComponentUpdatesResponse.VOID;
});
}
Expand All @@ -205,6 +212,30 @@ public void handleStreamEvent(EventStreamJsonMessage streamRequestEvent) {

}


/**
* Subscribe to component update events internally, e.g. from a plugin.
*
* @param serviceName name of the service that is subscribing
* @param updateEventCallback callback to invoke for sending update events
* @throws ServiceLoadException when the requesting service cannot be located
*/
public void subscribeToComponentUpdateInternal(String serviceName,
Consumer<ComponentUpdatePolicyEvents> updateEventCallback)
throws ServiceLoadException {
subscribeToComponentUpdate(serviceName, () -> {
componentUpdateListenersInternal.putIfAbsent(serviceName, new HashSet<>());
componentUpdateListenersInternal.get(serviceName).add(updateEventCallback);
});
}

private void subscribeToComponentUpdate(String serviceName, Runnable recordListener) throws ServiceLoadException {
kernel.locate(serviceName);
recordListener.run();

log.atDebug().log("{} subscribed to component update", serviceName);
}

class DeferComponentUpdateHandler extends GeneratedAbstractDeferComponentUpdateOperationHandler {

private final String serviceName;
Expand All @@ -223,22 +254,7 @@ protected void onStreamClosed() {
public DeferComponentUpdateResponse handleRequest(DeferComponentUpdateRequest request) {
return translateExceptions(() -> {
// TODO: [P32540011]: All IPC service requests need input validation
if (!componentUpdateListeners.containsKey(serviceName)) {
throw new InvalidArgumentsError("Component is not subscribed to component update events");
}
if (request.getDeploymentId() == null) {
throw new InvalidArgumentsError("Cannot defer the update, the deployment ID provided was null");
}

CompletableFuture<DeferComponentUpdateRequest> deferComponentUpdateRequestFuture =
deferUpdateFuturesMap.remove(new Pair<>(serviceName, request.getDeploymentId()));
if (deferComponentUpdateRequestFuture == null) {
throw new ServiceError("Time limit to respond to PreComponentUpdateEvent exceeded");
} else {
log.atDebug().log("Processing deployment deferral from {} for deployment {}", serviceName,
request.getDeploymentId());
deferComponentUpdateRequestFuture.complete(request);
}
deferComponentUpdate(request, serviceName);
return new DeferComponentUpdateResponse();
});
}
Expand All @@ -249,6 +265,33 @@ public void handleStreamEvent(EventStreamJsonMessage streamRequestEvent) {
}
}

/**
* Defer a component update.
*
* @param request DeferComponentUpdateRequest object
* @param serviceName nam of the service deferring the update
* @throws InvalidArgumentsError if service name or deployment id inputs are invalid
*/
public void deferComponentUpdate(DeferComponentUpdateRequest request, String serviceName) {
if (!componentUpdateListeners.containsKey(serviceName) && !componentUpdateListenersInternal.containsKey(
serviceName)) {
throw new InvalidArgumentsError("Component is not subscribed to component update events");
}
if (request.getDeploymentId() == null) {
throw new InvalidArgumentsError("Cannot defer the update, the deployment ID provided was null");
}

CompletableFuture<DeferComponentUpdateRequest> deferComponentUpdateRequestFuture =
deferUpdateFuturesMap.remove(new Pair<>(serviceName, request.getDeploymentId()));
if (deferComponentUpdateRequestFuture == null) {
throw new ServiceError("Time limit to respond to PreComponentUpdateEvent exceeded");
} else {
log.atDebug().log("Processing deployment deferral from {} for deployment {}", serviceName,
request.getDeploymentId());
deferComponentUpdateRequestFuture.complete(request);
}
}

/**
* Signal components about pending component updates.
*
Expand All @@ -259,13 +302,13 @@ public List<Future<DeferComponentUpdateRequest>> sendPreComponentUpdateEvent(
PreComponentUpdateEvent preComponentUpdateEvent) {
List<Future<DeferComponentUpdateRequest>> deferUpdateFutures = new ArrayList<>();
discardDeferComponentUpdateFutures();

// For callbacks registered by generic external components
for (Map.Entry<String, Set<StreamEventPublisher<ComponentUpdatePolicyEvents>>> entry : componentUpdateListeners
.entrySet()) {
String serviceName = entry.getKey();
entry.getValue().forEach(subscribeHandler -> {
log.atTrace().kv(COMPONENT_NAME, serviceName).log("Sending preComponentUpdate event");
ComponentUpdatePolicyEvents componentUpdatePolicyEvents = new ComponentUpdatePolicyEvents();
componentUpdatePolicyEvents.setPreUpdateEvent(preComponentUpdateEvent);
ComponentUpdatePolicyEvents events = makePreUpdateEvents(serviceName, preComponentUpdateEvent);

CompletableFuture<DeferComponentUpdateRequest> deferUpdateFuture = new CompletableFuture<>();
// If there are multiple pre component events sent to same service, we will store the latest future
Expand All @@ -277,7 +320,7 @@ public List<Future<DeferComponentUpdateRequest>> sendPreComponentUpdateEvent(
deferUpdateFuturesMap.put(serviceAndDeployment, deferUpdateFuture);

try {
subscribeHandler.sendStreamEvent(componentUpdatePolicyEvents)
subscribeHandler.sendStreamEvent(events)
.get(DEFAULT_STREAM_MESSAGE_TIMEOUT_SECONDS, TimeUnit.SECONDS);
} catch (Exception e) {
log.atError().setCause(e).kv(COMPONENT_NAME, serviceName)
Expand All @@ -288,33 +331,93 @@ public List<Future<DeferComponentUpdateRequest>> sendPreComponentUpdateEvent(
deferUpdateFutures.add(deferUpdateFuture);
});
}

// For internal callbacks registered by plugins
for (Map.Entry<String, Set<Consumer<ComponentUpdatePolicyEvents>>> entry : componentUpdateListenersInternal
.entrySet()) {
String serviceName = entry.getKey();
entry.getValue().forEach(subscribeHandler -> {
ComponentUpdatePolicyEvents events = makePreUpdateEvents(serviceName, preComponentUpdateEvent);

CompletableFuture<DeferComponentUpdateRequest> deferUpdateFuture = new CompletableFuture<>();
// If there are multiple pre component events sent to same service, we will store the latest future
// As the update should be waiting for the latest one to complete.
Pair<String, String> serviceAndDeployment =
new Pair<>(serviceName, preComponentUpdateEvent.getDeploymentId());
// Save to the map before sending the event so that it will be there if
// they respond very quickly
deferUpdateFuturesMap.put(serviceAndDeployment, deferUpdateFuture);

try {
subscribeHandler.accept(events);
} catch (Exception e) {
log.atError().setCause(e).kv(COMPONENT_NAME, serviceName)
.log("Failed to send the pre component update on stream");
deferUpdateFuturesMap.remove(serviceAndDeployment);
return;
}
deferUpdateFutures.add(deferUpdateFuture);
});
}

return deferUpdateFutures;
}

private ComponentUpdatePolicyEvents makePreUpdateEvents(String serviceName,
PreComponentUpdateEvent preComponentUpdateEvent) {
log.atTrace().kv(COMPONENT_NAME, serviceName).log("Sending preComponentUpdate event");
ComponentUpdatePolicyEvents componentUpdatePolicyEvents = new ComponentUpdatePolicyEvents();
componentUpdatePolicyEvents.setPreUpdateEvent(preComponentUpdateEvent);
return componentUpdatePolicyEvents;
}

private ComponentUpdatePolicyEvents makePostUpdateEvents(String serviceName,
PostComponentUpdateEvent postComponentUpdateEvent) {
ComponentUpdatePolicyEvents componentUpdatePolicyEvents = new ComponentUpdatePolicyEvents();
log.atDebug().kv(COMPONENT_NAME, serviceName).log("Sending postComponentUpdate event");
componentUpdatePolicyEvents.setPostUpdateEvent(postComponentUpdateEvent);
return componentUpdatePolicyEvents;
}

/**
* Signal component that updates are complete.
*
* @param postComponentUpdateEvent event sent to subscribed components
*/
@SuppressWarnings("PMD.AvoidCatchingGenericException")
public void sendPostComponentUpdateEvent(PostComponentUpdateEvent postComponentUpdateEvent) {
// For callbacks registered by generic external components
for (Map.Entry<String, Set<StreamEventPublisher<ComponentUpdatePolicyEvents>>> entry : componentUpdateListeners
.entrySet()) {
String serviceName = entry.getKey();
entry.getValue().forEach(subscribeHandler -> {
ComponentUpdatePolicyEvents componentUpdatePolicyEvents = new ComponentUpdatePolicyEvents();
log.atDebug().kv(COMPONENT_NAME, serviceName).log("Sending postComponentUpdate event");
componentUpdatePolicyEvents.setPostUpdateEvent(postComponentUpdateEvent);
ComponentUpdatePolicyEvents events = makePostUpdateEvents(serviceName, postComponentUpdateEvent);

try {
subscribeHandler.sendStreamEvent(componentUpdatePolicyEvents)
subscribeHandler.sendStreamEvent(events)
.get(DEFAULT_STREAM_MESSAGE_TIMEOUT_SECONDS, TimeUnit.SECONDS);
} catch (Exception e) {
log.atError().setCause(e).kv(COMPONENT_NAME, serviceName)
.log("Failed to send the post component update on stream");
}
});
}

// For internal callbacks registered by plugins
for (Map.Entry<String, Set<Consumer<ComponentUpdatePolicyEvents>>> entry : componentUpdateListenersInternal
.entrySet()) {
String serviceName = entry.getKey();
entry.getValue().forEach(subscribeHandler -> {
ComponentUpdatePolicyEvents events = makePostUpdateEvents(serviceName, postComponentUpdateEvent);

try {
subscribeHandler.accept(events);
} catch (Exception e) {
log.atError().setCause(e).kv(COMPONENT_NAME, serviceName)
.log("Failed to send the post component update on stream");
}
});
}
}

/**
Expand Down

0 comments on commit 8a08a8a

Please sign in to comment.