Core: Improves performance of orm_cached with an in-memory cache #1029

Closed · wants to merge 4 commits
2 changes: 1 addition & 1 deletion .pre-commit-config.yaml
@@ -27,7 +27,7 @@ repos:
- id: sass-lint
files: '^src/.*\.scss'
- repo: https://github.com/pre-commit/mirrors-eslint
-rev: v8.48.0
+rev: v8.50.0
hooks:
- id: eslint
files: '^src/.*\.jsx?$'
2 changes: 1 addition & 1 deletion README.md
@@ -49,7 +49,7 @@ up in the release history, it needs to be written as follows:

<Optional Description>

-TYPE: <Feature|Bugfix>
+TYPE: <Feature|Bugfix|Performance>
LINK: <Ticket-Number>
HINT: <Optional upgrade hint>

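For illustration, a hypothetical entry using the new Performance type (description and ticket number invented) would read:

    Speeds up orm_cached with an in-memory cache

    TYPE: Performance
    LINK: OGC-1029
    HINT: No upgrade steps required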
6 changes: 2 additions & 4 deletions src/onegov/core/csv.py
@@ -341,11 +341,9 @@ def convert_xlsx_to_csv(
cell = sheet.cell(row, column)

if cell.value is None:
-    # TODO: Verify whether this actually can never be None,
-    #       then we can skip this check
-    value = ''  # type:ignore[unreachable]
+    value = ''
elif cell.data_type == 's':
-    value = cell.value
+    value = cell.value  # type:ignore[assignment]
elif cell.data_type == 'n':
    if (int_value := int(cell.value)) == cell.value:  # type:ignore
        value = str(int_value)
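For reference, a minimal standalone sketch of the branch this hunk touches, assuming openpyxl's cell API (value, data_type) as used by convert_xlsx_to_csv:

    def cell_to_text(cell):
        # hypothetical helper mirroring the logic above: empty cells
        # become '', string cells pass through unchanged and numeric
        # cells drop the trailing '.0' when the float is integral
        if cell.value is None:
            return ''
        if cell.data_type == 's':
            return cell.value
        if cell.data_type == 'n':
            if (int_value := int(cell.value)) == cell.value:
                return str(int_value)
        return str(cell.value)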
126 changes: 89 additions & 37 deletions src/onegov/core/orm/cache.py
@@ -24,15 +24,17 @@ def users(self):
import inspect
import sqlalchemy

+from onegov.core.orm.utils import make_detached
from sqlalchemy.orm.query import Query
+from time import time


from typing import overload, Any, Generic, TypeVar, TYPE_CHECKING
if TYPE_CHECKING:
from collections.abc import Callable, Iterator
from sqlalchemy.orm import Session
from typing import Protocol
-from typing_extensions import Self
+from typing_extensions import Self, TypeGuard

from . import Base
from .session_manager import SessionManager
@@ -59,6 +61,8 @@ def __call__(
) -> 'OrmCacheDescriptor[_T]': ...

_T = TypeVar('_T')
+_T1 = TypeVar('_T1')
+unset = object()


class OrmCacheApp:
@@ -71,6 +75,8 @@ class OrmCacheApp:

"""

+memory_cache: dict[str, tuple[float, Any]]

if TYPE_CHECKING:
# forward declare the attributes we need from Framework
session_manager: SessionManager
@@ -80,6 +86,16 @@ def cache(self) -> RedisCacheRegion: ...
request_cache: dict[str, Any]

def configure_orm_cache(self, **cfg: Any) -> None:
+    # this cache can live beyond the lifetime of a single request.
+    # for complex collections of models, orm_cached may incur
+    # significant deserialization overhead, so rather than caching
+    # locally for the duration of a single request we cache across
+    # multiple requests. to keep multiple processes from desyncing,
+    # we store a timestamp in the redis cache and the memory cache
+    # which gets set every time the redis cache is repopulated, so
+    # instances can pull the latest version
+    self.memory_cache = {}
    self.is_orm_cache_setup = getattr(self, 'is_orm_cache_setup', False)
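To make the timestamp scheme described above concrete, here is a minimal sketch of the lookup in isolation; shared_get and shared_get_or_create are hypothetical stand-ins for app.cache.get and app.cache.get_or_create:

    from time import time

    memory_cache: dict[str, tuple[float, object]] = {}

    def lookup(key: str, ts_key: str, create):
        # accept the process-local value only while its timestamp matches
        # the shared one; a writer that repopulates redis also writes a
        # fresh timestamp, so stale processes miss here and pull the
        # latest version on their next lookup
        ts, obj = memory_cache.get(key, (float('-inf'), None))
        if obj is None or ts != shared_get(ts_key):
            obj = shared_get_or_create(key, create)
            ts = shared_get_or_create(ts_key, time)
            memory_cache[key] = (ts, obj)
        return obj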

def setup_orm_cache(self) -> None:
Expand Down Expand Up @@ -137,6 +153,10 @@ def handle_orm_change(schema: str, obj: 'Base') -> None:
# Still, trust but verify:
assert self.schema == schema
self.cache.delete(descriptor.cache_key)
+self.cache.delete(descriptor.ts_key)

+if descriptor.cache_key in self.memory_cache:
+    del self.memory_cache[descriptor.cache_key]

if descriptor.cache_key in self.request_cache:
del self.request_cache[descriptor.cache_key]
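Taken together, a change to a watched model now invalidates all three tiers; a condensed sketch of the flow (self being the OrmCacheApp, equivalent to the checked deletes above):

    # drop the shared value and its timestamp from redis, then evict
    # the process-local and request-local copies
    self.cache.delete(descriptor.cache_key)
    self.cache.delete(descriptor.ts_key)
    self.memory_cache.pop(descriptor.cache_key, None)
    self.request_cache.pop(descriptor.cache_key, None)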
@@ -179,6 +199,7 @@ def __init__(
):
self.cache_policy = cache_policy
self.cache_key = creator.__qualname__
+self.ts_key = self.cache_key + '_ts'
self.creator = creator

def create(self, app: OrmCacheApp) -> _T:
@@ -197,19 +218,52 @@ def create(self, app: OrmCacheApp) -> _T:

return result

-def merge(self, session: 'Session', obj: '_M') -> '_M':
+def merge(self, session: 'Session', obj: _T1) -> _T1:
    """ Merges the given obj into the given session, *if* this is possible.

    That is, it acts like a more forgiving session.merge().

    """
    if self.requires_merge(obj):
-        obj = session.merge(obj, load=False)
+        # the object might already be attached to a different session
+        # through the in-memory cache, in which case we need to detach
+        # it first
+        make_detached(obj)
+        obj = session.merge(obj, load=False)  # type:ignore[assignment]
        obj.is_cached = True

    return obj

+def deep_merge(self, session: 'Session', obj: _T1) -> _T1:
+    """ Merges the given obj into the given session recursively, *if* this
+    is possible.
+
+    """
+
+    # named tuples
+    if isinstance(obj, tuple) and hasattr(obj.__class__, '_make'):
+        obj = obj._make(  # type:ignore[attr-defined]
+            self.deep_merge(session, o) for o in obj
+        )
+
+    # lists (we can save some memory here)
+    elif isinstance(obj, list):
+        for ix, o in enumerate(obj):
+            obj[ix] = self.deep_merge(session, o)
+
+    # generic iterables
+    elif isinstance(obj, (tuple, set)):
+        obj = obj.__class__(  # type:ignore[assignment]
+            self.deep_merge(session, o) for o in obj
+        )
+
+    # generic objects
+    else:
+        obj = self.merge(session, obj)
+
+    return obj
+
-def requires_merge(self, obj: 'Base') -> bool:
+def requires_merge(self, obj: object) -> 'TypeGuard[Base]':
""" Returns true if the given object requires a merge, which is the
case if the object is detached.

@@ -238,42 +292,40 @@ def load(self, app: OrmCacheApp) -> _T:
if session.dirty:
session.flush()

-# we use a secondary request cache for even more lookup speed and to
-# make sure that inside a request we always get the exact same instance
-# (otherwise we don't see changes reflected)
-if self.cache_key in app.request_cache:
-
-    # it is possible for objects in the request cache to become
-    # detached - in this case we need to merge them again
-    # (the merge function only does this if necessary)
-    return self.merge(session, app.request_cache[self.cache_key])
-
-else:
-    obj = app.cache.get_or_create(
-        key=self.cache_key,
-        creator=lambda: self.create(app)
-    )
-
-# named tuples
-if isinstance(obj, tuple) and hasattr(obj.__class__, '_make'):
-    obj = obj._make(self.merge(session, o) for o in obj)  # type:ignore
-
-# lists (we can save some memory here)
-elif isinstance(obj, list):
-    for ix, o in enumerate(obj):
-        obj[ix] = self.merge(session, o)
-
-# generic iterables
-elif isinstance(obj, (tuple, set)):
-    obj = obj.__class__(self.merge(session, o) for o in obj)
-
-# generic objects
-else:
-    obj = self.merge(session, obj)
-
-app.request_cache[self.cache_key] = obj
-
-return obj
+# the tertiary request cache exists for historical reasons: the
+# secondary cache used to be the request cache, and it is kept to
+# preserve the original behavior
+obj = app.request_cache.get(self.cache_key, unset)
+if obj is unset:
+
+    # we use a secondary in-memory cache for even more lookup speed
+    ts, obj = app.memory_cache.get(
+        self.cache_key,
+        (float('-Inf'), unset)
+    )
+
+    if obj is unset or ts != app.cache.get(key=self.ts_key):
+        def creator() -> _T:
+            obj = self.create(app)
+            # make sure the object gets flushed, before we
+            # potentially detach it from the session
+            session.flush()
+            return obj
+
+        # the in-memory cache is no longer valid, repopulate it
+        obj = app.cache.get_or_create(
+            key=self.cache_key,
+            creator=creator
+        )
+        ts = app.cache.get_or_create(
+            key=self.ts_key,
+            creator=time
+        )
+        app.memory_cache[self.cache_key] = (ts, obj)
+
+    app.request_cache[self.cache_key] = obj
+
+return self.deep_merge(session, obj)

# NOTE: Technically this descriptor should only work on
# applications that derive from OrmCacheApp, however
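For orientation, the descriptor is normally created through the orm_cached decorator shown in the module docstring (def users(self)); a hedged usage sketch, with the import path, App, User and the policy value assumed:

    from onegov.core.orm import orm_cached

    class App(Framework):

        @orm_cached(policy='on-table-change:users')
        def users(self) -> tuple[User, ...]:
            # the result is pickled to redis, mirrored in the per-process
            # memory cache and merged back into the current session on
            # every access
            return tuple(self.session().query(User))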
26 changes: 26 additions & 0 deletions src/onegov/core/orm/utils.py
@@ -1,10 +1,14 @@
+from sqlalchemy.orm import attributes
+from sqlalchemy.orm.session import _state_session  # type:ignore[attr-defined]
from sqlalchemy_utils import QueryChain as QueryChainBase


from typing import TypeVar, TYPE_CHECKING
if TYPE_CHECKING:
    from typing_extensions import Self
+
+    from onegov.core.orm import Base

_T = TypeVar('_T')

# we have to forward declare the implementation, since QueryChainBase
@@ -26,3 +30,25 @@ def first(self) -> '_T | None':

def all(self) -> tuple['_T', ...]:
return tuple(self)


+# TODO: This uses some sqlalchemy implementation details, so it's a little
+#       fragile; it would be good if we could accomplish the same with a
+#       higher level API like sqlalchemy.inspect
+def make_detached(instance: 'Base') -> None:
+    """ This is kind of like make_transient and make_transient_to_detached
+    without removing the instance's identity from the state, so mapped
+    attributes can still be loaded again after a subsequent merge.
+
+    """
+    state = attributes.instance_state(instance)  # type:ignore[attr-defined]
+    session = _state_session(state)
+    if session:
+        session._expunge_states([state])
+
+    state.expired_attributes.clear()
+    if state._deleted:
+        del state._deleted
+
+    state._commit_all(state.dict)
+    state._expire_attributes(state.dict, state.unloaded_expirable)
Loading