
Commit 34fc9ee

Support recursive normalizer in write in v2 API

1 parent 23bf49a

File tree: 8 files changed, +137 -24 lines


build_tooling/format.py (27 additions, 11 deletions)

@@ -31,30 +31,38 @@ def install_tools():
     return black or clang


-def lint_python(in_place: bool):
+def lint_python(in_place: bool, specific_file: str):
     try:
         import black
         assert black.__version__ == black_version
     except ImportError:
         raise RuntimeError("black not installed. Run this script with --install-tools then try again")

+    if specific_file:
+        path = specific_file
+    else:
+        path = "python/"
+
     if in_place:
-        return subprocess.run(["black", "-l", "120", "python/"]).returncode
+        return subprocess.run(["black", "-l", "120", path]).returncode
     else:
-        return subprocess.run(["black", "-l", "120", "--check", "python/"]).returncode
+        return subprocess.run(["black", "-l", "120", "--check", path]).returncode


-def lint_cpp(in_place: bool):
+def lint_cpp(in_place: bool, specific_file: str):
     try:
         import clang_format
     except ImportError:
         raise RuntimeError("clang-format not installed. Run this script with --install-tools then try again")

     files = []
-    root = pathlib.Path("cpp", "arcticdb")
-    for e in ("*.cpp", "*.hpp"):
-        for f in root.rglob(e):
-            files.append(str(f))
+    if specific_file:
+        files.append(specific_file)
+    else:
+        root = pathlib.Path("cpp", "arcticdb")
+        for e in ("*.cpp", "*.hpp"):
+            for f in root.rglob(e):
+                files.append(str(f))

     args = ["clang-format"]
     if in_place:
@@ -69,11 +77,11 @@ def lint_cpp(in_place: bool):
     return subprocess.run(args).returncode


-def main(type: str, in_place: bool):
+def main(type: str, in_place: bool, specific_file: str):
     if type == "python":
-        return lint_python(in_place)
+        return lint_python(in_place, specific_file)
     elif type == "cpp":
-        return lint_cpp(in_place)
+        return lint_cpp(in_place, specific_file)
     else:
         return lint_python(in_place) or lint_cpp(in_place)
@@ -105,6 +113,11 @@ def main(type: str, in_place: bool):
         action='store_true',
         help="Apply linting rules to your working copy. Changes files."
     )
+    parser.add_argument(
+        "-f",
+        "--file",
+        help="Apply linting rules to a specific file."
+    )
     args = parser.parse_args()

     if args.install_tools:
@@ -118,10 +131,13 @@ def main(type: str, in_place: bool):
         raise RuntimeError("Must specify --type")
     if args.type not in ("python", "cpp", "all"):
         raise RuntimeError("Invalid --type")
+    if args.type == "all" and args.file:
+        raise RuntimeError("Cannot specify file type when specifying a single file for formatting")

     return_code = main(
         type=args.type,
         in_place=args.in_place,
+        specific_file=args.file,
     )

     sys.exit(return_code)
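
A minimal sketch of how the new flag might be invoked from the repo root. The -f/--file flag is the one added above; the --type and --in-place spellings are assumptions inferred from args.type and args.in_place in the existing script:

    # Hypothetical invocation: format a single file in place rather than the whole tree.
    import subprocess

    subprocess.run(
        ["python", "build_tooling/format.py", "--type", "python", "--in-place",
         "--file", "python/arcticdb/options.py"],
        check=True,
    )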

python/arcticdb/adapters/arctic_library_adapter.py (1 addition, 1 deletion)

@@ -24,7 +24,6 @@ def set_library_options(
     write_options = lib_desc.version.write_options

     write_options.dynamic_strings = True
-    write_options.recursive_normalizers = True
     write_options.use_tombstones = True
     write_options.fast_tombstone_all = True
     lib_desc.version.symbol_list = True
@@ -38,6 +37,7 @@ def set_library_options(
     write_options.de_duplication = options.dedup
     write_options.segment_row_size = options.rows_per_segment
     write_options.column_group_size = options.columns_per_segment
+    write_options.recursive_normalizers = options.recursive_normalizers

     lib_desc.version.encoding_version = (
         options.encoding_version if options.encoding_version is not None else DEFAULT_ENCODING_VERSION

python/arcticdb/options.py (11 additions, 1 deletion)

@@ -43,6 +43,7 @@ def __init__(
         rows_per_segment: int = 100_000,
         columns_per_segment: int = 127,
         encoding_version: Optional[EncodingVersion] = None,
+        recursive_normalizers: bool = True,
     ):
         """
         Parameters
@@ -124,12 +125,19 @@ def __init__(
         encoding_version: Optional[EncodingVersion], default None
             The encoding version to use when writing data to storage.
             v2 is faster, but still experimental, so use with caution.
+
+        recursive_normalizers: bool, default True
+            Whether to recursively normalize nested data structures when writing sequence-like or dict-like data.
+            The data structure can be nested or a mix of lists and dictionaries.
+            Note: If the leaf nodes cannot be natively normalized and must be written using write_pickle, those
+            leaf nodes will be pickled, resulting in the overall data being only partially normalized and partially
+            pickled.
         """
         self.dynamic_schema = dynamic_schema
         self.dedup = dedup
         self.rows_per_segment = rows_per_segment
         self.columns_per_segment = columns_per_segment
         self.encoding_version = encoding_version
+        self.recursive_normalizers = recursive_normalizers

     def __eq__(self, right):
         return (
@@ -138,13 +146,15 @@ def __eq__(self, right):
             and self.rows_per_segment == right.rows_per_segment
             and self.columns_per_segment == right.columns_per_segment
             and self.encoding_version == right.encoding_version
+            and self.recursive_normalizers == right.recursive_normalizers
         )

     def __repr__(self):
         return (
             f"LibraryOptions(dynamic_schema={self.dynamic_schema}, dedup={self.dedup},"
             f" rows_per_segment={self.rows_per_segment}, columns_per_segment={self.columns_per_segment},"
-            f" encoding_version={self.encoding_version if self.encoding_version is not None else 'Default'})"
+            f" encoding_version={self.encoding_version if self.encoding_version is not None else 'Default'},"
+            f" recursive_normalizers={self.recursive_normalizers})"
         )
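
Taken together with the adapter change above, recursive normalization is now configurable per library rather than hardcoded. A minimal sketch, assuming an existing Arctic instance `ac` and a create_library API accepting library_options:

    from arcticdb.options import LibraryOptions

    # Opt a library out of recursive normalization at creation time; the adapter
    # copies this option into the library's write_options.
    opts = LibraryOptions(recursive_normalizers=False)
    lib = ac.create_library("my_lib", library_options=opts)
    print(opts)  # repr now includes recursive_normalizers=False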

python/arcticdb/version_store/_store.py (12 additions, 9 deletions)

@@ -612,6 +612,15 @@ def stage(
             log.warning("The data could not be normalized to an ArcticDB format and has not been written")
             return None

+    def _is_recursive_normalizers_enabled(self, **kwargs):
+        return resolve_defaults(
+            "recursive_normalizers",
+            self._lib_cfg.lib_desc.version.write_options,
+            global_default=False,
+            uppercase=False,
+            **kwargs,
+        )
+
     def write(
         self,
         symbol: str,
@@ -698,11 +707,9 @@ def write(
         prune_previous_version = resolve_defaults(
             "prune_previous_version", proto_cfg, global_default=False, existing_value=prune_previous_version, **kwargs
         )
-        recursive_normalizers = resolve_defaults(
-            "recursive_normalizers", proto_cfg, global_default=False, uppercase=False, **kwargs
-        )
         parallel = resolve_defaults("parallel", proto_cfg, global_default=False, uppercase=False, **kwargs)
         incomplete = resolve_defaults("incomplete", proto_cfg, global_default=False, uppercase=False, **kwargs)
+        recursive_normalizers = self._is_recursive_normalizers_enabled(**kwargs)

         # TODO remove me when dynamic strings is the default everywhere
         if parallel:
@@ -3147,12 +3154,8 @@ def will_item_be_pickled(self, item, recursive_normalizers: Optional[bool] = None):
         )
         if result and log_warning_message:
             proto_cfg = self._lib_cfg.lib_desc.version.write_options
-            resolved_recursive_normalizers = resolve_defaults(
-                "recursive_normalizers",
-                proto_cfg,
-                global_default=False,
-                uppercase=False,
-                **{"recursive_normalizers": recursive_normalizers},
+            resolved_recursive_normalizers = self._is_recursive_normalizers_enabled(
+                **{"recursive_normalizers": recursive_normalizers}
             )
             warning_msg = ""
             is_recursive_normalize_preferred, _, _ = self._try_flatten(item, "")
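
The new helper centralises the resolve_defaults call, so the resolution order is: an explicit recursive_normalizers keyword wins, otherwise the library's write_options value applies, otherwise the global default of False. A hedged sketch, where `store` is a hypothetical NativeVersionStore whose library config has recursive_normalizers enabled:

    store._is_recursive_normalizers_enabled()                              # True: taken from write_options
    store._is_recursive_normalizers_enabled(recursive_normalizers=False)   # False: explicit keyword wins
    store._is_recursive_normalizers_enabled(recursive_normalizers=None)    # True: None defers to the library config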

python/arcticdb/version_store/library.py (21 additions, 2 deletions)

@@ -944,6 +944,7 @@ def write(
         staged=False,
         validate_index=True,
         index_column: Optional[str] = None,
+        recursive_normalizers: bool = None,
     ) -> VersionedItem:
         """
         Write ``data`` to the specified ``symbol``. If ``symbol`` already exists then a new version will be created to
@@ -992,6 +993,10 @@ def write(
         index_column: Optional[str], default=None
             Optional specification of timeseries index column if data is an Arrow table. Ignored if data is not an
             Arrow table.
+        recursive_normalizers: bool, default None
+            Whether to recursively normalize nested data structures when writing sequence-like or dict-like data.
+            If None, falls back to the corresponding setting in the library configuration.
+            The data structure can be nested or a mix of lists and dictionaries.

         Returns
         -------
@@ -1020,7 +1025,9 @@ def write(
         >>> w = adb.WritePayload("symbol", df, metadata={'the': 'metadata'})
         >>> lib.write(*w, staged=True)
         """
-        if not self._allowed_input_type(data):
+        if not self._nvs._is_recursive_normalizers_enabled(
+            **{"recursive_normalizers": recursive_normalizers}
+        ) and not self._allowed_input_type(data):
             raise ArcticUnsupportedDataTypeException(
                 "data is of a type that cannot be normalized. Consider using "
                 f"write_pickle instead. type(data)=[{type(data)}]"
@@ -1037,10 +1044,17 @@ def write(
             index_column=index_column,
             norm_failure_options_msg="Using write_pickle will allow the object to be written. However, many operations "
             "(such as date_range filtering and column selection) will not work on pickled data.",
+            recursive_normalizers=recursive_normalizers,
         )

     def write_pickle(
-        self, symbol: str, data: Any, metadata: Any = None, prune_previous_versions: bool = False, staged=False
+        self,
+        symbol: str,
+        data: Any,
+        metadata: Any = None,
+        prune_previous_versions: bool = False,
+        staged=False,
+        recursive_normalizers: bool = None,
     ) -> VersionedItem:
         """
         See `write`. This method differs from `write` only in that ``data`` can be of any type that is serialisable via
@@ -1062,6 +1076,10 @@ def write_pickle(
             See documentation on `write`.
         staged
             See documentation on `write`.
+        recursive_normalizers: bool, default None
+            See documentation on `write`.
+            If the leaf nodes cannot be natively normalized, they will be pickled,
+            resulting in the overall data being recursively normalized and partially pickled.

         Returns
         -------
@@ -1086,6 +1104,7 @@ def write_pickle(
             prune_previous_version=prune_previous_versions,
             pickle_on_failure=True,
             parallel=staged,
+            recursive_normalizers=recursive_normalizers,
         )

     @staticmethod
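
A short usage sketch of the new parameter on the v2 API, mirroring the tests added below. `lib` is assumed to be an arcticdb Library whose configuration leaves recursive_normalizers at its default of True:

    import numpy as np
    import pandas as pd

    data = {"a": np.arange(5), "b": pd.DataFrame({"col": [1, 2, 3]})}

    lib.write("nested", data, recursive_normalizers=True)  # recursively normalized write
    lib.write("nested", data)                              # None by default: falls back to the library config

    # With recursive normalization disabled, a plain dict is not an allowed input type:
    # lib.write("nested", data, recursive_normalizers=False)  # raises ArcticUnsupportedDataTypeException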

python/tests/conftest.py (13 additions, 0 deletions)

@@ -655,12 +655,25 @@ def arctic_client_lmdb(request, encoding_version) -> Arctic:
     return ac


+@pytest.fixture
+def arctic_client_s3(request) -> Arctic:
+    storage_fixture: StorageFixture = request.getfixturevalue("s3_storage")
+    ac = storage_fixture.create_arctic()
+    return ac
+
+
 @pytest.fixture
 def arctic_library(arctic_client, lib_name) -> Generator[Library, None, None]:
     yield arctic_client.create_library(lib_name)
     arctic_client.delete_library(lib_name)


+@pytest.fixture
+def arctic_library_s3(arctic_client_s3, lib_name) -> Generator[Library, None, None]:
+    yield arctic_client_s3.create_library(lib_name)
+    arctic_client_s3.delete_library(lib_name)
+
+
 @pytest.fixture
 def arctic_library_dynamic(arctic_client, lib_name) -> Generator[Library, None, None]:
     lib_opts = LibraryOptions(dynamic_schema=True)
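
For illustration, a hypothetical test consuming the new fixture: pytest resolves arctic_library_s3 by name, which in turn instantiates arctic_client_s3 against the s3_storage fixture:

    import pandas as pd

    def test_roundtrip_on_s3(arctic_library_s3):
        lib = arctic_library_s3
        lib.write("sym", pd.DataFrame({"x": [1, 2]}))
        assert list(lib.read("sym").data["x"]) == [1, 2]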

python/tests/integration/arcticdb/test_arctic.py (16 additions, 0 deletions)

@@ -824,6 +824,22 @@ def test_s3_repr(s3_storage: S3Bucket, one_col_df, lib_name):
     assert written_vi.host == config


+def test_default_library_config(mem_storage, lib_name):
+    ac = mem_storage.create_arctic()
+    lib = ac.create_library(lib_name)
+    assert lib._nvs.lib_cfg().lib_desc.version.symbol_list == True
+
+    write_options = lib._nvs.lib_cfg().lib_desc.version.write_options
+    assert write_options.dynamic_strings == True
+    assert write_options.recursive_normalizers == True
+    assert write_options.use_tombstones == True
+    assert write_options.fast_tombstone_all == True
+    assert write_options.prune_previous_version == False
+    assert write_options.pickle_on_failure == False
+    assert write_options.snapshot_dedup == False
+    assert write_options.delayed_deletes == False
+
+
 class A:
     """A dummy user defined type that requires pickling to serialize."""

python/tests/unit/arcticdb/version_store/test_recursive_normalizers.py (36 additions, 0 deletions)

@@ -20,6 +20,8 @@
     UnsupportedKeyInDictionary,
     ArcticException as ArcticNativeException,
 )
+from arcticdb.version_store.library import ArcticUnsupportedDataTypeException
+import arcticdb.toolbox.query_stats as qs
 from arcticdb_ext.storage import KeyType
 from arcticdb_ext.version_store import NoSuchVersionException
 import arcticdb_ext.stream as adb_stream
@@ -57,6 +59,40 @@ def assert_vit_equals_except_data(left, right):
     assert left.timestamp == right.timestamp


+@pytest.mark.parametrize("recursive_normalizers", (True, False, None))
+def test_v2_api(arctic_library_s3, sym, recursive_normalizers, clear_query_stats):
+    lib = arctic_library_s3
+    data = {"a": np.arange(5), "b": pd.DataFrame({"col": [1, 2, 3]})}
+    if recursive_normalizers is None or recursive_normalizers is True:
+        with qs.query_stats():
+            lib.write(sym, data, recursive_normalizers=recursive_normalizers)
+        stats = qs.get_query_stats()
+        assert "MULTI_KEY" in stats["storage_operations"]["S3_PutObject"].keys()
+    else:
+        with pytest.raises(ArcticUnsupportedDataTypeException) as e:
+            lib.write(sym, data, recursive_normalizers=recursive_normalizers)
+
+
+@pytest.mark.parametrize("recursive_normalizers", (True, False, None))
+def test_v2_api_pickle(arctic_library_s3, sym, recursive_normalizers, clear_query_stats):
+    lib = arctic_library_s3
+    data = (
+        {
+            "a": [1, 2, 3],
+            "b": {"c": np.arange(24)},
+            "d": [AlmostAListNormalizer()],  # A random item that will be pickled
+        },
+    )
+    with qs.query_stats():
+        lib.write_pickle(sym, data, recursive_normalizers=recursive_normalizers)
+    keys = qs.get_query_stats()["storage_operations"]["S3_PutObject"].keys()
+    if recursive_normalizers is None or recursive_normalizers is True:
+        assert "MULTI_KEY" in keys
+    else:
+        assert "MULTI_KEY" not in keys
+        assert lib._nvs.is_symbol_pickled(sym) == True
+
+
 @pytest.mark.parametrize("read", (lambda lib, sym: lib.batch_read([sym])[sym], lambda lib, sym: lib.read(sym)))
 @pytest.mark.storage
 def test_recursively_written_data(basic_store, read):
