Skip to content
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.

Commit fe62d3e

Browse files
author
Johan 't Hart
committedMar 3, 2021
Add possibility for caller proxy to return an observable
When subscribed to it will call a progressive RPC #238
1 parent 82a672e commit fe62d3e

File tree

4 files changed

+111
-8
lines changed

4 files changed

+111
-8
lines changed
 

‎src/netstandard/Tests/WampSharp.Tests.Wampv2/Integration/RpcProgressTests.cs

Lines changed: 78 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
using System;
22
using System.Collections.Generic;
33
using System.Linq;
4+
using System.Reactive.Disposables;
5+
using System.Reactive.Linq;
46
using System.Threading.Tasks;
57
using NUnit.Framework;
68
using WampSharp.Core.Serialization;
@@ -64,6 +66,42 @@ public async Task ProgressiveCallsCalleeProxyProgress()
6466
Assert.That(result.Result, Is.EqualTo(10));
6567
}
6668

69+
[Test]
70+
public async Task ProgressiveCallsCalleeProxyObservable()
71+
{
72+
WampPlayground playground = new WampPlayground();
73+
74+
CallerCallee dualChannel = await playground.GetCallerCalleeDualChannel();
75+
IWampChannel calleeChannel = dualChannel.CalleeChannel;
76+
IWampChannel callerChannel = dualChannel.CallerChannel;
77+
78+
MyOperation myOperation = new MyOperation();
79+
80+
await calleeChannel.RealmProxy.RpcCatalog.Register(myOperation, new RegisterOptions());
81+
ILongOpObsService proxy = callerChannel.RealmProxy.Services.GetCalleeProxy<ILongOpObsService>();
82+
83+
IEnumerable<int> results = proxy.LongOp(9, false).ToEnumerable(); // it will emit one more than asked
84+
85+
CollectionAssert.AreEquivalent(Enumerable.Range(0, 10), results);
86+
}
87+
88+
[Test]
89+
public async Task ProgressiveCallsCalleeProxyObservableError()
90+
{
91+
WampPlayground playground = new WampPlayground();
92+
93+
CallerCallee dualChannel = await playground.GetCallerCalleeDualChannel();
94+
IWampChannel calleeChannel = dualChannel.CalleeChannel;
95+
IWampChannel callerChannel = dualChannel.CallerChannel;
96+
97+
MyOperation myOperation = new MyOperation();
98+
99+
await calleeChannel.RealmProxy.RpcCatalog.Register(myOperation, new RegisterOptions());
100+
ILongOpObsService proxy = callerChannel.RealmProxy.Services.GetCalleeProxy<ILongOpObsService>();
101+
102+
Assert.Throws(typeof(WampException), () => proxy.LongOp(9, true).ToEnumerable().Count());
103+
}
104+
67105
public class MyOperation : IWampRpcOperation
68106
{
69107
public string Procedure => "com.myapp.longop";
@@ -80,16 +118,27 @@ public IWampCancellableInvocation Invoke<TMessage>(IWampRawRpcOperationRouterCal
80118
TMessage number = arguments[0];
81119
int n = formatter.Deserialize<int>(number);
82120

121+
bool endWithError = arguments.Length > 1 && formatter.Deserialize<bool>(arguments[1]);
122+
83123
for (int i = 0; i < n; i++)
84124
{
85125
caller.Result(WampObjectFormatter.Value,
86126
new YieldOptions {Progress = true},
87127
new object[] {i});
88128
}
89129

90-
caller.Result(WampObjectFormatter.Value,
91-
new YieldOptions(),
92-
new object[] {n});
130+
if (endWithError)
131+
{
132+
caller.Error(WampObjectFormatter.Value,
133+
new Dictionary<string, string>(),
134+
"Something bad happened");
135+
}
136+
else
137+
{
138+
caller.Result(WampObjectFormatter.Value,
139+
new YieldOptions(),
140+
new object[] { n });
141+
}
93142

94143
return null;
95144
}
@@ -122,6 +171,31 @@ public async Task<int> LongOp(int n, IProgress<int> progress)
122171
}
123172
}
124173

