Skip to content

Commit

Permalink
Merge pull request #19 from VukManojlovic/CTX-4070
Browse files Browse the repository at this point in the history
CTX-4070: Streaming download
  • Loading branch information
igorperic17 authored Jul 17, 2023
2 parents 6269a4c + fcefbdc commit 43c9eff
Show file tree
Hide file tree
Showing 4 changed files with 124 additions and 8 deletions.
8 changes: 3 additions & 5 deletions coretex/coretex/sample/network_sample.py
Original file line number Diff line number Diff line change
Expand Up @@ -122,12 +122,10 @@ def download(self, ignoreCache: bool = False) -> bool:
bool -> False if response is failed, True otherwise
"""

if os.path.exists(self.zipPath) and not ignoreCache:
return True

response = networkManager.genericDownload(
response = networkManager.sampleDownload(
endpoint = f"{self.__class__._endpoint()}/export?id={self.id}",
destination = self.zipPath
destination = self.zipPath,
ignoreCache = ignoreCache
)

return not response.hasFailed()
Expand Down
62 changes: 61 additions & 1 deletion coretex/networking/network_manager_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.

from typing import Optional, Any, Dict, List
from typing import Optional, Any, Dict, List, Union
from pathlib import Path
from abc import ABC, abstractmethod
from importlib.metadata import version as getLibraryVersion
Expand Down Expand Up @@ -246,6 +246,66 @@ def genericDownload(

return response

def sampleDownload(
self,
endpoint: str,
destination: Union[str, Path],
ignoreCache: bool,
parameters: Optional[Dict[str, Any]] = None,
retryCount: int = 0
) -> NetworkResponse:
"""
Downloads file to the given destination by chunks
Parameters
----------
endpoint : str
API endpoint
destination : Union[str, Path]
path to save file
ignoreCache : bool
whether to overwrite local cache
parameters : Optional[dict[str, Any]]
request parameters (not required)
retryCount : int
number of function calls if request has failed, used
for internal retry mechanism
Returns
-------
NetworkResponse as response content to request
Example
-------
>>> from coretex import networkManager
\b
>>> response = networkManager.sampleDownload(
endpoint = "dummyObject/download",
destination = "path/to/destination/folder"
)
>>> if response.hasFailed():
print("Failed to download the file")
"""

headers = self._requestHeader()

if parameters is None:
parameters = {}

networkResponse = self._requestManager.streamingDownload(
endpoint,
destination,
ignoreCache,
headers,
parameters
)

if self.shouldRetry(retryCount, networkResponse):
print(">> [Coretex] Retry count: {0}".format(retryCount))
return self.sampleDownload(endpoint, destination, ignoreCache, parameters, retryCount + 1)

return networkResponse

def genericUpload(
self,
endpoint: str,
Expand Down
3 changes: 2 additions & 1 deletion coretex/networking/network_response.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,8 @@ def __init__(self, response: Response, endpoint: str):

try:
self.json = response.json()
except ValueError:
except (ValueError, RuntimeError):
# RuntimeError is present here to avoid the content_consumed error
self.json = {}

if not response.ok:
Expand Down
59 changes: 58 additions & 1 deletion coretex/networking/requests_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,9 @@
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.

from typing import Final, Any, Optional, Dict, List
from typing import Final, Any, Optional, Dict, List, Union
from contextlib import ExitStack
from pathlib import Path

import logging

Expand Down Expand Up @@ -205,6 +206,62 @@ def post(

raise RequestFailedError

def streamingDownload(
self,
endpoint: str,
destinationPath: Union[str, Path],
ignoreCache: bool,
headers: Dict[str, str],
parameters: Dict[str, Any]
) -> NetworkResponse:
"""
Downloads a file from endpoint to destinationPath in chunks of 8192 bytes
Parameters
----------
endpoint : str
API endpoint
destination : Union[str, Path]
path to save file
ignoreCache : bool
whether to overwrite local cache
headers : Any
headers for get
parameters : int
json for get
Returns
-------
NetworkResponse -> NetworkResponse object as response content to request
"""

with self.__session.get(
self.__url(endpoint),
stream = True,
headers = headers,
json = parameters
) as response:

response.raise_for_status()

if isinstance(destinationPath, str):
destinationPath = Path(destinationPath)

if destinationPath.is_dir():
raise RuntimeError(">> [Coretex] Destination is a directory not a file")

if destinationPath.exists():
if int(response.headers["Content-Length"]) == destinationPath.stat().st_size and not ignoreCache:
return NetworkResponse(response, endpoint)

destinationPath.unlink()

with destinationPath.open("wb") as downloadedFile:
for chunk in response.iter_content(chunk_size = 8192):
downloadedFile.write(chunk)

return NetworkResponse(response, endpoint)

def upload(
self,
endpoint: str,
Expand Down

0 comments on commit 43c9eff

Please sign in to comment.