Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 21 additions & 1 deletion common/scala/src/main/resources/application.conf
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,26 @@ akka.http {
preview.enable-http2 = on
parsing.illegal-header-warnings = off
}

cluster {
use-dispatcher = "dispatchers.heartbeat-dispatcher"
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I assigned a separate dispatcher for the akka-cluster heartbeat.

failure-detector {
# How often keep-alive heartbeat messages should be sent to each connection.
heartbeat-interval = 1s

# Number of potentially lost/delayed heartbeats that will be
# accepted before considering it to be an anomaly.
# This margin is important to be able to survive sudden, occasional,
# pauses in heartbeat arrivals, due to for example garbage collect or
# network drop.
acceptable-heartbeat-pause = 5s

# After the heartbeat request has been sent the first failure detection
# will start after this period, even though no heartbeat message has
# been received.
expected-response-after = 1s
}
}
}

#kamon related configuration
Expand Down Expand Up @@ -72,7 +92,7 @@ kamon {
service = "openwhisk-statsd"
}
metric {
tick-interval = 1 second
tick-interval = 10 second
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is one of the main changes. According to my analysis, it seems there is a leak in the Kamon metric.
As a scheduler is running longer, there are a huge number of MetricSnapshot instances created.

Below is the heap dump for a scheduler when it faced a thread starvation.

image

There are 97M numbers of scala.collection.immutable.$colon$colon and 96M numbers of kamon.metric.Instrument$Snapshot.

The reference dominator of scala.collection.immutable.$colon$colon is mostly MetricSnapshot.

image

Also, kamon.metric.Instrument$Snapshot is mostly referenced by scala.collection.immutable.$colon$colon, in turn, it results in MetricSnapshot.

image

All components other than MemoryQueue are emitting metrics with 10s intervals. So I updated the metric emission interval of MemoryQueue to 10s as well.
Since now we emit all metrics every 10s, we don't need to use a smaller tick interval like 1s because it will try to create a snapshot every 1 second, but the metric itself remains unchanged for 10s because we don't emit them in the middle of the interval(10s).

}

