Skip to content
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.

Commit 31e7c61

Browse files
authored Feb 15, 2023
Ranged HAMT iteration specified via keys (filecoin-project#1665)
1 parent c80ad6c commit 31e7c61

File tree

4 files changed

+346
-0
lines changed

4 files changed

+346
-0
lines changed
 

‎.gitignore

+1
Original file line numberDiff line numberDiff line change
@@ -4,4 +4,5 @@ target
44
!testing/integration/tests/assets/*
55
testing/conformance/traces
66
.idea/
7+
.vscode/
78
lcov.info

‎ipld/hamt/src/hamt.rs

+69
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ use multihash::Code;
1313
use serde::de::DeserializeOwned;
1414
use serde::{Serialize, Serializer};
1515

16+
use crate::hash_bits::HashBits;
1617
use crate::node::Node;
1718
use crate::{Config, Error, Hash, HashAlgorithm, Sha256};
1819

@@ -362,6 +363,74 @@ where
362363
self.root.for_each(self.store.borrow(), &mut f)
363364
}
364365

366+
/// Iterates over each KV in the Hamt and runs a function on the values. If starting key is
367+
/// provided, iteration will start from that key. If max is provided, iteration will stop after
368+
/// max number of items have been traversed. The number of items that were traversed is
369+
/// returned. If there are more items in the Hamt after max items have been traversed, the key
370+
/// of the next item will be returned.
371+
///
372+
/// This function will constrain all values to be of the same type
373+
///
374+
/// # Examples
375+
///
376+
/// ```
377+
/// use fvm_ipld_hamt::Hamt;
378+
///
379+
/// let store = fvm_ipld_blockstore::MemoryBlockstore::default();
380+
///
381+
/// let mut map: Hamt<_, _, u64> = Hamt::new(store);
382+
/// map.set(1, 1).unwrap();
383+
/// map.set(2, 2).unwrap();
384+
/// map.set(3, 3).unwrap();
385+
/// map.set(4, 4).unwrap();
386+
///
387+
/// let mut numbers = vec![];
388+
///
389+
/// map.for_each_ranged(None, None, |_, v: &u64| {
390+
/// numbers.push(*v);
391+
/// Ok(())
392+
/// }).unwrap();
393+
///
394+
/// let mut subset = vec![];
395+
///
396+
/// let (_, next_key) = map.for_each_ranged(Some(&numbers[0]), Some(2), |_, v: &u64| {
397+
/// subset.push(*v);
398+
/// Ok(())
399+
/// }).unwrap();
400+
///
401+
/// assert_eq!(subset, numbers[..2]);
402+
/// assert_eq!(next_key.unwrap(), numbers[2]);
403+
/// ```
404+
#[inline]
405+
pub fn for_each_ranged<Q: ?Sized, F>(
406+
&self,
407+
starting_key: Option<&Q>,
408+
max: Option<usize>,
409+
mut f: F,
410+
) -> Result<(usize, Option<K>), Error>
411+
where
412+
K: Borrow<Q> + Clone,
413+
Q: Eq + Hash,
414+
V: DeserializeOwned,
415+
F: FnMut(&K, &V) -> anyhow::Result<()>,
416+
{
417+
match starting_key {
418+
Some(key) => {
419+
let hash = H::hash(key);
420+
self.root.for_each_ranged(
421+
self.store.borrow(),
422+
&self.conf,
423+
Some((HashBits::new(&hash), key)),
424+
max,
425+
&mut f,
426+
)
427+
}
428+
None => self
429+
.root
430+
.for_each_ranged(self.store.borrow(), &self.conf, None, max, &mut f),
431+
}
432+
}
433+
365434
/// Consumes this HAMT and returns the Blockstore it owns.
366435
pub fn into_store(self) -> BS {
367436
self.store

‎ipld/hamt/src/node.rs

+103
Original file line numberDiff line numberDiff line change
@@ -174,6 +174,109 @@ where
174174
Ok(())
175175
}
176176

177+
pub(crate) fn for_each_ranged<Q: ?Sized, S, F>(
178+
&self,
179+
store: &S,
180+
conf: &Config,
181+
mut starting_cursor: Option<(HashBits, &Q)>,
182+
limit: Option<usize>,
183+
f: &mut F,
184+
) -> Result<(usize, Option<K>), Error>
185+
where
186+
K: Borrow<Q> + Clone,
187+
Q: Eq + Hash,
188+
F: FnMut(&K, &V) -> anyhow::Result<()>,
189+
S: Blockstore,
190+
{
191+
// determine which subtree the starting_cursor is in
192+
let cindex = match starting_cursor {
193+
Some((ref mut bits, _)) => {
194+
let idx = bits.next(conf.bit_width)?;
195+
self.index_for_bit_pos(idx)
196+
}
197+
None => 0,
198+
};
199+
200+
let mut traversed_count = 0;
201+
202+
// skip exploration of subtrees that are before the subtree which contains the cursor
203+
for p in &self.pointers[cindex..] {
204+
match p {
205+
Pointer::Link { cid, cache } => {
206+
if let Some(cached_node) = cache.get() {
207+
let (traversed, key) = cached_node.for_each_ranged(
208+
store,
209+
conf,
210+
starting_cursor.take(),
211+
limit.map(|l| l.checked_sub(traversed_count).unwrap()),
212+
f,
213+
)?;
214+
traversed_count += traversed;
215+
if limit.map_or(false, |l| traversed_count >= l) && key.is_some() {
216+
return Ok((traversed_count, key));
217+
}
218+
} else {
219+
let node = if let Some(node) = store.get_cbor(cid)? {
220+
node
221+
} else {
222+
#[cfg(not(feature = "ignore-dead-links"))]
223+
return Err(Error::CidNotFound(cid.to_string()));
224+
225+
#[cfg(feature = "ignore-dead-links")]
226+
continue;
227+
};
228+
229+
// Populate the cache if it is still empty; the cached value will always be the same node
230+
let cache_node = cache.get_or_init(|| node);
231+
let (traversed, key) = cache_node.for_each_ranged(
232+
store,
233+
conf,
234+
starting_cursor.take(),
235+
limit.map(|l| l.checked_sub(traversed_count).unwrap()),
236+
f,
237+
)?;
238+
traversed_count += traversed;
239+
if limit.map_or(false, |l| traversed_count >= l) && key.is_some() {
240+
return Ok((traversed_count, key));
241+
}
242+
}
243+
}
244+
Pointer::Dirty(node) => {
245+
let (traversed, key) = node.for_each_ranged(
246+
store,
247+
conf,
248+
starting_cursor.take(),
249+
limit.map(|l| l.checked_sub(traversed_count).unwrap()),
250+
f,
251+
)?;
252+
traversed_count += traversed;
253+
if limit.map_or(false, |l| traversed_count >= l) && key.is_some() {
254+
return Ok((traversed_count, key));
255+
}
256+
}
257+
Pointer::Values(kvs) => {
258+
for kv in kvs {
259+
if limit.map_or(false, |l| traversed_count == l) {
260+
// we have already found all requested items, return the key of the next item
261+
return Ok((traversed_count, Some(kv.0.clone())));
262+
} else if starting_cursor.map_or(false, |(_, key)| key.eq(kv.0.borrow())) {
263+
// mark that we have arrived at the starting cursor
264+
starting_cursor = None
265+
}
266+
267+
if starting_cursor.is_none() {
268+
// have already passed the start cursor
269+
f(&kv.0, kv.1.borrow())?;
270+
traversed_count += 1;
271+
}
272+
}
273+
}
274+
}
275+
}
276+
277+
Ok((traversed_count, None))
278+
}
279+
177280
/// Search for a key.
178281
fn search<Q: ?Sized, S: Blockstore>(
179282
&self,

‎ipld/hamt/tests/hamt_tests.rs

+173
Original file line numberDiff line numberDiff line change
@@ -386,6 +386,168 @@ fn for_each(factory: HamtFactory, stats: Option<BSStats>, mut cids: CidChecker)
386386
}
387387
}
388388

389+
fn for_each_ranged(factory: HamtFactory, stats: Option<BSStats>, mut cids: CidChecker) {
390+
let mem = MemoryBlockstore::default();
391+
let store = TrackingBlockstore::new(&mem);
392+
393+
let mut hamt: Hamt<_, usize> = factory.new_with_bit_width(&store, 5);
394+
395+
const RANGE: usize = 200;
396+
for i in 0..RANGE {
397+
hamt.set(tstring(i), i).unwrap();
398+
}
399+
400+
// collect all KV pairs by iterating through the entire hamt
401+
let mut kvs = Vec::new();
402+
hamt.for_each(|k, v| {
403+
assert_eq!(k, &tstring(v));
404+
kvs.push((k.clone(), *v));
405+
Ok(())
406+
})
407+
.unwrap();
408+
409+
// Iterate through the HAMT, requesting pages of different sizes
410+
for page_size in 0..RANGE {
411+
let mut kvs_variable_page = Vec::new();
412+
let (num_traversed, next_key) = hamt
413+
.for_each_ranged::<BytesKey, _>(None, Some(page_size), |k, v| {
414+
kvs_variable_page.push((k.clone(), *v));
415+
Ok(())
416+
})
417+
.unwrap();
418+
419+
assert_eq!(num_traversed, page_size);
420+
assert_eq!(kvs_variable_page.len(), num_traversed);
421+
assert_eq!(next_key.unwrap(), kvs[page_size].0);
422+
423+
// Items iterated over should match the ordering of for_each
424+
assert_eq!(kvs_variable_page, kvs[..page_size]);
425+
}
426+
427+
// Iterate through the HAMT, requesting more items than are remaining
428+
let (num_traversed, next_key) = hamt
429+
.for_each_ranged::<BytesKey, _>(None, Some(RANGE + 10), |_k, _v| Ok(()))
430+
.unwrap();
431+
assert_eq!(num_traversed, RANGE);
432+
assert_eq!(next_key, None);
433+
434+
// Iterate through it again starting at a certain key
435+
for start_at in 0..RANGE as usize {
436+
let mut kvs_variable_start = Vec::new();
437+
let (num_traversed, next_key) = hamt
438+
.for_each_ranged(Some(&kvs[start_at].0), None, |k, v| {
439+
assert_eq!(k, &tstring(v));
440+
kvs_variable_start.push((k.clone(), *v));
441+
442+
Ok(())
443+
})
444+
.unwrap();
445+
446+
// No limit specified, iteration should be exhaustive
447+
assert_eq!(next_key, None);
448+
assert_eq!(num_traversed, kvs_variable_start.len());
449+
assert_eq!(kvs_variable_start.len(), kvs.len() - start_at,);
450+
451+
// Items iterated over should match the ordering of for_each
452+
assert_eq!(kvs_variable_start, kvs[start_at..]);
453+
}
454+
455+
// Chain paginated requests to iterate over entire HAMT
456+
{
457+
let mut kvs_paginated_requests = Vec::new();
458+
let mut iterations = 0;
459+
let mut cursor: Option<BytesKey> = None;
460+
461+
// Request all items in pages of 20 items each
462+
const PAGE_SIZE: usize = 20;
463+
loop {
464+
let (page_size, next) = match cursor {
465+
Some(ref start) => hamt
466+
.for_each_ranged::<BytesKey, _>(Some(start), Some(PAGE_SIZE), |k, v| {
467+
kvs_paginated_requests.push((k.clone(), *v));
468+
Ok(())
469+
})
470+
.unwrap(),
471+
None => hamt
472+
.for_each_ranged::<BytesKey, _>(None, Some(PAGE_SIZE), |k, v| {
473+
kvs_paginated_requests.push((k.clone(), *v));
474+
Ok(())
475+
})
476+
.unwrap(),
477+
};
478+
iterations += 1;
479+
assert_eq!(page_size, PAGE_SIZE);
480+
assert_eq!(kvs_paginated_requests.len(), iterations * PAGE_SIZE);
481+
482+
if next.is_none() {
483+
break;
484+
} else {
485+
assert_eq!(next.clone().unwrap(), kvs[(iterations * PAGE_SIZE)].0);
486+
cursor = next;
487+
}
488+
}
489+
490+
// should have retrieved all key value pairs in the same order
491+
assert_eq!(kvs_paginated_requests.len(), kvs.len(), "{}", iterations);
492+
assert_eq!(kvs_paginated_requests, kvs);
493+
// should have used the expected number of iterations
494+
assert_eq!(iterations, RANGE / PAGE_SIZE);
495+
}
496+
497+
let c = hamt.flush().unwrap();
498+
cids.check_next(c);
499+
500+
// Chain paginated requests over a HAMT with committed nodes
501+
let mut hamt: Hamt<_, usize> = factory.load_with_bit_width(&c, &store, 5).unwrap();
502+
{
503+
let mut kvs_paginated_requests = Vec::new();
504+
let mut iterations = 0;
505+
let mut cursor: Option<BytesKey> = None;
506+
507+
// Request all items in pages of 20 items each
508+
const PAGE_SIZE: usize = 20;
509+
loop {
510+
let (page_size, next) = match cursor {
511+
Some(ref start) => hamt
512+
.for_each_ranged::<BytesKey, _>(Some(start), Some(PAGE_SIZE), |k, v| {
513+
kvs_paginated_requests.push((k.clone(), *v));
514+
Ok(())
515+
})
516+
.unwrap(),
517+
None => hamt
518+
.for_each_ranged::<BytesKey, _>(None, Some(PAGE_SIZE), |k, v| {
519+
kvs_paginated_requests.push((k.clone(), *v));
520+
Ok(())
521+
})
522+
.unwrap(),
523+
};
524+
iterations += 1;
525+
assert_eq!(page_size, PAGE_SIZE);
526+
assert_eq!(kvs_paginated_requests.len(), iterations * PAGE_SIZE);
527+
528+
if next.is_none() {
529+
break;
530+
} else {
531+
assert_eq!(next.clone().unwrap(), kvs[(iterations * PAGE_SIZE)].0);
532+
cursor = next;
533+
}
534+
}
535+
536+
// should have retrieved all key value pairs in the same order
537+
assert_eq!(kvs_paginated_requests.len(), kvs.len(), "{}", iterations);
538+
assert_eq!(kvs_paginated_requests, kvs);
539+
// should have used the expected number of iterations
540+
assert_eq!(iterations, RANGE / PAGE_SIZE);
541+
}
542+
543+
let c = hamt.flush().unwrap();
544+
cids.check_next(c);
545+
546+
if let Some(stats) = stats {
547+
assert_eq!(*store.stats.borrow(), stats);
548+
}
549+
}
550+
389551
#[cfg(feature = "identity")]
390552
fn add_and_remove_keys(
391553
bit_width: u32,
@@ -823,6 +985,17 @@ mod test_default {
823985
super::for_each(HamtFactory::default(), Some(stats), cids);
824986
}
825987

988+
#[test]
989+
fn for_each_ranged() {
990+
#[rustfmt::skip]
991+
let stats = BSStats {r: 30, w: 30, br: 2895, bw: 2895};
992+
let cids = CidChecker::new(vec![
993+
"bafy2bzacedy4ypl2vedhdqep3llnwko6vrtfiys5flciz2f3c55pl4whlhlqm",
994+
"bafy2bzacedy4ypl2vedhdqep3llnwko6vrtfiys5flciz2f3c55pl4whlhlqm",
995+
]);
996+
super::for_each_ranged(HamtFactory::default(), Some(stats), cids);
997+
}
998+
826999
#[test]
8271000
fn clean_child_ordering() {
8281001
#[rustfmt::skip]

0 commit comments

Comments
 (0)
Please sign in to comment.