Skip to content

Commit cfbbd68

Browse files
🐛 Fix PSQL OpErr on archive creation by batching (#6993)
This fixes SQLAlchemy `OperationalError`s when creating large archives with the PSQL backend by batching query filters using `DEFAULT_FILTER_SIZE` (999). The default value was re-used from PR #6889, in which it was originally set to avoid exceeding the `SQLITE_MAX_VARIABLE_NUMBER` limit for archive imports with the SQLite backend.

Key changes:

- Add a `filter_size` parameter to `create_archive()`, `_collect_required_entities()` and all helper functions called therein to apply batching to query filters, using the default value of 999 (not exposed to the user via the CLI)
- Move `batch_iter()` to `aiida.common.utils` for reuse
- Drop `QueryParams` in favor of the explicit `batch_size` and `filter_size` arguments
- Update graph traversal to batch node ID queries as well

Some import functions still need batching (see #6907), which will be done in a follow-up PR.

---------

Co-authored-by: Daniel Hollas <[email protected]>
1 parent cf690e4 commit cfbbd68

File tree

17 files changed

+680
-307
lines changed

17 files changed

+680
-307
lines changed

docs/source/nitpick-exceptions

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -127,6 +127,8 @@ py:class aiida.cmdline.params.types.choice.T
127127
py:class aiida.common.lang.T
128128
py:class aiida.engine.processes.functions.N
129129
py:class aiida.engine.processes.functions.R_co
130+
py:class aiida.common.utils.T
131+
py:class aiida.common.utils.R
130132

131133
### third-party packages
132134
# Note: These exceptions are needed if

src/aiida/cmdline/commands/cmd_archive.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
from aiida.common.exceptions import CorruptStorage, IncompatibleStorageSchema, UnreachableStorage
2727
from aiida.common.links import GraphTraversalRules
2828
from aiida.common.log import AIIDA_LOGGER
29+
from aiida.common.utils import DEFAULT_BATCH_SIZE, DEFAULT_FILTER_SIZE
2930

3031
EXTRAS_MODE_EXISTING = ['keep_existing', 'update_existing', 'mirror', 'none']
3132
EXTRAS_MODE_NEW = ['import', 'none']
@@ -130,7 +131,11 @@ def inspect(ctx, archive, version, meta_data, database):
130131
)
131132
@click.option('--compress', default=6, show_default=True, type=int, help='Level of compression to use (0-9).')
132133
@click.option(
133-
'-b', '--batch-size', default=1000, type=int, help='Stream database rows in batches, to reduce memory usage.'
134+
'-b',
135+
'--batch-size',
136+
default=DEFAULT_BATCH_SIZE,
137+
type=int,
138+
help='Stream database rows in batches, to reduce memory usage.',
134139
)
135140
@click.option(
136141
'--test-run',
@@ -210,6 +215,7 @@ def create(
210215
'overwrite': force,
211216
'compression': compress,
212217
'batch_size': batch_size,
218+
'filter_size': DEFAULT_FILTER_SIZE, # Implementation detail, not exposed to user via CLI
213219
'test_run': dry_run,
214220
}
215221

src/aiida/common/utils.py

Lines changed: 56 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
import sys
1919
from collections.abc import Iterable, Iterator
2020
from datetime import datetime, timedelta
21-
from typing import TYPE_CHECKING, Any, Callable
21+
from typing import TYPE_CHECKING, Any, Callable, TypeVar, overload
2222
from uuid import UUID
2323

2424
from aiida.common.typing import Self
@@ -32,6 +32,9 @@
3232
except ImportError:
3333
from typing_extensions import TypeAlias
3434

35+
T = TypeVar('T')
36+
R = TypeVar('R')
37+
3538

3639
def get_new_uuid() -> str:
3740
"""Return a new UUID (typically to be used for new nodes)."""
@@ -608,3 +611,55 @@ def format_directory_size(size_in_bytes: int) -> str:
608611

609612
# Format the size to two decimal places
610613
return f'{converted_size:.2f} {prefixes[index]}'
614+
615+
616+
@overload
def batch_iter(iterable: Iterable[T], size: int, transform: None = None) -> Iterable[tuple[int, list[T]]]: ...


@overload
def batch_iter(iterable: Iterable[T], size: int, transform: Callable[[T], R]) -> Iterable[tuple[int, list[R]]]: ...


def batch_iter(
    iterable: Iterable[T], size: int, transform: Callable[[T], Any] | None = None
) -> Iterable[tuple[int, list[Any]]]:
    """Yield an iterable in batches of a set number of items.

    Note, the final yield may be less than this size.

    :param transform: a transform to apply to each item
    :returns: (number of items, list of items)
    """
    # Identity transform when none is supplied.
    apply = transform if transform is not None else (lambda item: item)
    batch: list[Any] = []
    for element in iterable:
        batch.append(apply(element))
        if len(batch) >= size:
            yield len(batch), batch
            batch = []
    # Emit the trailing partial batch, if any items remain.
    if batch:
        yield len(batch), batch
646+
647+
648+
# NOTE: `sqlite` has an `SQLITE_MAX_VARIABLE_NUMBER` compile-time flag.
649+
# On older `sqlite` versions, this was set to 999 by default,
650+
# while for newer versions it is generally higher, see:
651+
# https://www.sqlite.org/limits.html
652+
# If `DEFAULT_FILTER_SIZE` is set too high, the limit can be hit when large `IN` queries are
653+
# constructed through AiiDA, leading to SQLAlchemy `OperationalError`s.
654+
# On modern systems, the limit might be in the hundreds of thousands, however, as it is OS-
655+
# and/or Python version dependent and we don't know its size, we set the value to 999 for safety.
656+
# From manual benchmarking, this value for batching also seems to give reasonable performance.
657+
DEFAULT_FILTER_SIZE: int = 999
658+
659+
# NOTE: `DEFAULT_BATCH_SIZE` controls how many database rows are fetched and processed at once during
660+
# streaming operations (e.g., `QueryBuilder.iterall()`, `QueryBuilder.iterdict()`). This prevents
661+
# loading entire large result sets into memory at once, which could cause memory exhaustion when
662+
# working with datasets containing thousands or millions of records. The value of 1000 provides a
663+
# balance between memory efficiency and database round-trip overhead. Setting it too low increases
664+
# the number of database queries needed, while setting it too high increases memory consumption.
665+
DEFAULT_BATCH_SIZE: int = 1000

src/aiida/storage/sqlite_zip/migrations/legacy_to_main.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -131,7 +131,7 @@ def _json_to_sqlite(
131131
outpath: Path, data: dict, node_repos: Dict[str, List[Tuple[str, Optional[str]]]], batch_size: int = 100
132132
) -> None:
133133
"""Convert a JSON archive format to SQLite."""
134-
from aiida.tools.archive.common import batch_iter
134+
from aiida.common.utils import batch_iter
135135

136136
from . import v1_db_schema as v1_schema
137137

src/aiida/tools/archive/common.py

Lines changed: 1 addition & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111
import urllib.parse
1212
import urllib.request
1313
from html.parser import HTMLParser
14-
from typing import Any, Callable, Dict, Iterable, List, Optional, Tuple, Type
14+
from typing import Dict, Type
1515

1616
from aiida.orm import AuthInfo, Comment, Computer, Entity, Group, Log, Node, User
1717
from aiida.orm.entities import EntityTypes
@@ -28,30 +28,6 @@
2828
}
2929

3030

31-
def batch_iter(
32-
iterable: Iterable[Any], size: int, transform: Optional[Callable[[Any], Any]] = None
33-
) -> Iterable[Tuple[int, List[Any]]]:
34-
"""Yield an iterable in batches of a set number of items.
35-
36-
Note, the final yield may be less than this size.
37-
38-
:param transform: a transform to apply to each item
39-
:returns: (number of items, list of items)
40-
"""
41-
transform = transform or (lambda x: x)
42-
current = []
43-
length = 0
44-
for item in iterable:
45-
current.append(transform(item))
46-
length += 1
47-
if length >= size:
48-
yield length, current
49-
current = []
50-
length = 0
51-
if current:
52-
yield length, current
53-
54-
5531
class HTMLGetLinksParser(HTMLParser):
5632
"""If a filter_extension is passed, only links with extension matching
5733
the given one will be returned.

0 commit comments

Comments (0)