Skip to content

Commit c36885a

Browse files
authored
Merge pull request #74 from hudl/MARVEL-2040-FixConcurencyProblem
Replace non threaded safe List with a ConcurrentDictionary in bulkhead factory.
2 parents 94950da + 8baff17 commit c36885a

File tree

4 files changed

+69
-9
lines changed

4 files changed

+69
-9
lines changed

src/Hudl.Mjolnir/Bulkhead/BulkheadFactory.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,7 @@ internal class SemaphoreBulkheadHolder
9797
private readonly GroupKey _key;
9898

9999
private ISemaphoreBulkhead _bulkhead;
100-
public ISemaphoreBulkhead Bulkhead { get { return _bulkhead; } }
100+
public ISemaphoreBulkhead Bulkhead => _bulkhead;
101101

102102
private readonly IMetricEvents _metricEvents;
103103
private readonly MjolnirConfiguration _config;

src/Hudl.Mjolnir/Config/MjolnirConfiguration.cs

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
using System;
2+
using System.Collections.Concurrent;
23
using System.Collections.Generic;
34

45
namespace Hudl.Mjolnir.Config
@@ -141,7 +142,7 @@ public virtual BreakerConfiguration GetBreakerConfiguration(string key)
141142
/// </summary>
142143
public void NotifyAfterConfigUpdate()
143144
{
144-
foreach (var observer in _observers)
145+
foreach (var observer in _observers.Values)
145146
{
146147
observer.OnNext(this);
147148
}
@@ -161,13 +162,14 @@ public void Dispose()
161162
_onDispose();
162163
}
163164
}
164-
165-
private readonly List<IObserver<MjolnirConfiguration>> _observers = new List<IObserver<MjolnirConfiguration>>();
166-
165+
166+
private readonly ConcurrentDictionary<IObserver<MjolnirConfiguration>, IObserver<MjolnirConfiguration>>
167+
_observers = new ConcurrentDictionary<IObserver<MjolnirConfiguration>, IObserver<MjolnirConfiguration>>();
168+
167169
public IDisposable Subscribe(IObserver<MjolnirConfiguration> observer)
168170
{
169-
var subscription = new Subscription(() => _observers.Remove(observer));
170-
_observers.Add(observer);
171+
var subscription = new Subscription(() => _observers.TryRemove(observer, out var unused));
172+
_observers.TryAdd(observer, observer);
171173
return subscription;
172174
}
173175
}

src/Hudl.Mjolnir/Hudl.Mjolnir.csproj

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
<Project Sdk="Microsoft.NET.Sdk">
22
<PropertyGroup>
3-
<VersionPrefix>3.2.0</VersionPrefix>
4-
<FileVersion>3.2.0</FileVersion>
3+
<VersionPrefix>3.2.1</VersionPrefix>
4+
<FileVersion>3.2.1</FileVersion>
55
<AssemblyVersion>3.0.0.0</AssemblyVersion>
66
<TargetFrameworks>netstandard1.2</TargetFrameworks>
77
<GenerateAssemblyInfo>false</GenerateAssemblyInfo>

tests/unit/Hudl.Mjolnir.Tests/Bulkhead/BulkheadFactoryTests.cs

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,10 @@
55
using Hudl.Mjolnir.Tests.Helper;
66
using Moq;
77
using System;
8+
using System.Collections.Concurrent;
89
using System.Collections.Generic;
910
using System.Threading;
11+
using System.Threading.Tasks;
1012
using Hudl.Mjolnir.Config;
1113
using Xunit;
1214
using static Hudl.Mjolnir.Bulkhead.BulkheadFactory;
@@ -335,5 +337,61 @@ public void GetBulkhead_WhenInitializingBulkheadAndMaxConcurrentConfigIsInvalid_
335337

336338
Assert.Equal(validMaxConcurrent, bulkhead.CountAvailable);
337339
}
340+
341+
342+
/// <summary>
343+
/// This test may past even if there is a concurrency problem. It will always pass if the concurrency is working
344+
/// fine. Do not ignore FAILURES!!
345+
/// </summary>
346+
[Fact]
347+
public void MultiThreadedInitialization_DoesNotThrow()
348+
{
349+
var mockMetricEvents = new Mock<IMetricEvents>(MockBehavior.Strict);
350+
351+
var key = AnyGroupKey;
352+
const int validMaxConcurrent = 10;
353+
354+
var mockConfig = new MjolnirConfiguration
355+
{
356+
BulkheadConfigurations = new Dictionary<string, BulkheadConfiguration>
357+
{
358+
{
359+
key.Name,
360+
new BulkheadConfiguration
361+
{
362+
MaxConcurrent = validMaxConcurrent
363+
}
364+
}
365+
}
366+
};
367+
368+
var mockLogFactory = new Mock<IMjolnirLogFactory>(MockBehavior.Strict);
369+
mockLogFactory.Setup(m => m.CreateLog<BulkheadFactory>()).Returns(new DefaultMjolnirLog<BulkheadFactory>());
370+
mockLogFactory.Setup(m => m.CreateLog<SemaphoreBulkheadHolder>())
371+
.Returns(new DefaultMjolnirLog<SemaphoreBulkheadHolder>());
372+
var factory = new BulkheadFactory(mockMetricEvents.Object, mockConfig, mockLogFactory.Object);
373+
374+
var groupKeys = new List<GroupKey>();
375+
for (var i = 0; i < 50000000; ++i)
376+
{
377+
groupKeys.Add(GroupKey.Named(1.ToString()));
378+
}
379+
380+
var exceptions = new ConcurrentQueue<Exception>();
381+
382+
Parallel.ForEach(groupKeys, (groupKey) =>
383+
{
384+
try
385+
{
386+
factory.GetBulkhead(groupKey);
387+
}
388+
catch (Exception e)
389+
{
390+
exceptions.Enqueue(e);
391+
}
392+
});
393+
394+
Assert.Empty(exceptions);
395+
}
338396
}
339397
}

0 commit comments

Comments
 (0)