Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import pekko.ConfigurationException
import pekko.Done
import pekko.actor.CoordinatedShutdown.Phase
import pekko.actor.CoordinatedShutdown.UnknownReason
import pekko.dispatch.ExecutionContexts
import pekko.testkit.PekkoSpec
import pekko.testkit.EventFilter
import pekko.testkit.TestKit
Expand Down Expand Up @@ -323,10 +322,10 @@ class CoordinatedShutdownSpec
Future {
testProbe.ref ! BMessage("concurrentB")
Done
}(ExecutionContexts.parasitic)
}(ExecutionContext.parasitic)
}
Done
}(ExecutionContexts.parasitic)
}(ExecutionContext.parasitic)

val cancellationFut: Future[Done] = {
val cancellables = (0 until 20).map { _ =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ class ExecutionContextSpec extends PekkoSpec with DefaultTimeout {
}

"work with same-thread executor plus blocking" in {
val ec = pekko.dispatch.ExecutionContexts.parasitic
val ec = scala.concurrent.ExecutionContext.parasitic
var x = 0
ec.execute(new Runnable {
override def run = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,14 @@ import org.scalatest.matchers.should.Matchers

import org.apache.pekko
import pekko.Done
import pekko.dispatch.internal.SameThreadExecutionContext
import pekko.testkit.PekkoSpec

class SameThreadExecutionContextSpec extends PekkoSpec with Matchers {

"The SameThreadExecutionContext" should {

"return a Scala specific version" in {
val ec = SameThreadExecutionContext()
val ec = ExecutionContext.parasitic
// in Scala 2.13 and higher parasitic is available
ec.getClass.getName should ===("scala.concurrent.ExecutionContext$parasitic$")
}
Expand All @@ -40,7 +39,7 @@ class SameThreadExecutionContextSpec extends PekkoSpec with Matchers {
.map { _ =>
Thread.currentThread().getName
}(system.dispatcher)
.map(firstName => firstName -> Thread.currentThread().getName)(SameThreadExecutionContext())
.map(firstName => firstName -> Thread.currentThread().getName)(ExecutionContext.parasitic)

promise.success(Done)
val (threadName1, threadName2) = futureThreadNames.futureValue
Expand All @@ -54,7 +53,7 @@ class SameThreadExecutionContextSpec extends PekkoSpec with Matchers {
.map { _ =>
Thread.currentThread().getName
}(ExecutionContext.global)
.map(firstName => firstName -> Thread.currentThread().getName)(SameThreadExecutionContext())
.map(firstName => firstName -> Thread.currentThread().getName)(ExecutionContext.parasitic)

promise.success(Done)
val (threadName1, threadName2) = futureThreadNames.futureValue
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import Tcp._
import org.apache.pekko
import pekko.actor.ActorRef
import pekko.actor.ActorSystem
import pekko.dispatch.ExecutionContexts
import scala.concurrent.ExecutionContext
import pekko.io.Inet.SocketOption
import pekko.testkit.{ PekkoSpec, TestProbe }
import pekko.testkit.SocketUtil.temporaryServerAddress
Expand All @@ -35,7 +35,7 @@ trait TcpIntegrationSpecSupport { this: PekkoSpec =>
// terminate clientSystem after server system
system.whenTerminated.onComplete { _ =>
res.terminate()
}(ExecutionContexts.parasitic)
}(ExecutionContext.parasitic)
res
} else system
val bindHandler = TestProbe()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import org.apache.pekko
import pekko.actor.Address
import pekko.actor.typed.internal.adapter.ActorSystemAdapter
import pekko.annotation.InternalApi
import pekko.dispatch.ExecutionContexts
import scala.concurrent.ExecutionContext
import pekko.pattern.StatusReply
import pekko.util.BoxedType
import pekko.util.JavaDurationConverters._
Expand Down Expand Up @@ -277,7 +277,7 @@ import scala.util.Success

// Scala API impl
def pipeToSelf[Value](future: Future[Value])(mapResult: Try[Value] => T): Unit = {
future.onComplete(value => self.unsafeUpcast ! AdaptMessage(value, mapResult))(ExecutionContexts.parasitic)
future.onComplete(value => self.unsafeUpcast ! AdaptMessage(value, mapResult))(ExecutionContext.parasitic)
}

// Java API impl
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import org.apache.pekko
import pekko.actor.{ Cancellable, NotInfluenceReceiveTimeout }
import pekko.actor.typed.scaladsl.{ ActorContext, LoggerOps }
import pekko.annotation.InternalApi
import pekko.dispatch.ExecutionContexts
import scala.concurrent.ExecutionContext
import pekko.util.OptionVal
import org.slf4j.Logger

Expand Down Expand Up @@ -123,13 +123,13 @@ import scala.concurrent.duration.FiniteDuration

val task = mode match {
case SingleMode =>
ctx.system.scheduler.scheduleOnce(delay, () => ctx.self.unsafeUpcast ! timerMsg)(ExecutionContexts.parasitic)
ctx.system.scheduler.scheduleOnce(delay, () => ctx.self.unsafeUpcast ! timerMsg)(ExecutionContext.parasitic)
case m: FixedDelayMode =>
ctx.system.scheduler.scheduleWithFixedDelay(m.initialDelay, delay)(() => ctx.self.unsafeUpcast ! timerMsg)(
ExecutionContexts.parasitic)
ExecutionContext.parasitic)
case m: FixedRateMode =>
ctx.system.scheduler.scheduleAtFixedRate(m.initialDelay, delay)(() => ctx.self.unsafeUpcast ! timerMsg)(
ExecutionContexts.parasitic)
ExecutionContext.parasitic)
}

val nextTimer = Timer(key, msg, mode.repeat, nextGen, task)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ import pekko.util.FutureConverters._
override def uptime: Long = classicSystem.uptime
override def printTree: String = system.printTree

import org.apache.pekko.dispatch.ExecutionContexts.parasitic
import scala.concurrent.ExecutionContext.parasitic

override def terminate(): Unit = system.terminate()
override lazy val whenTerminated: scala.concurrent.Future[pekko.Done] =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,3 +20,6 @@ ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.util.Future
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.util.FutureConverters#CompletionStageOps.asScala")
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.util.FutureConverters#FutureOps.asJava$extension")
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.util.FutureConverters#FutureOps.asJava")
ProblemFilters.exclude[MissingClassProblem]("org.apache.pekko.dispatch.internal.SameThreadExecutionContext")
ProblemFilters.exclude[MissingClassProblem]("org.apache.pekko.dispatch.internal.SameThreadExecutionContext$")
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.dispatch.ExecutionContexts.parasitic")

This file was deleted.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import scala.util.Success
import scala.annotation.nowarn

import org.apache.pekko
import pekko.dispatch.ExecutionContexts
import scala.concurrent.ExecutionContext
import pekko.pattern.ask
import pekko.routing.MurmurHash
import pekko.util.{ Helpers, JavaDurationConverters, Timeout }
Expand Down Expand Up @@ -75,7 +75,7 @@ abstract class ActorSelection extends Serializable {
* [[ActorRef]].
*/
def resolveOne()(implicit timeout: Timeout): Future[ActorRef] = {
implicit val ec = ExecutionContexts.parasitic
implicit val ec = ExecutionContext.parasitic
val p = Promise[ActorRef]()
this.ask(Identify(None)).onComplete {
case Success(ActorIdentity(_, Some(ref))) => p.success(ref)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ import com.typesafe.config.ConfigFactory
import org.apache.pekko
import pekko.Done
import pekko.annotation.InternalApi
import pekko.dispatch.ExecutionContexts
import pekko.event.Logging
import pekko.pattern.after
import pekko.util.OptionConverters._
Expand Down Expand Up @@ -267,7 +266,7 @@ object CoordinatedShutdown extends ExtensionId[CoordinatedShutdown] with Extensi
system.whenTerminated.map { _ =>
if (exitJvm && !runningJvmHook) System.exit(exitCode)
Done
}(ExecutionContexts.parasitic)
}(ExecutionContext.parasitic)
} else if (exitJvm) {
System.exit(exitCode)
Future.successful(Done)
Expand Down Expand Up @@ -493,7 +492,7 @@ final class CoordinatedShutdown private[pekko] (
override val size: Int = tasks.size

override def run(recoverEnabled: Boolean)(implicit ec: ExecutionContext): Future[Done] = {
Future.sequence(tasks.map(_.run(recoverEnabled))).map(_ => Done)(ExecutionContexts.parasitic)
Future.sequence(tasks.map(_.run(recoverEnabled))).map(_ => Done)(ExecutionContext.parasitic)
}

// This method may be run multiple times during the compare-and-set loop of ConcurrentHashMap, so it must be side-effect-free
Expand Down
17 changes: 0 additions & 17 deletions actor/src/main/scala/org/apache/pekko/dispatch/Future.scala
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,6 @@ import java.util.concurrent.CompletionStage
import scala.concurrent.{ ExecutionContext, ExecutionContextExecutor, ExecutionContextExecutorService, Future, Promise }

import org.apache.pekko
import pekko.annotation.InternalStableApi
import pekko.dispatch.internal.SameThreadExecutionContext
import pekko.japi.function.Procedure

/**
Expand Down Expand Up @@ -77,21 +75,6 @@ object ExecutionContexts {
* @return a reference to the global ExecutionContext
*/
def global(): ExecutionContextExecutor = ExecutionContext.global

/**
* INTERNAL API
*
* WARNING: Not A General Purpose ExecutionContext!
*
* This is an execution context which runs everything on the calling thread.
* It is very useful for actions which are known to be non-blocking and
* non-throwing in order to save a round-trip to the thread pool.
*
* Once Scala 2.12 is no longer supported this can be dropped in favour of directly using `ExecutionContext.parasitic`
*/
@InternalStableApi
private[pekko] val parasitic: ExecutionContext = SameThreadExecutionContext()

}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import scala.util.control.NoStackTrace
import org.apache.pekko
import pekko.actor._
import pekko.annotation.{ InternalApi, InternalStableApi }
import pekko.dispatch.ExecutionContexts
import scala.concurrent.ExecutionContext
import pekko.dispatch.sysmsg._
import pekko.util.{ unused, ByteString, Timeout }

Expand Down Expand Up @@ -717,7 +717,7 @@ private[pekko] object PromiseActorRef {
val result = Promise[Any]()
val scheduler = provider.guardian.underlying.system.scheduler
val a = new PromiseActorRef(provider, result, messageClassName, refPathPrefix)
implicit val ec = ExecutionContexts.parasitic
implicit val ec = ExecutionContext.parasitic
val f = scheduler.scheduleOnce(timeout.duration) {
val timedOut = result.tryComplete {
val wasSentBy = if (sender == ActorRef.noSender) "" else s" was sent by [$sender]"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import scala.util.control.NonFatal
import org.apache.pekko
import pekko.PekkoException
import pekko.actor.{ ExtendedActorSystem, Scheduler }
import pekko.dispatch.ExecutionContexts.parasitic
import scala.concurrent.ExecutionContext.parasitic
import pekko.pattern.internal.{ CircuitBreakerNoopTelemetry, CircuitBreakerTelemetry }
import pekko.annotation.InternalApi
import pekko.util.FutureConverters._
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ trait FutureTimeoutSupport {
future.onComplete { result =>
timeout.cancel()
p.tryComplete(result)
}(pekko.dispatch.ExecutionContexts.parasitic)
}(scala.concurrent.ExecutionContext.parasitic)
p.future
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import scala.concurrent.duration.FiniteDuration

import org.apache.pekko
import pekko.actor._
import pekko.dispatch.ExecutionContexts
import scala.concurrent.ExecutionContext
import pekko.dispatch.sysmsg.{ Unwatch, Watch }
import pekko.util.Timeout

Expand Down Expand Up @@ -65,6 +65,6 @@ trait GracefulStopSupport {
ref.result.future.transform({
case Terminated(t) if t.path == target.path => true
case _ => { internalTarget.sendSystemMessage(Unwatch(target, ref)); false }
}, t => { internalTarget.sendSystemMessage(Unwatch(target, ref)); t })(ExecutionContexts.parasitic)
}, t => { internalTarget.sendSystemMessage(Unwatch(target, ref)); t })(ExecutionContext.parasitic)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import org.apache.pekko
import pekko.Done
import pekko.actor.InvalidMessageException
import pekko.annotation.InternalApi
import pekko.dispatch.ExecutionContexts
import scala.concurrent.ExecutionContext

/**
* Generic top-level message type for replies that signal failure or success. Convenient to use together with the
Expand Down Expand Up @@ -180,5 +180,5 @@ object StatusReply {
ScalaFailure(new IllegalArgumentException(s"Unexpected status reply success value: $unexpected"))
}
case fail @ ScalaFailure(_) => fail.asInstanceOf[Try[T]]
}(ExecutionContexts.parasitic)
}(ExecutionContext.parasitic)
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import pekko.actor.ActorRef
import pekko.actor.ActorSystem
import pekko.actor.SupervisorStrategy
import pekko.dispatch.Dispatchers
import pekko.dispatch.ExecutionContexts
import scala.concurrent.ExecutionContext
import pekko.japi.Util.immutableSeq
import pekko.pattern.ask
import pekko.pattern.pipe
Expand Down Expand Up @@ -57,7 +57,7 @@ private[pekko] final case class ScatterGatherFirstCompletedRoutees(
extends Routee {

override def send(message: Any, sender: ActorRef): Unit = {
implicit val ec = ExecutionContexts.parasitic
implicit val ec = ExecutionContext.parasitic
if (routees.isEmpty) {
val reply = Future.failed(new TimeoutException("Timeout due to no routees"))
reply.pipeTo(sender)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ import pekko.cluster.sharding.internal.{
RememberEntitiesCoordinatorStore,
RememberEntitiesProvider
}
import pekko.dispatch.ExecutionContexts
import scala.concurrent.ExecutionContext
import pekko.event.{ BusLogging, Logging }
import pekko.pattern.{ pipe, AskTimeoutException }
import pekko.persistence._
Expand Down Expand Up @@ -219,7 +219,7 @@ object ShardCoordinator {
currentShardAllocations: Map[ActorRef, immutable.IndexedSeq[ShardId]],
rebalanceInProgress: Set[ShardId]): Future[Set[ShardId]] = {
import pekko.util.ccompat.JavaConverters._
implicit val ec = ExecutionContexts.parasitic
implicit val ec = ExecutionContext.parasitic
rebalance(currentShardAllocations.asJava, rebalanceInProgress.asJava).map(_.asScala.toSet)
}

Expand Down
Loading