Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Increase purity by introducing F[_] to Sink, Source and Cancelable #126

Draft
wants to merge 4 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion colibri/src/main/scala/colibri/CanCancel.scala
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
package colibri

import cats.effect.Sync

trait CanCancel[-T] {
def cancel(cancelable: T): Unit
def cancel[F[_] : Sync](cancelable: T): F[Unit]
}
object CanCancel {
@inline def apply[T](implicit cancel: CanCancel[T]): CanCancel[T] = cancel
Expand Down
114 changes: 57 additions & 57 deletions colibri/src/main/scala/colibri/Cancelable.scala
Original file line number Diff line number Diff line change
@@ -1,12 +1,15 @@
package colibri

import cats.Monoid
import cats.effect.{Sync, SyncEffect}
import cats.implicits._
import cats.syntax.all._
import colibri.effect.RunSyncEffect

import scala.scalajs.js

trait Cancelable {
def cancel(): Unit
def cancel[F[_] : Sync](): F[Unit]
}
object Cancelable {

Expand All @@ -16,51 +19,48 @@ object Cancelable {
}

implicit object cancelCancelable extends CanCancel[Cancelable] {
@inline def cancel(subscription: Cancelable): Unit = subscription.cancel()
@inline def cancel[F[_] : Sync](subscription: Cancelable): F[Unit] = subscription.cancel()
}

class Builder extends Cancelable {
private var buffer = new js.Array[Cancelable]()

def +=(subscription: Cancelable): Unit =
def +=[F[_] : Sync : RunSyncEffect](subscription: Cancelable): Unit =
if (buffer == null) {
subscription.cancel()
RunSyncEffect[F].unsafeRun(subscription.cancel())
} else {
buffer.push(subscription)
()
}

def cancel(): Unit =
if (buffer != null) {
buffer.foreach(_.cancel())
buffer = null
}
def cancel[F[_] : Sync](): F[Unit] = (
buffer.toList.traverse_(_.cancel()) *>
{ buffer = null }.pure[F]
).whenA(buffer != null)
}

class Variable extends Cancelable {
private var current: Cancelable = Cancelable.empty

def update(subscription: Cancelable): Unit =
def update[F[_] : Sync : RunSyncEffect](subscription: Cancelable): Unit =
if (current == null) {
subscription.cancel()
RunSyncEffect[F].unsafeRun(subscription.cancel())
} else {
current.cancel()
RunSyncEffect[F].unsafeRun(current.cancel())
current = subscription
}

def cancel(): Unit =
if (current != null) {
current.cancel()
current = null
}
def cancel[F[_] : Sync](): F[Unit] =
current.cancel() *>
{ current = null }.pure[F]
}

class Consecutive extends Cancelable {
private var latest: Cancelable = null
private var subscriptions: js.Array[() => Cancelable] = new js.Array[() => Cancelable]

def switch(): Unit = if (latest != null) {
latest.cancel()
def switch[F[_] : Sync : RunSyncEffect](): Unit = if (latest != null) {
RunSyncEffect[F].unsafeRun(latest.cancel())
latest = null
if (subscriptions != null && subscriptions.nonEmpty) {
val nextCancelable = subscriptions(0)
Expand All @@ -72,7 +72,7 @@ object Cancelable {
}
}

def +=(subscription: () => Cancelable): Unit = if (subscriptions != null) {
def +=[F[_] : Sync : RunSyncEffect](subscription: () => Cancelable): Unit = if (subscriptions != null) {
if (latest == null) {
val variable = Cancelable.variable()
latest = variable
Expand All @@ -83,86 +83,86 @@ object Cancelable {
}
}

def cancel(): Unit = if (subscriptions != null) {
subscriptions = null
if (latest != null) {
latest.cancel()
latest = null
}
}
def cancel[F[_] : Sync](): F[Unit] = (
{ subscriptions = null }.pure[F] *>
(
latest.cancel() *>
{ latest = null }.pure[F]
).whenA(latest != null)
).whenA(subscriptions != null)
}

class SingleOrDrop extends Cancelable {
private var latest: Cancelable = null
private var isCancel = false

def done(): Unit = if (latest != null) {
latest.cancel()
def done[F[_] : Sync : RunSyncEffect](): Unit = if (latest != null) {
RunSyncEffect[F].unsafeRun(latest.cancel())
latest = null
}

def update(subscription: () => Cancelable): Unit = if (latest == null) {
def update[F[_] : Sync : RunSyncEffect](subscription: () => Cancelable): Unit = if (latest == null) {
val variable = Cancelable.variable()
latest = variable
variable() = subscription()
}

def cancel(): Unit = if (!isCancel) {
isCancel = true
if (latest != null) {
latest.cancel()
latest = null
}
}
def cancel[F[_] : Sync](): F[Unit] = (
{ isCancel = true }.pure[F] *>
(
latest.cancel() *>
{ latest = null }.pure[F]
).whenA(latest != null)
).whenA(!isCancel)
}

class RefCount(subscription: () => Cancelable) extends Cancelable {
private var counter = 0
private var currentCancelable: Cancelable = null

def ref(): Cancelable = if (counter == -1) Cancelable.empty else {
def ref[F[_] : Sync : SyncEffect : RunSyncEffect](): Cancelable = if (counter == -1) Cancelable.empty else {
counter += 1
if (counter == 1) {
currentCancelable = subscription()
}

Cancelable({ () =>
Cancelable({ () => Sync[F].delay {
counter -= 1
if (counter == 0) {
currentCancelable.cancel()
RunSyncEffect[F].unsafeRun(currentCancelable.cancel())
currentCancelable = null
}
})
}})
}

def cancel(): Unit = {
counter = -1
if (currentCancelable != null) {
currentCancelable.cancel()
currentCancelable = null
}
}
def cancel[F[_] : Sync](): F[Unit] =
{ counter = -1 }.pure[F] *>
(
currentCancelable.cancel() *>
{ currentCancelable = null }.pure[F]
).whenA(currentCancelable != null)
}

object Empty extends Cancelable {
@inline def cancel(): Unit = ()
def cancel[F[_] : Sync](): F[Unit] = Sync[F].delay(())
}

@inline def empty = Empty
@inline def empty: Cancelable = Empty

@inline def apply(f: () => Unit): Cancelable = new Cancelable {
@inline def apply[F[_] : SyncEffect](f: () => F[Unit]): Cancelable = new Cancelable {
private var isCanceled = false
@inline def cancel() = if (!isCanceled) {
isCanceled = true
f()
}
@inline def cancel[FF[_] : Sync](): FF[Unit] = (
{ isCanceled = true }.pure[FF] *>
SyncEffect[F].runSync[FF, Unit](f())
).whenA(!isCanceled)
}

@inline def lift[T : CanCancel](subscription: T) = apply(() => CanCancel[T].cancel(subscription))
@inline def lift[F[_] : Sync : SyncEffect : RunSyncEffect, T : CanCancel](subscription: T) = apply(() => CanCancel[T].cancel(subscription))

@inline def composite(subscriptions: Cancelable*): Cancelable = compositeFromIterable(subscriptions)
@inline def compositeFromIterable(subscriptions: Iterable[Cancelable]): Cancelable = new Cancelable {
def cancel() = subscriptions.foreach(_.cancel())
def cancel[F[_] : Sync](): F[Unit] =
subscriptions.toList.traverse_(cancelable => cancelable.cancel())
}

@inline def builder(): Builder = new Builder
Expand Down
83 changes: 39 additions & 44 deletions colibri/src/main/scala/colibri/Observable.scala
Original file line number Diff line number Diff line change
@@ -1,25 +1,26 @@
package colibri

import cats.effect.concurrent.Ref
import colibri.effect._

import cats.{ MonoidK, Applicative, FunctorFilter, Eq, Semigroupal }
import cats.effect.{ Effect, IO }
import cats.syntax.all._
import cats.implicits._
import cats.{Applicative, Eq, FunctorFilter, MonoidK, Semigroupal}
import cats.effect.{Effect, IO, LiftIO, Sync, SyncEffect}

import scala.scalajs.js
import org.scalajs.dom

import scala.util.control.NonFatal
import scala.concurrent.{ ExecutionContext, Future }
import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.duration.FiniteDuration

trait Observable[+A] {
//TODO: def subscribe[G[_]: Sink, F[_] : Sync](sink: G[_ >: A]): F[Cancelable]
def subscribe[G[_] : Sink](sink: G[_ >: A]): Cancelable
def subscribe[F[_] : Sync, G[_] : Sink](sink: G[_ >: A]): F[Cancelable]
}
object Observable {

implicit object source extends Source[Observable] {
@inline def subscribe[G[_]: Sink, A](source: Observable[A])(sink: G[_ >: A]): Cancelable = source.subscribe(sink)
@inline def subscribe[F[_] : Sync, G[_] : Sink, A](source: Observable[A])(sink: G[_ >: A]): F[Cancelable] = source.subscribe(sink)
}

implicit object liftSource extends LiftSource[Observable] {
Expand Down Expand Up @@ -74,79 +75,72 @@ object Observable {
type HotMaybeValue[+A] = MaybeValue[A] with Cancelable

final class Synchronous[+A] private[colibri](source: Observable[A]) extends Observable[A] {
def subscribe[G[_]: Sink](sink: G[_ >: A]): Cancelable = source.subscribe(sink)
def subscribe[F[_] : Sync, G[_]: Sink](sink: G[_ >: A]): F[Cancelable] = source.subscribe(sink)
}

object Empty extends Observable[Nothing] {
@inline def subscribe[G[_]: Sink](sink: G[_ >: Nothing]): Cancelable = Cancelable.empty
@inline def subscribe[F[_] : Sync, G[_]: Sink](sink: G[_ >: Nothing]): F[Cancelable] = Sync[F].delay(Cancelable.empty)
}

@inline def empty = Empty

def apply[T](value: T): Observable[T] = new Observable[T] {
def subscribe[G[_]: Sink](sink: G[_ >: T]): Cancelable = {
Sink[G].onNext(sink)(value)
Cancelable.empty
def subscribe[F[_] : Sync, G[_]: Sink](sink: G[_ >: T]): F[Cancelable] = {
Sink[G].onNext(sink)(value) *>
Sync[F].delay(Cancelable.empty)
}
}

def fromIterable[T](values: Iterable[T]): Observable[T] = new Observable[T] {
def subscribe[G[_]: Sink](sink: G[_ >: T]): Cancelable = {
values.foreach(Sink[G].onNext(sink))
Cancelable.empty
def subscribe[F[_] : Sync, G[_]: Sink](sink: G[_ >: T]): F[Cancelable] = {
values.toList.traverse_(Sink[G].onNext(sink)) *>
Sync[F].delay(Cancelable.empty)
}
}

def lift[H[_] : Source, A](source: H[A]): Observable[A] = source match {
case source: Observable[A@unchecked] => source
case _ => new Observable[A] {
def subscribe[G[_]: Sink](sink: G[_ >: A]): Cancelable = Source[H].subscribe(source)(sink)
def subscribe[F[_] : Sync, G[_]: Sink](sink: G[_ >: A]): F[Cancelable] = Source[H].subscribe(source)(sink)
}
}

@inline def create[A](produce: Observer[A] => Cancelable): Observable[A] = createLift[Observer, A](produce)

def createLift[F[_]: LiftSink, A](produce: F[_ >: A] => Cancelable): Observable[A] = new Observable[A] {
def subscribe[G[_]: Sink](sink: G[_ >: A]): Cancelable = produce(LiftSink[F].lift(sink))
def createLift[G[_]: LiftSink, A](produce: G[_ >: A] => Cancelable): Observable[A] = new Observable[A] {
def subscribe[F[_] : Sync, GG[_]: Sink](sink: GG[_ >: A]): F[Cancelable] = produce(LiftSink[G].lift(sink)).pure[F]
}

def fromEither[A](value: Either[Throwable, A]): Observable[A] = new Observable[A] {
def subscribe[G[_]: Sink](sink: G[_ >: A]): Cancelable = {
value match {
def subscribe[F[_] : Sync, G[_]: Sink](sink: G[_ >: A]): F[Cancelable] = (value match {
case Right(a) => Sink[G].onNext(sink)(a)
case Left(error) => Sink[G].onError(sink)(error)
}
Cancelable.empty
}
}) *> Sync[F].delay(Cancelable.empty)
}

def fromSync[F[_]: RunSyncEffect, A](effect: F[A]): Observable[A] = new Observable[A] {
def subscribe[G[_]: Sink](sink: G[_ >: A]): Cancelable = {
recovered(Sink[G].onNext(sink)(RunSyncEffect[F].unsafeRun(effect)), Sink[G].onError(sink)(_))
Cancelable.empty
}
def fromSync[F[_] : SyncEffect, A](effect: F[A]): Observable[A] = new Observable[A] {
def subscribe[FF[_] : Sync, G[_] : Sink](sink: G[_ >: A]): FF[Cancelable] = recoveredF(
SyncEffect[F].runSync[FF, A](effect) >>= Sink[G].onNext[FF, A](sink),
Sink[G].onError[FF, A](sink)(_)
) *> Cancelable.empty.pure[FF]
}

def fromAsync[F[_]: Effect, A](effect: F[A]): Observable[A] = new Observable[A] {
def subscribe[G[_]: Sink](sink: G[_ >: A]): Cancelable = {
//TODO: proper cancel effects?
var isCancel = false

Effect[F].runAsync(effect)(either => IO {
if (!isCancel) either match {
case Right(value) => Sink[G].onNext(sink)(value)
case Left(error) => Sink[G].onError(sink)(error)
}
}).unsafeRunSync()

Cancelable(() => isCancel = true)
}
def fromAsync[F[_] : Effect, FF[_] : SyncEffect : LiftIO, A](effect: F[A]): Observable[A] = new Observable[A] {
def subscribe[FFF[_] : Sync, G[_] : Sink](sink: G[_ >: A]): FFF[Cancelable] = for {
isCancelled <- Ref[FFF].of(false)
converted = Effect[F].runAsync(effect)(either => (either match {
case Right(value) => Sink[G].onNext(sink)(value)
case Left(error) => Sink[G].onError(sink)(error)
})).to[FF]
_ <- SyncEffect[FF].runSync[FFF, Unit](converted)
cancelable = Cancelable.apply(() => isCancelled.set(true))
} yield ???
}

def fromFuture[A](future: Future[A])(implicit ec: ExecutionContext): Observable[A] = fromAsync(IO.fromFuture(IO.pure(future))(IO.contextShift(ec)))

def ofEvent[EV <: dom.Event](target: dom.EventTarget, eventType: String): Synchronous[EV] = new Synchronous(new Observable[EV] {
def subscribe[G[_] : Sink](sink: G[_ >: EV]): Cancelable = {
def subscribe[F[_] : Sync, G[_] : Sink](sink: G[_ >: EV]): F[Cancelable] = {
var isCancel = false

val eventHandler: js.Function1[EV, Unit] = { v =>
Expand All @@ -163,7 +157,7 @@ object Observable {

register()

Cancelable(() => unregister())
Cancelable(() => Sync[F].delay(unregister())).pure[F]
}
})

Expand Down Expand Up @@ -1063,4 +1057,5 @@ object Observable {
}

private def recovered[T](action: => Unit, onError: Throwable => Unit) = try action catch { case NonFatal(t) => onError(t) }
private def recoveredF[F[_] : Sync](action: => F[Unit], onError: Throwable => F[Unit]): F[Unit] = try action catch { case NonFatal(t) => onError(t) }
}
Loading