Skip to content

Commit e3cfde3

Browse files
committed
Add a type of DownloadFile that can be split
This allows Kolibri to create content nodes from arbitrarily large source files for certain types of content.
1 parent a74e193 commit e3cfde3

File tree

3 files changed

+189
-24
lines changed

3 files changed

+189
-24
lines changed

ricecooker/classes/files.py

Lines changed: 106 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
from ..exceptions import UnknownFileTypeError
3030
from ricecooker.utils.encodings import get_base64_encoding
3131
from ricecooker.utils.encodings import write_base64_to_file
32+
from ricecooker.utils.file_slice import FileSlice
3233
from ricecooker.utils.images import create_image_from_epub
3334
from ricecooker.utils.images import create_image_from_pdf_page
3435
from ricecooker.utils.images import create_image_from_zip
@@ -114,13 +115,19 @@ def generate_key(action, path_or_id, settings=None, default=" (default)"):
114115
def get_cache_filename(key):
    """Look up `key` in the file cache.

    Returns a single filename string, a list of filenames (split files are
    cached as one comma-separated value), or a falsy value on a cache miss
    or when any cached file is no longer present in storage.
    """
    cached = FILECACHE.get(key)
    if not cached:
        return cached
    filenames = cached.decode("utf-8").split(",")
    # If any backing file was somehow deleted, make sure we don't return it.
    if not all(cache_file_exists(name) for name in filenames):
        return None
    return filenames[0] if len(filenames) == 1 else filenames
122125

123126

127+
def cache_file_exists(cache_file):
    """Return True if `cache_file` is present in the storage directory."""
    storage_path = config.get_storage_path(cache_file)
    return os.path.exists(storage_path)
129+
130+
124131
def cache_is_outdated(path, cache_file):
125132
outdated = True
126133
if not cache_file:
@@ -137,15 +144,18 @@ def cache_is_outdated(path, cache_file):
137144
return outdated
138145

139146

140-
def download(path, default_ext=None):
147+
def download(path, default_ext=None, slice_size=None):
141148
"""
142149
Download `path` and save to storage based on file extension derived from `path`.
143150
:param path: An URL or a local path
144151
:param default_ext: fallback ext for file when path does not end with .ext
145152
:return: filename derived from hash of file contents {md5hash(file)}.ext
146153
:rtype: string (path of the form `{md5hash(file at path)}.ext`
147154
"""
148-
key = "DOWNLOAD:{}".format(path)
155+
if slice_size is not None:
156+
key = "DOWNLOAD:{}:{}".format(path, slice_size)
157+
else:
158+
key = "DOWNLOAD:{}".format(path)
149159

150160
cache_file = get_cache_filename(key)
151161
if not config.UPDATE and not cache_is_outdated(path, cache_file):
@@ -161,8 +171,12 @@ def download(path, default_ext=None):
161171
# Get extension of file or use `default_ext` if none found
162172
if not ext:
163173
ext = extract_path_ext(path, default_ext=default_ext)
164-
filename = copy_file_to_storage(tempf.name, ext=ext)
165-
FILECACHE.set(key, bytes(filename, "utf-8"))
174+
filename = copy_file_to_storage(tempf.name, ext=ext, slice_size=slice_size)
175+
if isinstance(filename, list):
176+
cache_value = ",".join(filename)
177+
else:
178+
cache_value = filename
179+
FILECACHE.set(key, bytes(cache_value, "utf-8"))
166180
config.LOGGER.info("\t--- Downloaded {}".format(filename))
167181
os.unlink(tempf.name)
168182

@@ -242,29 +256,54 @@ def write_path_to_filename(path, write_to_file):
242256

243257

244258
def get_hash(filepath):
    """Return the hex MD5 digest of the file at `filepath`."""
    with open(filepath, "rb") as infile:
        return get_hash_from_fd(infile)
261+
262+
263+
def get_hash_from_fd(fobj):
    """Return the hex MD5 digest of the remaining contents of `fobj`.

    `fobj` must be a binary file-like object open for reading; it is
    consumed from its current position, in 2 MB chunks.
    """
    digest = hashlib.md5()
    while True:
        chunk = fobj.read(2097152)
        if not chunk:
            break
        digest.update(chunk)
    return digest.hexdigest()
250268

251269

