Skip to content

Commit

Permalink
Buffer(keep_slow=True)
Browse files Browse the repository at this point in the history
  • Loading branch information
crusaderky committed Mar 24, 2023
1 parent 5958c2c commit 35b631c
Show file tree
Hide file tree
Showing 3 changed files with 155 additions and 13 deletions.
3 changes: 3 additions & 0 deletions doc/source/changelog.rst
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@ Changelog
- New method ``File.link()``, which acquires a file-based key from another source
(e.g. a different memory-mapped File object)
(:pr:`80`) `Guido Imperiale`_
- ``Buffer`` has gained the option to preserve keys in ``slow`` when they are
moved back to ``fast``
(:pr:`80`) `Guido Imperiale`_


2.2.0 - 2022-04-28
Expand Down
62 changes: 49 additions & 13 deletions zict/buffer.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
from __future__ import annotations

from collections.abc import Callable, Iterator, MutableMapping
from itertools import chain

from zict.common import KT, VT, ZictBase, close, flush
from zict.lru import LRU
from zict.utils import Accumulator


class Buffer(ZictBase[KT, VT]):
Expand All @@ -29,7 +29,13 @@ class Buffer(ZictBase[KT, VT]):
If an exception occurs during a fast_to_slow_callbacks (e.g a callback tried
storing to disk and raised a disk full error) the key will remain in the LRU.
slow_to_fast_callbacks: list of callables
These functions run every time data moves form the slow to the fast mapping.
These functions run every time data moves form the slow to the fast
mapping.
keep_slow: bool, optional
If False (default), delete key/value pairs in slow when they are moved back to
fast.
If True, keep them in slow until deleted; this will avoid repeating the fast to
slow transition when they are evicted again, but at the cost of duplication.
Notes
-----
Expand All @@ -54,6 +60,8 @@ class Buffer(ZictBase[KT, VT]):
weight: Callable[[KT, VT], float]
fast_to_slow_callbacks: list[Callable[[KT, VT], None]]
slow_to_fast_callbacks: list[Callable[[KT, VT], None]]
keep_slow: bool
_len: Accumulator

