Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -105,4 +105,20 @@ Optional<Entry<TaskExecutorID, TaskExecutorState>> findFirst(

Predicate<Entry<TaskExecutorID, TaskExecutorState>> isAssigned =
e -> e.getValue().isAssigned();

/***
* Move the given task executor to the zombie collection.
* Zombie collection contains the task executors that failes to report heartbeat within a threshold and are ready to be permanently removed.
*
* @param taskExecutorID TaskExecutorID
*/
void markAsZombie(TaskExecutorID taskExecutorID);

/**
* True indicate the given task executor is already marked as zombie.
*
* @param taskExecutorID TaskExecutorID
* @return True indicates the task executor is zombie.
*/
boolean isZombie(TaskExecutorID taskExecutorID);
}
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,14 @@ private TaskExecutorGroupKey findBestFitGroupOrDefault(SchedulingConstraints con

private final AvailableTaskExecutorMutatorHook availableTaskExecutorMutatorHook;

// TODO(Gigi): add metrics and actions later
private final Cache<TaskExecutorID, TaskExecutorState> zombieState = CacheBuilder.newBuilder()
.maximumSize(1000)
.expireAfterWrite(30, TimeUnit.MINUTES)
.removalListener(notification ->
log.info("Archived TaskExecutor: {} removed due to: {}", notification.getKey(), notification.getCause()))
.build();

ExecutorStateManagerImpl(Map<String, String> schedulingAttributes) {
this.schedulingAttributes = schedulingAttributes;
this.fitnessCalculator = new CpuWeightedFitnessCalculator();
Expand Down Expand Up @@ -276,6 +284,22 @@ public TaskExecutorState archive(TaskExecutorID taskExecutorID) {
}
}

@Override
public void markAsZombie(TaskExecutorID taskExecutorID) {
final TaskExecutorState taskExecutorState = this.archivedState.getIfPresent(taskExecutorID);
if (taskExecutorState != null) {
this.archivedState.invalidate(taskExecutorID);
this.zombieState.put(taskExecutorID, taskExecutorState);
} else {
log.warn("TaskExecutor: {} not found from the archived", taskExecutorID);
}
}

@Override
public boolean isZombie(TaskExecutorID taskExecutorID) {
return this.zombieState.getIfPresent(taskExecutorID) != null;
}

@Override
public List<TaskExecutorID> getTaskExecutors(Predicate<Entry<TaskExecutorID, TaskExecutorState>> predicate) {
return this.taskExecutorStateMap
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import com.netflix.spectator.api.TagList;
import io.mantisrx.common.Ack;
import io.mantisrx.common.WorkerConstants;
import io.mantisrx.common.metrics.spectator.SpectatorRegistryFactory;
import io.mantisrx.master.resourcecluster.proto.GetClusterIdleInstancesRequest;
import io.mantisrx.master.resourcecluster.proto.GetClusterIdleInstancesResponse;
import io.mantisrx.master.scheduler.FitnessCalculator;
Expand Down Expand Up @@ -142,6 +143,7 @@ public SupervisorStrategy supervisorStrategy() {
private final String jobClustersWithArtifactCachingEnabled;

private final boolean isJobArtifactCachingEnabled;
private final Duration zombieDetectThreshold;

static Props props(
final ClusterID clusterID,
Expand All @@ -158,7 +160,8 @@ static Props props(
boolean isJobArtifactCachingEnabled,
Map<String, String> schedulingAttributes,
FitnessCalculator fitnessCalculator,
AvailableTaskExecutorMutatorHook availableTaskExecutorMutatorHook
AvailableTaskExecutorMutatorHook availableTaskExecutorMutatorHook,
final Duration zombieDetectThreshold
) {
return Props.create(
ResourceClusterActor.class,
Expand All @@ -176,7 +179,8 @@ static Props props(
isJobArtifactCachingEnabled,
schedulingAttributes,
fitnessCalculator,
availableTaskExecutorMutatorHook
availableTaskExecutorMutatorHook,
zombieDetectThreshold
).withMailbox("akka.actor.metered-mailbox");
}

Expand All @@ -194,7 +198,8 @@ static Props props(
String jobClustersWithArtifactCachingEnabled,
boolean isJobArtifactCachingEnabled,
Map<String, String> schedulingAttributes,
FitnessCalculator fitnessCalculator
FitnessCalculator fitnessCalculator,
final Duration zombieDetectThreshold
) {
return Props.create(
ResourceClusterActor.class,
Expand All @@ -212,7 +217,8 @@ static Props props(
isJobArtifactCachingEnabled,
schedulingAttributes,
fitnessCalculator,
null
null,
zombieDetectThreshold
).withMailbox("akka.actor.metered-mailbox");
}

Expand All @@ -231,7 +237,8 @@ static Props props(
boolean isJobArtifactCachingEnabled,
Map<String, String> schedulingAttributes,
FitnessCalculator fitnessCalculator,
AvailableTaskExecutorMutatorHook availableTaskExecutorMutatorHook) {
AvailableTaskExecutorMutatorHook availableTaskExecutorMutatorHook,
Duration zombieDetectThreshold) {
this.clusterID = clusterID;
this.heartbeatTimeout = heartbeatTimeout;
this.assignmentTimeout = assignmentTimeout;
Expand All @@ -252,6 +259,7 @@ static Props props(
schedulingAttributes, fitnessCalculator, this.schedulerLeaseExpirationDuration, availableTaskExecutorMutatorHook);

this.metrics = new ResourceClusterActorMetrics();
this.zombieDetectThreshold = zombieDetectThreshold;
}

@Override
Expand Down Expand Up @@ -327,6 +335,7 @@ public Receive createReceive() {
.match(AddNewJobArtifactsToCacheRequest.class, this::onAddNewJobArtifactsToCacheRequest)
.match(RemoveJobArtifactsToCacheRequest.class, this::onRemoveJobArtifactsToCacheRequest)
.match(GetJobArtifactsToCacheRequest.class, req -> sender().tell(new ArtifactList(new ArrayList<>(jobArtifactsToCache)), self()))
.match(ZombieTimeout.class, metrics.withTracking(this::onTaskExecutorZombieTimeout))
.build();
}

Expand Down Expand Up @@ -682,6 +691,8 @@ private boolean isTaskExecutorDisabled(TaskExecutorRegistration registration) {

private void onHeartbeat(TaskExecutorHeartbeat heartbeat) {
log.debug("Received heartbeat {} from task executor {}", heartbeat, heartbeat.getTaskExecutorID());
// Cancel the zombie timer once receive the heartbeat
getTimers().cancel(getZombieTimerFor(heartbeat.getTaskExecutorID()));
setupTaskExecutorStateIfNecessary(heartbeat.getTaskExecutorID());
try {
final TaskExecutorID taskExecutorID = heartbeat.getTaskExecutorID();
Expand Down Expand Up @@ -913,6 +924,10 @@ private String getHeartbeatTimerFor(TaskExecutorID taskExecutorID) {
return "Heartbeat-" + taskExecutorID.toString();
}

private String getZombieTimerFor(TaskExecutorID taskExecutorID) {
return "Zombie-" + taskExecutorID.toString();
}

private void onTaskExecutorHeartbeatTimeout(HeartbeatTimeout timeout) {
setupTaskExecutorStateIfNecessary(timeout.getTaskExecutorID());
try {
Expand All @@ -925,14 +940,36 @@ private void onTaskExecutorHeartbeatTimeout(HeartbeatTimeout timeout) {
if (state.getLastActivity().compareTo(timeout.getLastActivity()) <= 0) {
log.info("Disconnecting task executor {}", timeout.getTaskExecutorID());
disconnectTaskExecutor(timeout.getTaskExecutorID());
// Timer to process the given TE as zombie if it didn't recover from the heartbeat in the past 30mins
getTimers().startSingleTimer(
getZombieTimerFor(timeout.taskExecutorID),
new ZombieTimeout(timeout.taskExecutorID, state.getLastActivity()),
zombieDetectThreshold
);
}

} catch (IllegalStateException e) {
sender().tell(new Status.Failure(e), self());
}
}

private void onTaskExecutorZombieTimeout(ZombieTimeout timeout) {
// the TE not recover back from last timeout for over 30mins
final TaskExecutorID taskExecutorID = timeout.getTaskExecutorID();
log.info("Zombie timeout received for {}", taskExecutorID);
SpectatorRegistryFactory.getRegistry()
.counter("zombieTaskExecutor_count", "taskExecutorID", taskExecutorID.toString())
.increment();
this.executorStateManager.markAsZombie(taskExecutorID);
}

private void setupTaskExecutorStateIfNecessary(TaskExecutorID taskExecutorID) {
if (this.executorStateManager.isZombie(taskExecutorID)) {
// TE recovers after zombie detect threshold, e.g 30mins
// TODO(Gigi): directly return and raise zombie TE once we add the action to remove the zombie every 30mins
// Log the zombie task executor for now to
log.info("Zombie task executor {} has been set", taskExecutorID);
}
this.executorStateManager
.trackIfAbsent(taskExecutorID, TaskExecutorState.of(clock, rpcService, jobMessageRouter));
}
Expand Down Expand Up @@ -1027,6 +1064,12 @@ static class HeartbeatTimeout {
Instant lastActivity;
}

@Value
static class ZombieTimeout {
TaskExecutorID taskExecutorID;
Instant lastActivity;
}

@Value
public static class TaskExecutorBatchAssignmentRequest {
Set<TaskExecutorAllocationRequest> allocationRequests;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,8 @@ private ActorRef createResourceClusterActorFor(ClusterID clusterID) {
masterConfiguration.isJobArtifactCachingEnabled(),
masterConfiguration.getSchedulingConstraints(),
masterConfiguration.getFitnessCalculator(),
masterConfiguration.getAvailableTaskExecutorMutatorHook()),
masterConfiguration.getAvailableTaskExecutorMutatorHook(),
Duration.ofMillis(masterConfiguration.zombieDetectThreshold())),
"ResourceClusterActor-" + clusterID.getResourceID());
log.info("Created resource cluster actor for {}", clusterID);
return clusterActor;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,10 @@ default Duration getSchedulerIntervalBetweenRetries() {
@Default("false")
boolean getDisableShortfallEvaluation();

@Config("mantis.agent.zombie.detect.threshold")
@Default("1800000")
long zombieDetectThreshold();

@Config("mantis.scheduling.info.observable.heartbeat.interval.secs")
@Default("120")
long getSchedulingInfoObservableHeartbeatIntervalSecs();
Expand Down
Loading