Skip to content

feat(cache): create supervisor cache #160

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits
Jul 9, 2025
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 3 additions & 10 deletions config.example.ini
Original file line number Diff line number Diff line change
@@ -35,19 +35,12 @@
# Maximum combined memory usage of all workers
#max_memory = 500 MiB

[cache_package]
[cache]
# Maximum cache size
#size = 100 MiB
#size = 1 GiB

# The path of the cache (relative or absolute)
#path = cache/packages

[cache_repo_index]
# Maximum cache size
#size = 200 MiB

# The path of the cache (relative or absolute)
#path = cache/repo_index
#path = cache

[collector]
# Local directory where to look for packages
1 change: 1 addition & 0 deletions questionpy_common/constants.py
Original file line number Diff line number Diff line change
@@ -9,6 +9,7 @@
# General.
KiB: Final[int] = 1024
MiB: Final[int] = 1024 * KiB
GiB: Final[int] = 1024 * MiB

# Request.
MAX_PACKAGE_SIZE: Final[ByteSize] = ByteSize(20 * MiB)
145 changes: 92 additions & 53 deletions questionpy_server/cache.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
# This file is part of the QuestionPy Server. (https://questionpy.org)
# The QuestionPy Server is free software released under terms of the MIT license. See LICENSE.md.
# (c) Technische Universität Berlin, innoCampus <[email protected]>

import logging
from asyncio import Lock, to_thread
from collections import OrderedDict
@@ -14,11 +13,17 @@
from collections.abc import Awaitable, Callable


_log = logging.getLogger(__name__)


class File(NamedTuple):
path: Path
size: int


type OnRemoveCallback = Callable[[str], Awaitable[None]]


class CacheItemTooLargeError(Exception):
def __init__(self, key: str, actual_size: int, max_size: int):
readable_actual = ByteSize(actual_size).human_readable()
@@ -32,38 +37,40 @@ def __init__(self, key: str, actual_size: int, max_size: int):
self.actual_size = actual_size


class FileLimitLRU:
"""Limit file cache size, evicting the least recently accessed file when the specified maximum is exceeded.
class LRUCacheSupervisor:
"""Supervises multiple file caches living in subdirectories of the given directory.
Only `bytes` type values are accepted. Their size is calculated by passing them into the builtin `len()` function.
"""

def __init__(self, directory: Path, max_size: int, extension: str | None = None, name: str | None = None) -> None:
"""A cache should be initialised while starting a server therefore it is not necessary for it to be async."""
self._extension: str = "" if extension is None else "." + extension.lstrip(".")
self._tmp_extension: str = ".tmp"
if self._extension == self._tmp_extension:
msg = f'Extension cannot be "{self._tmp_extension}" as it is used internally.'
raise ValueError(msg)

async def on_remove(_key: str) -> None:
pass
It evicts the least recently accessed file when the specified maximum is exceeded.
self.on_remove: Callable[[str], Awaitable[None]] = on_remove
"""Callback which fires on every removal of a file."""

self.directory: Path = directory
Only `bytes` type values are accepted. Their size is calculated by passing them into the builtin `len()`
function.
"""

def __init__(self, directory: Path, max_size: int) -> None:
self.directory = directory
self.max_size = max_size

self._lock = Lock()
self._tmp_extension = ".tmp"
self._total_size: int = 0
self._files: OrderedDict[Path, File] = OrderedDict()
self._on_remove_callbacks: dict[Path, OnRemoveCallback] = {}

self._name = name or "Cache"
_log.info("Initialized at '%s' with a maximum size of %s.", directory, ByteSize(max_size).human_readable())

self._files: OrderedDict[str, File] = OrderedDict()
def set_on_remove_callback(self, subdirectory: Path, callback: OnRemoveCallback) -> None:
self._on_remove_callbacks[subdirectory] = callback

self._lock = Lock()
def register(self, subdirectory: Path) -> None:
"""Registers a subdirectory.
Creates the subdirectory if it does not exist and removes containing files if the cache is too full and if
they have the temporary file extension.
"""
directory = self.directory / subdirectory
directory.mkdir(exist_ok=True)

for path in self.directory.iterdir():
for path in directory.iterdir():
if not path.is_file():
continue

@@ -80,19 +87,9 @@ async def on_remove(_key: str) -> None:
continue

self._total_size = total
self._files[path.stem] = File(path, size)

log = logging.getLogger("questionpy-server")
log.info(
"%s initialised at %s with %d file(s) and %s/%s.",
self._name,
self.directory,
len(self._files),
ByteSize(self._total_size).human_readable(),
ByteSize(self.max_size).human_readable(),
)
self._files[subdirectory / path.stem] = File(path, size)

