Skip to content

Commit

Permalink
Merge pull request #2845 from bplommer/timeout
Browse files Browse the repository at this point in the history
Add methods to timeout on pulls
  • Loading branch information
mpilquist authored Jan 20, 2024
2 parents fb78701 + b38f1a7 commit d3c6900
Showing 1 changed file with 74 additions and 0 deletions.
74 changes: 74 additions & 0 deletions core/shared/src/main/scala/fs2/Stream.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2856,6 +2856,55 @@ final class Stream[+F[_], +O] private[fs2] (private[fs2] val underlying: Pull[F,
.widen[Either[Throwable, Unit]]
)

/** Fails this stream with a `TimeoutException` if it does not emit a new chunk within the
* given `timeout` after it is requested.
*/
def timeoutOnPull[F2[x] >: F[x]: Temporal](timeout: FiniteDuration): Stream[F2, O] =
timeoutOnPullTo(
timeout,
Stream.raiseError[F2](new TimeoutException(s"Timed out waiting for pull after $timeout"))
)

/** Stops pulling from this stream if it does not emit a new chunk within the
* given `timeout` after it is requested, and starts pulling from the `onTimeout` stream instead.
*
* @example {{{
* scala> import cats.effect.IO
* scala> import cats.effect.unsafe.implicits.global
* scala> import scala.concurrent.duration._
* scala> val s = Stream(1) ++ Stream.sleep_[IO](100.millis) ++ Stream(2).repeat.meteredStartImmediately[IO](200.millis)
* scala> s.timeoutOnPullTo(150.millis, Stream(3)).compile.toVector.unsafeRunSync()
* res0: Vector[Int] = Vector(1, 2, 3)
* }}}
*/
def timeoutOnPullTo[F2[x] >: F[x]: Temporal, O2 >: O](
timeout: FiniteDuration,
onTimeout: => Stream[F2, O2]
): Stream[F2, O2] =
timeoutOnPullWith[F2, O2](timeout)(_ => onTimeout)

/** Applies the pipe `f` if this stream does not emit a new chunk within the given `timeout` after
* it is requested.
*
* @example {{{
* scala> import cats.effect._
* scala> import cats.effect.unsafe.implicits.global
* scala> import scala.concurrent.duration._
* scala> val s = Stream(1) ++ Stream.sleep_[IO](100.millis) ++ Stream(2).repeat.meteredStartImmediately[IO](200.millis)
* scala> Ref[IO].of(0).flatTap { lateCount =>
* | s.take(4).timeoutOnPullWith(150.millis)(Stream.exec(lateCount.update(_ + 1)) ++ _).compile.drain
* | }.flatMap(_.get).unsafeRunSync()
* res0: Int = 2
* }}}
*/
def timeoutOnPullWith[F2[x] >: F[x]: Temporal, O2 >: O](timeout: FiniteDuration)(
f: Pipe[F2, O2, O2]
): Stream[F2, O2] = this
.covaryAll[F2, O2]
.pull
.timeoutWith(timeout)(_.stream.through(f).pull.echo)
.stream

/** Creates a [[Publisher]] from this [[Stream]].
*
* The stream is only ran when elements are requested.
Expand Down Expand Up @@ -4955,6 +5004,31 @@ object Stream extends StreamLowPriority {

pull(toTimedPull(output))
}

/** Transforms this pull with the function `f` whenever an element is not emitted within
* the duration `t`.
* @example {{{
* scala> import cats.effect.IO
* scala> import cats.effect.unsafe.implicits.global
* scala> import scala.concurrent.duration._
* scala> val s = (Stream("elem") ++ Stream.sleep_[IO](600.millis)).repeat.take(3)
* scala> s.pull.timeoutWith(450.millis)(Pull.output1("late!") >> _).stream.compile.toVector.unsafeRunSync()
* res0: Vector[String] = Vector(elem, late!, elem, late!, elem)
* }}}
*/
def timeoutWith[O2 >: O](t: FiniteDuration)(f: Pull[F, O2, Unit] => Pull[F, O2, Unit])(implicit
F: Temporal[F]
): Pull[F, O2, Unit] =
timed { timedPull =>
def go(timedPull: Pull.Timed[F, O]): Pull[F, O2, Unit] =
timedPull.timeout(t) >>
timedPull.uncons.flatMap {
case Some((Right(elems), next)) => Pull.output(elems) >> go(next)
case Some((Left(_), next)) => f(go(next))
case None => Pull.done
}
go(timedPull)
}
}

/** Projection of a `Stream` providing various ways to compile a `Stream[F,O]` to a `G[...]`. */
Expand Down

0 comments on commit d3c6900

Please sign in to comment.