Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
49 commits
Select commit Hold shift + click to select a range
d2a8dc4
Refactor SwappableLock to add NET9+ Lock overloads
dwcullop Apr 6, 2026
c4b89af
Fix race in ExpireAfter when item is removed or updated before expira…
dwcullop Apr 6, 2026
501c9f2
fix: Replace lock-during-notification with queue-based drain to preve…
dwcullop Apr 6, 2026
72ea32c
Refactor to use one lock and a serialized delivery queue to ensure Rx…
dwcullop Apr 7, 2026
ab41353
Add read-only lock for DeliveryQueue and improve safety
dwcullop Apr 7, 2026
c459365
Refactor cross-cache deadlock test to use operators
dwcullop Apr 7, 2026
8eb5ffb
Simplify delivery queue; remove pending count logic
dwcullop Apr 7, 2026
97b721a
Support .NET 9+ locking in SwappableLock's SwapTo method
dwcullop Apr 7, 2026
7625ac2
Use |= to accumulate expiration changes correctly
dwcullop Apr 7, 2026
5e4ad0e
Refactor DeliveryQueue exception handling logic
dwcullop Apr 7, 2026
a92b596
Fix MergeMany stress test timing for queue-based delivery
dwcullop Apr 7, 2026
92f1afe
Update src/DynamicData.Tests/Cache/SourceCacheFixture.cs
dwcullop Apr 8, 2026
7544f8c
Improve test reliability and ObservableCache disposal safety
dwcullop Apr 8, 2026
1665536
Merge branch 'bugfix/lock_inversion' of https://github.com/dwcullop/D…
dwcullop Apr 8, 2026
836d8f3
Prevent duplicate notifications on Connect during delivery
dwcullop Apr 8, 2026
b5c7862
Improve suspend/resume notification handling and tests
dwcullop Apr 8, 2026
b8e03b6
Improve thread safety, tests, and notification delivery
dwcullop Apr 8, 2026
9993c90
Add SharedDeliveryQueue + SynchronizeSafe infrastructure
dwcullop Apr 9, 2026
bd47444
Migrate all operators from Synchronize to SynchronizeSafe
dwcullop Apr 9, 2026
dc1a809
Add mega cross-cache stress test proving deadlock-free operation
dwcullop Apr 10, 2026
ff595db
Comprehensive cross-cache stress test with result verification
dwcullop Apr 10, 2026
79f2310
Convert SpecifiedGrouper to SynchronizeSafe, document remaining Synch…
dwcullop Apr 10, 2026
433f87a
Remove SwappableLock and ExpireAfter changes (split to separate PRs)
dwcullop Apr 10, 2026
2d92c16
Enhanced kitchen-sink stress test with all operators and result verif…
dwcullop Apr 10, 2026
fd433cd
Add AI instruction files for DynamicData
dwcullop Apr 10, 2026
228a21a
Remove AI instruction files (moved to docs/ai_instructions branch)
dwcullop Apr 10, 2026
dcf7939
Enhance kitchen sink stress test: fix MergeManyChangeSets, add 11 ope…
dwcullop Apr 10, 2026
e4b5f4c
Make stress test fully deterministic with hardcoded assertions
dwcullop Apr 10, 2026
8e24925
Use DeliveryQueue<T> for single-source operators instead of SharedDel…
dwcullop Apr 10, 2026
95cf5fb
Unify DeliveryQueue<T>: Notification<T> internal, IObserver<T> delivery
dwcullop Apr 10, 2026
12be42f
Fix race in join operators: AsObservableCache(false) → AsObservableCa…
dwcullop Apr 10, 2026
f2aa710
MergeMany: SharedDeliveryQueue → DeliveryQueue<TDestination>
dwcullop Apr 10, 2026
ce44ae4
DeliveryQueue<T> implements IObserver<T> for clean Rx composition
dwcullop Apr 10, 2026
aec362e
Refactor delivery queues and disposal for thread safety
dwcullop Apr 11, 2026
342da5f
Unified DeliveryQueue, IObserver, SynchronizeSafe overloads, EnsureDe…
dwcullop Apr 11, 2026
4e14aaf
Fix multi-agent review findings
dwcullop Apr 11, 2026
33f743e
Add missing infrastructure tests (Critical review finding)
dwcullop Apr 11, 2026
f8f2419
Refactor delivery queue: lock-free, reentrant, safer emits
dwcullop Apr 12, 2026
e297e53
Add _observer field to CacheParentSubscription
dwcullop Apr 12, 2026
6e3ab91
fix: eliminate cross-cache deadlocks via queue-drain delivery pattern
dwcullop Apr 12, 2026
d527d15
fix: LIFO drain ordering in SharedDeliveryQueue, ExpireAfter race guard
dwcullop Apr 13, 2026
d5055bd
fix: review findings - sub-queue leak, drain thread reset, IObserver,…
dwcullop Apr 13, 2026
c5843dd
Refactor Notification<T> API and update usage throughout
dwcullop Apr 13, 2026
472fbaf
Merge from main
dwcullop Apr 13, 2026
5d958bc
Refactor delivery queue for thread safety and clarity
dwcullop Apr 13, 2026
3d5556f
Clarify LIFO rationale, add notnull to DeliverySubQueue<T>
dwcullop Apr 13, 2026
284d43c
perf: add delivery queue throughput and contention benchmarks
dwcullop Apr 14, 2026
0924b99
perf: batch drain in DeliveryQueue, add MMCS contention benchmark
dwcullop Apr 14, 2026
7b282ef
refactor: revert batch drain, fix MMCS benchmark to use SourceCache c…
dwcullop Apr 15, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
226 changes: 226 additions & 0 deletions src/DynamicData.Benchmarks/Cache/DeliveryQueueBenchmarks.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,226 @@
// Copyright (c) 2011-2025 Roland Pheasant. All rights reserved.
// Roland Pheasant licenses this file to you under the MIT license.
// See the LICENSE file in the project root for full license information.

