Skip to content

Commit

Permalink
Merge pull request grpc#6420 from jtattermusch/csharp_more_api_fixes
Browse files Browse the repository at this point in the history
C# finishing serverside request stream should not be required to dispose.
  • Loading branch information
jtattermusch committed May 4, 2016
2 parents 05146ac + 6504c02 commit 099b756
Show file tree
Hide file tree
Showing 4 changed files with 20 additions and 40 deletions.
34 changes: 11 additions & 23 deletions src/csharp/Grpc.Core.Tests/Internal/AsyncCallServerTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -80,16 +80,24 @@ public void Cleanup()

[Test]
public void CancelNotificationAfterStartDisposes()
{
var finishedTask = asyncCallServer.ServerSideCallAsync();
fakeCall.ReceivedCloseOnServerHandler(true, cancelled: true);
AssertFinished(asyncCallServer, fakeCall, finishedTask);
}

[Test]
public void CancelNotificationAfterStartDisposesAfterPendingReadFinishes()
{
var finishedTask = asyncCallServer.ServerSideCallAsync();
var requestStream = new ServerRequestStream<string, string>(asyncCallServer);

// Finishing requestStream is needed for dispose to happen.
var moveNextTask = requestStream.MoveNext();

fakeCall.ReceivedCloseOnServerHandler(true, cancelled: true);
fakeCall.ReceivedMessageHandler(true, null);
Assert.IsFalse(moveNextTask.Result);

fakeCall.ReceivedCloseOnServerHandler(true, cancelled: true);
AssertFinished(asyncCallServer, fakeCall, finishedTask);
}

Expand All @@ -101,9 +109,8 @@ public void ReadAfterCancelNotificationCanSucceed()

fakeCall.ReceivedCloseOnServerHandler(true, cancelled: true);

// Check that startin a read after cancel notification has been processed is legal.
// Check that starting a read after cancel notification has been processed is legal.
var moveNextTask = requestStream.MoveNext();
fakeCall.ReceivedMessageHandler(true, null);
Assert.IsFalse(moveNextTask.Result);

AssertFinished(asyncCallServer, fakeCall, finishedTask);
Expand Down Expand Up @@ -136,42 +143,28 @@ public void WriteAfterCancelNotificationFails()

// TODO(jtattermusch): should we throw a different exception type instead?
Assert.Throws(typeof(InvalidOperationException), () => responseStream.WriteAsync("request1"));

// Finishing requestStream is needed for dispose to happen.
var moveNextTask = requestStream.MoveNext();
fakeCall.ReceivedMessageHandler(true, null);
Assert.IsFalse(moveNextTask.Result);

AssertFinished(asyncCallServer, fakeCall, finishedTask);
}

[Test]
public void WriteCompletionFailureThrows()
{
var finishedTask = asyncCallServer.ServerSideCallAsync();
var requestStream = new ServerRequestStream<string, string>(asyncCallServer);
var responseStream = new ServerResponseStream<string, string>(asyncCallServer);

var writeTask = responseStream.WriteAsync("request1");
fakeCall.SendCompletionHandler(false);
// TODO(jtattermusch): should we throw a different exception type instead?
Assert.ThrowsAsync(typeof(InvalidOperationException), async () => await writeTask);

// Finishing requestStream is needed for dispose to happen.
var moveNextTask = requestStream.MoveNext();
fakeCall.ReceivedMessageHandler(true, null);
Assert.IsFalse(moveNextTask.Result);

fakeCall.ReceivedCloseOnServerHandler(true, cancelled: true);

AssertFinished(asyncCallServer, fakeCall, finishedTask);
}

