-
-
Notifications
You must be signed in to change notification settings - Fork 8
/
util.py
582 lines (490 loc) · 17.8 KB
/
util.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
"""
Utilities for use by the various scripts.
"""
import io
import multiprocessing
import os
import pathlib
import re
import shlex
import shutil
import subprocess
import tarfile
import traceback
import urllib.request
from config import CONFIG as cfg
def warn(message):
    """
    Emits a highlighted warning line on stdout.

    @param message
        the text shown after the bold-yellow '[!]' marker.
    """
    text = f'\x1b[33;1m[!]\x1b[m {message}'
    print(text)
def ensure_root():
    """
    Aborts the program unless os.getuid() reports user id 0.

    When the check fails, a red "please run as root" message is printed
    and SystemExit(1) is raised.
    """
    if os.getuid() == 0:
        return
    print("\x1b[31;1mplease run as root\x1b[m")
    raise SystemExit(1)
def get_consent():
    """
    Asks the user to confirm an action and keeps asking until one of the
    three recognised answers is typed.

    Returns True for "yes" and False for "skip".
    "abort" prints "aborting" and raises SystemExit(1).
    Any other input simply repeats the prompt.
    """
    answers = {"yes": True, "skip": False}
    while True:
        reply = input('type "yes", "skip", or "abort": ')
        if reply in answers:
            return answers[reply]
        if reply == "abort":
            print("aborting")
            raise SystemExit(1)
def command(*cmd, silent=False, nspawn=None, shell=False, confirm=False,
            stdin=None, capture_stdout=False, env=None, get_retval=False,
            cwd='.'):
    """
    Prints and runs the given command.

    @param silent
        defaults to False. If True, the command is not printed.
        Cannot be combined with confirm=True.
    @param nspawn
        defaults to None. If not None, the command is run inside that nspawn
        container (via 'systemd-nspawn -D <nspawn>'; '--pipe' is added when
        stdin is fed or stdout is captured).
    @param shell
        defaults to False. If True, the command is expected to be exactly
        one string, and will be run through 'sh -c'
    @param confirm
        defaults to False. If True, the user must confirm before the command
        is run.
    @param stdin
        defaults to None. If not None, this is fed to the command as stdin
        (must be str or bytes; str is encoded as utf-8)
    @param capture_stdout
        defaults to False. If True, the command stdout is captured and
        returned as bytes.
    @param env
        defaults to None. If not None, this mapping is handed to
        subprocess.Popen as the environment of the spawned process
        (it replaces the inherited environment). With nspawn, the spawned
        process is systemd-nspawn itself.
    @param get_retval
        defaults to False. If True, the command return code is returned,
        as an integer, instead of verifying that the command has succeeded.
    @param cwd
        defaults to '.'. The command is run in this directory.

    Returns None if the user declined the confirmation; the return code
    if get_retval is True; otherwise the captured stdout (bytes if
    capture_stdout is True, else None).

    Raises RuntimeError on contradictory arguments, or - unless get_retval
    is True - when the command exits with a non-zero status.
    """
    if capture_stdout and get_retval:
        raise RuntimeError("cannot capture stdout AND get retval")

    if shell:
        if len(cmd) != 1:
            raise RuntimeError(
                "expected exactly one command string for shell=True, "
                f"but got {cmd!r}"
            )
        cmd = ['sh', '-c'] + list(cmd)

    if nspawn is not None:
        if stdin is not None or capture_stdout:
            # --pipe lets nspawn pass our pipes through to the payload
            cmd = ['systemd-nspawn', '-D', nspawn, '--pipe'] + list(cmd)
        else:
            cmd = ['systemd-nspawn', '-D', nspawn] + list(cmd)

    if not silent:
        if confirm:
            print("\x1b[33;1mwill run:\x1b[m", end=" ")
        else:
            print("\x1b[32;1m$\x1b[m", end=" ")

        print(" ".join(shlex.quote(part) for part in cmd))

        if confirm:
            if not get_consent():
                return
    elif confirm:
        # the user can't confirm what was never shown
        raise RuntimeError("confirm=True but silent=True")

    proc = subprocess.Popen(
        cmd,
        stdin=None if stdin is None else subprocess.PIPE,
        stdout=subprocess.PIPE if capture_stdout else None,
        # previously the env argument was accepted but never forwarded
        env=env,
        cwd=cwd,
    )

    if isinstance(stdin, str):
        stdin = stdin.encode('utf-8')

    stdout, _ = proc.communicate(input=stdin)

    if get_retval:
        return proc.returncode

    if proc.returncode != 0:
        raise RuntimeError(f"invocation failed: {cmd!r}")
    return stdout
