diff --git a/pyprconf/.gitignore b/pyprconf/.gitignore new file mode 100644 index 0000000..0e5fe9b --- /dev/null +++ b/pyprconf/.gitignore @@ -0,0 +1,4 @@ +**/__pycache__/ +build/ +*.egg-info/ +*.env/ \ No newline at end of file diff --git a/pyprconf/README.md b/pyprconf/README.md new file mode 100644 index 0000000..464470b --- /dev/null +++ b/pyprconf/README.md @@ -0,0 +1,20 @@ +# pyprconf + +**templated configuration and event handling for Hyprland** + +pyprconf is a front-end for Hyprland socket.sock and socket2.sock. +It is configured using a yaml file, and is intended to be exec'd in your +`hyprland.conf`. + +## Installation + +`python -m pip install .` + +Modules will be installed to appropiate `site-packages` location, and a +command-line tool `pyprconf` in $HOME/.local/bin`. + +### `hyprland.conf` + +``` +exec-once $HOME/.local/bin/pyprconf /path/to/conf.yaml +``` \ No newline at end of file diff --git a/pyprconf/doc.yaml b/pyprconf/doc.yaml new file mode 100644 index 0000000..3ef2528 --- /dev/null +++ b/pyprconf/doc.yaml @@ -0,0 +1,59 @@ +# this file documents the configuration keys for pyprconf +# and will only result in sadness if used as-is +pyprconf: + bindreload: SUPER+CTRL,C + bindunload: SUPER+ALT+CTRL,C + +# a section named 'sample' +sample: + # tokens are available for substitution + # in most other keys. a special token {@} + # is always available and refers to the + # name of section. + tokens: + foo: bar + fiz: buz + # repeats the section for each element in + # the range. makes the token {#} available + # for use in the section. a 3-element array + # using python range syntax [start, stop (exclusive), step] + range: [1,10,1] + # binds are set each time the configuration + # is loaded, and unbound when the configuration + # is unloaded. an object where keys are MOD,Key + # and values are the dispatcher,args + bind: + SUPER,H: togglefloating, + SUPERSHIFT,H: pass, + SUPERALT,H: pass, + # keywords at this level are set each + # time the configuration is loaded. + # an array of strings + keyword: + - windowrule float,^({foo})$ + - windowrulev2 pin,title:^()$ + - general:col.active_border 0xff000000 + # an array of event objects + events: + - event: createworkpace # (required) the event to handle + match: ^({@})$ # (required) a regex to match the event data + keyword: # (optional) keywords set when the event matches + - decorations:dim-active 1 + - general:col.active_border 0xffffffff + dispatch: # (optional) dispatchers used when the event matches + - exec {foo} + - workspace {@} + # gates are used for making additional + # conditions on event handlers. + # in most uses one event handler + # will set a gate, and a different + # handler will check the gate and + # reset it. gate names are global + # and can be used across sections. + gate: + # set (True) a gate named 'mylatch' + set: mylatch + # reset (False) a gate named 'mylatch' + reset: mylatch + # only process the event handler if 'mylatch' is set + check: mylatch \ No newline at end of file diff --git a/pyprconf/pyprconf/__init__.py b/pyprconf/pyprconf/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/pyprconf/pyprconf/configuration.py b/pyprconf/pyprconf/configuration.py new file mode 100644 index 0000000..6b007f0 --- /dev/null +++ b/pyprconf/pyprconf/configuration.py @@ -0,0 +1,178 @@ +"""pyrprconf configuration module""" + +from collections import defaultdict +from dataclasses import dataclass +import os +import re +from typing import Any, Callable, Dict, List, Tuple, Union + +COMMAND_TYPES = ["keyword", "dispatch"] + + +@dataclass +class EventHandler: + send: Union[str, None] = None + set: Union[str, None] = None + reset: Union[str, None] = None + check: Union[str, None] = None + + +HandlerDict = Dict[str, Dict[re.Pattern, List[EventHandler]]] +Loaders = List[str] +Unloaders = Loaders +Logger = Callable[[object], None] + + +@dataclass +class Configuration: + unloaders: Union[Unloaders, None] + handlers: Union[HandlerDict, None] + + +def batch(batch: List[str]) -> str: + if batch: + return f"[[BATCH]] {str.join(';', batch)}" + return None + + +def safe_format_map(format: str, map: Dict[str, Any]) -> str: + """like str.format_map but ignores indexed positional format specifiers""" + safe_format = re.sub(r"{(\d+)}", lambda m: f"**]{m.group(1)}**]", format) + formatted = safe_format.format_map(map) + return re.sub(r"\*\*\](\d+)\*\*\]", lambda m: f"{{{m.group(1)}}}", formatted) + + +def _register_tokens(tokens: Dict[str, str], section: Dict[str, Any]) -> Dict[str, str]: + if "tokens" in section: + tokens.update(section["tokens"]) + + +def _resolve_commands( + command: str, commands: Dict[str, Any], tokens: Dict[str, str] +) -> List[str]: + return list( + safe_format_map(f"/{command} {keyword}", tokens) for keyword in commands + ) + + +def _resolve_binds( + binds: Dict[str, str], tokens: Dict[str, str] +) -> Tuple[List[str], List[str]]: + resolved = { + key.format_map(tokens): cmd.format_map(tokens) for (key, cmd) in binds.items() + } + return list(f"/keyword bind {key},{cmd}" for key, cmd in resolved.items()), list( + f"/keyword unbind {key}" for key in resolved + ) + + +def _register_handlers( + log: Logger, + handlers: HandlerDict, + events: List[Dict[str, Any]], + tokens: Dict[str, str], +): + for event in events: + try: + commands = batch( + [ + resolved + for command_type in COMMAND_TYPES + if command_type in event + for resolved in _resolve_commands( + command_type, event[command_type], tokens + ) + ] + ) + + set = None + reset = None + check = None + + gate = event.get("gate", None) + if gate: + set = gate.get("set", None) + reset = gate.get("reset", None) + check = gate.get("check", None) + + pattern = re.compile(event["match"].format_map(tokens)) + + handlers[event["event"]][pattern].append( + EventHandler(send=commands, set=set, reset=reset, check=check) + ) + + except KeyError as err: + log(f"Event in section '{tokens['@']}' missing required key: {err.args[0]}") + continue + + +def _parse_pyprconf(config: Dict[str, Any], pyrpconf: Dict) -> Tuple[Dict, List[str]]: + extensions = [name for name in config if name.startswith("x-")] + for name in extensions: + if name.startswith("x-"): + config.pop(name) + + specialbinds = [] + + bindreload = pyrpconf.get("bindreload", None) + if bindreload: + cmd = f"/keyword bind {bindreload},exec,kill -USR1 {os.getpid()}" + specialbinds.append(cmd) + + bindunload = pyrpconf.get("bindunload", None) + if bindunload: + cmd = f"/keyword bind {bindunload},exec,kill -USR2 {os.getpid()}" + specialbinds.append(cmd) + + return config, specialbinds + + +def _parse_range(log: Logger, rangedef: Union[Dict[str, Any], None]) -> List[int]: + if rangedef: + if len(rangedef) < 3: + log(f"range must be defined as: [start, stop, step]") + return [0] + + return range(rangedef[0], rangedef[1], rangedef[2]) + + return [0] + + +def parse(configdict: Dict[str, Any], log: Logger) -> Tuple[Loaders, Configuration]: + loaders = list() + unloaders = list() + handlers = defaultdict(lambda: defaultdict(list)) + + pyrpconf: Union[Dict, None] = configdict.pop("pyprconf", None) + if pyrpconf: + configdict, specialbinds = _parse_pyprconf(configdict, pyrpconf) + if specialbinds: + loaders.append(specialbinds) + + for name, section in configdict.items(): + log(f"processing section: {name}") + + rangedef = section.get("range", None) + range_tokens = _parse_range(log, rangedef) + + for range_token in range_tokens: + tokens = {"@": name, "#": range_token} + _register_tokens(tokens, section) + + for command_type in COMMAND_TYPES: + if command_type in section: + loaders.append( + _resolve_commands(command_type, section[command_type], tokens) + ) + + if "bind" in section: + binds, unbinds = _resolve_binds(section["bind"], tokens) + loaders.append(binds) + unloaders.append(unbinds) + if "events" in section: + _register_handlers(log, handlers, section["events"], tokens) + + handlers.default_factory = None + return [batch(loader) for loader in loaders], Configuration( + [batch(unloader) for unloader in unloaders], handlers + ) diff --git a/pyprconf/pyprconf/pyprconf.py b/pyprconf/pyprconf/pyprconf.py new file mode 100755 index 0000000..7569a47 --- /dev/null +++ b/pyprconf/pyprconf/pyprconf.py @@ -0,0 +1,161 @@ +#!/usr/bin/env python3 + +# pyprconf - dynamic, templated configuration +# and event handling for Hyprland +# +# Usage: +# pyrpconf /path/to/conf1.yaml /path/to/conf2.yaml +# +# Add the command to hyprland.conf using exec-once (probably toward the bottom). +# +# The configuration file must be in yaml, but support for other formats would +# be pretty easy to add, as long as they parse to a dict of the same shape. + +import os +import signal +import socket +import sys +from typing import Callable, Dict, List, Union + +import yaml + +import pyprconf.configuration as configuration +from pyprconf.configuration import Configuration, EventHandler + +SOCK1_ADDR = "/tmp/hypr/" + os.environ["HYPRLAND_INSTANCE_SIGNATURE"] + "/.socket.sock" + +SOCK2_ADDR = "/tmp/hypr/" + os.environ["HYPRLAND_INSTANCE_SIGNATURE"] + "/.socket2.sock" + + +files: Union[List[str], None] = None +config: Configuration = None +gates: Dict[str, bool] = dict() +enabled = False + + +def log(object: object): + print(object, file=sys.stderr) + + +def update_gate(name: str, value: bool): + gates[name] = value + + +def check_gate(name: str): + if name in gates: + return gates[name] + + return False + + +def send(msg: bytes): + log(f"=> {msg}") + if not msg: + return + try: + sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) + sock.connect(SOCK1_ADDR) + except socket.error as err: + log(f"connect error: {err} ") + return + + try: + sock.sendall(msg) + data = sock.recv(2048).decode() + log(f"<= {data}") + return data + except socket.error as err: + log(f"send error: {err}") + finally: + sock.close() + + +def configure(): + global config, enabled + log(f"loading configuration: {files}") + + yamldoc = dict() + for file in files: + with open(file, mode="rb") as f: + yamldoc.update(yaml.load(f, Loader=yaml.CSafeLoader)) + + loaders, config = configuration.parse(yamldoc, log=log) + for loader in loaders: + send(str(loader).encode()) + enabled = True + + +def reload(*_): + unload() + configure() + + +def unload(*_): + global enabled + log(f"unloading configuration") + enabled = False + if config: + for unloader in config.unloaders: + send(str(unloader).encode()) + + +def process_handler(send: Callable[[bytes], str], handler: EventHandler, data: str): + if handler.check: + if not check_gate(handler.check): + log(f"gate check failed: {handler.check}") + return + else: + log(f"gate check passed: {handler.check}") + if handler.set: + update_gate(handler.set, True) + log(f"set gate: {handler.set}") + if handler.reset: + update_gate(handler.reset, False) + log(f"reset gate: {handler.reset}") + if handler.send: + datalist = data.split(",") + send(str(handler.send).format(*datalist).encode()) + + +def main(): + global files + files = sys.argv[1:] + configure() + signal.signal(signal.SIGUSR1, reload) + signal.signal(signal.SIGUSR2, unload) + signal.signal(signal.SIGTERM, unload) + + try: + sock2 = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) + sock2.connect(SOCK2_ADDR) + sock2file = sock2.makefile() + + while enabled: + if not config.handlers: + log("no handlers registered, waiting for config reload...") + signal.pause() + continue + + line = sock2file.readline() + if not enabled: + continue + name, data = line.rstrip().rsplit(">>") + if name in config.handlers: + for pattern, eventhandlers in config.handlers[name].items(): + if pattern.fullmatch(data): + log(f"matched event <{name}> {pattern.pattern}: {data}") + for eventhandler in eventhandlers: + try: + process_handler(send, eventhandler, data) + except Exception as err: + log(f"error in event handler: {err}") + + except KeyboardInterrupt: + log("stopped with keyboard interrupt") + unload() + finally: + sock2.close() + + +if __name__ == "__main__": + main() diff --git a/pyprconf/pyprconf/section.py b/pyprconf/pyprconf/section.py new file mode 100644 index 0000000..64cd2b8 --- /dev/null +++ b/pyprconf/pyprconf/section.py @@ -0,0 +1,26 @@ +from dataclasses import dataclass +from typing import Dict, List, Tuple, Union + + +@dataclass +class ConfigurationSection: + @dataclass + class Event: + @dataclass + class Gate: + set: str + reset: str + check: str + + event: str + match: str + gate: Union[Gate, None] + keyword: Union[List[str], None] + dispatch: Union[List[str], None] + + tokens: Union[List[str], None] + range: Union[Tuple[int, int, int], None] + bind: Union[Dict[str, str], None] + keyword: Union[List[str], None] + dispatch: Union[List[str], None] + events: Union[List[Event], None] diff --git a/pyprconf/pyproject.toml b/pyprconf/pyproject.toml new file mode 100644 index 0000000..39da0ef --- /dev/null +++ b/pyprconf/pyproject.toml @@ -0,0 +1,20 @@ +[build-system] +requires = ["setuptools>=61.0", "wheel"] +build-backend = "setuptools.build_meta" + +[project] +name = "pyprconf" +version = "0.0.2" +description = "templated configuration and event handling for Hyprland" +readme = "README.md" +requires-python = ">=3.7" +dependencies = [ + "pyyaml" +] + +[project.scripts] +pyprconf = "pyprconf.pyprconf:main" +resig = "resig.resig:main" + +[tool.setuptools] +packages = ["pyprconf", "resig"] \ No newline at end of file diff --git a/pyprconf/resig/__init__.py b/pyprconf/resig/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/pyprconf/resig/resig.py b/pyprconf/resig/resig.py new file mode 100644 index 0000000..78197d6 --- /dev/null +++ b/pyprconf/resig/resig.py @@ -0,0 +1,90 @@ +import getopt +import hashlib +import os +import shutil +import signal +import sys + + +def usage(): + print( + """ +resig - start a program once, then signal if called again +Usage: + resig -h|--help + resig -r|--reset + resets the program database + resig -s signal|--signal=signal program args... + starts program with args, and adds it to the + program database. when called again with the same + program and args, program will be signaled with + signal instead of being started again. +""" + ) + + +def get_data_dir() -> str: + runtime_dir = os.environ.get("XDG_RUNTIME_DIR") or "/tmp" + data_dir = os.path.join(runtime_dir, "resig") + os.makedirs(data_dir, exist_ok=True) + return data_dir + + +def reset(): + try: + shutil.rmtree(get_data_dir()) + return 0 + except shutil.Error as err: + print(f"error resetting program database: {err}") + return 3 + + +def main(): + try: + opts, args = getopt.getopt(sys.argv[1:], "hrs:", ["help", "reset", "signal="]) + except getopt.GetoptError as err: + print(err, file=sys.stderr) + sys.exit(2) + + sig = signal.Signals["SIGUSR1"] + + for opt, arg in opts: + if opt in ("-h", "--help"): + usage() + sys.exit() + elif opt in ("-r", "--reset"): + sys.exit(reset()) + elif opt in ("-s", "--signal"): + sig = signal.Signals[arg] + else: + assert False, "unknown option" + + data_dir = get_data_dir() + key = hashlib.md5(str(args).encode()).hexdigest() + key_path = os.path.join(data_dir, key) + + print(f"program: {args}", file=sys.stderr) + print(f"keypath: {key_path}", file=sys.stderr) + + pid = None + if os.path.isfile(key_path): + with open(key_path, "r") as f: + pid = int(f.readline()) + + print(f"signaling {sig.name}: {pid}", file=sys.stderr) + try: + os.kill(pid, sig.value) + print(pid) + except ProcessLookupError as err: + print(f"failed sending signal: {err}", file=sys.stderr) + os.remove(key_path) + else: + pid = os.spawnvp(os.P_NOWAIT, args[0], args) + print(f"added: {pid}", file=sys.stderr) + with open(key_path, mode="x") as f: + f.write(str(pid)) + print(pid) + + +if __name__ == "__main__": + main() diff --git a/pyprconf/sampleconf.yaml b/pyprconf/sampleconf.yaml new file mode 100644 index 0000000..b63a6ae --- /dev/null +++ b/pyprconf/sampleconf.yaml @@ -0,0 +1,103 @@ +# this is a functional, parseable sample configuration +# but needs to be customized based on your preferences +# before deploying + +pyprconf: + # create a binding to send SIGUSR1 to the pyprconf pid + bindreload: SUPER+CTRL,C + # create a binding to send SIGUSR2 to the pyprconf pid + bindunload: SUPER+ALT+CTRL,C + +# sections that start with x- are ignored +# by the pyprconf parser, but can be used +# as templates for other sections +x-autospace: &autospace + bind: + SUPER,{key}: workspace,name:{@} + SUPER+SHIFT,{key}: movetoworkspace,name:{@} + SUPER+ALT,{key}: exec,{cmd} + events: + - event: createworkspace + match: ^({@})$ + dispatch: + - exec {cmd} + +logopenwindow: + events: + - event: openwindow + match: ^(.*),(.*),(.*),(.*)$ + dispatch: + # the event data are available as positional tokens + - 'exec notify-send "Address: {0}" "Workspace: {1}\nClass: {2}\nTitle: {3}"' + +activeborders: + tokens: + color_active: 0x8800ccde + color_full: 0xffff0000 + color_resize: 0xFFe7691e + opt: general:col.active_border + keyword: + - '{opt} {color_active}' + events: + - event: fullscreen + match: ^(1)$ + keyword: + - '{opt} {color_full}' + - event: fullscreen + match: ^(0)$ + keyword: + - '{opt} {color_active}' + - event: submap + match: ^(resize)$ + gate: + set: resize + keyword: + - decoration:dim_inactive 1 + - '{opt} {color_resize}' + - event: submap + match: ^$ + gate: + check: resize + reset: resize + keyword: + - decoration:dim_inactive 0 + - '{opt} {color_active}' + +workspaces: + range: [1,10,1] + bind: + SUPER,{#}: workspace,{#} + SUPER+SHIFT,{#}: movetoworkspace,{#} + +workspace10: + bind: + SUPER,0: workspace,10 + SUPER+SHIFT,0: movetoworkspace,10 + +grimblast: + tokens: + key: P + notify: '--notify ' + bind: + SUPER,{key}: exec,{@} {notify}save active + SUPERSHIFT,{key}: exec,{@} {notify}save area + SUPERALT,{key}: exec,{@} {notify}save output + SUPERCTRL,{key}: exec,{@} {notify}save window + +browse: + <<: *autospace + tokens: + cmd: handlr launch x-scheme-handler/https + key: B + +htop: + <<: *autospace + tokens: + cmd: gtk-launch htop.desktop + key: grave + +incognito: + <<: *autospace + tokens: + cmd: brave-beta --incognito + key: I