diff --git a/.github/workflows/unit-test.yml b/.github/workflows/unit-test.yml new file mode 100644 index 000000000..7e9410b92 --- /dev/null +++ b/.github/workflows/unit-test.yml @@ -0,0 +1,78 @@ +name: Unit Testing + +on: + pull_request: + types: [opened, edited, reopened] + push: + branches: + - v03_wip + +jobs: + build: + + strategy: + fail-fast: false + matrix: + osver: [ "ubuntu-20.04", "ubuntu-22.04" ] + + runs-on: ${{ matrix.osver }} + + name: Unit test on ${{ matrix.osver }} + timeout-minutes: 30 + + steps: + - uses: actions/checkout@v3 + + - name: Install dependencies + run: | + python -m pip install --upgrade pip + + pip install -r requirements.txt + pip install -e . + pip install -r tests/requirements.txt + + - name: Test with pytest + run: | + pytest tests --junitxml=tests/junit/test-results.xml \ + --cov-config=tests/.coveragerc --cov=sarracenia --cov-report=html --cov-report=lcov --cov-report=xml \ + --html=tests/report.html --self-contained-html + + - name: Upload pytest junit results + uses: actions/upload-artifact@v3 + with: + name: results-junit-${{ matrix.osver }} + path: tests/junit/test-results.xml + # Use always() to always run this step to publish test results when there are test failures + if: ${{ always() }} + + - name: Upload pytest HTML report + uses: actions/upload-artifact@v3 + with: + name: results-report-${{ matrix.osver }} + path: tests/report.html + # Use always() to always run this step to publish test results when there are test failures + if: ${{ always() }} + + - name: Upload code coverage report (HTML) + uses: actions/upload-artifact@v3 + with: + name: coverage-report-${{ matrix.osver }} + path: tests/coverage/html_report + # Use always() to always run this step to publish test results when there are test failures + if: ${{ always() }} + + - name: Upload code coverage report (LCOV) + uses: actions/upload-artifact@v3 + with: + name: coverage-lcov-${{ matrix.osver }} + path: tests/coverage/coverage.lcov + # Use always() to always run this step to publish test results when there are test failures + if: ${{ always() }} + + - name: Upload code coverage report (XML) + uses: actions/upload-artifact@v3 + with: + name: coverage-xml-${{ matrix.osver }} + path: tests/coverage/coverage.xml + # Use always() to always run this step to publish test results when there are test failures + if: ${{ always() }} diff --git a/obsolete/.gitignore b/obsolete/.gitignore new file mode 100644 index 000000000..ed8ebf583 --- /dev/null +++ b/obsolete/.gitignore @@ -0,0 +1 @@ +__pycache__ \ No newline at end of file diff --git a/tests/.coveragerc b/tests/.coveragerc new file mode 100644 index 000000000..bc1017013 --- /dev/null +++ b/tests/.coveragerc @@ -0,0 +1,38 @@ +# .coveragerc to control coverage.py +[run] +branch = True +data_file = tests/coverage/.coverage +relative_files = True + +[report] +# Regexes for lines to exclude from consideration +exclude_also = + # Don't complain about missing debug-only code: + def __repr__ + if self\.debug + + # Don't complain if tests don't hit defensive assertion code: + raise AssertionError + raise NotImplementedError + + # Don't complain if non-runnable code isn't run: + if 0: + if __name__ == .__main__.: + + # Don't complain about abstract methods, they aren't run: + @(abc\.)?abstractmethod + +ignore_errors = True + +[html] +directory = tests/coverage/html_report + +[xml] +output = tests/coverage/coverage.xml + +[json] +output = tests/coverage/coverage.json +pretty_print = True + +[lcov] +output = tests/coverage/coverage.lcov \ No newline 
at end of file diff --git a/tests/.gitignore b/tests/.gitignore new file mode 100644 index 000000000..19a844a28 --- /dev/null +++ b/tests/.gitignore @@ -0,0 +1,4 @@ +__pycache__ +junit +coverage +report.html \ No newline at end of file diff --git a/tests/README.md b/tests/README.md new file mode 100644 index 000000000..8e4bef76b --- /dev/null +++ b/tests/README.md @@ -0,0 +1,60 @@ +# Unit Testing + +## Details +Unit tests are currently set up to use [Pytest](https://docs.pytest.org/en/7.3.x/contents.html). + +## Setup + +Setting up an environment for testing is quite simple. + +From a clean Linux image, run the following commands and you'll be all set. + +1. Download the Sarracenia source + `git clone https://github.com/MetPX/sarracenia` +2. Check out the branch you're working on (optional) + `git checkout <branch>` +3. Update pip + `pip3 install -U pip` +4. Install base requirements for Sarracenia + `pip3 install -r requirements.txt` +5. Install the Sarracenia Python modules + `pip3 install -e .` +6. Install requirements for pytest + `pip3 install -r tests/requirements.txt` + +That's it, you're now ready to run the tests. + +## Running + +From within the repository root directory, simply run `pytest tests` and you'll see the results. There's a full set of arguments/options that modify the output, outlined [here](https://docs.pytest.org/en/7.3.x/reference/reference.html#ini-options-ref). + +As configured, it outputs a bit of system information and the total test count, followed by a line per file, with dots/letters indicating the status of each test in that file. +Letter meanings: +(`f`)ailed, (`E`)rror, (`s`)kipped, (`x`)failed, (`X`)passed, (`p`)assed, (`P`)assed with output, (`a`)ll except passed (`p`/`P`), (`w`)arnings, (`A`)ll + +Specifying the `-v` option makes the output more verbose, listing each test and its pass/skip/fail status. + +Application logs captured during tests can be output with the `-o log_cli=true` argument. + +### Reporting +A basic HTML report of the tests can be generated by adding `--html=tests/report.html --self-contained-html` to the command-line. + +A code coverage report can be generated by adding `--cov-config=tests/.coveragerc --cov=sarracenia --cov-report=xml --cov-report=html` to the command-line. + +A JUnit-compatible unit test report can be generated by adding `--junitxml=tests/junit/test-results.xml` to the command-line. + + +## Docker +You can also run the exact same tests from within a Docker container if you want to avoid having to (re-)provision clean installs. + +If the code is already present on the host system, you can use the `python` image and map the code into the container for the tests: +`docker run --rm -it --name sr3-pytest -v $(pwd):/app -w /app python:3 bash` + +Then run the Setup steps above, starting at Step 3.
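+If you'd rather do it in one shot, the same steps should also chain into a single non-interactive command (a sketch of the steps above, untested; adjust the image tag to taste):
+`docker run --rm -v $(pwd):/app -w /app python:3 bash -c "pip3 install -U pip && pip3 install -r requirements.txt -e . -r tests/requirements.txt && pytest tests"`
+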
+ +If the code isn't already on your system, then the following should get you set up: +- Run container + `docker run --rm -it --name sr3-pytest ubuntu bash` +- Install dependencies (Ubuntu/Debian) + `apt-get update && apt-get install -y git python3 python3-pip` +- Then follow the Setup directions above \ No newline at end of file diff --git a/tests/pytest.ini b/tests/pytest.ini new file mode 100644 index 000000000..7780f1bf4 --- /dev/null +++ b/tests/pytest.ini @@ -0,0 +1,11 @@ +[pytest] +minversion = 7.0 +# With Code Coverage +#addopts = --cov-config=tests/.coveragerc --cov=sarracenia --cov-report=html --cov-report=lcov +# With JUnit test report +#addopts = --junitxml=tests/junit/test-results.xml +norecursedirs = obsolete docs debian docker tools +python_files = *_test.py +python_functions = test_* +log_cli = False +testpaths = tests \ No newline at end of file diff --git a/tests/requirements.txt b/tests/requirements.txt new file mode 100644 index 000000000..d671f0295 --- /dev/null +++ b/tests/requirements.txt @@ -0,0 +1,9 @@ +pytest>=7.3 +pytest-cov>=4.0 +pytest-bug>=1.2 +pytest-depends>=1.0 +pytest-html>=3.2 + +python-redis-lock>=4 +fakeredis>=2.11 +fakeredis[lua]>=2.11 \ No newline at end of file diff --git a/tests/sarracenia/diskqueue_test.py b/tests/sarracenia/diskqueue_test.py new file mode 100644 index 000000000..8a8850e61 --- /dev/null +++ b/tests/sarracenia/diskqueue_test.py @@ -0,0 +1,239 @@ +import pytest, jsonpickle +import os + +from sarracenia.diskqueue import DiskQueue + +class Options: + def add_option(self, option, type, default = None): + # mimic sarracenia's Config.add_option: only apply the default if the option isn't already set + if not hasattr(self, option): + setattr(self, option, default) + +BaseOptions = Options() +BaseOptions.retry_ttl = 0 +BaseOptions.logLevel = "DEBUG" +BaseOptions.queueName = "TEST_QUEUE_NAME" +BaseOptions.component = "sarra" +BaseOptions.config = "foobar.conf" +BaseOptions.pid_filename = "/tmp/sarracenia/diskqueue_test/pid_filename" +BaseOptions.housekeeping = float(39) + +message = { + "pubTime": "20180118151049.356378078", + "topic": "v02.post.sent_by_tsource2send", + "headers": { + "atime": "20180118151049.356378078", + "from_cluster": "localhost", + "mode": "644", + "mtime": "20180118151048", + "parts": "1,69,1,0,0", + "source": "tsource", + "sum": "d,c35f14e247931c3185d5dc69c5cd543e", + "to_clusters": "localhost" + }, + "baseUrl": "https://NotARealURL", + "relPath": "ThisIsAPath/To/A/File.txt", + "notice": "20180118151050.45 ftp://anonymous@localhost:2121 /sent_by_tsource2send/SXAK50_KWAL_181510___58785" +} + +def test_msgFromJSON(tmp_path): + BaseOptions.pid_filename = str(tmp_path) + os.sep + "pidfilename.txt" + download_retry = DiskQueue(BaseOptions, 'work_retry') + + assert message == download_retry.msgFromJSON(jsonpickle.encode(message)) + +def test_msgToJSON(tmp_path): + BaseOptions.pid_filename = str(tmp_path) + os.sep + "pidfilename.txt" + download_retry = DiskQueue(BaseOptions, 'work_retry') + + assert jsonpickle.encode(message) + '\n' == download_retry.msgToJSON(message) + +def test__is_expired__TooSoon(tmp_path): + BaseOptions.pid_filename = str(tmp_path) + os.sep + "pidfilename.txt" + BaseOptions.retry_ttl = 100000 + download_retry = DiskQueue(BaseOptions, 'work_retry') + + assert download_retry.is_expired(message) == True + +def test__is_expired__TooLate(tmp_path): + BaseOptions.pid_filename = str(tmp_path) + os.sep + "pidfilename.txt" + BaseOptions.retry_ttl = 1 + download_retry = DiskQueue(BaseOptions, 'work_retry') + + import sarracenia + message["pubTime"] = sarracenia.nowstr() + + assert download_retry.is_expired(message) ==
False + +def test___len__(tmp_path): + BaseOptions.pid_filename = str(tmp_path) + os.sep + "pidfilename.txt" + download_retry = DiskQueue(BaseOptions, 'work_retry') + + # fp = open(download_retry.queue_file, 'a') + # fp_new = open(download_retry.new_path, 'a') + # fp_hk = open(download_retry.housekeeping_path, 'a') + + # fp_new.write(download_retry.msgToJSON(message)) + download_retry.msg_count += 1 + assert len(download_retry) == 1 + + download_retry.msg_count_new += 1 + assert len(download_retry) == 2 + +def test_in_cache(tmp_path): + BaseOptions.pid_filename = str(tmp_path) + os.sep + "pidfilename.txt" + download_retry = DiskQueue(BaseOptions, 'work_retry') + + download_retry.retry_cache = {} + + assert download_retry.in_cache(message) == False + + # Checking if it's there actually adds it, so checking it again right after should return True + assert download_retry.in_cache(message) == True + +def test_needs_requeuing(tmp_path): + BaseOptions.pid_filename = str(tmp_path) + os.sep + "pidfilename.txt" + download_retry = DiskQueue(BaseOptions, 'work_retry') + + download_retry.retry_cache = {} + + assert download_retry.needs_requeuing(message) == True + assert download_retry.needs_requeuing(message) == False + + download_retry.o.retry_ttl = 1000000 + + assert download_retry.needs_requeuing(message) == False + +def test_put__Single(tmp_path): + BaseOptions.pid_filename = str(tmp_path) + os.sep + "pidfilename.txt" + download_retry = DiskQueue(BaseOptions, 'test_put__Single') + + download_retry.put([message]) + assert download_retry.msg_count_new == 1 + + line = jsonpickle.encode(message) + '\n' + + assert open(download_retry.new_path, 'r').read() == line + +def test_put__Multi(tmp_path): + BaseOptions.pid_filename = str(tmp_path) + os.sep + "pidfilename.txt" + download_retry = DiskQueue(BaseOptions, 'test_put__Multi') + + download_retry.put([message, message, message]) + assert download_retry.msg_count_new == 3 + + line = jsonpickle.encode(message) + '\n' + + contents = open(download_retry.new_path, 'r').read() + + assert contents == line + line + line + +def test_cleanup(tmp_path): + BaseOptions.pid_filename = str(tmp_path) + os.sep + "pidfilename.txt" + download_retry = DiskQueue(BaseOptions, 'test_cleanup') + + fp = open(download_retry.queue_file, 'a') + fp.write(jsonpickle.encode(message) + '\n') + download_retry.msg_count = 1 + + assert os.path.exists(download_retry.queue_file) == True + assert download_retry.msg_count == 1 + + download_retry.cleanup() + + assert os.path.exists(download_retry.queue_file) == False + assert download_retry.msg_count == 0 + +def test_msg_get_from_file__NoLine(tmp_path): + BaseOptions.pid_filename = str(tmp_path) + os.sep + "pidfilename.txt" + download_retry = DiskQueue(BaseOptions, 'test_msg_get_from_file__NoLine') + + fp_new, msg = download_retry.msg_get_from_file(None, download_retry.queue_file) + + assert fp_new == None + assert msg == None + +def test_msg_get_from_file(tmp_path): + BaseOptions.pid_filename = str(tmp_path) + os.sep + "pidfilename.txt" + download_retry = DiskQueue(BaseOptions, 'test_msg_get_from_file') + + fp = open(download_retry.queue_file, 'a') + fp.write(jsonpickle.encode(message) + '\n') + fp.flush() + fp.close() + + fp_new, msg = download_retry.msg_get_from_file(None, download_retry.queue_file) + + import io + assert isinstance(fp_new, io.TextIOWrapper) == True + assert msg == message + +def test_get__Single(tmp_path): + BaseOptions.pid_filename = str(tmp_path) + os.sep + "pidfilename.txt" + download_retry = DiskQueue(BaseOptions, 
'test_get__Single') + + fp = open(download_retry.queue_file, 'a') + line = jsonpickle.encode(message) + '\n' + fp.write(line) + fp.flush() + fp.close() + + gotten = download_retry.get() + + assert len(gotten) == 1 + assert gotten == [message] + +def test_get__Multi(tmp_path): + BaseOptions.pid_filename = str(tmp_path) + os.sep + "pidfilename.txt" + download_retry = DiskQueue(BaseOptions, 'test_get__Multi') + + fp = open(download_retry.queue_file, 'a') + line = jsonpickle.encode(message) + '\n' + fp.write(line + line) + fp.flush() + fp.close() + + gotten = download_retry.get(2) + + assert len(gotten) == 2 + assert gotten == [message, message] + +def test_on_housekeeping__FinishRetry(tmp_path, caplog): + BaseOptions.pid_filename = str(tmp_path) + os.sep + "pidfilename.txt" + download_retry = DiskQueue(BaseOptions, 'test_on_housekeeping__FinishRetry') + + download_retry.queue_fp = open(download_retry.queue_file, 'a') + line = jsonpickle.encode(message) + '\n' + download_retry.queue_fp.write(line + line) + download_retry.queue_fp.flush() + + hk_out = download_retry.on_housekeeping() + + assert hk_out == None + + for record in caplog.records: + if "have not finished retry list" in record.message: + assert "have not finished retry list" in record.message + +def test_on_housekeeping(tmp_path, caplog): + BaseOptions.pid_filename = str(tmp_path) + os.sep + "pidfilename.txt" + download_retry = DiskQueue(BaseOptions, 'test_on_housekeeping') + + download_retry.new_fp = open(download_retry.new_path, 'a') + line = jsonpickle.encode(message) + '\n' + download_retry.new_fp.write(line + line) + download_retry.new_fp.flush() + + hk_out = download_retry.on_housekeeping() + + assert hk_out == None + assert os.path.exists(download_retry.queue_file) == True + assert os.path.exists(download_retry.new_path) == False + + for record in caplog.records: + if "has queue" in record.message: + assert "has queue" in record.message + if "Number of messages in retry list" in record.message: + assert "Number of messages in retry list" in record.message + if "on_housekeeping elapse" in record.message: + assert "on_housekeeping elapse" in record.message \ No newline at end of file diff --git a/tests/sarracenia/flowcb/nodupe/__init___test.py b/tests/sarracenia/flowcb/nodupe/__init___test.py new file mode 100644 index 000000000..a58106d9b --- /dev/null +++ b/tests/sarracenia/flowcb/nodupe/__init___test.py @@ -0,0 +1,145 @@ +import pytest +import os + +from sarracenia.flowcb.nodupe import NoDupe + +class Options: + def __init__(self): + self.retry_ttl = 0 + self.logLevel = "DEBUG" + self.logFormat = "" + self.queueName = "TEST_QUEUE_NAME" + self.component = "sarra" + self.config = "foobar.conf" + self.pid_filename = "/tmp/sarracenia/diskqueue_test/pid_filename" + self.housekeeping = float(39) + def add_option(self, option, type, default = None): + setattr(self, option, default) + pass + +BaseOptions = Options() + + +message = { + "pubTime": "20180118151049.356378078", + "topic": "v02.post.sent_by_tsource2send", + "headers": { + "atime": "20180118151049.356378078", + "from_cluster": "localhost", + "mode": "644", + "mtime": "20180118151048", + "parts": "1,69,1,0,0", + "source": "tsource", + "sum": "d,c35f14e247931c3185d5dc69c5cd543e", + "to_clusters": "localhost" + }, + "baseUrl": "https://NotARealURL", + "relPath": "ThisIsAPath/To/A/File.txt", + "notice": "20180118151050.45 ftp://anonymous@localhost:2121 /sent_by_tsource2send/SXAK50_KWAL_181510___58785" +} + +def test_deriveKey__nodupe_override(tmp_path): + BaseOptions = 
Options() + BaseOptions.pid_filename = str(tmp_path) + os.sep + "pidfilename.txt" + nodupe = NoDupe(BaseOptions) + + thismsg = message.copy() + + thismsg['nodupe_override'] = {'key': "SomeKeyValue"} + + assert nodupe.deriveKey(thismsg) == "SomeKeyValue" + +def test_deriveKey__fileOp(tmp_path): + BaseOptions = Options() + BaseOptions.pid_filename = str(tmp_path) + os.sep + "pidfilename.txt" + nodupe = NoDupe(BaseOptions) + + thismsg = message.copy() + thismsg['fileOp'] = {'link': "SomeKeyValue"} + assert nodupe.deriveKey(thismsg) == "SomeKeyValue" + + thismsg = message.copy() + thismsg['fileOp'] = {'directory': "SomeKeyValue"} + assert nodupe.deriveKey(thismsg) == thismsg["relPath"] + +def test_deriveKey__integrity(tmp_path): + BaseOptions = Options() + BaseOptions.pid_filename = str(tmp_path) + os.sep + "pidfilename.txt" + nodupe = NoDupe(BaseOptions) + + thismsg = message.copy() + + thismsg['integrity'] = {'method': "cod"} + assert nodupe.deriveKey(thismsg) == thismsg["relPath"] + + thismsg['integrity'] = {'method': "method", 'value': "value\n"} + assert nodupe.deriveKey(thismsg) == "method,value" + +def test_deriveKey__NotKey(tmp_path): + BaseOptions = Options() + BaseOptions.pid_filename = str(tmp_path) + os.sep + "pidfilename.txt" + nodupe = NoDupe(BaseOptions) + + thismsg = message.copy() + + assert nodupe.deriveKey(thismsg) == thismsg["relPath"] + "," + thismsg["pubTime"] + thismsg['size'] = 28234 + assert nodupe.deriveKey(thismsg) == thismsg["relPath"] + "," + thismsg["pubTime"] + ",28234" + thismsg['mtime'] = "20230118151049.356378078" + assert nodupe.deriveKey(thismsg) == thismsg["relPath"] + "," + thismsg['mtime'] + ",28234" + + del thismsg['size'] + assert nodupe.deriveKey(thismsg) == thismsg["relPath"] + "," + thismsg['mtime'] + +def test_open__withoutfile(tmp_path): + BaseOptions = Options() + BaseOptions.pid_filename = str(tmp_path) + os.sep + "pidfilename.txt" + BaseOptions.cfg_run_dir = str(tmp_path) + BaseOptions.no = 5 + nodupe = NoDupe(BaseOptions) + + nodupe.open() + assert nodupe.cache_file == str(tmp_path) + os.sep + 'recent_files_005.cache' + assert os.path.isfile(nodupe.cache_file) == True + assert len(nodupe.cache_dict) == 0 + +def test_open__withfile(tmp_path): + BaseOptions = Options() + BaseOptions.pid_filename = str(tmp_path) + os.sep + "pidfilename.txt" + BaseOptions.cfg_run_dir = str(tmp_path) + BaseOptions.no = 5 + nodupe = NoDupe(BaseOptions) + + filepath = str(tmp_path) + os.sep + 'recent_files_005.cache' + fp = open(filepath, 'a') + fp.flush() + + nodupe.open(cache_file=filepath) + assert nodupe.cache_file == str(tmp_path) + os.sep + 'recent_files_005.cache' + assert os.path.isfile(nodupe.cache_file) == True + assert len(nodupe.cache_dict) == 0 + +def test_open__withdata(tmp_path): + import urllib, time + BaseOptions = Options() + BaseOptions.pid_filename = str(tmp_path) + os.sep + "pidfilename.txt" + BaseOptions.cfg_run_dir = str(tmp_path) + BaseOptions.no = 5 + nodupe = NoDupe(BaseOptions) + nodupe.o.nodupe_ttl = 100000 + + fp = open(str(tmp_path) + os.sep + 'recent_files_005.cache', 'a') + fp.write("%s %f %s\n" % ("key1", float(time.time() - 1000), urllib.parse.quote("/some/path/to/file1.txt"))) + fp.write("%s %f %s\n" % ("key2", float(time.time() - 1000), urllib.parse.quote("/some/path/to/file2.txt"))) + fp.write("%s %f %s\n" % ("key3", float(time.time() - 1000), urllib.parse.quote("/some/path/to/file3.txt"))) + fp.write("%s %f %s\n" % ("key4", float(time.time() - 1000), urllib.parse.quote("/some/path/to/file4.txt"))) + fp.write("%s %f %s\n" % 
("key5", float(time.time() - 1000), urllib.parse.quote("/some/path/to/file5.txt"))) + fp.write("%s %f %s\n" % ("key5", float(time.time() - 1000), urllib.parse.quote("/some/path/to/file6.txt"))) + fp.write("%s %f %s\n" % ("key6", float(time.time() - 1000000), urllib.parse.quote("/some/path/to/file7.txt"))) + fp.write("thisisgarbage\n") + fp.flush() + + nodupe.open() + assert nodupe.cache_file == str(tmp_path) + os.sep + 'recent_files_005.cache' + assert os.path.isfile(nodupe.cache_file) == True + assert len(nodupe.cache_dict) == 5 \ No newline at end of file diff --git a/tests/sarracenia/flowcb/retry_test.py b/tests/sarracenia/flowcb/retry_test.py new file mode 100644 index 000000000..6782d255a --- /dev/null +++ b/tests/sarracenia/flowcb/retry_test.py @@ -0,0 +1,306 @@ +import pytest +from unittest.mock import patch + +import os, types + +#from sarracenia.flowcb import FlowCB +from sarracenia.flowcb.retry import Retry + +import fakeredis + +class Options: + retry_driver = 'disk' + redisqueue_serverurl = '' + no = 1 + retry_ttl = 0 + batch = 8 + logLevel = "DEBUG" + queueName = "TEST_QUEUE_NAME" + component = "sarra" + config = "foobar.conf" + pid_filename = "NotARealPath" + housekeeping = float(0) + def add_option(self, option, type, default = None): + pass + pass + +WorkList = types.SimpleNamespace() +WorkList.ok = [] +WorkList.incoming = [] +WorkList.rejected = [] +WorkList.failed = [] +WorkList.directories_ok = [] + +message = { + "pubTime": "20180118151049.356378078", + "topic": "v02.post.sent_by_tsource2send", + "headers": { + "atime": "20180118151049.356378078", + "from_cluster": "localhost", + "mode": "644", + "mtime": "20180118151048", + "parts": "1,69,1,0,0", + "source": "tsource", + "sum": "d,c35f14e247931c3185d5dc69c5cd543e", + "to_clusters": "localhost" + }, + "baseUrl": "https://NotARealURL", + "relPath": "ThisIsAPath/To/A/File.txt", + "notice": "20180118151050.45 ftp://anonymous@localhost:2121 /sent_by_tsource2send/SXAK50_KWAL_181510___58785" +} + +@pytest.mark.bug("DiskQueue.py doesn't cleanup properly") +@pytest.mark.depends(on=['sarracenia/diskqueue_test.py::test_put__Multi', 'sarracenia/redisqueue_test.py::test_put__Multi']) +def test_cleanup(tmp_path): + + with patch(target="redis.from_url", new=fakeredis.FakeStrictRedis.from_url, ): + BaseOptions_disk = Options() + BaseOptions_disk.pid_filename = str(tmp_path) + os.sep + "pidfilename.txt" + retry_disk = Retry(BaseOptions_disk) + + + BaseOptions_redis = Options() + BaseOptions_redis.retry_driver = 'redis' + BaseOptions_redis.redisqueue_serverurl = "redis://Never.Going.To.Resolve:6379/0" + BaseOptions_redis.queueName = "test_cleanup" + retry_redis = Retry(BaseOptions_redis) + + retry_disk.download_retry.put([message, message, message]) + retry_disk.post_retry.put([message, message, message]) + + retry_redis.download_retry.put([message, message, message]) + retry_redis.post_retry.put([message, message, message]) + + assert len(retry_disk.download_retry) == len(retry_redis.download_retry) == 3 + assert len(retry_disk.post_retry) == len(retry_redis.post_retry) == 3 + + retry_disk.cleanup() + retry_redis.cleanup() + + #These should both return 0, but with the current DiskQueue, cleanup doesn't work properly. 
+ assert len(retry_disk.download_retry) == len(retry_redis.download_retry) == 0 + assert len(retry_disk.post_retry) == len(retry_redis.post_retry) == 0 + + +@pytest.mark.depends(on=['sarracenia/diskqueue_test.py::test_put__Multi', 'sarracenia/redisqueue_test.py::test_put__Multi']) +def test_metricsReport(tmp_path): + with patch(target="redis.from_url", new=fakeredis.FakeStrictRedis.from_url, ): + BaseOptions_disk = Options() + BaseOptions_disk.retry_driver = 'disk' + BaseOptions_disk.pid_filename = str(tmp_path) + os.sep + "pidfilename.txt" + retry_disk = Retry(BaseOptions_disk) + + BaseOptions_redis = Options() + BaseOptions_redis.retry_driver = 'redis' + BaseOptions_redis.redisqueue_serverurl = "redis://Never.Going.To.Resolve:6379/0" + BaseOptions_redis.queueName = "test_metricsReport" + retry_redis = Retry(BaseOptions_redis) + + retry_disk.download_retry.put([message, message, message]) + retry_disk.post_retry.put([message, message, message]) + metrics_disk = retry_disk.metricsReport() + + retry_redis.download_retry.put([message, message, message]) + retry_redis.post_retry.put([message, message, message]) + metrics_redis = retry_redis.metricsReport() + + assert metrics_disk['msgs_in_download_retry'] == metrics_redis['msgs_in_download_retry'] == 3 + assert metrics_disk['msgs_in_post_retry'] == metrics_redis['msgs_in_post_retry'] == 3 + +def test_after_post(tmp_path): + with patch(target="redis.from_url", new=fakeredis.FakeStrictRedis.from_url, ): + BaseOptions_disk = Options() + BaseOptions_disk.retry_driver = 'disk' + BaseOptions_disk.pid_filename = str(tmp_path) + os.sep + "pidfilename.txt" + retry_disk = Retry(BaseOptions_disk) + + BaseOptions_redis = Options() + BaseOptions_redis.retry_driver = 'redis' + BaseOptions_redis.redisqueue_serverurl = "redis://Never.Going.To.Resolve:6379/0" + BaseOptions_redis.queueName = "test_after_post" + retry_redis = Retry(BaseOptions_redis) + + after_post_worklist_disk = WorkList + after_post_worklist_disk.failed = [message, message, message] + retry_disk.after_post(after_post_worklist_disk) + + after_post_worklist_redis = WorkList + after_post_worklist_redis.failed = [message, message, message] + retry_redis.after_post(after_post_worklist_redis) + + assert len(retry_disk.post_retry) == len(retry_redis.post_retry) == 3 + +def test_after_work__WLFailed(tmp_path): + with patch(target="redis.from_url", new=fakeredis.FakeStrictRedis.from_url, ): + BaseOptions_disk = Options() + BaseOptions_disk.retry_driver = 'disk' + BaseOptions_disk.pid_filename = str(tmp_path) + os.sep + "pidfilename.txt" + retry_disk = Retry(BaseOptions_disk) + + BaseOptions_redis = Options() + BaseOptions_redis.retry_driver = 'redis' + BaseOptions_redis.redisqueue_serverurl = "redis://Never.Going.To.Resolve:6379/0" + BaseOptions_redis.queueName = "test_after_work__WLFailed" + retry_redis = Retry(BaseOptions_redis) + + after_work_worklist_disk = WorkList + after_work_worklist_disk.failed = [message, message, message] + retry_disk.after_work(after_work_worklist_disk) + + after_work_worklist_redis = WorkList + after_work_worklist_redis.failed = [message, message, message] + retry_redis.after_work(after_work_worklist_redis) + + assert len(retry_disk.download_retry) == len(retry_redis.download_retry) == 3 + assert len(after_work_worklist_disk.failed) == len(after_work_worklist_redis.failed) == 0 + +def test_after_work__SmallQty(tmp_path): + with patch(target="redis.from_url", new=fakeredis.FakeStrictRedis.from_url, ): + BaseOptions_disk = Options() + BaseOptions_disk.retry_driver = 'disk' 
+ BaseOptions_disk.batch = 2 + BaseOptions_disk.pid_filename = str(tmp_path) + os.sep + "pidfilename.txt" + retry_disk = Retry(BaseOptions_disk) + + BaseOptions_redis = Options() + BaseOptions_redis.batch = 2 + BaseOptions_redis.retry_driver = 'redis' + BaseOptions_redis.redisqueue_serverurl = "redis://Never.Going.To.Resolve:6379/0" + BaseOptions_redis.queueName = "test_after_work__SmallQty" + retry_redis = Retry(BaseOptions_redis) + + after_work_worklist_disk = WorkList + after_work_worklist_disk.ok = [message, message, message] + retry_disk.after_work(after_work_worklist_disk) + + after_work_worklist_redis = WorkList + after_work_worklist_redis.ok = [message, message, message] + retry_redis.after_work(after_work_worklist_redis) + + assert len(retry_disk.download_retry) == len(retry_redis.download_retry) == 0 + assert len(after_work_worklist_disk.ok) == len(after_work_worklist_redis.ok) == 3 + + +@pytest.mark.depends(on=['test_on_housekeeping', 'sarracenia/diskqueue_test.py::test_put__Multi', 'sarracenia/redisqueue_test.py::test_put__Multi']) +def test_after_work(tmp_path): + with patch(target="redis.from_url", new=fakeredis.FakeStrictRedis.from_url, ): + BaseOptions_disk = Options() + BaseOptions_disk.retry_driver = 'disk' + BaseOptions_disk.pid_filename = str(tmp_path) + os.sep + "pidfilename.txt" + retry_disk = Retry(BaseOptions_disk) + + BaseOptions_redis = Options() + BaseOptions_redis.retry_driver = 'redis' + BaseOptions_redis.redisqueue_serverurl = "redis://Never.Going.To.Resolve:6379/0" + BaseOptions_redis.queueName = "test_after_work" + retry_redis = Retry(BaseOptions_redis) + + after_work_worklist_disk = WorkList + after_work_worklist_disk.ok = [message, message, message] + retry_disk.post_retry.put([message, message, message]) + retry_disk.on_housekeeping() + retry_disk.after_work(after_work_worklist_disk) + + after_work_worklist_redis = WorkList + after_work_worklist_redis.ok = [message, message, message] + retry_redis.post_retry.put([message, message, message]) + retry_redis.on_housekeeping() + retry_redis.after_work(after_work_worklist_redis) + + assert len(retry_disk.download_retry) == len(retry_redis.download_retry) == 0 + assert len(after_work_worklist_disk.ok) == len(after_work_worklist_redis.ok) == 4 + + +def test_after_accept__SmallQty(tmp_path): + with patch(target="redis.from_url", new=fakeredis.FakeStrictRedis.from_url, ): + BaseOptions_disk = Options() + BaseOptions_disk.retry_driver = 'disk' + BaseOptions_disk.batch = 2 + BaseOptions_disk.pid_filename = str(tmp_path) + os.sep + "pidfilename.txt" + retry_disk = Retry(BaseOptions_disk) + + BaseOptions_redis = Options() + BaseOptions_redis.batch = 2 + BaseOptions_redis.retry_driver = 'redis' + BaseOptions_redis.redisqueue_serverurl = "redis://Never.Going.To.Resolve:6379/0" + BaseOptions_redis.queueName = "test_after_accept__SmallQty" + retry_redis = Retry(BaseOptions_redis) + + after_work_worklist_disk = WorkList + after_work_worklist_disk.incoming = [message, message, message] + retry_disk.after_accept(after_work_worklist_disk) + + after_work_worklist_redis = WorkList + after_work_worklist_redis.incoming = [message, message, message] + retry_redis.after_accept(after_work_worklist_redis) + + assert len(retry_disk.download_retry) == len(retry_redis.download_retry) == 0 + assert len(after_work_worklist_disk.incoming) == len(after_work_worklist_redis.incoming) == 3 + +@pytest.mark.depends(on=['test_on_housekeeping', 'sarracenia/diskqueue_test.py::test_put__Multi', 'sarracenia/redisqueue_test.py::test_put__Multi']) +def 
test_after_accept(tmp_path): + with patch(target="redis.from_url", new=fakeredis.FakeStrictRedis.from_url, ): + BaseOptions_disk = Options() + BaseOptions_disk.retry_driver = 'disk' + BaseOptions_disk.pid_filename = str(tmp_path) + os.sep + "pidfilename.txt" + retry_disk = Retry(BaseOptions_disk) + + BaseOptions_redis = Options() + BaseOptions_redis.retry_driver = 'redis' + BaseOptions_redis.redisqueue_serverurl = "redis://Never.Going.To.Resolve:6379/0" + BaseOptions_redis.queueName = "test_after_accept" + retry_redis = Retry(BaseOptions_redis) + + after_work_worklist_disk = WorkList + after_work_worklist_disk.incoming = [message, message, message] + retry_disk.download_retry.put([message, message, message]) + retry_disk.on_housekeeping() + retry_disk.after_accept(after_work_worklist_disk) + + after_work_worklist_redis = WorkList + after_work_worklist_redis.incoming = [message, message, message] + retry_redis.download_retry.put([message, message, message]) + retry_redis.on_housekeeping() + retry_redis.after_accept(after_work_worklist_redis) + + assert len(retry_disk.download_retry) == len(retry_redis.download_retry) == 0 + assert len(after_work_worklist_disk.incoming) == len(after_work_worklist_redis.incoming) == 4 + +@pytest.mark.depends(on=['sarracenia/diskqueue_test.py::test_put__Multi', 'sarracenia/redisqueue_test.py::test_put__Multi']) +def test_on_housekeeping(tmp_path, caplog): + with patch(target="redis.from_url", new=fakeredis.FakeStrictRedis.from_url, ): + BaseOptions_disk = Options() + BaseOptions_disk.retry_driver = 'disk' + BaseOptions_disk.pid_filename = str(tmp_path) + os.sep + "pidfilename.txt" + retry_disk = Retry(BaseOptions_disk) + + BaseOptions_redis = Options() + BaseOptions_redis.retry_driver = 'redis' + BaseOptions_redis.redisqueue_serverurl = "redis://Never.Going.To.Resolve:6379/0" + BaseOptions_redis.queueName = "test_on_housekeeping" + retry_redis = Retry(BaseOptions_redis) + + retry_disk.download_retry.put([message, message, message]) + retry_disk.post_retry.put([message, message, message]) + + retry_redis.download_retry.put([message, message, message]) + retry_redis.post_retry.put([message, message, message]) + + assert len(retry_disk.download_retry) == len(retry_redis.download_retry) == 3 + assert len(retry_disk.post_retry) == len(retry_redis.post_retry) == 3 + + log_found_hk_elapse_disk = log_found_hk_elapse_redis = False + + retry_disk.on_housekeeping() + for record in caplog.records: + if "on_housekeeping elapse" in record.message: + log_found_hk_elapse_disk = True + + caplog.clear() + + retry_redis.on_housekeeping() + for record in caplog.records: + if "on_housekeeping elapse" in record.message: + log_found_hk_elapse_redis = True + + assert log_found_hk_elapse_disk == log_found_hk_elapse_redis == True \ No newline at end of file diff --git a/tests/sarracenia/flowcb/retry_teststeps.py b/tests/sarracenia/flowcb/retry_teststeps.py new file mode 100644 index 000000000..a61890657 --- /dev/null +++ b/tests/sarracenia/flowcb/retry_teststeps.py @@ -0,0 +1,459 @@ +import pytest +from pytest_steps import test_steps +from unittest.mock import patch + +import os, types + +#from sarracenia.flowcb import FlowCB +from sarracenia.flowcb.retry import Retry + +import fakeredis + +class Options: + retry_driver = 'disk' + redisqueue_serverurl = '' + no = 1 + retry_ttl = 0 + batch = 8 + logLevel = "DEBUG" + queueName = "TEST_QUEUE_NAME" + component = "sarra" + config = "foobar.conf" + pid_filename = "NotARealPath" + housekeeping = float(0) + def add_option(self, option, type, 
default = None): + pass + pass + + + +WorkList = types.SimpleNamespace() +WorkList.ok = [] +WorkList.incoming = [] +WorkList.rejected = [] +WorkList.failed = [] +WorkList.directories_ok = [] + +message = { + "pubTime": "20180118151049.356378078", + "topic": "v02.post.sent_by_tsource2send", + "headers": { + "atime": "20180118151049.356378078", + "from_cluster": "localhost", + "mode": "644", + "mtime": "20180118151048", + "parts": "1,69,1,0,0", + "source": "tsource", + "sum": "d,c35f14e247931c3185d5dc69c5cd543e", + "to_clusters": "localhost" + }, + "baseUrl": "https://NotARealURL", + "relPath": "ThisIsAPath/To/A/File.txt", + "notice": "20180118151050.45 ftp://anonymous@localhost:2121 /sent_by_tsource2send/SXAK50_KWAL_181510___58785" +} + +@pytest.mark.bug("DiskQueue.py doesn't cleanup properly") +@test_steps('disk', 'redis') +def test_cleanup(test_step, tmp_path): + # Execute the step according to name + if test_step == 'disk': + cleanup__disk(tmp_path) + elif test_step == 'redis': + cleanup__redis() + +def cleanup__disk(tmp_path): + BaseOptions = Options() + # -- DiskQueue + BaseOptions.pid_filename = str(tmp_path) + os.sep + "pidfilename.txt" + retry = Retry(BaseOptions) + + retry.download_retry.put([message, message, message]) + retry.post_retry.put([message, message, message]) + + assert len(retry.download_retry) == 3 + assert len(retry.post_retry) == 3 + + retry.cleanup() + + #These should both return 0, but with the current DiskQueue, cleanup doesn't work properly. + assert len(retry.download_retry) == 0 + assert len(retry.post_retry) == 0 + +def cleanup__redis(): + # -- RedisQueue + with patch(target="redis.from_url", new=fakeredis.FakeStrictRedis.from_url, ): + BaseOptions = Options() + BaseOptions.retry_driver = 'redis' + BaseOptions.redisqueue_serverurl = "redis://Never.Going.To.Resolve:6379/0" + BaseOptions.queueName = "test_cleanup" + retry = Retry(BaseOptions) + + retry.download_retry.put([message, message, message]) + retry.post_retry.put([message, message, message]) + + #assert os.path.exists(retry.download_retry.queue_file) == True + assert len(retry.download_retry) == 3 + assert len(retry.post_retry) == 3 + + retry.cleanup() + + assert len(retry.download_retry) == 0 + assert len(retry.post_retry) == 0 + + +@test_steps('disk', 'redis') +def test_metricsReport(test_step, tmp_path): + # Execute the step according to name + if test_step == 'disk': + metricsReport__disk(tmp_path) + elif test_step == 'redis': + metricsReport__redis() + +def metricsReport__disk(tmp_path): + # -- DiskQueue + BaseOptions = Options() + BaseOptions.retry_driver = 'disk' + BaseOptions.pid_filename = str(tmp_path) + os.sep + "pidfilename.txt" + retry = Retry(BaseOptions) + + retry.download_retry.put([message, message, message]) + retry.post_retry.put([message, message, message]) + + metrics = retry.metricsReport() + + assert metrics['msgs_in_download_retry'] == 3 + assert metrics['msgs_in_post_retry'] == 3 + +def metricsReport__redis(): + # -- RedisQueue + with patch(target="redis.from_url", new=fakeredis.FakeStrictRedis.from_url, ): + BaseOptions = Options() + BaseOptions.retry_driver = 'redis' + BaseOptions.redisqueue_serverurl = "redis://Never.Going.To.Resolve:6379/0" + BaseOptions.queueName = "test_metricsReport" + retry = Retry(BaseOptions) + + retry.download_retry.put([message, message, message]) + retry.post_retry.put([message, message, message]) + + metrics = retry.metricsReport() + + assert metrics['msgs_in_download_retry'] == 3 + assert metrics['msgs_in_post_retry'] == 3 + + 
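+# Note: pytest-steps runs a @test_steps('disk', 'redis') test once per step
+# name, passing the current name in as test_step; the tests here dispatch on
+# that name to the matching per-backend helper.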
+@test_steps('disk', 'redis') +def test_after_post(test_step, tmp_path): + # Execute the step according to name + if test_step == 'disk': + after_post__disk(tmp_path) + elif test_step == 'redis': + after_post__redis() + +def after_post__disk(tmp_path): + # -- DiskQueue + BaseOptions = Options() + BaseOptions.retry_driver = 'disk' + BaseOptions.pid_filename = str(tmp_path) + os.sep + "pidfilename.txt" + retry = Retry(BaseOptions) + + after_post_worklist = WorkList + after_post_worklist.failed = [message, message, message] + + retry.after_post(after_post_worklist) + + assert len(retry.post_retry) == 3 + +def after_post__redis(): + # -- RedisQueue + with patch(target="redis.from_url", new=fakeredis.FakeStrictRedis.from_url, ): + BaseOptions = Options() + BaseOptions.retry_driver = 'redis' + BaseOptions.redisqueue_serverurl = "redis://Never.Going.To.Resolve:6379/0" + BaseOptions.queueName = "test_after_post" + retry = Retry(BaseOptions) + + after_post_worklist = WorkList + after_post_worklist.failed = [message, message, message] + + retry.after_post(after_post_worklist) + + assert len(retry.post_retry) == 3 + + +@test_steps('disk', 'redis') +def test_after_work__WLFailed(test_step, tmp_path): + # Execute the step according to name + if test_step == 'disk': + after_work__WLFailed__disk(tmp_path) + elif test_step == 'redis': + after_work__WLFailed__redis() + +def after_work__WLFailed__disk(tmp_path): + # -- DiskQueue + BaseOptions = Options() + BaseOptions.retry_driver = 'disk' + BaseOptions.pid_filename = str(tmp_path) + os.sep + "pidfilename.txt" + retry = Retry(BaseOptions) + + after_work_worklist = WorkList + after_work_worklist.failed = [message, message, message] + + retry.after_work(after_work_worklist) + + assert len(retry.download_retry) == 3 + assert len(after_work_worklist.failed) == 0 + +def after_work__WLFailed__redis(): + # -- RedisQueue + with patch(target="redis.from_url", new=fakeredis.FakeStrictRedis.from_url, ): + BaseOptions = Options() + BaseOptions.retry_driver = 'redis' + BaseOptions.redisqueue_serverurl = "redis://Never.Going.To.Resolve:6379/0" + BaseOptions.queueName = "test_after_work__WLFailed" + retry = Retry(BaseOptions) + + after_work_worklist = WorkList + after_work_worklist.failed = [message, message, message] + + retry.after_work(after_work_worklist) + + assert len(retry.download_retry) == 3 + assert len(after_work_worklist.failed) == 0 + + + +@test_steps('disk', 'redis') +def test_after_work__SmallQty(test_step, tmp_path): + # Execute the step according to name + if test_step == 'disk': + after_work__SmallQty__disk(tmp_path) + elif test_step == 'redis': + after_work__SmallQty__redis() + +def after_work__SmallQty__disk(tmp_path): + # -- DiskQueue + BaseOptions = Options() + BaseOptions.retry_driver = 'disk' + BaseOptions.batch = 2 + BaseOptions.pid_filename = str(tmp_path) + os.sep + "pidfilename.txt" + retry = Retry(BaseOptions) + + after_work_worklist = WorkList + after_work_worklist.ok = [message, message, message] + + retry.after_work(after_work_worklist) + + assert len(retry.download_retry) == 0 + assert len(after_work_worklist.ok) == 3 + +def after_work__SmallQty__redis(): + # -- RedisQueue + with patch(target="redis.from_url", new=fakeredis.FakeStrictRedis.from_url, ): + BaseOptions = Options() + BaseOptions.batch = 2 + BaseOptions.retry_driver = 'redis' + BaseOptions.redisqueue_serverurl = "redis://Never.Going.To.Resolve:6379/0" + BaseOptions.queueName = "test_after_work__SmallQty" + retry = Retry(BaseOptions) + + after_work_worklist = WorkList + 
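+        # NB: WorkList is a module-level SimpleNamespace, so this binds a
+        # reference to the shared object (not a copy); each helper simply
+        # re-populates its fields.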
after_work_worklist.ok = [message, message, message] + + retry.after_work(after_work_worklist) + + assert len(retry.download_retry) == 0 + assert len(after_work_worklist.ok) == 3 + + +@test_steps('disk', 'redis') +def test_after_work(test_step, tmp_path): + # Execute the step according to name + if test_step == 'disk': + after_work__disk(tmp_path) + elif test_step == 'redis': + after_work__redis() + +def after_work__disk(tmp_path): + # -- DiskQueue + BaseOptions = Options() + BaseOptions.retry_driver = 'disk' + BaseOptions.pid_filename = str(tmp_path) + os.sep + "pidfilename.txt" + retry = Retry(BaseOptions) + + after_work_worklist = WorkList + after_work_worklist.ok = [message, message, message] + retry.post_retry.put([message, message, message]) + retry.on_housekeeping() + + retry.after_work(after_work_worklist) + + assert len(retry.download_retry) == 0 + assert len(after_work_worklist.ok) == 4 + +def after_work__redis(): + # -- RedisQueue + with patch(target="redis.from_url", new=fakeredis.FakeStrictRedis.from_url, ): + BaseOptions = Options() + BaseOptions.retry_driver = 'redis' + BaseOptions.redisqueue_serverurl = "redis://Never.Going.To.Resolve:6379/0" + BaseOptions.queueName = "test_after_work" + retry = Retry(BaseOptions) + + after_work_worklist = WorkList + after_work_worklist.ok = [message, message, message] + retry.post_retry.put([message, message, message]) + retry.on_housekeeping() + + retry.after_work(after_work_worklist) + + assert len(retry.download_retry) == 0 + assert len(after_work_worklist.ok) == 4 + + +@test_steps('disk', 'redis') +def test_after_accept__SmallQty(test_step, tmp_path): + # Execute the step according to name + if test_step == 'disk': + after_accept__SmallQty__disk(tmp_path) + elif test_step == 'redis': + after_accept__SmallQty__redis() + +def after_accept__SmallQty__disk(tmp_path): + # -- DiskQueue + BaseOptions = Options() + BaseOptions.retry_driver = 'disk' + BaseOptions.batch = 2 + BaseOptions.pid_filename = str(tmp_path) + os.sep + "pidfilename.txt" + retry = Retry(BaseOptions) + + after_accept_worklist = WorkList + after_accept_worklist.incoming = [message, message, message] + + retry.after_accept(after_accept_worklist) + + assert len(retry.download_retry) == 0 + assert len(after_accept_worklist.incoming) == 3 + +def after_accept__SmallQty__redis(): + # -- RedisQueue + with patch(target="redis.from_url", new=fakeredis.FakeStrictRedis.from_url, ): + BaseOptions = Options() + BaseOptions.batch = 2 + BaseOptions.retry_driver = 'redis' + BaseOptions.redisqueue_serverurl = "redis://Never.Going.To.Resolve:6379/0" + BaseOptions.queueName = "test_after_accept__SmallQty" + retry = Retry(BaseOptions) + + after_accept_worklist = WorkList + after_accept_worklist.incoming = [message, message, message] + + retry.after_accept(after_accept_worklist) + + assert len(retry.download_retry) == 0 + assert len(after_accept_worklist.incoming) == 3 + + +@test_steps('disk', 'redis') +def test_after_accept(test_step, tmp_path): + # Execute the step according to name + if test_step == 'disk': + after_accept__disk(tmp_path) + elif test_step == 'redis': + after_accept__redis() + +def after_accept__disk(tmp_path): + # -- DiskQueue + BaseOptions = Options() + BaseOptions.retry_driver = 'disk' + BaseOptions.pid_filename = str(tmp_path) + os.sep + "pidfilename.txt" + retry = Retry(BaseOptions) + + retry.download_retry.put([message, message, message]) + retry.on_housekeeping() + + after_accept_worklist = WorkList + after_accept_worklist.incoming = [message, message, message]
+ + retry.after_accept(after_accept_worklist) + + assert len(retry.download_retry) == 0 + assert len(after_accept_worklist.incoming) == 4 + +def after_accept__redis(): + # -- RedisQueue + with patch(target="redis.from_url", new=fakeredis.FakeStrictRedis.from_url, ): + BaseOptions = Options() + BaseOptions.retry_driver = 'redis' + BaseOptions.redisqueue_serverurl = "redis://Never.Going.To.Resolve:6379/0" + BaseOptions.queueName = "test_after_accept" + retry = Retry(BaseOptions) + + after_accept_worklist = WorkList + after_accept_worklist.incoming = [message, message, message] + retry.download_retry.put([message, message, message]) + retry.on_housekeeping() + + retry.after_accept(after_accept_worklist) + + assert len(retry.download_retry) == 0 + assert len(after_accept_worklist.incoming) == 4 + + +@test_steps('disk', 'redis') +def test_on_housekeeping(test_step, tmp_path, caplog): + # Execute the step according to name + if test_step == 'disk': + on_housekeeping__disk(tmp_path, caplog) + elif test_step == 'redis': + on_housekeeping__redis(caplog) + +def on_housekeeping__disk(tmp_path, caplog): + # -- DiskQueue + BaseOptions = Options() + BaseOptions.retry_driver = 'disk' + BaseOptions.pid_filename = str(tmp_path) + os.sep + "pidfilename.txt" + retry = Retry(BaseOptions) + + retry.download_retry.put([message, message, message]) + retry.post_retry.put([message, message, message]) + + assert len(retry.download_retry) == 3 + assert len(retry.post_retry) == 3 + + retry.on_housekeeping() + + log_found_hk_elapse = False + for record in caplog.records: + if "on_housekeeping elapse" in record.message: + log_found_hk_elapse = True + + assert log_found_hk_elapse == True + +def on_housekeeping__redis(caplog): + with patch(target="redis.from_url", new=fakeredis.FakeStrictRedis.from_url, ): + BaseOptions = Options() + BaseOptions.retry_driver = 'redis' + BaseOptions.redisqueue_serverurl = "redis://Never.Going.To.Resolve:6379/0" + BaseOptions.queueName = "test_on_housekeeping" + retry = Retry(BaseOptions) + + #server_test_on_housekeeping = fakeredis.FakeServer() + #retry.download_retry.redis = fakeredis.FakeStrictRedis(server=server_test_on_housekeeping) + #retry.post_retry.redis = fakeredis.FakeStrictRedis(server=server_test_on_housekeeping) + + retry.download_retry.put([message, message, message]) + retry.post_retry.put([message, message, message]) + + assert len(retry.download_retry) == 3 + assert len(retry.post_retry) == 3 + + retry.on_housekeeping() + + log_found_hk_elapse = False + for record in caplog.records: + if "on_housekeeping elapse" in record.message: + log_found_hk_elapse = True + + assert log_found_hk_elapse == True diff --git a/tests/sarracenia/redisqueue_test.py b/tests/sarracenia/redisqueue_test.py new file mode 100644 index 000000000..d3018adf7 --- /dev/null +++ b/tests/sarracenia/redisqueue_test.py @@ -0,0 +1,298 @@ +import pytest +from unittest.mock import patch + +from sarracenia.redisqueue import RedisQueue + +import fakeredis + +import jsonpickle + +class Options: + def add_option(self, option, type, default = None): + # mimic sarracenia's Config.add_option: only apply the default if the option isn't already set + if not hasattr(self, option): + setattr(self, option, default) + +BaseOptions = Options() +BaseOptions.retry_ttl = 0 +BaseOptions.logLevel = "INFO" +BaseOptions.queueName = "TEST_QUEUE_NAME" +BaseOptions.component = "sarra" +BaseOptions.config = "foobar.conf" +BaseOptions.redisqueue_serverurl = "redis://Never.Going.To.Resolve:6379/0" +BaseOptions.housekeeping = float(39) + +message = { + "pubTime": "20180118151049.356378078", + "topic": "v02.post.sent_by_tsource2send", +
"headers": { + "atime": "20180118151049.356378078", + "from_cluster": "localhost", + "mode": "644", + "mtime": "20180118151048", + "parts": "1,69,1,0,0", + "source": "tsource", + "sum": "d,c35f14e247931c3185d5dc69c5cd543e", + "to_clusters": "localhost" + }, + "baseUrl": "https://NotARealURL", + "relPath": "ThisIsAPath/To/A/File.txt", + "notice": "20180118151050.45 ftp://anonymous@localhost:2121 /sent_by_tsource2send/SXAK50_KWAL_181510___58785" +} + +def test___len__(): + with patch(target="redis.from_url", new=fakeredis.FakeStrictRedis.from_url, ): + download_retry = RedisQueue(BaseOptions, 'test___len__') + download_retry.redis.lpush(download_retry.key_name, "first") + assert len(download_retry) == 1 + download_retry.redis.lpush(download_retry.key_name_new, "second") + assert len(download_retry) == 2 + download_retry.redis.lpush(download_retry.key_name_hk, "third") + assert len(download_retry) == 2 + +def test__in_cache(): + with patch(target="redis.from_url", new=fakeredis.FakeStrictRedis.from_url, ): + download_retry = RedisQueue(BaseOptions, 'test__in_cache') + + download_retry.retry_cache = {} + + assert download_retry._in_cache(message) == False + # Checking if it's there actually adds it, so checking it again right after should return True + assert download_retry._in_cache(message) == True + +def test__is_exired__TooSoon(): + with patch(target="redis.from_url", new=fakeredis.FakeStrictRedis.from_url, ): + BaseOptions.retry_ttl = 100000 + download_retry = RedisQueue(BaseOptions, 'test__is_exired__TooSoon') + + assert download_retry._is_expired(message) == True + +def test__is_exired__TooLate(): + with patch(target="redis.from_url", new=fakeredis.FakeStrictRedis.from_url, ): + BaseOptions.retry_ttl = 1 + download_retry = RedisQueue(BaseOptions, 'test__is_exired__TooLate') + + import sarracenia + message["pubTime"] = sarracenia.nowstr() + + assert download_retry._is_expired(message) == False + +def test__needs_requeuing(): + with patch(target="redis.from_url", new=fakeredis.FakeStrictRedis.from_url, ): + download_retry = RedisQueue(BaseOptions, 'test__needs_requeuing') + + download_retry.retry_cache = {} + + assert download_retry._needs_requeuing(message) == True + assert download_retry._needs_requeuing(message) == False + download_retry.o.retry_ttl = 1000000 + assert download_retry._needs_requeuing(message) == False + +def test__msgFromJSON(): + with patch(target="redis.from_url", new=fakeredis.FakeStrictRedis.from_url, ): + download_retry = RedisQueue(BaseOptions, 'test__msgFromJSON') + + assert message == download_retry._msgFromJSON(jsonpickle.encode(message)) + +def test__msgToJSON(): + with patch(target="redis.from_url", new=fakeredis.FakeStrictRedis.from_url, ): + download_retry = RedisQueue(BaseOptions, 'test__msgToJSON') + + assert jsonpickle.encode(message) == download_retry._msgToJSON(message) + +def test__lpop(): + with patch(target="redis.from_url", new=fakeredis.FakeStrictRedis.from_url, ): + download_retry = RedisQueue(BaseOptions, 'test__lpop') + #server_test__lpop = fakeredis.FakeServer() + #download_retry.redis = fakeredis.FakeStrictRedis(server=server_test__lpop) + + download_retry.put([message]) + assert download_retry.redis.llen(download_retry.key_name_new) == 1 + assert message == download_retry._lpop(download_retry.key_name_new) + +def test_put__Single(): + with patch(target="redis.from_url", new=fakeredis.FakeStrictRedis.from_url, ): + download_retry = RedisQueue(BaseOptions, 'test_put__Single') + + #server_test_put_single = fakeredis.FakeServer() + 
#download_retry.redis = fakeredis.FakeStrictRedis(server=server_test_put_single) + + download_retry.put([message]) + assert download_retry.redis.llen(download_retry.key_name_new) == 1 + +def test_put__Multi(): + with patch(target="redis.from_url", new=fakeredis.FakeStrictRedis.from_url, ): + download_retry = RedisQueue(BaseOptions, 'test_put__Multi') + + #server_test_put_multi = fakeredis.FakeServer() + #download_retry.redis = fakeredis.FakeStrictRedis(server=server_test_put_multi) + + download_retry.put([message, message, message, message]) + assert download_retry.redis.llen(download_retry.key_name_new) == 4 + +def test_cleanup(): + with patch(target="redis.from_url", new=fakeredis.FakeStrictRedis.from_url, ): + + download_retry = RedisQueue(BaseOptions, 'test_cleanup') + + #This test fails unless you explicitly tell it to use a different server than the rest of the tests + # I don't know why that is, as setting the name above should ensure keyspace uniqueness among all tests + server_test_cleanup = fakeredis.FakeServer() + download_retry.redis = fakeredis.FakeStrictRedis(server=server_test_cleanup) + + download_retry.redis.lpush(download_retry.key_name_lasthk, "data") + download_retry.redis.lpush(download_retry.key_name_new, "data") + download_retry.redis.lpush(download_retry.key_name_hk, "data") + download_retry.redis.lpush(download_retry.key_name, "data") + + #download_retry.redis_lock.acquire() + #download_retry.redis.lpush("lock:" + download_retry.key_name, "data") + + assert len(download_retry.redis.keys()) == 4 + + download_retry.cleanup() + + assert len(download_retry.redis.keys()) == 0 + +def test_get__NotLocked_Single(): + with patch(target="redis.from_url", new=fakeredis.FakeStrictRedis.from_url, ): + + download_retry = RedisQueue(BaseOptions, 'test_get__NotLocked_Single') + + #server_test_get__NotLocked = fakeredis.FakeServer() + #download_retry.redis = fakeredis.FakeStrictRedis(server=server_test_get__NotLocked) + + download_retry.redis.lpush(download_retry.key_name, jsonpickle.encode(message)) + + gotten = download_retry.get() + + assert len(gotten) == 1 + assert gotten == [message] + +def test_get__NotLocked_Multi(): + with patch(target="redis.from_url", new=fakeredis.FakeStrictRedis.from_url, ): + + download_retry = RedisQueue(BaseOptions, 'test_get__NotLocked_Multi') + + #server_test_get__NotLocked = fakeredis.FakeServer() + #download_retry.redis = fakeredis.FakeStrictRedis(server=server_test_get__NotLocked) + + download_retry.redis.lpush(download_retry.key_name, jsonpickle.encode(message)) + download_retry.redis.lpush(download_retry.key_name, jsonpickle.encode(message)) + download_retry.redis.lpush(download_retry.key_name, jsonpickle.encode(message)) + download_retry.redis.lpush(download_retry.key_name, jsonpickle.encode(message)) + + #with patch(target="redis_lock.Lock.locked", new=lambda foo: False): + + gotten = download_retry.get(2) + + assert len(gotten) == 2 + assert gotten == [message, message] + +def test_get__Locked(): + with patch(target="redis.from_url", new=fakeredis.FakeStrictRedis.from_url, ): + + download_retry = RedisQueue(BaseOptions, 'test_get__Locked') + + #server_test_get__NotLocked = fakeredis.FakeServer() + #download_retry.redis = fakeredis.FakeStrictRedis(server=server_test_get__NotLocked) + + download_retry.redis.lpush(download_retry.key_name, jsonpickle.encode(message)) + + download_retry.redis_lock.acquire() + + gotten = download_retry.get() + + assert len(gotten) == 0 + assert gotten == [] + +def test_on_housekeeping__TooSoon(caplog): + with
patch(target="redis.from_url", new=fakeredis.FakeStrictRedis.from_url, ): + download_retry = RedisQueue(BaseOptions, 'test_on_housekeeping__TooSoon') + + #server_test_on_housekeeping__TooSoon = fakeredis.FakeServer() + #download_retry.redis = fakeredis.FakeStrictRedis(server=server_test_on_housekeeping__TooSoon) + + download_retry.redis.set(download_retry.key_name_lasthk, download_retry.now) + hk_out = download_retry.on_housekeeping() + + assert hk_out == None + + for record in caplog.records: + if "Housekeeping ran less than " in record.message: + assert "Housekeeping ran less than " in record.message + +@pytest.mark.skip("No need to check if we're locked, per Peter") +def test_on_housekeeping__Locked(caplog): + with patch(target="redis.from_url", new=fakeredis.FakeStrictRedis.from_url, ): + download_retry = RedisQueue(BaseOptions, 'test_on_housekeeping__Locked') + + #server_test_on_housekeeping__Locked = fakeredis.FakeServer() + #download_retry.redis = fakeredis.FakeStrictRedis(server=server_test_on_housekeeping__Locked) + + #import jsonpickle + + #download_retry.redis.lpush(download_retry.key_name, jsonpickle.encode(message)) + + #with patch(target="redis_lock.Lock.locked", new=lambda foo: True): + + download_retry.redis.set(download_retry.key_name_lasthk, download_retry.now - download_retry.o.housekeeping - 100) + download_retry.redis_lock.acquire() + + hk_out = download_retry.on_housekeeping() + + assert hk_out == None + + import re + for record in caplog.records: + if "Another instance has lock on" in record.message: + assert "Another instance has lock on" in record.message + +def test_on_housekeeping__FinishRetry(caplog): + with patch(target="redis.from_url", new=fakeredis.FakeStrictRedis.from_url, ): + BaseOptions.queueName = "test_on_housekeeping__FinishRetry" + download_retry = RedisQueue(BaseOptions, 'test_on_housekeeping__FinishRetry') + + #server_test_on_housekeeping__FinishRetry = fakeredis.FakeServer() + #download_retry.redis = fakeredis.FakeStrictRedis(server=server_test_on_housekeeping__FinishRetry) + + download_retry.redis.lpush(download_retry.key_name, jsonpickle.encode(message)) + download_retry.redis.lpush(download_retry.key_name, jsonpickle.encode(message)) + download_retry.redis.lpush(download_retry.key_name, jsonpickle.encode(message)) + download_retry.redis.set(download_retry.key_name_lasthk, download_retry.now - download_retry.o.housekeeping - 100) + + hk_out = download_retry.on_housekeeping() + + assert hk_out == None + + for record in caplog.records: + if "have not finished retry list" in record.message: + assert "have not finished retry list" in record.message + +def test_on_housekeeping(caplog): + with patch(target="redis.from_url", new=fakeredis.FakeStrictRedis.from_url, ): + BaseOptions.queueName = "test_on_housekeeping" + download_retry = RedisQueue(BaseOptions, 'test_on_housekeeping') + + #server_test_on_housekeeping = fakeredis.FakeServer() + #download_retry.redis = fakeredis.FakeStrictRedis(server=server_test_on_housekeeping) + + #with patch(target="redis_lock.Lock.locked", new=lambda foo: True): + + download_retry.redis.lpush(download_retry.key_name_new, jsonpickle.encode(message)) + download_retry.redis.lpush(download_retry.key_name_new, jsonpickle.encode(message)) + download_retry.redis.lpush(download_retry.key_name_new, jsonpickle.encode(message)) + + download_retry.redis.set(download_retry.key_name_lasthk, download_retry.now - download_retry.o.housekeeping - 100) + + hk_out = download_retry.on_housekeeping() + + assert hk_out == None + assert 
download_retry.redis.exists(download_retry.key_name_hk) == False + + for record in caplog.records: + if "released redis_lock" in record.message: + assert "released redis_lock" in record.message + if "on_housekeeping elapse" in record.message: + assert "on_housekeeping elapse" in record.message \ No newline at end of file
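All of the redis-backed tests above repeat the same `with patch(target="redis.from_url", new=fakeredis.FakeStrictRedis.from_url)` wrapper. As a minimal sketch (assuming pytest's built-in monkeypatch fixture; the fixture name and a tests/conftest.py placement are illustrative, not part of this changeset), the patch could be hoisted into a shared fixture:

import fakeredis
import pytest
import redis

@pytest.fixture
def fake_redis(monkeypatch):
    # Re-point redis.from_url at fakeredis so RedisQueue connects to an
    # in-process fake server instead of a real one; monkeypatch undoes
    # the change automatically when the test ends.
    monkeypatch.setattr(redis, "from_url", fakeredis.FakeStrictRedis.from_url)

A test would then request the fixture instead of opening the patch() context, e.g. `def test_put__Single(fake_redis): ...`, which drops one indentation level from every test body.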