From 2fd0316df03c5f388b2035cb124161cf771b4ffa Mon Sep 17 00:00:00 2001 From: Diretnan Domnan Date: Fri, 7 Jun 2024 01:33:36 +0200 Subject: [PATCH] Switched to some iterators in PredicateIndices for #26 --- ahnlich/db/src/algorithm/heap.rs | 8 ++++---- ahnlich/db/src/engine/predicate.rs | 19 +++++++------------ ahnlich/db/src/engine/store.rs | 8 ++++---- ahnlich/db/src/lib.rs | 2 +- 4 files changed, 16 insertions(+), 21 deletions(-) diff --git a/ahnlich/db/src/algorithm/heap.rs b/ahnlich/db/src/algorithm/heap.rs index 653f621a..2a864caf 100644 --- a/ahnlich/db/src/algorithm/heap.rs +++ b/ahnlich/db/src/algorithm/heap.rs @@ -28,11 +28,11 @@ impl<'a> MinHeap<'a> { } pub(crate) fn output(&mut self) -> Vec<(&'a StoreKey, f64)> { - let mut result: Vec<_> = vec![]; + let mut result: Vec<_> = Vec::with_capacity(self.max_capacity.get()); loop { match self.pop() { - Some(value) if result.len() < self.max_capacity.into() => { + Some(value) if result.len() < self.max_capacity.get() => { let vector_sim = value.0; result.push((vector_sim.0, vector_sim.1)); } @@ -66,11 +66,11 @@ impl<'a> MaxHeap<'a> { } fn output(&mut self) -> Vec<(&'a StoreKey, f64)> { - let mut result: Vec<_> = vec![]; + let mut result: Vec<_> = Vec::with_capacity(self.max_capacity.get()); loop { match self.heap.pop() { - Some(value) if result.len() < self.max_capacity.into() => { + Some(value) if result.len() < self.max_capacity.get() => { let vector_sim = value.0; result.push((vector_sim.0, vector_sim.1)); } diff --git a/ahnlich/db/src/engine/predicate.rs b/ahnlich/db/src/engine/predicate.rs index 0741c156..006adcc1 100644 --- a/ahnlich/db/src/engine/predicate.rs +++ b/ahnlich/db/src/engine/predicate.rs @@ -81,11 +81,9 @@ impl PredicateIndices { predicates .iter() .filter(|a| !pinned_keys.contains(a)) - .cloned() - .collect::>() - .pop(), + .next(), ) { - return Err(ServerError::PredicateNotFound(non_existing_index)); + return Err(ServerError::PredicateNotFound(non_existing_index.clone())); } let mut deleted = 0; for predicate in predicates { @@ -111,13 +109,13 @@ impl PredicateIndices { let pinned_inner = self.inner.pin(); // `insert` implicity adds it to allowed_predicates which is what lets us to be able to // search again - let new_predicates: StdHashSet<_> = predicates + let mut new_predicates = predicates .into_iter() .filter(|pred| pinned_keys.insert(pred.clone())) .unique() - .collect(); + .peekable(); // Only update for new predicates - if let Some(new_values) = (!new_predicates.is_empty()) + if let Some(new_values) = (new_predicates.peek().is_some()) .then_some(refresh_with_values) .flatten() { @@ -132,9 +130,7 @@ impl PredicateIndices { }) .collect::>(); let pred = PredicateIndex::init(val.clone()); - if let Err(existing_predicate) = - pinned_inner.try_insert(new_predicate.clone(), pred) - { + if let Err(existing_predicate) = pinned_inner.try_insert(new_predicate, pred) { existing_predicate.current.add(val) } } @@ -252,8 +248,7 @@ impl PredicateIndex { // was not previously there as it has been inserted on a different thread let new_hashset = ConcurrentHashSet::new(); new_hashset.insert(store_key_id.clone(), &new_hashset.guard()); - if let Err(error_current) = pinned.try_insert(predicate_value.clone(), new_hashset) - { + if let Err(error_current) = pinned.try_insert(predicate_value, new_hashset) { error_current .current .insert(store_key_id, &error_current.current.guard()); diff --git a/ahnlich/db/src/engine/store.rs b/ahnlich/db/src/engine/store.rs index 96bf109a..3ff256c9 100644 --- a/ahnlich/db/src/engine/store.rs +++ b/ahnlich/db/src/engine/store.rs @@ -94,7 +94,7 @@ impl StoreHandler { predicates: Vec, ) -> Result { let store = self.get(store_name)?; - Ok(store.create_index(predicates.into_iter().collect())) + Ok(store.create_index(predicates)) } /// Matches DELKEY - removes keys from a store @@ -164,7 +164,7 @@ impl StoreHandler { .flat_map(|(store_key, similarity)| { keys_to_value_map .remove(&StoreKeyId::from(store_key)) - .map(|value| (store_key.clone(), value.clone(), Similarity(similarity))) + .map(|value| (store_key.to_owned(), value.clone(), Similarity(similarity))) }) .collect()) } @@ -422,9 +422,9 @@ impl Store { } #[tracing::instrument(skip(self))] - fn create_index(&self, requested_predicates: StdHashSet) -> usize { + fn create_index(&self, requested_predicates: Vec) -> usize { let current_predicates = self.predicate_indices.current_predicates(); - let new_predicates: Vec<_> = requested_predicates + let new_predicates: Vec<_> = StdHashSet::from_iter(requested_predicates) .difference(¤t_predicates) .cloned() .collect(); diff --git a/ahnlich/db/src/lib.rs b/ahnlich/db/src/lib.rs index 60318d51..fae20856 100644 --- a/ahnlich/db/src/lib.rs +++ b/ahnlich/db/src/lib.rs @@ -201,7 +201,7 @@ impl ServerTask { let mut data = Vec::new(); if data.try_reserve(data_length as usize).is_err() { tracing::error!("{}", self.prefix_log(format!("Failed to reserve buffer of length {data_length}"))); - continue + break }; data.resize(data_length as usize, 0u8); self.reader.read_exact(&mut data).await?;