From 261190f891aa46f0898487ffb55e82f1f3590891 Mon Sep 17 00:00:00 2001 From: Julien Pepy Date: Mon, 21 Sep 2020 23:23:32 +0200 Subject: [PATCH] Fix grace period after kill for health check failure Following the change of task id / instance id format, task health status was not properly tracked because bound to the same instance id after successive kills for health check failure. This fix propose to address the issue by tracking health status by task id to ensure it to cleaned each time a task is terminated, whatever the reason. JIRA Issues: MARATHON-8745 --- .../marathon/core/health/Health.scala | 4 ++-- .../marathon/core/health/HealthResult.scala | 7 +++++-- .../core/health/impl/HealthCheckActor.scala | 21 ++++++++++--------- .../core/health/impl/HealthCheckWorker.scala | 17 ++++++++------- .../impl/MarathonHealthCheckManager.scala | 9 ++++---- .../health/impl/HealthCheckActorTest.scala | 2 +- 6 files changed, 33 insertions(+), 27 deletions(-) diff --git a/src/main/scala/mesosphere/marathon/core/health/Health.scala b/src/main/scala/mesosphere/marathon/core/health/Health.scala index f5a032e266f..114781e0927 100644 --- a/src/main/scala/mesosphere/marathon/core/health/Health.scala +++ b/src/main/scala/mesosphere/marathon/core/health/Health.scala @@ -20,13 +20,13 @@ case class Health( def update(result: HealthResult): Health = result match { - case Healthy(_, _, time, _) => + case Healthy(_, _, _, time, _) => copy( firstSuccess = firstSuccess.orElse(Some(time)), lastSuccess = Some(time), consecutiveFailures = 0 ) - case Unhealthy(_, _, cause, time, _) => + case Unhealthy(_, _, _, cause, time, _) => copy( lastFailure = Some(time), lastFailureCause = Some(cause), diff --git a/src/main/scala/mesosphere/marathon/core/health/HealthResult.scala b/src/main/scala/mesosphere/marathon/core/health/HealthResult.scala index e4dc5fb5331..d21d873adcc 100644 --- a/src/main/scala/mesosphere/marathon/core/health/HealthResult.scala +++ b/src/main/scala/mesosphere/marathon/core/health/HealthResult.scala @@ -2,20 +2,23 @@ package mesosphere.marathon package core.health import mesosphere.marathon.core.instance.Instance +import mesosphere.marathon.core.task.Task import mesosphere.marathon.state.Timestamp sealed trait HealthResult { def instanceId: Instance.Id + def taskId: Task.Id def version: Timestamp def time: Timestamp def publishEvent: Boolean } -case class Healthy(instanceId: Instance.Id, version: Timestamp, time: Timestamp = Timestamp.now(), publishEvent: Boolean = true) +case class Healthy(instanceId: Instance.Id, taskId: Task.Id, version: Timestamp, time: Timestamp = Timestamp.now(), publishEvent: Boolean = true) extends HealthResult case class Unhealthy( instanceId: Instance.Id, + taskId: Task.Id, version: Timestamp, cause: String, time: Timestamp = Timestamp.now(), @@ -26,5 +29,5 @@ case class Unhealthy( * Representing an ignored HTTP response code (see [[MarathonHttpHealthCheck.ignoreHttp1xx]]. Will not update the * health check state and not be published. */ -case class Ignored(instanceId: Instance.Id, version: Timestamp, time: Timestamp = Timestamp.now(), publishEvent: Boolean = false) +case class Ignored(instanceId: Instance.Id, taskId: Task.Id, version: Timestamp, time: Timestamp = Timestamp.now(), publishEvent: Boolean = false) extends HealthResult diff --git a/src/main/scala/mesosphere/marathon/core/health/impl/HealthCheckActor.scala b/src/main/scala/mesosphere/marathon/core/health/impl/HealthCheckActor.scala index f81d29cc533..d1f61d67e7b 100644 --- a/src/main/scala/mesosphere/marathon/core/health/impl/HealthCheckActor.scala +++ b/src/main/scala/mesosphere/marathon/core/health/impl/HealthCheckActor.scala @@ -17,6 +17,7 @@ import mesosphere.marathon.core.health.impl.AppHealthCheckActor.{ } import mesosphere.marathon.core.health.impl.HealthCheckActor._ import mesosphere.marathon.core.instance.Instance +import mesosphere.marathon.core.task.Task import mesosphere.marathon.core.task.termination.{KillReason, KillService} import mesosphere.marathon.core.task.tracker.InstanceTracker import mesosphere.marathon.state.{AppDefinition, Timestamp} @@ -40,7 +41,7 @@ private[health] class HealthCheckActor( implicit val mat = ActorMaterializer() import context.dispatcher - val healthByInstanceId = TrieMap.empty[Instance.Id, Health] + val healthByInstanceId = TrieMap.empty[Task.Id, Health] private case class HealthCheckStreamStopped(thisInstance: this.type) @@ -83,10 +84,8 @@ private[health] class HealthCheckActor( def purgeStatusOfDoneInstances(instances: Seq[Instance]): Unit = { logger.debug(s"Purging health status of inactive instances for app ${app.id} version ${app.version} and healthCheck ${healthCheck}") - val inactiveInstanceIds: Set[Instance.Id] = instances.filterNot(_.isActive).iterator.map(_.instanceId).toSet - inactiveInstanceIds.foreach { inactiveId => - healthByInstanceId.remove(inactiveId) - } + val activeTaskIds: Set[Task.Id] = instances.map(_.appTask).filter(_.isActive).map(_.taskId).to(Set) + healthByInstanceId.retain((taskId, health) => activeTaskIds(taskId)) val checksToPurge = instances .withFilter(!_.isActive) @@ -144,12 +143,12 @@ private[health] class HealthCheckActor( def handleHealthResult(result: HealthResult): Unit = { val instanceId = result.instanceId - val health = healthByInstanceId.getOrElse(instanceId, Health(instanceId)) + val health = healthByInstanceId.getOrElse(result.taskId, Health(instanceId)) val updatedHealth = result match { - case Healthy(_, _, _, _) => + case Healthy(_, _, _, _, _) => Future.successful(health.update(result)) - case Unhealthy(_, _, _, _, _) => + case Unhealthy(_, _, _, _, _, _) => instanceTracker.instance(instanceId).map { case Some(instance) => if (ignoreFailures(instance, health)) { @@ -183,7 +182,7 @@ private[health] class HealthCheckActor( val newHealth = instanceHealth.newHealth logger.info(s"Received health result for app [${app.id}] version [${app.version}]: [$result]") - healthByInstanceId += (instanceId -> instanceHealth.newHealth) + healthByInstanceId += (result.taskId -> instanceHealth.newHealth) appHealthCheckActor ! HealthCheckStatusChanged(ApplicationKey(app.id, app.version), healthCheck, newHealth) if (health.alive != newHealth.alive && result.publishEvent) { @@ -192,7 +191,9 @@ private[health] class HealthCheckActor( } def receive: Receive = { - case GetInstanceHealth(instanceId) => sender() ! healthByInstanceId.getOrElse(instanceId, Health(instanceId)) + case GetInstanceHealth(instanceId) => + sender() ! healthByInstanceId.find(_._1.instanceId == instanceId) + .map(_._2).getOrElse(Health(instanceId)) case GetAppHealth => sender() ! AppHealth(healthByInstanceId.values.to(Seq)) diff --git a/src/main/scala/mesosphere/marathon/core/health/impl/HealthCheckWorker.scala b/src/main/scala/mesosphere/marathon/core/health/impl/HealthCheckWorker.scala index 028883ed4aa..7acef747f24 100644 --- a/src/main/scala/mesosphere/marathon/core/health/impl/HealthCheckWorker.scala +++ b/src/main/scala/mesosphere/marathon/core/health/impl/HealthCheckWorker.scala @@ -44,6 +44,7 @@ object HealthCheckWorker extends StrictLogging { Success( Unhealthy( instance.instanceId, + instance.appTask.taskId, instance.runSpecVersion, s"${ex.getClass.getSimpleName}: ${ex.getMessage}" ) @@ -93,18 +94,18 @@ object HealthCheckWorker extends StrictLogging { singleRequest(RequestBuilding.Get(url), check.timeout).map { response => response.discardEntityBytes() //forget about the body if (acceptableResponses.contains(response.status.intValue())) { - Healthy(instance.instanceId, instance.runSpecVersion) + Healthy(instance.instanceId, instance.appTask.taskId, instance.runSpecVersion) } else if (check.ignoreHttp1xx && (toIgnoreResponses.contains(response.status.intValue))) { logger.debug(s"Ignoring health check HTTP response ${response.status.intValue} for instance=${instance.instanceId}") - Ignored(instance.instanceId, instance.runSpecVersion) + Ignored(instance.instanceId, instance.appTask.taskId, instance.runSpecVersion) } else { logger.debug(s"Health check for instance=${instance.instanceId} responded with ${response.status}") - Unhealthy(instance.instanceId, instance.runSpecVersion, response.status.toString()) + Unhealthy(instance.instanceId, instance.appTask.taskId, instance.runSpecVersion, response.status.toString()) } }.recover { case NonFatal(e) => logger.debug(s"Health check for instance=${instance.instanceId} did not respond due to ${e.getMessage}.") - Unhealthy(instance.instanceId, instance.runSpecVersion, e.getMessage) + Unhealthy(instance.instanceId, instance.appTask.taskId, instance.runSpecVersion, e.getMessage) } } @@ -121,7 +122,7 @@ object HealthCheckWorker extends StrictLogging { socket.connect(address, timeoutMillis) socket.close() } - Healthy(instance.instanceId, instance.runSpecVersion, Timestamp.now()) + Healthy(instance.instanceId, instance.appTask.taskId, instance.runSpecVersion, Timestamp.now()) }(ThreadPoolContext.ioContext) } @@ -139,15 +140,15 @@ object HealthCheckWorker extends StrictLogging { singleRequestHttps(RequestBuilding.Get(url), check.timeout).map { response => response.discardEntityBytes() // forget about the body if (acceptableResponses.contains(response.status.intValue())) { - Healthy(instance.instanceId, instance.runSpecVersion) + Healthy(instance.instanceId, instance.appTask.taskId, instance.runSpecVersion) } else { logger.debug(s"Health check for ${instance.instanceId} responded with ${response.status}") - Unhealthy(instance.instanceId, instance.runSpecVersion, response.status.toString()) + Unhealthy(instance.instanceId, instance.appTask.taskId, instance.runSpecVersion, response.status.toString()) } }.recover { case NonFatal(e) => logger.debug(s"Health check for instance=${instance.instanceId} failed to respond due to ${e.getMessage}.") - Unhealthy(instance.instanceId, instance.runSpecVersion, e.getMessage) + Unhealthy(instance.instanceId, instance.appTask.taskId, instance.runSpecVersion, e.getMessage) } } diff --git a/src/main/scala/mesosphere/marathon/core/health/impl/MarathonHealthCheckManager.scala b/src/main/scala/mesosphere/marathon/core/health/impl/MarathonHealthCheckManager.scala index 376e9b4fa18..2bffc27b3be 100644 --- a/src/main/scala/mesosphere/marathon/core/health/impl/MarathonHealthCheckManager.scala +++ b/src/main/scala/mesosphere/marathon/core/health/impl/MarathonHealthCheckManager.scala @@ -113,8 +113,8 @@ class MarathonHealthCheckManager( instance.tasksMap.values.withFilter(_.isRunning).map(_.status.mesosStatus).foreach { case Some(mesosStatus) if mesosStatus.hasHealthy => val health = - if (mesosStatus.getHealthy) Healthy(instance.instanceId, instance.runSpecVersion, publishEvent = false) - else Unhealthy(instance.instanceId, instance.runSpecVersion, "", publishEvent = false) + if (mesosStatus.getHealthy) Healthy(instance.instanceId, instance.appTask.taskId, instance.runSpecVersion, publishEvent = false) + else Unhealthy(instance.instanceId, instance.appTask.taskId, instance.runSpecVersion, "", publishEvent = false) ref ! health case None => } @@ -237,12 +237,13 @@ class MarathonHealthCheckManager( override def update(taskStatus: TaskStatus, version: Timestamp): Unit = appHealthChecks.readLock { ahcs => // construct a health result from the incoming task status - val instanceId = Task.Id.parse(taskStatus.getTaskId).instanceId + val taskId = Task.Id.parse(taskStatus.getTaskId) + val instanceId = taskId.instanceId val maybeResult: Option[HealthResult] = if (taskStatus.hasHealthy) { val healthy = taskStatus.getHealthy logger.info(s"Received status for $instanceId with version [$version] and healthy [$healthy]") - Some(if (healthy) Healthy(instanceId, version) else Unhealthy(instanceId, version, "")) + Some(if (healthy) Healthy(instanceId, taskId, version) else Unhealthy(instanceId, taskId, version, "")) } else { logger.debug(s"Ignoring status for $instanceId with no health information") None diff --git a/src/test/scala/mesosphere/marathon/core/health/impl/HealthCheckActorTest.scala b/src/test/scala/mesosphere/marathon/core/health/impl/HealthCheckActorTest.scala index 7f2e1881329..01eb313889f 100644 --- a/src/test/scala/mesosphere/marathon/core/health/impl/HealthCheckActorTest.scala +++ b/src/test/scala/mesosphere/marathon/core/health/impl/HealthCheckActorTest.scala @@ -45,7 +45,7 @@ class HealthCheckActorTest extends AkkaUnitTest { val healthCheckWorkerHub: Sink[(AppDefinition, Instance, MarathonHealthCheck, ActorRef), NotUsed] = MergeHub .source[(AppDefinition, Instance, MarathonHealthCheck, ActorRef)](1) - .map { case (_, instance, _, ref) => ref ! Healthy(instance.instanceId, Timestamp.now()) } + .map { case (_, instance, _, ref) => ref ! Healthy(instance.instanceId, instance.appTask.taskId, Timestamp.now()) } .to(Sink.ignore) .run()