diff --git a/checks.py b/checks.py index 548f254b..aa221feb 100755 --- a/checks.py +++ b/checks.py @@ -1,13 +1,10 @@ """ Run some tests and generate warnings for proton configuration issues """ -import shutil -import os -import subprocess from .logger import log -def esync_file_limits(): +def esync_file_limits() -> bool: """ Check esync file limits using /proc/sys/fs/file-max https://www.reddit.com/r/SteamPlay/comments/9kqisk/tip_for_those_using_proton_no_esync1/ @@ -19,14 +16,15 @@ def esync_file_limits(): https://github.com/zfigura/wine/blob/esync/README.esync ''' - with open('/proc/sys/fs/file-max') as fsmax: + with open('/proc/sys/fs/file-max', encoding='ascii') as fsmax: max_files = fsmax.readline() if int(max_files) < 8192: log.warn(warning) return False return True -def run_checks(): + +def run_checks() -> None: """ Run checks to notify of any potential issues """ diff --git a/debug.py b/debug.py index 9a0a81da..268b968c 100755 --- a/debug.py +++ b/debug.py @@ -10,7 +10,7 @@ os.environ['DEBUG'] = '1' -def show_debug_info(): +def show_debug_info() -> None: """ Show various debug info """ check_args = [ diff --git a/download.py b/download.py index cc8a5340..6fa1109b 100755 --- a/download.py +++ b/download.py @@ -11,7 +11,7 @@ HASH_BLOCK_SIZE = 65536 -def get_filename(headers): +def get_filename(headers: list) -> str: """ Retrieve a filename from a request headers via Content-Disposition """ content_disp = [x for x in headers if x[0] == 'Content-Disposition'][0][1] @@ -19,27 +19,29 @@ def get_filename(headers): return raw_filename.replace('filename=', '').replace('"', '') -def gdrive_download(gdrive_id, path): +def gdrive_download(gdrive_id: str, path: str) -> None: """ Download a file from gdrive given the fileid and a path to save """ url = GDRIVE_URL.format(gdrive_id) cjar = http.cookiejar.CookieJar() opener = urllib.request.build_opener(urllib.request.HTTPCookieProcessor(cjar)) urllib.request.install_opener(opener) + req = urllib.request.Request(url) - resp = urllib.request.urlopen(req) - confirm_cookie = [x for x in resp.getheaders() if - (x[0] == 'Set-Cookie' - and x[1].startswith('download_warning'))][0][1] + with urllib.request.urlopen(req, timeout=10) as resp: + confirm_cookie = [x for x in resp.getheaders() if + (x[0] == 'Set-Cookie' + and x[1].startswith('download_warning'))][0][1] confirm = confirm_cookie.split(';')[0].split('=')[1] - req2 = urllib.request.Request(url + '&confirm={}'.format(confirm)) - resp2 = urllib.request.urlopen(req2) - filename = get_filename(resp2.getheaders()) - with open(os.path.join(path, filename), 'wb') as save_file: - save_file.write(resp2.read()) + + req = urllib.request.Request(f'{url}&confirm={confirm}') + with urllib.request.urlopen(req, timeout=10) as resp: + filename = get_filename(resp.getheaders()) + with open(os.path.join(path, filename), 'wb') as save_file: + save_file.write(resp.read()) -def sha1sum(filename): +def sha1sum(filename: str) -> str: """ Computes the sha1sum of the specified file """ if not os.path.isfile(filename): diff --git a/engine.py b/engine.py index a387d2e7..84728eac 100755 --- a/engine.py +++ b/engine.py @@ -1,7 +1,6 @@ """ Game engine API """ -import io import os import sys from .logger import log @@ -10,7 +9,7 @@ class Engine(): """ Game engines """ - def __init__(self): + def __init__(self) -> None: self.engine_name = None self.supported = { 'Dunia 2': 'https://pcgamingwiki.com/wiki/Engine:Dunia_2', @@ -40,14 +39,14 @@ def __init__(self): log.info('Engine: ' + self.supported[self.engine_name]) - def _add_argument(self, args=''): + def _add_argument(self, args: str = '') -> None: """ Set command line arguments """ sys.argv += args.split(' ') - def _is_unity(self): + def _is_unity(self) -> bool: """ Detect Unity engine """ @@ -62,7 +61,7 @@ def _is_unity(self): return False - def _is_dunia2(self): + def _is_dunia2(self) -> bool: """ Detect Dunia 2 engine (Far Cry >= 3) """ @@ -76,7 +75,8 @@ def _is_dunia2(self): return False - def _is_rage(self): + + def _is_rage(self) -> bool: """ Detect RAGE engine (GTA IV/V) """ @@ -91,63 +91,66 @@ def _is_rage(self): return False - def _is_ue3(self): + + def _is_ue3(self) -> bool: """ Detect Unreal Engine 3 """ return False - def _is_ue4(self): + def _is_ue4(self) -> bool: """ Detect Unreal Engine 4 """ return False - def _log(self, ctx, msg, warn=False): + def _log(self, ctx: str, msg: str, warn: bool = False) -> None: """ Log wrapper """ if self.engine_name is None: log.warn(ctx + ': Engine not defined') - return False + return - elif warn is not False: - log.warn(self.engine_name + ': ' + ctx + ': ' + msg) - else: - log.info(self.engine_name + ': ' + ctx + ': ' + msg) + log_func = log.warn if warn else log.info + log_func(f'{self.engine_name}: {ctx}: {msg}') - def name(self): + def name(self) -> str: """ Report Engine name """ return self.engine_name - def set(self, engine=None): + def set(self, _engine: str = None) -> bool: """ Force engine """ - if engine in self.supported: - self.engine_name = engine + if _engine in self.supported: + self.engine_name = _engine self._log('set', 'forced') else: - log.warn('Engine not supported (' + engine + ')') + log.warn(f'Engine not supported ({_engine})') + return False + return True - def nosplash(self): + def nosplash(self) -> bool: """ Disable splash screen """ if self.engine_name == 'UE3': self._add_argument('-nosplash') - self._log('nosplash', 'splash screen disabled' + res) + self._log('nosplash', 'splash screen disabled') else: self._log('nosplash', 'not supported', True) + return False + return True - def info(self): + def info(self) -> bool: """ Show some information about engine """ @@ -156,9 +159,11 @@ def info(self): self._log('info', 'command line arguments') else: self._log('info', 'not supported', True) + return False + return True - def nointro(self): + def nointro(self) -> bool: """ Skip intro videos """ @@ -170,9 +175,11 @@ def nointro(self): self._log('nointro', 'intro videos disabled') else: self._log('nointro', 'not supported', True) + return False + return True - def launcher(self, show=True): + def launcher(self) -> bool: """ Force launcher """ @@ -181,8 +188,11 @@ def launcher(self, show=True): self._log('launcher', 'forced') else: self._log('launcher', 'not supported', True) + return False + return True - def windowed(self): + + def windowed(self) -> bool: """ Force windowed mode """ @@ -194,17 +204,20 @@ def windowed(self): self._log('windowed', 'window') else: self._log('windowed', 'not supported', True) + return False + return True - def resolution(self, res=None): + def resolution(self, res: str = None) -> bool: """ Force screen resolution """ - if res is not None: - res_wh = res.split('x') - else: + if not isinstance(res, str): + self._log('resolution', 'not provided') return False + res_wh = res.split('x') + if self.engine_name == 'Unity': self._add_argument('-screen-width ' + res_wh[0] + ' -screen-height ' + res_wh[1]) self._log('resolution', res) @@ -213,6 +226,8 @@ def resolution(self, res=None): self._log('resolution', res) else: self._log('resolution', 'not supported', True) + return False + return True engine = Engine() #pylint: disable=C0103 diff --git a/fix.py b/fix.py index 7269f843..2c7a420a 100755 --- a/fix.py +++ b/fix.py @@ -1,20 +1,23 @@ """ Gets the game id and applies a fix if found """ -from __future__ import print_function import io import os import re import sys import urllib import json + +from functools import lru_cache from importlib import import_module -from .util import protonprefix, check_internet +from .util import check_internet from .checks import run_checks from .logger import log from . import config -def game_id(): + +@lru_cache +def get_game_id() -> str: """ Trys to return the game id from environment variables """ if 'UMU_ID' in os.environ: @@ -30,58 +33,45 @@ def game_id(): return None -def game_name(): +@lru_cache +def get_game_name() -> str: """ Trys to return the game name from environment variables """ - is_online = check_internet() if 'UMU_ID' in os.environ: - if os.path.isfile(os.environ['WINEPREFIX'] + "/game_title"): - with open(os.environ['WINEPREFIX'] + "/game_title", 'r') as file: - return file.readline() - else: - try: - if 'STORE' in os.environ and is_online: - url = "https://umu.openwinecomponents.org/umu_api.php?umu_id=" + os.environ['UMU_ID'] + "&store=" + os.environ['STORE'] - headers = {'User-Agent': 'Mozilla/5.0'} - req = urllib.request.Request(url, headers=headers) - response = urllib.request.urlopen(req, timeout=5) - data = response.read() - json_data = json.loads(data) - title = json_data[0]['title'] - file = open(os.environ['WINEPREFIX'] + "/game_title", 'w') - file.write(title) - file.close() - elif 'STORE' not in os.environ and is_online: - url = "https://umu.openwinecomponents.org/umu_api.php?umu_id=" + os.environ['UMU_ID'] + "&store=none" - headers = {'User-Agent': 'Mozilla/5.0'} - req = urllib.request.Request(url, headers=headers) - response = urllib.request.urlopen(req, timeout=5) - data = response.read() - json_data = json.loads(data) - title = json_data[0]['title'] - file = open(os.environ['WINEPREFIX'] + "/game_title", 'w') - file.write(title) - file.close() - elif not is_online: - raise OSError - except OSError as e: - #log.info('OSError occurred: {}'.format(e)) # used for debugging - return 'UNKNOWN' - except IndexError as e: - #log.info('IndexError occurred: {}'.format(e)) # used for debugging - return 'UNKNOWN' - except UnicodeDecodeError as e: - #log.info('UnicodeDecodeError occurred: {}'.format(e)) # used for debugging - return 'UNKNOWN' - except TimeoutError: - log.info('umu.openwinecomponents.org timed out') - return 'UNKNOWN' - with open(os.environ['WINEPREFIX'] + "/game_title", 'r') as file: + if os.path.isfile(os.environ['WINEPREFIX'] + '/game_title'): + with open(os.environ['WINEPREFIX'] + '/game_title', 'r', encoding='utf-8') as file: return file.readline() + + if not check_internet(): + log.warn('No internet connection, can\'t fetch name') + return 'UNKNOWN' + + try: + # Fallback to 'none', if STORE isn't set + store = os.getenv('STORE', 'none') + url = f'https://umu.openwinecomponents.org/umu_api.php?umu_id={os.environ["UMU_ID"]}&store={store}' + headers = {'User-Agent': 'Mozilla/5.0'} + req = urllib.request.Request(url, headers=headers) + with urllib.request.urlopen(req, timeout=5) as response: + data = response.read() + json_data = json.loads(data) + title = json_data[0]['title'] + with open(os.environ['WINEPREFIX'] + '/game_title', 'w', encoding='utf-8') as file: + file.write(title) + return title + except TimeoutError as ex: + log.info('umu.openwinecomponents.org timed out') + log.debug(f'TimeoutError occurred: {ex}') + except OSError as ex: + log.debug(f'OSError occurred: {ex}') + except IndexError as ex: + log.debug(f'IndexError occurred: {ex}') + except UnicodeDecodeError as ex: + log.debug(f'UnicodeDecodeError occurred: {ex}') else: try: game_library = re.findall(r'.*/steamapps', os.environ['PWD'], re.IGNORECASE)[-1] - game_manifest = os.path.join(game_library, 'appmanifest_' + game_id() + '.acf') + game_manifest = os.path.join(game_library, f'appmanifest_{get_game_id()}.acf') with io.open(game_manifest, 'r', encoding='utf-8') as appmanifest: for xline in appmanifest.readlines(): @@ -89,109 +79,112 @@ def game_name(): name = re.findall(r'"[^"]+"', xline, re.UNICODE)[-1] return name except OSError: - return 'UNKNOWN' + pass except IndexError: - return 'UNKNOWN' + pass except UnicodeDecodeError: - return 'UNKNOWN' + pass + return 'UNKNOWN' -def run_fix(gameid): - """ Loads a gamefix module by it's gameid +def get_store_name(store: str) -> str: + """ Mapping for store identifier to store name + """ + return { + 'amazon': 'Amazon', + 'battlenet': 'Battle.net', + 'ea': 'EA', + 'egs': 'EGS', + 'gog': 'GOG', + 'humble': 'Humble', + 'itchio': 'Itch.io', + 'steam': 'Steam', + 'ubisoft': 'Ubisoft', + 'zoomplatform': 'ZOOM Platform' + }.get(store, None) + + +def get_module_name(game_id: str, default: bool = False, local: bool = False) -> str: + """ Creates the name of a gamefix module, which can be imported + """ + if os.environ.get('STORE'): + store = os.environ['STORE'].lower() + elif game_id.isnumeric(): + store = 'steam' + + if store != 'steam': + log.info(f'Non-steam game {get_game_name()} ({game_id})') + + store_name = get_store_name(store) + if store_name: + log.info(f'{store_name} store specified, using {store_name} database') + else: + log.info('No store specified, using UMU database') + store = 'umu' + + return (f'protonfixes.gamefixes-{store}.' if not local else 'localfixes.') +\ + (game_id if not default else 'default') + + +def _run_fix_local(game_id: str, default: bool = False) -> bool: + """ Check if a local gamefix is available first and run it + """ + localpath = os.path.expanduser('~/.config/protonfixes/localfixes') + module_name = game_id if not default else 'default' + + # Check if local gamefix exists + if not os.path.isfile(os.path.join(localpath, module_name + '.py')): + return False + + # Ensure local gamefixes are importable as modules via PATH + with open(os.path.join(localpath, '__init__.py'), 'a', encoding='utf-8'): + sys.path.append(os.path.expanduser('~/.config/protonfixes')) + + # Run fix + return _run_fix(game_id, default, True) + + +def _run_fix(game_id: str, default: bool = False, local: bool = False) -> bool: + """ Private function, which actually executes gamefixes """ + fix_type = 'protonfix' if not default else 'defaults' + scope = 'global' if not local else 'local' + + try: + module_name = get_module_name(game_id, default, local) + game_module = import_module(module_name) + + log.info(f'Using {scope} {fix_type} for {get_game_name()} ({game_id})') + game_module.main() + except ImportError: + log.info(f'No {scope} {fix_type} found for {get_game_name()} ({game_id})') + return False + return True - if gameid is None: + +def run_fix(game_id: str) -> None: + """ Loads a gamefix module by it's gameid + local fixes prevent global fixes from being executed + """ + if game_id is None: return if config.enable_checks: run_checks() - game = game_name() + ' ('+ gameid + ')' - localpath = os.path.expanduser('~/.config/protonfixes/localfixes') + # execute default.py (local) + if not _run_fix_local(game_id, True) and config.enable_global_fixes: + _run_fix(game_id, True) # global - # execute default.py - if os.path.isfile(os.path.join(localpath, 'default.py')): - open(os.path.join(localpath, '__init__.py'), 'a').close() - sys.path.append(os.path.expanduser('~/.config/protonfixes')) - try: - game_module = import_module('localfixes.default') - log.info('Using local defaults for ' + game) - game_module.main() - except ImportError: - log.info('No local defaults found for ' + game) - elif config.enable_global_fixes: - try: - if gameid.isnumeric(): - game_module = import_module('protonfixes.gamefixes-steam.default') - else: - log.info('Non-steam game ' + game) - game_module = import_module('protonfixes.gamefixes-umu.default') - log.info('Using global defaults for ' + game) - game_module.main() - except ImportError: - log.info('No global defaults found') - - # execute .py - if os.path.isfile(os.path.join(localpath, gameid + '.py')): - open(os.path.join(localpath, '__init__.py'), 'a').close() - sys.path.append(os.path.expanduser('~/.config/protonfixes')) - try: - game_module = import_module('localfixes.' + gameid) - log.info('Using local protonfix for ' + game) - game_module.main() - except ImportError: - log.info('No local protonfix found for ' + game) - elif config.enable_global_fixes: - try: - if gameid.isnumeric(): - game_module = import_module('protonfixes.gamefixes-steam.' + gameid) - else: - log.info('Non-steam game ' + game) - if os.environ.get("STORE"): - if os.environ['STORE'].lower() == "amazon": - log.info('Amazon store specified, using Amazon database') - game_module = import_module('protonfixes.gamefixes-amazon.' + gameid) - elif os.environ['STORE'].lower() == "battlenet": - log.info('Battle.net store specified, using Battle.net database') - game_module = import_module('protonfixes.gamefixes-battlenet.' + gameid) - elif os.environ['STORE'].lower() == "ea": - log.info('EA store specified, using EA database') - game_module = import_module('protonfixes.gamefixes-ea.' + gameid) - elif os.environ['STORE'].lower() == "egs": - log.info('EGS store specified, using EGS database') - game_module = import_module('protonfixes.gamefixes-egs.' + gameid) - elif os.environ['STORE'].lower() == "gog": - log.info('GOG store specified, using GOG database') - game_module = import_module('protonfixes.gamefixes-gog.' + gameid) - elif os.environ['STORE'].lower() == "humble": - log.info('Humble store specified, using Humble database') - game_module = import_module('protonfixes.gamefixes-humble.' + gameid) - elif os.environ['STORE'].lower() == "itchio": - log.info('Itch.io store specified, using Itch.io database') - game_module = import_module('protonfixes.gamefixes-itchio.' + gameid) - elif os.environ['STORE'].lower() == "ubisoft": - log.info('Ubisoft store specified, using Ubisoft database') - game_module = import_module('protonfixes.gamefixes-ubisoft.' + gameid) - elif os.environ['STORE'].lower() == "zoomplatform": - log.info('ZOOM Platform store specified, using ZOOM Platform database') - game_module = import_module('protonfixes.gamefixes-zoomplatform.' + gameid) - elif os.environ['STORE'].lower() == "none": - log.info('No store specified, using umu database') - game_module = import_module('protonfixes.gamefixes-umu.' + gameid) - else: - log.info('No store specified, using umu database') - game_module = import_module('protonfixes.gamefixes-umu.' + gameid) - log.info('Using protonfix for ' + game) - game_module.main() - except ImportError: - log.info('No protonfix found for ' + game) - - -def main(): + # execute .py (local) + if not _run_fix_local(game_id, False) and config.enable_global_fixes: + _run_fix(game_id, False) # global + + +def main() -> None: """ Runs the gamefix """ - check_args = [ 'iscriptevaluator.exe' in sys.argv[2], 'getcompatpath' in sys.argv[1], @@ -204,4 +197,4 @@ def main(): return log.info('Running protonfixes') - run_fix(game_id()) + run_fix(get_game_id()) diff --git a/gamefixes-steam/294700.py b/gamefixes-steam/294700.py index 42381336..72fcbc61 100755 --- a/gamefixes-steam/294700.py +++ b/gamefixes-steam/294700.py @@ -14,4 +14,4 @@ def main(): util.regedit_add('HKLM\\System\\MountedDevices','\\??\\Volume{00000000-0000-0000-0000-000000000052}','REG_BINARY','2f746d7000') #sets up ID? exported from regedit util.regedit_add('HKLM\\System\\MountedDevices','\\DosDevices\\R:','REG_BINARY','5c005c002e005c0064003a000000') #sets up dosdevice? exported from regedit - util.regedit_add('HKLM\\Software\\Wine\\Drives','r:','REG_SZ','cdrom', 1) #designate drive as CD-ROM, requires 64-bit access + util.regedit_add('HKLM\\Software\\Wine\\Drives','r:','REG_SZ','cdrom', True) #designate drive as CD-ROM, requires 64-bit access diff --git a/gamefixes-steam/497360.py b/gamefixes-steam/497360.py index 2746fcaf..60d07b17 100644 --- a/gamefixes-steam/497360.py +++ b/gamefixes-steam/497360.py @@ -16,7 +16,7 @@ def main(): dosdevice = os.path.join(util.protonprefix(), 'dosdevices/r:') if not os.path.exists(dosdevice): os.symlink('/tmp', dosdevice) #create symlink for dosdevices - util.regedit_add("HKLM\\Software\\Wine\\Drives",'r:','REG_SZ','cdrom', 1) #designate drive as CD-ROM, requires 64-bit access + util.regedit_add("HKLM\\Software\\Wine\\Drives",'r:','REG_SZ','cdrom', True) #designate drive as CD-ROM, requires 64-bit access util.protontricks('quartz') util.protontricks('amstream') #No errors but doesn't show videos on SYDNEY diff --git a/gamefixes-steam/default.py b/gamefixes-steam/default.py index 11aeb5d3..74b84f64 100755 --- a/gamefixes-steam/default.py +++ b/gamefixes-steam/default.py @@ -1,3 +1,8 @@ +""" Default file for Steam game fixes + This file is always executed for games that are identified as Steam games, + even if no game fix is present. It is run before game fixes are applied. +""" + import sys from protonfixes import util diff --git a/gamefixes-umu/default.py b/gamefixes-umu/default.py index 8a32b2b2..699f2f15 100755 --- a/gamefixes-umu/default.py +++ b/gamefixes-umu/default.py @@ -1,3 +1,8 @@ +""" Default file for UMU game fixes + This file is always executed for games that are identified as UMU games, + even if no game fix is present. It is run before game fixes are applied. +""" + def main(): """ global defaults """ diff --git a/logger.py b/logger.py index e8423a8f..41cf925c 100755 --- a/logger.py +++ b/logger.py @@ -9,7 +9,7 @@ class Log(): """Log to stderr for steam dumps """ - def __init__(self): + def __init__(self) -> None: self.pfx = 'ProtonFixes[' + str(os.getpid()) + '] ' self.colors = { 'RESET': '\u001b[0m', @@ -19,13 +19,13 @@ def __init__(self): 'DEBUG': '\u001b[35m' } - def __call__(self, msg): + def __call__(self, msg: str) -> None: """ Allows the Log instance to be called directly """ self.log(msg) - def log(self, msg='', level='INFO'): + def log(self, msg: str = '', level: str = 'INFO') -> None: """ Prints the log message to stdout the same way as Proton """ @@ -40,25 +40,25 @@ def log(self, msg='', level='INFO'): testfile.write(logtext) - def info(self, msg): + def info(self, msg: str) -> None: """ Wrapper for printing info messages """ self.log(msg, 'INFO') - def warn(self, msg): + def warn(self, msg: str) -> None: """ Wrapper for printing warning messages """ self.log(msg, 'WARN') - def crit(self, msg): + def crit(self, msg: str) -> None: """ Wrapper for printing critical messages """ self.log(msg, 'CRIT') - def debug(self, msg): + def debug(self, msg: str) -> None: """ Wrapper for printing debug messages """ diff --git a/pyproject.toml b/pyproject.toml index f35a59a5..6da8c50a 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -376,7 +376,7 @@ disable = ["raw-checker-failed", "bad-inline-option", "locally-disabled", "file- [tool.pylint.method_args] # List of qualified names (i.e., library.method) which require a timeout # parameter e.g. 'requests.api.get,requests.api.post' -timeout-methods = ["requests.api.delete", "requests.api.get", "requests.api.head", "requests.api.options", "requests.api.patch", "requests.api.post", "requests.api.put", "requests.api.request"] +timeout-methods = ["urllib.request.urlopen", "requests.api.delete", "requests.api.get", "requests.api.head", "requests.api.options", "requests.api.patch", "requests.api.post", "requests.api.put", "requests.api.request"] [tool.pylint.miscellaneous] # List of note tags to take in consideration, separated by a comma. diff --git a/steamhelper.py b/steamhelper.py index 78777c6e..2089a985 100644 --- a/steamhelper.py +++ b/steamhelper.py @@ -1,21 +1,31 @@ +""" The Steamhelper allows the installation of Steam apps +""" + import os import re import shutil import subprocess import time + libpaths = [] REGEX_LIB = re.compile(r'"path"\s*"(?P(.*))"') REGEX_STATE = re.compile(r'"StateFlags"\s*"(?P(\d))"') +STEAM_DIRS = [ + "~/.steam/root", + "~/.steam/debian-installation", + "~/.local/share/Steam", + "~/.steam/steam" +] -def install_app(appid, delay=1): - """Wait for the installation of an appid +def install_app(appid: str, delay: int = 1) -> None: + """ Wait for the installation of an appid """ _install_steam_appid(appid) - while not(_is_app_installed(appid)): + while not _is_app_installed(appid): time.sleep(delay) -def _install_steam_appid(appid): +def _install_steam_appid(appid: str) -> None: """ Call steam URL """ install_url = "steam://install/"+str(appid) @@ -30,11 +40,11 @@ def _install_steam_appid(appid): elif shutil.which("exo-open"): subprocess.call(["exo-open", install_url]) -def _is_app_installed(appid): - """Check if app is installed +def _is_app_installed(appid: str) -> bool: + """ Check if app is installed """ libraries_path = _get_steam_libraries_path() - + # bypass no library path if len(libraries_path) == 0: return True @@ -49,37 +59,29 @@ def _is_app_installed(appid): break return is_installed -def _get_steam_libraries_path(): - """Get Steam Libraries Path - """ - STEAM_DIRS = [ - "~/.steam/root", - "~/.steam/debian-installation", - "~/.local/share/Steam", - "~/.steam/steam" - ] - - global libpaths +def _get_steam_libraries_path() -> list: + """ Get Steam Libraries Path + """ if len(libpaths) == 0: for steampath in STEAM_DIRS: libfile = os.path.join(os.path.expanduser(steampath),"steamapps","libraryfolders.vdf") if os.path.exists(libfile): - libpaths = _find_regex_groups(libfile, REGEX_LIB, 'path') + libpaths.append(_find_regex_groups(libfile, REGEX_LIB, 'path')) break return libpaths -def _get_manifest_path(appid, librarypath): +def _get_manifest_path(appid: str, librarypath: str) -> str: """Get appmanifest path """ - return os.path.join(librarypath, "steamapps", "appmanifest_"+str(appid)+".acf") + return os.path.join(librarypath, "steamapps", f"appmanifest_{str(appid)}.acf") -def _find_regex_groups(path, regex, groupname): +def _find_regex_groups(path: str, regex: re.Pattern, groupname: str) -> list: """ Given a file and a regex with a named group groupname, return an array of all the matches """ matches = [] - with open(path) as re_file: + with open(path, encoding='ascii') as re_file: for line in re_file: search = regex.search(line) if search: diff --git a/tools/lint.sh b/tools/lint.sh index c4ebcf40..882f8f36 100644 --- a/tools/lint.sh +++ b/tools/lint.sh @@ -6,11 +6,11 @@ # Lint the following gamefix dir: # steam, gog, amazon, egs, humble, itchio, ubisoft, umu, zoomplatform -mapfile -d '' files_array < <(find ./{gamefixes-steam,gamefixes-amazon,gamefixes-gog,gamefixes-egs,gamefixes-humble,gamefixes-itchio,gamefixes-ubisoft,gamefixes-umu,gamefixes-zoomplatform} -type f -name "*.py" ! -name "__init__.py" -print0) +mapfile -d '' files_array < <(find ./{,gamefixes-steam,gamefixes-amazon,gamefixes-gog,gamefixes-egs,gamefixes-humble,gamefixes-itchio,gamefixes-ubisoft,gamefixes-umu,gamefixes-zoomplatform} -type f -name "*.py" ! -name "__init__.py" -print0) # Disable these checks: -# - Long lines from comments -# - Import errors because umu-protonfixes will be renamed at release -# - Docstrings for functions and modules -# - Invalid identifier names for files -pylint --rcfile pyproject.toml --disable C0103,C0116,E0401,C0301,C0114 "${files_array[@]}" +# - E0401: Import errors because umu-protonfixes will be renamed at release +# - C0103: Invalid identifier names for files, as gamefixes are numeric +# - C0116: Missing docstrings for functions or method +# - C0301: Long lines +pylint --rcfile pyproject.toml --disable E0401,C0103,C0116,C0301 "${files_array[@]}" diff --git a/util.py b/util.py index 09df5f7c..9170d4f4 100755 --- a/util.py +++ b/util.py @@ -12,19 +12,21 @@ import subprocess import urllib.request import functools + +from socket import socket, AF_INET, SOCK_DGRAM +from typing import Union, Literal, Mapping + from .logger import log from .steamhelper import install_app from . import config -from socket import socket, AF_INET, SOCK_DGRAM try: import __main__ as protonmain except ImportError: log.warn('Unable to hook into Proton main script environment') -# pylint: disable=unreachable -def which(appname): +def which(appname: str) -> str: """ Returns the full path of an executable in $PATH """ @@ -36,7 +38,7 @@ def which(appname): return None -def protondir(): +def protondir() -> str: """ Returns the path to proton """ @@ -44,7 +46,7 @@ def protondir(): return proton_dir -def protonprefix(): +def protonprefix() -> str: """ Returns the wineprefix used by proton """ @@ -53,7 +55,7 @@ def protonprefix(): 'pfx/') -def protonnameversion(): +def protonnameversion() -> str: """ Returns the version of proton from sys.argv[0] """ @@ -64,13 +66,13 @@ def protonnameversion(): return None -def protontimeversion(): +def protontimeversion() -> int: """ Returns the version timestamp of proton from the `version` file """ fullpath = os.path.join(protondir(), 'version') try: - with open(fullpath, 'r') as version: + with open(fullpath, 'r', encoding='ascii') as version: for timestamp in version.readlines(): return int(timestamp.strip()) except OSError: @@ -80,15 +82,15 @@ def protontimeversion(): return 0 -def protonversion(timestamp=False): +def protonversion(timestamp: bool = False) -> Union[str, int]: """ Returns the version of proton """ - if timestamp: return protontimeversion() return protonnameversion() -def once(func=None, retry=None): + +def once(func: callable = None, retry: bool = False): """ Decorator to use on functions which should only run once in a prefix. Error handling: By default, when an exception occurs in the decorated function, the @@ -102,13 +104,12 @@ def once(func=None, retry=None): """ if func is None: return functools.partial(once, retry=retry) - retry = retry if retry else False #pylint: disable=missing-docstring def wrapper(*args, **kwargs): - func_id = func.__module__ + "." + func.__name__ + func_id = f'{func.__module__}.{func.__name__}' prefix = protonprefix() - directory = os.path.join(prefix, "drive_c/protonfixes/run/") + directory = os.path.join(prefix, 'drive_c/protonfixes/run/') file = os.path.join(directory, func_id) if not os.path.exists(directory): os.makedirs(directory) @@ -123,7 +124,8 @@ def wrapper(*args, **kwargs): raise exc exception = exc - open(file, 'a').close() + with open(file, 'a', encoding='ascii') as tmp: + tmp.close() if exception: raise exception #pylint: disable=raising-bad-type @@ -132,7 +134,7 @@ def wrapper(*args, **kwargs): return wrapper -def _killhanging(): +def _killhanging() -> None: """ Kills processes that hang when installing winetricks """ @@ -150,15 +152,16 @@ def _killhanging(): except IOError: continue -def _forceinstalled(verb): + +def _forceinstalled(verb: str) -> None: """ Records verb into the winetricks.log.forced file """ forced_log = os.path.join(protonprefix(), 'winetricks.log.forced') - with open(forced_log, 'a') as forcedlog: + with open(forced_log, 'a', encoding='ascii') as forcedlog: forcedlog.write(verb + '\n') -def _checkinstalled(verb, logfile='winetricks.log'): +def _checkinstalled(verb: str, logfile: str = 'winetricks.log') -> bool: """ Returns True if the winetricks verb is found in the winetricks log """ @@ -173,7 +176,7 @@ def _checkinstalled(verb, logfile='winetricks.log'): wt_verb_param = verb.split('=')[1] wt_is_set = False try: - with open(winetricks_log, 'r') as tricklog: + with open(winetricks_log, 'r', encoding='ascii') as tricklog: for xline in tricklog.readlines(): if re.findall(r'^' + wt_verb, xline.strip()): wt_is_set = bool(xline.strip() == wt_verb + wt_verb_param) @@ -182,7 +185,7 @@ def _checkinstalled(verb, logfile='winetricks.log'): return False # Check for regular verbs try: - with open(winetricks_log, 'r') as tricklog: + with open(winetricks_log, 'r', encoding='ascii') as tricklog: if verb in reversed([x.strip() for x in tricklog.readlines()]): return True except OSError: @@ -190,20 +193,20 @@ def _checkinstalled(verb, logfile='winetricks.log'): return False -def checkinstalled(verb): +def checkinstalled(verb: str) -> bool: """ Returns True if the winetricks verb is found in the winetricks log or in the 'winetricks.log.forced' file """ if verb == 'gui': return False - log.info('Checking if winetricks ' + verb + ' is installed') + log.info(f'Checking if winetricks {verb} is installed') if _checkinstalled(verb, 'winetricks.log.forced'): return True return _checkinstalled(verb) -def is_custom_verb(verb): +def is_custom_verb(verb: str) -> bool: """ Returns path to custom winetricks verb, if found """ if verb == 'gui': @@ -226,7 +229,8 @@ def is_custom_verb(verb): return False -def check_internet(): + +def check_internet() -> bool: """Checks for internet connection.""" try: with socket(AF_INET, SOCK_DGRAM) as sock: @@ -236,7 +240,8 @@ def check_internet(): except (TimeoutError, OSError): return False -def protontricks(verb): + +def protontricks(verb: str) -> bool: """ Runs winetricks if available """ if not checkinstalled(verb): @@ -244,7 +249,7 @@ def protontricks(verb): # Proceed with your function logic here pass else: - log.info("No internet connection. Winetricks will be skipped.") + log.info('No internet connection. Winetricks will be skipped.') return False log.info('Installing winetricks ' + verb) @@ -255,7 +260,7 @@ def protontricks(verb): env['WINESERVER'] = protonmain.g_proton.wineserver_bin env['WINETRICKS_LATEST_VERSION_CHECK'] = 'disabled' env['LD_PRELOAD'] = '' - + winetricks_bin = os.path.abspath(__file__).replace('util.py','winetricks') winetricks_cmd = [winetricks_bin, '--unattended'] + verb.split(' ') if verb == 'gui': @@ -281,8 +286,8 @@ def protontricks(verb): log.info('Using winetricks verb ' + verb) subprocess.call([env['WINESERVER'], '-w'], env=env) - process = subprocess.Popen(winetricks_cmd, env=env) - process.wait() + with subprocess.Popen(winetricks_cmd, env=env) as process: + process.wait() _killhanging() # Check if the verb failed (eg. access denied) @@ -293,7 +298,7 @@ def protontricks(verb): # Check if verb recorded to winetricks log if not checkinstalled(verb): - log.warn('Not recorded as installed: winetricks ' + verb + ', forcing!') + log.warn(f'Not recorded as installed: winetricks {verb}, forcing!') _forceinstalled(verb) log.info('Winetricks complete') @@ -301,7 +306,8 @@ def protontricks(verb): return False -def regedit_add(folder,name=None,type=None,value=None,arch=None): + +def regedit_add(folder: str, name: str = None, typ: str = None, value: str = None, arch: bool = False) -> None: """ Add regedit keys """ @@ -310,14 +316,14 @@ def regedit_add(folder,name=None,type=None,value=None,arch=None): env['WINE'] = protonmain.g_proton.wine_bin env['WINELOADER'] = protonmain.g_proton.wine_bin env['WINESERVER'] = protonmain.g_proton.wineserver_bin - - if name is not None and type is not None and value is not None: + + if name is not None and typ is not None and value is not None: # Flag for if we want to force writing to the 64-bit registry sector - if arch is not None: - regedit_cmd = ['wine', 'reg' , 'add', folder, '/f', '/v', name, '/t', type, '/d', value, '/reg:64'] + if arch: + regedit_cmd = ['wine', 'reg' , 'add', folder, '/f', '/v', name, '/t', typ, '/d', value, '/reg:64'] else: - regedit_cmd = ['wine', 'reg' , 'add', folder, '/f', '/v', name, '/t', type, '/d', value] + regedit_cmd = ['wine', 'reg' , 'add', folder, '/f', '/v', name, '/t', typ, '/d', value] log.info('Adding key: ' + folder) @@ -332,19 +338,21 @@ def regedit_add(folder,name=None,type=None,value=None,arch=None): log.info('Adding key: ' + folder) - process = subprocess.Popen(regedit_cmd, env=env) - process.wait() + with subprocess.Popen(regedit_cmd, env=env) as process: + process.wait() + -def replace_command(orig_str, repl_str): +def replace_command(orig: str, repl: str) -> None: """ Make a commandline replacement in sys.argv """ - log.info('Changing ' + orig_str + ' to ' + repl_str) + log.info(f'Changing {orig} to {repl}') for idx, arg in enumerate(sys.argv): - if orig_str in arg: - sys.argv[idx] = arg.replace(orig_str, repl_str) + if orig in arg: + sys.argv[idx] = arg.replace(orig, repl) -def append_argument(argument): + +def append_argument(argument: str) -> None: """ Append an argument to sys.argv """ @@ -352,15 +360,17 @@ def append_argument(argument): sys.argv.append(argument) log.debug('New commandline: ' + str(sys.argv)) -def set_environment(envvar, value): + +def set_environment(envvar: str, value: str) -> None: """ Add or override an environment value """ - log.info('Adding env: ' + envvar + '=' + value) + log.info(f'Adding env: {envvar}={value}') os.environ[envvar] = value protonmain.g_session.env[envvar] = value -def del_environment(envvar): + +def del_environment(envvar: str) -> None: """ Remove an environment variable """ @@ -370,7 +380,8 @@ def del_environment(envvar): if envvar in protonmain.g_session.env: del protonmain.g_session.env[envvar] -def get_game_install_path(): + +def get_game_install_path() -> str: """ Game installation path """ install_path = os.environ['PWD'] @@ -378,17 +389,19 @@ def get_game_install_path(): install_path = os.environ['STEAM_COMPAT_INSTALL_PATH'] log.debug('Detected path to game: ' + install_path) # only for `waitforexitandrun` command - return install_path + return install_path -def winedll_override(dll, dtype): + +def winedll_override(dll: str, dtype: Literal['n', 'b', 'n,b', 'b,n', '']) -> None: """ Add WINE dll override """ - log.info('Overriding ' + dll + '.dll = ' + dtype) - setting = dll + "=" + dtype + log.info(f'Overriding {dll}.dll = {dtype}') + setting = f'{dll}={dtype}' protonmain.append_to_env_str(protonmain.g_session.env, 'WINEDLLOVERRIDES', setting, ';') -def disable_nvapi(): + +def disable_nvapi() -> None: """ Disable WINE nv* dlls """ @@ -400,29 +413,33 @@ def disable_nvapi(): winedll_override('nvencodeapi', '') winedll_override('nvencodeapi64', '') -def disable_esync(): + +def disable_esync() -> None: """ Disabling Esync """ log.info('Disabling Esync') set_environment('WINEESYNC', '') -def disable_fsync(): + +def disable_fsync() -> None: """ Disabling FSync """ log.info('Disabling FSync') set_environment('WINEFSYNC', '') -def disable_protonaudioconverter(): + +def disable_protonaudioconverter() -> None: """ Disabling Proton Audio Converter """ log.info('Disabling Proton Audio Converter') set_environment('GST_PLUGIN_FEATURE_RANK', 'protonaudioconverterbin:NONE') + @once -def disable_uplay_overlay(): +def disable_uplay_overlay() -> bool: """Disables the UPlay in-game overlay. Creates or appends the UPlay settings.yml file with the correct setting to disable the overlay. @@ -440,28 +457,22 @@ def disable_uplay_overlay(): config_file = os.path.join(config_dir, 'settings.yml') if not os.path.isdir(config_dir): - log.warn( - 'Could not disable UPlay overlay: "' - + config_dir - + '" does not exist or is not a directory.' - ) - return + log.warn(f'Could not disable UPlay overlay: "{config_dir}" does not exist or is not a directory.') + return False - if not os.path.isfile(config_file): - f = open(config_file,"w+") - f.write("\noverlay:\n enabled: false\n forceunhookgame: false\n fps_enabled: false\n warning_enabled: false\n") - f.close + try: + with open(config_file, 'a+', encoding='ascii') as file: + file.write('\noverlay:\n enabled: false\n forceunhookgame: false' + '\n fps_enabled: false\n warning_enabled: false\n') log.info('Disabled UPlay overlay') - else: - try: - with open(config_file, 'a+') as file: - file.write("\noverlay:\n enabled: false\n forceunhookgame: false\n fps_enabled: false\n warning_enabled: false\n") - log.info('Disabled UPlay overlay') - return - except OSError as err: - log.warn('Could not disable UPlay overlay: ' + err.strerror) + return True + except OSError as err: + log.warn('Could not disable UPlay overlay: ' + err.strerror) + + return False + -def create_dosbox_conf(conf_file, conf_dict): +def create_dosbox_conf(conf_file: str, conf_dict: Mapping[str, Mapping[str, any]]) -> None: """Create DOSBox configuration file. DOSBox accepts multiple configuration files passed with -conf @@ -472,24 +483,25 @@ def create_dosbox_conf(conf_file, conf_dict): return conf = configparser.ConfigParser() conf.read_dict(conf_dict) - with open(conf_file, 'w') as file: + with open(conf_file, 'w', encoding='ascii') as file: conf.write(file) -def _get_case_insensitive_name(path): + +def _get_case_insensitive_name(path: str) -> str: """ Find potentially differently-cased location e.g /path/to/game/system/gothic.ini -> /path/to/game/System/GOTHIC.INI """ if os.path.exists(path): return path root = path - # Find first existing directory in the tree + # Find first existing directory in the tree while not os.path.exists(root): root = os.path.split(root)[0] - - if root[len(root) - 1] not in ["/", "\\"]: + + if root[len(root) - 1] not in ['/', '\\']: root = root + os.sep # Separate missing path from existing root - s_working_dir = path.replace(root, "").split(os.sep) + s_working_dir = path.replace(root, '').split(os.sep) paths_to_find = len(s_working_dir) # Keep track of paths we found so far paths_found = 0 @@ -515,7 +527,8 @@ def _get_case_insensitive_name(path): root = os.path.join(root, os.sep.join(s_working_dir[paths_found:])) return root -def _get_config_full_path(cfile, base_path): + +def _get_config_full_path(cfile: str, base_path: str) -> str: """ Find game's config file """ @@ -534,9 +547,10 @@ def _get_config_full_path(cfile, base_path): return cfg_path log.warn('Config file not found: ' + cfg_path) - return False + return None + -def create_backup_config(cfg_path): +def create_backup_config(cfg_path: str) -> None: """ Create backup config file """ @@ -545,7 +559,8 @@ def create_backup_config(cfg_path): log.info('Creating backup for config file') shutil.copyfile(cfg_path, cfg_path + '.protonfixes.bak') -def set_ini_options(ini_opts, cfile, encoding, base_path='user'): + +def set_ini_options(ini_opts: str, cfile: str, encoding: str, base_path: str = 'user') -> bool: """ Edit game's INI config file """ cfg_path = _get_config_full_path(cfile, base_path) @@ -560,14 +575,15 @@ def set_ini_options(ini_opts, cfile, encoding, base_path='user'): conf.read(cfg_path,encoding) - log.info('Addinging INI options into '+cfile+':\n'+ str(ini_opts)) + log.info(f'Addinging INI options into {cfile}:\n{str(ini_opts)}') conf.read_string(ini_opts) - with open(cfg_path, 'w') as configfile: + with open(cfg_path, 'w', encoding=encoding) as configfile: conf.write(configfile) return True -def set_xml_options(base_attibutte, xml_line, cfile, base_path='user'): + +def set_xml_options(base_attibutte: str, xml_line: str, cfile: str, base_path: str = 'user') -> bool: """ Edit game's XML config file """ xml_path = _get_config_full_path(cfile, base_path) @@ -581,29 +597,33 @@ def set_xml_options(base_attibutte, xml_line, cfile, base_path='user'): base_size = os.path.getsize(xml_path) backup_size = os.path.getsize(xml_path + '.protonfixes.bak') - if base_size == backup_size: - ConfigFile = open(xml_path, 'r') - contents = ConfigFile.readlines() - LINENUM=0 + if base_size != backup_size: + return False + + with open(xml_path, 'r', encoding='utf-8') as file: + contents = file.readlines() + i = 0 for line in contents: - LINENUM+=1 + i += 1 if base_attibutte in line: - log.info('Addinging XML options into '+cfile+':\n'+ str(xml_line)) - contents.insert(LINENUM, xml_line + "\n") - ConfigFile.close() - ConfigFile = open(xml_path, 'w') + log.info(f'Adding XML options into {cfile}, line {i}:\n{xml_line}') + contents.insert(i, xml_line + '\n') + + with open(xml_path, 'w', encoding='utf-8') as file: for eachitem in contents: - ConfigFile.write(eachitem) - ConfigFile.close() - log.info("Config Patch Applied! \n") + file.write(eachitem) + + log.info('XML config patch applied') + return True + -def get_resolution(): +def get_resolution() -> tuple[int, int]: """ Returns screen res width, height using xrandr """ - xrandr_bin = os.path.abspath(__file__).replace('util.py','xrandr') # Execute xrandr command and capture its output + xrandr_bin = os.path.abspath(__file__).replace('util.py','xrandr') xrandr_output = subprocess.check_output([xrandr_bin, '--current']).decode('utf-8') - + # Find the line that starts with 'Screen 0:' and extract the resolution for line in xrandr_output.splitlines(): if 'primary' in line: @@ -612,22 +632,22 @@ def get_resolution(): offset_values = width_height[1].split('+') clean_resolution = width_height[0] + 'x' + offset_values[0] screenx, screeny = clean_resolution.split('x') - return int(screenx), int(screeny) - + return (int(screenx), int(screeny)) + # If no resolution is found, return default values or raise an exception - return 0, 0 # or raise Exception("Resolution not found") + return (0, 0) # or raise Exception('Resolution not found') + def read_dxvk_conf(cfp): """ Add fake [DEFAULT] section to dxvk.conf """ - yield '['+ configparser.ConfigParser().default_section +']' + yield f'[{configparser.ConfigParser().default_section}]' yield from cfp -def set_dxvk_option(opt, val, cfile='/tmp/protonfixes_dxvk.conf'): +def set_dxvk_option(opt: str, val: str, cfile: str = '/tmp/protonfixes_dxvk.conf') -> None: """ Create custom DXVK config file - - See https://github.com/doitsujin/dxvk/wiki/Configuration for details + See https://github.com/doitsujin/dxvk/wiki/Configuration for details """ conf = configparser.ConfigParser() conf.optionxform = str @@ -645,30 +665,33 @@ def set_dxvk_option(opt, val, cfile='/tmp/protonfixes_dxvk.conf'): conf.set(section, 'session', str(os.getpid())) if os.access(dxvk_conf, os.F_OK): - conf.read_file(read_dxvk_conf(open(dxvk_conf))) + with open(dxvk_conf, encoding='ascii') as dxvk: + conf.read_file(read_dxvk_conf(dxvk)) log.debug(conf.items(section)) # set option log.info('Addinging DXVK option: '+ str(opt) + ' = ' + str(val)) conf.set(section, opt, str(val)) - with open(cfile, 'w') as configfile: + with open(cfile, 'w', encoding='ascii') as configfile: conf.write(configfile) -def install_eac_runtime(): + +def install_eac_runtime() -> None: """ Install Proton Easyanticheat Runtime """ install_app(1826330) -def install_battleye_runtime(): + +def install_battleye_runtime() -> None: """ Install Proton BattlEye Runtime """ install_app(1161040) -def install_all_from_tgz(url, path=os.getcwd()): + +def install_all_from_tgz(url: str, path: str = os.getcwd()) -> None: """ Install all files from a downloaded tar.gz """ - cache_dir = config.cache_dir tgz_file_name = os.path.basename(url) tgz_file_path = os.path.join(cache_dir, tgz_file_name) @@ -676,17 +699,18 @@ def install_all_from_tgz(url, path=os.getcwd()): if tgz_file_name not in os.listdir(cache_dir): log.info('Downloading ' + tgz_file_name) urllib.request.urlretrieve(url, tgz_file_path) - + with tarfile.open(tgz_file_path, 'r:gz') as tgz_obj: - log.info('Extracting ' + tgz_file_name + ' to ' + path) + log.info(f'Extracting {tgz_file_name} to {path}') tgz_obj.extractall(path) - -def install_from_zip(url, filename, path=os.getcwd()): + + +def install_from_zip(url: str, filename: str, path: str = os.getcwd()) -> None: """ Install a file from a downloaded zip """ if filename in os.listdir(path): - log.info('File ' + filename + ' found in ' + path) + log.info(f'File {filename} found in {path}') return cache_dir = config.cache_dir @@ -694,36 +718,44 @@ def install_from_zip(url, filename, path=os.getcwd()): zip_file_path = os.path.join(cache_dir, zip_file_name) if zip_file_name not in os.listdir(cache_dir): - log.info('Downloading ' + filename + ' to ' + zip_file_path) + log.info(f'Downloading {filename} to {zip_file_path}') urllib.request.urlretrieve(url, zip_file_path) with zipfile.ZipFile(zip_file_path, 'r') as zip_obj: - log.info('Extracting ' + filename + ' to ' + path) + log.info(f'Extracting {filename} to {path}') zip_obj.extract(filename, path=path) -def try_show_gui_error(text): + +def try_show_gui_error(text: str) -> None: + """ Trys to show a message box with an error + 1. Try importing tkinter and show messagebox + 2. Try executing process 'notify-send' + 3. Failed, output info to log + """ try: # in case in-use Python doesn't have tkinter, which is likely - from tkinter import messagebox - messagebox.showerror("Proton Fixes", text) - except Exception as e: + from tkinter import messagebox # pylint: disable=C0415 + messagebox.showerror('Proton Fixes', text) + except ImportError: try: - subprocess.run(["notify-send", "protonfixes", text]) - except: - log.info("Failed to show error message with the following text: {}".format(text)) + subprocess.run(['notify-send', 'protonfixes', text], check=True) + except (subprocess.SubprocessError, subprocess.CalledProcessError): + log.info('Failed to show error message with the following text: ' + text) + def is_smt_enabled() -> bool: """ Returns whether SMT is enabled. If the check has failed, False is returned. """ try: - with open('/sys/devices/system/cpu/smt/active') as smt_file: - return smt_file.read().strip() == "1" + with open('/sys/devices/system/cpu/smt/active', encoding='ascii') as smt_file: + return smt_file.read().strip() == '1' except PermissionError: log.warn('No permission to read SMT status') - except OSError as e: - log.warn(f'SMT status not supported by the kernel (errno: {e.errno})') + except OSError as ex: + log.warn(f'SMT status not supported by the kernel (errno: {ex.errno})') return False + def get_cpu_count() -> int: """ Returns the cpu core count, provided by the OS. If the request failed, 0 is returned. @@ -734,6 +766,7 @@ def get_cpu_count() -> int: return 0 return cpu_cores + def set_cpu_topology(core_count: int, ignore_user_setting: bool = False) -> bool: """ This sets the cpu topology to a fixed core count. By default, a user provided topology is prioritized. @@ -774,6 +807,7 @@ def set_cpu_topology_nosmt(core_limit: int = 0, ignore_user_setting: bool = Fals cpu_cores = max(cpu_cores, min(cpu_cores, core_limit)) # Apply limit return set_cpu_topology(cpu_cores, ignore_user_setting) + def set_cpu_topology_limit(core_limit: int, ignore_user_setting: bool = False) -> bool: """ This sets the cpu topology to a limited number of logical cores. A limit that exceeds the available cores, will be ignored.