diff --git a/data_structures/hashing/coalesced_hashing.py b/data_structures/hashing/coalesced_hashing.py
new file mode 100644
index 000000000000..ee08316af0ac
--- /dev/null
+++ b/data_structures/hashing/coalesced_hashing.py
@@ -0,0 +1,176 @@
+#!/usr/bin/env python3
+"""
+Coalesced hashing (hybrid of open addressing + chaining inside the table).
+
+Reference: https://en.wikipedia.org/wiki/Hash_table#Coalesced_hashing
+"""
+
+from __future__ import annotations
+
+from collections.abc import Iterator, MutableMapping
+from dataclasses import dataclass
+from typing import Generic, TypeVar
+
+KEY = TypeVar("KEY")
+VAL = TypeVar("VAL")
+
+
+@dataclass(slots=True)
+class _Node(Generic[KEY, VAL]):  # noqa: UP046
+    key: KEY
+    val: VAL
+    next: int  # -1 means end of chain
+
+
+class CoalescedHashMap(MutableMapping[KEY, VAL]):
+    """
+    Coalesced hashing stores the chain pointers inside the array.
+
+    This implementation uses:
+    - Primary area: all indices, chaining occurs via `next` pointers.
+    - Free slot choice: highest-index free slot (deterministic and easy to
+      reason about).
+
+    >>> ch = CoalescedHashMap(5)
+    >>> ch["a"] = 1
+    >>> ch["b"] = 2
+    >>> ch["a"]
+    1
+    >>> len(ch)
+    2
+    """
+
+    def __init__(self, capacity: int = 8, capacity_factor: float = 0.8) -> None:
+        if capacity < 1:
+            raise ValueError("capacity must be >= 1")
+        if not (0.0 < capacity_factor < 1.0):
+            raise ValueError("capacity_factor must be between 0 and 1")
+
+        self._capacity_factor = capacity_factor
+        self._table: list[_Node[KEY, VAL] | None] = [None] * capacity
+        self._len = 0
+
+    def _home(self, key: KEY) -> int:
+        return hash(key) % len(self._table)
+
+    def _is_full(self) -> bool:
+        return self._len >= int(len(self._table) * self._capacity_factor)
+
+    def _find_free_from_end(self) -> int:
+        for i in range(len(self._table) - 1, -1, -1):
+            if self._table[i] is None:
+                return i
+        return -1
+
+    def _resize(self, new_capacity: int) -> None:
+        old_items = list(self.items())
+        self._table = [None] * new_capacity
+        self._len = 0
+        for k, v in old_items:
+            self[k] = v
+
+    def __setitem__(self, key: KEY, val: VAL) -> None:
+        if self._is_full():
+            self._resize(len(self._table) * 2)
+
+        home = self._home(key)
+        node = self._table[home]
+
+        if node is None:
+            self._table[home] = _Node(key, val, -1)
+            self._len += 1
+            return
+
+        # Search the chain for an existing key to update.
+        cur = home
+        while True:
+            # Explicitly type the current node to satisfy mypy.
+            current_node = self._table[cur]
+            if current_node is None:
+                # Should not happen if the chain pointers are consistent,
+                # but keeps the None handling explicit.
+                break
+
+            if current_node.key == key:
+                current_node.val = val
+                return
+            if current_node.next == -1:
+                break
+            cur = current_node.next
+
+        # Insert new node at a free slot and link it.
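+        # Chains for different home buckets may pass through the same slots:
+        # the free slot linked in here can later become part of the probe
+        # path of another key, which is the "coalescing" in the name.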
+        free = self._find_free_from_end()
+        if free == -1:
+            self._resize(len(self._table) * 2)
+            self[key] = val
+            return
+
+        self._table[free] = _Node(key, val, -1)
+
+        # Link the previous end of chain to the new free slot.
+        # We re-fetch the node at `cur` to be safe.
+        if (tail_node := self._table[cur]) is not None:
+            tail_node.next = free
+        self._len += 1
+
+    def __getitem__(self, key: KEY) -> VAL:
+        home = self._home(key)
+        cur = home
+        while cur != -1:
+            node = self._table[cur]
+            if node is None:
+                break
+            if node.key == key:
+                return node.val
+            cur = node.next
+        raise KeyError(key)
+
+    def __delitem__(self, key: KEY) -> None:
+        home = self._home(key)
+
+        # Collect the whole chain that starts at the home slot. Simply
+        # vacating one slot is unsafe in coalesced hashing: that slot may be
+        # the home bucket of another key stored further down the chain, and
+        # its lookup would then stop early at the empty slot.
+        chain: list[_Node[KEY, VAL]] = []
+        cur = home
+        while cur != -1:
+            node = self._table[cur]
+            if node is None:
+                break
+            chain.append(node)
+            cur = node.next
+
+        if not any(node.key == key for node in chain):
+            raise KeyError(key)
+
+        # Clear every slot the chain occupied, then re-insert the surviving
+        # entries so each key stays reachable from its own home bucket.
+        idx = home
+        while idx != -1:
+            node = self._table[idx]
+            if node is None:
+                break
+            self._table[idx] = None
+            idx = node.next
+        self._len -= len(chain)
+        for node in chain:
+            if node.key != key:
+                self[node.key] = node.val
+
+    def __iter__(self) -> Iterator[KEY]:
+        for node in self._table:
+            if node is not None:
+                yield node.key
+
+    def __len__(self) -> int:
+        return self._len
+
+
+if __name__ == "__main__":
+    import doctest
+
+    doctest.testmod()
diff --git a/data_structures/hashing/fnv_hashtable.py b/data_structures/hashing/fnv_hashtable.py
new file mode 100644
index 000000000000..fed324145b85
--- /dev/null
+++ b/data_structures/hashing/fnv_hashtable.py
@@ -0,0 +1,177 @@
+#!/usr/bin/env python3
+"""
+FNV-1a hashing + a small educational hash map.
+
+FNV-1a is a fast, non-cryptographic hash often used for hashing bytes/strings.
+Reference: https://en.wikipedia.org/wiki/Fowler%E2%80%93Noll%E2%80%93Vo_hash_function
+"""
+
+from __future__ import annotations
+
+from collections.abc import Iterator, MutableMapping
+from dataclasses import dataclass
+from typing import Generic, TypeVar
+
+KEY = TypeVar("KEY")
+VAL = TypeVar("VAL")
+
+
+def fnv1a_32(data: bytes) -> int:
+    """
+    Compute 32-bit FNV-1a over bytes.
+
+    >>> fnv1a_32(b"")
+    2166136261
+    >>> fnv1a_32(b"a")  # deterministic
+    3826002220
+    """
+    h = 0x811C9DC5  # offset basis
+    for b in data:
+        h ^= b
+        h = (h * 0x01000193) & 0xFFFFFFFF
+    return h
+
+
+def fnv1a_64(data: bytes) -> int:
+    """
+    Compute 64-bit FNV-1a over bytes.

+    >>> fnv1a_64(b"")
+    14695981039346656037
+    """
+    h = 0xCBF29CE484222325  # offset basis
+    for b in data:
+        h ^= b
+        h = (h * 0x100000001B3) & 0xFFFFFFFFFFFFFFFF
+    return h
+
+
+@dataclass(slots=True)
+class _Item(Generic[KEY, VAL]):  # noqa: UP046
+    key: KEY
+    val: VAL
+
+
+class _DeletedItem(_Item):
+    def __init__(self) -> None:
+        super().__init__(None, None)
+
+    def __bool__(self) -> bool:
+        return False
+
+
+_deleted = _DeletedItem()
+
+
+class FNVHashMap(MutableMapping[KEY, VAL]):
+    """
+    Hash map using FNV-1a for string/bytes keys and Python's hash otherwise.
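+    Deleted slots are marked with a tombstone (``_deleted``) so probe chains
+    stay intact; an insert reuses the first tombstone it passes once it has
+    confirmed the key is not stored further along the chain.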
+
+    >>> hm = FNVHashMap()
+    >>> hm["hello"] = 1
+    >>> hm[b"hello"] = 2
+    >>> hm["hello"]
+    1
+    >>> hm[b"hello"]
+    2
+    >>> "missing" in hm
+    False
+    """
+
+    def __init__(
+        self, initial_block_size: int = 8, capacity_factor: float = 0.75
+    ) -> None:
+        if initial_block_size < 1:
+            raise ValueError("initial_block_size must be >= 1")
+        if not (0.0 < capacity_factor < 1.0):
+            raise ValueError("capacity_factor must be between 0 and 1")
+
+        self._initial_block_size = initial_block_size
+        self._buckets: list[_Item | None] = [None] * initial_block_size
+        self._capacity_factor = capacity_factor
+        self._len = 0
+
+    def _hash_key(self, key: KEY) -> int:
+        if isinstance(key, bytes):
+            return fnv1a_32(key)
+        if isinstance(key, str):
+            return fnv1a_32(key.encode("utf-8"))
+        return hash(key)
+
+    def _get_bucket_index(self, key: KEY) -> int:
+        return self._hash_key(key) % len(self._buckets)
+
+    def _iterate_buckets(self, key: KEY) -> Iterator[int]:
+        ind = self._get_bucket_index(key)
+        for _ in range(len(self._buckets)):
+            yield ind
+            ind = (ind + 1) % len(self._buckets)
+
+    def _is_full(self) -> bool:
+        return self._len >= int(len(self._buckets) * self._capacity_factor)
+
+    def _resize(self, new_size: int) -> None:
+        old = self._buckets
+        self._buckets = [None] * new_size
+        self._len = 0
+        for item in old:
+            if item:
+                self[item.key] = item.val
+
+    def __setitem__(self, key: KEY, val: VAL) -> None:
+        if self._is_full():
+            self._resize(len(self._buckets) * 2)
+
+        # Remember the first tombstone so it can be reused, but keep probing:
+        # inserting into the first falsy slot without checking the rest of
+        # the chain could create a duplicate of a key stored further along.
+        first_tombstone = -1
+        for ind in self._iterate_buckets(key):
+            stored = self._buckets[ind]
+            if stored is None:
+                target = first_tombstone if first_tombstone != -1 else ind
+                self._buckets[target] = _Item(key, val)
+                self._len += 1
+                return
+            if stored is _deleted:
+                if first_tombstone == -1:
+                    first_tombstone = ind
+                continue
+            if stored.key == key:
+                stored.val = val
+                return
+
+        if first_tombstone != -1:
+            self._buckets[first_tombstone] = _Item(key, val)
+            self._len += 1
+            return
+        # Extremely unlikely due to resize policy, but safe.
+        self._resize(len(self._buckets) * 2)
+        self[key] = val
+
+    def __getitem__(self, key: KEY) -> VAL:
+        for ind in self._iterate_buckets(key):
+            item = self._buckets[ind]
+            if item is None:
+                break
+            if item is _deleted:
+                continue
+            if item.key == key:
+                return item.val
+        raise KeyError(key)
+
+    def __delitem__(self, key: KEY) -> None:
+        for ind in self._iterate_buckets(key):
+            item = self._buckets[ind]
+            if item is None:
+                break
+            if item is _deleted:
+                continue
+            if item.key == key:
+                self._buckets[ind] = _deleted
+                self._len -= 1
+                return
+        raise KeyError(key)
+
+    def __iter__(self) -> Iterator[KEY]:
+        yield from (item.key for item in self._buckets if item)
+
+    def __len__(self) -> int:
+        return self._len
+
+    def __repr__(self) -> str:
+        parts = ", ".join(f"{k!r}: {v!r}" for k, v in self.items())
+        return f"FNVHashMap({parts})"
+
+
+if __name__ == "__main__":
+    import doctest
+
+    doctest.testmod()
diff --git a/data_structures/hashing/hopscotch.py b/data_structures/hashing/hopscotch.py
new file mode 100644
index 000000000000..3927b71c23c2
--- /dev/null
+++ b/data_structures/hashing/hopscotch.py
@@ -0,0 +1,203 @@
+#!/usr/bin/env python3
+"""
+Hopscotch hashing (open addressing with neighborhood invariants).
+
+Reference: https://en.wikipedia.org/wiki/Hash_table#Hopscotch_hashing
+
+This is an educational, single-threaded implementation.
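+
+Invariant: every key is stored within H - 1 slots of its home bucket, and a
+per-bucket bitmap records which of those H offsets hold items for that
+bucket, so a lookup probes at most H slots.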
+""" + +from __future__ import annotations + +from collections.abc import Iterator, MutableMapping +from dataclasses import dataclass +from typing import Generic, TypeVar + +KEY = TypeVar("KEY") +VAL = TypeVar("VAL") + + +@dataclass(slots=True) +class _Item(Generic[KEY, VAL]): # noqa: UP046 + key: KEY + val: VAL + + +class HopscotchHashMap(MutableMapping[KEY, VAL]): + """ + Hopscotch hashing keeps each item within a fixed neighborhood (H) + from its home bucket. + + >>> hs = HopscotchHashMap(initial_capacity=8, neighborhood_size=4) + >>> hs["a"] = 1 + >>> hs["b"] = 2 + >>> hs["a"] + 1 + >>> len(hs) + 2 + """ + + def __init__( + self, + initial_capacity: int = 8, + neighborhood_size: int = 32, + max_load: float = 0.85, + ) -> None: + if initial_capacity < 1: + raise ValueError("initial_capacity must be >= 1") + if neighborhood_size < 2: + raise ValueError("neighborhood_size must be >= 2") + if not (0.0 < max_load < 1.0): + raise ValueError("max_load must be between 0 and 1") + + self._H = neighborhood_size + self._max_load = max_load + self._buckets: list[_Item[KEY, VAL] | None] = [None] * initial_capacity + # hop-info: bitmap for each bucket; bit i means an item exists at bucket+ i + self._hop: list[int] = [0] * initial_capacity + self._len = 0 + + def _home(self, key: KEY) -> int: + return hash(key) % len(self._buckets) + + def _is_full(self) -> bool: + return self._len >= int(len(self._buckets) * self._max_load) + + def _resize(self, new_capacity: int) -> None: + items = list(self.items()) + self._buckets = [None] * new_capacity + self._hop = [0] * new_capacity + self._len = 0 + for k, v in items: + self[k] = v + + def _find_free(self, start: int) -> int: + for dist in range(len(self._buckets)): + idx = (start + dist) % len(self._buckets) + if self._buckets[idx] is None: + return idx + return -1 + + def _distance(self, home: int, idx: int) -> int: + n = len(self._buckets) + return (idx - home) % n + + def _try_relocate(self, home: int, free: int) -> int: + """ + Try to move some existing item closer to its home so that `free` moves into + the neighborhood of `home`. + + Returns the new free index if relocation succeeded, else -1. + """ + n = len(self._buckets) + + while self._distance(home, free) >= self._H: + moved = False + # Search backward within H-1 positions from free. + for back in range(self._H - 1, 0, -1): + cand = (free - back) % n + cand_hop = self._hop[cand] + # Find an item in cand's neighborhood to move into free. + for off in range(self._H - 1, -1, -1): + if (cand_hop >> off) & 1: + from_idx = (cand + off) % n + # from_idx item will move to free only if it stays within + # cand neighborhood. + if self._distance(cand, free) < self._H: + self._buckets[free] = self._buckets[from_idx] + self._buckets[from_idx] = None + + # Update hop bitmaps + self._hop[cand] &= ~(1 << off) + new_off = self._distance(cand, free) + self._hop[cand] |= 1 << new_off + + free = from_idx + moved = True + break + if moved: + break + + if not moved: + return -1 + + return free + + def __setitem__(self, key: KEY, val: VAL) -> None: + if self._is_full(): + self._resize(len(self._buckets) * 2) + + home = self._home(key) + + # Update if already present in neighborhood. 
+        hop = self._hop[home]
+        for off in range(self._H):
+            if (hop >> off) & 1:
+                idx = (home + off) % len(self._buckets)
+                item = self._buckets[idx]
+                if item is not None and item.key == key:
+                    item.val = val
+                    return
+
+        free = self._find_free(home)
+        if free == -1:
+            self._resize(len(self._buckets) * 2)
+            self[key] = val
+            return
+
+        free = self._try_relocate(home, free)
+        if free == -1:
+            # Relocation failed; grow and retry.
+            self._resize(len(self._buckets) * 2)
+            self[key] = val
+            return
+
+        off = self._distance(home, free)
+        if off >= self._H:
+            # Should not happen due to _try_relocate, but keep safe.
+            self._resize(len(self._buckets) * 2)
+            self[key] = val
+            return
+
+        self._buckets[free] = _Item(key, val)
+        self._hop[home] |= 1 << off
+        self._len += 1
+
+    def __getitem__(self, key: KEY) -> VAL:
+        home = self._home(key)
+        hop = self._hop[home]
+        for off in range(self._H):
+            if (hop >> off) & 1:
+                idx = (home + off) % len(self._buckets)
+                item = self._buckets[idx]
+                if item is not None and item.key == key:
+                    return item.val
+        raise KeyError(key)
+
+    def __delitem__(self, key: KEY) -> None:
+        home = self._home(key)
+        hop = self._hop[home]
+        for off in range(self._H):
+            if (hop >> off) & 1:
+                idx = (home + off) % len(self._buckets)
+                item = self._buckets[idx]
+                if item is not None and item.key == key:
+                    self._buckets[idx] = None
+                    self._hop[home] &= ~(1 << off)
+                    self._len -= 1
+                    return
+        raise KeyError(key)
+
+    def __iter__(self) -> Iterator[KEY]:
+        for item in self._buckets:
+            if item is not None:
+                yield item.key
+
+    def __len__(self) -> int:
+        return self._len
+
+
+if __name__ == "__main__":
+    import doctest
+
+    doctest.testmod()
diff --git a/data_structures/hashing/linear_probing.py b/data_structures/hashing/linear_probing.py
new file mode 100644
index 000000000000..457bb7ccd7f0
--- /dev/null
+++ b/data_structures/hashing/linear_probing.py
@@ -0,0 +1,61 @@
+#!/usr/bin/env python3
+"""
+Linear Probing Hash Table Implementation with Collision Resolution.
+Reference: https://en.wikipedia.org/wiki/Linear_probing
+"""
+
+from typing import Any
+
+from .hash_table import HashTable
+
+
+class LinearProbing(HashTable):
+    """
+    Hash Table with Linear Probing collision resolution.
+
+    Hash function: h(k, i) = (h(k) + i) % m
+    """
+
+    def __init__(self, *args: Any, **kwargs: Any) -> None:
+        super().__init__(*args, **kwargs)
+
+    def _collision_resolution(self, key: int, data: Any = None) -> int | None:
+        """
+        Resolve collisions by probing sequentially.
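+        The probe wraps around the table and gives up (returns None) when
+        the load factor limit is hit, signalling the parent class to rehash.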
+
+        Examples:
+        >>> lp = LinearProbing(5)
+        >>> lp.insert_data(10)  # hash(10) -> 0
+        >>> lp.insert_data(15)  # hash(15) -> 0 (collision) -> probe to 1
+        >>> lp.keys()
+        {0: 10, 1: 15}
+
+        >>> lp = LinearProbing(2)
+        >>> lp.insert_data(1); lp.insert_data(2); lp.insert_data(3)  # Resizes
+        >>> len(lp.keys())
+        3
+        """
+        i = 1
+        # The initial slot is already occupied, so start from the next one.
+        new_key = (key + 1) % self.size_table
+
+        while self.values[new_key] is not None and self.values[new_key] != data:
+            # Check the load factor to avoid filling the table completely.
+            if self.balanced_factor() >= self.lim_charge:
+                return None  # Trigger rehashing in the parent class
+
+            new_key = (new_key + 1) % self.size_table
+            i += 1
+
+            # Safety break: avoid an infinite loop if the table is full but
+            # the lim_charge check did not fire.
+            if i > self.size_table:
+                return None
+
+        return new_key
+
+
+if __name__ == "__main__":
+    import doctest
+
+    doctest.testmod()
diff --git a/data_structures/hashing/power_of_two.py b/data_structures/hashing/power_of_two.py
new file mode 100644
index 000000000000..b4f1be3c853e
--- /dev/null
+++ b/data_structures/hashing/power_of_two.py
@@ -0,0 +1,159 @@
+#!/usr/bin/env python3
+"""
+Power-of-two sized hash map (mask-based indexing).
+
+Power-of-two tables often use a bitmask instead of modulo, but should mix the
+hash so the low bits are usable.
+
+Reference (hash table sizing / open addressing context):
+https://en.wikipedia.org/wiki/Hash_table
+"""
+
+from __future__ import annotations
+
+from collections.abc import Iterator, MutableMapping
+from dataclasses import dataclass
+from typing import Generic, TypeVar
+
+KEY = TypeVar("KEY")
+VAL = TypeVar("VAL")
+
+
+def _next_power_of_two(number: int) -> int:
+    if number < 1:
+        raise ValueError("number must be >= 1")
+    return 1 << (number - 1).bit_length()
+
+
+def _mix_hash(hash_value: int) -> int:
+    # Simple avalanching step so the low bits depend on the high bits.
+    hash_value ^= hash_value >> 16
+    hash_value *= 0x85EBCA6B
+    hash_value &= 0xFFFFFFFFFFFFFFFF
+    hash_value ^= hash_value >> 13
+    hash_value *= 0xC2B2AE35
+    hash_value &= 0xFFFFFFFFFFFFFFFF
+    hash_value ^= hash_value >> 16
+    return hash_value
+
+
+@dataclass(slots=True)
+class _Item(Generic[KEY, VAL]):  # noqa: UP046
+    key: KEY
+    val: VAL
+
+
+class _DeletedItem(_Item):
+    def __init__(self) -> None:
+        super().__init__(None, None)
+
+    def __bool__(self) -> bool:
+        return False
+
+
+_deleted = _DeletedItem()
+
+
+class PowerOfTwoHashMap(MutableMapping[KEY, VAL]):
+    """
+    Open addressing with a power-of-two bucket count.
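+    With a power-of-two capacity, ``index & (capacity - 1)`` is equal to
+    ``index % capacity``, so the mask replaces the slower modulo:
+
+    >>> _next_power_of_two(5)
+    8
+    >>> 13 & (8 - 1) == 13 % 8
+    True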
+
+    >>> hm = PowerOfTwoHashMap(8)
+    >>> hm["a"] = 1
+    >>> hm["b"] = 2
+    >>> hm["a"]
+    1
+    >>> len(hm)
+    2
+    >>> del hm["a"]
+    >>> "a" in hm
+    False
+    """
+
+    def __init__(
+        self, initial_capacity: int = 8, capacity_factor: float = 0.75
+    ) -> None:
+        if not (0.0 < capacity_factor < 1.0):
+            raise ValueError("capacity_factor must be between 0 and 1")
+        cap = _next_power_of_two(max(1, initial_capacity))
+        self._initial_capacity = cap
+        self._buckets: list[_Item | None] = [None] * cap
+        self._capacity_factor = capacity_factor
+        self._len = 0
+
+    def _mask(self) -> int:
+        return len(self._buckets) - 1
+
+    def _index(self, key: KEY) -> int:
+        return _mix_hash(hash(key)) & self._mask()
+
+    def _iterate(self, key: KEY) -> Iterator[int]:
+        ind = self._index(key)
+        for _ in range(len(self._buckets)):
+            yield ind
+            ind = (ind + 1) & self._mask()
+
+    def _is_full(self) -> bool:
+        return self._len >= int(len(self._buckets) * self._capacity_factor)
+
+    def _resize(self, new_capacity: int) -> None:
+        new_capacity = _next_power_of_two(new_capacity)
+        old = self._buckets
+        self._buckets = [None] * new_capacity
+        self._len = 0
+        for item in old:
+            if item:
+                self[item.key] = item.val
+
+    def __setitem__(self, key: KEY, val: VAL) -> None:
+        if self._is_full():
+            self._resize(len(self._buckets) * 2)
+
+        # Remember the first tombstone so it can be reused, but keep probing:
+        # inserting into the first falsy slot without checking the rest of
+        # the chain could create a duplicate of a key stored further along.
+        first_tombstone = -1
+        for ind in self._iterate(key):
+            item = self._buckets[ind]
+            if item is None:
+                target = first_tombstone if first_tombstone != -1 else ind
+                self._buckets[target] = _Item(key, val)
+                self._len += 1
+                return
+            if item is _deleted:
+                if first_tombstone == -1:
+                    first_tombstone = ind
+                continue
+            if item.key == key:
+                item.val = val
+                return
+
+        if first_tombstone != -1:
+            self._buckets[first_tombstone] = _Item(key, val)
+            self._len += 1
+            return
+        self._resize(len(self._buckets) * 2)
+        self[key] = val
+
+    def __getitem__(self, key: KEY) -> VAL:
+        for ind in self._iterate(key):
+            item = self._buckets[ind]
+            if item is None:
+                break
+            if item is _deleted:
+                continue
+            if item.key == key:
+                return item.val
+        raise KeyError(key)
+
+    def __delitem__(self, key: KEY) -> None:
+        for ind in self._iterate(key):
+            item = self._buckets[ind]
+            if item is None:
+                break
+            if item is _deleted:
+                continue
+            if item.key == key:
+                self._buckets[ind] = _deleted
+                self._len -= 1
+                return
+        raise KeyError(key)
+
+    def __iter__(self) -> Iterator[KEY]:
+        yield from (item.key for item in self._buckets if item)
+
+    def __len__(self) -> int:
+        return self._len
+
+
+if __name__ == "__main__":
+    import doctest
+
+    doctest.testmod()
diff --git a/data_structures/hashing/robin_hood.py b/data_structures/hashing/robin_hood.py
new file mode 100644
index 000000000000..ac9d9113986f
--- /dev/null
+++ b/data_structures/hashing/robin_hood.py
@@ -0,0 +1,159 @@
+#!/usr/bin/env python3
+"""
+Robin Hood hashing (open addressing with probe-sequence-length balancing).
+
+Reference: https://en.wikipedia.org/wiki/Hash_table#Robin_Hood_hashing
+"""
+
+from __future__ import annotations
+
+from collections.abc import Iterator, MutableMapping
+from dataclasses import dataclass
+from typing import Generic, TypeVar
+
+KEY = TypeVar("KEY")
+VAL = TypeVar("VAL")
+
+
+@dataclass(slots=True)
+class _Entry(Generic[KEY, VAL]):  # noqa: UP046
+    key: KEY
+    val: VAL
+    psl: int  # probe sequence length (distance from home)
+
+
+class RobinHoodHashMap(MutableMapping[KEY, VAL]):
+    """
+    Robin Hood hashing reduces variance by swapping when the incoming item
+    has a larger probe distance than the resident.
+
+    Deletion uses backward-shift deletion (no tombstones), keeping lookups
+    fast.
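+
+    Invariant: if a lookup at probe distance d meets a resident entry with
+    psl < d, the key cannot be stored any further along the cluster, so the
+    search can stop early.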
+
+    >>> rh = RobinHoodHashMap()
+    >>> rh["a"] = 1
+    >>> rh["b"] = 2
+    >>> rh["a"]
+    1
+    >>> del rh["a"]
+    >>> "a" in rh
+    False
+    """
+
+    def __init__(self, initial_capacity: int = 8, max_load: float = 0.75) -> None:
+        if initial_capacity < 1:
+            raise ValueError("initial_capacity must be >= 1")
+        if not (0.0 < max_load < 1.0):
+            raise ValueError("max_load must be between 0 and 1")
+
+        self._buckets: list[_Entry[KEY, VAL] | None] = [None] * initial_capacity
+        self._max_load = max_load
+        self._len = 0
+
+    def _home(self, key: KEY) -> int:
+        return hash(key) % len(self._buckets)
+
+    def _is_full(self) -> bool:
+        return self._len >= int(len(self._buckets) * self._max_load)
+
+    def _resize(self, new_capacity: int) -> None:
+        old_items = list(self.items())
+        self._buckets = [None] * new_capacity
+        self._len = 0
+        for k, v in old_items:
+            self[k] = v
+
+    def __setitem__(self, key: KEY, val: VAL) -> None:
+        if self._is_full():
+            self._resize(len(self._buckets) * 2)
+
+        idx = self._home(key)
+        entry = _Entry(key, val, 0)
+
+        for _ in range(len(self._buckets)):
+            cur = self._buckets[idx]
+            if cur is None:
+                self._buckets[idx] = entry
+                self._len += 1
+                return
+
+            if cur.key == key:
+                cur.val = val
+                return
+
+            # Robin Hood rule: if the incoming entry is "poorer"
+            # (larger psl), swap.
+            if entry.psl > cur.psl:
+                self._buckets[idx], entry = entry, cur
+
+            idx = (idx + 1) % len(self._buckets)
+            entry = _Entry(entry.key, entry.val, entry.psl + 1)
+
+        # Safety: should not happen with the resizing policy.
+        self._resize(len(self._buckets) * 2)
+        self[key] = val
+
+    def __getitem__(self, key: KEY) -> VAL:
+        idx = self._home(key)
+        psl = 0
+
+        for _ in range(len(self._buckets)):
+            cur = self._buckets[idx]
+            if cur is None:
+                break
+            # Early exit: if the current resident is "richer" than our
+            # search distance, the key cannot appear further in the cluster.
+            if cur.psl < psl:
+                break
+            if cur.key == key:
+                return cur.val
+            idx = (idx + 1) % len(self._buckets)
+            psl += 1  # noqa: SIM113
+
+        raise KeyError(key)
+
+    def _delete_at(self, idx: int) -> None:
+        # Backward-shift deletion.
+        self._buckets[idx] = None
+        self._len -= 1
+
+        nxt = (idx + 1) % len(self._buckets)
+        while True:
+            cur = self._buckets[nxt]
+            if cur is None or cur.psl == 0:
+                break
+            # Shift back by one.
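+            # Each shifted entry lands one slot closer to its home, so its
+            # psl drops by one; an entry with psl == 0 is already at its
+            # home bucket and must not move.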
+            self._buckets[idx] = _Entry(cur.key, cur.val, cur.psl - 1)
+            self._buckets[nxt] = None
+            idx = nxt
+            nxt = (nxt + 1) % len(self._buckets)
+
+    def __delitem__(self, key: KEY) -> None:
+        idx = self._home(key)
+        psl = 0
+
+        for _ in range(len(self._buckets)):
+            cur = self._buckets[idx]
+            if cur is None:
+                break
+            if cur.psl < psl:
+                break
+            if cur.key == key:
+                self._delete_at(idx)
+                return
+            idx = (idx + 1) % len(self._buckets)
+            psl += 1  # noqa: SIM113
+
+        raise KeyError(key)
+
+    def __iter__(self) -> Iterator[KEY]:
+        for item in self._buckets:
+            if item is not None:
+                yield item.key
+
+    def __len__(self) -> int:
+        return self._len
+
+
+if __name__ == "__main__":
+    import doctest
+
+    doctest.testmod()
diff --git a/data_structures/hashing/tests/test_advanced_hashing.py b/data_structures/hashing/tests/test_advanced_hashing.py
new file mode 100644
index 000000000000..d5dd5fdb990e
--- /dev/null
+++ b/data_structures/hashing/tests/test_advanced_hashing.py
@@ -0,0 +1,143 @@
+from operator import delitem, getitem, setitem
+from typing import Any
+
+import pytest
+
+from data_structures.hashing.coalesced_hashing import CoalescedHashMap
+from data_structures.hashing.fnv_hashtable import FNVHashMap
+from data_structures.hashing.hopscotch import HopscotchHashMap
+from data_structures.hashing.linear_probing import LinearProbing
+from data_structures.hashing.power_of_two import PowerOfTwoHashMap
+from data_structures.hashing.robin_hood import RobinHoodHashMap
+
+
+# --- Wrapper for repo-style classes ---
+# Adapts LinearProbing's insert_data API for the comparison helpers.
+class RepoStyleWrapper:
+    def __init__(self, size=10):
+        self._backend = LinearProbing(size)
+
+    def __setitem__(self, key: int, val: Any):
+        # LinearProbing.insert_data(data) stores `data` at hash(data); there
+        # is no separate key/value pair, so the wrapper only makes sense for
+        # integer keys where key == val.
+        self._backend.insert_data(val)
+
+    def __getitem__(self, key: int):
+        # The repo-style HashTable keys slots by hash(data) and stores the
+        # data itself, so it behaves like a hash set rather than a key/value
+        # map. LinearProbing is therefore excluded from the generic
+        # dict-compliance test below and covered separately in
+        # test_linear_probing_specifics.
+        pass
+
+    def __delitem__(self, key):
+        pass
+
+    def keys(self):
+        return self._backend.keys().values()  # Return values as keys for comparison
+
+
+# Helper functions
+def _get(k):
+    return getitem, k
+
+
+def _set(k, v):
+    return setitem, k, v
+
+
+def _del(k):
+    return delitem, k
+
+
+def _run_operation(obj, fun, *args):
+    try:
+        return fun(obj, *args), None
+    except Exception as e:  # noqa: BLE001
+        return None, str(type(e).__name__)
+
+
+# Test scenarios
+_add_items = (_set("key_a", "val_a"), _set("key_b", "val_b"))
+_overwrite_items = [_set("key_a", "val_a"), _set("key_a", "val_b")]
+_delete_items = [
+    _set("key_a", "val_a"),
+    _set("key_b", "val_b"),
+    _del("key_a"),
+    _del("key_b"),
+]
+_access_absent = [_get("key_a"), _del("key_a")]
+
+
+# --- TEST 1: Modern MutableMapping classes ---
+@pytest.mark.parametrize(
+    "hash_map_cls",
+    [
+        RobinHoodHashMap,
+        HopscotchHashMap,
+        CoalescedHashMap,
+        FNVHashMap,
+        PowerOfTwoHashMap,
+    ],
+)
+@pytest.mark.parametrize(
+    "operations",
+    [
+        pytest.param(_add_items, id="add"),
+        pytest.param(_overwrite_items, id="overwrite"),
+        pytest.param(_delete_items, id="delete"),
+        pytest.param(_access_absent, id="absent_access"),
+    ],
+)
+def test_compatibility_with_dict(hash_map_cls, operations):
+    """
+    Verify that the new MutableMapping classes behave exactly like a dict.
+    """
+    try:
+        my_map = hash_map_cls()
+    except TypeError:
+        my_map = hash_map_cls(8)
+
+    py_dict = {}
+
+    for i, (fun, *args) in enumerate(operations):
+        my_res, my_exc = _run_operation(my_map, fun, *args)
+        py_res, py_exc = _run_operation(py_dict, fun, *args)
+
+        assert my_res == py_res, f"{hash_map_cls.__name__}: Result mismatch at op {i}"
+        assert my_exc == py_exc, (
+            f"{hash_map_cls.__name__}: Exception mismatch at op {i}"
+        )
+        assert len(my_map) == len(py_dict)
+        assert sorted(my_map.keys()) == sorted(py_dict.keys())
+
+
+def test_linear_probing_specifics():
+    """
+    LinearProbing uses a different API (insert_data), so it is tested
+    separately.
+    """
+    lp = LinearProbing(5)
+
+    # 1. Insertion
+    lp.insert_data(10)  # hash(10) % 5 = 0
+    lp.insert_data(15)  # hash(15) % 5 = 0 -> collision -> index 1
+
+    # LinearProbing.keys() returns a dict of {index: value}, so check
+    # membership in that dict, not in the object itself.
+    assert 0 in lp.keys()  # noqa: SIM118
+    assert lp.keys()[0] == 10
+    assert 1 in lp.keys()  # noqa: SIM118
+    assert lp.keys()[1] == 15
+
+    # 2. Resizing (implicitly exercised via inserts)
+    lp = LinearProbing(2)
+    lp.insert_data(1)
+    lp.insert_data(2)
+    lp.insert_data(3)  # Trigger resize
+
+    assert len(lp.keys()) == 3
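+
+
+# A small randomized differential test (sketch): drive each MutableMapping
+# implementation with a random workload and check that it stays in lockstep
+# with a plain dict. The seed and operation mix are arbitrary choices.
+def test_randomized_against_dict():
+    import random
+
+    rng = random.Random(1234)
+    for cls in [
+        RobinHoodHashMap,
+        HopscotchHashMap,
+        CoalescedHashMap,
+        FNVHashMap,
+        PowerOfTwoHashMap,
+    ]:
+        my_map = cls()
+        py_dict: dict = {}
+        for _ in range(500):
+            key = f"k{rng.randrange(40)}"
+            if rng.random() < 0.6:
+                val = rng.randrange(1000)
+                my_map[key] = val
+                py_dict[key] = val
+            elif key in py_dict:
+                del my_map[key]
+                del py_dict[key]
+            assert len(my_map) == len(py_dict)
+        assert sorted(my_map.items()) == sorted(py_dict.items())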