Skip to content

Added the ability to use tokens as a filtering mechanism #29

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 26 additions & 0 deletions PubSub.Tests/CoreHubTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,32 @@ public void PubSubUnsubDirectlyToHub()
// assert
Assert.AreEqual(10, callCount);
}

[TestMethod]
public void PubSubWithToken()
{
var callCount = 0;
string token1 = "token1";
string token2 = "token2";
_hub.Subscribe<bool>((b) => { callCount++;},token1);
_hub.Subscribe<int>((i)=>callCount++,token2);

_hub.Publish(true); //There is no token with ""
_hub.Publish(true, token2); //There is no token2 with bool
_hub.Publish(true, token1); //should be true

Assert.AreEqual(1, callCount);
_hub.Publish(42, token2); //should be true
Assert.AreEqual(2, callCount);

_hub.Unsubscribe(); //This shouldn't unsubscribe anything
Assert.AreEqual(_hub._handlers.Count, 2);

_hub.Unsubscribe(token1); //This should unsubscribe token1
Assert.AreEqual(_hub._handlers.Count, 1);


}
}

public class Event
Expand Down
8 changes: 4 additions & 4 deletions PubSub.Tests/IocExtensionsTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,17 +9,17 @@ public class IocExtensionsTests
private IPubSubPipelineFactory pubSubFactory;
private ISubscriber subscriber;
private IPublisher publisher;
private object sender;
private object preservedSender;
private string sender;
private string preservedSender;

[TestInitialize]
public void Setup()
{
pubSubFactory = new PubSubPipelineFactory();
subscriber = pubSubFactory.GetSubscriber();
publisher = pubSubFactory.GetPublisher();
sender = new object();
preservedSender = new object();
sender = "sender";
preservedSender = "preservedSender";
}

[TestMethod]
Expand Down
78 changes: 44 additions & 34 deletions PubSub/Core/Hub.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
using System.Collections.Generic;
using System.Linq;
using System.Reflection;
using System.Runtime.CompilerServices;
using System.Threading.Tasks;

namespace PubSub
Expand All @@ -13,10 +14,15 @@ public class Hub
private static Hub _default;

public static Hub Default => _default ?? (_default = new Hub());

