AwaitThreading is a project dedicated to ease the use of parallel programming using async/await infrastructure of C# programming language.
The project consist of two main parts:
-
AwaitThreading.Core: This is where the core functionality of the Fork-join model is implemented. It includes
ForkTask
andJoinTask
that provide the functionality of forking and joining usingawait
operator. It utilizes compiler-generated state machine to run parts of async method in parallel. All operations are also available viaParallelOperations
class. There are also some helper entities (likeParallelLocal<T>
orParallelContextStorage
withParallelContext
). -
Some applications of this concept
- AwaitThreading.Enumerable: Contains extension methods for collections, enabling parallel iteration over collection using a simple foreach loop.
- To be added: MPI-like interface, util classes for auto fork\join with
using
construction, etc.
Basic example:
using AwaitThreading.Core;
using AwaitThreading.Core.Tasks;
await NormalForkAndJoin(5);
async ParallelTask NormalForkAndJoin(int threadCount)
{
Console.Out.WriteLine("Before fork: single thread");
var id = await ParallelOperations.Fork(threadCount);
Console.Out.WriteLine($"Hello world from {id}"); //executed on two different threads
// any (sync or async) workload
await Task.Delay(100);
Thread.Sleep(100);
await ParallelOperations.Join();
Console.Out.WriteLine("After join: single thread");
}
Methods composition:
using AwaitThreading.Core;
using AwaitThreading.Core.Tasks;
await CompositionExample(5);
async ParallelTask CompositionExample(int threadCount)
{
var id = await ForkAndGetId(threadCount);
Console.Out.WriteLine($"Hello world from {id}");
await JoinInsideMethod();
}
async ParallelTask<int> ForkAndGetId(int threadCount)
{
var id = await ParallelOperations.Fork(threadCount);
return id;
}
async ParallelTask JoinInsideMethod()
{
await ParallelOperations.Join();
}
There are two main methods: AsParallel
and AsParallelAsync
. The key difference is that AsParallelAsync
performs forking inside it, so after AsParallelAsync
call caller is already forked, and it requires additional await
to perform this operation.AsParallel
, on the other hand, returns ParallelLazyAsyncEnumerable
and the fork happens inside ParallelLazyAsyncEnumerator.MoveNextAsync
on the first foreach iteration. This allows caller to not write additional await
but has slightly more overhead.
using AwaitThreading.Core.Tasks;
using AwaitThreading.Enumerable;
await AsParallelAsync();
await AsParallel();
async ParallelTask AsParallelAsync()
{
var list = Enumerable.Range(1, 10).ToList();
await foreach (var item in await list.AsParallelAsync(3))
{
// foreach body is executed across three separate threads
Console.Out.WriteLine($"Processing element {item}");
await Task.Delay(10); //simulate some workload
}
}
async ParallelTask AsParallel()
{
var list = Enumerable.Range(1, 10).ToList();
await foreach (var item in list.AsAsyncParallel(3))
{
// foreach body is executed across three separate threads
Console.Out.WriteLine($"Processing element {item}");
await Task.Delay(10); //simulate some workload
}
}
Project is in development state and not production-ready yet.
TODO list of critical items:
- API is not finalized and provides data to some internal structures (like
ParallelContext
) - ExecutionContext is not always restored when it has to
- Exceptions are not propagated from parallel foreach body (compiler-generated state machine saves the exception to a field and rethrows this after
DisposeAsync()
, so there is no way to retrieve this exception for now)