Add spacedWith combinator #50

Open
ajaychandran opened this issue Dec 25, 2020 · 8 comments

Comments

@ajaychandran
Contributor

ajaychandran commented Dec 25, 2020

def spaced(millis: Int): EventStream[I] =
  spacedWith(_ => millis)

def spacedSome(f: PartialFunction[I, Int]): EventStream[I] =
  spacedWith(f.applyOrElse(_, (_: Any) => 0))

/* Introduces time intervals between events */
def spacedWith(f: I => Int): EventStream[I]

Behavior:

time          0s        1s         2s         3s

events        ----------e1e2-------e3

delay(1s)     ---------------------e1e2-------e3

spaced(1s)    ----------e1---------e2---------e3
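
The timeline above can be reproduced with a small pure model. This is only a sketch and not Airstream code: `Timed` and `spacedSchedule` are hypothetical names, and it assumes that `f(event)` is the minimum gap between the previous emission and that event, which the proposal does not spell out.

```scala
// Hypothetical model of the proposed spacing; not part of Airstream.
final case class Timed[A](atMillis: Long, value: A)

/** Given events in arrival order, compute when each one would be emitted.
  * Assumption: f(event) is the minimum gap between the previous emission and this event.
  */
def spacedSchedule[A](events: List[Timed[A]])(f: A => Int): List[Timed[A]] = {
  val start = (Option.empty[Long], List.empty[Timed[A]])
  val (_, emitted) = events.foldLeft(start) { case ((lastEmit, acc), ev) =>
    // The first event is never held back; later ones wait for the gap to elapse.
    val earliest = lastEmit.fold(ev.atMillis)(_ + f(ev.value))
    val emitAt = math.max(ev.atMillis, earliest)
    (Some(emitAt), Timed(emitAt, ev.value) :: acc)
  }
  emitted.reverse
}
```

With the events from the diagram (e1 and e2 at 1s, e3 at 2s) and a constant gap of 1000 ms, this model emits e1 at 1s, e2 at 2s and e3 at 3s.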

I can submit a PR if this looks useful.

@raquo
Owner

raquo commented Dec 25, 2020

These look like they would be very nice to have if one had a need for such timing logic. What are some canonical use cases? Do you know if other streaming libraries have similar operators?

Some implementation notes and decisions to make:

  • If millis > 0, we need to fire the event in a new transaction because of async delay
    • But if millis == 0?
      • On one hand, delay(0) does fire an async event, but its purpose is to always delay, not to space events apart
      • I think for millis == 0 in spaced we should fire it off in the same transaction as I guess there is no need for delay
        • Then, topoRank of all spaced observables needs to be parent.topoRank + 1
  • I think spaced can be available on signals too, right?
    • We can't delay the initial value of a Signal, but it seems like the contract you want is ok because there are no events preceding the initial value, so it never needs to be delayed.
  • What happens after the spaced observable is stopped?
    • The queue of pending spaced values should be cleared out, and any pending timeouts cancelled
    • When observable starts again, it should probably assume that the next event does not need to be spaced
  • What happens if the user-provided millis function throws?
    • spaced observable should fire the thrown exception as error
      • since we don't know what delay was intended, we can assume 0
    • should this count as emitted event for the purposes of spacing? Probably.
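
The stop and error decisions in the list above could be modelled roughly as follows. This is a sketch over virtual time rather than real timers or transactions: `SpacedModel` and its methods are hypothetical, and it assumes the gap applies before the event whose delay was computed.

```scala
import scala.util.Try

// Hypothetical virtual-time model of the notes above; not Airstream code.
final class SpacedModel[A](delayOf: A => Int) {
  private var lastEmitAt: Option[Long] = None
  // Each entry is (time at which it is due, value or error to fire).
  private var pending: Vector[(Long, Either[Throwable, A])] = Vector.empty

  /** The parent emits `value` at virtual time `now`. */
  def onNext(now: Long, value: A): Unit = {
    val attempt = Try(delayOf(value))
    val gap = attempt.getOrElse(0) // the delay function threw: assume a delay of 0
    val payload: Either[Throwable, A] = attempt.toEither.map(_ => value)
    // Errors are queued like values, so they count for spacing purposes.
    val previous = pending.lastOption.map(_._1).orElse(lastEmitAt)
    val dueAt = previous.fold(now)(t => math.max(now, t + gap))
    pending = pending :+ ((dueAt, payload))
  }

  /** Advance virtual time and return everything that is due by `now`. */
  def advanceTo(now: Long): List[Either[Throwable, A]] = {
    val (due, rest) = pending.span(_._1 <= now)
    pending = rest
    due.lastOption.foreach { case (t, _) => lastEmitAt = Some(t) }
    due.map(_._2).toList
  }

  /** Stopping clears the queue, and the next event after a restart is not spaced. */
  def onStop(): Unit = {
    pending = Vector.empty
    lastEmitAt = None
  }
}
```

A real operator would additionally cancel its pending timeout handle on stop and decide whether a zero delay fires in the parent's transaction; neither is modelled here.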

btw another timing operator that's sorely missing is buffer.

@raquo raquo added the enhancement New feature or request label Dec 26, 2020
@yurique
Sponsor Contributor

yurique commented Dec 26, 2020

I had a use case for something similar, though a bit different and simpler: mapDelay(projectMs: A => Double).

It behaves like delay(millis) but the delay is derived from the value.

My naive implementation is:

def mapDelay(projectMs: A => Double): EventStream[A] = {
  new MapDelayEventStream(parent = underlying, projectMs)
}

class MapDelayEventStream[A](
  override protected val parent: EventStream[A],
  projectDelayMillis: A => Double
) extends EventStream[A]
    with SingleParentObservable[A, A]
    with InternalNextErrorObserver[A] {

  /**
   * Async stream, so reset rank
   */
  override protected[airstream] val topoRank: Int = 1

  override protected[airstream] def onNext(nextValue: A, transaction: Transaction): Unit = {
    val _ = js.timers.setTimeout(projectDelayMillis(nextValue)) {
      val _ = new Transaction(fireValue(nextValue, _))
    }
  }

  override def onError(nextError: Throwable, transaction: Transaction): Unit = {
    val _ = new Transaction(fireError(nextError, _))
  }
}

@yurique
Sponsor Contributor

yurique commented Dec 26, 2020

My use case was related to having auth tokens with expiration. I had a stream of tokens, and used this combinator to kick off the token refresh logic.
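
As an illustration of that use case, the delay projection passed to mapDelay might look like the sketch below. The thread does not show the real token type, so `AuthToken`, `refreshDelayMillis` and the safety margin are all assumptions made for the example.

```scala
// Hypothetical token shape; the real one is not shown in the thread.
final case class AuthToken(value: String, expiresAtMillis: Double)

/** Delay until shortly before the token expires, never negative.
  * `now` is passed as a function so it is read when each token arrives.
  */
def refreshDelayMillis(now: () => Double, safetyMarginMillis: Double)(token: AuthToken): Double =
  math.max(0.0, token.expiresAtMillis - now() - safetyMarginMillis)

// Possible use with the mapDelay combinator above (assumes a Scala.js environment):
//   tokenStream.mapDelay(refreshDelayMillis(() => js.Date.now(), 30000))
```

An already expired token yields a delay of 0, so the refresh logic would be kicked off immediately.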

@raquo
Owner

raquo commented Dec 26, 2020 via email

@ajaychandran
Contributor Author

ajaychandran commented Dec 26, 2020

What are some canonical use cases?

I am building a multiplayer card game (similar to Poker) where game updates are published to a client in a (websocket) stream.
The server side produces multiple events in a transaction but the client has to animate them one at a time (so that the user can view the updates).

  • I think for millis == 0 in spaced we should fire it off in the same transaction as I guess there is no need for delay

If it simplifies the implementation, spaced could introduce an async boundary just like delay.

  • I think spaced can be available on signals too, right?

I haven't had a use for this but should be doable.

  • The queue of pending spaced values should be cleared out, and any pending timeouts cancelled

What would happen if the pending timeouts are not cancelled?

  1. The events fired would not reach anywhere (there are no observers).
  2. If the stream were stopped and started again, any pending events will get propagated. What would be the "correct" behavior in this case? My opinion is that these events should be propagated.
// events continue to occur on the spaced stream irrespective of whether it is being observed
spaced(1s)    ----------e1---------e2---------e3

Note that DelayEventStream has no cancellation logic.

  • When observable starts again, it should probably assume that the next event does not need to be spaced

If pending events are not cancelled, then the first event post restart must be spaced.

  • since we don't know what delay was intended, we can assume 0

Yes, this is all that is required for spacing.

@yurique
Sponsor Contributor

yurique commented Dec 26, 2020

I think spaced makes much less sense on Signals.

@raquo
Owner

raquo commented Dec 26, 2020

I think for millis == 0 in spaced we should fire it off in the same transaction as I guess there is no need for delay

If it simplifies the implementation, spaced could introduce an async boundary just like delay.

It's actually the opposite: unless there is a good reason to want a new transaction or an async boundary, we shouldn't introduce one. Staying in the same transaction will behave better with respect to glitches.

Regarding behaviour on start and stop:

Note that DelayEventStream has no cancellation logic.

That was actually what prompted my query. I just fixed that yesterday in eaca9ca (and Debounce too, in another commit).

Generally, Airstream's contract is that stopping an observable clears its state and stops emitting further events. Signals can't clear their current value, but everything else that the observable accumulated is generally cleared.

Imagine you have a component (Laminar element) that uses a spaced stream to lay out your cards. You unmount it while the cards are being laid out. Once it's unmounted, the stream has no more listeners, so even if it fires, nobody will be there to hear it, and nothing will happen, the event will be lost. That's expected, and there is no way around it. You shouldn't stop the stream if you want it running.

But, now imagine that you quickly re-mount the same component. The stream becomes active again. If you did NOT clear the queue of pending events when it stopped, some or all of those events might still fire after the component is re-mounted. But whether they will be observed, depends on timing. So this becomes really unpredictable and also unexpected. For example, re-mounting that component could trigger a new request to the backend, and you'd expect to start receiving new spaced card layout events from the result of that request, but you will instead start receiving card layout events that were pending from the previous request from before the component was unmounted. Or not, depends on timing. That's why it's important to have the observable clear its state when it's stopped. Hope that makes sense.

Perhaps we could at some point add an operator like parent.keepAlive(millis) that keeps itself as an observer of parent observable for millis after it was stopped, allowing parent to finish something, but such behaviour would need to be a generalized feature with more thought put into it, designed together with #23, not part of the spaced operator.

@raquo
Owner

raquo commented Dec 26, 2020

I think spaced makes much less sense on Signals.

Hard for me to say, but if we don't make it available on signals, people will still be able to use it via composeChanges. In that case the behaviour will be broken: the first change will not be spaced away from the initial value, and spacing will only start from the second change, which is very likely not what the user intended.

So I think this operator should be defined on signals too, and it shouldn't be too hard to do that.
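
The difference between the two approaches can be shown with a small timing model. This is a hypothetical sketch, not Airstream code: `spaceTimes` is an invented helper, a constant gap is assumed, and the signal's initial value is assumed to be emitted at time 0.

```scala
/** Emission times for events arriving at `arrivals`, keeping at least `gapMillis`
  * after the previous emission. `lastEmit` is the time of an emission that
  * precedes the first arrival, if any.
  */
def spaceTimes(arrivals: List[Long], gapMillis: Long, lastEmit: Option[Long]): List[Long] =
  arrivals
    .scanLeft(lastEmit) { (prev, at) =>
      Some(prev.fold(at)(p => math.max(at, p + gapMillis)))
    }
    .tail
    .flatten

// Two changes arrive at 100 ms and 200 ms, with a 1000 ms gap.
val changes = List(100L, 200L)

// Spacing applied to the changes only: the first change is not held back.
val viaChangesOnly = spaceTimes(changes, 1000L, lastEmit = None)

// Spacing defined on the signal: the initial value at time 0 counts as an emission.
val onSignal = spaceTimes(changes, 1000L, lastEmit = Some(0L))
```

In this model the changes-only variant emits at 100 ms and 1100 ms, while the signal-aware variant emits at 1000 ms and 2000 ms.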

@raquo raquo added new feature / operator and removed enhancement New feature or request labels May 5, 2021

3 participants