From 3a9198524b5d91ae9ffc2b7b02c8d97472b69657 Mon Sep 17 00:00:00 2001 From: ggao Date: Thu, 25 Sep 2025 10:48:19 -0700 Subject: [PATCH] Initial impl to support marking TE as zombie --- .../resourcecluster/ExecutorStateManager.java | 16 ++++++ .../ExecutorStateManagerImpl.java | 24 +++++++++ .../resourcecluster/ResourceClusterActor.java | 53 +++++++++++++++++-- .../ResourceClustersManagerActor.java | 3 +- .../master/config/MasterConfiguration.java | 4 ++ 5 files changed, 94 insertions(+), 6 deletions(-) diff --git a/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/resourcecluster/ExecutorStateManager.java b/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/resourcecluster/ExecutorStateManager.java index 3e1bce7d9..0612b993c 100644 --- a/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/resourcecluster/ExecutorStateManager.java +++ b/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/resourcecluster/ExecutorStateManager.java @@ -105,4 +105,20 @@ Optional> findFirst( Predicate> isAssigned = e -> e.getValue().isAssigned(); + + /** + * Moves the given task executor to the zombie collection. + * The zombie collection contains the task executors that failed to report a heartbeat within a threshold and are ready to be permanently removed. + * + * @param taskExecutorID ID of the task executor to mark as zombie + */ + void markAsZombie(TaskExecutorID taskExecutorID); + + /** + * Checks whether the given task executor is already marked as zombie. + * + * @param taskExecutorID ID of the task executor to check + * @return true if the task executor is marked as zombie.
+ */ + boolean isZombie(TaskExecutorID taskExecutorID); } diff --git a/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/resourcecluster/ExecutorStateManagerImpl.java b/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/resourcecluster/ExecutorStateManagerImpl.java index fc820e83e..2dc529311 100644 --- a/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/resourcecluster/ExecutorStateManagerImpl.java +++ b/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/resourcecluster/ExecutorStateManagerImpl.java @@ -125,6 +125,14 @@ private TaskExecutorGroupKey findBestFitGroupOrDefault(SchedulingConstraints con private final AvailableTaskExecutorMutatorHook availableTaskExecutorMutatorHook; + // Holds the TEs moved out of archivedState by markAsZombie. TODO(Gigi): add metrics and actions later + private final Cache zombieState = CacheBuilder.newBuilder() + .maximumSize(1000) + .expireAfterWrite(30, TimeUnit.MINUTES) + .removalListener(notification -> + log.info("Zombie TaskExecutor: {} removed due to: {}", notification.getKey(), notification.getCause())) + .build(); + ExecutorStateManagerImpl(Map schedulingAttributes) { this.schedulingAttributes = schedulingAttributes; this.fitnessCalculator = new CpuWeightedFitnessCalculator(); @@ -276,6 +284,22 @@ public TaskExecutorState archive(TaskExecutorID taskExecutorID) { } } + @Override + public void markAsZombie(TaskExecutorID taskExecutorID) { + final TaskExecutorState taskExecutorState = this.archivedState.getIfPresent(taskExecutorID); + if (taskExecutorState != null) { + this.archivedState.invalidate(taskExecutorID); + this.zombieState.put(taskExecutorID, taskExecutorState); + } else { + log.warn("TaskExecutor: {} not found from the archived", taskExecutorID); + } + } + + @Override + public boolean isZombie(TaskExecutorID taskExecutorID) { + return this.zombieState.getIfPresent(taskExecutorID) != null; + } + @Override public List getTaskExecutors(Predicate> 
predicate) { return this.taskExecutorStateMap diff --git a/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/resourcecluster/ResourceClusterActor.java b/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/resourcecluster/ResourceClusterActor.java index 7b785857a..c6f2bc40a 100644 --- a/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/resourcecluster/ResourceClusterActor.java +++ b/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/resourcecluster/ResourceClusterActor.java @@ -30,6 +30,7 @@ import com.netflix.spectator.api.TagList; import io.mantisrx.common.Ack; import io.mantisrx.common.WorkerConstants; +import io.mantisrx.common.metrics.spectator.SpectatorRegistryFactory; import io.mantisrx.master.resourcecluster.proto.GetClusterIdleInstancesRequest; import io.mantisrx.master.resourcecluster.proto.GetClusterIdleInstancesResponse; import io.mantisrx.master.scheduler.FitnessCalculator; @@ -142,6 +143,7 @@ public SupervisorStrategy supervisorStrategy() { private final String jobClustersWithArtifactCachingEnabled; private final boolean isJobArtifactCachingEnabled; + private final Duration zombieDetectThreshold; static Props props( final ClusterID clusterID, @@ -158,7 +160,8 @@ static Props props( boolean isJobArtifactCachingEnabled, Map schedulingAttributes, FitnessCalculator fitnessCalculator, - AvailableTaskExecutorMutatorHook availableTaskExecutorMutatorHook + AvailableTaskExecutorMutatorHook availableTaskExecutorMutatorHook, + final Duration zombieDetectThreshold ) { return Props.create( ResourceClusterActor.class, @@ -176,7 +179,8 @@ static Props props( isJobArtifactCachingEnabled, schedulingAttributes, fitnessCalculator, - availableTaskExecutorMutatorHook + availableTaskExecutorMutatorHook, + zombieDetectThreshold ).withMailbox("akka.actor.metered-mailbox"); } @@ -194,7 +198,8 @@ static Props props( String 
jobClustersWithArtifactCachingEnabled, boolean isJobArtifactCachingEnabled, Map schedulingAttributes, - FitnessCalculator fitnessCalculator + FitnessCalculator fitnessCalculator, + final Duration zombieDetectThreshold ) { return Props.create( ResourceClusterActor.class, @@ -212,7 +217,8 @@ static Props props( isJobArtifactCachingEnabled, schedulingAttributes, fitnessCalculator, - null + null, + zombieDetectThreshold ).withMailbox("akka.actor.metered-mailbox"); } @@ -231,7 +237,8 @@ static Props props( boolean isJobArtifactCachingEnabled, Map schedulingAttributes, FitnessCalculator fitnessCalculator, - AvailableTaskExecutorMutatorHook availableTaskExecutorMutatorHook) { + AvailableTaskExecutorMutatorHook availableTaskExecutorMutatorHook, + Duration zombieDetectThreshold) { this.clusterID = clusterID; this.heartbeatTimeout = heartbeatTimeout; this.assignmentTimeout = assignmentTimeout; @@ -252,6 +259,7 @@ static Props props( schedulingAttributes, fitnessCalculator, this.schedulerLeaseExpirationDuration, availableTaskExecutorMutatorHook); this.metrics = new ResourceClusterActorMetrics(); + this.zombieDetectThreshold = zombieDetectThreshold; } @Override @@ -327,6 +335,7 @@ public Receive createReceive() { .match(AddNewJobArtifactsToCacheRequest.class, this::onAddNewJobArtifactsToCacheRequest) .match(RemoveJobArtifactsToCacheRequest.class, this::onRemoveJobArtifactsToCacheRequest) .match(GetJobArtifactsToCacheRequest.class, req -> sender().tell(new ArtifactList(new ArrayList<>(jobArtifactsToCache)), self())) + .match(ZombieTimeout.class, metrics.withTracking(this::onTaskExecutorZombieTimeout)) .build(); } @@ -682,6 +691,8 @@ private boolean isTaskExecutorDisabled(TaskExecutorRegistration registration) { private void onHeartbeat(TaskExecutorHeartbeat heartbeat) { log.debug("Received heartbeat {} from task executor {}", heartbeat, heartbeat.getTaskExecutorID()); + // A heartbeat arrived, so cancel any pending zombie timer for this TE + 
getTimers().cancel(getZombieTimerFor(heartbeat.getTaskExecutorID())); setupTaskExecutorStateIfNecessary(heartbeat.getTaskExecutorID()); try { final TaskExecutorID taskExecutorID = heartbeat.getTaskExecutorID(); @@ -913,6 +924,10 @@ private String getHeartbeatTimerFor(TaskExecutorID taskExecutorID) { return "Heartbeat-" + taskExecutorID.toString(); } + private String getZombieTimerFor(TaskExecutorID taskExecutorID) { + return "Zombie-" + taskExecutorID.toString(); + } + private void onTaskExecutorHeartbeatTimeout(HeartbeatTimeout timeout) { setupTaskExecutorStateIfNecessary(timeout.getTaskExecutorID()); try { @@ -925,6 +940,12 @@ private void onTaskExecutorHeartbeatTimeout(HeartbeatTimeout timeout) { if (state.getLastActivity().compareTo(timeout.getLastActivity()) <= 0) { log.info("Disconnecting task executor {}", timeout.getTaskExecutorID()); disconnectTaskExecutor(timeout.getTaskExecutorID()); + // One-shot timer: marks this TE as zombie unless a heartbeat cancels it within zombieDetectThreshold + getTimers().startSingleTimer( + getZombieTimerFor(timeout.getTaskExecutorID()), + new ZombieTimeout(timeout.getTaskExecutorID(), state.getLastActivity()), + zombieDetectThreshold + ); } } catch (IllegalStateException e) { @@ -932,7 +953,23 @@ private void onTaskExecutorHeartbeatTimeout(HeartbeatTimeout timeout) { } } + private void onTaskExecutorZombieTimeout(ZombieTimeout timeout) { + // No heartbeat cancelled the timer within zombieDetectThreshold; tag the counter by cluster, not by TE ID, to keep tag cardinality bounded + final TaskExecutorID taskExecutorID = timeout.getTaskExecutorID(); + log.info("Zombie timeout received for {}", taskExecutorID); + SpectatorRegistryFactory.getRegistry() + .counter("zombieTaskExecutor_count", "resourceCluster", clusterID.getResourceID()) + .increment(); + this.executorStateManager.markAsZombie(taskExecutorID); + } + private void setupTaskExecutorStateIfNecessary(TaskExecutorID taskExecutorID) { + if (this.executorStateManager.isZombie(taskExecutorID)) { + // TE recovered after zombieDetectThreshold (30 mins by default) + // 
TODO(Gigi): directly return and raise zombie TE once we add the action to remove the zombie every 30mins + // For now, only log the zombie task executor and keep tracking it + log.info("Zombie task executor {} has been set", taskExecutorID); + } this.executorStateManager .trackIfAbsent(taskExecutorID, TaskExecutorState.of(clock, rpcService, jobMessageRouter)); } @@ -1027,6 +1064,12 @@ static class HeartbeatTimeout { Instant lastActivity; } + @Value + static class ZombieTimeout { + TaskExecutorID taskExecutorID; + Instant lastActivity; + } + @Value public static class TaskExecutorBatchAssignmentRequest { Set allocationRequests; diff --git a/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/resourcecluster/ResourceClustersManagerActor.java b/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/resourcecluster/ResourceClustersManagerActor.java index 8d10bc0ce..0f48d6c6e 100644 --- a/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/resourcecluster/ResourceClustersManagerActor.java +++ b/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/resourcecluster/ResourceClustersManagerActor.java @@ -184,7 +184,8 @@ private ActorRef createResourceClusterActorFor(ClusterID clusterID) { masterConfiguration.isJobArtifactCachingEnabled(), masterConfiguration.getSchedulingConstraints(), masterConfiguration.getFitnessCalculator(), - masterConfiguration.getAvailableTaskExecutorMutatorHook()), + masterConfiguration.getAvailableTaskExecutorMutatorHook(), + Duration.ofMillis(masterConfiguration.zombieDetectThreshold())), "ResourceClusterActor-" + clusterID.getResourceID()); log.info("Created resource cluster actor for {}", clusterID); return clusterActor; diff --git a/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/server/master/config/MasterConfiguration.java 
b/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/server/master/config/MasterConfiguration.java index 50cedd175..5fdee1e7e 100644 --- a/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/server/master/config/MasterConfiguration.java +++ b/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/server/master/config/MasterConfiguration.java @@ -282,6 +282,10 @@ default Duration getSchedulerIntervalBetweenRetries() { @Default("false") boolean getDisableShortfallEvaluation(); + @Config("mantis.agent.zombie.detect.threshold") + @Default("1800000") + long zombieDetectThreshold(); + @Config("mantis.scheduling.info.observable.heartbeat.interval.secs") @Default("120") long getSchedulingInfoObservableHeartbeatIntervalSecs();