feat(cache): create supervisor cache #160
Merged
+286
−300
3 commits
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,7 +1,6 @@ | ||
# This file is part of the QuestionPy Server. (https://questionpy.org) | ||
# The QuestionPy Server is free software released under terms of the MIT license. See LICENSE.md. | ||
# (c) Technische Universität Berlin, innoCampus <[email protected]> | ||
|
||
import logging | ||
from asyncio import Lock, to_thread | ||
from collections import OrderedDict | ||
|
@@ -14,11 +13,17 @@ | |
from collections.abc import Awaitable, Callable | ||
|
||
|
||
_log = logging.getLogger(__name__) | ||
|
||
|
||
class File(NamedTuple): | ||
path: Path | ||
size: int | ||
|
||
|
||
type OnRemoveCallback = Callable[[str], Awaitable[None]] | ||
|
||
|
||
class CacheItemTooLargeError(Exception): | ||
def __init__(self, key: str, actual_size: int, max_size: int): | ||
readable_actual = ByteSize(actual_size).human_readable() | ||
|
@@ -32,38 +37,40 @@ def __init__(self, key: str, actual_size: int, max_size: int): | |
self.actual_size = actual_size | ||
|
||
|
||
class FileLimitLRU: | ||
"""Limit file cache size, evicting the least recently accessed file when the specified maximum is exceeded. | ||
class LRUCacheSupervisor: | ||
"""Supervises multiple file caches living in subdirectories of the given directory. | ||
Only `bytes` type values are accepted. Their size is calculated by passing them into the builtin `len()` function. | ||
""" | ||
|
||
def __init__(self, directory: Path, max_size: int, extension: str | None = None, name: str | None = None) -> None: | ||
"""A cache should be initialised while starting a server therefore it is not necessary for it to be async.""" | ||
self._extension: str = "" if extension is None else "." + extension.lstrip(".") | ||
self._tmp_extension: str = ".tmp" | ||
if self._extension == self._tmp_extension: | ||
msg = f'Extension cannot be "{self._tmp_extension}" as it is used internally.' | ||
raise ValueError(msg) | ||
|
||
async def on_remove(_key: str) -> None: | ||
pass | ||
It evicts the least recently accessed file when the specified maximum is exceeded. | ||
self.on_remove: Callable[[str], Awaitable[None]] = on_remove | ||
"""Callback which fires on every removal of a file.""" | ||
|
||
self.directory: Path = directory | ||
Only `bytes` type values are accepted. Their size is calculated by passing them into the builtin `len()` | ||
function. | ||
""" | ||
|
||
def __init__(self, directory: Path, max_size: int) -> None: | ||
self.directory = directory | ||
self.max_size = max_size | ||
|
||
self._lock = Lock() | ||
self._tmp_extension = ".tmp" | ||
self._total_size: int = 0 | ||
self._files: OrderedDict[Path, File] = OrderedDict() | ||
self._on_remove_callbacks: dict[Path, OnRemoveCallback] = {} | ||
|
||
self._name = name or "Cache" | ||
_log.info("Initialized at '%s' with a maximum size of %s.", directory, ByteSize(max_size).human_readable()) | ||
|
||
self._files: OrderedDict[str, File] = OrderedDict() | ||
def set_on_remove_callback(self, subdirectory: Path, callback: OnRemoveCallback) -> None: | ||
self._on_remove_callbacks[subdirectory] = callback | ||
|
||
self._lock = Lock() | ||
def register(self, subdirectory: Path) -> None: | ||
"""Registers a subdirectory. | ||
Creates the subdirectory if it does not exist and removes containing files if the cache is too full and if | ||
they have the temporary file extension. | ||
""" | ||
directory = self.directory / subdirectory | ||
directory.mkdir(exist_ok=True) | ||
|
||
for path in self.directory.iterdir(): | ||
for path in directory.iterdir(): | ||
if not path.is_file(): | ||
continue | ||
|
||
|
@@ -80,19 +87,9 @@ async def on_remove(_key: str) -> None: | |
continue | ||
|
||
self._total_size = total | ||
self._files[path.stem] = File(path, size) | ||
|
||
log = logging.getLogger("questionpy-server") | ||
log.info( | ||
"%s initialised at %s with %d file(s) and %s/%s.", | ||
self._name, | ||
self.directory, | ||
len(self._files), | ||
ByteSize(self._total_size).human_readable(), | ||
ByteSize(self.max_size).human_readable(), | ||
) | ||
self._files[subdirectory / path.stem] = File(path, size) | ||
|
||
def contains(self, key: str) -> bool: | ||
def contains(self, key: Path) -> bool: | ||
"""Checks if the file exists in cache. | ||
Additionally, the file is placed to the end ensuring that it is the most recent accessed file. | ||
|
@@ -103,34 +100,36 @@ def contains(self, key: str) -> bool: | |
self._files.move_to_end(key) | ||
return True | ||
|
||
def _get_file(self, key: str) -> File: | ||
def _get_file(self, key: Path) -> File: | ||
if not self.contains(key): | ||
raise FileNotFoundError | ||
|
||
return self._files[key] | ||
|
||
def get(self, key: str) -> Path: | ||
def get(self, key: Path) -> Path: | ||
"""Returns path of the file in the cache. | ||
Raises: | ||
FileNotFoundError: If the file does not exist in the cache. | ||
""" | ||
return self._get_file(key).path | ||
|
||
async def _remove(self, key: str) -> None: | ||
async def _remove(self, key: Path) -> None: | ||
file = self._get_file(key) | ||
await to_thread(file.path.unlink, missing_ok=True) | ||
self._total_size -= file.size | ||
del self._files[key] | ||
|
||
await self.on_remove(key) | ||
cache, real_key = key.parent, key.stem | ||
if cache in self._on_remove_callbacks: | ||
await self._on_remove_callbacks[cache](real_key) | ||
|
||
async def remove(self, key: str) -> None: | ||
async def remove(self, key: Path) -> None: | ||
"""Removes file from the cache and the filesystem.""" | ||
async with self._lock: | ||
await self._remove(key) | ||
|
||
async def put(self, key: str, value: bytes) -> Path: | ||
async def put(self, key: Path, value: bytes, extension: str) -> Path: | ||
"""Puts a file in the cache and the filesystem. | ||
The internal `._total_bytes` attribute is updated. | ||
|
@@ -151,14 +150,22 @@ async def put(self, key: str, value: bytes) -> Path: | |
if size > self.max_size: | ||
# If we allowed this, the loop at the end would remove all items from the dictionary, | ||
# so we raise an error to allow exceptions for this case. | ||
raise CacheItemTooLargeError(key, size, self.max_size) | ||
raise CacheItemTooLargeError(key.stem, size, self.max_size) | ||
|
||
async with self._lock: | ||
# Save the bytes on filesystem. | ||
path = self.directory / (key + self._extension) | ||
path = self.directory / (key.with_suffix(extension)) | ||
tmp_path = path.parent / (path.name + self._tmp_extension) | ||
|
||
if size != await to_thread(tmp_path.write_bytes, value): | ||
written_bytes = await to_thread(tmp_path.write_bytes, value) | ||
if size != written_bytes: | ||
tmp_path.unlink(missing_ok=True) | ||
_log.error( | ||
"Failed to write all bytes (%s/%s) to file '%s'.", | ||
ByteSize(written_bytes).human_readable(), | ||
ByteSize(size).human_readable(), | ||
tmp_path, | ||
) | ||
msg = "Failed to write bytes to file" | ||
raise OSError(msg) | ||
|
||
|
@@ -185,14 +192,46 @@ def total_size(self) -> int: | |
return self._total_size | ||
|
||
@property | ||
def space_left(self) -> int: | ||
return self.max_size - self._total_size | ||
def files(self) -> OrderedDict[Path, File]: | ||
return self._files | ||
|
||
|
||
class LRUCache: | ||
"""A file-based LRU cache which is supervised by a `LRUCacheSupervisor` instance. | ||
The cached files are stored in a subdirectory of the `LRUCacheSupervisor`. | ||
Look at the `LRUCacheSupervisor` class for more information about the methods. | ||
""" | ||
|
||
def __init__(self, supervisor: LRUCacheSupervisor, subdirectory: Path, extension: str = "") -> None: | ||
self._supervisor = supervisor | ||
self._subdirectory = subdirectory | ||
self._extension = extension | ||
|
||
self._supervisor.register(self._subdirectory) | ||
|
||
def set_on_remove_callback(self, callback: OnRemoveCallback) -> None: | ||
self._supervisor.set_on_remove_callback(self._subdirectory, callback) | ||
|
||
async def put(self, key: str, value: bytes) -> Path: | ||
return await self._supervisor.put(self._subdirectory / key, value, self._extension) | ||
|
||
def get(self, key: str) -> Path: | ||
return self._supervisor.get(self._subdirectory / key) | ||
|
||
async def remove(self, key: str) -> None: | ||
await self._supervisor.remove(self._subdirectory / key) | ||
|
||
def contains(self, key: str) -> bool: | ||
return self._supervisor.contains(self._subdirectory / key) | ||
|
||
@property | ||
def files(self) -> OrderedDict[str, File]: | ||
"""Dictionary of all files in the cache where the key is the hash of the file. | ||
def files(self) -> dict[str, File]: | ||
return { | ||
key.stem: file for key, file in self._supervisor.files.items() if key.is_relative_to(self._subdirectory) | ||
} | ||
|
||
Returns: | ||
A copy of the internal dictionary. | ||
""" | ||
return self._files.copy() | ||
@property | ||
def directory(self) -> Path: | ||
return self._supervisor.directory / self._subdirectory |
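
The core idea of the change is that several caches share one size budget, with keys namespaced by subdirectory so that eviction can pick the least recently used file across all of them. A minimal in-memory sketch of that idea follows. It is not the code from this PR: `SharedBudgetLRU`, its `evicted` list, and the use of `PurePosixPath` keys without touching the filesystem are assumptions made purely for illustration.

```python
from collections import OrderedDict
from pathlib import PurePosixPath


class SharedBudgetLRU:
    """Hypothetical stand-in for a supervisor: one size budget, many namespaces."""

    def __init__(self, max_size: int) -> None:
        self.max_size = max_size
        self.total_size = 0
        # Keys look like "<namespace>/<key>"; insertion order doubles as recency order.
        self.sizes: OrderedDict[PurePosixPath, int] = OrderedDict()
        # (namespace, key) pairs of evicted entries, recorded instead of calling callbacks.
        self.evicted: list[tuple[str, str]] = []

    def put(self, namespace: str, key: str, value: bytes) -> None:
        size = len(value)
        if size > self.max_size:
            # Otherwise the eviction loop below would empty the whole cache.
            msg = "item is larger than the whole budget"
            raise ValueError(msg)

        full_key = PurePosixPath(namespace) / key
        # Drop an existing entry first so its size is not counted twice.
        if full_key in self.sizes:
            self.total_size -= self.sizes.pop(full_key)

        # Evict least recently used entries, whichever namespace they belong to.
        while self.total_size + size > self.max_size:
            old_key, old_size = self.sizes.popitem(last=False)
            self.total_size -= old_size
            self.evicted.append((str(old_key.parent), old_key.name))

        self.sizes[full_key] = size
        self.total_size += size

    def contains(self, namespace: str, key: str) -> bool:
        full_key = PurePosixPath(namespace) / key
        if full_key not in self.sizes:
            return False
        self.sizes.move_to_end(full_key)  # mark as most recently used
        return True
```

Splitting the evicted key back into namespace and name mirrors the `key.parent` / `key.stem` split in `_remove`, which is what lets the supervisor route a removal to the callback of the cache that owned the file.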
This was already the case before, but I just noticed that if `tmp_path.write_bytes` fails, the file should still be deleted. It may have been created incompletely because the partition ran out of space. The incident should also be logged as an error.
Right, logging now happens both when the cache supervisor is initialised and when not all bytes could be written (199437f). The file is also deleted in that case (6c4a0c2).
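
For reference, the cleanup discussed in this thread can be sketched in isolation as follows. This is a synchronous illustration under assumptions, not the merged code: `write_atomically` is a hypothetical helper name, and unlike the diff above it also cleans up when `write_bytes` itself raises, not only when fewer bytes than expected were written.

```python
import logging
from pathlib import Path

_log = logging.getLogger(__name__)


def write_atomically(path: Path, value: bytes, tmp_extension: str = ".tmp") -> Path:
    """Write `value` to a temporary sibling file, then move it into place.

    Hypothetical helper for illustration only.
    """
    tmp_path = path.parent / (path.name + tmp_extension)
    try:
        written_bytes = tmp_path.write_bytes(value)
        if written_bytes != len(value):
            msg = "Failed to write bytes to file"
            raise OSError(msg)
    except OSError:
        # The partition may have filled up mid-write, leaving a partial file behind.
        tmp_path.unlink(missing_ok=True)
        _log.error("Failed to write %d bytes to '%s'.", len(value), tmp_path)
        raise

    # Only a completely written file ever appears under the final name.
    tmp_path.replace(path)
    return path
```

Because a partial file only ever exists under the temporary extension, a later scan of the directory can safely discard anything ending in `.tmp`, which matches what `register` is described as doing.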