def contains(self, key: str) -> bool:
def contains(self, key: Path) -> bool:
"""Checks if the file exists in cache.
Additionally, the file is placed to the end ensuring that it is the most recent accessed file.
@@ -103,34 +100,36 @@ def contains(self, key: str) -> bool:
self._files.move_to_end(key)
return True

def _get_file(self, key: str) -> File:
def _get_file(self, key: Path) -> File:
if not self.contains(key):
raise FileNotFoundError

return self._files[key]

def get(self, key: str) -> Path:
def get(self, key: Path) -> Path:
"""Returns path of the file in the cache.
Raises:
FileNotFoundError: If the file does not exist in the cache.
"""
return self._get_file(key).path

async def _remove(self, key: str) -> None:
async def _remove(self, key: Path) -> None:
file = self._get_file(key)
await to_thread(file.path.unlink, missing_ok=True)
self._total_size -= file.size
del self._files[key]

await self.on_remove(key)
cache, real_key = key.parent, key.stem
if cache in self._on_remove_callbacks:
await self._on_remove_callbacks[cache](real_key)

async def remove(self, key: str) -> None:
async def remove(self, key: Path) -> None:
"""Removes file from the cache and the filesystem."""
async with self._lock:
await self._remove(key)

async def put(self, key: str, value: bytes) -> Path:
async def put(self, key: Path, value: bytes, extension: str) -> Path:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Das war zwar schon vorher so, aber mir fällt gerade auf, dass beim Fehlschlagen von tmp_path.write_bytes die Datei noch gelöscht werden sollte. Sie könnte ja unvollständig angelegt worden sein, weil auf der Partition der Speicher volllief.
Außerdem sollte der Vorfall als error geloggt werden.

Copy link
Contributor Author

@janbritz janbritz Jul 9, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Stimmt, es wird jetzt sowohl beim Initialisieren des Cache-Supervisors und falls nicht alles Bytes geschrieben werden konnten, geloggt (199437f). Außerdem wird dann auch die Datei gelöscht (6c4a0c2).

"""Puts a file in the cache and the filesystem.
The internal `._total_bytes` attribute is updated.
@@ -151,14 +150,22 @@ async def put(self, key: str, value: bytes) -> Path:
if size > self.max_size:
# If we allowed this, the loop at the end would remove all items from the dictionary,
# so we raise an error to allow exceptions for this case.
raise CacheItemTooLargeError(key, size, self.max_size)
raise CacheItemTooLargeError(key.stem, size, self.max_size)

async with self._lock:
# Save the bytes on filesystem.
path = self.directory / (key + self._extension)
path = self.directory / (key.with_suffix(extension))
tmp_path = path.parent / (path.name + self._tmp_extension)

if size != await to_thread(tmp_path.write_bytes, value):
written_bytes = await to_thread(tmp_path.write_bytes, value)
if size != written_bytes:
tmp_path.unlink(missing_ok=True)
_log.error(
"Failed to write all bytes (%s/%s) to file '%s'.",
ByteSize(written_bytes).human_readable(),
ByteSize(size).human_readable(),
tmp_path,
)
msg = "Failed to write bytes to file"
raise OSError(msg)

@@ -185,14 +192,46 @@ def total_size(self) -> int:
return self._total_size

@property
def space_left(self) -> int:
return self.max_size - self._total_size
def files(self) -> OrderedDict[Path, File]:
return self._files


class LRUCache:
"""A file-based LRU cache which is supervised by a `LRUCacheSupervisor` instance.
The cached files are stored in a subdirectory of the `LRUCacheSupervisor`.
Look at the `LRUCacheSupervisor` class for more information about the methods.
"""

def __init__(self, supervisor: LRUCacheSupervisor, subdirectory: Path, extension: str = "") -> None:
self._supervisor = supervisor
self._subdirectory = subdirectory
self._extension = extension

self._supervisor.register(self._subdirectory)

def set_on_remove_callback(self, callback: OnRemoveCallback) -> None:
self._supervisor.set_on_remove_callback(self._subdirectory, callback)

async def put(self, key: str, value: bytes) -> Path:
return await self._supervisor.put(self._subdirectory / key, value, self._extension)

def get(self, key: str) -> Path:
return self._supervisor.get(self._subdirectory / key)

async def remove(self, key: str) -> None:
await self._supervisor.remove(self._subdirectory / key)

def contains(self, key: str) -> bool:
return self._supervisor.contains(self._subdirectory / key)

@property
def files(self) -> OrderedDict[str, File]:
"""Dictionary of all files in the cache where the key is the hash of the file.
def files(self) -> dict[str, File]:
return {
key.stem: file for key, file in self._supervisor.files.items() if key.is_relative_to(self._subdirectory)
}

Returns:
A copy of the internal dictionary.
"""
return self._files.copy()
@property
def directory(self) -> Path:
return self._supervisor.directory / self._subdirectory
8 changes: 4 additions & 4 deletions questionpy_server/collector/_package_collection.py
Original file line number Diff line number Diff line change
@@ -10,7 +10,7 @@
from pydantic import HttpUrl

