Skip to content

Commit

Permalink
Merge pull request #195 from armanbilge/pr/i139
Browse files Browse the repository at this point in the history
Refresh `SignallingSortedMapRef`
  • Loading branch information
armanbilge authored Feb 16, 2023
2 parents 522c4be + 586b2fd commit 559745b
Show file tree
Hide file tree
Showing 3 changed files with 112 additions and 32 deletions.
5 changes: 3 additions & 2 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,9 @@ lazy val frp = crossProject(JVMPlatform, JSPlatform)
"co.fs2" %%% "fs2-core" % Fs2Version,
"org.typelevel" %%% "cats-laws" % CatsVersion % Test,
"org.typelevel" %%% "cats-effect-testkit" % CatsEffectVersion % Test,
"org.typelevel" %%% "discipline-munit" % "1.0.9" % Test,
"org.scalameta" %%% "munit-scalacheck" % "0.7.29" % Test
"org.typelevel" %%% "discipline-munit" % "2.0.0-M3" % Test,
"org.typelevel" %%% "munit-cats-effect" % "2.0.0-M3" % Test,
"org.scalameta" %%% "munit-scalacheck" % "1.0.0-M7" % Test
)
)

Expand Down
89 changes: 59 additions & 30 deletions frp/src/main/scala/calico/frp/SignallingSortedMapRef.scala
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ package calico.frp
import cats.effect.kernel.Concurrent
import cats.effect.kernel.Deferred
import cats.effect.kernel.Ref
import cats.effect.kernel.Resource
import cats.effect.syntax.all.*
import cats.kernel.Order
import cats.syntax.all.*
import fs2.Stream
Expand Down Expand Up @@ -46,7 +48,12 @@ object SignallingSortedMapRef:
value: SortedMap[K, V],
lastUpdate: Long,
listeners: LongMap[Deferred[F, (SortedMap[K, V], Long)]],
keyListeners: Map[K, LongMap[Deferred[F, (Option[V], Long)]]]
keys: Map[K, KeyState]
)

case class KeyState(
lastUpdate: Long,
listeners: LongMap[Deferred[F, (Option[V], Long)]]
)

given Ordering[K] = K.toOrdering
Expand All @@ -58,19 +65,25 @@ object SignallingSortedMapRef:
def traverse_[A, U](it: Iterable[A])(f: A => F[U]): F[Unit] =
it.foldLeft(F.unit)(_ <* f(_))

def incrementLastUpdate(lu: Long) =
// skip -1 b/c of its special semantic
if lu == -2L then 0L else lu + 1

def updateMapAndNotify[O](
state: State,
f: SortedMap[K, V] => (SortedMap[K, V], O)): (State, F[O]) =
val (newValue, result) = f(state.value)

val lastUpdate = state.lastUpdate + 1
val newState = State(newValue, lastUpdate, LongMap.empty, SortedMap.empty)
val lastUpdate = incrementLastUpdate(state.lastUpdate)
val newKeys = newValue.view.mapValues(_ => KeyState(lastUpdate, LongMap.empty)).toMap
val newState = State(newValue, lastUpdate, LongMap.empty, newKeys)

