Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(bloom-filter): add memory control for creator #5185

Merged
merged 3 commits into from
Dec 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions src/index/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ license.workspace = true
workspace = true

[dependencies]
async-stream.workspace = true
async-trait.workspace = true
asynchronous-codec = "0.7.0"
bytemuck.workspace = true
Expand Down
171 changes: 75 additions & 96 deletions src/index/src/bloom_filter/creator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,21 +12,27 @@
// See the License for the specific language governing permissions and
// limitations under the License.

mod finalize_segment;
mod intermediate_codec;

use std::collections::HashSet;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

use fastbloom::BloomFilter;
use futures::{AsyncWrite, AsyncWriteExt};
use finalize_segment::FinalizedBloomFilterStorage;
use futures::{AsyncWrite, AsyncWriteExt, StreamExt};
use snafu::ResultExt;

use super::error::{IoSnafu, SerdeJsonSnafu};
use crate::bloom_filter::error::Result;
use crate::bloom_filter::{BloomFilterMeta, BloomFilterSegmentLocation, Bytes};
use crate::external_provider::ExternalTempFileProvider;

/// The seed used for the Bloom filter.
const SEED: u128 = 42;
pub const SEED: u128 = 42;

/// The false positive rate of the Bloom filter.
const FALSE_POSITIVE_RATE: f64 = 0.01;
pub const FALSE_POSITIVE_RATE: f64 = 0.01;

/// `BloomFilterCreator` is responsible for creating and managing bloom filters
/// for a set of elements. It divides the rows into segments and creates
Expand Down Expand Up @@ -58,6 +64,9 @@ pub struct BloomFilterCreator {

/// Storage for finalized Bloom filters.
finalized_bloom_filters: FinalizedBloomFilterStorage,

/// Global memory usage of the bloom filter creator.
global_memory_usage: Arc<AtomicUsize>,
}

