Skip to content

Commit

Permalink
added subcommand init for post-installation initialization beginnin…
Browse files Browse the repository at this point in the history
…g with: shell completion files

part of #15
  • Loading branch information
jesteria committed Feb 8, 2023
1 parent 791f8b5 commit 6b05815
Show file tree
Hide file tree
Showing 6 changed files with 422 additions and 14 deletions.
3 changes: 2 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@ packages = [{include = "fate", from = "src"}]

[tool.poetry.dependencies]
python = "^3.8"
argcmdr = "^0.13.2"
argcmdr = "^0.14.0"
argcomplete = "^2.0"
croniter = "^1.3.5"
Jinja2 = "^3.1.2"
loguru = "^0.6.0"
Expand Down
226 changes: 226 additions & 0 deletions src/fate/cli/command/init.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,226 @@
import enum
import os
import sys
from pathlib import Path

import argcomplete

from fate.util.argument import FileAccess, access_parent
from fate.util.compat.argument import BooleanOptionalAction
from fate.util.datastructure import StrEnum
from plumbum import colors

from .. import Main


class StatusSymbol(StrEnum):

complete = colors.bold & colors.success | '☑' # noqa: E221
failed = colors.bold & colors.fatal | '☒' # noqa: E221
incomplete = colors.bold & colors.info | '☐' # noqa: E221


class EndStatus(enum.Enum):

complete = (StatusSymbol.complete, 'installed') # noqa: E221,E241
failed = (StatusSymbol.failed, 'failed') # noqa: E221,E241
incomplete = (StatusSymbol.incomplete, 'skipped') # noqa: E221,E241

@property
def symbol(self):
return self.value[0]

@property
def message(self):
return self.value[1]


class TaskSymbol(StrEnum):

comp = colors.bold | '↹'


@Main.register
class Init(Main):
"""post-installation initializations"""

def __init__(self, parser):
tty_detected = sys.stdin.isatty()
prompt_default = 'prompt' if tty_detected else 'no prompt'

parser.add_argument(
'--prompt',
default=tty_detected,
action=BooleanOptionalAction,
help=f"prompt to confirm actions via TTY (default: {prompt_default})",
)

def __call__(self):
print(colors.title | 'shell completion', end='\n\n')

self['comp'].delegate()

class Comp(Main):
"""install shell tab-completion files"""

script_suffixes = ('', 'd', 's')

class Shell(StrEnum):

bash = 'bash'
fish = 'fish'
tcsh = 'tcsh'

@classmethod
def get_choices(cls):
return sorted(str(member) for member in cls)

@classmethod
def get_default(cls):
login_shell = os.getenv('SHELL')

if not login_shell:
return None

shell_path = Path(login_shell)

if not shell_path.is_file():
return None

shell_name = shell_path.name

return cls.__members__.get(shell_name)

def __init__(self, parser):
shell_default = self.Shell.get_default()
parser.add_argument(
'--shell',
choices=self.Shell.get_choices(),
default=shell_default,
help="shell for which to install completion "
+ ("(default: %(default)s)" if shell_default else "(required)"),
required=shell_default is None,
)

target = parser.add_mutually_exclusive_group()
target.add_argument(
'--system',
default=None,
dest='system_profile',
action='store_true',
help="force system-wide installation (default: inferred)",
)
target.add_argument(
'--user',
default=None,
dest='system_profile',
action='store_false',
help="force user-only installation (default: inferred)",
)
target.add_argument(
'path',
nargs='?',
type=FileAccess('rw', parents=True),
help="force installation to file at path (default: inferred)",
)

def __call__(self, args, parser):
"""install shell completion"""
# determine installation path
if args.path:
completions_path = args.path
else:
completions_path = self.conf._prefix_.completions(args.shell, args.system_profile)

if completions_path.exists():
access_target = completions_path

if access_target.is_dir():
parser.print_usage(sys.stderr)
parser.exit(71, f'{parser.prog}: fatal: inferred path is '
f'extant directory: {completions_path}\n')
else:
access_target = access_parent(completions_path)

if not access_target.is_dir():
parser.print_usage(sys.stderr)
parser.exit(71, f'{parser.prog}: fatal: inferred path is '
f'inaccessible: {completions_path}\n')

if not os.access(access_target, os.R_OK | os.W_OK):
parser.print_usage(sys.stderr)
parser.exit(73, f'{parser.prog}: fatal: inferred path is '
f'not read-writable: {completions_path}\n')

# determine file contents
entry_points = args.__entry_points__ or [f'{self.conf._lib_}{suffix}'
for suffix in self.script_suffixes]

contents = argcomplete.shellcode(entry_points, shell=args.shell)

