From 1d1db92572cbe0264c02939bf4cdaf4b946ad22e Mon Sep 17 00:00:00 2001
From: Pravesh Kumar <121786590+praveshkumar1988@users.noreply.github.com>
Date: Tue, 7 Jan 2025 07:49:32 +0000
Subject: [PATCH] Raise custom errors in the extract and url_scan APIs as
 LLMGraphBuilderException; record these handled errors with
 status='Success/Completed'

---
 backend/score.py                            | 43 +++++++++++-----------
 backend/src/document_sources/gcs_bucket.py  |  9 +++--
 backend/src/document_sources/s3_bucket.py   |  3 +-
 backend/src/document_sources/web_pages.py   |  3 +-
 backend/src/document_sources/wikipedia.py   |  9 ++---
 backend/src/document_sources/youtube.py     |  7 ++--
 backend/src/main.py                         | 41 +++++++++++++--------
 7 files changed, 64 insertions(+), 51 deletions(-)

diff --git a/backend/score.py b/backend/score.py
index 4e69a710b..9b1d1110e 100644
--- a/backend/score.py
+++ b/backend/score.py
@@ -148,7 +148,14 @@ async def create_source_knowledge_graph_url(
             'gcs_project_id':gcs_project_id, 'logging_time': formatted_time(datetime.now(timezone.utc))}
         logger.log_struct(json_obj, "INFO")
         result ={'elapsed_api_time' : f'{elapsed_time:.2f}'}
-        return create_api_response("Success",message=message,success_count=success_count,failed_count=failed_count,file_name=lst_file_name,data=result)
+        return create_api_response("Success",message=message,success_count=success_count,failed_count=failed_count,file_name=lst_file_name,data=result)
+    except LLMGraphBuilderException as e:
+        error_message = str(e)
+        message = f" Unable to create source node for source type: {source_type} and source: {source}"
+        # Log the status as "Success" because these errors are already handled by the application as custom errors.
+        json_obj = {'error_message':error_message, 'status':'Success','db_url':uri, 'userName':userName, 'database':database,'success_count':1, 'source_type': source_type, 'source_url':source_url, 'wiki_query':wiki_query, 'logging_time': formatted_time(datetime.now(timezone.utc))}
+        logger.log_struct(json_obj, "INFO")
+        return create_api_response('Failed',message=message + error_message[:80],error=error_message,file_source=source_type)
     except Exception as e:
         error_message = str(e)
         message = f" Unable to create source node for source type: {source_type} and source: {source}"
@@ -261,34 +268,26 @@ async def extract_knowledge_graph_from_file(
             result.update(uri_latency)
             logging.info(f"extraction completed in {extract_api_time:.2f} seconds for file name {file_name}")
             return create_api_response('Success', data=result, file_source= source_type)
-    except LLMGraphBuilderException as app_exp:
-        job_status="Completed"
-        obj_source_node = sourceNode()
-        obj_source_node.file_name = file_name
-        obj_source_node.status = job_status
-        obj_source_node.error_message = str(app_exp)
-        obj_source_node.retry_condition = retry_condition
-        graphDb_data_Access.update_source_node(obj_source_node)
-        return create_api_response("Success", data={"message": str(app_exp)}, file_name=file_name)
+    except LLMGraphBuilderException as e:
+        error_message = str(e)
+        graphDb_data_Access.update_exception_db(file_name,error_message, retry_condition)
+        failed_file_process(uri,file_name, merged_file_path, source_type)
+        node_detail = graphDb_data_Access.get_current_status_document_node(file_name)
+        # Log the status as "Completed" because these errors are already handled by the application as custom errors.
+        json_obj = {'api_name':'extract','message':error_message,'file_created_at':node_detail[0]['created_time'],'error_message':error_message, 'file_name': file_name,'status':'Completed',
+                    'db_url':uri, 'userName':userName, 'database':database,'success_count':1, 'source_type': source_type, 'source_url':source_url, 'wiki_query':wiki_query, 'logging_time': formatted_time(datetime.now(timezone.utc))}
+        logger.log_struct(json_obj, "INFO")
+        return create_api_response("Failed", message = error_message, error=error_message, file_name=file_name)
     except Exception as e:
         message=f"Failed To Process File:{file_name} or LLM Unable To Parse Content "
         error_message = str(e)
         graphDb_data_Access.update_exception_db(file_name,error_message, retry_condition)
+        failed_file_process(uri,file_name, merged_file_path, source_type)
         node_detail = graphDb_data_Access.get_current_status_document_node(file_name)
-        gcs_file_cache = os.environ.get('GCS_FILE_CACHE')
-        if source_type == 'local file':
-            if gcs_file_cache == 'True':
-                folder_name = create_gcs_bucket_folder_name_hashed(uri,file_name)
-                copy_failed_file(BUCKET_UPLOAD, BUCKET_FAILED_FILE, folder_name, file_name)
-                time.sleep(5)
-                delete_file_from_gcs(BUCKET_UPLOAD,folder_name,file_name)
-            else:
-                logging.info(f'Deleted File Path: {merged_file_path} and Deleted File Name : {file_name}')
-                delete_uploaded_local_file(merged_file_path,file_name)
-        json_obj = {'message':message,'file_created_at':node_detail[0]['created_time'],'error_message':error_message, 'file_name': file_name,'status':'Failed',
+
+        json_obj = {'api_name':'extract','message':message,'file_created_at':node_detail[0]['created_time'],'error_message':error_message, 'file_name': file_name,'status':'Failed',
                     'db_url':uri, 'userName':userName, 'database':database,'failed_count':1, 'source_type': source_type, 'source_url':source_url, 'wiki_query':wiki_query, 'logging_time': formatted_time(datetime.now(timezone.utc))}
         logger.log_struct(json_obj, "ERROR")
-        logging.exception(f'File Failed in extraction: {json_obj}')
         return create_api_response('Failed', message=message + error_message[:100], error=error_message, file_name = file_name)
     finally:
         gc.collect()
diff --git a/backend/src/document_sources/gcs_bucket.py b/backend/src/document_sources/gcs_bucket.py
index 891c30703..d50635571 100644
--- a/backend/src/document_sources/gcs_bucket.py
+++ b/backend/src/document_sources/gcs_bucket.py
@@ -6,6 +6,7 @@
 from langchain_core.documents import Document
 from PyPDF2 import PdfReader
 import io
+from src.shared.llm_graph_builder_exception import LLMGraphBuilderException
 from google.oauth2.credentials import Credentials
 import time
 import nltk
@@ -34,12 +35,12 @@ def get_gcs_bucket_files_info(gcs_project_id, gcs_bucket_name, gcs_bucket_folder
     file_name=''
     message=f" Bucket:{gcs_bucket_name} does not exist in Project:{gcs_project_id}. Please provide valid GCS bucket name"
     logging.info(f"Bucket : {gcs_bucket_name} does not exist in project : {gcs_project_id}")
-    raise Exception(message)
+    raise LLMGraphBuilderException(message)
   except Exception as e:
     error_message = str(e)
     logging.error(f"Unable to create source node for gcs bucket file {file_name}")
     logging.exception(f'Exception Stack trace: {error_message}')
-    raise Exception(error_message)
+    raise LLMGraphBuilderException(error_message)

 def load_pdf(file_path):
     return PyMuPDFLoader(file_path)
@@ -66,7 +67,7 @@ def get_documents_from_gcs(gcs_project_id, gcs_bucket_name, gcs_bucket_folder, g
       loader = GCSFileLoader(project_name=gcs_project_id, bucket=gcs_bucket_name, blob=blob_name, loader_func=load_document_content)
       pages = loader.load()
     else :
-      raise Exception('File does not exist, Please re-upload the file and try again.')
+      raise LLMGraphBuilderException('File does not exist, Please re-upload the file and try again.')
   else:
     creds= Credentials(access_token)
     storage_client = storage.Client(project=gcs_project_id, credentials=creds)
@@ -83,7 +84,7 @@ def get_documents_from_gcs(gcs_project_id, gcs_bucket_name, gcs_bucket_folder, g
         text += page.extract_text()
       pages = [Document(page_content = text)]
     else:
-      raise Exception(f'File Not Found in GCS bucket - {gcs_bucket_name}')
+      raise LLMGraphBuilderException(f'File Not Found in GCS bucket - {gcs_bucket_name}')
   return gcs_blob_filename, pages

 def upload_file_to_gcs(file_chunk, chunk_number, original_file_name, bucket_name, folder_name_sha1_hashed):
diff --git a/backend/src/document_sources/s3_bucket.py b/backend/src/document_sources/s3_bucket.py
index 908e8474e..cdcd7fa0f 100644
--- a/backend/src/document_sources/s3_bucket.py
+++ b/backend/src/document_sources/s3_bucket.py
@@ -1,5 +1,6 @@
 from langchain_community.document_loaders import S3DirectoryLoader
 import logging
+from src.shared.llm_graph_builder_exception import LLMGraphBuilderException
 import boto3
 import os
 from urllib.parse import urlparse
@@ -74,4 +75,4 @@ def get_documents_from_s3(s3_url, aws_access_key_id, aws_secret_access_key):
   except Exception as e:
     error_message = str(e)
     logging.exception(f'Exception in reading content from S3:{error_message}')
-    raise Exception(error_message)
\ No newline at end of file
+    raise LLMGraphBuilderException(error_message)
\ No newline at end of file
diff --git a/backend/src/document_sources/web_pages.py b/backend/src/document_sources/web_pages.py
index 539e9f9c2..91c87510c 100644
--- a/backend/src/document_sources/web_pages.py
+++ b/backend/src/document_sources/web_pages.py
@@ -1,4 +1,5 @@
 from langchain_community.document_loaders import WebBaseLoader
+from src.shared.llm_graph_builder_exception import LLMGraphBuilderException
 from src.shared.common_fn import last_url_segment

 def get_documents_from_web_page(source_url:str):
@@ -12,4 +13,4 @@ def get_documents_from_web_page(source_url:str):
       file_name = last_url_segment(source_url)
       return file_name, pages
   except Exception as e:
-    raise Exception(str(e))
\ No newline at end of file
+    raise LLMGraphBuilderException(str(e))
\ No newline at end of file
diff --git a/backend/src/document_sources/wikipedia.py b/backend/src/document_sources/wikipedia.py
index 71820a69e..e4d7742b1 100644
--- a/backend/src/document_sources/wikipedia.py
+++ b/backend/src/document_sources/wikipedia.py
@@ -1,6 +1,6 @@
 import logging
 from langchain_community.document_loaders import WikipediaLoader
-from src.api_response import create_api_response
+from src.shared.llm_graph_builder_exception import LLMGraphBuilderException

 def get_documents_from_Wikipedia(wiki_query:str, language:str):
   try:
@@ -9,9 +9,8 @@ def get_documents_from_Wikipedia(wiki_query:str, language:str):
     logging.info(f"Total Pages from Wikipedia = {len(pages)}")
     return file_name, pages
   except Exception as e:
-    job_status = "Failed"
     message="Failed To Process Wikipedia Query"
     error_message = str(e)
-    logging.error(f"Failed To Process Wikipedia Query: {file_name}")
-    logging.exception(f'Exception Stack trace: {error_message}')
-    return create_api_response(job_status,message=message,error=error_message,file_name=file_name)
\ No newline at end of file
+    logging.exception(f'Failed To Process Wikipedia Query: {file_name}, Exception Stack trace: {error_message}')
+    raise LLMGraphBuilderException(error_message+' '+message)
+
\ No newline at end of file
diff --git a/backend/src/document_sources/youtube.py b/backend/src/document_sources/youtube.py
index 1b3776db5..82e9a9219 100644
--- a/backend/src/document_sources/youtube.py
+++ b/backend/src/document_sources/youtube.py
@@ -1,4 +1,5 @@
 from langchain.docstore.document import Document
+from src.shared.llm_graph_builder_exception import LLMGraphBuilderException
 from youtube_transcript_api import YouTubeTranscriptApi
 import logging
 from urllib.parse import urlparse,parse_qs
@@ -16,7 +17,7 @@ def get_youtube_transcript(youtube_id):
     return transcript_pieces
   except Exception as e:
     message = f"Youtube transcript is not available for youtube Id: {youtube_id}"
-    raise Exception(message)
+    raise LLMGraphBuilderException(message)

 def get_youtube_combined_transcript(youtube_id):
   try:
@@ -27,7 +28,7 @@ def get_youtube_combined_transcript(youtube_id):
     return transcript
   except Exception as e:
     message = f"Youtube transcript is not available for youtube Id: {youtube_id}"
-    raise Exception(message)
+    raise LLMGraphBuilderException(message)


 def create_youtube_url(url):
@@ -63,7 +64,7 @@ def get_documents_from_youtube(url):
   except Exception as e:
     error_message = str(e)
     logging.exception(f'Exception in reading transcript from youtube:{error_message}')
-    raise Exception(error_message)
+    raise LLMGraphBuilderException(error_message)

 def get_calculated_timestamps(chunks, youtube_id):
   logging.info('Calculating timestamps for chunks')
diff --git a/backend/src/main.py b/backend/src/main.py
index 610d90291..80098fa80 100644
--- a/backend/src/main.py
+++ b/backend/src/main.py
@@ -1,5 +1,5 @@
 from langchain_neo4j import Neo4jGraph
-from src.shared.constants import (BUCKET_UPLOAD, PROJECT_ID, QUERY_TO_GET_CHUNKS,
+from src.shared.constants import (BUCKET_UPLOAD,BUCKET_FAILED_FILE, PROJECT_ID, QUERY_TO_GET_CHUNKS,
                                   QUERY_TO_DELETE_EXISTING_ENTITIES,
                                   QUERY_TO_GET_LAST_PROCESSED_CHUNK_POSITION,
                                   QUERY_TO_GET_LAST_PROCESSED_CHUNK_WITHOUT_ENTITY,
@@ -41,7 +41,7 @@ def create_source_node_graph_url_s3(graph, model, source_url, aws_access_key_id,
   lst_file_name = []
   files_info = get_s3_files_info(source_url,aws_access_key_id=aws_access_key_id,aws_secret_access_key=aws_secret_access_key)
   if len(files_info)==0:
-    raise Exception('No pdf files found.')
+    raise LLMGraphBuilderException('No pdf files found.')
   logging.info(f'files info : {files_info}')
   success_count=0
   failed_count=0
@@ -121,7 +121,7 @@ def create_source_node_graph_web_url(graph, model, source_url, source_type):
     if pages==None or len(pages)==0:
       failed_count+=1
       message = f"Unable to read data for given url : {source_url}"
-      raise Exception(message)
+      raise LLMGraphBuilderException(message)
     try:
       title = pages[0].metadata['title']
       if not title:
@@ -177,8 +177,7 @@ def create_source_node_graph_url_youtube(graph, model, source_url, source_type):
     logging.info(f"Youtube transcript : {transcript}")
     if transcript==None or len(transcript)==0:
       message = f"Youtube transcript is not available for : {obj_source_node.file_name}"
-      logging.info(f"Youtube transcript is not available for : {obj_source_node.file_name}")
-      raise Exception(message)
+      raise LLMGraphBuilderException(message)
     else:
       obj_source_node.file_size = sys.getsizeof(transcript)

@@ -199,7 +198,7 @@ def create_source_node_graph_url_wikipedia(graph, model, wiki_query, source_type
     if pages==None or len(pages)==0:
       failed_count+=1
       message = f"Unable to read data for given Wikipedia url : {wiki_query}"
-      raise Exception(message)
+      raise LLMGraphBuilderException(message)
     else:
       obj_source_node = sourceNode()
       obj_source_node.file_name = wiki_query_id.strip()
@@ -233,7 +232,7 @@ async def extract_graph_from_file_local_file(uri, userName, password, database,
   else:
     file_name, pages, file_extension = get_documents_from_file_by_path(merged_file_path,fileName)
     if pages==None or len(pages)==0:
-      raise Exception(f'File content is not available for file : {file_name}')
+      raise LLMGraphBuilderException(f'File content is not available for file : {file_name}')
     return await processing_source(uri, userName, password, database, model, file_name, pages, allowedNodes, allowedRelationship, True, merged_file_path)
   else:
     return await processing_source(uri, userName, password, database, model, fileName, [], allowedNodes, allowedRelationship, True, merged_file_path, retry_condition)
@@ -241,13 +240,13 @@ async def extract_graph_from_file_local_file(uri, userName, password, database,
 async def extract_graph_from_file_s3(uri, userName, password, database, model, source_url, aws_access_key_id, aws_secret_access_key, file_name, allowedNodes, allowedRelationship, retry_condition):
   if not retry_condition:
     if(aws_access_key_id==None or aws_secret_access_key==None):
-      raise Exception('Please provide AWS access and secret keys')
+      raise LLMGraphBuilderException('Please provide AWS access and secret keys')
     else:
       logging.info("Insert in S3 Block")
       file_name, pages = get_documents_from_s3(source_url, aws_access_key_id, aws_secret_access_key)

     if pages==None or len(pages)==0:
-      raise Exception(f'File content is not available for file : {file_name}')
+      raise LLMGraphBuilderException(f'File content is not available for file : {file_name}')
     return await processing_source(uri, userName, password, database, model, file_name, pages, allowedNodes, allowedRelationship)
   else:
     return await processing_source(uri, userName, password, database, model, file_name, [], allowedNodes, allowedRelationship, retry_condition=retry_condition)
@@ -256,7 +255,7 @@ async def extract_graph_from_web_page(uri, userName, password, database, model,
   if not retry_condition:
     file_name, pages = get_documents_from_web_page(source_url)
     if pages==None or len(pages)==0:
-      raise Exception(f'Content is not available for given URL : {file_name}')
+      raise LLMGraphBuilderException(f'Content is not available for given URL : {file_name}')
     return await processing_source(uri, userName, password, database, model, file_name, pages, allowedNodes, allowedRelationship)
   else:
     return await processing_source(uri, userName, password, database, model, file_name, [], allowedNodes, allowedRelationship, retry_condition=retry_condition)
@@ -266,7 +265,7 @@ async def extract_graph_from_file_youtube(uri, userName, password, database, mod
     file_name, pages = get_documents_from_youtube(source_url)

     if pages==None or len(pages)==0:
-      raise Exception(f'Youtube transcript is not available for file : {file_name}')
+      raise LLMGraphBuilderException(f'Youtube transcript is not available for file : {file_name}')
     return await processing_source(uri, userName, password, database, model, file_name, pages, allowedNodes, allowedRelationship)
   else:
     return await processing_source(uri, userName, password, database, model, file_name, [], allowedNodes, allowedRelationship, retry_condition=retry_condition)
@@ -275,7 +274,7 @@ async def extract_graph_from_file_Wikipedia(uri, userName, password, database, m
   if not retry_condition:
     file_name, pages = get_documents_from_Wikipedia(wiki_query, language)
     if pages==None or len(pages)==0:
-      raise Exception(f'Wikipedia page is not available for file : {file_name}')
+      raise LLMGraphBuilderException(f'Wikipedia page is not available for file : {file_name}')
     return await processing_source(uri, userName, password, database, model, file_name, pages, allowedNodes, allowedRelationship)
   else:
     return await processing_source(uri, userName, password, database, model, file_name,[], allowedNodes, allowedRelationship, retry_condition=retry_condition)
@@ -284,7 +283,7 @@ async def extract_graph_from_file_gcs(uri, userName, password, database, model,
   if not retry_condition:
     file_name, pages = get_documents_from_gcs(gcs_project_id, gcs_bucket_name, gcs_bucket_folder, gcs_blob_filename, access_token)
     if pages==None or len(pages)==0:
-      raise Exception(f'File content is not available for file : {file_name}')
+      raise LLMGraphBuilderException(f'File content is not available for file : {file_name}')
     return await processing_source(uri, userName, password, database, model, file_name, pages, allowedNodes, allowedRelationship)
   else:
     return await processing_source(uri, userName, password, database, model, file_name, [], allowedNodes, allowedRelationship, retry_condition=retry_condition)
@@ -456,7 +455,7 @@ async def processing_source(uri, userName, password, database, model, file_name,
   else:
     error_message = "Unable to get the status of document node."
     logging.error(error_message)
-    raise Exception(error_message)
+    raise LLMGraphBuilderException(error_message)

 async def processing_chunks(chunkId_chunkDoc_list,graph,uri, userName, password, database,file_name,model,allowedNodes,allowedRelationship, node_count, rel_count):
   #create vector index and update chunk node with embedding
@@ -538,7 +537,7 @@ def get_chunkId_chunkDoc_list(graph, file_name, pages, retry_condition):
     chunks = graph.query(QUERY_TO_GET_CHUNKS, params={"filename":file_name})

     if chunks[0]['text'] is None or chunks[0]['text']=="" or not chunks :
-      raise Exception(f"Chunks are not created for {file_name}. Please re-upload file and try again.")
+      raise LLMGraphBuilderException(f"Chunks are not created for {file_name}. Please re-upload file and try again.")
     else:
       for chunk in chunks:
         chunk_doc = Document(page_content=chunk['text'], metadata={'id':chunk['id'], 'position':chunk['position']})
@@ -742,3 +741,15 @@ def set_status_retry(graph, file_name, retry_condition):
     obj_source_node.relationship_count=0
     logging.info(obj_source_node)
     graphDb_data_Access.update_source_node(obj_source_node)
+
+def failed_file_process(uri,file_name, merged_file_path, source_type):
+   gcs_file_cache = os.environ.get('GCS_FILE_CACHE')
+   if source_type == 'local file':
+      if gcs_file_cache == 'True':
+         folder_name = create_gcs_bucket_folder_name_hashed(uri,file_name)
+         copy_failed_file(BUCKET_UPLOAD, BUCKET_FAILED_FILE, folder_name, file_name)
+         time.sleep(5)
+         delete_file_from_gcs(BUCKET_UPLOAD,folder_name,file_name)
+      else:
+         logging.info(f'Deleted File Path: {merged_file_path} and Deleted File Name : {file_name}')
+         delete_uploaded_local_file(merged_file_path,file_name)
\ No newline at end of file
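Note: every touched module imports LLMGraphBuilderException from src/shared/llm_graph_builder_exception.py, but the class definition itself is not part of this diff. As a rough sketch only (the real class may differ), a minimal definition consistent with how it is raised and caught above could look like:

    # Hypothetical sketch of backend/src/shared/llm_graph_builder_exception.py (not included in this patch).
    # A plain Exception subclass is enough: the API handlers catch it before the generic
    # `except Exception` branch, log the record with status 'Success'/'Completed', and still
    # return a 'Failed' API response to the caller.
    class LLMGraphBuilderException(Exception):
        def __init__(self, message: str):
            self.message = message
            super().__init__(message)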