Skip to content

Commit

Permalink
Spinlock (#36030)
Browse files Browse the repository at this point in the history
* add align for WorkQueue

* add spinlock

* merge spinlock
  • Loading branch information
liutiexing authored Sep 29, 2021
1 parent 79bd5f9 commit a9ea41c
Show file tree
Hide file tree
Showing 4 changed files with 36 additions and 22 deletions.
10 changes: 6 additions & 4 deletions paddle/fluid/framework/new_executor/run_queue.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@
#include <cstdint>
#include <mutex>
#include <vector>
#include "paddle/fluid/framework/new_executor/workqueue_utils.h"
#include "paddle/fluid/memory/allocation/spin_lock.h"

namespace paddle {
namespace framework {
Expand Down Expand Up @@ -101,7 +103,7 @@ class RunQueue {
// PushBack adds w at the end of the queue.
// If queue is full returns w, otherwise returns default-constructed Work.
Work PushBack(Work w) {
std::unique_lock<std::mutex> lock(mutex_);
std::unique_lock<paddle::memory::SpinLock> lock(mutex_);
unsigned back = back_.load(std::memory_order_relaxed);
Elem* e = &array_[(back - 1) & kMask];
uint8_t s = e->state.load(std::memory_order_relaxed);
Expand All @@ -123,7 +125,7 @@ class RunQueue {
return Work();
}

std::unique_lock<std::mutex> lock(mutex_);
std::unique_lock<paddle::memory::SpinLock> lock(mutex_);
unsigned back = back_.load(std::memory_order_relaxed);
Elem* e = &array_[back & kMask];
uint8_t s = e->state.load(std::memory_order_relaxed);
Expand All @@ -145,7 +147,7 @@ class RunQueue {
return 0;
}

std::unique_lock<std::mutex> lock(mutex_);
std::unique_lock<paddle::memory::SpinLock> lock(mutex_);
unsigned back = back_.load(std::memory_order_relaxed);
unsigned size = Size();
unsigned mid = back;
Expand Down Expand Up @@ -213,7 +215,7 @@ class RunQueue {
// modification counters.
alignas(64) std::atomic<unsigned> front_;
alignas(64) std::atomic<unsigned> back_;
std::mutex mutex_;
paddle::memory::SpinLock mutex_;
Elem array_[kSize];

// SizeOrNotEmpty returns current queue size; if NeedSizeEstimate is false,
Expand Down
4 changes: 2 additions & 2 deletions paddle/fluid/framework/new_executor/workqueue.cc
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ std::unique_ptr<WorkQueue> CreateMultiThreadedWorkQueue(
"WorkQueueOptions.num_threads must be "
"greater than 1."));
std::unique_ptr<WorkQueue> ptr(new WorkQueueImpl(options));
return ptr;
return std::move(ptr);
}

std::unique_ptr<WorkQueueGroup> CreateWorkQueueGroup(
Expand All @@ -176,7 +176,7 @@ std::unique_ptr<WorkQueueGroup> CreateWorkQueueGroup(
"For a WorkQueueGroup, the number of WorkQueueOptions "
"must be greater than 1."));
std::unique_ptr<WorkQueueGroup> ptr(new WorkQueueGroupImpl(queues_options));
return ptr;
return std::move(ptr);
}

} // namespace framework
Expand Down
1 change: 1 addition & 0 deletions paddle/fluid/framework/new_executor/workqueue_utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

#pragma once

#include <atomic>
#include <cassert>
#include <cstddef>
#include <cstdlib>
Expand Down
43 changes: 27 additions & 16 deletions paddle/fluid/memory/allocation/spin_lock.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,37 +15,48 @@
#pragma once

#include <atomic>
#if !defined(_WIN32)
#include <sched.h>
#else
#include <windows.h>
#endif // !_WIN32
#if defined(_M_X64) || defined(__x86_64__) || defined(_M_IX86) || \
defined(__i386__)
#define __PADDLE_x86__
#include <immintrin.h>
#endif
#include <thread>

#include "paddle/fluid/platform/macros.h"

namespace paddle {
namespace memory {
static inline void CpuRelax() {
#if defined(__PADDLE_x86__)
_mm_pause();
#endif
}

class SpinLock {
public:
SpinLock() : mlock_(false) {}

void lock() {
bool expect = false;
uint64_t spin_cnt = 0;
while (!mlock_.compare_exchange_weak(expect, true)) {
expect = false;
if ((++spin_cnt & 0xFF) == 0) {
#if defined(_WIN32)
SleepEx(50, FALSE);
#else
sched_yield();
#endif
for (;;) {
if (!mlock_.exchange(true, std::memory_order_acquire)) {
break;
}
constexpr int kMaxLoop = 32;
for (int loop = 1; mlock_.load(std::memory_order_relaxed);) {
if (loop <= kMaxLoop) {
for (int i = 1; i <= loop; ++i) {
CpuRelax();
}
loop *= 2;
} else {
std::this_thread::yield();
}
}
}
}

void unlock() { mlock_.store(false); }
void unlock() { mlock_.store(false, std::memory_order_release); }

DISABLE_COPY_AND_ASSIGN(SpinLock);

private:
Expand Down

0 comments on commit a9ea41c

Please sign in to comment.