diff --git a/pyproject.toml b/pyproject.toml index f993818..144301d 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -12,7 +12,8 @@ packages = [{include = "fate", from = "src"}] [tool.poetry.dependencies] python = "^3.8" -argcmdr = "^0.13.2" +argcmdr = "^1.0.0" +argcomplete = "^2.0" croniter = "^1.3.5" Jinja2 = "^3.1.2" loguru = "^0.6.0" diff --git a/src/fate/cli/__init__.py b/src/fate/cli/__init__.py index d63dd50..ec8e2bc 100644 --- a/src/fate/cli/__init__.py +++ b/src/fate/cli/__init__.py @@ -1,3 +1,3 @@ -from .base import runcmd # noqa: F401 +from .base import exit_on_error, runcmd # noqa: F401 from .root import main, Main, daemon, serve # noqa: F401 diff --git a/src/fate/cli/base/__init__.py b/src/fate/cli/base/__init__.py index c6e24f8..2576a12 100644 --- a/src/fate/cli/base/__init__.py +++ b/src/fate/cli/base/__init__.py @@ -1,2 +1,3 @@ -from .execution import runcmd # noqa: F401 -from .main import Main # noqa: F401 +from .common import exit_on_error # noqa: F401 +from .execution import runcmd # noqa: F401 +from .main import Main # noqa: F401 diff --git a/src/fate/cli/base/common.py b/src/fate/cli/base/common.py index ac886da..1c6be84 100644 --- a/src/fate/cli/base/common.py +++ b/src/fate/cli/base/common.py @@ -1,4 +1,5 @@ import enum +import functools import fate.conf @@ -32,6 +33,24 @@ def __exit__(self, exc_type, exc_value, _traceback): self.parser.exit(78, f'{self.parser.prog}: error: {exc_value}\n') +def exit_on_error(method): + """Decorator to apply context manager `ExitOnError` to instance + methods of classes of type `argcmdr.Command`. + + Note: The decorator wrapper depends upon instance attribute + `parser`. As such, affected command classes must extend the default + `__init__` (*i.e.* `super()`); or, decorated methods must be invoked + *after* command initialization (*i.e.* not as part of its `__init__`). + + """ + @functools.wraps(method) + def wrapped(self, *args, **kwargs): + with ExitOnError(self.parser): + return method(self, *args, **kwargs) + + return wrapped + + class CommandInterface: class CommandStatus(enum.Enum): @@ -59,7 +78,7 @@ def __str__(self): @property def conf(self): - if not self.__parents__: + if (root := self.root) is None: # this is the root command # retrieve and store conf here try: @@ -70,11 +89,11 @@ def conf(self): return conf # defer to root - return self.root.conf + return root.conf @property def exit_on_error(self): - return ExitOnError(self.args.__parser__) + return ExitOnError(self.parser) @staticmethod def write_result(path, contents): diff --git a/src/fate/cli/base/execution.py b/src/fate/cli/base/execution.py index a5f8fc6..986d0ff 100644 --- a/src/fate/cli/base/execution.py +++ b/src/fate/cli/base/execution.py @@ -133,7 +133,7 @@ def get_command(self, args): def prepare(self, args, parser): """Execute and report on task command execution.""" try: - command_spec = self.call(args, 'get_command') + command_spec = self.delegate('get_command') except self.local.CommandNotFound as exc: hint = ('\nhint: whitespace in program name suggests a misconfiguration' if re.search(r'\s', exc.program) else '') diff --git a/src/fate/cli/base/main.py b/src/fate/cli/base/main.py index f2d764b..5d5eb5d 100644 --- a/src/fate/cli/base/main.py +++ b/src/fate/cli/base/main.py @@ -1,3 +1,6 @@ +import os.path +import sys + import argcmdr from .common import CommandInterface @@ -7,11 +10,12 @@ class Main(CommandInterface, argcmdr.RootCommand): """manage the periodic execution of commands""" @classmethod - def base_parser(cls): - parser = super().base_parser() + def _new_parser_(cls): + parser = super()._new_parser_() # enforce program name when invoked via "python -m fate" if parser.prog == '__main__.py': - parser.prog = 'fate' + command = os.path.basename(sys.executable) + parser.prog = f'{command} -m fate' return parser diff --git a/src/fate/cli/command/control.py b/src/fate/cli/command/control.py index d35d5ef..14b4569 100644 --- a/src/fate/cli/command/control.py +++ b/src/fate/cli/command/control.py @@ -148,9 +148,7 @@ def onerror(self, exc): error_msg = error and f': {error}' error_name = exc.__class__.__name__ - parser = self.args.__parser__ - - parser.exit(1, f'{parser.prog}: fatal: {error_name}{error_msg}\n') + self.parser.exit(1, f'{self.parser.prog}: fatal: {error_name}{error_msg}\n') def __call__(self, args, parser): """Execute the command.""" diff --git a/src/fate/cli/command/init.py b/src/fate/cli/command/init.py new file mode 100644 index 0000000..3b30ffc --- /dev/null +++ b/src/fate/cli/command/init.py @@ -0,0 +1,382 @@ +import abc +import enum +import os +import sys +from dataclasses import dataclass +from pathlib import Path + +import argcomplete + +from fate.util.abstract import abstractmember +from fate.util.argument import ChoiceMapping, DirAccess, FileAccess +from fate.util.compat.argument import BooleanOptionalAction +from fate.util.datastructure import StrEnum +from fate.util.format import Loader +from fate.util.term import getch +from plumbum import colors + +from .. import exit_on_error, Main + + +class StatusSymbol(StrEnum): + + complete = colors.bold & colors.success | '☑' # noqa: E221 + failed = colors.bold & colors.fatal | '☒' # noqa: E221 + incomplete = colors.bold & colors.info | '☐' # noqa: E221 + + +class EndStatus(enum.Enum): + + complete = (StatusSymbol.complete, 'installed') # noqa: E221,E241 + failed = (StatusSymbol.failed, 'failed') # noqa: E221,E241 + incomplete = (StatusSymbol.incomplete, 'skipped') # noqa: E221,E241 + + @property + def symbol(self): + return self.value[0] + + @property + def message(self): + return self.value[1] + + +class TaskSymbol(StrEnum): + + comp = colors.bold | '↹' + conf = colors.bold | '⚙' + + +@dataclass +class TaskPrompt(abc.ABC): + + identifier: str + description: str + path: Path + exists: bool = True + syncd: bool = True + + update_action = abstractmember() + + +class PathOverwrite(TaskPrompt): + + update_action = 'overwrite' + + +class PathUpdate(TaskPrompt): + + update_action = 'update' + + +class InitCommand(Main, metaclass=abc.ABCMeta): + + description = abstractmember() + path_access = abstractmember() + + def check_access(self, path): + try: + self.path_access(path) + except self.path_access.PathTypeError: + extant_type = 'directory' if isinstance(self.path_access, FileAccess) else 'file' + + self.parser.print_usage(sys.stderr) + self.parser.exit(71, f'{self.parser.prog}: fatal: inferred path is ' + f'extant {extant_type}: {path}\n' + if path.exists() else + f'{self.parser.prog}: fatal: inferred path is ' + f'inaccessible: {path}\n') + except self.path_access.PathAccessError: + self.parser.print_usage(sys.stderr) + self.parser.exit(73, f'{self.parser.prog}: fatal: inferred path is ' + f'not read-writable: {path}\n') + + @abc.abstractmethod + def execute(self): + yield from () + + @exit_on_error + def __call__(self, args): + executor = self.delegate('execute') + + prompt = next(executor) + + print(StatusSymbol.complete if prompt.syncd else StatusSymbol.incomplete, + TaskSymbol[prompt.identifier], + colors.underline & colors.dim | str(prompt.path), + sep=' ') + + lines = 1 + + if args.prompt and not prompt.syncd: + lines += 2 + + print( + '\n_ [Y|n]', + colors.warn[prompt.update_action] if prompt.exists else 'install', + f'{prompt.description}?', + end='\r', # return + ) + + while (do_install := getch().lower()) not in 'yn\r\x03\x04': + pass + + if do_install == '\r': + # set empty + do_install = 'y' + elif do_install in '\x03\x04': + # treat ^C and ^D as input of "n" + do_install = 'n' + + print(colors.underline | do_install.upper()) + else: + do_install = 'y' + + status = executor.send(do_install == 'y') + + # update status line + print( + f'\033[{lines}F', # jump to ☐ + status.symbol, # reset symbol + '\033[{}C'.format(5 + len(str(prompt.path))), # jump to end + f': {prompt.description} {status.message}', # set message + sep='', + end=('\n' * lines), # return to bottom + ) + + +@Main.register +class Init(Main): + """post-installation initializations""" + + def __init__(self, parser): + tty_detected = sys.stdin.isatty() + prompt_default = 'prompt' if tty_detected else 'no prompt' + + parser.add_argument( + '--prompt', + default=tty_detected, + action=BooleanOptionalAction, + help=f"prompt to confirm actions via TTY (default: {prompt_default})", + ) + + def __call__(self): + for (index, subcommand) in enumerate(self): + print('' if index == 0 else '\n', + colors.title | subcommand.description, + sep='', + end='\n\n') + + subcommand.call() + + +@Init.register +class Conf(InitCommand): + """install configuration files""" + + description = 'default configuration' + path_access = DirAccess('rw', parents=True) + + @dataclass + class FormatPreference: + + name: str + + @property + def suffix(self): + return '.' + self.name + + def select(self, suffix): + return suffix == self.suffix + + def __str__(self): + return self.name + + def __init__(self, parser): + formats = {loader.name: self.FormatPreference(loader.name) for loader in Loader} + parser.add_argument( + '--format', + action=ChoiceMapping, + choices=formats, + default=formats['toml'], + help="configuration format to prefer (default: %(default)s)", + ) + + parser.add_argument( + 'path', + nargs='?', + type=self.path_access, + help="force installation to directory path (default: inferred)", + ) + + def execute(self, args, parser): + if args.path: + conf_prefix = args.path + else: + conf_prefix = self.conf._prefix_.conf + + self.check_access(conf_prefix) + + update_paths = {} + + prompt = PathOverwrite('conf', self.description, path=None) + + for conf in self.conf: + builtins = {path.suffix: path for path in conf._iter_builtins_()} + + formats_builtin = sorted(builtins, key=args.format.select, reverse=True) + + try: + format_builtin = formats_builtin[0] + except IndexError: + parser.print_usage(sys.stderr) + parser.exit(70, f"{parser.prog}: fatal: no built-in for " + f"conf file '{conf.__name__}'") + + if extant := conf._get_path_(conf_prefix): + if template := builtins.get(extant.suffix): + update_paths[extant] = template + prompt.syncd = prompt.syncd and template.read_text() == extant.read_text() + else: + parser.print_usage(sys.stderr) + parser.exit(70, f'{parser.prog}: fatal: no built-in template for format ' + f'{extant.suffix[1:]} of existing conf file: {extant}') + else: + prompt.exists = prompt.syncd = False + + template = builtins[format_builtin] + target_path = conf_prefix / template.name + update_paths[target_path] = template + + pseudo_name = '{%s}' % ','.join(path.name for path in update_paths) + prompt.path = conf_prefix / pseudo_name + + confirmed = yield prompt + + if prompt.syncd: + yield EndStatus.complete + elif confirmed: + try: + conf_prefix.mkdir(parents=True, exist_ok=True) + + for (target_path, source_path) in update_paths.items(): + with target_path.open('wb') as t_fd, source_path.open('rb') as s_fd: + t_fd.writelines(s_fd) + except OSError: + yield EndStatus.failed + else: + yield EndStatus.complete + else: + yield EndStatus.incomplete + + +@Init.register +class Comp(InitCommand): + """install shell tab-completion files""" + + description = 'shell completion' + path_access = FileAccess('rw', parents=True) + + script_suffixes = ('', 'd', 's') + + class Shell(StrEnum): + + bash = 'bash' + fish = 'fish' + tcsh = 'tcsh' + + @classmethod + def get_choices(cls): + return sorted(str(member) for member in cls) + + @classmethod + def get_default(cls): + login_shell = os.getenv('SHELL') + + if not login_shell: + return None + + shell_path = Path(login_shell) + + if not shell_path.is_file(): + return None + + shell_name = shell_path.name + + return cls.__members__.get(shell_name) + + def __init__(self, parser): + shell_default = self.Shell.get_default() + parser.add_argument( + '--shell', + choices=self.Shell.get_choices(), + default=shell_default, + help="shell for which to install completion " + + ("(default: %(default)s)" if shell_default else "(required)"), + required=shell_default is None, + ) + + target = parser.add_mutually_exclusive_group() + target.add_argument( + '--system', + default=None, + dest='system_profile', + action='store_true', + help="force system-wide installation (default: inferred)", + ) + target.add_argument( + '--user', + default=None, + dest='system_profile', + action='store_false', + help="force user-only installation (default: inferred)", + ) + target.add_argument( + 'path', + nargs='?', + type=self.path_access, + help="force installation to file at path (default: inferred)", + ) + + def execute(self, args): + """install shell completion""" + # determine installation path + if args.path: + completions_path = args.path + else: + completions_path = self.conf._prefix_.completions(args.shell, args.system_profile) + + self.check_access(completions_path) + + # determine file contents + entry_points = args.__entry_points__ or [f'{self.conf._lib_}{suffix}' + for suffix in self.script_suffixes] + + contents = argcomplete.shellcode(entry_points, shell=args.shell) + + # check file status and prepare prompt + prompt = PathUpdate('comp', f'{args.shell} {self.description}', completions_path) + + try: + prompt.syncd = completions_path.read_text() == contents + except FileNotFoundError: + prompt.exists = prompt.syncd = False + else: + prompt.exists = True + + # delegate prompt to controller + confirmed = yield prompt + + # complete execution + if prompt.syncd: + yield EndStatus.complete + elif confirmed: + try: + completions_path.parent.mkdir(parents=True, + exist_ok=True) + completions_path.write_text(contents) + except OSError: + yield EndStatus.failed + else: + yield EndStatus.complete + else: + yield EndStatus.incomplete diff --git a/src/fate/cli/root.py b/src/fate/cli/root.py index a23cf2e..02f5bb1 100644 --- a/src/fate/cli/root.py +++ b/src/fate/cli/root.py @@ -6,10 +6,11 @@ from fate.cli.base import Main -def extend_parser(parser, conf=None, banner_path=None): +def extend_parser(parser, conf=None, banner_path=None, entry_points=None): parser.set_defaults( __conf__=conf, __banner_path__=banner_path, + __entry_points__=entry_points, ) diff --git a/src/fate/conf/base/conf.py b/src/fate/conf/base/conf.py index 95dc009..15abd27 100644 --- a/src/fate/conf/base/conf.py +++ b/src/fate/conf/base/conf.py @@ -68,31 +68,45 @@ def _indicator_(self): def _indicator_builtin_(self): return resources.files(self._builtin_.path) / self.__filename__ - @cachedproperty - @loads - def __path__(self): - paths = (self._indicator_.with_suffix(format_.suffix) - for format_ in self._Format) + def _iter_builtins_(self): + for format_ in self._Format: + builtin_path = self._indicator_builtin_.with_suffix(format_.suffix) + + if builtin_path.is_file(): + yield builtin_path - extant = [path for path in paths if path.exists()] + def _iter_paths_(self, prefix=None): + indicator = prefix / self.__filename__ if prefix else self._indicator_ + for format_ in self._Format: + path = indicator.with_suffix(format_.suffix) + + if path.is_file(): + yield path + + def _get_path_(self, prefix=None): try: - (path, *extra) = extant + (path, *extra) = self._iter_paths_(prefix=prefix) except ValueError: - pass - else: - if extra: - raise MultiConfError(*extant) + return None + if extra: + raise MultiConfError(path, *extra) + + return path + + @cachedproperty + @loads + def __path__(self): + if path := self._get_path_(): return path if self._builtin_.fallback: # fall back to first built-in found - for format_ in self._Format: - builtin_path = self._indicator_builtin_.with_suffix(format_.suffix) - - if builtin_path.is_file(): - return builtin_path + try: + return next(self._iter_builtins_()) + except StopIteration: + pass raise NoConfError("%s{%s}" % ( self._indicator_, diff --git a/src/fate/conf/base/conf_group.py b/src/fate/conf/base/conf_group.py index db82e30..f6126b7 100644 --- a/src/fate/conf/base/conf_group.py +++ b/src/fate/conf/base/conf_group.py @@ -80,7 +80,7 @@ def __init__(self, *specs, lib=None, **builtin_spec): @cachedproperty def _prefix_(self): - return PrefixPaths._infer(self._lib_) + return PrefixPaths.infer(self._lib_) def _iter_conf_(self, specs): for spec in (specs or self._Spec): diff --git a/src/fate/conf/path.py b/src/fate/conf/path.py index 2fa48a0..401b856 100644 --- a/src/fate/conf/path.py +++ b/src/fate/conf/path.py @@ -1,116 +1,228 @@ +"""Inference, and environment variable specification, of relevant +filesystem paths. + +""" +import enum +import functools +import operator import os -import pathlib +import re import sys -import typing +from dataclasses import dataclass +from pathlib import Path -from descriptors import classonlymethod +from descriptors import cachedproperty, classonlymethod, classproperty from fate.util.compat.path import is_relative_to -class PrefixPaths(typing.NamedTuple): - """Collection and constructors of relevant filesystem paths.""" - - # library configuration - conf: pathlib.Path +class PrefixProfile(enum.Flag): - # results directory (default) - data: pathlib.Path + # + # system: library was installed under a system (root-writable) path + # + # (if not set, library was installed under some user path.) + # + system = enum.auto() - # library (retry records) and task state - state: pathlib.Path + # + # isolated: library was installed into a python virtual environment + # for the purpose of isolation + # + # (note: this flag may not be set for some virtual environments, + # which seek only to isolate the library's requirements, but not + # the tool itself -- e.g., pipx) + # + isolated = enum.auto() - # run (lock) files - run: pathlib.Path + @classproperty + def empty(cls): + return cls(0) @classonlymethod - def _make_system(cls): - return cls( - conf=pathlib.Path('/etc/'), - data=pathlib.Path('/var/log/'), - state=pathlib.Path('/var/lib/'), - run=pathlib.Path('/run/'), - ) + def infer(cls, lib): + """Compose profile flags appropriate to the installation context. - @classonlymethod - def _make_user(cls): - home = pathlib.Path.home() - - return cls( - conf=(pathlib.Path(xdg_config) - if (xdg_config := os.getenv('XDG_CONFIG_HOME')) - else home / '.config'), - data=(pathlib.Path(xdg_data) - if (xdg_data := os.getenv('XDG_DATA_HOME')) - else home / '.local' / 'share'), - state=(pathlib.Path(xdg_state) - if (xdg_state := os.getenv('XDG_STATE_HOME')) - else home / '.local' / 'state'), - run=(pathlib.Path(xdg_runtime) - if (xdg_runtime := os.getenv('XDG_RUNTIME_DIR')) - else home / '.local' / 'run'), - ) + Inference is overridden by the process environment variable: - @classonlymethod - def _make_venv(cls): - return cls( - conf=pathlib.Path(sys.prefix), - data=pathlib.Path(sys.prefix), - state=pathlib.Path(sys.prefix), - run=pathlib.Path(sys.prefix), - ) + {LIB}_PREFIX_PROFILE={system,isolated,empty} - @classonlymethod - def _infer_paths(cls): - if sys.prefix == sys.base_prefix: + The value of this variable may be a singular profile name, or a + list of these, delimited by either "," (comma) or "|" (pipe). + + The special value `empty` may be specified to indicate the empty + flag, (*i.e.*, non-system, non-isolated, or the "user-global" + profile). + + Note: Any other non-existant profile names specified by the + environment variable are ignored. + + """ + # + # check for profile specified by environment variable + # + + environ_spec = os.getenv(f'{lib}_PREFIX_PROFILE'.upper(), '') + + # environ-given names may be delimited by comma or pipe + environ_names = re.findall(r'[^ ,|]+', environ_spec) + + # unrecognized environ-given names are simply ignored + environ_profiles = [cls[name] for name in environ_names if name in cls.__members__] + + if 'empty' in environ_names: + environ_profiles.append(cls.empty) + + if environ_profiles: + return functools.reduce(operator.or_, environ_profiles) + + # + # infer from installation + # + + # compat: Python <3.9 + user_installation = is_relative_to(Path(__file__), Path.home().parent) + + # module either installed under a user home directory + # (and so will use XDG_CONFIG_HOME, etc.) + # OR appears global + # (and so will install global) + location_profile = cls.empty if user_installation else cls.system + + if ( # using system python + sys.prefix == sys.base_prefix + + # using tox venv: treat as non-isolated + or 'pipx' in Path(sys.prefix).parts + ): + return location_profile + + # looks isolated (a venv) + # (and so will construct path from `sys.prefix`) + return location_profile | cls.isolated + + +def path(method): + """Wrap a Path-returning method such that: + + * it may be overridden by an environment variable + + * its result (if any) is given a leaf directory named for the + PrefixPath.lib attribute + + """ + @functools.wraps(method) + def wrapped(self, *args, **kwargs): + if override := os.getenv(f'{self.lib}_PREFIX_{method.__name__}'.upper()): + return Path(override).absolute() + + method_path = method(self, *args, **kwargs) + + return method_path and method_path / self.lib + + return wrapped - # compat: Python <3.9 - if is_relative_to(pathlib.Path(__file__), pathlib.Path.home()): - # module installed under a user home directory - # use XDG_CONFIG_HOME, etc. - return cls._make_user() - else: - # appears global: install global - return cls._make_system() - else: - # looks like a virtualenv - # construct path from `sys.prefix` - return cls._make_venv() + +def path_property(prop): + """Wrap a `Path`-returning method as a `path` and as a + `cachedproperty`. + + See: `path`. + + """ + return cachedproperty(path(prop)) + +path.property = path_property + + +@dataclass +class PrefixPaths: + """Path prefixes appropriate to the environment. + + Overrides to inference and defaults are retrieved from the + process environment variables: + + {LIB}_PREFIX_PROFILE={system,isolated} + + {LIB}_PREFIX_{FIELD}=path + + """ + lib: str + profile: PrefixProfile @classonlymethod - def _infer(cls, lib): - """Determine path prefixes appropriate to environment. + def infer(cls, lib): + profile = PrefixProfile.infer(lib) + return cls(lib, profile) - Overrides to inference and defaults are retrieved from the - process environment variables: + @path.property + def conf(self): + """library configuration""" + if PrefixProfile.system in self.profile: + return Path(os.sep) / 'etc' - {LIB}_PREFIX_PROFILE={system,user,venv} + if PrefixProfile.isolated in self.profile: + return Path(sys.prefix) - {LIB}_PREFIX_{FIELD}=path + if xdg_config := os.getenv('XDG_CONFIG_HOME'): + return Path(xdg_config) - """ - environ_profile = os.getenv(f'{lib}_PREFIX_PROFILE'.upper()) - - if environ_profile == 'system': - prefixes = cls._make_system() - elif environ_profile == 'user': - prefixes = cls._make_user() - elif environ_profile == 'venv': - prefixes = cls._make_venv() - else: - prefixes = cls._infer_paths() - - # add lib leaf directory to all defaults - paths = cls._make(prefix / lib for prefix in prefixes) - - environ_overrides = ( - (field, os.getenv(f'{lib}_PREFIX_{field}'.upper())) - for field in cls._fields - ) - replacements = { - field: pathlib.Path(override).absolute() - for (field, override) in environ_overrides if override - } - - return paths._replace(**replacements) if replacements else paths + return Path.home() / '.config' + + @path.property + def data(self): + """results directory (default)""" + if PrefixProfile.system in self.profile: + return Path(os.sep) / 'var' / 'log' + + if PrefixProfile.isolated in self.profile: + return Path(sys.prefix) + + if xdg_data := os.getenv('XDG_DATA_HOME'): + return Path(xdg_data) + + return Path.home() / '.local' / 'share' + + @path.property + def state(self): + """library (retry records) and task state""" + if PrefixProfile.system in self.profile: + return Path(os.sep) / 'var' / 'lib' + + if PrefixProfile.isolated in self.profile: + return Path(sys.prefix) + + if xdg_state := os.getenv('XDG_STATE_HOME'): + return Path(xdg_state) + + return Path.home() / '.local' / 'state' + + @path.property + def run(self): + """run (lock) files""" + if PrefixProfile.system in self.profile: + return Path(os.sep) / 'run' + + if PrefixProfile.isolated in self.profile: + return Path(sys.prefix) + + if xdg_runtime := os.getenv('XDG_RUNTIME_DIR'): + return Path(xdg_runtime) + + return Path.home() / '.local' / 'run' + + @path + def completions(self, shell_name, force_system=None): + """shell completion files""" + dir_name = 'bash-completion' if shell_name == 'bash' else shell_name + + if force_system or (force_system is None and PrefixProfile.system in self.profile): + return Path(os.sep) / 'usr' / 'share' / dir_name / 'completions' + + # completions must ignore user virtual env (and shouldn't matter) + + data_path = (Path(xdg_data) if (xdg_data := os.getenv('XDG_DATA_HOME')) + else Path.home() / '.local' / 'share') + + return data_path / dir_name / 'completions' diff --git a/src/fate/util/argument.py b/src/fate/util/argument.py index e8923ad..1460dc1 100644 --- a/src/fate/util/argument.py +++ b/src/fate/util/argument.py @@ -1,4 +1,9 @@ import argparse +import enum +import functools +import operator +import os +import pathlib class ChoiceMapping(argparse.Action): @@ -13,3 +18,106 @@ def __init__(self, *args, choices, **kwargs): def __call__(self, parser, namespace, value, option_string=None): setattr(namespace, self.dest, self.mapping[value]) + + +def access_parent(path): + access_target = path + + # Path.exists() calls stat() which can raise PermissionError (prematurely) + while not os.path.exists(access_target) and access_target.name != '': + access_target = access_target.parent + + return access_target + + +class PathAccess: + """Argparse type ensuring filesystem access to given path argument. + + An instance of pathlib.Path is returned. + + """ + class Access(enum.IntFlag): + + r = os.R_OK + w = os.W_OK + + @property + def mode(self): + # + # note: enum has various internal means of determining contents of + # a composite flag until proper instance iteration lands in 3.11 or + # so. + # + # rather than worrying about that, here we determine contained names + # manually. in 3.11 should be even simpler. + # + return ''.join(member.name for member in self.__class__ if member in self) + + def ok(self, path): + return os.access(path, self) + + class PathAccessError(argparse.ArgumentTypeError): + """Subclass of ArgumentTypeError raised when path permissions + do not match specified mode. + + """ + + def __init__(self, mode, parents=False): + if isinstance(mode, str): + self.access = functools.reduce(operator.or_, (self.Access[part] for part in mode)) + elif isinstance(mode, int): + self.access = self.Access(mode) + elif isinstance(mode, self.Access): + self.access = mode + else: + raise TypeError('expected access mode of type str, int or Access ' + 'not ' + mode.__class__.__name__) + + self.parents = parents + + def __call__(self, value): + path = pathlib.Path(value) + + access_target = access_parent(path) if self.parents else path + + if not self.access.ok(access_target): + raise self.PathAccessError("failed to access path with mode " + f"{self.access.mode}: {path}") + + return path + + +class PathTypeError(argparse.ArgumentTypeError): + """Subclass of ArgumentTypeError raised for path of incorrect type.""" + + +class FileAccess(PathAccess): + + PathTypeError = PathTypeError + + def __call__(self, value): + path = super().__call__(value) + + if self.parents and not path.exists(): + if not access_parent(path).is_dir(): + raise self.PathTypeError(f"path inaccessible: {path}") + elif not path.is_file(): + raise self.PathTypeError(f"path must be file: {path}") + + return path + + +class DirAccess(PathAccess): + + PathTypeError = PathTypeError + + def __call__(self, value): + path = super().__call__(value) + + if self.parents and not path.exists(): + if not access_parent(path).is_dir(): + raise self.PathTypeError(f"path inaccessible: {path}") + elif not path.is_dir(): + raise self.PathTypeError(f"path must be directory: {path}") + + return path diff --git a/src/fate/util/compat/argument.py b/src/fate/util/compat/argument.py new file mode 100644 index 0000000..605934c --- /dev/null +++ b/src/fate/util/compat/argument.py @@ -0,0 +1,60 @@ +import argparse +import sys + + +if sys.version_info < (3, 9): + # backport BooleanOptionalAction + + class BooleanOptionalAction(argparse.Action): + + def __init__(self, + option_strings, + dest, + default=None, + type=None, + choices=None, + required=False, + help=None, + metavar=None): + + _option_strings = [] + for option_string in option_strings: + _option_strings.append(option_string) + + if option_string.startswith('--'): + option_string = '--no-' + option_string[2:] + _option_strings.append(option_string) + + super().__init__( + option_strings=_option_strings, + dest=dest, + nargs=0, + default=default, + type=type, + choices=choices, + required=required, + help=help, + metavar=metavar) + + def __call__(self, parser, namespace, values, option_string=None): + if option_string in self.option_strings: + setattr(namespace, self.dest, not option_string.startswith('--no-')) + + def format_usage(self): + return ' | '.join(self.option_strings) + +elif sys.version_info < (3, 12): + # fix BooleanOptionalAction help + # + # see: https://github.com/python/cpython/issues/83137 + # + class BooleanOptionalAction(argparse.BooleanOptionalAction): + + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + + if self.help.endswith(' (default: %(default)s)'): + self.help = self.help[:-23] + +else: + BooleanOptionalAction = argparse.BooleanOptionalAction diff --git a/src/fate/util/term.py b/src/fate/util/term.py new file mode 100644 index 0000000..7d90221 --- /dev/null +++ b/src/fate/util/term.py @@ -0,0 +1,22 @@ +import sys +import tty +import termios + + +def getch(n=1, /, file=sys.stdin): + """Get n character(s) from TTY file.""" + fd = file.fileno() + old = termios.tcgetattr(fd) + + try: + tty.setraw(fd) + return file.read(n) + finally: + termios.tcsetattr(fd, termios.TCSADRAIN, old) + + +def getche(n=1, /, infile=sys.stdin, outfile=sys.stdout, end=''): + """Print back input from getch.""" + result = getch(n, infile) + print(result, end=end, file=outfile) + return result