1
- from typing import List , Dict , Any
1
+ import os
2
+ import json
3
+ import csv
4
+ import re
2
5
from pathlib import Path
6
+ from typing import List , Dict , Any
7
+ from io import BytesIO
3
8
4
- class DocumentLoader :
5
- """
6
- A class to load documents from various sources (e.g., files, URLs) into the knowledge base.
7
- """
9
+ import requests
10
+ from bs4 import BeautifulSoup
8
11
9
- def __init__ ( self ):
10
- """
11
- Initialize the DocumentLoader.
12
- """
13
- pass
12
# Optional third-party backends. PDF support (PyPDF2) and XLSX support
# (pandas) degrade gracefully: the module-level name is bound to None when
# the package is missing, and the loaders raise ImportError only when the
# corresponding file type is actually requested.
try:
    from PyPDF2 import PdfReader
except ImportError:
    PdfReader = None

try:
    import pandas as pd
except ImportError:
    pd = None
18
22
19
- Args:
20
- file_path (str): The path to the file.
21
23
22
- Returns:
23
- List[Dict[str, Any]]: A list of documents, where each document is a dictionary.
24
- """
25
- file_path = Path (file_path )
26
- if not file_path .exists ():
27
- raise FileNotFoundError (f"File not found: { file_path } " )
24
def flatten_json(data: Any, parent_key: str = "", separator: str = "_") -> List[Dict[str, Any]]:
    """
    Recursively flatten a JSON structure into a list of single-entry dicts.

    For each leaf key-value pair an entry mapping flattened-key -> value is
    emitted.  Additionally, if the leaf value is a string, a reverse entry
    mapping the value -> flattened-key is emitted, so lookups work in both
    directions.

    Args:
        data: Any JSON-like structure (dict, list, or scalar).
        parent_key: Flattened key prefix accumulated so far.
        separator: String joining nested key components.

    Returns:
        A list of one-entry dictionaries (forward and reverse mappings).
    """
    entries: List[Dict[str, Any]] = []

    def _absorb(flat_key: Any, value: Any) -> None:
        # Containers recurse; scalars become entries (plus a reverse entry
        # for strings).
        if isinstance(value, (dict, list)):
            entries.extend(flatten_json(value, flat_key, separator))
            return
        entries.append({flat_key: value})
        if isinstance(value, str):
            entries.append({value: flat_key})

    if isinstance(data, dict):
        for key, value in data.items():
            # NOTE: a top-level key is used as-is (not str-converted) when
            # there is no parent prefix, matching the original behavior.
            _absorb(f"{parent_key}{separator}{key}" if parent_key else key, value)
    elif isinstance(data, list):
        for index, item in enumerate(data):
            _absorb(f"{parent_key}{separator}{index}" if parent_key else str(index), item)
    return entries
28
50
29
- # Example: Load a JSON file
30
- if file_path .suffix == ".json" :
31
- import json
32
- with open (file_path , "r" ) as f :
33
- return json .load (f )
34
- # Example: Load a text file
35
- elif file_path .suffix == ".txt" :
36
- with open (file_path , "r" ) as f :
37
- return [{"text" : f .read ()}]
38
- else :
39
- raise ValueError (f"Unsupported file type: { file_path .suffix } " )
40
51
41
- def load_from_url (self , url : str ) -> List [Dict [str , Any ]]:
52
class DocumentLoader:
    """
    A dynamic document loader that supports multiple source types:

    - Local files: CSV, TXT, JSON, XLSX, PDF
    - URL sources: HTML websites (text extraction), JSON APIs, PDF URLs
    - YouTube links: extracts transcripts using youtube_transcript_api

    For JSON sources, if ``flatten`` is True (default), the returned document
    is a dictionary with two keys:
        "original": the raw JSON data,
        "flattened": a list of flattened key/value pairs (including reverse
        mappings) produced by ``flatten_json``.
    """

    # Seconds to wait for an HTTP response.  requests has NO default timeout,
    # so without this a dead server would hang the loader forever.
    REQUEST_TIMEOUT = 30

    def load(self, source: str, flatten: bool = True) -> List[Dict[str, Any]]:
        """
        Load documents from *source*.

        Sources with an explicit ``http://`` or ``https://`` scheme are
        fetched as URLs; everything else is treated as a local file path.

        Args:
            source: A file path or URL.
            flatten: For JSON sources, also return flattened key/value pairs.

        Returns:
            A list of document dictionaries.
        """
        # Match the scheme explicitly so a local file whose name merely
        # starts with "http" (e.g. "http_notes.txt") is not mistaken for a URL.
        if source.lower().startswith(("http://", "https://")):
            return self.load_from_url(source, flatten=flatten)
        return self.load_from_file(source, flatten=flatten)

    def load_from_file(self, file_path: str, flatten: bool = True) -> List[Dict[str, Any]]:
        """
        Load documents from a local CSV, TXT, JSON, XLSX, or PDF file.

        Args:
            file_path: Path to the file; the extension selects the parser.
            flatten: For JSON files, also return flattened key/value pairs.

        Returns:
            A list of document dictionaries.

        Raises:
            FileNotFoundError: If the path does not exist.
            ImportError: If an optional dependency (pandas, PyPDF2) is missing.
            ValueError: If the file extension is not supported.
        """
        path = Path(file_path)
        if not path.exists():
            raise FileNotFoundError(f"File not found: {file_path}")

        ext = path.suffix.lower()
        if ext == ".json":
            with open(path, "r", encoding="utf-8") as f:
                data = json.load(f)
            if flatten:
                return [{"original": data, "flattened": flatten_json(data)}]
            return data if isinstance(data, list) else [data]
        if ext == ".txt":
            with open(path, "r", encoding="utf-8") as f:
                return [{"text": f.read()}]
        if ext == ".csv":
            # newline="" is required by the csv module so quoted fields that
            # contain embedded line breaks are parsed correctly.
            with open(path, "r", encoding="utf-8", newline="") as f:
                return list(csv.DictReader(f))
        if ext == ".xlsx":
            if pd is None:
                raise ImportError("pandas is required to load XLSX files")
            return pd.read_excel(path).to_dict(orient="records")
        if ext == ".pdf":
            if PdfReader is None:
                raise ImportError("PyPDF2 is required to load PDF files")
            reader = PdfReader(str(path))
            # extract_text() may return None for image-only pages.
            content = "".join(page.extract_text() or "" for page in reader.pages)
            return [{"text": content}]
        raise ValueError(f"Unsupported file type: {ext}")

    def load_from_url(self, url: str, flatten: bool = True) -> List[Dict[str, Any]]:
        """
        Load documents from a URL (YouTube, JSON API, HTML page, or PDF).

        Args:
            url: The URL to fetch.
            flatten: For JSON responses, also return flattened key/value pairs.

        Returns:
            A list of document dictionaries.

        Raises:
            ValueError: If the URL returns a non-200 response.
            ImportError: If PyPDF2 is missing for a PDF response.
        """
        if "youtube.com" in url or "youtu.be" in url:
            return self._load_youtube(url)

        # Bounded timeout so a stalled server cannot block the whole pipeline.
        response = requests.get(url, timeout=self.REQUEST_TIMEOUT)
        if response.status_code != 200:
            raise ValueError(f"Failed to fetch data from URL: {url}")

        content_type = response.headers.get("Content-Type", "").lower()
        if "application/json" in content_type:
            data = response.json()
            if flatten:
                return [{"original": data, "flattened": flatten_json(data)}]
            return data if isinstance(data, list) else [data]
        if "text/html" in content_type:
            # First, try plain requests + BeautifulSoup.
            soup = BeautifulSoup(response.text, "html.parser")
            text = soup.get_text(separator="\n").strip()
            # If the text seems too short (less than 50 words), assume the
            # content is rendered by JavaScript and retry with a headless
            # browser.
            if len(text.split()) < 50:
                try:
                    text = self._fetch_with_headless_browser(url)
                except Exception as e:
                    # Best-effort: fall back to the short static text if the
                    # headless fetch is unavailable or fails.
                    print(f"Headless fetch failed: {e}")
            return [{"text": text}]
        if "application/pdf" in content_type:
            if PdfReader is None:
                raise ImportError("PyPDF2 is required to load PDF files")
            reader = PdfReader(BytesIO(response.content))
            text = "".join(page.extract_text() or "" for page in reader.pages)
            return [{"text": text}]
        # Any other content type: return the raw body as text.
        return [{"text": response.text}]

    def _fetch_with_headless_browser(self, url: str) -> str:
        """
        Use a headless browser (Playwright) to fetch fully rendered content.

        Returns:
            The visible text of the rendered page.

        Raises:
            ImportError: If playwright is not installed.
        """
        try:
            from playwright.sync_api import sync_playwright
        except ImportError:
            raise ImportError("playwright is required for JS-rendered pages. Install it with 'pip install playwright' and run 'playwright install'.")
        with sync_playwright() as p:
            browser = p.chromium.launch(headless=True)
            page = browser.new_page()
            # "networkidle" waits until the page stops issuing requests, so
            # JS-rendered content should be present in the DOM.
            page.goto(url, wait_until="networkidle")
            html = page.content()
            browser.close()
        soup = BeautifulSoup(html, "html.parser")
        return soup.get_text(separator="\n").strip()

    def _load_youtube(self, url: str) -> List[Dict[str, Any]]:
        """
        Extract the transcript of a YouTube video as a single text document.

        A missing transcript yields a fallback document rather than an
        exception, so one unavailable video does not abort a batch load.

        Raises:
            ImportError: If youtube_transcript_api is not installed.
            ValueError: If no video ID can be extracted from the URL.
        """
        try:
            from youtube_transcript_api import YouTubeTranscriptApi
        except ImportError:
            raise ImportError("youtube_transcript_api is required to load YouTube transcripts")

        # Handle both "watch?v=<id>" and short "youtu.be/<id>" URL forms.
        video_id = None
        for pattern in (r"v=([^&]+)", r"youtu\.be/([^?&]+)"):
            match = re.search(pattern, url)
            if match:
                video_id = match.group(1)
                break
        if not video_id:
            raise ValueError("Could not extract video ID from URL")

        # Prioritized list of language codes to try.
        preferred_languages = ["en", "hi", "es", "fr", "de", "ru"]

        try:
            transcript = YouTubeTranscriptApi.get_transcript(video_id, languages=preferred_languages)
            text = " ".join(segment["text"] for segment in transcript)
            return [{"text": text}]
        except Exception as e:
            # Return a fallback document indicating transcript retrieval failed.
            return [{"text": f"Transcript not available for video {url}: {str(e)}"}]
191
+
0 commit comments