From 56cb14065ed0bf82370817b23f4593f543367143 Mon Sep 17 00:00:00 2001 From: Jesse London Date: Wed, 15 Feb 2023 13:30:32 -0600 Subject: [PATCH] added subcommand `init conf` to write default/example conf files MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit …to default directory or specified directory. * includes (related) upgrade to argcmdr 1.0.0 * display `python -m fate` rather than `fate` when run that way part of #15 --- pyproject.toml | 2 +- src/fate/cli/__init__.py | 2 +- src/fate/cli/base/__init__.py | 5 +- src/fate/cli/base/common.py | 25 +- src/fate/cli/base/execution.py | 2 +- src/fate/cli/base/main.py | 10 +- src/fate/cli/command/control.py | 4 +- src/fate/cli/command/init.py | 440 +++++++++++++++++++++----------- src/fate/conf/base/conf.py | 46 ++-- src/fate/util/argument.py | 29 ++- src/fate/util/term.py | 22 ++ 11 files changed, 407 insertions(+), 180 deletions(-) create mode 100644 src/fate/util/term.py diff --git a/pyproject.toml b/pyproject.toml index 1ab7ae9..144301d 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -12,7 +12,7 @@ packages = [{include = "fate", from = "src"}] [tool.poetry.dependencies] python = "^3.8" -argcmdr = "^0.14.0" +argcmdr = "^1.0.0" argcomplete = "^2.0" croniter = "^1.3.5" Jinja2 = "^3.1.2" diff --git a/src/fate/cli/__init__.py b/src/fate/cli/__init__.py index d63dd50..ec8e2bc 100644 --- a/src/fate/cli/__init__.py +++ b/src/fate/cli/__init__.py @@ -1,3 +1,3 @@ -from .base import runcmd # noqa: F401 +from .base import exit_on_error, runcmd # noqa: F401 from .root import main, Main, daemon, serve # noqa: F401 diff --git a/src/fate/cli/base/__init__.py b/src/fate/cli/base/__init__.py index c6e24f8..2576a12 100644 --- a/src/fate/cli/base/__init__.py +++ b/src/fate/cli/base/__init__.py @@ -1,2 +1,3 @@ -from .execution import runcmd # noqa: F401 -from .main import Main # noqa: F401 +from .common import exit_on_error # noqa: F401 +from .execution import runcmd # noqa: F401 +from .main import Main # noqa: F401 diff --git a/src/fate/cli/base/common.py b/src/fate/cli/base/common.py index ac886da..1c6be84 100644 --- a/src/fate/cli/base/common.py +++ b/src/fate/cli/base/common.py @@ -1,4 +1,5 @@ import enum +import functools import fate.conf @@ -32,6 +33,24 @@ def __exit__(self, exc_type, exc_value, _traceback): self.parser.exit(78, f'{self.parser.prog}: error: {exc_value}\n') +def exit_on_error(method): + """Decorator to apply context manager `ExitOnError` to instance + methods of classes of type `argcmdr.Command`. + + Note: The decorator wrapper depends upon instance attribute + `parser`. As such, affected command classes must extend the default + `__init__` (*i.e.* `super()`); or, decorated methods must be invoked + *after* command initialization (*i.e.* not as part of its `__init__`). + + """ + @functools.wraps(method) + def wrapped(self, *args, **kwargs): + with ExitOnError(self.parser): + return method(self, *args, **kwargs) + + return wrapped + + class CommandInterface: class CommandStatus(enum.Enum): @@ -59,7 +78,7 @@ def __str__(self): @property def conf(self): - if not self.__parents__: + if (root := self.root) is None: # this is the root command # retrieve and store conf here try: @@ -70,11 +89,11 @@ def conf(self): return conf # defer to root - return self.root.conf + return root.conf @property def exit_on_error(self): - return ExitOnError(self.args.__parser__) + return ExitOnError(self.parser) @staticmethod def write_result(path, contents): diff --git a/src/fate/cli/base/execution.py b/src/fate/cli/base/execution.py index a5f8fc6..986d0ff 100644 --- a/src/fate/cli/base/execution.py +++ b/src/fate/cli/base/execution.py @@ -133,7 +133,7 @@ def get_command(self, args): def prepare(self, args, parser): """Execute and report on task command execution.""" try: - command_spec = self.call(args, 'get_command') + command_spec = self.delegate('get_command') except self.local.CommandNotFound as exc: hint = ('\nhint: whitespace in program name suggests a misconfiguration' if re.search(r'\s', exc.program) else '') diff --git a/src/fate/cli/base/main.py b/src/fate/cli/base/main.py index f2d764b..5d5eb5d 100644 --- a/src/fate/cli/base/main.py +++ b/src/fate/cli/base/main.py @@ -1,3 +1,6 @@ +import os.path +import sys + import argcmdr from .common import CommandInterface @@ -7,11 +10,12 @@ class Main(CommandInterface, argcmdr.RootCommand): """manage the periodic execution of commands""" @classmethod - def base_parser(cls): - parser = super().base_parser() + def _new_parser_(cls): + parser = super()._new_parser_() # enforce program name when invoked via "python -m fate" if parser.prog == '__main__.py': - parser.prog = 'fate' + command = os.path.basename(sys.executable) + parser.prog = f'{command} -m fate' return parser diff --git a/src/fate/cli/command/control.py b/src/fate/cli/command/control.py index d35d5ef..14b4569 100644 --- a/src/fate/cli/command/control.py +++ b/src/fate/cli/command/control.py @@ -148,9 +148,7 @@ def onerror(self, exc): error_msg = error and f': {error}' error_name = exc.__class__.__name__ - parser = self.args.__parser__ - - parser.exit(1, f'{parser.prog}: fatal: {error_name}{error_msg}\n') + self.parser.exit(1, f'{self.parser.prog}: fatal: {error_name}{error_msg}\n') def __call__(self, args, parser): """Execute the command.""" diff --git a/src/fate/cli/command/init.py b/src/fate/cli/command/init.py index 7eb9227..3b30ffc 100644 --- a/src/fate/cli/command/init.py +++ b/src/fate/cli/command/init.py @@ -1,16 +1,21 @@ +import abc import enum import os import sys +from dataclasses import dataclass from pathlib import Path import argcomplete -from fate.util.argument import FileAccess, access_parent +from fate.util.abstract import abstractmember +from fate.util.argument import ChoiceMapping, DirAccess, FileAccess from fate.util.compat.argument import BooleanOptionalAction from fate.util.datastructure import StrEnum +from fate.util.format import Loader +from fate.util.term import getch from plumbum import colors -from .. import Main +from .. import exit_on_error, Main class StatusSymbol(StrEnum): @@ -38,6 +43,105 @@ def message(self): class TaskSymbol(StrEnum): comp = colors.bold | '↹' + conf = colors.bold | '⚙' + + +@dataclass +class TaskPrompt(abc.ABC): + + identifier: str + description: str + path: Path + exists: bool = True + syncd: bool = True + + update_action = abstractmember() + + +class PathOverwrite(TaskPrompt): + + update_action = 'overwrite' + + +class PathUpdate(TaskPrompt): + + update_action = 'update' + + +class InitCommand(Main, metaclass=abc.ABCMeta): + + description = abstractmember() + path_access = abstractmember() + + def check_access(self, path): + try: + self.path_access(path) + except self.path_access.PathTypeError: + extant_type = 'directory' if isinstance(self.path_access, FileAccess) else 'file' + + self.parser.print_usage(sys.stderr) + self.parser.exit(71, f'{self.parser.prog}: fatal: inferred path is ' + f'extant {extant_type}: {path}\n' + if path.exists() else + f'{self.parser.prog}: fatal: inferred path is ' + f'inaccessible: {path}\n') + except self.path_access.PathAccessError: + self.parser.print_usage(sys.stderr) + self.parser.exit(73, f'{self.parser.prog}: fatal: inferred path is ' + f'not read-writable: {path}\n') + + @abc.abstractmethod + def execute(self): + yield from () + + @exit_on_error + def __call__(self, args): + executor = self.delegate('execute') + + prompt = next(executor) + + print(StatusSymbol.complete if prompt.syncd else StatusSymbol.incomplete, + TaskSymbol[prompt.identifier], + colors.underline & colors.dim | str(prompt.path), + sep=' ') + + lines = 1 + + if args.prompt and not prompt.syncd: + lines += 2 + + print( + '\n_ [Y|n]', + colors.warn[prompt.update_action] if prompt.exists else 'install', + f'{prompt.description}?', + end='\r', # return + ) + + while (do_install := getch().lower()) not in 'yn\r\x03\x04': + pass + + if do_install == '\r': + # set empty + do_install = 'y' + elif do_install in '\x03\x04': + # treat ^C and ^D as input of "n" + do_install = 'n' + + print(colors.underline | do_install.upper()) + else: + do_install = 'y' + + status = executor.send(do_install == 'y') + + # update status line + print( + f'\033[{lines}F', # jump to ☐ + status.symbol, # reset symbol + '\033[{}C'.format(5 + len(str(prompt.path))), # jump to end + f': {prompt.description} {status.message}', # set message + sep='', + end=('\n' * lines), # return to bottom + ) @Main.register @@ -56,173 +160,223 @@ def __init__(self, parser): ) def __call__(self): - print(colors.title | 'shell completion', end='\n\n') + for (index, subcommand) in enumerate(self): + print('' if index == 0 else '\n', + colors.title | subcommand.description, + sep='', + end='\n\n') - self['comp'].delegate() + subcommand.call() - class Comp(Main): - """install shell tab-completion files""" - script_suffixes = ('', 'd', 's') +@Init.register +class Conf(InitCommand): + """install configuration files""" - class Shell(StrEnum): + description = 'default configuration' + path_access = DirAccess('rw', parents=True) - bash = 'bash' - fish = 'fish' - tcsh = 'tcsh' + @dataclass + class FormatPreference: - @classmethod - def get_choices(cls): - return sorted(str(member) for member in cls) + name: str - @classmethod - def get_default(cls): - login_shell = os.getenv('SHELL') + @property + def suffix(self): + return '.' + self.name - if not login_shell: - return None + def select(self, suffix): + return suffix == self.suffix - shell_path = Path(login_shell) + def __str__(self): + return self.name - if not shell_path.is_file(): - return None - - shell_name = shell_path.name + def __init__(self, parser): + formats = {loader.name: self.FormatPreference(loader.name) for loader in Loader} + parser.add_argument( + '--format', + action=ChoiceMapping, + choices=formats, + default=formats['toml'], + help="configuration format to prefer (default: %(default)s)", + ) - return cls.__members__.get(shell_name) + parser.add_argument( + 'path', + nargs='?', + type=self.path_access, + help="force installation to directory path (default: inferred)", + ) - def __init__(self, parser): - shell_default = self.Shell.get_default() - parser.add_argument( - '--shell', - choices=self.Shell.get_choices(), - default=shell_default, - help="shell for which to install completion " - + ("(default: %(default)s)" if shell_default else "(required)"), - required=shell_default is None, - ) + def execute(self, args, parser): + if args.path: + conf_prefix = args.path + else: + conf_prefix = self.conf._prefix_.conf - target = parser.add_mutually_exclusive_group() - target.add_argument( - '--system', - default=None, - dest='system_profile', - action='store_true', - help="force system-wide installation (default: inferred)", - ) - target.add_argument( - '--user', - default=None, - dest='system_profile', - action='store_false', - help="force user-only installation (default: inferred)", - ) - target.add_argument( - 'path', - nargs='?', - type=FileAccess('rw', parents=True), - help="force installation to file at path (default: inferred)", - ) + self.check_access(conf_prefix) - def __call__(self, args, parser): - """install shell completion""" - # determine installation path - if args.path: - completions_path = args.path - else: - completions_path = self.conf._prefix_.completions(args.shell, args.system_profile) + update_paths = {} - if completions_path.exists(): - access_target = completions_path + prompt = PathOverwrite('conf', self.description, path=None) - if access_target.is_dir(): - parser.print_usage(sys.stderr) - parser.exit(71, f'{parser.prog}: fatal: inferred path is ' - f'extant directory: {completions_path}\n') - else: - access_target = access_parent(completions_path) + for conf in self.conf: + builtins = {path.suffix: path for path in conf._iter_builtins_()} - if not access_target.is_dir(): - parser.print_usage(sys.stderr) - parser.exit(71, f'{parser.prog}: fatal: inferred path is ' - f'inaccessible: {completions_path}\n') + formats_builtin = sorted(builtins, key=args.format.select, reverse=True) - if not os.access(access_target, os.R_OK | os.W_OK): + try: + format_builtin = formats_builtin[0] + except IndexError: + parser.print_usage(sys.stderr) + parser.exit(70, f"{parser.prog}: fatal: no built-in for " + f"conf file '{conf.__name__}'") + + if extant := conf._get_path_(conf_prefix): + if template := builtins.get(extant.suffix): + update_paths[extant] = template + prompt.syncd = prompt.syncd and template.read_text() == extant.read_text() + else: parser.print_usage(sys.stderr) - parser.exit(73, f'{parser.prog}: fatal: inferred path is ' - f'not read-writable: {completions_path}\n') + parser.exit(70, f'{parser.prog}: fatal: no built-in template for format ' + f'{extant.suffix[1:]} of existing conf file: {extant}') + else: + prompt.exists = prompt.syncd = False + + template = builtins[format_builtin] + target_path = conf_prefix / template.name + update_paths[target_path] = template - # determine file contents - entry_points = args.__entry_points__ or [f'{self.conf._lib_}{suffix}' - for suffix in self.script_suffixes] + pseudo_name = '{%s}' % ','.join(path.name for path in update_paths) + prompt.path = conf_prefix / pseudo_name - contents = argcomplete.shellcode(entry_points, shell=args.shell) + confirmed = yield prompt - # check file status + if prompt.syncd: + yield EndStatus.complete + elif confirmed: try: - up_to_date = completions_path.read_text() == contents - except FileNotFoundError: - file_exists = up_to_date = False + conf_prefix.mkdir(parents=True, exist_ok=True) + + for (target_path, source_path) in update_paths.items(): + with target_path.open('wb') as t_fd, source_path.open('rb') as s_fd: + t_fd.writelines(s_fd) + except OSError: + yield EndStatus.failed else: - file_exists = True + yield EndStatus.complete + else: + yield EndStatus.incomplete - # print status line - print(StatusSymbol.complete if up_to_date else StatusSymbol.incomplete, - TaskSymbol.comp, - colors.underline & colors.dim | str(completions_path), - sep=' ') - lines = 1 +@Init.register +class Comp(InitCommand): + """install shell tab-completion files""" - if up_to_date: - status = EndStatus.complete - else: - if args.prompt: - lines += 2 - - print( - '\n_ [Y|n]', - 'update' if file_exists else 'install', - 'shell completion?', - end='\r', # return - ) - - with colors: - colors.underline() # must be reset by context manager - - try: - while (do_install := input().lower() or 'y') not in 'yn': - pass - except KeyboardInterrupt: - # treat as input of "n" - do_install = 'n' - print('\r', do_install, ~colors.underline, ' ', sep='') - else: - if do_install == 'y': - # set empty - print('\033[F', 'Y', sep='') + description = 'shell completion' + path_access = FileAccess('rw', parents=True) - else: - do_install = 'y' - - if do_install == 'y': - try: - completions_path.parent.mkdir(parents=True, - exist_ok=True) - completions_path.write_text(contents) - except OSError: - status = EndStatus.failed - else: - status = EndStatus.complete - else: - status = EndStatus.incomplete + script_suffixes = ('', 'd', 's') - # update status line - print( - f'\033[{lines}F', # jump to ☐ - status.symbol, # reset symbol - '\033[{}C'.format(5 + len(str(completions_path))), # jump to end - f': {args.shell} shell completion {status.message}', # set message - sep='', - end=('\n' * lines), # return to bottom - ) + class Shell(StrEnum): + + bash = 'bash' + fish = 'fish' + tcsh = 'tcsh' + + @classmethod + def get_choices(cls): + return sorted(str(member) for member in cls) + + @classmethod + def get_default(cls): + login_shell = os.getenv('SHELL') + + if not login_shell: + return None + + shell_path = Path(login_shell) + + if not shell_path.is_file(): + return None + + shell_name = shell_path.name + + return cls.__members__.get(shell_name) + + def __init__(self, parser): + shell_default = self.Shell.get_default() + parser.add_argument( + '--shell', + choices=self.Shell.get_choices(), + default=shell_default, + help="shell for which to install completion " + + ("(default: %(default)s)" if shell_default else "(required)"), + required=shell_default is None, + ) + + target = parser.add_mutually_exclusive_group() + target.add_argument( + '--system', + default=None, + dest='system_profile', + action='store_true', + help="force system-wide installation (default: inferred)", + ) + target.add_argument( + '--user', + default=None, + dest='system_profile', + action='store_false', + help="force user-only installation (default: inferred)", + ) + target.add_argument( + 'path', + nargs='?', + type=self.path_access, + help="force installation to file at path (default: inferred)", + ) + + def execute(self, args): + """install shell completion""" + # determine installation path + if args.path: + completions_path = args.path + else: + completions_path = self.conf._prefix_.completions(args.shell, args.system_profile) + + self.check_access(completions_path) + + # determine file contents + entry_points = args.__entry_points__ or [f'{self.conf._lib_}{suffix}' + for suffix in self.script_suffixes] + + contents = argcomplete.shellcode(entry_points, shell=args.shell) + + # check file status and prepare prompt + prompt = PathUpdate('comp', f'{args.shell} {self.description}', completions_path) + + try: + prompt.syncd = completions_path.read_text() == contents + except FileNotFoundError: + prompt.exists = prompt.syncd = False + else: + prompt.exists = True + + # delegate prompt to controller + confirmed = yield prompt + + # complete execution + if prompt.syncd: + yield EndStatus.complete + elif confirmed: + try: + completions_path.parent.mkdir(parents=True, + exist_ok=True) + completions_path.write_text(contents) + except OSError: + yield EndStatus.failed + else: + yield EndStatus.complete + else: + yield EndStatus.incomplete diff --git a/src/fate/conf/base/conf.py b/src/fate/conf/base/conf.py index 95dc009..15abd27 100644 --- a/src/fate/conf/base/conf.py +++ b/src/fate/conf/base/conf.py @@ -68,31 +68,45 @@ def _indicator_(self): def _indicator_builtin_(self): return resources.files(self._builtin_.path) / self.__filename__ - @cachedproperty - @loads - def __path__(self): - paths = (self._indicator_.with_suffix(format_.suffix) - for format_ in self._Format) + def _iter_builtins_(self): + for format_ in self._Format: + builtin_path = self._indicator_builtin_.with_suffix(format_.suffix) + + if builtin_path.is_file(): + yield builtin_path - extant = [path for path in paths if path.exists()] + def _iter_paths_(self, prefix=None): + indicator = prefix / self.__filename__ if prefix else self._indicator_ + for format_ in self._Format: + path = indicator.with_suffix(format_.suffix) + + if path.is_file(): + yield path + + def _get_path_(self, prefix=None): try: - (path, *extra) = extant + (path, *extra) = self._iter_paths_(prefix=prefix) except ValueError: - pass - else: - if extra: - raise MultiConfError(*extant) + return None + if extra: + raise MultiConfError(path, *extra) + + return path + + @cachedproperty + @loads + def __path__(self): + if path := self._get_path_(): return path if self._builtin_.fallback: # fall back to first built-in found - for format_ in self._Format: - builtin_path = self._indicator_builtin_.with_suffix(format_.suffix) - - if builtin_path.is_file(): - return builtin_path + try: + return next(self._iter_builtins_()) + except StopIteration: + pass raise NoConfError("%s{%s}" % ( self._indicator_, diff --git a/src/fate/util/argument.py b/src/fate/util/argument.py index 488ee03..1460dc1 100644 --- a/src/fate/util/argument.py +++ b/src/fate/util/argument.py @@ -23,7 +23,8 @@ def __call__(self, parser, namespace, value, option_string=None): def access_parent(path): access_target = path - while not access_target.exists() and access_target.name != '': + # Path.exists() calls stat() which can raise PermissionError (prematurely) + while not os.path.exists(access_target) and access_target.name != '': access_target = access_target.parent return access_target @@ -55,6 +56,12 @@ def mode(self): def ok(self, path): return os.access(path, self) + class PathAccessError(argparse.ArgumentTypeError): + """Subclass of ArgumentTypeError raised when path permissions + do not match specified mode. + + """ + def __init__(self, mode, parents=False): if isinstance(mode, str): self.access = functools.reduce(operator.or_, (self.Access[part] for part in mode)) @@ -74,35 +81,43 @@ def __call__(self, value): access_target = access_parent(path) if self.parents else path if not self.access.ok(access_target): - raise argparse.ArgumentTypeError("failed to access path with mode " - f"{self.access.mode}: {path}") + raise self.PathAccessError("failed to access path with mode " + f"{self.access.mode}: {path}") return path +class PathTypeError(argparse.ArgumentTypeError): + """Subclass of ArgumentTypeError raised for path of incorrect type.""" + + class FileAccess(PathAccess): + PathTypeError = PathTypeError + def __call__(self, value): path = super().__call__(value) if self.parents and not path.exists(): if not access_parent(path).is_dir(): - raise argparse.ArgumentTypeError(f"path inaccessible: {path}") + raise self.PathTypeError(f"path inaccessible: {path}") elif not path.is_file(): - raise argparse.ArgumentTypeError(f"path must be file: {path}") + raise self.PathTypeError(f"path must be file: {path}") return path class DirAccess(PathAccess): + PathTypeError = PathTypeError + def __call__(self, value): path = super().__call__(value) if self.parents and not path.exists(): if not access_parent(path).is_dir(): - raise argparse.ArgumentTypeError(f"path inaccessible: {path}") + raise self.PathTypeError(f"path inaccessible: {path}") elif not path.is_dir(): - raise argparse.ArgumentTypeError(f"path must be directory: {path}") + raise self.PathTypeError(f"path must be directory: {path}") return path diff --git a/src/fate/util/term.py b/src/fate/util/term.py new file mode 100644 index 0000000..7d90221 --- /dev/null +++ b/src/fate/util/term.py @@ -0,0 +1,22 @@ +import sys +import tty +import termios + + +def getch(n=1, /, file=sys.stdin): + """Get n character(s) from TTY file.""" + fd = file.fileno() + old = termios.tcgetattr(fd) + + try: + tty.setraw(fd) + return file.read(n) + finally: + termios.tcsetattr(fd, termios.TCSADRAIN, old) + + +def getche(n=1, /, infile=sys.stdin, outfile=sys.stdout, end=''): + """Print back input from getch.""" + result = getch(n, infile) + print(result, end=end, file=outfile) + return result