diff --git a/macrobond_data_api/com/_metadata_directory.py b/macrobond_data_api/com/_metadata_directory.py index 2c515cf5..550f8557 100644 --- a/macrobond_data_api/com/_metadata_directory.py +++ b/macrobond_data_api/com/_metadata_directory.py @@ -4,7 +4,7 @@ try: from pywintypes import TimeType except ImportError as ex_: - ... + pass if TYPE_CHECKING: # pragma: no cover from .com_types.connection import Connection diff --git a/macrobond_data_api/com/com_client.py b/macrobond_data_api/com/com_client.py index 8ebf48af..93ad70de 100644 --- a/macrobond_data_api/com/com_client.py +++ b/macrobond_data_api/com/com_client.py @@ -25,7 +25,7 @@ # winreg is not available on linux so mypy will fail on build server as it is runiong on linux from winreg import OpenKey, QueryValueEx, HKEY_CLASSES_ROOT, HKEY_CURRENT_USER # type: ignore except ImportError: - ... + pass def _test_regedit_assembly() -> Optional[str]: diff --git a/macrobond_data_api/common/types/_repr_html_sequence.py b/macrobond_data_api/common/types/_repr_html_sequence.py index cadb2bd6..5c23c2c0 100644 --- a/macrobond_data_api/common/types/_repr_html_sequence.py +++ b/macrobond_data_api/common/types/_repr_html_sequence.py @@ -20,11 +20,11 @@ def __init__(self, items: Sequence[_TypeVar]) -> None: @overload def __getitem__(self, i: int) -> _TypeVar: - ... + pass @overload def __getitem__(self, s: slice) -> Sequence[_TypeVar]: - ... + pass def __getitem__(self, key): # type: ignore return _ReprHtmlSequence(self.items[key]) if isinstance(key, slice) else self.items[key] diff --git a/macrobond_data_api/common/types/get_all_vintage_series_result.py b/macrobond_data_api/common/types/get_all_vintage_series_result.py index 806df6a3..264fea64 100644 --- a/macrobond_data_api/common/types/get_all_vintage_series_result.py +++ b/macrobond_data_api/common/types/get_all_vintage_series_result.py @@ -64,11 +64,11 @@ def _repr_html_(self) -> str: @overload def __getitem__(self, i: int) -> VintageSeries: - ... 
+ pass @overload def __getitem__(self, s: slice) -> Sequence[VintageSeries]: - ... + pass def __getitem__(self, key): # type: ignore return self.series[key] diff --git a/macrobond_data_api/common/types/metadata_value_information.py b/macrobond_data_api/common/types/metadata_value_information.py index e759ea61..f17e75ec 100644 --- a/macrobond_data_api/common/types/metadata_value_information.py +++ b/macrobond_data_api/common/types/metadata_value_information.py @@ -111,11 +111,11 @@ def to_dict(self) -> List[TypedDictMetadataValueInformationItem]: @overload def __getitem__(self, i: int) -> MetadataValueInformationItem: - ... + pass @overload def __getitem__(self, s: slice) -> List[MetadataValueInformationItem]: - ... + pass def __getitem__(self, key): # type: ignore return self.entities[key] diff --git a/macrobond_data_api/common/types/search_result.py b/macrobond_data_api/common/types/search_result.py index 40d9ff62..4bf3f1fb 100644 --- a/macrobond_data_api/common/types/search_result.py +++ b/macrobond_data_api/common/types/search_result.py @@ -51,11 +51,11 @@ def _repr_html_(self) -> str: @overload def __getitem__(self, i: int) -> "Metadata": - ... + pass @overload def __getitem__(self, s: slice) -> Sequence["Metadata"]: - ... + pass def __getitem__(self, key): # type: ignore return self.entities[key] diff --git a/macrobond_data_api/common/types/search_result_long.py b/macrobond_data_api/common/types/search_result_long.py index 6622aad3..628e4e44 100644 --- a/macrobond_data_api/common/types/search_result_long.py +++ b/macrobond_data_api/common/types/search_result_long.py @@ -48,11 +48,11 @@ def _repr_html_(self) -> str: @overload def __getitem__(self, i: int) -> str: - ... + pass @overload def __getitem__(self, s: slice) -> List[str]: - ... 
+ pass def __getitem__(self, key): # type: ignore return self.entities[key] diff --git a/macrobond_data_api/common/types/start_or_end_point.py b/macrobond_data_api/common/types/start_or_end_point.py index 6f4f89ad..a9dbc151 100644 --- a/macrobond_data_api/common/types/start_or_end_point.py +++ b/macrobond_data_api/common/types/start_or_end_point.py @@ -86,12 +86,12 @@ def point_in_time( mm: int = None, # pylint: disable = invalid-name dd: int = None, # pylint: disable = invalid-name ) -> "StartOrEndPoint": - ... + pass @overload @staticmethod def point_in_time(yyyy_or_datetime: datetime) -> "StartOrEndPoint": - ... + pass @staticmethod def point_in_time( diff --git a/macrobond_data_api/common/types/unified_series.py b/macrobond_data_api/common/types/unified_series.py index 42e66384..db1a9200 100644 --- a/macrobond_data_api/common/types/unified_series.py +++ b/macrobond_data_api/common/types/unified_series.py @@ -146,11 +146,11 @@ def _repr_html_(self) -> str: @overload def __getitem__(self, i: int) -> UnifiedSeries: - ... + pass @overload def __getitem__(self, s: slice) -> List[UnifiedSeries]: - ... + pass def __getitem__(self, key): # type: ignore return self.series[key] diff --git a/macrobond_data_api/util/transfer_performance_test.py b/macrobond_data_api/util/transfer_performance_test.py index 197f308a..ce51b04b 100644 --- a/macrobond_data_api/util/transfer_performance_test.py +++ b/macrobond_data_api/util/transfer_performance_test.py @@ -118,7 +118,7 @@ def run_integrity_tests(self, indicator: bool, times: int) -> None: for i in range(0, times): result = _Result.run_integrity_test(self.size_kB, i) if result.error is not None: - ... 
+ pass # print(f" Error: {str(result.error)} ", end="", flush=True) elif indicator: print(".", end="", flush=True) diff --git a/macrobond_data_api/web/_web_api_revision.py b/macrobond_data_api/web/_web_api_revision.py index 71db3635..da5cba13 100644 --- a/macrobond_data_api/web/_web_api_revision.py +++ b/macrobond_data_api/web/_web_api_revision.py @@ -20,8 +20,7 @@ from macrobond_data_api.common.types._repr_html_sequence import _ReprHtmlSequence from ._split_in_to_chunks import split_in_to_chunks -from .session import ProblemDetailsException, Session, _raise_on_error - +from .session import ProblemDetailsException, Session if TYPE_CHECKING: # pragma: no cover from .web_api import WebApi @@ -275,8 +274,8 @@ def get_many_series_with_revisions( with self.session.series.post_fetch_all_vintage_series( _create_web_revision_h_request(requests_chunkd), stream=True ) as response: - _raise_on_error(response) - ijson_items = ijson.items(response.raw, "item") + self.session.raise_on_error(response) + ijson_items = ijson.items(self.session.response_to_file_object(response), "item") item: "SeriesWithVintagesResponse" for item in ijson_items: error_code = item.get("errorCode") diff --git a/macrobond_data_api/web/_web_only_api.py b/macrobond_data_api/web/_web_only_api.py index 5e7c970d..1e35fd55 100644 --- a/macrobond_data_api/web/_web_only_api.py +++ b/macrobond_data_api/web/_web_only_api.py @@ -1,17 +1,18 @@ from datetime import datetime from typing import TYPE_CHECKING, Any, List, Optional, Callable, Tuple +import warnings import ijson from macrobond_data_api.common.types import SearchResultLong from macrobond_data_api.common.types._parse_iso8601 import _parse_iso8601 +from .web_types.data_package_list_context import DataPackageListContextManager from .web_types.data_package_list_state import DataPackageListState from .web_types.data_pacakge_list_item import DataPackageListItem from .web_types.data_package_list import DataPackageList from .web_types.data_package_body import 
DataPackageBody -from .session import _raise_on_error from .subscription_list import SubscriptionList if TYPE_CHECKING: # pragma: no cover @@ -91,7 +92,7 @@ def _get_data_package_list_iterative_pars_items( return True -def get_data_package_list(self: "WebApi", if_modified_since: datetime = None) -> DataPackageList: +def get_data_package_list(self: "WebApi", if_modified_since: Optional[datetime] = None) -> DataPackageList: # pylint: disable=line-too-long """ Get the items in the data package. @@ -123,11 +124,12 @@ def get_data_package_list_iterative( self: "WebApi", body_callback: Callable[[DataPackageBody], Optional[bool]], items_callback: Callable[[DataPackageBody, List[DataPackageListItem]], Optional[bool]], - if_modified_since: datetime = None, + if_modified_since: Optional[datetime] = None, buffer_size: int = 200, ) -> Optional[DataPackageBody]: # pylint: disable=line-too-long """ + .. Important:: This method is deprecated. Use `macrobond_data_api.web.web_api.WebApi.get_data_package_list_chunked` instead. Process the data package list in batches. This is more efficient since the complete list does not have to be in memory. @@ -142,26 +144,31 @@ def get_data_package_list_iterative( items_callback : Callable[[macrobond_data_api.web.web_types.data_package_body.DataPackageBody, List[macrobond_data_api.web.web_types.data_pacakge_list_item.DataPackageListItem]], Optional[bool]] The callback for each batch of items. Return True to continue processing. - if_modified_since : datetime + if_modified_since : datetime, optional The timestamp of the property time_stamp_for_if_modified_since from the response of the previous call. If not specified, all items will be returned. - buffer_size : int + buffer_size : int, optional The maximum number of items to include in each callback Returns ------- `macrobond_data_api.web.web_types.data_package_body.DataPackageBody` """ # pylint: enable=line-too-long + warnings.warn( + "get_data_package_list_iterative is deprecated. 
Use get_data_package_list_chunked instead.", + DeprecationWarning, + 2, + ) + params = {} body: Optional[DataPackageBody] = None if if_modified_since: params["ifModifiedSince"] = if_modified_since.isoformat() - with self._session.get("v1/series/getdatapackagelist", params=params, stream=True) as response: - _raise_on_error(response) - ijson_parse = ijson.parse(response.raw) + with self._session.get_or_raise("v1/series/getdatapackagelist", params=params, stream=True) as response: + ijson_parse = ijson.parse(self.session.response_to_file_object(response)) ( time_stamp_for_if_modified_since, @@ -186,12 +193,41 @@ def get_data_package_list_iterative( return body +def get_data_package_list_chunked( + self: "WebApi", if_modified_since: Optional[datetime] = None, chunk_size: int = 200 +) -> DataPackageListContextManager: + # pylint: disable=line-too-long + """ + Process the data package list in chunks. + This is more efficient since the complete list does not have to be in memory and it can be processed while + downloading. + + Typically you want to pass the date of time_stamp_for_if_modified_since from response of the previous call + to get incremental updates. + + Parameters + ---------- + if_modified_since : datetime, optional + The timestamp of the property time_stamp_for_if_modified_since from the response of the previous call. + If not specified, all items will be returned. + + chunk_size : int, optional + The maximum number of items to include in each List in DataPackageListContext.items + Returns + ------- + `macrobond_data_api.web.web_types.data_package_list_context.DataPackageListContextManager` + """ + # pylint: enable=line-too-long + return DataPackageListContextManager(if_modified_since, chunk_size, self) + + # Search def entity_search_multi_filter_long( self: "WebApi", *filters: "SearchFilter", include_discontinued: bool = False ) -> SearchResultLong: + # pylint: disable=line-too-long """ Search for time series and other entitites. 
This call can return more results than `macrobond_data_api.common.api.Api.entity_search_multi_filter`, @@ -204,12 +240,12 @@ def entity_search_multi_filter_long( ---------- *filters : `macrobond_data_api.common.types.search_filter.SearchFilter` One or more search filters. - include_discontinued : bool + include_discontinued : bool, optional Set this value to True in order to include discontinued entities in the search. Returns ------- - `macrobond_data_api.common.types.search_result_long.SearchResultLong` + A `macrobond_data_api.common.types.search_result_long.SearchResultLong` object containing the names of the entities that match the search filters. """ def convert_filter_to_web_filter(_filter: "SearchFilter") -> "WebSearchFilter": diff --git a/macrobond_data_api/web/data_package_list_poller.py b/macrobond_data_api/web/data_package_list_poller.py index 65de51aa..8f648b6f 100644 --- a/macrobond_data_api/web/data_package_list_poller.py +++ b/macrobond_data_api/web/data_package_list_poller.py @@ -1,35 +1,87 @@ from abc import ABC, abstractmethod +from enum import Enum from datetime import datetime, timezone import time -from typing import List, Optional, cast, TYPE_CHECKING, Callable +from typing import List, Optional, Tuple, Union, Callable, TYPE_CHECKING + from .web_api import WebApi from .web_types.data_package_list_state import DataPackageListState -if TYPE_CHECKING: # pragma: no cover - from .web_types import DataPackageBody, DataPackageListItem +from .web_types.data_pacakge_list_item import DataPackageListItem +from .web_types.data_package_body import DataPackageBody +if TYPE_CHECKING: + from macrobond_data_api.web.web_types.data_package_list_context import ( + DataPackageListContext, + DataPackageListContextManager, + ) -class _AbortException(Exception): - ... 
+ +class ExceptionSource(Enum): + FAILED_TO_BEGIN_FULL_LISTING = 1 + """ + Failed_begin_full_listing can happen after: + * start() + * on_exception() + * on_full_listing_end() + * on_incremental_end() + """ + + FAILED_TO_BEGIN_LISTING = 2 + """ + Failed_begin_listing + * start() + * on_exception() + * on_full_listing_end() + * on_incremental_end() + """ + + FAILED_TO_BEGIN_LISTING_INCOMPLETE = 3 + """ + Failed_to_begin_listing_incomplete + * on_incremental_batch() + """ + + FAILED_TO_GET_BATCH_IN_FULL_LISTING = 4 + """ + Failed_to_get_batch_in_full_listing can happen after: + * on_full_listing_begin() + * on_full_listing_batch() + """ + + FAILED_TO_GET_BATCH_IN_LISTING = 5 + """ + Failed_to_get_batch_in_listing can happen after: + * on_incremental_begin() + * on_incremental_batch() + """ + + FAILED_TO_GET_BATCH_IN_LISTING_INCOMPLETE = 6 + """ + Failed_to_get_batch_in_listing_incomplete can happen after: + * on_incremental_batch() + """ class DataPackageListPoller(ABC): """ This is work in progress and might change soon. Run a loop polling for changed series in the data package list. - Derive from this class and override `on_full_listing_start`, `on_full_listing_items`, `on_full_listing_stop`, - `on_incremental_start`, `on_incremental_items` and `on_incremental_stop`. + Derive from this class and override `on_full_listing_begin`, `on_full_listing_batch`, `on_full_listing_end`, + `on_incremental_begin`, `on_incremental_batch` and `on_incremental_end`. Parameters ---------- api : WebApi The API instance to use. - download_full_list_on_or_after : datetime + download_full_list_on_or_after : datetime, optional The saved value of `download_full_list_on_or_after` from the previous run. `None` on first run. - time_stamp_for_if_modified_since: datetime + time_stamp_for_if_modified_since: datetime, optional The saved value of `time_stamp_for_if_modified_since` from the previous run. `None`on first run. 
+ chunk_size : int, optional + The maximum number of items to include in each on_*_batch() """ def __init__( @@ -37,19 +89,24 @@ def __init__( api: WebApi, download_full_list_on_or_after: Optional[datetime] = None, time_stamp_for_if_modified_since: Optional[datetime] = None, - _sleep: Callable[[int], None] = time.sleep, + chunk_size: int = 200, ) -> None: + self._api = api + self._download_full_list_on_or_after = download_full_list_on_or_after + self._time_stamp_for_if_modified_since = time_stamp_for_if_modified_since + self._chunk_size = chunk_size + self.up_to_date_delay = 15 * 60 """ The time to wait, in seconds, between polls. """ self.incomplete_delay = 15 """ The time to wait, in seconds, between continuing partial updates. """ self.on_error_delay = 30 """ The time to wait, in seconds, before retrying after an error. """ - self._api = api - self._sleep = _sleep + self.on_retry_delay = 30 + + self._sleep = time.sleep + self._now = lambda: datetime.now(timezone.utc) self._abort = False - self._download_full_list_on_or_after = download_full_list_on_or_after - self._time_stamp_for_if_modified_since = time_stamp_for_if_modified_since @property def api(self) -> WebApi: @@ -66,7 +123,7 @@ def download_full_list_on_or_after(self) -> Optional[datetime]: @property def time_stamp_for_if_modified_since(self) -> Optional[datetime]: """ - This value is used internall to keep track of the the time of the last detected modification. + This value is used internally to keep track of the time of the last detected modification. Save this value after processing and pass in constructor for the next run. 
""" return self._time_stamp_for_if_modified_since @@ -77,8 +134,7 @@ def start(self) -> None: self._abort = False while not self._abort: if not self._time_stamp_for_if_modified_since or ( - self._download_full_list_on_or_after - and datetime.now(timezone.utc) > self._download_full_list_on_or_after + self._download_full_list_on_or_after and self._now() > self._download_full_list_on_or_after ): sub = self._run_full_listing() if sub: @@ -86,6 +142,11 @@ def start(self) -> None: self._time_stamp_for_if_modified_since = sub.time_stamp_for_if_modified_since else: sub = self._run_listing(self._time_stamp_for_if_modified_since) + + if sub and sub.state != DataPackageListState.UP_TO_DATE: + self._sleep(self.incomplete_delay) + sub = self._run_listing_incomplete(sub.time_stamp_for_if_modified_since) + if sub: self._time_stamp_for_if_modified_since = sub.time_stamp_for_if_modified_since @@ -95,167 +156,205 @@ def start(self) -> None: self._sleep(self.up_to_date_delay) def _test_access(self) -> None: - params = {"ifModifiedSince": datetime(3000, 1, 1, tzinfo=timezone.utc)} + params = {"ifModifiedSince": datetime(3000, 1, 1, tzinfo=timezone.utc).isoformat()} response = self._api.session.get("v1/series/getdatapackagelist", params=params) if response.status_code == 403: raise Exception("Needs access - The account is not set up to use DataPackageList") - def _run_full_listing(self, max_attempts: int = 3) -> Optional["DataPackageBody"]: - is_stated = False - - def _body_callback(body: "DataPackageBody") -> None: - is_stated = True # pylint: disable=unused-variable - self.on_full_listing_start(body) - + def _run_full_listing(self, max_attempts: int = 3) -> Optional[DataPackageBody]: + context_manager: Optional["DataPackageListContextManager"] = None try: - for attempt in range(1, max_attempts): - try: - sub = self._api.get_data_package_list_iterative( - _body_callback, - self.on_full_listing_items, - None, - ) - if not sub: - raise ValueError("subscription is None") - - 
self.on_full_listing_stop(False, None) - return sub - except Exception as ex: # pylint: disable=broad-except - if self._abort: - raise _AbortException() from ex - if attempt > max_attempts: - raise ex - self._sleep(self.on_error_delay) - except _AbortException as ex: - if is_stated: - self.on_full_listing_stop(True, cast(Exception, ex.__cause__)) - except Exception as ex: # pylint: disable=broad-except - if is_stated: - self.on_full_listing_stop(False, ex) - return None - - def _run_listing(self, if_modified_since: datetime, max_attempts: int = 3) -> Optional["DataPackageBody"]: - is_stated = False - - def _body_callback(body: "DataPackageBody") -> None: - is_stated = True # pylint: disable=unused-variable - self.on_incremental_start(body) + context_manager, context, body = self._retry_get_data_package_list_chunked( + max_attempts, None, ExceptionSource.FAILED_TO_BEGIN_FULL_LISTING + ) + if context is None or body is None: + return None + self.on_full_listing_begin(body) + if self._abort: + self.on_full_listing_end(True) + return None + + if self._try_iterator( + context, + body, + self.on_full_listing_batch, + self.on_full_listing_end, + ExceptionSource.FAILED_TO_GET_BATCH_IN_FULL_LISTING, + ): + self.on_full_listing_end(False) + return body + return None + finally: + if context_manager: + context_manager.__exit__(None, None, None) + + def _run_listing(self, if_modified_since: datetime, max_attempts: int = 3) -> Optional[DataPackageBody]: + context_manager: Optional["DataPackageListContextManager"] = None try: - for attempt in range(1, max_attempts): - try: - sub = self._api.get_data_package_list_iterative( - _body_callback, - self.on_incremental_items, - if_modified_since, + context_manager, context, body = self._retry_get_data_package_list_chunked( + max_attempts, if_modified_since, ExceptionSource.FAILED_TO_BEGIN_LISTING + ) + if context is None or body is None: + return None + + self.on_incremental_begin(body) + if self._abort: + self.on_incremental_end(True) 
+ return None + + if ( + self._try_iterator( + context, + body, + self.on_incremental_batch, + self.on_incremental_end, + ExceptionSource.FAILED_TO_GET_BATCH_IN_LISTING, + ) + is False + ): + return None + + if body.state == DataPackageListState.UP_TO_DATE: + self.on_incremental_end(False) + + return body + finally: + if context_manager: + context_manager.__exit__(None, None, None) + + def _run_listing_incomplete(self, if_modified_since: datetime, max_attempts: int = 3) -> Optional[DataPackageBody]: + while True: + context_manager: Optional["DataPackageListContextManager"] = None + try: + context_manager, context, body = self._retry_get_data_package_list_chunked( + max_attempts, if_modified_since, ExceptionSource.FAILED_TO_BEGIN_LISTING_INCOMPLETE + ) + if context is None or body is None: + return None + + if ( + self._try_iterator( + context, + body, + self.on_incremental_batch, + self.on_incremental_end, + ExceptionSource.FAILED_TO_GET_BATCH_IN_LISTING_INCOMPLETE, ) - break - except Exception as ex: # pylint: disable=broad-except - if self._abort: - raise _AbortException() from ex - if attempt > max_attempts: - raise - self._sleep(self.on_error_delay) - - if not sub: - raise ValueError("subscription is None") - - if sub.state == DataPackageListState.UP_TO_DATE: - self.on_incremental_stop(False, None) - return sub - - self._sleep(self.incomplete_delay) - - return self._run_listing_incomplete(sub.time_stamp_for_if_modified_since, is_stated, max_attempts) - except _AbortException as ex: - if is_stated: - self.on_incremental_stop(True, cast(Exception, ex.__cause__)) - except Exception as ex: # pylint: disable=broad-except - if is_stated: - self.on_incremental_stop(False, ex) - return None - - def _run_listing_incomplete( - self, if_modified_since: datetime, is_stated: bool, max_attempts: int = 3 - ) -> Optional["DataPackageBody"]: - try: - while True: - for attempt in range(1, max_attempts): - try: - sub = self._api.get_data_package_list_iterative( - lambda _: None, 
- self.on_incremental_items, - if_modified_since, - ) - - if not sub: - raise ValueError("subscription is None") - - if sub.state == DataPackageListState.UP_TO_DATE: - self.on_incremental_stop(False, None) - return sub - - self._sleep(self.incomplete_delay) - - if_modified_since = sub.time_stamp_for_if_modified_since - except Exception as ex2: # pylint: disable=broad-except - if self._abort: - raise _AbortException() from ex2 - if attempt > max_attempts: - raise - self._sleep(self.on_error_delay) - except _AbortException as ex: - if is_stated: - self.on_incremental_stop(True, cast(Exception, ex.__cause__)) - except Exception as ex: # pylint: disable=broad-except - if is_stated: - self.on_incremental_stop(False, ex) - return None + is False + ): + return None + + if body.state == DataPackageListState.UP_TO_DATE: + self.on_incremental_end(False) + return body + + self._sleep(self.incomplete_delay) + + if_modified_since = body.time_stamp_for_if_modified_since + finally: + if context_manager: + context_manager.__exit__(None, None, None) + + def _retry_get_data_package_list_chunked( + self, max_attempts: int, if_modified_since: Optional[datetime], exception_source: ExceptionSource + ) -> Union[ + Tuple["DataPackageListContextManager", "DataPackageListContext", DataPackageBody], + Tuple["DataPackageListContextManager", None, None], + ]: + attempt = 1 + while True: + try: + context_manager = self._api.get_data_package_list_chunked(if_modified_since, self._chunk_size) + context = context_manager.__enter__() # pylint: disable=unnecessary-dunder-call + body = DataPackageBody( + context.time_stamp_for_if_modified_since, + context.download_full_list_on_or_after, + context.state, + ) + return context_manager, context, body + except Exception as ex: # pylint: disable=broad-except + if attempt > max_attempts: + self.on_exception(exception_source, ex) + return context_manager, None, None + self._sleep(self.on_retry_delay * attempt) + attempt += 1 + + def _try_iterator( + self, + 
context: "DataPackageListContext", + body: DataPackageBody, + on_batch: Callable[[DataPackageBody, List[DataPackageListItem]], None], + on_end: Callable[[bool], None], + exception_source: ExceptionSource, + ) -> bool: + iterator = iter(context.items) + while True: + try: + items = [DataPackageListItem(x[0], x[1]) for x in next(iterator)] + except StopIteration: + return True + except Exception as ex: # pylint: disable=broad-except + self.on_exception(exception_source, ex) + return False + + on_batch(body, items) + if self._abort: + on_end(True) + return False # full_listing @abstractmethod - def on_full_listing_start(self, subscription: "DataPackageBody") -> None: + def on_full_listing_begin(self, subscription: DataPackageBody) -> None: """This override is called when a full listing starts.""" @abstractmethod - def on_full_listing_items(self, subscription: "DataPackageBody", items: List["DataPackageListItem"]) -> None: + def on_full_listing_batch(self, subscription: DataPackageBody, items: List[DataPackageListItem]) -> None: """This override is called repeatedly with one or more items until all items are listed.""" @abstractmethod - def on_full_listing_stop(self, is_aborted: bool, exception: Optional[Exception]) -> None: + def on_full_listing_end(self, is_aborted: bool) -> None: """ This override is called when the full listing is stopped. Parameters ---------- is_aborted : bool The processing was aborted. - exception : Optional[Exception] - If not None, there was an exception. 
""" # listing @abstractmethod - def on_incremental_start(self, subscription: "DataPackageBody") -> None: + def on_incremental_begin(self, subscription: DataPackageBody) -> None: """This override is called when an incremental listing starts.""" @abstractmethod - def on_incremental_items(self, subscription: "DataPackageBody", items: List["DataPackageListItem"]) -> None: + def on_incremental_batch(self, subscription: DataPackageBody, items: List[DataPackageListItem]) -> None: """This override is called repeatedly with one or more items until all updated items are listed.""" @abstractmethod - def on_incremental_stop(self, is_aborted: bool, exception: Optional[Exception]) -> None: + def on_incremental_end(self, is_aborted: bool) -> None: """ This override is called when the incremental listing is stopped. Parameters ---------- is_aborted : bool The processing was aborted. - exception : Optional[Exception] - If not None, there was an exception. + """ + + @abstractmethod + def on_exception(self, source: ExceptionSource, exception: Exception) -> None: + """ + This override is called when the incremental listing is stopped. + Parameters + ---------- + exception : Exception + The exception. """ def abort(self) -> None: - """Call this method to stop processing.""" + """ + Call this method to stop processing. 
+ """ self._abort = True diff --git a/macrobond_data_api/web/session.py b/macrobond_data_api/web/session.py index 5defaef9..2f40906f 100644 --- a/macrobond_data_api/web/session.py +++ b/macrobond_data_api/web/session.py @@ -35,23 +35,14 @@ } -def _raise_on_error(response: "Response", non_error_status: Sequence[int] = None) -> "Response": - if non_error_status is None: - non_error_status = [200] +class _ResponseAsFileObject: + def __init__(self, response: "Response", chunk_size: int = 65536) -> None: + self.data = response.iter_content(chunk_size=chunk_size) - if response.status_code in non_error_status: - return response - - content_type = response.headers.get("Content-Type") - - if content_type in ["application/json; charset=utf-8", "application/json"]: - raise ProblemDetailsException.create_from_response(response) - - macrobond_status = response.headers.get("X-Macrobond-Status") - if macrobond_status: - raise ProblemDetailsException(response, detail=macrobond_status) - - raise HttpException(response) + def read(self, n: int) -> bytes: + if n == 0: + return b"" + return next(self.data, b"") class Session: @@ -171,7 +162,7 @@ def get_or_raise( non_error_status: Sequence[int] = None, stream: bool = False, ) -> "Response": - return _raise_on_error(self.get(url, params, stream=stream), non_error_status) + return self.raise_on_error(self.get(url, params, stream=stream), non_error_status) def post(self, url: str, params: dict = None, json: object = None, stream: bool = False) -> "Response": return self._request("POST", url, params, json, stream) @@ -184,7 +175,7 @@ def post_or_raise( non_error_status: Sequence[int] = None, stream: bool = False, ) -> "Response": - return _raise_on_error(self.post(url, params, json, stream=stream), non_error_status) + return self.raise_on_error(self.post(url, params, json, stream=stream), non_error_status) def delete(self, url: str, params: dict = None, stream: bool = False) -> "Response": return self._request("DELETE", url, params, None, 
stream) @@ -196,7 +187,28 @@ def delete_or_raise( non_error_status: Sequence[int] = None, stream: bool = False, ) -> "Response": - return _raise_on_error(self.delete(url, params, stream=stream), non_error_status) + return self.raise_on_error(self.delete(url, params, stream=stream), non_error_status) + + def raise_on_error(self, response: "Response", non_error_status: Sequence[int] = None) -> "Response": + if non_error_status is None: + non_error_status = [200] + + if response.status_code in non_error_status: + return response + + content_type = response.headers.get("Content-Type") + + if content_type in ["application/json; charset=utf-8", "application/json"]: + raise ProblemDetailsException.create_from_response(response) + + macrobond_status = response.headers.get("X-Macrobond-Status") + if macrobond_status: + raise ProblemDetailsException(response, detail=macrobond_status) + + raise HttpException(response) + + def response_to_file_object(self, response: "Response") -> _ResponseAsFileObject: + return _ResponseAsFileObject(response) def _request(self, method: str, url: str, params: Optional[dict], json: object, stream: bool) -> "Response": if not self._is_open: diff --git a/macrobond_data_api/web/web_api.py b/macrobond_data_api/web/web_api.py index 2c404907..77af3f66 100644 --- a/macrobond_data_api/web/web_api.py +++ b/macrobond_data_api/web/web_api.py @@ -4,6 +4,7 @@ entity_search_multi_filter_long, get_data_package_list, get_data_package_list_iterative, + get_data_package_list_chunked, subscription_list, ) from ._web_api_metadata import metadata_list_values, metadata_get_attribute_information, metadata_get_value_information @@ -71,6 +72,7 @@ def session(self) -> Session: get_data_package_list = get_data_package_list get_data_package_list_iterative = get_data_package_list_iterative + get_data_package_list_chunked = get_data_package_list_chunked entity_search_multi_filter_long = entity_search_multi_filter_long subscription_list = subscription_list diff --git 
a/macrobond_data_api/web/web_types/__init__.py b/macrobond_data_api/web/web_types/__init__.py index 211bea0f..9326d9a5 100644 --- a/macrobond_data_api/web/web_types/__init__.py +++ b/macrobond_data_api/web/web_types/__init__.py @@ -58,3 +58,5 @@ from .series_request import SeriesRequest from .in_house_series_methods import InHouseSeriesMethods + +from .data_package_list_context import DataPackageListContext, DataPackageListContextManager diff --git a/macrobond_data_api/web/web_types/data_package_list.py b/macrobond_data_api/web/web_types/data_package_list.py index f0ac7d63..9fa4bcd0 100644 --- a/macrobond_data_api/web/web_types/data_package_list.py +++ b/macrobond_data_api/web/web_types/data_package_list.py @@ -30,11 +30,11 @@ def __init__(self, response: "FeedEntitiesResponse") -> None: @overload def __getitem__(self, i: int) -> DataPackageListItem: - ... + pass @overload def __getitem__(self, s: slice) -> List[DataPackageListItem]: - ... + pass def __getitem__(self, key): # type: ignore return self.items[key] diff --git a/macrobond_data_api/web/web_types/data_package_list_context.py b/macrobond_data_api/web/web_types/data_package_list_context.py new file mode 100644 index 00000000..5015ba31 --- /dev/null +++ b/macrobond_data_api/web/web_types/data_package_list_context.py @@ -0,0 +1,186 @@ +from datetime import datetime +from typing import TYPE_CHECKING, Any, Optional, Tuple, Iterable, Iterator, List + +import ijson + +from macrobond_data_api.common.types._parse_iso8601 import _parse_iso8601 + +from .data_package_list_state import DataPackageListState + +if TYPE_CHECKING: # pragma: no cover + from ..web_api import WebApi + from requests import Response + +__pdoc__ = { + "DataPackageListContext.__init__": False, + "DataPackageListContextManager.__init__": False, +} + + +class _DataPackageListContextIterator(Iterator[List[Tuple[str, datetime]]], Iterable[List[Tuple[str, datetime]]]): + _is_uesd = False + _reached_the_end_of_array = False + + def __init__(self, 
ijson_parse: Any, chunk_size: int) -> None: + self._ijson_parse = ijson_parse + self.chunk_size = chunk_size + + def __iter__(self) -> Iterator[List[Tuple[str, datetime]]]: + if self._is_uesd: + raise Exception("iterator is already used") + self._is_uesd = True + return self + + def __next__(self) -> List[Tuple[str, datetime]]: + if self._reached_the_end_of_array: + raise StopIteration() + name = "" + modified: Optional[datetime] = None + items: List[Tuple[str, datetime]] = [] + while True: + prefix, event, value = next(self._ijson_parse) + if event == "end_map": + if name == "": + raise Exception("bad format: name was not found") + if modified is None: + raise Exception("bad format: modified was not found") + items.append((name, modified)) + name = "" + modified = None + if len(items) == self.chunk_size: + return items + elif event == "end_array": + self._reached_the_end_of_array = True + if len(items) != 0: + return items + raise StopIteration() + elif prefix == "entities.item.name": + if event != "string": + raise Exception("bad format: entities.item.name is not a string") + name = value + elif prefix == "entities.item.modified": + if event != "string": + raise Exception("bad format: entities.item.modified is not a string") + modified = _parse_iso8601(value) + + +class DataPackageListContext: + @property + def time_stamp_for_if_modified_since(self) -> datetime: + """ + A timestamp to pass as the ifModifiedSince parameter + in the next request to get incremental updates. + """ + return self._time_stamp_for_if_modified_since + + @property + def download_full_list_on_or_after(self) -> Optional[datetime]: + """ + Recommended earliest next time to request a full list + by omitting timeStampForIfModifiedSince. + """ + return self._download_full_list_on_or_after + + @property + def state(self) -> DataPackageListState: + """ + The state of this list. 
+ """ + return self._state + + @property + def items(self) -> Iterable[List[Tuple[str, datetime]]]: + """An iterable contining Lists of tuples with the name and Timestamp when this entity was last modified""" + return self._items + + def __init__( + self, + time_stamp_for_if_modified_since: datetime, + download_full_list_on_or_after: Optional[datetime], + state: DataPackageListState, + items: _DataPackageListContextIterator, + ) -> None: + self._time_stamp_for_if_modified_since = time_stamp_for_if_modified_since + self._download_full_list_on_or_after = download_full_list_on_or_after + self._state = state + self._items = items + + +def _parse_body( + ijson_parse: Any, +) -> Tuple[Optional[datetime], Optional[datetime], Optional[DataPackageListState]]: + time_stamp_for_if_modified_since: Optional[datetime] = None + download_full_list_on_or_after: Optional[datetime] = None + state: Optional[DataPackageListState] = None + for prefix, event, value in ijson_parse: + if prefix == "timeStampForIfModifiedSince": + if event != "string": + raise Exception("bad format: timeStampForIfModifiedSince is not a string") + time_stamp_for_if_modified_since = _parse_iso8601(value) + elif prefix == "downloadFullListOnOrAfter": + if event != "string": + raise Exception("bad format: downloadFullListOnOrAfter is not a string") + download_full_list_on_or_after = _parse_iso8601(value) + elif prefix == "state": + if event != "number": + raise Exception("bad format: state is not a number") + state = DataPackageListState(value) + elif event == "start_array": + if prefix != "entities": + raise Exception("bad format: event start_array does not have a prefix of 'entities'") + break + return time_stamp_for_if_modified_since, download_full_list_on_or_after, state + + +class DataPackageListContextManager: + def __init__(self, if_modified_since: Optional[datetime], chunk_size: int, webApi: "WebApi") -> None: + self._if_modified_since = if_modified_since + self.chunk_size = chunk_size + self._webApi: 
Optional["WebApi"] = webApi + self._iterator_started = False + self._response: Optional["Response"] = None + + def __enter__(self) -> DataPackageListContext: + params = {} + if self._if_modified_since: + params["ifModifiedSince"] = self._if_modified_since.isoformat() + + if self._webApi is None: + raise Exception("obj is closed") + + try: + session = self._webApi._session + self._webApi = None + self._response = session.get_or_raise("v1/series/getdatapackagelist", params=params, stream=True) + + ijson_parse = ijson.parse(session.response_to_file_object(self._response)) + + ( + time_stamp_for_if_modified_since, + download_full_list_on_or_after, + state, + ) = _parse_body(ijson_parse) + + if state is None: + raise Exception("bad format: state was not found") + if time_stamp_for_if_modified_since is None: + raise Exception("bad format: timeStampForIfModifiedSince was not found") + if not self._if_modified_since and download_full_list_on_or_after is None: + raise Exception("bad format: downloadFullListOnOrAfter was not found") + + return DataPackageListContext( + time_stamp_for_if_modified_since, + download_full_list_on_or_after, + state, + _DataPackageListContextIterator(ijson_parse, self.chunk_size), + ) + + except Exception as e: + self.__exit__(None, None, None) + raise e + + def __exit__(self, exception_type: Any, exception_value: Any, traceback: Any) -> None: + self._webApi = None + if self._response: + self._response.close() + self._response = None diff --git a/scripts/lint_tools.py b/scripts/lint_tools.py index 74251dc9..011dc064 100644 --- a/scripts/lint_tools.py +++ b/scripts/lint_tools.py @@ -9,7 +9,7 @@ class Mypy(WorkItem): # TODO: @mb-jp use --strict for mypy async def run(self) -> None: - await self.python_run("mypy", ". --show-error-codes --exclude .env --python-version 3.8") + await self.python_run("mypy", ". 
--show-error-codes --exclude .env --exclude test.py --python-version 3.8") class Pylint(WorkItem): @@ -19,7 +19,7 @@ async def run(self) -> None: class PyCodeStyle(WorkItem): async def run(self) -> None: - await self.python_run("pycodestyle", "--count . --exclude=.env") + await self.python_run("pycodestyle", "--count . --exclude=.env,test.py") class Black(WorkItem): diff --git a/setup.py b/setup.py index 790e6e0c..8cb4a4d1 100644 --- a/setup.py +++ b/setup.py @@ -85,22 +85,22 @@ extras_require={ "extra": ["matplotlib", "statsmodels", "scikit-learn", "pandas"], "dev": [ - "mypy==1.4.1", - "pylint==2.17.5", - "pycodestyle==2.10.0", + "mypy==1.6.0", + "pylint==3.0.1", + "pycodestyle==2.11.1", "pdoc3==0.10.0", - "build>=0.10.0", - "pytest==7.4.0", + "build>=1.0.3", + "pytest==7.4.2", "pytest-xdist==3.3.1", - "coverage>=7.2.7", - "black[jupyter]==23.7.0", + "coverage>=7.3.2", + "black[jupyter]==23.9.1", "requests[socks]>=2.31.0", "nbconvert==7.3.0", "ipython>=7.34.0", - "types-pywin32==305.0.0.10", - "types-requests==2.31.0.2", - "types-setuptools==68.0.0.3", - "filelock==3.12.2", + "types-pywin32==306.0.0.5", + "types-requests==2.31.0.9", + "types-setuptools==68.2.0.0", + "filelock==3.12.4", ], "socks": ["requests[socks]>=2.31.0"], }, diff --git a/tests/Web/data_package_list_poller/abort.py b/tests/Web/data_package_list_poller/abort.py new file mode 100644 index 00000000..1361b599 --- /dev/null +++ b/tests/Web/data_package_list_poller/abort.py @@ -0,0 +1,411 @@ +from datetime import datetime, timezone +from io import BytesIO +from json import dumps as json_dumps +from typing import Any, Dict, List, Optional + +from requests import Response + +from macrobond_data_api.web import WebApi +from macrobond_data_api.web.data_package_list_poller import DataPackageListPoller, ExceptionSource +from macrobond_data_api.web.session import Session +from macrobond_data_api.web.web_types import DataPackageBody, DataPackageListItem, DataPackageListState + + +class TestAuth2Session: + 
__test__ = False + + def __init__(self, *responses: Response): + self.index = 0 + self.responses = responses + + def request(self, *args: Any, **kwargs: Any) -> Response: # pylint: disable=unused-argument + response = self.responses[self.index] + self.index += 1 + return response + + +def get_api(*responses: Response) -> WebApi: + return WebApi(Session("", "", test_auth2_session=TestAuth2Session(*responses))) + + +def get_json_response( + state: DataPackageListState, + downloadFullListOnOrAfter: str = "2000-02-01T04:05:06", + timeStampForIfModifiedSince: str = "2000-02-02T04:05:06", + entities: Optional[List[Dict[str, str]]] = None, +) -> Response: + if entities is None: + entities = [ + {"name": "sek", "modified": "2000-02-03T04:05:06"}, + {"name": "dkk", "modified": "2000-02-04T04:05:06"}, + {"name": "usgdp", "modified": "2000-02-05T04:05:06"}, + ] + json = json_dumps( + { + "downloadFullListOnOrAfter": downloadFullListOnOrAfter, + "timeStampForIfModifiedSince": timeStampForIfModifiedSince, + "state": state, + "entities": entities, + } + ) + response = Response() + response.status_code = 200 + response.raw = BytesIO(bytes(json, "utf-8")) + return response + + +class TestDataPackageListPoller(DataPackageListPoller): + __test__ = False + + def __init__( + self, + api: WebApi, + download_full_list_on_or_after: Optional[datetime] = None, + time_stamp_for_if_modified_since: Optional[datetime] = None, + chunk_size: int = 200, + ): + super().__init__(api, download_full_list_on_or_after, time_stamp_for_if_modified_since, chunk_size) + self._sleep = self.sleep + self._now = self.now + + def sleep(self, secs: float) -> None: + raise Exception("should not be called") + + def now(self) -> datetime: + raise Exception("should not be called") + + def on_full_listing_begin(self, subscription: "DataPackageBody") -> None: + raise Exception("should not be called") + + def on_full_listing_batch(self, subscription: "DataPackageBody", items: List["DataPackageListItem"]) -> None: + 
raise Exception("should not be called") + + def on_full_listing_end(self, is_aborted: bool) -> None: + raise Exception("should not be called") + + def on_incremental_begin(self, subscription: "DataPackageBody") -> None: + raise Exception("should not be called") + + def on_incremental_batch(self, subscription: "DataPackageBody", items: List["DataPackageListItem"]) -> None: + raise Exception("should not be called") + + def on_incremental_end(self, is_aborted: bool) -> None: + raise Exception("should not be called") + + def on_exception(self, source: ExceptionSource, exception: Exception) -> None: + raise Exception("should not be called") + + +def test_abort_full_listing_1() -> None: + hit = 0 + + def hit_test(*now: int) -> int: + nonlocal hit + hit += 1 + assert hit in now + return hit + + class _TestDataPackageListPoller(TestDataPackageListPoller): + __test__ = False + + def _test_access(self) -> None: + hit_test(1) + + def on_full_listing_begin(self, subscription: "DataPackageBody") -> None: + hit_test(2) + self.abort() + + def on_full_listing_end(self, is_aborted: bool) -> None: + hit_test(3) + assert is_aborted is True + + api = get_api(get_json_response(DataPackageListState.FULL_LISTING)) + + _TestDataPackageListPoller(api, chunk_size=1).start() + + assert hit == 3 + + +def test_abort_full_listing_2() -> None: + hit = 0 + + def hit_test(*now: int) -> int: + nonlocal hit + hit += 1 + assert hit in now + return hit + + class _TestDataPackageListPoller(TestDataPackageListPoller): + __test__ = False + + def _test_access(self) -> None: + hit_test(1) + + def on_full_listing_begin(self, subscription: "DataPackageBody") -> None: + hit_test(2) + + def on_full_listing_batch(self, subscription: DataPackageBody, items: List[DataPackageListItem]) -> None: + if hit_test(3, 4) == 4: + self.abort() + + def on_full_listing_end(self, is_aborted: bool) -> None: + hit_test(5) + assert is_aborted is True + + api = get_api(get_json_response(DataPackageListState.FULL_LISTING)) + + 
_TestDataPackageListPoller(api, chunk_size=1).start() + + assert hit == 5 + + +def test_abort_full_listing_3() -> None: + hit = 0 + + def hit_test(*now: int) -> int: + nonlocal hit + hit += 1 + assert hit in now + return hit + + class _TestDataPackageListPoller(TestDataPackageListPoller): + __test__ = False + + def _test_access(self) -> None: + hit_test(1) + + def on_full_listing_begin(self, subscription: "DataPackageBody") -> None: + hit_test(2) + + def on_full_listing_batch(self, subscription: DataPackageBody, items: List[DataPackageListItem]) -> None: + hit_test(3) + + def on_full_listing_end(self, is_aborted: bool) -> None: + hit_test(4) + assert is_aborted is False + self.abort() + + api = get_api(get_json_response(DataPackageListState.FULL_LISTING)) + + _TestDataPackageListPoller(api).start() + + assert hit == 4 + + +# test_abort_listing + + +def test_abort_listing_1() -> None: + hit = 0 + + def hit_test(*now: int) -> int: + nonlocal hit + hit += 1 + assert hit in now + return hit + + class _TestDataPackageListPoller(TestDataPackageListPoller): + __test__ = False + + def _test_access(self) -> None: + hit_test(1) + + def now(self) -> datetime: + hit_test(2) + return datetime(2000, 1, 1, tzinfo=timezone.utc) + + def on_incremental_begin(self, subscription: "DataPackageBody") -> None: + hit_test(3) + self.abort() + + def on_incremental_end(self, is_aborted: bool) -> None: + hit_test(4) + assert is_aborted is True + + api = get_api(get_json_response(DataPackageListState.UP_TO_DATE)) + + _TestDataPackageListPoller( + api, + download_full_list_on_or_after=datetime(3000, 1, 1, tzinfo=timezone.utc), + time_stamp_for_if_modified_since=datetime(1000, 1, 1, tzinfo=timezone.utc), + ).start() + + assert hit == 4 + + +def test_abort_listing_2() -> None: + hit = 0 + + def hit_test(*now: int) -> int: + nonlocal hit + hit += 1 + assert hit in now + return hit + + class _TestDataPackageListPoller(TestDataPackageListPoller): + __test__ = False + + def _test_access(self) -> 
None: + hit_test(1) + + def now(self) -> datetime: + hit_test(2) + return datetime(2000, 1, 1, tzinfo=timezone.utc) + + def on_incremental_begin(self, subscription: "DataPackageBody") -> None: + hit_test(3) + + def on_incremental_batch(self, subscription: DataPackageBody, items: List[DataPackageListItem]) -> None: + if hit_test(4): + self.abort() + + def on_incremental_end(self, is_aborted: bool) -> None: + hit_test(5) + assert is_aborted is True + + api = get_api(get_json_response(DataPackageListState.UP_TO_DATE)) + + _TestDataPackageListPoller( + api, + download_full_list_on_or_after=datetime(3000, 1, 1, tzinfo=timezone.utc), + time_stamp_for_if_modified_since=datetime(1000, 1, 1, tzinfo=timezone.utc), + chunk_size=1, + ).start() + + assert hit == 5 + + +def test_abort_listing_3() -> None: + hit = 0 + + def hit_test(*now: int) -> int: + nonlocal hit + hit += 1 + assert hit in now + return hit + + class _TestDataPackageListPoller(TestDataPackageListPoller): + __test__ = False + + def _test_access(self) -> None: + hit_test(1) + + def now(self) -> datetime: + hit_test(2) + return datetime(2000, 1, 1, tzinfo=timezone.utc) + + def on_incremental_begin(self, subscription: "DataPackageBody") -> None: + hit_test(3) + + def on_incremental_batch(self, subscription: DataPackageBody, items: List[DataPackageListItem]) -> None: + hit_test(4) + + def on_incremental_end(self, is_aborted: bool) -> None: + hit_test(5) + assert is_aborted is False + self.abort() + + api = get_api(get_json_response(DataPackageListState.UP_TO_DATE)) + + _TestDataPackageListPoller( + api, + download_full_list_on_or_after=datetime(3000, 1, 1, tzinfo=timezone.utc), + time_stamp_for_if_modified_since=datetime(1000, 1, 1, tzinfo=timezone.utc), + ).start() + + assert hit == 5 + + +# test_abort_listing_and_listing_incomplete + + +def test_abort_listing_and_listing_incomplete_1() -> None: + hit = 0 + + def hit_test(*now: int) -> int: + nonlocal hit + hit += 1 + assert hit in now + return hit + + class 
_TestDataPackageListPoller(TestDataPackageListPoller): + __test__ = False + + def _test_access(self) -> None: + hit_test(1) + + def sleep(self, secs: float) -> None: + hit_test(5) + + def now(self) -> datetime: + hit_test(2) + return datetime(2000, 1, 1, tzinfo=timezone.utc) + + def on_incremental_begin(self, subscription: "DataPackageBody") -> None: + hit_test(3) + + def on_incremental_batch(self, subscription: "DataPackageBody", items: List["DataPackageListItem"]) -> None: + if hit_test(4, 6) == 6: + self.abort() + + def on_incremental_end(self, is_aborted: bool) -> None: + hit_test(7) + assert is_aborted is True + + api = get_api( + get_json_response(DataPackageListState.INCOMPLETE), get_json_response(DataPackageListState.UP_TO_DATE) + ) + + _TestDataPackageListPoller( + api, + download_full_list_on_or_after=datetime(3000, 1, 1, tzinfo=timezone.utc), + time_stamp_for_if_modified_since=datetime(1000, 1, 1, tzinfo=timezone.utc), + ).start() + + assert hit == 7 + + +def test_abort_listing_and_listing_incomplete_2() -> None: + hit = 0 + + def hit_test(*now: int) -> int: + nonlocal hit + hit += 1 + assert hit in now + return hit + + class _TestDataPackageListPoller(TestDataPackageListPoller): + __test__ = False + + def _test_access(self) -> None: + hit_test(1) + + def sleep(self, secs: float) -> None: + hit_test(5) + + def now(self) -> datetime: + hit_test(2) + return datetime(2000, 1, 1, tzinfo=timezone.utc) + + def on_incremental_begin(self, subscription: "DataPackageBody") -> None: + hit_test(3) + + def on_incremental_batch(self, subscription: "DataPackageBody", items: List["DataPackageListItem"]) -> None: + hit_test(4, 6) + + def on_incremental_end(self, is_aborted: bool) -> None: + hit_test(7) + assert is_aborted is False + self.abort() + + api = get_api( + get_json_response(DataPackageListState.INCOMPLETE), get_json_response(DataPackageListState.UP_TO_DATE) + ) + + _TestDataPackageListPoller( + api, + download_full_list_on_or_after=datetime(3000, 1, 1, 
tzinfo=timezone.utc), + time_stamp_for_if_modified_since=datetime(1000, 1, 1, tzinfo=timezone.utc), + ).start() + + assert hit == 7 diff --git a/tests/Web/data_package_list_poller/error.py b/tests/Web/data_package_list_poller/error.py new file mode 100644 index 00000000..cf7e682a --- /dev/null +++ b/tests/Web/data_package_list_poller/error.py @@ -0,0 +1,396 @@ +from datetime import datetime, timezone +from io import BytesIO +from json import dumps as json_dumps +from typing import Any, Dict, List, Optional +import pytest + +from requests import Response + +from macrobond_data_api.web import WebApi +from macrobond_data_api.web.data_package_list_poller import DataPackageListPoller, ExceptionSource +from macrobond_data_api.web.session import Session +from macrobond_data_api.web.web_types import DataPackageBody, DataPackageListItem, DataPackageListState + + +class TestAuth2Session: + __test__ = False + + def __init__(self, *responses: Response): + self.index = 0 + self.responses = responses + + def request(self, *args: Any, **kwargs: Any) -> Response: # pylint: disable=unused-argument + response = self.responses[self.index] + self.index += 1 + return response + + +def get_api(*responses: Response) -> WebApi: + return WebApi(Session("", "", test_auth2_session=TestAuth2Session(*responses))) + + +def get_json_response( + state: DataPackageListState, + downloadFullListOnOrAfter: str = "2000-02-01T04:05:06", + timeStampForIfModifiedSince: str = "2000-02-02T04:05:06", + entities: Optional[List[Dict[str, str]]] = None, +) -> Response: + if entities is None: + entities = [ + {"name": "sek", "modified": "2000-02-03T04:05:06"}, + {"name": "dkk", "modified": "2000-02-04T04:05:06"}, + {"name": "usgdp", "modified": "2000-02-05T04:05:06"}, + ] + json = json_dumps( + { + "downloadFullListOnOrAfter": downloadFullListOnOrAfter, + "timeStampForIfModifiedSince": timeStampForIfModifiedSince, + "state": state, + "entities": entities, + } + ) + response = Response() + response.status_code = 
200 + response.raw = BytesIO(bytes(json, "utf-8")) + return response + + +class TestDataPackageListPoller(DataPackageListPoller): + __test__ = False + + def __init__( + self, + api: WebApi, + download_full_list_on_or_after: Optional[datetime] = None, + time_stamp_for_if_modified_since: Optional[datetime] = None, + chunk_size: int = 200, + ): + super().__init__(api, download_full_list_on_or_after, time_stamp_for_if_modified_since, chunk_size) + self._sleep = self.sleep + self._now = self.now + + def sleep(self, secs: float) -> None: + raise Exception("should not be called") + + def now(self) -> datetime: + raise Exception("should not be called") + + def on_full_listing_begin(self, subscription: "DataPackageBody") -> None: + raise Exception("should not be called") + + def on_full_listing_batch(self, subscription: "DataPackageBody", items: List["DataPackageListItem"]) -> None: + raise Exception("should not be called") + + def on_full_listing_end(self, is_aborted: bool) -> None: + raise Exception("should not be called") + + def on_incremental_begin(self, subscription: "DataPackageBody") -> None: + raise Exception("should not be called") + + def on_incremental_batch(self, subscription: "DataPackageBody", items: List["DataPackageListItem"]) -> None: + raise Exception("should not be called") + + def on_incremental_end(self, is_aborted: bool) -> None: + raise Exception("should not be called") + + def on_exception(self, source: ExceptionSource, exception: Exception) -> None: + raise Exception("should not be called") + + +def test_full_listing_1() -> None: + hit = 0 + + def hit_test(*now: int) -> int: + nonlocal hit + hit += 1 + assert hit in now + return hit + + class _TestDataPackageListPoller(TestDataPackageListPoller): + __test__ = False + + def _test_access(self) -> None: + hit_test(1) + + def on_full_listing_begin(self, subscription: "DataPackageBody") -> None: + hit_test(2) + raise Exception("Test exception") + + api = 
get_api(get_json_response(DataPackageListState.FULL_LISTING)) + + with pytest.raises(Exception, match="Test exception"): + _TestDataPackageListPoller(api).start() + + assert hit == 2 + + +def test_full_listing_2() -> None: + hit = 0 + + def hit_test(*now: int) -> int: + nonlocal hit + hit += 1 + assert hit in now + return hit + + class _TestDataPackageListPoller(TestDataPackageListPoller): + __test__ = False + + def _test_access(self) -> None: + hit_test(1) + + def on_full_listing_begin(self, subscription: "DataPackageBody") -> None: + hit_test(2) + + def on_full_listing_batch(self, subscription: DataPackageBody, items: List[DataPackageListItem]) -> None: + hit_test(3) + raise Exception("Test exception") + + api = get_api(get_json_response(DataPackageListState.FULL_LISTING)) + + with pytest.raises(Exception, match="Test exception"): + _TestDataPackageListPoller(api).start() + + assert hit == 3 + + +def test_full_listing_3() -> None: + hit = 0 + + def hit_test(*now: int) -> int: + nonlocal hit + hit += 1 + assert hit in now + return hit + + class _TestDataPackageListPoller(TestDataPackageListPoller): + __test__ = False + + def _test_access(self) -> None: + hit_test(1) + + def on_full_listing_begin(self, subscription: "DataPackageBody") -> None: + hit_test(2) + + def on_full_listing_batch(self, subscription: DataPackageBody, items: List[DataPackageListItem]) -> None: + hit_test(3) + + def on_full_listing_end(self, is_aborted: bool) -> None: + hit_test(4) + raise Exception("Test exception") + + api = get_api(get_json_response(DataPackageListState.FULL_LISTING)) + + with pytest.raises(Exception, match="Test exception"): + _TestDataPackageListPoller(api).start() + + assert hit == 4 + + +# test_abort_listing + + +def test_listing_1() -> None: + hit = 0 + + def hit_test(*now: int) -> int: + nonlocal hit + hit += 1 + assert hit in now + return hit + + class _TestDataPackageListPoller(TestDataPackageListPoller): + __test__ = False + + def _test_access(self) -> None: + 
hit_test(1) + + def now(self) -> datetime: + hit_test(2) + return datetime(2000, 1, 1, tzinfo=timezone.utc) + + def on_incremental_begin(self, subscription: "DataPackageBody") -> None: + hit_test(3) + raise Exception("Test exception") + + api = get_api(get_json_response(DataPackageListState.UP_TO_DATE)) + + with pytest.raises(Exception, match="Test exception"): + _TestDataPackageListPoller( + api, + download_full_list_on_or_after=datetime(3000, 1, 1, tzinfo=timezone.utc), + time_stamp_for_if_modified_since=datetime(1000, 1, 1, tzinfo=timezone.utc), + ).start() + + assert hit == 3 + + +def test_listing_2() -> None: + hit = 0 + + def hit_test(*now: int) -> int: + nonlocal hit + hit += 1 + assert hit in now + return hit + + class _TestDataPackageListPoller(TestDataPackageListPoller): + __test__ = False + + def _test_access(self) -> None: + hit_test(1) + + def now(self) -> datetime: + hit_test(2) + return datetime(2000, 1, 1, tzinfo=timezone.utc) + + def on_incremental_begin(self, subscription: "DataPackageBody") -> None: + hit_test(3) + + def on_incremental_batch(self, subscription: DataPackageBody, items: List[DataPackageListItem]) -> None: + hit_test(4) + raise Exception("Test exception") + + api = get_api(get_json_response(DataPackageListState.UP_TO_DATE)) + + with pytest.raises(Exception, match="Test exception"): + _TestDataPackageListPoller( + api, + download_full_list_on_or_after=datetime(3000, 1, 1, tzinfo=timezone.utc), + time_stamp_for_if_modified_since=datetime(1000, 1, 1, tzinfo=timezone.utc), + ).start() + + assert hit == 4 + + +def test_listing_3() -> None: + hit = 0 + + def hit_test(*now: int) -> int: + nonlocal hit + hit += 1 + assert hit in now + return hit + + class _TestDataPackageListPoller(TestDataPackageListPoller): + __test__ = False + + def _test_access(self) -> None: + hit_test(1) + + def now(self) -> datetime: + hit_test(2) + return datetime(2000, 1, 1, tzinfo=timezone.utc) + + def on_incremental_begin(self, subscription: "DataPackageBody") -> 
None: + hit_test(3) + + def on_incremental_batch(self, subscription: DataPackageBody, items: List[DataPackageListItem]) -> None: + hit_test(4) + + def on_incremental_end(self, is_aborted: bool) -> None: + hit_test(5) + raise Exception("Test exception") + + api = get_api(get_json_response(DataPackageListState.UP_TO_DATE)) + + with pytest.raises(Exception, match="Test exception"): + _TestDataPackageListPoller( + api, + download_full_list_on_or_after=datetime(3000, 1, 1, tzinfo=timezone.utc), + time_stamp_for_if_modified_since=datetime(1000, 1, 1, tzinfo=timezone.utc), + ).start() + + assert hit == 5 + + +# test_abort_listing_and_listing_incomplete + + +def test_listing_and_listing_incomplete_1() -> None: + hit = 0 + + def hit_test(*now: int) -> int: + nonlocal hit + hit += 1 + assert hit in now + return hit + + class _TestDataPackageListPoller(TestDataPackageListPoller): + __test__ = False + + def _test_access(self) -> None: + hit_test(1) + + def sleep(self, secs: float) -> None: + hit_test(5) + + def now(self) -> datetime: + hit_test(2) + return datetime(2000, 1, 1, tzinfo=timezone.utc) + + def on_incremental_begin(self, subscription: "DataPackageBody") -> None: + hit_test(3) + + def on_incremental_batch(self, subscription: "DataPackageBody", items: List["DataPackageListItem"]) -> None: + if hit_test(4, 6) == 6: + raise Exception("Test exception") + + api = get_api( + get_json_response(DataPackageListState.INCOMPLETE), get_json_response(DataPackageListState.UP_TO_DATE) + ) + + with pytest.raises(Exception, match="Test exception"): + _TestDataPackageListPoller( + api, + download_full_list_on_or_after=datetime(3000, 1, 1, tzinfo=timezone.utc), + time_stamp_for_if_modified_since=datetime(1000, 1, 1, tzinfo=timezone.utc), + ).start() + + assert hit == 6 + + +def test_listing_and_listing_incomplete_2() -> None: + hit = 0 + + def hit_test(*now: int) -> int: + nonlocal hit + hit += 1 + assert hit in now + return hit + + class 
_TestDataPackageListPoller(TestDataPackageListPoller): + __test__ = False + + def _test_access(self) -> None: + hit_test(1) + + def sleep(self, secs: float) -> None: + hit_test(5) + + def now(self) -> datetime: + hit_test(2) + return datetime(2000, 1, 1, tzinfo=timezone.utc) + + def on_incremental_begin(self, subscription: "DataPackageBody") -> None: + hit_test(3) + + def on_incremental_batch(self, subscription: "DataPackageBody", items: List["DataPackageListItem"]) -> None: + hit_test(4, 6) + + def on_incremental_end(self, is_aborted: bool) -> None: + hit_test(7) + raise Exception("Test exception") + + api = get_api( + get_json_response(DataPackageListState.INCOMPLETE), get_json_response(DataPackageListState.UP_TO_DATE) + ) + + with pytest.raises(Exception, match="Test exception"): + _TestDataPackageListPoller( + api, + download_full_list_on_or_after=datetime(3000, 1, 1, tzinfo=timezone.utc), + time_stamp_for_if_modified_since=datetime(1000, 1, 1, tzinfo=timezone.utc), + ).start() + + assert hit == 7 diff --git a/tests/Web/data_package_list_poller/normal.py b/tests/Web/data_package_list_poller/normal.py new file mode 100644 index 00000000..24ea70bd --- /dev/null +++ b/tests/Web/data_package_list_poller/normal.py @@ -0,0 +1,363 @@ +from datetime import datetime, timezone +from io import BytesIO +from json import dumps as json_dumps +from typing import Any, Dict, List, Optional + +import pytest + +from requests import Response + +from macrobond_data_api.web import WebApi +from macrobond_data_api.web.data_package_list_poller import DataPackageListPoller, ExceptionSource +from macrobond_data_api.web.session import Session +from macrobond_data_api.web.web_types import DataPackageBody, DataPackageListItem, DataPackageListState + + +class TestAuth2Session: + __test__ = False + + def __init__(self, *responses: Response): + self.index = 0 + self.responses = responses + + def request(self, *args: Any, **kwargs: Any) -> Response: # pylint: disable=unused-argument + response 
= self.responses[self.index] + self.index += 1 + return response + + +def get_api(*responses: Response) -> WebApi: + return WebApi(Session("", "", test_auth2_session=TestAuth2Session(*responses))) + + +def get_json_response( + state: DataPackageListState, + downloadFullListOnOrAfter: str = "2000-02-01T04:05:06", + timeStampForIfModifiedSince: str = "2000-02-02T04:05:06", + entities: Optional[List[Dict[str, str]]] = None, +) -> Response: + if entities is None: + entities = [ + {"name": "sek", "modified": "2000-02-03T04:05:06"}, + {"name": "dkk", "modified": "2000-02-04T04:05:06"}, + {"name": "usgdp", "modified": "2000-02-05T04:05:06"}, + ] + json = json_dumps( + { + "downloadFullListOnOrAfter": downloadFullListOnOrAfter, + "timeStampForIfModifiedSince": timeStampForIfModifiedSince, + "state": state, + "entities": entities, + } + ) + response = Response() + response.status_code = 200 + response.raw = BytesIO(bytes(json, "utf-8")) + return response + + +class TestDataPackageListPoller(DataPackageListPoller): + __test__ = False + + def __init__( + self, + api: WebApi, + download_full_list_on_or_after: Optional[datetime] = None, + time_stamp_for_if_modified_since: Optional[datetime] = None, + chunk_size: int = 200, + ): + super().__init__(api, download_full_list_on_or_after, time_stamp_for_if_modified_since, chunk_size) + self._sleep = self.sleep + self._now = self.now + + def sleep(self, secs: float) -> None: + raise Exception("should not be called") + + def now(self) -> datetime: + raise Exception("should not be called") + + def on_full_listing_begin(self, subscription: "DataPackageBody") -> None: + raise Exception("should not be called") + + def on_full_listing_batch(self, subscription: "DataPackageBody", items: List["DataPackageListItem"]) -> None: + raise Exception("should not be called") + + def on_full_listing_end(self, is_aborted: bool) -> None: + raise Exception("should not be called") + + def on_incremental_begin(self, subscription: "DataPackageBody") -> None: 
+ raise Exception("should not be called") + + def on_incremental_batch(self, subscription: "DataPackageBody", items: List["DataPackageListItem"]) -> None: + raise Exception("should not be called") + + def on_incremental_end(self, is_aborted: bool) -> None: + raise Exception("should not be called") + + def on_exception(self, source: ExceptionSource, exception: Exception) -> None: + raise Exception("should not be called") + + +# _run_full_listing +def test_full_listing() -> None: + hit = 0 + + def hit_test(*now: int) -> int: + nonlocal hit + hit += 1 + assert hit in now + return hit + + class _TestDataPackageListPoller(TestDataPackageListPoller): + __test__ = False + + def _test_access(self) -> None: + hit_test(1) + + def sleep(self, secs: float) -> None: + hit_test(7) + assert secs == self.up_to_date_delay + raise Exception("End of test") + + def on_full_listing_begin(self, subscription: "DataPackageBody") -> None: + hit_test(2) + assert subscription.time_stamp_for_if_modified_since == datetime(2000, 2, 2, 4, 5, 6) + assert subscription.download_full_list_on_or_after == datetime(2000, 2, 1, 4, 5, 6) + assert subscription.state == DataPackageListState.FULL_LISTING + + def on_full_listing_batch(self, subscription: "DataPackageBody", items: List["DataPackageListItem"]) -> None: + nonlocal hit + hit += 1 + assert subscription.time_stamp_for_if_modified_since == datetime(2000, 2, 2, 4, 5, 6) + assert subscription.download_full_list_on_or_after == datetime(2000, 2, 1, 4, 5, 6) + assert subscription.state == DataPackageListState.FULL_LISTING + if hit == 3: + assert items == [DataPackageListItem("sek", datetime(2000, 2, 3, 4, 5, 6))] + if hit == 4: + assert items == [DataPackageListItem("dkk", datetime(2000, 2, 4, 4, 5, 6))] + if hit == 5: + assert items == [DataPackageListItem("usgdp", datetime(2000, 2, 5, 4, 5, 6))] + + def on_full_listing_end(self, is_aborted: bool) -> None: + hit_test(6) + assert is_aborted is False + + api = 
get_api(get_json_response(DataPackageListState.FULL_LISTING)) + + with pytest.raises(Exception, match="End of test"): + _TestDataPackageListPoller(api, chunk_size=1).start() + + assert hit == 7 + + +# _run_listing +def test_listing() -> None: + hit = 0 + + def hit_test(*now: int) -> int: + nonlocal hit + hit += 1 + assert hit in now + return hit + + class _TestDataPackageListPoller(TestDataPackageListPoller): + __test__ = False + + def _test_access(self) -> None: + hit_test(1) + + def sleep(self, secs: float) -> None: + hit_test(8) + assert secs == self.up_to_date_delay + raise Exception("End of test") + + def now(self) -> datetime: + hit_test(2) + return datetime(2000, 1, 1, tzinfo=timezone.utc) + + def on_incremental_begin(self, subscription: "DataPackageBody") -> None: + hit_test(3) + assert subscription.time_stamp_for_if_modified_since == datetime(2000, 2, 2, 4, 5, 6) + assert subscription.download_full_list_on_or_after == datetime(2000, 2, 1, 4, 5, 6) + assert subscription.state == DataPackageListState.UP_TO_DATE + + def on_incremental_batch(self, subscription: "DataPackageBody", items: List["DataPackageListItem"]) -> None: + nonlocal hit + hit += 1 + assert subscription.time_stamp_for_if_modified_since == datetime(2000, 2, 2, 4, 5, 6) + assert subscription.download_full_list_on_or_after == datetime(2000, 2, 1, 4, 5, 6) + assert subscription.state == DataPackageListState.UP_TO_DATE + if hit == 4: + assert items == [DataPackageListItem("sek", datetime(2000, 2, 3, 4, 5, 6))] + elif hit == 5: + assert items == [DataPackageListItem("dkk", datetime(2000, 2, 4, 4, 5, 6))] + elif hit == 6: + assert items == [DataPackageListItem("usgdp", datetime(2000, 2, 5, 4, 5, 6))] + else: + raise Exception("should not be here") + + def on_incremental_end(self, is_aborted: bool) -> None: + hit_test(7) + assert is_aborted is False + + api = get_api(get_json_response(DataPackageListState.UP_TO_DATE)) + + with pytest.raises(Exception, match="End of test"): + 
_TestDataPackageListPoller( + api, + download_full_list_on_or_after=datetime(3000, 1, 1, tzinfo=timezone.utc), + time_stamp_for_if_modified_since=datetime(1000, 1, 1, tzinfo=timezone.utc), + chunk_size=1, + ).start() + + assert hit == 8 + + +# _run_listing and _run_listing_incomplete +def test_listing_and_listing_incomplete_1() -> None: + hit = 0 + + def hit_test(*now: int) -> int: + nonlocal hit + hit += 1 + assert hit in now + return hit + + class _TestDataPackageListPoller(TestDataPackageListPoller): + __test__ = False + + def _test_access(self) -> None: + hit_test(1) + + def sleep(self, secs: float) -> None: + hit = hit_test(7, 12) + if hit == 7: + assert secs == self.incomplete_delay + elif hit == 12: + assert secs == self.up_to_date_delay + raise Exception("End of test") + + def now(self) -> datetime: + hit_test(2) + return datetime(2000, 1, 1, tzinfo=timezone.utc) + + def on_incremental_begin(self, subscription: "DataPackageBody") -> None: + assert subscription.time_stamp_for_if_modified_since == datetime(2000, 2, 2, 4, 5, 6) + assert subscription.download_full_list_on_or_after == datetime(2000, 2, 1, 4, 5, 6) + if hit_test(3) == 3: + assert subscription.state == DataPackageListState.INCOMPLETE + + def on_incremental_batch(self, subscription: "DataPackageBody", items: List["DataPackageListItem"]) -> None: + nonlocal hit + hit += 1 + assert subscription.time_stamp_for_if_modified_since == datetime(2000, 2, 2, 4, 5, 6) + assert subscription.download_full_list_on_or_after == datetime(2000, 2, 1, 4, 5, 6) + if hit == 4: + assert subscription.state == DataPackageListState.INCOMPLETE + assert items == [DataPackageListItem("sek", datetime(2000, 2, 3, 4, 5, 6))] + elif hit == 5: + assert subscription.state == DataPackageListState.INCOMPLETE + assert items == [DataPackageListItem("dkk", datetime(2000, 2, 4, 4, 5, 6))] + elif hit == 6: + assert subscription.state == DataPackageListState.INCOMPLETE + assert items == [DataPackageListItem("usgdp", datetime(2000, 2, 5, 4, 
5, 6))] + elif hit == 8: + assert subscription.state == DataPackageListState.UP_TO_DATE + assert items == [DataPackageListItem("sek", datetime(2000, 2, 3, 4, 5, 6))] + elif hit == 9: + assert subscription.state == DataPackageListState.UP_TO_DATE + assert items == [DataPackageListItem("dkk", datetime(2000, 2, 4, 4, 5, 6))] + elif hit == 10: + assert subscription.state == DataPackageListState.UP_TO_DATE + assert items == [DataPackageListItem("usgdp", datetime(2000, 2, 5, 4, 5, 6))] + else: + raise Exception("should not be here") + + def on_incremental_end(self, is_aborted: bool) -> None: + hit_test(11) + assert is_aborted is False + + api = get_api( + get_json_response(DataPackageListState.INCOMPLETE), get_json_response(DataPackageListState.UP_TO_DATE) + ) + + with pytest.raises(Exception, match="End of test"): + _TestDataPackageListPoller( + api, + download_full_list_on_or_after=datetime(3000, 1, 1, tzinfo=timezone.utc), + time_stamp_for_if_modified_since=datetime(1000, 1, 1, tzinfo=timezone.utc), + chunk_size=1, + ).start() + + assert hit == 12 + + +def test_listing_and_listing_incomplete_2() -> None: + hit = 0 + + def hit_test(*now: int) -> int: + nonlocal hit + hit += 1 + assert hit in now + return hit + + class _TestDataPackageListPoller(TestDataPackageListPoller): + __test__ = False + + def _test_access(self) -> None: + hit_test(1) + + def sleep(self, secs: float) -> None: + hit = hit_test(5, 7, 10) + if hit in (5, 7): + assert secs == self.incomplete_delay + elif hit == 10: + assert secs == self.up_to_date_delay + raise Exception("End of test") + + def now(self) -> datetime: + hit_test(2) + return datetime(2000, 1, 1, tzinfo=timezone.utc) + + def on_incremental_begin(self, subscription: "DataPackageBody") -> None: + assert subscription.time_stamp_for_if_modified_since == datetime(2000, 2, 2, 4, 5, 6) + assert subscription.download_full_list_on_or_after == datetime(2000, 2, 1, 4, 5, 6) + if hit_test(3) == 3: + assert subscription.state == 
DataPackageListState.INCOMPLETE + + def on_incremental_batch(self, subscription: "DataPackageBody", items: List["DataPackageListItem"]) -> None: + hit = hit_test(4, 6, 8) + assert subscription.time_stamp_for_if_modified_since == datetime(2000, 2, 2, 4, 5, 6) + assert subscription.download_full_list_on_or_after == datetime(2000, 2, 1, 4, 5, 6) + + assert items == [ + DataPackageListItem("sek", datetime(2000, 2, 3, 4, 5, 6)), + DataPackageListItem("dkk", datetime(2000, 2, 4, 4, 5, 6)), + DataPackageListItem("usgdp", datetime(2000, 2, 5, 4, 5, 6)), + ] + + if hit in (4, 6): + assert subscription.state == DataPackageListState.INCOMPLETE + elif hit == 8: + assert subscription.state == DataPackageListState.UP_TO_DATE + else: + raise Exception("should not be here") + + def on_incremental_end(self, is_aborted: bool) -> None: + hit_test(9) + assert is_aborted is False + + api = get_api( + get_json_response(DataPackageListState.INCOMPLETE), + get_json_response(DataPackageListState.INCOMPLETE), + get_json_response(DataPackageListState.UP_TO_DATE), + ) + + with pytest.raises(Exception, match="End of test"): + _TestDataPackageListPoller( + api, + download_full_list_on_or_after=datetime(3000, 1, 1, tzinfo=timezone.utc), + time_stamp_for_if_modified_since=datetime(1000, 1, 1, tzinfo=timezone.utc), + ).start() + + assert hit == 10 diff --git a/tests/Web/data_package_list_poller/on_exception.py b/tests/Web/data_package_list_poller/on_exception.py new file mode 100644 index 00000000..9d81b378 --- /dev/null +++ b/tests/Web/data_package_list_poller/on_exception.py @@ -0,0 +1,378 @@ +from datetime import datetime, timezone +from io import BytesIO +from json import dumps as json_dumps +from typing import Any, List, Optional + +from requests import Response + +from macrobond_data_api.web import WebApi +from macrobond_data_api.web.data_package_list_poller import DataPackageListPoller, ExceptionSource +from macrobond_data_api.web.session import Session +from 
macrobond_data_api.web.web_types import DataPackageBody, DataPackageListItem, DataPackageListState + + +class TestAuth2Session: + __test__ = False + + def __init__(self, *responses: Response): + self.index = 0 + self.responses = responses + + def request(self, *args: Any, **kwargs: Any) -> Response: # pylint: disable=unused-argument + response = self.responses[self.index] + self.index += 1 + return response + + +def get_api(*responses: Response) -> WebApi: + return WebApi(Session("", "", test_auth2_session=TestAuth2Session(*responses))) + + +def get_json_response( + state: DataPackageListState, + downloadFullListOnOrAfter: str = "2000-02-01T04:05:06", + timeStampForIfModifiedSince: str = "2000-02-02T04:05:06", +) -> Response: + json = json_dumps( + { + "downloadFullListOnOrAfter": downloadFullListOnOrAfter, + "timeStampForIfModifiedSince": timeStampForIfModifiedSince, + "state": state, + "entities": [ + {"name": "sek", "modified": "2000-02-03T04:05:06"}, + {"name": "dkk", "modified": "2000-02-04T04:05:06"}, + {"name": "usgdp", "modified": "2000-02-05T04:05:06"}, + ], + } + ) + response = Response() + response.status_code = 200 + response.raw = BytesIO(bytes(json, "utf-8")) + return response + + +def get_broken_json_response( + state: DataPackageListState, + downloadFullListOnOrAfter: str = "2000-02-01T04:05:06", + timeStampForIfModifiedSince: str = "2000-02-02T04:05:06", +) -> Response: + response = get_json_response(state, downloadFullListOnOrAfter, timeStampForIfModifiedSince) + response.raw = BytesIO(bytes(response.raw.getvalue().decode("utf-8")[:-10], "utf-8")) + return response + + +def http_500_response() -> Response: + response = Response() + response.status_code = 500 + return response + + +class TestDataPackageListPoller(DataPackageListPoller): + __test__ = False + + def __init__( + self, + api: WebApi, + download_full_list_on_or_after: Optional[datetime] = None, + time_stamp_for_if_modified_since: Optional[datetime] = None, + chunk_size: int = 200, + ): + 
super().__init__(api, download_full_list_on_or_after, time_stamp_for_if_modified_since, chunk_size) + self._sleep = self.sleep + self._now = self.now + + def sleep(self, secs: float) -> None: + raise Exception("should not be called") + + def now(self) -> datetime: + raise Exception("should not be called") + + def on_full_listing_begin(self, subscription: "DataPackageBody") -> None: + raise Exception("should not be called") + + def on_full_listing_batch(self, subscription: "DataPackageBody", items: List["DataPackageListItem"]) -> None: + raise Exception("should not be called") + + def on_full_listing_end(self, is_aborted: bool) -> None: + raise Exception("should not be called") + + def on_incremental_begin(self, subscription: "DataPackageBody") -> None: + raise Exception("should not be called") + + def on_incremental_batch(self, subscription: "DataPackageBody", items: List["DataPackageListItem"]) -> None: + raise Exception("should not be called") + + def on_incremental_end(self, is_aborted: bool) -> None: + raise Exception("should not be called") + + def on_exception(self, source: ExceptionSource, exception: Exception) -> None: + raise Exception("should not be called") + + +# _run_full_listing + + +def test_full_listing_error_1() -> None: + hit = 0 + + def hit_test(*now: int) -> int: + nonlocal hit + hit += 1 + assert hit in now + return hit + + class _TestDataPackageListPoller(TestDataPackageListPoller): + __test__ = False + + def _test_access(self) -> None: + hit_test(1) + + def sleep(self, secs: float) -> None: + hit = hit_test(2, 3, 4) - 1 + assert secs == self.on_retry_delay * hit + + def on_exception(self, source: ExceptionSource, exception: Exception) -> None: + hit_test(5) + assert source is ExceptionSource.FAILED_TO_BEGIN_FULL_LISTING + assert exception is not None + self.abort() + + api = get_api( + http_500_response(), + http_500_response(), + http_500_response(), + http_500_response(), + ) + + _TestDataPackageListPoller(api, chunk_size=200).start() + + 
assert hit == 5 + + +def test_full_listing_error_2() -> None: + hit = 0 + + def hit_test(*now: int) -> int: + nonlocal hit + hit += 1 + assert hit in now + return hit + + class _TestDataPackageListPoller(TestDataPackageListPoller): + __test__ = False + + def _test_access(self) -> None: + hit_test(1) + + def on_full_listing_begin(self, subscription: "DataPackageBody") -> None: + hit_test(2) + + def on_exception(self, source: ExceptionSource, exception: Exception) -> None: + hit_test(3) + assert source is ExceptionSource.FAILED_TO_GET_BATCH_IN_FULL_LISTING + assert exception is not None + self.abort() + + api = get_api(get_broken_json_response(DataPackageListState.FULL_LISTING)) + + _TestDataPackageListPoller(api, chunk_size=200).start() + + assert hit == 3 + + +# _run_listing + + +def test_listing_error_1() -> None: + hit = 0 + + def hit_test(*now: int) -> int: + nonlocal hit + hit += 1 + assert hit in now + return hit + + class _TestDataPackageListPoller(TestDataPackageListPoller): + __test__ = False + + def _test_access(self) -> None: + hit_test(1) + + def now(self) -> datetime: + hit_test(2) + return datetime(2000, 1, 1, tzinfo=timezone.utc) + + def sleep(self, secs: float) -> None: + hit = hit_test(3, 4, 5) - 2 + assert secs == self.on_retry_delay * hit + + def on_exception(self, source: ExceptionSource, exception: Exception) -> None: + hit_test(6) + assert source is ExceptionSource.FAILED_TO_BEGIN_LISTING + assert exception is not None + self.abort() + + api = get_api( + http_500_response(), + http_500_response(), + http_500_response(), + http_500_response(), + ) + + _TestDataPackageListPoller( + api, + download_full_list_on_or_after=datetime(3000, 1, 1, tzinfo=timezone.utc), + time_stamp_for_if_modified_since=datetime(1000, 1, 1, tzinfo=timezone.utc), + ).start() + + assert hit == 6 + + +def test_listing_error_2() -> None: + hit = 0 + + def hit_test(*now: int) -> int: + nonlocal hit + hit += 1 + assert hit in now + return hit + + class 
_TestDataPackageListPoller(TestDataPackageListPoller): + __test__ = False + + def _test_access(self) -> None: + hit_test(1) + + def now(self) -> datetime: + hit_test(2) + return datetime(2000, 1, 1, tzinfo=timezone.utc) + + def on_incremental_begin(self, subscription: DataPackageBody) -> None: + hit_test(3) + + def on_exception(self, source: ExceptionSource, exception: Exception) -> None: + hit_test(4) + assert source is ExceptionSource.FAILED_TO_GET_BATCH_IN_LISTING + assert exception is not None + self.abort() + + api = get_api( + get_broken_json_response(DataPackageListState.UP_TO_DATE), + ) + + _TestDataPackageListPoller( + api, + download_full_list_on_or_after=datetime(3000, 1, 1, tzinfo=timezone.utc), + time_stamp_for_if_modified_since=datetime(1000, 1, 1, tzinfo=timezone.utc), + ).start() + + assert hit == 4 + + +# _run_listing and _run_listing_incomplete + + +def test_listing_and_listing_incomplete_error_1() -> None: + hit = 0 + + def hit_test(*now: int) -> int: + nonlocal hit + hit += 1 + assert hit in now + return hit + + class _TestDataPackageListPoller(TestDataPackageListPoller): + __test__ = False + + def _test_access(self) -> None: + hit_test(1) + + def sleep(self, secs: float) -> None: + hit = hit_test(5, 6, 7, 8) + if hit == 5: + assert secs == self.incomplete_delay + else: + assert secs == self.on_retry_delay * (hit - 5) + + def now(self) -> datetime: + hit_test(2) + return datetime(2000, 1, 1, tzinfo=timezone.utc) + + def on_incremental_begin(self, subscription: "DataPackageBody") -> None: + hit_test(3) + + def on_incremental_batch(self, subscription: "DataPackageBody", items: List["DataPackageListItem"]) -> None: + hit_test(4) + assert subscription.state == DataPackageListState.INCOMPLETE + + def on_exception(self, source: ExceptionSource, exception: Exception) -> None: + hit_test(9) + assert source is ExceptionSource.FAILED_TO_BEGIN_LISTING_INCOMPLETE + assert exception is not None + self.abort() + + api = get_api( + 
get_json_response(DataPackageListState.INCOMPLETE), + http_500_response(), + http_500_response(), + http_500_response(), + http_500_response(), + ) + + _TestDataPackageListPoller( + api, + download_full_list_on_or_after=datetime(3000, 1, 1, tzinfo=timezone.utc), + time_stamp_for_if_modified_since=datetime(1000, 1, 1, tzinfo=timezone.utc), + ).start() + + assert hit == 9 + + +def test_listing_and_listing_incomplete_error_2() -> None: + hit = 0 + + def hit_test(*now: int) -> int: + nonlocal hit + hit += 1 + assert hit in now + return hit + + class _TestDataPackageListPoller(TestDataPackageListPoller): + __test__ = False + + def _test_access(self) -> None: + hit_test(1) + + def sleep(self, secs: float) -> None: + hit_test(5) + assert secs == self.incomplete_delay + + def now(self) -> datetime: + hit_test(2) + return datetime(2000, 1, 1, tzinfo=timezone.utc) + + def on_incremental_begin(self, subscription: "DataPackageBody") -> None: + hit_test(3) + + def on_incremental_batch(self, subscription: "DataPackageBody", items: List["DataPackageListItem"]) -> None: + hit_test(4) + assert subscription.state == DataPackageListState.INCOMPLETE + + def on_exception(self, source: ExceptionSource, exception: Exception) -> None: + hit_test(6) + assert source is ExceptionSource.FAILED_TO_GET_BATCH_IN_LISTING_INCOMPLETE + assert exception is not None + self.abort() + + api = get_api( + get_json_response(DataPackageListState.INCOMPLETE), + get_broken_json_response(DataPackageListState.UP_TO_DATE), + ) + + _TestDataPackageListPoller( + api, + download_full_list_on_or_after=datetime(3000, 1, 1, tzinfo=timezone.utc), + time_stamp_for_if_modified_since=datetime(1000, 1, 1, tzinfo=timezone.utc), + ).start() + + assert hit == 6 diff --git a/tests/Web/data_package_list_poller/retry.py b/tests/Web/data_package_list_poller/retry.py new file mode 100644 index 00000000..0acedc3c --- /dev/null +++ b/tests/Web/data_package_list_poller/retry.py @@ -0,0 +1,237 @@ +from datetime import datetime, 
timezone +from io import BytesIO +from json import dumps as json_dumps +from typing import Any, List, Optional + +from requests import Response + +from macrobond_data_api.web import WebApi +from macrobond_data_api.web.data_package_list_poller import DataPackageListPoller, ExceptionSource +from macrobond_data_api.web.session import Session +from macrobond_data_api.web.web_types import DataPackageBody, DataPackageListItem, DataPackageListState + + +class TestAuth2Session: + __test__ = False + + def __init__(self, *responses: Response): + self.index = 0 + self.responses = responses + + def request(self, *args: Any, **kwargs: Any) -> Response: # pylint: disable=unused-argument + response = self.responses[self.index] + self.index += 1 + return response + + +def get_api(*responses: Response) -> WebApi: + return WebApi(Session("", "", test_auth2_session=TestAuth2Session(*responses))) + + +def get_json_response( + state: DataPackageListState, + downloadFullListOnOrAfter: str = "2000-02-01T04:05:06", + timeStampForIfModifiedSince: str = "2000-02-02T04:05:06", +) -> Response: + json = json_dumps( + { + "downloadFullListOnOrAfter": downloadFullListOnOrAfter, + "timeStampForIfModifiedSince": timeStampForIfModifiedSince, + "state": state, + "entities": [ + {"name": "sek", "modified": "2000-02-03T04:05:06"}, + {"name": "dkk", "modified": "2000-02-04T04:05:06"}, + {"name": "usgdp", "modified": "2000-02-05T04:05:06"}, + ], + } + ) + response = Response() + response.status_code = 200 + response.raw = BytesIO(bytes(json, "utf-8")) + return response + + +def http_500_response() -> Response: + response = Response() + response.status_code = 500 + return response + + +class TestDataPackageListPoller(DataPackageListPoller): + __test__ = False + + def __init__( + self, + api: WebApi, + download_full_list_on_or_after: Optional[datetime] = None, + time_stamp_for_if_modified_since: Optional[datetime] = None, + chunk_size: int = 200, + ): + super().__init__(api, 
download_full_list_on_or_after, time_stamp_for_if_modified_since, chunk_size) + self._sleep = self.sleep + self._now = self.now + + def sleep(self, secs: float) -> None: + raise Exception("should not be called") + + def now(self) -> datetime: + raise Exception("should not be called") + + def on_full_listing_begin(self, subscription: "DataPackageBody") -> None: + raise Exception("should not be called") + + def on_full_listing_batch(self, subscription: "DataPackageBody", items: List["DataPackageListItem"]) -> None: + raise Exception("should not be called") + + def on_full_listing_end(self, is_aborted: bool) -> None: + raise Exception("should not be called") + + def on_incremental_begin(self, subscription: "DataPackageBody") -> None: + raise Exception("should not be called") + + def on_incremental_batch(self, subscription: "DataPackageBody", items: List["DataPackageListItem"]) -> None: + raise Exception("should not be called") + + def on_incremental_end(self, is_aborted: bool) -> None: + raise Exception("should not be called") + + def on_exception(self, source: ExceptionSource, exception: Exception) -> None: + raise Exception("should not be called") + + +def test_full_listing() -> None: + hit = 0 + + def hit_test(*now: int) -> int: + nonlocal hit + hit += 1 + assert hit in now + return hit + + class _TestDataPackageListPoller(TestDataPackageListPoller): + __test__ = False + + def _test_access(self) -> None: + hit_test(1) + + def sleep(self, secs: float) -> None: + hit_test(2) + assert secs == self.on_error_delay + + def on_full_listing_begin(self, subscription: "DataPackageBody") -> None: + hit_test(3) + + def on_full_listing_batch(self, subscription: "DataPackageBody", items: List["DataPackageListItem"]) -> None: + hit_test(4) + + def on_full_listing_end(self, is_aborted: bool) -> None: + hit_test(5) + self.abort() + + api = get_api(http_500_response(), get_json_response(DataPackageListState.FULL_LISTING)) + + _TestDataPackageListPoller(api, chunk_size=200).start() + 
+ assert hit == 5 + + +def test_listing() -> None: + hit = 0 + + def hit_test(*now: int) -> int: + nonlocal hit + hit += 1 + assert hit in now + return hit + + class _TestDataPackageListPoller(TestDataPackageListPoller): + __test__ = False + + def _test_access(self) -> None: + hit_test(1) + + def sleep(self, secs: float) -> None: + hit_test(3) + assert secs == self.on_error_delay + + def now(self) -> datetime: + hit_test(2) + return datetime(2000, 1, 1, tzinfo=timezone.utc) + + def on_incremental_begin(self, subscription: "DataPackageBody") -> None: + hit_test(4) + + def on_incremental_batch(self, subscription: "DataPackageBody", items: List["DataPackageListItem"]) -> None: + hit_test(5) + + def on_incremental_end(self, is_aborted: bool) -> None: + hit_test(6) + self.abort() + + api = get_api(http_500_response(), get_json_response(DataPackageListState.UP_TO_DATE)) + + _TestDataPackageListPoller( + api, + download_full_list_on_or_after=datetime(3000, 1, 1, tzinfo=timezone.utc), + time_stamp_for_if_modified_since=datetime(1000, 1, 1, tzinfo=timezone.utc), + ).start() + + assert hit == 6 + + +# _run_listing and _run_listing_incomplete +def test_listing_and_listing_incomplete() -> None: + hit = 0 + + def hit_test(*now: int) -> int: + nonlocal hit + hit += 1 + assert hit in now + return hit + + class _TestDataPackageListPoller(TestDataPackageListPoller): + __test__ = False + + def _test_access(self) -> None: + hit_test(1) + + def sleep(self, secs: float) -> None: + hit = hit_test(5, 6, 12) + if hit == 5: + assert secs == self.incomplete_delay + elif hit == 6: + assert secs == self.on_error_delay + elif hit == 12: + assert secs == self.up_to_date_delay + raise Exception("End of test") + + def now(self) -> datetime: + hit_test(2) + return datetime(2000, 1, 1, tzinfo=timezone.utc) + + def on_incremental_begin(self, subscription: "DataPackageBody") -> None: + hit_test(3) + + def on_incremental_batch(self, subscription: "DataPackageBody", items: List["DataPackageListItem"]) 
-> None: + if hit_test(4, 7) == 4: + assert subscription.state == DataPackageListState.INCOMPLETE + elif hit == 7: + assert subscription.state == DataPackageListState.UP_TO_DATE + + def on_incremental_end(self, is_aborted: bool) -> None: + hit_test(8) + self.abort() + + api = get_api( + get_json_response(DataPackageListState.INCOMPLETE), + http_500_response(), + get_json_response(DataPackageListState.UP_TO_DATE), + ) + + _TestDataPackageListPoller( + api, + download_full_list_on_or_after=datetime(3000, 1, 1, tzinfo=timezone.utc), + time_stamp_for_if_modified_since=datetime(1000, 1, 1, tzinfo=timezone.utc), + ).start() + + assert hit == 8 diff --git a/tests/Web/data_package_list_poller/test_access.py b/tests/Web/data_package_list_poller/test_access.py new file mode 100644 index 00000000..36cc061a --- /dev/null +++ b/tests/Web/data_package_list_poller/test_access.py @@ -0,0 +1,94 @@ +from datetime import datetime +from typing import Any, List, Optional + +import pytest + +from requests import Response + +from macrobond_data_api.web import WebApi +from macrobond_data_api.web.data_package_list_poller import DataPackageListPoller, ExceptionSource +from macrobond_data_api.web.session import Session +from macrobond_data_api.web.web_types import DataPackageBody, DataPackageListItem + + +class TestAuth2Session: + __test__ = False + + def __init__(self, *responses: Response): + self.index = 0 + self.responses = responses + + def request(self, *args: Any, **kwargs: Any) -> Response: # pylint: disable=unused-argument + response = self.responses[self.index] + self.index += 1 + return response + + +class TestDataPackageListPoller(DataPackageListPoller): + __test__ = False + + def __init__( + self, + api: WebApi, + download_full_list_on_or_after: Optional[datetime] = None, + time_stamp_for_if_modified_since: Optional[datetime] = None, + chunk_size: int = 200, + ): + super().__init__(api, download_full_list_on_or_after, time_stamp_for_if_modified_since, chunk_size) + self._sleep = 
self.sleep + self._now = self.now + + def sleep(self, secs: float) -> None: + raise Exception("should not be called") + + def now(self) -> datetime: + raise Exception("should not be called") + + def on_full_listing_begin(self, subscription: "DataPackageBody") -> None: + raise Exception("should not be called") + + def on_full_listing_batch(self, subscription: "DataPackageBody", items: List["DataPackageListItem"]) -> None: + raise Exception("should not be called") + + def on_full_listing_end(self, is_aborted: bool) -> None: + raise Exception("should not be called") + + def on_incremental_begin(self, subscription: "DataPackageBody") -> None: + raise Exception("should not be called") + + def on_incremental_batch(self, subscription: "DataPackageBody", items: List["DataPackageListItem"]) -> None: + raise Exception("should not be called") + + def on_incremental_end(self, is_aborted: bool) -> None: + raise Exception("should not be called") + + def on_exception(self, source: ExceptionSource, exception: Exception) -> None: + raise Exception("should not be called") + + +def test_access() -> None: + hit = 0 + + def hit_test(*now: int) -> int: + nonlocal hit + hit += 1 + assert hit in now + return hit + + class _TestAuth2Session: + __test__ = False + + def request(self, *args: Any, **kwargs: Any) -> Response: # pylint: disable=unused-argument + hit_test(1) + response = Response() + response.status_code = 403 + return response + + class _TestDataPackageListPoller(TestDataPackageListPoller): + __test__ = False + + api = WebApi(Session("", "", test_auth2_session=_TestAuth2Session())) + with pytest.raises(Exception, match="Needs access - The account is not set up to use DataPackageList"): + _TestDataPackageListPoller(api).start() + + assert hit == 1 diff --git a/tests/Web/web_get_data_package_list.py b/tests/Web/web_get_data_package_list.py new file mode 100644 index 00000000..90f5647b --- /dev/null +++ b/tests/Web/web_get_data_package_list.py @@ -0,0 +1,54 @@ +from datetime import 
datetime +from json import dumps as json_dumps +from typing import Any + +from requests import Response + +import pytest + +from macrobond_data_api.web import WebApi +from macrobond_data_api.web.session import Session +from macrobond_data_api.web.web_types import DataPackageListItem, DataPackageListState + + +class TestAuth2Session: + __test__ = False + + def __init__(self, content: bytes): + self.content = content + + def request(self, *args: Any, **kwargs: Any) -> Response: # pylint: disable=unused-argument + response = Response() + response.status_code = 200 + response._content = self.content + return response + + +@pytest.mark.parametrize( + "state", + [DataPackageListState.FULL_LISTING, DataPackageListState.INCOMPLETE, DataPackageListState.UP_TO_DATE], +) +def test(state: DataPackageListState) -> None: + json = json_dumps( + { + "downloadFullListOnOrAfter": "2000-02-01T04:05:06", + "timeStampForIfModifiedSince": "2000-02-02T04:05:06", + "state": state, + "entities": [ + {"name": "sek", "modified": "2000-02-03T04:05:06"}, + {"name": "dkk", "modified": "2000-02-04T04:05:06"}, + ], + } + ) + + api = WebApi(Session("", "", test_auth2_session=TestAuth2Session(bytes(json, "utf-8")))) + + r = api.get_data_package_list() + + assert r.download_full_list_on_or_after == datetime(2000, 2, 1, 4, 5, 6) + assert r.time_stamp_for_if_modified_since == datetime(2000, 2, 2, 4, 5, 6) + assert r.state == state + assert r.items == [ + DataPackageListItem("sek", datetime(2000, 2, 3, 4, 5, 6)), + DataPackageListItem("dkk", datetime(2000, 2, 4, 4, 5, 6)), + ] diff --git a/tests/Web/web_get_data_package_list_chunked.py b/tests/Web/web_get_data_package_list_chunked.py new file mode 100644 index 00000000..9cf8d781 --- /dev/null +++ b/tests/Web/web_get_data_package_list_chunked.py @@ -0,0 +1,88 @@ +from datetime import datetime +from io import BytesIO +from json import dumps as json_dumps +from typing import Any + +import pytest + +from requests import Response + +from 
macrobond_data_api.web import WebApi +from macrobond_data_api.web.session import Session +from macrobond_data_api.web.web_types import DataPackageListState + + +class TestAuth2Session: + __test__ = False + + def __init__(self, content: bytes): + self.content = content + + def request(self, *args: Any, **kwargs: Any) -> Response: # pylint: disable=unused-argument + response = Response() + response.status_code = 200 + response.raw = BytesIO(self.content) + return response + + +def get_json(state: DataPackageListState) -> str: + return json_dumps( + { + "downloadFullListOnOrAfter": "2000-02-01T04:05:06", + "timeStampForIfModifiedSince": "2000-02-02T04:05:06", + "state": state, + "entities": [ + {"name": "sek", "modified": "2000-02-03T04:05:06"}, + {"name": "dkk", "modified": "2000-02-04T04:05:06"}, + ], + } + ) + + +@pytest.mark.parametrize( + "state", + [DataPackageListState.FULL_LISTING, DataPackageListState.INCOMPLETE, DataPackageListState.UP_TO_DATE], +) +def test_1(state: DataPackageListState) -> None: + hitponts = 1 + json = get_json(state) + + api = WebApi(Session("", "", test_auth2_session=TestAuth2Session(bytes(json, "utf-8")))) + + with api.get_data_package_list_chunked() as context: + assert context.download_full_list_on_or_after == datetime(2000, 2, 1, 4, 5, 6) + assert context.time_stamp_for_if_modified_since == datetime(2000, 2, 2, 4, 5, 6) + assert context.state == state + + assert list(context.items) == [ + [("sek", datetime(2000, 2, 3, 4, 5, 6)), ("dkk", datetime(2000, 2, 4, 4, 5, 6))], + ] + + hitponts -= 1 + + assert hitponts == 0 + + +@pytest.mark.parametrize( + "state", + [DataPackageListState.FULL_LISTING, DataPackageListState.INCOMPLETE, DataPackageListState.UP_TO_DATE], +) +def test_2(state: DataPackageListState) -> None: + hitponts = 1 + json = get_json(state) + + api = WebApi(Session("", "", test_auth2_session=TestAuth2Session(bytes(json, "utf-8")))) + + with api.get_data_package_list_chunked(chunk_size=1) as context: + assert 
context.download_full_list_on_or_after == datetime(2000, 2, 1, 4, 5, 6) + assert context.time_stamp_for_if_modified_since == datetime(2000, 2, 2, 4, 5, 6) + assert context.state == state + + assert list(context.items) == [ + [("sek", datetime(2000, 2, 3, 4, 5, 6))], + [("dkk", datetime(2000, 2, 4, 4, 5, 6))], + ] + + hitponts -= 1 + + assert hitponts == 0 diff --git a/tests/Web/web_get_data_package_list_iterative.py b/tests/Web/web_get_data_package_list_iterative.py new file mode 100644 index 00000000..4adbcd4b --- /dev/null +++ b/tests/Web/web_get_data_package_list_iterative.py @@ -0,0 +1,72 @@ +from datetime import datetime +from io import BytesIO +from json import dumps as json_dumps +from typing import Any, List +import warnings + +import pytest + +from requests import Response + +from macrobond_data_api.web import WebApi +from macrobond_data_api.web.session import Session +from macrobond_data_api.web.web_types import DataPackageBody, DataPackageListItem, DataPackageListState + + +class TestAuth2Session: + __test__ = False + + def __init__(self, content: bytes): + self.content = content + + def request(self, *args: Any, **kwargs: Any) -> Response: # pylint: disable=unused-argument + response = Response() + response.status_code = 200 + response.raw = BytesIO(self.content) + return response + + +@pytest.mark.parametrize( + "state", + [DataPackageListState.FULL_LISTING, DataPackageListState.INCOMPLETE, DataPackageListState.UP_TO_DATE], +) +def test(state: DataPackageListState) -> None: + json = json_dumps( + { + "downloadFullListOnOrAfter": "2000-02-01T04:05:06", + "timeStampForIfModifiedSince": "2000-02-02T04:05:06", + "state": state, + "entities": [ + {"name": "sek", "modified": "2000-02-03T04:05:06"}, + {"name": "dkk", "modified": "2000-02-04T04:05:06"}, + ], + } + ) + + hitponts = 2 + + def body_callback(body: DataPackageBody) -> None: + assert body.download_full_list_on_or_after == datetime(2000, 2, 1, 4, 5, 6) + assert body.time_stamp_for_if_modified_since 
== datetime(2000, 2, 2, 4, 5, 6) + assert body.state == state + nonlocal hitponts + hitponts -= 1 + + def items_callback(body: DataPackageBody, data: List[DataPackageListItem]) -> None: + assert body.download_full_list_on_or_after == datetime(2000, 2, 1, 4, 5, 6) + assert body.time_stamp_for_if_modified_since == datetime(2000, 2, 2, 4, 5, 6) + assert body.state == state + assert data == [ + DataPackageListItem("sek", datetime(2000, 2, 3, 4, 5, 6)), + DataPackageListItem("dkk", datetime(2000, 2, 4, 4, 5, 6)), + ] + nonlocal hitponts + hitponts -= 1 + + api = WebApi(Session("", "", test_auth2_session=TestAuth2Session(bytes(json, "utf-8")))) + + with warnings.catch_warnings(): + warnings.simplefilter("ignore", DeprecationWarning) + api.get_data_package_list_iterative(body_callback, items_callback) + + assert hitponts == 0