using System;
using System.Linq;
using System.Reactive.Linq;
using System.Threading;
using System.Threading.Tasks;

using BenchmarkDotNet.Attributes;

using DynamicData.Binding;

namespace DynamicData.Benchmarks.Cache;

/// <summary>
/// Multi-threaded SourceCache contention benchmarks. Measures aggregate throughput
/// when N threads write concurrently with varying subscriber complexity.
/// Exercises the DeliveryQueue path (ObservableCache).
/// </summary>
[MemoryDiagnoser]
[MarkdownExporterAttribute.GitHub]
public class ContentionBenchmarks
{
private SourceCache<ContentionItem, int> _cache = null!;
private IDisposable? _subscription;

[Params(1, 2, 4)]
public int ThreadCount;

[Params("None", "Sort", "Chain")]
public string SubscriberWork = "None";

private const int ItemsPerThread = 1_000;

[GlobalSetup]
public void GlobalSetup()
{
_cache = new SourceCache<ContentionItem, int>(i => i.Id);
}

[GlobalCleanup]
public void GlobalCleanup()
{
_subscription?.Dispose();
_cache.Dispose();
}

[IterationSetup]
public void IterationSetup()
{
_subscription?.Dispose();
_cache.Clear();

_subscription = SubscriberWork switch
{
"Sort" => _cache.Connect()
.Sort(SortExpressionComparer<ContentionItem>.Ascending(i => i.Name))
.Subscribe(_ => { }),

"Chain" => _cache.Connect()
.Filter(i => i.Price > 0)
.Sort(SortExpressionComparer<ContentionItem>.Ascending(i => i.Name))
.Transform(i => new ContentionItemVm(i))
.Subscribe(_ => { }),

_ => _cache.Connect().Subscribe(_ => { }),
};
}

[Benchmark]
public void ConcurrentAddOrUpdate()
{
if (ThreadCount == 1)
{
for (var i = 0; i < ItemsPerThread; i++)
_cache.AddOrUpdate(new ContentionItem(i, $"Item_{i}", i * 0.1m));
}
else
{
var barrier = new Barrier(ThreadCount);
var tasks = new Task[ThreadCount];

for (var t = 0; t < ThreadCount; t++)
{
var threadId = t;
tasks[t] = Task.Run(() =>
{
barrier.SignalAndWait();
for (var i = 0; i < ItemsPerThread; i++)
{
var id = (threadId * ItemsPerThread) + i;
_cache.AddOrUpdate(new ContentionItem(id, $"Item_{id}", id * 0.1m));
}
});
}

Task.WaitAll(tasks);
barrier.Dispose();
}
}

public sealed record ContentionItem(int Id, string Name, decimal Price);
public sealed record ContentionItemVm(ContentionItem Source);
}