252-
def copy_file_to_storage(srcfilename, ext=None):
270+
def copy_file_to_storage(src_file_name, ext=None, slice_size=None):
    """
    Copy `src_file_name` (filepath) into content storage, named after the
    MD5 hash of its contents.
    If `slice_size` is set, the file will be broken into slices if it exceeds
    that size in bytes.
    :param src_file_name: path of the local file to copy into storage
    :param ext: file extension to use; derived from `src_file_name` if None
    :param slice_size: maximum slice size in bytes, or None to never split
    :return: a single filename string of the form `{md5hash}.ext` when
        `slice_size` is None, otherwise a list of such filenames (one per
        slice, in order)
    """
    if ext is None:
        ext = extract_path_ext(src_file_name)

    filenames = []

    with open(src_file_name, "rb") as src_fd:
        file_slices = list(FileSlice.from_file(src_fd, slice_size))

        # Note: `file_slice` (not `slice`) to avoid shadowing the builtin.
        for file_slice in file_slices:
            slice_hash = get_hash_from_fd(file_slice)
            # Rewind so the copy below sees the whole slice again.
            file_slice.seek(0)

            out_file_name = "{}.{}".format(slice_hash, ext)
            storage_path = config.get_storage_path(out_file_name)

            # Skip the copy when source and destination are the same file
            # (e.g. re-processing a file already in storage).
            try:
                is_same_file = os.path.samefile(storage_path, src_fd.name)
            except FileNotFoundError:
                is_same_file = False

            if not is_same_file:
                with open(storage_path, "wb") as out_fd:
                    shutil.copyfileobj(file_slice, out_fd)

            filenames.append(out_file_name)

    if slice_size is None:
        # Unsliced mode always produces exactly one slice.
        return filenames[0]
    return filenames
268307

269308

270309
def compress_video_file(filename, ffmpeg_settings):
@@ -490,22 +529,68 @@ def validate(self):
490529

491530
def process_file(self):
    """Download this file and record its storage filename and extension.

    On success returns the content-hash based filename; on a download
    error, records the failure in config.FAILED_FILES and returns None.
    """
    try:
        self.filename, self.ext = self._download()
        if not self.ext:
            # don't validate for single-digit extension, or no extension
            self.ext = extract_path_ext(self.path)
    except HTTP_CAUGHT_EXCEPTIONS as err:
        # Errors related to reading the file path are handled silently.
        self.error = str(err)
        config.LOGGER.debug("Failed to download, error is: {}".format(err))
        config.FAILED_FILES.append(self)
        return None

    return self.filename
544+
545+
def _download(self):
    """Fetch self.path; subclasses override to customize the download call."""
    return download(self.path, default_ext=self.default_ext)
547+
505548
def __str__(self):
    """A download file is represented by its source path/URL."""
    return self.path
507550

508551

552+
class SplittableDownloadFile(DownloadFile):
    """
    A type of DownloadFile that will be split into pieces if the source file
    exceeds `slice_size`. This is separate from DownloadFile because not all
    content types support file splitting.
    """

    # Maximum size of a single piece: 2 GB in bytes.
    slice_size = 2000000000

    def process_file(self):
        """Download self.path, adding extra files to the node when split.

        Returns the filename of the first piece, preserving the
        DownloadFile.process_file contract (the previous implementation
        implicitly returned None, breaking callers that use the return
        value), or None on download failure.
        """
        filenames = super(SplittableDownloadFile, self).process_file()

        # TODO: When we call node.add_file, we are assuming files will be
        # added in sequence and that order will be maintained. Should we
        # add a mechanism where it adds split file order to extra_fields,
        # similar to SlideshowNode?

        if isinstance(filenames, list):
            # This file keeps the first piece; each remaining piece becomes
            # a separate file attached to the same node.
            self.filename = filenames[0]
            for extra_filename in filenames[1:]:
                extra_file = self.create_split(extra_filename)
                self.node.add_file(extra_file)
            return self.filename

        # Single (unsplit) filename, or None if the download failed.
        return filenames

    def create_split(self, filename):
        """Build a SplittableDownloadFile representing one extra piece.

        The returned file reuses this file's metadata but is pinned to an
        already-downloaded storage `filename` and extension.
        """
        download_file = SplittableDownloadFile(
            self.path,
            preset=self.get_preset(),
            language=self.language,
            default_ext=self.default_ext,
            source_url=self.source_url,
        )
        download_file.filename = filename
        download_file.ext = self.ext
        return download_file

    def _download(self):
        """Download with slicing enabled at this class's slice_size."""
        return download(
            self.path, default_ext=self.default_ext, slice_size=self.slice_size
        )