def initrd_write(path, *lines, content=None, append=False):
    """
    Writes a file in the initrd (below cfg.path.initrd).

    @param path
        must be an absolute string (starting with '/'); it is appended
        to cfg.path.initrd to form the real file name.
    @param lines
        each individual line is encoded as utf-8 (if it is a str),
        terminated with a newline (unless it already ends in one),
        and all lines are then concatenated to form the file content.
    @param content
        can only be given if no lines are given.
        if bytes, it is written as-is.
        if str, it is encoded as utf-8, then written.
    @param append
        defaults to False. If True, the file is opened in 'ab' mode
        instead of 'wb'.

    Raises RuntimeError if path is not absolute, or if both content and
    lines are given.
    """
    is_absolute = path[:1] == '/'
    if not is_absolute:
        raise RuntimeError(f"path is not absolute: {path!r}")

    if content is None:
        # assemble the payload from the individual lines
        chunks = []
        for line in lines:
            if isinstance(line, str):
                line = line.encode('utf-8')
            if not line.endswith(b'\n'):
                line = line + b'\n'
            chunks.append(line)
        content = b''.join(chunks)
    else:
        if lines:
            raise RuntimeError("'content' and 'lines' cannot both be given")
        if isinstance(content, str):
            content = content.encode('utf-8')

    mode = 'ab' if append else 'wb'

    print('\x1b[33;1m' + mode + '\x1b[m ' + path)
    with open(cfg.path.initrd + path, mode) as fileobj:
        fileobj.write(content)
def mount_tmpfs(dirname):
    """
    Makes sure that dirname exists and is a mount point.

    The directory is created if it does not exist; if nothing is mounted
    there yet, a tmpfs is mounted on it. An existing mount is left alone,
    so unmount first if a fresh tmpfs is required.
    """
    if not os.path.exists(dirname):
        os.makedirs(dirname)

    if os.path.ismount(dirname):
        return

    command('mount', '-t', 'tmpfs', 'tmpfs', dirname)
def umount(dirname):
    """
    Runs 'umount' on dirname for as long as it is a mount point,
    so stacked mounts are all removed. Does nothing if nothing is mounted.
    """
    while True:
        if not os.path.ismount(dirname):
            break
        command('umount', dirname)
def list_files_in_packages(packages, nspawn):
    """
    Lists all files that are installed by the given debian packages.

    This is a generator. A child process chroots into the nspawn
    directory, reads /var/lib/dpkg/info/<package>.list for each package,
    and sends back every listed path that is not a directory.

    @param packages
        the package names. If empty, nothing is yielded.
    @param nspawn
        the root directory to chroot into.

    Yields each path as bytes, resolved with os.path.realpath inside the
    chroot, with the leading '/' removed.

    Errors in the child (including a failed chroot or a missing listing)
    are printed with traceback.print_exc() and end the listing early;
    they are not re-raised in the caller.
    """
    if not packages:
        return

    # The worker below is a closure and can therefore only be handed to a
    # child that is created by fork(). Ask for that start method explicitly
    # instead of relying on the platform default, which is not 'fork'
    # everywhere (and changed on Linux in Python 3.14).
    ctx = multiprocessing.get_context('fork')
    queue = ctx.Queue()

    def worker():
        """ runs in the child: chroot, then stream the file names """
        try:
            os.chroot(nspawn)
            os.chdir('/')

            for package in packages:
                listing_path = f'/var/lib/dpkg/info/{package}.list'
                with open(listing_path, 'rb') as listing:
                    for filename in listing:
                        if filename[-1:] != b'\n' or filename[:1] != b'/':
                            raise RuntimeError(f'bad line {filename!r}')
                        filename = filename[:-1]
                        if not os.path.isdir(filename):
                            queue.put(os.path.realpath(filename)[1:])
        except BaseException:
            traceback.print_exc()
        finally:
            # sentinel: tells the parent that no more entries will follow
            queue.put(StopIteration)

    proc = ctx.Process(target=worker, args=())
    proc.start()

    while True:
        entry = queue.get()
        if entry is StopIteration:
            break
        yield entry

    proc.join()
