Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
176 changes: 176 additions & 0 deletions data_structures/hashing/coalesced_hashing.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,176 @@
#!/usr/bin/env python3
"""
Coalesced hashing (hybrid of open addressing + chaining inside the table).

Reference: https://en.wikipedia.org/wiki/Hash_table#Coalesced_hashing
"""

from __future__ import annotations

from collections.abc import Iterator, MutableMapping
from dataclasses import dataclass
from typing import Generic, TypeVar

KEY = TypeVar("KEY")
VAL = TypeVar("VAL")


@dataclass(slots=True)
class _Node(Generic[KEY, VAL]): # noqa: UP046
key: KEY

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

An error occurred while parsing the file: data_structures/hashing/coalesced_hashing.py

Traceback (most recent call last):
  File "/opt/render/project/src/algorithms_keeper/parser/python_parser.py", line 146, in parse
    reports = lint_file(
              ^^^^^^^^^^
libcst._exceptions.ParserSyntaxError: Syntax Error @ 20:13.
parser error: error at 19:12: expected one of (, :

    key: KEY
            ^

val: VAL
next: int # -1 means end of chain


class CoalescedHashMap(MutableMapping[KEY, VAL]):
    """
    Hash map that resolves collisions with coalesced hashing.

    Every entry lives directly in one flat table and carries a ``next`` index,
    so collision chains are threaded through the table itself.

    This implementation uses:
    - Primary area: all indices, chaining occurs via `next` pointers.
    - Free slot choice: highest-index free slot (easy to explain + deterministic).
    - Deletion: the removed entry and every entry after it in the same chain
      are taken out of the table, then the followers are inserted again.
      Chains of different home slots can share slots, so simply freeing (or
      moving) a slot could hide keys whose home is that slot.

    >>> ch = CoalescedHashMap(5)
    >>> ch["a"] = 1
    >>> ch["b"] = 2
    >>> ch["a"]
    1
    >>> len(ch)
    2

    Deleting the head of a coalesced chain keeps the other keys reachable
    (0 and 5 share home slot 0; 5 is stored in slot 4, the home of key 4):

    >>> m = CoalescedHashMap(5)
    >>> m[0], m[5], m[4] = "zero", "five", "four"
    >>> del m[0]
    >>> m[4]
    'four'
    >>> sorted(m)
    [4, 5]
    >>> len(m)
    2
    """

    def __init__(self, capacity: int = 8, capacity_factor: float = 0.8) -> None:
        """
        Create an empty map.

        :param capacity: initial number of slots, at least 1.
        :param capacity_factor: load threshold in the open interval (0, 1);
            inserting a new key once ``len(self)`` has reached
            ``int(capacity * capacity_factor)`` doubles the table first.
        :raises ValueError: if either argument is out of range.
        """
        if capacity < 1:
            raise ValueError("capacity must be >= 1")
        if not (0.0 < capacity_factor < 1.0):
            raise ValueError("capacity_factor must be between 0 and 1")

        self._capacity_factor = capacity_factor
        self._table: list[_Node[KEY, VAL] | None] = [None] * capacity
        self._len = 0

    def _home(self, key: KEY) -> int:
        """Return the slot where the search for ``key`` starts."""
        return hash(key) % len(self._table)

    def _is_full(self) -> bool:
        """Return True once the entry count has reached the load threshold."""
        return self._len >= int(len(self._table) * self._capacity_factor)

    def _find_free_from_end(self) -> int:
        """Return the highest-index empty slot, or -1 if the table is packed."""
        for i in range(len(self._table) - 1, -1, -1):
            if self._table[i] is None:
                return i
        return -1

    def _find_node(self, key: KEY) -> _Node[KEY, VAL] | None:
        """Return the node storing ``key``, or None if the key is absent."""
        cur = self._home(key)
        while cur != -1:
            node = self._table[cur]
            if node is None or node.key == key:
                return node
            cur = node.next
        return None

    def _insert_new(self, key: KEY, val: VAL) -> bool:
        """
        Store a key that is known to be absent, without any load check.

        Returns False (leaving the table untouched) only when the home slot is
        occupied and no free slot is left to extend its chain.
        """
        home = self._home(key)
        tail = self._table[home]
        if tail is None:
            self._table[home] = _Node(key, val, -1)
            self._len += 1
            return True

        free = self._find_free_from_end()
        if free == -1:
            return False

        # Walk to the last node of the chain that passes through the home slot.
        while tail.next != -1:
            successor = self._table[tail.next]
            if successor is None:
                # Defensive: a chain is never expected to point at a free slot.
                break
            tail = successor

        self._table[free] = _Node(key, val, -1)
        tail.next = free
        self._len += 1
        return True

    def _resize(self, new_capacity: int) -> None:
        """Rebuild the table with ``new_capacity`` slots (never fewer than needed)."""
        entries = [(node.key, node.val) for node in self._table if node is not None]
        self._table = [None] * max(new_capacity, len(entries), 1)
        self._len = 0
        # Keys are distinct and the new table has room for all of them, so the
        # private insert cannot fail and no nested resize can be triggered.
        for k, v in entries:
            self._insert_new(k, v)

    def __setitem__(self, key: KEY, val: VAL) -> None:
        """Insert ``key`` or overwrite its value; only new keys can grow the table."""
        existing = self._find_node(key)
        if existing is not None:
            existing.val = val
            return

        if self._is_full():
            self._resize(len(self._table) * 2)
        # A failed insert means the table is completely packed: grow and retry.
        while not self._insert_new(key, val):
            self._resize(len(self._table) * 2)

    def __getitem__(self, key: KEY) -> VAL:
        """Return the value stored for ``key``; raise KeyError if it is absent."""
        node = self._find_node(key)
        if node is None:
            raise KeyError(key)
        return node.val

    def __delitem__(self, key: KEY) -> None:
        """Remove ``key``; raise KeyError if it is absent."""
        prev = -1
        cur = self._home(key)
        while cur != -1:
            node = self._table[cur]
            if node is None:
                break
            if node.key == key:
                break
            prev, cur = cur, node.next
        else:
            raise KeyError(key)

        target = self._table[cur]
        if target is None:
            raise KeyError(key)

        # Cut the chain just before the removed node.
        if prev != -1:
            prev_node = self._table[prev]
            if prev_node is not None:
                prev_node.next = -1

        # Take the removed node and all of its followers out of the table.
        self._table[cur] = None
        self._len -= 1
        displaced: list[tuple[KEY, VAL]] = []
        follower_slot = target.next
        while follower_slot != -1:
            follower = self._table[follower_slot]
            if follower is None:
                break
            displaced.append((follower.key, follower.val))
            self._table[follower_slot] = None
            self._len -= 1
            follower_slot = follower.next

        # Re-insert the followers so each one is reachable from its own home
        # slot again; at least as many slots were just freed as are needed.
        for k, v in displaced:
            self._insert_new(k, v)

    def __iter__(self) -> Iterator[KEY]:
        """Yield the stored keys in slot order."""
        for node in self._table:
            if node is not None:
                yield node.key

    def __len__(self) -> int:
        """Return the number of stored entries."""
        return self._len


if __name__ == "__main__":
    # Executed as a script: run the doctests embedded in this module.
    from doctest import testmod

    testmod()
177 changes: 177 additions & 0 deletions data_structures/hashing/fnv_hashtable.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,177 @@
#!/usr/bin/env python3
"""
FNV-1a hashing + a small educational hash map.

FNV-1a is a fast, non-cryptographic hash often used for hashing bytes/strings.
Reference: https://en.wikipedia.org/wiki/Fowler%E2%80%93Noll%E2%80%93Vo_hash_function
"""

from __future__ import annotations

from collections.abc import Iterator, MutableMapping
from dataclasses import dataclass
from typing import Generic, TypeVar

KEY = TypeVar("KEY")
VAL = TypeVar("VAL")


def fnv1a_32(data: bytes) -> int:
    """
    Return the 32-bit FNV-1a hash of ``data``.

    Starting from the offset basis, each byte is XOR-ed into the running
    state, which is then multiplied by the FNV prime and truncated to 32 bits.

    >>> fnv1a_32(b"")
    2166136261
    >>> fnv1a_32(b"a")  # deterministic
    3826002220
    """
    prime = 0x01000193
    mask = 0xFFFFFFFF
    digest = 0x811C9DC5  # offset basis
    for octet in data:
        digest = ((digest ^ octet) * prime) & mask
    return digest


def fnv1a_64(data: bytes) -> int:
    """
    Return the 64-bit FNV-1a hash of ``data``.

    Starting from the offset basis, each byte is XOR-ed into the running
    state, which is then multiplied by the FNV prime and truncated to 64 bits.

    >>> fnv1a_64(b"")
    14695981039346656037
    """
    prime = 0x100000001B3
    mask = 0xFFFFFFFFFFFFFFFF
    digest = 0xCBF29CE484222325  # offset basis
    for octet in data:
        digest = ((digest ^ octet) * prime) & mask
    return digest


@dataclass(slots=True)
class _Item(Generic[KEY, VAL]): # noqa: UP046
key: KEY

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

An error occurred while parsing the file: data_structures/hashing/fnv_hashtable.py

Traceback (most recent call last):
  File "/opt/render/project/src/algorithms_keeper/parser/python_parser.py", line 146, in parse
    reports = lint_file(
              ^^^^^^^^^^
libcst._exceptions.ParserSyntaxError: Syntax Error @ 51:13.
parser error: error at 50:12: expected one of (, :

    key: KEY
            ^

val: VAL


class _DeletedItem(_Item):
    """Placeholder item with no key or value that always evaluates as falsy."""

    def __init__(self) -> None:
        # The placeholder carries no payload, so both fields are set to None.
        super().__init__(key=None, val=None)

    def __bool__(self) -> bool:
        # Falsy, so a plain truth test rejects it just like an empty bucket.
        return False


# Single shared placeholder instance, compared by identity (`is _deleted`).
_deleted = _DeletedItem()


class FNVHashMap(MutableMapping[KEY, VAL]):
    """
    Hash map using FNV-1a for string/bytes keys and Python's hash otherwise.

    Collisions are resolved by linear probing. Deleted entries leave a
    tombstone (``_deleted``) behind so later entries of the same probe
    sequence stay reachable; tombstones are reused by later insertions and
    dropped whenever the table is rebuilt.

    >>> hm = FNVHashMap()
    >>> hm["hello"] = 1
    >>> hm[b"hello"] = 2
    >>> hm["hello"]
    1
    >>> hm[b"hello"]
    2
    >>> "missing" in hm
    False

    Updating a key that sits behind a tombstone must not duplicate it
    (0 and 8 share the same starting bucket in a table of 8):

    >>> hm = FNVHashMap()
    >>> hm[0] = "a"
    >>> hm[8] = "b"
    >>> del hm[0]
    >>> hm[8] = "c"
    >>> len(hm), list(hm), hm[8]
    (1, [8], 'c')
    """

    def __init__(
        self, initial_block_size: int = 8, capacity_factor: float = 0.75
    ) -> None:
        """
        Create an empty map.

        :param initial_block_size: initial number of buckets, at least 1.
        :param capacity_factor: load threshold in the open interval (0, 1);
            inserting a new key once ``len(self)`` has reached
            ``int(buckets * capacity_factor)`` doubles the table first.
        :raises ValueError: if either argument is out of range.
        """
        if initial_block_size < 1:
            raise ValueError("initial_block_size must be >= 1")
        if not (0.0 < capacity_factor < 1.0):
            raise ValueError("capacity_factor must be between 0 and 1")

        self._initial_block_size = initial_block_size
        self._buckets: list[_Item | None] = [None] * initial_block_size
        self._capacity_factor = capacity_factor
        self._len = 0

    def _hash_key(self, key: KEY) -> int:
        """Hash bytes and (UTF-8 encoded) str keys with FNV-1a, others with hash()."""
        if isinstance(key, bytes):
            return fnv1a_32(key)
        if isinstance(key, str):
            return fnv1a_32(key.encode("utf-8"))
        return hash(key)

    def _get_bucket_index(self, key: KEY) -> int:
        """Return the bucket where probing for ``key`` starts."""
        return self._hash_key(key) % len(self._buckets)

    def _iterate_buckets(self, key: KEY) -> Iterator[int]:
        """Yield every bucket index once, in linear-probing order for ``key``."""
        ind = self._get_bucket_index(key)
        for _ in range(len(self._buckets)):
            yield ind
            ind = (ind + 1) % len(self._buckets)

    def _is_full(self) -> bool:
        """Return True once the live entry count has reached the load threshold."""
        return self._len >= int(len(self._buckets) * self._capacity_factor)

    def _probe(self, key: KEY) -> tuple[int, int]:
        """
        Follow the probe sequence of ``key`` and return ``(found, vacant)``.

        ``found`` is the bucket holding ``key`` or -1 if it is absent.
        ``vacant`` is the first bucket on the sequence that a new entry for
        ``key`` may occupy (the first tombstone, else the terminating empty
        bucket), or -1 if there is none.
        """
        vacant = -1
        for ind in self._iterate_buckets(key):
            item = self._buckets[ind]
            if item is None:
                # A never-used bucket ends the sequence: the key is absent.
                return -1, (ind if vacant == -1 else vacant)
            if item is _deleted:
                # Remember the tombstone but keep looking for the key itself.
                if vacant == -1:
                    vacant = ind
            elif item.key == key:
                return ind, vacant
        return -1, vacant

    def _resize(self, new_size: int) -> None:
        """Rebuild the table with ``new_size`` buckets, dropping tombstones."""
        live = [item for item in self._buckets if item]
        self._buckets = [None] * max(new_size, len(live), 1)
        # Keys are distinct and the fresh table has no tombstones, so each
        # item simply takes the first empty bucket on its probe sequence.
        for item in live:
            for ind in self._iterate_buckets(item.key):
                if self._buckets[ind] is None:
                    self._buckets[ind] = item
                    break
        self._len = len(live)

    def __setitem__(self, key: KEY, val: VAL) -> None:
        """Insert ``key`` or overwrite its value; only new keys can grow the table."""
        found, vacant = self._probe(key)
        if found != -1:
            stored = self._buckets[found]
            if stored is not None:
                stored.val = val
            return

        if self._is_full() or vacant == -1:
            # Doubling always leaves empty buckets, so the second probe
            # is guaranteed to report a vacant one.
            self._resize(len(self._buckets) * 2)
            _, vacant = self._probe(key)

        self._buckets[vacant] = _Item(key, val)
        self._len += 1

    def __getitem__(self, key: KEY) -> VAL:
        """Return the value stored for ``key``; raise KeyError if it is absent."""
        found, _ = self._probe(key)
        item = self._buckets[found] if found != -1 else None
        if item is None:
            raise KeyError(key)
        return item.val

    def __delitem__(self, key: KEY) -> None:
        """Replace the entry for ``key`` with a tombstone; raise KeyError if absent."""
        found, _ = self._probe(key)
        if found == -1:
            raise KeyError(key)
        self._buckets[found] = _deleted
        self._len -= 1

    def __iter__(self) -> Iterator[KEY]:
        """Yield the live keys in bucket order (tombstones are falsy and skipped)."""
        yield from (item.key for item in self._buckets if item)

    def __len__(self) -> int:
        """Return the number of live entries."""
        return self._len

    def __repr__(self) -> str:
        """Return ``FNVHashMap(k1: v1, k2: v2, ...)`` in bucket order."""
        parts = ", ".join(
            f"{item.key!r}: {item.val!r}" for item in self._buckets if item
        )
        return f"FNVHashMap({parts})"


if __name__ == "__main__":
    # Executed as a script: run the doctests embedded in this module.
    from doctest import testmod

    testmod()
Loading