diff --git a/README.md b/README.md index bf1cdf2..3ad5924 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,6 @@ -# GitIngest +# Gitingest -[![Image](./docs/frontpage.png "GitIngest main page")](https://gitingest.com) +[![Image](./docs/frontpage.png "Gitingest main page")](https://gitingest.com) [![License](https://img.shields.io/badge/license-MIT-blue.svg)](https://github.com/cyclotruc/gitingest/blob/main/LICENSE) [![PyPI version](https://badge.fury.io/py/gitingest.svg)](https://badge.fury.io/py/gitingest) @@ -37,9 +37,9 @@ pip install gitingest ## š§© Browser Extension Usage - - - + + + The extension is open source at [lcandy2/gitingest-extension](https://github.com/lcandy2/gitingest-extension). diff --git a/src/config.py b/src/config.py index 1958f30..68565c8 100644 --- a/src/config.py +++ b/src/config.py @@ -1,11 +1,13 @@ """ Configuration file for the project. """ +from pathlib import Path + MAX_DISPLAY_SIZE: int = 300_000 -TMP_BASE_PATH: str = "/tmp/gitingest" +TMP_BASE_PATH = Path("/tmp/gitingest") DELETE_REPO_AFTER: int = 60 * 60 # In seconds EXAMPLE_REPOS: list[dict[str, str]] = [ - {"name": "GitIngest", "url": "https://github.com/cyclotruc/gitingest"}, + {"name": "Gitingest", "url": "https://github.com/cyclotruc/gitingest"}, {"name": "FastAPI", "url": "https://github.com/tiangolo/fastapi"}, {"name": "Flask", "url": "https://github.com/pallets/flask"}, {"name": "Tldraw", "url": "https://github.com/tldraw/tldraw"}, diff --git a/src/gitingest/cli.py b/src/gitingest/cli.py index 7da0b1f..6a4b470 100644 --- a/src/gitingest/cli.py +++ b/src/gitingest/cli.py @@ -1,4 +1,4 @@ -""" Command-line interface for the GitIngest package. """ +""" Command-line interface for the Gitingest package. """ # pylint: disable=no-value-for-parameter diff --git a/src/gitingest/exceptions.py b/src/gitingest/exceptions.py index b101f2e..bfb3888 100644 --- a/src/gitingest/exceptions.py +++ b/src/gitingest/exceptions.py @@ -1,4 +1,4 @@ -""" Custom exceptions for the GitIngest package. """ +""" Custom exceptions for the Gitingest package. """ class InvalidPatternError(ValueError): diff --git a/src/gitingest/ignore_patterns.py b/src/gitingest/ignore_patterns.py index c2b382c..a1a902d 100644 --- a/src/gitingest/ignore_patterns.py +++ b/src/gitingest/ignore_patterns.py @@ -1,4 +1,4 @@ -""" Default ignore patterns for GitIngest. """ +""" Default ignore patterns for Gitingest. """ DEFAULT_IGNORE_PATTERNS: list[str] = [ # Python diff --git a/src/gitingest/ingest.py b/src/gitingest/ingest.py index e1ae657..e58743d 100644 --- a/src/gitingest/ingest.py +++ b/src/gitingest/ingest.py @@ -4,6 +4,7 @@ import inspect import shutil +from config import TMP_BASE_PATH from gitingest.clone import CloneConfig, clone_repo from gitingest.ingest_from_query import ingest_from_query from gitingest.parse_query import parse_query @@ -63,7 +64,7 @@ def ingest( # Extract relevant fields for CloneConfig clone_config = CloneConfig( url=query["url"], - local_path=query["local_path"], + local_path=str(query["local_path"]), commit=query.get("commit"), branch=query.get("branch"), ) @@ -84,6 +85,5 @@ def ingest( finally: # Clean up the temporary directory if it was created if query["url"]: - # Clean up the temporary directory under /tmp/gitingest - cleanup_path = "/tmp/gitingest" - shutil.rmtree(cleanup_path, ignore_errors=True) + # Clean up the temporary directory + shutil.rmtree(TMP_BASE_PATH, ignore_errors=True) diff --git a/src/gitingest/ingest_from_query.py b/src/gitingest/ingest_from_query.py index 6dc757a..ff4e483 100644 --- a/src/gitingest/ingest_from_query.py +++ b/src/gitingest/ingest_from_query.py @@ -1,7 +1,7 @@ """ Functions to ingest and analyze a codebase directory or single file. """ -import os from fnmatch import fnmatch +from pathlib import Path from typing import Any import tiktoken @@ -14,18 +14,18 @@ MAX_TOTAL_SIZE_BYTES = 500 * 1024 * 1024 # 500 MB -def _should_include(path: str, base_path: str, include_patterns: list[str]) -> bool: +def _should_include(path: Path, base_path: Path, include_patterns: list[str]) -> bool: """ - Determines if the given file or directory path matches any of the include patterns. + Determine if the given file or directory path matches any of the include patterns. This function checks whether the relative path of a file or directory matches any of the specified patterns. If a match is found, it returns `True`, indicating that the file or directory should be included in further processing. Parameters ---------- - path : str + path : Path The absolute path of the file or directory to check. - base_path : str + base_path : Path The base directory from which the relative path is calculated. include_patterns : list[str] A list of patterns to check against the relative path. @@ -35,17 +35,22 @@ def _should_include(path: str, base_path: str, include_patterns: list[str]) -> b bool `True` if the path matches any of the include patterns, `False` otherwise. """ - rel_path = path.replace(base_path, "").lstrip(os.sep) - include = False + try: + rel_path = path.relative_to(base_path) + except ValueError: + # If path is not under base_path at all + return False + + rel_str = str(rel_path) for pattern in include_patterns: - if fnmatch(rel_path, pattern): - include = True - return include + if fnmatch(rel_str, pattern): + return True + return False -def _should_exclude(path: str, base_path: str, ignore_patterns: list[str]) -> bool: +def _should_exclude(path: Path, base_path: Path, ignore_patterns: list[str]) -> bool: """ - Determines if the given file or directory path matches any of the ignore patterns. + Determine if the given file or directory path matches any of the ignore patterns. This function checks whether the relative path of a file or directory matches any of the specified ignore patterns. If a match is found, it returns `True`, indicating @@ -53,9 +58,9 @@ def _should_exclude(path: str, base_path: str, ignore_patterns: list[str]) -> bo Parameters ---------- - path : str + path : Path The absolute path of the file or directory to check. - base_path : str + base_path : Path The base directory from which the relative path is calculated. ignore_patterns : list[str] A list of patterns to check against the relative path. @@ -65,14 +70,20 @@ def _should_exclude(path: str, base_path: str, ignore_patterns: list[str]) -> bo bool `True` if the path matches any of the ignore patterns, `False` otherwise. """ - rel_path = path.replace(base_path, "").lstrip(os.sep) + try: + rel_path = path.relative_to(base_path) + except ValueError: + # If path is not under base_path at all + return True + + rel_str = str(rel_path) for pattern in ignore_patterns: - if pattern and fnmatch(rel_path, pattern): + if pattern and fnmatch(rel_str, pattern): return True return False -def _is_safe_symlink(symlink_path: str, base_path: str) -> bool: +def _is_safe_symlink(symlink_path: Path, base_path: Path) -> bool: """ Check if a symlink points to a location within the base directory. @@ -82,9 +93,9 @@ def _is_safe_symlink(symlink_path: str, base_path: str) -> bool: Parameters ---------- - symlink_path : str + symlink_path : Path The path of the symlink to check. - base_path : str + base_path : Path The base directory to ensure the symlink points within. Returns @@ -93,15 +104,16 @@ def _is_safe_symlink(symlink_path: str, base_path: str) -> bool: `True` if the symlink points within the base directory, `False` otherwise. """ try: - target_path = os.path.realpath(symlink_path) - base_path = os.path.realpath(base_path) - return os.path.commonpath([target_path, base_path]) == base_path + target_path = symlink_path.resolve() + base_resolved = base_path.resolve() + # It's "safe" if target_path == base_resolved or is inside base_resolved + return base_resolved in target_path.parents or target_path == base_resolved except (OSError, ValueError): # If there's any error resolving the paths, consider it unsafe return False -def _is_text_file(file_path: str) -> bool: +def _is_text_file(file_path: Path) -> bool: """ Determine if a file is likely a text file based on its content. @@ -111,7 +123,7 @@ def _is_text_file(file_path: str) -> bool: Parameters ---------- - file_path : str + file_path : Path The path to the file to check. Returns @@ -120,16 +132,16 @@ def _is_text_file(file_path: str) -> bool: `True` if the file is likely a text file, `False` otherwise. """ try: - with open(file_path, "rb") as file: + with file_path.open("rb") as file: chunk = file.read(1024) return not bool(chunk.translate(None, bytes([7, 8, 9, 10, 12, 13, 27] + list(range(0x20, 0x100))))) except OSError: return False -def _read_file_content(file_path: str) -> str: +def _read_file_content(file_path: Path) -> str: """ - Reads the content of a file. + Read the content of a file. This function attempts to open a file and read its contents using UTF-8 encoding. If an error occurs during reading (e.g., file is not found or permission error), @@ -137,7 +149,7 @@ def _read_file_content(file_path: str) -> str: Parameters ---------- - file_path : str + file_path : Path The path to the file to read. Returns @@ -146,7 +158,7 @@ def _read_file_content(file_path: str) -> str: The content of the file, or an error message if the file could not be read. """ try: - with open(file_path, encoding="utf-8", errors="ignore") as f: + with file_path.open(encoding="utf-8", errors="ignore") as f: return f.read() except OSError as e: return f"Error reading file: {e}" @@ -197,9 +209,9 @@ def _sort_children(children: list[dict[str, Any]]) -> list[dict[str, Any]]: def _scan_directory( - path: str, + path: Path, query: dict[str, Any], - seen_paths: set[str] | None = None, + seen_paths: set[Path] | None = None, depth: int = 0, stats: dict[str, int] | None = None, ) -> dict[str, Any] | None: @@ -212,11 +224,11 @@ def _scan_directory( Parameters ---------- - path : str + path : Path The path of the directory to scan. query : dict[str, Any] A dictionary containing the query parameters, such as include and ignore patterns. - seen_paths : set[str] | None, optional + seen_paths : set[Path] | None, optional A set to track already visited paths, by default None. depth : int The current depth of directory traversal, by default 0. @@ -246,7 +258,7 @@ def _scan_directory( print(f"Skipping further processing: maximum total size ({MAX_TOTAL_SIZE_BYTES/1024/1024:.1f}MB) reached") return None - real_path = os.path.realpath(path) + real_path = path.resolve() if real_path in seen_paths: print(f"Skipping already visited path: {path}") return None @@ -254,13 +266,13 @@ def _scan_directory( seen_paths.add(real_path) result = { - "name": os.path.basename(path), + "name": path.name, "type": "directory", "size": 0, "children": [], "file_count": 0, "dir_count": 0, - "path": path, + "path": str(path), "ignore_content": False, } @@ -269,11 +281,9 @@ def _scan_directory( include_patterns = query["include_patterns"] try: - for item in os.listdir(path): - item_path = os.path.join(path, item) + for item in path.iterdir(): _process_item( item=item, - item_path=item_path, query=query, result=result, seen_paths=seen_paths, @@ -293,14 +303,13 @@ def _scan_directory( def _process_symlink( - item: str, - item_path: str, + item: Path, query: dict[str, Any], result: dict[str, Any], - seen_paths: set[str], + seen_paths: set[Path], stats: dict[str, int], depth: int, - base_path: str, + base_path: Path, include_patterns: list[str], ) -> None: """ @@ -311,9 +320,7 @@ def _process_symlink( Parameters ---------- - item : str - The name of the symlink. - item_path : str + item : Path The full path of the symlink. query : dict[str, Any] The query dictionary containing the parameters. @@ -325,7 +332,7 @@ def _process_symlink( The dictionary to track statistics such as file count and size. depth : int The current depth in the directory traversal. - base_path : str + base_path : Path The base path used for validation of the symlink. include_patterns : list[str] A list of include patterns for file filtering. @@ -339,15 +346,15 @@ def _process_symlink( MaxFilesReachedError If the number of files exceeds the maximum limit. """ - if not _is_safe_symlink(item_path, base_path): - raise AlreadyVisitedError(item_path) + if not _is_safe_symlink(item, base_path): + raise AlreadyVisitedError(str(item)) - real_path = os.path.realpath(item_path) + real_path = item.resolve() if real_path in seen_paths: - raise AlreadyVisitedError(item_path) + raise AlreadyVisitedError(str(item)) - if os.path.isfile(real_path): - file_size = os.path.getsize(real_path) + if real_path.is_file(): + file_size = real_path.stat().st_size if stats["total_size"] + file_size > MAX_TOTAL_SIZE_BYTES: raise MaxFileSizeReachedError(MAX_TOTAL_SIZE_BYTES) @@ -359,19 +366,20 @@ def _process_symlink( raise MaxFilesReachedError(MAX_FILES) is_text = _is_text_file(real_path) + content = _read_file_content(real_path) if is_text else "[Non-text file]" child = { - "name": item, + "name": item.name, "type": "file", "size": file_size, - "content": _read_file_content(real_path) if is_text else "[Non-text file]", - "path": item_path, + "content": content, + "path": str(item), } result["children"].append(child) result["size"] += file_size result["file_count"] += 1 - elif os.path.isdir(real_path): + elif real_path.is_dir(): subdir = _scan_directory( path=real_path, query=query, @@ -380,15 +388,16 @@ def _process_symlink( stats=stats, ) if subdir and (not include_patterns or subdir["file_count"] > 0): - subdir["name"] = item - subdir["path"] = item_path + # rename the subdir to reflect the symlink name + subdir["name"] = item.name + subdir["path"] = str(item) result["children"].append(subdir) result["size"] += subdir["size"] result["file_count"] += subdir["file_count"] result["dir_count"] += 1 + subdir["dir_count"] -def _process_file(item: str, item_path: str, result: dict[str, Any], stats: dict[str, int]) -> None: +def _process_file(item: Path, result: dict[str, Any], stats: dict[str, int]) -> None: """ Process a file in the file system. @@ -397,9 +406,7 @@ def _process_file(item: str, item_path: str, result: dict[str, Any], stats: dict Parameters ---------- - item : str - The name of the file. - item_path : str + item : Path The full path of the file. result : dict[str, Any] The dictionary to accumulate the results. @@ -413,9 +420,9 @@ def _process_file(item: str, item_path: str, result: dict[str, Any], stats: dict MaxFilesReachedError If the number of files exceeds the maximum limit. """ - file_size = os.path.getsize(item_path) + file_size = item.stat().st_size if stats["total_size"] + file_size > MAX_TOTAL_SIZE_BYTES: - print(f"Skipping file {item_path}: would exceed total size limit") + print(f"Skipping file {item}: would exceed total size limit") raise MaxFileSizeReachedError(MAX_TOTAL_SIZE_BYTES) stats["total_files"] += 1 @@ -425,15 +432,15 @@ def _process_file(item: str, item_path: str, result: dict[str, Any], stats: dict print(f"Maximum file limit ({MAX_FILES}) reached") raise MaxFilesReachedError(MAX_FILES) - is_text = _is_text_file(item_path) - content = _read_file_content(item_path) if is_text else "[Non-text file]" + is_text = _is_text_file(item) + content = _read_file_content(item) if is_text else "[Non-text file]" child = { - "name": item, + "name": item.name, "type": "file", "size": file_size, "content": content, - "path": item_path, + "path": str(item), } result["children"].append(child) result["size"] += file_size @@ -441,15 +448,14 @@ def _process_file(item: str, item_path: str, result: dict[str, Any], stats: dict def _process_item( - item: str, - item_path: str, + item: Path, query: dict[str, Any], result: dict[str, Any], - seen_paths: set[str], + seen_paths: set[Path], stats: dict[str, int], depth: int, ignore_patterns: list[str], - base_path: str, + base_path: Path, include_patterns: list[str], ) -> None: """ @@ -460,15 +466,13 @@ def _process_item( Parameters ---------- - item : str - The name of the file or directory to process. - item_path : str + item : Path The full path of the file or directory to process. query : dict[str, Any] A dictionary of query parameters, including the base path and patterns. result : dict[str, Any] The result dictionary to accumulate processed file/directory data. - seen_paths : set[str] + seen_paths : set[Path] A set of paths that have already been visited. stats : dict[str, int] A dictionary of statistics like the total file count and size. @@ -476,27 +480,22 @@ def _process_item( The current depth of directory traversal. ignore_patterns : list[str] A list of patterns to exclude files or directories. - base_path : str + base_path : Path The base directory used for relative path calculations. include_patterns : list[str] A list of patterns to include files or directories. """ - if _should_exclude(item_path, base_path, ignore_patterns): + if _should_exclude(item, base_path, ignore_patterns): return - if ( - os.path.isfile(item_path) - and query["include_patterns"] - and not _should_include(item_path, base_path, include_patterns) - ): + if item.is_file() and query["include_patterns"] and not _should_include(item, base_path, include_patterns): result["ignore_content"] = True return try: - if os.path.islink(item_path): + if item.is_symlink(): _process_symlink( item=item, - item_path=item_path, query=query, result=result, seen_paths=seen_paths, @@ -506,11 +505,11 @@ def _process_item( include_patterns=include_patterns, ) - if os.path.isfile(item_path): - _process_file(item=item, item_path=item_path, result=result, stats=stats) + if item.is_file(): + _process_file(item=item, result=result, stats=stats) - elif os.path.isdir(item_path): - subdir = _scan_directory(path=item_path, query=query, seen_paths=seen_paths, depth=depth + 1, stats=stats) + elif item.is_dir(): + subdir = _scan_directory(path=item, query=query, seen_paths=seen_paths, depth=depth + 1, stats=stats) if subdir and (not include_patterns or subdir["file_count"] > 0): result["children"].append(subdir) result["size"] += subdir["size"] @@ -553,13 +552,16 @@ def _extract_files_content( files = [] if node["type"] == "file" and node["content"] != "[Non-text file]": - content = node["content"] if node["size"] > max_file_size: content = None + else: + content = node["content"] + + relative_path = Path(node["path"]).relative_to(query["local_path"]) files.append( { - "path": node["path"].replace(query["local_path"], ""), + "path": str(relative_path), "content": content, "size": node["size"], }, @@ -716,7 +718,7 @@ def _generate_token_string(context_string: str) -> str | None: return str(total_tokens) -def _ingest_single_file(path: str, query: dict[str, Any]) -> tuple[str, str, str]: +def _ingest_single_file(path: Path, query: dict[str, Any]) -> tuple[str, str, str]: """ Ingest a single file and return its summary, directory structure, and content. @@ -725,7 +727,7 @@ def _ingest_single_file(path: str, query: dict[str, Any]) -> tuple[str, str, str Parameters ---------- - path : str + path : Path The path of the file to ingest. query : dict[str, Any] A dictionary containing query parameters, such as the maximum file size. @@ -740,33 +742,35 @@ def _ingest_single_file(path: str, query: dict[str, Any]) -> tuple[str, str, str ValueError If the specified path is not a file or if the file is not a text file. """ - if not os.path.isfile(path): + if not path.is_file(): raise ValueError(f"Path {path} is not a file") - file_size = os.path.getsize(path) - is_text = _is_text_file(path) - if not is_text: + if not _is_text_file(path): raise ValueError(f"File {path} is not a text file") - content = _read_file_content(path) + file_size = path.stat().st_size if file_size > query["max_file_size"]: content = "[Content ignored: file too large]" + else: + content = _read_file_content(path) + + relative_path = path.relative_to(query["local_path"]) file_info = { - "path": path.replace(query["local_path"], ""), + "path": str(relative_path), "content": content, "size": file_size, } summary = ( f"Repository: {query['user_name']}/{query['repo_name']}\n" - f"File: {os.path.basename(path)}\n" + f"File: {path.name}\n" f"Size: {file_size:,} bytes\n" f"Lines: {len(content.splitlines()):,}\n" ) files_content = _create_file_content_string([file_info]) - tree = "Directory structure:\nāāā " + os.path.basename(path) + tree = "Directory structure:\nāāā " + path.name formatted_tokens = _generate_token_string(files_content) if formatted_tokens: @@ -775,7 +779,7 @@ def _ingest_single_file(path: str, query: dict[str, Any]) -> tuple[str, str, str return summary, tree, files_content -def _ingest_directory(path: str, query: dict[str, Any]) -> tuple[str, str, str]: +def _ingest_directory(path: Path, query: dict[str, Any]) -> tuple[str, str, str]: """ Ingest an entire directory and return its summary, directory structure, and file contents. @@ -784,7 +788,7 @@ def _ingest_directory(path: str, query: dict[str, Any]) -> tuple[str, str, str]: Parameters ---------- - path : str + path : Path The path of the directory to ingest. query : dict[str, Any] A dictionary containing query parameters, including maximum file size. @@ -802,6 +806,7 @@ def _ingest_directory(path: str, query: dict[str, Any]) -> tuple[str, str, str]: nodes = _scan_directory(path=path, query=query) if not nodes: raise ValueError(f"No files found in {path}") + files = _extract_files_content(query=query, node=nodes, max_file_size=query["max_file_size"]) summary = _create_summary_string(query, nodes) tree = "Directory structure:\n" + _create_tree_structure(query, nodes) @@ -836,8 +841,8 @@ def ingest_from_query(query: dict[str, Any]) -> tuple[str, str, str]: ValueError If the specified path cannot be found or if the file is not a text file. """ - path = f"{query['local_path']}{query['subpath']}" - if not os.path.exists(path): + path = query["local_path"] / query["subpath"].lstrip("/") + if not path.exists(): raise ValueError(f"{query['slug']} cannot be found") if query.get("type") == "blob": diff --git a/src/gitingest/parse_query.py b/src/gitingest/parse_query.py index 394bd61..4c63a36 100644 --- a/src/gitingest/parse_query.py +++ b/src/gitingest/parse_query.py @@ -4,13 +4,14 @@ import re import string import uuid +from pathlib import Path from typing import Any from urllib.parse import unquote, urlparse +from config import TMP_BASE_PATH from gitingest.exceptions import InvalidPatternError from gitingest.ignore_patterns import DEFAULT_IGNORE_PATTERNS -TMP_BASE_PATH: str = "/tmp/gitingest" HEX_DIGITS = set(string.hexdigits) @@ -22,7 +23,7 @@ def parse_query( ignore_patterns: list[str] | str | None = None, ) -> dict[str, Any]: """ - Parses the input source to construct a query dictionary with specified parameters. + Parse the input source to construct a query dictionary with specified parameters. This function processes the provided source (either a URL or file path) and builds a query dictionary that includes information such as the source URL, maximum file size, @@ -78,7 +79,7 @@ def parse_query( def _parse_url(url: str) -> dict[str, Any]: """ - Parses a GitHub repository URL into a structured query dictionary. + Parse a GitHub repository URL into a structured query dictionary. This function extracts relevant information from a GitHub URL, such as the username, repository name, commit, branch, and subpath, and returns them in a structured format. @@ -99,8 +100,9 @@ def _parse_url(url: str) -> dict[str, Any]: ValueError If the URL is invalid or does not correspond to a valid Git repository. """ - url = url.split(" ")[0] - url = unquote(url) # Decode URL-encoded characters + # Clean up the URL + url = url.split(" ")[0] # remove trailing text + url = unquote(url) # decode URL-encoded characters if not url.startswith(("https://", "http://")): url = "https://" + url @@ -129,16 +131,17 @@ def _parse_url(url: str) -> dict[str, Any]: "branch": None, "commit": None, "subpath": "/", - "local_path": f"{TMP_BASE_PATH}/{_id}/{slug}", + "local_path": Path(TMP_BASE_PATH) / _id / slug, "url": f"https://{domain}/{user_name}/{repo_name}", "slug": slug, "id": _id, } - # If this is an issues page, return early without processing subpath + # If this is an issues page or pull requests, return early without processing subpath if len(path_parts) > 2 and (path_parts[2] == "issues" or path_parts[2] == "pull"): return parsed + # If no extra path parts, just return if len(path_parts) < 4: return parsed @@ -230,8 +233,10 @@ def _parse_patterns(pattern: list[str] | str) -> list[str]: for p in patterns: parsed_patterns.extend(re.split(",| ", p)) + # Filter out any empty strings parsed_patterns = [p for p in parsed_patterns if p != ""] + # Validate and normalize each pattern for p in parsed_patterns: if not _is_valid_pattern(p): raise InvalidPatternError(p) @@ -258,7 +263,7 @@ def _override_ignore_patterns(ignore_patterns: list[str], include_patterns: list return list(set(ignore_patterns) - set(include_patterns)) -def _parse_path(path: str) -> dict[str, Any]: +def _parse_path(path_str: str) -> dict[str, Any]: """ Parses a file path into a structured query dictionary. @@ -268,7 +273,7 @@ def _parse_path(path: str) -> dict[str, Any]: Parameters ---------- - path : str + path_str : str The file path to parse. Returns @@ -276,10 +281,11 @@ def _parse_path(path: str) -> dict[str, Any]: dict[str, Any] A dictionary containing parsed details such as the local file path and slug. """ + path_obj = Path(path_str).resolve() query = { "url": None, - "local_path": os.path.abspath(path), - "slug": os.path.basename(os.path.dirname(path)) + "/" + os.path.basename(path), + "local_path": path_obj, + "slug": f"{path_obj.parent.name}/{path_obj.name}", "subpath": "/", "id": str(uuid.uuid4()), } diff --git a/src/gitingest/utils.py b/src/gitingest/utils.py index c93c26a..3c28da8 100644 --- a/src/gitingest/utils.py +++ b/src/gitingest/utils.py @@ -1,4 +1,4 @@ -""" Utility functions for the GitIngest package. """ +""" Utility functions for the Gitingest package. """ import asyncio import functools diff --git a/src/main.py b/src/main.py index 16c3b28..7ba36a8 100644 --- a/src/main.py +++ b/src/main.py @@ -5,6 +5,7 @@ import shutil import time from contextlib import asynccontextmanager +from pathlib import Path from api_analytics.fastapi import Analytics from dotenv import load_dotenv @@ -36,71 +37,59 @@ async def remove_old_repositories(): The repository URL is extracted from the first .txt file in each directory, assuming the filename format: "owner-repository.txt" - - Returns - ------- - None - This coroutine never returns, it runs indefinitely until cancelled. """ while True: try: - if not os.path.exists(TMP_BASE_PATH): + if not TMP_BASE_PATH.exists(): await asyncio.sleep(60) continue current_time = time.time() - for folder in os.listdir(TMP_BASE_PATH): - folder_path = os.path.join(TMP_BASE_PATH, folder) + for folder in TMP_BASE_PATH.iterdir(): + if not folder.is_dir(): + continue # Skip if folder is not old enough - if current_time - os.path.getctime(folder_path) <= DELETE_REPO_AFTER: + if current_time - folder.stat().st_ctime <= DELETE_REPO_AFTER: continue - await process_folder(folder_path) + await process_folder(folder) except Exception as e: print(f"Error in remove_old_repositories: {str(e)}") await asyncio.sleep(60) - return - -async def process_folder(folder_path: str) -> None: +async def process_folder(folder: Path) -> None: """ Process a single folder for deletion and logging. Parameters ---------- - folder_path : str + folder : Path The path to the folder to be processed. - - Returns - ------- - None - This function doesn't return anything but performs side effects. """ # Try to log repository URL before deletion try: - txt_files = [f for f in os.listdir(folder_path) if f.endswith(".txt")] - if txt_files: - filename = txt_files[0].replace(".txt", "") - if "-" in filename: - owner, repo = filename.split("-", 1) - repo_url = f"https://github.com/{owner}/{repo}" - with open("history.txt", "a", encoding="utf-8") as history: - history.write(f"{repo_url}\n") + txt_files = [f for f in folder.iterdir() if f.suffix == ".txt"] + + # Extract owner and repository name from the filename + if txt_files and "-" in (filename := txt_files[0].stem): + owner, repo = filename.split("-", 1) + repo_url = f"https://github.com/{owner}/{repo}" + with open("history.txt", mode="a", encoding="utf-8") as history: + history.write(f"{repo_url}\n") + except Exception as e: - print(f"Error logging repository URL for {folder_path}: {str(e)}") + print(f"Error logging repository URL for {folder}: {str(e)}") # Delete the folder try: - shutil.rmtree(folder_path) + shutil.rmtree(folder) except Exception as e: - print(f"Error deleting {folder_path}: {str(e)}") - - return + print(f"Error deleting {folder}: {str(e)}") @asynccontextmanager diff --git a/src/process_query.py b/src/process_query.py index 9e49bcc..7b28323 100644 --- a/src/process_query.py +++ b/src/process_query.py @@ -86,7 +86,7 @@ async def process_query( ) clone_config = CloneConfig( url=query["url"], - local_path=query["local_path"], + local_path=str(query["local_path"]), commit=query.get("commit"), branch=query.get("branch"), ) diff --git a/src/routers/download.py b/src/routers/download.py index 513451c..b4da647 100644 --- a/src/routers/download.py +++ b/src/routers/download.py @@ -1,7 +1,5 @@ """ This module contains the FastAPI router for downloading a digest file. """ -import os - from fastapi import APIRouter, HTTPException from fastapi.responses import Response @@ -13,7 +11,7 @@ @router.get("/download/{digest_id}") async def download_ingest(digest_id: str) -> Response: """ - Downloads a .txt file associated with a given digest ID. + Download a .txt file associated with a given digest ID. This function searches for a `.txt` file in a directory corresponding to the provided digest ID. If a file is found, it is read and returned as a downloadable attachment. @@ -37,13 +35,13 @@ async def download_ingest(digest_id: str) -> Response: HTTPException If the digest directory is not found or if no `.txt` file exists in the directory. """ - directory = f"{TMP_BASE_PATH}/{digest_id}" + directory = TMP_BASE_PATH / digest_id try: - if not os.path.exists(directory): + if not directory.exists(): raise FileNotFoundError("Directory not found") - txt_files = [f for f in os.listdir(directory) if f.endswith(".txt")] + txt_files = [f for f in directory.iterdir() if f.suffix == ".txt"] if not txt_files: raise FileNotFoundError("No .txt file found") @@ -53,11 +51,11 @@ async def download_ingest(digest_id: str) -> Response: # Find the first .txt file in the directory first_file = txt_files[0] - with open(f"{directory}/{first_file}", encoding="utf-8") as f: + with first_file.open(encoding="utf-8") as f: content = f.read() return Response( content=content, media_type="text/plain", - headers={"Content-Disposition": f"attachment; filename={first_file}"}, + headers={"Content-Disposition": f"attachment; filename={first_file.name}"}, ) diff --git a/src/templates/api.jinja b/src/templates/api.jinja index c5e57bd..85fa0c3 100644 --- a/src/templates/api.jinja +++ b/src/templates/api.jinja @@ -1,5 +1,5 @@ {% extends "base.jinja" %} -{% block title %}Git ingest API{% endblock %} +{% block title %}Gitingest API{% endblock %} {% block content %}