Skip to content

Commit

Permalink
Adding support for delay in resumeOn
Browse files Browse the repository at this point in the history
  • Loading branch information
gershnik committed Dec 14, 2023
1 parent 00ea59c commit adbac9b
Show file tree
Hide file tree
Showing 4 changed files with 115 additions and 29 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),

## Unreleased

### Added
- all variants of `resumeOn` and `resumeOnMainQueue` now accept `when` argument to request that the resumption happens no earlier than the specified time. When used with `co_await resumeOn(currentQueue, when)` this allows the caller to sleep asynchronously without blocking its queue.

## [2.0] - 2023-12-13

### Added
Expand Down
50 changes: 45 additions & 5 deletions doc/CoDispatch.md
Original file line number Diff line number Diff line change
Expand Up @@ -138,17 +138,17 @@ If the dispatched call takes more time, `co_await` will suspend the caller and t

But what if, as so often happens you want to make sure that the resumed code runs on a specific queue? This is often the case with UI code running on the main queue is delegating work to background tasks and need to resume on the main queue to update the UI with the results of computation. You can easily accomplish this via:

```objc++
```c++
auto queue = dispatch_get_global_queue(QOS_CLASS_BACKGROUND, 0);
auto myq = dispatch_get_main_queue();
co_await co_dispatch(queue, [](){

}).resumeOn(myq);
```
or more succinctly:
If the resumption queue is the main queue you cal also use `resumeOnMainQueue`
```objc++
```c++
auto queue = dispatch_get_global_queue(QOS_CLASS_BACKGROUND, 0);
co_await co_dispatch(queue, [](){
Expand All @@ -165,10 +165,27 @@ Why isn't there a way to say "resume on the current queue"? This is because, Ap

It is, however, still possible to ask "is a given queue the same one I am currently running on" thought it needs a little trick[^2]. This is how `resumeOn` avoid queue switch if the given queue happens to be the current one.


[^1]: This might have something to do with the fact that "the current queue" as such is the wrong entity to execute on since it might be a delegated part of another queue or a sequence of them.
[^2]: See `isCurrentQueue` method of `BasicPromise` class in the [header][header] for the trick

### Resuming with delay

It is also possible to request that the resumption happens no-earlier-than some specified time. This is useful in situations where you want ensure a certain delay or to avoid spamming the main thread.

You can accomplish this by passing optional `when` parameter to `resumeOn` or `resumeOnMainQueue`. This argument is of type [`dispatch_time_t`][dispatch_time_t]. Here is how to delay resumptions by 200ms

```c++
#include <chrono>

using namespace std::literals;

co_await co_dispatch(queue, [](){

}).resumeOn(anotherQueue, dispatch_time(DISPATCH_TIME_NOW, nanoseconds(200ms).count()));
```
If you request a delay then the resumption will *always* be asynchronous even if the resumption queue is the same as your current queue.
### Exceptions
Any C++ or ObjectiveC exceptions raised in the body of your callable will be propagated back to the calling coroutine. Which means you can write the following natural code
Expand Down Expand Up @@ -245,7 +262,7 @@ The tasks are reference counted and will self destruct once they run to completi

On occasion it might be convenient to simply switch coroutine execution to a different queue. While it is possible to accomplish it with `co_dispatch` and an empty lambda, a simple transition like this can be done much more efficiently. The library provides standalone functions that do so

```objc++
```c++
co_await resumeOn(someQueue);
//and
co_await resumeOnMainQueue();
Expand All @@ -255,6 +272,26 @@ This accomplishes the switch in the fastest possible manner without any extra ov
Both methods never throw exceptions and can be used in `catch` handlers to ensure running on a desired queue.
Both methods also allow for an optional `when` argument of type [`dispatch_time_t`][dispatch_time_t] that requests that the resumption happens no-earlier-than specified time.
```c++
co_await resumeOn(someQueue, when);
//and
co_await resumeOnMainQueue(when);
```

> ℹ️ If you specify `when` and target the same queue as you are currently on these methods allow you to _sleep asynchronously_.
For example here is how to sleep for 1 second on the main queue without blocking the UI thread

```c++
#include <chrono>

using namespace std::literals;

co_await resumeOnMainQueue(dispatch_time(DISPATCH_TIME_NOW, nanoseconds(1s).count()));
```
## Converting callbacks
Many Apple and 3rd party libraries on Apple platform use the callback pattern to report their results asynchronously. You call an API and pass it a callback (a block or sometimes a function). The API initiates some asynchronous work (using dispatch queues internally) and returns quickly. Later the callback is invoked on some queue with the results.
Expand Down Expand Up @@ -343,6 +380,8 @@ try {

```

And just like with `co_dispatch` you can also pass `when` argument to `resumeOn` or `resumeOnMainQueue` to request that the resumption happen no-earlier-than the specified time.

In general, all the rules that apply to the awaitable returned by `co_dispatch` also apply to the one retuned by `makeAwaitable`. You can delay calling `co_await` or not call it all (though in that case there seems to be no point in using `makeAwaitable` at all - you could just call the wrapped function).


