diff --git a/.github/workflows/cmake.yml b/.github/workflows/cmake.yml index ca10a22b..e8512465 100644 --- a/.github/workflows/cmake.yml +++ b/.github/workflows/cmake.yml @@ -11,7 +11,7 @@ jobs: runs-on: ${{ matrix.os }} strategy: matrix: - os: [ubuntu-latest, ubuntu-20.04] + os: [ubuntu-latest] flags: ['"-DL0_SAMPLING"', '"-DNO_EAGER_DSU"', '""'] steps: diff --git a/CMakeLists.txt b/CMakeLists.txt index 95e90896..312e077c 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -75,13 +75,16 @@ endif() FetchContent_MakeAvailable(GutterTree StreamingUtilities) # AVAILABLE COMPILATION DEFINITIONS: -# VERIFY_SAMPLES_F Use a deterministic connected-components -# algorithm to verify post-processing. # NO_EAGER_DSU Do not use the eager DSU query optimization # if this flag is present. # L0_SAMPLING Run the CubeSketch l0 sampling algorithm # to ensure that we sample uniformly. # Otherwise, run a support finding algorithm. +# L0_FULLY_DENSE Fully allocate the sketch matrix at the beginning +# of the program. If this flag is not used, sketches +# are allocated dynamically. +# VERIFY_SAMPLES_F Use a deterministic connected-components +# algorithm to verify post-processing. # # Example: # cmake -DCMAKE_CXX_FLAGS="-DL0_SAMPLING" .. @@ -91,7 +94,8 @@ add_library(GraphZeppelin src/return_types.cpp src/driver_configuration.cpp src/cc_alg_configuration.cpp - src/sketch.cpp + src/sparse_sketch.cpp + src/dense_sketch.cpp src/util.cpp) add_dependencies(GraphZeppelin GutterTree StreamingUtilities) target_link_libraries(GraphZeppelin PUBLIC xxhash GutterTree StreamingUtilities) @@ -105,7 +109,8 @@ add_library(GraphZeppelinVerifyCC src/return_types.cpp src/driver_configuration.cpp src/cc_alg_configuration.cpp - src/sketch.cpp + src/sparse_sketch.cpp + src/dense_sketch.cpp src/util.cpp test/util/graph_verifier.cpp) add_dependencies(GraphZeppelinVerifyCC GutterTree StreamingUtilities) diff --git a/include/bucket.h b/include/bucket.h index 5d6a6af6..cccf67fe 100644 --- a/include/bucket.h +++ b/include/bucket.h @@ -34,13 +34,19 @@ namespace Bucket_Boruvka { inline static vec_hash_t get_index_hash(const vec_t index, const long sketch_seed); /** - * Checks whether a Bucket is good, assuming the Bucket contains all elements. + * Checks whether a Bucket is good. * @param bucket The bucket to check * @param sketch_seed The seed of the Sketch this Bucket belongs to. * @return true if this Bucket is good, else false. */ inline static bool is_good(const Bucket &bucket, const long sketch_seed); + /** + * Checks whether a Bucket is empty. + * @return true if this Bucket is empty (alpha and gamma == 0), else false. + */ + inline static bool is_empty(const Bucket &bucket); + /** * Updates a Bucket with the given update index * @param bucket The bucket to update @@ -66,6 +72,10 @@ inline bool Bucket_Boruvka::is_good(const Bucket &bucket, const long sketch_seed return bucket.gamma == get_index_hash(bucket.alpha, sketch_seed); } +inline bool Bucket_Boruvka::is_empty(const Bucket &bucket) { + return bucket.alpha == 0 && bucket.gamma == 0; +} + inline void Bucket_Boruvka::update(Bucket& bucket, const vec_t update_idx, const vec_hash_t update_hash) { bucket.alpha ^= update_idx; diff --git a/include/cc_sketch_alg.h b/include/cc_sketch_alg.h index 9e9d3f8c..55408a4a 100644 --- a/include/cc_sketch_alg.h +++ b/include/cc_sketch_alg.h @@ -201,8 +201,9 @@ class CCSketchAlg { * Specifically, the delta is in the form of a pointer to raw bucket data. * @param src_vertex The vertex where the all edges originate. * @param raw_buckets Pointer to the array of buckets from the delta sketch + * @param num_buckets Size of raw_buckets array in number of buckets */ - void apply_raw_buckets_update(node_id_t src_vertex, Bucket *raw_buckets); + void apply_raw_buckets_update(node_id_t src_vertex, Bucket *raw_buckets, size_t num_buckets); /** * The function performs a direct update to the associated sketch. diff --git a/include/dense_sketch.h b/include/dense_sketch.h new file mode 100644 index 00000000..afd271d7 --- /dev/null +++ b/include/dense_sketch.h @@ -0,0 +1,183 @@ +#pragma once +#include +#include +#include + +#include +#include +#include +#include +#include + +#include "util.h" +#include "bucket.h" +#include "sketch_types.h" + +/** + * Sketch for graph processing, either CubeSketch or CameoSketch. + * Sub-linear representation of a vector. + */ +class DenseSketch { + private: + const uint64_t seed; // seed for hash functions + size_t num_samples; // number of samples we can perform + size_t cols_per_sample; // number of columns to use on each sample + size_t num_columns; // Total number of columns. (product of above 2) + size_t bkt_per_col; // maximum number of buckets per column (max number of rows) + size_t num_buckets; // number of total buckets product of above two + size_t sample_idx = 0; // number of samples performed so far + + // Allocated buckets + Bucket* buckets; + + inline Bucket& deterministic_bucket() { + return buckets[0]; + } + inline const Bucket& deterministic_bucket() const { + return buckets[0]; + } + + // return the bucket at a particular index in bucket array + inline Bucket& bucket(size_t col, size_t row) { + return buckets[col * bkt_per_col + row + 1]; + } + inline const Bucket& bucket(size_t col, size_t row) const { + return buckets[col * bkt_per_col + row + 1]; + } + + public: + /** + * The below constructors use vector length as their input. However, in graph sketching our input + * is the number of vertices. This function converts from number of graph vertices to vector + * length. + * @param num_vertices Number of graph vertices + * @return The length of the vector to sketch + */ + static vec_t calc_vector_length(node_id_t num_vertices) { + return ceil(double(num_vertices) * (num_vertices - 1) / 2); + } + + /** + * This function computes the number of samples a Sketch should support in order to solve + * connected components. Optionally, can increase or decrease the number of samples by a + * multiplicative factor. + * @param num_vertices Number of graph vertices + * @param f Multiplicative sample factor + * @return The number of samples + */ + static size_t calc_cc_samples(node_id_t num_vertices, double f) { + return std::max(size_t(18), (size_t) ceil(f * log2(num_vertices) / num_samples_div)); + } + + /** + * Construct a sketch object + * @param vector_len Length of the vector we are sketching + * @param seed Random seed of the sketch + * @param num_samples [Optional] Number of samples this sketch supports (default = 1) + * @param cols_per_sample [Optional] Number of sketch columns for each sample (default = 1) + */ + DenseSketch(vec_t vector_len, uint64_t seed, size_t num_samples = 1, + size_t cols_per_sample = default_cols_per_sample); + + /** + * Construct a sketch from a serialized stream + * @param vector_len Length of the vector we are sketching + * @param seed Random seed of the sketch + * @param binary_in Stream holding serialized sketch object + * @param num_buckets Number of buckets in serialized sketch + * @param num_samples [Optional] Number of samples this sketch supports (default = 1) + * @param cols_per_sample [Optional] Number of sketch columns for each sample (default = 1) + */ + DenseSketch(vec_t vector_len, uint64_t seed, std::istream& binary_in, size_t num_buckets, + size_t num_samples = 1, size_t cols_per_sample = default_cols_per_sample); + + /** + * Sketch copy constructor + * @param s The sketch to copy. + */ + DenseSketch(const DenseSketch& s); + + ~DenseSketch(); + + /** + * Update a sketch based on information about one of its indices. + * @param update the point update. + */ + void update(const vec_t update); + + /** + * Function to sample from the sketch. + * cols_per_sample determines the number of columns we allocate to this query + * @return A pair with the result index and a code indicating the type of result. + */ + SketchSample sample(); + + /** + * Function to sample from the appropriate columns to return 1 or more non-zero indices + * @return A pair with the result indices and a code indicating the type of result. + */ + ExhaustiveSketchSample exhaustive_sample(); + + std::mutex mutex; // lock the sketch for applying updates in multithreaded processing + + /** + * In-place merge function. + * @param other Sketch to merge into caller + */ + void merge(const DenseSketch &other); + + /** + * In-place range merge function. Updates the caller Sketch. + * The range merge only merges some of the Sketches + * This function should only be used if you know what you're doing + * @param other Sketch to merge into caller + * @param start_sample Index of first sample to merge + * @param n_samples Number of samples to merge + */ + void range_merge(const DenseSketch &other, size_t start_sample, size_t n_samples); + + /** + * Perform an in-place merge function without another Sketch and instead + * use a raw bucket memory. + * We also allow for only a portion of the buckets to be merge at once + * @param raw_bucket Raw bucket data to merge into this sketch + * @param n_raw_buckets Size of raw_buckets in number of Bucket data-structures + */ + void merge_raw_bucket_buffer(const Bucket *raw_buckets, size_t n_raw_buckets); + + /** + * Zero out all the buckets of a sketch. + */ + void zero_contents(); + + friend bool operator==(const DenseSketch& sketch1, const DenseSketch& sketch2); + friend std::ostream& operator<<(std::ostream& os, const DenseSketch& sketch); + + /** + * Serialize the sketch to a binary output stream. + * @param binary_out the stream to write to. + */ + void serialize(std::ostream& binary_out) const; + + inline void reset_sample_state() { + sample_idx = 0; + } + + // return the size of the sketching datastructure in bytes (just the buckets, not the metadata) + inline size_t bucket_array_bytes() const { + return num_buckets * sizeof(Bucket); + } + + inline const Bucket* get_readonly_bucket_ptr() const { return (const Bucket*) buckets; } + inline uint64_t get_seed() const { return seed; } + inline size_t column_seed(size_t column_idx) const { return seed + column_idx * 5; } + inline size_t checksum_seed() const { return seed; } + inline size_t get_columns() const { return num_columns; } + inline size_t get_buckets() const { return num_buckets; } + inline size_t get_num_samples() const { return num_samples; } + + static size_t calc_bkt_per_col(size_t n) { return ceil(log2(n)) + 1; } + + static constexpr size_t default_cols_per_sample = 1; + static constexpr double num_samples_div = 1 - log2(2 - 0.8); +}; diff --git a/include/sketch.h b/include/sketch.h index 80473ecc..97a208a0 100644 --- a/include/sketch.h +++ b/include/sketch.h @@ -1,204 +1,9 @@ #pragma once -#include -#include -#include +#include "dense_sketch.h" +#include "sparse_sketch.h" -#include -#include -#include -#include - -#include "util.h" -#include "bucket.h" - -// enum SerialType { -// FULL, -// RANGE, -// SPARSE, -// }; - -enum SampleResult { - GOOD, // sampling this sketch returned a single non-zero value - ZERO, // sampling this sketch returned that there are no non-zero values - FAIL // sampling this sketch failed to produce a single non-zero value -}; - -struct SketchSample { - vec_t idx; - SampleResult result; -}; - -struct ExhaustiveSketchSample { - std::unordered_set idxs; - SampleResult result; -}; - -/** - * Sketch for graph processing, either CubeSketch or CameoSketch. - * Sub-linear representation of a vector. - */ -class Sketch { - private: - const uint64_t seed; // seed for hash functions - size_t num_samples; // number of samples we can perform - size_t cols_per_sample; // number of columns to use on each sample - size_t num_columns; // Total number of columns. (product of above 2) - size_t bkt_per_col; // number of buckets per column - size_t num_buckets; // number of total buckets (product of above 2) - - size_t sample_idx = 0; // number of samples performed so far - - // bucket data - Bucket* buckets; - - public: - /** - * The below constructors use vector length as their input. However, in graph sketching our input - * is the number of vertices. This function converts from number of graph vertices to vector - * length. - * @param num_vertices Number of graph vertices - * @return The length of the vector to sketch - */ - static vec_t calc_vector_length(node_id_t num_vertices) { - return ceil(double(num_vertices) * (num_vertices - 1) / 2); - } - - /** - * This function computes the number of samples a Sketch should support in order to solve - * connected components. Optionally, can increase or decrease the number of samples by a - * multiplicative factor. - * @param num_vertices Number of graph vertices - * @param f Multiplicative sample factor - * @return The number of samples - */ - static size_t calc_cc_samples(node_id_t num_vertices, double f) { - return std::max(size_t(18), (size_t) ceil(f * log2(num_vertices) / num_samples_div)); - } - - /** - * Construct a sketch object - * @param vector_len Length of the vector we are sketching - * @param seed Random seed of the sketch - * @param num_samples [Optional] Number of samples this sketch supports (default = 1) - * @param cols_per_sample [Optional] Number of sketch columns for each sample (default = 1) - */ - Sketch(vec_t vector_len, uint64_t seed, size_t num_samples = 1, - size_t cols_per_sample = default_cols_per_sample); - - /** - * Construct a sketch from a serialized stream - * @param vector_len Length of the vector we are sketching - * @param seed Random seed of the sketch - * @param binary_in Stream holding serialized sketch object - * @param num_samples [Optional] Number of samples this sketch supports (default = 1) - * @param cols_per_sample [Optional] Number of sketch columns for each sample (default = 1) - */ - Sketch(vec_t vector_len, uint64_t seed, std::istream& binary_in, size_t num_samples = 1, - size_t cols_per_sample = default_cols_per_sample); - - /** - * Sketch copy constructor - * @param s The sketch to copy. - */ - Sketch(const Sketch& s); - - ~Sketch(); - - /** - * Update a sketch based on information about one of its indices. - * @param update the point update. - */ - void update(const vec_t update); - - /** - * Function to sample from the sketch. - * cols_per_sample determines the number of columns we allocate to this query - * @return A pair with the result index and a code indicating the type of result. - */ - SketchSample sample(); - - /** - * Function to sample from the appropriate columns to return 1 or more non-zero indices - * @return A pair with the result indices and a code indicating the type of result. - */ - ExhaustiveSketchSample exhaustive_sample(); - - std::mutex mutex; // lock the sketch for applying updates in multithreaded processing - - /** - * In-place merge function. - * @param other Sketch to merge into caller - */ - void merge(const Sketch &other); - - /** - * In-place range merge function. Updates the caller Sketch. - * The range merge only merges some of the Sketches - * This function should only be used if you know what you're doing - * @param other Sketch to merge into caller - * @param start_sample Index of first sample to merge - * @param n_samples Number of samples to merge - */ - void range_merge(const Sketch &other, size_t start_sample, size_t n_samples); - - /** - * Perform an in-place merge function without another Sketch and instead - * use a raw bucket memory. - * We also allow for only a portion of the buckets to be merge at once - * @param raw_bucket Raw bucket data to merge into this sketch - */ - void merge_raw_bucket_buffer(const Bucket *raw_buckets); - - /** - * Zero out all the buckets of a sketch. - */ - void zero_contents(); - - friend bool operator==(const Sketch& sketch1, const Sketch& sketch2); - friend std::ostream& operator<<(std::ostream& os, const Sketch& sketch); - - /** - * Serialize the sketch to a binary output stream. - * @param binary_out the stream to write to. - */ - void serialize(std::ostream& binary_out) const; - - inline void reset_sample_state() { - sample_idx = 0; - } - - // return the size of the sketching datastructure in bytes (just the buckets, not the metadata) - inline size_t bucket_array_bytes() const { return num_buckets * sizeof(Bucket); } - - inline const Bucket* get_readonly_bucket_ptr() const { return (const Bucket*) buckets; } - inline uint64_t get_seed() const { return seed; } - inline size_t column_seed(size_t column_idx) const { return seed + column_idx * 5; } - inline size_t checksum_seed() const { return seed; } - inline size_t get_columns() const { return num_columns; } - inline size_t get_buckets() const { return num_buckets; } - inline size_t get_num_samples() const { return num_samples; } - - static size_t calc_bkt_per_col(size_t n) { return ceil(log2(n)) + 1; } - -#ifdef L0_SAMPLING - static constexpr size_t default_cols_per_sample = 7; - // NOTE: can improve this but leaving for comparison purposes - static constexpr double num_samples_div = log2(3) - 1; +#ifdef L0_FULLY_DENSE +typedef DenseSketch Sketch; #else - static constexpr size_t default_cols_per_sample = 1; - static constexpr double num_samples_div = 1 - log2(2 - 0.8); +typedef SparseSketch Sketch; #endif -}; - -class OutOfSamplesException : public std::exception { - private: - std::string err_msg; - public: - OutOfSamplesException(size_t seed, size_t num_samples, size_t sample_idx) - : err_msg("This sketch (seed=" + std::to_string(seed) + - ", max samples=" + std::to_string(num_samples) + - ") cannot be sampled more times (cur idx=" + std::to_string(sample_idx) + ")!") {} - virtual const char* what() const throw() { - return err_msg.c_str(); - } -}; diff --git a/include/sketch_types.h b/include/sketch_types.h new file mode 100644 index 00000000..19f67b9f --- /dev/null +++ b/include/sketch_types.h @@ -0,0 +1,39 @@ +#pragma once + +#include +#include + +#include "types.h" +// enum SerialType { +// FULL, +// RANGE, +// SPARSE, +// }; + +enum SampleResult { + GOOD, // sampling this sketch returned a single non-zero value + ZERO, // sampling this sketch returned that there are no non-zero values + FAIL // sampling this sketch failed to produce a single non-zero value +}; + +struct SketchSample { + vec_t idx; + SampleResult result; +}; + +struct ExhaustiveSketchSample { + std::vector idxs; + SampleResult result; +}; + +class OutOfSamplesException : public std::exception { + private: + std::string err_msg; + + public: + OutOfSamplesException(size_t seed, size_t num_samples, size_t sample_idx) + : err_msg("This sketch (seed=" + std::to_string(seed) + + ", max samples=" + std::to_string(num_samples) + + ") cannot be sampled more times (cur idx=" + std::to_string(sample_idx) + ")!") {} + virtual const char* what() const throw() { return err_msg.c_str(); } +}; diff --git a/include/sparse_sketch.h b/include/sparse_sketch.h new file mode 100644 index 00000000..b17c6562 --- /dev/null +++ b/include/sparse_sketch.h @@ -0,0 +1,303 @@ +#pragma once +#include +#include +#include + +#include +#include +#include +#include +#include + +#include "util.h" +#include "bucket.h" +#include "sketch_types.h" + +#pragma pack(push,1) +struct SparseBucket { + uint8_t next; // index of next sparse bucket in this column + uint8_t row; // row of sparse bucket + Bucket bkt; // actual bucket content +}; +#pragma pack(pop) + +// TODO: Do we want to use row major or column major order? +// So the advantage of row-major is that we can update faster. Most updates will only touch +// first few rows of data-structure. However, could slow down queries. (Although most query +// answers will probably be in sparse data-structure). OH! Also, range_merge is important here +// if column-major then the column we are merging is contig, if not, then not. +// A: Keep column-major for the moment, performance evaluation later. + +/* Memory Allocation of a SparseSketch. Contiguous (only roughly to scale). + Where z is number of non-zero elements in vector we are sketching. + _________________________________________________________________________________________________ +| Dense | Sparse | Linked List | +| Bucket | Bucket | Metadata | +| Region | Region | for Sparse bkts | +| log n * log z buckets | clog n buckets | clogn/16 buckets | +|_________________________________________________|____________________________|__________________| +*/ + +/** + * SparseSketch for graph processing + * Sub-linear representation of a vector. + */ +class SparseSketch { + private: + const uint64_t seed; // seed for hash functions + const size_t num_samples; // number of samples we can perform + const size_t cols_per_sample; // number of columns to use on each sample + const size_t num_columns; // Total number of columns. (product of above 2) + const size_t bkt_per_col; // maximum number of buckets per column (max number of rows) + + size_t num_buckets; // number of total buckets (col * dense_rows + sparse_capacity) + size_t sample_idx = 0; // number of samples performed so far + + // Allocated buckets + Bucket* buckets; + + static constexpr size_t min_num_dense_rows = 6; + size_t num_dense_rows = min_num_dense_rows; + + // Variables for sparse representation of lower levels of bucket Matrix + // TODO: evaluate implications of this constant + static constexpr double sparse_bucket_constant = 3; // constant factor c (see diagram) + SparseBucket* sparse_buckets; // a pointer into the buckets array + uint8_t *ll_metadata; // pointer to heads of column LLs + size_t number_of_sparse_buckets = 0; // cur number of sparse buckets + size_t sparse_capacity = sparse_bucket_constant * num_columns; // max number of sparse buckets + + /** + * Reallocates the bucket array if necessary to either grow or shrink the dense region + */ + void reallocate_if_needed(int delta); + void dense_realloc(size_t new_num_dense_rows); + + // These variables let us know how many Buckets to allocate to make space for the SparseBuckets + // and the LL metadata that will use that space + size_t sparse_data_size = ceil(double(sparse_capacity) * sizeof(SparseBucket) / sizeof(Bucket)); + size_t ll_metadata_size = ceil((double(num_columns) + 1) * sizeof(uint8_t) / sizeof(Bucket)); + + void update_sparse(uint8_t col, const SparseBucket &to_add); + SketchSample sample_sparse(size_t first_col, size_t end_col); + + inline uint8_t remove_ll_head(size_t col) { + uint8_t temp = ll_metadata[col]; + ll_metadata[col] = sparse_buckets[ll_metadata[col]].next; + return temp; + } + inline uint8_t claim_free_bucket() { + assert(ll_metadata[num_columns] != uint8_t(-1)); + return remove_ll_head(num_columns); + } + inline void insert_to_ll_head(size_t col, uint8_t add_idx) { + sparse_buckets[add_idx].next = ll_metadata[col]; + ll_metadata[col] = add_idx; + } + inline void free_bucket(uint8_t bkt_idx) { + sparse_buckets[bkt_idx].row = 0; + sparse_buckets[bkt_idx].bkt = {0, 0}; + insert_to_ll_head(num_columns, bkt_idx); + } + inline void insert_to_ll(uint8_t add_idx, SparseBucket &prev) { + sparse_buckets[add_idx].next = prev.next; + prev.next = add_idx; + } + inline void remove_from_ll(SparseBucket& bkt_to_remove, SparseBucket &prev) { + prev.next = bkt_to_remove.next; + } + inline bool merge_sparse_bkt(uint8_t our_idx, const SparseBucket& oth, uint8_t prev_idx, + size_t col) { + SparseBucket &ours = sparse_buckets[our_idx]; + ours.bkt.alpha ^= oth.bkt.alpha; + ours.bkt.gamma ^= oth.bkt.gamma; + if (Bucket_Boruvka::is_empty(ours.bkt)) { + if (prev_idx == uint8_t(-1)) + remove_ll_head(col); + else + remove_from_ll(ours, sparse_buckets[prev_idx]); + + free_bucket(our_idx); + return true; + } + return false; + } + + inline Bucket& deterministic_bucket() { + return buckets[0]; + } + inline const Bucket& deterministic_bucket() const { + return buckets[0]; + } + + inline size_t position_func(size_t col, size_t row, size_t num_rows) const { + return col * num_rows + row + 1; + } + + // return the bucket at a particular index in bucket array + inline Bucket& bucket(size_t col, size_t row) { + assert(row < num_dense_rows); + return buckets[position_func(col, row, num_dense_rows)]; + } + inline const Bucket& bucket(size_t col, size_t row) const { + assert(row < num_dense_rows); + return buckets[position_func(col, row, num_dense_rows)]; + } + + size_t calc_num_buckets(size_t new_num_dense_rows) { + return num_columns * new_num_dense_rows + sparse_data_size + ll_metadata_size + 1; + } + + size_t calc_sparse_index(size_t rows) { + return num_columns * rows + 1; + } + + size_t calc_metadata_index(size_t rows) { + return num_columns * rows + sparse_data_size + 1; + } + + void upd_sparse_ptrs() { + sparse_buckets = (SparseBucket *) &buckets[calc_sparse_index(num_dense_rows)]; + ll_metadata = (uint8_t *) &buckets[calc_metadata_index(num_dense_rows)]; + } + + // given another SparseSketch column, merge it into ours + void merge_sparse_column(const SparseBucket* oth_sparse_buckets, const uint8_t* oth_ll_metadata, + size_t col); + public: + /** + * The below constructors use vector length as their input. However, in graph sketching our input + * is the number of vertices. This function converts from number of graph vertices to vector + * length. + * @param num_vertices Number of graph vertices + * @return The length of the vector to sketch + */ + static vec_t calc_vector_length(node_id_t num_vertices) { + return ceil(double(num_vertices) * (num_vertices - 1) / 2); + } + + /** + * This function computes the number of samples a Sketch should support in order to solve + * connected components. Optionally, can increase or decrease the number of samples by a + * multiplicative factor. + * @param num_vertices Number of graph vertices + * @param f Multiplicative sample factor + * @return The number of samples + */ + static size_t calc_cc_samples(node_id_t num_vertices, double f) { + return std::max(size_t(18), (size_t) ceil(f * log2(num_vertices) / num_samples_div)); + } + + /** + * Construct a sketch object + * @param vector_len Length of the vector we are sketching + * @param seed Random seed of the sketch + * @param num_samples [Optional] Number of samples this sketch supports (default = 1) + * @param cols_per_sample [Optional] Number of sketch columns for each sample (default = 1) + */ + SparseSketch(vec_t vector_len, uint64_t seed, size_t num_samples = 1, + size_t cols_per_sample = default_cols_per_sample); + + /** + * Construct a sketch from a serialized stream + * @param vector_len Length of the vector we are sketching + * @param seed Random seed of the sketch + * @param binary_in Stream holding serialized sketch object + * @param num_buckets Number of buckets in serialized sketch (dense + sparse_capacity) + * @param num_samples [Optional] Number of samples this sketch supports (default = 1) + * @param cols_per_sample [Optional] Number of sketch columns for each sample (default = 1) + */ + SparseSketch(vec_t vector_len, uint64_t seed, std::istream& binary_in, size_t num_buckets, + size_t num_samples = 1, size_t cols_per_sample = default_cols_per_sample); + + /** + * SparseSketch copy constructor + * @param s The sketch to copy. + */ + SparseSketch(const SparseSketch& s); + + ~SparseSketch(); + + /** + * Update a sketch based on information about one of its indices. + * @param update the point update. + */ + void update(const vec_t update); + + /** + * Function to sample from the sketch. + * cols_per_sample determines the number of columns we allocate to this query + * @return A pair with the result index and a code indicating the type of result. + */ + SketchSample sample(); + + /** + * Function to sample from the appropriate columns to return 1 or more non-zero indices + * @return A pair with the result indices and a code indicating the type of result. + */ + ExhaustiveSketchSample exhaustive_sample(); + + std::mutex mutex; // lock the sketch for applying updates in multithreaded processing + + /** + * In-place merge function. + * @param other Sketch to merge into caller + */ + void merge(const SparseSketch &other); + + /** + * In-place range merge function. Updates the caller Sketch. + * The range merge only merges some of the Sketches + * This function should only be used if you know what you're doing + * @param other Sketch to merge into caller + * @param start_sample Index of first sample to merge + * @param n_samples Number of samples to merge + */ + void range_merge(const SparseSketch &other, size_t start_sample, size_t n_samples); + + /** + * Perform an in-place merge function without another Sketch and instead + * use a raw bucket memory. + * We also allow for only a portion of the buckets to be merge at once + * @param raw_bucket Raw bucket data to merge into this sketch + * @param n_raw_buckets Size of raw_buckets in number of Bucket data-structures + */ + void merge_raw_bucket_buffer(const Bucket *raw_buckets, size_t n_raw_buckets); + + /** + * Zero out all the buckets of a sketch. + */ + void zero_contents(); + + friend bool operator==(const SparseSketch& sketch1, const SparseSketch& sketch2); + friend std::ostream& operator<<(std::ostream& os, const SparseSketch& sketch); + + /** + * Serialize the sketch to a binary output stream. + * @param binary_out the stream to write to. + */ + void serialize(std::ostream& binary_out) const; + + inline void reset_sample_state() { + sample_idx = 0; + } + + // return the size of the sketching datastructure in bytes (just the buckets, not the metadata) + inline size_t bucket_array_bytes() const { + return num_buckets * sizeof(Bucket); + } + + inline const Bucket* get_readonly_bucket_ptr() const { return (const Bucket*) buckets; } + inline uint64_t get_seed() const { return seed; } + inline size_t column_seed(size_t column_idx) const { return seed + column_idx * 5; } + inline size_t checksum_seed() const { return seed; } + inline size_t get_columns() const { return num_columns; } + inline size_t get_buckets() const { return num_buckets; } + inline size_t get_num_samples() const { return num_samples; } + inline size_t get_num_dense_rows() const { return num_dense_rows; } + + static size_t calc_bkt_per_col(size_t n) { return ceil(log2(n)) + 1; } + + static constexpr size_t default_cols_per_sample = 1; + static constexpr double num_samples_div = 1 - log2(2 - 0.8); +}; diff --git a/src/cc_alg_configuration.cpp b/src/cc_alg_configuration.cpp index becbb7e5..2ce12af1 100644 --- a/src/cc_alg_configuration.cpp +++ b/src/cc_alg_configuration.cpp @@ -34,6 +34,11 @@ std::ostream& operator<< (std::ostream &out, const CCAlgConfiguration &conf) { #else out << " Sketching algorithm = CameoSketch" << std::endl; #endif +#ifdef L0_FULLY_DENSE + out << " Sketch storage = Dense Matrix" << std::endl; +#else + out << " Sketch storage = Hybrid Matrix" << std::endl; +#endif #ifdef NO_EAGER_DSU out << " Using Eager DSU = False" << std::endl; #else diff --git a/src/cc_sketch_alg.cpp b/src/cc_sketch_alg.cpp index a1e688db..f8e5649d 100644 --- a/src/cc_sketch_alg.cpp +++ b/src/cc_sketch_alg.cpp @@ -53,7 +53,10 @@ CCSketchAlg::CCSketchAlg(node_id_t num_vertices, size_t seed, std::ifstream &bin for (node_id_t i = 0; i < num_vertices; ++i) { representatives->insert(i); - sketches[i] = new Sketch(sketch_vec_len, seed, binary_stream, sketch_num_samples); + size_t num_bkts_in_sketch; + binary_stream.read((char *) &num_bkts_in_sketch, sizeof(num_bkts_in_sketch)); + sketches[i] = + new Sketch(sketch_vec_len, seed, binary_stream, num_bkts_in_sketch, sketch_num_samples); } binary_stream.close(); @@ -117,9 +120,10 @@ void CCSketchAlg::apply_update_batch(int thr_id, node_id_t src_vertex, sketches[src_vertex]->merge(delta_sketch); } -void CCSketchAlg::apply_raw_buckets_update(node_id_t src_vertex, Bucket *raw_buckets) { +void CCSketchAlg::apply_raw_buckets_update(node_id_t src_vertex, Bucket *raw_buckets, + size_t num_buckets) { std::lock_guard lk(sketches[src_vertex]->mutex); - sketches[src_vertex]->merge_raw_bucket_buffer(raw_buckets); + sketches[src_vertex]->merge_raw_bucket_buffer(raw_buckets, num_buckets); } // Note: for performance reasons route updates through the driver instead of calling this function @@ -617,6 +621,8 @@ void CCSketchAlg::write_binary(const std::string &filename) { binary_out.write((char *)&num_vertices, sizeof(num_vertices)); binary_out.write((char *)&config._sketches_factor, sizeof(config._sketches_factor)); for (node_id_t i = 0; i < num_vertices; ++i) { + size_t num_bkts_in_sketch = sketches[i]->get_buckets(); + binary_out.write((char*) &num_bkts_in_sketch, sizeof(num_bkts_in_sketch)); sketches[i]->serialize(binary_out); } binary_out.close(); diff --git a/src/dense_sketch.cpp b/src/dense_sketch.cpp new file mode 100644 index 00000000..1b41cc00 --- /dev/null +++ b/src/dense_sketch.cpp @@ -0,0 +1,245 @@ +#include "dense_sketch.h" + +#include +#include +#include +#include +#include + +DenseSketch::DenseSketch(vec_t vector_len, uint64_t seed, size_t _samples, size_t _cols) + : seed(seed), + num_samples(_samples), + cols_per_sample(_cols), + num_columns(cols_per_sample * num_samples), + bkt_per_col(calc_bkt_per_col(vector_len)) { + + num_buckets = num_columns * bkt_per_col + 1; // plus 1, deterministic bucket + buckets = new Bucket[num_buckets]; + + // initialize bucket values + for (size_t i = 0; i < num_buckets; ++i) { + buckets[i].alpha = 0; + buckets[i].gamma = 0; + } +} + +DenseSketch::DenseSketch(vec_t vector_len, uint64_t seed, std::istream &binary_in, + size_t num_buckets, size_t _samples, size_t _cols) + : seed(seed), + num_samples(_samples), + cols_per_sample(_cols), + num_columns(cols_per_sample * num_samples), + bkt_per_col(calc_bkt_per_col(vector_len)), + num_buckets(num_buckets) { + if (num_buckets != num_columns * bkt_per_col + 1) { + throw std::invalid_argument("Serial Constructor: Number of buckets does not match expectation"); + } + num_buckets = num_columns * bkt_per_col + 1; // plus 1 for deterministic bucket + buckets = new Bucket[num_buckets]; + + // Read the serialized Sketch contents + binary_in.read((char *)buckets, bucket_array_bytes()); +} + +DenseSketch::DenseSketch(const DenseSketch &s) + : seed(s.seed), + num_samples(s.num_samples), + cols_per_sample(s.cols_per_sample), + num_columns(s.num_columns), + bkt_per_col(s.bkt_per_col) { + num_buckets = s.num_buckets; + buckets = new Bucket[num_buckets]; + + std::memcpy(buckets, s.buckets, bucket_array_bytes()); +} + +DenseSketch::~DenseSketch() { delete[] buckets; } + + +void DenseSketch::update(const vec_t update_idx) { + vec_hash_t checksum = Bucket_Boruvka::get_index_hash(update_idx, checksum_seed()); + + // Update depth 0 bucket + Bucket_Boruvka::update(deterministic_bucket(), update_idx, checksum); + + // Update higher depth buckets + for (unsigned i = 0; i < num_columns; ++i) { + col_hash_t depth = Bucket_Boruvka::get_index_depth(update_idx, column_seed(i), bkt_per_col); + likely_if(depth < bkt_per_col) { + Bucket_Boruvka::update(bucket(i, depth), update_idx, checksum); + } + } +} + +static void is_empty(DenseSketch &skt) { + const Bucket* buckets = skt.get_readonly_bucket_ptr(); + for (size_t i = 0; i < skt.get_buckets(); i++) { + if (!Bucket_Boruvka::is_empty(buckets[i])) { + std::cerr << "FOUND NOT EMPTY BUCKET!" << std::endl; + } + } +} + +// TODO: Switch the L0_SAMPLING flag to instead affect query procedure. +// (Only use deepest bucket. We don't need the alternate update procedure in the code anymore.) + +void DenseSketch::zero_contents() { + for (size_t i = 0; i < num_buckets; i++) { + buckets[i].alpha = 0; + buckets[i].gamma = 0; + } + reset_sample_state(); +} + +SketchSample DenseSketch::sample() { + if (sample_idx >= num_samples) { + throw OutOfSamplesException(seed, num_samples, sample_idx); + } + + size_t idx = sample_idx++; + size_t first_column = idx * cols_per_sample; + + // std::cout << "Sampling: " << first_column << ", " << first_column + cols_per_sample << std::endl; + + // std::cout << *this << std::endl; + + if (Bucket_Boruvka::is_empty(deterministic_bucket())) { + is_empty(*this); + return {0, ZERO}; // the "first" bucket is deterministic so if all zero then no edges to return + } + + if (Bucket_Boruvka::is_good(deterministic_bucket(), checksum_seed())) + return {deterministic_bucket().alpha, GOOD}; + + for (size_t i = 0; i < cols_per_sample; ++i) { + for (size_t j = 0; j < bkt_per_col; ++j) { + if (Bucket_Boruvka::is_good(bucket(i + first_column, j), checksum_seed())) + return {bucket(i + first_column, j).alpha, GOOD}; + } + } + return {0, FAIL}; +} + +ExhaustiveSketchSample DenseSketch::exhaustive_sample() { + if (sample_idx >= num_samples) { + throw OutOfSamplesException(seed, num_samples, sample_idx); + } + std::vector ret; + + size_t idx = sample_idx++; + size_t first_column = idx * cols_per_sample; + + unlikely_if (deterministic_bucket().alpha == 0 && deterministic_bucket().gamma == 0) + return {ret, ZERO}; // the "first" bucket is deterministic so if zero then no edges to return + + unlikely_if (Bucket_Boruvka::is_good(deterministic_bucket(), checksum_seed())) { + ret.push_back(deterministic_bucket().alpha); + return {ret, GOOD}; + } + + for (size_t i = 0; i < cols_per_sample; ++i) { + for (size_t j = 0; j < bkt_per_col; ++j) { + unlikely_if (Bucket_Boruvka::is_good(bucket(i + first_column, j), checksum_seed())) { + ret.push_back(bucket(i + first_column, j).alpha); + } + } + } + + unlikely_if (ret.size() == 0) + return {ret, FAIL}; + return {ret, GOOD}; +} + +void DenseSketch::merge(const DenseSketch &other) { + for (size_t i = 0; i < num_buckets; ++i) { + buckets[i].alpha ^= other.buckets[i].alpha; + buckets[i].gamma ^= other.buckets[i].gamma; + } +} + +void DenseSketch::range_merge(const DenseSketch &other, size_t start_sample, size_t n_samples) { + if (start_sample + n_samples > num_samples) { + assert(false); + sample_idx = num_samples; // sketch is in a fail state! + return; + } + + // std::cout << "MERGING THIS" << std::endl; + // std::cout << *this << std::endl; + // std::cout << "WITH THIS" << std::endl; + // std::cout << other << std::endl; + + // update sample idx to point at beginning of this range if before it + sample_idx = std::max(sample_idx, start_sample); + + // merge deterministic bucket + // TODO: I don't like this. Repeated calls to range_merge on same sketches will potentially cause us issues + deterministic_bucket().alpha ^= other.deterministic_bucket().alpha; + deterministic_bucket().gamma ^= other.deterministic_bucket().gamma; + + // merge other buckets + size_t start_column = start_sample * cols_per_sample; + size_t end_column = (start_sample + n_samples) * cols_per_sample; + + // std::cout << start_column << ", " << end_column << std::endl; + for (size_t i = start_column; i < end_column; i++) { + for (size_t j = 0; j < bkt_per_col; j++) { + bucket(i, j).alpha ^= other.bucket(i, j).alpha; + bucket(i, j).gamma ^= other.bucket(i, j).gamma; + } + } + + // std::cout << "RESULT" << std::endl; + // std::cout << *this << std::endl; +} + +void DenseSketch::merge_raw_bucket_buffer(const Bucket *raw_buckets, size_t n_raw_buckets) { + if (n_raw_buckets != num_buckets) { + throw std::invalid_argument("Raw bucket buffer is not the same size as DenseSketch"); + } + + for (size_t i = 0; i < num_buckets; i++) { + buckets[i].alpha ^= raw_buckets[i].alpha; + buckets[i].gamma ^= raw_buckets[i].gamma; + } +} + +void DenseSketch::serialize(std::ostream &binary_out) const { + binary_out.write((char*) buckets, bucket_array_bytes()); +} + +bool operator==(const DenseSketch &sketch1, const DenseSketch &sketch2) { + if (sketch1.num_buckets != sketch2.num_buckets || sketch1.seed != sketch2.seed) + return false; + + for (size_t i = 0; i < sketch1.num_buckets; ++i) { + if (sketch1.buckets[i].alpha != sketch2.buckets[i].alpha || + sketch1.buckets[i].gamma != sketch2.buckets[i].gamma) { + return false; + } + } + + return true; +} + +std::ostream &operator<<(std::ostream &os, const DenseSketch &sketch) { + Bucket bkt = sketch.buckets[sketch.num_buckets - 1]; + bool good = Bucket_Boruvka::is_good(bkt, sketch.checksum_seed()); + vec_t a = bkt.alpha; + vec_hash_t c = bkt.gamma; + + os << " a:" << a << " c:" << c << (good ? " good" : " bad") << std::endl; + + for (unsigned i = 0; i < sketch.num_columns; ++i) { + for (unsigned j = 0; j < sketch.bkt_per_col; ++j) { + Bucket bkt = sketch.bucket(i, j); + vec_t a = bkt.alpha; + vec_hash_t c = bkt.gamma; + bool good = Bucket_Boruvka::is_good(bkt, sketch.checksum_seed()); + + os << " a:" << a << " c:" << c << (good ? " good" : " bad") << std::endl; + } + os << std::endl; + } + return os; +} diff --git a/src/sketch.cpp b/src/sketch.cpp deleted file mode 100644 index ac674c5e..00000000 --- a/src/sketch.cpp +++ /dev/null @@ -1,227 +0,0 @@ -#include "sketch.h" - -#include -#include -#include -#include - -Sketch::Sketch(vec_t vector_len, uint64_t seed, size_t _samples, size_t _cols) : seed(seed) { - num_samples = _samples; - cols_per_sample = _cols; - num_columns = num_samples * cols_per_sample; - bkt_per_col = calc_bkt_per_col(vector_len); - num_buckets = num_columns * bkt_per_col + 1; // plus 1 for deterministic bucket - buckets = new Bucket[num_buckets]; - - // initialize bucket values - for (size_t i = 0; i < num_buckets; ++i) { - buckets[i].alpha = 0; - buckets[i].gamma = 0; - } -} - -Sketch::Sketch(vec_t vector_len, uint64_t seed, std::istream &binary_in, size_t _samples, - size_t _cols) - : seed(seed) { - num_samples = _samples; - cols_per_sample = _cols; - num_columns = num_samples * cols_per_sample; - bkt_per_col = calc_bkt_per_col(vector_len); - num_buckets = num_columns * bkt_per_col + 1; // plus 1 for deterministic bucket - buckets = new Bucket[num_buckets]; - - // Read the serialized Sketch contents - binary_in.read((char *)buckets, bucket_array_bytes()); -} - -Sketch::Sketch(const Sketch &s) : seed(s.seed) { - num_samples = s.num_samples; - cols_per_sample = s.cols_per_sample; - num_columns = s.num_columns; - bkt_per_col = s.bkt_per_col; - num_buckets = s.num_buckets; - buckets = new Bucket[num_buckets]; - - std::memcpy(buckets, s.buckets, bucket_array_bytes()); -} - -Sketch::~Sketch() { delete[] buckets; } - -#ifdef L0_SAMPLING -void Sketch::update(const vec_t update_idx) { - vec_hash_t checksum = Bucket_Boruvka::get_index_hash(update_idx, checksum_seed()); - - // Update depth 0 bucket - Bucket_Boruvka::update(buckets[num_buckets - 1], update_idx, checksum); - - // Update higher depth buckets - for (unsigned i = 0; i < num_columns; ++i) { - col_hash_t depth = Bucket_Boruvka::get_index_depth(update_idx, column_seed(i), bkt_per_col); - likely_if(depth < bkt_per_col) { - for (col_hash_t j = 0; j <= depth; ++j) { - size_t bucket_id = i * bkt_per_col + j; - Bucket_Boruvka::update(buckets[bucket_id], update_idx, checksum); - } - } - } -} -#else // Use support finding algorithm instead. Faster but no guarantee of uniform sample. -void Sketch::update(const vec_t update_idx) { - vec_hash_t checksum = Bucket_Boruvka::get_index_hash(update_idx, checksum_seed()); - - // Update depth 0 bucket - Bucket_Boruvka::update(buckets[num_buckets - 1], update_idx, checksum); - - // Update higher depth buckets - for (unsigned i = 0; i < num_columns; ++i) { - col_hash_t depth = Bucket_Boruvka::get_index_depth(update_idx, column_seed(i), bkt_per_col); - size_t bucket_id = i * bkt_per_col + depth; - likely_if(depth < bkt_per_col) { - Bucket_Boruvka::update(buckets[bucket_id], update_idx, checksum); - } - } -} -#endif - -void Sketch::zero_contents() { - for (size_t i = 0; i < num_buckets; i++) { - buckets[i].alpha = 0; - buckets[i].gamma = 0; - } - reset_sample_state(); -} - -SketchSample Sketch::sample() { - if (sample_idx >= num_samples) { - throw OutOfSamplesException(seed, num_samples, sample_idx); - } - - size_t idx = sample_idx++; - size_t first_column = idx * cols_per_sample; - - if (buckets[num_buckets - 1].alpha == 0 && buckets[num_buckets - 1].gamma == 0) - return {0, ZERO}; // the "first" bucket is deterministic so if all zero then no edges to return - - if (Bucket_Boruvka::is_good(buckets[num_buckets - 1], checksum_seed())) - return {buckets[num_buckets - 1].alpha, GOOD}; - - for (size_t i = 0; i < cols_per_sample; ++i) { - for (size_t j = 0; j < bkt_per_col; ++j) { - size_t bucket_id = (i + first_column) * bkt_per_col + j; - if (Bucket_Boruvka::is_good(buckets[bucket_id], checksum_seed())) - return {buckets[bucket_id].alpha, GOOD}; - } - } - return {0, FAIL}; -} - -ExhaustiveSketchSample Sketch::exhaustive_sample() { - if (sample_idx >= num_samples) { - throw OutOfSamplesException(seed, num_samples, sample_idx); - } - std::unordered_set ret; - - size_t idx = sample_idx++; - size_t first_column = idx * cols_per_sample; - - unlikely_if (buckets[num_buckets - 1].alpha == 0 && buckets[num_buckets - 1].gamma == 0) - return {ret, ZERO}; // the "first" bucket is deterministic so if zero then no edges to return - - unlikely_if (Bucket_Boruvka::is_good(buckets[num_buckets - 1], checksum_seed())) { - ret.insert(buckets[num_buckets - 1].alpha); - return {ret, GOOD}; - } - - for (size_t i = 0; i < cols_per_sample; ++i) { - for (size_t j = 0; j < bkt_per_col; ++j) { - size_t bucket_id = (i + first_column) * bkt_per_col + j; - unlikely_if (Bucket_Boruvka::is_good(buckets[bucket_id], checksum_seed())) { - ret.insert(buckets[bucket_id].alpha); - } - } - } - - unlikely_if (ret.size() == 0) - return {ret, FAIL}; - return {ret, GOOD}; -} - -void Sketch::merge(const Sketch &other) { - for (size_t i = 0; i < num_buckets; ++i) { - buckets[i].alpha ^= other.buckets[i].alpha; - buckets[i].gamma ^= other.buckets[i].gamma; - } -} - -void Sketch::range_merge(const Sketch &other, size_t start_sample, size_t n_samples) { - if (start_sample + n_samples > num_samples) { - assert(false); - sample_idx = num_samples; // sketch is in a fail state! - return; - } - - // update sample idx to point at beginning of this range if before it - sample_idx = std::max(sample_idx, start_sample); - - // merge deterministic buffer - buckets[num_buckets - 1].alpha ^= other.buckets[num_buckets - 1].alpha; - buckets[num_buckets - 1].gamma ^= other.buckets[num_buckets - 1].gamma; - - // merge other buckets - size_t start_bucket_id = start_sample * cols_per_sample * bkt_per_col; - size_t n_buckets = n_samples * cols_per_sample * bkt_per_col; - - for (size_t i = 0; i < n_buckets; i++) { - size_t bucket_id = start_bucket_id + i; - buckets[bucket_id].alpha ^= other.buckets[bucket_id].alpha; - buckets[bucket_id].gamma ^= other.buckets[bucket_id].gamma; - } -} - -void Sketch::merge_raw_bucket_buffer(const Bucket *raw_buckets) { - for (size_t i = 0; i < num_buckets; i++) { - buckets[i].alpha ^= raw_buckets[i].alpha; - buckets[i].gamma ^= raw_buckets[i].gamma; - } -} - -void Sketch::serialize(std::ostream &binary_out) const { - binary_out.write((char*) buckets, bucket_array_bytes()); -} - -bool operator==(const Sketch &sketch1, const Sketch &sketch2) { - if (sketch1.num_buckets != sketch2.num_buckets || sketch1.seed != sketch2.seed) - return false; - - for (size_t i = 0; i < sketch1.num_buckets; ++i) { - if (sketch1.buckets[i].alpha != sketch2.buckets[i].alpha || - sketch1.buckets[i].gamma != sketch2.buckets[i].gamma) { - return false; - } - } - - return true; -} - -std::ostream &operator<<(std::ostream &os, const Sketch &sketch) { - Bucket bkt = sketch.buckets[sketch.num_buckets - 1]; - bool good = Bucket_Boruvka::is_good(bkt, sketch.checksum_seed()); - vec_t a = bkt.alpha; - vec_hash_t c = bkt.gamma; - - os << " a:" << a << " c:" << c << (good ? " good" : " bad") << std::endl; - - for (unsigned i = 0; i < sketch.num_columns; ++i) { - for (unsigned j = 0; j < sketch.bkt_per_col; ++j) { - unsigned bucket_id = i * sketch.bkt_per_col + j; - Bucket bkt = sketch.buckets[bucket_id]; - vec_t a = bkt.alpha; - vec_hash_t c = bkt.gamma; - bool good = Bucket_Boruvka::is_good(bkt, sketch.checksum_seed()); - - os << " a:" << a << " c:" << c << (good ? " good" : " bad") << std::endl; - } - os << std::endl; - } - return os; -} diff --git a/src/sparse_sketch.cpp b/src/sparse_sketch.cpp new file mode 100644 index 00000000..cc51bc6f --- /dev/null +++ b/src/sparse_sketch.cpp @@ -0,0 +1,627 @@ +#include "sparse_sketch.h" + +#include +#include +#include +#include + +SparseSketch::SparseSketch(vec_t vector_len, uint64_t seed, size_t _samples, size_t _cols) + : seed(seed), + num_samples(_samples), + cols_per_sample(_cols), + num_columns(cols_per_sample * num_samples), + bkt_per_col(calc_bkt_per_col(vector_len)) { + + // plus 1, deterministic bucket + num_buckets = calc_num_buckets(num_dense_rows); + buckets = new Bucket[num_buckets]; + upd_sparse_ptrs(); + + // initialize bucket values + for (size_t i = 0; i < num_buckets; ++i) { + buckets[i].alpha = 0; + buckets[i].gamma = 0; + } + + // initialize sparse bucket linked lists + // every bucket is currently free, so each points to next + for (size_t i = 0; i < sparse_capacity; i++) { + sparse_buckets[i].next = i + 1; + } + sparse_buckets[sparse_capacity - 1].next = uint8_t(-1); + + // initialize LL metadata + for (size_t i = 0; i < num_columns; i++) { + ll_metadata[i] = uint8_t(-1); // head of each column points nowhere (empty) + } + ll_metadata[num_columns] = 0; // free list head +} + +SparseSketch::SparseSketch(vec_t vector_len, uint64_t seed, std::istream &binary_in, + size_t num_buckets, size_t _samples, size_t _cols) + : seed(seed), + num_samples(_samples), + cols_per_sample(_cols), + num_columns(cols_per_sample * num_samples), + bkt_per_col(calc_bkt_per_col(vector_len)), + num_buckets(num_buckets) { + buckets = new Bucket[num_buckets]; + num_dense_rows = (num_buckets - sparse_data_size) / num_columns; + upd_sparse_ptrs(); + + // Read the serialized Sketch contents + binary_in.read((char *)buckets, bucket_array_bytes()); +} + +SparseSketch::SparseSketch(const SparseSketch &s) + : seed(s.seed), + num_samples(s.num_samples), + cols_per_sample(s.cols_per_sample), + num_columns(s.num_columns), + bkt_per_col(s.bkt_per_col), + num_buckets(s.num_buckets), + num_dense_rows(s.num_dense_rows) { + buckets = new Bucket[num_buckets]; + upd_sparse_ptrs(); + + std::memcpy(buckets, s.buckets, bucket_array_bytes()); +} + +SparseSketch::~SparseSketch() { + // std::cout << "Deleting sketch! buckets = " << buckets << std::endl; + delete[] buckets; +} + + +// Helper functions for interfacing with SparseBuckets +void SparseSketch::dense_realloc(size_t new_num_dense_rows) { + // we are performing a reallocation + const size_t old_rows = num_dense_rows; + SparseBucket *old_sparse_pointer = sparse_buckets; + Bucket *old_buckets = buckets; + + if (new_num_dense_rows < min_num_dense_rows) { + throw std::runtime_error("new_num_dense_rows too small!"); + } + + // std::cerr << *this << std::endl; + + if (new_num_dense_rows < num_dense_rows) { + // std::cerr << "Shrinking to " << new_num_dense_rows << " from " << old_rows << std::endl; + // shrink dense region + // Scan over the rows we are removing and add all those buckets to sparse + for (size_t c = 0; c < num_columns; c++) { + for (size_t r = new_num_dense_rows; r < old_rows; r++) { + Bucket bkt = bucket(c, r); + if (!Bucket_Boruvka::is_empty(bkt)) { + uint8_t free_idx = claim_free_bucket(); + sparse_buckets[free_idx].row = r; + sparse_buckets[free_idx].bkt = bkt; + insert_to_ll_head(c, free_idx); + number_of_sparse_buckets += 1; + } + } + } + + // Allocate new memory + num_dense_rows = new_num_dense_rows; + num_buckets = calc_num_buckets(num_dense_rows); + buckets = new Bucket[num_buckets]; + } else { + // std::cerr << "Growing to " << new_num_dense_rows << " from " << old_rows << std::endl; + // grow dense region by 1 row + // Allocate new memory + num_dense_rows = new_num_dense_rows; + num_buckets = calc_num_buckets(num_dense_rows); + buckets = new Bucket[num_buckets]; + + // initialize new rows to zero + for (size_t c = 0; c < num_columns; c++) { + for (size_t r = old_rows; r < num_dense_rows; r++) { + buckets[position_func(c, r, num_dense_rows)] = {0, 0}; + } + } + } + upd_sparse_ptrs(); + + // Copy dense content + buckets[0] = old_buckets[0]; + for (size_t c = 0; c < num_columns; c++) { + for (size_t r = 0; r < std::min(num_dense_rows, old_rows); r++) { + buckets[position_func(c, r, num_dense_rows)] = old_buckets[position_func(c, r, old_rows)]; + } + } + // sparse contents + memcpy(sparse_buckets, old_sparse_pointer, + (sparse_data_size + ll_metadata_size) * sizeof(Bucket)); + + if (num_dense_rows > old_rows) { + // We growing + // Scan sparse buckets and move all updates of depth num_dense_rows-1 + // to the new dense row + for (size_t c = 0; c < num_columns; c++) { + while (ll_metadata[c] != uint8_t(-1) && sparse_buckets[ll_metadata[c]].row < num_dense_rows) { + // remove this bucket from column ll + uint8_t idx = remove_ll_head(c); + number_of_sparse_buckets -= 1; + + // add this bucket to dense region + bucket(c, sparse_buckets[idx].row) = sparse_buckets[idx].bkt; + + // add this sparse_bucket to free list + free_bucket(idx); + } + } + } + + // std::cerr << *this << std::endl; + + // 4. Clean up + delete[] old_buckets; +} + +void SparseSketch::reallocate_if_needed(int delta) { + // if we're currently adding something, don't shrink + if (delta == 1 && number_of_sparse_buckets <= num_columns / 4) { + return; + } + + // while we need to reallocate, attempt to do so. If realloc doesn't solve problem. Do it again. + while ((delta == -1 && number_of_sparse_buckets <= num_columns / 4 && + num_dense_rows > min_num_dense_rows) || + (delta == 1 && number_of_sparse_buckets == sparse_capacity)) { + if (number_of_sparse_buckets >= sparse_capacity) { + dense_realloc(num_dense_rows + 1); + } else { + dense_realloc(num_dense_rows - 1); + } + } +} + +// Update a bucket value +// Changes number_of_sparse_buckets as follows: +// +1 if we added a new bucket value +// 0 if the bucket was found and update (but not cleared) +// -1 if the bucket was found and cleared of all content +void SparseSketch::update_sparse(uint8_t col, const SparseBucket &to_add) { + uint8_t next_ptr = ll_metadata[col]; + uint8_t prev = uint8_t(-1); + while (next_ptr != uint8_t(-1)) { + if (sparse_buckets[next_ptr].row == to_add.row) { + bool removed = merge_sparse_bkt(next_ptr, to_add, prev, col); + if (removed) { + number_of_sparse_buckets -= 1; + reallocate_if_needed(-1); + } + return; + } else if (sparse_buckets[next_ptr].row > to_add.row) { + break; + } + prev = next_ptr; + next_ptr = sparse_buckets[next_ptr].next; + } + + // pull a bucket off the free list and set it equal to to_add + uint8_t free_bucket = claim_free_bucket(); + // std::cerr << "free bucket = " << size_t(free_bucket) << std::endl; + // std::cerr << "next bucket = " << size_t(next_ptr) << std::endl; + // std::cerr << "free head = " << size_t(ll_metadata[num_columns]) << std::endl; + + // update bucket + sparse_buckets[free_bucket] = to_add; + number_of_sparse_buckets += 1; + // std::cerr << "new bucket " << size_t(sparse_buckets[free_bucket].row) << " n = " << size_t(sparse_buckets[free_bucket].next) << std::endl; + + // update column ll + if (prev == uint8_t(-1)) { + insert_to_ll_head(col, free_bucket); + // std::cerr << "Set column head to new bucket " << size_t(ll_metadata[col]) << std::endl; + } else { + insert_to_ll(free_bucket, sparse_buckets[prev]); + // std::cerr << "Placed new bucket in column " << size_t(prev) << "->" << size_t(sparse_buckets[prev].next) << "->" << size_t(sparse_buckets[free_bucket].next) << std::endl; + } + + reallocate_if_needed(1); +} + +// sample a good bucket from the sparse region if one exists. +// Additionally, specify the column to query from +SketchSample SparseSketch::sample_sparse(size_t first_col, size_t end_col) { + // std::cerr << "sample_sparse" << std::endl; + for (size_t c = first_col; c < end_col; c++) { + uint8_t idx = ll_metadata[c]; + while (idx != uint8_t(-1)) { + if (Bucket_Boruvka::is_good(sparse_buckets[idx].bkt, checksum_seed())) { + return {sparse_buckets[idx].bkt.alpha, GOOD}; + } + idx = sparse_buckets[idx].next; + } + } + + // We could not find a good bucket + // std::cout << "Sketch FAIL" << std::endl; + return {0, FAIL}; +} + + +void SparseSketch::update(const vec_t update_idx) { + vec_hash_t checksum = Bucket_Boruvka::get_index_hash(update_idx, checksum_seed()); + + // Update depth 0 bucket + Bucket_Boruvka::update(deterministic_bucket(), update_idx, checksum); + + // Update higher depth buckets + for (unsigned i = 0; i < num_columns; ++i) { + col_hash_t depth = Bucket_Boruvka::get_index_depth(update_idx, column_seed(i), bkt_per_col); + likely_if(depth < bkt_per_col) { + likely_if(depth < num_dense_rows) { + Bucket_Boruvka::update(bucket(i, depth), update_idx, checksum); + } else { + update_sparse(i, {uint8_t(-1), uint8_t(depth), {update_idx, checksum}}); + } + } + } +} + +// TODO: Switch the L0_SAMPLING flag to instead affect query procedure. +// (Only use deepest bucket. We don't need the alternate update procedure in the code anymore.) + +void SparseSketch::zero_contents() { + for (size_t i = 0; i < num_buckets; i++) { + buckets[i].alpha = 0; + buckets[i].gamma = 0; + } + + // initialize sparse bucket linked lists + // every bucket is currently free, so each points to next + for (size_t i = 0; i < sparse_capacity; i++) { + sparse_buckets[i].next = i + 1; + } + sparse_buckets[sparse_capacity - 1].next = uint8_t(-1); + + // initialize LL metadata + for (size_t i = 0; i < num_columns; i++) { + ll_metadata[i] = uint8_t(-1); // head of each column points nowhere (empty) + } + ll_metadata[num_columns] = 0; // free list head + + reset_sample_state(); + number_of_sparse_buckets = 0; + // if (num_dense_rows > min_num_dense_rows + 4) + // dense_realloc(min_num_dense_rows); +} + +SketchSample SparseSketch::sample() { + if (sample_idx >= num_samples) { + throw OutOfSamplesException(seed, num_samples, sample_idx); + } + + size_t idx = sample_idx++; + size_t first_column = idx * cols_per_sample; + + // std::cout << "Sampling sketch" << std::endl; + // std::cout << "first_col = " << first_column << std::endl; + // std::cout << "end_col = " << first_column + cols_per_sample << std::endl; + // std::cout << *this << std::endl; + + if (Bucket_Boruvka::is_empty(deterministic_bucket())) { + // std::cout << "ZERO!" << std::endl; + return {0, ZERO}; // the "first" bucket is deterministic so if all zero then no edges to return + } + + if (Bucket_Boruvka::is_good(deterministic_bucket(), checksum_seed())) { + // std::cout << "Deterministic GOOD" << std::endl; + return {deterministic_bucket().alpha, GOOD}; + } + + // Sample sparse region + SketchSample sample = sample_sparse(first_column, first_column + cols_per_sample); + if (sample.result == GOOD) { + return sample; + } + + for (size_t c = 0; c < cols_per_sample; ++c) { + for (int r = num_dense_rows - 1; r >= 0; --r) { + if (Bucket_Boruvka::is_good(bucket(c + first_column, r), checksum_seed())) { + // std::cout << "Found GOOD dense bucket" << std::endl; + return {bucket(c + first_column, r).alpha, GOOD}; + } + } + } + + // Sample sparse region + // std::cout << "Sketch is bad" << std::endl; + // std::cout << *this << std::endl; + return {0, FAIL}; +} + +ExhaustiveSketchSample SparseSketch::exhaustive_sample() { + if (sample_idx >= num_samples) { + throw OutOfSamplesException(seed, num_samples, sample_idx); + } + std::vector ret; + + size_t idx = sample_idx++; + size_t first_column = idx * cols_per_sample; + + unlikely_if (Bucket_Boruvka::is_empty(deterministic_bucket())) + return {ret, ZERO}; // the "first" bucket is deterministic so if zero then no edges to return + + unlikely_if (Bucket_Boruvka::is_good(deterministic_bucket(), checksum_seed())) { + ret.push_back(deterministic_bucket().alpha); + return {ret, GOOD}; + } + + for (size_t c = 0; c < cols_per_sample; ++c) { + for (size_t r = 0; r < num_dense_rows; ++r) { + unlikely_if (Bucket_Boruvka::is_good(bucket(c + first_column, r), checksum_seed())) { + ret.push_back(bucket(c + first_column, r).alpha); + } + } + } + + // TODO: How do we do exhaustive sampling properly here? + SketchSample sample = sample_sparse(first_column, first_column + cols_per_sample); + if (sample.result == GOOD) { + ret.push_back(sample.idx); + } + + unlikely_if (ret.size() == 0) + return {ret, FAIL}; + return {ret, GOOD}; +} + +void SparseSketch::merge_sparse_column(const SparseBucket *oth_sparse_buckets, + const uint8_t *oth_ll_metadata, size_t col) { + // std::cerr << "Merging sparse column: " << col << std::endl; + uint8_t oth_idx = oth_ll_metadata[col]; + uint8_t our_idx = ll_metadata[col]; + uint8_t prev = uint8_t(-1); + + // merge column until one runs out + while (oth_idx != uint8_t(-1) && our_idx != uint8_t(-1)) { + const SparseBucket& oth_sparse = oth_sparse_buckets[oth_idx]; + SparseBucket& our_sparse = sparse_buckets[our_idx]; + + if (oth_sparse.row < num_dense_rows) { + // just merge into dense! + bucket(col, oth_sparse.row).alpha ^= oth_sparse.bkt.alpha; + bucket(col, oth_sparse.row).gamma ^= oth_sparse.bkt.gamma; + oth_idx = oth_sparse.next; + continue; + } + + if (oth_sparse.row > our_sparse.row) { + // skip our bucket, sparse doesn't have anything to match it + prev = our_idx; + our_idx = our_sparse.next; + } else if (oth_sparse.row < our_sparse.row) { + // oth has a bucket we don't have, insert it + uint8_t free_bucket = claim_free_bucket(); + // std::cerr << "ours = " << size_t(our_idx) << " free = " << size_t(free_bucket) << std::endl; + + sparse_buckets[free_bucket] = oth_sparse; + if (prev == uint8_t(-1)) { + insert_to_ll_head(col, free_bucket); + } else { + insert_to_ll(free_bucket, sparse_buckets[prev]); + } + number_of_sparse_buckets += 1; + reallocate_if_needed(1); + oth_idx = oth_sparse.next; + prev = free_bucket; + if (ll_metadata[col] == uint8_t(-1) || ll_metadata[col] == our_idx) prev = uint8_t(-1); + } else { + // they are equal, merge them! + uint8_t our_next = our_sparse.next; + uint8_t oth_next = oth_sparse.next; + bool removed = merge_sparse_bkt(our_idx, oth_sparse, prev, col); + if (removed) { + number_of_sparse_buckets -= 1; + reallocate_if_needed(-1); + } else { + prev = our_idx; + } + oth_idx = oth_next; + our_idx = our_next; + } + } + + // if there's more in the other column, merge that stuff in + while (oth_idx != uint8_t(-1)) { + const SparseBucket& oth_sparse = oth_sparse_buckets[oth_idx]; + if (oth_sparse.row < num_dense_rows) { + bucket(col, oth_sparse.row).alpha ^= oth_sparse.bkt.alpha; + bucket(col, oth_sparse.row).gamma ^= oth_sparse.bkt.gamma; + oth_idx = oth_sparse.next; + continue; + } + + uint8_t free_bucket = claim_free_bucket(); + sparse_buckets[free_bucket] = oth_sparse; + if (prev == uint8_t(-1)) { + insert_to_ll_head(col, free_bucket); + } else { + insert_to_ll(free_bucket, sparse_buckets[prev]); + } + number_of_sparse_buckets += 1; + reallocate_if_needed(1); + prev = free_bucket; + if (ll_metadata[col] == uint8_t(-1)) prev = uint8_t(-1); + oth_idx = oth_sparse.next; + } +} + +void SparseSketch::merge(const SparseSketch &other) { + // std::cerr << "PERFORMING A MERGE" << std::endl; + // std::cerr << *this << std::endl; + + // std::cerr << "MERGE SKETCH" << std::endl; + // std::cerr << other << std::endl; + + // merge the deterministic bucket + deterministic_bucket().alpha ^= other.deterministic_bucket().alpha; + deterministic_bucket().gamma ^= other.deterministic_bucket().gamma; + + // merge all dense buckets from other sketch into this one + for (size_t c = 0; c < num_columns; c++) { + for (size_t r = 0; r < other.num_dense_rows; ++r) { + if (r < num_dense_rows) { + bucket(c, r).alpha ^= other.bucket(c, r).alpha; + bucket(c, r).gamma ^= other.bucket(c, r).gamma; + } else if (!Bucket_Boruvka::is_empty(other.bucket(c, r))) { + SparseBucket sparse_bkt; + sparse_bkt.row = r; + sparse_bkt.bkt = other.bucket(c, r); + update_sparse(c, sparse_bkt); + } + } + } + + // Merge all sparse buckets from other sketch into this one + for (size_t c = 0; c < num_columns; c++) { + merge_sparse_column(other.sparse_buckets, other.ll_metadata, c); + } +} + +void SparseSketch::range_merge(const SparseSketch &other, size_t start_sample, size_t n_samples) { + if (start_sample + n_samples > num_samples) { + assert(false); + sample_idx = num_samples; // sketch is in a fail state! + return; + } + // std::cerr << "SKETCH BEFORE MERGE" << std::endl; + // std::cerr << *this << std::endl; + + // std::cerr << "SKETCH WE MERGE WITH" << std::endl; + // std::cerr << other << std::endl; + + // update sample idx to point at beginning of this range if before it + sample_idx = std::max(sample_idx, start_sample); + + // Columns we be merging + size_t start_column = start_sample * cols_per_sample; + size_t end_column = (start_sample + n_samples) * cols_per_sample; + + // merge deterministic buffer + deterministic_bucket().alpha ^= other.deterministic_bucket().alpha; + deterministic_bucket().gamma ^= other.deterministic_bucket().gamma; + + // merge all their dense buckets into us + for (size_t c = start_column; c < end_column; c++) { + for (size_t r = 0; r < other.num_dense_rows; r++) { + if (r < num_dense_rows) { + bucket(c, r).alpha ^= other.bucket(c, r).alpha; + bucket(c, r).gamma ^= other.bucket(c, r).gamma; + } else if (!Bucket_Boruvka::is_empty(other.bucket(c, r))) { + SparseBucket sparse_bkt; + sparse_bkt.row = r; + sparse_bkt.bkt = other.bucket(c, r); + update_sparse(c, sparse_bkt); + } + } + } + + // Merge all sparse buckets from other sketch into this one + for (size_t c = start_column; c < end_column; c++) { + merge_sparse_column(other.sparse_buckets, other.ll_metadata, c); + } + // std::cerr << "SKETCH AFTER MERGE" << std::endl; + // std::cerr << *this << std::endl; +} + +void SparseSketch::merge_raw_bucket_buffer(const Bucket *raw_buckets, size_t n_raw_buckets) { + size_t raw_rows = (n_raw_buckets - sparse_data_size - ll_metadata_size - 1) / num_columns; + const SparseBucket *raw_sparse = (const SparseBucket *) &raw_buckets[calc_sparse_index(raw_rows)]; + const uint8_t *raw_metadata = (const uint8_t *) &raw_buckets[calc_metadata_index(raw_rows)]; + + deterministic_bucket().alpha ^= raw_buckets[0].alpha; + deterministic_bucket().gamma ^= raw_buckets[0].gamma; + + for (size_t c = 0; c < num_columns; c++) { + for (size_t r = 0; r < raw_rows; r++) { + if (r < num_dense_rows) { + bucket(c, r).alpha ^= raw_buckets[position_func(c, r, raw_rows)].alpha; + bucket(c, r).gamma ^= raw_buckets[position_func(c, r, raw_rows)].gamma; + } else if (!Bucket_Boruvka::is_empty( + raw_buckets[position_func(c, r, raw_rows)])) { + SparseBucket sparse_bkt; + sparse_bkt.row = r; + sparse_bkt.bkt = raw_buckets[position_func(c, r, raw_rows)]; + update_sparse(c, sparse_bkt); + } + } + } + + // Merge all sparse buckets from other sketch into this one + for (size_t c = 0; c < num_columns; c++) { + merge_sparse_column(raw_sparse, raw_metadata, c); + } +} + +void SparseSketch::serialize(std::ostream &binary_out) const { + binary_out.write((char*) buckets, bucket_array_bytes()); +} + +bool operator==(const SparseSketch &sketch1, const SparseSketch &sketch2) { + if (sketch1.num_buckets != sketch2.num_buckets || sketch1.seed != sketch2.seed) + return false; + + return memcmp(sketch1.buckets, sketch2.buckets, + sketch1.bucket_array_bytes() - sketch1.ll_metadata_size * sizeof(Bucket)) == 0; +} + +std::ostream &operator<<(std::ostream &os, const SparseSketch &sketch) { + Bucket bkt = sketch.deterministic_bucket(); + bool good = Bucket_Boruvka::is_good(bkt, sketch.checksum_seed()); + vec_t a = bkt.alpha; + vec_hash_t c = bkt.gamma; + + os << " a:" << a << " c:" << c << (good ? " good" : " bad") << std::endl; + + os << "Number of dense rows = " << sketch.num_dense_rows << std::endl; + for (unsigned i = 0; i < sketch.num_columns; ++i) { + for (unsigned j = 0; j < sketch.num_dense_rows; ++j) { + Bucket bkt = sketch.bucket(i, j); + vec_t a = bkt.alpha; + vec_hash_t c = bkt.gamma; + bool good = Bucket_Boruvka::is_good(bkt, sketch.checksum_seed()); + + os << " a:" << a << " c:" << c << (good ? " good" : " bad") << std::endl; + } + os << std::endl; + } + + os << "Sparse Buckets" << std::endl; + const auto sparse_buckets = sketch.sparse_buckets; + for (size_t c = 0; c < sketch.num_columns; c++) { + uint8_t idx = sketch.ll_metadata[c]; + while (idx != uint8_t(-1)) { + bool good = Bucket_Boruvka::is_good(sparse_buckets[idx].bkt, sketch.checksum_seed()); + os << "i: " << size_t(idx) << " n: " << size_t(sparse_buckets[idx].next) << " p:" << c << ", " + << size_t(sparse_buckets[idx].row) << " := a:" << sparse_buckets[idx].bkt.alpha + << " c:" << sparse_buckets[idx].bkt.gamma << (good ? " good" : " bad") << std::endl; + if (idx == sketch.sparse_buckets[idx].next) { + os << "LL error!" << std::endl; + return os; + } + idx = sketch.sparse_buckets[idx].next; + } + } + os << "Free Buckets" << std::endl; + uint8_t idx = sketch.ll_metadata[sketch.num_columns]; + while (idx != uint8_t(-1)) { + bool good = Bucket_Boruvka::is_good(sparse_buckets[idx].bkt, sketch.checksum_seed()); + os << "i: " << size_t(idx) << " n: " << size_t(sparse_buckets[idx].next) << " r:" + << size_t(sparse_buckets[idx].row) << " := a:" << sparse_buckets[idx].bkt.alpha + << " c:" << sparse_buckets[idx].bkt.gamma << (good ? " good" : " bad") << std::endl; + if (idx == sketch.sparse_buckets[idx].next) { + os << "LL error!" << std::endl; + return os; + } + idx = sketch.sparse_buckets[idx].next; + } + + os << std::endl; + return os; +} diff --git a/test/sketch_test.cpp b/test/sketch_test.cpp index e35f4aa2..efe223b4 100644 --- a/test/sketch_test.cpp +++ b/test/sketch_test.cpp @@ -102,7 +102,6 @@ void test_sketch_sample(unsigned long num_sketches, SampleResult ret_code = query_ret.result; if (ret_code == GOOD) { - //Multiple queries shouldn't happen, but if we do get here fail test ASSERT_LT(res_idx, vec_size) << "Sampled index out of bounds"; if (!test_vec.get_entry(res_idx)) { //Undetected sample error @@ -168,6 +167,7 @@ void test_sketch_merge(unsigned long num_sketches, sketch2.update(test_vec2.get_update(j)); } sketch1.merge(sketch2); + Sketch backup(sketch1); try { SketchSample query_ret = sketch1.sample(); vec_t res_idx = query_ret.idx; @@ -177,6 +177,11 @@ void test_sketch_merge(unsigned long num_sketches, ASSERT_LT(res_idx, vec_size) << "Sampled index out of bounds"; if (test_vec1.get_entry(res_idx) == test_vec2.get_entry(res_idx)) { sample_incorrect_failures++; + std::cerr << "GOT A SAMPLE INCORRECT ERROR!" << std::endl; + std::cerr << "Got: " << res_idx << std::endl; + std::cerr << sketch1 << std::endl; + std::cerr << backup << std::endl; + std::cerr << sketch2 << std::endl; } } else if (ret_code == ZERO) { @@ -189,6 +194,7 @@ void test_sketch_merge(unsigned long num_sketches, } if (!vec_zero) { sample_incorrect_failures++; + std::cout << "GOT INCORRECT ZERO!" << std::endl; } } else { // sketch failed @@ -209,26 +215,57 @@ void test_sketch_merge(unsigned long num_sketches, } TEST(SketchTestSuite, TestSketchMerge) { - test_sketch_merge(10000, 1e2, 100, 0.001, 0.03); - test_sketch_merge(1000, 1e3, 1000, 0.001, 0.03); - test_sketch_merge(1000, 1e4, 10000, 0.001, 0.03); + test_sketch_merge(10000, 1e2, 100, 0, 0.03); + test_sketch_merge(1000, 1e3, 1000, 0, 0.03); + test_sketch_merge(1000, 1e4, 10000, 0, 0.03); } TEST(SketchTestSuite, TestSketchRangeMerge) { - Sketch skt1(2048, get_seed(), 10, 3); - Sketch skt2(2048, get_seed(), 10, 3); + size_t seed = get_seed(); + Sketch skt1(2048, seed, 10, 3); + Sketch skt2(2048, seed, 10, 3); + Sketch temp_skt(2048, seed, 10, 3); + + for (vec_t i = 0; i < 1024; i++) { + skt1.update(i); + skt2.update(i + 256); + } + // allowed return values after merging are [0, 255] and [1024, 1279] + vec_t good_1 = 255; + vec_t good_2 = 1024; + vec_t good_3 = good_2 + 255; - skt1.sample(); - skt1.range_merge(skt2, 1, 1); + temp_skt.merge(skt1); - skt1.sample(); + skt1.range_merge(skt2, 0, 1); + SketchSample sample = skt1.sample(); + if (sample.result == GOOD) { + ASSERT_TRUE(sample.idx <= good_1 || (sample.idx >= good_2 && sample.idx <= good_3)); + } + skt1.zero_contents(); + skt1.merge(temp_skt); + + skt1.range_merge(skt2, 1, 1); + sample = skt1.sample(); + if (sample.result == GOOD) { + ASSERT_TRUE(sample.idx <= good_1 || (sample.idx >= good_2 && sample.idx <= good_3)); + } + skt1.zero_contents(); + skt1.merge(temp_skt); + skt1.range_merge(skt2, 2, 1); - - skt1.sample(); + sample = skt1.sample(); + if (sample.result == GOOD) { + ASSERT_TRUE(sample.idx <= good_1 || (sample.idx >= good_2 && sample.idx <= good_3)); + } + skt1.zero_contents(); + skt1.merge(temp_skt); + skt1.range_merge(skt2, 3, 1); - - skt1.sample(); - skt1.range_merge(skt2, 4, 1); + sample = skt1.sample(); + if (sample.result == GOOD) { + ASSERT_TRUE(sample.idx <= good_1 || (sample.idx >= good_2 && sample.idx <= good_3)); + } } /** @@ -308,7 +345,7 @@ TEST(SketchTestSuite, TestSerialization) { file.close(); auto in_file = std::fstream("./out_sketch.txt", std::ios::in | std::ios::binary); - Sketch reheated(vec_size, seed, in_file, 3, num_columns); + Sketch reheated(vec_size, seed, in_file, sketch.get_buckets(), 3, num_columns); ASSERT_EQ(sketch, reheated); } @@ -348,16 +385,11 @@ TEST(SketchTestSuite, TestExhaustiveQuery) { ASSERT_EQ(query_ret.idxs.size(), 0) << query_ret.result; } - // assert everything returned is valid and <= 10 things - ASSERT_LE(query_ret.idxs.size(), 10); + // assert everything returned is valid for (vec_t non_zero : query_ret.idxs) { ASSERT_GT(non_zero, 0); ASSERT_LE(non_zero, 10); } - - // assert everything returned is unique - std::set unique_elms(query_ret.idxs.begin(), query_ret.idxs.end()); - ASSERT_EQ(unique_elms.size(), query_ret.idxs.size()); } } @@ -433,7 +465,7 @@ TEST(SketchTestSuite, TestRawBucketUpdate) { const Bucket *data = sk1.get_readonly_bucket_ptr(); - sk2.merge_raw_bucket_buffer(data); + sk2.merge_raw_bucket_buffer(data, sk1.get_buckets()); SketchSample sample = sk2.sample(); @@ -446,7 +478,7 @@ TEST(SketchTestSuite, TestRawBucketUpdate) { Bucket *copy_data = new Bucket[sk1.get_buckets()]; memcpy(copy_data, data, sk1.bucket_array_bytes()); - sk2.merge_raw_bucket_buffer(copy_data); + sk2.merge_raw_bucket_buffer(copy_data, sk1.get_buckets()); sk2.reset_sample_state(); sample = sk2.sample(); diff --git a/test/util/graph_verifier.cpp b/test/util/graph_verifier.cpp index 4d35ec1d..3eb7a527 100644 --- a/test/util/graph_verifier.cpp +++ b/test/util/graph_verifier.cpp @@ -93,6 +93,7 @@ void GraphVerifier::verify_connected_components(const ConnectedComponents &cc) { // first check that the number of components is the same for both if (kruskal_ccs != cc.size()) { + std::cout << "expect: " << kruskal_ccs << ", got = " << cc.size() << std::endl; throw IncorrectCCException("Incorrect number of components!"); } diff --git a/tools/process_stream.cpp b/tools/process_stream.cpp index 53c49586..ce129112 100644 --- a/tools/process_stream.cpp +++ b/tools/process_stream.cpp @@ -85,6 +85,7 @@ int main(int argc, char **argv) { std::cout << std::endl; auto driver_config = DriverConfiguration().gutter_sys(CACHETREE).worker_threads(num_threads); + driver_config.gutter_conf().wq_batch_per_elm(4); auto cc_config = CCAlgConfiguration().batch_factor(1); CCSketchAlg cc_alg{num_nodes, get_seed(), cc_config}; GraphSketchDriver driver{&cc_alg, &stream, driver_config, reader_threads};