From 55ab7f783184cffd18aa4271bbda42ebaa72999a Mon Sep 17 00:00:00 2001 From: Dmitry Dygalo Date: Sat, 10 Aug 2019 20:38:22 +0200 Subject: [PATCH] Cassettes mutations --- README.rst | 27 ++++ docs/changelog.rst | 6 + setup.py | 2 +- src/pytest_recording/_compat.py | 13 ++ src/pytest_recording/_vcr.py | 15 +- src/pytest_recording/mutations/__init__.py | 9 ++ src/pytest_recording/mutations/core.py | 52 +++++++ src/pytest_recording/mutations/mutagens.py | 48 +++++++ src/pytest_recording/plugin.py | 44 +++++- tests/test_mutations.py | 155 +++++++++++++++++++++ 10 files changed, 363 insertions(+), 8 deletions(-) create mode 100644 src/pytest_recording/_compat.py create mode 100644 src/pytest_recording/mutations/__init__.py create mode 100644 src/pytest_recording/mutations/core.py create mode 100644 src/pytest_recording/mutations/mutagens.py create mode 100644 tests/test_mutations.py diff --git a/README.rst b/README.rst index 0786314..ec7fe24 100644 --- a/README.rst +++ b/README.rst @@ -117,6 +117,33 @@ Run ``pytest``: The network blocking feature supports ``socket``-based transports and ``pycurl``. +Cassettes mutation +~~~~~~~~~~~~~~~~~~ + +With static cassettes, the responses are stored and expected, but how to test unexpected behavior of a remote service? +What if it will respond with plain text instead of JSON? unexpected status code? missing field or wrong value in JSON? + +If you already have cassettes recorded, then mutation testing could help you to test these unusual scenarios. +To enable it you need to add ``pytest.mark.vcr_mutations`` to your test and specify ``mutators`` keyword argument: + +.. code:: python + + from pytest_recording import mutations + + def make_call(): + response = requests.get("http://httpbin.org/get") + ... + + @pytest.mark.vcr_mutations( + mutations=mutations.Body() + ) + def test_malformed_body(mutation): + with pytest.raises(MalformedBodyError): + make_call() + + +It will generate N(???) mutations for the cassettes and will run that test. + Contributing ------------ diff --git a/docs/changelog.rst b/docs/changelog.rst index 1434d2e..012b549 100644 --- a/docs/changelog.rst +++ b/docs/changelog.rst @@ -6,6 +6,11 @@ Changelog `Unreleased`_ ------------- +Added +~~~~~ + +- Add cassettes mutation support. `#13`_ + `0.3.2`_ - 2019-08-01 --------------------- @@ -49,6 +54,7 @@ Added .. _0.3.0: https://github.com/kiwicom/pytest-recording/compare/v0.2.0...v0.3.0 .. _0.2.0: https://github.com/kiwicom/pytest-recording/compare/v0.1.0...v0.2.0 +.. _#13: https://github.com/kiwicom/pytest-recording/issues/13 .. _#10: https://github.com/kiwicom/pytest-recording/issues/10 .. _#8: https://github.com/kiwicom/pytest-recording/issues/8 .. _#2: https://github.com/kiwicom/pytest-recording/issues/2 \ No newline at end of file diff --git a/setup.py b/setup.py index 827cede..de4f04c 100644 --- a/setup.py +++ b/setup.py @@ -10,7 +10,7 @@ install_requires = ["vcrpy>=2.0.1", "attrs"] if sys.version_info[0] == 2: - install_requires.append("pytest>=3.5.0,<5.0") + install_requires.extend(["pytest>=3.5.0,<5.0", "funcsigs>=1.0.2"]) else: install_requires.append("pytest>=3.5.0") diff --git a/src/pytest_recording/_compat.py b/src/pytest_recording/_compat.py new file mode 100644 index 0000000..2e739b0 --- /dev/null +++ b/src/pytest_recording/_compat.py @@ -0,0 +1,13 @@ +try: + from inspect import Signature + + get_signature = Signature.from_callable # pylint: disable=unused-import +except ImportError: + from funcsigs import signature as get_signature # pylint: disable=unused-import + + +try: + from collections.abc import Iterable # pylint: disable=unused-import +except ImportError: + # Python 2 + from collections import Iterable diff --git a/src/pytest_recording/_vcr.py b/src/pytest_recording/_vcr.py index fcc08c2..32de040 100644 --- a/src/pytest_recording/_vcr.py +++ b/src/pytest_recording/_vcr.py @@ -9,13 +9,17 @@ from .utils import unique, unpack -def load_cassette(cassette_path, serializer): +def load_cassette(cassette_path, serializer, mutation): try: with open(cassette_path) as f: cassette_content = f.read() except IOError: return [], [] - return deserialize(cassette_content, serializer) + requests, cassettes = deserialize(cassette_content, serializer) + if mutation is not None: + for cassette in cassettes: + mutation.apply(cassette) + return requests, cassettes @attr.s(slots=True) @@ -23,11 +27,12 @@ class CombinedPersister(FilesystemPersister): """Load extra cassettes, but saves only the first one.""" extra_paths = attr.ib() + mutation = attr.ib() def load_cassette(self, cassette_path, serializer): all_paths = chain.from_iterable(((cassette_path,), self.extra_paths)) # Pairs of 2 lists per cassettes: - all_content = (load_cassette(path, serializer) for path in unique(all_paths)) + all_content = (load_cassette(path, serializer, self.mutation) for path in unique(all_paths)) # Two iterators from all pairs from above: all requests, all responses # Notes. # 1. It is possible to do it with accumulators, for loops and `extend` calls, @@ -37,14 +42,14 @@ def load_cassette(self, cassette_path, serializer): return starmap(unpack, zip(*all_content)) -def use_cassette(vcr_cassette_dir, record_mode, markers, config): +def use_cassette(vcr_cassette_dir, record_mode, markers, config, mutation): """Create a VCR instance and return an appropriate context manager for the given cassette configuration.""" merged_config = merge_kwargs(config, markers) path_transformer = get_path_transformer(merged_config) vcr = VCR(path_transformer=path_transformer, cassette_library_dir=vcr_cassette_dir, record_mode=record_mode) # flatten the paths extra_paths = chain(*(marker[0] for marker in markers)) - persister = CombinedPersister(extra_paths) + persister = CombinedPersister(extra_paths, mutation) vcr.register_persister(persister) return vcr.use_cassette(markers[0][0][0], **merged_config) diff --git a/src/pytest_recording/mutations/__init__.py b/src/pytest_recording/mutations/__init__.py new file mode 100644 index 0000000..3aee2b8 --- /dev/null +++ b/src/pytest_recording/mutations/__init__.py @@ -0,0 +1,9 @@ +from .mutagens import status_code, malform_body, empty_body, reduce_body +from .core import mutation, MutationGroup + + +DEFAULT_STATUS_CODES = (400, 401, 403, 404, 500, 501, 502, 503) + +DEFAULT = MutationGroup((status_code(code=DEFAULT_STATUS_CODES), empty_body, malform_body, reduce_body)) + +del MutationGroup # Not needed in the API diff --git a/src/pytest_recording/mutations/core.py b/src/pytest_recording/mutations/core.py new file mode 100644 index 0000000..572a613 --- /dev/null +++ b/src/pytest_recording/mutations/core.py @@ -0,0 +1,52 @@ +import attr + +from .._compat import get_signature + + +@attr.s(slots=True) +class Mutation(object): + func = attr.ib() + context = attr.ib(factory=dict) + _signature = attr.ib(init=False) + + @classmethod + def from_function(cls, func): + return cls(func) + + @property + def signature(self): + if not hasattr(self, "_signature"): + self._signature = get_signature(self.func) + return self._signature + + def __call__(self, **context): + self.validate_call(**context) + return self.__class__(self.func, context) + + def validate_call(self, **context): + self.signature.bind(cassette={}, **context) + + def generate(self): + yield self + + def apply(self, cassette): + return self.func(cassette, **self.context) + + +mutation = Mutation.from_function + + +@attr.s(slots=True) +class MutationGroup(object): + mutations = attr.ib() + + def generate(self): + for mutation in self.mutations: + # change to "yield from" after dropping Python 2 + for one in self.generate_one(mutation): + yield one + + def generate_one(self, mutation): + # change to "yield from" after dropping Python 2 + for one in mutation.generate(): + yield one diff --git a/src/pytest_recording/mutations/mutagens.py b/src/pytest_recording/mutations/mutagens.py new file mode 100644 index 0000000..bccdda2 --- /dev/null +++ b/src/pytest_recording/mutations/mutagens.py @@ -0,0 +1,48 @@ +import json +import random + +from .._compat import Iterable +from .core import mutation, Mutation + + +class ChangeStatusCode(Mutation): + def generate(self): + # "code" will always be in the context, because it is validated before + if isinstance(self.context["code"], Iterable): + for status_code in self.context["code"]: + yield Mutation(self.func, {"code": status_code}) + else: + yield self + + +@ChangeStatusCode.from_function +def status_code(cassette, code): + cassette["status"]["code"] = code + + +@mutation +def empty_body(cassette): + """Make the body empty.""" + cassette["body"]["string"] = b"" + + +@mutation +def malform_body(cassette): + """Add chars to the body to make it not decodable.""" + cassette["body"]["string"] = b"xX" + cassette["body"]["string"] + b"Xx" + + +@mutation +def reduce_body(cassette): + data = json.loads(cassette["body"]["string"]) + # it could be a list or other json type + # take out random key + key = random.choice(list(data.keys())) + del data[key] + cassette["body"]["string"] = json.dumps(data) + + +# TODO. +# XML - Add / remove XML tags. +# JSON - add keys, remove keys, replace values +# JSON - shrink lists, extend lists diff --git a/src/pytest_recording/plugin.py b/src/pytest_recording/plugin.py index 7e73dcc..3ce891a 100644 --- a/src/pytest_recording/plugin.py +++ b/src/pytest_recording/plugin.py @@ -4,13 +4,15 @@ import pytest -from . import network +from . import mutations, network +from ._compat import Iterable from ._vcr import use_cassette RECORD_MODES = ("once", "new_episodes", "none", "all") def pytest_configure(config): + # TODO. register mutation marker config.addinivalue_line("markers", "vcr: Mark the test as using VCR.py.") config.addinivalue_line("markers", "block_network: Block network access except for VCR recording.") network.install_pycurl_wrapper() @@ -84,17 +86,55 @@ def block_network(request, record_mode): yield +@pytest.fixture() +def mutation(): + """Default mutation applied to all cassettes if no `vcr_mutations` is specified. + + Should be None by default. But, could be used to apply the same mutations for all cassettes in a certain context. + """ + return + + @pytest.fixture(autouse=True) def vcr(request, vcr_markers, vcr_cassette_dir, record_mode): """Install a cassette if a test is marked with `pytest.mark.vcr`.""" if vcr_markers: config = request.getfixturevalue("vcr_config") - with use_cassette(vcr_cassette_dir, record_mode, vcr_markers, config) as cassette: + mutation = request.getfixturevalue("mutation") + with use_cassette(vcr_cassette_dir, record_mode, vcr_markers, config, mutation) as cassette: yield cassette else: yield +def pytest_generate_tests(metafunc): + """Generate tests if VCR mutations are applied.""" + if should_apply_mutations(metafunc): + marker = metafunc.definition.get_closest_marker("vcr_mutations") + if not marker: + # No mutations applied + return + if not marker.kwargs: + targets = mutations.DEFAULT + elif not (len(marker.kwargs) == 1 and "mutations" in marker.kwargs): + pytest.fail("Some error message") + else: + targets = marker.kwargs["mutations"] + if isinstance(targets, Iterable): + targets = mutations.core.MutationGroup(targets) + + metafunc.parametrize("mutation", targets.generate()) + + +def should_apply_mutations(metafunc): + """To perform mutations on a test we need: + + - `none` record mode; + - `pytest.mark.vcr` applied to the test. + """ + return metafunc.config.getoption("--record-mode") == "none" and list(metafunc.definition.iter_markers(name="vcr")) + + @pytest.fixture(scope="module") def vcr_cassette_dir(request): """Each test module has its own cassettes directory to avoid name collisions. diff --git a/tests/test_mutations.py b/tests/test_mutations.py new file mode 100644 index 0000000..d8d78b6 --- /dev/null +++ b/tests/test_mutations.py @@ -0,0 +1,155 @@ +import pytest + +from pytest_recording import mutations + + +def test_status_code(testdir, get_response_cassette): + # When status_code mutation is specified + testdir.makepyfile( + """ +import pytest +import requests +from pytest_recording import mutations + +STATUS_CODES = (400, 401, 418, 500, 501) + +@pytest.mark.vcr_mutations(mutations=mutations.status_code(code=STATUS_CODES)) +@pytest.mark.vcr("{}") +def test_status_code(mutation): + assert requests.get("http://httpbin.org/get").status_code == mutation.context["code"] + """.format( + get_response_cassette + ) + ) + # Then 5 tests should be generated with mutated cassettes + result = testdir.runpytest() + result.assert_outcomes(passed=5) + + +@pytest.mark.parametrize("mutation", ("malform_body", "empty_body")) +def test_change_body(testdir, get_response_cassette, mutation): + # When body-changing mutation is specified + testdir.makepyfile( + """ +try: + from json import JSONDecodeError +except ImportError: + JSONDecodeError = ValueError +import pytest +import requests +from pytest_recording import mutations + +@pytest.mark.vcr_mutations(mutations=mutations.{}) +@pytest.mark.vcr("{}") +def test_change_body(mutation): + try: + requests.get("http://httpbin.org/get").json() + except JSONDecodeError: + pass + """.format( + mutation, get_response_cassette + ) + ) + # Then cassettes should be mutated - body is changed + result = testdir.runpytest() + result.assert_outcomes(passed=1) + + +def test_combined(testdir, get_response_cassette): + # When mutations are combined with "|" symbol + testdir.makepyfile( + """ +# Remove try/except after dropping Python 2 +try: + from json import JSONDecodeError +except ImportError: + JSONDecodeError = ValueError +import pytest +import requests +from pytest_recording import mutations + +@pytest.mark.vcr_mutations( + mutations=[ + mutations.empty_body, + mutations.malform_body, + mutations.status_code(code=502), + mutations.status_code(code=[400, 401]) + ] +) +@pytest.mark.vcr("{}") +def test_combined(mutation): + try: + response = requests.get("http://httpbin.org/get") + response.json() + except JSONDecodeError: + pass + """.format( + get_response_cassette + ) + ) + # Then all mutations should be applied and N test cases should be generated + result = testdir.runpytest() + result.assert_outcomes(passed=5) + + +def test_mutation_context(testdir, get_response_cassette): + # When mutation is called with some arguments in the mark + testdir.makepyfile( + """ +import pytest +from pytest_recording import mutations + +@mutations.mutation +def mut(cassette, test): + print("CONTEXT:", test) + +@pytest.mark.vcr_mutations(mutations=mut(test=123)) +@pytest.mark.vcr("{}") +def test_mutation_context(mutation): + pass + + """.format( + get_response_cassette + ) + ) + # Then these arguments should be passed in mutation when mutation is applied + result = testdir.runpytest("-s") + result.assert_outcomes(passed=1) + result.stdout.fnmatch_lines(["*CONTEXT*:* 123*"]) + + +@pytest.mark.parametrize("kwargs", ({}, {"a": 1})) +def test_signature(kwargs): + # When an invalid argument is passed to a mutation + with pytest.raises(TypeError): + # Then TypeError should happen early, not during the test run + mutations.status_code(**kwargs) + + +def test_mark_signature(testdir, get_response_cassette): + # When `vcr_mutations` mark is applied with wrong arguments + testdir.makepyfile( + """ +import pytest +from pytest_recording import mutations + +@mutations.mutation +def mut(cassette, test): + print("CONTEXT:", test) + +@pytest.mark.vcr_mutations(mutationZ=mutations.empty_body) # A typo +@pytest.mark.vcr("{}") +def test_mark_signature(mutation): + pass + + """.format( + get_response_cassette + ) + ) + # Then tests should fail early + result = testdir.runpytest() + result.assert_outcomes(error=1) + + +# TODO. what about different content types in cassettes? +# What if it is gzipped?