Skip to content

Commit

Permalink
Automatic backup 2024-08-24
Browse files Browse the repository at this point in the history
  • Loading branch information
snejus committed Aug 24, 2024
1 parent 11d66ba commit 09e3b6b
Show file tree
Hide file tree
Showing 14 changed files with 217 additions and 2,933 deletions.
129 changes: 68 additions & 61 deletions rich_tables/table.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,40 +2,28 @@

import argparse
import json
import sys
from contextlib import suppress
from datetime import datetime, timedelta
from functools import singledispatch
from typing import (
TYPE_CHECKING,
Any,
Callable,
Dict,
Iterable,
Iterator,
List,
Tuple,
Union,
)
from pathlib import Path
from typing import TYPE_CHECKING, Any, Callable, Dict, Iterable, Iterator, List, Tuple

from rich.bar import Bar
from rich.columns import Columns
from rich.traceback import install
from typing_extensions import TypedDict

from . import task
from .fields import FIELDS_MAP, get_val
from .fields import get_val
from .generic import flexitable
from .github import pulls_table
from .music import albums_table
from .utils import (
border_panel,
format_string,
group_by,
make_console,
md_panel,
new_table,
predictably_random_color,
pretty_diff,
wrap,
)

Expand All @@ -50,7 +38,7 @@
install(console=console, show_locals=True, width=console.width)


def lights_table(lights: List[JSONDict], **__) -> Table:
def lights_table(lights: List[JSONDict], **__) -> Iterator[Table]:
from rgbxy import Converter

headers = lights[0].keys()
Expand Down Expand Up @@ -173,39 +161,59 @@ def midnight(day_offset: int) -> datetime:
yield border_panel(table, title=year_and_month)


def load_data() -> JSONDict | None:
"""Load JSON data from the stdin."""
if sys.stdin.isatty():
return None
def load_data(data: str) -> list[JSONDict] | JSONDict | str:
"""Load data from the given file.
Try to load the data as JSON, otherwise return the text as is.
"""
path = Path(data)
text = (
path.read_text()
if path.exists() and (path.is_file() or path.is_symlink())
else data
)

text = sys.stdin.read()
try:
data: JSONDict = json.loads(text)
assert data
json_data: JSONDict | list[JSONDict] = json.loads(text)
except json.JSONDecodeError:
console.print(format_string(text))
sys.exit(0)
except AssertionError:
sys.exit(0)
return text
else:
return data
return json_data


@singledispatch
def draw_data(data: Union[JSONDict, List[JSONDict]]) -> Any:
return None


def get_args() -> JSONDict:
parser = argparse.ArgumentParser()
def get_args() -> argparse.Namespace:
parser = argparse.ArgumentParser(
description="""Pretty-print JSON data.
By default, read JSON data from stdin and prettify it.
Otherwise, use command 'diff' to compare two JSON objects or blocks of text.""",
formatter_class=argparse.RawTextHelpFormatter,
)
parser.add_argument("-v", "--verbose", action="store_true")
parser.add_argument("args", nargs="*", default=[])
return vars(parser.parse_args())
parser.add_argument("-j", "--json", action="store_true", help="output as JSON")
parser.add_argument(
"-s",
"--save",
default="saved.html",
help="save the output as HTML",
required=False,
)

subparsers = parser.add_subparsers(
dest="command", title="Subcommands", required=False
)
diff_parser = subparsers.add_parser("diff", help="show diff between two objects")
diff_parser.add_argument(
"before", type=load_data, help="JSON object or text before, can be a filename"
)
diff_parser.add_argument(
"after", type=load_data, help="JSON object or text after, can be a filename"
)
return parser.parse_args()


class NamedData(TypedDict):
title: str
values: List[JSONDict]
values: list[JSONDict]


TABLE_BY_NAME: Dict[str, Callable[..., Any]] = {
Expand All @@ -217,46 +225,45 @@ class NamedData(TypedDict):
}


@singledispatch
def draw_data(data: Any) -> Iterator[RenderableType]: # noqa: ARG001
"""Render the provided data."""
raise NotImplementedError


@draw_data.register(str)
def _draw_data_str(data: str) -> Iterator[RenderableType]:
yield data


