diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..e3a78b6 --- /dev/null +++ b/.gitignore @@ -0,0 +1,134 @@ +.idea/ +*.iml +*~ + +# General +.DS_Store +.AppleDouble +.LSOverride + +# Icon must end with two \r +Icon + +# Thumbnails +._* + +# Files that might appear in the root of a volume +.DocumentRevisions-V100 +.fseventsd +.Spotlight-V100 +.TemporaryItems +.Trashes +.VolumeIcon.icns +.com.apple.timemachine.donotpresent + +# Directories potentially created on remote AFP share +.AppleDB +.AppleDesktop +Network Trash Folder +Temporary Items +.apdisk + +# Byte-compiled / optimized / DLL files +__pycache__/ +*.py[cod] +*$py.class + +# C extensions +*.so + +# Distribution / packaging +.Python +build/ +develop-eggs/ +dist/ +downloads/ +eggs/ +.eggs/ +lib/ +lib64/ +parts/ +sdist/ +var/ +wheels/ +*.egg-info/ +.installed.cfg +*.egg +MANIFEST + +# PyInstaller +# Usually these files are written by a python script from a template +# before PyInstaller builds the exe, so as to inject date/other infos into it. +*.manifest +*.spec + +# Installer logs +pip-log.txt +pip-delete-this-directory.txt + +# Unit test / coverage reports +htmlcov/ +.tox/ +.coverage +.coverage.* +.cache +nosetests.xml +coverage.xml +*.cover +.hypothesis/ + +# Translations +*.mo +*.pot + +# Django stuff: +*.log +.static_storage/ +.media/ +local_settings.py + +# Flask stuff: +instance/ +.webassets-cache + +# Scrapy stuff: +.scrapy + +# Sphinx documentation +docs/_build/ + +# PyBuilder +target/ + +# Jupyter Notebook +.ipynb_checkpoints + +# pyenv +.python-version + +# celery beat schedule file +celerybeat-schedule + +# SageMath parsed files +*.sage.py + +# Environments +.venv +env/ +venv/ +ENV/ +env.bak/ +venv.bak/ + +# Spyder project settings +.spyderproject +.spyproject + +# Rope project settings +.ropeproject + +# mkdocs documentation +/site + +# mypy +.mypy_cache/ diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000..d645695 --- /dev/null +++ b/LICENSE @@ -0,0 +1,202 @@ + + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. 
+ + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. 
You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. 
In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "[]" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + + Copyright [yyyy] [name of copyright owner] + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. diff --git a/Makefile b/Makefile new file mode 100644 index 0000000..e973ca0 --- /dev/null +++ b/Makefile @@ -0,0 +1,47 @@ +VENV_BIN = python3 -m venv +VENV_DIR ?= .venv +VENV_ACTIVATE = $(VENV_DIR)/bin/activate +VENV_RUN = . $(VENV_ACTIVATE) +ROOT_MODULE = localstack_snapshot + +venv: $(VENV_ACTIVATE) + +$(VENV_ACTIVATE): pyproject.toml + test -d .venv || $(VENV_BIN) .venv + $(VENV_RUN); pip install --upgrade pip setuptools wheel + $(VENV_RUN); pip install -e .[dev] + touch $(VENV_DIR)/bin/activate + +install: venv + +clean: + rm -rf .venv + rm -rf build/ + rm -rf .eggs/ + rm -rf *.egg-info/ + +format: + $(VENV_RUN); python -m ruff check --show-source --fix .; python -m black . + +lint: + $(VENV_RUN); python -m ruff check --show-source . && python -m black --check . 
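+# Typical workflow (illustrative): run `make install` once to set up the venv,
+# then `make format` and `make lint` before committing, and `make test` to run
+# the test suite.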
+
+test: venv
+	$(VENV_RUN); python -m pytest --cov $(ROOT_MODULE)
+
+test-coverage: venv
+	$(VENV_RUN); coverage run --source=$(ROOT_MODULE) -m pytest tests && coverage lcov -o .coverage.lcov
+
+coveralls: venv
+	$(VENV_RUN); coveralls
+
+dist: venv
+	$(VENV_RUN); pip install --upgrade build; python -m build
+
+deploy: clean-dist venv test dist
+	$(VENV_RUN); pip install --upgrade twine; twine upload dist/*
+
+clean-dist: clean
+	rm -rf dist/
+
+.PHONY: clean clean-dist
diff --git a/README.md b/README.md
new file mode 100644
index 0000000..2f041c8
--- /dev/null
+++ b/README.md
@@ -0,0 +1,26 @@
+Snapshot testing for pytest
+===============================
+
+Extracted snapshot testing lib for LocalStack.
+
+This project is in a very early stage and will be both restructured and renamed.
+
+
+## Quickstart
+
+To install the Python package and the developer requirements into a venv, run:
+
+    make install
+
+## Format code
+
+We use ruff and black as code style tools.
+To execute them, run:
+
+    make format
+
+## Build distribution
+
+To build a wheel and source distribution, simply run
+
+    make dist
\ No newline at end of file
diff --git a/localstack_snapshot/__init__.py b/localstack_snapshot/__init__.py
new file mode 100644
index 0000000..3dc1f76
--- /dev/null
+++ b/localstack_snapshot/__init__.py
@@ -0,0 +1 @@
+__version__ = "0.1.0"
diff --git a/localstack_snapshot/pytest/__init__.py b/localstack_snapshot/pytest/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/localstack_snapshot/pytest/snapshot.py b/localstack_snapshot/pytest/snapshot.py
new file mode 100644
index 0000000..5d1cd62
--- /dev/null
+++ b/localstack_snapshot/pytest/snapshot.py
@@ -0,0 +1,119 @@
+import json
+import os
+from typing import Optional
+
+import pytest
+from _pytest.config import Config, PytestPluginManager
+from _pytest.config.argparsing import Parser
+from _pytest.fixtures import SubRequest
+from _pytest.nodes import Item
+from _pytest.reports import TestReport
+from _pytest.runner import CallInfo
+from pluggy import Result
+
+from localstack_snapshot.snapshots import SnapshotAssertionError, SnapshotSession
+from localstack_snapshot.snapshots.report import render_report
+
+
+def is_aws():
+    return os.environ.get("TEST_TARGET", "") == "AWS_CLOUD"
+
+
+@pytest.hookimpl
+def pytest_configure(config: Config):
+    config.addinivalue_line("markers", "skip_snapshot_verify")
+
+
+@pytest.hookimpl
+def pytest_addoption(parser: Parser, pluginmanager: PytestPluginManager):
+    parser.addoption("--snapshot-update", action="store_true")
+    parser.addoption("--snapshot-raw", action="store_true")
+    parser.addoption("--snapshot-skip-all", action="store_true")
+    parser.addoption("--snapshot-verify", action="store_true")
+
+
+@pytest.hookimpl(hookwrapper=True)
+def pytest_runtest_makereport(item: Item, call: CallInfo[None]) -> Optional[TestReport]:
+    use_legacy_report = os.environ.get("SNAPSHOT_LEGACY_REPORT", "0") == "1"
+
+    result: Result = yield
+    report: TestReport = result.get_result()
+
+    if call.excinfo is not None and isinstance(call.excinfo.value, SnapshotAssertionError):
+        err: SnapshotAssertionError = call.excinfo.value
+
+        if use_legacy_report:
+            error_report = ""
+            for res in err.result:
+                if not res:
+                    error_report = f"{error_report}Match failed for '{res.key}':\n{json.dumps(json.loads(res.result.to_json()), indent=2)}\n\n"
+            report.longrepr = error_report
+        else:
+            report.longrepr = "\n".join([str(render_report(r)) for r in err.result if not r])
+    return report
+
+
+@pytest.hookimpl(hookwrapper=True)
+def pytest_runtest_call(item: Item) -> None:
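+    # hookwrapper: let the test body run first (yield), then verify all collected
+    # snapshot matches, honoring any skip_snapshot_verify markers on the item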
+    call: CallInfo = yield  # noqa
+
+    if call.excinfo:
+        return
+
+    # TODO: extremely dirty... maybe it would be better to find a way to fail the test itself instead?
+    sm = item.funcargs.get("snapshot")
+
+    if sm:
+        verify = True
+        paths = []
+
+        if not is_aws():  # only skip for local tests
+            for m in item.iter_markers(name="skip_snapshot_verify"):
+                skip_paths = m.kwargs.get("paths", [])
+
+                skip_condition = m.kwargs.get("condition")
+                # the marker can optionally include a condition under which verification is skipped;
+                # a condition must be a callable returning something truthy/falsy
+                if skip_condition:
+                    if not callable(skip_condition):
+                        raise ValueError("condition must be a callable")
+
+                    # special case where one of the marks has a skip condition but no paths:
+                    # since we interpret a missing paths key as "all paths",
+                    # this should skip all paths, no matter what the other marks say
+                    if skip_condition() and not skip_paths:
+                        verify = False
+                        paths.clear()  # in case some other marker already added paths
+                        break
+
+                    if not skip_condition():
+                        continue  # don't skip
+
+                # we skip verification if no condition has been specified
+                verify = False
+                paths.extend(skip_paths)
+
+        sm._assert_all(verify, paths)
+
+
+@pytest.fixture(scope="function")
+def _snapshot_session(request: SubRequest):
+    update_overwrite = os.environ.get("SNAPSHOT_UPDATE") == "1"
+    raw_overwrite = os.environ.get("SNAPSHOT_RAW") == "1"
+
+    sm = SnapshotSession(
+        base_file_path=os.path.join(request.fspath.dirname, request.fspath.purebasename),
+        scope_key=request.node.nodeid,
+        update=update_overwrite or request.config.option.snapshot_update,
+        raw=raw_overwrite or request.config.option.snapshot_raw,
+        verify=not request.config.option.snapshot_skip_all,
+    )
+
+    yield sm
+
+    sm._persist_state()
+
+
+@pytest.fixture(scope="function")
+def snapshot(_snapshot_session):
+    return _snapshot_session
diff --git a/localstack_snapshot/snapshots/__init__.py b/localstack_snapshot/snapshots/__init__.py
new file mode 100644
index 0000000..b01c348
--- /dev/null
+++ b/localstack_snapshot/snapshots/__init__.py
@@ -0,0 +1,3 @@
+from .prototype import SnapshotAssertionError, SnapshotMatchResult, SnapshotSession
+
+__all__ = ["SnapshotSession", "SnapshotMatchResult", "SnapshotAssertionError"]
diff --git a/localstack_snapshot/snapshots/prototype.py b/localstack_snapshot/snapshots/prototype.py
new file mode 100644
index 0000000..3b8b9e8
--- /dev/null
+++ b/localstack_snapshot/snapshots/prototype.py
@@ -0,0 +1,366 @@
+import json
+import logging
+import os
+from datetime import datetime, timezone
+from json import JSONDecodeError
+from pathlib import Path
+from re import Pattern
+from typing import Dict, List, Optional
+
+from botocore.response import StreamingBody
+from deepdiff import DeepDiff
+from jsonpath_ng import DatumInContext
+from jsonpath_ng.ext import parse
+
+from localstack_snapshot.snapshots.transformer import (
+    KeyValueBasedTransformer,
+    RegexTransformer,
+    TransformContext,
+    Transformer,
+)
+from localstack_snapshot.util.encoding import CustomJsonEncoder
+
+from .transformer_utility import TransformerUtility
+
+SNAPSHOT_LOGGER = logging.getLogger(__name__)
+SNAPSHOT_LOGGER.setLevel(logging.DEBUG if os.environ.get("DEBUG_SNAPSHOT") else logging.WARNING)
+
+
+class SnapshotMatchResult:
+    def __init__(self, a: dict, b: dict, key: str = ""):
+        self.a = a
+        self.b = b
+        self.result = DeepDiff(a, b, verbose_level=2, view="tree")
+        self.key = key
+
+    def __bool__(self) -> bool:
+        return not self.result
+
+    def __repr__(self):
+        return self.result.pretty()
+
+
+class SnapshotAssertionError(AssertionError):
+    def __init__(self, msg: str, result: List[SnapshotMatchResult]):
+        self.msg = msg
+        self.result = result
+        super(SnapshotAssertionError, self).__init__(msg)
+
+
+class SnapshotSession:
+    """
+    Snapshot handler for a single test function with potentially multiple assertions.
+    Since it technically only modifies a subset of the underlying snapshot file,
+    it assumes that a single snapshot file is only being written to sequentially.
+    """
+
+    results: list[SnapshotMatchResult]
+    recorded_state: dict[str, dict]  # previously persisted state
+    observed_state: dict[str, dict]  # current state from match calls
+
+    called_keys: set[str]
+    transformers: list[tuple[Transformer, int]]  # (transformer, priority)
+
+    transform: TransformerUtility
+
+    skip_verification_paths: list[str]
+
+    def __init__(
+        self,
+        *,
+        base_file_path: str,
+        scope_key: str,
+        update: Optional[bool] = False,  # TODO: find a way to remove this
+        verify: Optional[bool] = False,  # TODO: find a way to remove this
+        raw: Optional[bool] = False,
+    ):
+        self.verify = verify
+        self.update = update
+        self.file_path = f"{base_file_path}.snapshot.json"
+        self.raw_file_path = f"{base_file_path}.raw.snapshot.json"
+        self.raw = raw
+        self.scope_key = scope_key
+
+        self.called_keys = set()
+        self.results = []
+        self.transformers = []
+
+        self.observed_state = {}
+        self.recorded_state = self._load_state()
+
+        self.transform = TransformerUtility
+
+    def add_transformers_list(
+        self, transformer_list: list[Transformer], priority: Optional[int] = 0
+    ):
+        for transformer in transformer_list:
+            self.transformers.append((transformer, priority or 0))
+
+    def add_transformer(
+        self, transformer: Transformer | list[Transformer], *, priority: Optional[int] = 0
+    ):
+        if isinstance(transformer, list):
+            self.add_transformers_list(transformer, priority)
+        else:
+            self.transformers.append((transformer, priority or 0))
+
+    def _persist_state(self) -> None:
+        if self.update:
+            Path(self.file_path).touch()
+            with open(self.file_path, "r+") as fd:
+                try:
+                    content = fd.read()
+                    full_state = json.loads(content or "{}")
+                    recorded = {
+                        "recorded-date": datetime.now(tz=timezone.utc).strftime(
+                            "%d-%m-%Y, %H:%M:%S"
+                        ),
+                        "recorded-content": self.observed_state,
+                    }
+                    full_state[self.scope_key] = recorded
+                    state_to_dump = json.dumps(full_state, indent=2)
+                    fd.seek(0)
+                    fd.truncate()
+                    # add line ending to be compatible with pre-commit-hooks (end-of-file-fixer)
+                    fd.write(f"{state_to_dump}\n")
+                except Exception as e:
+                    SNAPSHOT_LOGGER.exception(e)
+
+    def _persist_raw(self, raw_state: dict) -> None:
+        if self.raw:
+            Path(self.raw_file_path).touch()
+            with open(self.raw_file_path, "r+") as fd:
+                try:
+                    content = fd.read()
+                    full_state = json.loads(content or "{}")
+                    recorded = {
+                        "recorded-date": datetime.now(tz=timezone.utc).strftime(
+                            "%d-%m-%Y, %H:%M:%S"
+                        ),
+                        "recorded-content": raw_state,
+                    }
+                    full_state[self.scope_key] = recorded
+                    # need to use CustomEncoder to handle datetime objects
+                    state_to_dump = json.dumps(full_state, indent=2, cls=CustomJsonEncoder)
+                    fd.seek(0)
+                    fd.truncate()
+                    # add line ending to be compatible with pre-commit-hooks (end-of-file-fixer)
+                    fd.write(f"{state_to_dump}\n")
+                except Exception as e:
+                    SNAPSHOT_LOGGER.exception(e)
+
+    def _load_state(self) -> dict:
+        try:
+            with open(self.file_path, "r") as fd:
+                content = fd.read()
+                if content:
+                    recorded = json.loads(content).get(self.scope_key, {})
+                    return recorded.get("recorded-content", {})
+                else:
+                    return {}
+        except FileNotFoundError:
+            return {}
+
+    def _update(self, key: str, obj_state: dict) -> None:
+        self.observed_state[key] = obj_state
+
+    def match(self, key: str, obj: dict) -> bool:
+        if key in self.called_keys:
+            raise Exception(
+                f"Key {key} used multiple times in the same test scope"
+            )  # TODO: custom exc.
+
+        self.called_keys.add(key)
+
+        # order the obj to guarantee reference replacement works as expected
+        self.observed_state[key] = self._order_dict(obj)
+        # TODO: track them separately since the transformation is now done *just* before asserting
+
+        if not self.update and (not self.recorded_state or self.recorded_state.get(key) is None):
+            raise Exception(
+                f"No state for {self.scope_key} recorded. Please (re-)generate the snapshot for this test."
+            )
+
+        # TODO: we should return something meaningful here
+        return True
+
+    def _assert_all(
+        self, verify_test_case: bool = True, skip_verification_paths: Optional[list[str]] = None
+    ) -> List[SnapshotMatchResult]:
+        """use after all match calls to get a combined diff"""
+        results = []
+
+        if not self.verify:
+            SNAPSHOT_LOGGER.warning("Snapshot verification disabled.")
+            return results
+
+        if self.verify and not verify_test_case and not skip_verification_paths:
+            self.verify = False
+            SNAPSHOT_LOGGER.warning("Snapshot verification disabled for this test case.")
+
+        self.skip_verification_paths = skip_verification_paths or []
+        if skip_verification_paths:
+            SNAPSHOT_LOGGER.warning(
+                f"Snapshot verification disabled for paths: {skip_verification_paths}"
+            )
+
+        if self.update:
+            self.observed_state = self._transform(self.observed_state)
+            return []
+
+        # TODO: separate these states
+        a_all = self.recorded_state
+        if not self.observed_state:
+            # match was never called, so we must assume this isn't a "real" snapshot test
+            # e.g. test_sqs uses the snapshot fixture to configure it via another fixture on module scope
+            # but might not use them in some individual tests
+            return []
+
+        if not a_all and not self.update:
+            raise Exception(
+                f"No state for {self.scope_key} recorded. Please (re-)generate the snapshot for this test."
+            )
+
+        self._remove_skip_verification_paths(a_all)
+        self.observed_state = b_all = self._transform(self.observed_state)
+
+        for key in self.called_keys:
+            a = a_all.get(
+                key
+            )  # if this is None, a new key was added since last updating => usage error
+            if a is None:
+                raise Exception(
+                    f"State for {key=} missing in {self.scope_key}. Please (re-)generate the snapshot for this test."
+                )
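+            # compare the recorded state (a) against the transformed observed state (b)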
+ ) + b = b_all[key] + result = SnapshotMatchResult(a, b, key=key) + results.append(result) + + if any(not result for result in results) and self.verify: + raise SnapshotAssertionError("Parity snapshot failed", result=results) + return results + + def _transform_dict_to_parseable_values(self, original): + """recursively goes through dict and tries to resolve values to strings (& parse them as json if possible)""" + for k, v in original.items(): + if isinstance(v, StreamingBody): + # update v for json parsing below + original[k] = v = v.read().decode( + "utf-8" + ) # TODO: patch boto client so this doesn't break any further read() calls + if isinstance(v, list) and v: + for item in v: + if isinstance(item, dict): + self._transform_dict_to_parseable_values(item) + if isinstance(v, Dict): + self._transform_dict_to_parseable_values(v) + + if isinstance(v, str) and v.startswith("{"): + try: + json_value = json.loads(v) + original[k] = json_value + except JSONDecodeError: + pass # parsing error can be ignored + + def _transform(self, tmp: dict) -> dict: + """build a persistable state definition that can later be compared against""" + self._transform_dict_to_parseable_values(tmp) + + # persist tmp + if self.raw: + self._persist_raw(tmp) + + ctx = TransformContext() + for transformer, _ in sorted(self.transformers, key=lambda p: p[1]): + tmp = transformer.transform(tmp, ctx=ctx) + + if not self.update: + self._remove_skip_verification_paths(tmp) + + replaced_tmp = {} + # avoid replacements in snapshot keys + for key, value in tmp.items(): + dumped_value = json.dumps(value, default=str) + for sr in ctx.serialized_replacements: + dumped_value = sr(dumped_value) + + assert dumped_value + try: + replaced_tmp[key] = json.loads(dumped_value) + except JSONDecodeError: + SNAPSHOT_LOGGER.error(f"could not decode json-string:\n{tmp}") + return {} + + return replaced_tmp + + def _order_dict(self, response) -> dict: + if isinstance(response, dict): + ordered_dict = {} + for key, val in sorted(response.items()): + if isinstance(val, dict): + ordered_dict[key] = self._order_dict(val) + elif isinstance(val, list): + ordered_dict[key] = [self._order_dict(entry) for entry in val] + else: + ordered_dict[key] = val + + # put the ResponseMetadata back at the end of the response + if "ResponseMetadata" in ordered_dict: + ordered_dict["ResponseMetadata"] = ordered_dict.pop("ResponseMetadata") + + return ordered_dict + else: + return response + + # LEGACY API + def register_replacement(self, pattern: Pattern[str], value: str): + self.add_transformer(RegexTransformer(pattern, value)) + + def skip_key(self, pattern: Pattern[str], value: str): + self.add_transformer( + KeyValueBasedTransformer( + lambda k, v: v if bool(pattern.match(k)) else None, + replacement=value, + replace_reference=False, + ) + ) + + def replace_value(self, pattern: Pattern[str], value: str): + self.add_transformer( + KeyValueBasedTransformer( + lambda _, v: v if bool(pattern.match(v)) else None, + replacement=value, + replace_reference=False, + ) + ) + + def _remove_skip_verification_paths(self, tmp: Dict): + """Removes all keys from the dict, that match the given json-paths in self.skip_verification_path""" + + def build_full_path_nodes(field_match: DatumInContext): + """Traverse the matched Datum to build the path field by field""" + full_path_nodes = [str(field_match.path)] + next_node = field_match + while next_node.context is not None: + full_path_nodes.append(str(next_node.context.path)) + next_node = next_node.context + + return 
full_path_nodes[::-1][1:]  # reverse the list and remove Root()/$
+
+        for path in self.skip_verification_paths:
+            matches = parse(path).find(tmp) or []
+            for m in matches:
+                full_path = build_full_path_nodes(m)
+                helper = tmp
+                if len(full_path) > 1:
+                    for p in full_path[:-1]:
+                        if isinstance(helper, list) and p.lstrip("[").rstrip("]").isnumeric():
+                            helper = helper[int(p.lstrip("[").rstrip("]"))]
+                        elif isinstance(helper, dict):
+                            helper = helper.get(p, None)
+                            if not helper:
+                                continue
+                if (
+                    isinstance(helper, dict) and full_path[-1] in helper.keys()
+                ):  # might have been deleted already
+                    del helper[full_path[-1]]
diff --git a/localstack_snapshot/snapshots/report.py b/localstack_snapshot/snapshots/report.py
new file mode 100644
index 0000000..2383778
--- /dev/null
+++ b/localstack_snapshot/snapshots/report.py
@@ -0,0 +1,129 @@
+import logging
+
+from localstack_snapshot.snapshots import SnapshotMatchResult
+
+LOG = logging.getLogger(__name__)
+
+_esctable = {
+    'black': 30, 'red': 31, 'green': 32, 'yellow': 33, 'blue': 34, 'purple': 35, 'cyan': 36, 'white': 37,
+    'Black': 40, 'Red': 41, 'Green': 42, 'Yellow': 43, 'Blue': 44, 'Purple': 45, 'Cyan': 46, 'White': 47,
+    'bold': 1, 'light': 2, 'blink': 5, 'invert': 7, 'strikethrough': 9, 'underlined': 4
+}
+
+
+class PatchPath(str):
+    """
+    used to wrap a path string to compare hierarchically & lexically by going through
+    each path level
+    """
+
+    def __lt__(self, other):
+        if not isinstance(other, PatchPath):
+            raise ValueError("Incompatible types")
+
+        parts = zip(self.split("/"), other.split("/"), strict=False)
+        for sa, sb in parts:
+            if sa != sb:
+                return sa < sb
+
+        return False
+
+
+def _format_json_path(path: list):
+    json_str = "$.."
+    for idx, elem in enumerate(path):
+        if not isinstance(elem, int):
+            json_str += str(elem)
+        if idx < len(path) - 1 and not json_str.endswith(".."):
+            json_str += "."
+
+    if path and isinstance(path[-1], int):
+        json_str = json_str.rstrip(".")
+    return f'"{json_str}"'
+
+
+def render_report(result: SnapshotMatchResult):
+    def _line(c) -> list[tuple[str, str]]:
+        def _render_path_part(part):
+            if isinstance(part, int):
+                return f"[{part}]"  # wrap iterable index in [] to more clearly denote it being such
+            return str(part)
+
+        path_parts = [_render_path_part(p) for p in c.path(output_format="list")]
+        change_path = "/" + "/".join(path_parts)
+
+        expected = c.t1
+        actual = c.t2
+
+        if c.report_type in [
+            "dictionary_item_removed",
+        ]:
+            return [(change_path, f"[remove](-)[/remove] {change_path} ( {expected!r} )")]
+        elif c.report_type in ["iterable_item_removed"]:
+            if actual:
+                # seems to be a bug with deepdiff, if there's the same number of items in the iterable and one differs
+                # it will report the missing one but won't report the "additional" on the corresponding position
+                return [
+                    (change_path, f"[remove](-)[/remove] {change_path} ( {expected!r} )"),
+                    (change_path, f"[add](+)[/add] {change_path} ( {actual!r} )"),
+                ]
+            return [(change_path, f"[remove](-)[/remove] {change_path} ( {expected!r} )")]
+        elif c.report_type in ["dictionary_item_added", "iterable_item_added"]:
+            return [(change_path, f"[add](+)[/add] {change_path} ( {actual!r} )")]
+        elif c.report_type in ["values_changed"]:
+            # TODO: more fancy change detection and visualization (e.g. parts of a string)
+            return [
+                (
+                    change_path,
+                    f"[replace](~)[/replace] {change_path} {expected!r} → {actual!r} ... 
(expected → actual)", + ) + ] + elif c.report_type == "type_changes": + return [ + ( + change_path, + f"[replace](~)[/replace] {change_path} {expected!r} (type: {type(expected)}) → {actual!r} (type: {type(actual)})... (expected → actual)", + ) + ] + else: + LOG.warning( + f"Unsupported diff mismatch reason: {c.report_type}. Please report this to the team so we can add support. {expected=} | {actual=}" + ) + return [ + ( + change_path, + f"[unknown]?[/unknown] {change_path} Unsupported diff mismatch for {expected!r} vs {actual!r}", + ) + ] + + lines = [] + json_paths = [] + for _cat, changes in result.result.tree.items(): + for change in changes: + lines.extend(_line(change)) + json_paths.append(change.path(output_format="list")) + + printstr = f">> match key: {result.key}\n" + + for _a, b in sorted(lines, key=lambda x: PatchPath(x[0])): + printstr += f"\t{b}\n" + + # you can add more entries to the lists to combine effects (e.g. red & underlined) + replacement_map = { + "remove": [_esctable["red"]], + "add": [_esctable["green"]], + "replace": [_esctable["yellow"]], + "unknown": [_esctable["cyan"]], + "s": [_esctable["strikethrough"]], + } + + # replace [x] tokens with the corresponding codes + for token, replacements in replacement_map.items(): + printstr = printstr.replace(f"[{token}]", "".join(f"\x1b[{code}m" for code in replacements)) + printstr = printstr.replace(f"[/{token}]", "\x1b[0m") + + printstr += "\n\tIgnore list (please keep in mind list indices might not work and should be replaced):\n\t" + + printstr += f"[{', '.join(sorted({_format_json_path(path) for path in json_paths}))}]" + + return printstr diff --git a/localstack_snapshot/snapshots/transformer.py b/localstack_snapshot/snapshots/transformer.py new file mode 100644 index 0000000..2796cea --- /dev/null +++ b/localstack_snapshot/snapshots/transformer.py @@ -0,0 +1,329 @@ +import copy +import logging +import os +import re +from datetime import datetime +from re import Pattern +from typing import Any, Callable, Optional, Protocol + +from jsonpath_ng.ext import parse + +SNAPSHOT_LOGGER = logging.getLogger(__name__) +SNAPSHOT_LOGGER.setLevel(logging.DEBUG if os.environ.get("DEBUG_SNAPSHOT") else logging.WARNING) + +# Types + +GlobalReplacementFn = Callable[[str], str] + + +class TransformContext: + _cache: dict + replacements: list[GlobalReplacementFn] + scoped_tokens: dict[str, int] + + def __init__(self): + self.replacements = [] + self.scoped_tokens = {} + self._cache = {} + + @property + def serialized_replacements(self) -> list[GlobalReplacementFn]: # TODO: naming + return self.replacements + + def register_serialized_replacement(self, fn: GlobalReplacementFn): # TODO: naming + self.replacements.append(fn) + + def new_scope(self, scope: str) -> int: + """retrieve new enumeration value for a given scope key (e.g. 
for tokens such as <replacement:1>, <replacement:2>)"""
+        current_counter = self.scoped_tokens.setdefault(scope, 1)
+        self.scoped_tokens[scope] += 1
+        return current_counter
+
+
+def _register_serialized_reference_replacement(
+    transform_context: TransformContext, *, reference_value: str, replacement: str
+):
+    if '"' in reference_value:
+        reference_value = reference_value.replace('"', '\\"')
+
+    cache = transform_context._cache.setdefault("regexcache", set())
+    cache_key = reference_value
+    if cache_key not in cache:
+        actual_replacement = f"<{replacement}:{transform_context.new_scope(replacement)}>"
+        cache.add(cache_key)
+
+        def _helper(bound_result, bound_replacement):
+            def replace_val(s):
+                SNAPSHOT_LOGGER.debug(
+                    f"Replacing '{bound_result}' in snapshot with '{bound_replacement}'"
+                )
+                return s.replace(bound_result, bound_replacement, -1)
+
+            return replace_val
+
+        SNAPSHOT_LOGGER.debug(
+            f"Registering reference replacement for value: '{reference_value:.200s}' -> '{actual_replacement}'"
+        )
+        transform_context.register_serialized_replacement(
+            _helper(reference_value, actual_replacement)
+        )
+
+
+class Transformer(Protocol):
+    def transform(self, input_data: dict, *, ctx: TransformContext) -> dict:
+        ...
+
+
+# Transformers
+
+
+class ResponseMetaDataTransformer:
+    def transform(self, input_data: dict, *, ctx: TransformContext) -> dict:
+        for k, v in input_data.items():
+            if k == "ResponseMetadata":
+                metadata = v
+                http_headers = metadata.get("HTTPHeaders")
+                # TODO "x-amz-bucket-region"
+                # TestS3.test_region_header_exists -> verifies bucket-region
+                headers_to_collect = ["content-type"]
+                simplified_headers = {}
+                for h in headers_to_collect:
+                    if http_headers.get(h):
+                        simplified_headers[h] = http_headers[h]
+                simplified_metadata = {
+                    "HTTPHeaders": simplified_headers,
+                }
+                # HTTPStatusCode might be removed for marker skip_snapshot_verify
+                if status_code := metadata.get("HTTPStatusCode"):
+                    simplified_metadata["HTTPStatusCode"] = status_code
+                input_data[k] = simplified_metadata
+            elif isinstance(v, dict):
+                input_data[k] = self.transform(v, ctx=ctx)
+        return input_data
+
+
+class JsonpathTransformer:
+    def __init__(self, jsonpath: str, replacement: str, replace_reference: bool = True) -> None:
+        self.jsonpath = jsonpath
+        self.replacement = replacement
+        self.replace_references = replace_reference
+
+    def transform(self, input_data: dict, *, ctx: TransformContext) -> dict:
+        pattern = parse(self.jsonpath)
+
+        if self.replace_references:
+            res = pattern.find(input_data)
+            if not res:
+                SNAPSHOT_LOGGER.debug(f"No match found for JsonPath '{self.jsonpath}'")
+                return input_data
+            for r in res:
+                value_to_replace = r.value
+                _register_serialized_reference_replacement(
+                    ctx, reference_value=value_to_replace, replacement=self.replacement
+                )
+        else:
+            original = copy.deepcopy(input_data)
+            pattern.update(input_data, self.replacement)
+            if original != input_data:
+                SNAPSHOT_LOGGER.debug(
+                    f"Replacing JsonPath '{self.jsonpath}' in snapshot with '{self.replacement}'"
+                )
+            else:
+                SNAPSHOT_LOGGER.debug(f"No match found for JsonPath '{self.jsonpath}'")
+
+        return input_data
+
+
+class RegexTransformer:
+    def __init__(self, regex: str | Pattern[str], replacement: str):
+        self.regex = regex
+        self.replacement = replacement
+
+    def transform(self, input_data: dict, *, ctx: TransformContext) -> dict:
+        compiled_regex = re.compile(self.regex) if isinstance(self.regex, str) else
self.regex + + def _regex_replacer_helper(pattern: Pattern[str], repl: str): + def replace_val(s): + result = re.sub(pattern, repl, s) + if result != s: + SNAPSHOT_LOGGER.debug( + f"Replacing regex '{pattern.pattern:.200s}' with '{repl}'" + ) + else: + SNAPSHOT_LOGGER.debug(f"No match found for regex '{pattern.pattern:.200s}'") + return result + + return replace_val + + ctx.register_serialized_replacement( + _regex_replacer_helper(compiled_regex, self.replacement) + ) + SNAPSHOT_LOGGER.debug( + f"Registering regex pattern '{compiled_regex.pattern:.200s}' in snapshot with '{self.replacement}'" + ) + return input_data + + +class KeyValueBasedTransformer: + def __init__( + self, + match_fn: Callable[[str, Any], Optional[str]], + replacement: str, + replace_reference: bool = True, + ): + self.match_fn = match_fn + self.replacement = replacement + self.replace_reference = replace_reference + + def transform(self, input_data: dict, *, ctx: TransformContext) -> dict: + for k, v in input_data.items(): + if (match_result := self.match_fn(k, v)) is not None: + if self.replace_reference: + _register_serialized_reference_replacement( + ctx, reference_value=match_result, replacement=self.replacement + ) + else: + if isinstance(v, str): + SNAPSHOT_LOGGER.debug( + f"Replacing value for key '{k}': Match result '{match_result:.200s}' with '{self.replacement}'. (Original value: {str(v)})" + ) + input_data[k] = v.replace(match_result, self.replacement) + else: + SNAPSHOT_LOGGER.debug( + f"Replacing value for key '{k}' with '{self.replacement}'. (Original value: {str(v)})" + ) + input_data[k] = self.replacement + elif isinstance(v, list) and len(v) > 0: + for i in range(0, len(v)): + if isinstance(v[i], dict): + v[i] = self.transform(v[i], ctx=ctx) + elif isinstance(v, dict): + input_data[k] = self.transform(v, ctx=ctx) + + return input_data + + +class GenericTransformer: + def __init__(self, fn: Callable[[dict, TransformContext], dict]): + self.fn = fn + + def transform(self, input_data: dict, *, ctx: TransformContext) -> dict: + return self.fn(input_data, ctx) + + +class SortingTransformer: + key: str + sorting_fn: Optional[Callable[[...], Any]] + + # TODO: add support for jsonpath + def __init__(self, key: str, sorting_fn: Optional[Callable[[...], Any]] = None): + """Sorts a list at `key` with the given `sorting_fn` (argument for `sorted(list, key=sorting_fn)`)""" + self.key = key + self.sorting_fn = sorting_fn + + def _transform_dict(self, input_data: dict, ctx: TransformContext = None) -> dict: + for k, v in input_data.items(): + if k == self.key: + if not isinstance(v, list): + raise ValueError("SortingTransformer should only be applied to lists.") + input_data[k] = sorted(self._transform(v, ctx=ctx), key=self.sorting_fn) + else: + input_data[k] = self._transform(v, ctx=ctx) + return input_data + + def _transform_list(self, input_data: list, ctx: TransformContext = None) -> list: + return [self._transform(e, ctx=ctx) for e in input_data] + + def _transform(self, input_data: Any, ctx: TransformContext = None) -> Any: + if isinstance(input_data, dict): + return self._transform_dict(input_data, ctx=ctx) + elif isinstance(input_data, list): + return self._transform_list(input_data, ctx=ctx) + else: + return input_data + + def transform(self, input_data: dict, *, ctx: TransformContext = None) -> dict: + return self._transform_dict(input_data, ctx=ctx) + + +class RegexMatcher: + def __init__(self, regex: str | re.Pattern, representation: str): + if isinstance(regex, str): + self.regex = re.compile(regex) + 
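+        # normalize to a compiled re.Pattern here, so matching code can
+        # uniformly call self.regex.match(...) on either input form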
elif isinstance(regex, re.Pattern):
+            self.regex = regex
+        else:
+            raise TypeError("regex must be a string or a compiled re.Pattern")
+
+        self.representation = representation
+
+
+REFERENCE_DATE = (
+    "2022-07-13T13:48:01Z"  # v1.0.0 commit timestamp cf26bd9199354a9a55e0b65e312ceee4c407f6c0
+)
+PATTERN_ISO8601 = re.compile(
+    r"(?:[1-9]\d{3}-(?:(?:0[1-9]|1[0-2])-(?:0[1-9]|1\d|2[0-8])|(?:0[13-9]|1[0-2])-(?:29|30)|(?:0[13578]|1[02])-31)|(?:[1-9]\d(?:0[48]|[2468][048]|[13579][26])|(?:[2468][048]|[13579][26])00)-02-29)T(?:[01]\d|2[0-3]):[0-5]\d:[0-5]\d(?:\.\d{1,9})?(?:Z|[+-][01]\d:?([0-5]\d)?)"
+)
+
+
+class TimestampTransformer:
+    matchers: list[RegexMatcher]
+
+    def __init__(self):
+        """
+        Create a timestamp transformer which will replace datetime objects with <datetime> and
+        string timestamps with the reference date rendered in their respective format.
+
+        The reference date which is used for replacements is "2022-07-13T13:48:01Z", the commit date for the v1.0.0 tag of localstack.
+        """
+
+        # Add your matcher here
+        self.matchers = [
+            RegexMatcher(
+                r"\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d{3}Z", "2022-07-13T13:48:01.000Z"
+            ),  # stepfunctions internal
+            RegexMatcher(
+                r"\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d{3}\+\d{4}", "2022-07-13T13:48:01.000+0000"
+            ),  # lambda
+            RegexMatcher(
+                r"\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d{6}\+\d{2}:\d{2}",
+                "2022-07-13T13:48:01.000000+00:00",
+            ),  # stepfunctions external, also cloudformation
+            RegexMatcher(
+                r"\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}Z",
+                "2022-07-13T13:48:01Z",
+            ),  # s3
+            # RegexMatcher(
+            #     PATTERN_ISO8601, "generic-iso8601"
+            # ),  # very generic iso8601, this should technically always be fixed so we could also think about removing it here
+        ]
+
+    def transform(self, input_data: dict, *, ctx: TransformContext = None) -> dict:
+        return self._transform_dict(input_data, ctx=ctx)
+
+    def _transform(self, input_data: Any, ctx: TransformContext = None) -> Any:
+        if isinstance(input_data, dict):
+            return self._transform_dict(input_data, ctx=ctx)
+        elif isinstance(input_data, list):
+            return self._transform_list(input_data, ctx=ctx)
+        elif isinstance(input_data, datetime):
+            return "<datetime>"
+        elif isinstance(input_data, str):
+            return self._transform_timestamp(input_data)
+        return input_data
+
+    def _transform_timestamp(self, timestamp: str) -> str:
+        for matcher in self.matchers:
+            if matcher.regex.match(timestamp):
+                return f"<{matcher.representation}>"
+        return timestamp
+
+    def _transform_dict(self, input_data: dict, ctx: TransformContext = None) -> dict:
+        for k, v in input_data.items():
+            input_data[k] = self._transform(v, ctx=ctx)
+        return input_data
+
+    def _transform_list(self, input_data: list, ctx: TransformContext = None) -> list:
+        return [self._transform(e, ctx=ctx) for e in input_data]
diff --git a/localstack_snapshot/snapshots/transformer_utility.py b/localstack_snapshot/snapshots/transformer_utility.py
new file mode 100644
index 0000000..6dc9d61
--- /dev/null
+++ b/localstack_snapshot/snapshots/transformer_utility.py
@@ -0,0 +1,33 @@
+from typing import Optional
+
+from localstack_snapshot.snapshots.transformer import KeyValueBasedTransformer
+
+
+def _replace_camel_string_with_hyphen(input_string: str):
+    return "".join(["-" + char.lower() if char.isupper() else char for char in input_string]).strip(
+        "-"
+    )
+
+
+class TransformerUtility:
+    @staticmethod
+    def key_value(
+        key: str, value_replacement: Optional[str] = None, reference_replacement: bool = True
+    ):
+        """Creates a new KeyValueBasedTransformer. If the key matches, the value will be replaced.
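+        For example (illustrative), key_value("FunctionName") replaces the matched value with
+        enumerated <function-name:1>, <function-name:2>, ... tokens.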
+
+        :param key: the name of the key which should be replaced
+        :param value_replacement: the value which will replace the original value.
+            By default, this is the key name in lowercase, converted from camel case to
+            hyphen-separated form
+        :param reference_replacement: if False, only the original value for this key will be replaced.
+            If True, all references to this value will be replaced (using a regex pattern) for the
+            entire test case. In this case, the replacement values are enumerated as well.
+            Default: True
+
+        :return: KeyValueBasedTransformer
+        """
+        return KeyValueBasedTransformer(
+            lambda k, v: v if k == key and (v is not None and v != "") else None,
+            replacement=value_replacement or _replace_camel_string_with_hyphen(key),
+            replace_reference=reference_replacement,
+        )
diff --git a/localstack_snapshot/util/__init__.py b/localstack_snapshot/util/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/localstack_snapshot/util/encoding.py b/localstack_snapshot/util/encoding.py
new file mode 100644
index 0000000..dae0706
--- /dev/null
+++ b/localstack_snapshot/util/encoding.py
@@ -0,0 +1,49 @@
+import decimal
+import json
+from datetime import date, datetime
+
+TIMESTAMP_FORMAT_MICROS = "%Y-%m-%dT%H:%M:%S.%fZ"
+TIMESTAMP_FORMAT = "%Y-%m-%dT%H:%M:%S"
+
+
+def timestamp(time=None, format: str = TIMESTAMP_FORMAT) -> str:
+    if not time:
+        time = datetime.utcnow()
+    if isinstance(time, (int, float)):
+        time = datetime.fromtimestamp(time)
+    return time.strftime(format)
+
+
+def timestamp_millis(time=None) -> str:
+    microsecond_time = timestamp(time=time, format=TIMESTAMP_FORMAT_MICROS)
+    # truncating microseconds to milliseconds, while leaving the "Z" indicator
+    return microsecond_time[:-4] + microsecond_time[-1]
+
+
+class CustomJsonEncoder(json.JSONEncoder):
+    """Helper class to convert JSON documents with datetime, decimals, or bytes."""
+
+    def default(self, o):
+        import yaml  # leave import here, to avoid breaking our Lambda tests!
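+        # fall-through order below: Decimal -> datetime/date -> YAML scalar
+        # nodes -> bytes -> JSONEncoder default (None if everything fails)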
+ + if isinstance(o, decimal.Decimal): + if o % 1 > 0: + return float(o) + else: + return int(o) + if isinstance(o, (datetime, date)): + return timestamp_millis(o) + if isinstance(o, yaml.ScalarNode): + if o.tag == "tag:yaml.org,2002:int": + return int(o.value) + if o.tag == "tag:yaml.org,2002:float": + return float(o.value) + if o.tag == "tag:yaml.org,2002:bool": + return bool(o.value) + return str(o.value) + try: + if isinstance(o, bytes): + return o.decode(encoding="UTF-8") + return super(CustomJsonEncoder, self).default(o) + except Exception: + return None diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 0000000..ef79279 --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,72 @@ +[build-system] +requires = ["setuptools", "wheel"] +build-backend = "setuptools.build_meta" + +[project] +name = "localstack-snapshot" +authors = [ + { name = "LocalStack Contributors", email = "info@localstack.cloud" } +] +version = "0.1.0" +description = "Extracted snapshot testing lib for LocalStack" +dependencies = [ + "jsonpath-ng==1.5.3", # TODO: resolve this pin + "deepdiff", + "botocore", +] +requires-python = ">=3.10" +license = {file = "LICENSE"} +classifiers = [ + "License :: OSI Approved :: Apache Software License", + "Operating System :: OS Independent", + "Programming Language :: Python :: 3", + "Programming Language :: Python :: 3.10", + "Programming Language :: Python :: 3.11", + "Programming Language :: Python :: 3.12", +] +dynamic = ["readme"] + + +[project.optional-dependencies] +dev = [ + "black==23.10.0", + "pytest>=7.0", + "coverage[toml]>=5.0.0", + "ruff==0.1.0" +] + +[tool.setuptools] +include-package-data = false + +[tool.setuptools.dynamic] +readme = {file = ["README.md"], content-type = "text/markdown"} + +[tool.setuptools.packages.find] +include = ["localstack_snapshot*"] +exclude = ["tests*"] + +[tool.setuptools.package-data] +"*" = ["*.md"] + +[tool.black] +line_length = 100 +include = '((localstack_snapshot)/.*\.py$|tests/.*\.py$)' +#extend_exclude = '()' + +[tool.ruff] +# Always generate Python 3.10-compatible code. 
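+# (matching requires-python = ">=3.10" in the [project] table above)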
+target-version = "py310"
+line-length = 110
+select = ["B", "C", "E", "F", "I", "W", "T", "B9"]
+ignore = [
+    "E501", # E501 Line too long - handled by black, see https://docs.astral.sh/ruff/faq/#is-ruff-compatible-with-black
+]
+exclude = [
+    ".venv*",
+    "venv*",
+    "dist",
+    "build",
+    "target",
+    "*.egg-info",
+    ".git",
+]
diff --git a/tests/conftest.py b/tests/conftest.py
new file mode 100644
index 0000000..5f01bf5
--- /dev/null
+++ b/tests/conftest.py
@@ -0,0 +1 @@
+"""add pytest configuration here"""
diff --git a/tests/test_snapshots.py b/tests/test_snapshots.py
new file mode 100644
index 0000000..2801741
--- /dev/null
+++ b/tests/test_snapshots.py
@@ -0,0 +1,192 @@
+import pytest
+
+from localstack_snapshot.snapshots import SnapshotSession
+from localstack_snapshot.snapshots.report import _format_json_path
+from localstack_snapshot.snapshots.transformer import KeyValueBasedTransformer, SortingTransformer
+
+
+class TestSnapshotManager:
+    def test_simple_diff_nochange(self):
+        sm = SnapshotSession(scope_key="A", verify=True, base_file_path="", update=False)
+        sm.recorded_state = {"key_a": {"a": 3}}
+        sm.match("key_a", {"a": 3})
+        sm._assert_all()
+
+    def test_simple_diff_change(self):
+        sm = SnapshotSession(scope_key="A", verify=True, base_file_path="", update=False)
+        sm.recorded_state = {"key_a": {"a": 3}}
+        sm.match("key_a", {"a": 5})
+        with pytest.raises(Exception) as ctx:
+            sm._assert_all()
+        ctx.match("Parity snapshot failed")
+
+    def test_multiple_assertmatch_with_same_key_fail(self):
+        sm = SnapshotSession(scope_key="A", verify=True, base_file_path="", update=False)
+        sm.recorded_state = {"key_a": {"a": 3}}
+        sm.match("key_a", {"a": 3})
+        with pytest.raises(Exception) as ctx:
+            sm.match("key_a", {"a": 3})
+        ctx.match("used multiple times in the same test scope")
+
+    def test_context_replacement(self):
+        sm = SnapshotSession(scope_key="A", verify=True, base_file_path="", update=False)
+        sm.add_transformer(
+            KeyValueBasedTransformer(lambda k, v: v if k == "aaa" else None, replacement="A")
+        )
+        sm.recorded_state = {"key_a": {"aaa": "<A:1>", "bbb": "<A:1> hello"}}
+        sm.match("key_a", {"aaa": "something", "bbb": "something hello"})
+        sm._assert_all()
+
+    # def test_context_replacement_no_change(self):
+    #     sm = SnapshotSession(scope_key="A", verify=True, base_file_path="", update=False)
+    #     sm.add_transformer(TransformerUtility.key_value("name"))
+    #     sm.recorded_state = {"key_a": {"name": ""}}
+    #     sm.match("key_a", {"name": ""})
+    #     sm._assert_all()
+
+    # def test_match_order_reference_replacement(self):
+    #     """tests if the reference-replacement works as expected, e.g., using alphabetical order of keys"""
+    #     sm = SnapshotSession(scope_key="A", verify=True, base_file_path="", update=False)
+    #
+    #     sm.add_transformer(KeyValueBasedTransformer(_resource_name_transformer, "resource"))
+    #
+    #     sm.recorded_state = {
+    #         "subscription-attributes": {
+    #             "Attributes": {
+    #                 "ConfirmationWasAuthenticated": "true",
+ # "Owner": "111111111111", + # "PendingConfirmation": "false", + # "Protocol": "lambda", + # "RawMessageDelivery": "false", + # "RedrivePolicy": { + # "deadLetterTargetArn": "arn:aws:sqs:region:111111111111:111112222233333" + # }, + # "TopicArn": "arn:aws:sns:region:111111111111:rrrrrrrrrrrrrrrrr", + # "SubscriptionArn": "arn:aws:sns:region:111111111111:rrrrrrrrrrrrrrrrr:azazazazazazazaza", + # "Endpoint": "arn:aws:lambda:region:111111111111:function:aaaaabbbbb", + # }, + # "ResponseMetadata": {"HTTPHeaders": {}, "HTTPStatusCode": 200}, + # }, + # ) + # sm._assert_all() + + # def test_reference_replacement_skip_outer_keys(self): + # """Test if the reference replacement properly skips the snapshot keys on the outermost level""" + # sm = SnapshotSession(scope_key="A", verify=True, base_file_path="", update=False) + # sm.add_transformer(TransformerUtility.key_value("name")) + # sm.recorded_state = {"key_a": {"name": ""}} + # sm.match("key_a", {"name": "key"}) + # sm._assert_all() + + def test_replacement_key_value(self): + sm = SnapshotSession(scope_key="A", verify=True, base_file_path="", update=False) + sm.add_transformer( + KeyValueBasedTransformer( + # returns last two characters of value -> only this should be replaced + lambda k, v: v[-2:] if k == "aaa" else None, + replacement="A", + replace_reference=False, + ) + ) + sm.recorded_state = { + "key_a": {"aaa": "hellA", "aab": "this is a test", "b": {"aaa": "another teA"}} + } + sm.match("key_a", {"aaa": "helloo", "aab": "this is a test", "b": {"aaa": "another test"}}) + sm._assert_all() + + def test_dot_in_skip_verification_path(self): + sm = SnapshotSession(scope_key="A", verify=True, base_file_path="", update=False) + sm.recorded_state = { + "key_a": {"aaa": "hello", "aab": "this is a test", "b": {"a.aa": "another test"}} + } + sm.match( + "key_a", + {"aaa": "hello", "aab": "this is a test-fail", "b": {"a.aa": "another test-fail"}}, + ) + + with pytest.raises(Exception) as ctx: # asserts it fail without skipping + sm._assert_all() + ctx.match("Parity snapshot failed") + + skip_path = ["$..aab", "$..b.a.aa"] + with pytest.raises(Exception) as ctx: # asserts it fails if fields are not escaped + sm._assert_all(skip_verification_paths=skip_path) + ctx.match("Parity snapshot failed") + + skip_path_escaped = ["$..aab", "$..b.'a.aa'"] + sm._assert_all(skip_verification_paths=skip_path_escaped) + + def test_non_homogeneous_list(self): + sm = SnapshotSession(scope_key="A", verify=True, base_file_path="", update=False) + sm.recorded_state = {"key1": [{"key2": "value1"}, "value2", 3]} + sm.match("key1", [{"key2": "value1"}, "value2", 3]) + sm._assert_all() + + +def test_json_diff_format(): + path = ["Records", 1] + assert _format_json_path(path) == '"$..Records"' + path = ["Records", 1, 1, 1] + assert _format_json_path(path) == '"$..Records"' + path = ["Records", 1, "SomeKey"] + assert _format_json_path(path) == '"$..Records..SomeKey"' + path = ["Records", 1, 1, "SomeKey"] + assert _format_json_path(path) == '"$..Records..SomeKey"' + path = ["Records", 1, 1, 0, "SomeKey"] + assert _format_json_path(path) == '"$..Records..SomeKey"' + path = ["Records", "SomeKey"] + assert _format_json_path(path) == '"$..Records.SomeKey"' + path = [] + assert _format_json_path(path) == '"$.."' + path = [1, 1, 0, "SomeKey"] + assert _format_json_path(path) == '"$..SomeKey"' + + +def test_sorting_transformer(): + original_dict = { + "a": { + "b": [ + {"name": "c-123"}, + {"name": "a-123"}, + {"name": "b-123"}, + ] + }, + "a2": { + "b": [ + {"name": "b-123"}, + 
{"name": "a-123"}, + {"name": "c-123"}, + ] + }, + } + + sorted_items = [ + {"name": "a-123"}, + {"name": "b-123"}, + {"name": "c-123"}, + ] + + transformer = SortingTransformer("b", lambda x: x["name"]) + transformed_dict = transformer.transform(original_dict) + + assert transformed_dict["a"]["b"] == sorted_items + assert transformed_dict["a2"]["b"] == sorted_items diff --git a/tests/test_transformer.py b/tests/test_transformer.py new file mode 100644 index 0000000..3f98616 --- /dev/null +++ b/tests/test_transformer.py @@ -0,0 +1,255 @@ +import copy +import json + +import pytest + +from localstack_snapshot.snapshots.transformer import ( + SortingTransformer, + TimestampTransformer, + TransformContext, +) +from localstack_snapshot.snapshots.transformer_utility import TransformerUtility + + +class TestTransformer: + def test_key_value_replacement(self): + input = { + "hello": "world", + "hello2": "again", + "path": {"to": {"anotherkey": "hi", "inside": {"hello": "inside"}}}, + } + + key_value = TransformerUtility.key_value( + "hello", "placeholder", reference_replacement=False + ) + + expected_key_value = { + "hello": "placeholder", + "hello2": "again", + "path": {"to": {"anotherkey": "hi", "inside": {"hello": "placeholder"}}}, + } + + copied = copy.deepcopy(input) + ctx = TransformContext() + assert key_value.transform(copied, ctx=ctx) == expected_key_value + assert ctx.serialized_replacements == [] + + copied = copy.deepcopy(input) + key_value = TransformerUtility.key_value("hello", "placeholder", reference_replacement=True) + expected_key_value_reference = { + "hello": "", + "hello2": "again", + "path": {"to": {"anotherkey": "hi", "": {"hello": ""}}}, + } + assert key_value.transform(copied, ctx=ctx) == copied + assert len(ctx.serialized_replacements) == 2 + + tmp = json.dumps(copied, default=str) + for sr in ctx.serialized_replacements: + tmp = sr(tmp) + + assert json.loads(tmp) == expected_key_value_reference + + def test_key_value_replacement_with_falsy_value(self): + input = { + "hello": "world", + "somenumber": 0, + } + + key_value = TransformerUtility.key_value( + "somenumber", "placeholder", reference_replacement=False + ) + + expected_key_value = { + "hello": "world", + "somenumber": "placeholder", + } + + copied = copy.deepcopy(input) + ctx = TransformContext() + assert key_value.transform(copied, ctx=ctx) == expected_key_value + assert ctx.serialized_replacements == [] + + @pytest.mark.parametrize("type", ["key_value", "jsonpath"]) + def test_replacement_with_reference(self, type): + input = { + "also-me": "b", + "path": { + "to": {"anotherkey": "hi", "test": {"hello": "replaceme"}}, + "another": {"key": "this/replaceme/hello"}, + }, + "b": {"a/b/replaceme.again": "bb"}, + "test": {"inside": {"path": {"to": {"test": {"hello": "also-me"}}}}}, + } + + expected = { + "": "b", + "path": { + "to": {"anotherkey": "hi", "test": {"hello": ""}}, + "another": {"key": "this//hello"}, + }, + "b": {"a/b/.again": "bb"}, + "test": {"inside": {"path": {"to": {"test": {"hello": ""}}}}}, + } + replacement = "MYVALUE" + if type == "key_value": + transformer = TransformerUtility.key_value( + "hello", replacement, reference_replacement=True + ) + else: + transformer = TransformerUtility.jsonpath( + "$..path.to.test.hello", replacement, reference_replacement=True + ) + + copied = copy.deepcopy(input) + ctx = TransformContext() + + assert transformer.transform(copied, ctx=ctx) == copied + assert len(ctx.serialized_replacements) == 2 + + tmp = json.dumps(copied, default=str) + for sr in 
+        for sr in ctx.serialized_replacements:
+            tmp = sr(tmp)
+
+        assert json.loads(tmp) == expected
+
+    def test_regex(self):
+        input = {
+            "hello": "world",
+            "hello2": "again",
+            "path": {"to": {"anotherkey": "hi", "inside": {"hello": "inside"}}},
+        }
+
+        expected = {
+            "new-value": "world",
+            "new-value2": "again",
+            "path": {"to": {"anotherkey": "hi", "inside": {"new-value": "inside"}}},
+        }
+
+        transformer = TransformerUtility.regex("hello", "new-value")
+
+        ctx = TransformContext()
+        output = transformer.transform(json.dumps(input), ctx=ctx)
+        for sr in ctx.serialized_replacements:
+            output = sr(output)
+        assert json.loads(output) == expected
+
+    def test_log_stream_name(self):
+        input = {
+            "Payload": {
+                "context": {
+                    "functionVersion": "$LATEST",
+                    "functionName": "my-function",
+                    "memoryLimitInMB": "128",
+                    "logGroupName": "/aws/lambda/my-function",
+                    "logStreamName": "2022/05/31/[$LATEST]ced3cafaaf284d8199e02909ac87e2f5",
+                    "clientContext": {
+                        "custom": {"foo": "bar"},
+                        "client": {"snap": ["crackle", "pop"]},
+                        "env": {"fizz": "buzz"},
+                    },
+                    "invokedFunctionArn": "arn:aws:lambda:us-east-1:111111111111:function:my-function",
+                }
+            }
+        }
+        transformers = TransformerUtility.lambda_api()
+        ctx = TransformContext()
+        for t in transformers:
+            t.transform(input, ctx=ctx)
+
+        output = json.dumps(input)
+        for sr in ctx.serialized_replacements:
+            output = sr(output)
+
+        expected = {
+            "Payload": {
+                "context": {
+                    "functionVersion": "$LATEST",
+                    "functionName": "<function-name:1>",
+                    "memoryLimitInMB": "128",
+                    "logGroupName": "/aws/lambda/<function-name:1>",
+                    "logStreamName": "<log-stream-name:1>",
+                    "clientContext": {
+                        "custom": {"foo": "bar"},
+                        "client": {"snap": ["crackle", "pop"]},
+                        "env": {"fizz": "buzz"},
+                    },
+                    "invokedFunctionArn": "arn:aws:lambda:us-east-1:111111111111:function:<function-name:1>",
+                }
+            }
+        }
+        assert expected == json.loads(output)
+
+    def test_nested_sorting_transformer(self):
+        input = {
+            "subsegments": [
+                {
+                    "name": "mysubsegment",
+                    "subsegments": [
+                        {"name": "b"},
+                        {"name": "a"},
+                    ],
+                }
+            ],
+        }
+
+        expected = {
+            "subsegments": [
+                {
+                    "name": "mysubsegment",
+                    "subsegments": [
+                        {"name": "a"},
+                        {"name": "b"},
+                    ],
+                }
+            ],
+        }
+
+        transformer = SortingTransformer("subsegments", lambda s: s["name"])
+
+        ctx = TransformContext()
+        output = transformer.transform(input, ctx=ctx)
+        assert output == expected
+
+
+class TestTimestampTransformer:
+    def test_generic_timestamp_transformer(self):
+        # TODO: add more samples
+
+        input = {
+            "lambda_": {
+                "FunctionName": "lambdafn",
+                "LastModified": "2023-10-09T12:49:50.000+0000",
+            },
+            "cfn": {
+                "StackName": "cfnstack",
+                "CreationTime": "2023-11-20T18:39:36.014000+00:00",
+            },
+            "sfn": {
+                "name": "statemachine",
+                "creationDate": "2023-11-21T07:14:12.243000+01:00",
+                "sfninternal": "2023-11-21T07:14:12.243Z",
+            },
+        }
+
+        expected = {
+            "lambda_": {
+                "FunctionName": "lambdafn",
+                "LastModified": "<timestamp>",
+            },
+            "cfn": {
+                "StackName": "cfnstack",
+                "CreationTime": "<timestamp>",
+            },
+            "sfn": {
+                "name": "statemachine",
+                "creationDate": "<timestamp>",
+                "sfninternal": "<timestamp>",
+            },
+        }
+
+        transformer = TimestampTransformer()
+
+        ctx = TransformContext()
+        output = transformer.transform(input, ctx=ctx)
+        assert output == expected
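+
+
+def test_chained_sorting_and_timestamp_transformers():
+    # Minimal hedged sketch, not part of the original suite: it assumes
+    # transformers compose by calling transform() sequentially with a shared
+    # TransformContext, exactly as the individual tests above invoke them.
+    # The "events" key and the lambda sort key are illustrative names only.
+    input = {
+        "events": [
+            {"name": "b-fn", "createdAt": "2023-11-20T18:39:36.014000+00:00"},
+            {"name": "a-fn", "createdAt": "2023-10-09T12:49:50.000+0000"},
+        ],
+    }
+
+    # after sorting by name and normalizing timestamps, both the order and
+    # the timestamp placeholders should be deterministic
+    expected = {
+        "events": [
+            {"name": "a-fn", "createdAt": "<timestamp>"},
+            {"name": "b-fn", "createdAt": "<timestamp>"},
+        ],
+    }
+
+    ctx = TransformContext()
+    output = SortingTransformer("events", lambda e: e["name"]).transform(input, ctx=ctx)
+    output = TimestampTransformer().transform(output, ctx=ctx)
+    assert output == expected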