impl BloomFilterCreator {
Expand All @@ -66,7 +75,12 @@ impl BloomFilterCreator {
/// # Panics
///
/// Panics if `rows_per_segment` is 0.
pub fn new(rows_per_segment: usize) -> Self {
pub fn new(
rows_per_segment: usize,
intermediate_provider: Box<dyn ExternalTempFileProvider>,
global_memory_usage: Arc<AtomicUsize>,
global_memory_usage_threshold: Option<usize>,
) -> Self {
assert!(
rows_per_segment > 0,
"rows_per_segment must be greater than 0"
Expand All @@ -77,54 +91,67 @@ impl BloomFilterCreator {
accumulated_row_count: 0,
cur_seg_distinct_elems: HashSet::default(),
cur_seg_distinct_elems_mem_usage: 0,
finalized_bloom_filters: FinalizedBloomFilterStorage::default(),
global_memory_usage: global_memory_usage.clone(),
finalized_bloom_filters: FinalizedBloomFilterStorage::new(
intermediate_provider,
global_memory_usage,
global_memory_usage_threshold,
),
}
}

/// Adds a row of elements to the bloom filter. If the number of accumulated rows
/// reaches `rows_per_segment`, it finalizes the current segment.
pub fn push_row_elems(&mut self, elems: impl IntoIterator<Item = Bytes>) {
pub async fn push_row_elems(&mut self, elems: impl IntoIterator<Item = Bytes>) -> Result<()> {
self.accumulated_row_count += 1;

let mut mem_diff = 0;
for elem in elems.into_iter() {
let len = elem.len();
let is_new = self.cur_seg_distinct_elems.insert(elem);
if is_new {
self.cur_seg_distinct_elems_mem_usage += len;
mem_diff += len;
}
}
self.cur_seg_distinct_elems_mem_usage += mem_diff;
self.global_memory_usage
.fetch_add(mem_diff, Ordering::Relaxed);

if self.accumulated_row_count % self.rows_per_segment == 0 {
self.finalize_segment();
self.finalize_segment().await?;
}

Ok(())
}

/// Finalizes any remaining segments and writes the bloom filters and metadata to the provided writer.
pub async fn finish(&mut self, mut writer: impl AsyncWrite + Unpin) -> Result<()> {
if !self.cur_seg_distinct_elems.is_empty() {
self.finalize_segment();
self.finalize_segment().await?;
}

let mut meta = BloomFilterMeta {
rows_per_segment: self.rows_per_segment,
seg_count: self.finalized_bloom_filters.len(),
row_count: self.accumulated_row_count,
..Default::default()
};

let mut buf = Vec::new();
for segment in self.finalized_bloom_filters.drain() {
let slice = segment.bloom_filter.as_slice();
buf.clear();
write_u64_slice(&mut buf, slice);
writer.write_all(&buf).await.context(IoSnafu)?;
let mut segs = self.finalized_bloom_filters.drain().await?;
while let Some(segment) = segs.next().await {
let segment = segment?;
writer
.write_all(&segment.bloom_filter_bytes)
.await
.context(IoSnafu)?;

let size = buf.len();
let size = segment.bloom_filter_bytes.len();
meta.bloom_filter_segments.push(BloomFilterSegmentLocation {
offset: meta.bloom_filter_segments_size as _,
size: size as _,
elem_count: segment.element_count,
});
meta.bloom_filter_segments_size += size;
meta.seg_count += 1;
}

let meta_bytes = serde_json::to_vec(&meta).context(SerdeJsonSnafu)?;
Expand All @@ -145,91 +172,29 @@ impl BloomFilterCreator {
self.cur_seg_distinct_elems_mem_usage + self.finalized_bloom_filters.memory_usage()
}

fn finalize_segment(&mut self) {
async fn finalize_segment(&mut self) -> Result<()> {
let elem_count = self.cur_seg_distinct_elems.len();
self.finalized_bloom_filters
.add(self.cur_seg_distinct_elems.drain(), elem_count);
self.cur_seg_distinct_elems_mem_usage = 0;
}
}

/// Storage for finalized Bloom filters.
///
/// TODO(zhongzc): Add support for storing intermediate bloom filters on disk to control memory usage.
#[derive(Debug, Default)]
struct FinalizedBloomFilterStorage {
/// Bloom filters that are stored in memory.
in_memory: Vec<FinalizedBloomFilterSegment>,
}

impl FinalizedBloomFilterStorage {
fn memory_usage(&self) -> usize {
self.in_memory.iter().map(|s| s.size).sum()
}

/// Adds a new finalized Bloom filter to the storage.
///
/// TODO(zhongzc): Add support for flushing to disk.
fn add(&mut self, elems: impl IntoIterator<Item = Bytes>, elem_count: usize) {
let mut bf = BloomFilter::with_false_pos(FALSE_POSITIVE_RATE)
.seed(&SEED)
.expected_items(elem_count);
for elem in elems.into_iter() {
bf.insert(&elem);
}
.add(self.cur_seg_distinct_elems.drain(), elem_count)
.await?;

let cbf = FinalizedBloomFilterSegment::new(bf, elem_count);
self.in_memory.push(cbf);
}

fn len(&self) -> usize {
self.in_memory.len()
}

fn drain(&mut self) -> impl Iterator<Item = FinalizedBloomFilterSegment> + '_ {
self.in_memory.drain(..)
}
}

/// A finalized Bloom filter segment.
#[derive(Debug)]
struct FinalizedBloomFilterSegment {
/// The underlying Bloom filter.
bloom_filter: BloomFilter,

/// The number of elements in the Bloom filter.
element_count: usize,

/// The occupied memory size of the Bloom filter.
size: usize,
}

impl FinalizedBloomFilterSegment {
fn new(bloom_filter: BloomFilter, elem_count: usize) -> Self {
let memory_usage = std::mem::size_of_val(bloom_filter.as_slice());
Self {
bloom_filter,
element_count: elem_count,
size: memory_usage,
}
}
}

/// Writes a slice of `u64` to the buffer in little-endian order.
fn write_u64_slice(buf: &mut Vec<u8>, slice: &[u64]) {
buf.reserve(std::mem::size_of_val(slice));
for &x in slice {
buf.extend_from_slice(&x.to_le_bytes());
self.global_memory_usage
.fetch_sub(self.cur_seg_distinct_elems_mem_usage, Ordering::Relaxed);
self.cur_seg_distinct_elems_mem_usage = 0;
Ok(())
}
}

#[cfg(test)]
mod tests {
use fastbloom::BloomFilter;
use futures::io::Cursor;

use super::*;
use crate::external_provider::MockExternalTempFileProvider;

fn u64_vec_from_bytes(bytes: &[u8]) -> Vec<u64> {
/// Converts a slice of bytes to a vector of `u64`.
pub fn u64_vec_from_bytes(bytes: &[u8]) -> Vec<u64> {
bytes
.chunks_exact(std::mem::size_of::<u64>())
.map(|chunk| u64::from_le_bytes(chunk.try_into().unwrap()))
Expand All @@ -239,18 +204,32 @@ mod tests {
#[tokio::test]
async fn test_bloom_filter_creator() {
let mut writer = Cursor::new(Vec::new());
let mut creator = BloomFilterCreator::new(2);
let mut creator = BloomFilterCreator::new(
2,
Box::new(MockExternalTempFileProvider::new()),
Arc::new(AtomicUsize::new(0)),
None,
);

creator.push_row_elems(vec![b"a".to_vec(), b"b".to_vec()]);
creator
.push_row_elems(vec![b"a".to_vec(), b"b".to_vec()])
.await
.unwrap();
assert!(creator.cur_seg_distinct_elems_mem_usage > 0);
assert!(creator.memory_usage() > 0);

creator.push_row_elems(vec![b"c".to_vec(), b"d".to_vec()]);
creator
.push_row_elems(vec![b"c".to_vec(), b"d".to_vec()])
.await
.unwrap();
// Finalize the first segment
assert!(creator.cur_seg_distinct_elems_mem_usage == 0);
assert_eq!(creator.cur_seg_distinct_elems_mem_usage, 0);
assert!(creator.memory_usage() > 0);

creator.push_row_elems(vec![b"e".to_vec(), b"f".to_vec()]);
creator
.push_row_elems(vec![b"e".to_vec(), b"f".to_vec()])
.await
.unwrap();
assert!(creator.cur_seg_distinct_elems_mem_usage > 0);
assert!(creator.memory_usage() > 0);

Expand Down
Loading
Loading