@draw_data.register(dict)
def _draw_data_dict(data: JSONDict | NamedData) -> Iterator[RenderableType]:
if (title := data.get("title")) and (values := data.get("values")):
table = TABLE_BY_NAME.get(title, flexitable)
yield from table(values, **get_args())
yield from table(values)
else:
yield flexitable(data)


@draw_data.register(list)
def _draw_data_list(data: List[JSONDict]) -> Iterator[RenderableType]:
def _draw_data_list(data: list[JSONDict]) -> Iterator[RenderableType]:
yield flexitable(data)


def main() -> None:
args = []
if len(sys.argv) > 1:
args.extend(sys.argv[1:])

if args and args[0] == "diff":
arguments = args[1:]
with suppress(json.JSONDecodeError):
arguments = list(map(json.loads, arguments))

console.print(FIELDS_MAP["diff"](arguments), highlight=False)
elif args and args[0] == "md":
console.print(md_panel(sys.stdin.read().replace(r"\x00", "")))
args = get_args()
if args.command == "diff":
console.print(pretty_diff(args.before, args.after))
else:
if "-s" in set(args):
console.record = True

data = load_data()
if "-j" in args:
data = load_data("/dev/stdin")
if args.json:
console.print_json(data=data)
else:
console.record = True
for ret in draw_data(data):
console.print(ret)

if "-s" in set(args):
console.save_html("saved.html")
if args.save:
console.save_html("saved.html")


if __name__ == "__main__":
Expand Down
28 changes: 10 additions & 18 deletions rich_tables/task.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
from __future__ import annotations

from dataclasses import asdict, dataclass, field
from functools import partial
from typing import TYPE_CHECKING, Any, Callable, Dict, Iterable, Iterator, List

from funcy import curry, join
from funcy import join
from typing_extensions import Literal, TypedDict

from .fields import FIELDS_MAP, get_val
Expand Down Expand Up @@ -44,8 +45,8 @@
JSONDict = Dict[str, Any]


@curry
def keep_keys(keys: Iterable[str], item: JSONDict) -> JSONDict:
"""Keep only the keys in `keys` from `item`."""
return dict(zip(keys, map(item.get, keys)))


Expand Down Expand Up @@ -99,16 +100,19 @@ def get_tree(self, get_desc: Callable[[str], str]) -> Tree:

return tree

def get_row(self, extract_data: Callable, *args, **kwargs) -> JSONDict:
def get_row(
self, extract_data: Callable[[JSONDict], JSONDict], *args, **kwargs
) -> JSONDict:
data = extract_data(asdict(self))
data["tree"] = self.get_tree(**kwargs)

return data


def get_headers(task_headers: Iterable[str]) -> List[str]:
"""Return the list of headers that will be used in the table."""
ordered_keys = dict.fromkeys([*INITIAL_HEADERS, *sorted(task_headers)]).keys()
return [k for k in ordered_keys if k not in SKIP_HEADERS]
return [*[k for k in ordered_keys if k not in SKIP_HEADERS], "tree"]


fields_map: JSONDict = {
Expand All @@ -134,30 +138,18 @@ def get_headers(task_headers: Iterable[str]) -> List[str]:
}
FIELDS_MAP.update(fields_map)

# def get_tasks(tasks_by_group: Dict[str, list[Task]], **__) -> Iterator[Panel]:
# """Generate a table for the given grouped tasks."""
# if not tasks_by_group:
# return


# tasks = list(join(tasks_by_group.values()))
# first_task = tasks[0]

# for group, tasks in tasks_by_group.items():
# yield tasks


def get_table(tasks_data_by_group: Dict[str, list[JSONDict]], **__) -> Iterator[Panel]:
"""Yield a table for each tasks group."""
headers = get_headers(next(iter(join(tasks_data_by_group.values()))))
keep_headers = keep_keys(("tree", *headers))
keep_headers = partial(keep_keys, headers)

tasks_by_group = {
g: [Task(**t) for t in tasks_data]
for g, tasks_data in tasks_data_by_group.items()
}
desc_by_uuid = {t.uuid: t.desc for t in join(tasks_by_group.values())}
for group, tasks in tasks_by_group.items():
# tasks = get_tasks([Task(**t) for t in tasks_data])
yield border_panel(
flexitable([
t.get_row(keep_headers, get_desc=desc_by_uuid.get) for t in tasks
Expand Down
Loading

0 comments on commit 09e3b6b

Please sign in to comment.