class FileEditor:
    """
    allows loading, modifying and writing text files.
    nicely asks for confirmation before writing.

    It also creates the parent folder, if required.

    The content is held in self.data as bytes.

    Constructor arguments:

    @param write_to
        the filename to where the data will be written
    @param executable
        defaults to False. Whether the written file should have its
        executable flag set; write() adjusts the flag accordingly.
    """
    def __init__(self, write_to, executable=False):
        self.data = None
        self.write_to = write_to
        self.executable = executable

    def load(self):
        """ loads data from the output file """
        self.load_from(self.write_to)

    def load_from(self, filename):
        """
        loads data (as bytes) from some file
        """
        with open(filename, 'rb') as fileobj:
            self.data = fileobj.read()

    def write(self):
        """
        Writes the data, after showing the diff that will be written and
        asking for permission.

        An existing file is renamed to '<write_to>-stiefelbup' before the
        new content is written.

        Returns True if the file was written, False if it was unchanged
        or the user declined.
        """
        parentdir = os.path.dirname(self.write_to)
        # dirname is '' for a bare file name: that is the current directory,
        # which exists and must not be passed to makedirs
        if parentdir and not os.path.exists(parentdir):
            warn(f'creating directory {parentdir}')
            if not get_consent():
                return False
            os.makedirs(parentdir)

        if os.path.exists(self.write_to):
            # the file will be overwritten
            proc = subprocess.Popen(
                ['diff', '--color', '-u', self.write_to, '-'],
                stdin=subprocess.PIPE
            )
            proc.communicate(self.data)
            if proc.returncode == 0:
                # nothing to do
                print(f'{self.write_to!r}: unchanged')
                self.ensure_x_flag(need_consent=True)
                return False

            backup_to = self.write_to + '-stiefelbup'
            warn(f'{self.write_to!r}: overwriting; backing up old version to {backup_to!r}')
            if os.path.exists(backup_to):
                warn('existing backup will be overwritten')
        else:
            # the file will be newly created
            warn(f'creating file {self.write_to!r}')
            backup_to = None

        if not get_consent():
            return False

        if backup_to is not None:
            os.rename(self.write_to, backup_to)

        with open(self.write_to, 'wb') as fileobj:
            fileobj.write(self.data)
        self.ensure_x_flag(need_consent=False)

        return True

    def ensure_x_flag(self, need_consent):
        """
        Runs 'chmod +x' or 'chmod -x' on the output file if its current
        executable state (as reported by os.access) differs from
        self.executable.

        @param need_consent
            if True, the user is asked first, and nothing is changed
            when they decline.
        """
        if self.executable == os.access(self.write_to, os.X_OK):
            return

        flag = '+x' if self.executable else '-x'
        if need_consent:
            warn(f'chmod {flag} {self.write_to!r}')
            if not get_consent():
                return
        command('chmod', flag, self.write_to)

    def edit_bash_list(self, varname, entries):
        """
        edits a bash list by modifying entries as specified.

        entries is a dictionary of {entry: action} where entry
        is any string that could be found in a bash list, while
        action is one of:

        'at-end': add at the end of the list (if not already in the list)
        'before-X': add before the entry 'X' (if not already in the list)
        'remove': remove this entry (if present)

        The assignment must start at the beginning of a line and fit on
        a single line.

        e.g. for

        self.data=b'HOOKS=(a b c d)'
        varname='HOOKS'
        entries={'f': 'at-end', 'e': 'before-d', 'a': 'remove'}

        will result in

        self.data=b'HOOKS=(b c e d f)'

        Raises RuntimeError if the variable or the 'X' of a 'before-X'
        action cannot be found, or if the action is unknown.
        """
        # load the bash list; '^' with MULTILINE also finds an assignment
        # on the very first line of the file
        pattern = fr'^{re.escape(varname)}=\((.*?)\)'.encode()
        match = re.search(pattern, self.data, re.MULTILINE)
        if match is None:
            raise RuntimeError(
                f'{self.write_to!r}: '
                f'cannot find {varname!r} definition'
            )
        start, end = match.span(1)
        current = shlex.split(self.data[start:end].decode())

        for entry, action in entries.items():
            if action == 'remove':
                try:
                    current.remove(entry)
                except ValueError:
                    pass
            elif action == 'at-end':
                if entry not in current:
                    current.append(entry)
            elif action.startswith('before-'):
                try:
                    position = current.index(action[7:])
                except ValueError:
                    raise RuntimeError(
                        f'{self.write_to!r}: '
                        f'cannot find {action[7:]!r} in {varname!r}'
                    ) from None
                # like 'at-end': don't add a duplicate when run again
                if entry not in current:
                    current.insert(position, entry)
            else:
                raise RuntimeError(
                    f'{self.write_to!r}: '
                    f'unknown action {action!r}'
                )

        def quote(entry):
            """
            we can't use shlex.quote because it would over-quote
            backtick expressions like `which ifrename`.
            this is not perfect but it should be good enough.
            """
            if ' ' in entry:
                return f'"{entry}"'
            else:
                return entry

        section = ' '.join(quote(entry) for entry in current).encode()
        self.data = self.data[:start] + section + self.data[end:]

    def add_or_edit_var(self, varname, value, add_prefix=''):
        """
        edits or creates a bash variable assignment such as foo="asdf"

        An existing assignment (at the beginning of a line, with a
        double-quoted value) gets its value replaced. Otherwise
        '<add_prefix><varname>="<value>"' plus a newline is appended
        to the data.
        """
        pattern = fr'^{re.escape(varname)}="(.*?)"'.encode()
        match = re.search(pattern, self.data, re.MULTILINE)
        if match is None:
            # doesn't exist yet
            self.data += f'{add_prefix}{varname}="{value}"\n'.encode()
        else:
            # exists already; just swap out the matched group
            start, end = match.span(1)
            self.data = self.data[:start] + value.encode() + self.data[end:]

    def set_data(self, data):
        """
        sets content directly (instead of loading it from a file)
        """
        self.data = data
