Skip to content

Commit

Permalink
try to stay within configured interval, cope if not possible
Browse files Browse the repository at this point in the history
  • Loading branch information
niklasf committed Oct 21, 2024
1 parent 0c0e0c1 commit ab1da61
Showing 1 changed file with 12 additions and 7 deletions.
19 changes: 12 additions & 7 deletions src/main/scala/netty/ActorChannelConnector.scala
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import io.netty.channel.*
import io.netty.handler.codec.http.websocketx.*
import io.netty.util.concurrent.{ Future as NettyFuture, GenericFutureListener }
import org.apache.pekko.actor.typed.{ ActorRef, Scheduler }
import scala.concurrent.duration.Deadline

import lila.ws.Controller.Endpoint
import lila.ws.netty.ProtocolHandler.key
Expand All @@ -20,7 +21,8 @@ final private class ActorChannelConnector(
private val flushQ = ActorChannelConnector.FlushQueue()
private val monitor = Monitor.connector.flush
private val flushThread = Future:
while !workers.isShuttingDown && !workers.isTerminated do Thread.sleep(flush())
while !workers.isShuttingDown && !workers.isTerminated do
Thread.sleep(0, flush().timeLeft.toNanos.toInt.atLeast(0))

private object config:
private def int(key: String) = settings.makeSetting(key, staticConfig.getInt(key))
Expand Down Expand Up @@ -62,8 +64,8 @@ final private class ActorChannelConnector(
channel.write(TextWebSocketFrame(in.write))
flushQ.add(channel)

private def flush(): Int =
val entryUsecs = System.nanoTime() / 1000
private def flush(): Deadline =
val entered = Deadline.now
val qSize = flushQ.estimateSize()
val maxDelayFactor = config.interval.get().toDouble / config.maxDelay.get().atLeast(1)
var channelsToFlush = config.step.get().atLeast((qSize * maxDelayFactor).toInt)
Expand All @@ -75,13 +77,16 @@ final private class ActorChannelConnector(
channelsToFlush -= 1
case _ =>
channelsToFlush = 0

monitor.qSize.record(qSize)
monitor.channelsToFlush.record(channelsToFlush)
monitor.loopRuntimeMicroseconds.record(System.nanoTime() / 1000 - entryUsecs)
monitor.loopRuntimeMicroseconds.record((Deadline.now - entered).toMicros)

if config.isFlushQEnabled() then config.interval.get()
else if qSize == 0 then 1000 // hibernate
else 1 // interval is 0 but we still need to empty the queue
entered + {
if config.isFlushQEnabled() then config.interval.get().millis
else if qSize == 0 then 1000.millis // hibernate
else 1.millis // interval is 0 but we still need to empty the queue
}

object ActorChannelConnector:
private class FlushQueue:
Expand Down

0 comments on commit ab1da61

Please sign in to comment.