Skip to content

Commit

Permalink
New: Custom event sources, rework AjaxEventStream, and more
Browse files Browse the repository at this point in the history
- New: Signal.fromValue and Signal.fromTry
- New: Observer.toJsFn1
  • Loading branch information
raquo committed Dec 29, 2020
1 parent c96fd1c commit a60a557
Show file tree
Hide file tree
Showing 13 changed files with 472 additions and 147 deletions.
32 changes: 26 additions & 6 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -596,9 +596,27 @@ AjaxEventStream

Methods for POST, PUT, PATCH, and DELETE are also available.

The request is made every time the stream is started. If the stream is stopped while the request is pending, the request will not be cancelled, but its result will be discarded.
The request is made every time the stream is started. If the stream is stopped while the request is pending, the old request will not be cancelled, but its result will be discarded.

If the request times out, is aborted, returns an HTTP status code that isn't 2xx or 304, or fails in any other way, the stream will emit an `AjaxStreamError`.

If you want a stream that never fails, a stream that emits an event regardless of all those errors, call `.completeEvents` on your ajax stream.

You can listen for `progress` or `readyStateChange` events by passing in the corresponding observers to `AjaxEventStream.get` et al, for example:

```scala
val (progressObserver, $progress) = EventStream.withObserver[(dom.XMLHttpRequest, dom.ProgressEvent)]

val $request = AjaxEventStream.get(
url = "/api/kittens",
progressObserver = progressObserver
)

val $bytesLoaded = $progress.map2((xhr, ev) => ev.loaded)
```

Warning: dom.XmlHttpRequest is an ugly, imperative JS construct. We set event callbacks for onload, onerror, onabort, ontimeout, and if requested, also for onprogress and onreadystatechange. Make sure you don't override Airstream's listeners, or this stream will not work properly.

The implementation follows that of `org.scalajs.dom.ext.Ajax.apply`, but is adjusted slightly to be better behaved in Airstream.



Expand All @@ -612,14 +630,16 @@ For several users' implementations, search Laminar gitter room, and the issues i

### DOM Events

`DomEventStream` previously available in Laminar now lives in Airstream.

```scala
val element: dom.Element = ???
DomEventStream(element, "click") // EventStream[dom.MouseEvent]
DomEventStream[dom.MouseEvent](element, "click") // EventStream[dom.MouseEvent]
```

This stream, when started, registers a `click` event listener on `element`, and emits all events the listener receives until it is stopped, at which point the listener is removed.
This stream, when started, registers a `click` event listener on `element`, and emits all events the listener receives until the stream is stopped, at which point the listener is removed.

Airstream does not know the names & types of DOM events, so you need to manually specify both. You can get those manually from MDN or programmatically from event props such as `onClick` available in Laminar.

`DomEventStream` works not just on elements but on any `dom.raw.EventTarget`. However, make sure to check browser compatibility for fancy EventTarget-s such as XMLHttpRequest.



Expand Down
3 changes: 3 additions & 0 deletions src/main/scala/com/raquo/airstream/core/Observer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,13 @@ package com.raquo.airstream.core

import com.raquo.airstream.core.AirstreamError.{ObserverError, ObserverErrorHandlingError}

import scala.scalajs.js
import scala.util.{Failure, Success, Try}

