diff --git a/core/src/main/scala/fs2/async/immutable/Signal.scala b/core/src/main/scala/fs2/async/immutable/Signal.scala index f426dd3f1d..7c7bf50a9c 100644 --- a/core/src/main/scala/fs2/async/immutable/Signal.scala +++ b/core/src/main/scala/fs2/async/immutable/Signal.scala @@ -2,13 +2,11 @@ package fs2.async.immutable import fs2.{pipe, Async, Stream} import fs2.util.Functor - import fs2.Async import fs2.async.immutable - /** A holder of a single value of type `A` that can be read in the effect `F`. */ -trait Signal[F[_],A] { +trait Signal[F[_], A] { self => /** * Returns the discrete version stream of this signal, updated only when `value` @@ -45,14 +43,13 @@ trait Signal[F[_],A] { def get: F[A] } - object Signal { - implicit class ImmutableSignalSyntax[F[_] : Async,A] (val self: Signal[F,A]) { + implicit class ImmutableSignalSyntax[F[_] : Async, A] (val self: Signal[F, A]) { /** * Converts this signal to signal of `B` by applying `f` */ - def map[B](f: A => B):Signal[F,B] = new Signal[F,B] { + def map[B](f: A => B): Signal[F,B] = new Signal[F, B] { def continuous: Stream[F, B] = self.continuous.map(f) def changes: Stream[F, Unit] = self.discrete.through(pipe.changes(_ == _)).map(_ => ()) def discrete: Stream[F, B] = self.discrete.map(f) diff --git a/core/src/main/scala/fs2/async/mutable/Queue.scala b/core/src/main/scala/fs2/async/mutable/Queue.scala index 354ac88c33..fad4f3d634 100644 --- a/core/src/main/scala/fs2/async/mutable/Queue.scala +++ b/core/src/main/scala/fs2/async/mutable/Queue.scala @@ -1,7 +1,7 @@ package fs2.async.mutable import fs2._ - +import fs2.util.Functor import fs2.async.immutable /** @@ -10,7 +10,7 @@ import fs2.async.immutable * a queue may have a bound on its size, in which case enqueuing may * block until there is an offsetting dequeue. */ -trait Queue[F[_],A] { +trait Queue[F[_], A] { self => /** * Enqueues one element in this `Queue`. @@ -40,17 +40,17 @@ trait Queue[F[_],A] { def dequeue1: F[A] /** Like `dequeue1` but provides a way to cancel the dequeue. */ - def cancellableDequeue1: F[(F[A],F[Unit])] + def cancellableDequeue1: F[(F[A], F[Unit])] /** Repeatedly call `dequeue1` forever. */ - def dequeue: Stream[F,A] = Stream.bracket(cancellableDequeue1)(d => Stream.eval(d._1), d => d._2).repeat + def dequeue: Stream[F, A] = Stream.bracket(cancellableDequeue1)(d => Stream.eval(d._1), d => d._2).repeat /** * The time-varying size of this `Queue`. This signal refreshes * only when size changes. Offsetting enqueues and de-queues may * not result in refreshes. */ - def size: immutable.Signal[F,Int] + def size: immutable.Signal[F, Int] /** The size bound on the queue. `None` if the queue is unbounded. */ def upperBound: Option[Int] @@ -59,13 +59,30 @@ trait Queue[F[_],A] { * Returns the available number of entries in the queue. * Always `Int.MaxValue` when the queue is unbounded. */ - def available: immutable.Signal[F,Int] + def available: immutable.Signal[F, Int] /** * Returns `true` when the queue has reached its upper size bound. * Always `false` when the queue is unbounded. */ - def full: immutable.Signal[F,Boolean] + def full: immutable.Signal[F, Boolean] + + /** + * Returns an alternate view of this `Queue` where its elements are of type [[B]], + * given back and forth function from `A` to `B`. + */ + def imap[B](f: A => B)(g: B => A)(implicit F: Functor[F]): Queue[F, B] = + new Queue[F, B] { + def available: immutable.Signal[F, Int] = self.available + def full: immutable.Signal[F, Boolean] = self.full + def size: immutable.Signal[F, Int] = self.size + def upperBound: Option[Int] = self.upperBound + def enqueue1(a: B): F[Unit] = self.enqueue1(g(a)) + def offer1(a: B): F[Boolean] = self.offer1(g(a)) + def dequeue1: F[B] = F.map(self.dequeue1)(f) + def cancellableDequeue1: F[(F[B],F[Unit])] = + F.map(self.cancellableDequeue1)(bu => F.map(bu._1)(f) -> bu._2) + } } object Queue { diff --git a/core/src/main/scala/fs2/async/mutable/Signal.scala b/core/src/main/scala/fs2/async/mutable/Signal.scala index 5099dc960b..8ab0ee4ee1 100644 --- a/core/src/main/scala/fs2/async/mutable/Signal.scala +++ b/core/src/main/scala/fs2/async/mutable/Signal.scala @@ -6,13 +6,13 @@ import fs2.Async.Change import fs2._ import fs2.Stream._ import fs2.async.immutable -import fs2.util.Monad +import fs2.util.{Monad, Functor} /** * A signal whose value may be set asynchronously. Provides continuous * and discrete streams for responding to changes to it's value. */ -trait Signal[F[_],A] extends immutable.Signal[F,A] { +trait Signal[F[_], A] extends immutable.Signal[F, A] { self => /** Sets the value of this `Signal`. */ def set(a: A): F[Unit] @@ -24,13 +24,29 @@ trait Signal[F[_],A] extends immutable.Signal[F,A] { * * `F` returns the result of applying `op` to current value. */ - def modify(f: A => A): F[Change[A]] + def modify(f: A => A): F[Change[A]] /** * Asynchronously refreshes the value of the signal, * keep the value of this `Signal` the same, but notify any listeners. */ def refresh: F[Unit] + + /** + * Returns an alternate view of this `Signal` where its elements are of type [[B]], + * given a function from `A` to `B`. + */ + def imap[B](f: A => B)(g: B => A)(implicit F: Functor[F]): Signal[F, B] = + new Signal[F, B] { + def discrete: Stream[F, B] = self.discrete.map(f) + def continuous: Stream[F, B] = self.continuous.map(f) + def changes: Stream[F, Unit] = self.changes + def get: F[B] = F.map(self.get)(f) + def set(b: B): F[Unit] = self.set(g(b)) + def refresh: F[Unit] = self.refresh + def modify(bb: B => B): F[Change[B]] = + F.map(self.modify(a => g(bb(f(a))))) { case Change(prev, now) => Change(f(prev), f(now)) } + } } object Signal { diff --git a/core/src/main/scala/fs2/async/mutable/Topic.scala b/core/src/main/scala/fs2/async/mutable/Topic.scala index 1bfe8852ac..ca175a6a3f 100644 --- a/core/src/main/scala/fs2/async/mutable/Topic.scala +++ b/core/src/main/scala/fs2/async/mutable/Topic.scala @@ -14,14 +14,14 @@ import fs2.Stream._ * Additionally the subscriber has possibility to terminate whenever size of enqueued elements is over certain size * by using `subscribeSize`. */ -trait Topic[F[_],A] { +trait Topic[F[_], A] { self => /** * Published any elements from source of `A` to this topic. * If any of the subscribers reach its `maxQueued` limit, then this will hold to publish next element * before that subscriber consumes it's elements or terminates. */ - def publish:Sink[F,A] + def publish: Sink[F, A] /** * Publish one `A` to topic. @@ -31,7 +31,7 @@ trait Topic[F[_],A] { * some of its elements to get room for this new. published `A` * */ - def publish1(a:A):F[Unit] + def publish1(a: A): F[Unit] /** * Subscribes to receive any published `A` to this topic. @@ -42,7 +42,7 @@ trait Topic[F[_],A] { * then publishers will hold into publishing to the queue. * */ - def subscribe(maxQueued:Int):Stream[F,A] + def subscribe(maxQueued: Int): Stream[F, A] /** * Subscribes to receive published `A` to this topic. @@ -58,12 +58,26 @@ trait Topic[F[_],A] { * then publishers will hold into publishing to the queue. * */ - def subscribeSize(maxQueued:Int):Stream[F,(A, Int)] + def subscribeSize(maxQueued: Int): Stream[F, (A, Int)] /** * Signal of current active subscribers */ - def subscribers:fs2.async.immutable.Signal[F,Int] + def subscribers: fs2.async.immutable.Signal[F, Int] + + /** + * Returns an alternate view of this `Topic` where its elements are of type [[B]], + * given back and forth function from `A` to `B`. + */ + def imap[B](f: A => B)(g: B => A): Topic[F, B] = + new Topic[F, B] { + def publish: Sink[F, B] = sfb => self.publish(sfb.map(g)) + def publish1(b: B): F[Unit] = self.publish1(g(b)) + def subscribe(maxQueued: Int): Stream[F, B] = self.subscribe(maxQueued).map(f) + def subscribers: fs2.async.immutable.Signal[F, Int] = self.subscribers + def subscribeSize(maxQueued: Int): Stream[F, (B, Int)] = + self.subscribeSize(maxQueued).map { case (a, i) => f(a) -> i } + } } object Topic {