Skip to content

Commit

Permalink
Fixes #146 - Better support for CancellationToken. Clean up of toke…
Browse files Browse the repository at this point in the history
…n registrations that may produce exceptions when `CancellationTokenSource.Cancel()` is called. Basic idea here is to have the caller of `CancellationTokenSource.Cancel()` dispose of the `CancellationTokenRegistration` and have the message pump worker tasked with finishing the Awatier task continuations dispose of `CancellationTokenRegistration` when it's done via using statement.
  • Loading branch information
bchavez committed Sep 17, 2019
1 parent 0447498 commit 2b25fcb
Show file tree
Hide file tree
Showing 5 changed files with 96 additions and 11 deletions.
3 changes: 3 additions & 0 deletions HISTORY.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
## v2.3.150
* Issue #146 - Better support for `CancellationToken`. Clean up of token registrations that may produce exceptions when `CancellationTokenSource.Cancel()` is called.

## v2.3.101
* Issue #143 - `ConnectionPool.Builder.ConnectAsync()` now respects `.InitialTimeout()` parameter. `CancellationToken` also supported.

Expand Down
74 changes: 74 additions & 0 deletions Source/RethinkDb.Driver.Tests/GitHubIssues/Issue146.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
using System;
using System.Threading;
using System.Threading.Tasks;
using FluentAssertions;
using Newtonsoft.Json.Linq;
using NUnit.Framework;
using RethinkDb.Driver.Net;
using static System.Console;
using static RethinkDb.Driver.RethinkDB;

namespace RethinkDb.Driver.Tests.GitHubIssues
{
[TestFixture]
public class Issue146
{
private Connection conn;

[Test]
public async Task Test()
{
Print("Main Thread Start");

conn = R.Connection().Connect();

var cts = new CancellationTokenSource();

RunChanges(cts.Token);

Print("Starting main thread delay.");
await Task.Delay(500);

Print($"Canceling task");
Action act = () => cts.Cancel();

act.ShouldNotThrow();

Print("End of main");
}

private async void RunChanges(CancellationToken ct)
{
Print("RunChanges: called");
Cursor<Model.Change<JObject>> changes = null;
try
{
Print("RunChanges: BEFORE Query");
changes = await R.Db("rethinkdb").Table("jobs")
.Changes().OptArg("include_initial", "true")
.RunChangesAsync<JObject>(conn, ct);
Print("RunChanges: Have Cursor, iterating with MoveNextAsync.");
while (await changes.MoveNextAsync(ct))
{
Print("RunChanges: got a change");
}
}
catch (OperationCanceledException ex)
{
Print("RunChanges: op canceled");
}
finally
{
Print("RunChanges: finally");
changes?.Close();
Print("RunChanges: changes cursor closed");
}
Print("RunChanges: returning");
}

static void Print(string msg)
{
WriteLine($">>> (TID:{Thread.CurrentThread.ManagedThreadId}): {msg}");
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@
<Compile Include="Dao\DaoTests.cs" />
<Compile Include="GitHubIssues\Issue128.cs" />
<Compile Include="GitHubIssues\Issue138.cs" />
<Compile Include="GitHubIssues\Issue146.cs" />
<Compile Include="Network\AuthTest.cs" />
<Compile Include="Network\Benchmark.cs" />
<Compile Include="Network\ConnectionPoolTests.cs" />
Expand Down
14 changes: 10 additions & 4 deletions Source/RethinkDb.Driver/Net/SocketWrapper.cs
Original file line number Diff line number Diff line change
Expand Up @@ -233,11 +233,17 @@ private void ResponsePump()
{
Task.Run(() =>
{
//try, because it's possible
//the awaiting task was canceled.
if( !awaitingTask.TrySetResult(response) )
//regardless of the outcome, clean up any registered
//cancellation tokens with using statement.
using ( awaitingTask )
{
Log.Debug($"Response Pump: The awaiter waiting for response token {response.Token} could not be set. The task was probably canceled.");
//try setting the result, because it's possible
//the awaiting task was canceled.
if( !awaitingTask.TrySetResult(response) )
{
Log.Debug(
$"Response Pump: The awaiter waiting for response token {response.Token} could not be set. The task was probably canceled.");
}
}
});
}
Expand Down
15 changes: 8 additions & 7 deletions Source/RethinkDb.Driver/Utils/CancellableTask.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,17 +21,18 @@ public CancellableTask(CancellationToken cancelToken)

private void OnCancellation()
{
this.SetCanceled();
}
this.TrySetCanceled();

private bool disposed = false;
//if the user successfully signaled they want to
//cancel, remove the registration because
//the task status = canceled has been set.
//don't need the registration any more.
this.Dispose();
}

public void Dispose()
{
if( !disposed )
{
this.registration.Dispose();
}
this.registration.Dispose();
}
}
}

0 comments on commit 2b25fcb

Please sign in to comment.