forked from gardener/cc-utils
-
Notifications
You must be signed in to change notification settings - Fork 0
/
tarutil.py
106 lines (90 loc) · 3.88 KB
/
tarutil.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
import logging
import tarfile
import typing
logger = logging.getLogger(__name__)
class _FilelikeProxy:
    '''
    a fake filelike-object that will mimic the required behaviour (read) "good enough" for
    usage w/ tarfile.open (in stream-mode).

    Wraps an iterator/generator yielding bytes-like chunks and exposes it through a `read`
    method. Only `read` is implemented; there is no seek/tell/close.
    '''

    def __init__(self, generator):
        '''
        @param generator: iterator yielding the bytes-like chunks that make up the stream
        '''
        self.generator = generator
        # chunk currently being handed out and the read-cursor into it; a cursor is used
        # (instead of re-slicing the remainder) so a large chunk read in small pieces is
        # copied only once overall
        self._chunk = b''
        self._pos = 0

    def read(self, size: int = -1):
        '''
        returns the next piece of the wrapped stream.

        - size > 0: at most `size` octets are returned (fewer if the current chunk ends
          earlier, i.e. short reads are possible - callers must loop, as with sockets)
        - size < 0 or None: the remainder of the current chunk is returned (one generator
          chunk per call). The stream is intentionally NOT read to its end, so that the
          wrapper stays usable for streaming.
        - size == 0: returns b'' without consuming anything

        b'' is returned only for size == 0 or once the generator is exhausted; empty chunks
        yielded by the generator are skipped so they are not mistaken for end-of-stream.
        '''
        if size == 0:
            return b''

        # fetch chunks until there is something left to hand out
        while self._pos >= len(self._chunk):
            try:
                self._chunk = next(self.generator)
            except StopIteration:
                self._chunk = b''
                self._pos = 0
                return b''
            self._pos = 0

        start = self._pos
        if size is None or size < 0:
            end = len(self._chunk)
        else:
            end = min(start + size, len(self._chunk))
        self._pos = end

        return self._chunk[start:end]
def filtered_tarfile_generator(
    src_tf: tarfile.TarFile,
    filter_func: typing.Callable[[tarfile.TarInfo], bool] = lambda tarinfo: True,
    chunk_size=tarfile.BLOCKSIZE,
    chunk_callback: typing.Optional[typing.Callable[[bytes], None]] = None,
) -> typing.Generator[bytes, None, None]:
    '''
    returns a generator yielding a tarfile that will by default contain the same members as
    the passed tarfile (src_tf). If a filter-function is given, any entries (TarInfo objects)
    for which this function will return a "falsy" value will be removed from the resulting
    tarfile stream (which is the actual value-add from this function).

    This function is particularly useful for streaming. Note that `_FilelikeProxy` can be used
    to wrap a generator yielding an (input-) tarfile-stream.

    In combination, this can be used to - in a streaming manner - retrieve a tarfile-stream, e.g.
    using a http-request (e.g. with requests), and upload the filtered tarfile-stream (e.g. again
    with a http-request sent e.g. with requests).

    @param src_tf:         the tarfile to read members (and their contents) from
    @param filter_func:    called w/ each member; members are kept if it returns a truthy value
    @param chunk_size:     max. amount of octets read from a member's content (and yielded) at
                           once; expected to be a positive integer
    @param chunk_callback: optional; called w/ each chunk right before that chunk is yielded
                           (headers, file contents, and paddings alike)

    @raises NotImplementedError: for members w/ a size > 0 that are no regular files
    '''
    # position in the *output* stream; only used to patch the offsets of copied members
    offset = 0

    for member in src_tf:
        if not filter_func(member):
            logger.debug(f'filtered out {member=}')
            continue

        member_raw = member.tobuf()

        if len(member_raw) > tarfile.BLOCKSIZE:
            # header spans more than one block (extended header), so it cannot be parsed
            # back via TarInfo.frombuf (single block only) -> (re-)create it from the member
            member_buf = member.create_pax_header(
                info=member.get_info(),
                encoding=tarfile.ENCODING,
            )
        else:
            # need to create a cp (to patch offsets w/o modifying original members, which would
            # break accessing file-contents)
            member_cp = tarfile.TarInfo.frombuf(
                member_raw,
                encoding=tarfile.ENCODING,
                errors='surrogateescape',
            )
            member_cp.offset = offset
            member_cp.offset_data = offset + len(member_raw)
            member_buf = member_cp.tobuf()

        if chunk_callback:
            chunk_callback(member_buf)
        yield member_buf
        # advance by what was actually emitted (extended headers are longer than one block)
        offset += len(member_buf)

        if member.size <= 0:
            continue # nothing but the header to emit (e.g. directories, empty files)

        if not member.isfile():
            # TODO: handle symlinks (will not work for streaming-mode)
            raise NotImplementedError(member.type)

        octets_sent = 0
        octets_left = member.size
        with src_tf.extractfile(member=member) as fobj:
            while octets_left and (chunk := fobj.read(min(octets_left, chunk_size))):
                offset += (leng := len(chunk))
                octets_sent += leng
                octets_left -= leng
                if chunk_callback:
                    chunk_callback(chunk)
                yield chunk

        # pad to full 512-blocks if member is not "aligned"
        if member.size % tarfile.BLOCKSIZE:
            missing = tarfile.BLOCKSIZE - (octets_sent % tarfile.BLOCKSIZE)
            padding = tarfile.NUL * missing
            if chunk_callback:
                chunk_callback(padding)
            yield padding
            offset += missing

    final_padding = 2 * tarfile.BLOCKSIZE * tarfile.NUL # tarfiles should end w/ two empty blocks
    # callback first (as for all other chunks): a consumer that stops pulling once it received
    # the last chunk would otherwise never trigger the callback for it
    if chunk_callback:
        chunk_callback(final_padding)
    yield final_padding