Expand Down Expand Up @@ -840,6 +879,7 @@ As explained in the [previous section](#usage-of-coroutines-across-cpp-and-mm-fi
[header]: ../include/objc-helpers/CoDispatch.h
[for-await]: https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Statements/for-await...of
[odr]: https://en.cppreference.com/w/cpp/language/definition
[dispatch_time_t]: https://developer.apple.com/documentation/dispatch/dispatch_time_t?language=objc

<!-- End Links --->

73 changes: 49 additions & 24 deletions include/objc-helpers/CoDispatch.h
Original file line number Diff line number Diff line change
Expand Up @@ -347,6 +347,8 @@ inline namespace CO_DISPATCH_NS {
*/
auto isReady() const noexcept -> bool {
if (m_resumeQueue) {
if (m_when != DISPATCH_TIME_NOW)
return false;
m_awaiterOnResumeQueue = isCurrentQueue(m_resumeQueue);
if (!m_awaiterOnResumeQueue)
return false;
Expand All @@ -365,9 +367,7 @@ inline namespace CO_DISPATCH_NS {
assert(oldState != s_notStartedMarker && oldState != s_abandonedMarker);
if (oldState != s_runningMarker) {
if (m_resumeQueue && !m_awaiterOnResumeQueue) {
dispatch_async_f(m_resumeQueue, reinterpret_cast<void *>(h.address()), [](void * addr) {
std::coroutine_handle<>::from_address(addr).resume();
});
resumeHandleAsync(h.address());
return true;
}
return false;
Expand All @@ -380,6 +380,7 @@ inline namespace CO_DISPATCH_NS {
*/
auto resumeExecution(dispatch_queue_t __nullable queue) noexcept {
m_value.clear();
m_awaiterOnResumeQueue = false;
[[maybe_unused]] auto oldstate = m_state.exchange(s_runningMarker, std::memory_order_acq_rel);
assert(oldstate != s_runningMarker && oldstate != s_abandonedMarker);
auto myHandle = std::coroutine_handle<BasicPromise>::from_promise(*this);
Expand Down Expand Up @@ -414,9 +415,7 @@ inline namespace CO_DISPATCH_NS {
if (!m_resumeQueue || isCurrentQueue(m_resumeQueue)) {
std::coroutine_handle<>::from_address(reinterpret_cast<void *>(oldState)).resume();
} else {
dispatch_async_f(m_resumeQueue, reinterpret_cast<void *>(oldState), [](void * addr) {
std::coroutine_handle<>::from_address(addr).resume();
});
resumeHandleAsync(reinterpret_cast<void *>(oldState));
}
}
}
Expand Down Expand Up @@ -463,8 +462,10 @@ inline namespace CO_DISPATCH_NS {
/**
Specify a queue on which resume client
*/
void setResumeQueue(dispatch_queue_t __nullable queue) noexcept
{ m_resumeQueue = queue; }
void setResumeQueue(dispatch_queue_t __nullable queue, dispatch_time_t when) noexcept {
m_resumeQueue = queue;
m_when = when;
}


//Server interface
Expand Down Expand Up @@ -512,6 +513,18 @@ inline namespace CO_DISPATCH_NS {
return ret;
}

void resumeHandleAsync(void * __nonnull handleAddr) {

auto resumer = [](void * addr) {
std::coroutine_handle<>::from_address(addr).resume();
};

if (m_when == DISPATCH_TIME_NOW)
dispatch_async_f(m_resumeQueue, handleAddr, resumer);
else
dispatch_after_f(m_when, m_resumeQueue, handleAddr, resumer);
}

private:
static constexpr uintptr_t s_runningMarker = 0;
static constexpr uintptr_t s_notStartedMarker = 1;
Expand All @@ -520,6 +533,7 @@ inline namespace CO_DISPATCH_NS {

std::atomic<uintptr_t> m_state = s_runningMarker;
QueueHolder m_resumeQueue;
dispatch_time_t m_when;
mutable bool m_awaiterOnResumeQueue = false;
DelayedValue m_value;
};
Expand Down Expand Up @@ -669,12 +683,13 @@ inline namespace CO_DISPATCH_NS {
return awaiter{std::move(m_sharedState)};
}

auto resumeOn(dispatch_queue_t __nullable queue) && noexcept -> DispatchAwaitable && {
m_sharedState->setResumeQueue(queue);
auto resumeOn(dispatch_queue_t __nullable queue,
dispatch_time_t when = DISPATCH_TIME_NOW) && noexcept -> DispatchAwaitable && {
m_sharedState->setResumeQueue(queue, when);
return std::move(*this);
}
auto resumeOnMainQueue() && noexcept -> DispatchAwaitable &&
{ return std::move(*this).resumeOn(dispatch_get_main_queue()); }
auto resumeOnMainQueue(dispatch_time_t when = DISPATCH_TIME_NOW) && noexcept -> DispatchAwaitable &&
{ return std::move(*this).resumeOn(dispatch_get_main_queue(), when); }

template<class Func>
requires(std::is_invocable_v<FunctionFromReference<Func>>)
Expand Down Expand Up @@ -872,12 +887,12 @@ inline namespace CO_DISPATCH_NS {
}


auto resumeOn(dispatch_queue_t __nullable queue) && noexcept -> DispatchTask && {
m_promise->setResumeQueue(queue);
auto resumeOn(dispatch_queue_t __nullable queue, dispatch_time_t when = DISPATCH_TIME_NOW) && noexcept -> DispatchTask && {
m_promise->setResumeQueue(queue, when);
return std::move(*this);
}
auto resumeOnMainQueue() && noexcept -> DispatchTask &&
{ return std::move(*this).resumeOn(dispatch_get_main_queue()); }
auto resumeOnMainQueue(dispatch_time_t when = DISPATCH_TIME_NOW) && noexcept -> DispatchTask &&
{ return std::move(*this).resumeOn(dispatch_get_main_queue(), when); }

private:
DispatchTask(Promise * __nonnull promise) noexcept :
Expand Down Expand Up @@ -1008,7 +1023,7 @@ inline namespace CO_DISPATCH_NS {
{ return std::move(*this).beginOn(nullptr); }

auto resumingOn(dispatch_queue_t __nullable queue) && noexcept -> DispatchGenerator && {
m_promise->setResumeQueue(queue);
m_promise->setResumeQueue(queue, DISPATCH_TIME_NOW);
return std::move(*this);
}
auto resumingOnMainQueue() && noexcept -> DispatchGenerator &&
Expand All @@ -1027,16 +1042,23 @@ inline namespace CO_DISPATCH_NS {

/**
@function
`co_await`ing this will resume the coroutine on a given queue
`co_await`ing this will resume the coroutine on a given queue optionally on or after a given time
If you pass your qurrent queue and non default `when` this is equivalent to an asyncrounous sleep
until `when` - now.
*/
inline auto resumeOn(dispatch_queue_t __nonnull queue) noexcept {
inline auto resumeOn(dispatch_queue_t __nonnull queue, dispatch_time_t when = DISPATCH_TIME_NOW) noexcept {
struct Awaitable
{
dispatch_queue_t queue;
dispatch_time_t when;
auto await_ready() noexcept
{ return false; }
void await_suspend(std::coroutine_handle<> h) noexcept {
dispatch_async_f(queue, h.address(), Awaitable::resume);
if (when == DISPATCH_TIME_NOW)
dispatch_async_f(queue, h.address(), Awaitable::resume);
else
dispatch_after_f(when, queue, h.address(), Awaitable::resume);
}
void await_resume() noexcept
{}
Expand All @@ -1046,15 +1068,18 @@ inline namespace CO_DISPATCH_NS {
h.resume();
}
};
return Awaitable{queue};
return Awaitable{queue, when};
}

/**
@function
`co_await`ing this will resume a coroutine on the main queue
`co_await`ing this will resume a coroutine on the main queue optionally on or after a given time
If you are already on the main queue and pass non default `when` this is equivalent to an asyncrounous sleep
until `when` - now.
*/
inline auto resumeOnMainQueue() noexcept {
return resumeOn(dispatch_get_main_queue());
inline auto resumeOnMainQueue(dispatch_time_t when = DISPATCH_TIME_NOW) noexcept {
return resumeOn(dispatch_get_main_queue(), when);
}

//MARK: - Dispatch IO wrappers
Expand Down
18 changes: 18 additions & 0 deletions test/CoDispatchTests.mm
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,10 @@
#include <string_view>
#include <map>
#include <filesystem>
#include <chrono>

using namespace std::literals;
using namespace std::chrono;

template<class T>
struct Sequence {
Expand Down Expand Up @@ -350,6 +354,12 @@ static auto checkDispatchToDifferentQueue() -> DispatchTask<> {
co_await resumeOn(conq);
CHECK(!NSThread.isMainThread);

co_await resumeOnMainQueue(dispatch_time(DISPATCH_TIME_NOW, nanoseconds(200ms).count()));
CHECK(NSThread.isMainThread);

co_await resumeOn(conq, dispatch_time(DISPATCH_TIME_NOW, nanoseconds(200ms).count()));
CHECK(!NSThread.isMainThread);

i = co_await delay(0.2, co_dispatch(dispatch_get_main_queue(), []() {
return 3;
}));
Expand Down Expand Up @@ -487,6 +497,7 @@ static auto checkTasks() -> DispatchTask<> {
}

co_await resumeOnMainQueue();
CHECK(NSThread.isMainThread);


auto coro = []() -> DispatchTask<int> {
Expand All @@ -497,6 +508,13 @@ static auto checkTasks() -> DispatchTask<> {
};

co_await coro().resumeOnMainQueue();
CHECK(NSThread.isMainThread);

co_await coro().resumeOnMainQueue(dispatch_time(DISPATCH_TIME_NOW, nanoseconds(200ms).count()));
CHECK(NSThread.isMainThread);

co_await delay(0.2, coro().resumeOnMainQueue(dispatch_time(DISPATCH_TIME_NOW, nanoseconds(1ms).count())));
CHECK(NSThread.isMainThread);
}

static auto checkGenerator() -> DispatchTask<> {
Expand Down

0 comments on commit adbac9b

Please sign in to comment.