Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CTX-4070: Streaming download #19

Merged
merged 8 commits into from
Jul 17, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 3 additions & 5 deletions coretex/coretex/sample/network_sample.py
Original file line number Diff line number Diff line change
Expand Up @@ -122,12 +122,10 @@ def download(self, ignoreCache: bool = False) -> bool:
bool -> False if response is failed, True otherwise
"""

if os.path.exists(self.zipPath) and not ignoreCache:
return True

response = networkManager.genericDownload(
response = networkManager.sampleDownload(
endpoint = f"{self.__class__._endpoint()}/export?id={self.id}",
destination = self.zipPath
destination = self.zipPath,
ignoreCache = ignoreCache
)

return not response.hasFailed()
Expand Down
62 changes: 61 additions & 1 deletion coretex/networking/network_manager_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.

from typing import Optional, Any, Dict, List
from typing import Optional, Any, Dict, List, Union
from pathlib import Path
from abc import ABC, abstractmethod
from importlib.metadata import version as getLibraryVersion
Expand Down Expand Up @@ -246,6 +246,66 @@ def genericDownload(

return response

def sampleDownload(
    self,
    endpoint: str,
    destination: Union[str, Path],
    ignoreCache: bool,
    parameters: Optional[Dict[str, Any]] = None,
    retryCount: int = 0
) -> NetworkResponse:
    """
        Streams a file from the endpoint to the given destination,
        delegating the transfer to the request manager's
        streamingDownload and re-issuing the request while
        shouldRetry reports that another attempt is warranted

        Parameters
        ----------
        endpoint : str
            API endpoint
        destination : Union[str, Path]
            path of the file the download is written to
        ignoreCache : bool
            forwarded to the request manager; whether an already
            present local file should be overwritten
        parameters : Optional[Dict[str, Any]]
            request parameters, an empty dict is sent when omitted
        retryCount : int
            number of attempts made so far, used by the internal
            retry mechanism - callers should leave it at its default

        Returns
        -------
        NetworkResponse -> response of the last attempt

        Example
        -------
        >>> from coretex import networkManager
        \b
        >>> response = networkManager.sampleDownload(
                endpoint = "dummyObject/download",
                destination = "path/to/destination/file.zip",
                ignoreCache = False
            )
        >>> if response.hasFailed():
                print("Failed to download the file")
    """

    requestHeaders = self._requestHeader()
    requestParameters = {} if parameters is None else parameters

    response = self._requestManager.streamingDownload(
        endpoint,
        destination,
        ignoreCache,
        requestHeaders,
        requestParameters
    )

    # Guard clause: nothing more to do unless a retry is warranted
    if not self.shouldRetry(retryCount, response):
        return response

    print(">> [Coretex] Retry count: {0}".format(retryCount))
    return self.sampleDownload(
        endpoint,
        destination,
        ignoreCache,
        requestParameters,
        retryCount + 1
    )

def genericUpload(
self,
endpoint: str,
Expand Down
3 changes: 2 additions & 1 deletion coretex/networking/network_response.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,8 @@ def __init__(self, response: Response, endpoint: str):

try:
self.json = response.json()
except ValueError:
except (ValueError, RuntimeError):
# RuntimeError is present here to avoid the content_consumed error
self.json = {}

if not response.ok:
Expand Down
59 changes: 58 additions & 1 deletion coretex/networking/requests_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,9 @@
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.

from typing import Final, Any, Optional, Dict, List
from typing import Final, Any, Optional, Dict, List, Union
from contextlib import ExitStack
from pathlib import Path

import logging

Expand Down Expand Up @@ -205,6 +206,62 @@ def post(

raise RequestFailedError

def streamingDownload(
    self,
    endpoint: str,
    destinationPath: Union[str, Path],
    ignoreCache: bool,
    headers: Dict[str, str],
    parameters: Dict[str, Any]
) -> NetworkResponse:
    """
        Downloads a file from endpoint to destinationPath in chunks of 8192 bytes

        Parameters
        ----------
        endpoint : str
            API endpoint
        destinationPath : Union[str, Path]
            path of the file the download is written to
        ignoreCache : bool
            if False, an existing file whose size equals the
            Content-Length reported by the server is kept and
            nothing is written; if True, an existing file is
            always replaced
        headers : Dict[str, str]
            headers sent with the GET request
        parameters : Dict[str, Any]
            sent as the json body of the GET request

        Returns
        -------
        NetworkResponse -> NetworkResponse object as response content to request

        Raises
        ------
        RuntimeError -> if destinationPath is an existing directory
        Whatever response.raise_for_status() raises for an
        unsuccessful status code
    """

    destinationPath = Path(destinationPath)

    # Validate the destination before opening the connection so an
    # unusable path does not cost a network round trip
    if destinationPath.is_dir():
        raise RuntimeError(">> [Coretex] Destination is a directory not a file")

    with self.__session.get(
        self.__url(endpoint),
        stream = True,
        headers = headers,
        json = parameters
    ) as response:

        response.raise_for_status()

        if destinationPath.exists():
            # Content-Length may be missing (e.g. chunked transfer encoding)
            # or malformed; in that case the local copy cannot be verified
            # and is downloaded again instead of raising KeyError/ValueError
            contentLength = response.headers.get("Content-Length")
            isCached = (
                not ignoreCache
                and contentLength is not None
                and contentLength.strip().isdecimal()
                and int(contentLength) == destinationPath.stat().st_size
            )

            if isCached:
                return NetworkResponse(response, endpoint)

            destinationPath.unlink()

        with destinationPath.open("wb") as downloadedFile:
            for chunk in response.iter_content(chunk_size = 8192):
                downloadedFile.write(chunk)

        return NetworkResponse(response, endpoint)

def upload(
self,
endpoint: str,
Expand Down