diff --git a/reactive/src/main/scala/colibri/reactive/Reactive.scala b/reactive/src/main/scala/colibri/reactive/Reactive.scala index 4028bf52..33f6bc9d 100644 --- a/reactive/src/main/scala/colibri/reactive/Reactive.scala +++ b/reactive/src/main/scala/colibri/reactive/Reactive.scala @@ -8,6 +8,7 @@ import monocle.{Iso, Lens, Prism} import scala.concurrent.Future import scala.reflect.ClassTag +import collection.mutable import scala.util.control.NonFatal import scala.annotation.unused @@ -335,30 +336,54 @@ object Var { def createStateless[A](write: RxWriter[A], read: Rx[A]): Var[A] = new VarCreateStateless(write, read) @inline implicit class SeqVarOperations[A](rxvar: Var[Seq[A]]) { - def sequence: Rx[Seq[Var[A]]] = Rx.observableSync(new Observable[Seq[Var[A]]] { - - def unsafeSubscribe(sink: Observer[Seq[Var[A]]]): Cancelable = { - rxvar.observable.unsafeSubscribe( - Observer.create( - { seq => - sink.unsafeOnNext(seq.zipWithIndex.map { case (a, idx) => - val observer = new Observer[A] { - def unsafeOnNext(value: A): Unit = { - rxvar.set(seq.updated(idx, value)) - } - def unsafeOnError(error: Throwable): Unit = { - sink.unsafeOnError(error) + def sequence: Rx[Seq[Var[A]]] = { + val observable = new Observable[Seq[Var[A]]] { + def unsafeSubscribe(sink: Observer[Seq[Var[A]]]): Cancelable = { + // keep a var for every index of the original sequence + val vars = mutable.ArrayBuffer.empty[Var[A]] + + def createIndexVar(idx: Int, seed: A): Var[A] = { + Var[A](seed).transformVarWrite { + _.contramap { newValue => + rxvar.update(_.updated(idx, newValue)) + newValue + } + } + } + + rxvar.observable.zipWithIndex.unsafeSubscribe( + Observer.create( + consume = { case (seq, idx) => + val needsRetrigger = if (seq.size < vars.size) { + vars.remove(seq.size, vars.size - seq.size) + true + } else if (seq.size > vars.size) { + for ((elem, idx) <- seq.zipWithIndex.takeRight(seq.size - vars.size)) { + vars += createIndexVar(idx, seed = elem) } + true + } else { + idx == 0 } - Var.createStateless(RxWriter.observer(observer), Rx.const(a)) - }) - }, - sink.unsafeOnError, - ), - ) + + assert(seq.size == vars.size) + + for ((newValue, elemVar) <- seq.zip(vars)) { + elemVar.set(newValue) + } + + if (needsRetrigger) { + sink.unsafeOnNext(vars.toSeq) + } + }, + failure = sink.unsafeOnError, + ), + ) + } } - }) + Rx.observableSync(observable) + } } @inline implicit class OptionVarOperations[A](rxvar: Var[Option[A]]) { diff --git a/reactive/src/test/scala/colibri/ReactiveSpec.scala b/reactive/src/test/scala/colibri/ReactiveSpec.scala index 86cd3f26..7733e0ac 100644 --- a/reactive/src/test/scala/colibri/ReactiveSpec.scala +++ b/reactive/src/test/scala/colibri/ReactiveSpec.scala @@ -704,6 +704,44 @@ class ReactiveSpec extends AsyncFlatSpec with Matchers { sequence.nowIfSubscribed().apply(0).set(2) variable.nowIfSubscribed() shouldBe Seq(2) } + + { + // only trigger outer Rx if collection size changed + val variable = Var[Seq[Int]](Seq(1,2)) + val sequence: Rx[Seq[Var[Int]]] = variable.sequence + var sequenceTriggered = 0 + sequence.unsafeForeach(_ => sequenceTriggered += 1) + + sequenceTriggered shouldBe 1 + sequence.now().size shouldBe 2 + + sequence.now().apply(0).set(3) + sequence.now().size shouldBe 2 + sequenceTriggered shouldBe 1 + + variable.set(Seq(3,4)) + sequence.now().size shouldBe 2 + sequence.now().apply(0).now() shouldBe 3 + sequence.now().apply(1).now() shouldBe 4 + sequenceTriggered shouldBe 1 + + variable.set(Seq(3,4,5,6)) + sequence.now().size shouldBe 4 + sequence.now().apply(0).now() shouldBe 3 + sequence.now().apply(1).now() shouldBe 4 + sequence.now().apply(2).now() shouldBe 5 + sequenceTriggered shouldBe 2 + + variable.set(Seq(7)) + sequence.now().size shouldBe 1 + sequence.now().apply(0).now() shouldBe 7 + sequenceTriggered shouldBe 3 + + variable.set(Seq(8)) + sequence.now().size shouldBe 1 + sequence.now().apply(0).now() shouldBe 8 + sequenceTriggered shouldBe 3 + } } it should "sequence on Var[Option[T]]" in {