Skip to content

Commit 993c527

Browse files
committed
added atomic apply delta
1 parent f5aec7a commit 993c527

File tree

3 files changed

+57
-0
lines changed

3 files changed

+57
-0
lines changed

include/sketch/sketch_columns.h

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@ class FixedSizeSketchColumn {
5757

5858
const ColumnEntryDelta generate_entry_delta(vec_t update) const;
5959
void apply_entry_delta(const ColumnEntryDelta &delta);
60+
void atomic_apply_entry_delta(const ColumnEntryDelta &delta);
6061

6162

6263
inline bool is_initialized() const {
@@ -122,6 +123,7 @@ FRIEND_TEST(SketchColumnTestSuite, TestUpdateReallocation);
122123
void apply_entry_delta(const ColumnEntryDelta &delta);
123124

124125
void atomic_update(const vec_t update);
126+
void atomic_apply_entry_delta(const ColumnEntryDelta &delta);
125127
void merge(ResizeableSketchColumn const& other);
126128
uint8_t get_depth() const;
127129

@@ -196,6 +198,10 @@ class ResizeableAlignedSketchColumn {
196198
const ColumnEntryDelta generate_entry_delta(vec_t update) const;
197199
void apply_entry_delta(const ColumnEntryDelta &delta);
198200

201+
// TODO - implement later
202+
void atomic_apply_entry_delta(const ColumnEntryDelta &delta) {
203+
this->apply_entry_delta(delta);
204+
}
199205
void atomic_update(const vec_t update) {
200206
// TODO - implement later
201207
this->update(update);

include/sketch/sketch_concept.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ concept SketchColumnConcept = requires(T t, T other) {
4949
// require an atomic_update function.
5050
// up to implementer whether it uses locks or simply atomic XOR
5151
// (note that only the prior works for fixed size sketches)
52+
{ t.atomic_apply_entry_delta(std::declval<const ColumnEntryDelta>()) } -> std::same_as<void>;
5253
{ t.atomic_update(std::declval<V>()) } -> std::same_as<void>;
5354

5455
{ t.merge(other) } -> std::same_as<void>;

src/sketch_columns.cpp

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -140,6 +140,21 @@ void FixedSizeSketchColumn::atomic_update(const vec_t update) {
140140
// todo - gccc intrinsics?
141141

142142
}
143+
void FixedSizeSketchColumn::atomic_apply_entry_delta(const ColumnEntryDelta &delta) {
144+
assert(delta.depth < capacity);
145+
146+
std::atomic_ref<vec_t> det_alpha(deterministic_bucket.alpha);
147+
std::atomic_ref<vec_hash_t> det_gamma(deterministic_bucket.gamma);
148+
149+
std::atomic_ref<vec_t> bucket_alpha(buckets[delta.depth].alpha);
150+
std::atomic_ref<vec_hash_t> bucket_gamma(buckets[delta.depth].gamma);
151+
152+
det_alpha.fetch_xor(delta.bucket.alpha, std::memory_order_relaxed);
153+
det_gamma.fetch_xor(delta.bucket.gamma, std::memory_order_relaxed);
154+
bucket_alpha.fetch_xor(delta.bucket.alpha, std::memory_order_relaxed);
155+
bucket_gamma.fetch_xor(delta.bucket.gamma, std::memory_order_relaxed);
156+
// todo - gccc intrinsics?
157+
}
143158

144159
ResizeableSketchColumn::ResizeableSketchColumn(uint8_t start_capacity,
145160
uint64_t seed)
@@ -277,6 +292,7 @@ void ResizeableSketchColumn::apply_entry_delta(const ColumnEntryDelta &delta) {
277292
}
278293

279294
void ResizeableSketchColumn::atomic_update(const vec_t update) {
295+
// TODO - there's code duplication with apply entry delta.
280296
vec_hash_t checksum = Bucket_Boruvka::get_index_hash(update, seed);
281297
col_hash_t depth = Bucket_Boruvka::get_index_depth_legacy(update, seed, 60);
282298

@@ -312,6 +328,40 @@ void ResizeableSketchColumn::atomic_update(const vec_t update) {
312328
this->lock.unlock();
313329
}
314330

331+
}
332+
void ResizeableSketchColumn::atomic_apply_entry_delta(const ColumnEntryDelta &delta) {
333+
334+
// grab reader lock
335+
this->lock.lock_shared();
336+
if (delta.depth < capacity) {
337+
// can atomically update as normal
338+
std::atomic_ref<vec_t> det_alpha(deterministic_bucket.alpha);
339+
std::atomic_ref<vec_hash_t> det_gamma(deterministic_bucket.gamma);
340+
std::atomic_ref<vec_t> bucket_alpha(buckets[delta.depth].alpha);
341+
std::atomic_ref<vec_hash_t> bucket_gamma(buckets[delta.depth].gamma);
342+
det_alpha.fetch_xor(delta.bucket.alpha, std::memory_order_relaxed);
343+
det_gamma.fetch_xor(delta.bucket.gamma, std::memory_order_relaxed);
344+
bucket_alpha.fetch_xor(delta.bucket.alpha, std::memory_order_relaxed);
345+
bucket_gamma.fetch_xor(delta.bucket.gamma, std::memory_order_relaxed);
346+
this->lock.unlock_shared();
347+
} else {
348+
// release the reader lock
349+
this->lock.unlock_shared();
350+
// grab writer lock
351+
this->lock.lock();
352+
// note: the alllocation may have shrunk OR grown.
353+
// so we need to account for that
354+
size_t desired_capacity = ((delta.depth >> 2) << 2) + 4;
355+
desired_capacity = std::max(desired_capacity, static_cast<size_t>(capacity));
356+
if (desired_capacity != capacity) {
357+
reallocate(desired_capacity);
358+
}
359+
// now we can update the buckets (non-atomically)
360+
deterministic_bucket ^= delta.bucket;
361+
buckets[delta.depth] ^= delta.bucket;
362+
this->lock.unlock();
363+
}
364+
315365
}
316366

317367
void ResizeableSketchColumn::merge(ResizeableSketchColumn const& other) {

0 commit comments

Comments
 (0)