diff --git a/src/DynamicData.Tests/Cache/MergeManyChangeSetsCacheSourceCompareFixture.cs b/src/DynamicData.Tests/Cache/MergeManyChangeSetsCacheSourceCompareFixture.cs index 33db06f0..ce6cc89f 100644 --- a/src/DynamicData.Tests/Cache/MergeManyChangeSetsCacheSourceCompareFixture.cs +++ b/src/DynamicData.Tests/Cache/MergeManyChangeSetsCacheSourceCompareFixture.cs @@ -5,6 +5,7 @@ using System.Reactive.Disposables; using System.Reactive; using System.Reactive.Linq; +using System.Reactive.Threading.Tasks; using System.Threading.Tasks; using Bogus; using DynamicData.Kernel; @@ -90,9 +91,11 @@ IObservable AddRemovePrices(Market market, int priceCount, int para .Parallelize(priceCount, parallel, obs => obs.StressAddRemove(market.PricesCache, _ => GetRemoveTime(), scheduler)) .Finally(market.PricesCache.Dispose); - var merged = _marketCache.Connect().MergeManyChangeSets(market => market.LatestPrices, Market.RatingCompare, resortOnSourceRefresh: true); + var merged = _marketCache.Connect().MergeManyChangeSets(market => market.LatestPrices, Market.RatingCompare, resortOnSourceRefresh: true).Publish(); var adding = true; + var cacheCompleted = merged.LastOrDefaultAsync().ToTask(); using var priceResults = merged.AsAggregator(); + using var connect = merged.Connect(); // Start asynchrononously modifying the parent list and the child lists using var addingSub = AddRemoveStress(marketCount, priceCount, Environment.ProcessorCount, TaskPoolScheduler.Default) @@ -118,6 +121,9 @@ IObservable AddRemovePrices(Market market, int priceCount, int para } while (adding); + // Wait for the source cache to finish delivering all notifications. + await cacheCompleted; + // Verify the results CheckResultContents(_marketCacheResults, priceResults, Market.RatingCompare); } diff --git a/src/DynamicData.Tests/Cache/SourceCacheFixture.cs b/src/DynamicData.Tests/Cache/SourceCacheFixture.cs index a1380137..ca46b77f 100644 --- a/src/DynamicData.Tests/Cache/SourceCacheFixture.cs +++ b/src/DynamicData.Tests/Cache/SourceCacheFixture.cs @@ -1,6 +1,9 @@ using System; +using System.Collections.Generic; using System.Linq; using System.Reactive.Linq; +using System.Threading; +using System.Threading.Tasks; using DynamicData.Tests.Domain; @@ -188,4 +191,168 @@ public void StaticFilterRemove() public record class SomeObject(int Id, int Value); + + [Fact] + public async Task MultiCacheFanInDoesNotDeadlock() + { + const int itemCount = 100; + + using var cacheA = new SourceCache(static x => x.Key); + using var cacheB = new SourceCache(static x => x.Key); + using var destination = new SourceCache(static x => x.Key); + using var subA = cacheA.Connect().PopulateInto(destination); + using var subB = cacheB.Connect().PopulateInto(destination); + using var results = destination.Connect().AsAggregator(); + + var taskA = Task.Run(() => + { + for (var i = 0; i < itemCount; i++) + { + cacheA.AddOrUpdate(new TestItem($"a-{i}", $"ValueA-{i}")); + } + }); + + var taskB = Task.Run(() => + { + for (var i = 0; i < itemCount; i++) + { + cacheB.AddOrUpdate(new TestItem($"b-{i}", $"ValueB-{i}")); + } + }); + + var completed = Task.WhenAll(taskA, taskB); + var finished = await Task.WhenAny(completed, Task.Delay(TimeSpan.FromSeconds(10))); + + finished.Should().BeSameAs(completed, "concurrent edits with cross-cache subscribers should not deadlock"); + results.Error.Should().BeNull(); + results.Data.Count.Should().Be(itemCount * 2, "all items from both caches should arrive in the destination"); + results.Data.Items.Should().BeEquivalentTo([.. cacheA.Items, .. cacheB.Items], "all items should be in the destination"); + } + + [Fact] + public async Task DirectCrossWriteDoesNotDeadlock() + { + const int iterations = 100; + + for (var iter = 0; iter < iterations; iter++) + { + using var cacheA = new SourceCache(static x => x.Key); + using var cacheB = new SourceCache(static x => x.Key); + + // Bidirectional: A items flow into B, B items flow into A. + // Filter by prefix prevents infinite feedback. + using var aToB = cacheA.Connect() + .Filter(static x => x.Key.StartsWith('a')) + .Transform(static (item, _) => new TestItem("from-a-" + item.Key, item.Value)) + .PopulateInto(cacheB); + + using var bToA = cacheB.Connect() + .Filter(static x => x.Key.StartsWith('b')) + .Transform(static (item, _) => new TestItem("from-b-" + item.Key, item.Value)) + .PopulateInto(cacheA); + + using var barrier = new Barrier(2); + + var taskA = Task.Run(() => + { + barrier.SignalAndWait(); + for (var i = 0; i < 1000; i++) + { + cacheA.AddOrUpdate(new TestItem("a" + i, "V" + i)); + } + }); + + var taskB = Task.Run(() => + { + barrier.SignalAndWait(); + for (var i = 0; i < 1000; i++) + { + cacheB.AddOrUpdate(new TestItem("b" + i, "V" + i)); + } + }); + + var completed = Task.WhenAll(taskA, taskB); + var finished = await Task.WhenAny(completed, Task.Delay(TimeSpan.FromSeconds(30))); + + finished.Should().BeSameAs(completed, $"iteration {iter}: bidirectional cross-cache writes should not deadlock"); + } + } + + [Fact] + public void ConnectDuringDeliveryDoesNotDuplicate() + { + // Exploits the dequeue-to-OnNext window. Thread A writes two items in + // separate batches. The first delivery is held by a slow subscriber. + // While item1 delivery is blocked, item2 is committed to ReaderWriter + // and sitting in the queue. Thread B calls Connect(), takes a snapshot + // (sees both items), subscribes to _changes, then item2 is delivered + // via OnNext — producing a duplicate if not guarded by a generation counter. + using var cache = new SourceCache(static x => x.Key); + + using var delivering = new ManualResetEventSlim(false); + using var item2Written = new ManualResetEventSlim(false); + using var connectDone = new ManualResetEventSlim(false); + + var firstDelivery = true; + + // First subscriber: blocks on the first delivery to create the window + using var slowSub = cache.Connect().Subscribe(_ => + { + if (firstDelivery) + { + firstDelivery = false; + delivering.Set(); + + // Wait until item2 has been written and the Connect has subscribed + connectDone.Wait(TimeSpan.FromSeconds(5)); + } + }); + + // Write item1 on a background thread — delivery starts, slow subscriber blocks + var writeTask = Task.Run(() => + { + cache.AddOrUpdate(new TestItem("k1", "v1")); + }); + + // Wait for delivery of item1 to be in progress (slow sub is blocking) + delivering.Wait(TimeSpan.FromSeconds(5)).Should().BeTrue("delivery should have started"); + + // Now write item2 on another thread. It will acquire the lock, commit to + // ReaderWriter, enqueue a notification, and return. The notification sits + // in the queue because the deliverer (Thread A) is blocked by the slow sub. + var writeTask2 = Task.Run(() => + { + cache.AddOrUpdate(new TestItem("k2", "v2")); + item2Written.Set(); + }); + item2Written.Wait(TimeSpan.FromSeconds(5)).Should().BeTrue("item2 should have been written"); + + // Now Connect on the main thread. The snapshot from ReaderWriter includes + // BOTH k1 and k2. The subscription to _changes is added. When the slow + // subscriber unblocks, item2's notification will be delivered via OnNext + // and the new subscriber will see k2 again — a duplicate Add. + var addCounts = new Dictionary(); + using var newSub = cache.Connect().Subscribe(changes => + { + foreach (var c in changes) + { + if (c.Reason == ChangeReason.Add) + { + var key = c.Current.Key; + addCounts[key] = addCounts.GetValueOrDefault(key) + 1; + } + } + }); + + // Unblock the slow subscriber — delivery resumes, item2 delivered + connectDone.Set(); + writeTask.Wait(TimeSpan.FromSeconds(5)).Should().BeTrue("writeTask should complete"); + writeTask2.Wait(TimeSpan.FromSeconds(5)).Should().BeTrue("writeTask2 should complete"); + + // Each key should appear exactly once in the new subscriber's view + addCounts.GetValueOrDefault("k1").Should().Be(1, "k1 should appear once (snapshot only)"); + addCounts.GetValueOrDefault("k2").Should().Be(1, "k2 should appear once, not duplicated from snapshot + queued delivery"); + } + + private sealed record TestItem(string Key, string Value); } diff --git a/src/DynamicData.Tests/Cache/SuspendNotificationsFixture.cs b/src/DynamicData.Tests/Cache/SuspendNotificationsFixture.cs index e73de850..db39604a 100644 --- a/src/DynamicData.Tests/Cache/SuspendNotificationsFixture.cs +++ b/src/DynamicData.Tests/Cache/SuspendNotificationsFixture.cs @@ -2,6 +2,7 @@ using System.Collections.Generic; using System.Linq; using System.Reactive.Linq; +using System.Threading; using System.Threading.Tasks; using DynamicData.Kernel; using FluentAssertions; @@ -352,6 +353,234 @@ public async Task SuspensionsAreThreadSafe() _results.Messages[0].Adds.Should().Be(100, "Should have 100 adds"); } + [Fact] + public void ResumeThenReSuspendDeliversFirstBatchOnly() + { + // Forces the ordering: resume completes before re-suspend. + // The deferred subscriber activates with the first batch snapshot, + // then re-suspend holds the second batch until final resume. + using var cache = new SourceCache(static x => x); + var dataSet1 = Enumerable.Range(0, 100).ToList(); + var dataSet2 = Enumerable.Range(1000, 100).ToList(); + var allData = dataSet1.Concat(dataSet2).ToList(); + + var suspend1 = cache.SuspendNotifications(); + cache.AddOrUpdate(dataSet1); + + using var results = cache.Connect().AsAggregator(); + results.Messages.Count.Should().Be(0, "no messages during suspension"); + + // Resume first — subscriber activates + suspend1.Dispose(); + + results.Messages.Count.Should().Be(1, "exactly one message after resume"); + results.Messages[0].Adds.Should().Be(dataSet1.Count, $"snapshot should have {dataSet1.Count} adds"); + results.Messages[0].Removes.Should().Be(0, "no removes"); + results.Messages[0].Updates.Should().Be(0, "no updates"); + results.Messages[0].Select(x => x.Key).Should().Equal(dataSet1, "snapshot should contain first batch keys"); + + // Re-suspend, write second batch + var suspend2 = cache.SuspendNotifications(); + cache.AddOrUpdate(dataSet2); + + results.Messages.Count.Should().Be(1, "still one message — second batch held by suspension"); + results.Summary.Overall.Adds.Should().Be(dataSet1.Count, $"still {dataSet1.Count} adds total"); + + // Final resume + suspend2.Dispose(); + + results.Messages.Count.Should().Be(2, "two messages total"); + results.Messages[1].Adds.Should().Be(dataSet2.Count, $"second message has {dataSet2.Count} adds"); + results.Messages[1].Removes.Should().Be(0, "no removes in second message"); + results.Messages[1].Updates.Should().Be(0, "no updates in second message"); + results.Messages[1].Select(x => x.Key).Should().Equal(dataSet2, "second message should contain second batch keys"); + + results.Summary.Overall.Adds.Should().Be(allData.Count, $"exactly {allData.Count} adds total"); + results.Summary.Overall.Removes.Should().Be(0, "no removes"); + results.Data.Count.Should().Be(allData.Count, $"{allData.Count} items in final state"); + results.Error.Should().BeNull(); + results.IsCompleted.Should().BeFalse(); + } + + [Fact] + public void ReSuspendThenResumeDeliversAllInSingleBatch() + { + // Forces the ordering: re-suspend before resume. + // Suspend count goes 1→2→1, no resume signal fires. + // Both batches accumulate and arrive as a single changeset on final resume. + using var cache = new SourceCache(static x => x); + var dataSet1 = Enumerable.Range(0, 100).ToList(); + var dataSet2 = Enumerable.Range(1000, 100).ToList(); + var allData = dataSet1.Concat(dataSet2).ToList(); + + var suspend1 = cache.SuspendNotifications(); + cache.AddOrUpdate(dataSet1); + + using var results = cache.Connect().AsAggregator(); + results.Messages.Count.Should().Be(0, "no messages during suspension"); + + // Re-suspend first — count goes 1→2 + var suspend2 = cache.SuspendNotifications(); + + // Resume first suspend — count goes 2→1, still suspended + suspend1.Dispose(); + + results.Messages.Count.Should().Be(0, "no messages — still suspended (count=1)"); + results.Summary.Overall.Adds.Should().Be(0, "no adds — still suspended"); + + // Write second batch while still suspended + cache.AddOrUpdate(dataSet2); + + results.Messages.Count.Should().Be(0, "still no messages"); + + // Final resume — count goes 1→0 + suspend2.Dispose(); + + results.Messages.Count.Should().Be(1, "single message with all data"); + results.Messages[0].Adds.Should().Be(allData.Count, $"all {allData.Count} items in one changeset"); + results.Messages[0].Removes.Should().Be(0, "no removes"); + results.Messages[0].Updates.Should().Be(0, "no updates"); + results.Messages[0].Select(c => c.Key).OrderBy(k => k).Should().Equal(allData, "should contain both batches in order"); + + results.Summary.Overall.Adds.Should().Be(allData.Count, $"exactly {allData.Count} adds total"); + results.Summary.Overall.Removes.Should().Be(0, "no removes"); + results.Summary.Overall.Updates.Should().Be(0, "no updates"); + results.Data.Count.Should().Be(allData.Count, $"{allData.Count} items in final state"); + results.Error.Should().BeNull(); + results.IsCompleted.Should().BeFalse(); + } + + [Fact] + public async Task ConcurrentSuspendDuringResumeDoesNotCorrupt() + { + // Stress test: races resume against re-suspend on two threads. + // Both orderings are correct (tested deterministically above). + // This test verifies no corruption, deadlocks, or data loss under contention. + const int iterations = 200; + var dataSet1 = Enumerable.Range(0, 100).ToList(); + var dataSet2 = Enumerable.Range(1000, 100).ToList(); + var allData = dataSet1.Concat(dataSet2).ToList(); + + for (var iter = 0; iter < iterations; iter++) + { + using var cache = new SourceCache(static x => x); + + var suspend1 = cache.SuspendNotifications(); + cache.AddOrUpdate(dataSet1); + using var results = cache.Connect().AsAggregator(); + + using var barrier = new Barrier(2); + var resumeTask = Task.Run(() => + { + barrier.SignalAndWait(); + suspend1.Dispose(); + }); + + var reSuspendTask = Task.Run(() => + { + barrier.SignalAndWait(); + return cache.SuspendNotifications(); + }); + + await Task.WhenAll(resumeTask, reSuspendTask); + var suspend2 = await reSuspendTask; + + cache.AddOrUpdate(dataSet2); + suspend2.Dispose(); + + results.Summary.Overall.Adds.Should().Be(allData.Count, $"iteration {iter}: exactly {allData.Count} adds"); + results.Summary.Overall.Removes.Should().Be(0, $"iteration {iter}: no removes"); + results.Summary.Overall.Updates.Should().Be(0, $"iteration {iter}: no updates because keys don't overlap"); + results.Data.Count.Should().Be(allData.Count, $"iteration {iter}: {allData.Count} items in final state"); + results.Data.Keys.OrderBy(k => k).Should().Equal(allData, $"iteration {iter}: all keys present in order"); + results.Error.Should().BeNull($"iteration {iter}: no errors"); + results.IsCompleted.Should().BeFalse($"iteration {iter}: not completed"); + } + } + + [Fact] + public async Task ResumeSignalUnderLockPreventsStaleSnapshotFromReSuspend() + { + // Verifies that a deferred Connect subscriber never sees data written during + // a re-suspension. The resume signal fires under the lock (reentrant), so the + // deferred subscriber activates and takes its snapshot before any other thread + // can re-suspend or write new data. + // + // A slow first subscriber blocks delivery of accumulated changes, creating a + // window where the main thread re-suspends and writes a second batch. The + // deferred subscriber's snapshot must contain only the first batch. + using var cache = new SourceCache(static x => x); + var dataSet1 = Enumerable.Range(0, 100).ToList(); + var dataSet2 = Enumerable.Range(1000, 100).ToList(); + var allData = dataSet1.Concat(dataSet2).ToList(); + + using var delivering = new SemaphoreSlim(0, 1); + using var proceedWithResuspend = new SemaphoreSlim(0, 1); + + var suspend1 = cache.SuspendNotifications(); + cache.AddOrUpdate(dataSet1); + + // First subscriber blocks on delivery to hold the delivery thread + var firstDelivery = true; + using var slowSub = cache.Connect().Subscribe(_ => + { + if (firstDelivery) + { + firstDelivery = false; + delivering.Release(); + proceedWithResuspend.Wait(TimeSpan.FromSeconds(5)); + } + }); + + // Deferred subscriber — will activate when resume signal fires + using var results = cache.Connect().AsAggregator(); + results.Messages.Count.Should().Be(0, "no messages during suspension"); + + // Resume on background thread — delivery blocks on slow subscriber + var resumeTask = Task.Run(() => suspend1.Dispose()); + (await delivering.WaitAsync(TimeSpan.FromSeconds(5))).Should().BeTrue("delivery should have started"); + + // Re-suspend and write second batch while delivery is blocked + var suspend2 = cache.SuspendNotifications(); + cache.AddOrUpdate(dataSet2); + + // dataSet2 must not appear in any message received so far + foreach (var msg in results.Messages) + { + foreach (var change in msg) + { + change.Key.Should().BeInRange(0, 99, + "deferred subscriber should only have first-batch keys before second resume"); + } + } + + // Unblock delivery + proceedWithResuspend.Release(); + await resumeTask; + + // Only dataSet1 should have been delivered — dataSet2 is held by second suspension + results.Summary.Overall.Adds.Should().Be(dataSet1.Count, + $"exactly {dataSet1.Count} adds before second resume — dataSet2 must be held by suspension"); + results.Messages.Should().HaveCount(1, "exactly one message (snapshot of dataSet1)"); + results.Messages[0].Adds.Should().Be(dataSet1.Count); + results.Messages[0].Select(c => c.Key).Should().Equal(dataSet1, + "snapshot should contain exactly first-batch keys in order"); + + // Resume second suspension — dataSet2 arrives now + suspend2.Dispose(); + + results.Summary.Overall.Adds.Should().Be(allData.Count, $"exactly {allData.Count} adds total"); + results.Summary.Overall.Removes.Should().Be(0, "no removes"); + results.Messages.Should().HaveCount(2, "two messages: snapshot + second batch"); + results.Messages[1].Adds.Should().Be(dataSet2.Count); + results.Messages[1].Select(c => c.Key).Should().Equal(dataSet2, + "second message should contain exactly second-batch keys in order"); + results.Data.Count.Should().Be(allData.Count); + results.Data.Keys.OrderBy(k => k).Should().Equal(allData); + results.Error.Should().BeNull(); + results.IsCompleted.Should().BeFalse(); + } + public void Dispose() { _source.Dispose(); diff --git a/src/DynamicData.Tests/Internal/DeliveryQueueFixture.cs b/src/DynamicData.Tests/Internal/DeliveryQueueFixture.cs new file mode 100644 index 00000000..9aac3205 --- /dev/null +++ b/src/DynamicData.Tests/Internal/DeliveryQueueFixture.cs @@ -0,0 +1,367 @@ +using System; +using System.Collections.Concurrent; +using System.Collections.Generic; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; + +using DynamicData.Internal; +using FluentAssertions; +using Xunit; + +namespace DynamicData.Tests.Internal; + +public class DeliveryQueueFixture +{ +#if NET9_0_OR_GREATER + private readonly Lock _gate = new(); +#else + private readonly object _gate = new(); +#endif + + private static void EnqueueAndDeliver(DeliveryQueue queue, T item) + { + using var notifications = queue.AcquireLock(); + notifications.Enqueue(item); + } + + private static void TriggerDelivery(DeliveryQueue queue) + { + using var notifications = queue.AcquireLock(); + } + + [Fact] + public void EnqueueAndDeliverDeliversItem() + { + var delivered = new List(); + var queue = new DeliveryQueue(_gate, item => { delivered.Add(item); return true; }); + + EnqueueAndDeliver(queue, "A"); + + delivered.Should().Equal("A"); + } + + [Fact] + public void DeliverDeliversItemsInFifoOrder() + { + var delivered = new List(); + var queue = new DeliveryQueue(_gate, item => { delivered.Add(item); return true; }); + + using (var notifications = queue.AcquireLock()) + { + notifications.Enqueue("A"); + notifications.Enqueue("B"); + notifications.Enqueue("C"); + } + + delivered.Should().Equal("A", "B", "C"); + } + + [Fact] + public void DeliverWithEmptyQueueIsNoOp() + { + var delivered = new List(); + var queue = new DeliveryQueue(_gate, item => { delivered.Add(item); return true; }); + + TriggerDelivery(queue); + + delivered.Should().BeEmpty(); + } + + [Fact] + public async Task OnlyOneDelivererAtATime() + { + var concurrentCount = 0; + var maxConcurrent = 0; + var deliveryCount = 0; + var delivered = new ConcurrentBag(); + using var firstDeliveryStarted = new ManualResetEventSlim(false); + using var allowFirstDeliveryToContinue = new ManualResetEventSlim(false); + using var startContenders = new ManualResetEventSlim(false); + + var queue = new DeliveryQueue(_gate, item => + { + var current = Interlocked.Increment(ref concurrentCount); + int snapshot; + do + { + snapshot = maxConcurrent; + if (current <= snapshot) + { + break; + } + } + while (Interlocked.CompareExchange(ref maxConcurrent, current, snapshot) != snapshot); + + delivered.Add(item); + + if (Interlocked.Increment(ref deliveryCount) == 1) + { + firstDeliveryStarted.Set(); + allowFirstDeliveryToContinue.Wait(); + } + + Thread.SpinWait(1000); + Interlocked.Decrement(ref concurrentCount); + return true; + }); + + // Start delivering the first item — it will block in the callback + var firstDelivery = Task.Run(() => EnqueueAndDeliver(queue, -1)); + firstDeliveryStarted.Wait(); + + // While first delivery is blocked, enqueue 100 items from concurrent threads + var enqueueTasks = Enumerable.Range(0, 100) + .Select(i => Task.Run(() => + { + startContenders.Wait(); + EnqueueAndDeliver(queue, i); + })); + + var triggerTasks = Enumerable.Range(0, 4) + .Select(_ => Task.Run(() => + { + startContenders.Wait(); + TriggerDelivery(queue); + })); + + var tasks = enqueueTasks.Concat(triggerTasks).ToArray(); + startContenders.Set(); + allowFirstDeliveryToContinue.Set(); + + await Task.WhenAll(tasks.Append(firstDelivery)); + + maxConcurrent.Should().Be(1, "only one thread should be delivering at a time"); + delivered.Should().HaveCount(101); + } + + [Fact] + public void SecondWriterItemPickedUpByFirstDeliverer() + { + var delivered = new List(); + var deliveryCount = 0; + DeliveryQueue? q = null; + + var queue = new DeliveryQueue(_gate, item => + { + delivered.Add(item); + if (Interlocked.Increment(ref deliveryCount) == 1) + { + using var notifications = q!.AcquireLock(); + notifications.Enqueue("B"); + } + + return true; + }); + q = queue; + + EnqueueAndDeliver(queue, "A"); + + delivered.Should().Equal("A", "B"); + } + + [Fact] + public void ReentrantEnqueueDoesNotRecurse() + { + var callDepth = 0; + var maxDepth = 0; + var delivered = new List(); + DeliveryQueue? q = null; + + var queue = new DeliveryQueue(_gate, item => + { + callDepth++; + if (callDepth > maxDepth) + { + maxDepth = callDepth; + } + + delivered.Add(item); + + if (item == "A") + { + using var notifications = q!.AcquireLock(); + notifications.Enqueue("B"); + } + + callDepth--; + return true; + }); + q = queue; + + EnqueueAndDeliver(queue, "A"); + + delivered.Should().Equal("A", "B"); + maxDepth.Should().Be(1, "delivery callback should not recurse"); + } + + [Fact] + public void ExceptionInDeliveryResetsDeliveryToken() + { + var callCount = 0; + var queue = new DeliveryQueue(_gate, item => + { + callCount++; + if (callCount == 1) + { + throw new InvalidOperationException("boom"); + } + + return true; + }); + + var act = () => EnqueueAndDeliver(queue, "A"); + act.Should().Throw(); + + EnqueueAndDeliver(queue, "B"); + + callCount.Should().Be(2, "delivery should work after exception recovery"); + } + + [Fact] + public void RemainingItemsDeliveredAfterExceptionRecovery() + { + var delivered = new List(); + var shouldThrow = true; + var queue = new DeliveryQueue(_gate, item => + { + if (shouldThrow && item == "A") + { + throw new InvalidOperationException("boom"); + } + + delivered.Add(item); + return true; + }); + + var act = () => + { + using var notifications = queue.AcquireLock(); + notifications.Enqueue("A"); + notifications.Enqueue("B"); + }; + + act.Should().Throw(); + + shouldThrow = false; + TriggerDelivery(queue); + + delivered.Should().Equal("B"); + } + + [Fact] + public void TerminalCallbackStopsDelivery() + { + var delivered = new List(); + var queue = new DeliveryQueue(_gate, item => + { + delivered.Add(item); + return item != "STOP"; + }); + + using (var notifications = queue.AcquireLock()) + { + notifications.Enqueue("A"); + notifications.Enqueue("STOP"); + notifications.Enqueue("B"); + } + + delivered.Should().Equal("A", "STOP"); + queue.IsTerminated.Should().BeTrue(); + } + + [Fact] + public void EnqueueAfterTerminationIsIgnored() + { + var delivered = new List(); + var queue = new DeliveryQueue(_gate, item => + { + delivered.Add(item); + return item != "STOP"; + }); + + EnqueueAndDeliver(queue, "STOP"); + + EnqueueAndDeliver(queue, "AFTER"); + + delivered.Should().Equal("STOP"); + } + + [Fact] + public void IsTerminatedIsFalseInitially() + { + var queue = new DeliveryQueue(_gate, _ => true); + queue.IsTerminated.Should().BeFalse(); + } + + [Fact] + public async Task ConcurrentEnqueueAllItemsDelivered() + { + const int threadCount = 8; + const int itemsPerThread = 500; + var delivered = new ConcurrentBag(); + var queue = new DeliveryQueue(_gate, item => { delivered.Add(item); return true; }); + + var tasks = Enumerable.Range(0, threadCount).Select(t => Task.Run(() => + { + for (var i = 0; i < itemsPerThread; i++) + { + EnqueueAndDeliver(queue, (t * itemsPerThread) + i); + } + })).ToArray(); + + await Task.WhenAll(tasks); + TriggerDelivery(queue); + + delivered.Count.Should().Be(threadCount * itemsPerThread); + } + + [Fact] + public async Task ConcurrentEnqueueNoDuplicates() + { + const int threadCount = 8; + const int itemsPerThread = 500; + var delivered = new ConcurrentBag(); + var queue = new DeliveryQueue(_gate, item => { delivered.Add(item); return true; }); + + var tasks = Enumerable.Range(0, threadCount).Select(t => Task.Run(() => + { + for (var i = 0; i < itemsPerThread; i++) + { + EnqueueAndDeliver(queue, (t * itemsPerThread) + i); + } + })).ToArray(); + + await Task.WhenAll(tasks); + TriggerDelivery(queue); + + delivered.Distinct().Count().Should().Be(threadCount * itemsPerThread, "each item should be delivered exactly once"); + } + + [Fact] + public async Task ConcurrentEnqueuePreservesPerThreadOrdering() + { + const int threadCount = 4; + const int itemsPerThread = 200; + var delivered = new ConcurrentQueue<(int Thread, int Seq)>(); + var queue = new DeliveryQueue<(int Thread, int Seq)>(_gate, item => { delivered.Enqueue(item); return true; }); + + var tasks = Enumerable.Range(0, threadCount).Select(t => Task.Run(() => + { + for (var i = 0; i < itemsPerThread; i++) + { + EnqueueAndDeliver(queue, (t, i)); + } + })).ToArray(); + + await Task.WhenAll(tasks); + TriggerDelivery(queue); + + var itemsByThread = delivered.ToArray().GroupBy(x => x.Thread).ToDictionary(g => g.Key, g => g.Select(x => x.Seq).ToList()); + + foreach (var (thread, sequences) in itemsByThread) + { + sequences.Should().BeInAscendingOrder($"items from thread {thread} should preserve enqueue order"); + } + } +} diff --git a/src/DynamicData/Cache/Internal/ExpireAfter.ForSource.cs b/src/DynamicData/Cache/Internal/ExpireAfter.ForSource.cs index cf3e317e..60bcd9de 100644 --- a/src/DynamicData/Cache/Internal/ExpireAfter.ForSource.cs +++ b/src/DynamicData/Cache/Internal/ExpireAfter.ForSource.cs @@ -161,11 +161,18 @@ private void OnEditingSource(ISourceUpdater updater) { _expirationDueTimesByKey.Remove(proposedExpiration.Key); - _removedItemsBuffer.Add(new( - key: proposedExpiration.Key, - value: updater.Lookup(proposedExpiration.Key).Value)); - - updater.RemoveKey(proposedExpiration.Key); + // The item may have been removed or updated by another thread between when + // this expiration was scheduled and when it fired. Check that the item is + // still present and still has an expiration before removing it. + var lookup = updater.Lookup(proposedExpiration.Key); + if (lookup.HasValue && _timeSelector.Invoke(lookup.Value) is not null) + { + _removedItemsBuffer.Add(new( + key: proposedExpiration.Key, + value: lookup.Value)); + + updater.RemoveKey(proposedExpiration.Key); + } } } _proposedExpirationsQueue.RemoveRange(0, proposedExpirationIndex); @@ -273,7 +280,7 @@ private void OnSourceNext(IChangeSet changes) { if (_timeSelector.Invoke(change.Current) is { } expireAfter) { - haveExpirationsChanged = TrySetExpiration( + haveExpirationsChanged |= TrySetExpiration( key: change.Key, dueTime: now + expireAfter); } diff --git a/src/DynamicData/Cache/ObservableCache.cs b/src/DynamicData/Cache/ObservableCache.cs index e56bab14..60bcbf07 100644 --- a/src/DynamicData/Cache/ObservableCache.cs +++ b/src/DynamicData/Cache/ObservableCache.cs @@ -1,4 +1,4 @@ -// Copyright (c) 2011-2025 Roland Pheasant. All rights reserved. +// Copyright (c) 2011-2025 Roland Pheasant. All rights reserved. // Roland Pheasant licenses this file to you under the MIT license. // See the LICENSE file in the project root for full license information. @@ -6,9 +6,11 @@ using System.Reactive.Disposables; using System.Reactive.Linq; using System.Reactive.Subjects; +using System.Threading; using DynamicData.Binding; using DynamicData.Cache; using DynamicData.Cache.Internal; +using DynamicData.Internal; // ReSharper disable once CheckNamespace namespace DynamicData; @@ -38,69 +40,51 @@ internal sealed class ObservableCache : IObservableCache _readerWriter; + private readonly DeliveryQueue _notifications; + private int _editLevel; // The level of recursion in editing. + private long _currentVersion; // Monotonic counter incremented under lock for each enqueued change notification. + + private long _currentDeliveryVersion; // Version of the item currently being delivered. Set before _changes.OnNext. + public ObservableCache(IObservable> source) { - _suspensionTracker = new(() => new SuspensionTracker(_changes.OnNext, InvokeCountNext)); _readerWriter = new ReaderWriter(); + _notifications = new DeliveryQueue(_locker, DeliverNotification); + _suspensionTracker = new(() => new SuspensionTracker()); - var loader = source.Synchronize(_locker).Finally( - () => - { - _changes.OnCompleted(); - _changesPreview.OnCompleted(); - }).Subscribe( + var loader = source.Subscribe( changeSet => { + using var notifications = _notifications.AcquireLock(); + var previewHandler = _changesPreview.HasObservers ? (Action>)InvokePreview : null; var changes = _readerWriter.Write(changeSet, previewHandler, _changes.HasObservers); - InvokeNext(changes); + + if (changes is not null) + { + notifications.Enqueue(NotificationItem.CreateChanges(changes, _readerWriter.Count, ++_currentVersion)); + } }, - ex => - { - _changesPreview.OnError(ex); - _changes.OnError(ex); - }); + NotifyError, + NotifyCompleted); _cleanUp = Disposable.Create( () => { loader.Dispose(); - _changes.OnCompleted(); - _changesPreview.OnCompleted(); - if (_suspensionTracker.IsValueCreated) - { - _suspensionTracker.Value.Dispose(); - } - - if (_countChanged.IsValueCreated) - { - _countChanged.Value.OnCompleted(); - } + NotifyCompleted(); }); } public ObservableCache(Func? keySelector = null) { - _suspensionTracker = new(() => new SuspensionTracker(_changes.OnNext, InvokeCountNext)); _readerWriter = new ReaderWriter(keySelector); + _notifications = new DeliveryQueue(_locker, DeliverNotification); + _suspensionTracker = new(() => new SuspensionTracker()); - _cleanUp = Disposable.Create( - () => - { - _changes.OnCompleted(); - _changesPreview.OnCompleted(); - if (_suspensionTracker.IsValueCreated) - { - _suspensionTracker.Value.Dispose(); - } - - if (_countChanged.IsValueCreated) - { - _countChanged.Value.OnCompleted(); - } - }); + _cleanUp = Disposable.Create(NotifyCompleted); } public int Count => _readerWriter.Count; @@ -109,11 +93,15 @@ public ObservableCache(Func? keySelector = null) Observable.Create( observer => { - lock (_locker) - { - var source = _countChanged.Value.StartWith(_readerWriter.Count).DistinctUntilChanged(); - return source.SubscribeSafe(observer); - } + using var readLock = _notifications.AcquireReadLock(); + + var snapshotVersion = _currentVersion; + var countChanged = readLock.HasPending + ? _countChanged.Value.SkipWhile(_ => Volatile.Read(ref _currentDeliveryVersion) <= snapshotVersion) + : _countChanged.Value; + + var source = countChanged.StartWith(_readerWriter.Count).DistinctUntilChanged(); + return source.SubscribeSafe(observer); }); public IReadOnlyList Items => _readerWriter.Items; @@ -188,27 +176,26 @@ internal void UpdateFromIntermediate(Action> update { updateAction.ThrowArgumentNullExceptionIfNull(nameof(updateAction)); - lock (_locker) - { - ChangeSet? changes = null; + using var notifications = _notifications.AcquireLock(); - _editLevel++; - if (_editLevel == 1) - { - var previewHandler = _changesPreview.HasObservers ? (Action>)InvokePreview : null; - changes = _readerWriter.Write(updateAction, previewHandler, _changes.HasObservers); - } - else - { - _readerWriter.WriteNested(updateAction); - } + ChangeSet? changes = null; + + _editLevel++; + if (_editLevel == 1) + { + var previewHandler = _changesPreview.HasObservers ? (Action>)InvokePreview : null; + changes = _readerWriter.Write(updateAction, previewHandler, _changes.HasObservers); + } + else + { + _readerWriter.WriteNested(updateAction); + } - _editLevel--; + _editLevel--; - if (changes is not null && _editLevel == 0) - { - InvokeNext(changes); - } + if (changes is not null && _editLevel == 0) + { + notifications.Enqueue(NotificationItem.CreateChanges(changes, _readerWriter.Count, ++_currentVersion)); } } @@ -216,27 +203,26 @@ internal void UpdateFromSource(Action> updateActio { updateAction.ThrowArgumentNullExceptionIfNull(nameof(updateAction)); - lock (_locker) - { - ChangeSet? changes = null; + using var notifications = _notifications.AcquireLock(); - _editLevel++; - if (_editLevel == 1) - { - var previewHandler = _changesPreview.HasObservers ? (Action>)InvokePreview : null; - changes = _readerWriter.Write(updateAction, previewHandler, _changes.HasObservers); - } - else - { - _readerWriter.WriteNested(updateAction); - } + ChangeSet? changes = null; + + _editLevel++; + if (_editLevel == 1) + { + var previewHandler = _changesPreview.HasObservers ? (Action>)InvokePreview : null; + changes = _readerWriter.Write(updateAction, previewHandler, _changes.HasObservers); + } + else + { + _readerWriter.WriteNested(updateAction); + } - _editLevel--; + _editLevel--; - if (changes is not null && _editLevel == 0) - { - InvokeNext(changes); - } + if (changes is not null && _editLevel == 0) + { + notifications.Enqueue(NotificationItem.CreateChanges(changes, _readerWriter.Count, ++_currentVersion)); } } @@ -244,122 +230,248 @@ private IObservable> CreateConnectObservable(Func>( observer => { - lock (_locker) - { - var initial = InternalEx.Return(() => (IChangeSet)GetInitialUpdates(predicate)); - var changes = initial.Concat(_changes); + using var readLock = _notifications.AcquireReadLock(); - if (predicate != null) - { - changes = changes.Filter(predicate, suppressEmptyChangeSets); - } - else if (suppressEmptyChangeSets) - { - changes = changes.NotEmpty(); - } + var initial = InternalEx.Return(() => (IChangeSet)GetInitialUpdates(predicate)); + + // The current snapshot may contain changes that have been made but the notifications + // have yet to be delivered. We need to filter those out to avoid delivering an update + // that has already been applied (but detect this possibility and skip filtering unless absolutely needed) + var snapshotVersion = _currentVersion; + var changes = readLock.HasPending + ? _changes.SkipWhile(_ => Volatile.Read(ref _currentDeliveryVersion) <= snapshotVersion) + : (IObservable>)_changes; - return changes.SubscribeSafe(observer); + changes = initial.Concat(changes); + + if (predicate != null) + { + changes = changes.Filter(predicate, suppressEmptyChangeSets); + } + else if (suppressEmptyChangeSets) + { + changes = changes.NotEmpty(); } + + return changes.SubscribeSafe(observer); }); private IObservable> CreateWatchObservable(TKey key) => Observable.Create>( observer => { - lock (_locker) + using var readLock = _notifications.AcquireReadLock(); + + var initial = _readerWriter.Lookup(key); + if (initial.HasValue) { - var initial = _readerWriter.Lookup(key); - if (initial.HasValue) - { - observer.OnNext(new Change(ChangeReason.Add, key, initial.Value)); - } + observer.OnNext(new Change(ChangeReason.Add, key, initial.Value)); + } + + // The current snapshot may contain changes that have been made but the notifications + // have yet to be delivered. We need to filter those out to avoid delivering an update + // that has already been applied (but detect this possibility and skip filtering unless absolutely needed) + var snapshotVersion = _currentVersion; + var changes = readLock.HasPending + ? _changes.SkipWhile(_ => Volatile.Read(ref _currentDeliveryVersion) <= snapshotVersion) + : _changes; - return _changes.Finally(observer.OnCompleted).Subscribe( - changes => + return changes.Finally(observer.OnCompleted).Subscribe( + changes => + { + foreach (var change in changes) { - foreach (var change in changes.ToConcreteType()) + var match = EqualityComparer.Default.Equals(change.Key, key); + if (match) { - var match = EqualityComparer.Default.Equals(change.Key, key); - if (match) - { - observer.OnNext(change); - } + observer.OnNext(change); } - }); - } + } + }); }); - private void InvokeNext(ChangeSet changes) + /// + /// Delivers a preview notification synchronously under _locker. Preview is + /// called by ReaderWriter during a write, between two data swaps, so it MUST + /// fire under the lock with the pre-write state visible to subscribers. + /// + private void InvokePreview(ChangeSet changes) { - lock (_locker) + if (changes.Count != 0 && !_notifications.IsTerminated) { - // If Notifications are not suspended - if (!_suspensionTracker.IsValueCreated || !_suspensionTracker.Value.AreNotificationsSuspended) - { - // Emit the changes - _changes.OnNext(changes); - } - else - { - // Don't emit the changes, but add them to the list - _suspensionTracker.Value.EnqueueChanges(changes); - } - - // If CountChanges are not suspended - if (!_suspensionTracker.IsValueCreated || !_suspensionTracker.Value.IsCountSuspended) - { - InvokeCountNext(); - } + _changesPreview.OnNext(changes); } } - private void InvokePreview(ChangeSet changes) + private void NotifyCompleted() { - lock (_locker) + using var notifications = _notifications.AcquireLock(); + notifications.Enqueue(NotificationItem.CreateCompleted()); + } + + private void NotifyError(Exception ex) + { + using var notifications = _notifications.AcquireLock(); + notifications.Enqueue(NotificationItem.CreateError(ex)); + } + + /// + /// Delivers a single notification to subscribers. This method is the delivery + /// callback for and must never be called directly. + /// It is invoked by the after releasing the + /// lock, which guarantees that no lock is held when subscriber code runs. The + /// queue's single-deliverer token ensures this method is never called concurrently, + /// preserving the Rx serialization contract across all subjects. + /// Returns true to continue delivery, or false for terminal items (OnCompleted/OnError) + /// which causes the queue to self-terminate. + /// + private bool DeliverNotification(NotificationItem item) + { + switch (item.Kind) { - if (changes.Count != 0) + case NotificationKind.Completed: + _changes.OnCompleted(); + _changesPreview.OnCompleted(); + + if (_countChanged.IsValueCreated) + { + _countChanged.Value.OnCompleted(); + } + + // Dispose outside lock because it fires OnCompleted + if (_suspensionTracker.IsValueCreated) + { + _suspensionTracker.Value.Dispose(); + } + + return false; + + case NotificationKind.Error: + _changesPreview.OnError(item.Error!); + _changes.OnError(item.Error!); + + if (_countChanged.IsValueCreated) + { + _countChanged.Value.OnError(item.Error!); + } + + // Dispose outside lock because it fires OnCompleted + if (_suspensionTracker.IsValueCreated) + { + _suspensionTracker.Value.Dispose(); + } + + return false; + + case NotificationKind.CountOnly: + EmitCount(item.Count); + return true; + + default: + Volatile.Write(ref _currentDeliveryVersion, item.Version); + EmitChanges(item.Changes); + EmitCount(item.Count); + return true; + } + + void EmitChanges(ChangeSet changes) + { + if (_suspensionTracker.IsValueCreated) { - _changesPreview.OnNext(changes); + lock (_locker) + { + if (_suspensionTracker.Value.AreNotificationsSuspended) + { + _suspensionTracker.Value.EnqueueChanges(changes); + return; + } + } } + + _changes.OnNext(changes); } - } - private void InvokeCountNext() - { - lock (_locker) + void EmitCount(int count) { + if (_suspensionTracker.IsValueCreated) + { + lock (_locker) + { + if (_suspensionTracker.Value.IsCountSuspended) + { + return; + } + } + } + if (_countChanged.IsValueCreated) { - _countChanged.Value.OnNext(_readerWriter.Count); + _countChanged.Value.OnNext(count); } } } private void ResumeCount() { - lock (_locker) + using var notifications = _notifications.AcquireLock(); + Debug.Assert(_suspensionTracker.IsValueCreated, "Should not be Resuming Count without Suspend Count instance"); + + if (_suspensionTracker.Value.ResumeCount() && _countChanged.IsValueCreated) { - Debug.Assert(_suspensionTracker.IsValueCreated, "Should not be Resuming Count without Suspend Count instance"); - _suspensionTracker.Value.ResumeCount(); + notifications.Enqueue(NotificationItem.CreateCountOnly(_readerWriter.Count)); } } private void ResumeNotifications() { - lock (_locker) + bool emitResume; + + using (var notifications = _notifications.AcquireLock()) { - Debug.Assert(_suspensionTracker.IsValueCreated, "Should not be Resuming Notifications without Suspend Count instance"); - _suspensionTracker.Value.ResumeNotifications(); + Debug.Assert(_suspensionTracker.IsValueCreated, "Should not be Resuming Notifications without Suspend Notifications instance"); + + (var changes, emitResume) = _suspensionTracker.Value.ResumeNotifications(); + if (changes is not null) + { + notifications.Enqueue(NotificationItem.CreateChanges(changes, _readerWriter.Count, ++_currentVersion)); + } + } + + // Emit the resume signal after releasing the delivery scope so that + // accumulated changes are delivered first + if (emitResume) + { + using var readLock = _notifications.AcquireReadLock(); + _suspensionTracker.Value.EmitResumeNotification(); } } - private sealed class SuspensionTracker(Action> onResumeNotifications, Action onResumeCount) : IDisposable + private enum NotificationKind { - private readonly BehaviorSubject _areNotificationsSuspended = new(false); + Changes, + CountOnly, + Completed, + Error, + } + + private readonly record struct NotificationItem(NotificationKind Kind, ChangeSet Changes, int Count = 0, long Version = 0, Exception? Error = null) + { + public static NotificationItem CreateChanges(ChangeSet changes, int count, long version) => + new(NotificationKind.Changes, changes, count, version); + + public static NotificationItem CreateCountOnly(int count) => + new(NotificationKind.CountOnly, [], count); - private readonly Action> _onResumeNotifications = onResumeNotifications; + public static NotificationItem CreateCompleted() => + new(NotificationKind.Completed, []); - private readonly Action _onResumeCount = onResumeCount; + public static NotificationItem CreateError(Exception error) => + new(NotificationKind.Error, [], Error: error); + } + + private sealed class SuspensionTracker : IDisposable + { + private readonly BehaviorSubject _areNotificationsSuspended = new(false); private List> _pendingChanges = []; @@ -392,30 +504,29 @@ public void SuspendNotifications() public void SuspendCount() => ++_countSuspendCount; - public void ResumeNotifications() + public bool ResumeCount() => --_countSuspendCount == 0; + + public (ChangeSet? Changes, bool EmitResume) ResumeNotifications() { if (--_notifySuspendCount == 0 && !_areNotificationsSuspended.IsDisposed) { - // Fire pending changes to existing subscribers + ChangeSet? changes = null; + if (_pendingChanges.Count > 0) { - _onResumeNotifications(new ChangeSet(_pendingChanges)); - _pendingChanges.Clear(); + var changesToDeliver = _pendingChanges; + _pendingChanges = []; + changes = new ChangeSet(changesToDeliver); } - // Tell deferred subscribers they can continue - _areNotificationsSuspended.OnNext(false); + return (changes, true); } - } - public void ResumeCount() - { - if (--_countSuspendCount == 0) - { - _onResumeCount(); - } + return (null, false); } + public void EmitResumeNotification() => _areNotificationsSuspended.OnNext(false); + public void Dispose() { _areNotificationsSuspended.OnCompleted(); diff --git a/src/DynamicData/Internal/DeliveryQueue.cs b/src/DynamicData/Internal/DeliveryQueue.cs new file mode 100644 index 00000000..284e6c57 --- /dev/null +++ b/src/DynamicData/Internal/DeliveryQueue.cs @@ -0,0 +1,238 @@ +// Copyright (c) 2011-2025 Roland Pheasant. All rights reserved. +// Roland Pheasant licenses this file to you under the MIT license. +// See the LICENSE file in the project root for full license information. + +namespace DynamicData.Internal; + +/// +/// A queue that serializes item delivery outside a caller-owned lock. +/// Use to obtain a scoped ScopedAccess for enqueueing items. +/// When the ScopedAccess is disposed, the lock is released +/// and pending items are delivered. Only one thread delivers at a time. +/// +/// The item type. +internal sealed class DeliveryQueue +{ + private readonly Queue _queue = new(); + private readonly Func _deliver; + +#if NET9_0_OR_GREATER + private readonly Lock _gate; +#else + private readonly object _gate; +#endif + + private bool _isDelivering; + private volatile bool _isTerminated; + + /// + /// Initializes a new instance of the class. + /// + /// The lock shared with the caller. The queue acquires this + /// lock during and during the dequeue step of delivery. + /// Callback invoked for each item, outside the lock. Returns false if the item was terminal, which stops further delivery. +#if NET9_0_OR_GREATER + public DeliveryQueue(Lock gate, Func deliver) +#else + public DeliveryQueue(object gate, Func deliver) +#endif + { + _gate = gate; + _deliver = deliver; + } + + /// + /// Gets whether this queue has been terminated. Safe to read from any thread. + /// + public bool IsTerminated => _isTerminated; + + /// + /// Acquires the gate and returns a scoped ScopedAccess for enqueueing items. + /// When the ScopedAccess is disposed, the gate is released + /// and delivery runs if needed. The ScopedAccess is a ref struct and cannot + /// escape the calling method. + /// + public ScopedAccess AcquireLock() => new(this); + + /// + /// Acquires the gate and returns a read-only scoped access for inspecting + /// queue state. No mutation is possible and disposing does not trigger + /// delivery — the lock is simply released. + /// + public ReadOnlyScopedAccess AcquireReadLock() => new(this); + +#if NET9_0_OR_GREATER + private void EnterLock() => _gate.Enter(); + + private void ExitLock() => _gate.Exit(); +#else + private void EnterLock() => Monitor.Enter(_gate); + + private void ExitLock() => Monitor.Exit(_gate); +#endif + + private void EnqueueItem(TItem item) + { + if (_isTerminated) + { + return; + } + + _queue.Enqueue(item); + } + + private void ExitLockAndDeliver() + { + // Before releasing the lock, check if we should start delivery. Only one thread can succeed + var shouldDeliver = TryStartDelivery(); + + // Now release the lock. We do this before delivering to allow other threads to enqueue items while delivery is in progress. + ExitLock(); + + // If this thread has been chosen to deliver, do it now that the lock is released. + // If not, another thread is already delivering or there are no items to deliver. + if (shouldDeliver) + { + DeliverAll(); + } + + bool TryStartDelivery() + { + // Bail if something is already delivering or there's nothing to do + if (_isDelivering || _queue.Count == 0) + { + return false; + } + + // Mark that we're doing the delivering + _isDelivering = true; + return true; + } + + void DeliverAll() + { + try + { + while (true) + { + TItem item; + + // Inside of the lock, see if there is work and get the next item to deliver. + // If there is no work, mark that we're done delivering and exit. + lock (_gate) + { + if (_queue.Count == 0) + { + _isDelivering = false; + return; + } + + item = _queue.Dequeue(); + } + + // Outside of the lock, invoke the callback to deliver the item. + // If delivery returns false, it means the item was terminal + // and we should stop delivering and clear the queue. + if (!_deliver(item)) + { + lock (_gate) + { + _isTerminated = true; + _isDelivering = false; + _queue.Clear(); + } + + return; + } + } + } + catch + { + // Safety net: if an exception bypassed the normal exit paths, + // ensure _isDelivering is reset so the queue doesn't get stuck. + lock (_gate) + { + _isDelivering = false; + } + + throw; + } + } + } + + /// + /// A scoped ScopedAccess for working under the gate lock. All queue mutation + /// goes through this ScopedAccess, ensuring the lock is held. Disposing + /// releases the lock and triggers delivery if needed. + /// + public ref struct ScopedAccess + { + private DeliveryQueue? _owner; + + internal ScopedAccess(DeliveryQueue owner) + { + _owner = owner; + owner.EnterLock(); + } + + /// + /// Adds an item to the queue. Ignored if the queue has been terminated. + /// + /// The item to enqueue. + public readonly void Enqueue(TItem item) => _owner?.EnqueueItem(item); + + /// + /// Releases the gate lock and delivers pending items if this thread + /// holds the delivery token. + /// + public void Dispose() + { + var owner = _owner; + if (owner is null) + { + return; + } + + _owner = null; + owner.ExitLockAndDeliver(); + } + } + + /// + /// A read-only scoped access for inspecting queue state under the gate lock. + /// No mutation is possible. Disposing releases the lock without triggering + /// delivery. + /// + public ref struct ReadOnlyScopedAccess + { + private DeliveryQueue? _owner; + + internal ReadOnlyScopedAccess(DeliveryQueue owner) + { + _owner = owner; + owner.EnterLock(); + } + + /// + /// Gets whether there are notifications pending delivery (queued or + /// currently being delivered outside the lock). + /// + public readonly bool HasPending => + _owner is not null && (_owner._queue.Count > 0 || _owner._isDelivering); + + /// + /// Releases the gate lock. Does not trigger delivery. + /// + public void Dispose() + { + var owner = _owner; + if (owner is null) + { + return; + } + + _owner = null; + owner.ExitLock(); + } + } +} diff --git a/src/DynamicData/Internal/SwappableLock.cs b/src/DynamicData/Internal/SwappableLock.cs index 267607e9..5f1f4759 100644 --- a/src/DynamicData/Internal/SwappableLock.cs +++ b/src/DynamicData/Internal/SwappableLock.cs @@ -18,23 +18,73 @@ public static SwappableLock CreateAndEnter(object gate) return result; } +#if NET9_0_OR_GREATER + public static SwappableLock CreateAndEnter(Lock gate) + { + gate.Enter(); + return new SwappableLock() { _lockGate = gate }; + } +#endif + public void SwapTo(object gate) { +#if NET9_0_OR_GREATER + if (_gate is null && _lockGate is null) + throw new InvalidOperationException("Lock is not initialized"); +#else if (_gate is null) throw new InvalidOperationException("Lock is not initialized"); +#endif var hasNewLock = false; Monitor.Enter(gate, ref hasNewLock); +#if NET9_0_OR_GREATER + if (_lockGate is not null) + { + _lockGate.Exit(); + _lockGate = null; + } + else +#endif if (_hasLock) - Monitor.Exit(_gate); + { + Monitor.Exit(_gate!); + } _hasLock = hasNewLock; _gate = gate; } +#if NET9_0_OR_GREATER + public void SwapTo(Lock gate) + { + if (_lockGate is null && _gate is null) + throw new InvalidOperationException("Lock is not initialized"); + + gate.Enter(); + + if (_lockGate is not null) + _lockGate.Exit(); + else if (_hasLock) + Monitor.Exit(_gate!); + + _lockGate = gate; + _hasLock = false; + _gate = null; + } +#endif + public void Dispose() { +#if NET9_0_OR_GREATER + if (_lockGate is not null) + { + _lockGate.Exit(); + _lockGate = null; + } + else +#endif if (_hasLock && (_gate is not null)) { Monitor.Exit(_gate); @@ -45,4 +95,8 @@ public void Dispose() private bool _hasLock; private object? _gate; + +#if NET9_0_OR_GREATER + private Lock? _lockGate; +#endif }