def __init__(
self,
Expand All @@ -67,6 +75,7 @@ def __init__(
slow_to_fast_callbacks: Callable[[KT, VT], None]
| list[Callable[[KT, VT], None]]
| None = None,
keep_slow: bool = False,
):
self.fast = LRU(n, fast, weight=weight, on_evict=[self.fast_to_slow])
self.slow = slow
Expand All @@ -77,12 +86,17 @@ def __init__(
slow_to_fast_callbacks = [slow_to_fast_callbacks]
self.fast_to_slow_callbacks = fast_to_slow_callbacks or []
self.slow_to_fast_callbacks = slow_to_fast_callbacks or []
self.keep_slow = keep_slow
self._len = Accumulator()

@property
def n(self) -> float:
return self.fast.n

def fast_to_slow(self, key: KT, value: VT) -> None:
if self.keep_slow and key in self.slow:
return

self.slow[key] = value
try:
for cb in self.fast_to_slow_callbacks:
Expand All @@ -98,8 +112,9 @@ def slow_to_fast(self, key: KT) -> VT:
# Avoid useless movement for heavy values
w = self.weight(key, value)
if w <= self.n:
del self.slow[key]
self.fast[key] = value
if not self.keep_slow:
del self.slow[key]
for cb in self.slow_to_fast_callbacks:
cb(key, value)
return value
Expand All @@ -112,13 +127,14 @@ def __getitem__(self, key: KT) -> VT:

def __setitem__(self, key: KT, value: VT) -> None:
try:
del self.slow[key]
del self[key]
except KeyError:
pass
# This may trigger an eviction from fast to slow of older keys.
# If the weight is individually greater than n, then key/value will be stored
# into self.slow instead (see LRU.__setitem__).
self.fast[key] = value
self._len += 1

def set_noevict(self, key: KT, value: VT) -> None:
"""Variant of ``__setitem__`` that does not move keys from fast to slow if the
Expand All @@ -131,32 +147,52 @@ def set_noevict(self, key: KT, value: VT) -> None:
self.fast.set_noevict(key, value)

def __delitem__(self, key: KT) -> None:
try:
del self.fast[key]
except KeyError:
del self.slow[key]
has_key = False
for d in (self.fast, self.slow):
try:
del d[key]
has_key = True
except KeyError:
pass

if has_key:
self._len -= 1
else:
raise KeyError(key)

# FIXME dictionary views https://github.com/dask/zict/issues/61
def keys(self) -> Iterator[KT]: # type: ignore
return iter(self)

def values(self) -> Iterator[VT]: # type: ignore
return chain(self.fast.values(), self.slow.values())
return (v for _, v in self.items())

def items(self) -> Iterator[tuple[KT, VT]]: # type: ignore
return chain(self.fast.items(), self.slow.items())
yield from self.fast.items()
if self.keep_slow:
yield from ((k, self.slow[k]) for k in self.slow if k not in self.fast)
else:
yield from self.slow.items()

def __len__(self) -> int:
return len(self.fast) + len(self.slow)
return int(self._len)

def __iter__(self) -> Iterator[KT]:
return chain(self.fast, self.slow)
yield from self.fast
if self.keep_slow:
yield from (k for k in self.slow if k not in self.fast)
else:
yield from self.slow

def __contains__(self, key: object) -> bool:
return key in self.fast or key in self.slow

def __str__(self) -> str:
return f"Buffer<{self.fast}, {self.slow}>"
s = f"Buffer<fast: {len(self.fast)}, slow: {len(self.slow)}"
if self.keep_slow:
ndup = len(self.fast) + len(self.slow) - int(self._len)
s += f", unique: {self._len}, duplicates: {ndup}"
return s + ">"

__repr__ = __str__

Expand Down
103 changes: 103 additions & 0 deletions zict/tests/test_buffer.py
Original file line number Diff line number Diff line change
Expand Up @@ -228,3 +228,106 @@ def test_set_noevict():
assert a == {"y": 3, "x": 1}
assert b == {"z": 6}
assert f2s == s2f == []


def test_flush_close():
flushes = []
closes = []

class D(dict):
def __init__(self, name):
self.name = name

def flush(self):
flushes.append(self.name)

def close(self):
closes.append(self.name)

buff = Buffer(D("fast"), D("slow"), n=2)
buff.flush()
buff.close()
assert flushes == ["fast", "slow"]
assert closes == ["fast", "slow"]


def test_keep_slow():
a = {}
b = {}
f2s = []
s2f = []
buff = Buffer(
a,
b,
n=10,
weight=lambda k, v: v,
keep_slow=True,
fast_to_slow_callbacks=lambda k, v: f2s.append(k),
slow_to_fast_callbacks=lambda k, v: s2f.append(k),
)

buff["x"] = 1
buff["y"] = 2
buff["z"] = 11
buff.fast.evict()
assert a == {"y": 2}
assert b == {"x": 1, "z": 11}
assert f2s == ["z", "x"]
assert s2f == []
assert buff.fast.total_weight == 2
f2s.clear()

assert buff["x"] == 1 # Get from slow
assert buff["x"] == 1 # It's in both
assert buff["z"] == 11 # Too large to stay in fast
assert a == {"x": 1, "y": 2}
assert b == {"x": 1, "z": 11}
assert f2s == []
assert s2f == ["x", "z"] # x has been moved only once
assert buff.fast.total_weight == 3
# Test no duplicates
assert len(buff) == 3
assert list(buff) == list(buff.keys()) == ["y", "x", "z"]
assert list(buff.items()) == [("y", 2), ("x", 1), ("z", 11)]
assert list(buff.values()) == [2, 1, 11]
f2s.clear()
s2f.clear()

assert (
str(buff)
== repr(buff)
== ("Buffer<fast: 2, slow: 2, unique: 3, duplicates: 1>")
)

# Evict a key that is already in slow
_ = buff["y"]
buff.fast.evict()
assert a == {"y": 2}
assert b == {"x": 1, "z": 11}
assert f2s == [] # fast_to_slow_callback was not called
assert s2f == []
assert buff.fast.total_weight == 2
assert len(buff) == 3
_ = buff["x"]
s2f.clear()

# Overwrite
buff["x"] = 3
buff["y"] = 4
buff["z"] = 12
assert a == {"x": 3, "y": 4}
assert b == {"z": 12}
assert f2s == ["z"] # One more spill for z
assert s2f == []
assert buff.fast.total_weight == 7
assert len(buff) == 3
f2s.clear()

# Delete
del buff["x"]
del buff["y"]
del buff["z"]
assert a == b == {}
assert f2s == s2f == []
assert buff.fast.total_weight == 0
assert len(buff) == 0

0 comments on commit 35b631c

Please sign in to comment.