Skip to content

Commit

Permalink
Merge pull request #1010 from reef-technologies/zsh_autocomplete
Browse files Browse the repository at this point in the history
Zsh&fish autocomplete
  • Loading branch information
mjurbanski-reef authored Mar 29, 2024
2 parents d9d8851 + 5414c54 commit 0f81ed3
Show file tree
Hide file tree
Showing 10 changed files with 347 additions and 28 deletions.
10 changes: 10 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,16 @@ jobs:
uses: actions/setup-python@v5
with:
python-version: ${{ matrix.python-version }}
- name: Install test binary dependencies
if: startsWith(matrix.os, 'ubuntu')
run: |
sudo apt-get -y update
sudo apt-get -y install zsh fish
sudo chmod -R 755 /usr/share/zsh/vendor-completions /usr/share/zsh # Fix permissions for zsh completions
- name: Install test binary dependencies (macOS)
if: startsWith(matrix.os, 'macos')
run: |
brew install fish
- name: Install dependencies
run: python -m pip install --upgrade nox pdm
- name: Run unit tests
Expand Down
221 changes: 197 additions & 24 deletions b2/_internal/_cli/autocomplete_install.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,17 +7,28 @@
# License https://www.backblaze.com/using_b2_code.html
#
######################################################################
from __future__ import annotations

import abc
import io
import logging
import os
import re
import shlex
import shutil
import signal
import subprocess
import textwrap
from datetime import datetime
from importlib.util import find_spec
from pathlib import Path
from typing import List
from shlex import quote

import argcomplete
from class_registry import ClassRegistry, RegistryKeyError

from b2._internal._utils.python_compat import shlex_join

logger = logging.getLogger(__name__)

SHELL_REGISTRY = ClassRegistry()
Expand All @@ -33,7 +44,7 @@ def autocomplete_install(prog: str, shell: str = 'bash') -> None:
logger.info("Autocomplete for %s has been enabled.", prog)


class ShellAutocompleteInstaller:
class ShellAutocompleteInstaller(abc.ABC):
shell_exec: str

def __init__(self, prog: str):
Expand All @@ -43,6 +54,10 @@ def install(self) -> None:
"""Install autocomplete for the given program."""
script_path = self.create_script()
if not self.is_enabled():
logger.info(
"%s completion doesn't seem to be autoloaded from %s.", self.shell_exec,
script_path.parent
)
try:
self.force_enable(script_path)
except NotImplementedError as e:
Expand All @@ -60,10 +75,11 @@ def create_script(self) -> Path:

script_path = self.get_script_path()
logger.info("Creating autocompletion script under %s", script_path)
script_path.parent.mkdir(exist_ok=True)
script_path.parent.mkdir(exist_ok=True, parents=True, mode=0o755)
script_path.write_text(shellcode)
return script_path

