Skip to content

Commit 226c3c2

Browse files
authored
Merge pull request #665 from OlivierBlanvillain/imap
Add Queue.imap, Topic.imap and Signal.map
2 parents bdd2d11 + 2ec3ade commit 226c3c2

File tree

4 files changed

+66
-22
lines changed

4 files changed

+66
-22
lines changed

core/src/main/scala/fs2/async/immutable/Signal.scala

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2,13 +2,11 @@ package fs2.async.immutable
22

33
import fs2.{pipe, Async, Stream}
44
import fs2.util.Functor
5-
65
import fs2.Async
76
import fs2.async.immutable
87

9-
108
/** A holder of a single value of type `A` that can be read in the effect `F`. */
11-
trait Signal[F[_],A] {
9+
trait Signal[F[_], A] { self =>
1210

1311
/**
1412
* Returns the discrete version stream of this signal, updated only when `value`
@@ -45,14 +43,13 @@ trait Signal[F[_],A] {
4543
def get: F[A]
4644
}
4745

48-
4946
object Signal {
5047

51-
implicit class ImmutableSignalSyntax[F[_] : Async,A] (val self: Signal[F,A]) {
48+
implicit class ImmutableSignalSyntax[F[_] : Async, A] (val self: Signal[F, A]) {
5249
/**
5350
* Converts this signal to signal of `B` by applying `f`
5451
*/
55-
def map[B](f: A => B):Signal[F,B] = new Signal[F,B] {
52+
def map[B](f: A => B): Signal[F,B] = new Signal[F, B] {
5653
def continuous: Stream[F, B] = self.continuous.map(f)
5754
def changes: Stream[F, Unit] = self.discrete.through(pipe.changes(_ == _)).map(_ => ())
5855
def discrete: Stream[F, B] = self.discrete.map(f)

core/src/main/scala/fs2/async/mutable/Queue.scala

Lines changed: 24 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
package fs2.async.mutable
22

33
import fs2._
4-
4+
import fs2.util.Functor
55
import fs2.async.immutable
66

77
/**
@@ -10,7 +10,7 @@ import fs2.async.immutable
1010
* a queue may have a bound on its size, in which case enqueuing may
1111
* block until there is an offsetting dequeue.
1212
*/
13-
trait Queue[F[_],A] {
13+
trait Queue[F[_], A] { self =>
1414

1515
/**
1616
* Enqueues one element in this `Queue`.
@@ -40,17 +40,17 @@ trait Queue[F[_],A] {
4040
def dequeue1: F[A]
4141

4242
/** Like `dequeue1` but provides a way to cancel the dequeue. */
43-
def cancellableDequeue1: F[(F[A],F[Unit])]
43+
def cancellableDequeue1: F[(F[A], F[Unit])]
4444

4545
/** Repeatedly call `dequeue1` forever. */
46-
def dequeue: Stream[F,A] = Stream.bracket(cancellableDequeue1)(d => Stream.eval(d._1), d => d._2).repeat
46+
def dequeue: Stream[F, A] = Stream.bracket(cancellableDequeue1)(d => Stream.eval(d._1), d => d._2).repeat
4747

4848
/**
4949
* The time-varying size of this `Queue`. This signal refreshes
5050
* only when size changes. Offsetting enqueues and de-queues may
5151
* not result in refreshes.
5252
*/
53-
def size: immutable.Signal[F,Int]
53+
def size: immutable.Signal[F, Int]
5454

5555
/** The size bound on the queue. `None` if the queue is unbounded. */
5656
def upperBound: Option[Int]
@@ -59,13 +59,30 @@ trait Queue[F[_],A] {
5959
* Returns the available number of entries in the queue.
6060
* Always `Int.MaxValue` when the queue is unbounded.
6161
*/
62-
def available: immutable.Signal[F,Int]
62+
def available: immutable.Signal[F, Int]
6363

6464
/**
6565
* Returns `true` when the queue has reached its upper size bound.
6666
* Always `false` when the queue is unbounded.
6767
*/
68-
def full: immutable.Signal[F,Boolean]
68+
def full: immutable.Signal[F, Boolean]
69+
70+
/**
71+
* Returns an alternate view of this `Queue` where its elements are of type [[B]],
72+
* given back and forth function from `A` to `B`.
73+
*/
74+
def imap[B](f: A => B)(g: B => A)(implicit F: Functor[F]): Queue[F, B] =
75+
new Queue[F, B] {
76+
def available: immutable.Signal[F, Int] = self.available
77+
def full: immutable.Signal[F, Boolean] = self.full
78+
def size: immutable.Signal[F, Int] = self.size
79+
def upperBound: Option[Int] = self.upperBound
80+
def enqueue1(a: B): F[Unit] = self.enqueue1(g(a))
81+
def offer1(a: B): F[Boolean] = self.offer1(g(a))
82+
def dequeue1: F[B] = F.map(self.dequeue1)(f)
83+
def cancellableDequeue1: F[(F[B],F[Unit])] =
84+
F.map(self.cancellableDequeue1)(bu => F.map(bu._1)(f) -> bu._2)
85+
}
6986
}
7087

7188
object Queue {

core/src/main/scala/fs2/async/mutable/Signal.scala

Lines changed: 19 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,13 +6,13 @@ import fs2.Async.Change
66
import fs2._
77
import fs2.Stream._
88
import fs2.async.immutable
9-
import fs2.util.Monad
9+
import fs2.util.{Monad, Functor}
1010

1111
/**
1212
* A signal whose value may be set asynchronously. Provides continuous
1313
* and discrete streams for responding to changes to it's value.
1414
*/
15-
trait Signal[F[_],A] extends immutable.Signal[F,A] {
15+
trait Signal[F[_], A] extends immutable.Signal[F, A] { self =>
1616

1717
/** Sets the value of this `Signal`. */
1818
def set(a: A): F[Unit]
@@ -24,13 +24,29 @@ trait Signal[F[_],A] extends immutable.Signal[F,A] {
2424
*
2525
* `F` returns the result of applying `op` to current value.
2626
*/
27-
def modify(f: A => A): F[Change[A]]
27+
def modify(f: A => A): F[Change[A]]
2828

2929
/**
3030
* Asynchronously refreshes the value of the signal,
3131
* keep the value of this `Signal` the same, but notify any listeners.
3232
*/
3333
def refresh: F[Unit]
34+
35+
/**
36+
* Returns an alternate view of this `Signal` where its elements are of type [[B]],
37+
* given a function from `A` to `B`.
38+
*/
39+
def imap[B](f: A => B)(g: B => A)(implicit F: Functor[F]): Signal[F, B] =
40+
new Signal[F, B] {
41+
def discrete: Stream[F, B] = self.discrete.map(f)
42+
def continuous: Stream[F, B] = self.continuous.map(f)
43+
def changes: Stream[F, Unit] = self.changes
44+
def get: F[B] = F.map(self.get)(f)
45+
def set(b: B): F[Unit] = self.set(g(b))
46+
def refresh: F[Unit] = self.refresh
47+
def modify(bb: B => B): F[Change[B]] =
48+
F.map(self.modify(a => g(bb(f(a))))) { case Change(prev, now) => Change(f(prev), f(now)) }
49+
}
3450
}
3551

3652
object Signal {

core/src/main/scala/fs2/async/mutable/Topic.scala

Lines changed: 20 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -14,14 +14,14 @@ import fs2.Stream._
1414
* Additionally the subscriber has possibility to terminate whenever size of enqueued elements is over certain size
1515
* by using `subscribeSize`.
1616
*/
17-
trait Topic[F[_],A] {
17+
trait Topic[F[_], A] { self =>
1818

1919
/**
2020
* Published any elements from source of `A` to this topic.
2121
* If any of the subscribers reach its `maxQueued` limit, then this will hold to publish next element
2222
* before that subscriber consumes it's elements or terminates.
2323
*/
24-
def publish:Sink[F,A]
24+
def publish: Sink[F, A]
2525

2626
/**
2727
* Publish one `A` to topic.
@@ -31,7 +31,7 @@ trait Topic[F[_],A] {
3131
* some of its elements to get room for this new. published `A`
3232
*
3333
*/
34-
def publish1(a:A):F[Unit]
34+
def publish1(a: A): F[Unit]
3535

3636
/**
3737
* Subscribes to receive any published `A` to this topic.
@@ -42,7 +42,7 @@ trait Topic[F[_],A] {
4242
* then publishers will hold into publishing to the queue.
4343
*
4444
*/
45-
def subscribe(maxQueued:Int):Stream[F,A]
45+
def subscribe(maxQueued: Int): Stream[F, A]
4646

4747
/**
4848
* Subscribes to receive published `A` to this topic.
@@ -58,12 +58,26 @@ trait Topic[F[_],A] {
5858
* then publishers will hold into publishing to the queue.
5959
*
6060
*/
61-
def subscribeSize(maxQueued:Int):Stream[F,(A, Int)]
61+
def subscribeSize(maxQueued: Int): Stream[F, (A, Int)]
6262

6363
/**
6464
* Signal of current active subscribers
6565
*/
66-
def subscribers:fs2.async.immutable.Signal[F,Int]
66+
def subscribers: fs2.async.immutable.Signal[F, Int]
67+
68+
/**
69+
* Returns an alternate view of this `Topic` where its elements are of type [[B]],
70+
* given back and forth function from `A` to `B`.
71+
*/
72+
def imap[B](f: A => B)(g: B => A): Topic[F, B] =
73+
new Topic[F, B] {
74+
def publish: Sink[F, B] = sfb => self.publish(sfb.map(g))
75+
def publish1(b: B): F[Unit] = self.publish1(g(b))
76+
def subscribe(maxQueued: Int): Stream[F, B] = self.subscribe(maxQueued).map(f)
77+
def subscribers: fs2.async.immutable.Signal[F, Int] = self.subscribers
78+
def subscribeSize(maxQueued: Int): Stream[F, (B, Int)] =
79+
self.subscribeSize(maxQueued).map { case (a, i) => f(a) -> i }
80+
}
6781
}
6882

6983
object Topic {

0 commit comments

Comments
 (0)