[Test]
public void WriteAndWriteStatusCanRunConcurrently()
{
var finishedTask = asyncCallServer.ServerSideCallAsync();
var requestStream = new ServerRequestStream<string, string>(asyncCallServer);
var responseStream = new ServerResponseStream<string, string>(asyncCallServer);

var writeTask = responseStream.WriteAsync("request1");
Expand All @@ -183,11 +176,6 @@ public void WriteAndWriteStatusCanRunConcurrently()
Assert.DoesNotThrowAsync(async () => await writeTask);
Assert.DoesNotThrowAsync(async () => await writeStatusTask);

// Finishing requestStream is needed for dispose to happen.
var moveNextTask = requestStream.MoveNext();
fakeCall.ReceivedMessageHandler(true, null);
Assert.IsFalse(moveNextTask.Result);

fakeCall.ReceivedCloseOnServerHandler(true, cancelled: true);

AssertFinished(asyncCallServer, fakeCall, finishedTask);
Expand Down
7 changes: 1 addition & 6 deletions src/csharp/Grpc.Core/Internal/AsyncCallBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ protected Task<TRead> ReadMessageInternalAsync()
{
lock (myLock)
{
CheckReadingAllowed();
GrpcPreconditions.CheckState(started);
if (readingDone)
{
// the last read that returns null or throws an exception is idempotent
Expand Down Expand Up @@ -224,11 +224,6 @@ protected void CheckSendingAllowed(bool allowFinished)
GrpcPreconditions.CheckState(sendCompletionDelegate == null, "Only one write can be pending at a time");
}

protected virtual void CheckReadingAllowed()
{
GrpcPreconditions.CheckState(started);
}

protected void CheckNotCancelled()
{
if (cancelRequested)
Expand Down
14 changes: 8 additions & 6 deletions src/csharp/Grpc.Core/Internal/AsyncCallServer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -183,12 +183,6 @@ protected override bool IsClient
get { return false; }
}

protected override void CheckReadingAllowed()
{
base.CheckReadingAllowed();
GrpcPreconditions.CheckArgument(!cancelRequested);
}

protected override void OnAfterReleaseResources()
{
server.RemoveCallReference(this);
Expand All @@ -204,6 +198,14 @@ private void HandleFinishedServerside(bool success, bool cancelled)
lock (myLock)
{
finished = true;
if (streamingReadTcs == null)
{
// if there's no pending read, readingDone=true will dispose now.
// if there is a pending read, we will dispose once that read finishes.
readingDone = true;
streamingReadTcs = new TaskCompletionSource<TRequest>();
streamingReadTcs.SetResult(default(TRequest));
}
ReleaseResourcesIfPossible();
}

Expand Down
5 changes: 0 additions & 5 deletions src/csharp/Grpc.Core/Internal/ServerCallHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,6 @@ public async Task HandleCall(ServerRpcNew newRpc, GrpcEnvironment environment)
{
GrpcPreconditions.CheckArgument(await requestStream.MoveNext().ConfigureAwait(false));
var request = requestStream.Current;
// TODO(jtattermusch): we need to read the full stream so that native callhandle gets deallocated.
GrpcPreconditions.CheckArgument(!await requestStream.MoveNext().ConfigureAwait(false));
var result = await handler(request, context).ConfigureAwait(false);
status = context.Status;
await responseStream.WriteAsync(result).ConfigureAwait(false);
Expand Down Expand Up @@ -136,8 +134,6 @@ public async Task HandleCall(ServerRpcNew newRpc, GrpcEnvironment environment)
{
GrpcPreconditions.CheckArgument(await requestStream.MoveNext().ConfigureAwait(false));
var request = requestStream.Current;
// TODO(jtattermusch): we need to read the full stream so that native callhandle gets deallocated.
GrpcPreconditions.CheckArgument(!await requestStream.MoveNext().ConfigureAwait(false));
await handler(request, responseStream, context).ConfigureAwait(false);
status = context.Status;
}
Expand Down Expand Up @@ -298,7 +294,6 @@ public static Status StatusFromException(Exception e)
return rpcException.Status;
}

// TODO(jtattermusch): what is the right status code here?
return new Status(StatusCode.Unknown, "Exception was thrown by handler.");
}

Expand Down

0 comments on commit 099b756

Please sign in to comment.