174+
public interface ILongOpObsService
175+
{
176+
[WampProcedure("com.myapp.longop")]
177+
[WampProgressiveResultProcedure]
178+
IObservable<int> LongOp(int n, bool endWithError);
179+
}
180+
181+
public class LongOpObsService : ILongOpObsService
182+
{
183+
public IObservable<int> LongOp(int n, bool endWithError) => Observable.Create<int>(async obs =>
184+
{
185+
for (int i = 0; i < n; i++)
186+
{
187+
obs.OnNext(i);
188+
await Task.Delay(100);
189+
}
190+
if (endWithError)
191+
obs.OnError(new WampException("wamp.error", "Something bad happened"));
192+
else
193+
obs.OnCompleted();
194+
195+
return Disposable.Empty;
196+
});
197+
}
198+
125199
public class MyCallback : IWampRawRpcOperationClientCallback
126200
{
127201
private readonly TaskCompletionSource<int> mTask = new TaskCompletionSource<int>();
@@ -187,4 +261,4 @@ public void Report(T value)
187261
mAction(value);
188262
}
189263
}
190-
}
264+
}

‎src/netstandard/WampSharp/WAMP2/V2/Api/CalleeProxy/CalleeProxyInterceptor.cs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
using System;
12
using System.Reflection;
23
using WampSharp.V2.Core.Contracts;
34
using WampSharp.V2.Rpc;
@@ -37,7 +38,7 @@ public virtual CallOptions GetCallOptions(MethodInfo method)
3738

3839
public virtual string GetProcedureUri(MethodInfo method)
3940
{
40-
WampProcedureAttribute attribute =
41+
WampProcedureAttribute attribute =
4142
method.GetCustomAttribute<WampProcedureAttribute>();
4243

4344
if (attribute == null)
@@ -48,4 +49,4 @@ public virtual string GetProcedureUri(MethodInfo method)
4849
return attribute.Procedure;
4950
}
5051
}
51-
}
52+
}

‎src/netstandard/WampSharp/WAMP2/V2/Api/CalleeProxy/CalleeProxyInterceptorFactory.cs

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
using System;
2+
using System.Reactive;
23
using System.Reflection;
34
using System.Threading.Tasks;
45
using WampSharp.V2.Rpc;
@@ -28,7 +29,12 @@ private static Type GetRelevantInterceptorType(MethodInfo method)
2829
Type genericArgument;
2930
Type interceptorType;
3031

31-
if (!typeof(Task).IsAssignableFrom(returnType))
32+
if (returnType.IsGenericType && returnType.GetGenericTypeDefinition() == typeof(IObservable<>))
33+
{
34+
genericArgument = returnType.GetGenericArguments()[0];
35+
interceptorType = typeof(ObservableCalleeProxyInterceptor<>);
36+
}
37+
else if (!typeof(Task).IsAssignableFrom(returnType))
3238
{
3339
MethodInfoValidation.ValidateSyncMethod(method);
3440

@@ -55,4 +61,4 @@ private static Type GetRelevantInterceptorType(MethodInfo method)
5561
return closedGenericType;
5662
}
5763
}
58-
}
64+
}
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
using System.Reactive;
2+
using System.Reactive.Linq;
3+
using System.Reflection;
4+
5+
namespace WampSharp.V2.CalleeProxy
6+
{
7+
internal class ObservableCalleeProxyInterceptor<T> : CalleeProxyInterceptorBase<T>
8+
{
9+
public ObservableCalleeProxyInterceptor(MethodInfo method, IWampCalleeProxyInvocationHandler handler, ICalleeProxyInterceptor interceptor) : base(method, handler, interceptor)
10+
{
11+
}
12+
13+
public override object Invoke(MethodInfo method, object[] arguments) => Observable.Create<T>(async (obs, cancellationToken) =>
14+
{
15+
var last = await Handler.InvokeProgressiveAsync
16+
(Interceptor, method, Extractor, arguments, obs.ToProgress(), cancellationToken);
17+
if (last != null)
18+
obs.OnNext(last);
19+
obs.OnCompleted();
20+
});
21+
}
22+
}

0 commit comments

Comments
 (0)
Please sign in to comment.