def install_folder(source, dest="/"):
    """
    Recursively installs the content of the folder 'source' into 'dest'.

    Sub-folders are handled by recursion; every other entry is installed
    through a FileEditor, which shows the diff and asks for permission.
    Consent is also required before a missing 'dest' is created; if it is
    refused, that folder is skipped.

    symlinks are not supported.
    """
    if not os.path.isdir(dest):
        warn(f'creating directory {dest}')
        if not get_consent():
            print(f'skipping install of {source!r} to {dest!r}')
            return
        os.makedirs(dest, exist_ok=True)

    for name in os.listdir(source):
        src_entry = os.path.join(source, name)
        dst_entry = os.path.join(dest, name)

        if os.path.isdir(src_entry):
            # descend into the sub-folder
            install_folder(src_entry, dst_entry)
            continue

        # regular entry: the FileEditor carries over the content and the
        # executable flag of the source file
        is_executable = os.access(src_entry, os.X_OK)
        editor = FileEditor(dst_entry, is_executable)
        editor.load_from(src_entry)
        editor.write()
def ensure_unit_enabled(name):
    """
    Makes sure that the given systemd unit is enabled.

    Runs 'systemctl is-enabled <name>'; on a non-zero return code,
    'systemctl enable <name>' is run after asking for confirmation.
    """
    status = command('systemctl', 'is-enabled', name, get_retval=True)
    if status == 0:
        return
    command('systemctl', 'enable', name, confirm=True)
def restart_unit(name):
    """
    Runs 'systemctl restart <name>' for the given systemd unit,
    after asking for confirmation.
    """
    argv = ('systemctl', 'restart', name)
    command(*argv, confirm=True)
def download(url, timeout=20):
    """
    Downloads data from this URL, returns a bytes object.

    @param url
        the URL to fetch; it is printed before the download starts.
    @param timeout
        defaults to 20. Passed to urllib.request.urlopen as the timeout
        (in seconds).
    """
    print(f"downloading {url}")
    # the context manager closes the connection once the body is read
    with urllib.request.urlopen(url, timeout=timeout) as response:
        return response.read()
def download_tar(url, target_dir, timeout=20):
    """
    Downloads tar file from the URL, and extracts it to target_dir.

    Will only accept tar files that have a subfolder that contains all entries.
    The files from that subfolder are extracted directly into target_dir.
    Only directories and regular files are supported.

    @param url
        where to download the tar file from.
    @param target_dir
        the directory to extract into; created if it does not exist.
    @param timeout
        defaults to 20. The download timeout.

    Returns the name of that subfolder (the first name in the archive).

    Raises RuntimeError if the archive is empty, has entries outside of
    the common subfolder, or has an entry of an unsupported type.
    """
    tar_blob = download(url, timeout)

    with tarfile.open(fileobj=io.BytesIO(tar_blob)) as tar:
        names = tar.getnames()
        if not names:
            raise RuntimeError("bad TAR file (is empty)")

        # the first entry is expected to be the subfolder that holds
        # everything else
        prefix = names[0]

        # validate all names before anything is written
        for name in names:
            normpath = os.path.normpath(name)
            if normpath.startswith(('/', '..')):
                raise RuntimeError("bad TAR file (has files outside '.')")
            if os.path.relpath(normpath, prefix).startswith('..'):
                raise RuntimeError("bad TAR file (has no common prefix)")

        # perform extraction
        os.makedirs(target_dir, exist_ok=True)

        for entry in tar:
            target = os.path.join(target_dir, os.path.relpath(entry.name, prefix))
            print(f'tar: extracting {os.path.normpath(target)}')
            if entry.isdir():
                os.makedirs(target, exist_ok=True)
            elif entry.isfile():
                # the archive is not required to list a file's directory
                # before (or at all besides) the file itself
                os.makedirs(os.path.dirname(target), exist_ok=True)
                with tar.extractfile(entry) as fileobj:
                    blob = fileobj.read()
                with open(target, 'wb') as fileobj:
                    fileobj.write(blob)
                os.chmod(target, entry.mode)
            else:
                raise RuntimeError("unsupported entry type")

    return prefix
