-
Notifications
You must be signed in to change notification settings - Fork 605
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Feature request: Dynamic metering of Streams #3328
Comments
I think this is an interesting idea, but I think confusion surrounding
The thing is, With that in mind, |
Mm, yeah I may be using the wrong terminology here - ignoring the whole signal, whatchamacallit, and focusing more on the semantics - I do think the idea of |
I just ran into a similar usecase. Here's my API concept (scastie). import cats.effect.Temporal
import cats.syntax.all.*
import fs2.{Stream, Pipe}
import fs2.concurrent.Signal
import scala.concurrent.duration.FiniteDuration
def waitUntil[F[_], A](
nextTimestamp: F[Signal[F, FiniteDuration]]
)(implicit F: Temporal[F]): Pipe[F, A, A] =
in =>
in.flatMap { a =>
Stream.eval(nextTimestamp).flatMap { timestampSignal =>
timestampSignal.discrete
.switchMap { timestamp =>
Stream.eval {
F.monotonic.flatMap(now => F.sleep(now - timestamp))
}
}
.head
.as(a)
}
}
Key ideas:
A useful implementation of Btw, I'm not convinced that this should be added to FS2 e.g. we have upperbound living as an external library and this seems in a similar vein. In fact, would upperbound suit your needs already? |
I initialy raised this in
fs2-kafka
, but I think this actually belongs here;fd4s/fs2-kafka#1270
In the same manner that you can use a Signal[F, Boolean] to pause consumption, which I've found incredibly useful for code with e.g. dynamic feature flags to turn on/off consumption, I am hoping to throttle consumption.
If this would be useful to other people, would you be open to a PR for this? Or, if this already exists, please do point me in its direction 🙏
1. is there currently a baked-in way to have dynamic metering on a stream, other than `evalTap(_ => doSomeDelay())
The limit of this is that
someSleep
won't have access to when the stream last emitted, how much time has elapsed, etc etc, which leads me to my actual question:2. I think this is the feature request I'm asking for, if people would find this useful
I would want the semantics of it to internally keep track of last time it produced a message, and if signal value is emitted that is less than what was last set, AND more time has passed since it was set, then it would immediately emit and then wait for the next duration to elapse. And inversely, if the duration is increased since last, then the time-delta is taken into consideration
So - in BDD format - this is the behaviour that I'm after:
The text was updated successfully, but these errors were encountered: