From e0d9b0d94bd281c4d50e8a63dab40531d314b52f Mon Sep 17 00:00:00 2001 From: Vuk Manojlovic Date: Tue, 27 Jun 2023 17:34:19 +0200 Subject: [PATCH 1/7] CTX-4070: Changed dataset download to streaming --- coretex/networking/network_manager_base.py | 24 ++++++----------- coretex/networking/network_response.py | 2 +- coretex/networking/requests_manager.py | 30 ++++++++++++++++++++++ 3 files changed, 39 insertions(+), 17 deletions(-) diff --git a/coretex/networking/network_manager_base.py b/coretex/networking/network_manager_base.py index 9c340db7..56f4dfa4 100644 --- a/coretex/networking/network_manager_base.py +++ b/coretex/networking/network_manager_base.py @@ -225,26 +225,18 @@ def genericDownload( if parameters is None: parameters = {} - response = self._requestManager.get(endpoint, headers, jsonObject = parameters) + networkResponse = self._requestManager.streamingDownload( + endpoint, + Path(destination), + headers, + parameters + ) - if self.shouldRetry(retryCount, response): + if self.shouldRetry(retryCount, networkResponse): print(">> [Coretex] Retry count: {0}".format(retryCount)) return self.genericDownload(endpoint, destination, parameters, retryCount + 1) - if response.raw.ok: - destinationPath = Path(destination) - if destinationPath.is_dir(): - raise RuntimeError(">> [Coretex] Destination is a directory not a file") - - if destinationPath.exists(): - destinationPath.unlink(missing_ok = True) - - destinationPath.parent.mkdir(parents = True, exist_ok = True) - - with open(destination, "wb") as downloadedFile: - downloadedFile.write(response.raw.content) - - return response + return networkResponse def genericUpload( self, diff --git a/coretex/networking/network_response.py b/coretex/networking/network_response.py index c01ad540..f52ca6d2 100644 --- a/coretex/networking/network_response.py +++ b/coretex/networking/network_response.py @@ -57,7 +57,7 @@ def __init__(self, response: Response, endpoint: str): try: self.json = response.json() - except ValueError: + except (ValueError, RuntimeError) as e: self.json = {} if not response.ok: diff --git a/coretex/networking/requests_manager.py b/coretex/networking/requests_manager.py index a24bd1f2..dd92edc0 100644 --- a/coretex/networking/requests_manager.py +++ b/coretex/networking/requests_manager.py @@ -17,6 +17,7 @@ from typing import Final, Any, Optional, Dict, List from contextlib import ExitStack +from pathlib import Path import logging @@ -205,6 +206,35 @@ def post( raise RequestFailedError + def streamingDownload( + self, + endpoint: str, + destinationPath: Path, + headers: Dict[str, str], + parameters: Dict[str, Any] = None + ) -> NetworkResponse: + + with self.__session.get( + self.__url(endpoint), + stream = True, + headers = headers, + json = parameters + ) as response: + + if destinationPath.is_dir(): + raise RuntimeError(">> [Coretex] Destination is a directory not a file") + + if destinationPath.exists(): + destinationPath.unlink(missing_ok = True) + + destinationPath.parent.mkdir(parents = True, exist_ok = True) + + with destinationPath.open("wb") as downloadedFile: + for chunk in response.iter_content(chunk_size = 8192): + downloadedFile.write(chunk) + + return NetworkResponse(response, endpoint) + def upload( self, endpoint: str, From 0c4d070e99312456bfe70aa7e34e160779c5dc6c Mon Sep 17 00:00:00 2001 From: Vuk Manojlovic Date: Tue, 27 Jun 2023 17:37:31 +0200 Subject: [PATCH 2/7] CTX-4070: Updated streamingDownload function --- coretex/networking/requests_manager.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/coretex/networking/requests_manager.py b/coretex/networking/requests_manager.py index dd92edc0..5c86944b 100644 --- a/coretex/networking/requests_manager.py +++ b/coretex/networking/requests_manager.py @@ -211,7 +211,7 @@ def streamingDownload( endpoint: str, destinationPath: Path, headers: Dict[str, str], - parameters: Dict[str, Any] = None + parameters: Dict[str, Any] ) -> NetworkResponse: with self.__session.get( From 620fac02973fc059c10232903658d3aff09ffd2a Mon Sep 17 00:00:00 2001 From: Vuk Manojlovic Date: Tue, 11 Jul 2023 17:25:57 +0200 Subject: [PATCH 3/7] CTX-4070: Made is so streaming download is only used for samples and implemented checking for downloaded file corruption (if file size does not match expected size) --- coretex/coretex/sample/network_sample.py | 8 +-- coretex/networking/network_manager_base.py | 68 +++++++++++++++++++++- coretex/networking/requests_manager.py | 4 ++ 3 files changed, 74 insertions(+), 6 deletions(-) diff --git a/coretex/coretex/sample/network_sample.py b/coretex/coretex/sample/network_sample.py index 9f4ae9a3..c6f632d2 100644 --- a/coretex/coretex/sample/network_sample.py +++ b/coretex/coretex/sample/network_sample.py @@ -122,12 +122,10 @@ def download(self, ignoreCache: bool = False) -> bool: bool -> False if response is failed, True otherwise """ - if os.path.exists(self.zipPath) and not ignoreCache: - return True - - response = networkManager.genericDownload( + response = networkManager.sampleDownload( endpoint = f"{self.__class__._endpoint()}/export?id={self.id}", - destination = self.zipPath + destination = self.zipPath, + ignoreCache = ignoreCache ) return not response.hasFailed() diff --git a/coretex/networking/network_manager_base.py b/coretex/networking/network_manager_base.py index 56f4dfa4..91de12df 100644 --- a/coretex/networking/network_manager_base.py +++ b/coretex/networking/network_manager_base.py @@ -222,19 +222,85 @@ def genericDownload( headers = self._requestHeader() + if parameters is None: + parameters = {} + + response = self._requestManager.get(endpoint, headers, jsonObject = parameters) + + if self.shouldRetry(retryCount, response): + print(">> [Coretex] Retry count: {0}".format(retryCount)) + return self.genericDownload(endpoint, destination, parameters, retryCount + 1) + + if response.raw.ok: + destinationPath = Path(destination) + if destinationPath.is_dir(): + raise RuntimeError(">> [Coretex] Destination is a directory not a file") + + if destinationPath.exists(): + destinationPath.unlink(missing_ok = True) + + destinationPath.parent.mkdir(parents = True, exist_ok = True) + + with open(destination, "wb") as downloadedFile: + downloadedFile.write(response.raw.content) + + return response + + def sampleDownload( + self, + endpoint: str, + destination: str, + ignoreCache: bool, + parameters: Optional[Dict[str, Any]] = None, + retryCount: int = 0 + ) -> NetworkResponse: + """ + Downloads file to the given destination + + Parameters + ---------- + endpoint : str + API endpoint + destination : str + path to save file + parameters : Optional[dict[str, Any]] + request parameters (not required) + retryCount : int + number of function calls if request has failed, used + for internal retry mechanism + + Returns + ------- + NetworkResponse as response content to request + + Example + ------- + >>> from coretex import networkManager + \b + >>> response = networkManager.genericDownload( + endpoint = "dummyObject/download", + destination = "path/to/destination/folder" + ) + >>> if response.hasFailed(): + print("Failed to download the file") + """ + + headers = self._requestHeader() + if parameters is None: parameters = {} networkResponse = self._requestManager.streamingDownload( endpoint, Path(destination), + ignoreCache, headers, parameters ) if self.shouldRetry(retryCount, networkResponse): print(">> [Coretex] Retry count: {0}".format(retryCount)) - return self.genericDownload(endpoint, destination, parameters, retryCount + 1) + return self.sampleDownload(endpoint, destination, parameters, retryCount + 1) return networkResponse diff --git a/coretex/networking/requests_manager.py b/coretex/networking/requests_manager.py index 5c86944b..92618f5a 100644 --- a/coretex/networking/requests_manager.py +++ b/coretex/networking/requests_manager.py @@ -210,6 +210,7 @@ def streamingDownload( self, endpoint: str, destinationPath: Path, + ignoreCache: bool, headers: Dict[str, str], parameters: Dict[str, Any] ) -> NetworkResponse: @@ -225,6 +226,9 @@ def streamingDownload( raise RuntimeError(">> [Coretex] Destination is a directory not a file") if destinationPath.exists(): + if int(response.headers["Content-Length"]) == destinationPath.stat().st_size and not ignoreCache: + return NetworkResponse(response, endpoint) + destinationPath.unlink(missing_ok = True) destinationPath.parent.mkdir(parents = True, exist_ok = True) From 028e8951c579a34aa524a645212d1a59875a0e39 Mon Sep 17 00:00:00 2001 From: Vuk Manojlovic Date: Tue, 11 Jul 2023 17:27:58 +0200 Subject: [PATCH 4/7] CTX-4070: Minor bug fix --- coretex/networking/network_manager_base.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/coretex/networking/network_manager_base.py b/coretex/networking/network_manager_base.py index 91de12df..b6a9898c 100644 --- a/coretex/networking/network_manager_base.py +++ b/coretex/networking/network_manager_base.py @@ -300,7 +300,7 @@ def sampleDownload( if self.shouldRetry(retryCount, networkResponse): print(">> [Coretex] Retry count: {0}".format(retryCount)) - return self.sampleDownload(endpoint, destination, parameters, retryCount + 1) + return self.sampleDownload(endpoint, destination, ignoreCache, parameters, retryCount + 1) return networkResponse From 90e7a2eea254f6788c0c77ad0e4efdb9ba5fac21 Mon Sep 17 00:00:00 2001 From: Vuk Manojlovic Date: Fri, 14 Jul 2023 13:11:03 +0200 Subject: [PATCH 5/7] CTX-4070: Updated documentation and code in networking related to streaming download --- coretex/networking/network_manager_base.py | 6 +++-- coretex/networking/network_response.py | 2 +- coretex/networking/requests_manager.py | 26 +++++++++++++++++++--- 3 files changed, 28 insertions(+), 6 deletions(-) diff --git a/coretex/networking/network_manager_base.py b/coretex/networking/network_manager_base.py index b6a9898c..94d6fc5c 100644 --- a/coretex/networking/network_manager_base.py +++ b/coretex/networking/network_manager_base.py @@ -255,7 +255,7 @@ def sampleDownload( retryCount: int = 0 ) -> NetworkResponse: """ - Downloads file to the given destination + Downloads file to the given destination by chunks Parameters ---------- @@ -263,6 +263,8 @@ def sampleDownload( API endpoint destination : str path to save file + ignoreCache : bool + whether to overwrite local cache parameters : Optional[dict[str, Any]] request parameters (not required) retryCount : int @@ -277,7 +279,7 @@ def sampleDownload( ------- >>> from coretex import networkManager \b - >>> response = networkManager.genericDownload( + >>> response = networkManager.sampleDownload( endpoint = "dummyObject/download", destination = "path/to/destination/folder" ) diff --git a/coretex/networking/network_response.py b/coretex/networking/network_response.py index f52ca6d2..67ea9230 100644 --- a/coretex/networking/network_response.py +++ b/coretex/networking/network_response.py @@ -57,7 +57,7 @@ def __init__(self, response: Response, endpoint: str): try: self.json = response.json() - except (ValueError, RuntimeError) as e: + except (ValueError, RuntimeError): self.json = {} if not response.ok: diff --git a/coretex/networking/requests_manager.py b/coretex/networking/requests_manager.py index 92618f5a..e0c333fd 100644 --- a/coretex/networking/requests_manager.py +++ b/coretex/networking/requests_manager.py @@ -214,6 +214,26 @@ def streamingDownload( headers: Dict[str, str], parameters: Dict[str, Any] ) -> NetworkResponse: + """ + Downloads a file from endpoint to destinationPath in chunks of 8192 bytes + + Parameters + ---------- + endpoint : str + API endpoint + destination : str + path to save file + ignoreCache : bool + whether to overwrite local cache + headers : Any + headers for get + parameters : int + json for get + + Returns + ------- + NetworkResponse -> NetworkResponse object as response content to request + """ with self.__session.get( self.__url(endpoint), @@ -222,6 +242,8 @@ def streamingDownload( json = parameters ) as response: + response.raise_for_status() + if destinationPath.is_dir(): raise RuntimeError(">> [Coretex] Destination is a directory not a file") @@ -229,9 +251,7 @@ def streamingDownload( if int(response.headers["Content-Length"]) == destinationPath.stat().st_size and not ignoreCache: return NetworkResponse(response, endpoint) - destinationPath.unlink(missing_ok = True) - - destinationPath.parent.mkdir(parents = True, exist_ok = True) + destinationPath.unlink() with destinationPath.open("wb") as downloadedFile: for chunk in response.iter_content(chunk_size = 8192): From 8cd9a5085210efebbf4e40c401b14123f6a8272f Mon Sep 17 00:00:00 2001 From: Vuk Manojlovic Date: Fri, 14 Jul 2023 13:42:55 +0200 Subject: [PATCH 6/7] CTX-4070: Added comment and typing --- coretex/networking/network_manager_base.py | 2 +- coretex/networking/network_response.py | 1 + coretex/networking/requests_manager.py | 7 +++++-- 3 files changed, 7 insertions(+), 3 deletions(-) diff --git a/coretex/networking/network_manager_base.py b/coretex/networking/network_manager_base.py index 94d6fc5c..4601a06c 100644 --- a/coretex/networking/network_manager_base.py +++ b/coretex/networking/network_manager_base.py @@ -294,7 +294,7 @@ def sampleDownload( networkResponse = self._requestManager.streamingDownload( endpoint, - Path(destination), + destination, ignoreCache, headers, parameters diff --git a/coretex/networking/network_response.py b/coretex/networking/network_response.py index 67ea9230..a9462a01 100644 --- a/coretex/networking/network_response.py +++ b/coretex/networking/network_response.py @@ -58,6 +58,7 @@ def __init__(self, response: Response, endpoint: str): try: self.json = response.json() except (ValueError, RuntimeError): + # RuntimeError is present here to avoid the content_consumed error self.json = {} if not response.ok: diff --git a/coretex/networking/requests_manager.py b/coretex/networking/requests_manager.py index e0c333fd..e194f031 100644 --- a/coretex/networking/requests_manager.py +++ b/coretex/networking/requests_manager.py @@ -15,7 +15,7 @@ # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . -from typing import Final, Any, Optional, Dict, List +from typing import Final, Any, Optional, Dict, List, Union from contextlib import ExitStack from pathlib import Path @@ -209,7 +209,7 @@ def post( def streamingDownload( self, endpoint: str, - destinationPath: Path, + destinationPath: Union[str, Path], ignoreCache: bool, headers: Dict[str, str], parameters: Dict[str, Any] @@ -244,6 +244,9 @@ def streamingDownload( response.raise_for_status() + if isinstance(destinationPath, str): + destinationPath = Path(destinationPath) + if destinationPath.is_dir(): raise RuntimeError(">> [Coretex] Destination is a directory not a file") From fcefbdc05168de4b80e934b69c55240f1f91d6e9 Mon Sep 17 00:00:00 2001 From: Vuk Manojlovic Date: Fri, 14 Jul 2023 15:30:29 +0200 Subject: [PATCH 7/7] CTX-4070: Updated typing and documentation --- coretex/networking/network_manager_base.py | 6 +++--- coretex/networking/requests_manager.py | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/coretex/networking/network_manager_base.py b/coretex/networking/network_manager_base.py index 4601a06c..46047a64 100644 --- a/coretex/networking/network_manager_base.py +++ b/coretex/networking/network_manager_base.py @@ -15,7 +15,7 @@ # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . -from typing import Optional, Any, Dict, List +from typing import Optional, Any, Dict, List, Union from pathlib import Path from abc import ABC, abstractmethod from importlib.metadata import version as getLibraryVersion @@ -249,7 +249,7 @@ def genericDownload( def sampleDownload( self, endpoint: str, - destination: str, + destination: Union[str, Path], ignoreCache: bool, parameters: Optional[Dict[str, Any]] = None, retryCount: int = 0 @@ -261,7 +261,7 @@ def sampleDownload( ---------- endpoint : str API endpoint - destination : str + destination : Union[str, Path] path to save file ignoreCache : bool whether to overwrite local cache diff --git a/coretex/networking/requests_manager.py b/coretex/networking/requests_manager.py index e194f031..898ce89e 100644 --- a/coretex/networking/requests_manager.py +++ b/coretex/networking/requests_manager.py @@ -221,7 +221,7 @@ def streamingDownload( ---------- endpoint : str API endpoint - destination : str + destination : Union[str, Path] path to save file ignoreCache : bool whether to overwrite local cache