From a5d36a8e365d349f9476b41125caeeb0c8c87355 Mon Sep 17 00:00:00 2001 From: Nikita Gazarov Date: Thu, 21 Jan 2021 01:12:13 -0800 Subject: [PATCH] Fix: Bug in JsPriorityQueue implementation that produced glitches in complex cases. Specifically, when two or more observables were pending in the same transaction at the same time, if one of them actually synchronously depended on the other, it could fire ahead of the observable that it depended on, causing a glitch. Whether this actually happened also depended on the order of (internal) observers in the observable graph, you could avoid this bug just by luck. This bug could affect nested combineWith or delaySync observables, resulting in extraneous events (glitches) in case of combineWith, or observables firing in the wrong order (in case of delaySync). See https://gitter.im/Laminar_/Lobby?at=6007655b36db01248a8bf5a9 for the original bug report. --- .../airstream/util/JsPriorityQueue.scala | 13 +- .../timing/SyncDelayEventStreamSpec.scala | 96 ++++++ .../airstream/util/JsPriorityQueueSpec.scala | 286 ++++++++++++++++++ 3 files changed, 389 insertions(+), 6 deletions(-) create mode 100644 src/test/scala/com/raquo/airstream/util/JsPriorityQueueSpec.scala diff --git a/src/main/scala/com/raquo/airstream/util/JsPriorityQueue.scala b/src/main/scala/com/raquo/airstream/util/JsPriorityQueue.scala index cb0450e4..a95422e3 100644 --- a/src/main/scala/com/raquo/airstream/util/JsPriorityQueue.scala +++ b/src/main/scala/com/raquo/airstream/util/JsPriorityQueue.scala @@ -2,23 +2,22 @@ package com.raquo.airstream.util import scala.scalajs.js -// @TODO[Test] add tests for this class - class JsPriorityQueue[A](getRank: A => Int) { private[this] val queue: js.Array[A] = js.Array() def enqueue(item: A): Unit = { + val itemRank = getRank(item) var insertAtIndex = 0 var foundHigherRank = false while ( - insertAtIndex < queue.length && - !foundHigherRank + insertAtIndex < queue.length && !foundHigherRank ) { - if (getRank(queue(insertAtIndex)) <= getRank(item)) { + if (getRank(queue(insertAtIndex)) > itemRank) { foundHigherRank = true + } else { + insertAtIndex += 1 } - insertAtIndex += 1 } queue.splice(index = insertAtIndex, deleteCount = 0, item) // insert at index } @@ -40,4 +39,6 @@ class JsPriorityQueue[A](getRank: A => Int) { @inline def isEmpty: Boolean = size == 0 @inline def nonEmpty: Boolean = !isEmpty + + def debugQueue: List[A] = queue.toList } diff --git a/src/test/scala/com/raquo/airstream/timing/SyncDelayEventStreamSpec.scala b/src/test/scala/com/raquo/airstream/timing/SyncDelayEventStreamSpec.scala index aa182644..1680e440 100644 --- a/src/test/scala/com/raquo/airstream/timing/SyncDelayEventStreamSpec.scala +++ b/src/test/scala/com/raquo/airstream/timing/SyncDelayEventStreamSpec.scala @@ -99,4 +99,100 @@ class SyncDelayEventStreamSpec extends UnitSpec { calculations.clear() effects.clear() } + + it("multi-level, nested, dependent sync observables") { + + // See https://gitter.im/Laminar_/Lobby?at=6007655b36db01248a8bf5a9 and below + + implicit val testOwner: TestableOwner = new TestableOwner + + val v = Var(0) + + val calculations = mutable.Buffer[Calculation[Int]]() + + val signal1 = v.signal + .map(Calculation.log("signal1", calculations)) + + val signal2 = signal1 + .map(ev => ev) + .map(Calculation.log("signal2", calculations)) + + val signal3 = signal2 + .composeChanges { signal2Changes => + signal2Changes + .delaySync(signal1.changes) + .map(Calculation.log("signal3-changes-source", calculations)) + } + .map(Calculation.log("signal3", calculations)) + + val signal4 = signal1 + .composeChanges { signal1Changes => + signal1Changes + .delaySync(signal3.changes) + .map(Calculation.log("signal4-changes-source", calculations)) + } + .map(Calculation.log("signal4", calculations)) + + val signal5 = signal2 + .composeChanges { signal2Changes => + signal2Changes + .delaySync(signal3.changes) + .map(Calculation.log("signal5-changes-source", calculations)) + } + .map(Calculation.log("signal5", calculations)) + + // -- + + // Order is important + signal5.addObserver(Observer.empty) + signal4.addObserver(Observer.empty) + signal3.addObserver(Observer.empty) + + // signal4's and signal5's initial value does not depend on signal3's initial value, + // only on its changes, so it's ok that signal3's initial value is evaluated later. + assert(calculations.toList == List( + Calculation("signal1", 0), + Calculation("signal2", 0), + Calculation("signal5", 0), + Calculation("signal4", 0), + Calculation("signal3", 0) + )) + + calculations.clear() + + // -- + + v.set(1) + + assert(calculations.toList == List( + Calculation("signal1", 1), + Calculation("signal2", 1), + Calculation("signal3-changes-source", 1), + Calculation("signal3", 1), + Calculation("signal5-changes-source", 1), + Calculation("signal5", 1), + Calculation("signal4-changes-source", 1), + Calculation("signal4", 1) + )) + + calculations.clear() + + // -- + + v.set(2) + + assert(calculations.toList == List( + Calculation("signal1", 2), + Calculation("signal2", 2), + Calculation("signal3-changes-source", 2), + Calculation("signal3", 2), + Calculation("signal5-changes-source", 2), + Calculation("signal5", 2), + Calculation("signal4-changes-source", 2), + Calculation("signal4", 2) + )) + + calculations.clear() + + } } diff --git a/src/test/scala/com/raquo/airstream/util/JsPriorityQueueSpec.scala b/src/test/scala/com/raquo/airstream/util/JsPriorityQueueSpec.scala new file mode 100644 index 00000000..36cd459a --- /dev/null +++ b/src/test/scala/com/raquo/airstream/util/JsPriorityQueueSpec.scala @@ -0,0 +1,286 @@ +package com.raquo.airstream.util + +import com.raquo.airstream.UnitSpec + +import scala.util.Try + +class JsPriorityQueueSpec extends UnitSpec { + + class Foo(val rank: Int) { + override def toString: String = s"Foo@${this.hashCode().toHexString}(rank=$rank)" + } + + it("queues items by priority") { + + val q = new JsPriorityQueue[String](_.length) + + // -- + + q.enqueue("22") + q.enqueue("333") + q.enqueue("1") + q.enqueue("4444") + + assert(q.debugQueue == List( + "1", "22", "333", "4444" + )) + + // -- + + assert(q.dequeue() == "1") + + assert(q.debugQueue == List( + "22", "333", "4444" + )) + + // -- + + assert(q.dequeue() == "22") + + assert(q.debugQueue == List( + "333", "4444" + )) + + // -- + + assert(q.dequeue() == "333") + + assert(q.debugQueue == List( + "4444" + )) + + // -- + + assert(q.dequeue() == "4444") + + assert(q.debugQueue == Nil) + + // -- + + assert(Try(q.dequeue()).isFailure) + + // -- + + q.enqueue("a") + q.enqueue("bb") + + assert(q.debugQueue == List( + "a", "bb" + )) + + // -- + + assert(q.dequeue() == "a") + + assert(q.debugQueue == List( + "bb" + )) + + // -- + + q.enqueue("dddd") + + assert(q.debugQueue == List( + "bb", "dddd" + )) + + // -- + + q.enqueue("ccc") + + assert(q.debugQueue == List( + "bb", "ccc", "dddd" + )) + + // -- + + q.enqueue("a") + + assert(q.debugQueue == List( + "a", "bb", "ccc", "dddd" + )) + + // -- + + assert(q.dequeue() == "a") + + assert(q.debugQueue == List( + "bb", "ccc", "dddd" + )) + + // -- + + assert(q.dequeue() == "bb") + + assert(q.debugQueue == List( + "ccc", "dddd" + )) + + // -- + + assert(q.dequeue() == "ccc") + + assert(q.debugQueue == List( + "dddd" + )) + + // -- + + assert(q.dequeue() == "dddd") + + assert(q.debugQueue == Nil) + } + + it("same-priority edge cases") { + + val q = new JsPriorityQueue[Foo](_.rank) + + val f1 = new Foo(1) + val f2 = new Foo(2) + val f31 = new Foo(3) + val f32 = new Foo(3) + val f33 = new Foo(3) + val f41 = new Foo(4) + val f42 = new Foo(4) + val f5 = new Foo(5) + + // -- + + q.enqueue(f5) + q.enqueue(f41) + q.enqueue(f42) + q.enqueue(f33) + q.enqueue(f31) + q.enqueue(f32) + q.enqueue(f1) + q.enqueue(f2) + + assert(q.debugQueue == List( + f1, f2, f33, f31, f32, f41, f42, f5 + )) + + // -- + + assert(q.dequeue() == f1) + assert(q.dequeue() == f2) + + assert(q.debugQueue == List( + f33, f31, f32, f41, f42, f5 + )) + + // -- + + assert(q.dequeue() == f33) + + assert(q.debugQueue == List( + f31, f32, f41, f42, f5 + )) + + // -- + + assert(q.dequeue() == f31) + + assert(q.debugQueue == List( + f32, f41, f42, f5 + )) + + // -- + + assert(q.dequeue() == f32) + + assert(q.debugQueue == List( + f41, f42, f5 + )) + + // -- + + assert(q.dequeue() == f41) + + assert(q.debugQueue == List( + f42, f5 + )) + + // -- + + assert(q.dequeue() == f42) + + assert(q.debugQueue == List( + f5 + )) + + // -- + + assert(q.dequeue() == f5) + + assert(q.debugQueue == Nil) + } + + it("duplicate items edge case") { + + val q = new JsPriorityQueue[Foo](_.rank) + + val f1 = new Foo(1) + val f2 = new Foo(2) + val f3 = new Foo(3) + val f4 = new Foo(4) + + // -- + + q.enqueue(f2) + q.enqueue(f1) + q.enqueue(f1) + q.enqueue(f4) + q.enqueue(f3) + q.enqueue(f2) + + assert(q.debugQueue == List( + f1, f1, f2, f2, f3, f4 + )) + + // -- + + assert(q.dequeue() == f1) + + assert(q.debugQueue == List( + f1, f2, f2, f3, f4 + )) + + // -- + + assert(q.dequeue() == f1) + + assert(q.debugQueue == List( + f2, f2, f3, f4 + )) + + // -- + + assert(q.dequeue() == f2) + + assert(q.debugQueue == List( + f2, f3, f4 + )) + + // -- + + assert(q.dequeue() == f2) + + assert(q.debugQueue == List( + f3, f4 + )) + + // -- + + assert(q.dequeue() == f3) + + assert(q.debugQueue == List( + f4 + )) + + // -- + + assert(q.dequeue() == f4) + + assert(q.debugQueue == Nil) + } +}