592+
593+
509594
IMAGE_EXTENSIONS = {
510595
file_formats.PNG,
511596
file_formats.JPG,

ricecooker/classes/nodes.py

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -181,9 +181,15 @@ def process_files(self):
181181
- (optionally) generate thumbnail file from the node's content
182182
Returns: content-hash based filenames of all the files for this node
183183
"""
184-
filenames = []
185-
for file in self.files:
186-
filenames.append(file.process_file())
184+
185+
# Items may be added to self.files during file.process_file(), so
186+
# we will work with a copy and generate our list of filenames
187+
# separately.
188+
189+
for file in list(self.files):
190+
file.process_file()
191+
192+
filenames = [file.filename for file in self.files]
187193

188194
# Auto-generation of thumbnails happens here if derive_thumbnail or config.THUMBNAILS is set
189195
if not self.has_thumbnail() and (config.THUMBNAILS or self.derive_thumbnail):

ricecooker/utils/file_slice.py

Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
class FileSlice(object):
    """
    File-like object that represents a slice of a file, starting from its
    current offset until `count`. Reads are always relative to the slice's
    start and end point.

    Bug fixes vs. the original:
    - seek(offset, 1) previously passed a slice-relative position as an
      absolute offset to the underlying file, seeking before the slice.
    - seek(offset, 2) previously computed `end - offset`, inverting the
      standard SEEK_END convention where a negative offset moves backwards
      from the end.
    - Positions are now clamped to the slice bounds so a caller can never
      read bytes outside the slice.
    """

    def __init__(self, file, count=None):
        """Create a slice of `count` bytes of `file`, beginning at the
        file's current offset. If `count` is None or extends past the end
        of the file, the slice runs to the end of the file. Leaves `file`
        positioned at the slice's end so consecutive slices chain.
        """
        self.file = file
        self.start = file.tell()

        # Measure the underlying file by seeking to its end.
        file.seek(0, 2)
        self.file_size = file.tell()

        if count is None:
            count = self.file_size

        count = min(self.file_size - self.start, count)
        self.end = self.start + count

        # Seek to the end of the file so the next FileSlice object will be
        # created from that point.
        file.seek(self.end)

        self.__last_offset = self.start

    @classmethod
    def from_file(cls, file, chunk_size):
        """Yield consecutive FileSlice objects of at most `chunk_size`
        bytes covering `file` from its current offset. Always yields at
        least one slice, even for an empty file.
        """
        piece = cls(file, chunk_size)
        yield piece

        while piece.end < piece.file_size:
            piece = cls(file, chunk_size)
            yield piece

    @property
    def size(self):
        """Length of this slice in bytes."""
        return self.end - self.start

    def seek(self, offset, whence=0):
        """Move the slice position with standard file `whence` semantics:
        0 = from slice start, 1 = from current position, 2 = relative to
        slice end (negative offsets move backwards from the end). The
        resulting position is clamped to the slice's bounds.
        """
        if whence == 0:
            offset = self.start + offset
        elif whence == 1:
            # tell() is slice-relative; convert to an absolute offset in
            # the underlying file before seeking.
            offset = self.start + self.tell() + offset
        elif whence == 2:
            # Standard SEEK_END: the offset is added to the end position.
            offset = self.end + offset
        # Never allow positioning (and hence reading) outside the slice.
        offset = max(self.start, min(offset, self.end))
        self.file.seek(offset)
        self.__store_offset()

    def __reset_offset(self):
        # Re-position the shared underlying file if another slice (or the
        # caller) moved it since our last operation.
        if self.file.tell() != self.__last_offset:
            self.file.seek(self.__last_offset)

    def __store_offset(self):
        self.__last_offset = self.file.tell()

    def tell(self):
        """Current position relative to the slice's start."""
        self.__reset_offset()
        return self.file.tell() - self.start

    def read(self, count=None):
        """Read up to `count` bytes (all remaining bytes if `count` is
        None), never reading past the slice's end."""
        self.__reset_offset()

        if count is None:
            count = self.size

        remaining = max(0, self.size - self.tell())

        buffer = self.file.read(min(count, remaining))
        self.__store_offset()
        return buffer

    def write(self, string):
        """FileSlice objects are read-only."""
        raise NotImplementedError()

0 commit comments

Comments
 (0)