From 560f62d23884eddfa55d46575a8a22dcff32e70a Mon Sep 17 00:00:00 2001 From: mb-jp Date: Fri, 27 Oct 2023 11:39:30 +0200 Subject: [PATCH] init --- macrobond_data_api/web/_web_only_api.py | 7 + .../web/data_package_list_poller.py | 371 ++++++++++------ tests/Web/data_package_list_poller/abort.py | 411 ++++++++++++++++++ tests/Web/data_package_list_poller/error.py | 396 +++++++++++++++++ tests/Web/data_package_list_poller/normal.py | 363 ++++++++++++++++ .../data_package_list_poller/on_exception.py | 378 ++++++++++++++++ tests/Web/data_package_list_poller/retry.py | 237 ++++++++++ .../data_package_list_poller/test_access.py | 94 ++++ 8 files changed, 2121 insertions(+), 136 deletions(-) create mode 100644 tests/Web/data_package_list_poller/abort.py create mode 100644 tests/Web/data_package_list_poller/error.py create mode 100644 tests/Web/data_package_list_poller/normal.py create mode 100644 tests/Web/data_package_list_poller/on_exception.py create mode 100644 tests/Web/data_package_list_poller/retry.py create mode 100644 tests/Web/data_package_list_poller/test_access.py diff --git a/macrobond_data_api/web/_web_only_api.py b/macrobond_data_api/web/_web_only_api.py index 7bdc3869..77825373 100644 --- a/macrobond_data_api/web/_web_only_api.py +++ b/macrobond_data_api/web/_web_only_api.py @@ -1,5 +1,6 @@ from datetime import datetime from typing import TYPE_CHECKING, Any, List, Optional, Callable, Tuple +import warnings import ijson @@ -154,6 +155,12 @@ def get_data_package_list_iterative( `macrobond_data_api.web.web_types.data_package_body.DataPackageBody` """ # pylint: enable=line-too-long + warnings.warn( + "get_data_package_list_iterative is deprecated. 
Use get_data_package_list_chunked instead.", + DeprecationWarning, + 2, + ) + params = {} body: Optional[DataPackageBody] = None diff --git a/macrobond_data_api/web/data_package_list_poller.py b/macrobond_data_api/web/data_package_list_poller.py index ea4f79d7..8f648b6f 100644 --- a/macrobond_data_api/web/data_package_list_poller.py +++ b/macrobond_data_api/web/data_package_list_poller.py @@ -1,35 +1,87 @@ from abc import ABC, abstractmethod +from enum import Enum from datetime import datetime, timezone import time -from typing import List, Optional, cast, TYPE_CHECKING, Callable +from typing import List, Optional, Tuple, Union, Callable, TYPE_CHECKING + from .web_api import WebApi from .web_types.data_package_list_state import DataPackageListState -if TYPE_CHECKING: # pragma: no cover - from .web_types import DataPackageBody, DataPackageListItem +from .web_types.data_pacakge_list_item import DataPackageListItem +from .web_types.data_package_body import DataPackageBody +if TYPE_CHECKING: + from macrobond_data_api.web.web_types.data_package_list_context import ( + DataPackageListContext, + DataPackageListContextManager, + ) -class _AbortException(Exception): - pass + +class ExceptionSource(Enum): + FAILED_TO_BEGIN_FULL_LISTING = 1 + """ + Failed_to_begin_full_listing can happen after: + * start() + * on_exception() + * on_full_listing_end() + * on_incremental_end() + """ + + FAILED_TO_BEGIN_LISTING = 2 + """ + Failed_to_begin_listing can happen after: + * start() + * on_exception() + * on_full_listing_end() + * on_incremental_end() + """ + + FAILED_TO_BEGIN_LISTING_INCOMPLETE = 3 + """ + Failed_to_begin_listing_incomplete can happen after: + * on_incremental_batch() + """ + + FAILED_TO_GET_BATCH_IN_FULL_LISTING = 4 + """ + Failed_to_get_batch_in_full_listing can happen after: + * on_full_listing_begin() + * on_full_listing_batch() + """ + + FAILED_TO_GET_BATCH_IN_LISTING = 5 + """ + Failed_to_get_batch_in_listing can happen after: + * on_incremental_begin() + * on_incremental_batch() + """ + + 
FAILED_TO_GET_BATCH_IN_LISTING_INCOMPLETE = 6 + """ + Failed_to_get_batch_in_listing_incomplete can happen after: + * on_incremental_batch() + """ class DataPackageListPoller(ABC): """ This is work in progress and might change soon. Run a loop polling for changed series in the data package list. - Derive from this class and override `on_full_listing_start`, `on_full_listing_items`, `on_full_listing_stop`, - `on_incremental_start`, `on_incremental_items` and `on_incremental_stop`. + Derive from this class and override `on_full_listing_begin`, `on_full_listing_batch`, `on_full_listing_end`, + `on_incremental_begin`, `on_incremental_batch` and `on_incremental_end`. Parameters ---------- api : WebApi The API instance to use. - download_full_list_on_or_after : datetime + download_full_list_on_or_after : datetime, optional The saved value of `download_full_list_on_or_after` from the previous run. `None` on first run. - time_stamp_for_if_modified_since: datetime + time_stamp_for_if_modified_since: datetime, optional The saved value of `time_stamp_for_if_modified_since` from the previous run. `None`on first run. + chunk_size : int, optional + The maximum number of items to include in each on_*_batch() """ def __init__( @@ -37,19 +89,24 @@ def __init__( api: WebApi, download_full_list_on_or_after: Optional[datetime] = None, time_stamp_for_if_modified_since: Optional[datetime] = None, - _sleep: Callable[[int], None] = time.sleep, + chunk_size: int = 200, ) -> None: + self._api = api + self._download_full_list_on_or_after = download_full_list_on_or_after + self._time_stamp_for_if_modified_since = time_stamp_for_if_modified_since + self._chunk_size = chunk_size + self.up_to_date_delay = 15 * 60 """ The time to wait, in seconds, between polls. """ self.incomplete_delay = 15 """ The time to wait, in seconds, between continuing partial updates. """ self.on_error_delay = 30 """ The time to wait, in seconds, before retrying after an error. 
""" - self._api = api - self._sleep = _sleep + self.on_retry_delay = 30 + + self._sleep = time.sleep + self._now = lambda: datetime.now(timezone.utc) self._abort = False - self._download_full_list_on_or_after = download_full_list_on_or_after - self._time_stamp_for_if_modified_since = time_stamp_for_if_modified_since @property def api(self) -> WebApi: @@ -66,7 +123,7 @@ def download_full_list_on_or_after(self) -> Optional[datetime]: @property def time_stamp_for_if_modified_since(self) -> Optional[datetime]: """ - This value is used internall to keep track of the the time of the last detected modification. + This value is used internally to keep track of the the time of the last detected modification. Save this value after processing and pass in constructor for the next run. """ return self._time_stamp_for_if_modified_since @@ -77,8 +134,7 @@ def start(self) -> None: self._abort = False while not self._abort: if not self._time_stamp_for_if_modified_since or ( - self._download_full_list_on_or_after - and datetime.now(timezone.utc) > self._download_full_list_on_or_after + self._download_full_list_on_or_after and self._now() > self._download_full_list_on_or_after ): sub = self._run_full_listing() if sub: @@ -86,6 +142,11 @@ def start(self) -> None: self._time_stamp_for_if_modified_since = sub.time_stamp_for_if_modified_since else: sub = self._run_listing(self._time_stamp_for_if_modified_since) + + if sub and sub.state != DataPackageListState.UP_TO_DATE: + self._sleep(self.incomplete_delay) + sub = self._run_listing_incomplete(sub.time_stamp_for_if_modified_since) + if sub: self._time_stamp_for_if_modified_since = sub.time_stamp_for_if_modified_since @@ -95,167 +156,205 @@ def start(self) -> None: self._sleep(self.up_to_date_delay) def _test_access(self) -> None: - params = {"ifModifiedSince": datetime(3000, 1, 1, tzinfo=timezone.utc)} + params = {"ifModifiedSince": datetime(3000, 1, 1, tzinfo=timezone.utc).isoformat()} response = 
self._api.session.get("v1/series/getdatapackagelist", params=params) if response.status_code == 403: raise Exception("Needs access - The account is not set up to use DataPackageList") - def _run_full_listing(self, max_attempts: int = 3) -> Optional["DataPackageBody"]: - is_stated = False - - def _body_callback(body: "DataPackageBody") -> None: - is_stated = True # pylint: disable=unused-variable - self.on_full_listing_start(body) - + def _run_full_listing(self, max_attempts: int = 3) -> Optional[DataPackageBody]: + context_manager: Optional["DataPackageListContextManager"] = None try: - for attempt in range(1, max_attempts): - try: - sub = self._api.get_data_package_list_iterative( - _body_callback, - self.on_full_listing_items, - None, - ) - if not sub: - raise ValueError("subscription is None") - - self.on_full_listing_stop(False, None) - return sub - except Exception as ex: # pylint: disable=broad-except - if self._abort: - raise _AbortException() from ex - if attempt > max_attempts: - raise ex - self._sleep(self.on_error_delay) - except _AbortException as ex: - if is_stated: - self.on_full_listing_stop(True, cast(Exception, ex.__cause__)) - except Exception as ex: # pylint: disable=broad-except - if is_stated: - self.on_full_listing_stop(False, ex) - return None - - def _run_listing(self, if_modified_since: datetime, max_attempts: int = 3) -> Optional["DataPackageBody"]: - is_stated = False - - def _body_callback(body: "DataPackageBody") -> None: - is_stated = True # pylint: disable=unused-variable - self.on_incremental_start(body) + context_manager, context, body = self._retry_get_data_package_list_chunked( + max_attempts, None, ExceptionSource.FAILED_TO_BEGIN_FULL_LISTING + ) + if context is None or body is None: + return None + self.on_full_listing_begin(body) + if self._abort: + self.on_full_listing_end(True) + return None + + if self._try_iterator( + context, + body, + self.on_full_listing_batch, + self.on_full_listing_end, + 
ExceptionSource.FAILED_TO_GET_BATCH_IN_FULL_LISTING, + ): + self.on_full_listing_end(False) + return body + return None + finally: + if context_manager: + context_manager.__exit__(None, None, None) + + def _run_listing(self, if_modified_since: datetime, max_attempts: int = 3) -> Optional[DataPackageBody]: + context_manager: Optional["DataPackageListContextManager"] = None try: - for attempt in range(1, max_attempts): - try: - sub = self._api.get_data_package_list_iterative( - _body_callback, - self.on_incremental_items, - if_modified_since, + context_manager, context, body = self._retry_get_data_package_list_chunked( + max_attempts, if_modified_since, ExceptionSource.FAILED_TO_BEGIN_LISTING + ) + if context is None or body is None: + return None + + self.on_incremental_begin(body) + if self._abort: + self.on_incremental_end(True) + return None + + if ( + self._try_iterator( + context, + body, + self.on_incremental_batch, + self.on_incremental_end, + ExceptionSource.FAILED_TO_GET_BATCH_IN_LISTING, + ) + is False + ): + return None + + if body.state == DataPackageListState.UP_TO_DATE: + self.on_incremental_end(False) + + return body + finally: + if context_manager: + context_manager.__exit__(None, None, None) + + def _run_listing_incomplete(self, if_modified_since: datetime, max_attempts: int = 3) -> Optional[DataPackageBody]: + while True: + context_manager: Optional["DataPackageListContextManager"] = None + try: + context_manager, context, body = self._retry_get_data_package_list_chunked( + max_attempts, if_modified_since, ExceptionSource.FAILED_TO_BEGIN_LISTING_INCOMPLETE + ) + if context is None or body is None: + return None + + if ( + self._try_iterator( + context, + body, + self.on_incremental_batch, + self.on_incremental_end, + ExceptionSource.FAILED_TO_GET_BATCH_IN_LISTING_INCOMPLETE, ) - break - except Exception as ex: # pylint: disable=broad-except - if self._abort: - raise _AbortException() from ex - if attempt > max_attempts: - raise - 
self._sleep(self.on_error_delay) - - if not sub: - raise ValueError("subscription is None") - - if sub.state == DataPackageListState.UP_TO_DATE: - self.on_incremental_stop(False, None) - return sub - - self._sleep(self.incomplete_delay) - - return self._run_listing_incomplete(sub.time_stamp_for_if_modified_since, is_stated, max_attempts) - except _AbortException as ex: - if is_stated: - self.on_incremental_stop(True, cast(Exception, ex.__cause__)) - except Exception as ex: # pylint: disable=broad-except - if is_stated: - self.on_incremental_stop(False, ex) - return None - - def _run_listing_incomplete( - self, if_modified_since: datetime, is_stated: bool, max_attempts: int = 3 - ) -> Optional["DataPackageBody"]: - try: - while True: - for attempt in range(1, max_attempts): - try: - sub = self._api.get_data_package_list_iterative( - lambda _: None, - self.on_incremental_items, - if_modified_since, - ) - - if not sub: - raise ValueError("subscription is None") - - if sub.state == DataPackageListState.UP_TO_DATE: - self.on_incremental_stop(False, None) - return sub - - self._sleep(self.incomplete_delay) - - if_modified_since = sub.time_stamp_for_if_modified_since - except Exception as ex2: # pylint: disable=broad-except - if self._abort: - raise _AbortException() from ex2 - if attempt > max_attempts: - raise - self._sleep(self.on_error_delay) - except _AbortException as ex: - if is_stated: - self.on_incremental_stop(True, cast(Exception, ex.__cause__)) - except Exception as ex: # pylint: disable=broad-except - if is_stated: - self.on_incremental_stop(False, ex) - return None + is False + ): + return None + + if body.state == DataPackageListState.UP_TO_DATE: + self.on_incremental_end(False) + return body + + self._sleep(self.incomplete_delay) + + if_modified_since = body.time_stamp_for_if_modified_since + finally: + if context_manager: + context_manager.__exit__(None, None, None) + + def _retry_get_data_package_list_chunked( + self, max_attempts: int, 
if_modified_since: Optional[datetime], exception_source: ExceptionSource + ) -> Union[ + Tuple["DataPackageListContextManager", "DataPackageListContext", DataPackageBody], + Tuple["DataPackageListContextManager", None, None], + ]: + attempt = 1 + while True: + try: + context_manager = self._api.get_data_package_list_chunked(if_modified_since, self._chunk_size) + context = context_manager.__enter__() # pylint: disable=unnecessary-dunder-call + body = DataPackageBody( + context.time_stamp_for_if_modified_since, + context.download_full_list_on_or_after, + context.state, + ) + return context_manager, context, body + except Exception as ex: # pylint: disable=broad-except + if attempt > max_attempts: + self.on_exception(exception_source, ex) + return context_manager, None, None + self._sleep(self.on_retry_delay * attempt) + attempt += 1 + + def _try_iterator( + self, + context: "DataPackageListContext", + body: DataPackageBody, + on_batch: Callable[[DataPackageBody, List[DataPackageListItem]], None], + on_end: Callable[[bool], None], + exception_source: ExceptionSource, + ) -> bool: + iterator = iter(context.items) + while True: + try: + items = [DataPackageListItem(x[0], x[1]) for x in next(iterator)] + except StopIteration: + return True + except Exception as ex: # pylint: disable=broad-except + self.on_exception(exception_source, ex) + return False + + on_batch(body, items) + if self._abort: + on_end(True) + return False # full_listing @abstractmethod - def on_full_listing_start(self, subscription: "DataPackageBody") -> None: + def on_full_listing_begin(self, subscription: DataPackageBody) -> None: """This override is called when a full listing starts.""" @abstractmethod - def on_full_listing_items(self, subscription: "DataPackageBody", items: List["DataPackageListItem"]) -> None: + def on_full_listing_batch(self, subscription: DataPackageBody, items: List[DataPackageListItem]) -> None: """This override is called repeatedly with one or more items until all items are 
listed.""" @abstractmethod - def on_full_listing_stop(self, is_aborted: bool, exception: Optional[Exception]) -> None: + def on_full_listing_end(self, is_aborted: bool) -> None: """ This override is called when the full listing is stopped. Parameters ---------- is_aborted : bool The processing was aborted. - exception : Optional[Exception] - If not None, there was an exception. """ # listing @abstractmethod - def on_incremental_start(self, subscription: "DataPackageBody") -> None: + def on_incremental_begin(self, subscription: DataPackageBody) -> None: """This override is called when an incremental listing starts.""" @abstractmethod - def on_incremental_items(self, subscription: "DataPackageBody", items: List["DataPackageListItem"]) -> None: + def on_incremental_batch(self, subscription: DataPackageBody, items: List[DataPackageListItem]) -> None: """This override is called repeatedly with one or more items until all updated items are listed.""" @abstractmethod - def on_incremental_stop(self, is_aborted: bool, exception: Optional[Exception]) -> None: + def on_incremental_end(self, is_aborted: bool) -> None: """ This override is called when the incremental listing is stopped. Parameters ---------- is_aborted : bool The processing was aborted. - exception : Optional[Exception] - If not None, there was an exception. + """ + + @abstractmethod + def on_exception(self, source: ExceptionSource, exception: Exception) -> None: + """ + This override is called when starting a listing or fetching a batch fails with an exception; `source` tells which step failed. + Parameters + ---------- + exception : Exception + The exception. """ def abort(self) -> None: - """Call this method to stop processing.""" + """ + Call this method to stop processing. 
+ """ self._abort = True diff --git a/tests/Web/data_package_list_poller/abort.py b/tests/Web/data_package_list_poller/abort.py new file mode 100644 index 00000000..1361b599 --- /dev/null +++ b/tests/Web/data_package_list_poller/abort.py @@ -0,0 +1,411 @@ +from datetime import datetime, timezone +from io import BytesIO +from json import dumps as json_dumps +from typing import Any, Dict, List, Optional + +from requests import Response + +from macrobond_data_api.web import WebApi +from macrobond_data_api.web.data_package_list_poller import DataPackageListPoller, ExceptionSource +from macrobond_data_api.web.session import Session +from macrobond_data_api.web.web_types import DataPackageBody, DataPackageListItem, DataPackageListState + + +class TestAuth2Session: + __test__ = False + + def __init__(self, *responses: Response): + self.index = 0 + self.responses = responses + + def request(self, *args: Any, **kwargs: Any) -> Response: # pylint: disable=unused-argument + response = self.responses[self.index] + self.index += 1 + return response + + +def get_api(*responses: Response) -> WebApi: + return WebApi(Session("", "", test_auth2_session=TestAuth2Session(*responses))) + + +def get_json_response( + state: DataPackageListState, + downloadFullListOnOrAfter: str = "2000-02-01T04:05:06", + timeStampForIfModifiedSince: str = "2000-02-02T04:05:06", + entities: Optional[List[Dict[str, str]]] = None, +) -> Response: + if entities is None: + entities = [ + {"name": "sek", "modified": "2000-02-03T04:05:06"}, + {"name": "dkk", "modified": "2000-02-04T04:05:06"}, + {"name": "usgdp", "modified": "2000-02-05T04:05:06"}, + ] + json = json_dumps( + { + "downloadFullListOnOrAfter": downloadFullListOnOrAfter, + "timeStampForIfModifiedSince": timeStampForIfModifiedSince, + "state": state, + "entities": entities, + } + ) + response = Response() + response.status_code = 200 + response.raw = BytesIO(bytes(json, "utf-8")) + return response + + +class 
TestDataPackageListPoller(DataPackageListPoller): + __test__ = False + + def __init__( + self, + api: WebApi, + download_full_list_on_or_after: Optional[datetime] = None, + time_stamp_for_if_modified_since: Optional[datetime] = None, + chunk_size: int = 200, + ): + super().__init__(api, download_full_list_on_or_after, time_stamp_for_if_modified_since, chunk_size) + self._sleep = self.sleep + self._now = self.now + + def sleep(self, secs: float) -> None: + raise Exception("should not be called") + + def now(self) -> datetime: + raise Exception("should not be called") + + def on_full_listing_begin(self, subscription: "DataPackageBody") -> None: + raise Exception("should not be called") + + def on_full_listing_batch(self, subscription: "DataPackageBody", items: List["DataPackageListItem"]) -> None: + raise Exception("should not be called") + + def on_full_listing_end(self, is_aborted: bool) -> None: + raise Exception("should not be called") + + def on_incremental_begin(self, subscription: "DataPackageBody") -> None: + raise Exception("should not be called") + + def on_incremental_batch(self, subscription: "DataPackageBody", items: List["DataPackageListItem"]) -> None: + raise Exception("should not be called") + + def on_incremental_end(self, is_aborted: bool) -> None: + raise Exception("should not be called") + + def on_exception(self, source: ExceptionSource, exception: Exception) -> None: + raise Exception("should not be called") + + +def test_abort_full_listing_1() -> None: + hit = 0 + + def hit_test(*now: int) -> int: + nonlocal hit + hit += 1 + assert hit in now + return hit + + class _TestDataPackageListPoller(TestDataPackageListPoller): + __test__ = False + + def _test_access(self) -> None: + hit_test(1) + + def on_full_listing_begin(self, subscription: "DataPackageBody") -> None: + hit_test(2) + self.abort() + + def on_full_listing_end(self, is_aborted: bool) -> None: + hit_test(3) + assert is_aborted is True + + api = 
get_api(get_json_response(DataPackageListState.FULL_LISTING)) + + _TestDataPackageListPoller(api, chunk_size=1).start() + + assert hit == 3 + + +def test_abort_full_listing_2() -> None: + hit = 0 + + def hit_test(*now: int) -> int: + nonlocal hit + hit += 1 + assert hit in now + return hit + + class _TestDataPackageListPoller(TestDataPackageListPoller): + __test__ = False + + def _test_access(self) -> None: + hit_test(1) + + def on_full_listing_begin(self, subscription: "DataPackageBody") -> None: + hit_test(2) + + def on_full_listing_batch(self, subscription: DataPackageBody, items: List[DataPackageListItem]) -> None: + if hit_test(3, 4) == 4: + self.abort() + + def on_full_listing_end(self, is_aborted: bool) -> None: + hit_test(5) + assert is_aborted is True + + api = get_api(get_json_response(DataPackageListState.FULL_LISTING)) + + _TestDataPackageListPoller(api, chunk_size=1).start() + + assert hit == 5 + + +def test_abort_full_listing_3() -> None: + hit = 0 + + def hit_test(*now: int) -> int: + nonlocal hit + hit += 1 + assert hit in now + return hit + + class _TestDataPackageListPoller(TestDataPackageListPoller): + __test__ = False + + def _test_access(self) -> None: + hit_test(1) + + def on_full_listing_begin(self, subscription: "DataPackageBody") -> None: + hit_test(2) + + def on_full_listing_batch(self, subscription: DataPackageBody, items: List[DataPackageListItem]) -> None: + hit_test(3) + + def on_full_listing_end(self, is_aborted: bool) -> None: + hit_test(4) + assert is_aborted is False + self.abort() + + api = get_api(get_json_response(DataPackageListState.FULL_LISTING)) + + _TestDataPackageListPoller(api).start() + + assert hit == 4 + + +# test_abort_listing + + +def test_abort_listing_1() -> None: + hit = 0 + + def hit_test(*now: int) -> int: + nonlocal hit + hit += 1 + assert hit in now + return hit + + class _TestDataPackageListPoller(TestDataPackageListPoller): + __test__ = False + + def _test_access(self) -> None: + hit_test(1) + + def 
now(self) -> datetime: + hit_test(2) + return datetime(2000, 1, 1, tzinfo=timezone.utc) + + def on_incremental_begin(self, subscription: "DataPackageBody") -> None: + hit_test(3) + self.abort() + + def on_incremental_end(self, is_aborted: bool) -> None: + hit_test(4) + assert is_aborted is True + + api = get_api(get_json_response(DataPackageListState.UP_TO_DATE)) + + _TestDataPackageListPoller( + api, + download_full_list_on_or_after=datetime(3000, 1, 1, tzinfo=timezone.utc), + time_stamp_for_if_modified_since=datetime(1000, 1, 1, tzinfo=timezone.utc), + ).start() + + assert hit == 4 + + +def test_abort_listing_2() -> None: + hit = 0 + + def hit_test(*now: int) -> int: + nonlocal hit + hit += 1 + assert hit in now + return hit + + class _TestDataPackageListPoller(TestDataPackageListPoller): + __test__ = False + + def _test_access(self) -> None: + hit_test(1) + + def now(self) -> datetime: + hit_test(2) + return datetime(2000, 1, 1, tzinfo=timezone.utc) + + def on_incremental_begin(self, subscription: "DataPackageBody") -> None: + hit_test(3) + + def on_incremental_batch(self, subscription: DataPackageBody, items: List[DataPackageListItem]) -> None: + if hit_test(4): + self.abort() + + def on_incremental_end(self, is_aborted: bool) -> None: + hit_test(5) + assert is_aborted is True + + api = get_api(get_json_response(DataPackageListState.UP_TO_DATE)) + + _TestDataPackageListPoller( + api, + download_full_list_on_or_after=datetime(3000, 1, 1, tzinfo=timezone.utc), + time_stamp_for_if_modified_since=datetime(1000, 1, 1, tzinfo=timezone.utc), + chunk_size=1, + ).start() + + assert hit == 5 + + +def test_abort_listing_3() -> None: + hit = 0 + + def hit_test(*now: int) -> int: + nonlocal hit + hit += 1 + assert hit in now + return hit + + class _TestDataPackageListPoller(TestDataPackageListPoller): + __test__ = False + + def _test_access(self) -> None: + hit_test(1) + + def now(self) -> datetime: + hit_test(2) + return datetime(2000, 1, 1, tzinfo=timezone.utc) + + def 
on_incremental_begin(self, subscription: "DataPackageBody") -> None: + hit_test(3) + + def on_incremental_batch(self, subscription: DataPackageBody, items: List[DataPackageListItem]) -> None: + hit_test(4) + + def on_incremental_end(self, is_aborted: bool) -> None: + hit_test(5) + assert is_aborted is False + self.abort() + + api = get_api(get_json_response(DataPackageListState.UP_TO_DATE)) + + _TestDataPackageListPoller( + api, + download_full_list_on_or_after=datetime(3000, 1, 1, tzinfo=timezone.utc), + time_stamp_for_if_modified_since=datetime(1000, 1, 1, tzinfo=timezone.utc), + ).start() + + assert hit == 5 + + +# test_abort_listing_and_listing_incomplete + + +def test_abort_listing_and_listing_incomplete_1() -> None: + hit = 0 + + def hit_test(*now: int) -> int: + nonlocal hit + hit += 1 + assert hit in now + return hit + + class _TestDataPackageListPoller(TestDataPackageListPoller): + __test__ = False + + def _test_access(self) -> None: + hit_test(1) + + def sleep(self, secs: float) -> None: + hit_test(5) + + def now(self) -> datetime: + hit_test(2) + return datetime(2000, 1, 1, tzinfo=timezone.utc) + + def on_incremental_begin(self, subscription: "DataPackageBody") -> None: + hit_test(3) + + def on_incremental_batch(self, subscription: "DataPackageBody", items: List["DataPackageListItem"]) -> None: + if hit_test(4, 6) == 6: + self.abort() + + def on_incremental_end(self, is_aborted: bool) -> None: + hit_test(7) + assert is_aborted is True + + api = get_api( + get_json_response(DataPackageListState.INCOMPLETE), get_json_response(DataPackageListState.UP_TO_DATE) + ) + + _TestDataPackageListPoller( + api, + download_full_list_on_or_after=datetime(3000, 1, 1, tzinfo=timezone.utc), + time_stamp_for_if_modified_since=datetime(1000, 1, 1, tzinfo=timezone.utc), + ).start() + + assert hit == 7 + + +def test_abort_listing_and_listing_incomplete_2() -> None: + hit = 0 + + def hit_test(*now: int) -> int: + nonlocal hit + hit += 1 + assert hit in now + return hit + + 
class _TestDataPackageListPoller(TestDataPackageListPoller): + __test__ = False + + def _test_access(self) -> None: + hit_test(1) + + def sleep(self, secs: float) -> None: + hit_test(5) + + def now(self) -> datetime: + hit_test(2) + return datetime(2000, 1, 1, tzinfo=timezone.utc) + + def on_incremental_begin(self, subscription: "DataPackageBody") -> None: + hit_test(3) + + def on_incremental_batch(self, subscription: "DataPackageBody", items: List["DataPackageListItem"]) -> None: + hit_test(4, 6) + + def on_incremental_end(self, is_aborted: bool) -> None: + hit_test(7) + assert is_aborted is False + self.abort() + + api = get_api( + get_json_response(DataPackageListState.INCOMPLETE), get_json_response(DataPackageListState.UP_TO_DATE) + ) + + _TestDataPackageListPoller( + api, + download_full_list_on_or_after=datetime(3000, 1, 1, tzinfo=timezone.utc), + time_stamp_for_if_modified_since=datetime(1000, 1, 1, tzinfo=timezone.utc), + ).start() + + assert hit == 7 diff --git a/tests/Web/data_package_list_poller/error.py b/tests/Web/data_package_list_poller/error.py new file mode 100644 index 00000000..cf7e682a --- /dev/null +++ b/tests/Web/data_package_list_poller/error.py @@ -0,0 +1,396 @@ +from datetime import datetime, timezone +from io import BytesIO +from json import dumps as json_dumps +from typing import Any, Dict, List, Optional +import pytest + +from requests import Response + +from macrobond_data_api.web import WebApi +from macrobond_data_api.web.data_package_list_poller import DataPackageListPoller, ExceptionSource +from macrobond_data_api.web.session import Session +from macrobond_data_api.web.web_types import DataPackageBody, DataPackageListItem, DataPackageListState + + +class TestAuth2Session: + __test__ = False + + def __init__(self, *responses: Response): + self.index = 0 + self.responses = responses + + def request(self, *args: Any, **kwargs: Any) -> Response: # pylint: disable=unused-argument + response = self.responses[self.index] + self.index += 1 + 
return response + + +def get_api(*responses: Response) -> WebApi: + return WebApi(Session("", "", test_auth2_session=TestAuth2Session(*responses))) + + +def get_json_response( + state: DataPackageListState, + downloadFullListOnOrAfter: str = "2000-02-01T04:05:06", + timeStampForIfModifiedSince: str = "2000-02-02T04:05:06", + entities: Optional[List[Dict[str, str]]] = None, +) -> Response: + if entities is None: + entities = [ + {"name": "sek", "modified": "2000-02-03T04:05:06"}, + {"name": "dkk", "modified": "2000-02-04T04:05:06"}, + {"name": "usgdp", "modified": "2000-02-05T04:05:06"}, + ] + json = json_dumps( + { + "downloadFullListOnOrAfter": downloadFullListOnOrAfter, + "timeStampForIfModifiedSince": timeStampForIfModifiedSince, + "state": state, + "entities": entities, + } + ) + response = Response() + response.status_code = 200 + response.raw = BytesIO(bytes(json, "utf-8")) + return response + + +class TestDataPackageListPoller(DataPackageListPoller): + __test__ = False + + def __init__( + self, + api: WebApi, + download_full_list_on_or_after: Optional[datetime] = None, + time_stamp_for_if_modified_since: Optional[datetime] = None, + chunk_size: int = 200, + ): + super().__init__(api, download_full_list_on_or_after, time_stamp_for_if_modified_since, chunk_size) + self._sleep = self.sleep + self._now = self.now + + def sleep(self, secs: float) -> None: + raise Exception("should not be called") + + def now(self) -> datetime: + raise Exception("should not be called") + + def on_full_listing_begin(self, subscription: "DataPackageBody") -> None: + raise Exception("should not be called") + + def on_full_listing_batch(self, subscription: "DataPackageBody", items: List["DataPackageListItem"]) -> None: + raise Exception("should not be called") + + def on_full_listing_end(self, is_aborted: bool) -> None: + raise Exception("should not be called") + + def on_incremental_begin(self, subscription: "DataPackageBody") -> None: + raise Exception("should not be called") + + 
def on_incremental_batch(self, subscription: "DataPackageBody", items: List["DataPackageListItem"]) -> None: + raise Exception("should not be called") + + def on_incremental_end(self, is_aborted: bool) -> None: + raise Exception("should not be called") + + def on_exception(self, source: ExceptionSource, exception: Exception) -> None: + raise Exception("should not be called") + + +def test_full_listing_1() -> None: + hit = 0 + + def hit_test(*now: int) -> int: + nonlocal hit + hit += 1 + assert hit in now + return hit + + class _TestDataPackageListPoller(TestDataPackageListPoller): + __test__ = False + + def _test_access(self) -> None: + hit_test(1) + + def on_full_listing_begin(self, subscription: "DataPackageBody") -> None: + hit_test(2) + raise Exception("Test exception") + + api = get_api(get_json_response(DataPackageListState.FULL_LISTING)) + + with pytest.raises(Exception, match="Test exception"): + _TestDataPackageListPoller(api).start() + + assert hit == 2 + + +def test_full_listing_2() -> None: + hit = 0 + + def hit_test(*now: int) -> int: + nonlocal hit + hit += 1 + assert hit in now + return hit + + class _TestDataPackageListPoller(TestDataPackageListPoller): + __test__ = False + + def _test_access(self) -> None: + hit_test(1) + + def on_full_listing_begin(self, subscription: "DataPackageBody") -> None: + hit_test(2) + + def on_full_listing_batch(self, subscription: DataPackageBody, items: List[DataPackageListItem]) -> None: + hit_test(3) + raise Exception("Test exception") + + api = get_api(get_json_response(DataPackageListState.FULL_LISTING)) + + with pytest.raises(Exception, match="Test exception"): + _TestDataPackageListPoller(api).start() + + assert hit == 3 + + +def test_full_listing_3() -> None: + hit = 0 + + def hit_test(*now: int) -> int: + nonlocal hit + hit += 1 + assert hit in now + return hit + + class _TestDataPackageListPoller(TestDataPackageListPoller): + __test__ = False + + def _test_access(self) -> None: + hit_test(1) + + def 
on_full_listing_begin(self, subscription: "DataPackageBody") -> None: + hit_test(2) + + def on_full_listing_batch(self, subscription: DataPackageBody, items: List[DataPackageListItem]) -> None: + hit_test(3) + + def on_full_listing_end(self, is_aborted: bool) -> None: + hit_test(4) + raise Exception("Test exception") + + api = get_api(get_json_response(DataPackageListState.FULL_LISTING)) + + with pytest.raises(Exception, match="Test exception"): + _TestDataPackageListPoller(api).start() + + assert hit == 4 + + +# test_abort_listing + + +def test_listing_1() -> None: + hit = 0 + + def hit_test(*now: int) -> int: + nonlocal hit + hit += 1 + assert hit in now + return hit + + class _TestDataPackageListPoller(TestDataPackageListPoller): + __test__ = False + + def _test_access(self) -> None: + hit_test(1) + + def now(self) -> datetime: + hit_test(2) + return datetime(2000, 1, 1, tzinfo=timezone.utc) + + def on_incremental_begin(self, subscription: "DataPackageBody") -> None: + hit_test(3) + raise Exception("Test exception") + + api = get_api(get_json_response(DataPackageListState.UP_TO_DATE)) + + with pytest.raises(Exception, match="Test exception"): + _TestDataPackageListPoller( + api, + download_full_list_on_or_after=datetime(3000, 1, 1, tzinfo=timezone.utc), + time_stamp_for_if_modified_since=datetime(1000, 1, 1, tzinfo=timezone.utc), + ).start() + + assert hit == 3 + + +def test_listing_2() -> None: + hit = 0 + + def hit_test(*now: int) -> int: + nonlocal hit + hit += 1 + assert hit in now + return hit + + class _TestDataPackageListPoller(TestDataPackageListPoller): + __test__ = False + + def _test_access(self) -> None: + hit_test(1) + + def now(self) -> datetime: + hit_test(2) + return datetime(2000, 1, 1, tzinfo=timezone.utc) + + def on_incremental_begin(self, subscription: "DataPackageBody") -> None: + hit_test(3) + + def on_incremental_batch(self, subscription: DataPackageBody, items: List[DataPackageListItem]) -> None: + hit_test(4) + raise Exception("Test 
exception") + + api = get_api(get_json_response(DataPackageListState.UP_TO_DATE)) + + with pytest.raises(Exception, match="Test exception"): + _TestDataPackageListPoller( + api, + download_full_list_on_or_after=datetime(3000, 1, 1, tzinfo=timezone.utc), + time_stamp_for_if_modified_since=datetime(1000, 1, 1, tzinfo=timezone.utc), + ).start() + + assert hit == 4 + + +def test_listing_3() -> None: + hit = 0 + + def hit_test(*now: int) -> int: + nonlocal hit + hit += 1 + assert hit in now + return hit + + class _TestDataPackageListPoller(TestDataPackageListPoller): + __test__ = False + + def _test_access(self) -> None: + hit_test(1) + + def now(self) -> datetime: + hit_test(2) + return datetime(2000, 1, 1, tzinfo=timezone.utc) + + def on_incremental_begin(self, subscription: "DataPackageBody") -> None: + hit_test(3) + + def on_incremental_batch(self, subscription: DataPackageBody, items: List[DataPackageListItem]) -> None: + hit_test(4) + + def on_incremental_end(self, is_aborted: bool) -> None: + hit_test(5) + raise Exception("Test exception") + + api = get_api(get_json_response(DataPackageListState.UP_TO_DATE)) + + with pytest.raises(Exception, match="Test exception"): + _TestDataPackageListPoller( + api, + download_full_list_on_or_after=datetime(3000, 1, 1, tzinfo=timezone.utc), + time_stamp_for_if_modified_since=datetime(1000, 1, 1, tzinfo=timezone.utc), + ).start() + + assert hit == 5 + + +# test_abort_listing_and_listing_incomplete + + +def test_listing_and_listing_incomplete_1() -> None: + hit = 0 + + def hit_test(*now: int) -> int: + nonlocal hit + hit += 1 + assert hit in now + return hit + + class _TestDataPackageListPoller(TestDataPackageListPoller): + __test__ = False + + def _test_access(self) -> None: + hit_test(1) + + def sleep(self, secs: float) -> None: + hit_test(5) + + def now(self) -> datetime: + hit_test(2) + return datetime(2000, 1, 1, tzinfo=timezone.utc) + + def on_incremental_begin(self, subscription: "DataPackageBody") -> None: + hit_test(3) 
+ + def on_incremental_batch(self, subscription: "DataPackageBody", items: List["DataPackageListItem"]) -> None: + if hit_test(4, 6) == 6: + raise Exception("Test exception") + + api = get_api( + get_json_response(DataPackageListState.INCOMPLETE), get_json_response(DataPackageListState.UP_TO_DATE) + ) + + with pytest.raises(Exception, match="Test exception"): + _TestDataPackageListPoller( + api, + download_full_list_on_or_after=datetime(3000, 1, 1, tzinfo=timezone.utc), + time_stamp_for_if_modified_since=datetime(1000, 1, 1, tzinfo=timezone.utc), + ).start() + + assert hit == 6 + + +def test_listing_and_listing_incomplete_2() -> None: + hit = 0 + + def hit_test(*now: int) -> int: + nonlocal hit + hit += 1 + assert hit in now + return hit + + class _TestDataPackageListPoller(TestDataPackageListPoller): + __test__ = False + + def _test_access(self) -> None: + hit_test(1) + + def sleep(self, secs: float) -> None: + hit_test(5) + + def now(self) -> datetime: + hit_test(2) + return datetime(2000, 1, 1, tzinfo=timezone.utc) + + def on_incremental_begin(self, subscription: "DataPackageBody") -> None: + hit_test(3) + + def on_incremental_batch(self, subscription: "DataPackageBody", items: List["DataPackageListItem"]) -> None: + hit_test(4, 6) + + def on_incremental_end(self, is_aborted: bool) -> None: + hit_test(7) + raise Exception("Test exception") + + api = get_api( + get_json_response(DataPackageListState.INCOMPLETE), get_json_response(DataPackageListState.UP_TO_DATE) + ) + + with pytest.raises(Exception, match="Test exception"): + _TestDataPackageListPoller( + api, + download_full_list_on_or_after=datetime(3000, 1, 1, tzinfo=timezone.utc), + time_stamp_for_if_modified_since=datetime(1000, 1, 1, tzinfo=timezone.utc), + ).start() + + assert hit == 7 diff --git a/tests/Web/data_package_list_poller/normal.py b/tests/Web/data_package_list_poller/normal.py new file mode 100644 index 00000000..24ea70bd --- /dev/null +++ b/tests/Web/data_package_list_poller/normal.py @@ -0,0 
+1,363 @@ +from datetime import datetime, timezone +from io import BytesIO +from json import dumps as json_dumps +from typing import Any, Dict, List, Optional + +import pytest + +from requests import Response + +from macrobond_data_api.web import WebApi +from macrobond_data_api.web.data_package_list_poller import DataPackageListPoller, ExceptionSource +from macrobond_data_api.web.session import Session +from macrobond_data_api.web.web_types import DataPackageBody, DataPackageListItem, DataPackageListState + + +class TestAuth2Session: + __test__ = False + + def __init__(self, *responses: Response): + self.index = 0 + self.responses = responses + + def request(self, *args: Any, **kwargs: Any) -> Response: # pylint: disable=unused-argument + response = self.responses[self.index] + self.index += 1 + return response + + +def get_api(*responses: Response) -> WebApi: + return WebApi(Session("", "", test_auth2_session=TestAuth2Session(*responses))) + + +def get_json_response( + state: DataPackageListState, + downloadFullListOnOrAfter: str = "2000-02-01T04:05:06", + timeStampForIfModifiedSince: str = "2000-02-02T04:05:06", + entities: Optional[List[Dict[str, str]]] = None, +) -> Response: + if entities is None: + entities = [ + {"name": "sek", "modified": "2000-02-03T04:05:06"}, + {"name": "dkk", "modified": "2000-02-04T04:05:06"}, + {"name": "usgdp", "modified": "2000-02-05T04:05:06"}, + ] + json = json_dumps( + { + "downloadFullListOnOrAfter": downloadFullListOnOrAfter, + "timeStampForIfModifiedSince": timeStampForIfModifiedSince, + "state": state, + "entities": entities, + } + ) + response = Response() + response.status_code = 200 + response.raw = BytesIO(bytes(json, "utf-8")) + return response + + +class TestDataPackageListPoller(DataPackageListPoller): + __test__ = False + + def __init__( + self, + api: WebApi, + download_full_list_on_or_after: Optional[datetime] = None, + time_stamp_for_if_modified_since: Optional[datetime] = None, + chunk_size: int = 200, + ): + 
super().__init__(api, download_full_list_on_or_after, time_stamp_for_if_modified_since, chunk_size) + self._sleep = self.sleep + self._now = self.now + + def sleep(self, secs: float) -> None: + raise Exception("should not be called") + + def now(self) -> datetime: + raise Exception("should not be called") + + def on_full_listing_begin(self, subscription: "DataPackageBody") -> None: + raise Exception("should not be called") + + def on_full_listing_batch(self, subscription: "DataPackageBody", items: List["DataPackageListItem"]) -> None: + raise Exception("should not be called") + + def on_full_listing_end(self, is_aborted: bool) -> None: + raise Exception("should not be called") + + def on_incremental_begin(self, subscription: "DataPackageBody") -> None: + raise Exception("should not be called") + + def on_incremental_batch(self, subscription: "DataPackageBody", items: List["DataPackageListItem"]) -> None: + raise Exception("should not be called") + + def on_incremental_end(self, is_aborted: bool) -> None: + raise Exception("should not be called") + + def on_exception(self, source: ExceptionSource, exception: Exception) -> None: + raise Exception("should not be called") + + +# _run_full_listing +def test_full_listing() -> None: + hit = 0 + + def hit_test(*now: int) -> int: + nonlocal hit + hit += 1 + assert hit in now + return hit + + class _TestDataPackageListPoller(TestDataPackageListPoller): + __test__ = False + + def _test_access(self) -> None: + hit_test(1) + + def sleep(self, secs: float) -> None: + hit_test(7) + assert secs == self.up_to_date_delay + raise Exception("End of test") + + def on_full_listing_begin(self, subscription: "DataPackageBody") -> None: + hit_test(2) + assert subscription.time_stamp_for_if_modified_since == datetime(2000, 2, 2, 4, 5, 6) + assert subscription.download_full_list_on_or_after == datetime(2000, 2, 1, 4, 5, 6) + assert subscription.state == DataPackageListState.FULL_LISTING + + def on_full_listing_batch(self, subscription: 
"DataPackageBody", items: List["DataPackageListItem"]) -> None: + nonlocal hit + hit += 1 + assert subscription.time_stamp_for_if_modified_since == datetime(2000, 2, 2, 4, 5, 6) + assert subscription.download_full_list_on_or_after == datetime(2000, 2, 1, 4, 5, 6) + assert subscription.state == DataPackageListState.FULL_LISTING + if hit == 3: + assert items == [DataPackageListItem("sek", datetime(2000, 2, 3, 4, 5, 6))] + if hit == 4: + assert items == [DataPackageListItem("dkk", datetime(2000, 2, 4, 4, 5, 6))] + if hit == 5: + assert items == [DataPackageListItem("usgdp", datetime(2000, 2, 5, 4, 5, 6))] + + def on_full_listing_end(self, is_aborted: bool) -> None: + hit_test(6) + assert is_aborted is False + + api = get_api(get_json_response(DataPackageListState.FULL_LISTING)) + + with pytest.raises(Exception, match="End of test"): + _TestDataPackageListPoller(api, chunk_size=1).start() + + assert hit == 7 + + +# _run_listing +def test_listing() -> None: + hit = 0 + + def hit_test(*now: int) -> int: + nonlocal hit + hit += 1 + assert hit in now + return hit + + class _TestDataPackageListPoller(TestDataPackageListPoller): + __test__ = False + + def _test_access(self) -> None: + hit_test(1) + + def sleep(self, secs: float) -> None: + hit_test(8) + assert secs == self.up_to_date_delay + raise Exception("End of test") + + def now(self) -> datetime: + hit_test(2) + return datetime(2000, 1, 1, tzinfo=timezone.utc) + + def on_incremental_begin(self, subscription: "DataPackageBody") -> None: + hit_test(3) + assert subscription.time_stamp_for_if_modified_since == datetime(2000, 2, 2, 4, 5, 6) + assert subscription.download_full_list_on_or_after == datetime(2000, 2, 1, 4, 5, 6) + assert subscription.state == DataPackageListState.UP_TO_DATE + + def on_incremental_batch(self, subscription: "DataPackageBody", items: List["DataPackageListItem"]) -> None: + nonlocal hit + hit += 1 + assert subscription.time_stamp_for_if_modified_since == datetime(2000, 2, 2, 4, 5, 6) + assert 
subscription.download_full_list_on_or_after == datetime(2000, 2, 1, 4, 5, 6) + assert subscription.state == DataPackageListState.UP_TO_DATE + if hit == 4: + assert items == [DataPackageListItem("sek", datetime(2000, 2, 3, 4, 5, 6))] + elif hit == 5: + assert items == [DataPackageListItem("dkk", datetime(2000, 2, 4, 4, 5, 6))] + elif hit == 6: + assert items == [DataPackageListItem("usgdp", datetime(2000, 2, 5, 4, 5, 6))] + else: + raise Exception("should not be here") + + def on_incremental_end(self, is_aborted: bool) -> None: + hit_test(7) + assert is_aborted is False + + api = get_api(get_json_response(DataPackageListState.UP_TO_DATE)) + + with pytest.raises(Exception, match="End of test"): + _TestDataPackageListPoller( + api, + download_full_list_on_or_after=datetime(3000, 1, 1, tzinfo=timezone.utc), + time_stamp_for_if_modified_since=datetime(1000, 1, 1, tzinfo=timezone.utc), + chunk_size=1, + ).start() + + assert hit == 8 + + +# _run_listing and _run_listing_incomplete +def test_listing_and_listing_incomplete_1() -> None: + hit = 0 + + def hit_test(*now: int) -> int: + nonlocal hit + hit += 1 + assert hit in now + return hit + + class _TestDataPackageListPoller(TestDataPackageListPoller): + __test__ = False + + def _test_access(self) -> None: + hit_test(1) + + def sleep(self, secs: float) -> None: + hit = hit_test(7, 12) + if hit == 7: + assert secs == self.incomplete_delay + elif hit == 12: + assert secs == self.up_to_date_delay + raise Exception("End of test") + + def now(self) -> datetime: + hit_test(2) + return datetime(2000, 1, 1, tzinfo=timezone.utc) + + def on_incremental_begin(self, subscription: "DataPackageBody") -> None: + assert subscription.time_stamp_for_if_modified_since == datetime(2000, 2, 2, 4, 5, 6) + assert subscription.download_full_list_on_or_after == datetime(2000, 2, 1, 4, 5, 6) + if hit_test(3) == 3: + assert subscription.state == DataPackageListState.INCOMPLETE + + def on_incremental_batch(self, subscription: "DataPackageBody", items: 
List["DataPackageListItem"]) -> None: + nonlocal hit + hit += 1 + assert subscription.time_stamp_for_if_modified_since == datetime(2000, 2, 2, 4, 5, 6) + assert subscription.download_full_list_on_or_after == datetime(2000, 2, 1, 4, 5, 6) + if hit == 4: + assert subscription.state == DataPackageListState.INCOMPLETE + assert items == [DataPackageListItem("sek", datetime(2000, 2, 3, 4, 5, 6))] + elif hit == 5: + assert subscription.state == DataPackageListState.INCOMPLETE + assert items == [DataPackageListItem("dkk", datetime(2000, 2, 4, 4, 5, 6))] + elif hit == 6: + assert subscription.state == DataPackageListState.INCOMPLETE + assert items == [DataPackageListItem("usgdp", datetime(2000, 2, 5, 4, 5, 6))] + elif hit == 8: + assert subscription.state == DataPackageListState.UP_TO_DATE + assert items == [DataPackageListItem("sek", datetime(2000, 2, 3, 4, 5, 6))] + elif hit == 9: + assert subscription.state == DataPackageListState.UP_TO_DATE + assert items == [DataPackageListItem("dkk", datetime(2000, 2, 4, 4, 5, 6))] + elif hit == 10: + assert subscription.state == DataPackageListState.UP_TO_DATE + assert items == [DataPackageListItem("usgdp", datetime(2000, 2, 5, 4, 5, 6))] + else: + raise Exception("should not be here") + + def on_incremental_end(self, is_aborted: bool) -> None: + hit_test(11) + assert is_aborted is False + + api = get_api( + get_json_response(DataPackageListState.INCOMPLETE), get_json_response(DataPackageListState.UP_TO_DATE) + ) + + with pytest.raises(Exception, match="End of test"): + _TestDataPackageListPoller( + api, + download_full_list_on_or_after=datetime(3000, 1, 1, tzinfo=timezone.utc), + time_stamp_for_if_modified_since=datetime(1000, 1, 1, tzinfo=timezone.utc), + chunk_size=1, + ).start() + + assert hit == 12 + + +def test_listing_and_listing_incomplete_2() -> None: + hit = 0 + + def hit_test(*now: int) -> int: + nonlocal hit + hit += 1 + assert hit in now + return hit + + class _TestDataPackageListPoller(TestDataPackageListPoller): + 
__test__ = False + + def _test_access(self) -> None: + hit_test(1) + + def sleep(self, secs: float) -> None: + hit = hit_test(5, 7, 10) + if hit in (5, 7): + assert secs == self.incomplete_delay + elif hit == 10: + assert secs == self.up_to_date_delay + raise Exception("End of test") + + def now(self) -> datetime: + hit_test(2) + return datetime(2000, 1, 1, tzinfo=timezone.utc) + + def on_incremental_begin(self, subscription: "DataPackageBody") -> None: + assert subscription.time_stamp_for_if_modified_since == datetime(2000, 2, 2, 4, 5, 6) + assert subscription.download_full_list_on_or_after == datetime(2000, 2, 1, 4, 5, 6) + if hit_test(3) == 3: + assert subscription.state == DataPackageListState.INCOMPLETE + + def on_incremental_batch(self, subscription: "DataPackageBody", items: List["DataPackageListItem"]) -> None: + hit = hit_test(4, 6, 8) + assert subscription.time_stamp_for_if_modified_since == datetime(2000, 2, 2, 4, 5, 6) + assert subscription.download_full_list_on_or_after == datetime(2000, 2, 1, 4, 5, 6) + + assert items == [ + DataPackageListItem("sek", datetime(2000, 2, 3, 4, 5, 6)), + DataPackageListItem("dkk", datetime(2000, 2, 4, 4, 5, 6)), + DataPackageListItem("usgdp", datetime(2000, 2, 5, 4, 5, 6)), + ] + + if hit in (4, 6): + assert subscription.state == DataPackageListState.INCOMPLETE + elif hit == 8: + assert subscription.state == DataPackageListState.UP_TO_DATE + else: + raise Exception("should not be here") + + def on_incremental_end(self, is_aborted: bool) -> None: + hit_test(9) + assert is_aborted is False + + api = get_api( + get_json_response(DataPackageListState.INCOMPLETE), + get_json_response(DataPackageListState.INCOMPLETE), + get_json_response(DataPackageListState.UP_TO_DATE), + ) + + with pytest.raises(Exception, match="End of test"): + _TestDataPackageListPoller( + api, + download_full_list_on_or_after=datetime(3000, 1, 1, tzinfo=timezone.utc), + time_stamp_for_if_modified_since=datetime(1000, 1, 1, tzinfo=timezone.utc), + 
).start() + + assert hit == 10 diff --git a/tests/Web/data_package_list_poller/on_exception.py b/tests/Web/data_package_list_poller/on_exception.py new file mode 100644 index 00000000..9d81b378 --- /dev/null +++ b/tests/Web/data_package_list_poller/on_exception.py @@ -0,0 +1,378 @@ +from datetime import datetime, timezone +from io import BytesIO +from json import dumps as json_dumps +from typing import Any, List, Optional + +from requests import Response + +from macrobond_data_api.web import WebApi +from macrobond_data_api.web.data_package_list_poller import DataPackageListPoller, ExceptionSource +from macrobond_data_api.web.session import Session +from macrobond_data_api.web.web_types import DataPackageBody, DataPackageListItem, DataPackageListState + + +class TestAuth2Session: + __test__ = False + + def __init__(self, *responses: Response): + self.index = 0 + self.responses = responses + + def request(self, *args: Any, **kwargs: Any) -> Response: # pylint: disable=unused-argument + response = self.responses[self.index] + self.index += 1 + return response + + +def get_api(*responses: Response) -> WebApi: + return WebApi(Session("", "", test_auth2_session=TestAuth2Session(*responses))) + + +def get_json_response( + state: DataPackageListState, + downloadFullListOnOrAfter: str = "2000-02-01T04:05:06", + timeStampForIfModifiedSince: str = "2000-02-02T04:05:06", +) -> Response: + json = json_dumps( + { + "downloadFullListOnOrAfter": downloadFullListOnOrAfter, + "timeStampForIfModifiedSince": timeStampForIfModifiedSince, + "state": state, + "entities": [ + {"name": "sek", "modified": "2000-02-03T04:05:06"}, + {"name": "dkk", "modified": "2000-02-04T04:05:06"}, + {"name": "usgdp", "modified": "2000-02-05T04:05:06"}, + ], + } + ) + response = Response() + response.status_code = 200 + response.raw = BytesIO(bytes(json, "utf-8")) + return response + + +def get_broken_json_response( + state: DataPackageListState, + downloadFullListOnOrAfter: str = "2000-02-01T04:05:06", + 
timeStampForIfModifiedSince: str = "2000-02-02T04:05:06", +) -> Response: + response = get_json_response(state, downloadFullListOnOrAfter, timeStampForIfModifiedSince) + response.raw = BytesIO(bytes(response.raw.getvalue().decode("utf-8")[:-10], "utf-8")) + return response + + +def http_500_response() -> Response: + response = Response() + response.status_code = 500 + return response + + +class TestDataPackageListPoller(DataPackageListPoller): + __test__ = False + + def __init__( + self, + api: WebApi, + download_full_list_on_or_after: Optional[datetime] = None, + time_stamp_for_if_modified_since: Optional[datetime] = None, + chunk_size: int = 200, + ): + super().__init__(api, download_full_list_on_or_after, time_stamp_for_if_modified_since, chunk_size) + self._sleep = self.sleep + self._now = self.now + + def sleep(self, secs: float) -> None: + raise Exception("should not be called") + + def now(self) -> datetime: + raise Exception("should not be called") + + def on_full_listing_begin(self, subscription: "DataPackageBody") -> None: + raise Exception("should not be called") + + def on_full_listing_batch(self, subscription: "DataPackageBody", items: List["DataPackageListItem"]) -> None: + raise Exception("should not be called") + + def on_full_listing_end(self, is_aborted: bool) -> None: + raise Exception("should not be called") + + def on_incremental_begin(self, subscription: "DataPackageBody") -> None: + raise Exception("should not be called") + + def on_incremental_batch(self, subscription: "DataPackageBody", items: List["DataPackageListItem"]) -> None: + raise Exception("should not be called") + + def on_incremental_end(self, is_aborted: bool) -> None: + raise Exception("should not be called") + + def on_exception(self, source: ExceptionSource, exception: Exception) -> None: + raise Exception("should not be called") + + +# _run_full_listing + + +def test_full_listing_error_1() -> None: + hit = 0 + + def hit_test(*now: int) -> int: + nonlocal hit + hit += 1 + 
assert hit in now + return hit + + class _TestDataPackageListPoller(TestDataPackageListPoller): + __test__ = False + + def _test_access(self) -> None: + hit_test(1) + + def sleep(self, secs: float) -> None: + hit = hit_test(2, 3, 4) - 1 + assert secs == self.on_retry_delay * hit + + def on_exception(self, source: ExceptionSource, exception: Exception) -> None: + hit_test(5) + assert source is ExceptionSource.FAILED_TO_BEGIN_FULL_LISTING + assert exception is not None + self.abort() + + api = get_api( + http_500_response(), + http_500_response(), + http_500_response(), + http_500_response(), + ) + + _TestDataPackageListPoller(api, chunk_size=200).start() + + assert hit == 5 + + +def test_full_listing_error_2() -> None: + hit = 0 + + def hit_test(*now: int) -> int: + nonlocal hit + hit += 1 + assert hit in now + return hit + + class _TestDataPackageListPoller(TestDataPackageListPoller): + __test__ = False + + def _test_access(self) -> None: + hit_test(1) + + def on_full_listing_begin(self, subscription: "DataPackageBody") -> None: + hit_test(2) + + def on_exception(self, source: ExceptionSource, exception: Exception) -> None: + hit_test(3) + assert source is ExceptionSource.FAILED_TO_GET_BATCH_IN_FULL_LISTING + assert exception is not None + self.abort() + + api = get_api(get_broken_json_response(DataPackageListState.FULL_LISTING)) + + _TestDataPackageListPoller(api, chunk_size=200).start() + + assert hit == 3 + + +# _run_listing + + +def test_listing_error_1() -> None: + hit = 0 + + def hit_test(*now: int) -> int: + nonlocal hit + hit += 1 + assert hit in now + return hit + + class _TestDataPackageListPoller(TestDataPackageListPoller): + __test__ = False + + def _test_access(self) -> None: + hit_test(1) + + def now(self) -> datetime: + hit_test(2) + return datetime(2000, 1, 1, tzinfo=timezone.utc) + + def sleep(self, secs: float) -> None: + hit = hit_test(3, 4, 5) - 2 + assert secs == self.on_retry_delay * hit + + def on_exception(self, source: ExceptionSource, 
exception: Exception) -> None: + hit_test(6) + assert source is ExceptionSource.FAILED_TO_BEGIN_LISTING + assert exception is not None + self.abort() + + api = get_api( + http_500_response(), + http_500_response(), + http_500_response(), + http_500_response(), + ) + + _TestDataPackageListPoller( + api, + download_full_list_on_or_after=datetime(3000, 1, 1, tzinfo=timezone.utc), + time_stamp_for_if_modified_since=datetime(1000, 1, 1, tzinfo=timezone.utc), + ).start() + + assert hit == 6 + + +def test_listing_error_2() -> None: + hit = 0 + + def hit_test(*now: int) -> int: + nonlocal hit + hit += 1 + assert hit in now + return hit + + class _TestDataPackageListPoller(TestDataPackageListPoller): + __test__ = False + + def _test_access(self) -> None: + hit_test(1) + + def now(self) -> datetime: + hit_test(2) + return datetime(2000, 1, 1, tzinfo=timezone.utc) + + def on_incremental_begin(self, subscription: DataPackageBody) -> None: + hit_test(3) + + def on_exception(self, source: ExceptionSource, exception: Exception) -> None: + hit_test(4) + assert source is ExceptionSource.FAILED_TO_GET_BATCH_IN_LISTING + assert exception is not None + self.abort() + + api = get_api( + get_broken_json_response(DataPackageListState.UP_TO_DATE), + ) + + _TestDataPackageListPoller( + api, + download_full_list_on_or_after=datetime(3000, 1, 1, tzinfo=timezone.utc), + time_stamp_for_if_modified_since=datetime(1000, 1, 1, tzinfo=timezone.utc), + ).start() + + assert hit == 4 + + +# _run_listing and _run_listing_incomplete + + +def test_listing_and_listing_incomplete_error_1() -> None: + hit = 0 + + def hit_test(*now: int) -> int: + nonlocal hit + hit += 1 + assert hit in now + return hit + + class _TestDataPackageListPoller(TestDataPackageListPoller): + __test__ = False + + def _test_access(self) -> None: + hit_test(1) + + def sleep(self, secs: float) -> None: + hit = hit_test(5, 6, 7, 8) + if hit == 5: + assert secs == self.incomplete_delay + else: + assert secs == self.on_retry_delay * 
(hit - 5) + + def now(self) -> datetime: + hit_test(2) + return datetime(2000, 1, 1, tzinfo=timezone.utc) + + def on_incremental_begin(self, subscription: "DataPackageBody") -> None: + hit_test(3) + + def on_incremental_batch(self, subscription: "DataPackageBody", items: List["DataPackageListItem"]) -> None: + hit_test(4) + assert subscription.state == DataPackageListState.INCOMPLETE + + def on_exception(self, source: ExceptionSource, exception: Exception) -> None: + hit_test(9) + assert source is ExceptionSource.FAILED_TO_BEGIN_LISTING_INCOMPLETE + assert exception is not None + self.abort() + + api = get_api( + get_json_response(DataPackageListState.INCOMPLETE), + http_500_response(), + http_500_response(), + http_500_response(), + http_500_response(), + ) + + _TestDataPackageListPoller( + api, + download_full_list_on_or_after=datetime(3000, 1, 1, tzinfo=timezone.utc), + time_stamp_for_if_modified_since=datetime(1000, 1, 1, tzinfo=timezone.utc), + ).start() + + assert hit == 9 + + +def test_listing_and_listing_incomplete_error_2() -> None: + hit = 0 + + def hit_test(*now: int) -> int: + nonlocal hit + hit += 1 + assert hit in now + return hit + + class _TestDataPackageListPoller(TestDataPackageListPoller): + __test__ = False + + def _test_access(self) -> None: + hit_test(1) + + def sleep(self, secs: float) -> None: + hit_test(5) + assert secs == self.incomplete_delay + + def now(self) -> datetime: + hit_test(2) + return datetime(2000, 1, 1, tzinfo=timezone.utc) + + def on_incremental_begin(self, subscription: "DataPackageBody") -> None: + hit_test(3) + + def on_incremental_batch(self, subscription: "DataPackageBody", items: List["DataPackageListItem"]) -> None: + hit_test(4) + assert subscription.state == DataPackageListState.INCOMPLETE + + def on_exception(self, source: ExceptionSource, exception: Exception) -> None: + hit_test(6) + assert source is ExceptionSource.FAILED_TO_GET_BATCH_IN_LISTING_INCOMPLETE + assert exception is not None + self.abort() + + api = 
get_api(
+        get_json_response(DataPackageListState.INCOMPLETE),
+        get_broken_json_response(DataPackageListState.UP_TO_DATE),
+    )
+
+    _TestDataPackageListPoller(
+        api,
+        download_full_list_on_or_after=datetime(3000, 1, 1, tzinfo=timezone.utc),
+        time_stamp_for_if_modified_since=datetime(1000, 1, 1, tzinfo=timezone.utc),
+    ).start()
+
+    assert hit == 6
diff --git a/tests/Web/data_package_list_poller/retry.py b/tests/Web/data_package_list_poller/retry.py
new file mode 100644
index 00000000..0acedc3c
--- /dev/null
+++ b/tests/Web/data_package_list_poller/retry.py
@@ -0,0 +1,237 @@
+from datetime import datetime, timezone
+from io import BytesIO
+from json import dumps as json_dumps
+from typing import Any, List, Optional
+
+from requests import Response
+
+from macrobond_data_api.web import WebApi
+from macrobond_data_api.web.data_package_list_poller import DataPackageListPoller, ExceptionSource
+from macrobond_data_api.web.session import Session
+from macrobond_data_api.web.web_types import DataPackageBody, DataPackageListItem, DataPackageListState
+
+
+class TestAuth2Session:  # stand-in session that replays canned responses in order
+    __test__ = False  # helper, not a test case, despite the Test* name
+
+    def __init__(self, *responses: Response):
+        self.index = 0  # position of the next response to hand out
+        self.responses = responses
+
+    def request(self, *args: Any, **kwargs: Any) -> Response:  # pylint: disable=unused-argument
+        response = self.responses[self.index]  # IndexError once the canned responses run out
+        self.index += 1
+        return response
+
+
+def get_api(*responses: Response) -> WebApi:  # WebApi whose session replays *responses*
+    return WebApi(Session("", "", test_auth2_session=TestAuth2Session(*responses)))
+
+
+def get_json_response(
+    state: DataPackageListState,
+    downloadFullListOnOrAfter: str = "2000-02-01T04:05:06",
+    timeStampForIfModifiedSince: str = "2000-02-02T04:05:06",
+) -> Response:  # status-200 response whose raw body is the JSON document built below
+    json = json_dumps(
+        {
+            "downloadFullListOnOrAfter": downloadFullListOnOrAfter,
+            "timeStampForIfModifiedSince": timeStampForIfModifiedSince,
+            "state": state,  # assumes the enum value is JSON-serializable - TODO confirm
+            "entities": [
+                {"name": "sek", "modified": "2000-02-03T04:05:06"},
+                {"name": "dkk", "modified": "2000-02-04T04:05:06"},
+                {"name": "usgdp", "modified": "2000-02-05T04:05:06"},
+            ],
+        }
+    )
+    response = Response()
+    response.status_code = 200
+    response.raw = BytesIO(bytes(json, "utf-8"))  # body exposed as an in-memory byte stream
+    return response
+
+
+def http_500_response() -> Response:  # response carrying only status 500; no body is set
+    response = Response()
+    response.status_code = 500
+    return response
+
+
+class TestDataPackageListPoller(DataPackageListPoller):  # base double: each hook raises unless overridden
+    __test__ = False  # helper, not a test case, despite the Test* name
+
+    def __init__(
+        self,
+        api: WebApi,
+        download_full_list_on_or_after: Optional[datetime] = None,
+        time_stamp_for_if_modified_since: Optional[datetime] = None,
+        chunk_size: int = 200,
+    ):
+        super().__init__(api, download_full_list_on_or_after, time_stamp_for_if_modified_since, chunk_size)
+        self._sleep = self.sleep  # presumably the delay hook the base class calls - TODO confirm
+        self._now = self.now  # presumably the clock hook the base class calls - TODO confirm
+
+    def sleep(self, secs: float) -> None:
+        raise Exception("should not be called")
+
+    def now(self) -> datetime:
+        raise Exception("should not be called")
+
+    def on_full_listing_begin(self, subscription: "DataPackageBody") -> None:
+        raise Exception("should not be called")
+
+    def on_full_listing_batch(self, subscription: "DataPackageBody", items: List["DataPackageListItem"]) -> None:
+        raise Exception("should not be called")
+
+    def on_full_listing_end(self, is_aborted: bool) -> None:
+        raise Exception("should not be called")
+
+    def on_incremental_begin(self, subscription: "DataPackageBody") -> None:
+        raise Exception("should not be called")
+
+    def on_incremental_batch(self, subscription: "DataPackageBody", items: List["DataPackageListItem"]) -> None:
+        raise Exception("should not be called")
+
+    def on_incremental_end(self, is_aborted: bool) -> None:
+        raise Exception("should not be called")
+
+    def on_exception(self, source: ExceptionSource, exception: Exception) -> None:
+        raise Exception("should not be called")
+
+
+def test_full_listing() -> None:  # canned input: one 500 response, then a FULL_LISTING response
+    hit = 0  # number of hook calls seen so far
+
+    def hit_test(*now: int) -> int:  # count a call and assert its position is one of *now*
+        nonlocal hit
+        hit += 1
+        assert hit in now
+        return hit
+
+    class _TestDataPackageListPoller(TestDataPackageListPoller):
+        __test__ = False
+
+        def _test_access(self) -> None:
+            hit_test(1)  # must be the first call
+
+        def sleep(self, secs: float) -> None:
+            hit_test(2)  # must be the second call
+            assert secs == self.on_error_delay
+
+        def on_full_listing_begin(self, subscription: "DataPackageBody") -> None:
+            hit_test(3)
+
+        def on_full_listing_batch(self, subscription: "DataPackageBody", items: List["DataPackageListItem"]) -> None:
+            hit_test(4)
+
+        def on_full_listing_end(self, is_aborted: bool) -> None:
+            hit_test(5)
+            self.abort()  # last expected call; ask the poller to stop
+
+    api = get_api(http_500_response(), get_json_response(DataPackageListState.FULL_LISTING))
+
+    _TestDataPackageListPoller(api, chunk_size=200).start()
+
+    assert hit == 5  # exactly five counted calls
+
+
+def test_listing() -> None:  # canned input: one 500 response, then an UP_TO_DATE response
+    hit = 0  # number of hook calls seen so far
+
+    def hit_test(*now: int) -> int:  # count a call and assert its position is one of *now*
+        nonlocal hit
+        hit += 1
+        assert hit in now
+        return hit
+
+    class _TestDataPackageListPoller(TestDataPackageListPoller):
+        __test__ = False
+
+        def _test_access(self) -> None:
+            hit_test(1)  # must be the first call
+
+        def sleep(self, secs: float) -> None:
+            hit_test(3)  # must be the third call, after now()
+            assert secs == self.on_error_delay
+
+        def now(self) -> datetime:
+            hit_test(2)  # must be the second call
+            return datetime(2000, 1, 1, tzinfo=timezone.utc)  # fixed clock value for the test
+
+        def on_incremental_begin(self, subscription: "DataPackageBody") -> None:
+            hit_test(4)
+
+        def on_incremental_batch(self, subscription: "DataPackageBody", items: List["DataPackageListItem"]) -> None:
+            hit_test(5)
+
+        def on_incremental_end(self, is_aborted: bool) -> None:
+            hit_test(6)
+            self.abort()  # last expected call; ask the poller to stop
+
+    api = get_api(http_500_response(), get_json_response(DataPackageListState.UP_TO_DATE))
+
+    _TestDataPackageListPoller(
+        api,
+        download_full_list_on_or_after=datetime(3000, 1, 1, tzinfo=timezone.utc),
+        time_stamp_for_if_modified_since=datetime(1000, 1, 1, tzinfo=timezone.utc),
+    ).start()
+
+    assert hit == 6  # exactly six counted calls
+
+
+# _run_listing and _run_listing_incomplete
+def test_listing_and_listing_incomplete() -> None:  # canned input: INCOMPLETE, then 500, then UP_TO_DATE
+    hit = 0  # number of hook calls seen so far
+
+    def hit_test(*now: int) -> int:  # count a call and assert its position is one of *now*
+        nonlocal hit
+        hit += 1
+        assert hit in now
+        return hit
+
+    class _TestDataPackageListPoller(TestDataPackageListPoller):
+        __test__ = False
+
+        def _test_access(self) -> None:
+            hit_test(1)  # must be the first call
+
+        def sleep(self, secs: float) -> None:
+            hit = hit_test(5, 6, 12)  # local `hit`: the position of this particular call
+            if hit == 5:
+                assert secs == self.incomplete_delay
+            elif hit == 6:
+                assert secs == self.on_error_delay
+            elif hit == 12:  # NOTE(review): not reached when the test passes - the final count is 8
+                assert secs == self.up_to_date_delay
+                raise Exception("End of test")
+
+        def now(self) -> datetime:
+            hit_test(2)  # must be the second call
+            return datetime(2000, 1, 1, tzinfo=timezone.utc)  # fixed clock value for the test
+
+        def on_incremental_begin(self, subscription: "DataPackageBody") -> None:
+            hit_test(3)
+
+        def on_incremental_batch(self, subscription: "DataPackageBody", items: List["DataPackageListItem"]) -> None:
+            if hit_test(4, 7) == 4:
+                assert subscription.state == DataPackageListState.INCOMPLETE
+            elif hit == 7:  # `hit` here is the enclosing test's counter; this method has no local of that name
+                assert subscription.state == DataPackageListState.UP_TO_DATE
+
+        def on_incremental_end(self, is_aborted: bool) -> None:
+            hit_test(8)
+            self.abort()  # last expected call; ask the poller to stop
+
+    api = get_api(
+        get_json_response(DataPackageListState.INCOMPLETE),
+        http_500_response(),
+        get_json_response(DataPackageListState.UP_TO_DATE),
+    )
+
+    _TestDataPackageListPoller(
+        api,
+        download_full_list_on_or_after=datetime(3000, 1, 1, tzinfo=timezone.utc),
+        time_stamp_for_if_modified_since=datetime(1000, 1, 1, tzinfo=timezone.utc),
+    ).start()
+
+    assert hit == 8  # exactly eight counted calls
diff --git a/tests/Web/data_package_list_poller/test_access.py b/tests/Web/data_package_list_poller/test_access.py
new file mode 100644
index 00000000..36cc061a
--- /dev/null
+++ b/tests/Web/data_package_list_poller/test_access.py
@@ -0,0 +1,94 @@
+from datetime import datetime
+from typing import Any, List, Optional
+
+import pytest
+
+from requests import Response
+
+from macrobond_data_api.web import WebApi
+from macrobond_data_api.web.data_package_list_poller import DataPackageListPoller, ExceptionSource
+from macrobond_data_api.web.session import Session
+from macrobond_data_api.web.web_types import DataPackageBody, DataPackageListItem
+
+
+class TestAuth2Session:  # NOTE(review): not referenced anywhere else in this module
+    __test__ = False  # helper, not a test case, despite the Test* name
+
+    def __init__(self, *responses: Response):
+        self.index = 0  # position of the next response to hand out
+        self.responses = responses
+
+    def request(self, *args: Any, **kwargs: Any) -> Response:  # pylint: disable=unused-argument
+        response = self.responses[self.index]  # IndexError once the canned responses run out
+        self.index += 1
+        return response
+
+
+class TestDataPackageListPoller(DataPackageListPoller):  # base double: each hook raises unless overridden
+    __test__ = False  # helper, not a test case, despite the Test* name
+
+    def __init__(
+        self,
+        api: WebApi,
+        download_full_list_on_or_after: Optional[datetime] = None,
+        time_stamp_for_if_modified_since: Optional[datetime] = None,
+        chunk_size: int = 200,
+    ):
+        super().__init__(api, download_full_list_on_or_after, time_stamp_for_if_modified_since, chunk_size)
+        self._sleep = self.sleep  # presumably the delay hook the base class calls - TODO confirm
+        self._now = self.now  # presumably the clock hook the base class calls - TODO confirm
+
+    def sleep(self, secs: float) -> None:
+        raise Exception("should not be called")
+
+    def now(self) -> datetime:
+        raise Exception("should not be called")
+
+    def on_full_listing_begin(self, subscription: "DataPackageBody") -> None:
+        raise Exception("should not be called")
+
+    def on_full_listing_batch(self, subscription: "DataPackageBody", items: List["DataPackageListItem"]) -> None:
+        raise Exception("should not be called")
+
+    def on_full_listing_end(self, is_aborted: bool) -> None:
+        raise Exception("should not be called")
+
+    def on_incremental_begin(self, subscription: "DataPackageBody") -> None:
+        raise Exception("should not be called")
+
+    def on_incremental_batch(self, subscription: "DataPackageBody", items: List["DataPackageListItem"]) -> None:
+        raise Exception("should not be called")
+
+    def on_incremental_end(self, is_aborted: bool) -> None:
+        raise Exception("should not be called")
+
+    def on_exception(self, source: ExceptionSource, exception: Exception) -> None:
+        raise Exception("should not be called")
+
+
+def test_access() -> None:  # start() against a session that answers 403 must raise "Needs access ..."
+    hit = 0  # number of counted calls seen so far
+
+    def hit_test(*now: int) -> int:  # count a call and assert its position is one of *now*
+        nonlocal hit
+        hit += 1
+        assert hit in now
+        return hit
+
+    class _TestAuth2Session:  # session double that answers every request with status 403
+        __test__ = False
+
+        def request(self, *args: Any, **kwargs: Any) -> Response:  # pylint: disable=unused-argument
+            hit_test(1)  # a second request would fail this assertion
+            response = Response()
+            response.status_code = 403
+            return response
+
+    class _TestDataPackageListPoller(TestDataPackageListPoller):  # no overrides: any hook call raises
+        __test__ = False
+
+    api = WebApi(Session("", "", test_auth2_session=_TestAuth2Session()))
+    with pytest.raises(Exception, match="Needs access - The account is not set up to use DataPackageList"):
+        _TestDataPackageListPoller(api).start()
+
+    assert hit == 1  # exactly one request was issued