Add HTTP/websocket event sources #49

Open
ajaychandran opened this issue Dec 25, 2020 · 21 comments
Labels
need to find time https://falseknees.com/297.html new feature / operator

Comments

@ajaychandran
Contributor

I am currently using these as HTTP event sources.
Would it make sense to add these to the library?

@ajaychandran ajaychandran changed the title HTTP event sources Add HTTP event sources Dec 25, 2020
@lolgab

lolgab commented Dec 25, 2020

You can wrap Ajax requests this way:

import org.scalajs.dom.ext.Ajax
EventStream.fromFuture(Ajax.get(url))

@ajaychandran
Contributor Author

ajaychandran commented Dec 25, 2020

@lolgab
Yes, but the Future is evaluated eagerly.
In contrast, AjaxEventStream submits the request in onStart, thus delaying effects (network request) until observers are registered.
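The eager/lazy distinction can be sketched in plain Scala without any Airstream types. In the sketch below, `sendRequest` and `LazySource` are hypothetical stand-ins invented for illustration (they are not part of Airstream or scala-js-dom): calling `sendRequest()` directly fires the side effect immediately, whereas wrapping it in a thunk defers it until `start()` is called, which is roughly what submitting the request in onStart achieves.

```scala
import scala.concurrent.Future

object EagerVsLazy {
  // Hypothetical stand-in for Ajax.get: counts how many requests were actually sent.
  var requestsSent = 0

  def sendRequest(): Future[String] = {
    requestsSent += 1
    Future.successful("response")
  }

  // Hypothetical lazy source: stores a thunk and only runs it in start(),
  // analogous to submitting the request in onStart.
  final class LazySource[A](mkFuture: () => Future[A]) {
    def start(): Future[A] = mkFuture()
  }
}
```

Constructing `new LazySource(() => sendRequest())` sends nothing; the request only goes out once `start()` is invoked.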

@lolgab

lolgab commented Dec 25, 2020

Then maybe there should be something to integrate things lazily but on a more general level.
Integrating Ajax directly in Airstream sounds too specific to me.

@ajaychandran ajaychandran changed the title Add HTTP event sources Add HTTP/websocket event sources Dec 25, 2020
@raquo
Owner

raquo commented Dec 26, 2020

I think having a reference implementation of doing basic stuff like Ajax would be useful. There are three potential places for it:

  • In Airstream itself
  • In a third party library that depends on Airstream
  • As a live example on laminar.dev if publishing a library is too much overhead

Stuff that we add to Airstream should be pretty unopinionated. For example, I've been wanting to move DomEventStream from Laminar to Airstream for a while now. Given that the implementation of AjaxEventStream is pretty bare bones, I think it would be nice to include it in Airstream itself.

A few notes though:

  • it should go into a new directory, airstream/web, since it's web-specific
  • You should use the AjaxException class from org.scalajs.dom.ext.Ajax; Airstream already depends on scala-js-dom
  • Inside onreadystatechange you should only do stuff if (isStarted). That way it will behave more predictably if you start then stop then start your stream again before the first response comes back.
  • I'd add the methods for put / patch / delete too for the sake of completeness.
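The isStarted note above can be illustrated with a simplified, plain-Scala sketch. `GuardedSource` and its methods are hypothetical and do not mirror Airstream's actual class hierarchy; `onResponse` stands in for the body of onreadystatechange. The sketch only covers the case where a response arrives while the stream is stopped.

```scala
object StartStopGuard {
  // Hypothetical, simplified stream; not Airstream's real implementation.
  final class GuardedSource {
    private var isStarted = false
    private var emitted = Vector.empty[String]

    def onStart(): Unit = { isStarted = true }
    def onStop(): Unit = { isStarted = false }

    // Stand-in for onreadystatechange: ignore the response unless started.
    def onResponse(body: String): Unit = {
      if (isStarted) emitted = emitted :+ body
    }

    // Everything that was emitted downstream so far.
    def events: Vector[String] = emitted
  }
}
```

A response delivered between `onStop()` and the next `onStart()` is dropped rather than emitted to observers that are no longer listening.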

there should be something to integrate things lazily but on a more general level

Currently one can achieve this with something like:

def fromLazyFuture[A](future: => Future[A], emitOnce: Boolean): EventStream[A] = {
  EventStream
    .fromValue((), emitOnce = emitOnce)
    .flatMap(_ => future)
}

I guess I could add this to EventStream, and a similar one to Signal.

@raquo
Owner

raquo commented Dec 26, 2020

Oh and, I didn't have the time to evaluate the websockets helper yet. I think websockets might require a more opinionated API, but I'll check it out a bit later.

@raquo raquo added the enhancement New feature or request label Dec 26, 2020
@ajaychandran
Contributor Author

ajaychandran commented Dec 26, 2020

Thanks for your suggestions. I will incorporate them and submit a PR.

Regarding websockets,

  • I would like to have a single implementation that is bi-directional (and can be used for the uni-directional case). I will work on this in a separate PR.
  • It would be nice to have a retry/restart-on-error mechanism based on some strategy, similar to ZIO Schedule. Have you considered this before?

@raquo
Owner

raquo commented Dec 26, 2020

Ok wrt websockets! Try to keep it unopinionated so that users can build their custom code based on it, like you did with ajax.

regarding "restart on error" – I'm guessing recoverToTry followed by flatMap can achieve this, but it could be combined into a single operator like recoverTo(err => observable) or recoverTo(observable). The latter one can be made to behave better wrt glitches. Not 100% sure about the other one, that might require a transaction boundary similar to the flatMap solution.

regarding retries – what kind of operators / signatures are you thinking about, specifically? Kinda hard to translate ZIO stuff directly as it has many more concepts than Airstream. We have EventStream.periodic which is flexible enough to be made exponential or fibonacci, and that can probably be combined into a retry pattern again with flatMap. If there is a canonical use case for retries in Airstream, we could include it as a standalone operator.

@ajaychandran
Contributor Author

The use case I had in mind is to restart a websocket stream when the underlying connection stops, for instance, when the user loses connectivity. When connectivity is resumed, it may be desirable to establish a new websocket connection automatically.

One way to do this is to use a control parameter like maxReconnectAttempts in the WebSocketEventStream constructor.
I was wondering if this could be generalized to a stream combinator.

@raquo
Owner

raquo commented Dec 26, 2020

Yes, sorry, I worded it poorly. I understand the use cases for retries, but I was wondering the same thing: whether you had a specific method signature and behaviour in mind that would be general enough to include as an operator for all streams. I'm thinking it could be a certain variation of recoverTo.

@ajaychandran
Contributor Author

How about restartOn(control: EventStream[Any])?
This is flexible enough to accommodate a wide variety of use cases (restart max n times, restart after d duration, restart max n times + after d duration).
Can this be implemented by calling onStop followed by onStart?

@raquo
Owner

raquo commented Dec 27, 2020

But, this would only work for streams that perform a side effect (that we want to re-run) when they start. Most streams don't do that.

Even if you're listening to a stream that depends on one of those streams that perform a side effect on start, and you stop & start that child stream, that is no guarantee that the parent stream that actually performs the side effect will be restarted (that will only happen if the child stream was its only listener).

I think the problem is, streams aren't effects. You can retry an effect like IO or Task or even Future, but retrying a stream doesn't seem to make sense in general. Or, to the extent that it does, the retry logic needs to be contained within the stream that actually performs the side effect. So if there is any reusability to be gained there, it does not seem like it will be in the form of a general purpose operator.
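To make the "you can retry an effect" half of that argument concrete, here is a minimal plain-Scala sketch of retrying a Future. `RetryFuture.retry` is a hypothetical helper written for this illustration, not an Airstream or standard-library API. It works because the effect is passed by name and can simply be re-evaluated, which is exactly what a general stream does not offer.

```scala
import scala.concurrent.{ExecutionContext, Future}

object RetryFuture {
  // Re-evaluates the by-name `effect` until it succeeds or `maxAttempts` runs out.
  // Only failed Futures are retried; an exception thrown synchronously while
  // evaluating `effect` is not caught here.
  def retry[A](maxAttempts: Int)(effect: => Future[A])(implicit ec: ExecutionContext): Future[A] =
    effect.recoverWith {
      case _ if maxAttempts > 1 => retry(maxAttempts - 1)(effect)
    }
}
```

A call such as `RetryFuture.retry(3)(fetchSomething())` (with `fetchSomething` being whatever Future-returning call you have) runs the effect up to three times. A delay between attempts is deliberately left out of this sketch.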

@ajaychandran
Contributor Author

ajaychandran commented Dec 27, 2020

The initial draft for the websocket implementation is available in this branch.

  • Uni-directional and bi-directional use cases are supported with the same class.

Need some input on the following:

  • The type of dom.MessageEvent.data is Any. This would require a cast at call site, event.data.asInstanceOf[String]. Is this fine?
  • When transmitting messages, if it is detected that the underlying socket was closed, should this be reported downstream as an error? If so, then what type of error?
  • What to do with the errors on the transmission channel? Right now they are ignored.

I will test this implementation in my application and explore the restart-on-error use case.

@ajaychandran
Contributor Author

The above issues have been resolved for now.

  • The type of dom.MessageEvent.data is Any. This would require a cast at call site, event.data.asInstanceOf[String]. Is this fine?

Added project parameter and Builder type to handle this.

  • When transmitting messages, if it is detected that the underlying socket was closed, should this be reported downstream as an error? If so, then what type of error?

Redefined error type to support transmission and termination errors.

  • What to do with the errors on the transmission channel?

These are not propagated.

TODO
Test implementation and open PR.

@raquo
Owner

raquo commented Dec 30, 2020

As I'm adding more params to AjaxStreamRequest, I'm getting more and more annoyed by having to duplicate all of them in the six factory methods (apply / get / post / etc.). How about we just have one apply factory that covers all http methods?

// Usage
AjaxEventStream(_.GET, url, ...)

where the first param would be HttpMethod.type => HttpMethod and

@js.native
trait HttpMethod extends js.Any

object HttpMethod {
  val GET = "GET".asInstanceOf[HttpMethod]
  val POST = "POST".asInstanceOf[HttpMethod]
  val PUT = "PUT".asInstanceOf[HttpMethod]
  val PATCH = "PATCH".asInstanceOf[HttpMethod]
  val DELETE = "DELETE".asInstanceOf[HttpMethod]
}
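How the `HttpMethod.type => HttpMethod` parameter makes `_.GET` work at the call site can be shown with a plain-Scala sketch. To stay runnable outside Scala.js, it swaps the js.native trait for a value class; `describeRequest` is a hypothetical factory that returns a String instead of a stream.

```scala
object HttpMethodSketch {
  // Plain-Scala replacement for the js.native trait, for illustration only.
  final class HttpMethod private (val name: String) extends AnyVal

  object HttpMethod {
    val GET = new HttpMethod("GET")
    val POST = new HttpMethod("POST")
  }

  // Hypothetical factory: the first param selects a method from the companion
  // object, so callers can write `_.GET` without importing anything.
  def describeRequest(method: HttpMethod.type => HttpMethod, url: String): String =
    s"${method(HttpMethod).name} $url"
}
```

With this shape, `HttpMethodSketch.describeRequest(_.GET, "/items")` type-checks because the expected parameter type fixes `_` to the `HttpMethod` companion object.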

@ajaychandran
Contributor Author

How about something similar to this pattern?

object AjaxEventStream {

  def apply(
    url: String,
    data: dom.ext.Ajax.InputData = null,
    timeout: Int = 0,
    headers: Map[String, String] = Map.empty,
    withCredentials: Boolean = false,
    responseType: String = "",
    progressObserver: Observer[(dom.XMLHttpRequest, dom.ProgressEvent)] = Observer.empty,
    readyStateChangeObserver: Observer[dom.XMLHttpRequest] = Observer.empty
  ) = new Builder(url, data, timeout, headers, withCredentials, responseType, progressObserver, readyStateChangeObserver)

  final class Builder(
    url: String,
    data: dom.ext.Ajax.InputData,
    timeout: Int,
    headers: Map[String, String],
    withCredentials: Boolean,
    responseType: String,
    progressObserver: Observer[(dom.XMLHttpRequest, dom.ProgressEvent)],
    readyStateChangeObserver: Observer[dom.XMLHttpRequest]
  ) {

    def get: EventStream[dom.XMLHttpRequest] = ???
    def post: EventStream[dom.XMLHttpRequest] = ???
    // and so on...
  }
}

@raquo
Owner

raquo commented Dec 30, 2020

What would be in those ??? then? I assume it's new AjaxEventStream(...) with all the same params again for each method. So it only saves ~50% of the boilerplate.

@raquo
Owner

raquo commented Dec 30, 2020

I guess AjaxEventStream could accept builder as a single param, but I don't like that kind of coupling. Having a lambda for one param in one factory, that doesn't leak into AjaxEventStream itself, would be simpler imo.

(This is all concerning Ajax; I haven't had a chance to review the Websockets stuff yet. I should probably look at that first so that the API ends up more or less consistent, if that makes sense.)

@ajaychandran
Contributor Author

What would be in those ??? then? I assume it's new AjaxEventStream(...) with all the same params again for each method.

Correct. I prefer the usage pattern it promotes.

AjaxEventStream(url).get
// versus
AjaxEventStream(_.GET, url, ...)

So it only saves ~50% of the boilerplate.

An alternate encoding could be

object AjaxEventStream {
  sealed abstract class method(name: String) {
    final def apply(
      url: String,
      data: dom.ext.Ajax.InputData = null,
      timeout: Int = 0,
      headers: Map[String, String] = Map.empty,
      withCredentials: Boolean = false,
      responseType: String = "",
      progressObserver: Observer[(dom.XMLHttpRequest, dom.ProgressEvent)] = Observer.empty,
      readyStateChangeObserver: Observer[dom.XMLHttpRequest] = Observer.empty): EventStream[dom.XMLHttpRequest] =
    new AjaxEventStream(name, url, data, timeout, headers, withCredentials, responseType, progressObserver, readyStateChangeObserver)
  }

  final object get extends method("GET")
  final object post extends method("POST")
  // and so on
}

// usage
AjaxEventStream.get(???)
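The same encoding can be tried out in plain Scala with a reduced parameter list. In this sketch, `Request` is a hypothetical case class standing in for the stream that would be constructed, and only three of the parameters are kept; the point is that the defaults are declared once on `apply` and every method object inherits them.

```scala
object RequestSketch {
  // Hypothetical stand-in for the constructed AjaxEventStream.
  final case class Request(
    method: String,
    url: String,
    timeout: Int,
    headers: Map[String, String]
  )

  // Parameters and their defaults are declared once, here.
  sealed abstract class Method(name: String) {
    def apply(
      url: String,
      timeout: Int = 0,
      headers: Map[String, String] = Map.empty
    ): Request = Request(name, url, timeout, headers)
  }

  // One object per HTTP method; each only supplies its name.
  object get extends Method("GET")
  object post extends Method("POST")
}
```

Call sites then read `RequestSketch.get("/items")` or `RequestSketch.post("/items", timeout = 500)`, with named and default arguments available on every method.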

@raquo
Owner

raquo commented Dec 30, 2020

Hm that last one is pretty neat. If it works ok with editor autocomplete (I'll check intellij), that's a winner.

Side note, final would be redundant in this case even on Scala 2.12 I think.

@ajaychandran
Contributor Author

Side note, final would be redundant in this case even on Scala 2.12 I think.

Thanks. I had extended this best-practice to non-case objects and followed it blindly. Time to self correct!

@raquo raquo added new feature / operator need to find time https://falseknees.com/297.html and removed enhancement New feature or request labels May 5, 2021
4 participants
@raquo @ajaychandran @lolgab and others