statsd {
Expand Down
13 changes: 13 additions & 0 deletions common/scala/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -86,4 +86,17 @@ dispatchers {
type = PinnedDispatcher
executor = "thread-pool-executor"
}

# This is for akka-cluster heartbeat. Since heartbeat is a periodic light-weight message,
# fork-join executor should be enough
heartbeat-dispatcher {
type = Dispatcher
executor = "fork-join-executor"
fork-join-executor {
parallelism-min = 2
parallelism-factor = 2.0
parallelism-max = 10
}
throughput = 100
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,11 @@

package org.apache.openwhisk.common

import akka.actor.{Actor, ActorSystem, Cancellable, Props}

import scala.concurrent.Future
import scala.concurrent.duration._
import scala.util.Failure
import scala.util.Success
import scala.util.Try

import akka.actor.Actor
import akka.actor.ActorSystem
import akka.actor.Cancellable
import akka.actor.Props
import scala.util.{Failure, Success, Try}

/**
* Scheduler utility functions to execute tasks in a repetitive way with controllable behavior
Expand Down Expand Up @@ -122,4 +117,25 @@ object Scheduler {
require(interval > Duration.Zero)
system.actorOf(Props(new Worker(initialDelay, interval, true, name, f)))
}

/**
* Schedules a closure to run continuously scheduled, with at least the given interval in between runs using the dispatcher.
* This waits until the Future of the closure has finished, ignores its result and then waits for the
* given interval.
*
* @param interval the time to wait between two runs of the closure
* @param initialDelay optionally delay the first scheduled iteration by given duration
* @param dispatcher the dispatcher to handle this scheduled work
* @param f the function to run
*/
def scheduleWaitAtLeastWith(interval: FiniteDuration,
initialDelay: FiniteDuration = Duration.Zero,
name: String = "Scheduler",
dispatcher: String)(f: () => Future[Any])(
implicit system: ActorSystem,
logging: Logging,
transid: TransactionId = TransactionId.unknown) = {
require(interval > Duration.Zero)
system.actorOf(Props(new Worker(initialDelay, interval, true, name, f)).withDispatcher(dispatcher))
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ class FPCPoolBalancer(config: WhiskConfig,

private implicit val executionContext: ExecutionContext = actorSystem.dispatcher
// This value is given according to the total waiting time at QueueManager for a new queue to be created.
private implicit val requestTimeout: Timeout = Timeout(8.seconds)
private implicit val requestTimeout: Timeout = Timeout(1.seconds)

private val entityStore = WhiskEntityStore.datastore()

Expand Down
54 changes: 54 additions & 0 deletions core/scheduler/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

dispatchers {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I introduced separate dispatchers to guarantee performance and minimize performance impact.
I used the fork-join-executor as their jobs are mostly CPU-bound work.

# A custom dispatcher for the queue manager
queue-manager-dispatcher {
type = Dispatcher
executor = "fork-join-executor"
fork-join-executor {
parallelism-min = 1
parallelism-factor = 1
parallelism-max = 15
}
throughput = 5
}

# A custom dispatcher for memory queues.
memory-queue-dispatcher {
type = Dispatcher
executor = "fork-join-executor"
fork-join-executor {
parallelism-min = 10
parallelism-factor = 2
parallelism-max = 60
}
throughput = 5
}

# A custom dispatcher for monitoring actors of memory queues.
monitoring-dispatcher {
type = Dispatcher
executor = "fork-join-executor"
fork-join-executor {
parallelism-min = 10
parallelism-factor = 2
parallelism-max = 60
}
throughput = 5
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@ import org.apache.openwhisk.core.{ConfigKeys, WhiskConfig}
import org.apache.openwhisk.grpc.ActivationServiceHandler
import org.apache.openwhisk.http.BasicHttpService
import org.apache.openwhisk.spi.SpiLoader
import org.apache.openwhisk.utils.ExecutionContextFactory
import pureconfig.generic.auto._
import pureconfig.loadConfigOrThrow
import spray.json.{DefaultJsonProtocol, _}
Expand Down Expand Up @@ -287,9 +286,8 @@ object Scheduler {
}

def main(args: Array[String]): Unit = {
implicit val ec = ExecutionContextFactory.makeCachedThreadPoolExecutionContext()
implicit val actorSystem: ActorSystem =
ActorSystem(name = "scheduler-actor-system", defaultExecutionContext = Some(ec))
implicit val actorSystem: ActorSystem = ActorSystem("scheduler-actor-system")
implicit val ec = actorSystem.dispatcher

implicit val logger = new AkkaLogging(akka.event.Logging.getLogger(actorSystem, this))

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,8 @@ class MemoryQueue(private val etcdClient: EtcdClient,
extends FSM[MemoryQueueState, MemoryQueueData]
with Stash {

private implicit val ec: ExecutionContextExecutor = context.dispatcher
private implicit val ec: ExecutionContextExecutor =
context.system.dispatchers.lookup("dispatchers.memory-queue-dispatcher")
private implicit val actorSystem: ActorSystem = context.system
private implicit val timeout = Timeout(5.seconds)
private implicit val order: Ordering[BufferedRequest] = Ordering.by(_.containerId)
Expand Down Expand Up @@ -181,7 +182,7 @@ class MemoryQueue(private val etcdClient: EtcdClient,
private[queue] var limit: Option[Int] = None
private[queue] var initialized = false

private val logScheduler: Cancellable = context.system.scheduler.scheduleWithFixedDelay(0.seconds, 1.seconds) { () =>
private val logScheduler: Cancellable = context.system.scheduler.scheduleWithFixedDelay(0.seconds, 10.seconds) { () =>
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was emitting 5 metrics every 1 second. If there are 400 queues running, they will emit around 2000 metrics per second. Considering the fact that one memory queue will spawn multiple sub-actors, and the combination with the use of CachedThreadPool, which spawns an unlimited number of actors on demand, it caused thread starvation.

MetricEmitter.emitGaugeMetric(
LoggingMarkers
.SCHEDULER_QUEUE_WAITING_ACTIVATION(invocationNamespace, action.asString, action.toStringWithoutVersion),
Expand Down Expand Up @@ -926,7 +927,9 @@ class MemoryQueue(private val etcdClient: EtcdClient,
// since there is no initial delay, it will try to create a container at initialization time
// these schedulers will run forever and stop when the memory queue stops
private def startMonitoring(): (ActorRef, ActorRef) = {
val droppingScheduler = Scheduler.scheduleWaitAtLeast(schedulingConfig.dropInterval) { () =>
val droppingScheduler = Scheduler.scheduleWaitAtLeastWith(
schedulingConfig.dropInterval,
dispatcher = "dispatchers.monitoring-dispatcher") { () =>
checkToDropStaleActivation(
clock,
queue,
Expand All @@ -939,7 +942,9 @@ class MemoryQueue(private val etcdClient: EtcdClient,
Future.successful(())
}

val monitoringScheduler = Scheduler.scheduleWaitAtLeast(schedulingConfig.checkInterval) { () =>
val monitoringScheduler = Scheduler.scheduleWaitAtLeastWith(
schedulingConfig.checkInterval,
dispatcher = "dispatchers.monitoring-dispatcher") { () =>
// the average duration is updated every checkInterval
if (averageDurationBuffer.nonEmpty) {
averageDuration = Some(averageDurationBuffer.average)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ class QueueManager(
private val leaderElectionCallbacks = TrieMap[String, (Either[EtcdFollower, EtcdLeader], Boolean) => Unit]()

private implicit val askTimeout = Timeout(5.seconds)
private implicit val ec = context.dispatcher
private implicit val ec = context.system.dispatchers.lookup("dispatchers.queue-manager-dispatcher")
private implicit val system = context.system

private val watcherName = "queue-manager"
Expand Down
Loading