Skip to content

Eager Empty Bucket Checking #148

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 7 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ set(CMAKE_CXX_EXTENSIONS ON)
set(CMAKE_ARCHIVE_OUTPUT_DIRECTORY ${PROJECT_BINARY_DIR}/lib)
set(CMAKE_INTERPROCEDURAL_OPTIMIZATION TRUE)



# Make the default build type Release. If the user or another
# project sets a different value, then use that
if(NOT CMAKE_BUILD_TYPE)
Expand All @@ -28,6 +30,12 @@ else()
message(STATUS "${CMAKE_CXX_COMPILER_ID} not recognized, no flags added")
endif()

# Probe the active C++ compiler for -march=native and append the flag to the
# global C++ flags only when the probe succeeds.
include(CheckCXXCompilerFlag)
check_cxx_compiler_flag("-march=native" COMPILER_SUPPORTS_MARCH_NATIVE)
if (COMPILER_SUPPORTS_MARCH_NATIVE)
  set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -march=native")
endif ()

#add_compile_options(-fsanitize=address)
#add_link_options(-fsanitize=address)
#add_compile_options(-fsanitize=undefined)
Expand Down
27 changes: 24 additions & 3 deletions include/bucket.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,26 @@
struct Bucket {
  vec_t alpha;
  vec_hash_t gamma;

  /**
   * XOR-combine two buckets without modifying either operand.
   * @param rhs  Bucket to combine with this one.
   * @return     A new Bucket whose alpha and gamma are the field-wise XOR of both operands.
   */
  Bucket operator^(const Bucket &rhs) const {
    // Work on a copy: the binary operator must leave *this untouched.
    Bucket result = *this;
    result ^= rhs;
    return result;
  }

  /**
   * XOR rhs into this bucket in place (field-wise on alpha and gamma).
   * @param rhs  Bucket to fold into this one.
   */
  void operator^=(const Bucket &rhs) {
    alpha ^= rhs.alpha;
    gamma ^= rhs.gamma;
  }
};
#pragma pack(pop)

