Skip to content

Commit

Permalink
Merge pull request #3557 from djspiewak/release/backport-prep
Browse files Browse the repository at this point in the history
Backport current series/3.4.x
  • Loading branch information
djspiewak authored Apr 28, 2023
2 parents 741f87a + 59d43b9 commit 38736ca
Show file tree
Hide file tree
Showing 10 changed files with 107 additions and 147 deletions.
12 changes: 6 additions & 6 deletions core/shared/src/main/scala/cats/effect/CpuStarvationCheck.scala
Original file line number Diff line number Diff line change
Expand Up @@ -47,11 +47,11 @@ private[effect] object CpuStarvationCheck extends CpuStarvationCheckPlatform {
}

private[this] def mkWarning(threshold: Duration)(when: FiniteDuration) =
s"""|${format(when)} [WARNING] Your app's responsiveness to a new asynchronous
|event (such as a new connection, an upstream response, or a timer) was in excess
|of $threshold. Your CPU is probably starving. Consider increasing the
|granularity of your delays or adding more cedes. This may also be a sign that you
|are unintentionally running blocking I/O operations (such as File or InetAddress)
|without the blocking combinator.""".stripMargin
s"""|${format(when)} [WARNING] Your app's responsiveness to a new asynchronous
| event (such as a new connection, an upstream response, or a timer) was in excess
| of $threshold. Your CPU is probably starving. Consider increasing the
| granularity of your delays or adding more cedes. This may also be a sign that you
| are unintentionally running blocking I/O operations (such as File or InetAddress)
| without the blocking combinator.""".stripMargin.replaceAll("\n", "")

}
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@

package cats.effect.kernel.instances

import cats.{~>, Align, Applicative, CommutativeApplicative, Eval, Functor, Monad, Parallel}
import cats.{~>, Align, Applicative, CommutativeApplicative, Functor, Monad, Parallel}
import cats.data.Ior
import cats.effect.kernel.{GenSpawn, Outcome, ParallelF}
import cats.effect.kernel.{GenSpawn, ParallelF}
import cats.implicits._

