Skip to content

Commit

Permalink
Made Subscription objects thread safe
Browse files Browse the repository at this point in the history
  • Loading branch information
jirkapok committed Jun 26, 2018
1 parent dbcf9ee commit fa238cd
Show file tree
Hide file tree
Showing 2 changed files with 72 additions and 31 deletions.
45 changes: 37 additions & 8 deletions Src/SwqlStudio/Subscriptions/SubscriptionCallbacks.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,29 @@ internal class SubscriptionCallbacks
internal string Id { get; set; }

private readonly NotificationDeliveryServiceProxy proxy;
private readonly object itemsLock = new object();

internal IEnumerable<SubscriberCallback> Callbacks => this.callbacks;
public bool Empty => !this.callbacks.Any();
internal IEnumerable<SubscriberCallback> Callbacks
{
get
{
lock (this.itemsLock)
{
return new List<SubscriberCallback>(this.callbacks);
}
}
}

public bool Empty
{
get
{
lock (this.itemsLock)
{
return !this.callbacks.Any();
}
}
}

private readonly List<SubscriberCallback> callbacks = new List<SubscriberCallback>();

Expand All @@ -26,21 +46,30 @@ public SubscriptionCallbacks(string uri, NotificationDeliveryServiceProxy proxy,

internal void Add(SubscriberCallback callback)
{
if (!this.callbacks.Contains(callback))
this.callbacks.Add(callback);
lock (this.itemsLock)
{
if (!this.callbacks.Contains(callback))
this.callbacks.Add(callback);
}
}

internal void Remove(SubscriberCallback callback)
{
this.callbacks.Remove(callback);
lock (this.itemsLock)
{
this.callbacks.Remove(callback);
}
}

internal void CloseProxy()
{
if (this.proxy != null)
lock (this.itemsLock)
{
this.proxy.Disconnect(this.activeSubscriberAddress);
this.proxy.Close();
if (this.proxy != null)
{
this.proxy.Disconnect(this.activeSubscriberAddress);
this.proxy.Close();
}
}
}
}
Expand Down
58 changes: 35 additions & 23 deletions Src/SwqlStudio/Subscriptions/SubscriptionInfo.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;

Expand All @@ -8,9 +7,10 @@ namespace SwqlStudio.Subscriptions
internal class SubscriptionInfo
{
private readonly ConnectionInfo connection;
private readonly object itemsLock = new object();

private readonly ConcurrentDictionary<string, SubscriptionCallbacks> subscriptions =
new ConcurrentDictionary<string, SubscriptionCallbacks>();
private readonly Dictionary<string, SubscriptionCallbacks> subscriptions =
new Dictionary<string, SubscriptionCallbacks>();

internal SubscriptionInfo(ConnectionInfo connection)
{
Expand All @@ -19,7 +19,10 @@ internal SubscriptionInfo(ConnectionInfo connection)

internal bool HasSubScription(string subscriptionId)
{
return this.subscriptions.Values.Any(s => s.Id == subscriptionId);
lock (this.itemsLock)
{
return this.subscriptions.Values.Any(s => s.Id == subscriptionId);
}
}

internal string Register(string query, SubscriberCallback callback,
Expand All @@ -28,10 +31,13 @@ internal string Register(string query, SubscriberCallback callback,
SubscriptionCallbacks subscription;
var normalized = query.ToLower();

if (!this.subscriptions.TryGetValue(normalized, out subscription))
lock (this.itemsLock)
{
subscription = subscribe(this.connection, query);
this.subscriptions.TryAdd(normalized, subscription);
if (!this.subscriptions.TryGetValue(normalized, out subscription))
{
subscription = subscribe(this.connection, query);
this.subscriptions.Add(normalized, subscription);
}
}

subscription.Add(callback);
Expand All @@ -40,21 +46,24 @@ internal string Register(string query, SubscriberCallback callback,

internal void Remove(string subscriptionUri, SubscriberCallback callback)
{
var query = this.subscriptions.Where(kv => kv.Value.Uri == subscriptionUri)
.Select(kv => kv.Key)
.FirstOrDefault();
lock (this.itemsLock)
{
var query = this.subscriptions.Where(kv => kv.Value.Uri == subscriptionUri)
.Select(kv => kv.Key)
.FirstOrDefault();

if (String.IsNullOrEmpty(query))
return;

var subscription = this.subscriptions[query];
subscription.Remove(callback);
if (String.IsNullOrEmpty(query))
return;

if (subscription.Empty)
{
this.subscriptions.TryRemove(query, out subscription);
this.Unsubscribe(subscriptionUri);
subscription.CloseProxy();
var subscription = this.subscriptions[query];
subscription.Remove(callback);

if (subscription.Empty)
{
this.subscriptions.Remove(query);
this.Unsubscribe(subscriptionUri);
subscription.CloseProxy();
}
}
}

Expand All @@ -66,9 +75,12 @@ private void Unsubscribe(string subscriptionUri)

internal IEnumerable<SubscriberCallback> CallBacks(string subscriptionId)
{
return this.subscriptions.Values.Where(v => v.Id == subscriptionId)
.SelectMany(kv => kv.Callbacks)
.ToList();
lock (this.itemsLock)
{
return this.subscriptions.Values.Where(v => v.Id == subscriptionId)
.SelectMany(kv => kv.Callbacks)
.ToList();
}
}
}
}

0 comments on commit fa238cd

Please sign in to comment.