Skip to content

Commit

Permalink
Fix: Draft fix of SwitchEventStream for raquo#103
Browse files Browse the repository at this point in the history
  • Loading branch information
raquo committed Nov 6, 2021
1 parent 7874e12 commit d256110
Showing 1 changed file with 12 additions and 6 deletions.
18 changes: 12 additions & 6 deletions src/main/scala/com/raquo/airstream/flatten/SwitchEventStream.scala
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import scala.util.{ Failure, Success, Try }
*
* This stream emits the events from the last such stream created this way.
*
* Events are emitted at the same time as the currently tracked stream emits them.
* Events are emitted at the same time as the currently tracked stream emits them (but in a new transaction).
*
* When `parent` emits a nextValue, this stream switches to emitting events from `makeStream(nextValue)` (which is a stream).
*
Expand All @@ -35,10 +35,9 @@ class SwitchEventStream[I, O](

override protected val topoRank: Int = 1

private[this] var maybeCurrentEventStream: js.UndefOr[Try[EventStream[O]]] = parent match {
case signal: Signal[I @unchecked] => signal.tryNow().map(makeStream)
case _ => js.undefined
}
private[this] var firstStart: Boolean = true

private[this] var maybeCurrentEventStream: js.UndefOr[Try[EventStream[O]]] = js.undefined

// @TODO[Elegance] Maybe we should abstract away this kind of internal observer
private[this] val internalEventObserver: InternalObserver[O] = InternalObserver[O](
Expand Down Expand Up @@ -71,11 +70,18 @@ class SwitchEventStream[I, O](
}

override protected[this] def onStart(): Unit = {
super.onStart()
if (firstStart) {
firstStart = false
maybeCurrentEventStream = parent match {
case signal: Signal[I @unchecked] => signal.tryNow().map(makeStream)
case _ => js.undefined
}
}
maybeCurrentEventStream.foreach { streamTry =>
val initialStream = streamTry.fold(err => EventStream.fromTry(Failure(err), emitOnce = true), identity)
initialStream.addInternalObserver(internalEventObserver)
}
super.onStart()
}

override protected[this] def onStop(): Unit = {
Expand Down

0 comments on commit d256110

Please sign in to comment.