From 835fcbbb4f358395cb919efed7cd60d6649d19e5 Mon Sep 17 00:00:00 2001
From: Filip Christiansen <22807962+filipchristiansen@users.noreply.github.com>
Date: Tue, 7 Jan 2025 08:55:27 +0100
Subject: [PATCH 1/2] chore: standardize references to 'Gitingest' (resolves
Issue #99) (#107)
---
README.md | 10 +++++-----
src/config.py | 2 +-
src/gitingest/cli.py | 2 +-
src/gitingest/exceptions.py | 2 +-
src/gitingest/ignore_patterns.py | 2 +-
src/gitingest/utils.py | 2 +-
src/templates/api.jinja | 2 +-
src/templates/base.jinja | 2 +-
8 files changed, 12 insertions(+), 12 deletions(-)
diff --git a/README.md b/README.md
index bf1cdf2..3ad5924 100644
--- a/README.md
+++ b/README.md
@@ -1,6 +1,6 @@
-# GitIngest
+# Gitingest
-[![Image](./docs/frontpage.png "GitIngest main page")](https://gitingest.com)
+[![Image](./docs/frontpage.png "Gitingest main page")](https://gitingest.com)
[![License](https://img.shields.io/badge/license-MIT-blue.svg)](https://github.com/cyclotruc/gitingest/blob/main/LICENSE)
[![PyPI version](https://badge.fury.io/py/gitingest.svg)](https://badge.fury.io/py/gitingest)
@@ -37,9 +37,9 @@ pip install gitingest
## 🧩 Browser Extension Usage
-
-
-
+
+
+
The extension is open source at [lcandy2/gitingest-extension](https://github.com/lcandy2/gitingest-extension).
diff --git a/src/config.py b/src/config.py
index 1958f30..9b9553c 100644
--- a/src/config.py
+++ b/src/config.py
@@ -5,7 +5,7 @@
DELETE_REPO_AFTER: int = 60 * 60 # In seconds
EXAMPLE_REPOS: list[dict[str, str]] = [
- {"name": "GitIngest", "url": "https://github.com/cyclotruc/gitingest"},
+ {"name": "Gitingest", "url": "https://github.com/cyclotruc/gitingest"},
{"name": "FastAPI", "url": "https://github.com/tiangolo/fastapi"},
{"name": "Flask", "url": "https://github.com/pallets/flask"},
{"name": "Tldraw", "url": "https://github.com/tldraw/tldraw"},
diff --git a/src/gitingest/cli.py b/src/gitingest/cli.py
index 7da0b1f..6a4b470 100644
--- a/src/gitingest/cli.py
+++ b/src/gitingest/cli.py
@@ -1,4 +1,4 @@
-""" Command-line interface for the GitIngest package. """
+""" Command-line interface for the Gitingest package. """
# pylint: disable=no-value-for-parameter
diff --git a/src/gitingest/exceptions.py b/src/gitingest/exceptions.py
index b101f2e..bfb3888 100644
--- a/src/gitingest/exceptions.py
+++ b/src/gitingest/exceptions.py
@@ -1,4 +1,4 @@
-""" Custom exceptions for the GitIngest package. """
+""" Custom exceptions for the Gitingest package. """
class InvalidPatternError(ValueError):
diff --git a/src/gitingest/ignore_patterns.py b/src/gitingest/ignore_patterns.py
index c2b382c..a1a902d 100644
--- a/src/gitingest/ignore_patterns.py
+++ b/src/gitingest/ignore_patterns.py
@@ -1,4 +1,4 @@
-""" Default ignore patterns for GitIngest. """
+""" Default ignore patterns for Gitingest. """
DEFAULT_IGNORE_PATTERNS: list[str] = [
# Python
diff --git a/src/gitingest/utils.py b/src/gitingest/utils.py
index c93c26a..3c28da8 100644
--- a/src/gitingest/utils.py
+++ b/src/gitingest/utils.py
@@ -1,4 +1,4 @@
-""" Utility functions for the GitIngest package. """
+""" Utility functions for the Gitingest package. """
import asyncio
import functools
diff --git a/src/templates/api.jinja b/src/templates/api.jinja
index c5e57bd..85fa0c3 100644
--- a/src/templates/api.jinja
+++ b/src/templates/api.jinja
@@ -1,5 +1,5 @@
{% extends "base.jinja" %}
-{% block title %}Git ingest API{% endblock %}
+{% block title %}Gitingest API{% endblock %}
{% block content %}
diff --git a/src/templates/base.jinja b/src/templates/base.jinja
index 5b3e899..7c8359c 100644
--- a/src/templates/base.jinja
+++ b/src/templates/base.jinja
@@ -8,7 +8,7 @@
+ content="Gitingest, AI tools, LLM integration, Ingest, Digest, Context, Prompt, Git workflow, codebase extraction, Git repository, Git automation, Summarize, prompt-friendly">
From 123f0ef0d7133c7ab9bc402fa15082466799284e Mon Sep 17 00:00:00 2001
From: Filip Christiansen <22807962+filipchristiansen@users.noreply.github.com>
Date: Tue, 7 Jan 2025 10:13:22 +0100
Subject: [PATCH 2/2] Refactor: Replace os.path usage with pathlib.Path for
improved maintainability (#106)
---
src/config.py | 4 +-
src/gitingest/ingest.py | 8 +-
src/gitingest/ingest_from_query.py | 217 +++++++++++++++--------------
src/gitingest/parse_query.py | 28 ++--
src/main.py | 53 +++----
src/process_query.py | 2 +-
src/routers/download.py | 14 +-
tests/conftest.py | 2 +-
tests/test_ingest.py | 6 +-
tests/test_parse_query.py | 8 +-
10 files changed, 174 insertions(+), 168 deletions(-)
diff --git a/src/config.py b/src/config.py
index 9b9553c..68565c8 100644
--- a/src/config.py
+++ b/src/config.py
@@ -1,7 +1,9 @@
""" Configuration file for the project. """
+from pathlib import Path
+
MAX_DISPLAY_SIZE: int = 300_000
-TMP_BASE_PATH: str = "/tmp/gitingest"
+TMP_BASE_PATH = Path("/tmp/gitingest")
DELETE_REPO_AFTER: int = 60 * 60 # In seconds
EXAMPLE_REPOS: list[dict[str, str]] = [
diff --git a/src/gitingest/ingest.py b/src/gitingest/ingest.py
index e1ae657..e58743d 100644
--- a/src/gitingest/ingest.py
+++ b/src/gitingest/ingest.py
@@ -4,6 +4,7 @@
import inspect
import shutil
+from config import TMP_BASE_PATH
from gitingest.clone import CloneConfig, clone_repo
from gitingest.ingest_from_query import ingest_from_query
from gitingest.parse_query import parse_query
@@ -63,7 +64,7 @@ def ingest(
# Extract relevant fields for CloneConfig
clone_config = CloneConfig(
url=query["url"],
- local_path=query["local_path"],
+ local_path=str(query["local_path"]),
commit=query.get("commit"),
branch=query.get("branch"),
)
@@ -84,6 +85,5 @@ def ingest(
finally:
# Clean up the temporary directory if it was created
if query["url"]:
- # Clean up the temporary directory under /tmp/gitingest
- cleanup_path = "/tmp/gitingest"
- shutil.rmtree(cleanup_path, ignore_errors=True)
+ # Clean up the temporary directory
+ shutil.rmtree(TMP_BASE_PATH, ignore_errors=True)
diff --git a/src/gitingest/ingest_from_query.py b/src/gitingest/ingest_from_query.py
index 6dc757a..ff4e483 100644
--- a/src/gitingest/ingest_from_query.py
+++ b/src/gitingest/ingest_from_query.py
@@ -1,7 +1,7 @@
""" Functions to ingest and analyze a codebase directory or single file. """
-import os
from fnmatch import fnmatch
+from pathlib import Path
from typing import Any
import tiktoken
@@ -14,18 +14,18 @@
MAX_TOTAL_SIZE_BYTES = 500 * 1024 * 1024 # 500 MB
-def _should_include(path: str, base_path: str, include_patterns: list[str]) -> bool:
+def _should_include(path: Path, base_path: Path, include_patterns: list[str]) -> bool:
"""
- Determines if the given file or directory path matches any of the include patterns.
+ Determine if the given file or directory path matches any of the include patterns.
This function checks whether the relative path of a file or directory matches any of the specified patterns. If a
match is found, it returns `True`, indicating that the file or directory should be included in further processing.
Parameters
----------
- path : str
+ path : Path
The absolute path of the file or directory to check.
- base_path : str
+ base_path : Path
The base directory from which the relative path is calculated.
include_patterns : list[str]
A list of patterns to check against the relative path.
@@ -35,17 +35,22 @@ def _should_include(path: str, base_path: str, include_patterns: list[str]) -> b
bool
`True` if the path matches any of the include patterns, `False` otherwise.
"""
- rel_path = path.replace(base_path, "").lstrip(os.sep)
- include = False
+ try:
+ rel_path = path.relative_to(base_path)
+ except ValueError:
+ # If path is not under base_path at all
+ return False
+
+ rel_str = str(rel_path)
for pattern in include_patterns:
- if fnmatch(rel_path, pattern):
- include = True
- return include
+ if fnmatch(rel_str, pattern):
+ return True
+ return False
-def _should_exclude(path: str, base_path: str, ignore_patterns: list[str]) -> bool:
+def _should_exclude(path: Path, base_path: Path, ignore_patterns: list[str]) -> bool:
"""
- Determines if the given file or directory path matches any of the ignore patterns.
+ Determine if the given file or directory path matches any of the ignore patterns.
This function checks whether the relative path of a file or directory matches
any of the specified ignore patterns. If a match is found, it returns `True`, indicating
@@ -53,9 +58,9 @@ def _should_exclude(path: str, base_path: str, ignore_patterns: list[str]) -> bo
Parameters
----------
- path : str
+ path : Path
The absolute path of the file or directory to check.
- base_path : str
+ base_path : Path
The base directory from which the relative path is calculated.
ignore_patterns : list[str]
A list of patterns to check against the relative path.
@@ -65,14 +70,20 @@ def _should_exclude(path: str, base_path: str, ignore_patterns: list[str]) -> bo
bool
`True` if the path matches any of the ignore patterns, `False` otherwise.
"""
- rel_path = path.replace(base_path, "").lstrip(os.sep)
+ try:
+ rel_path = path.relative_to(base_path)
+ except ValueError:
+ # If path is not under base_path at all
+ return True
+
+ rel_str = str(rel_path)
for pattern in ignore_patterns:
- if pattern and fnmatch(rel_path, pattern):
+ if pattern and fnmatch(rel_str, pattern):
return True
return False
-def _is_safe_symlink(symlink_path: str, base_path: str) -> bool:
+def _is_safe_symlink(symlink_path: Path, base_path: Path) -> bool:
"""
Check if a symlink points to a location within the base directory.
@@ -82,9 +93,9 @@ def _is_safe_symlink(symlink_path: str, base_path: str) -> bool:
Parameters
----------
- symlink_path : str
+ symlink_path : Path
The path of the symlink to check.
- base_path : str
+ base_path : Path
The base directory to ensure the symlink points within.
Returns
@@ -93,15 +104,16 @@ def _is_safe_symlink(symlink_path: str, base_path: str) -> bool:
`True` if the symlink points within the base directory, `False` otherwise.
"""
try:
- target_path = os.path.realpath(symlink_path)
- base_path = os.path.realpath(base_path)
- return os.path.commonpath([target_path, base_path]) == base_path
+ target_path = symlink_path.resolve()
+ base_resolved = base_path.resolve()
+ # It's "safe" if target_path == base_resolved or is inside base_resolved
+ return base_resolved in target_path.parents or target_path == base_resolved
except (OSError, ValueError):
# If there's any error resolving the paths, consider it unsafe
return False
-def _is_text_file(file_path: str) -> bool:
+def _is_text_file(file_path: Path) -> bool:
"""
Determine if a file is likely a text file based on its content.
@@ -111,7 +123,7 @@ def _is_text_file(file_path: str) -> bool:
Parameters
----------
- file_path : str
+ file_path : Path
The path to the file to check.
Returns
@@ -120,16 +132,16 @@ def _is_text_file(file_path: str) -> bool:
`True` if the file is likely a text file, `False` otherwise.
"""
try:
- with open(file_path, "rb") as file:
+ with file_path.open("rb") as file:
chunk = file.read(1024)
return not bool(chunk.translate(None, bytes([7, 8, 9, 10, 12, 13, 27] + list(range(0x20, 0x100)))))
except OSError:
return False
-def _read_file_content(file_path: str) -> str:
+def _read_file_content(file_path: Path) -> str:
"""
- Reads the content of a file.
+ Read the content of a file.
This function attempts to open a file and read its contents using UTF-8 encoding.
If an error occurs during reading (e.g., file is not found or permission error),
@@ -137,7 +149,7 @@ def _read_file_content(file_path: str) -> str:
Parameters
----------
- file_path : str
+ file_path : Path
The path to the file to read.
Returns
@@ -146,7 +158,7 @@ def _read_file_content(file_path: str) -> str:
The content of the file, or an error message if the file could not be read.
"""
try:
- with open(file_path, encoding="utf-8", errors="ignore") as f:
+ with file_path.open(encoding="utf-8", errors="ignore") as f:
return f.read()
except OSError as e:
return f"Error reading file: {e}"
@@ -197,9 +209,9 @@ def _sort_children(children: list[dict[str, Any]]) -> list[dict[str, Any]]:
def _scan_directory(
- path: str,
+ path: Path,
query: dict[str, Any],
- seen_paths: set[str] | None = None,
+ seen_paths: set[Path] | None = None,
depth: int = 0,
stats: dict[str, int] | None = None,
) -> dict[str, Any] | None:
@@ -212,11 +224,11 @@ def _scan_directory(
Parameters
----------
- path : str
+ path : Path
The path of the directory to scan.
query : dict[str, Any]
A dictionary containing the query parameters, such as include and ignore patterns.
- seen_paths : set[str] | None, optional
+ seen_paths : set[Path] | None, optional
A set to track already visited paths, by default None.
depth : int
The current depth of directory traversal, by default 0.
@@ -246,7 +258,7 @@ def _scan_directory(
print(f"Skipping further processing: maximum total size ({MAX_TOTAL_SIZE_BYTES/1024/1024:.1f}MB) reached")
return None
- real_path = os.path.realpath(path)
+ real_path = path.resolve()
if real_path in seen_paths:
print(f"Skipping already visited path: {path}")
return None
@@ -254,13 +266,13 @@ def _scan_directory(
seen_paths.add(real_path)
result = {
- "name": os.path.basename(path),
+ "name": path.name,
"type": "directory",
"size": 0,
"children": [],
"file_count": 0,
"dir_count": 0,
- "path": path,
+ "path": str(path),
"ignore_content": False,
}
@@ -269,11 +281,9 @@ def _scan_directory(
include_patterns = query["include_patterns"]
try:
- for item in os.listdir(path):
- item_path = os.path.join(path, item)
+ for item in path.iterdir():
_process_item(
item=item,
- item_path=item_path,
query=query,
result=result,
seen_paths=seen_paths,
@@ -293,14 +303,13 @@ def _scan_directory(
def _process_symlink(
- item: str,
- item_path: str,
+ item: Path,
query: dict[str, Any],
result: dict[str, Any],
- seen_paths: set[str],
+ seen_paths: set[Path],
stats: dict[str, int],
depth: int,
- base_path: str,
+ base_path: Path,
include_patterns: list[str],
) -> None:
"""
@@ -311,9 +320,7 @@ def _process_symlink(
Parameters
----------
- item : str
- The name of the symlink.
- item_path : str
+ item : Path
The full path of the symlink.
query : dict[str, Any]
The query dictionary containing the parameters.
@@ -325,7 +332,7 @@ def _process_symlink(
The dictionary to track statistics such as file count and size.
depth : int
The current depth in the directory traversal.
- base_path : str
+ base_path : Path
The base path used for validation of the symlink.
include_patterns : list[str]
A list of include patterns for file filtering.
@@ -339,15 +346,15 @@ def _process_symlink(
MaxFilesReachedError
If the number of files exceeds the maximum limit.
"""
- if not _is_safe_symlink(item_path, base_path):
- raise AlreadyVisitedError(item_path)
+ if not _is_safe_symlink(item, base_path):
+ raise AlreadyVisitedError(str(item))
- real_path = os.path.realpath(item_path)
+ real_path = item.resolve()
if real_path in seen_paths:
- raise AlreadyVisitedError(item_path)
+ raise AlreadyVisitedError(str(item))
- if os.path.isfile(real_path):
- file_size = os.path.getsize(real_path)
+ if real_path.is_file():
+ file_size = real_path.stat().st_size
if stats["total_size"] + file_size > MAX_TOTAL_SIZE_BYTES:
raise MaxFileSizeReachedError(MAX_TOTAL_SIZE_BYTES)
@@ -359,19 +366,20 @@ def _process_symlink(
raise MaxFilesReachedError(MAX_FILES)
is_text = _is_text_file(real_path)
+ content = _read_file_content(real_path) if is_text else "[Non-text file]"
child = {
- "name": item,
+ "name": item.name,
"type": "file",
"size": file_size,
- "content": _read_file_content(real_path) if is_text else "[Non-text file]",
- "path": item_path,
+ "content": content,
+ "path": str(item),
}
result["children"].append(child)
result["size"] += file_size
result["file_count"] += 1
- elif os.path.isdir(real_path):
+ elif real_path.is_dir():
subdir = _scan_directory(
path=real_path,
query=query,
@@ -380,15 +388,16 @@ def _process_symlink(
stats=stats,
)
if subdir and (not include_patterns or subdir["file_count"] > 0):
- subdir["name"] = item
- subdir["path"] = item_path
+ # rename the subdir to reflect the symlink name
+ subdir["name"] = item.name
+ subdir["path"] = str(item)
result["children"].append(subdir)
result["size"] += subdir["size"]
result["file_count"] += subdir["file_count"]
result["dir_count"] += 1 + subdir["dir_count"]
-def _process_file(item: str, item_path: str, result: dict[str, Any], stats: dict[str, int]) -> None:
+def _process_file(item: Path, result: dict[str, Any], stats: dict[str, int]) -> None:
"""
Process a file in the file system.
@@ -397,9 +406,7 @@ def _process_file(item: str, item_path: str, result: dict[str, Any], stats: dict
Parameters
----------
- item : str
- The name of the file.
- item_path : str
+ item : Path
The full path of the file.
result : dict[str, Any]
The dictionary to accumulate the results.
@@ -413,9 +420,9 @@ def _process_file(item: str, item_path: str, result: dict[str, Any], stats: dict
MaxFilesReachedError
If the number of files exceeds the maximum limit.
"""
- file_size = os.path.getsize(item_path)
+ file_size = item.stat().st_size
if stats["total_size"] + file_size > MAX_TOTAL_SIZE_BYTES:
- print(f"Skipping file {item_path}: would exceed total size limit")
+ print(f"Skipping file {item}: would exceed total size limit")
raise MaxFileSizeReachedError(MAX_TOTAL_SIZE_BYTES)
stats["total_files"] += 1
@@ -425,15 +432,15 @@ def _process_file(item: str, item_path: str, result: dict[str, Any], stats: dict
print(f"Maximum file limit ({MAX_FILES}) reached")
raise MaxFilesReachedError(MAX_FILES)
- is_text = _is_text_file(item_path)
- content = _read_file_content(item_path) if is_text else "[Non-text file]"
+ is_text = _is_text_file(item)
+ content = _read_file_content(item) if is_text else "[Non-text file]"
child = {
- "name": item,
+ "name": item.name,
"type": "file",
"size": file_size,
"content": content,
- "path": item_path,
+ "path": str(item),
}
result["children"].append(child)
result["size"] += file_size
@@ -441,15 +448,14 @@ def _process_file(item: str, item_path: str, result: dict[str, Any], stats: dict
def _process_item(
- item: str,
- item_path: str,
+ item: Path,
query: dict[str, Any],
result: dict[str, Any],
- seen_paths: set[str],
+ seen_paths: set[Path],
stats: dict[str, int],
depth: int,
ignore_patterns: list[str],
- base_path: str,
+ base_path: Path,
include_patterns: list[str],
) -> None:
"""
@@ -460,15 +466,13 @@ def _process_item(
Parameters
----------
- item : str
- The name of the file or directory to process.
- item_path : str
+ item : Path
The full path of the file or directory to process.
query : dict[str, Any]
A dictionary of query parameters, including the base path and patterns.
result : dict[str, Any]
The result dictionary to accumulate processed file/directory data.
- seen_paths : set[str]
+ seen_paths : set[Path]
A set of paths that have already been visited.
stats : dict[str, int]
A dictionary of statistics like the total file count and size.
@@ -476,27 +480,22 @@ def _process_item(
The current depth of directory traversal.
ignore_patterns : list[str]
A list of patterns to exclude files or directories.
- base_path : str
+ base_path : Path
The base directory used for relative path calculations.
include_patterns : list[str]
A list of patterns to include files or directories.
"""
- if _should_exclude(item_path, base_path, ignore_patterns):
+ if _should_exclude(item, base_path, ignore_patterns):
return
- if (
- os.path.isfile(item_path)
- and query["include_patterns"]
- and not _should_include(item_path, base_path, include_patterns)
- ):
+ if item.is_file() and query["include_patterns"] and not _should_include(item, base_path, include_patterns):
result["ignore_content"] = True
return
try:
- if os.path.islink(item_path):
+ if item.is_symlink():
_process_symlink(
item=item,
- item_path=item_path,
query=query,
result=result,
seen_paths=seen_paths,
@@ -506,11 +505,11 @@ def _process_item(
include_patterns=include_patterns,
)
- if os.path.isfile(item_path):
- _process_file(item=item, item_path=item_path, result=result, stats=stats)
+ if item.is_file():
+ _process_file(item=item, result=result, stats=stats)
- elif os.path.isdir(item_path):
- subdir = _scan_directory(path=item_path, query=query, seen_paths=seen_paths, depth=depth + 1, stats=stats)
+ elif item.is_dir():
+ subdir = _scan_directory(path=item, query=query, seen_paths=seen_paths, depth=depth + 1, stats=stats)
if subdir and (not include_patterns or subdir["file_count"] > 0):
result["children"].append(subdir)
result["size"] += subdir["size"]
@@ -553,13 +552,16 @@ def _extract_files_content(
files = []
if node["type"] == "file" and node["content"] != "[Non-text file]":
- content = node["content"]
if node["size"] > max_file_size:
content = None
+ else:
+ content = node["content"]
+
+ relative_path = Path(node["path"]).relative_to(query["local_path"])
files.append(
{
- "path": node["path"].replace(query["local_path"], ""),
+ "path": str(relative_path),
"content": content,
"size": node["size"],
},
@@ -716,7 +718,7 @@ def _generate_token_string(context_string: str) -> str | None:
return str(total_tokens)
-def _ingest_single_file(path: str, query: dict[str, Any]) -> tuple[str, str, str]:
+def _ingest_single_file(path: Path, query: dict[str, Any]) -> tuple[str, str, str]:
"""
Ingest a single file and return its summary, directory structure, and content.
@@ -725,7 +727,7 @@ def _ingest_single_file(path: str, query: dict[str, Any]) -> tuple[str, str, str
Parameters
----------
- path : str
+ path : Path
The path of the file to ingest.
query : dict[str, Any]
A dictionary containing query parameters, such as the maximum file size.
@@ -740,33 +742,35 @@ def _ingest_single_file(path: str, query: dict[str, Any]) -> tuple[str, str, str
ValueError
If the specified path is not a file or if the file is not a text file.
"""
- if not os.path.isfile(path):
+ if not path.is_file():
raise ValueError(f"Path {path} is not a file")
- file_size = os.path.getsize(path)
- is_text = _is_text_file(path)
- if not is_text:
+ if not _is_text_file(path):
raise ValueError(f"File {path} is not a text file")
- content = _read_file_content(path)
+ file_size = path.stat().st_size
if file_size > query["max_file_size"]:
content = "[Content ignored: file too large]"
+ else:
+ content = _read_file_content(path)
+
+ relative_path = path.relative_to(query["local_path"])
file_info = {
- "path": path.replace(query["local_path"], ""),
+ "path": str(relative_path),
"content": content,
"size": file_size,
}
summary = (
f"Repository: {query['user_name']}/{query['repo_name']}\n"
- f"File: {os.path.basename(path)}\n"
+ f"File: {path.name}\n"
f"Size: {file_size:,} bytes\n"
f"Lines: {len(content.splitlines()):,}\n"
)
files_content = _create_file_content_string([file_info])
- tree = "Directory structure:\n└── " + os.path.basename(path)
+ tree = "Directory structure:\n└── " + path.name
formatted_tokens = _generate_token_string(files_content)
if formatted_tokens:
@@ -775,7 +779,7 @@ def _ingest_single_file(path: str, query: dict[str, Any]) -> tuple[str, str, str
return summary, tree, files_content
-def _ingest_directory(path: str, query: dict[str, Any]) -> tuple[str, str, str]:
+def _ingest_directory(path: Path, query: dict[str, Any]) -> tuple[str, str, str]:
"""
Ingest an entire directory and return its summary, directory structure, and file contents.
@@ -784,7 +788,7 @@ def _ingest_directory(path: str, query: dict[str, Any]) -> tuple[str, str, str]:
Parameters
----------
- path : str
+ path : Path
The path of the directory to ingest.
query : dict[str, Any]
A dictionary containing query parameters, including maximum file size.
@@ -802,6 +806,7 @@ def _ingest_directory(path: str, query: dict[str, Any]) -> tuple[str, str, str]:
nodes = _scan_directory(path=path, query=query)
if not nodes:
raise ValueError(f"No files found in {path}")
+
files = _extract_files_content(query=query, node=nodes, max_file_size=query["max_file_size"])
summary = _create_summary_string(query, nodes)
tree = "Directory structure:\n" + _create_tree_structure(query, nodes)
@@ -836,8 +841,8 @@ def ingest_from_query(query: dict[str, Any]) -> tuple[str, str, str]:
ValueError
If the specified path cannot be found or if the file is not a text file.
"""
- path = f"{query['local_path']}{query['subpath']}"
- if not os.path.exists(path):
+ path = query["local_path"] / query["subpath"].lstrip("/")
+ if not path.exists():
raise ValueError(f"{query['slug']} cannot be found")
if query.get("type") == "blob":
diff --git a/src/gitingest/parse_query.py b/src/gitingest/parse_query.py
index 394bd61..4c63a36 100644
--- a/src/gitingest/parse_query.py
+++ b/src/gitingest/parse_query.py
@@ -4,13 +4,14 @@
import re
import string
import uuid
+from pathlib import Path
from typing import Any
from urllib.parse import unquote, urlparse
+from config import TMP_BASE_PATH
from gitingest.exceptions import InvalidPatternError
from gitingest.ignore_patterns import DEFAULT_IGNORE_PATTERNS
-TMP_BASE_PATH: str = "/tmp/gitingest"
HEX_DIGITS = set(string.hexdigits)
@@ -22,7 +23,7 @@ def parse_query(
ignore_patterns: list[str] | str | None = None,
) -> dict[str, Any]:
"""
- Parses the input source to construct a query dictionary with specified parameters.
+ Parse the input source to construct a query dictionary with specified parameters.
This function processes the provided source (either a URL or file path) and builds a
query dictionary that includes information such as the source URL, maximum file size,
@@ -78,7 +79,7 @@ def parse_query(
def _parse_url(url: str) -> dict[str, Any]:
"""
- Parses a GitHub repository URL into a structured query dictionary.
+ Parse a GitHub repository URL into a structured query dictionary.
This function extracts relevant information from a GitHub URL, such as the username,
repository name, commit, branch, and subpath, and returns them in a structured format.
@@ -99,8 +100,9 @@ def _parse_url(url: str) -> dict[str, Any]:
ValueError
If the URL is invalid or does not correspond to a valid Git repository.
"""
- url = url.split(" ")[0]
- url = unquote(url) # Decode URL-encoded characters
+ # Clean up the URL
+ url = url.split(" ")[0] # remove trailing text
+ url = unquote(url) # decode URL-encoded characters
if not url.startswith(("https://", "http://")):
url = "https://" + url
@@ -129,16 +131,17 @@ def _parse_url(url: str) -> dict[str, Any]:
"branch": None,
"commit": None,
"subpath": "/",
- "local_path": f"{TMP_BASE_PATH}/{_id}/{slug}",
+ "local_path": Path(TMP_BASE_PATH) / _id / slug,
"url": f"https://{domain}/{user_name}/{repo_name}",
"slug": slug,
"id": _id,
}
- # If this is an issues page, return early without processing subpath
+ # If this is an issues page or pull requests, return early without processing subpath
if len(path_parts) > 2 and (path_parts[2] == "issues" or path_parts[2] == "pull"):
return parsed
+ # If no extra path parts, just return
if len(path_parts) < 4:
return parsed
@@ -230,8 +233,10 @@ def _parse_patterns(pattern: list[str] | str) -> list[str]:
for p in patterns:
parsed_patterns.extend(re.split(",| ", p))
+ # Filter out any empty strings
parsed_patterns = [p for p in parsed_patterns if p != ""]
+ # Validate and normalize each pattern
for p in parsed_patterns:
if not _is_valid_pattern(p):
raise InvalidPatternError(p)
@@ -258,7 +263,7 @@ def _override_ignore_patterns(ignore_patterns: list[str], include_patterns: list
return list(set(ignore_patterns) - set(include_patterns))
-def _parse_path(path: str) -> dict[str, Any]:
+def _parse_path(path_str: str) -> dict[str, Any]:
"""
Parses a file path into a structured query dictionary.
@@ -268,7 +273,7 @@ def _parse_path(path: str) -> dict[str, Any]:
Parameters
----------
- path : str
+ path_str : str
The file path to parse.
Returns
@@ -276,10 +281,11 @@ def _parse_path(path: str) -> dict[str, Any]:
dict[str, Any]
A dictionary containing parsed details such as the local file path and slug.
"""
+ path_obj = Path(path_str).resolve()
query = {
"url": None,
- "local_path": os.path.abspath(path),
- "slug": os.path.basename(os.path.dirname(path)) + "/" + os.path.basename(path),
+ "local_path": path_obj,
+ "slug": f"{path_obj.parent.name}/{path_obj.name}",
"subpath": "/",
"id": str(uuid.uuid4()),
}
diff --git a/src/main.py b/src/main.py
index 16c3b28..7ba36a8 100644
--- a/src/main.py
+++ b/src/main.py
@@ -5,6 +5,7 @@
import shutil
import time
from contextlib import asynccontextmanager
+from pathlib import Path
from api_analytics.fastapi import Analytics
from dotenv import load_dotenv
@@ -36,71 +37,59 @@ async def remove_old_repositories():
The repository URL is extracted from the first .txt file in each directory,
assuming the filename format: "owner-repository.txt"
-
- Returns
- -------
- None
- This coroutine never returns, it runs indefinitely until cancelled.
"""
while True:
try:
- if not os.path.exists(TMP_BASE_PATH):
+ if not TMP_BASE_PATH.exists():
await asyncio.sleep(60)
continue
current_time = time.time()
- for folder in os.listdir(TMP_BASE_PATH):
- folder_path = os.path.join(TMP_BASE_PATH, folder)
+ for folder in TMP_BASE_PATH.iterdir():
+ if not folder.is_dir():
+ continue
# Skip if folder is not old enough
- if current_time - os.path.getctime(folder_path) <= DELETE_REPO_AFTER:
+ if current_time - folder.stat().st_ctime <= DELETE_REPO_AFTER:
continue
- await process_folder(folder_path)
+ await process_folder(folder)
except Exception as e:
print(f"Error in remove_old_repositories: {str(e)}")
await asyncio.sleep(60)
- return
-
-async def process_folder(folder_path: str) -> None:
+async def process_folder(folder: Path) -> None:
"""
Process a single folder for deletion and logging.
Parameters
----------
- folder_path : str
+ folder : Path
The path to the folder to be processed.
-
- Returns
- -------
- None
- This function doesn't return anything but performs side effects.
"""
# Try to log repository URL before deletion
try:
- txt_files = [f for f in os.listdir(folder_path) if f.endswith(".txt")]
- if txt_files:
- filename = txt_files[0].replace(".txt", "")
- if "-" in filename:
- owner, repo = filename.split("-", 1)
- repo_url = f"https://github.com/{owner}/{repo}"
- with open("history.txt", "a", encoding="utf-8") as history:
- history.write(f"{repo_url}\n")
+ txt_files = [f for f in folder.iterdir() if f.suffix == ".txt"]
+
+ # Extract owner and repository name from the filename
+ if txt_files and "-" in (filename := txt_files[0].stem):
+ owner, repo = filename.split("-", 1)
+ repo_url = f"https://github.com/{owner}/{repo}"
+ with open("history.txt", mode="a", encoding="utf-8") as history:
+ history.write(f"{repo_url}\n")
+
except Exception as e:
- print(f"Error logging repository URL for {folder_path}: {str(e)}")
+ print(f"Error logging repository URL for {folder}: {str(e)}")
# Delete the folder
try:
- shutil.rmtree(folder_path)
+ shutil.rmtree(folder)
except Exception as e:
- print(f"Error deleting {folder_path}: {str(e)}")
-
- return
+ print(f"Error deleting {folder}: {str(e)}")
@asynccontextmanager
diff --git a/src/process_query.py b/src/process_query.py
index 9e49bcc..7b28323 100644
--- a/src/process_query.py
+++ b/src/process_query.py
@@ -86,7 +86,7 @@ async def process_query(
)
clone_config = CloneConfig(
url=query["url"],
- local_path=query["local_path"],
+ local_path=str(query["local_path"]),
commit=query.get("commit"),
branch=query.get("branch"),
)
diff --git a/src/routers/download.py b/src/routers/download.py
index 513451c..b4da647 100644
--- a/src/routers/download.py
+++ b/src/routers/download.py
@@ -1,7 +1,5 @@
""" This module contains the FastAPI router for downloading a digest file. """
-import os
-
from fastapi import APIRouter, HTTPException
from fastapi.responses import Response
@@ -13,7 +11,7 @@
@router.get("/download/{digest_id}")
async def download_ingest(digest_id: str) -> Response:
"""
- Downloads a .txt file associated with a given digest ID.
+ Download a .txt file associated with a given digest ID.
This function searches for a `.txt` file in a directory corresponding to the provided
digest ID. If a file is found, it is read and returned as a downloadable attachment.
@@ -37,13 +35,13 @@ async def download_ingest(digest_id: str) -> Response:
HTTPException
If the digest directory is not found or if no `.txt` file exists in the directory.
"""
- directory = f"{TMP_BASE_PATH}/{digest_id}"
+ directory = TMP_BASE_PATH / digest_id
try:
- if not os.path.exists(directory):
+ if not directory.exists():
raise FileNotFoundError("Directory not found")
- txt_files = [f for f in os.listdir(directory) if f.endswith(".txt")]
+ txt_files = [f for f in directory.iterdir() if f.suffix == ".txt"]
if not txt_files:
raise FileNotFoundError("No .txt file found")
@@ -53,11 +51,11 @@ async def download_ingest(digest_id: str) -> Response:
# Find the first .txt file in the directory
first_file = txt_files[0]
- with open(f"{directory}/{first_file}", encoding="utf-8") as f:
+ with first_file.open(encoding="utf-8") as f:
content = f.read()
return Response(
content=content,
media_type="text/plain",
- headers={"Content-Disposition": f"attachment; filename={first_file}"},
+ headers={"Content-Disposition": f"attachment; filename={first_file.name}"},
)
diff --git a/tests/conftest.py b/tests/conftest.py
index 5779127..78491e9 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -11,7 +11,7 @@ def sample_query() -> dict[str, Any]:
return {
"user_name": "test_user",
"repo_name": "test_repo",
- "local_path": "/tmp/test_repo",
+ "local_path": Path("/tmp/test_repo").resolve(),
"subpath": "/",
"branch": "main",
"commit": None,
diff --git a/tests/test_ingest.py b/tests/test_ingest.py
index b0a36d7..daf9057 100644
--- a/tests/test_ingest.py
+++ b/tests/test_ingest.py
@@ -7,7 +7,8 @@
def test_scan_directory(temp_directory: Path, sample_query: dict[str, Any]) -> None:
- result = _scan_directory(str(temp_directory), query=sample_query)
+ sample_query["local_path"] = temp_directory
+ result = _scan_directory(temp_directory, query=sample_query)
if result is None:
assert False, "Result is None"
@@ -18,7 +19,8 @@ def test_scan_directory(temp_directory: Path, sample_query: dict[str, Any]) -> N
def test_extract_files_content(temp_directory: Path, sample_query: dict[str, Any]) -> None:
- nodes = _scan_directory(str(temp_directory), query=sample_query)
+ sample_query["local_path"] = temp_directory
+ nodes = _scan_directory(temp_directory, query=sample_query)
if nodes is None:
assert False, "Nodes is None"
files = _extract_files_content(query=sample_query, node=nodes, max_file_size=1_000_000)
diff --git a/tests/test_parse_query.py b/tests/test_parse_query.py
index 3d9a51e..1c35efa 100644
--- a/tests/test_parse_query.py
+++ b/tests/test_parse_query.py
@@ -1,5 +1,7 @@
""" Tests for the parse_query module. """
+from pathlib import Path
+
import pytest
from gitingest.ignore_patterns import DEFAULT_IGNORE_PATTERNS
@@ -119,7 +121,8 @@ def test_parse_query_include_and_ignore_overlap() -> None:
def test_parse_query_local_path() -> None:
path = "/home/user/project"
result = parse_query(path, max_file_size=100, from_web=False)
- assert result["local_path"] == "/home/user/project"
+ tail = Path("home/user/project")
+ assert result["local_path"].parts[-len(tail.parts) :] == tail.parts
assert result["id"] is not None
assert result["slug"] == "user/project"
@@ -127,7 +130,8 @@ def test_parse_query_local_path() -> None:
def test_parse_query_relative_path() -> None:
path = "./project"
result = parse_query(path, max_file_size=100, from_web=False)
- assert result["local_path"].endswith("project")
+ tail = Path("project")
+ assert result["local_path"].parts[-len(tail.parts) :] == tail.parts
assert result["slug"].endswith("project")