# check file status
try:
up_to_date = completions_path.read_text() == contents
except FileNotFoundError:
file_exists = up_to_date = False
else:
file_exists = True

# print status line
print(StatusSymbol.complete if up_to_date else StatusSymbol.incomplete,
TaskSymbol.comp,
colors.underline & colors.dim | str(completions_path),
sep=' ')

lines = 1

if up_to_date:
status = EndStatus.complete
else:
if args.prompt:
lines += 2

print(
'\n_ [Y|n]',
'update' if file_exists else 'install',
'shell completion?',
end='\r', # return
)

with colors:
colors.underline() # must be reset by context manager

try:
while (do_install := input().lower() or 'y') not in 'yn':
pass
except KeyboardInterrupt:
# treat as input of "n"
do_install = 'n'
print('\r', do_install, ~colors.underline, ' ', sep='')
else:
if do_install == 'y':
# set empty
print('\033[F', 'Y', sep='')

else:
do_install = 'y'

if do_install == 'y':
try:
completions_path.write_text(contents)
except OSError:
status = EndStatus.failed
else:
status = EndStatus.complete
else:
status = EndStatus.incomplete

# update status line
print(
f'\033[{lines}F', # jump to ☐
status.symbol, # reset symbol
'\033[{}C'.format(5 + len(str(completions_path))), # jump to end
f': {args.shell} shell completion {status.message}', # set message
sep='',
end=('\n' * lines), # return to bottom
)
3 changes: 2 additions & 1 deletion src/fate/cli/root.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,11 @@
from fate.cli.base import Main


def extend_parser(parser, conf=None, banner_path=None):
def extend_parser(parser, conf=None, banner_path=None, entry_points=None):
parser.set_defaults(
__conf__=conf,
__banner_path__=banner_path,
__entry_points__=entry_points,
)


Expand Down
51 changes: 39 additions & 12 deletions src/fate/conf/path.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,25 +103,37 @@ def infer(cls, lib):
return location_profile | cls.isolated


def path(prop):
"""Wrap a Path-returning method as a cachedproperty and such that:
def path(method):
"""Wrap a Path-returning method such that:
* it may be overridden by an environment variable
* its result (if any) is given a leaf directory named for the
PrefixPath.lib attribute
"""
@functools.wraps(prop)
def wrapped(self):
if override := os.getenv(f'{self.lib}_PREFIX_{prop.__name__}'.upper()):
@functools.wraps(method)
def wrapped(self, *args, **kwargs):
if override := os.getenv(f'{self.lib}_PREFIX_{method.__name__}'.upper()):
return Path(override).absolute()

prop_path = prop(self)
method_path = method(self, *args, **kwargs)

return prop_path and prop_path / self.lib
return method_path and method_path / self.lib

return cachedproperty(wrapped)
return wrapped


def path_property(prop):
"""Wrap a `Path`-returning method as a `path` and as a
`cachedproperty`.
See: `path`.
"""
return cachedproperty(path(prop))

path.property = path_property


@dataclass
Expand All @@ -144,7 +156,7 @@ def infer(cls, lib):
profile = PrefixProfile.infer(lib)
return cls(lib, profile)

@path
@path.property
def conf(self):
"""library configuration"""
if PrefixProfile.system in self.profile:
Expand All @@ -158,7 +170,7 @@ def conf(self):

return Path.home() / '.config'

@path
@path.property
def data(self):
"""results directory (default)"""
if PrefixProfile.system in self.profile:
Expand All @@ -172,7 +184,7 @@ def data(self):

return Path.home() / '.local' / 'share'

@path
@path.property
def state(self):
"""library (retry records) and task state"""
if PrefixProfile.system in self.profile:
Expand All @@ -186,7 +198,7 @@ def state(self):

return Path.home() / '.local' / 'state'

@path
@path.property
def run(self):
"""run (lock) files"""
if PrefixProfile.system in self.profile:
Expand All @@ -199,3 +211,18 @@ def run(self):
return Path(xdg_runtime)

return Path.home() / '.local' / 'run'

@path
def completions(self, shell_name, force_system=None):
"""shell completion files"""
dir_name = 'bash-completion' if shell_name == 'bash' else shell_name

if force_system or (force_system is None and PrefixProfile.system in self.profile):
return Path(os.sep) / 'usr' / 'share' / dir_name / 'completions'

# completions must ignore user virtual env (and shouldn't matter)

data_path = (Path(xdg_data) if (xdg_data := os.getenv('XDG_DATA_HOME'))
else Path.home() / '.local' / 'share')

return data_path / dir_name / 'completions'
Loading

0 comments on commit 6b05815

Please sign in to comment.