Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 3 additions & 6 deletions core/src/main/scala/fs2/async/immutable/Signal.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,11 @@ package fs2.async.immutable

import fs2.{pipe, Async, Stream}
import fs2.util.Functor

import fs2.Async
import fs2.async.immutable


/** A holder of a single value of type `A` that can be read in the effect `F`. */
trait Signal[F[_],A] {
trait Signal[F[_], A] { self =>

/**
* Returns the discrete version stream of this signal, updated only when `value`
Expand Down Expand Up @@ -45,14 +43,13 @@ trait Signal[F[_],A] {
def get: F[A]
}


object Signal {

implicit class ImmutableSignalSyntax[F[_] : Async,A] (val self: Signal[F,A]) {
implicit class ImmutableSignalSyntax[F[_] : Async, A] (val self: Signal[F, A]) {
/**
* Converts this signal to signal of `B` by applying `f`
*/
def map[B](f: A => B):Signal[F,B] = new Signal[F,B] {
def map[B](f: A => B): Signal[F,B] = new Signal[F, B] {
def continuous: Stream[F, B] = self.continuous.map(f)
def changes: Stream[F, Unit] = self.discrete.through(pipe.changes(_ == _)).map(_ => ())
def discrete: Stream[F, B] = self.discrete.map(f)
Expand Down
31 changes: 24 additions & 7 deletions core/src/main/scala/fs2/async/mutable/Queue.scala
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package fs2.async.mutable

import fs2._

import fs2.util.Functor
import fs2.async.immutable

/**
Expand All @@ -10,7 +10,7 @@ import fs2.async.immutable
* a queue may have a bound on its size, in which case enqueuing may
* block until there is an offsetting dequeue.
*/
trait Queue[F[_],A] {
trait Queue[F[_], A] { self =>

/**
* Enqueues one element in this `Queue`.
Expand Down Expand Up @@ -40,17 +40,17 @@ trait Queue[F[_],A] {
def dequeue1: F[A]

/** Like `dequeue1` but provides a way to cancel the dequeue. */
def cancellableDequeue1: F[(F[A],F[Unit])]
def cancellableDequeue1: F[(F[A], F[Unit])]

/** Repeatedly call `dequeue1` forever. */
def dequeue: Stream[F,A] = Stream.bracket(cancellableDequeue1)(d => Stream.eval(d._1), d => d._2).repeat
def dequeue: Stream[F, A] = Stream.bracket(cancellableDequeue1)(d => Stream.eval(d._1), d => d._2).repeat

/**
* The time-varying size of this `Queue`. This signal refreshes
* only when size changes. Offsetting enqueues and de-queues may
* not result in refreshes.
*/
def size: immutable.Signal[F,Int]
def size: immutable.Signal[F, Int]

/** The size bound on the queue. `None` if the queue is unbounded. */
def upperBound: Option[Int]
Expand All @@ -59,13 +59,30 @@ trait Queue[F[_],A] {
* Returns the available number of entries in the queue.
* Always `Int.MaxValue` when the queue is unbounded.
*/
def available: immutable.Signal[F,Int]
def available: immutable.Signal[F, Int]

/**
* Returns `true` when the queue has reached its upper size bound.
* Always `false` when the queue is unbounded.
*/
def full: immutable.Signal[F,Boolean]
def full: immutable.Signal[F, Boolean]

/**
* Returns an alternate view of this `Queue` where its elements are of type [[B]],
* given back and forth function from `A` to `B`.
*/
def imap[B](f: A => B)(g: B => A)(implicit F: Functor[F]): Queue[F, B] =
new Queue[F, B] {
def available: immutable.Signal[F, Int] = self.available
def full: immutable.Signal[F, Boolean] = self.full
def size: immutable.Signal[F, Int] = self.size
def upperBound: Option[Int] = self.upperBound
def enqueue1(a: B): F[Unit] = self.enqueue1(g(a))
def offer1(a: B): F[Boolean] = self.offer1(g(a))
def dequeue1: F[B] = F.map(self.dequeue1)(f)
def cancellableDequeue1: F[(F[B],F[Unit])] =
F.map(self.cancellableDequeue1)(bu => F.map(bu._1)(f) -> bu._2)
}
}

object Queue {
Expand Down
22 changes: 19 additions & 3 deletions core/src/main/scala/fs2/async/mutable/Signal.scala
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,13 @@ import fs2.Async.Change
import fs2._
import fs2.Stream._
import fs2.async.immutable
import fs2.util.Monad
import fs2.util.{Monad, Functor}

/**
* A signal whose value may be set asynchronously. Provides continuous
* and discrete streams for responding to changes to it's value.
*/
trait Signal[F[_],A] extends immutable.Signal[F,A] {
trait Signal[F[_], A] extends immutable.Signal[F, A] { self =>

/** Sets the value of this `Signal`. */
def set(a: A): F[Unit]
Expand All @@ -24,13 +24,29 @@ trait Signal[F[_],A] extends immutable.Signal[F,A] {
*
* `F` returns the result of applying `op` to current value.
*/
def modify(f: A => A): F[Change[A]]
def modify(f: A => A): F[Change[A]]

/**
* Asynchronously refreshes the value of the signal,
* keep the value of this `Signal` the same, but notify any listeners.
*/
def refresh: F[Unit]

/**
* Returns an alternate view of this `Signal` where its elements are of type [[B]],
* given a function from `A` to `B`.
*/
def imap[B](f: A => B)(g: B => A)(implicit F: Functor[F]): Signal[F, B] =
new Signal[F, B] {
def discrete: Stream[F, B] = self.discrete.map(f)
def continuous: Stream[F, B] = self.continuous.map(f)
def changes: Stream[F, Unit] = self.changes
def get: F[B] = F.map(self.get)(f)
def set(b: B): F[Unit] = self.set(g(b))
def refresh: F[Unit] = self.refresh
def modify(bb: B => B): F[Change[B]] =
F.map(self.modify(a => g(bb(f(a))))) { case Change(prev, now) => Change(f(prev), f(now)) }
}
}

object Signal {
Expand Down
26 changes: 20 additions & 6 deletions core/src/main/scala/fs2/async/mutable/Topic.scala
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,14 @@ import fs2.Stream._
* Additionally the subscriber has possibility to terminate whenever size of enqueued elements is over certain size
* by using `subscribeSize`.
*/
trait Topic[F[_],A] {
trait Topic[F[_], A] { self =>

/**
* Published any elements from source of `A` to this topic.
* If any of the subscribers reach its `maxQueued` limit, then this will hold to publish next element
* before that subscriber consumes it's elements or terminates.
*/
def publish:Sink[F,A]
def publish: Sink[F, A]

/**
* Publish one `A` to topic.
Expand All @@ -31,7 +31,7 @@ trait Topic[F[_],A] {
* some of its elements to get room for this new. published `A`
*
*/
def publish1(a:A):F[Unit]
def publish1(a: A): F[Unit]

/**
* Subscribes to receive any published `A` to this topic.
Expand All @@ -42,7 +42,7 @@ trait Topic[F[_],A] {
* then publishers will hold into publishing to the queue.
*
*/
def subscribe(maxQueued:Int):Stream[F,A]
def subscribe(maxQueued: Int): Stream[F, A]

/**
* Subscribes to receive published `A` to this topic.
Expand All @@ -58,12 +58,26 @@ trait Topic[F[_],A] {
* then publishers will hold into publishing to the queue.
*
*/
def subscribeSize(maxQueued:Int):Stream[F,(A, Int)]
def subscribeSize(maxQueued: Int): Stream[F, (A, Int)]

/**
* Signal of current active subscribers
*/
def subscribers:fs2.async.immutable.Signal[F,Int]
def subscribers: fs2.async.immutable.Signal[F, Int]

/**
* Returns an alternate view of this `Topic` where its elements are of type [[B]],
* given back and forth function from `A` to `B`.
*/
def imap[B](f: A => B)(g: B => A): Topic[F, B] =
new Topic[F, B] {
def publish: Sink[F, B] = sfb => self.publish(sfb.map(g))
def publish1(b: B): F[Unit] = self.publish1(g(b))
def subscribe(maxQueued: Int): Stream[F, B] = self.subscribe(maxQueued).map(f)
def subscribers: fs2.async.immutable.Signal[F, Int] = self.subscribers
def subscribeSize(maxQueued: Int): Stream[F, (B, Int)] =
self.subscribeSize(maxQueued).map { case (a, i) => f(a) -> i }
}
}

object Topic {
Expand Down