|
1 | 1 | import argparse
|
| 2 | +import json |
| 3 | +import os |
| 4 | +import shutil |
| 5 | +from enum import StrEnum |
2 | 6 |
|
| 7 | +import questionary |
3 | 8 | from dotenv import load_dotenv
|
| 9 | +from sqlmodel import Session, SQLModel, create_engine, select |
4 | 10 |
|
| 11 | +from ficamp.classifier.infer import infer_tx_category |
| 12 | +from ficamp.datastructures import Tx |
5 | 13 | from ficamp.parsers.abn import AbnParser
|
6 | 14 |
|
7 | 15 |
|
8 | 16 | def cli() -> argparse.Namespace:
|
9 |
| - """Parses the first argument from the command line and prints it.""" |
| 17 | + """Creates a command line interface with subcommands for import and categorize.""" |
10 | 18 |
|
11 |
| - # Create an argument parser |
| 19 | + # Create the main parser |
12 | 20 | parser = argparse.ArgumentParser(
|
13 |
| - prog="ficamp", description="Print the first argument from the CLI" |
| 21 | + prog="ficamp", description="Parse and categorize your expenses." |
14 | 22 | )
|
15 | 23 |
|
16 |
| - parser.add_argument("--bank", choices=["abn"], default="abn") |
17 |
| - parser.add_argument("filename", help="The spreadsheet to load") |
| 24 | + # Create subparsers for the two subcommands |
| 25 | + subparsers = parser.add_subparsers(dest="command", required=True) |
| 26 | + |
| 27 | + # Subparser for the import command |
| 28 | + import_parser = subparsers.add_parser("import", help="Import a Transactions") |
| 29 | + import_parser.add_argument( |
| 30 | + "--bank", choices=["abn"], default="abn", help="Specify the bank for the import" |
| 31 | + ) |
| 32 | + import_parser.add_argument("filename", help="File to load") |
| 33 | + import_parser.set_defaults(func=import_data) |
| 34 | + |
| 35 | + # Subparser for the categorize command |
| 36 | + categorize_parser = subparsers.add_parser( |
| 37 | + "categorize", help="Categorize transactions" |
| 38 | + ) |
| 39 | + categorize_parser.add_argument("--infer-category", action="store_true") |
| 40 | + categorize_parser.set_defaults(func=categorize) |
18 | 41 |
|
19 |
| - # Parse the arguments |
20 | 42 | args = parser.parse_args()
|
21 | 43 |
|
22 |
| - # Print the first argument |
23 | 44 | return args
|
24 | 45 |
|
25 | 46 |
|
26 |
| -def main(): |
27 |
| - args = cli() |
28 |
| - args.filename |
29 |
| - args.bank |
30 |
| - |
| 47 | +def import_data(args, engine): |
| 48 | + """Run the parsers.""" |
| 49 | + print(f"Importing data from {args.filename} for bank {args.bank}.") |
31 | 50 | # TODO: Build enum for banks
|
32 | 51 | if args.bank == "abn":
|
33 | 52 | parser = AbnParser()
|
34 | 53 | parser.load(args.filename)
|
35 | 54 | transactions = parser.parse()
|
36 |
| - print(transactions) |
37 |
| - # TODO: Add categorizer! |
| 55 | + for tx in transactions: |
| 56 | + with Session(engine) as session: |
| 57 | + # Assuming 'date' and 'amount' can uniquely identify a transaction |
| 58 | + statement = select(Tx).where( |
| 59 | + Tx.date == tx.date, Tx.amount == tx.amount, Tx.concept == tx.concept |
| 60 | + ) |
| 61 | + result = session.exec(statement).first() |
| 62 | + if result is None: # No existing transaction found |
| 63 | + session.add(tx) |
| 64 | + session.commit() |
| 65 | + else: |
| 66 | + print(f"Transaction already exists in the database. {tx}") |
| 67 | + |
| 68 | + |
| 69 | +def get_category_dict(categories_database_path="categories_database.json"): |
| 70 | + # FIXME: move categories to SQLITE instead of json file. |
| 71 | + if not os.path.exists(categories_database_path): |
| 72 | + return {} |
| 73 | + with open(categories_database_path, "r") as file: |
| 74 | + category_dict = json.load(file) |
| 75 | + string_to_category = { |
| 76 | + string: category |
| 77 | + for category, strings in category_dict.items() |
| 78 | + for string in strings |
| 79 | + } |
| 80 | + return string_to_category |
| 81 | + |
| 82 | + |
| 83 | +def revert_and_save_dict(string_to_category, filename="categories_database.json"): |
| 84 | + # Reverting the dictionary |
| 85 | + category_to_strings = {} |
| 86 | + for string, category in string_to_category.items(): |
| 87 | + category_to_strings.setdefault(category, []).append(string) |
| 88 | + |
| 89 | + # Saving to a JSON file |
| 90 | + if os.path.exists(filename): |
| 91 | + shutil.move(filename, "/tmp/categories_db_bkp.json") |
| 92 | + with open(filename, "w") as file: |
| 93 | + json.dump(category_to_strings, file, indent=4) |
| 94 | + |
| 95 | + |
| 96 | +class DefaultAnswers(StrEnum): |
| 97 | + SKIP = "Skip this Tx" |
| 98 | + NEW = "Type a new category" |
| 99 | + |
| 100 | + |
| 101 | +def query_business_category(tx, categories_dict, infer_category=False): |
| 102 | + # first try to get from the category_dict |
| 103 | + category = categories_dict.get(tx.concept) |
| 104 | + if category: |
| 105 | + return category |
| 106 | + # ask the user if we don't know it |
| 107 | + categories_choices = list(set(categories_dict.values())) |
| 108 | + categories_choices.extend([DefaultAnswers.NEW, DefaultAnswers.SKIP]) |
| 109 | + default_choice = DefaultAnswers.SKIP |
| 110 | + if infer_category: |
| 111 | + inferred_category = infer_tx_category(tx) |
| 112 | + if inferred_category: |
| 113 | + categories_choices.append(inferred_category) |
| 114 | + default_choice = inferred_category |
| 115 | + print(f"{tx.date.isoformat()} {tx.amount} {tx.concept}") |
| 116 | + answer = questionary.select( |
| 117 | + "Please select the category for this TX", |
| 118 | + choices=categories_choices, |
| 119 | + default=default_choice, |
| 120 | + show_selected=True, |
| 121 | + ).ask() |
| 122 | + if answer == DefaultAnswers.NEW: |
| 123 | + answer = questionary.text("What's the category for the TX above").ask() |
| 124 | + if answer == DefaultAnswers.SKIP: |
| 125 | + return None |
| 126 | + if answer is None: |
| 127 | + # https://questionary.readthedocs.io/en/stable/pages/advanced.html#keyboard-interrupts |
| 128 | + raise KeyboardInterrupt |
| 129 | + if answer: |
| 130 | + categories_dict[tx.concept] = answer |
| 131 | + category = answer |
| 132 | + return category |
| 133 | + |
| 134 | + |
| 135 | +def categorize(args, engine): |
| 136 | + """Function to categorize transactions.""" |
| 137 | + categories_dict = get_category_dict() |
| 138 | + try: |
| 139 | + with Session(engine) as session: |
| 140 | + statement = select(Tx).where(Tx.category.is_(None)) |
| 141 | + results = session.exec(statement).all() |
| 142 | + for tx in results: |
| 143 | + print(f"Processing {tx}") |
| 144 | + tx_category = query_business_category( |
| 145 | + tx, categories_dict, infer_category=args.infer_category |
| 146 | + ) |
| 147 | + if tx_category: |
| 148 | + print(f"Saving category for {tx.concept}: {tx_category}") |
| 149 | + tx.category = tx_category |
| 150 | + # update DB |
| 151 | + session.add(tx) |
| 152 | + session.commit() |
| 153 | + revert_and_save_dict(categories_dict) |
| 154 | + else: |
| 155 | + print("Not saving any category for thi Tx") |
| 156 | + revert_and_save_dict(categories_dict) |
| 157 | + except KeyboardInterrupt: |
| 158 | + print("Closing") |
| 159 | + |
| 160 | + |
| 161 | +def main(): |
| 162 | + # create DB |
| 163 | + engine = create_engine("sqlite:///ficamp.db") |
| 164 | + # create tables |
| 165 | + SQLModel.metadata.create_all(engine) |
| 166 | + |
| 167 | + try: |
| 168 | + args = cli() |
| 169 | + if args.command: |
| 170 | + args.func(args, engine) |
| 171 | + except KeyboardInterrupt: |
| 172 | + print("\nClosing") |
38 | 173 |
|
39 | 174 |
|
40 |
| -load_dotenv() |
41 |
| -main() |
| 175 | +if __name__ == "__main__": |
| 176 | + load_dotenv() |
| 177 | + main() |
0 commit comments