from langchain_community.document_loaders import DirectoryLoader
from langchain_community.embeddings import HuggingFaceEmbeddings
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_core.documents import Document
from langchain_community.vectorstores import Chroma
import os
import shutil
import asyncio
import re
import uuid
from collections import defaultdict
from unstructured.partition.pdf import partition_pdf
from unstructured.partition.auto import partition
import pytesseract

# Point pytesseract at the system Tesseract binary
pytesseract.pytesseract.tesseract_cmd = r'/usr/bin/tesseract'
# Configurations
UPLOAD_FOLDER = "./uploads"
VECTOR_DB_FOLDER = "./VectorDB"
os.makedirs(UPLOAD_FOLDER, exist_ok=True)
os.makedirs(VECTOR_DB_FOLDER, exist_ok=True)
########################################################################################################################################################
####-------------------------------------------------------------- Document Loader ----------------------------------------------------------------####
########################################################################################################################################################
# Loader for extracting text, tables and images from any supported file format.
# data_path = r"H:\DEV PATEL\2025\RAG Project\test_data\google data"
def load_document(data_path):
    processed_documents = []
    # element_content = []
    table_document = []
    # PDFs get a dedicated hi_res partitioning path; everything else falls back to partition()
    for root, _, files in os.walk(data_path):
        for file in files:
            file_path = os.path.join(root, file)
            doc_id = str(uuid.uuid4())  # Generate a unique ID for the document
            print(f"Processing document ID: {doc_id}, Path: {file_path}")
            try:
                # Determine the file type based on extension
                filename, file_extension = os.path.splitext(file.lower())
                image_output = f"H:/DEV PATEL/2025/RAG Project/Images/{filename}/"
                # Use a specific partition technique based on the file extension
                if file_extension == ".pdf":
                    elements = partition_pdf(
                        filename=file_path,
                        strategy="hi_res",  # Use layout detection
                        infer_table_structure=True,
                        hi_res_model_name="yolox",
                        extract_images_in_pdf=True,
                        extract_image_block_types=["Image", "Table"],
                        extract_image_block_output_dir=image_output,
                        show_progress=True,
                        # chunking_strategy="by_title",
                    )
                else:
                    # Default to auto partition if no specific handler is found
                    elements = partition(
                        filename=file_path,
                        strategy="hi_res",
                        infer_table_structure=True,
                        show_progress=True,
                        # chunking_strategy="by_title",
                    )
            except Exception as e:
                print(f"Failed to process document {file_path}: {e}")
                continue
            categorized_content = {
                "tables": {"content": [], "Metadata": []},
                "images": {"content": [], "Metadata": []},
                "text": {"content": [], "Metadata": []},
            }
            # element_content.append(elements)
            CNT = 1
            for chunk in elements:
                # Safely extract metadata and text
                chunk_type = str(type(chunk))
                chunk_metadata = chunk.metadata.to_dict() if chunk.metadata else {}
                chunk_text = getattr(chunk, "text", None)
                # Separate content into categories
                if any(keyword in chunk_type for keyword in ["Table", "TableChunk"]):
                    categorized_content["tables"]["content"].append(chunk_text)
                    categorized_content["tables"]["Metadata"].append(chunk_metadata)
                    # Also push the table HTML into the text stream so it stays searchable alongside the page text
                    TABLE_DATA = f"Table number {CNT} " + chunk_metadata.get("text_as_html", "") + " "
                    CNT += 1
                    categorized_content["text"]["content"].append(TABLE_DATA)
                    categorized_content["text"]["Metadata"].append(chunk_metadata)
                elif "Image" in chunk_type:
                    categorized_content["images"]["content"].append(chunk_text)
                    categorized_content["images"]["Metadata"].append(chunk_metadata)
                elif any(
                    keyword in chunk_type
                    for keyword in [
                        "CompositeElement",
                        "Text",
                        "NarrativeText",
                        "Title",
                        "Header",
                        "Footer",
                        "FigureCaption",
                        "ListItem",
                        "UncategorizedText",
                        "Formula",
                        "CodeSnippet",
                        "Address",
                        "EmailAddress",
                        "PageBreak",
                    ]
                ):
                    categorized_content["text"]["content"].append(chunk_text)
                    categorized_content["text"]["Metadata"].append(chunk_metadata)
                else:
                    continue
            # Append processed document
            processed_documents.append({
                "doc_id": doc_id,
                "source": file_path,
                **categorized_content,
            })
    # Loop over tables and match text from the same document and page
    for doc in processed_documents:
        cnt = 1  # Counter used to number the tables within a document
        for table_metadata in doc.get("tables", {}).get("Metadata", []):
            page_number = table_metadata.get("page_number")
            source = doc.get("source")
            page_content = ""
            for text_metadata, text_content in zip(
                doc.get("text", {}).get("Metadata", []),
                doc.get("text", {}).get("content", [])
            ):
                page_number2 = text_metadata.get("page_number")
                source2 = doc.get("source")
                if source == source2 and page_number == page_number2:
                    print(f"Matching text found for source: {source}, page: {page_number}")
                    page_content += f"{text_content} "  # Concatenate text with a space
            # Add the matched content to the table metadata
            table_metadata["page_content"] = f"Table number {cnt} " + table_metadata.get("text_as_html", "") + " " + page_content.strip()  # Strip trailing spaces
            table_metadata["text_as_html"] = table_metadata.get("text_as_html", "")  # The raw HTML is also stored separately
            table_metadata["Table_number"] = cnt  # Table number, used later during retrieval
            cnt += 1
            # Build one Document per table: the table HTML plus the text from the same page
            unique_id = str(uuid.uuid4())
            table_document.append(
                Document(
                    id=unique_id,
                    page_content=table_metadata.get("page_content", ""),  # Default to empty string if missing
                    metadata={
                        "source": doc["source"],
                        "text_as_html": table_metadata.get("text_as_html", ""),
                        "filetype": table_metadata.get("filetype", ""),
                        "page_number": str(table_metadata.get("page_number", 0)),  # Default to 0 if missing
                        "image_path": table_metadata.get("image_path", ""),
                        "file_directory": table_metadata.get("file_directory", ""),
                        "filename": table_metadata.get("filename", ""),
                        "Table_number": str(table_metadata.get("Table_number", 0)),  # Default to 0 if missing
                    },
                )
            )
    # Initialize a structure to group content by doc_id
    grouped_by_doc_id = defaultdict(lambda: {
        "text_content": [],
        "metadata": None,  # Metadata is only set once per doc_id
    })
    for doc in processed_documents:
        doc_id = doc.get("doc_id")
        source = doc.get("source")
        text_content = doc.get("text", {}).get("content", [])
        metadata_list = doc.get("text", {}).get("Metadata", [])
        # Merge text content
        grouped_by_doc_id[doc_id]["text_content"].extend(text_content)
        # Set metadata (if not already set)
        if grouped_by_doc_id[doc_id]["metadata"] is None and metadata_list:
            metadata = metadata_list[0]  # Assuming metadata is consistent within a document
            grouped_by_doc_id[doc_id]["metadata"] = {
                "source": source,
                # "filetype": metadata.get("filetype"),
                "file_directory": metadata.get("file_directory"),
                "filename": metadata.get("filename"),
                # "languages": str(metadata.get("languages")),
            }
    # Convert grouped content into Document objects
    grouped_documents = []
    for doc_id, data in grouped_by_doc_id.items():
        grouped_documents.append(
            Document(
                id=doc_id,
                page_content=" ".join(data["text_content"]).strip(),
                metadata=data["metadata"],
            )
        )
    # Output the grouped documents
    # for document in grouped_documents:
    #     print(document)
    # Directory loader for loading the raw text data only, stored in a separate DB
    loader = DirectoryLoader(data_path, glob="*.*")
    documents = loader.load()
    # Update the metadata, adding the filename
    for doc in documents:
        unique_id = str(uuid.uuid4())
        doc.id = unique_id
        path = doc.metadata.get("source")
        doc.metadata.update({"filename": os.path.basename(path)})
    return grouped_documents, documents, table_document

# grouped_documents = load_document(data_path)
# documents, processed_documents, table_document = load_document(data_path)
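
# Example (hedged): a minimal sketch of calling load_document directly, assuming
# "./uploads/sample_docs" is a hypothetical folder containing a few PDFs.
# grouped_documents holds one Document per source file (text plus table HTML),
# documents holds the raw DirectoryLoader output, and table_document holds one
# Document per extracted table.
#
# if __name__ == "__main__":
#     grouped_documents, documents, table_document = load_document("./uploads/sample_docs")
#     print(f"{len(grouped_documents)} grouped docs, {len(documents)} raw docs, {len(table_document)} table docs")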
########################################################################################################################################################
####-------------------------------------------------------------- Chunking the Text --------------------------------------------------------------####
########################################################################################################################################################
def split_text(documents: list[Document]):
    text_splitter = RecursiveCharacterTextSplitter(
        chunk_size=2000,
        chunk_overlap=600,
        length_function=len,
        add_start_index=True,
    )
    chunks = text_splitter.split_documents(documents)  # Split the documents into chunks
    for chunk in chunks:
        chunk.metadata["start_index"] = str(chunk.metadata["start_index"])  # Convert the int metadata to str so it can be stored in SQLite
    print(f"Split {len(documents)} documents into {len(chunks)} chunks.")
    return chunks
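
# Example (hedged): a minimal sketch of chunking the grouped documents produced by
# load_document; the folder path below is a hypothetical placeholder.
#
# grouped_documents, _, _ = load_document("./uploads/sample_docs")
# chunks = split_text(grouped_documents)
# print(chunks[0].metadata)  # includes "start_index" stored as a string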
########################################################################################################################################################
####---------------------------------------------------- Creating and Storing Data in Vector DB ---------------------------------------------------####
########################################################################################################################################################
# def save_to_chroma(chunks: list[Document], name: str, tables: list[Document]):
async def save_to_chroma(chunks: list[Document], name: str, tables: list[Document]):
    CHROMA_PATH = f"./VectorDB/chroma_{name}"
    TABLE_PATH = f"./TableDB/chroma_{name}"
    if os.path.exists(CHROMA_PATH):
        shutil.rmtree(CHROMA_PATH)
    if os.path.exists(TABLE_PATH):
        shutil.rmtree(TABLE_PATH)
    try:
        # Load the embedding model
        embedding_function = HuggingFaceEmbeddings(model_name="all-MiniLM-L6-v2", show_progress=True)
        # embedding_function = HuggingFaceEmbeddings(model_name="mixedbread-ai/mxbai-embed-large-v1")
        # Create the Chroma DB for documents [NOTE: some metadata is converted to string because int and float values show up as null when added]
        print("Creating document vector database...")
        db = Chroma.from_documents(
            documents=chunks,
            embedding=embedding_function,
            persist_directory=CHROMA_PATH,
        )
        print("Persisting the document database...")
        db.persist()
        print("Document database successfully saved.")
        # Create the Chroma DB for tables if available [NOTE: same string conversion applies]
        if tables:
            print("Creating table vector database...")
            tdb = Chroma.from_documents(
                documents=tables,
                embedding=embedding_function,
                persist_directory=TABLE_PATH,
            )
            print("Persisting the table database...")
            tdb.persist()
            print("Table database successfully saved.")
        else:
            tdb = None
        return db, tdb
        # return db
    except Exception as e:
        print("Error while saving to Chroma:", e)
        return None
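
# Example (hedged): save_to_chroma is a coroutine, so it must be awaited (or driven
# by asyncio.run) from synchronous code; "demo" is a hypothetical database name.
# On success it returns (db, tdb); on failure it returns None.
#
# async def _demo():
#     grouped_documents, _, table_document = load_document("./uploads/sample_docs")
#     chunks = split_text(grouped_documents)
#     result = await save_to_chroma(chunks, "demo", table_document)
#     print(result)
#
# asyncio.run(_demo())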
# def get_unique_sources(chroma_path):
#     db = Chroma(persist_directory=chroma_path)
#     metadata_list = db.get()["metadatas"]
#     unique_sources = {metadata["source"] for metadata in metadata_list if "source" in metadata}
#     return list(unique_sources)
########################################################################################################################################################
####----------------------------------------------------------- Updating Existing Data in Vector DB -----------------------------------------------####
########################################################################################################################################################
# def add_document_to_existing_db(new_documents: list[Document], db_name: str):
#     CHROMA_PATH = f"./VectorDB/chroma_{db_name}"
#     if not os.path.exists(CHROMA_PATH):
#         print(f"Database '{db_name}' does not exist. Please create it first.")
#         return
#     try:
#         embedding_function = HuggingFaceEmbeddings(model_name="all-MiniLM-L6-v2")
#         # embedding_function = HuggingFaceEmbeddings(model_name="mixedbread-ai/mxbai-embed-large-v1")
#         db = Chroma(persist_directory=CHROMA_PATH, embedding_function=embedding_function)
#         print("Adding new documents to the existing database...")
#         chunks = split_text(new_documents)
#         db.add_documents(chunks)
#         db.persist()
#         print("New documents added and database updated successfully.")
#     except Exception as e:
#         print("Error while adding documents to existing database:", e)
# def delete_chunks_by_source(chroma_path, source_to_delete):
#     if not os.path.exists(chroma_path):
#         print(f"Database at path '{chroma_path}' does not exist.")
#         return
#     try:
#         # embedding_function = HuggingFaceEmbeddings(model_name="all-MiniLM-L6-v2")
#         embedding_function = HuggingFaceEmbeddings(model_name="mixedbread-ai/mxbai-embed-large-v1")
#         db = Chroma(persist_directory=chroma_path, embedding_function=embedding_function)
#         print(f"Retrieving all metadata to identify chunks with source '{source_to_delete}'...")
#         metadata_list = db.get()["metadatas"]
#         # Identify indices of chunks to delete
#         indices_to_delete = [
#             idx for idx, metadata in enumerate(metadata_list) if metadata.get("source") == source_to_delete
#         ]
#         if not indices_to_delete:
#             print(f"No chunks found with source '{source_to_delete}'.")
#             return
#         print(f"Deleting {len(indices_to_delete)} chunks with source '{source_to_delete}'...")
#         db.delete(indices=indices_to_delete)
#         db.persist()
#         print("Chunks deleted and database updated successfully.")
#     except Exception as e:
#         print(f"Error while deleting chunks by source: {e}")
# # Update a data store
# def update_data_store(file_path, db_name):
#     CHROMA_PATH = f"./VectorDB/chroma_{db_name}"
#     print(f"Filepath ===> {file_path} DB Name ====> {db_name}")
#     try:
#         documents, table_document = load_document(file_path)
#         print("Documents loaded successfully.")
#     except Exception as e:
#         print(f"Error loading documents: {e}")
#         return
#     try:
#         chunks = split_text(documents)
#         print(f"Text split into {len(chunks)} chunks.")
#     except Exception as e:
#         print(f"Error splitting text: {e}")
#         return
#     try:
#         asyncio.run(save_to_chroma(chunks, db_name, table_document))
#         print(f"Data saved to Chroma for database {db_name}.")
#     except Exception as e:
#         print(f"Error saving to Chroma: {e}")
#         return
########################################################################################################################################################
####------------------------------------------------------- Combine Process of Load, Chunk and Store ----------------------------------------------####
########################################################################################################################################################
async def generate_data_store(file_path, db_name):
    CHROMA_PATH = f"./VectorDB/chroma_{db_name}"
    print(f"Filepath ===> {file_path} DB Name ====> {db_name}")
    try:
        # load_document returns (grouped_documents, documents, table_document);
        # the grouped documents are the ones that get chunked and embedded
        grouped_documents, documents, table_document = load_document(file_path)
        print("Documents loaded successfully.")
    except Exception as e:
        print(f"Error loading documents: {e}")
        return
    try:
        chunks = split_text(grouped_documents)
        print(f"Text split into {len(chunks)} chunks.")
    except Exception as e:
        print(f"Error splitting text: {e}")
        return
    try:
        await save_to_chroma(chunks, db_name, table_document)
        # await asyncio.run(save_to_chroma(chunks, db_name, table_document))
        print(f"Data saved to Chroma for database {db_name}.")
    except Exception as e:
        print(f"Error saving to Chroma: {e}")
        return
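
# Example (hedged): end-to-end usage from a synchronous entry point, assuming the
# files to index live in UPLOAD_FOLDER and "my_kb" is a hypothetical database name.
#
# if __name__ == "__main__":
#     asyncio.run(generate_data_store(UPLOAD_FOLDER, "my_kb"))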