diff --git a/Assets/Plugins/UniRx/Scripts/Notifiers/MessageBroker.cs b/Assets/Plugins/UniRx/Scripts/Notifiers/MessageBroker.cs index a951831d..91341d77 100644 --- a/Assets/Plugins/UniRx/Scripts/Notifiers/MessageBroker.cs +++ b/Assets/Plugins/UniRx/Scripts/Notifiers/MessageBroker.cs @@ -30,6 +30,11 @@ public interface IAsyncMessagePublisher /// Send Message to all receiver and await complete. /// IObservable PublishAsync(T message); + + /// + /// Send Message to all recievers and await data from recievers + /// + IObservable PublishAsync(T1 message); } public interface IAsyncMessageReceiver @@ -38,6 +43,11 @@ public interface IAsyncMessageReceiver /// Subscribe typed message. /// IDisposable Subscribe(Func> asyncMessageReceiver); + + /// + /// Subscribe typed message with payload. + /// + IDisposable Subscribe(Func> asyncMessageReceiver); } public interface IAsyncMessageBroker : IAsyncMessagePublisher, IAsyncMessageReceiver @@ -143,6 +153,34 @@ public IObservable PublishAsync(T message) return Observable.WhenAll(awaiter); } + public IObservable PublishAsync(T1 message) + { + UniRx.InternalUtil.ImmutableList>> notifier; + lock (notifiers) + { + if (isDisposed) throw new ObjectDisposedException("AsyncMessageBroker"); + + object _notifier; + if (notifiers.TryGetValue(typeof(T1), out _notifier)) + { + notifier = (UniRx.InternalUtil.ImmutableList>>)_notifier; + } + else + { + return null; + } + } + + var data = notifier.Data; + var awaiter = new IObservable[data.Length]; + for (int i = 0; i < data.Length; i++) + { + awaiter[i] = data[i].Invoke(message); + } + return Observable.WhenAll(awaiter); + } + + public IDisposable Subscribe(Func> asyncMessageReceiver) { lock (notifiers) @@ -167,6 +205,30 @@ public IDisposable Subscribe(Func> asyncMessageReceiver) return new Subscription(this, asyncMessageReceiver); } + public IDisposable Subscribe(Func> asyncMessageReceiver) + { + lock (notifiers) + { + if (isDisposed) throw new ObjectDisposedException("AsyncMessageBroker"); + + object _notifier; + if (!notifiers.TryGetValue(typeof(T), out _notifier)) + { + var notifier = UniRx.InternalUtil.ImmutableList>>.Empty; + notifier = notifier.Add(asyncMessageReceiver); + notifiers.Add(typeof(T), notifier); + } + else + { + var notifier = (ImmutableList>>)_notifier; + notifier = notifier.Add(asyncMessageReceiver); + notifiers[typeof(T)] = notifier; + } + } + + return new Subscription(this, asyncMessageReceiver); + } + public void Dispose() { lock (notifiers) @@ -205,5 +267,32 @@ public void Dispose() } } } + + class Subscription : IDisposable + { + readonly AsyncMessageBroker parent; + readonly Func> asyncMessageReceiver; + + public Subscription(AsyncMessageBroker parent, Func> asyncMessageReceiver) + { + this.parent = parent; + this.asyncMessageReceiver = asyncMessageReceiver; + } + + public void Dispose() + { + lock (parent.notifiers) + { + object _notifier; + if (parent.notifiers.TryGetValue(typeof(T), out _notifier)) + { + var notifier = (ImmutableList>>)_notifier; + notifier = notifier.Remove(asyncMessageReceiver); + + parent.notifiers[typeof(T)] = notifier; + } + } + } + } } } \ No newline at end of file