Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use f-strings #686

Merged
merged 2 commits into from
Jul 12, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
95 changes: 37 additions & 58 deletions sh.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import struct
import sys
import termios
import textwrap
import threading
import time
import traceback
Expand All @@ -71,10 +72,9 @@

if "windows" in platform.system().lower(): # pragma: no cover
raise ImportError(
"sh %s is currently only supported on linux and osx. \
f"sh {__version__} is currently only supported on linux and osx. \
please install pbs 0.110 (http://pypi.python.org/pypi/pbs) for windows \
support."
% __version__
)

TEE_STDOUT = {True, "out", 1}
Expand Down Expand Up @@ -246,24 +246,15 @@ def poll(self, timeout):
Poller = PollPoller


def _indent_text(text, num=4):
lines = []
for line in text.split("\n"):
line = (" " * num) + line
lines.append(line)
return "\n".join(lines)


class ForkException(Exception):
def __init__(self, orig_exc):
tmpl = """
msg = f"""

Original exception:
===================

%s
{textwrap.indent(orig_exc, " ")}
"""
msg = tmpl % _indent_text(orig_exc)
Exception.__init__(self, msg)


Expand Down Expand Up @@ -321,25 +312,19 @@ def __init__(self, full_cmd, stdout, stderr, truncate=True):
exc_stdout = exc_stdout[: self.truncate_cap]
out_delta = len(self.stdout) - len(exc_stdout)
if out_delta:
exc_stdout += (
"... (%d more, please see e.stdout)" % out_delta
).encode()
exc_stdout += (f"... ({out_delta} more, please see e.stdout)").encode()

exc_stderr = self.stderr
if truncate:
exc_stderr = exc_stderr[: self.truncate_cap]
err_delta = len(self.stderr) - len(exc_stderr)
if err_delta:
exc_stderr += (
"... (%d more, please see e.stderr)" % err_delta
).encode()
exc_stderr += (f"... ({err_delta} more, please see e.stderr)").encode()

msg_tmpl = str("\n\n RAN: {cmd}\n\n STDOUT:\n{stdout}\n\n STDERR:\n{stderr}")

msg = msg_tmpl.format(
cmd=self.full_cmd,
stdout=exc_stdout.decode(DEFAULT_ENCODING, "replace"),
stderr=exc_stderr.decode(DEFAULT_ENCODING, "replace"),
msg = (
f"\n\n RAN: {self.full_cmd}"
f"\n\n STDOUT:\n{exc_stdout.decode(DEFAULT_ENCODING, 'replace')}"
f"\n\n STDERR:\n{exc_stderr.decode(DEFAULT_ENCODING, 'replace')}"
)

super(ErrorReturnCode, self).__init__(msg)
Expand Down Expand Up @@ -435,11 +420,10 @@ def get_rc_exc(rc):
pass

if rc >= 0:
name = "ErrorReturnCode_%d" % rc
name = f"ErrorReturnCode_{rc}"
base = ErrorReturnCode
else:
signame = SIGNAL_MAPPING[abs(rc)]
name = "SignalException_" + signame
name = f"SignalException_{SIGNAL_MAPPING[abs(rc)]}"
base = SignalException

exc = ErrorReturnCodeMeta(name, (base,), {"exit_code": rc})
Expand Down Expand Up @@ -569,12 +553,12 @@ class Logger(object):

def __init__(self, name, context=None):
self.name = name
self.log = logging.getLogger("%s.%s" % (SH_LOGGER_NAME, name))
self.log = logging.getLogger(f"{SH_LOGGER_NAME}.{name}")
self.context = self.sanitize_context(context)

def _format_msg(self, msg, *a):
if self.context:
msg = "%s: %s" % (self.context, msg)
msg = f"{self.context}: {msg}"
return msg % a

@staticmethod
Expand Down Expand Up @@ -603,9 +587,9 @@ def exception(self, msg, *a):

def default_logger_str(cmd, call_args, pid=None):
if pid:
s = "<Command %r, pid %d>" % (cmd, pid)
s = f"<Command {cmd!r}, pid {pid}>"
else:
s = "<Command %r>" % cmd
s = f"<Command {cmd!r}>"
return s


Expand Down Expand Up @@ -1089,8 +1073,8 @@ def tty_in_validator(passed_kwargs, merged_kwargs):
for tty_type, std in pairs:
if tty_type in passed_kwargs and ob_is_tty(passed_kwargs.get(std, None)):
error = (
"`_%s` is a TTY already, so so it doesn't make sense to set up a"
" TTY with `_%s`" % (std, tty_type)
f"`_{std}` is a TTY already, so so it doesn't make sense to set up a"
f" TTY with `_{tty_type}`"
)
invalid.append(((tty_type, std), error))

Expand Down Expand Up @@ -1163,16 +1147,14 @@ def env_validator(passed_kwargs, merged_kwargs):
return invalid

if not isinstance(env, Mapping):
invalid.append(("env", "env must be dict-like. Got {!r}".format(env)))
invalid.append(("env", f"env must be dict-like. Got {env!r}"))
return invalid

for k, v in passed_kwargs["env"].items():
if not isinstance(k, str):
invalid.append(("env", "env key {!r} must be a str".format(k)))
invalid.append(("env", f"env key {k!r} must be a str"))
if not isinstance(v, str):
invalid.append(
("env", "value {!r} of env key {!r} must be a str".format(v, k))
)
invalid.append(("env", f"value {v!r} of env key {k!r} must be a str"))

return invalid

Expand Down Expand Up @@ -1378,9 +1360,9 @@ def _extract_call_args(cls, kwargs):
if invalid_kwargs:
exc_msg = []
for kwarg, error_msg in invalid_kwargs:
exc_msg.append(" %r: %s" % (kwarg, error_msg))
exc_msg.append(f" {kwarg!r}: {error_msg}")
exc_msg = "\n".join(exc_msg)
raise TypeError("Invalid special arguments:\n\n%s\n" % exc_msg)
raise TypeError(f"Invalid special arguments:\n\n{exc_msg}\n")

return call_args, kwargs

Expand Down Expand Up @@ -1416,7 +1398,7 @@ def __eq__(self, other):
return str(self) == str(other)

def __repr__(self):
return "<Command %r>" % str(self)
return f"<Command {str(self)!r}>"

def __enter__(self):
self(_with=True)
Expand Down Expand Up @@ -2041,7 +2023,7 @@ def __init__(

sid = os.getsid(0)
pgid = os.getpgid(0)
payload = ("%d,%d" % (sid, pgid)).encode(DEFAULT_ENCODING)
payload = (f"{sid},{pgid}").encode(DEFAULT_ENCODING)
os.write(session_pipe_write, payload)

if ca["tty_out"] and not stdout_is_fd_based and not single_tty:
Expand Down Expand Up @@ -2143,7 +2125,7 @@ def __init__(

except Exception as e:
# dump to stderr if we cannot save it to exc_pipe_write
sys.stderr.write("\nFATAL SH ERROR: %s\n" % e)
sys.stderr.write(f"\nFATAL SH ERROR: {e}\n")

finally:
os._exit(255)
Expand Down Expand Up @@ -2351,7 +2333,7 @@ def fn(exit_code):

self._quit_threads = threading.Event()

thread_name = "background thread for pid %d" % self.pid
thread_name = f"background thread for pid {self.pid}"
self._bg_thread_exc_queue = Queue(1)
self._background_thread = _start_daemon_thread(
background_thread,
Expand All @@ -2370,7 +2352,7 @@ def fn(exit_code):
self._input_thread_exc_queue = Queue(1)
if self._stdin_stream:
close_before_term = not needs_ctty
thread_name = "STDIN thread for pid %d" % self.pid
thread_name = f"STDIN thread for pid {self.pid}"
self._input_thread = _start_daemon_thread(
input_thread,
thread_name,
Expand Down Expand Up @@ -2407,7 +2389,7 @@ def output_complete():
loop.call_soon_threadsafe(self.command.aio_output_complete.set)

self._output_thread_exc_queue = Queue(1)
thread_name = "STDOUT/ERR thread for pid %d" % self.pid
thread_name = f"STDOUT/ERR thread for pid {self.pid}"
self._output_thread = _start_daemon_thread(
output_thread,
thread_name,
Expand All @@ -2423,7 +2405,7 @@ def output_complete():
)

def __repr__(self):
return "<Process %d %r>" % (self.pid, self.cmd[:500])
return f"<Process {self.pid} {self.cmd[:500]!r}>"

def change_in_bufsize(self, buf):
self._stdin_stream.stream_bufferer.change_buffering(buf)
Expand Down Expand Up @@ -3335,26 +3317,24 @@ def _args(**kwargs):
"""allows us to temporarily override all the special keyword parameters in
a with context"""

kwargs_str = ",".join(["%s=%r" % (k, v) for k, v in kwargs.items()])
kwargs_str = ",".join([f"{k}={v!r}" for k, v in kwargs.items()])

raise DeprecationWarning(
"""
f"""

sh.args() has been deprecated because it was never thread safe. use the
following instead:

sh2 = sh({kwargs})
sh2 = sh({kwargs_str})
sh2.your_command()

or

sh2 = sh({kwargs})
sh2 = sh({kwargs_str})
from sh2 import your_command
your_command()

""".format(
kwargs=kwargs_str
)
"""
)


Expand Down Expand Up @@ -3499,7 +3479,7 @@ def sudo(orig): # pragma: no cover
"""a nicer version of sudo that uses getpass to ask for a password, or
allows the first argument to be a string password"""

prompt = "[sudo] password for %s: " % getpass.getuser()
prompt = f"[sudo] password for {getpass.getuser()}: "

def stdin():
pw = getpass.getpass(prompt=prompt) + "\n"
Expand Down Expand Up @@ -3614,9 +3594,8 @@ def login_success(content):


def run_repl(env): # pragma: no cover
banner = "\n>> sh v{version}\n>> https://github.com/amoffat/sh\n"
print(f"\n>> sh v{__version__}\n>> https://github.com/amoffat/sh\n")

print(banner.format(version=__version__))
while True:
try:
line = input("sh> ")
Expand Down
20 changes: 8 additions & 12 deletions tests/test.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ def requires_progs(*progs):

friendly_missing = ", ".join(missing)
return unittest.skipUnless(
len(missing) == 0, "Missing required system programs: %s" % friendly_missing
len(missing) == 0, f"Missing required system programs: {friendly_missing}"
)


Expand All @@ -115,7 +115,7 @@ def requires_poller(poller):
use_select = bool(int(os.environ.get("SH_TESTS_USE_SELECT", "0")))
cur_poller = "select" if use_select else "poll"
return unittest.skipUnless(
cur_poller == poller, "Only enabled for select.%s" % cur_poller
cur_poller == poller, f"Only enabled for select.{cur_poller}"
)


Expand Down Expand Up @@ -2112,17 +2112,15 @@ def test_cwd(self):
def test_cwd_fg(self):
td = realpath(tempfile.mkdtemp())
py = create_tmp_test(
"""
f"""
import sh
import os
from os.path import realpath
orig = realpath(os.getcwd())
print(orig)
sh.pwd(_cwd="{newdir}", _fg=True)
sh.pwd(_cwd="{td}", _fg=True)
print(realpath(os.getcwd()))
""".format(
newdir=td
)
"""
)

orig, newdir, restored = python(py.name).strip().split("\n")
Expand Down Expand Up @@ -3231,12 +3229,10 @@ def test_unicode_path(self):

python_name = os.path.basename(sys.executable)
py = create_tmp_test(
"""#!/usr/bin/env {0}
f"""#!/usr/bin/env {python_name}
# -*- coding: utf8 -*-
print("字")
""".format(
python_name
),
""",
prefix="字",
delete=False,
)
Expand Down Expand Up @@ -3272,7 +3268,7 @@ def test_signal_exception_aliases(self):

import sh

sig_name = "SignalException_%d" % signal.SIGQUIT
sig_name = f"SignalException_{signal.SIGQUIT}"
sig = getattr(sh, sig_name)
from sh import SignalException_SIGQUIT

Expand Down