Skip to content

Commit

Permalink
DatastoreStorage: add new ndb_context_kwargs constructor kwarg
Browse files Browse the repository at this point in the history
  • Loading branch information
snarfed committed Dec 18, 2024
1 parent 507956c commit 061d385
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 6 deletions.
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,8 @@ Optional, only used in [com.atproto.repo](https://arroba.readthedocs.io/en/stabl

_Breaking changes:_

* `datastore_storage`:
* `DatastoreStorage`: add new `ndb_context_kwargs` constructor kwarg.
* `repo`:
* `apply_commit`, `apply_writes`: raise an exception if the repo is inactive.
* `storage`:
Expand Down
14 changes: 8 additions & 6 deletions arroba/datastore_storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -402,24 +402,26 @@ class DatastoreStorage(Storage):
See :class:`Storage` for method details.
"""
ndb_client = None
ndb_context_kwargs = None

def __init__(self, *, ndb_client=None):
def __init__(self, *, ndb_client=None, ndb_context_kwargs=None):
"""Constructor.
Args:
ndb_client (google.cloud.ndb.Client): used in :meth:`read_blocks_by_seq`;
it's used in the `subscribeRepos` event subscription, so lexrpc calls
it on a different thread, so it needs its own ndb client context.
ndb_client (google.cloud.ndb.Client): used when there isn't already
an ndb context active
ndb_context_kwargs (dict): optional, used when creating a new ndb context
"""
super().__init__()
self.ndb_client = ndb_client
self.ndb_context_kwargs = ndb_context_kwargs or {}

def ndb_context(fn):
@wraps(fn)
def decorated(self, *args, **kwargs):
ctx = context.get_context(raise_context_error=False)

with ctx.use() if ctx else self.ndb_client.context():
with ctx.use() if ctx else self.ndb_client.context(**self.ndb_context_kwargs):
ret = fn(self, *args, **kwargs)

return ret
Expand Down Expand Up @@ -541,7 +543,7 @@ def read_blocks_by_seq(self, start=0, repo=None):

while True:
ctx = context.get_context(raise_context_error=False)
with ctx.use() if ctx else self.ndb_client.context():
with ctx.use() if ctx else self.ndb_client.context(**self.ndb_context_kwargs):
# lexrpc event subscription handlers like subscribeRepos call this
# on a different thread, so if we're there, we need to create a new
# ndb context
Expand Down

0 comments on commit 061d385

Please sign in to comment.