public void Publish<T>(T data = default(T))
/// <summary>
///
/// </summary>
/// <typeparam name="T"></typeparam>
/// <param name="data">The object we want to publish</param>
/// <param name="token">A token that we will filter on.</param>
public void Publish<T>(T data = default, string token = "")
{
foreach (var handler in GetAliveHandlers<T>())
foreach (Handler handler in GetAliveHandlers<T>(token))
{
switch (handler.Action)
{
Expand All @@ -30,9 +36,9 @@ public class Hub
}
}

public async Task PublishAsync<T>(T data = default(T))
public async Task PublishAsync<T>(T data = default, string token = "")
{
foreach (var handler in GetAliveHandlers<T>())
foreach (var handler in GetAliveHandlers<T>(token))
{
switch (handler.Action)
{
Expand All @@ -51,40 +57,40 @@ public class Hub
/// </summary>
/// <typeparam name="T"></typeparam>
/// <param name="handler"></param>
public void Subscribe<T>(Action<T> handler)
/// <param name="token"></param>
public void Subscribe<T>(Action<T> handler, string token = "")
{
Subscribe(this, handler);
Subscribe(this, handler, token);
}

public void Subscribe<T>(object subscriber, Action<T> handler)
public void Subscribe<T>(object subscriber, Action<T> handler, string token = "")
{
SubscribeDelegate<T>(subscriber, handler);
SubscribeDelegate<T>(subscriber, handler, token);
}

public void Subscribe<T>(Func<T, Task> handler)
public void Subscribe<T>(Func<T, Task> handler, string token = "")
{
Subscribe(this, handler);
Subscribe(this, handler, token);
}

public void Subscribe<T>(object subscriber, Func<T, Task> handler)
public void Subscribe<T>(object subscriber, Func<T, Task> handler, string token = "")
{
SubscribeDelegate<T>(subscriber, handler);
SubscribeDelegate<T>(subscriber, handler, token);
}

/// <summary>
/// Allow unsubscribing directly to this Hub.
/// </summary>
public void Unsubscribe()
public void Unsubscribe(string token = "")
{
Unsubscribe(this);
Unsubscribe(this, token);
}

public void Unsubscribe(object subscriber)
public void Unsubscribe(object subscriber, string token = "")
{
lock (_locker)
{
var query = _handlers.Where(a => !a.Sender.IsAlive ||
a.Sender.Target.Equals(subscriber));
var query = _handlers.Where(a => !a.Sender.IsAlive || a.Sender.Target.Equals(subscriber) && a.Token == token);

foreach (var h in query.ToList())
_handlers.Remove(h);
Expand All @@ -105,18 +111,16 @@ public void Unsubscribe<T>()
/// </summary>
/// <typeparam name="T"></typeparam>
/// <param name="handler"></param>
public void Unsubscribe<T>(Action<T> handler)
public void Unsubscribe<T>(Action<T> handler, string token = "")
{
Unsubscribe(this, handler);
Unsubscribe(this, handler, token);
}

public void Unsubscribe<T>(object subscriber, Action<T> handler = null)
public void Unsubscribe<T>(object subscriber, Action<T> handler = null, string token = "")
{
lock (_locker)
{
var query = _handlers.Where(a => !a.Sender.IsAlive ||
a.Sender.Target.Equals(subscriber) && a.Type == typeof(T));

var query = _handlers.Where(a => !a.Sender.IsAlive || (a.Sender.Target.Equals(subscriber) && a.Type == typeof(T) && a.Token == token));
if (handler != null)
query = query.Where(a => a.Action.Equals(handler));

Expand All @@ -125,19 +129,19 @@ public void Unsubscribe<T>(object subscriber, Action<T> handler = null)
}
}

public bool Exists<T>()
public bool Exists<T>(string token = "")
{
return Exists<T>(this);
return Exists<T>(this, token);
}

public bool Exists<T>(object subscriber)
public bool Exists<T>(object subscriber, string token = "")
{
lock (_locker)
{
foreach (var h in _handlers)
{
if (Equals(h.Sender.Target, subscriber) &&
typeof(T) == h.Type)
typeof(T) == h.Type && h.Token == token)
{
return true;
}
Expand All @@ -147,15 +151,15 @@ public bool Exists<T>(object subscriber)
return false;
}

public bool Exists<T>(object subscriber, Action<T> handler)
public bool Exists<T>(object subscriber, Action<T> handler, string token = "")
{
lock (_locker)
{
foreach (var h in _handlers)
{
if (Equals(h.Sender.Target, subscriber) &&
typeof(T) == h.Type &&
h.Action.Equals(handler))
h.Action.Equals(handler) && h.Token.Equals(token))
{
return true;
}
Expand All @@ -165,13 +169,14 @@ public bool Exists<T>(object subscriber, Action<T> handler)
return false;
}

private void SubscribeDelegate<T>(object subscriber, Delegate handler)
private void SubscribeDelegate<T>(object subscriber, Delegate handler, string token)
{
var item = new Handler
{
Action = handler,
Sender = new WeakReference(subscriber),
Type = typeof(T)
Type = typeof(T),
Token = token
};

lock (_locker)
Expand All @@ -180,10 +185,13 @@ private void SubscribeDelegate<T>(object subscriber, Delegate handler)
}
}

private IEnumerable<Handler> GetAliveHandlers<T>()
private IEnumerable<Handler> GetAliveHandlers<T>(string token)
{
PruneHandlers();
return _handlers.Where(h => h.Type.GetTypeInfo().IsAssignableFrom(typeof(T).GetTypeInfo()));
lock (_locker)
{
return _handlers.Where(h => h.Type.GetTypeInfo().IsAssignableFrom(typeof(T).GetTypeInfo()) && h.Token == token);
}
}

private void PruneHandlers()
Expand All @@ -203,6 +211,8 @@ internal class Handler
public Delegate Action { get; set; }
public WeakReference Sender { get; set; }
public Type Type { get; set; }
public string Token { get; set; } = "";

}
}
}
2 changes: 1 addition & 1 deletion PubSub/Ioc/Implementation/Publisher.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,6 @@ public Publisher( Hub hub )
this.hub = hub;
}

public void Publish<T>(T data) => hub.Publish(data);
public void Publish<T>(T data, string token) => hub.Publish(data, token);
}
}
15 changes: 8 additions & 7 deletions PubSub/Ioc/Implementation/Subscriber.cs
Original file line number Diff line number Diff line change
@@ -1,26 +1,27 @@
using System;
using System.Threading;

namespace PubSub
{
public class Subscriber : ISubscriber
{
private readonly Hub hub;

public Subscriber( Hub hub )
public Subscriber(Hub hub)
{
this.hub = hub;
}

public bool Exists<T>( object subscriber ) => hub.Exists<T>( subscriber );
public bool Exists<T>(object subscriber, string token) => hub.Exists<T>(subscriber, token);

public bool Exists<T>( object subscriber, Action<T> handler ) => hub.Exists( subscriber, handler );
public bool Exists<T>(object subscriber, Action<T> handler, string token) => hub.Exists(subscriber, handler, token);

public void Subscribe<T>( object subscriber, Action<T> handler ) => hub.Subscribe( subscriber, handler );
public void Subscribe<T>(object subscriber, Action<T> handler, string token) => hub.Subscribe(subscriber, handler, token);

public void Unsubscribe( object subscriber ) => hub.Unsubscribe( subscriber );
public void Unsubscribe(object subscriber, string token = "") => hub.Unsubscribe(subscriber, token);

public void Unsubscribe<T>( object subscriber ) => hub.Unsubscribe( subscriber, (Action<T>) null );
public void Unsubscribe<T>(object subscriber, string token) => hub.Unsubscribe(subscriber, (Action<T>)null, token);

public void Unsubscribe<T>( object subscriber, Action<T> handler ) => hub.Unsubscribe( subscriber, handler );
public void Unsubscribe<T>(object subscriber, Action<T> handler, string token) => hub.Unsubscribe(subscriber, handler, token);
}
}
2 changes: 1 addition & 1 deletion PubSub/Ioc/Interfaces/IPublisher.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,6 @@
{
public interface IPublisher
{
void Publish<T>(T data);
void Publish<T>(T data, string token = "");
}
}
12 changes: 6 additions & 6 deletions PubSub/Ioc/Interfaces/ISubscriber.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,11 @@ namespace PubSub
{
public interface ISubscriber
{
bool Exists<T>( object subscriber );
bool Exists<T>( object subscriber, Action<T> handler );
void Subscribe<T>( object subscriber, Action<T> handler );
void Unsubscribe( object subscriber );
void Unsubscribe<T>( object subscriber );
void Unsubscribe<T>( object subscriber, Action<T> handler );
bool Exists<T>( object subscriber, string token = "" );
bool Exists<T>( object subscriber, Action<T> handler, string token = "");
void Subscribe<T>( object subscriber, Action<T> handler, string token ="");
void Unsubscribe( object subscriber, string token = "");
void Unsubscribe<T>( object subscriber, string token = "");
void Unsubscribe<T>( object subscriber, Action<T> handler, string token = "");
}
}