Raise custom errors in the extract and url_scan APIs as LLMGraphBuilderException. All such handled errors are recorded with status='Success'/'Completed'.
praveshkumar1988 committed Jan 7, 2025
1 parent 9e88b63 commit 1d1db92
Showing 7 changed files with 64 additions and 51 deletions.
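The LLMGraphBuilderException class is imported throughout this diff, but its definition in src/shared/llm_graph_builder_exception.py is not among the 7 changed files. A minimal sketch of the shape these imports appear to rely on (the real class may carry extra fields) could look like:

# Hypothetical sketch of src/shared/llm_graph_builder_exception.py; not part of this commit.
# Only the minimal interface the handlers below rely on is shown.
class LLMGraphBuilderException(Exception):
    """Raised for failures the application already knows how to handle."""
    def __init__(self, message: str):
        super().__init__(message)
        self.message = message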
43 changes: 21 additions & 22 deletions backend/score.py
@@ -148,7 +148,14 @@ async def create_source_knowledge_graph_url(
'gcs_project_id':gcs_project_id, 'logging_time': formatted_time(datetime.now(timezone.utc))}
logger.log_struct(json_obj, "INFO")
result ={'elapsed_api_time' : f'{elapsed_time:.2f}'}
return create_api_response("Success",message=message,success_count=success_count,failed_count=failed_count,file_name=lst_file_name,data=result)
return create_api_response("Success",message=message,success_count=success_count,failed_count=failed_count,file_name=lst_file_name,data=result)
except LLMGraphBuilderException as e:
error_message = str(e)
message = f" Unable to create source node for source type: {source_type} and source: {source}"
        # Set the status "Success" because these errors are already handled by the application as custom errors.
json_obj = {'error_message':error_message, 'status':'Success','db_url':uri, 'userName':userName, 'database':database,'success_count':1, 'source_type': source_type, 'source_url':source_url, 'wiki_query':wiki_query, 'logging_time': formatted_time(datetime.now(timezone.utc))}
logger.log_struct(json_obj, "INFO")
return create_api_response('Failed',message=message + error_message[:80],error=error_message,file_source=source_type)
except Exception as e:
error_message = str(e)
message = f" Unable to create source node for source type: {source_type} and source: {source}"
@@ -261,34 +268,26 @@ async def extract_knowledge_graph_from_file(
result.update(uri_latency)
logging.info(f"extraction completed in {extract_api_time:.2f} seconds for file name {file_name}")
return create_api_response('Success', data=result, file_source= source_type)
except LLMGraphBuilderException as app_exp:
job_status="Completed"
obj_source_node = sourceNode()
obj_source_node.file_name = file_name
obj_source_node.status = job_status
obj_source_node.error_message = str(app_exp)
obj_source_node.retry_condition = retry_condition
graphDb_data_Access.update_source_node(obj_source_node)
return create_api_response("Success", data={"message": str(app_exp)}, file_name=file_name)
except LLMGraphBuilderException as e:
error_message = str(e)
graphDb_data_Access.update_exception_db(file_name,error_message, retry_condition)
failed_file_process(uri,file_name, merged_file_path, source_type)
node_detail = graphDb_data_Access.get_current_status_document_node(file_name)
# Set the status "Completed" in logging becuase we are treating these error already handled by application as like custom errors.
json_obj = {'api_name':'extract','message':error_message,'file_created_at':node_detail[0]['created_time'],'error_message':error_message, 'file_name': file_name,'status':'Completed',
'db_url':uri, 'userName':userName, 'database':database,'success_count':1, 'source_type': source_type, 'source_url':source_url, 'wiki_query':wiki_query, 'logging_time': formatted_time(datetime.now(timezone.utc))}
logger.log_struct(json_obj, "INFO")
return create_api_response("Failed", message = error_message, error=error_message, file_name=file_name)
except Exception as e:
message=f"Failed To Process File:{file_name} or LLM Unable To Parse Content "
error_message = str(e)
graphDb_data_Access.update_exception_db(file_name,error_message, retry_condition)
failed_file_process(uri,file_name, merged_file_path, source_type)
node_detail = graphDb_data_Access.get_current_status_document_node(file_name)
gcs_file_cache = os.environ.get('GCS_FILE_CACHE')
if source_type == 'local file':
if gcs_file_cache == 'True':
folder_name = create_gcs_bucket_folder_name_hashed(uri,file_name)
copy_failed_file(BUCKET_UPLOAD, BUCKET_FAILED_FILE, folder_name, file_name)
time.sleep(5)
delete_file_from_gcs(BUCKET_UPLOAD,folder_name,file_name)
else:
logging.info(f'Deleted File Path: {merged_file_path} and Deleted File Name : {file_name}')
delete_uploaded_local_file(merged_file_path,file_name)
json_obj = {'message':message,'file_created_at':node_detail[0]['created_time'],'error_message':error_message, 'file_name': file_name,'status':'Failed',

json_obj = {'api_name':'extract','message':message,'file_created_at':node_detail[0]['created_time'],'error_message':error_message, 'file_name': file_name,'status':'Failed',
'db_url':uri, 'userName':userName, 'database':database,'failed_count':1, 'source_type': source_type, 'source_url':source_url, 'wiki_query':wiki_query, 'logging_time': formatted_time(datetime.now(timezone.utc))}
logger.log_struct(json_obj, "ERROR")
logging.exception(f'File Failed in extraction: {json_obj}')
return create_api_response('Failed', message=message + error_message[:100], error=error_message, file_name = file_name)
finally:
gc.collect()
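The structured logging calls above go through logger.log_struct(json_obj, "INFO"/"ERROR"), and the logger's setup is outside this diff. Assuming it is a thin wrapper over the standard logging module (the real project may use Google Cloud Logging instead), a stand-in with the same call shape could be:

# Hypothetical stand-in for the struct logger used above; the real implementation is not in this diff.
import json
import logging

class StructLogger:
    def __init__(self, name: str = "llm-graph-builder"):
        self._logger = logging.getLogger(name)

    def log_struct(self, payload: dict, level: str = "INFO") -> None:
        # Serialize the payload so each structured record stays on a single log line.
        self._logger.log(getattr(logging, level, logging.INFO), json.dumps(payload, default=str))

logger = StructLogger()
logger.log_struct({"api_name": "extract", "status": "Completed", "file_name": "example.pdf"}, "INFO")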
9 changes: 5 additions & 4 deletions backend/src/document_sources/gcs_bucket.py
@@ -6,6 +6,7 @@
from langchain_core.documents import Document
from PyPDF2 import PdfReader
import io
from src.shared.llm_graph_builder_exception import LLMGraphBuilderException
from google.oauth2.credentials import Credentials
import time
import nltk
@@ -34,12 +35,12 @@ def get_gcs_bucket_files_info(gcs_project_id, gcs_bucket_name, gcs_bucket_folder
file_name=''
message=f" Bucket:{gcs_bucket_name} does not exist in Project:{gcs_project_id}. Please provide valid GCS bucket name"
logging.info(f"Bucket : {gcs_bucket_name} does not exist in project : {gcs_project_id}")
raise Exception(message)
raise LLMGraphBuilderException(message)
except Exception as e:
error_message = str(e)
logging.error(f"Unable to create source node for gcs bucket file {file_name}")
logging.exception(f'Exception Stack trace: {error_message}')
raise Exception(error_message)
raise LLMGraphBuilderException(error_message)

def load_pdf(file_path):
return PyMuPDFLoader(file_path)
@@ -66,7 +67,7 @@ def get_documents_from_gcs(gcs_project_id, gcs_bucket_name, gcs_bucket_folder, g
loader = GCSFileLoader(project_name=gcs_project_id, bucket=gcs_bucket_name, blob=blob_name, loader_func=load_document_content)
pages = loader.load()
else :
raise Exception('File does not exist, Please re-upload the file and try again.')
raise LLMGraphBuilderException('File does not exist, Please re-upload the file and try again.')
else:
creds= Credentials(access_token)
storage_client = storage.Client(project=gcs_project_id, credentials=creds)
@@ -83,7 +84,7 @@ def get_documents_from_gcs(gcs_project_id, gcs_bucket_name, gcs_bucket_folder, g
text += page.extract_text()
pages = [Document(page_content = text)]
else:
raise Exception(f'File Not Found in GCS bucket - {gcs_bucket_name}')
raise LLMGraphBuilderException(f'File Not Found in GCS bucket - {gcs_bucket_name}')
return gcs_blob_filename, pages

def upload_file_to_gcs(file_chunk, chunk_number, original_file_name, bucket_name, folder_name_sha1_hashed):
3 changes: 2 additions & 1 deletion backend/src/document_sources/s3_bucket.py
@@ -1,5 +1,6 @@
from langchain_community.document_loaders import S3DirectoryLoader
import logging
from src.shared.llm_graph_builder_exception import LLMGraphBuilderException
import boto3
import os
from urllib.parse import urlparse
@@ -74,4 +75,4 @@ def get_documents_from_s3(s3_url, aws_access_key_id, aws_secret_access_key):
except Exception as e:
error_message = str(e)
logging.exception(f'Exception in reading content from S3:{error_message}')
raise Exception(error_message)
raise LLMGraphBuilderException(error_message)
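With this change, a caller of get_documents_from_s3 can distinguish ingestion problems (surfaced as LLMGraphBuilderException) from unexpected programming errors. A hedged usage sketch, with a placeholder bucket URL and credentials rather than real values:

# Usage sketch only; the S3 URL and keys are placeholders.
from src.document_sources.s3_bucket import get_documents_from_s3
from src.shared.llm_graph_builder_exception import LLMGraphBuilderException

try:
    file_name, pages = get_documents_from_s3(
        "s3://example-bucket/docs/", "ACCESS_KEY_PLACEHOLDER", "SECRET_KEY_PLACEHOLDER"
    )
except LLMGraphBuilderException as exc:
    print(f"Handled S3 ingestion failure: {exc}")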
3 changes: 2 additions & 1 deletion backend/src/document_sources/web_pages.py
@@ -1,4 +1,5 @@
from langchain_community.document_loaders import WebBaseLoader
from src.shared.llm_graph_builder_exception import LLMGraphBuilderException
from src.shared.common_fn import last_url_segment

def get_documents_from_web_page(source_url:str):
@@ -12,4 +13,4 @@ def get_documents_from_web_page(source_url:str):
file_name = last_url_segment(source_url)
return file_name, pages
except Exception as e:
raise Exception(str(e))
raise LLMGraphBuilderException(str(e))
9 changes: 4 additions & 5 deletions backend/src/document_sources/wikipedia.py
@@ -1,6 +1,6 @@
import logging
from langchain_community.document_loaders import WikipediaLoader
from src.api_response import create_api_response
from src.shared.llm_graph_builder_exception import LLMGraphBuilderException

def get_documents_from_Wikipedia(wiki_query:str, language:str):
try:
@@ -9,9 +9,8 @@ def get_documents_from_Wikipedia(wiki_query:str, language:str):
logging.info(f"Total Pages from Wikipedia = {len(pages)}")
return file_name, pages
except Exception as e:
job_status = "Failed"
message="Failed To Process Wikipedia Query"
error_message = str(e)
logging.error(f"Failed To Process Wikipedia Query: {file_name}")
logging.exception(f'Exception Stack trace: {error_message}')
return create_api_response(job_status,message=message,error=error_message,file_name=file_name)
logging.exception(f'Failed To Process Wikipedia Query: {file_name}, Exception Stack trace: {error_message}')
raise LLMGraphBuilderException(error_message+' '+message)

7 changes: 4 additions & 3 deletions backend/src/document_sources/youtube.py
@@ -1,4 +1,5 @@
from langchain.docstore.document import Document
from src.shared.llm_graph_builder_exception import LLMGraphBuilderException
from youtube_transcript_api import YouTubeTranscriptApi
import logging
from urllib.parse import urlparse,parse_qs
@@ -16,7 +17,7 @@ def get_youtube_transcript(youtube_id):
return transcript_pieces
except Exception as e:
message = f"Youtube transcript is not available for youtube Id: {youtube_id}"
raise Exception(message)
raise LLMGraphBuilderException(message)

def get_youtube_combined_transcript(youtube_id):
try:
@@ -27,7 +28,7 @@ def get_youtube_combined_transcript(youtube_id):
return transcript
except Exception as e:
message = f"Youtube transcript is not available for youtube Id: {youtube_id}"
raise Exception(message)
raise LLMGraphBuilderException(message)


def create_youtube_url(url):
@@ -63,7 +64,7 @@ def get_documents_from_youtube(url):
except Exception as e:
error_message = str(e)
logging.exception(f'Exception in reading transcript from youtube:{error_message}')
raise Exception(error_message)
raise LLMGraphBuilderException(error_message)

def get_calculated_timestamps(chunks, youtube_id):
logging.info('Calculating timestamps for chunks')
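All of the document_sources loaders above now converge on the same contract: any loader failure is wrapped in LLMGraphBuilderException so the API layer can treat it as an application-handled error. A hypothetical helper (not part of the codebase) that captures the shared pattern:

# Hypothetical helper illustrating the shared pattern; not part of this commit.
from typing import Callable
from src.shared.llm_graph_builder_exception import LLMGraphBuilderException

def load_or_raise(loader: Callable[[], tuple], context: str) -> tuple:
    try:
        return loader()
    except LLMGraphBuilderException:
        raise                        # already a descriptive, handled error
    except Exception as exc:         # wrap anything else with loader context
        raise LLMGraphBuilderException(f"{context}: {exc}")

# Example: load_or_raise(lambda: get_documents_from_web_page("https://example.com"), "web page load failed")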
41 changes: 26 additions & 15 deletions backend/src/main.py
@@ -1,5 +1,5 @@
from langchain_neo4j import Neo4jGraph
from src.shared.constants import (BUCKET_UPLOAD, PROJECT_ID, QUERY_TO_GET_CHUNKS,
from src.shared.constants import (BUCKET_UPLOAD,BUCKET_FAILED_FILE, PROJECT_ID, QUERY_TO_GET_CHUNKS,
QUERY_TO_DELETE_EXISTING_ENTITIES,
QUERY_TO_GET_LAST_PROCESSED_CHUNK_POSITION,
QUERY_TO_GET_LAST_PROCESSED_CHUNK_WITHOUT_ENTITY,
@@ -41,7 +41,7 @@ def create_source_node_graph_url_s3(graph, model, source_url, aws_access_key_id,
lst_file_name = []
files_info = get_s3_files_info(source_url,aws_access_key_id=aws_access_key_id,aws_secret_access_key=aws_secret_access_key)
if len(files_info)==0:
raise Exception('No pdf files found.')
raise LLMGraphBuilderException('No pdf files found.')
logging.info(f'files info : {files_info}')
success_count=0
failed_count=0
@@ -121,7 +121,7 @@ def create_source_node_graph_web_url(graph, model, source_url, source_type):
if pages==None or len(pages)==0:
failed_count+=1
message = f"Unable to read data for given url : {source_url}"
raise Exception(message)
raise LLMGraphBuilderException(message)
try:
title = pages[0].metadata['title']
if not title:
@@ -177,8 +177,7 @@ def create_source_node_graph_url_youtube(graph, model, source_url, source_type):
logging.info(f"Youtube transcript : {transcript}")
if transcript==None or len(transcript)==0:
message = f"Youtube transcript is not available for : {obj_source_node.file_name}"
logging.info(f"Youtube transcript is not available for : {obj_source_node.file_name}")
raise Exception(message)
raise LLMGraphBuilderException(message)
else:
obj_source_node.file_size = sys.getsizeof(transcript)

@@ -199,7 +198,7 @@ def create_source_node_graph_url_wikipedia(graph, model, wiki_query, source_type
if pages==None or len(pages)==0:
failed_count+=1
message = f"Unable to read data for given Wikipedia url : {wiki_query}"
raise Exception(message)
raise LLMGraphBuilderException(message)
else:
obj_source_node = sourceNode()
obj_source_node.file_name = wiki_query_id.strip()
@@ -233,21 +232,21 @@ async def extract_graph_from_file_local_file(uri, userName, password, database,
else:
file_name, pages, file_extension = get_documents_from_file_by_path(merged_file_path,fileName)
if pages==None or len(pages)==0:
raise Exception(f'File content is not available for file : {file_name}')
raise LLMGraphBuilderException(f'File content is not available for file : {file_name}')
return await processing_source(uri, userName, password, database, model, file_name, pages, allowedNodes, allowedRelationship, True, merged_file_path)
else:
return await processing_source(uri, userName, password, database, model, fileName, [], allowedNodes, allowedRelationship, True, merged_file_path, retry_condition)

async def extract_graph_from_file_s3(uri, userName, password, database, model, source_url, aws_access_key_id, aws_secret_access_key, file_name, allowedNodes, allowedRelationship, retry_condition):
if not retry_condition:
if(aws_access_key_id==None or aws_secret_access_key==None):
raise Exception('Please provide AWS access and secret keys')
raise LLMGraphBuilderException('Please provide AWS access and secret keys')
else:
logging.info("Insert in S3 Block")
file_name, pages = get_documents_from_s3(source_url, aws_access_key_id, aws_secret_access_key)

if pages==None or len(pages)==0:
raise Exception(f'File content is not available for file : {file_name}')
raise LLMGraphBuilderException(f'File content is not available for file : {file_name}')
return await processing_source(uri, userName, password, database, model, file_name, pages, allowedNodes, allowedRelationship)
else:
return await processing_source(uri, userName, password, database, model, file_name, [], allowedNodes, allowedRelationship, retry_condition=retry_condition)
@@ -256,7 +255,7 @@ async def extract_graph_from_web_page(uri, userName, password, database, model,
if not retry_condition:
file_name, pages = get_documents_from_web_page(source_url)
if pages==None or len(pages)==0:
raise Exception(f'Content is not available for given URL : {file_name}')
raise LLMGraphBuilderException(f'Content is not available for given URL : {file_name}')
return await processing_source(uri, userName, password, database, model, file_name, pages, allowedNodes, allowedRelationship)
else:
return await processing_source(uri, userName, password, database, model, file_name, [], allowedNodes, allowedRelationship, retry_condition=retry_condition)
@@ -266,7 +265,7 @@ async def extract_graph_from_file_youtube(uri, userName, password, database, mod
file_name, pages = get_documents_from_youtube(source_url)

if pages==None or len(pages)==0:
raise Exception(f'Youtube transcript is not available for file : {file_name}')
raise LLMGraphBuilderException(f'Youtube transcript is not available for file : {file_name}')
return await processing_source(uri, userName, password, database, model, file_name, pages, allowedNodes, allowedRelationship)
else:
return await processing_source(uri, userName, password, database, model, file_name, [], allowedNodes, allowedRelationship, retry_condition=retry_condition)
@@ -275,7 +274,7 @@ async def extract_graph_from_file_Wikipedia(uri, userName, password, database, m
if not retry_condition:
file_name, pages = get_documents_from_Wikipedia(wiki_query, language)
if pages==None or len(pages)==0:
raise Exception(f'Wikipedia page is not available for file : {file_name}')
raise LLMGraphBuilderException(f'Wikipedia page is not available for file : {file_name}')
return await processing_source(uri, userName, password, database, model, file_name, pages, allowedNodes, allowedRelationship)
else:
return await processing_source(uri, userName, password, database, model, file_name,[], allowedNodes, allowedRelationship, retry_condition=retry_condition)
@@ -284,7 +283,7 @@ async def extract_graph_from_file_gcs(uri, userName, password, database, model,
if not retry_condition:
file_name, pages = get_documents_from_gcs(gcs_project_id, gcs_bucket_name, gcs_bucket_folder, gcs_blob_filename, access_token)
if pages==None or len(pages)==0:
raise Exception(f'File content is not available for file : {file_name}')
raise LLMGraphBuilderException(f'File content is not available for file : {file_name}')
return await processing_source(uri, userName, password, database, model, file_name, pages, allowedNodes, allowedRelationship)
else:
return await processing_source(uri, userName, password, database, model, file_name, [], allowedNodes, allowedRelationship, retry_condition=retry_condition)
@@ -456,7 +455,7 @@ async def processing_source(uri, userName, password, database, model, file_name,
else:
error_message = "Unable to get the status of document node."
logging.error(error_message)
raise Exception(error_message)
raise LLMGraphBuilderException(error_message)

async def processing_chunks(chunkId_chunkDoc_list,graph,uri, userName, password, database,file_name,model,allowedNodes,allowedRelationship, node_count, rel_count):
#create vector index and update chunk node with embedding
@@ -538,7 +537,7 @@ def get_chunkId_chunkDoc_list(graph, file_name, pages, retry_condition):
chunks = graph.query(QUERY_TO_GET_CHUNKS, params={"filename":file_name})

if chunks[0]['text'] is None or chunks[0]['text']=="" or not chunks :
raise Exception(f"Chunks are not created for {file_name}. Please re-upload file and try again.")
raise LLMGraphBuilderException(f"Chunks are not created for {file_name}. Please re-upload file and try again.")
else:
for chunk in chunks:
chunk_doc = Document(page_content=chunk['text'], metadata={'id':chunk['id'], 'position':chunk['position']})
@@ -742,3 +741,15 @@ def set_status_retry(graph, file_name, retry_condition):
obj_source_node.relationship_count=0
logging.info(obj_source_node)
graphDb_data_Access.update_source_node(obj_source_node)

def failed_file_process(uri,file_name, merged_file_path, source_type):
gcs_file_cache = os.environ.get('GCS_FILE_CACHE')
if source_type == 'local file':
if gcs_file_cache == 'True':
folder_name = create_gcs_bucket_folder_name_hashed(uri,file_name)
copy_failed_file(BUCKET_UPLOAD, BUCKET_FAILED_FILE, folder_name, file_name)
time.sleep(5)
delete_file_from_gcs(BUCKET_UPLOAD,folder_name,file_name)
else:
logging.info(f'Deleted File Path: {merged_file_path} and Deleted File Name : {file_name}')
delete_uploaded_local_file(merged_file_path,file_name)
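The new failed_file_process helper consolidates the cleanup that previously lived inline in the extract handler: for local files it either archives the upload to the failed-files GCS bucket (when the GCS_FILE_CACHE environment variable is the literal string 'True') or deletes the merged local copy. A small self-contained sketch of that routing decision, with placeholder values:

# Self-contained sketch of the routing decision failed_file_process makes for local files;
# the value is a placeholder and the real work is done by copy_failed_file,
# delete_file_from_gcs, and delete_uploaded_local_file defined elsewhere in this module.
import os

def route_failed_local_file(use_gcs_cache: str) -> str:
    # The flag is compared as the literal string 'True', mirroring the helper above.
    return "archive to failed-files GCS bucket" if use_gcs_cache == "True" else "delete local merged copy"

print(route_failed_local_file(os.environ.get("GCS_FILE_CACHE", "False")))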