val notifyListeners =
traverse_(state.listeners.values)(_.complete(newValue -> lastUpdate))
val notifyKeyListeners = traverse_(state.keyListeners) { (k, listeners) =>
val v = newValue.get(k)
traverse_(listeners.values)(_.complete(v -> lastUpdate))
val notifyKeyListeners = traverse_(state.keys) {
case (k, KeyState(_, listeners)) =>
val v = newValue.get(k)
traverse_(listeners.values)(_.complete(v -> lastUpdate))
}

newState -> (notifyListeners *> notifyKeyListeners).as(result)
Expand All @@ -80,14 +93,21 @@ object SignallingSortedMapRef:
val (newValue, result) = f(state.value.get(k))

val newMap = newValue.fold(state.value - k)(v => state.value + (k -> v))
val lastUpdate = state.lastUpdate + 1
val newKeyListeners = state.keyListeners - k
val newState = State(newMap, lastUpdate, LongMap.empty, newKeyListeners)

val lastUpdate = incrementLastUpdate(state.lastUpdate)
val lastKeyUpdate = if newValue.isDefined then lastUpdate else -1L

val newKeys =
if newValue.isDefined then
state.keys.updated(k, KeyState(lastKeyUpdate, LongMap.empty))
else state.keys - k // prevent memory leak
val newState = State(newMap, lastUpdate, LongMap.empty, newKeys)

val notifyListeners =
traverse_(state.listeners.values)(_.complete(newMap -> lastUpdate))
val notifyKeyListeners = state.keyListeners.get(k).fold(F.unit) { listeners =>
traverse_(listeners.values)(_.complete(newValue -> lastUpdate))
val notifyKeyListeners = state.keys.get(k).fold(F.unit) {
case KeyState(_, listeners) =>
traverse_(listeners.values)(_.complete(newValue -> lastUpdate))
}

newState -> (notifyListeners *> notifyKeyListeners).as(result)
Expand All @@ -107,25 +127,27 @@ object SignallingSortedMapRef:
def updateAndNotify[O](s: State, f: SortedMap[K, V] => (SortedMap[K, V], O)) =
updateMapAndNotify(s, f)

def keys = new Signal[F, SortedSet[K]]:
def get = outer.get.map(_.keySet)
def continuous = outer.continuous.map(_.keySet)
def discrete = outer.discrete.map(_.keySet).changes
def keys = outer.map(_.keySet).changes

def apply(k: K) = new AbstractSignallingRef[F, State, Option[V]](newId, state):

def getValue(s: State) = s.value.get(k)

def getLastUpdate(s: State) = s.lastUpdate
def getLastUpdate(s: State) = s.keys.get(k).fold(-1L)(_.lastUpdate)

def withListener(s: State, id: Long, wait: Deferred[F, (Option[V], Long)]) =
s.copy(keyListeners = s
.keyListeners
.updatedWith(k)(_.getOrElse(LongMap.empty).updated(id, wait).some))
s.copy(keys = s.keys.updatedWith(k) { ks =>
val lastUpdate = ks.fold(-1L)(_.lastUpdate)
val listeners = ks.fold(LongMap.empty)(_.listeners).updated(id, wait)
Some(KeyState(lastUpdate, listeners))
})

def withoutListener(s: State, id: Long) =
s.copy(keyListeners =
s.keyListeners.updatedWith(k)(_.map(_.removed(id)).filterNot(_.isEmpty)))
s.copy(keys = s.keys.updatedWith(k) {
_.map(ks => ks.copy(listeners = ks.listeners.removed(id)))
// prevent memory leak
.filterNot(ks => ks.lastUpdate == -1 && ks.listeners.isEmpty)
})

def updateAndNotify[O](s: State, f: Option[V] => (Option[V], O)) =
updateKeyAndNotify(s, k, f)
Expand All @@ -150,16 +172,24 @@ object SignallingSortedMapRef:

def continuous: Stream[F, A] = Stream.repeatEval(get)

def discrete: Stream[F, A] = {
def discrete: Stream[F, A] =
Stream.resource(getAndDiscreteUpdates).flatMap {
case (a, updates) =>
Stream.emit(a) ++ updates
}

override def getAndDiscreteUpdates(using Concurrent[F]): Resource[F, (A, Stream[F, A])] =
getAndDiscreteUpdatesImpl

private def getAndDiscreteUpdatesImpl: Resource[F, (A, Stream[F, A])] =
def go(id: Long, lastSeen: Long): Stream[F, A] =
def getNext: F[(A, Long)] =
F.deferred[(A, Long)].flatMap { wait =>
state.modify { state =>
val lastUpdate = getLastUpdate(state)
if lastUpdate != lastSeen then state -> (getValue(state) -> lastUpdate).pure[F]
else withListener(state, id, wait) -> wait.get

}.flatten
}.flatten // cancelable
}

Stream.eval(getNext).flatMap {
Expand All @@ -169,22 +199,21 @@ object SignallingSortedMapRef:

def cleanup(id: Long): F[Unit] = state.update(withoutListener(_, id))

Stream.bracket(newId)(cleanup).flatMap { id =>
Stream.eval(state.get).flatMap { state =>
Stream.emit(getValue(state)) ++ go(id, getLastUpdate(state))
Resource.eval {
state.get.map { state =>
(getValue(state), Stream.bracket(newId)(cleanup).flatMap(go(_, getLastUpdate(state))))
}
}
}

def set(a: A): F[Unit] = update(_ => a)

def update(f: A => A): F[Unit] = modify(v => (f(v), ()))

def modify[B](f: A => (A, B)): F[B] =
state.modify(updateAndNotify(_, f)).flatten
state.flatModify(updateAndNotify(_, f))

def tryModify[B](f: A => (A, B)): F[Option[B]] =
state.tryModify(updateAndNotify(_, f)).flatMap(_.sequence)
state.tryModify(updateAndNotify(_, f)).flatMap(_.sequence).uncancelable

def tryUpdate(f: A => A): F[Boolean] =
tryModify(a => (f(a), ())).map(_.isDefined)
Expand Down
50 changes: 50 additions & 0 deletions frp/src/test/scala/calico/frp/SignallingSortedMapRefSuite.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* Copyright 2022 Arman Bilge
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package calico.frp

import cats.effect.IO
import cats.effect.testkit.TestControl
import munit.CatsEffectSuite

import scala.collection.immutable.SortedMap
import scala.concurrent.duration.*

class SignallingSortedMapRefSuite extends CatsEffectSuite:

test("mapref - does not emit spurious events") {
SignallingSortedMapRef[IO, Boolean, Int]
.flatTap(_.set(SortedMap(false -> 0, true -> 0)))
.flatMap { s =>

val events =
s(false).discrete.evalTap(_ => IO.sleep(1.seconds)).unNoneTerminate.compile.toList

val updates =
IO.sleep(1100.millis) *>
s(false).update(_.map(_ + 1)) *>
IO.sleep(1.second) *>
s(true).update(_.map(_ + 1)) *>
IO.sleep(1.seconds) *>
s(false).update(_.map(_ + 1)) *>
IO.sleep(1.seconds) *>
s(false).set(None)

TestControl
.executeEmbed(updates.background.surround(events))
.assertEquals(List(0, 1, 2))
}
}

0 comments on commit 559745b

Please sign in to comment.