-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathconcurrent_block_queue.h
185 lines (152 loc) · 4.56 KB
/
concurrent_block_queue.h
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
#ifndef CONCURRENT_BLOCK_QUEUE_H
#define CONCURRENT_BLOCK_QUEUE_H
#include <atomic>
#include <concepts>
#include <condition_variable>
#include <cstddef>
#include <functional>
#include <memory>
#include <mutex>
#include <optional>
#include <stdexcept>
#include <system_error>
#include <type_traits>
#include <utility>
#include <vector>
namespace ds {
/// Unbounded FIFO queue for concurrent producers and consumers.
///
/// Elements live in a singly linked chain of fixed-capacity blocks holding
/// BLOCK_SIZE slots each. Producers serialise on the tail mutex and consumers
/// on the head mutex, so a push and a pop can proceed at the same time. The
/// atomic element counter is the hand-off point between the two sides: a slot
/// is written before the counter is incremented and only read after the
/// counter has been observed to be non-zero.
///
/// @tparam T          element type; must be movable.
/// @tparam BLOCK_SIZE number of elements per block; must be greater than zero.
template <typename T, size_t BLOCK_SIZE = 512>
#if defined(__cpp_concepts) && defined(__cpp_lib_concepts)
  // `copyable` already implies `movable`, so `movable` alone expresses the
  // original `copyable || movable` constraint.
  requires std::movable<T>
#endif
class ConcurrentBlockQueue {
  static_assert(BLOCK_SIZE > 0, "BLOCK_SIZE must be at least 1");
  static_assert(std::is_move_constructible_v<T>,
                "T must be move constructible");

  /////////////////////////////////////////////
  /// PRIVATE DATA STRUCTURES
  /////////////////////////////////////////////

  /// One block of the chain.
  ///
  /// The slot vector is sized once in the constructor and never resized.
  /// Producer and consumer therefore only ever use operator[] on *different*
  /// elements of it, which is free of data races even when both work on the
  /// same block at the same time.
  struct Node {
    std::vector<std::optional<T>> slots;
    std::unique_ptr<Node> next;

    Node() : slots(BLOCK_SIZE) {}
    ~Node() = default;
    Node(const Node&) = delete;
    Node& operator=(const Node&) = delete;
    Node(Node&&) = delete;
    Node& operator=(Node&&) = delete;
  };

  /// Consumer side: owns the block chain and the read cursor.
  struct Head {
    std::unique_ptr<Node> head_block;
    size_t block_offset = 0;
    std::mutex lock;

    explicit Head(std::unique_ptr<Node> first)
        : head_block(std::move(first)) {}

    // The mutex member makes this type immovable; say so explicitly.
    Head(const Head&) = delete;
    Head& operator=(const Head&) = delete;
    Head(Head&&) = delete;
    Head& operator=(Head&&) = delete;

    /// Releases the chain one block at a time. Letting the unique_ptr chain
    /// destroy itself would recurse once per block and can exhaust the stack
    /// for long queues.
    ~Head() {
      while (head_block) {
        // Detach the remainder first so destroying the current block does
        // not drag the rest of the chain with it.
        auto rest = std::move(head_block->next);
        head_block = std::move(rest);
      }
    }

    /// Moves the front element out of its slot and advances the cursor.
    /// Caller must hold `lock` and must have checked that the queue is
    /// non-empty.
    T pop_data() {
      std::optional<T>& slot = head_block->slots[block_offset];
      T value = std::move(*slot);
      slot.reset();  // release the element's resources right away
      update_head();
      return value;
    }

    /// Advances the read cursor, dropping the block once it is exhausted.
    void update_head() {
      if (++block_offset == BLOCK_SIZE) {
        // The producer links the successor block before it publishes the
        // last element of this block, so `next` is populated here.
        auto rest = std::move(head_block->next);
        head_block = std::move(rest);
        block_offset = 0;
      }
    }
  };

  /// Producer side: non-owning pointer to the last block and write cursor.
  struct Tail {
    Node* tail_block = nullptr;
    size_t block_offset = 0;
    std::mutex lock;

    Tail() = default;
    ~Tail() = default;
    Tail(const Tail&) = delete;
    Tail& operator=(const Tail&) = delete;
    Tail(Tail&&) = delete;
    Tail& operator=(Tail&&) = delete;

