import os
import re
import shutil
import uuid
import asyncio
from collections import defaultdict

import pytesseract
from unstructured.partition.pdf import partition_pdf
from unstructured.partition.auto import partition

from langchain_community.document_loaders import DirectoryLoader
from langchain_community.embeddings import HuggingFaceEmbeddings
from langchain_community.vectorstores import Chroma
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_core.documents import Document

# Point pytesseract at the system tesseract binary (used by unstructured's OCR path)
pytesseract.pytesseract.tesseract_cmd = r'/usr/bin/tesseract'
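
# Optional sanity check (an added sketch, not required by the pipeline): warn early if the
# configured tesseract binary is missing, since the "hi_res" partition strategies below rely on OCR.
if not os.path.exists(pytesseract.pytesseract.tesseract_cmd):
    print(f"Warning: tesseract not found at {pytesseract.pytesseract.tesseract_cmd}; OCR may fail.")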

# Configurations
UPLOAD_FOLDER = "./uploads"
VECTOR_DB_FOLDER = "./VectorDB"
IMAGE_DB_FOLDER = "./ImageDB"
os.makedirs(UPLOAD_FOLDER, exist_ok=True)
os.makedirs(VECTOR_DB_FOLDER, exist_ok=True)
os.makedirs(IMAGE_DB_FOLDER, exist_ok=True)

########################################################################################################################################################
####-------------------------------------------------------------- Document Loader ----------------------------------------------------------------####
########################################################################################################################################################
# Loaders for extracting text, tables and images from any supported file format.
def load_document(data_path):
    processed_documents = []
    table_document = []
    # Walk the directory tree; PDFs get the dedicated hi_res PDF partitioner, everything else falls back to auto partition.
    for root, _, files in os.walk(data_path):
        for file in files:
            file_path = os.path.join(root, file)
            doc_id = str(uuid.uuid4())  # Generate a unique ID for the document
            print(f"Processing document ID: {doc_id}, Path: {file_path}")
            try:
                # Determine the file type based on the extension
                filename, file_extension = os.path.splitext(file.lower())
                image_output = f"./ImageDB/{filename}/"
                # Use a specific partition technique based on the file extension
                if file_extension == ".pdf":
                    elements = partition_pdf(
                        filename=file_path,
                        strategy="hi_res",  # Use layout detection
                        infer_table_structure=True,
                        hi_res_model_name="yolox",
                        extract_images_in_pdf=True,
                        extract_image_block_types=["Image", "Table"],
                        extract_image_block_output_dir=image_output,
                        show_progress=True,
                        # chunking_strategy="by_title",
                    )
                else:
                    # Default to auto partition if no specific handler is found
                    elements = partition(
                        filename=file_path,
                        strategy="hi_res",
                        infer_table_structure=True,
                        show_progress=True,
                        # chunking_strategy="by_title",
                    )
            except Exception as e:
                print(f"Failed to process document {file_path}: {e}")
                continue
            categorized_content = {
                "tables": {"content": [], "Metadata": []},
                "images": {"content": [], "Metadata": []},
                "text": {"content": [], "Metadata": []},
            }
            table_count = 1
            for chunk in elements:
                # Safely extract metadata and text
                chunk_type = str(type(chunk))
                chunk_metadata = chunk.metadata.to_dict() if chunk.metadata else {}
                chunk_text = getattr(chunk, "text", None)
                # Separate content into categories
                if any(keyword in chunk_type for keyword in ["Table", "TableChunk"]):
                    categorized_content["tables"]["content"].append(chunk_text)
                    categorized_content["tables"]["Metadata"].append(chunk_metadata)
                    # Also push the table (as HTML) into the text stream so page-level context is preserved
                    table_data = f"Table number {table_count} " + chunk_metadata.get("text_as_html", "") + " "
                    table_count += 1
                    categorized_content["text"]["content"].append(table_data)
                    categorized_content["text"]["Metadata"].append(chunk_metadata)
                elif "Image" in chunk_type:
                    categorized_content["images"]["content"].append(chunk_text)
                    categorized_content["images"]["Metadata"].append(chunk_metadata)
                elif any(
                    keyword in chunk_type
                    for keyword in [
                        "CompositeElement",
                        "Text",
                        "NarrativeText",
                        "Title",
                        "Header",
                        "Footer",
                        "FigureCaption",
                        "ListItem",
                        "UncategorizedText",
                        "Formula",
                        "CodeSnippet",
                        "Address",
                        "EmailAddress",
                        "PageBreak",
                    ]
                ):
                    categorized_content["text"]["content"].append(chunk_text)
                    categorized_content["text"]["Metadata"].append(chunk_metadata)
                else:
                    continue
            # Append the processed document
            processed_documents.append({
                "doc_id": doc_id,
                "source": file_path,
                **categorized_content,
            })
    # Loop over tables and match text from the same document and page
    for doc in processed_documents:
        cnt = 1  # Running table number within the document
        for table_metadata in doc.get("tables", {}).get("Metadata", []):
            page_number = table_metadata.get("page_number")
            source = doc.get("source")
            page_content = ""
            for text_metadata, text_content in zip(
                doc.get("text", {}).get("Metadata", []),
                doc.get("text", {}).get("content", []),
            ):
                page_number2 = text_metadata.get("page_number")
                source2 = doc.get("source")
                if source == source2 and page_number == page_number2:
                    print(f"Matching text found for source: {source}, page: {page_number}")
                    page_content += f"{text_content} "  # Concatenate text with a space
            # Add the matched content to the table metadata
            table_metadata["page_content"] = (
                f"Table number {cnt} "
                + table_metadata.get("text_as_html", "")
                + " "
                + page_content.strip()  # Strip trailing spaces to keep the content tidy
            )
            table_metadata["text_as_html"] = table_metadata.get("text_as_html", "")  # Also stored separately
            table_metadata["Table_number"] = cnt  # Table number, used later during retrieval
            cnt += 1
            # Build a Document for each table that carries the table plus the text of its page
            unique_id = str(uuid.uuid4())
            table_document.append(
                Document(
                    id=unique_id,
                    page_content=table_metadata.get("page_content", ""),  # Default to empty string if missing
                    metadata={
                        "source": doc["source"],
                        "text_as_html": table_metadata.get("text_as_html", ""),
                        "filetype": table_metadata.get("filetype", ""),
                        "page_number": str(table_metadata.get("page_number", 0)),  # Default to 0 if missing
                        "image_path": table_metadata.get("image_path", ""),
                        "file_directory": table_metadata.get("file_directory", ""),
                        "filename": table_metadata.get("filename", ""),
                        "Table_number": str(table_metadata.get("Table_number", 0)),  # Default to 0 if missing
                    },
                )
            )
    # Group text content by doc_id so each source file becomes a single Document
    grouped_by_doc_id = defaultdict(lambda: {
        "text_content": [],
        "metadata": None,  # Metadata is only set once per doc_id
    })
    for doc in processed_documents:
        doc_id = doc.get("doc_id")
        source = doc.get("source")
        text_content = doc.get("text", {}).get("content", [])
        metadata_list = doc.get("text", {}).get("Metadata", [])
        # Merge text content
        grouped_by_doc_id[doc_id]["text_content"].extend(text_content)
        # Set metadata (if not already set)
        if grouped_by_doc_id[doc_id]["metadata"] is None and metadata_list:
            metadata = metadata_list[0]  # Assuming metadata is consistent within the document
            grouped_by_doc_id[doc_id]["metadata"] = {
                "source": source,
                # "filetype": metadata.get("filetype"),
                "file_directory": metadata.get("file_directory"),
                "filename": metadata.get("filename"),
                # "languages": str(metadata.get("languages")),
            }
    # Convert the grouped content into Document objects
    grouped_documents = []
    for doc_id, data in grouped_by_doc_id.items():
        grouped_documents.append(
            Document(
                id=doc_id,
                page_content=" ".join(t for t in data["text_content"] if t).strip(),  # Skip empty/None chunks
                metadata=data["metadata"],
            )
        )
    # DirectoryLoader for loading the plain text data into its own DB
    loader = DirectoryLoader(data_path, glob="*.*")
    documents = loader.load()
    # Update the metadata, adding the filename
    for doc in documents:
        doc.id = str(uuid.uuid4())
        path = doc.metadata.get("source")
        doc.metadata.update({"filename": os.path.basename(path)})  # basename handles both / and \ separators
    return grouped_documents, documents, table_document

# grouped_documents, documents, table_document = load_document(data_path)
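# A quick sanity check of the loader output (an added sketch; assumes the three values were
# unpacked as in the commented example above, with UPLOAD_FOLDER holding a few sample files):
#   print(f"{len(grouped_documents)} parsed files, {len(documents)} directory docs, {len(table_document)} tables")
#   if table_document:
#       print(table_document[0].metadata["text_as_html"][:200])  # peek at the first extracted table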

########################################################################################################################################################
####-------------------------------------------------------------- Chunking the Text --------------------------------------------------------------####
########################################################################################################################################################
def split_text(documents: list[Document]):
    text_splitter = RecursiveCharacterTextSplitter(
        chunk_size=2000,
        chunk_overlap=600,
        length_function=len,
        add_start_index=True,
    )
    chunks = text_splitter.split_documents(documents)  # Split the documents into overlapping chunks
    for chunk in chunks:
        # Convert the int start_index to str so it can be stored in SQLite metadata
        chunk.metadata["start_index"] = str(chunk.metadata["start_index"])
    print(f"Split {len(documents)} documents into {len(chunks)} chunks.")
    return chunks
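
# Example usage (an added sketch, mirroring how update_data_store / generate_data_store call it below):
#   grouped_documents, documents, table_document = load_document(UPLOAD_FOLDER)
#   chunks = split_text(grouped_documents)
#   print(chunks[0].metadata)  # source, filename, and the stringified start_index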

########################################################################################################################################################
####---------------------------------------------------- Creating and Storing Data in Vector DB ---------------------------------------------------####
########################################################################################################################################################
async def save_to_chroma(chunks: list[Document], name: str, tables: list[Document]):
    CHROMA_PATH = f"./VectorDB/chroma_{name}"
    TABLE_PATH = f"./TableDB/chroma_{name}"
    # Start from a clean slate: remove any existing databases with the same name
    if os.path.exists(CHROMA_PATH):
        shutil.rmtree(CHROMA_PATH)
    if os.path.exists(TABLE_PATH):
        shutil.rmtree(TABLE_PATH)
    try:
        # Load the embedding model
        embedding_function = HuggingFaceEmbeddings(model_name="all-MiniLM-L6-v2", show_progress=True)
        # embedding_function = HuggingFaceEmbeddings(model_name="mixedbread-ai/mxbai-embed-large-v1")
        # Create the Chroma DB for documents [NOTE: some metadata is stored as strings because ints and floats can come back as null]
        print("Creating document vector database...")
        db = Chroma.from_documents(
            documents=chunks,
            embedding=embedding_function,
            persist_directory=CHROMA_PATH,
        )
        print("Persisting the document database...")
        db.persist()
        print("Document database successfully saved.")
        # Create the Chroma DB for tables if any are available
        if tables:
            print("Creating table vector database...")
            tdb = Chroma.from_documents(
                documents=tables,
                embedding=embedding_function,
                persist_directory=TABLE_PATH,
            )
            print("Persisting the table database...")
            tdb.persist()  # Persist the table database (not the document database)
            print("Table database successfully saved.")
        else:
            tdb = None
        return db, tdb
    except Exception as e:
        print("Error while saving to Chroma:", e)
        return None
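
# Example usage (an added sketch; "my_kb" is a placeholder name, chunks and table_document come
# from split_text and load_document above). save_to_chroma is async, so drive it with an event loop:
#   db, tdb = asyncio.run(save_to_chroma(chunks, "my_kb", table_document))
#   # -> persisted under ./VectorDB/chroma_my_kb and ./TableDB/chroma_my_kb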

########################################################################################################################################################
####----------------------------------------------------------- Updating Existing Data in Vector DB -----------------------------------------------####
########################################################################################################################################################
# Add new documents to an existing database
async def add_document_to_existing_db(new_chunks: list[Document], db_name: str, tables: list[Document]):
    # NOTE: db_name is the full directory name here, i.e. it already includes the "chroma_" prefix
    # that save_to_chroma adds when the database is first created.
    CHROMA_PATH = f"./VectorDB/{db_name}"
    TABLE_PATH = f"./TableDB/{db_name}"
    if not os.path.exists(CHROMA_PATH):
        print(f"Database '{db_name}' does not exist. Please create it first.")
        return
    try:
        # Load the embedding model
        embedding_function = HuggingFaceEmbeddings(model_name="all-MiniLM-L6-v2", show_progress=True)
        # embedding_function = HuggingFaceEmbeddings(model_name="mixedbread-ai/mxbai-embed-large-v1")
        # Open the existing document vector database and append the new chunks
        print("Opening existing document vector database...")
        db = Chroma(persist_directory=CHROMA_PATH, embedding_function=embedding_function)
        print("Adding new chunks and persisting the document database...")
        db.add_documents(new_chunks)
        db.persist()
        print("Document database successfully updated.")
        # Update (or create) the table database if table documents are available
        if tables:
            print("Updating table vector database...")
            if not os.path.exists(TABLE_PATH):
                print(f"Table database '{db_name}' does not exist yet. Creating it first.")
                tdb = Chroma.from_documents(
                    documents=tables,
                    embedding=embedding_function,
                    persist_directory=TABLE_PATH,
                )
            else:
                tdb = Chroma(persist_directory=TABLE_PATH, embedding_function=embedding_function)
                tdb.add_documents(tables)  # Add to the table database, not the document database
            print("Persisting the table database...")
            tdb.persist()
            print("Table database successfully updated.")
        else:
            tdb = None
        return db, tdb
    except Exception as e:
        print("Error while saving to Chroma:", e)
        return None
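
# Example usage (an added sketch; note that db_name must be the full directory name, i.e. it
# includes the "chroma_" prefix that save_to_chroma adds on creation):
#   db, tdb = asyncio.run(add_document_to_existing_db(new_chunks, "chroma_my_kb", new_tables))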

# Delete all chunks whose metadata "source" matches a given path
def delete_chunks_by_source(chroma_path, source_to_delete):
    if not os.path.exists(chroma_path):
        print(f"Database at path '{chroma_path}' does not exist.")
        return
    try:
        # embedding_function = HuggingFaceEmbeddings(model_name="all-MiniLM-L6-v2")
        embedding_function = HuggingFaceEmbeddings(model_name="mixedbread-ai/mxbai-embed-large-v1")
        db = Chroma(persist_directory=chroma_path, embedding_function=embedding_function)
        print(f"Retrieving all metadata to identify chunks with source '{source_to_delete}'...")
        data = db.get()
        # Collect the ids of chunks whose source matches (Chroma deletes by id, not positional index)
        ids_to_delete = [
            chunk_id for chunk_id, metadata in zip(data["ids"], data["metadatas"])
            if metadata.get("source") == source_to_delete
        ]
        if not ids_to_delete:
            print(f"No chunks found with source '{source_to_delete}'.")
            return
        print(f"Deleting {len(ids_to_delete)} chunks with source '{source_to_delete}'...")
        db.delete(ids=ids_to_delete)
        db.persist()
        print("Chunks deleted and database updated successfully.")
    except Exception as e:
        print(f"Error while deleting chunks by source: {e}")

########################################################################################################################################################
####----------------------------------------------- Combined Process of Upload, Chunk and Store (FOR A NEW DOC) -----------------------------------####
########################################################################################################################################################
# Add a newly uploaded document to an existing data store
async def update_data_store(file_path, db_name):
    print(f"Filepath ===> {file_path} DB Name ====> {db_name}")
    try:
        grouped_documents, documents, table_document = load_document(file_path)
        print("Documents loaded successfully.")
    except Exception as e:
        print(f"Error loading documents: {e}")
        return
    try:
        chunks = split_text(grouped_documents)
        print(f"Text split into {len(chunks)} chunks.")
    except Exception as e:
        print(f"Error splitting text: {e}")
        return
    try:
        await add_document_to_existing_db(chunks, db_name, table_document)
        print(f"Data saved to Chroma for database {db_name}.")
    except Exception as e:
        print(f"Error saving to Chroma: {e}")
        return

########################################################################################################################################################
####------------------------------------------------------- Combined Process of Load, Chunk and Store ---------------------------------------------####
########################################################################################################################################################
# Build a brand-new data store from scratch
async def generate_data_store(file_path, db_name):
    print(f"Filepath ===> {file_path} DB Name ====> {db_name}")
    try:
        grouped_documents, documents, table_document = load_document(file_path)
        print("Documents loaded successfully.")
    except Exception as e:
        print(f"Error loading documents: {e}")
        return
    try:
        chunks = split_text(grouped_documents)
        print(f"Text split into {len(chunks)} chunks.")
    except Exception as e:
        print(f"Error splitting text: {e}")
        return
    try:
        await save_to_chroma(chunks, db_name, table_document)
        print(f"Data saved to Chroma for database {db_name}.")
    except Exception as e:
        print(f"Error saving to Chroma: {e}")
        return
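
# Example usage (an added sketch; both entry points are async and the names/paths are placeholders).
# generate_data_store prefixes the directory with "chroma_", so later updates must pass the prefixed name:
#   asyncio.run(generate_data_store("./uploads", "my_kb"))                # creates ./VectorDB/chroma_my_kb
#   asyncio.run(update_data_store("./uploads/new_docs", "chroma_my_kb"))  # appends to the same store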

########################################################################################################################################################
####-------------------------------------------------------------------- Token counter ------------------------------------------------------------####
########################################################################################################################################################
def approximate_bpe_token_counter(text):
    # Rough token count: split into word runs and individual punctuation marks
    # (an approximation of BPE-style tokenization, not a real tokenizer)
    tokens = re.findall(r"\w+|[^\w\s]", text, re.UNICODE)
    return len(tokens)
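
# Example: approximate_bpe_token_counter("Hello, world!") returns 4
# ("Hello", ",", "world", "!") -- punctuation counts as separate tokens.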