from questionpy_server import WorkerPool
from questionpy_server.cache import FileLimitLRU
from questionpy_server.cache import LRUCache
from questionpy_server.collector.indexer import Indexer
from questionpy_server.collector.lms_collector import LMSCollector
from questionpy_server.collector.local_collector import LocalCollector
@@ -31,8 +31,8 @@ def __init__(
self,
local_dir: Path | None,
repos: dict[HttpUrl, timedelta],
repo_index_cache: FileLimitLRU,
package_cache: FileLimitLRU,
repo_index_cache: LRUCache,
package_cache: LRUCache,
worker_pool: WorkerPool,
):
self._indexer = Indexer(worker_pool)
@@ -50,7 +50,7 @@ def __init__(
self._collectors.append(self._lms_collector)

# Update indexer if package in cache gets removed.
package_cache.on_remove = self._unregister_package_from_index
package_cache.set_on_remove_callback(self._unregister_package_from_index)

async def start(self) -> None:
"""Starts the package collection."""
6 changes: 3 additions & 3 deletions questionpy_server/collector/abc.py
Original file line number Diff line number Diff line change
@@ -6,7 +6,7 @@
from pathlib import Path
from typing import TYPE_CHECKING

from questionpy_server.cache import FileLimitLRU
from questionpy_server.cache import LRUCache

if TYPE_CHECKING:
from questionpy_server.collector.indexer import Indexer
@@ -56,8 +56,8 @@ async def get_path(self, package: "Package") -> Path:
class CachedCollector(BaseCollector, ABC):
"""A collector that caches retrieved packages locally."""

_cache: FileLimitLRU
_cache: LRUCache

def __init__(self, cache: FileLimitLRU, indexer: "Indexer"):
def __init__(self, cache: LRUCache, indexer: "Indexer"):
super().__init__(indexer=indexer)
self._cache = cache
4 changes: 2 additions & 2 deletions questionpy_server/collector/lms_collector.py
Original file line number Diff line number Diff line change
@@ -6,7 +6,7 @@
from pathlib import Path
from typing import TYPE_CHECKING

from questionpy_server.cache import FileLimitLRU
from questionpy_server.cache import LRUCache
from questionpy_server.collector.abc import CachedCollector
from questionpy_server.worker.runtime.messages import BaseWorkerError

@@ -24,7 +24,7 @@ class LMSCollector(CachedCollector):
a cache, and can be retrieved exclusively by their hash.
"""

def __init__(self, cache: FileLimitLRU, indexer: "Indexer"):
def __init__(self, cache: LRUCache, indexer: "Indexer"):
super().__init__(cache=cache, indexer=indexer)

async def start(self) -> None:
6 changes: 3 additions & 3 deletions questionpy_server/collector/repo_collector.py
Original file line number Diff line number Diff line change
@@ -8,7 +8,7 @@
from pathlib import Path
from typing import TYPE_CHECKING

from questionpy_server.cache import FileLimitLRU
from questionpy_server.cache import LRUCache
from questionpy_server.collector.abc import CachedCollector
from questionpy_server.repository import RepoMeta, RepoPackage, Repository
from questionpy_server.repository.helper import DownloadError
@@ -29,8 +29,8 @@ def __init__(
self,
url: str,
update_interval: timedelta,
package_cache: FileLimitLRU,
repo_index_cache: FileLimitLRU,
package_cache: LRUCache,
repo_index_cache: LRUCache,
indexer: "Indexer",
):
super().__init__(cache=package_cache, indexer=indexer)
4 changes: 2 additions & 2 deletions questionpy_server/repository/__init__.py
Original file line number Diff line number Diff line change
@@ -7,14 +7,14 @@
from gzip import decompress
from urllib.parse import urljoin

from questionpy_server.cache import CacheItemTooLargeError, FileLimitLRU
from questionpy_server.cache import CacheItemTooLargeError, LRUCache
from questionpy_server.repository.helper import download
from questionpy_server.repository.models import RepoMeta, RepoPackage, RepoPackageIndex
from questionpy_server.utils.logger import URLAdapter


class Repository:
def __init__(self, url: str, cache: FileLimitLRU):
def __init__(self, url: str, cache: LRUCache):
self._url_base = url
self._url_index = urljoin(self._url_base, "PACKAGES.json.gz")
self._url_meta = urljoin(self._url_base, "META.json")
21 changes: 5 additions & 16 deletions questionpy_server/settings.py
Original file line number Diff line number Diff line change
@@ -19,7 +19,7 @@
SettingsConfigDict,
)

from questionpy_common.constants import MAX_PACKAGE_SIZE, MiB
from questionpy_common.constants import MAX_PACKAGE_SIZE, GiB, MiB
from questionpy_server.worker import Worker
from questionpy_server.worker.impl.subprocess import SubprocessWorker

@@ -106,19 +106,9 @@ def _load_worker_class(cls, value: object) -> builtins.type[Worker]:
return value


class PackageCacheSettings(BaseModel):
size: ByteSize = ByteSize(100 * MiB)
directory: DirectoryPath = Path("cache/packages").resolve()

@field_validator("directory")
@classmethod
def resolve_path(cls, value: Path) -> Path:
return value.resolve()


class RepoIndexCacheSettings(BaseModel):
size: ByteSize = ByteSize(200 * MiB)
directory: DirectoryPath = Path("cache/repo_index").resolve()
class CacheSettings(BaseModel):
size: ByteSize = ByteSize(1 * GiB)
directory: DirectoryPath = Path("cache").resolve()

@field_validator("directory")
@classmethod
@@ -268,8 +258,7 @@ class Settings(BaseSettings):
general: GeneralSettings
webservice: WebserviceSettings
worker: WorkerSettings
cache_package: PackageCacheSettings
cache_repo_index: RepoIndexCacheSettings
cache: CacheSettings
collector: CollectorSettings
auth: AuthSettings

12 changes: 5 additions & 7 deletions questionpy_server/web/app.py
Original file line number Diff line number Diff line change
@@ -4,12 +4,13 @@
import asyncio
import logging
from collections.abc import AsyncIterator
from pathlib import Path
from typing import Any, ClassVar

from aiohttp import web

from questionpy_server import __version__
from questionpy_server.cache import FileLimitLRU
from questionpy_server.cache import LRUCache, LRUCacheSupervisor
from questionpy_server.collector import PackageCollection
from questionpy_server.settings import Settings
from questionpy_server.web.middlewares import middlewares
@@ -34,12 +35,9 @@ def __init__(self, settings: Settings):
settings.worker.max_workers, settings.worker.max_memory, worker_type=settings.worker.type
)

self.package_cache = FileLimitLRU(
settings.cache_package.directory, settings.cache_package.size, extension=".qpy", name="PackageCache"
)
self.repo_index_cache = FileLimitLRU(
settings.cache_repo_index.directory, settings.cache_repo_index.size, name="RepoIndexCache"
)
cache_supervisor = LRUCacheSupervisor(settings.cache.directory, settings.cache.size)
self.package_cache = LRUCache(cache_supervisor, Path("packages"), extension=".qpy")
self.repo_index_cache = LRUCache(cache_supervisor, Path("repo_index"))

self.package_collection = PackageCollection(
settings.collector.local_directory,
6 changes: 2 additions & 4 deletions tests/conftest.py
Original file line number Diff line number Diff line change
@@ -18,10 +18,9 @@
from questionpy_server.hash import calculate_hash
from questionpy_server.settings import (
AuthSettings,
CacheSettings,
CollectorSettings,
GeneralSettings,
PackageCacheSettings,
RepoIndexCacheSettings,
Settings,
WebserviceSettings,
WorkerSettings,
@@ -127,8 +126,7 @@ def qpy_server(tmp_path_factory: pytest.TempPathFactory) -> QPyServer:
general=GeneralSettings(),
webservice=WebserviceSettings(listen_address="127.0.0.1", listen_port=0),
worker=WorkerSettings(type=ThreadWorker),
cache_package=PackageCacheSettings(directory=tmp_path_factory.mktemp("qpy_package_cache")),
cache_repo_index=RepoIndexCacheSettings(directory=tmp_path_factory.mktemp("qpy_repo_index_cache")),
cache=CacheSettings(directory=tmp_path_factory.mktemp("qpy_cache")),
collector=CollectorSettings(),
auth=AuthSettings(enabled=False),
)
23 changes: 15 additions & 8 deletions tests/questionpy_server/collector/test_lms_collector.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,14 @@
# This file is part of the QuestionPy Server. (https://questionpy.org)
# The QuestionPy Server is free software released under terms of the MIT license. See LICENSE.md.
# (c) Technische Universität Berlin, innoCampus <info@isis.tu-berlin.de>

from unittest.mock import patch

import pytest
from _pytest.tmpdir import TempPathFactory

from questionpy_common.constants import KiB
from questionpy_server import WorkerPool
from questionpy_server.cache import FileLimitLRU
from questionpy_server.cache import LRUCache, LRUCacheSupervisor
from questionpy_server.collector.indexer import Indexer
from questionpy_server.collector.lms_collector import LMSCollector
from questionpy_server.hash import HashContainer, calculate_hash
@@ -18,18 +17,26 @@
from tests.conftest import PACKAGE


def create_lms_collector(
tmp_path_factory: TempPathFactory, worker_pool: WorkerPool
) -> tuple[LMSCollector, FileLimitLRU]:
def create_lms_collector(tmp_path_factory: TempPathFactory, worker_pool: WorkerPool) -> tuple[LMSCollector, LRUCache]:
"""Creates and returns a local collector along with the cache it is using."""
path = tmp_path_factory.mktemp("qpy")
cache = FileLimitLRU(path, 100 * KiB, extension=".qpy")
supervisor_path = tmp_path_factory.mktemp("qpy")
supervisor_cache = LRUCacheSupervisor(supervisor_path, 100 * KiB)

cache_path = supervisor_path / "packages"
cache_path.mkdir()
cache = LRUCache(supervisor_cache, cache_path, extension=".qpy")

indexer = Indexer(worker_pool)
return LMSCollector(cache, indexer), cache


async def test_package_in_cache_before_init(tmp_path_factory: TempPathFactory) -> None:
cache = FileLimitLRU(tmp_path_factory.mktemp("qpy"), 100 * KiB, extension=".qpy")
supervisor_path = tmp_path_factory.mktemp("qpy")
supervisor_cache = LRUCacheSupervisor(supervisor_path, 100 * KiB)

cache_path = supervisor_path / "packages"
cache_path.mkdir()
cache = LRUCache(supervisor_cache, cache_path, extension=".qpy")

# Put package into cache.
await cache.put(PACKAGE.hash, PACKAGE.path.read_bytes())
11 changes: 8 additions & 3 deletions tests/questionpy_server/collector/test_package_collection.py
Original file line number Diff line number Diff line change
@@ -8,7 +8,7 @@
from _pytest.tmpdir import TempPathFactory
from semver import VersionInfo

from questionpy_server.cache import FileLimitLRU
from questionpy_server.cache import LRUCache, LRUCacheSupervisor
from questionpy_server.collector import PackageCollection
from questionpy_server.collector.indexer import Indexer
from questionpy_server.collector.lms_collector import LMSCollector
@@ -90,10 +90,15 @@ def test_get_packages() -> None:


async def test_notify_indexer_on_cache_deletion(tmp_path_factory: TempPathFactory) -> None:
cache = FileLimitLRU(tmp_path_factory.mktemp("qpy"), 100)
supervisor = LRUCacheSupervisor(tmp_path_factory.mktemp("qpy"), 100)
cache_path = supervisor.directory / "packages"
cache_path.mkdir()
cache = LRUCache(supervisor, cache_path, extension=".qpy")
await cache.put("hash", b"")

PackageCollection(None, {}, Mock(), cache, Mock())

# The callback should unregister the package from the indexer.
with patch.object(Indexer, "unregister_package") as unregister_package:
await cache.on_remove("hash")
await cache.remove("hash")
unregister_package.assert_called_once()
12 changes: 8 additions & 4 deletions tests/questionpy_server/repository/test_repository.py
Original file line number Diff line number Diff line change
@@ -4,14 +4,15 @@

import logging
from gzip import compress
from pathlib import Path
from unittest.mock import ANY, Mock, patch
from urllib.parse import urljoin

import pytest
from _pytest.tmpdir import TempPathFactory

from questionpy_common.constants import KiB
from questionpy_server.cache import CacheItemTooLargeError, FileLimitLRU
from questionpy_server.cache import CacheItemTooLargeError, LRUCache, LRUCacheSupervisor
from questionpy_server.repository import RepoMeta, RepoPackage, RepoPackageIndex, Repository
from questionpy_server.utils.manifest import ComparableManifest
from tests.test_data.factories import ManifestFactory, RepoMetaFactory, RepoPackageVersionsFactory
@@ -53,7 +54,8 @@ async def test_get_meta() -> None:


async def test_get_packages(tmp_path_factory: TempPathFactory) -> None:
repository = Repository(REPO_URL, FileLimitLRU(tmp_path_factory.mktemp("qpy"), 100 * KiB))
supervisor_cache = LRUCacheSupervisor(tmp_path_factory.mktemp("qpy"), 100 * KiB)
repository = Repository(REPO_URL, LRUCache(supervisor_cache, Path("repo_index")))

package_index = RepoPackageIndex(packages=[REPO_PACKAGE_VERSIONS_0, REPO_PACKAGE_VERSIONS_1])

@@ -86,7 +88,8 @@ async def test_get_packages(tmp_path_factory: TempPathFactory) -> None:


async def test_get_packages_cached(tmp_path_factory: TempPathFactory) -> None:
cache = FileLimitLRU(tmp_path_factory.mktemp("qpy"), 100 * KiB)
supervisor_cache = LRUCacheSupervisor(tmp_path_factory.mktemp("qpy"), 100 * KiB)
cache = LRUCache(supervisor_cache, Path("repo_index"))
repository = Repository(REPO_URL, cache)
package_index = RepoPackageIndex(packages=[REPO_PACKAGE_VERSIONS_0])

@@ -113,7 +116,8 @@ async def test_get_packages_cached(tmp_path_factory: TempPathFactory) -> None:
async def test_log_warning_when_package_index_is_too_big_for_cache(
tmp_path_factory: TempPathFactory, caplog: pytest.LogCaptureFixture
) -> None:
cache = FileLimitLRU(tmp_path_factory.mktemp("qpy"), 100 * KiB)
supervisor_cache = LRUCacheSupervisor(tmp_path_factory.mktemp("qpy"), 100 * KiB)
cache = LRUCache(supervisor_cache, Path("repo_index"))
repository = Repository(REPO_URL, cache)
package_index = RepoPackageIndex(packages=[REPO_PACKAGE_VERSIONS_0])

311 changes: 132 additions & 179 deletions tests/questionpy_server/test_cache.py
Original file line number Diff line number Diff line change
@@ -2,240 +2,193 @@
# The QuestionPy Server is free software released under terms of the MIT license. See LICENSE.md.
# (c) Technische Universität Berlin, innoCampus <info@isis.tu-berlin.de>

from dataclasses import dataclass
from pathlib import Path
from string import ascii_lowercase
from typing import NamedTuple
from unittest.mock import patch
from unittest.mock import AsyncMock, patch

import pytest
from _pytest.tmpdir import TempPathFactory

from questionpy_server.cache import CacheItemTooLargeError, FileLimitLRU
from questionpy_server.cache import CacheItemTooLargeError, LRUCache, LRUCacheSupervisor


@dataclass
class ItemSettings:
size_per_item: int
num_of_items: int
def get_file_count(directory: Path) -> int:
"""Counts files in a directory."""
return len([file for file in directory.iterdir() if file.is_file()])

def __post_init__(self) -> None:
self.list = [(char, self.size_per_item * char.encode()) for char in ascii_lowercase[: self.num_of_items]]
self.total_size = self.size_per_item * self.num_of_items

def get_directory_size(directory: Path) -> int:
"""Calculates directory size."""
return sum(file.stat().st_size for file in directory.iterdir() if file.is_file())

class CacheSettings(NamedTuple):
size: int
directory: Path

def write_data(to: Path, amount: int) -> None:
to.parent.mkdir(exist_ok=True)
to.write_bytes(b"." * amount)

class Settings(NamedTuple):
cache: CacheSettings
items: ItemSettings

def test_init_removes_files_if_cache_is_full(tmp_path_factory: TempPathFactory) -> None:
supervisor = LRUCacheSupervisor(tmp_path_factory.mktemp("supervisor"), 2)

@pytest.fixture
def settings(tmp_path_factory: TempPathFactory) -> Settings:
return Settings(
cache=CacheSettings(
size=100,
directory=tmp_path_factory.mktemp("qpy"),
),
items=ItemSettings(size_per_item=15, num_of_items=6),
)
cache_subdirectory = Path("cache")
cache_path = supervisor.directory / cache_subdirectory

write_data(cache_path / "A", 1)
write_data(cache_path / "B", 2)

def write_files_to_directory(files: list[tuple[str, bytes]], directory: Path) -> None:
"""Writes files onto a specific directory on the filesystem.
cache = LRUCache(supervisor, cache_subdirectory)

Args:
files (List[Tuple[str, bytes]]): files to be written
directory (Path): where files should be created
"""
for file, content in files:
file_path = directory / file
file_path.write_bytes(content)
assert get_file_count(cache_path) == 1
assert get_directory_size(cache.directory) == supervisor.total_size
assert 0 < supervisor.total_size <= 2
assert {True, False} == {cache.contains("A"), cache.contains("B")}


def get_file_count(directory: Path) -> int:
"""Counts files in a directory.
def test_supervisor_creates_cache_directory_in_supervisor_directory(tmp_path_factory: TempPathFactory) -> None:
supervisor = LRUCacheSupervisor(tmp_path_factory.mktemp("supervisor"), 1)

Args:
directory (Path): of which to get the file count
Returns:
count of files in directory
"""
return len([file for file in directory.iterdir() if file.is_file()])
cache_subdirectory = Path("cache")
cache = LRUCache(supervisor, cache_subdirectory)

assert cache.directory == supervisor.directory / cache_subdirectory
assert cache.directory.is_dir()

def get_directory_size(directory: str) -> int:
"""Calculates directory size.

Args:
directory (str): of which to get the size
Returns:
size of directory
"""
return sum(file.stat().st_size for file in Path(directory).iterdir() if file.is_file())
def test_init_removes_files_with_temporary_extension(tmp_path_factory: TempPathFactory) -> None:
supervisor = LRUCacheSupervisor(tmp_path_factory.mktemp("supervisor"), 1)

cache_subdirectory = Path("cache")

@pytest.fixture
def path_with_too_many_bytes(tmp_path_factory: TempPathFactory, settings: Settings) -> Path:
directory = tmp_path_factory.mktemp("qpy")
write_files_to_directory(settings.items.list, directory)
file_path = supervisor.directory / cache_subdirectory / "A.tmp"
write_data(file_path, 1)

large_item_path = directory / "large_item"
large_item_path.write_bytes(b"." * (settings.cache.size - settings.items.total_size + 1))
LRUCache(supervisor, cache_subdirectory)

return directory
assert not file_path.is_file()
assert supervisor.total_size == 0


@pytest.fixture
def cache(settings: Settings) -> FileLimitLRU:
write_files_to_directory(settings.items.list, Path(settings.cache.directory))
return FileLimitLRU(settings.cache.directory, settings.cache.size)
async def test_put_with_multiple_caches(tmp_path_factory: TempPathFactory) -> None:
supervisor = LRUCacheSupervisor(tmp_path_factory.mktemp("supervisor"), 2)

cache_1_subdirectory = Path("cache_1")
cache_2_subdirectory = Path("cache_2")

def test_init(cache: FileLimitLRU, settings: Settings, path_with_too_many_bytes: Path) -> None:
assert cache.total_size == settings.items.total_size
assert cache.space_left == settings.cache.size - settings.items.total_size
assert get_file_count(settings.cache.directory) == settings.items.num_of_items
cache_1 = LRUCache(supervisor, cache_1_subdirectory)
cache_2 = LRUCache(supervisor, cache_2_subdirectory)

# Existing path contains more bytes than the cache can hold.
small_cache = FileLimitLRU(path_with_too_many_bytes, settings.cache.size)
assert small_cache.total_size <= settings.cache.size
assert get_directory_size(str(path_with_too_many_bytes)) <= settings.cache.size
await cache_1.put("A", b"A")
await cache_2.put("B", b"B")

# Ignore directories.
(Path(settings.cache.directory) / "test_dir").mkdir()
FileLimitLRU(settings.cache.directory, settings.cache.size)
assert cache_1.contains("A")
assert not cache_1.contains("B")
assert cache_1.files.keys() == {"A"}
assert (supervisor.directory / cache_1_subdirectory / "A").is_file()

# Remove files with temporary extension.
tmp_file = Path(settings.cache.directory) / ("file.txt" + cache._tmp_extension)
tmp_file.write_bytes(b".")
new_cache = FileLimitLRU(settings.cache.directory, settings.cache.size)
assert not tmp_file.is_file()
assert get_file_count(settings.cache.directory) == settings.items.num_of_items
assert new_cache.total_size == settings.items.total_size
assert cache_2.contains("B")
assert not cache_2.contains("A")
assert cache_2.files.keys() == {"B"}
assert (supervisor.directory / cache_2_subdirectory / "B").is_file()

assert supervisor.total_size == 2

async def test_remove(cache: FileLimitLRU, settings: Settings) -> None:
# Remove a file.
file, _ = settings.items.list[0]
await cache.remove(file)
assert not (cache.directory / file).is_file()
expected_total_size = settings.items.total_size - settings.items.size_per_item
assert cache.total_size == expected_total_size

# Removing a file should fire the callback.
with patch.object(cache, "on_remove") as mock:
file, _ = settings.items.list[-1]
await cache.remove(file)
expected_total_size = settings.items.total_size - 2 * settings.items.size_per_item
assert not (cache.directory / file).is_file()
assert cache.total_size == expected_total_size
mock.assert_called_once()
async def test_put_with_data_bigger_than_capacity_raises(tmp_path_factory: TempPathFactory) -> None:
supervisor = LRUCacheSupervisor(tmp_path_factory.mktemp("supervisor"), 1)

# Remove previously removed file.
with pytest.raises(FileNotFoundError):
await cache.remove(file)
assert cache.total_size == expected_total_size
cache_subdirectory = Path("cache")
cache = LRUCache(supervisor, cache_subdirectory)

# Remove not existing file.
with pytest.raises(FileNotFoundError):
await cache.remove("doesnotexist")
assert cache.total_size == expected_total_size
with pytest.raises(CacheItemTooLargeError):
await cache.put("A", b"..")

assert not cache.contains("A")
assert supervisor.total_size == 0
assert not (supervisor.directory / cache_subdirectory / "A").is_file()

def test_get(cache: FileLimitLRU, settings: Settings) -> None:
# Get first added item.
file, content = settings.items.list[0]
path = cache.get(file)
assert path == Path(settings.cache.directory) / file
assert path.read_bytes() == content

# Get last added item.
file, content = settings.items.list[-1]
path = cache.get(file)
assert path == Path(settings.cache.directory) / file
assert path.read_bytes() == content
async def test_put_raises_if_written_bytes_does_not_match_expected_size(tmp_path_factory: TempPathFactory) -> None:
    """A short write (fewer bytes written than expected) must surface as an IOError."""
    supervisor = LRUCacheSupervisor(tmp_path_factory.mktemp("supervisor"), 1)
    cache = LRUCache(supervisor, Path("cache"))

    # Simulate a failed write by making write_bytes report a bogus byte count.
    short_write = patch("pathlib.Path.write_bytes", return_value=-1)
    with short_write, pytest.raises(IOError, match="Failed to write bytes"):
        await cache.put("B", b".")


# Get not existing item.
async def test_put_removes_lru_file_if_cache_is_full(tmp_path_factory: TempPathFactory) -> None:
    """With a one-byte budget shared by two caches, storing a second file evicts the first."""
    supervisor = LRUCacheSupervisor(tmp_path_factory.mktemp("supervisor"), 1)

    subdir_a = Path("cache_1")
    subdir_b = Path("cache_2")
    first_cache = LRUCache(supervisor, subdir_a)
    second_cache = LRUCache(supervisor, subdir_b)

    await first_cache.put("A", b"A")
    await second_cache.put("B", b"B")

    # "A" was the least recently used entry, so it must be gone from memory and disk.
    assert not first_cache.contains("A")
    assert not (supervisor.directory / subdir_a / "A").is_file()

    # "B" is the sole survivor and accounts for the entire budget.
    assert second_cache.contains("B")
    assert (supervisor.directory / subdir_b / "B").is_file()

    assert supervisor.total_size == 1


async def test_remove_raises_if_file_does_not_exist(tmp_path_factory: TempPathFactory) -> None:
    """Removing an unknown key must raise FileNotFoundError."""
    supervisor = LRUCacheSupervisor(tmp_path_factory.mktemp("supervisor"), 1)
    cache = LRUCache(supervisor, Path("cache"))

    # Only remove() is under test here; the stray cache.get() call (a leftover from a
    # removed test) raised first and masked the call we actually want to verify.
    with pytest.raises(FileNotFoundError):
        await cache.remove("doesnotexist")


def test_contains(cache: FileLimitLRU, settings: Settings) -> None:
    """A file that was added to the cache is reported as contained."""
    first_key, _ = settings.items.list[0]
    assert cache.contains(first_key)
async def test_remove_fires_callback_if_file_is_removed(tmp_path_factory: TempPathFactory) -> None:
    """The remove callback fires on the owning cache for both LRU eviction and explicit removal.

    NOTE(review): this span was a diff-scrape artifact — lines of the removed
    fixture-based tests (``test_contains``/``test_put``) were interleaved here,
    referencing names (``cache``, ``settings``, ``cache_1``/``cache_2`` inside the
    wrong function) that do not exist in their scope. Reconstructed from the new,
    supervisor-based lines only.
    """
    # Budget of two bytes, shared by two caches under one supervisor.
    supervisor = LRUCacheSupervisor(tmp_path_factory.mktemp("supervisor"), 2)

    cache_1 = LRUCache(supervisor, Path("cache_1"))
    cache_2 = LRUCache(supervisor, Path("cache_2"))

    cache_1_callback = AsyncMock()
    cache_2_callback = AsyncMock()

    cache_1.set_on_remove_callback(cache_1_callback)
    cache_2.set_on_remove_callback(cache_2_callback)

    await cache_1.put("A", b"A")
    await cache_2.put("B", b"B")

    # The supervisor is now full: putting "C" evicts the LRU file "A",
    # which must notify cache_1 (its owner) and only cache_1.
    await cache_1.put("C", b"C")
    cache_1_callback.assert_called_once_with("A")
    cache_2_callback.assert_not_called()

    # Explicit removal notifies the owning cache; cache_1's callback stays at one call.
    await cache_2.remove("B")
    cache_2_callback.assert_called_once_with("B")
    cache_1_callback.assert_called_once()
async def test_cache_with_file_extension(tmp_path_factory: TempPathFactory) -> None:
    """Keys map to files carrying the configured extension, transparently to the caller.

    NOTE(review): this span was a diff-scrape artifact — lines of the removed
    fixture-based ``test_put``/``test_get_files`` were interleaved here, including
    an ``await`` inside the synchronous ``test_get_files`` (a SyntaxError).
    Reconstructed from the new, supervisor-based lines only.
    """
    supervisor = LRUCacheSupervisor(tmp_path_factory.mktemp("supervisor"), 1)
    cache = LRUCache(supervisor, Path("cache"), extension=".qpy")

    callback = AsyncMock()
    cache.set_on_remove_callback(callback)

    key = "A"
    # The extension is applied on disk but never appears in the key-based API.
    expected_path = supervisor.directory / "cache" / f"{key}.qpy"

    await cache.put(key, b".")
    assert cache.contains(key)
    assert expected_path.is_file()
    assert cache.get(key) == expected_path
    assert cache.files.keys() == {key}

    # Removal deletes the extension-bearing file and reports the plain key to the callback.
    await cache.remove(key)
    callback.assert_called_once_with(key)
    assert not cache.contains(key)
    assert not expected_path.is_file()
3 changes: 1 addition & 2 deletions tests/questionpy_server/test_settings.py
Original file line number Diff line number Diff line change
@@ -24,8 +24,7 @@ def path_with_empty_config_file(tmp_path: Path) -> Path:
[general]
[webservice]
[worker]
[cache_package]
[cache_repo_index]
[cache]
[collector]
[auth]
""")