@abc.abstractmethod
def force_enable(self, completion_script: Path) -> None:
"""
Enable autocomplete for the given program.
Expand All @@ -76,6 +92,7 @@ def get_shellcode(self) -> str:
"""Get autocomplete shellcode for the given program."""
return argcomplete.shellcode([self.prog], shell=self.shell_exec)

@abc.abstractmethod
def get_script_path(self) -> Path:
"""Get autocomplete script path for the given program."""
raise NotImplementedError
Expand All @@ -84,43 +101,199 @@ def program_in_path(self) -> bool:
"""Check if the given program is in PATH."""
return _silent_success_run([self.shell_exec, '-c', self.prog])

@abc.abstractmethod
def is_enabled(self) -> bool:
"""Check if autocompletion is enabled."""
return _silent_success_run([self.shell_exec, '-i', '-c', f'complete -p {self.prog}'])
raise NotImplementedError


@SHELL_REGISTRY.register('bash')
class BashAutocompleteInstaller(ShellAutocompleteInstaller):
shell_exec = 'bash'
class BashLikeAutocompleteInstaller(ShellAutocompleteInstaller):
shell_exec: str
rc_file_path: str

def get_rc_path(self) -> Path:
return Path(self.rc_file_path).expanduser()

def force_enable(self, completion_script: Path) -> None:
"""Enable autocomplete for the given program."""
logger.info(
"Bash completion doesn't seem to be autoloaded from %s. Most likely `bash-completion` is not installed.",
completion_script.parent
)
bashrc_path = Path("~/.bashrc").expanduser()
if bashrc_path.exists():
bck_path = bashrc_path.with_suffix(f".{datetime.now():%Y-%m-%dT%H-%M-%S}.bak")
logger.warning("Backing up %s to %s", bashrc_path, bck_path)
"""Enable autocomplete for the given program, common logic."""
rc_path = self.get_rc_path()
if rc_path.exists() and rc_path.read_text().strip():
bck_path = rc_path.with_suffix(f".{datetime.now():%Y-%m-%dT%H-%M-%S}.bak")
logger.warning("Backing up %s to %s", rc_path, bck_path)
try:
shutil.copyfile(bashrc_path, bck_path)
shutil.copyfile(rc_path, bck_path)
except OSError as e:
raise AutocompleteInstallError(
f"Failed to backup {bashrc_path} under {bck_path}"
f"Failed to backup {rc_path} under {bck_path}"
) from e
logger.warning("Explicitly adding %s to %s", completion_script, bashrc_path)
logger.warning("Explicitly adding %s to %s", completion_script, rc_path)
add_or_update_shell_section(
bashrc_path, f"{self.prog} autocomplete", self.prog, f"source {completion_script}"
rc_path, f"{self.prog} autocomplete", self.prog, self.get_rc_section(completion_script)
)

def get_rc_section(self, completion_script: Path) -> str:
return f"source {quote(str(completion_script))}"

def get_script_path(self) -> Path:
"""Get autocomplete script path for the given program."""
return Path("~/.bash_completion.d/").expanduser() / self.prog
"""Get autocomplete script path for the given program, common logic."""
script_dir = Path(f"~/.{self.shell_exec}_completion.d/").expanduser()
return script_dir / self.prog

def is_enabled(self) -> bool:
"""Check if autocompletion is enabled."""
return _silent_success_run([self.shell_exec, '-i', '-c', f'complete -p {quote(self.prog)}'])


@SHELL_REGISTRY.register('bash')
class BashAutocompleteInstaller(BashLikeAutocompleteInstaller):
shell_exec = 'bash'
rc_file_path = "~/.bashrc"


@SHELL_REGISTRY.register('zsh')
class ZshAutocompleteInstaller(BashLikeAutocompleteInstaller):
shell_exec = 'zsh'
rc_file_path = "~/.zshrc"

def get_rc_section(self, completion_script: Path) -> str:
return textwrap.dedent(
f"""\
if [[ -z "$_comps" ]] && [[ -t 0 ]]; then autoload -Uz compinit && compinit -i -D; fi
source {quote(str(completion_script))}
"""
)

def get_script_path(self) -> Path:
"""Custom get_script_path for Zsh, if the structure differs from the base implementation."""
return Path("~/.zsh/completion/").expanduser() / f"_{self.prog}"

def is_enabled(self) -> bool:
rc_path = self.get_rc_path()
if not rc_path.exists():
# if zshrc is missing `zshrc -i` may hang on creation wizard when emulating tty
rc_path.touch(mode=0o750)
_silent_success_run_with_pty(
[self.shell_exec, '-c', 'autoload -Uz compaudit; echo AUDIT; compaudit']
)

cmd = [self.shell_exec, '-i', '-c', f'[[ -v _comps[{quote(self.prog)}] ]]']
return _silent_success_run_with_tty(cmd)

def _silent_success_run(cmd: List[str]) -> bool:
return subprocess.run(cmd, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL).returncode == 0

@SHELL_REGISTRY.register('fish')
class FishAutocompleteInstaller(ShellAutocompleteInstaller):
shell_exec = 'fish'
rc_file_path = "~/.config/fish/config.fish"

def force_enable(self, completion_script: Path) -> None:
raise NotImplementedError("Fish shell doesn't support manual completion enabling.")

def get_script_path(self) -> Path:
"""Get autocomplete script path for the given program, common logic."""
complete_paths = [
Path(p) for p in shlex.split(
subprocess.run(
[self.shell_exec, '-c', 'echo $fish_complete_path'],
timeout=30,
text=True,
check=True,
capture_output=True
).stdout
)
]
user_path = Path("~/.config/fish/completions").expanduser()
if complete_paths:
target_path = user_path if user_path in complete_paths else complete_paths[0]
else:
logger.warning("$fish_complete_path is empty, falling back to %r", user_path)
target_path = user_path
return target_path / f"{self.prog}.fish"

def is_enabled(self) -> bool:
"""
Check if autocompletion is enabled.
Fish seems to lazy-load completions, hence first we trigger completion.
That alone cannot be used, since fish tends to always propose completions (e.g. suggesting similarly
named filenames).
"""
environ = os.environ.copy()
environ.setdefault("TERM", "xterm") # TERM has to be set for fish to load completions
return _silent_success_run_with_tty(
[
self.shell_exec, '-i', '-c',
f'string length -q -- (complete -C{quote(f"{self.prog} ")} >/dev/null && complete -c {quote(self.prog)})'
],
env=environ,
)


def _silent_success_run_with_tty(
cmd: list[str], timeout: int = 30, env: dict | None = None
) -> bool:
emulate_tty = not os.isatty(0) # is True under GHA or pytest-xdist
if emulate_tty and not find_spec('pexpect'):
emulate_tty = False
logger.warning(
"pexpect is needed to check autocomplete installation correctness without tty. "
"You can install it via `pip install pexpect`."
)
run_func = _silent_success_run_with_pty if emulate_tty else _silent_success_run
return run_func(cmd, timeout=timeout, env=env)


def _silent_success_run(cmd: list[str], timeout: int = 30, env: dict | None = None) -> bool:
p = subprocess.Popen(
cmd,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
stdin=subprocess.DEVNULL,
start_new_session=True, # prevents `zsh -i` messing with parent tty under pytest-xdist
env=env,
)

try:
stdout, stderr = p.communicate(timeout=timeout)
except subprocess.TimeoutExpired:
p.kill()
stdout, stderr = p.communicate(timeout=1)
logger.warning("Command %r timed out, stdout: %r, stderr: %r", cmd, stdout, stderr)
else:
logger.log(
logging.DEBUG if p.returncode == 0 else logging.WARNING,
"Command %r exited with code %r, stdout: %r, stderr: %r", cmd, p.returncode, stdout,
stderr
)
return p.returncode == 0


def _silent_success_run_with_pty(
cmd: list[str], timeout: int = 30, env: dict | None = None
) -> bool:
"""
Run a command with emulated terminal and return whether it succeeded.
"""
import pexpect

command_str = shlex_join(cmd)

child = pexpect.spawn(command_str, timeout=timeout, env=env)
output = io.BytesIO()
try:
child.logfile_read = output
child.expect(pexpect.EOF)
except pexpect.TIMEOUT:
logger.warning("Command %r timed out, output: %r", cmd, output.getvalue())
child.kill(signal.SIGKILL)
return False
finally:
child.close()

logger.log(
logging.DEBUG if child.exitstatus == 0 else logging.WARNING,
"Command %r exited with code %r, output: %r", cmd, child.exitstatus, output.getvalue()
)
return child.exitstatus == 0


def add_or_update_shell_section(
Expand Down
5 changes: 5 additions & 0 deletions b2/_internal/_utils/python_compat.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
Utilities for compatibility with older Python versions.
"""
import functools
import shlex
import sys

