File-based shared memory model #80

Open · wants to merge 2 commits into base: main
10 changes: 10 additions & 0 deletions doc/source/changelog.rst
@@ -2,6 +2,16 @@ Changelog
=========
.. currentmodule:: zict

3.1.0 - Unreleased
------------------
- New method ``File.link()``, which acquires a file-based key from another source
(e.g. a different memory-mapped File object)
(:pr:`80`) `Guido Imperiale`_
- ``Buffer`` has gained the option to preserve keys in ``slow`` when they are
moved back to ``fast``
(:pr:`80`) `Guido Imperiale`_


3.0.0 - 2023-04-17
------------------
- The library is now almost completely thread-safe
105 changes: 37 additions & 68 deletions zict/buffer.py
@@ -1,14 +1,10 @@
from __future__ import annotations

from collections.abc import Callable, Iterator, MutableMapping
from itertools import chain
from typing import ( # TODO import from collections.abc (needs Python >=3.9)
ItemsView,
ValuesView,
)

from zict.common import KT, VT, ZictBase, close, discard, flush, locked
from zict.lru import LRU
from zict.utils import InsertionSortedSet


class Buffer(ZictBase[KT, VT]):
@@ -34,6 +30,11 @@ class Buffer(ZictBase[KT, VT]):
storing to disk and raised a disk full error) the key will remain in the LRU.
slow_to_fast_callbacks: list of callables
These functions run every time data moves from the slow to the fast mapping.
keep_slow: bool, optional
If False (default), delete key/value pairs in slow when they are moved back to
fast.
If True, keep them in slow until deleted; this avoids repeating the fast-to-slow
transition when they are evicted again, but at the cost of duplication.

Notes
-----
@@ -60,7 +61,9 @@ class Buffer(ZictBase[KT, VT]):
weight: Callable[[KT, VT], float]
fast_to_slow_callbacks: list[Callable[[KT, VT], None]]
slow_to_fast_callbacks: list[Callable[[KT, VT], None]]
keep_slow: bool
_cancel_restore: dict[KT, bool]
_keys: InsertionSortedSet[KT]

def __init__(
self,
@@ -74,6 +77,7 @@ def __init__(
slow_to_fast_callbacks: Callable[[KT, VT], None]
| list[Callable[[KT, VT], None]]
| None = None,
keep_slow: bool = False,
):
super().__init__()
self.fast = LRU(
@@ -91,7 +95,9 @@ def __init__(
slow_to_fast_callbacks = [slow_to_fast_callbacks]
self.fast_to_slow_callbacks = fast_to_slow_callbacks or []
self.slow_to_fast_callbacks = slow_to_fast_callbacks or []
self.keep_slow = keep_slow
self._cancel_restore = {}
self._keys = InsertionSortedSet((*self.fast, *self.slow))

@property
def n(self) -> float:
@@ -136,6 +142,9 @@ def offset(self, value: float) -> None:
self.fast.offset = value

def fast_to_slow(self, key: KT, value: VT) -> None:
if self.keep_slow and key in self.slow:
return

self.slow[key] = value
try:
for cb in self.fast_to_slow_callbacks:
@@ -169,7 +178,8 @@ def slow_to_fast(self, key: KT) -> VT:
# - If the below code was just `self.fast[key] = value; del
# self.slow[key]` now the key would be in neither slow nor fast!
self.fast.set_noevict(key, value)
del self.slow[key]
if not self.keep_slow:
del self.slow[key]

with self.unlock():
self.fast.evict_until_below_target()
@@ -180,17 +190,20 @@ def slow_to_fast(self, key: KT) -> VT:

@locked
def __getitem__(self, key: KT) -> VT:
if key not in self._keys:
raise KeyError(key)
try:
return self.fast[key]
except KeyError:
return self.slow_to_fast(key)

def __setitem__(self, key: KT, value: VT) -> None:
with self.lock:
discard(self.slow, key)
if key in self._cancel_restore:
self._cancel_restore[key] = True
self.fast[key] = value
self.set_noevict(key, value)
try:
self.fast.evict_until_below_target()
except Exception:
self.fast._setitem_exception(key)
raise

@locked
def set_noevict(self, key: KT, value: VT) -> None:
@@ -201,6 +214,7 @@ def set_noevict(self, key: KT, value: VT) -> None:
if key in self._cancel_restore:
self._cancel_restore[key] = True
self.fast.set_noevict(key, value)
self._keys.add(key)

def evict_until_below_target(self, n: float | None = None) -> None:
"""Wrapper around :meth:`zict.LRU.evict_until_below_target`.
@@ -210,55 +224,32 @@ def evict_until_below_target(self, n: float | None = None) -> None:

@locked
def __delitem__(self, key: KT) -> None:
self._keys.remove(key)
if key in self._cancel_restore:
self._cancel_restore[key] = True
try:
del self.fast[key]
except KeyError:
del self.slow[key]
discard(self.fast, key)
discard(self.slow, key)

@locked
def _cancel_evict(self, key: KT, value: VT) -> None:
discard(self.slow, key)

def values(self) -> ValuesView[VT]:
return BufferValuesView(self)

def items(self) -> ItemsView[KT, VT]:
return BufferItemsView(self)

def __len__(self) -> int:
with self.lock, self.fast.lock:
return (
len(self.fast)
+ len(self.slow)
- sum(
k in self.fast and k in self.slow
for k in chain(self._cancel_restore, self.fast._cancel_evict)
)
)
return len(self._keys)

def __iter__(self) -> Iterator[KT]:
"""Make sure that the iteration is not disrupted if you evict/restore a key in
the middle of it
"""
seen = set()
while True:
try:
for d in (self.fast, self.slow):
for key in d:
if key not in seen:
seen.add(key)
yield key
return
except RuntimeError:
pass
return iter(self._keys)

def __contains__(self, key: object) -> bool:
return key in self.fast or key in self.slow
return key in self._keys

@locked
def __str__(self) -> str:
return f"Buffer<{self.fast}, {self.slow}>"
s = f"Buffer<fast: {len(self.fast)}, slow: {len(self.slow)}"
if self.keep_slow:
ndup = len(self.fast) + len(self.slow) - len(self._keys)
s += f", unique: {len(self._keys)}, duplicates: {ndup}"
return s + ">"

__repr__ = __str__

@@ -267,25 +258,3 @@ def flush(self) -> None:

def close(self) -> None:
close(self.fast, self.slow)


class BufferItemsView(ItemsView[KT, VT]):
_mapping: Buffer # FIXME CPython implementation detail
__slots__ = ()

def __iter__(self) -> Iterator[tuple[KT, VT]]:
# Avoid changing the LRU
return chain(self._mapping.fast.items(), self._mapping.slow.items())


class BufferValuesView(ValuesView[VT]):
_mapping: Buffer # FIXME CPython implementation detail
__slots__ = ()

def __contains__(self, value: object) -> bool:
# Avoid changing the LRU
return any(value == v for v in self)

def __iter__(self) -> Iterator[VT]:
# Avoid changing the LRU
return chain(self._mapping.fast.values(), self._mapping.slow.values())
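To make the new `keep_slow` semantics concrete, here is a minimal sketch (my own illustration, assuming this branch; with the default weight of 1 per key, `n=2` caps `fast` at two keys):

```python
from zict import Buffer

fast: dict = {}
slow: dict = {}
buff = Buffer(fast, slow, n=2, keep_slow=True)

buff["a"] = 1
buff["b"] = 2
buff["c"] = 3       # over capacity: the least recently used key "a" spills to slow
assert "a" in slow and "a" not in fast

_ = buff["a"]       # restore "a" to fast (which in turn evicts "b")
assert "a" in fast
assert "a" in slow  # with keep_slow=False this copy would have been deleted

buff["a"] = 4       # setting a new value drops the now-stale copy in slow
assert "a" not in slow
```

If `"a"` were evicted again unmodified, `fast_to_slow` would return early (the check at the top of the method in this diff) and skip the redundant write to `slow`.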
75 changes: 71 additions & 4 deletions zict/file.py
@@ -14,8 +14,8 @@ class File(ZictBase[str, bytes]):

Keys must be strings, values must be buffers

Note this shouldn't be used for interprocess persistence, as keys
are cached in memory.
Keys are cached in memory; you shouldn't share the directory with other File
objects. However, see :meth:`link` for inter-process communication.

Parameters
----------
@@ -76,8 +76,9 @@ def _safe_key(self, key: str) -> str:
"""Escape key so that it is usable on all filesystems.

Append to the filenames a unique suffix that changes every time this method is
called. This prevents race conditions when another thread accesses the same
key, e.g. ``__setitem__`` on one thread and ``__getitem__`` on another.
called. This prevents race conditions when another thread or process opens the
file for reading (see :meth:`link` below), as it guarantees that a file is either
complete and coherent or does not exist.
"""
# `#` is escaped by quote and is supported by most file systems
key = quote(key, safe="") + f"#{self._inc}"
@@ -156,3 +157,69 @@ def __delitem__(self, key: str) -> None:

def __len__(self) -> int:
return len(self.filenames)

def get_path(self, key: str) -> str:
"""Returns the full path on disk for a spilled key"""
return os.path.join(self.directory, self.filenames[key])

@locked
def link(self, key: str, path: str) -> None:
"""Hardlink an external file into self.directory.

The file must be on the same filesystem as self.directory. This is an atomic
operation which allows for data transfer between multiple File instances (or
from an external data creator to a File instance) running in different
processes, and is particularly useful in conjunction with memory mapping.

Raises
------
FileNotFoundError
The key has been deleted from the other File mapping
PermissionError
Can't access the target path for writing
OSError
- OS or filesystem doesn't support hardlinking
- The provided path and self.directory are on different mountpoints

Examples
--------

In process 1:

>>> z1 = File("/dev/shm/z1", memmap=True) # doctest: +SKIP
>>> z1["x"] = b"Hello world!" # doctest: +SKIP
>>> send_to_proc2("x", z1.get_path("x")) # doctest: +SKIP

In process 2:

>>> z2 = File("/dev/shm/z2", memmap=True) # doctest: +SKIP
>>> key, path = receive_from_proc1() # doctest: +SKIP
>>> z2.link(key, path) # doctest: +SKIP

Now ``z1["x"]`` and ``z2["x"]`` share the same memory. Updating the memoryview
contents on one (``z1["x"][:] = ...``) will immediately be reflected onto the
other. Setting a new value on either (``z1["x"] = ...``) will decouple them.

There are now two files on disk, ``/dev/shm/z1/x#0`` and ``/dev/shm/z2/x#0``,
which share the same inode. The memory is released when both z1 and z2 delete
the key.

.. note::
File names change every time you set a new value for a key; this prevents a
race condition when z1 is in the process of replacing ``x`` with an entirely
new value while z2 acquires it.

You may also use ``link()`` to create aliases to a File's own data.
A plain assignment reads ``x`` back into memory and then writes a deep copy of
it into ``y``::

>>> z["y"] = z["x"] # doctest: +SKIP

``link()`` instead creates a second, shallow reference to ``x``, which is the
same as writing ``z["y"] = z["x"]`` on a regular in-memory dict::

>>> z.link("y", z.get_path("x")) # doctest: +SKIP
"""
self.discard(key)
fn = self._safe_key(key)
os.link(path, os.path.join(self.directory, fn))
self.filenames[key] = fn
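As a complement to the two-process doctest above, here is a single-process sketch of `link()` that runs as-is (my own example assuming this branch; both temporary directories must sit on the same filesystem for `os.link` to succeed):

```python
import tempfile

from zict import File

with tempfile.TemporaryDirectory() as dir1, tempfile.TemporaryDirectory() as dir2:
    z1 = File(dir1)
    z2 = File(dir2)

    z1["x"] = b"Hello world!"
    # Hardlink z1's file for "x" into z2's directory under the same key
    z2.link("x", z1.get_path("x"))
    assert bytes(z2["x"]) == b"Hello world!"

    # Both entries point at the same inode, so the data outlives either owner
    del z1["x"]
    assert bytes(z2["x"]) == b"Hello world!"
```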
20 changes: 12 additions & 8 deletions zict/lru.py
@@ -137,16 +137,20 @@ def __setitem__(self, key: KT, value: VT) -> None:
try:
self.evict_until_below_target()
except Exception:
if self.weights.get(key, 0) > self.n and key not in self.heavy:
# weight(value) > n and evicting the key we just inserted failed.
# Evict the rest of the LRU instead.
try:
while len(self.d) > 1:
self.evict()
except Exception:
pass
self._setitem_exception(key)
raise

@locked
def _setitem_exception(self, key: KT) -> None:
if self.weights.get(key, 0) > self.n and key not in self.heavy:
# weight(value) > n and evicting the key we just inserted failed.
# Evict the rest of the LRU instead.
try:
while len(self.d) > 1:
self.evict()
except Exception:
pass

@locked
def set_noevict(self, key: KT, value: VT) -> None:
"""Variant of ``__setitem__`` that does not evict if the total weight exceeds n.
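For orientation, this is the `on_evict` mechanism that the factored-out helper protects: `Buffer` registers its spill function as the eviction callback, and `_setitem_exception` is the recovery path that `LRU.__setitem__` and `Buffer.__setitem__` now share when that callback raises. A minimal sketch of the happy path, using only `LRU`'s public API:

```python
from zict import LRU

spilled: dict = {}

# on_evict receives (key, value); a dict's __setitem__ has exactly that shape
lru = LRU(2, {}, on_evict=spilled.__setitem__)

lru["a"] = 1
lru["b"] = 2
lru["c"] = 3  # capacity 2 exceeded: the least recently used key is evicted
assert spilled == {"a": 1}
assert set(lru) == {"b", "c"}
```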
6 changes: 2 additions & 4 deletions zict/tests/test_async_buffer.py
@@ -132,8 +132,7 @@ async def test_close_during_evict(check_thread_leaks):

@pytest.mark.asyncio
async def test_close_during_get(check_thread_leaks):
buff = AsyncBuffer({}, utils_test.SlowDict(0.01), n=100)
buff.slow.data.update({i: i for i in range(100)})
buff = AsyncBuffer({}, utils_test.SlowDict(0.01, {i: i for i in range(100)}), n=100)
assert len(buff) == 100
assert not buff.fast

@@ -199,8 +198,7 @@ def __getitem__(self, key):
time.sleep(0.01)
return super().__getitem__(key)

with AsyncBuffer({}, Slow(), n=100) as buff:
buff.slow.update({i: i for i in range(100)})
with AsyncBuffer({}, Slow({i: i for i in range(100)}), n=100) as buff:
assert len(buff) == 100

future = buff.async_get(list(range(100)), missing=missing)