/// <summary>
/// MergeManyChangeSets contention benchmark. Multiple threads mutating child
/// SourceCaches while a CPS pipeline is subscribed. Uses SourceCache (not
/// SourceList) for children so the full path exercises DeliveryQueue + SDQ.
/// </summary>
[MemoryDiagnoser]
[MarkdownExporterAttribute.GitHub]
public class MmcsContentionBenchmarks
{
private SourceCache<MmcsParent, int> _parents = null!;
private MmcsParent[] _parentItems = null!;
private IDisposable? _subscription;

[Params(1, 2, 4)]
public int ThreadCount;

[Params("None", "Sort", "Transform")]
public string SubscriberWork = "None";

private const int ParentCount = 50;
private const int ChildOpsPerThread = 200;

[GlobalSetup]
public void GlobalSetup()
{
_parents = new SourceCache<MmcsParent, int>(p => p.Id);
_parentItems = Enumerable.Range(0, ParentCount).Select(i =>
{
var p = new MmcsParent(i);
p.Children.Edit(u =>
{
for (var j = 0; j < 10; j++)
u.AddOrUpdate(new MmcsChild(i * 100 + j, $"Child_{i}_{j}"));
});
return p;
}).ToArray();
}

[GlobalCleanup]
public void GlobalCleanup()
{
_subscription?.Dispose();
foreach (var p in _parentItems) p.Dispose();
_parents.Dispose();
}

[IterationSetup]
public void IterationSetup()
{
_subscription?.Dispose();
_parents.Clear();
_parents.AddOrUpdate(_parentItems);

var pipeline = _parents.Connect()
.MergeManyChangeSets(p => p.Children.Connect());

_subscription = SubscriberWork switch
{
"Sort" => pipeline
.Sort(SortExpressionComparer<MmcsChild>.Ascending(c => c.Name))
.Bind(out _)
.Subscribe(_ => { }),

"Transform" => pipeline
.Transform(c => new MmcsChildVm(c))
.Subscribe(_ => { }),

_ => pipeline.Subscribe(_ => { }),
};
}

[Benchmark]
public void ConcurrentChildMutations()
{
if (ThreadCount == 1)
{
MutateChildren(0);
}
else
{
var barrier = new Barrier(ThreadCount);
var tasks = new Task[ThreadCount];
for (var t = 0; t < ThreadCount; t++)
{
var threadId = t;
tasks[t] = Task.Run(() =>
{
barrier.SignalAndWait();
MutateChildren(threadId);
});
}
Task.WaitAll(tasks);
barrier.Dispose();
}
}

private void MutateChildren(int threadId)
{
for (var i = 0; i < ChildOpsPerThread; i++)
{
var parentIdx = (threadId * ChildOpsPerThread + i) % ParentCount;
var parent = _parentItems[parentIdx];
var childId = threadId * 100_000 + i;
parent.Children.AddOrUpdate(new MmcsChild(childId, $"New_{childId}"));
if (parent.Children.Count > 15)
parent.Children.RemoveKey(parent.Children.Keys.First());
}
}

public sealed record MmcsChild(int Id, string Name);
public sealed record MmcsChildVm(MmcsChild Source);

public sealed class MmcsParent(int id) : IDisposable
{
public int Id { get; } = id;
public SourceCache<MmcsChild, int> Children { get; } = new(c => c.Id);
public void Dispose() => Children.Dispose();
}
}
2 changes: 1 addition & 1 deletion src/DynamicData.Benchmarks/DynamicData.Benchmarks.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>net8.0-windows</TargetFramework>
<TargetFramework>net9.0</TargetFramework>
<PlatformTarget>AnyCPU</PlatformTarget>
<IsPackable>false</IsPackable>
<NoWarn>;1591;1701;1702;1705;CA1822;CA1001</NoWarn>
Expand Down
Loading
Loading