namespace Bucket_Boruvka {
static constexpr size_t col_hash_bits = sizeof(col_hash_t) * 8;

/**
* Returns whether or not a bucket is empty.
* @param bucket Bucket to check for empty.
* @return With high probability, return whether or not a given bucket is empty.
*/
inline static bool is_empty(const Bucket &bucket);
/**
* Hashes the column index and the update index together to determine the depth of an update
* This is used as a parameter to Bucket::contains.
Expand All @@ -33,6 +48,7 @@ namespace Bucket_Boruvka {
*/
inline static vec_hash_t get_index_hash(const vec_t index, const long sketch_seed);


/**
* Checks whether a Bucket is good, assuming the Bucket contains all elements.
* @param bucket The bucket to check
Expand All @@ -51,19 +67,24 @@ namespace Bucket_Boruvka {
const vec_hash_t update_hash);
} // namespace Bucket_Boruvka

/**
 * Report whether a bucket holds no data: true only when both its alpha and
 * gamma fields are zero.
 */
inline bool Bucket_Boruvka::is_empty(const Bucket &bucket) {
  const bool alpha_clear = (bucket.alpha == 0);
  const bool gamma_clear = (bucket.gamma == 0);
  return alpha_clear && gamma_clear;
}

/**
 * Derive the depth of an update from a seeded hash of its index.
 * @param update_idx    Index of the update being hashed.
 * @param seed_and_col  Seed passed to the hash function.
 * @param max_depth     Upper bound on the returned depth.
 *                      NOTE(review): assumed to be < 64 so the shift below is defined -- confirm.
 * @return The number of trailing zero bits of the hash, never more than max_depth.
 */
inline col_hash_t Bucket_Boruvka::get_index_depth(const vec_t update_idx, const long seed_and_col,
                                                  const vec_hash_t max_depth) {
  // Take the high 64 bits of the 128-bit XXH3 hash of the index.
  col_hash_t depth_hash = XXH3_128bits_withSeed(&update_idx, sizeof(vec_t), seed_and_col).high64;
  // Force bit max_depth on so the trailing-zero count can never exceed max_depth.
  depth_hash |= (1ull << max_depth);
  return __builtin_ctzll(depth_hash);
}

/**
 * Hash an update index with the sketch seed.
 * @param update_idx   Index to hash.
 * @param sketch_seed  Seed passed to the hash function.
 * @return The low 64 bits of the 128-bit XXH3 hash of the index, converted to vec_hash_t.
 */
inline vec_hash_t Bucket_Boruvka::get_index_hash(const vec_t update_idx, const long sketch_seed) {
  return XXH3_128bits_withSeed(&update_idx, sizeof(vec_t), sketch_seed).low64;
}


/**
 * Check whether a bucket is good: it must be non-empty and its gamma must equal
 * the hash of its alpha under the given seed.
 * @param bucket       The bucket to check.
 * @param sketch_seed  Seed used to hash bucket.alpha.
 * @return true if the bucket is non-empty and its checksum matches.
 */
inline bool Bucket_Boruvka::is_good(const Bucket &bucket, const long sketch_seed) {
  // An all-zero bucket is rejected before the checksum comparison.
  if (Bucket_Boruvka::is_empty(bucket)) {
    return false;
  }
  return bucket.gamma == get_index_hash(bucket.alpha, sketch_seed);
}

inline void Bucket_Boruvka::update(Bucket& bucket, const vec_t update_idx,
Expand Down
5 changes: 4 additions & 1 deletion include/cc_sketch_alg.h
Original file line number Diff line number Diff line change
Expand Up @@ -67,13 +67,16 @@ enum QueryCode {
* (no self-edges or multi-edges)
*/
class CCSketchAlg {
public:
Sketch **sketches;

private:
node_id_t num_vertices;
size_t seed;
bool update_locked = false;
// a set containing one "representative" from each supernode
std::set<node_id_t> *representatives;
Sketch **sketches;
// Sketch **sketches;
// DSU representation of supernode relationship
DisjointSetUnion_MT<node_id_t> dsu;

Expand Down
97 changes: 94 additions & 3 deletions include/sketch.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,19 +38,39 @@ struct ExhaustiveSketchSample {
* Sub-linear representation of a vector.
*/
class Sketch {
public:
size_t num_columns; // Total number of columns. (product of above 2)
size_t bkt_per_col; // number of buckets per column
private:
const uint64_t seed; // seed for hash functions
size_t num_samples; // number of samples we can perform
size_t cols_per_sample; // number of columns to use on each sample
size_t num_columns; // Total number of columns. (product of above 2)
size_t bkt_per_col; // number of buckets per column
// size_t num_columns; // Total number of columns. (product of above 2)
// size_t bkt_per_col; // number of buckets per column
size_t num_buckets; // number of total buckets (product of above 2)

size_t sample_idx = 0; // number of samples performed so far

// bucket data
Bucket* buckets;

// flags

#ifdef EAGER_BUCKET_CHECK
vec_t *nonempty_buckets;
/**
* Updates the nonempty flags in a given range by recalculating the is_empty() call.
* @param col_idx The column to update
* @param start_row The depth of the first bucket in the column to check the emptiness of.
* @param end_row The depth of the first bucket not to check the emptiness of (i.e., an exclusive bound)
*/
void recalculate_flags(size_t col_idx, size_t start_row, size_t end_row);
#endif
private:
// Accessor for the final slot of the bucket array (index num_buckets - 1).
inline Bucket& get_deterministic_bucket() const {
  const size_t last_slot = num_buckets - 1;
  return buckets[last_slot];
}

public:
/**
* The below constructors use vector length as their input. However, in graph sketching our input
Expand Down Expand Up @@ -85,6 +105,19 @@ class Sketch {
Sketch(vec_t vector_len, uint64_t seed, size_t num_samples = 1,
size_t cols_per_sample = default_cols_per_sample);


/**
* Construct a sketch from a (potentially compressed) serialized stream
* @param vector_len Length of the vector we are sketching
* @param seed Random seed of the sketch
* @param binary_in Stream holding serialized sketch object
* @param num_samples [Optional] Number of samples this sketch supports (default = 1)
* @param cols_per_sample [Optional] Number of sketch columns for each sample (default = default_cols_per_sample)
* @param compressed Whether or not to use the compression (required; this parameter has no default)
*/
Sketch(vec_t vector_len, uint64_t seed, bool compressed, std::istream& binary_in, size_t num_samples = 1,
size_t cols_per_sample = default_cols_per_sample);

/**
* Construct a sketch from a serialized stream
* @param vector_len Length of the vector we are sketching
Expand All @@ -104,12 +137,42 @@ class Sketch {

~Sketch();

/**
 * Look up the bucket stored for a given column and depth.
 * The layout of the flat bucket array is chosen at compile time by ROW_MAJOR_SKETCHES.
 * @param col_idx  Column of the bucket.
 * @param depth    Depth of the bucket within that column.
 * @return         Reference to the bucket at that position.
 */
inline Bucket& get_bucket(size_t col_idx, size_t depth) const {
#ifdef ROW_MAJOR_SKETCHES
  // buckets sharing a depth are adjacent in memory
  const size_t flat_idx = depth * num_columns + col_idx;
#else
  // buckets sharing a column are adjacent in memory
  const size_t flat_idx = col_idx * bkt_per_col + depth;
#endif
  return buckets[flat_idx];
}

/**
* Populates the contents of an empty sketch with input from a stream that contains
* the compressed version.
* @param binary_in Stream holding serialized/compressed sketch object.
*/
void compressed_deserialize(std::istream& binary_in);


/**
* Update a sketch based on information about one of its indices.
* @param update the point update.
*/
void update(const vec_t update);


#ifdef EAGER_BUCKET_CHECK
/**
 * Only declared when EAGER_BUCKET_CHECK is defined.
 * TODO - make this less silly
 * NOTE(review): declared with no parameters, yet CCSketchAlg::apply_update_batch calls
 * unsafe_update(static_cast<vec_t>(...)) with one argument -- confirm the intended signature.
 * NOTE(review): presumably applies an update without refreshing the non-empty flags, since
 * that caller runs recalculate_flags() over every column afterwards -- confirm.
 */

void unsafe_update();
#endif

/**
* Function to sample from the sketch.
* cols_per_sample determines the number of columns we allocate to this query
Expand All @@ -125,6 +188,21 @@ class Sketch {

std::mutex mutex; // lock the sketch for applying updates in multithreaded processing


/**
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Line up * and provide a little more documentation. Specify that this operates per column and that all non-empty buckets are above this cutoff.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Better name?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

couldn't think of one

* Gives the per-column cutoff depth: every non-empty bucket in the column is at a depth strictly less than this cutoff.
* @param col_idx The column to find the cutoff depth of.
* @return The depth of the deepest non-empty bucket in the column, plus 1. If the column is entirely empty, returns 0
*/
uint8_t effective_size(size_t col_idx) const;


/**
* Gives the cutoff depth across ALL columns: every non-empty bucket is at a depth strictly less than this cutoff.
* @return Depth of the deepest non-empty bucket + 1. 0 if all buckets are empty.
*/
uint8_t effective_depth() const;

/**
* In-place merge function.
* @param other Sketch to merge into caller
Expand Down Expand Up @@ -163,12 +241,25 @@ class Sketch {
*/
void serialize(std::ostream& binary_out) const;

/**
* Serialize the sketch to a binary output stream, with a compressed representation.
* takes significantly less space for mostly-empty sketches.
* @param binary_out the stream to write to.
*/
void compressed_serialize(std::ostream& binary_out) const;

// Rewind the sample counter so that no samples count as performed.
inline void reset_sample_state() { sample_idx = 0; }

/**
 * Size in bytes of the sketch's data arrays (metadata fields are not counted).
 * Without EAGER_BUCKET_CHECK this is just the bucket array. With it, one extra
 * vec_t per column is added (presumably the nonempty_buckets flag array).
 */
inline size_t bucket_array_bytes() const {
#ifdef EAGER_BUCKET_CHECK
  return (num_buckets * sizeof(Bucket)) + (num_columns * sizeof(vec_t));
#else
  return num_buckets * sizeof(Bucket);
#endif
}

// Read-only pointer to the start of the bucket array.
inline const Bucket* get_readonly_bucket_ptr() const {
  const Bucket* readonly_view = buckets;
  return readonly_view;
}
// Seed used by this sketch's hash functions.
inline uint64_t get_seed() const {
  return seed;
}
Expand Down
8 changes: 8 additions & 0 deletions src/cc_sketch_alg.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,15 @@ void CCSketchAlg::apply_update_batch(int thr_id, node_id_t src_vertex,

for (const auto &dst : dst_vertices) {
delta_sketch.update(static_cast<vec_t>(concat_pairing_fn(src_vertex, dst)));
#ifdef EAGER_BUCKET_CHECK
delta_sketch.unsafe_update(static_cast<vec_t>(concat_pairing_fn(src_vertex, dst)));
}
for (size_t i = 0; i < delta_sketch.num_columns; i++) {
delta_sketch.recalculate_flags(i, 0, delta_sketch.bkt_per_col);
}
#else // EAGER_BUCKET_CHECK
}
#endif

std::lock_guard<std::mutex> lk(sketches[src_vertex]->mutex);
sketches[src_vertex]->merge(delta_sketch);
Expand Down
Loading
Loading