Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Web platform streams #54

Merged
merged 9 commits into from
Dec 30, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
86 changes: 85 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,13 @@ I created Airstream because I found existing solutions were not suitable for bui
* [EventStream.fromSeq](#eventstreamfromseq)
* [EventStream.periodic](#eventstreamperiodic)
* [EventStream.empty](#eventstreamempty)
* [EventStream.withCallback and withObserver](#eventstreamwithcallback-and-withobserver)
* [EventBus](#eventbus)
* [Var](#var)
* [Val](#val)
* [Ajax](#ajax)
* [Websockets](#websockets)
* [DOM Events](#dom-events)
* [Custom Observables](#custom-observables)
* [FRP Glitches](#frp-glitches)
* [Other Libraries](#other-libraries)
Expand Down Expand Up @@ -436,6 +440,22 @@ The underlying `PeriodicEventStream` class offers more functionality, including
A stream that never emits any events.


#### `EventStream.withCallback` and `withObserver`

`EventStream.withCallback[A]` Creates and returns a stream and an `A => Unit` callback that, when called, passes the input value to that stream. Of course, as streams are lazy, the stream will only emit if it has observers.

```scala
val (stream, callback) = EventStream.withCallback[Int]
callback(1) // nothing happens because stream has no observers
stream.foreach(println)
callback(2) // `2` will be printed
```

`EventStream.withJsCallback[A]` works similarly except it returns a js.Function for easier integration with Javascript libraries.

`EventStream.withObserver[A]` works similarly but creates an observer, which among other conveniences passes the errors that it receives into the stream.


#### EventBus

`new EventBus[MyEvent]` is the general-purpose way to create a stream on which you can manually trigger events. The resulting EventBus exposes two properties:
Expand Down Expand Up @@ -563,12 +583,76 @@ Remember that this atomicity guarantee only applies to failures which would have
Val is useful when a component wants to accept either a Signal or a constant value as input. You can just wrap your constant in a Val, and make the component accept a `Signal` (or a `StrictSignal`) instead.



#### Ajax

Airstream now has a built-in way to perform Ajax requests:

```scala
AjaxEventStream
.get("/api/kittens") // EventStream[dom.XMLHttpRequest]
.map(req => req.responseText) // EventStream[String]
```

Methods for POST, PUT, PATCH, and DELETE are also available.

The request is made every time the stream is started. If the stream is stopped while the request is pending, the old request will not be cancelled, but its result will be discarded.

If the request times out, is aborted, returns an HTTP status code that isn't 2xx or 304, or fails in any other way, the stream will emit an `AjaxStreamError`.

If you want a stream that never fails, a stream that emits an event regardless of all those errors, call `.completeEvents` on your ajax stream.

You can listen for `progress` or `readyStateChange` events by passing in the corresponding observers to `AjaxEventStream.get` et al, for example:

```scala
val (progressObserver, $progress) = EventStream.withObserver[(dom.XMLHttpRequest, dom.ProgressEvent)]

val $request = AjaxEventStream.get(
url = "/api/kittens",
progressObserver = progressObserver
)

val $bytesLoaded = $progress.map2((xhr, ev) => ev.loaded)
```

In a similar manner, you can pass a `requestObserver` that will be called with the newly created `dom.XMLHttpRequest` just before the request is sent. This way you can save the pending request into a Var and e.g. `abort()` it if needed.

Warning: dom.XmlHttpRequest is an ugly, imperative JS construct. We set event callbacks for onload, onerror, onabort, ontimeout, and if requested, also for onprogress and onreadystatechange. Make sure you don't override Airstream's listeners, or this stream will not work properly.




### Websockets

Airstream has no official websockets integration yet.

For several users' implementations, search Laminar gitter room, and the issues in this repo.



### DOM Events

```scala
val element: dom.Element = ???
DomEventStream[dom.MouseEvent](element, "click") // EventStream[dom.MouseEvent]
```

This stream, when started, registers a `click` event listener on `element`, and emits all events the listener receives until the stream is stopped, at which point the listener is removed.

Airstream does not know the names & types of DOM events, so you need to manually specify both. You can get those manually from MDN or programmatically from event props such as `onClick` available in Laminar.

`DomEventStream` works not just on elements but on any `dom.raw.EventTarget`. However, make sure to check browser compatibility for fancy EventTarget-s such as XMLHttpRequest.



#### Custom Observables

EventBus is a very generic solution that should suit most needs, even if perhaps not very elegantly sometimes.

You can create your own observables that emit events in their own unique way by wrapping or extending EventBus (easier) or extending Observable (more work and knowledge required, but rewarded with better behavior)).

If extending Observable, you will need to make the `topoRank` field public to be able to override it. See [#37](https://github.com/raquo/Airstream/issues/37).

Unfortunately I don't have enough time to describe how to create custom observables in detail right now. You will need to read the rest of the documentation and the source code – you will see how other observables such as MapEventStream or FilterEventStream are implemented. Airstream's source code should be easy to comprehend. It is clean, small (a bit more than 1K LoC with all the operators), and does not use complicated implicits or hardcore functional stuff.


Expand Down Expand Up @@ -1135,7 +1219,7 @@ stream.recoverToTry.collect { case Failure(err) => err } // EventStream[Throwabl
## Limitations

* Airstream only runs on Scala.js because its primary intended use case is unidirectional dataflow architecture on the frontend. I have no plans to make it run on the JVM. It would require too much of my time and too much compromise, complicating the API to support a completely different environment and use cases.
* Airstream has no concept of observables "completing". Personally I don't think this is a limitation, but I can see it being viewed as such. See [Issue #23](https://github.com/raquo/Airstream/issues/23).
* Airstream has no concept of observables "completing". Personally I don't think this is much of a limitation, but I can see it being viewed as such. See [Issue #23](https://github.com/raquo/Airstream/issues/23).


## My Related Projects
Expand Down
9 changes: 8 additions & 1 deletion src/main/scala/com/raquo/airstream/core/Observer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,13 @@ package com.raquo.airstream.core

import com.raquo.airstream.core.AirstreamError.{ObserverError, ObserverErrorHandlingError}

import scala.scalajs.js
import scala.util.{Failure, Success, Try}

trait Observer[-A] {

lazy val toJsFn1: js.Function1[A, Unit] = onNext

/** Note: must not throw! */
def onNext(nextValue: A): Unit

Expand All @@ -31,6 +34,11 @@ trait Observer[-A] {
)
}

/** Available only on Observers of Option, this is a shortcut for contramap[B](Some(_)) */
def contramapSome[V](implicit evidence: Option[V] <:< A): Observer[V] = {
contramap[V](value => evidence(Some(value)))
}

/** Like `contramap` but with `collect` semantics: not calling the original observer when `pf` is not defined */
def contracollect[B](pf: PartialFunction[B, A]): Observer[B] = {
Observer.withRecover(
Expand All @@ -53,7 +61,6 @@ trait Observer[-A] {

object Observer {


/** An observer that does nothing. Use it to ensure that an Observable is started
*
* Used by SignalView and EventStreamView
Expand Down
41 changes: 41 additions & 0 deletions src/main/scala/com/raquo/airstream/custom/CustomSignalSource.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package com.raquo.airstream.custom

import com.raquo.airstream.custom.CustomSource._
import com.raquo.airstream.signal.Signal

import scala.util.{Success, Try}

// @TODO[Test] needs testing

/** Use this to easily create a custom signal from an external source
*
* See docs on custom sources, and [[CustomSource.Config]]
*/
class CustomSignalSource[A] (
getInitialValue: => Try[A],
makeConfig: (SetCurrentValue[A], GetCurrentValue[A], GetStartIndex, GetIsStarted) => CustomSource.Config,
) extends Signal[A] with CustomSource[A] {

override protected[this] def initialValue: Try[A] = getInitialValue

override protected[this] val config: Config = makeConfig(_fireTry, tryNow, getStartIndex, getIsStarted)
}

object CustomSignalSource {

def apply[A](
initial: => A
)(
config: (SetCurrentValue[A], GetCurrentValue[A], GetStartIndex, GetIsStarted) => Config
): Signal[A] = {
new CustomSignalSource[A](Success(initial), config)
}

def fromTry[A](
initial: => Try[A]
)(
config: (SetCurrentValue[A], GetCurrentValue[A], GetStartIndex, GetIsStarted) => Config
): Signal[A] = {
new CustomSignalSource[A](initial, config)
}
}
112 changes: 112 additions & 0 deletions src/main/scala/com/raquo/airstream/custom/CustomSource.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
package com.raquo.airstream.custom

import com.raquo.airstream.core.{Observable, Transaction}
import com.raquo.airstream.custom.CustomSource._

import scala.util.Try

// @TODO[Docs] Write docs and link to that.

/** Base functionality for a custom observable based on start and stop callbacks.
*
* See:
* - [[com.raquo.airstream.custom.CustomStreamSource]]
* - [[com.raquo.airstream.custom.CustomSignalSource]]
*/
trait CustomSource[A] extends Observable[A] {

protected[this] val config: Config

// --

/** CustomSource is intended for observables that don't synchronously depend on other observables. */
override protected[airstream] val topoRank: Int = 1

protected[this] var startIndex: StartIndex = 0


protected[this] val _fireValue: FireValue[A] = { value =>
//println(s"> init trx from CustomSource(${value})")
new Transaction(fireValue(value, _))
}

protected[this] val _fireError: FireError = { error =>
//println(s"> init error trx from CustomSource(${error})")
new Transaction(fireError(error, _))
}

protected[this] val _fireTry: SetCurrentValue[A] = { value =>
//println(s"> init try trx from CustomSource(${value})")
new Transaction(fireTry(value, _))
}

protected[this] val getStartIndex: GetStartIndex = () => startIndex

protected[this] val getIsStarted: GetIsStarted = () => isStarted

override protected[this] def onStart(): Unit = {
startIndex += 1
Try(config.onStart()).recover {
case err: Throwable => _fireError(err)
}
}

override protected[this] def onStop(): Unit = {
config.onStop()
}
}

object CustomSource {

/** See docs for custom sources */
final class Config private (
val onStart: () => Unit,
val onStop: () => Unit
) {

/** Create a version of a config that only runs start / stop if the predicate passes.
* - `start` will be run when the CustomSource is about to start
* if `passes` returns true at that time
* - `stop` will be run when the CustomSource is about to stop
* if your `start` code ran the last time CustomSource started
*/
def when(passes: () => Boolean): Config = {
var started = false
new Config(
() => {
if (passes()) {
started = true
onStart()
}
},
onStop = () => {
if (started) {
onStop()
}
started = false
}
)
}
}

object Config {

def apply(onStart: => Unit, onStop: => Unit): Config = {
new Config(() => onStart, () => onStop)
}
}

type StartIndex = Int

type FireValue[A] = A => Unit

type FireError = Throwable => Unit

type SetCurrentValue[A] = Try[A] => Unit

type GetCurrentValue[A] = () => Try[A]

type GetStartIndex = () => StartIndex

type GetIsStarted = () => Boolean
}
25 changes: 25 additions & 0 deletions src/main/scala/com/raquo/airstream/custom/CustomStreamSource.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package com.raquo.airstream.custom

import com.raquo.airstream.custom.CustomSource._
import com.raquo.airstream.eventstream.EventStream

/** Use this to easily create a custom signal from an external source
*
* See docs on custom sources, and [[CustomSource.Config]]
*/
class CustomStreamSource[A] private (
makeConfig: (FireValue[A], FireError, GetStartIndex, GetIsStarted) => CustomSource.Config,
) extends EventStream[A] with CustomSource[A] {

override protected[this] val config: Config = makeConfig(_fireValue, _fireError, getStartIndex, getIsStarted)

}

object CustomStreamSource {

def apply[A](
config: (FireValue[A], FireError, GetStartIndex, GetIsStarted) => Config
): EventStream[A] = {
new CustomStreamSource[A](config)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ class EventBusStream[A] private[eventbus] () extends EventStream[A] with Interna
/** Made more public to allow usage from WriteBus */
override protected[eventbus] def isStarted: Boolean = super.isStarted

// @TODO document why. Basically event bus breaks the "static DAG" requirement for topo ranking
override protected[airstream] val topoRank: Int = 1

@inline private[eventbus] def addSource(sourceStream: EventStream[A]): Unit = {
Expand Down
Loading