|
3 | 3 | import json
|
4 | 4 | import logging
|
5 | 5 | import platform as _platform_module
|
| 6 | +import re |
6 | 7 | import sys
|
7 | 8 | from collections import defaultdict
|
8 | 9 | from os.path import abspath, dirname, join
|
9 | 10 | from pathlib import Path
|
| 11 | +from typing import Any, Generator |
| 12 | + |
| 13 | +from auditwheel.elfutils import filter_undefined_symbols, is_subdir |
10 | 14 |
|
11 | 15 | from ..libc import Libc, get_libc
|
12 | 16 | from ..musllinux import find_musl_libc, get_musl_version
|
13 |
| -from .external_references import lddtree_external_references |
14 |
| -from .versioned_symbols import versioned_symbols_policy |
15 | 17 |
|
16 | 18 | _HERE = Path(__file__).parent
|
| 19 | +LIBPYTHON_RE = re.compile(r"^libpython\d+\.\d+m?.so(.\d)*$") |
17 | 20 |
|
18 | 21 | logger = logging.getLogger(__name__)
|
19 | 22 |
|
@@ -207,6 +210,116 @@ def _load_policy_schema():
|
207 | 210 | return schema
|
208 | 211 |
|
209 | 212 |
|
| 213 | +def versioned_symbols_policy( |
| 214 | + wheel_policy: WheelPolicies, versioned_symbols: dict[str, set[str]] |
| 215 | +) -> int: |
| 216 | + def policy_is_satisfied( |
| 217 | + policy_name: str, policy_sym_vers: dict[str, set[str]] |
| 218 | + ) -> bool: |
| 219 | + policy_satisfied = True |
| 220 | + for name in set(required_vers) & set(policy_sym_vers): |
| 221 | + if not required_vers[name].issubset(policy_sym_vers[name]): |
| 222 | + for symbol in required_vers[name] - policy_sym_vers[name]: |
| 223 | + logger.debug( |
| 224 | + "Package requires %s, incompatible with " |
| 225 | + "policy %s which requires %s", |
| 226 | + symbol, |
| 227 | + policy_name, |
| 228 | + policy_sym_vers[name], |
| 229 | + ) |
| 230 | + policy_satisfied = False |
| 231 | + return policy_satisfied |
| 232 | + |
| 233 | + required_vers: dict[str, set[str]] = {} |
| 234 | + for symbols in versioned_symbols.values(): |
| 235 | + for symbol in symbols: |
| 236 | + sym_name, _, _ = symbol.partition("_") |
| 237 | + required_vers.setdefault(sym_name, set()).add(symbol) |
| 238 | + matching_policies: list[int] = [] |
| 239 | + for p in wheel_policy.policies: |
| 240 | + policy_sym_vers = { |
| 241 | + sym_name: {sym_name + "_" + version for version in versions} |
| 242 | + for sym_name, versions in p["symbol_versions"].items() |
| 243 | + } |
| 244 | + if policy_is_satisfied(p["name"], policy_sym_vers): |
| 245 | + matching_policies.append(p["priority"]) |
| 246 | + |
| 247 | + if len(matching_policies) == 0: |
| 248 | + # the base policy (generic linux) should always match |
| 249 | + raise RuntimeError("Internal error") |
| 250 | + |
| 251 | + return max(matching_policies) |
| 252 | + |
| 253 | + |
| 254 | +def lddtree_external_references( |
| 255 | + wheel_policies: list, lddtree: dict, wheel_path: str |
| 256 | +) -> dict: |
| 257 | + # XXX: Document the lddtree structure, or put it in something |
| 258 | + # more stable than a big nested dict |
| 259 | + def filter_libs(libs: set[str], whitelist: set[str]) -> Generator[str, None, None]: |
| 260 | + for lib in libs: |
| 261 | + if "ld-linux" in lib or lib in ["ld64.so.2", "ld64.so.1"]: |
| 262 | + # always exclude ELF dynamic linker/loader |
| 263 | + # 'ld64.so.2' on s390x |
| 264 | + # 'ld64.so.1' on ppc64le |
| 265 | + # 'ld-linux*' on other platforms |
| 266 | + continue |
| 267 | + if LIBPYTHON_RE.match(lib): |
| 268 | + # always exclude libpythonXY |
| 269 | + continue |
| 270 | + if lib in whitelist: |
| 271 | + # exclude any libs in the whitelist |
| 272 | + continue |
| 273 | + yield lib |
| 274 | + |
| 275 | + def get_req_external(libs: set[str], whitelist: set[str]) -> set[str]: |
| 276 | + # get all the required external libraries |
| 277 | + libs = libs.copy() |
| 278 | + reqs = set() |
| 279 | + while libs: |
| 280 | + lib = libs.pop() |
| 281 | + reqs.add(lib) |
| 282 | + for dep in filter_libs(lddtree["libs"][lib]["needed"], whitelist): |
| 283 | + if dep not in reqs: |
| 284 | + libs.add(dep) |
| 285 | + return reqs |
| 286 | + |
| 287 | + ret: dict[str, dict[str, Any]] = {} |
| 288 | + for p in wheel_policies: |
| 289 | + needed_external_libs: set[str] = set() |
| 290 | + blacklist = {} |
| 291 | + |
| 292 | + if not (p["name"] == "linux" and p["priority"] == 0): |
| 293 | + # special-case the generic linux platform here, because it |
| 294 | + # doesn't have a whitelist. or, you could say its |
| 295 | + # whitelist is the complete set of all libraries. so nothing |
| 296 | + # is considered "external" that needs to be copied in. |
| 297 | + whitelist = set(p["lib_whitelist"]) |
| 298 | + blacklist_libs = set(p["blacklist"].keys()) & set(lddtree["needed"]) |
| 299 | + blacklist = {k: p["blacklist"][k] for k in blacklist_libs} |
| 300 | + blacklist = filter_undefined_symbols(lddtree["realpath"], blacklist) |
| 301 | + needed_external_libs = get_req_external( |
| 302 | + set(filter_libs(lddtree["needed"], whitelist)), whitelist |
| 303 | + ) |
| 304 | + |
| 305 | + pol_ext_deps = {} |
| 306 | + for lib in needed_external_libs: |
| 307 | + if is_subdir(lddtree["libs"][lib]["realpath"], wheel_path): |
| 308 | + # we didn't filter libs that resolved via RPATH out |
| 309 | + # earlier because we wanted to make sure to pick up |
| 310 | + # our elf's indirect dependencies. But now we want to |
| 311 | + # filter these ones out, since they're not "external". |
| 312 | + logger.debug("RPATH FTW: %s", lib) |
| 313 | + continue |
| 314 | + pol_ext_deps[lib] = lddtree["libs"][lib]["realpath"] |
| 315 | + ret[p["name"]] = { |
| 316 | + "libs": pol_ext_deps, |
| 317 | + "priority": p["priority"], |
| 318 | + "blacklist": blacklist, |
| 319 | + } |
| 320 | + return ret |
| 321 | + |
| 322 | + |
210 | 323 | __all__ = [
|
211 | 324 | "lddtree_external_references",
|
212 | 325 | "versioned_symbols_policy",
|
|
0 commit comments