def install_binary(rootpath, binarypath):
    """
    install the given binary into the rootpath,
    at the same absolute path that it has in the current root,
    including all dependent libraries (as reported by 'ldd').

    @param rootpath
        the new root directory (str or pathlib.Path).
    @param binarypath
        the path of the binary in the current root.

    Raises RuntimeError if ldd reports a library as 'not found'.
    """
    if not isinstance(rootpath, pathlib.Path):
        rootpath = pathlib.Path(rootpath)

    copy_symlink_chain(rootpath, binarypath)

    deps = command('ldd', binarypath, capture_stdout=True).decode()

    for dep in deps.split("\n"):
        dep = dep.strip()
        if not dep:
            continue

        if dep.startswith(("linux-vdso", "linux-gate")):
            # no need to store the vdso :)
            # (it is provided by the kernel and has no file)
            continue

        if dep == "statically linked":
            # ldd's way of saying that there are no dependencies
            continue

        if '=>' in dep:
            # '\tlibreadline.so.8 => /lib64/libreadline.so.8 (0x00007f83fa5ab000)\n'
            resolved = dep.split("=>", 1)[1].strip()
            if resolved == "not found":
                # '\tlibfoo.so.1 => not found\n'
                raise RuntimeError(
                    f"{binarypath}: unresolved library dependency: {dep!r}"
                )
            dep_path = resolved.split()[0]
        else:
            # '\t/lib64/ld-linux-x86-64.so.2 (0x00007f83fa7d1000)\n'
            dep_path = dep.split()[0]

        copy_symlink_chain(rootpath, dep_path)
def copy_symlink_chain(rootpath, file_path):
    """
    Copies a file from the current root into the new root at rootpath,
    keeping its absolute path.

    If the file is a symlink, the link itself is copied (not followed),
    and the same is then done for the link's destination, until a
    non-symlink has been copied. Anything already present at a
    destination path is unlinked first.

    @param rootpath
        the new root directory.
    @param file_path
        absolute path of the file in the current root.

    Raises FileNotFoundError if a path in the chain does not exist.
    """
    current = pathlib.Path(file_path)
    target = rootpath / current.relative_to('/')

    while True:
        if not current.exists() and not current.is_symlink():
            raise FileNotFoundError(str(current))

        # make room at the destination, then copy without following links
        if target.exists() or target.is_symlink():
            target.unlink()
        target.parent.mkdir(parents=True, exist_ok=True)
        shutil.copy(current, target, follow_symlinks=False)

        if not current.is_symlink():
            if current.exists():
                # real file, so we're done
                return
            raise Exception(f"wtf {current} no symlink and doesn't exist")

        # it was a symlink: continue with its destination, which is
        # interpreted relative to the directory that holds the link
        current = current.parent / pathlib.Path(os.readlink(current))
        target = rootpath / current.relative_to('/')
def mac_to_v6ll(mac):
    """
    Converts a mac address (colon-separated hex, e.g. 'aa:bb:cc:dd:ee:ff')
    to the ipv6 link local address that is derived from it.

    The six mac bytes are spread over the last four groups of the
    address, with 'ff' and 'fe' inserted in the middle and the
    universal/local bit of the first byte inverted.
    """
    number = int(mac.replace(':', ''), 16)

    # bytes 0 and 1; the XOR with 0x0200 flips the universal/local bit
    # in the first byte
    upper = ((number >> 32) & 0xffff) ^ 0x0200
    # byte 2, goes in front of the inserted 'ff'
    before_ff = (number >> 24) & 0xff
    # byte 3, goes behind the inserted 'fe'
    after_fe = (number >> 16) & 0xff
    # bytes 4 and 5
    lower = number & 0xffff

    return f'fe80::{upper:04x}:{before_ff:02x}ff:fe{after_fe:02x}:{lower:04x}'