if sys.version_info < (3, 9):
Expand Down Expand Up @@ -45,5 +46,9 @@ def method_wrapper(arg, *args, **kwargs):

method_wrapper.register = self.register
return method_wrapper

def shlex_join(split_command):
return ' '.join(shlex.quote(arg) for arg in split_command)
else:
singledispatchmethod = functools.singledispatchmethod
shlex_join = shlex.join
1 change: 1 addition & 0 deletions changelog.d/+verbose_ci.infrastructure.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Increase verbosity when running tests under CI.
1 change: 1 addition & 0 deletions changelog.d/+zsh_autocomplete.added.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Add autocomplete support for `zsh` and `fish` shells.
13 changes: 11 additions & 2 deletions noxfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,10 @@ def _detect_python_nox_id() -> str:
'test',
]

PYTEST_GLOBAL_ARGS = []
if CI:
PYTEST_GLOBAL_ARGS.append("-vv")


def pdm_install(
session: nox.Session, *groups: str, dev: bool = True, editable: bool = False
Expand Down Expand Up @@ -159,7 +163,7 @@ def lint(session):
# *PY_PATHS,
# )

session.run('pytest', 'test/static')
session.run('pytest', 'test/static', *PYTEST_GLOBAL_ARGS)
session.run('liccheck', '-s', 'pyproject.toml')
session.run('pdm', 'lock', '--check', external=True)

Expand All @@ -177,6 +181,7 @@ def unit(session):
'--cov-branch',
'--cov-report=xml',
'--doctest-modules',
*PYTEST_GLOBAL_ARGS,
*session.posargs,
'test/unit',
]
Expand Down Expand Up @@ -205,6 +210,7 @@ def run_integration_test(session, pytest_posargs):
'INFO',
'-W',
'ignore::DeprecationWarning:rst2ansi.visitor:',
*PYTEST_GLOBAL_ARGS,
*pytest_posargs,
]

Expand Down Expand Up @@ -248,7 +254,10 @@ def test(session):
def cleanup_buckets(session):
"""Remove buckets from previous test runs."""
pdm_install(session, 'test')
session.run('pytest', '-s', '-x', *session.posargs, 'test/integration/cleanup_buckets.py')
session.run(
'pytest', '-s', '-x', *PYTEST_GLOBAL_ARGS, *session.posargs,
'test/integration/cleanup_buckets.py'
)


@nox.session
Expand Down
Loading

0 comments on commit 0f81ed3

Please sign in to comment.