trait Observer[-A] {

lazy val toJsFn1: js.Function1[A, Unit] = onNext

/** Note: must not throw! */
def onNext(nextValue: A): Unit

Expand Down
41 changes: 41 additions & 0 deletions src/main/scala/com/raquo/airstream/custom/CustomSignalSource.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package com.raquo.airstream.custom

import com.raquo.airstream.custom.CustomSource._
import com.raquo.airstream.signal.Signal

import scala.util.{Success, Try}

// @TODO[Test] needs testing

/** Use this to easily create a custom signal from an external source
*
* See docs on custom sources, and [[CustomSource.Config]]
*/
class CustomSignalSource[A] (
getInitialValue: => Try[A],
makeConfig: (SetCurrentValue[A], GetCurrentValue[A], GetStartIndex, GetIsStarted) => CustomSource.Config,
) extends Signal[A] with CustomSource[A] {

override protected[this] def initialValue: Try[A] = getInitialValue

override protected[this] val config: Config = makeConfig(_fireTry, tryNow, getStartIndex, getIsStarted)
}

object CustomSignalSource {

def apply[A](
initial: => A
)(
config: (SetCurrentValue[A], GetCurrentValue[A], GetStartIndex, GetIsStarted) => Config
): Signal[A] = {
new CustomSignalSource[A](Success(initial), config)
}

def fromTry[A](
initial: => Try[A]
)(
config: (SetCurrentValue[A], GetCurrentValue[A], GetStartIndex, GetIsStarted) => Config
): Signal[A] = {
new CustomSignalSource[A](initial, config)
}
}
112 changes: 112 additions & 0 deletions src/main/scala/com/raquo/airstream/custom/CustomSource.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
package com.raquo.airstream.custom

import com.raquo.airstream.core.{Observable, Transaction}
import com.raquo.airstream.custom.CustomSource._

import scala.util.Try

// @TODO[Docs] Write docs and link to that.

/** Base functionality for a custom observable based on start and stop callbacks.
*
* See:
* - [[com.raquo.airstream.custom.CustomStreamSource]]
* - [[com.raquo.airstream.custom.CustomSignalSource]]
*/
trait CustomSource[A] extends Observable[A] {

protected[this] val config: Config

// --

/** CustomSource is intended for observables that don't synchronously depend on other observables. */
override protected[airstream] val topoRank: Int = 1

protected[this] var startIndex: StartIndex = 0


protected[this] val _fireValue: FireValue[A] = { value =>
//println(s"> init trx from CustomSource(${value})")
new Transaction(fireValue(value, _))
}

protected[this] val _fireError: FireError = { error =>
//println(s"> init error trx from CustomSource(${error})")
new Transaction(fireError(error, _))
}

protected[this] val _fireTry: SetCurrentValue[A] = { value =>
//println(s"> init try trx from CustomSource(${value})")
new Transaction(fireTry(value, _))
}

protected[this] val getStartIndex: GetStartIndex = () => startIndex

protected[this] val getIsStarted: GetIsStarted = () => isStarted

override protected[this] def onStart(): Unit = {
startIndex += 1
Try(config.onStart()).recover {
case err: Throwable => _fireError(err)
}
}

override protected[this] def onStop(): Unit = {
config.onStop()
}
}

object CustomSource {

/** See docs for custom sources */
final class Config private (
val onStart: () => Unit,
val onStop: () => Unit
) {

/** Create a version of a config that only runs start / stop if the predicate passes.
* - `start` will be run when the CustomSource is about to start
* if `passes` returns true at that time
* - `stop` will be run when the CustomSource is about to stop
* if your `start` code ran the last time CustomSource started
*/
def when(passes: () => Boolean): Config = {
var started = false
new Config(
() => {
if (passes()) {
started = true
onStart()
}
},
onStop = () => {
if (started) {
onStop()
}
started = false
}
)
}
}

object Config {

def apply(onStart: => Unit, onStop: => Unit): Config = {
new Config(() => onStart, () => onStop)
}
}

type StartIndex = Int

type FireValue[A] = A => Unit

type FireError = Throwable => Unit

type SetCurrentValue[A] = Try[A] => ()

type GetCurrentValue[A] = () => Try[A]

type GetStartIndex = () => StartIndex

type GetIsStarted = () => Boolean
}
25 changes: 25 additions & 0 deletions src/main/scala/com/raquo/airstream/custom/CustomStreamSource.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package com.raquo.airstream.custom

import com.raquo.airstream.custom.CustomSource._
import com.raquo.airstream.eventstream.EventStream

/** Use this to easily create a custom signal from an external source
*
* See docs on custom sources, and [[CustomSource.Config]]
*/
class CustomStreamSource[A] private (
makeConfig: (FireValue[A], FireError, GetStartIndex, GetIsStarted) => CustomSource.Config,
) extends EventStream[A] with CustomSource[A] {

override protected[this] val config: Config = makeConfig(_fireValue, _fireError, getStartIndex, getIsStarted)

}

object CustomStreamSource {

def apply[A](
config: (FireValue[A], FireError, GetStartIndex, GetIsStarted) => Config
): EventStream[A] = {
new CustomStreamSource[A](config)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ class EventBusStream[A] private[eventbus] (writeBus: WriteBus[A]) extends EventS
/** Made more public to allow usage from WriteBus */
override protected[eventbus] def isStarted: Boolean = super.isStarted

// @TODO document why. Basically event bus breaks the "static DAG" requirement for topo ranking
override protected[airstream] val topoRank: Int = 1

@inline private[eventbus] def addSource(sourceStream: EventStream[A]): Unit = {
Expand Down
56 changes: 48 additions & 8 deletions src/main/scala/com/raquo/airstream/eventstream/EventStream.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@ package com.raquo.airstream.eventstream

import com.raquo.airstream.core.AirstreamError.ObserverError
import com.raquo.airstream.core.{AirstreamError, Observable, Observer, Transaction}
import com.raquo.airstream.custom.CustomSource._
import com.raquo.airstream.custom.{CustomSource, CustomStreamSource}
import com.raquo.airstream.eventbus.EventBus
import com.raquo.airstream.features.{CombineObservable, Splittable}
import com.raquo.airstream.features.CombineObservable
import com.raquo.airstream.signal.{FoldLeftSignal, Signal, SignalFromEventStream}

import scala.concurrent.Future
Expand Down Expand Up @@ -174,36 +176,74 @@ object EventStream {

/** Event stream that never emits anything */
val empty: EventStream[Nothing] = {
new SeqEventStream[Nothing](events = Nil, emitOnce = true)
fromCustomSource[Nothing](
shouldStart = _ => false,
start = (_, _, _, _) => (),
stop = _ => ()
)
}

/** @param emitOnce if true, the event will be emitted at most one time.
* If false, the event will be emitted every time the stream is started. */
@deprecated("Use `fromValue` or `empty` (see docs)", "0.4") // @TODO Are we sure we want to deprecate this?
def fromSeq[A](events: Seq[A], emitOnce: Boolean): EventStream[A] = {
new SeqEventStream[A](events.map(Success(_)), emitOnce)
fromCustomSource[A](
shouldStart = startIndex => if (emitOnce) startIndex == 1 else true,
start = (fireEvent, _, _, _) => events.foreach(fireEvent),
stop = _ => ()
)
}

/** @param emitOnce if true, the event will be emitted at most one time.
* If false, the event will be emitted every time the stream is started. */
def fromValue[A](event: A, emitOnce: Boolean): EventStream[A] = {
new SeqEventStream[A](List(Success(event)), emitOnce)
fromCustomSource[A](
shouldStart = startIndex => if (emitOnce) startIndex == 1 else true,
start = (fireEvent, _, _, _) => fireEvent(event),
stop = _ => ()
)
}

/** @param emitOnce if true, the event will be emitted at most one time.
* If false, the event will be emitted every time the stream is started. */
def fromTry[A](value: Try[A], emitOnce: Boolean): EventStream[A] = {
new SeqEventStream[A](List(value), emitOnce)
fromCustomSource[A](
shouldStart = startIndex => if (emitOnce) startIndex == 1 else true,
start = (fireEvent, fireError, _, _) => value.fold(fireError, fireEvent),
stop = _ => ()
)
}

def fromFuture[A](future: Future[A]): EventStream[A] = {
new FutureEventStream(future, emitIfFutureCompleted = false)
def fromFuture[A](future: Future[A], emitFutureIfCompleted: Boolean = false): EventStream[A] = {
new FutureEventStream[A](future, emitFutureIfCompleted)
}

@inline def fromJsPromise[A](promise: js.Promise[A]): EventStream[A] = {
fromFuture(promise.toFuture)
}

/** Easy helper for custom events. See [[CustomStreamSource]] for docs.
*
* @param stop MUST NOT THROW!
*/
def fromCustomSource[A](
shouldStart: StartIndex => Boolean = _ => true,
start: (FireValue[A], FireError, GetStartIndex, GetIsStarted) => Unit,
stop: StartIndex => Unit
): EventStream[A] = {
CustomStreamSource[A] { (fireValue, fireError, getStartIndex, getIsStarted) =>
CustomSource.Config(
onStart = {
start(fireValue, fireError, getStartIndex, getIsStarted)
},
onStop = {
stop(getStartIndex())
}
).when {
() => shouldStart(getStartIndex())
}
}
}

/** Create a stream and a callback that, when fired, makes that stream emit. */
def withCallback[A]: (EventStream[A], A => Unit) = {
val bus = new EventBus[A]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,10 @@ class FutureEventStream[A](future: Future[A], emitIfFutureCompleted: Boolean) ex
if (!future.isCompleted || emitIfFutureCompleted) {
// @TODO[API] Do we need "isStarted" filter on these? Doesn't seem to affect anything for now...
future.onComplete(_.fold(
nextError => new Transaction(fireError(nextError, _)),
nextError => {
//println(s"> init trx from FutureEventStream.init($nextError)")
new Transaction(fireError(nextError, _))
},
nextValue => {
//println(s"> init trx from FutureEventStream.init($nextValue)")
new Transaction(fireValue(nextValue, _))
Expand Down

This file was deleted.

Loading

0 comments on commit a60a557

Please sign in to comment.