trait GenSpawnInstances {
Expand All @@ -41,7 +41,6 @@ trait GenSpawnInstances {
new (M ~> F) {
def apply[A](ma: M[A]): F[A] = ParallelF[M, A](ma)
}

}

implicit def commutativeApplicativeForParallelF[F[_], E](
Expand All @@ -51,134 +50,10 @@ trait GenSpawnInstances {
final override def pure[A](a: A): ParallelF[F, A] = ParallelF(F.pure(a))

final override def map2[A, B, Z](fa: ParallelF[F, A], fb: ParallelF[F, B])(
f: (A, B) => Z): ParallelF[F, Z] =
f: (A, B) => Z): ParallelF[F, Z] = {
ParallelF(
F.uncancelable { poll =>
for {
fiberA <- F.start(ParallelF.value(fa))
fiberB <- F.start(ParallelF.value(fb))

// start a pair of supervisors to ensure that the opposite is canceled on error
_ <- F start {
fiberB.join flatMap {
case Outcome.Succeeded(_) => F.unit
case _ => fiberA.cancel
}
}

_ <- F start {
fiberA.join flatMap {
case Outcome.Succeeded(_) => F.unit
case _ => fiberB.cancel
}
}

a <- F
.onCancel(poll(fiberA.join), bothUnit(fiberA.cancel, fiberB.cancel))
.flatMap[A] {
case Outcome.Succeeded(fa) =>
fa

case Outcome.Errored(e) =>
fiberB.cancel *> F.raiseError(e)

case Outcome.Canceled() =>
fiberB.cancel *> poll {
fiberB.join flatMap {
case Outcome.Succeeded(_) | Outcome.Canceled() =>
F.canceled *> F.never
case Outcome.Errored(e) =>
F.raiseError(e)
}
}
}

z <- F.onCancel(poll(fiberB.join), fiberB.cancel).flatMap[Z] {
case Outcome.Succeeded(fb) =>
fb.map(b => f(a, b))

case Outcome.Errored(e) =>
F.raiseError(e)

case Outcome.Canceled() =>
poll {
fiberA.join flatMap {
case Outcome.Succeeded(_) | Outcome.Canceled() =>
F.canceled *> F.never
case Outcome.Errored(e) =>
F.raiseError(e)
}
}
}
} yield z
}
)

final override def map2Eval[A, B, Z](fa: ParallelF[F, A], fb: Eval[ParallelF[F, B]])(
f: (A, B) => Z): Eval[ParallelF[F, Z]] =
Eval.now(
ParallelF(
F.uncancelable { poll =>
for {
fiberA <- F.start(ParallelF.value(fa))
fiberB <- F.start(ParallelF.value(fb.value))

// start a pair of supervisors to ensure that the opposite is canceled on error
_ <- F start {
fiberB.join flatMap {
case Outcome.Succeeded(_) => F.unit
case _ => fiberA.cancel
}
}

_ <- F start {
fiberA.join flatMap {
case Outcome.Succeeded(_) => F.unit
case _ => fiberB.cancel
}
}

a <- F
.onCancel(poll(fiberA.join), bothUnit(fiberA.cancel, fiberB.cancel))
.flatMap[A] {
case Outcome.Succeeded(fa) =>
fa

case Outcome.Errored(e) =>
fiberB.cancel *> F.raiseError(e)

case Outcome.Canceled() =>
fiberB.cancel *> poll {
fiberB.join flatMap {
case Outcome.Succeeded(_) | Outcome.Canceled() =>
F.canceled *> F.never
case Outcome.Errored(e) =>
F.raiseError(e)
}
}
}

z <- F.onCancel(poll(fiberB.join), fiberB.cancel).flatMap[Z] {
case Outcome.Succeeded(fb) =>
fb.map(b => f(a, b))

case Outcome.Errored(e) =>
F.raiseError(e)

case Outcome.Canceled() =>
poll {
fiberA.join flatMap {
case Outcome.Succeeded(_) | Outcome.Canceled() =>
F.canceled *> F.never
case Outcome.Errored(e) =>
F.raiseError(e)
}
}
}
} yield z
}
)
)
F.both(ParallelF.value(fa), ParallelF.value(fb)).map { case (a, b) => f(a, b) })
}

final override def ap[A, B](ff: ParallelF[F, A => B])(
fa: ParallelF[F, A]): ParallelF[F, B] =
Expand All @@ -194,10 +69,6 @@ trait GenSpawnInstances {

final override def unit: ParallelF[F, Unit] =
ParallelF(F.unit)

// assumed to be uncancelable
private[this] def bothUnit(a: F[Unit], b: F[Unit]): F[Unit] =
F.start(a).flatMap(f => b *> f.join.void)
}

implicit def alignForParallelF[F[_], E](implicit F: GenSpawn[F, E]): Align[ParallelF[F, *]] =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,13 @@ class PureConcSpec extends Specification with Discipline with BaseSpec {
pure.run((F.raiseError[Unit](42), F.never[Unit]).parTupled) mustEqual Outcome.Errored(42)
}

"short-circuit on canceled" in {
pure.run((F.never[Unit], F.canceled).parTupled.start.flatMap(_.join)) mustEqual Outcome
.Succeeded(Some(Outcome.canceled[F, Nothing, Unit]))
pure.run((F.canceled, F.never[Unit]).parTupled.start.flatMap(_.join)) mustEqual Outcome
.Succeeded(Some(Outcome.canceled[F, Nothing, Unit]))
}

"not run forever on chained product" in {
import cats.effect.kernel.Par.ParallelF

Expand Down
1 change: 1 addition & 0 deletions tests/js/src/main/scala/cats/effect/DetectPlatform.scala
Original file line number Diff line number Diff line change
Expand Up @@ -36,5 +36,6 @@ trait DetectPlatform {
}

def isJS: Boolean = true
def isJVM: Boolean = false
def isNative: Boolean = false
}
1 change: 1 addition & 0 deletions tests/jvm/src/test/scala/cats/effect/DetectPlatform.scala
Original file line number Diff line number Diff line change
Expand Up @@ -19,5 +19,6 @@ package cats.effect
trait DetectPlatform {
def isWSL: Boolean = System.getProperty("os.version").contains("-WSL")
def isJS: Boolean = false
def isJVM: Boolean = true
def isNative: Boolean = false
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,5 +19,6 @@ package cats.effect
trait DetectPlatform {
def isWSL: Boolean = System.getProperty("os.version").contains("-WSL")
def isJS: Boolean = false
def isJVM: Boolean = false
def isNative: Boolean = true
}
75 changes: 75 additions & 0 deletions tests/shared/src/test/scala/cats/effect/IOSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1558,6 +1558,81 @@ class IOSpec extends BaseSpec with Discipline with IOPlatformSpecification {
(IO.raiseError[Unit](TestException), IO.never[Unit]).parTupled.void must failAs(
TestException)
}

"short-circuit on canceled" in ticked { implicit ticker =>
(IO.never[Unit], IO.canceled)
.parTupled
.start
.flatMap(_.join.map(_.isCanceled)) must completeAs(true)
(IO.canceled, IO.never[Unit])
.parTupled
.start
.flatMap(_.join.map(_.isCanceled)) must completeAs(true)
}

"run finalizers when canceled" in ticked { implicit ticker =>
val tsk = IO.ref(0).flatMap { ref =>
val t = IO.never[Unit].onCancel(ref.update(_ + 1))
for {
fib <- (t, t).parTupled.start
_ <- IO { ticker.ctx.tickAll() }
_ <- fib.cancel
c <- ref.get
} yield c
}

tsk must completeAs(2)
}

"run right side finalizer when canceled (and left side already completed)" in ticked {
implicit ticker =>
val tsk = IO.ref(0).flatMap { ref =>
for {
fib <- (IO.unit, IO.never[Unit].onCancel(ref.update(_ + 1))).parTupled.start
_ <- IO { ticker.ctx.tickAll() }
_ <- fib.cancel
c <- ref.get
} yield c
}

tsk must completeAs(1)
}

"run left side finalizer when canceled (and right side already completed)" in ticked {
implicit ticker =>
val tsk = IO.ref(0).flatMap { ref =>
for {
fib <- (IO.never[Unit].onCancel(ref.update(_ + 1)), IO.unit).parTupled.start
_ <- IO { ticker.ctx.tickAll() }
_ <- fib.cancel
c <- ref.get
} yield c
}

tsk must completeAs(1)
}

"complete if both sides complete" in ticked { implicit ticker =>
val tsk = (
IO.sleep(2.seconds).as(20),
IO.sleep(3.seconds).as(22)
).parTupled.map { case (l, r) => l + r }

tsk must completeAs(42)
}

"not run forever on chained product" in ticked { implicit ticker =>
import cats.effect.kernel.Par.ParallelF

case object TestException extends RuntimeException

val fa: IO[String] = IO.pure("a")
val fb: IO[String] = IO.pure("b")
val fc: IO[Unit] = IO.raiseError[Unit](TestException)
val tsk =
ParallelF.value(ParallelF(fa).product(ParallelF(fb)).product(ParallelF(fc))).void
tsk must failAs(TestException)
}
}

"miscellaneous" should {
Expand Down
9 changes: 6 additions & 3 deletions tests/shared/src/test/scala/cats/effect/Runners.scala
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,13 @@ import org.specs2.matcher.Matcher
import org.specs2.mutable.SpecificationLike
import org.specs2.specification.core.Execution

import scala.concurrent.{Future, Promise, TimeoutException}
import scala.concurrent.{Future, Promise}
import scala.concurrent.duration._
import scala.reflect.ClassTag

trait Runners extends SpecificationLike with TestInstances with RunnersPlatform { outer =>

def executionTimeout = 10.seconds
def executionTimeout = 20.seconds

def ticked[A: AsResult](test: Ticker => A): Execution =
Execution.result(test(Ticker(TestContext())))
Expand Down Expand Up @@ -105,7 +105,8 @@ trait Runners extends SpecificationLike with TestInstances with RunnersPlatform
val r = runtime()
implicit val ec = r.compute

val cancel = r.scheduler.sleep(duration, { () => p.tryFailure(new TimeoutException); () })
val cancel =
r.scheduler.sleep(duration, { () => p.tryFailure(new TestTimeoutException); () })

f.onComplete { result =>
p.tryComplete(result)
Expand All @@ -115,3 +116,5 @@ trait Runners extends SpecificationLike with TestInstances with RunnersPlatform
p.future
}
}

class TestTimeoutException extends Exception
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import cats.syntax.all._

import scala.concurrent.duration._

class DeferredSpec extends BaseSpec { outer =>
class DeferredSpec extends BaseSpec with DetectPlatform { outer =>

"Deferred for Async" should {
tests(IO(Deferred.unsafe), IO(Deferred.unsafe))
Expand Down Expand Up @@ -180,7 +180,7 @@ class DeferredSpec extends BaseSpec { outer =>
d.get.as(1).parReplicateA(n).map(_.sum must be_==(n))
}
}
.replicateA_(100)
.replicateA_(if (isJVM) 100 else 1)
}
.as(true)
}
Expand Down
5 changes: 3 additions & 2 deletions tests/shared/src/test/scala/cats/effect/std/QueueSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -277,7 +277,8 @@ class BoundedQueueSpec extends BaseSpec with QueueTests[Queue] with DetectPlatfo
constructor(8) flatMap { q =>
val offerer = List.fill(8)(List.fill(8)(0)).parTraverse_(_.traverse(q.offer(_)))

(offerer &> 0.until(8 * 8).toList.traverse_(_ => q.take)).replicateA_(1000) *>
val iter = if (isJVM) 1000 else 1
(offerer &> 0.until(8 * 8).toList.traverse_(_ => q.take)).replicateA_(iter) *>
q.size.flatMap(s => IO(s mustEqual 0))
}
}
Expand All @@ -300,7 +301,7 @@ class BoundedQueueSpec extends BaseSpec with QueueTests[Queue] with DetectPlatfo
}
}

(offerer &> taker(0)).replicateA_(1000) *>
(offerer &> taker(0)).replicateA_(if (isJVM) 1000 else 1) *>
q.size.flatMap(s => IO(s mustEqual 0))
}
}
Expand Down

0 comments on commit 38736ca

Please sign in to comment.