Skip to content

Commit

Permalink
Merge pull request #189 from EGA-archive/EE-2650
Browse files Browse the repository at this point in the history
EE-2650 Retry downloading the slice file if there is slice temp file
  • Loading branch information
Alegria Aclan authored Feb 16, 2023
2 parents d0a9575 + f5259c4 commit 6db328b
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 43 deletions.
23 changes: 10 additions & 13 deletions pyega3/libs/data_file.py
Original file line number Diff line number Diff line change
Expand Up @@ -203,37 +203,34 @@ def download_file_slice(self, file_name, start_pos, length, options=None, pbar=N

self.temporary_files.add(file_name)

existing_size = os.stat(final_file_name).st_size if os.path.exists(final_file_name) else 0
if existing_size > length:
os.remove(final_file_name)
existing_size = 0

if pbar:
pbar.update(existing_size)

if existing_size == length:
if os.path.exists(final_file_name):
existing_size = os.stat(final_file_name).st_size
pbar and pbar.update(existing_size)
return final_file_name

if os.path.exists(file_name):
os.remove(file_name)

try:
range_start = start_pos + existing_size
range_start = start_pos
range_end = start_pos + length - 1
extra_headers = {
'Range': f'bytes={range_start}-{range_end}'
}

with self.data_client.get_stream(path, extra_headers) as r:
with open(file_name, 'ba') as file_out:
self.temporary_files.add(file_name)
for chunk in r.iter_content(DOWNLOAD_FILE_MEMORY_BUFFER_SIZE):
file_out.write(chunk)
if pbar:
pbar.update(len(chunk))
pbar and pbar.update(len(chunk))

total_received = os.path.getsize(file_name)

if total_received != length:
raise Exception(f"Slice error: received={total_received}, requested={length}, file='{file_name}'")

except Exception:
except Exception as e:
if os.path.exists(file_name):
os.remove(file_name)
raise
Expand Down
72 changes: 42 additions & 30 deletions tests/unit/test_download_file_slice.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,30 +28,21 @@ def slice_file(random_binary_file):

def test_download_file_slice_downloads_correct_bytes_to_file(mock_data_server, slice_file, mock_data_client):
mock_data_server.file_content[slice_file.id] = slice_file.binary
written_bytes = 0

def mock_write(buf):
nonlocal written_bytes
buf_len = len(buf)
expected_buf = slice_file.binary[slice_file.start + written_bytes:slice_file.start + written_bytes + buf_len]
assert expected_buf == buf
written_bytes += buf_len

file_name = common.rand_str()
file_name_for_slice = file_name + '-from-' + str(slice_file.start) + '-len-' + str(
slice_temp_file_name = file_name + '-from-' + str(slice_file.start) + '-len-' + str(
slice_file.length) + '.slice.tmp'

file = DataFile(mock_data_client, slice_file.id)

m_open = mock.mock_open()
download_mock = SliceFileDownloadMock(slice_file)
with mock.patch("builtins.open", m_open, create=True):
with mock.patch("os.path.getsize", lambda path: written_bytes if path == file_name_for_slice else 0):
with mock.patch("os.path.getsize",
lambda path: download_mock.written_bytes if path == slice_temp_file_name else 0):
with mock.patch("os.rename"):
m_open().write.side_effect = mock_write
m_open().write.side_effect = download_mock.write
file.download_file_slice(file_name, slice_file.start, slice_file.length)
assert slice_file.length == written_bytes
assert slice_file.length == download_mock.written_bytes

m_open.assert_called_with(file_name_for_slice, 'ba')
m_open.assert_called_with(slice_temp_file_name, 'ba')


def test_error_when_bad_token(mock_data_server, mock_data_client):
Expand Down Expand Up @@ -124,35 +115,56 @@ def test_return_slice_file_when_existing(mock_data_server, mock_data_client, sli
mock_stat.st_size = slice_file.length

with patch.object(mock_data_client, 'get_stream', wraps=mock_data_client.get_stream) as get_stream_mock, \
mock.patch("os.path.exists", lambda path: True if path == slice_file.file_name else False), \
mock.patch("os.path.getsize", lambda path: slice_file.length), \
mock.patch("os.stat", lambda path: mock_stat):
mock.patch("os.path.exists", lambda path: True if path == slice_file.file_name else False), \
mock.patch("os.path.getsize", lambda path: slice_file.length), \
mock.patch("os.stat", lambda path: mock_stat):
# When: the slice file is downloaded
filename = file.download_file_slice(slice_file.original_file_name, slice_file.start, slice_file.length)
# Then: the existing slice file with same length is reused and data is not re-fetched
assert filename == slice_file.file_name
get_stream_mock.assert_not_called()


def test_remove_existing_slice_file_when_it_exceeds_slice_length(mock_data_server, mock_data_client, slice_file):
# Given: a slice file existing in tmp directory whose size exceeds the expected slice length
def test_remove_existing_slice_file_and_download_that_slice(mock_data_server, mock_data_client, slice_file):
# Given: a slice.tmp file existing in tmp directory
mock_data_server.file_content[slice_file.id] = slice_file.binary
file = DataFile(mock_data_client, slice_file.id)

mock_stat = Mock()
mock_stat.st_size = slice_file.length + 1

with mock.patch("os.remove") as remove_file_mock, \
mock.patch("os.path.exists", lambda path: True if path == slice_file.file_name else False), \
mock.patch("os.path.getsize", lambda path: slice_file.length), \
mock.patch("os.stat", lambda path: mock_stat):
m_open = mock.mock_open()
mock_file_exists = {slice_file.file_name: False}
slice_temp_file_name = slice_file.file_name + '.tmp'
mock_file_exists[slice_temp_file_name] = True
download_mock = SliceFileDownloadMock(slice_file)
with mock.patch("builtins.open", m_open, create=True), \
mock.patch("os.remove") as remove_file_mock, \
mock.patch("os.path.exists", lambda path: mock_file_exists.get(path)), \
mock.patch("os.path.getsize",
lambda path: download_mock.written_bytes if path == slice_temp_file_name else 0), \
mock.patch("os.rename"):
m_open().write.side_effect = download_mock.write
# When: the slice file is downloaded
output_file = file.download_file_slice(slice_file.original_file_name, slice_file.start, slice_file.length)
# Then: the existing slice file is deleted
# Then: delete the existing .slice.tmp file and download that slice
assert slice_file.length == download_mock.written_bytes
assert output_file == slice_file.file_name
remove_file_mock.assert_called_once_with(slice_file.file_name)
remove_file_mock.assert_called_once_with(slice_temp_file_name)
m_open.assert_called_with(slice_temp_file_name, 'ba')


def teardown_module():
for f in glob.glob(f'{os.getcwd()}/*.slice'):
os.remove(f)


class SliceFileDownloadMock:
def __init__(self, slice_file):
self.written_bytes = 0
self.slice_file = slice_file

def write(self, buf):
buf_len = len(buf)
start = self.slice_file.start + self.written_bytes
end = self.slice_file.start + self.written_bytes + buf_len
expected_buf = self.slice_file.binary[start:end]
assert expected_buf == buf
self.written_bytes += buf_len

0 comments on commit 6db328b

Please sign in to comment.