Skip to content

Commit d6d71d1

Browse files
committed
made deletions parallel and faster yay
1 parent 63cce9e commit d6d71d1

File tree

3 files changed

+107
-51
lines changed

3 files changed

+107
-51
lines changed

include/cc_sketch_alg.h

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -128,6 +128,12 @@ class CCSketchAlg {
128128
*/
129129
void boruvka_emulation();
130130

131+
/**
132+
* Delete edges found in spanning forest from sketches
133+
* Helper function for calc_disjoint_spanning_forests(k)
134+
*/
135+
void filter_sf_edges(SpanningForest &sf);
136+
131137
// constructor for use when reading from a serialized file
132138
CCSketchAlg(node_id_t num_vertices, size_t seed, std::ifstream &binary_stream,
133139
CCAlgConfiguration config);
@@ -257,6 +263,10 @@ class CCSketchAlg {
257263
// time hooks for experiments
258264
std::chrono::steady_clock::time_point cc_alg_start;
259265
std::chrono::steady_clock::time_point cc_alg_end;
266+
std::chrono::steady_clock::time_point sf_query_start;
267+
std::chrono::steady_clock::time_point sf_query_end;
268+
std::chrono::duration<double> query_time;
269+
std::chrono::duration<double> delete_time;
260270
size_t last_query_rounds = 0;
261271

262272
// getters

src/cc_sketch_alg.cpp

Lines changed: 79 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -567,25 +567,97 @@ SpanningForest CCSketchAlg::calc_spanning_forest() {
567567
return ret;
568568
}
569569

570+
void CCSketchAlg::filter_sf_edges(SpanningForest &sf) {
571+
auto start = std::chrono::steady_clock::now();
572+
573+
dsu_valid = false;
574+
shared_dsu_valid = false;
575+
576+
auto edges = sf.get_edges();
577+
size_t num = edges.size();
578+
edges.resize(2 * edges.size());
579+
580+
#pragma omp parallel for
581+
for (size_t i = 0; i < num; i++) {
582+
edges[i + num] = edges[i];
583+
std::swap(edges[i + num].src, edges[i + num].dst);
584+
}
585+
586+
auto setup = std::chrono::steady_clock::now();
587+
std::cout << "Setup time = " << std::chrono::duration<double>(setup - start).count() << std::endl;
588+
589+
// sort the edges
590+
std::sort(edges.begin(), edges.end());
591+
592+
auto sort = std::chrono::steady_clock::now();
593+
std::cout << "Sort time = " << std::chrono::duration<double>(sort - setup).count() << std::endl;
594+
595+
#pragma omp parallel
596+
{
597+
size_t thr_id = omp_get_thread_num();
598+
size_t num_threads = omp_get_num_threads();
599+
600+
std::pair<size_t, size_t> partition = get_ith_partition(edges.size(), thr_id, num_threads);
601+
size_t start = partition.first;
602+
size_t end = partition.second;
603+
604+
// check if we collide with previous thread. If so lock and apply those updates.
605+
if (start > 0 && edges[start].src == edges[start - 1].src) {
606+
sketches[edges[start].src]->mutex.lock();
607+
size_t orig_start = start;
608+
while (edges[start].src == edges[orig_start].src) {
609+
Edge edge = edges[start];
610+
sketches[edge.src]->update(static_cast<vec_t>(concat_pairing_fn(edge.src, edge.dst)));
611+
++start;
612+
}
613+
614+
sketches[edges[orig_start].src]->mutex.unlock();
615+
}
616+
617+
// check if we collide with next thread. If so lock and apply those updates.
618+
if (end < edges.size() && edges[end - 1].src == edges[end].src) {
619+
sketches[edges[end - 1].src]->mutex.lock();
620+
size_t orig_end = end;
621+
while (edges[end - 1].src == edges[orig_end - 1].src) {
622+
Edge edge = edges[end - 1];
623+
sketches[edge.src]->update(static_cast<vec_t>(concat_pairing_fn(edge.src, edge.dst)));
624+
--end;
625+
}
626+
627+
sketches[edges[orig_end].src]->mutex.unlock();
628+
}
629+
630+
for (size_t i = start; i < end; i++) {
631+
Edge edge = edges[i];
632+
sketches[edge.src]->update(static_cast<vec_t>(concat_pairing_fn(edge.src, edge.dst)));
633+
}
634+
}
635+
auto del = std::chrono::steady_clock::now();
636+
std::cout << "Delete time = " << std::chrono::duration<double>(del - sort).count() << std::endl;
637+
638+
delete_time += std::chrono::steady_clock::now() - start;
639+
}
640+
570641
std::vector<SpanningForest> CCSketchAlg::calc_disjoint_spanning_forests(size_t k) {
571642
std::vector<SpanningForest> SFs;
643+
std::chrono::steady_clock::time_point start;
572644

573645
for (size_t i = 0; i < k; i++) {
646+
start = std::chrono::steady_clock::now();
574647
SpanningForest sf = calc_spanning_forest();
575-
576648
SFs.push_back(sf);
649+
query_time += std::chrono::steady_clock::now() - start;
577650

578-
for (auto edge : sf.get_edges()) {
579-
update({edge, DELETE}); // deletes the found edge
580-
}
651+
filter_sf_edges(sf);
581652
}
582653

583654
// revert the state of the sketches to remove all deletions
584655
for (auto &sf : SFs) {
585-
for (auto edge : sf.get_edges()) {
586-
update({edge, INSERT}); // reinserts the deleted edge
587-
}
656+
filter_sf_edges(sf);
588657
}
658+
659+
std::cout << "Number of SFs: " << SFs.size() << std::endl;
660+
589661
#ifdef VERIFY_SAMPLES_F
590662
verifier->verify_spanning_forests(SFs);
591663
#endif

tools/spanning_forest_extract.cpp

Lines changed: 18 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,8 @@ int main(int argc, char **argv) {
6060
size_t empty = 0;
6161
std::chrono::duration<double> ingest_time(0);
6262
std::chrono::duration<double> query_time(0);
63-
std::chrono::duration<double> filter_time(0);
63+
std::chrono::duration<double> sample_time(0);
64+
std::chrono::duration<double> delete_time(0);
6465

6566
for (size_t trial = 0; trial < trials; trial++) {
6667
BinaryFileStream stream(stream_file);
@@ -79,65 +80,37 @@ int main(int argc, char **argv) {
7980
std::cout << "Beginning stream ingestion ... "; fflush(stdout);
8081
auto start = std::chrono::steady_clock::now();
8182
driver.process_stream_until(END_OF_STREAM);
83+
driver.prep_query(KSPANNINGFORESTS);
8284
std::cout << "Stream processed!" << std::endl;
8385
ingest_time += std::chrono::steady_clock::now() - start;
84-
size_t max_rounds_used = 0;
86+
std::cout << "Ingestion throughput: " << num_updates / std::chrono::duration<double>(std::chrono::steady_clock::now() - start).count() << std::endl;
8587

8688
// figure out how many rounds are required to extract log V spanning forests
87-
for (size_t s = 0; s < vertex_power; s++) {
88-
auto start = std::chrono::steady_clock::now();
89-
try {
90-
driver.prep_query(KSPANNINGFORESTS);
91-
SpanningForest forest = cc_alg.calc_spanning_forest();
92-
std::cout << "sf: " << s + 1 << ", rounds: " << cc_alg.last_query_rounds
93-
<< "/" << cc_alg.max_rounds() << " \r";
94-
fflush(stdout);
95-
max_rounds_used = std::max(max_rounds_used, cc_alg.last_query_rounds);
96-
97-
query_time += std::chrono::steady_clock::now() - start;
98-
99-
if (forest.get_edges().size() == 0) {
100-
std::cout << std::endl << "Exiting because of empty Spanning Forest " << s << std::endl;
101-
empty += 1;
102-
break;
103-
}
104-
start = std::chrono::steady_clock::now();
105-
const auto &sf_edges = forest.get_edges();
106-
107-
// filter out all the found edges from the sketches
108-
// This is technically illegal behavior. Which is like the point of this test :)
109-
for (auto edge : sf_edges) {
110-
cc_alg.update({edge, DELETE});
111-
}
112-
filter_time += std::chrono::steady_clock::now() - start;
113-
} catch (OutOfSamplesException &err) {
114-
std::cout << std::endl << "Got OutOfSamplesException on spanning forest " << s + 1 << std::endl;
115-
errors += 1;
116-
break;
117-
} catch (...) {
118-
std::cout << std::endl << "Got unknown exception on spanning forest " << s + 1 << std::endl;
119-
errors += 1;
120-
break;
121-
}
122-
}
89+
start = std::chrono::steady_clock::now();
90+
cc_alg.calc_disjoint_spanning_forests(vertex_power);
91+
query_time += std::chrono::steady_clock::now() - start;
12392

12493
// add number of rounds to get log V spanning forests to vector
125-
rounds_required[max_rounds_used] += 1;
94+
rounds_required[cc_alg.last_query_rounds] += 1;
95+
96+
sample_time += cc_alg.query_time;
97+
delete_time += cc_alg.delete_time;
12698
}
12799

128100
std::cout << std::endl;
129101
std::cout << "ERRORS = " << errors << std::endl;
130102
std::cout << "EMPTY = " << empty << std::endl;
131103

132-
for (size_t i = 0; i < rounds_required.size(); i++) {
133-
std::cout << i << ", " << rounds_required[i] << std::endl;
134-
}
104+
// for (size_t i = 0; i < rounds_required.size(); i++) {
105+
// std::cout << i << ", " << rounds_required[i] << std::endl;
106+
// }
135107

136108
auto stats = calc_stats(rounds_required);
137109
std::cout << "avg = " << stats.first << " std dev = " << stats.second << std::endl;
138110
std::cout << "ingest: " << ingest_time.count() << std::endl;
139111
std::cout << "query: " << query_time.count() << std::endl;
140-
std::cout << "filter: " << filter_time.count() << std::endl;
112+
std::cout << " sample: " << sample_time.count() << std::endl;
113+
std::cout << " delete: " << delete_time.count() << std::endl;
141114

142115
std::ofstream output_file("rounds_required.txt");
143116
output_file << "ERRORS = " << errors << std::endl;
@@ -149,5 +122,6 @@ int main(int argc, char **argv) {
149122
output_file << "std dev, " << stats.second << std::endl;
150123
output_file << "ingest: " << ingest_time.count() << std::endl;
151124
output_file << "query: " << query_time.count() << std::endl;
152-
output_file << "filter: " << filter_time.count() << std::endl;
125+
output_file << " sample: " << sample_time.count() << std::endl;
126+
output_file << " delete: " << delete_time.count() << std::endl;
153127
}

0 commit comments

Comments
 (0)