-
Notifications
You must be signed in to change notification settings - Fork 43
/
Copy pathcheck_substitutes.py
executable file
·162 lines (146 loc) · 5.33 KB
/
check_substitutes.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
#!/usr/bin/env python3
# SPDX-License-Identifier: GPL-3.0-or-later
#
# SPDX-FileCopyrightText: 2024 fosslinux <[email protected]>
"""Check that substituted files are the same."""
import bz2
import filecmp
import gzip
import itertools
import lzma
import shutil
import tarfile
import tempfile
import sys
import os
from lib.generator import Generator
# Scratch directory shared by every Distfile: downloads are written here, and
# decompressed files and extracted trees are created inside it as well.
# It is created once, as a side effect of loading this module; nothing in the
# visible code removes it afterwards.
working = tempfile.mkdtemp()
# Colour constants
# pylint: disable=too-few-public-methods
class Colors:
    """ANSI escape sequences used to colour the report lines.

    Each attribute is a plain string that is written in front of a message;
    ``END`` is written after the message to reset the terminal attributes.
    """

    # Foreground colours
    GREY = "\033[90m"
    RED = "\033[91m"
    GREEN = "\033[92m"
    YELLOW = "\033[93m"
    # The red sequence immediately followed by the yellow one
    ORANGE = "\033[91m\033[93m"

    # Reset
    END = "\033[0m"
def traverse_path(base_root):
    """Collect every directory and file found below ``base_root``.

    All paths are returned relative to ``base_root`` so that the results for
    two different roots can be compared with each other directly.

    Returns a ``(directories, files)`` tuple of sets of relative paths.
    """
    all_dirs = set()
    all_files = set()
    for root, directories, files in os.walk(base_root, topdown=True):
        # os.path.relpath removes the base_root *prefix*. str.lstrip() must not
        # be used for this: it strips any leading characters contained in its
        # argument, which also eats the beginning of entry names that happen
        # to be made of those characters (e.g. a directory called "tmp").
        for d in directories:
            all_dirs.add(os.path.relpath(os.path.join(root, d), base_root))
        for f in files:
            all_files.add(os.path.relpath(os.path.join(root, f), base_root))
    return (all_dirs, all_files)


class Distfile():
    """Represents one distfile and operations performed on it."""

    def __init__(self, i, url):
        """Record the index ``i`` of the distfile on its line and its ``url``."""
        self.i = i
        self.url = url
        # Local file name: the index prefix keeps apart mirrors that serve a
        # file with the same basename.
        self.out_file = f"{i}-{os.path.basename(url)}"
        # Current on-disk location. Empty until download() has run; updated by
        # decompress() and extract() as the distfile is unpacked further.
        self.filepath = ""

    def download(self):
        """Downloads the distfile."""
        Generator.download_file(self.url, working, self.out_file, silent=True)
        self.filepath = os.path.join(working, self.out_file)

    def decompress(self):
        """Decompresses the distfile.

        The compression format is taken from the last extension of the file
        name. Files with an unknown extension are left untouched; otherwise
        the decompressed data is written next to the original and
        ``filepath`` is pointed at the new file.
        """
        compression = self.out_file.rsplit('.', maxsplit=1)[-1]
        decompress_func = {
            "gz": gzip.open,
            "tgz": gzip.open,
            "bz2": bz2.open,
            "xz": lzma.open,
            "lzma": lzma.open,
        }
        if compression not in decompress_func:
            # No decompression needed
            return

        # Remove the compression extension
        new_path = self.filepath.rsplit('.', maxsplit=1)[0]
        # tgz -> .tar
        if compression == "tgz":
            new_path = f"{new_path}.tar"

        # Move the decompressed binary stream to a new file
        with decompress_func[compression](self.filepath, 'rb') as fin:
            with open(new_path, 'wb') as fout:
                shutil.copyfileobj(fin, fout)
        self.filepath = new_path

    def extract(self):
        """Extracts the distfile.

        Does nothing when the current file is not a tar archive. Otherwise the
        archive is unpacked into a fresh directory inside the working
        directory and ``filepath`` is pointed at that directory.
        """
        # Sanity check
        if not tarfile.is_tarfile(self.filepath):
            return

        # The index restarts at 0 for every line of the substitutes file, so a
        # fixed "<working>/<i>" directory would already exist for later lines.
        # mkdtemp() always hands out a new, unique directory.
        out_dir = tempfile.mkdtemp(prefix=f"{self.i}-", dir=working)
        # NOTE(review): the archive was downloaded from the network and is
        # extracted without an explicit ``filter=``. Consider ``filter="data"``
        # (PEP 706) once tarballs containing absolute symlinks are handled.
        with tarfile.open(self.filepath, 'r') as f:
            f.extractall(path=out_dir)
        self.filepath = out_dir

    @staticmethod
    def _same_symlink(path1, path2):
        """Return True when both paths are symlinks with the same target."""
        return (
            os.path.islink(path1)
            and os.path.islink(path2)
            and os.readlink(path1) == os.readlink(path2)
        )

    # It makes more sense here to label them d1 and d2 rather than have one be self.
    # pylint: disable=no-self-argument
    def compare(d1, d2):
        """Compares the distfile to another distfile.

        Two plain files are equivalent when their contents are identical. Two
        directories are equivalent when they contain the same relative
        directory and file names and every file has identical contents.
        A file is never equivalent to a directory.

        Returns True when the distfiles are equivalent, False otherwise.
        """
        if not os.path.isdir(d1.filepath):
            # Compare files
            return filecmp.cmp(d1.filepath, d2.filepath, shallow=False)
        if not os.path.isdir(d2.filepath):
            # Then, d2 is a file and d1 is a directory
            return False

        # Otherwise it's two directories
        dirnames1, filenames1 = traverse_path(d1.filepath)
        dirnames2, filenames2 = traverse_path(d2.filepath)
        if dirnames1 != dirnames2 or filenames1 != filenames2:
            return False

        # cmpfiles() returns a (match, mismatch, errors) tuple, which is always
        # truthy, so the individual lists have to be inspected.
        _, mismatch, errors = filecmp.cmpfiles(
            d1.filepath, d2.filepath, filenames1, shallow=False
        )
        if mismatch:
            return False
        # Entries that could not be read (typically dangling symlinks) are
        # only accepted when both sides are symlinks to the same target.
        return all(
            Distfile._same_symlink(
                os.path.join(d1.filepath, name),
                os.path.join(d2.filepath, name),
            )
            for name in errors
        )
