Add partial support for cancelling async write mutex requests
While we can't cancel the actual wait on the write mutex, we can dequeue
specific Transactions which are waiting for their turn to write, and only block
when the DB itself is destroyed. This means that individual Transactions whose
async writes have been cancelled can be cleaned up while the write lock is held
by another Transaction.

This is done by changing the async write queue in DB::AsyncCommitHelper from a
queue of arbitrary callbacks to a queue of Transaction pointers. This increases
the coupling between the two types, but it makes it easier to dequeue specific
instances, and relying on the specific details of what Transaction will do
simplifies the locking involved.
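
Reduced to a standalone sketch, the queueing change looks roughly like the
following. The names and types here are illustrative stand-ins rather than the
realm-core implementation, but the shape is the same: a deque of raw pointers
that a cancelling caller can search and erase from under the same mutex the
worker thread uses.

// Illustrative sketch only; Transaction is a stand-in for realm::Transaction.
#include <algorithm>
#include <deque>
#include <mutex>

struct Transaction {};

class PendingWriteQueue {
public:
    void enqueue(Transaction* tr)
    {
        std::lock_guard<std::mutex> lock(m_mutex);
        m_pending.push_back(tr);
    }

    // Returns true if the request was still queued and has been removed,
    // false if the worker thread already dequeued it and the caller must
    // instead wait for the write lock to be handed over.
    bool cancel(Transaction* tr)
    {
        std::lock_guard<std::mutex> lock(m_mutex);
        auto it = std::find(m_pending.begin(), m_pending.end(), tr);
        if (it == m_pending.end())
            return false;
        m_pending.erase(it);
        return true;
    }

private:
    std::mutex m_mutex;
    std::deque<Transaction*> m_pending;
};

int main()
{
    PendingWriteQueue queue;
    Transaction tr;
    queue.enqueue(&tr);
    return queue.cancel(&tr) ? 0 : 1; // 0: dequeued before any worker saw it
}
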
tgoyne committed Apr 13, 2023
1 parent 7a17dab commit 4ede7ad
Showing 21 changed files with 530 additions and 280 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -7,6 +7,7 @@
### Fixed
* Fixed a crash or exception when doing a fulltext search for multiple keywords when the intersection of results is not equal. ([#6465](https://github.com/realm/realm-core/issues/6465) since v13.2.0).
* Fixed issue where build would not succeed when consuming core as an installed dependency due to missing install headers ([#6479](https://github.com/realm/realm-core/pull/6479) since v13.4.1).
* Fixed a deadlock when closing a Transaction with a cancelled asynchronous write scheduled while another Transaction holds the write lock ([PR #6486](https://github.com/realm/realm-core/pull/6486), since v11.10.0).

### Breaking changes
* None.
1 change: 1 addition & 0 deletions src/realm/CMakeLists.txt
@@ -240,6 +240,7 @@ set(REALM_INSTALL_HEADERS
util/random.hpp
util/safe_int_ops.hpp
util/scope_exit.hpp
util/semaphore.hpp
util/serializer.hpp
util/sha_crypto.hpp
util/span.hpp
196 changes: 94 additions & 102 deletions src/realm/db.cpp
@@ -1861,37 +1861,43 @@ class DB::AsyncCommitHelper {
}
~AsyncCommitHelper()
{
{
std::unique_lock lg(m_mutex);
if (!m_running) {
return;
}
m_running = false;
m_cv_worker.notify_one();
if (!m_running.exchange(false, std::memory_order_relaxed)) {
return;
}
m_cv_worker.notify_one();
m_thread.join();
}

void begin_write(util::UniqueFunction<void()> fn)
void begin_write(Transaction* tr) REQUIRES(!m_mutex)
{
std::unique_lock lg(m_mutex);
start_thread();
m_pending_writes.emplace_back(std::move(fn));
m_cv_worker.notify_one();
{
util::CheckedLockGuard lg(m_mutex);
m_pending_writes.push_back(tr);
}
wake_up_thread();
}

void blocking_begin_write()
bool cancel_begin_write(Transaction* tr) REQUIRES(!m_mutex)
{
std::unique_lock lg(m_mutex);
util::CheckedLockGuard lg(m_mutex);
if (auto it = std::find(m_pending_writes.begin(), m_pending_writes.end(), tr); it != m_pending_writes.end()) {
m_pending_writes.erase(it);
return true;
}
return false;
}

void blocking_begin_write() REQUIRES(!m_mutex)
{
util::CheckedUniqueLock lg(m_mutex);

// If we support unlocking InterprocessMutex from a different thread
// than it was locked on, we can sometimes just begin the write on
// the current thread. This requires that no one is currently waiting
// for the worker thread to acquire the write lock, as we'll deadlock
// if we try to async commit while the worker is waiting for the lock.
bool can_lock_on_caller =
!InterprocessMutex::is_thread_confined && (!m_owns_write_mutex && m_pending_writes.empty() &&
m_write_lock_claim_ticket == m_write_lock_claim_fulfilled);
!InterprocessMutex::is_thread_confined && (!m_owns_write_mutex && !has_pending_write_requests());

// If we support cross-thread unlocking and m_running is false,
// can_lock_on_caller should always be true or we forgot to launch the thread
@@ -1911,17 +1917,16 @@

// Otherwise we have to ask the worker thread to acquire it and wait
// for that
start_thread();
size_t ticket = ++m_write_lock_claim_ticket;
m_cv_worker.notify_one();
m_cv_callers.wait(lg, [this, ticket] {
wake_up_thread();
m_cv_callers.wait(lg.native_handle(), [this, ticket]() REQUIRES(m_mutex) {
return ticket == m_write_lock_claim_fulfilled;
});
}

void end_write()
void end_write() REQUIRES(!m_mutex)
{
std::unique_lock lg(m_mutex);
util::CheckedLockGuard lg(m_mutex);
REALM_ASSERT(m_has_write_mutex);
REALM_ASSERT(m_owns_write_mutex || !InterprocessMutex::is_thread_confined);

@@ -1937,9 +1942,9 @@
}
}

bool blocking_end_write()
bool blocking_end_write() REQUIRES(!m_mutex)
{
std::unique_lock lg(m_mutex);
util::CheckedUniqueLock lg(m_mutex);
if (!m_has_write_mutex) {
return false;
}
@@ -1950,7 +1955,7 @@
if (m_owns_write_mutex) {
m_pending_mx_release = true;
m_cv_worker.notify_one();
m_cv_callers.wait(lg, [this] {
m_cv_callers.wait(lg.native_handle(), [this]() REQUIRES(m_mutex) {
return !m_pending_mx_release;
});
}
@@ -1969,67 +1974,85 @@
}


void sync_to_disk(util::UniqueFunction<void()> fn)
void sync_to_disk(Transaction* tr) REQUIRES(!m_mutex)
{
REALM_ASSERT(fn);
std::unique_lock lg(m_mutex);
REALM_ASSERT(tr);
util::CheckedLockGuard lg(m_mutex);
REALM_ASSERT(!m_pending_sync);
start_thread();
m_pending_sync = std::move(fn);
m_cv_worker.notify_one();
REALM_ASSERT(m_has_write_mutex);
m_pending_sync = tr;
wake_up_thread();
}

private:
DB* m_db;
std::thread m_thread;
std::mutex m_mutex;
// A mutex which guards most of the members in this class
util::CheckedMutex m_mutex;
// CV which the worker thread waits on to await work
std::condition_variable m_cv_worker;
// CV which other threads wait on to await the worker thread completing work
std::condition_variable m_cv_callers;
std::deque<util::UniqueFunction<void()>> m_pending_writes;
util::UniqueFunction<void()> m_pending_sync;
size_t m_write_lock_claim_ticket = 0;
size_t m_write_lock_claim_fulfilled = 0;
bool m_pending_mx_release = false;
bool m_running = false;
bool m_has_write_mutex = false;
bool m_owns_write_mutex = false;
bool m_waiting_for_write_mutex = false;

void main();

void start_thread()
// Transactions which are waiting for their turn to write. These are non-owning
// pointers to avoid a retain cycle. Weak pointers would result in the
// Transaction sometimes being destroyed on the worker thread, which results
// in complicated teardown. The non-owning pointers are safe because
// Transaction will unregister itself and/or wait for operations to complete
// before closing.
std::deque<Transaction*> m_pending_writes GUARDED_BY(m_mutex);
// The Transaction which has commits to write to disk.
Transaction* m_pending_sync GUARDED_BY(m_mutex) = nullptr;
// Ticketing system for blocking write transactions. Blocking writes increment
// claim_ticket and then wait on m_cv_callers until claim_fulfilled is equal
// to the value of claim_ticket they saw at the start of the wait. The worker
// thread increments claim_fulfilled instead of calling an async callback
// if it is below claim_ticket, as sync writes take priority over async.
size_t m_write_lock_claim_ticket GUARDED_BY(m_mutex) = 0;
size_t m_write_lock_claim_fulfilled GUARDED_BY(m_mutex) = 0;
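// Illustrative trace (example, not from the realm-core source): a blocking
// writer takes ticket = ++m_write_lock_claim_ticket (say 1) and waits on
// m_cv_callers until m_write_lock_claim_fulfilled == 1. When the worker thread
// holds the write mutex and sees claim_fulfilled < claim_ticket, it increments
// claim_fulfilled and notifies the callers instead of servicing the async
// queue, so the blocking writer wakes and proceeds ahead of any async writes.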
bool m_pending_mx_release GUARDED_BY(m_mutex) = false;
std::atomic<bool> m_running = false;
bool m_has_write_mutex GUARDED_BY(m_mutex) = false;
// True if the worker thread specifically owns the write mutex. May be false
// while `m_has_write_mutex` is true if the write mutex was acquired via
// a blocking begin and the mutex supports cross-thread unlocks.
bool m_owns_write_mutex GUARDED_BY(m_mutex) = false;
bool m_waiting_for_write_mutex GUARDED_BY(m_mutex) = false;

void main() REQUIRES(!m_mutex);

void wake_up_thread()
{
if (m_running) {
if (m_running.exchange(true, std::memory_order_relaxed)) {
m_cv_worker.notify_one();
return;
}
m_running = true;
m_thread = std::thread([this]() {
main();
});
}

bool has_pending_write_requests()
bool has_pending_write_requests() REQUIRES(m_mutex)
{
return m_write_lock_claim_fulfilled < m_write_lock_claim_ticket || !m_pending_writes.empty();
}
};

void DB::AsyncCommitHelper::main()
{
std::unique_lock lg(m_mutex);
util::CheckedUniqueLock lg(m_mutex);
while (m_running) {
#if 0 // Enable for testing purposes
std::this_thread::sleep_for(std::chrono::milliseconds(10));
#endif
if (m_has_write_mutex) {
if (auto cb = std::move(m_pending_sync)) {
if (auto tr = m_pending_sync) {
m_pending_sync = nullptr;
// Only one of sync_to_disk(), end_write(), or blocking_end_write()
// should be called, so we should never have both a pending sync
// and pending release.
REALM_ASSERT(!m_pending_mx_release);
lg.unlock();
cb();
cb = nullptr; // Release things captured by the callback before reacquiring the lock
tr->sync_async_commit();
lg.lock();
m_pending_mx_release = true;
}
@@ -2069,29 +2092,37 @@ void DB::AsyncCommitHelper::main()
continue;
}

REALM_ASSERT(!m_pending_writes.empty());
auto callback = std::move(m_pending_writes.front());
// The request could have been cancelled while we were waiting
// for the lock
if (m_pending_writes.empty()) {
m_pending_mx_release = true;
continue;
}

auto writer = m_pending_writes.front();
m_pending_writes.pop_front();
lg.unlock();
callback();
// Release things captured by the callback before reacquiring the lock
callback = nullptr;
lg.lock();
writer->async_write_began();
continue;
}
}
m_cv_worker.wait(lg);
m_cv_worker.wait(lg.native_handle());
}
if (m_has_write_mutex && m_owns_write_mutex) {
m_db->do_end_write();
}
}


void DB::async_begin_write(util::UniqueFunction<void()> fn)
void DB::async_begin_write(Transaction* tr)
{
REALM_ASSERT(m_commit_helper);
m_commit_helper->begin_write(std::move(fn));
m_commit_helper->begin_write(tr);
}

bool DB::cancel_async_begin_write(Transaction* tr)
{
REALM_ASSERT(m_commit_helper);
return m_commit_helper->cancel_begin_write(tr);
}

void DB::async_end_write()
Expand All @@ -2100,10 +2131,10 @@ void DB::async_end_write()
m_commit_helper->end_write();
}

void DB::async_sync_to_disk(util::UniqueFunction<void()> fn)
void DB::async_sync_to_disk(Transaction* tr)
{
REALM_ASSERT(m_commit_helper);
m_commit_helper->sync_to_disk(std::move(fn));
m_commit_helper->sync_to_disk(tr);
}

bool DB::has_changed(TransactionRef& tr)
@@ -2687,44 +2718,6 @@ TransactionRef DB::start_write(bool nonblocking)
return tr;
}

void DB::async_request_write_mutex(TransactionRef& tr, util::UniqueFunction<void()>&& when_acquired)
{
{
util::CheckedLockGuard lck(tr->m_async_mutex);
REALM_ASSERT(tr->m_async_stage == Transaction::AsyncState::Idle);
tr->m_async_stage = Transaction::AsyncState::Requesting;
tr->m_request_time_point = std::chrono::steady_clock::now();
if (tr->db->m_logger) {
tr->db->m_logger->log(util::Logger::Level::trace, "Async request write lock");
}
}
std::weak_ptr<Transaction> weak_tr = tr;
async_begin_write([weak_tr, cb = std::move(when_acquired)]() {
if (auto tr = weak_tr.lock()) {
util::CheckedLockGuard lck(tr->m_async_mutex);
// If a synchronous transaction happened while we were pending
// we may be in HasCommits
if (tr->m_async_stage == Transaction::AsyncState::Requesting) {
tr->m_async_stage = Transaction::AsyncState::HasLock;
}
if (tr->db->m_logger) {
auto t2 = std::chrono::steady_clock::now();
tr->db->m_logger->log(
util::Logger::Level::trace, "Got write lock in %1 us",
std::chrono::duration_cast<std::chrono::microseconds>(t2 - tr->m_request_time_point).count());
}
if (tr->m_waiting_for_write_lock) {
tr->m_waiting_for_write_lock = false;
tr->m_async_cv.notify_one();
}
else if (cb) {
cb();
}
tr.reset(); // Release pointer while lock is held
}
});
}

inline DB::DB(const DBOptions& options)
: m_upgrade_callback(std::move(options.upgrade_callback))
{
@@ -2818,7 +2811,6 @@ void DB::do_begin_possibly_async_write()

void DB::end_write_on_correct_thread() noexcept
{
// m_local_write_mutex.unlock();
if (!m_commit_helper || !m_commit_helper->blocking_end_write()) {
do_end_write();
}
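
The caller side of the new cancel API presumably lives in Transaction's close
path. As a rough, self-contained sketch of the decision a closing transaction
can now make, using hypothetical stand-in types (FakeDB, FakeTransaction,
wait_until_lock_granted are not realm-core names):

// Hedged sketch: hypothetical stand-ins, not realm-core types or APIs.
struct FakeTransaction;

struct FakeDB {
    // Analogue of DB::cancel_async_begin_write(): true if the request was
    // still queued and has now been removed.
    bool cancel_async_begin_write(FakeTransaction*) { return true; }
    void async_end_write() {}
};

struct FakeTransaction {
    FakeDB* db;

    void close_with_pending_async_request()
    {
        if (db->cancel_async_begin_write(this)) {
            // Still queued: nothing was acquired on our behalf, so this
            // transaction can be torn down immediately, even while another
            // transaction holds the write lock.
            return;
        }
        // The worker already dequeued this request, so the write lock will
        // be (or already has been) handed to us; wait for that and then
        // release it normally.
        wait_until_lock_granted();
        db->async_end_write();
    }

    void wait_until_lock_granted() {} // placeholder for the real CV wait
};

int main()
{
    FakeDB db;
    FakeTransaction tr{&db};
    tr.close_with_pending_async_request();
}
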
10 changes: 3 additions & 7 deletions src/realm/db.hpp
@@ -258,11 +258,6 @@ class DB : public std::enable_shared_from_this<DB> {
// an invalid TransactionRef is returned.
TransactionRef start_write(bool nonblocking = false) REQUIRES(!m_mutex);

// ask for write mutex. Callback takes place when mutex has been acquired.
// callback may occur on ANOTHER THREAD. Must not be called if write mutex
// has already been acquired.
void async_request_write_mutex(TransactionRef& tr, util::UniqueFunction<void()>&& when_acquired);

// report statistics of last commit done on THIS DB.
// The free space reported is what can be expected to be freed
// by compact(). This may not correspond to the space which is free
@@ -612,9 +607,10 @@ class DB : public std::enable_shared_from_this<DB> {
void close_internal(std::unique_lock<util::InterprocessMutex>, bool allow_open_read_transactions)
REQUIRES(!m_mutex);

void async_begin_write(util::UniqueFunction<void()> fn);
void async_begin_write(Transaction* tr);
bool cancel_async_begin_write(Transaction* tr);
void async_end_write();
void async_sync_to_disk(util::UniqueFunction<void()> fn);
void async_sync_to_disk(Transaction*);

friend class SlabAlloc;
friend class Transaction;
11 changes: 0 additions & 11 deletions src/realm/object-store/impl/realm_coordinator.cpp
@@ -1306,14 +1306,3 @@ void RealmCoordinator::write_copy(StringData path, const char* key)
{
m_db->write_copy(path, key);
}

void RealmCoordinator::async_request_write_mutex(Realm& realm)
{
auto tr = Realm::Internal::get_transaction_ref(realm);
m_db->async_request_write_mutex(tr, [realm = realm.shared_from_this()]() mutable {
auto& scheduler = *realm->scheduler();
scheduler.invoke([realm = std::move(realm)] {
Realm::Internal::run_writes(*realm);
});
});
}
2 changes: 0 additions & 2 deletions src/realm/object-store/impl/realm_coordinator.hpp
@@ -218,8 +218,6 @@ class RealmCoordinator : public std::enable_shared_from_this<RealmCoordinator> {
return std::move(util::CheckedUniqueLock(m_running_notifiers_mutex).native_handle());
}

void async_request_write_mutex(Realm& realm);

AuditInterface* audit_context() const noexcept
{
return m_audit_context.get();