-
Notifications
You must be signed in to change notification settings - Fork 1.2k
Use separate dispatchers for MemoryQueue, QueueManager, and Akka heartbeat #5549
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -41,6 +41,26 @@ akka.http { | |
preview.enable-http2 = on | ||
parsing.illegal-header-warnings = off | ||
} | ||
|
||
cluster { | ||
use-dispatcher = "dispatchers.heartbeat-dispatcher" | ||
failure-detector { | ||
# How often keep-alive heartbeat messages should be sent to each connection. | ||
heartbeat-interval = 1s | ||
|
||
# Number of potentially lost/delayed heartbeats that will be | ||
# accepted before considering it to be an anomaly. | ||
# This margin is important to be able to survive sudden, occasional, | ||
# pauses in heartbeat arrivals, due to for example garbage collect or | ||
# network drop. | ||
acceptable-heartbeat-pause = 5s | ||
|
||
# After the heartbeat request has been sent the first failure detection | ||
# will start after this period, even though no heartbeat message has | ||
# been received. | ||
expected-response-after = 1s | ||
} | ||
} | ||
} | ||
|
||
#kamon related configuration | ||
|
@@ -72,7 +92,7 @@ kamon { | |
service = "openwhisk-statsd" | ||
} | ||
metric { | ||
tick-interval = 1 second | ||
tick-interval = 10 second | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is one of the main changes. According to my analysis, it seems there is a leak in the Kamon metrics. Below is the heap dump for a scheduler when it faced thread starvation. ![]() There are 97M instances of The reference dominator of ![]() Also, ![]() All components other than MemoryQueue emit metrics at 10s intervals, so I updated the metric-emission interval of MemoryQueue to 10s as well. |
||
} | ||
|
||
statsd { | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,54 @@ | ||
# | ||
# Licensed to the Apache Software Foundation (ASF) under one or more | ||
# contributor license agreements. See the NOTICE file distributed with | ||
# this work for additional information regarding copyright ownership. | ||
# The ASF licenses this file to You under the Apache License, Version 2.0 | ||
# (the "License"); you may not use this file except in compliance with | ||
# the License. You may obtain a copy of the License at | ||
# | ||
# http://www.apache.org/licenses/LICENSE-2.0 | ||
# | ||
# Unless required by applicable law or agreed to in writing, software | ||
# distributed under the License is distributed on an "AS IS" BASIS, | ||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
# See the License for the specific language governing permissions and | ||
# limitations under the License. | ||
# | ||
|
||
dispatchers { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I introduced separate dispatchers to guarantee performance and minimize performance impact. |
||
# A custom dispatcher for the queue manager | ||
queue-manager-dispatcher { | ||
type = Dispatcher | ||
executor = "fork-join-executor" | ||
fork-join-executor { | ||
parallelism-min = 1 | ||
parallelism-factor = 1 | ||
parallelism-max = 15 | ||
} | ||
throughput = 5 | ||
} | ||
|
||
# A custom dispatcher for memory queues. | ||
memory-queue-dispatcher { | ||
type = Dispatcher | ||
executor = "fork-join-executor" | ||
fork-join-executor { | ||
parallelism-min = 10 | ||
parallelism-factor = 2 | ||
parallelism-max = 60 | ||
} | ||
throughput = 5 | ||
} | ||
|
||
# A custom dispatcher for monitoring actors of memory queues. | ||
monitoring-dispatcher { | ||
type = Dispatcher | ||
executor = "fork-join-executor" | ||
fork-join-executor { | ||
parallelism-min = 10 | ||
parallelism-factor = 2 | ||
parallelism-max = 60 | ||
} | ||
throughput = 5 | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -148,7 +148,8 @@ class MemoryQueue(private val etcdClient: EtcdClient, | |
extends FSM[MemoryQueueState, MemoryQueueData] | ||
with Stash { | ||
|
||
private implicit val ec: ExecutionContextExecutor = context.dispatcher | ||
private implicit val ec: ExecutionContextExecutor = | ||
context.system.dispatchers.lookup("dispatchers.memory-queue-dispatcher") | ||
private implicit val actorSystem: ActorSystem = context.system | ||
private implicit val timeout = Timeout(5.seconds) | ||
private implicit val order: Ordering[BufferedRequest] = Ordering.by(_.containerId) | ||
|
@@ -181,7 +182,7 @@ class MemoryQueue(private val etcdClient: EtcdClient, | |
private[queue] var limit: Option[Int] = None | ||
private[queue] var initialized = false | ||
|
||
private val logScheduler: Cancellable = context.system.scheduler.scheduleWithFixedDelay(0.seconds, 1.seconds) { () => | ||
private val logScheduler: Cancellable = context.system.scheduler.scheduleWithFixedDelay(0.seconds, 10.seconds) { () => | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This was emitting 5 metrics every second. If there are 400 queues running, they will emit around 2000 metrics per second. Considering the fact that one memory queue will spawn multiple sub-actors, and the combination with the use of |
||
MetricEmitter.emitGaugeMetric( | ||
LoggingMarkers | ||
.SCHEDULER_QUEUE_WAITING_ACTIVATION(invocationNamespace, action.asString, action.toStringWithoutVersion), | ||
|
@@ -926,7 +927,9 @@ class MemoryQueue(private val etcdClient: EtcdClient, | |
// since there is no initial delay, it will try to create a container at initialization time | ||
// these schedulers will run forever and stop when the memory queue stops | ||
private def startMonitoring(): (ActorRef, ActorRef) = { | ||
val droppingScheduler = Scheduler.scheduleWaitAtLeast(schedulingConfig.dropInterval) { () => | ||
val droppingScheduler = Scheduler.scheduleWaitAtLeastWith( | ||
schedulingConfig.dropInterval, | ||
dispatcher = "dispatchers.monitoring-dispatcher") { () => | ||
checkToDropStaleActivation( | ||
clock, | ||
queue, | ||
|
@@ -939,7 +942,9 @@ class MemoryQueue(private val etcdClient: EtcdClient, | |
Future.successful(()) | ||
} | ||
|
||
val monitoringScheduler = Scheduler.scheduleWaitAtLeast(schedulingConfig.checkInterval) { () => | ||
val monitoringScheduler = Scheduler.scheduleWaitAtLeastWith( | ||
schedulingConfig.checkInterval, | ||
dispatcher = "dispatchers.monitoring-dispatcher") { () => | ||
// the average duration is updated every checkInterval | ||
if (averageDurationBuffer.nonEmpty) { | ||
averageDuration = Some(averageDurationBuffer.average) | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I assigned a separate dispatcher for the akka-cluster heartbeat.