Skip to content

Commit a25b239

Browse files
committed
feat: Add virtual thread support
1 parent 51d6e09 commit a25b239

File tree

4 files changed

+251
-4
lines changed

4 files changed

+251
-4
lines changed

actor/src/main/resources/reference.conf

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -376,6 +376,7 @@ pekko {
376376
# Valid options:
377377
# - "default-executor" requires a "default-executor" section
378378
# - "fork-join-executor" requires a "fork-join-executor" section
379+
# - "virtual-thread-executor" requires a "virtual-thread-executor" section
379380
# - "thread-pool-executor" requires a "thread-pool-executor" section
380381
# - "affinity-pool-executor" requires an "affinity-pool-executor" section
381382
# - A FQCN of a class extending ExecutorServiceConfigurator
@@ -539,6 +540,19 @@ pekko {
539540
allow-core-timeout = on
540541
}
541542

543+
# This will be used if you have set "executor = "virtual-thread-executor"
544+
# This executor will execute the every task on a new virtual thread.
545+
# Underlying thread pool implementation is java.util.concurrent.ForkJoinPool for JDK <= 22
546+
# If the current runtime does not support virtual thread,
547+
# then the executor configured in "fallback" will be used.
548+
virtual-thread-executor {
549+
#Please set the the underlying pool with system properties below:
550+
#jdk.virtualThreadScheduler.parallelism
551+
#jdk.virtualThreadScheduler.maxPoolSize
552+
#jdk.virtualThreadScheduler.minRunnable
553+
#jdk.unparker.maxPoolSize
554+
fallback = "fork-join-executor"
555+
}
542556
# How long time the dispatcher will wait for new actors until it shuts down
543557
shutdown-timeout = 1s
544558

actor/src/main/scala/org/apache/pekko/dispatch/AbstractDispatcher.scala

Lines changed: 33 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,14 +16,11 @@ package org.apache.pekko.dispatch
1616
import java.{ util => ju }
1717
import java.util.concurrent._
1818

19-
import scala.annotation.tailrec
19+
import scala.annotation.{ nowarn, tailrec }
2020
import scala.concurrent.{ ExecutionContext, ExecutionContextExecutor }
2121
import scala.concurrent.duration.{ Duration, FiniteDuration }
2222
import scala.util.control.NonFatal
2323

24-
import scala.annotation.nowarn
25-
import com.typesafe.config.Config
26-
2724
import org.apache.pekko
2825
import pekko.actor._
2926
import pekko.annotation.InternalStableApi
@@ -33,6 +30,8 @@ import pekko.event.EventStream
3330
import pekko.event.Logging.{ Debug, Error, LogEventException }
3431
import pekko.util.{ unused, Index, Unsafe }
3532

33+
import com.typesafe.config.Config
34+
3635
final case class Envelope private (message: Any, sender: ActorRef) {
3736

3837
def copy(message: Any = message, sender: ActorRef = sender) = {
@@ -367,9 +366,16 @@ abstract class MessageDispatcherConfigurator(_config: Config, val prerequisites:
367366
def dispatcher(): MessageDispatcher
368367

369368
def configureExecutor(): ExecutorServiceConfigurator = {
369+
@tailrec
370370
def configurator(executor: String): ExecutorServiceConfigurator = executor match {
371371
case null | "" | "fork-join-executor" =>
372372
new ForkJoinExecutorConfigurator(config.getConfig("fork-join-executor"), prerequisites)
373+
case "virtual-thread-executor" =>
374+
if (VirtualThreadSupport.isSupported) {
375+
new VirtualThreadExecutorConfigurator(config.getConfig("virtual-thread-executor"), prerequisites)
376+
} else {
377+
configurator(config.getString("virtual-thread-executor.fallback"))
378+
}
373379
case "thread-pool-executor" =>
374380
new ThreadPoolExecutorConfigurator(config.getConfig("thread-pool-executor"), prerequisites)
375381
case "affinity-pool-executor" =>
@@ -401,6 +407,29 @@ abstract class MessageDispatcherConfigurator(_config: Config, val prerequisites:
401407
}
402408
}
403409

410+
class VirtualThreadExecutorConfigurator(config: Config, prerequisites: DispatcherPrerequisites)
411+
extends ExecutorServiceConfigurator(config, prerequisites) {
412+
413+
override def createExecutorServiceFactory(id: String, threadFactory: ThreadFactory): ExecutorServiceFactory = {
414+
val tf = threadFactory match {
415+
case MonitorableThreadFactory(name, _, contextClassLoader, exceptionHandler, _) =>
416+
new ThreadFactory {
417+
private val vtFactory = VirtualThreadSupport.create(name)
418+
override def newThread(r: Runnable): Thread = {
419+
val vt = vtFactory.newThread(r)
420+
vt.setUncaughtExceptionHandler(exceptionHandler)
421+
contextClassLoader.foreach(vt.setContextClassLoader)
422+
vt
423+
}
424+
}
425+
case _ => VirtualThreadSupport.create(prerequisites.settings.name);
426+
}
427+
new ExecutorServiceFactory {
428+
override def createExecutorService: ExecutorService = new NewVirtualThreadPerTaskExecutor(tf)
429+
}
430+
}
431+
}
432+
404433
class ThreadPoolExecutorConfigurator(config: Config, prerequisites: DispatcherPrerequisites)
405434
extends ExecutorServiceConfigurator(config, prerequisites) {
406435

Lines changed: 153 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,153 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.pekko.dispatch
19+
20+
import java.util
21+
import java.util.Collections
22+
import java.util.concurrent._
23+
import java.util.concurrent.atomic.AtomicInteger
24+
import java.util.concurrent.locks.ReentrantLock
25+
26+
private[dispatch] class NewVirtualThreadPerTaskExecutor(threadFactory: ThreadFactory) extends AbstractExecutorService {
27+
import NewVirtualThreadPerTaskExecutor._
28+
29+
/**
30+
* 0 RUNNING
31+
* 1 SHUTDOWN
32+
* 2 TERMINATED
33+
*/
34+
private val state = new AtomicInteger(RUNNING)
35+
private val virtualThreads = ConcurrentHashMap.newKeySet[Thread]()
36+
private val terminateLock = new ReentrantLock()
37+
private val terminatedCondition = terminateLock.newCondition()
38+
39+
override def shutdown(): Unit = {
40+
shutdown(false)
41+
}
42+
43+
private def shutdown(interrupt: Boolean): Unit = {
44+
if (!isShutdown) {
45+
terminateLock.lock()
46+
try {
47+
if (isTerminated) {
48+
()
49+
} else {
50+
if (state.compareAndSet(RUNNING, SHUTDOWN) && interrupt) {
51+
virtualThreads.forEach(thread => {
52+
if (!thread.isInterrupted) {
53+
thread.interrupt()
54+
}
55+
})
56+
}
57+
tryTerminateAndSignal()
58+
}
59+
} finally {
60+
terminateLock.unlock()
61+
}
62+
}
63+
}
64+
65+
private def tryTerminateAndSignal(): Unit = {
66+
if (isTerminated) {
67+
()
68+
}
69+
terminateLock.lock()
70+
try {
71+
if (isTerminated) {
72+
return
73+
}
74+
if (virtualThreads.isEmpty && state.compareAndSet(SHUTDOWN, TERMINATED)) {
75+
terminatedCondition.signalAll()
76+
}
77+
} finally {
78+
terminateLock.unlock()
79+
}
80+
}
81+
82+
override def shutdownNow(): util.List[Runnable] = {
83+
shutdown(true)
84+
Collections.emptyList()
85+
}
86+
87+
override def isShutdown: Boolean = state.get() >= SHUTDOWN
88+
89+
override def isTerminated: Boolean = state.get() >= TERMINATED
90+
91+
private def isRunning: Boolean = state.get() == RUNNING
92+
93+
override def awaitTermination(timeout: Long, unit: TimeUnit): Boolean = {
94+
if (isTerminated) {
95+
return true
96+
}
97+
terminateLock.lock()
98+
try {
99+
var nanosRemaining = unit.toNanos(timeout)
100+
while (!isTerminated && nanosRemaining > 0) {
101+
nanosRemaining = terminatedCondition.awaitNanos(nanosRemaining)
102+
}
103+
} finally {
104+
terminateLock.unlock()
105+
}
106+
isTerminated
107+
}
108+
109+
// TODO AS only this execute method is been used in `Dispatcher.scala`, so `submit` and other methods is not override.
110+
override def execute(command: Runnable): Unit = {
111+
if (state.get() >= SHUTDOWN) {
112+
throw new RejectedExecutionException("Shutdown")
113+
}
114+
var started = false;
115+
try {
116+
val thread = threadFactory.newThread(Task(this, command))
117+
virtualThreads.add(thread)
118+
if (isRunning) {
119+
thread.start()
120+
started = true
121+
} else {
122+
onThreadExit(thread)
123+
}
124+
} finally {
125+
if (!started) {
126+
throw new RejectedExecutionException("Shutdown")
127+
}
128+
}
129+
}
130+
131+
private def onThreadExit(thread: Thread): Unit = {
132+
virtualThreads.remove(thread)
133+
if (state.get() == SHUTDOWN) {
134+
tryTerminateAndSignal()
135+
}
136+
}
137+
}
138+
139+
private[dispatch] object NewVirtualThreadPerTaskExecutor {
140+
private final val RUNNING = 0
141+
private final val SHUTDOWN = 1
142+
private final val TERMINATED = 2
143+
144+
private case class Task(executor: NewVirtualThreadPerTaskExecutor, runnable: Runnable) extends Runnable {
145+
override def run(): Unit = {
146+
try {
147+
runnable.run()
148+
} finally {
149+
executor.onThreadExit(Thread.currentThread())
150+
}
151+
}
152+
}
153+
}
Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.pekko.dispatch
19+
20+
import org.apache.pekko.annotation.InternalApi
21+
22+
import java.lang.invoke.{ MethodHandles, MethodType }
23+
import java.util.concurrent.ThreadFactory
24+
25+
@InternalApi
26+
private[dispatch] object VirtualThreadSupport {
27+
28+
/**
29+
* Returns if the current Runtime supports virtual threads.
30+
*/
31+
def isSupported: Boolean = create("testIsSupported") ne null
32+
33+
/**
34+
* Create a virtual thread factory, returns null when failed.
35+
*/
36+
def create(prefix: String): ThreadFactory =
37+
try {
38+
val builderClass = ClassLoader.getSystemClassLoader.loadClass("java.lang.Thread$Builder")
39+
val ofVirtualClass = ClassLoader.getSystemClassLoader.loadClass("java.lang.Thread$Builder$OfVirtual")
40+
val lookup = MethodHandles.lookup
41+
val ofVirtualMethod = lookup.findStatic(classOf[Thread], "ofVirtual", MethodType.methodType(ofVirtualClass))
42+
var builder = ofVirtualMethod.invoke()
43+
val nameMethod = lookup.findVirtual(ofVirtualClass, "name",
44+
MethodType.methodType(ofVirtualClass, classOf[String], classOf[Long]))
45+
val factoryMethod = lookup.findVirtual(builderClass, "factory", MethodType.methodType(classOf[ThreadFactory]))
46+
builder = nameMethod.invoke(builder, prefix + "-virtual-thread-", 0L)
47+
factoryMethod.invoke(builder).asInstanceOf[ThreadFactory]
48+
} catch {
49+
case _: Throwable => null
50+
}
51+
}

0 commit comments

Comments
 (0)