def check(*args):
    """Report whether the given distfiles are equivalent to each other.

    Every pair is compared in up to three rounds: as downloaded, after
    decompression, and after extraction. A pair leaves the process as soon as
    it compares equal, and one line is printed per pair stating at which stage
    it matched (or that it never did).

    Returns True when at least one pair is still different after extraction.
    """
    # Round 1: the files exactly as they were downloaded
    differing = []
    for first, second in itertools.combinations(args, 2):
        if not first.compare(second):
            differing.append((first, second))
            continue
        print(f"{Colors.GREY}DEBUG: {first.url} is equivalent to {second.url}{Colors.END}")

    # Round 2: decompress everything that is part of a differing pair
    for distfile in set(itertools.chain.from_iterable(differing)):
        distfile.decompress()
    still_differing = []
    for first, second in differing:
        if not first.compare(second):
            still_differing.append((first, second))
            continue
        # pylint: disable=line-too-long
        print(f"{Colors.YELLOW}NOTE: {first.url} is equivalent to {second.url} when decompressed{Colors.END}")

    # Round 3: extract whatever is left and compare the resulting trees
    for distfile in set(itertools.chain.from_iterable(still_differing)):
        distfile.extract()
    failed = False
    for first, second in still_differing:
        if first.compare(second):
            # pylint: disable=line-too-long
            print(f"{Colors.ORANGE}WARN: {first.url} is equivalent to {second.url} when extracted{Colors.END}")
            continue
        failed = True
        # pylint: disable=line-too-long
        print(f"{Colors.RED}ERROR: {first.url} is not equivalent to {second.url}!{Colors.END}")
    return failed
def main():
    """Main function.

    Reads the ``substitutes`` file from the current directory. Each non-empty
    line lists whitespace-separated URLs that are expected to serve the same
    distfile; all of them are downloaded and checked against each other.

    Exits with status 1 when any line contains distfiles that are not
    equivalent, and with status 0 otherwise.
    """
    has_error = False
    with open("substitutes", 'r', encoding="utf-8") as f:
        for line in f:
            # split() without a separator ignores leading/trailing whitespace
            # and runs of spaces, so it never produces empty URLs.
            urls = line.split()
            if not urls:
                # Blank line: nothing to download or compare
                continue
            distfiles = [Distfile(i, url) for i, url in enumerate(urls)]
            for distfile in distfiles:
                distfile.download()
            if check(*distfiles):
                has_error = True
    sys.exit(1 if has_error else 0)


if __name__ == "__main__":
    main()