    /// Stores `val` in the next free slot and advances the write cursor,
    /// appending a fresh block when the current one becomes full.
    /// Caller must hold `lock`.
    void add_data(T&& val) {
      const bool fills_block = (block_offset + 1 == BLOCK_SIZE);

      // Allocate the successor *before* touching any state: if the
      // allocation throws, the queue is left exactly as it was.
      std::unique_ptr<Node> successor;
      if (fills_block) {
        successor = std::make_unique<Node>();
      }

      tail_block->slots[block_offset].emplace(std::move(val));

      if (fills_block) {
        tail_block->next = std::move(successor);
        tail_block = tail_block->next.get();
        block_offset = 0;
      } else {
        ++block_offset;
      }
    }
  };

  /////////////////////////////////////////////
  /// PRIVATE DATA MEMBERS
  /////////////////////////////////////////////
  Head m_head;
  Tail m_tail;
  std::atomic<size_t> m_size{0};
  std::condition_variable m_queue_signal;
  /*
   * Controlled by the client through enable_clear_mode().
   *   true  = pushes are rejected and waiting stops; only pops succeed
   *   false = push and pop are both available
   */
  std::atomic<bool> m_clear_mode_enabled{false};

  /// Acquires and immediately releases the head mutex.
  ///
  /// A consumer in wait_and_pop() evaluates its predicate while holding the
  /// head mutex and gives the mutex up only when it actually blocks. Passing
  /// through the mutex after changing the predicate's inputs therefore
  /// guarantees that every consumer either sees the new state or is already
  /// blocked and will receive the following notification; without it the
  /// notification could be lost.
  void wake_barrier() { std::lock_guard<std::mutex> barrier(m_head.lock); }

  /////////////////////////////////////////////
  /// PUBLIC API
  /////////////////////////////////////////////
 public:
  /// Creates an empty queue with a single pre-allocated block.
  ConcurrentBlockQueue() : m_head(std::make_unique<Node>()) {
    m_tail.tail_block = m_head.head_block.get();
  }

  // Mutexes, the condition variable and the atomics are neither copyable
  // nor movable, hence neither is the queue.
  ConcurrentBlockQueue(const ConcurrentBlockQueue&) = delete;
  ConcurrentBlockQueue& operator=(const ConcurrentBlockQueue&) = delete;
  ConcurrentBlockQueue(ConcurrentBlockQueue&&) = delete;
  ConcurrentBlockQueue& operator=(ConcurrentBlockQueue&&) = delete;
  ~ConcurrentBlockQueue() = default;

  /// Appends `val` to the back of the queue and wakes one waiting consumer.
  ///
  /// @param val element to move into the queue.
  /// @return 0 on success, 1 if clear mode is enabled and the element was
  ///         rejected (in which case `val` is left untouched).
  size_t push(T&& val) {
    {
      std::lock_guard<std::mutex> guard(m_tail.lock);
      if (m_clear_mode_enabled) {
        return 1;
      }
      m_tail.add_data(std::move(val));
      ++m_size;  // publishes the element to consumers
    }
    // Taken only after the tail mutex has been released, so the two mutexes
    // are never held together.
    wake_barrier();
    m_queue_signal.notify_one();
    return 0;  // success
  }

  /// Removes and returns the front element without blocking.
  ///
  /// @return the element, or an empty optional if the queue was empty.
  std::optional<T> try_pop() {
    std::lock_guard<std::mutex> guard(m_head.lock);
    if (was_empty()) {
      return std::nullopt;
    }
    std::optional<T> result(std::in_place, m_head.pop_data());
    --m_size;
    return result;
  }

  /// Removes and returns the front element, blocking while the queue is
  /// empty and clear mode is disabled.
  ///
  /// @return the element, or an empty optional if clear mode is enabled and
  ///         nothing is left to pop.
  std::optional<T> wait_and_pop() {
    std::unique_lock<std::mutex> guard(m_head.lock);
    m_queue_signal.wait(guard, [this]() {
      return m_clear_mode_enabled.load() || !was_empty();
    });
    // The head mutex is held and only consumers decrement the counter, so
    // an empty queue at this point means the wait ended due to clear mode.
    if (was_empty()) {
      return std::nullopt;
    }
    std::optional<T> result(std::in_place, m_head.pop_data());
    --m_size;
    return result;
  }

  /// Switches the queue to clear mode: subsequent pushes are rejected and
  /// every blocked consumer is released. Elements already queued can still
  /// be popped.
  void enable_clear_mode() {
    m_clear_mode_enabled = true;
    wake_barrier();
    m_queue_signal.notify_all();
  }

  /// @return whether the queue held no elements at the moment of the call;
  ///         the answer may be stale as soon as it is returned.
  bool was_empty() const { return m_size.load() == 0; }

  /// @return the number of elements at the moment of the call; the answer
  ///         may be stale as soon as it is returned.
  size_t was_size() const { return m_size.load(); }
};
} // namespace ds
#endif // CONCURRENT_BLOCK_QUEUE_H