from langchain_community.document_loaders import DirectoryLoader
from langchain_community.embeddings import HuggingFaceInstructEmbeddings, HuggingFaceEmbeddings  # for the embedding task
from langchain.text_splitter import RecursiveCharacterTextSplitter  # for splitting large documents into smaller chunks
from langchain_core.documents import Document
from langchain_openai import OpenAIEmbeddings
from langchain_community.vectorstores import Chroma
import openai
import os
import shutil
import uuid
# Configuration
UPLOAD_FOLDER = "./uploads"
VECTOR_DB_FOLDER = "./VectorDB"
os.makedirs(UPLOAD_FOLDER, exist_ok=True)
os.makedirs(VECTOR_DB_FOLDER, exist_ok=True)
def load_document(data_path):
    # Load every file in the directory (DirectoryLoader picks a loader per file type)
    loader = DirectoryLoader(data_path, glob="*.*")
    print("loader", loader)
    documents = loader.load()
    return documents
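
# Illustrative usage (a sketch, assuming ./uploads already holds files):
#
#   docs = load_document(UPLOAD_FOLDER)
#   print(len(docs))  # DirectoryLoader typically yields one Document per file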
# Create chunks of data from the knowledge base
def split_text(documents: list[Document]):
    text_splitter = RecursiveCharacterTextSplitter(
        chunk_size=1000,
        chunk_overlap=500,
        length_function=len,
        add_start_index=True,
    )
    chunks = text_splitter.split_documents(documents)
    print(f"Split {len(documents)} documents into {len(chunks)} chunks.")
    return chunks
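
# Illustrative sketch of the chunking behaviour (not from the original file):
# with chunk_size=1000 and chunk_overlap=500, consecutive chunks share up to
# 500 characters, so a ~2000-character document splits into roughly three
# overlapping chunks:
#
#   docs = [Document(page_content="x" * 2000, metadata={"source": "demo.txt"})]
#   chunks = split_text(docs)  # each chunk's metadata carries a start_index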
# Chroma for creating the vector DB which we will use for searching relevant data.
def save_to_chroma(chunks: list[Document], name: str):
    CHROMA_PATH = f"./VectorDB/chroma_{name}"
    # Clear out any existing database first
    if os.path.exists(CHROMA_PATH):
        shutil.rmtree(CHROMA_PATH)
    try:
        # Initialize the SBERT embedding function
        embedding_function = HuggingFaceEmbeddings(model_name="all-MiniLM-L6-v2")
        db = Chroma(persist_directory=CHROMA_PATH, embedding_function=embedding_function)
        # Add documents and persist the database to disk
        print("Adding documents to the database...")
        db.add_documents(chunks)
        print("Persisting the database...")
        db.persist()
        print("Database successfully saved.")
        return db
    except Exception as e:
        print("Error while saving to Chroma:", e)
        return None
def get_unique_sources(chroma_path):
    # Load the Chroma database (no embedding function is needed just to read metadata)
    db = Chroma(persist_directory=chroma_path)
    # Retrieve all metadata entries from the database
    metadata_list = db.get()['metadatas']
    # Extract the unique sources recorded in the metadata
    unique_sources = {metadata['source'] for metadata in metadata_list if 'source' in metadata}
    return list(unique_sources)
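
# Illustrative usage (a sketch; the path and file names are hypothetical):
#
#   sources = get_unique_sources("./VectorDB/chroma_Product_data")
#   print(sources)  # e.g. ['uploads/manual.pdf', 'uploads/faq.txt']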
def generate_data_store(file_path, db_name):
    print(f"file_path ===> {file_path}  db_name ===> {db_name}")
    try:
        documents = load_document(file_path)
        print("Documents loaded successfully.")
    except Exception as e:
        print(f"Error loading documents: {e}")
        return
    try:
        chunks = split_text(documents)
        print(f"Text split into {len(chunks)} chunks.")
    except Exception as e:
        print(f"Error splitting text: {e}")
        return
    try:
        save_to_chroma(chunks, db_name)
        print(f"Data saved to Chroma for database {db_name}.")
    except Exception as e:
        print(f"Error saving to Chroma: {e}")
        return
# def main():
#     data_path = "H:\\DEV PATEL\\RAG Project\\data1"
#     db_name = "Product_data"
#     generate_data_store(data_path, db_name)

# if __name__ == "__main__":
#     main()
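
# Illustrative retrieval sketch (not part of the original pipeline): to query a
# saved store, reload it with the same embedding model used at index time:
#
#   db = Chroma(
#       persist_directory="./VectorDB/chroma_Product_data",
#       embedding_function=HuggingFaceEmbeddings(model_name="all-MiniLM-L6-v2"),
#   )
#   results = db.similarity_search("your question here", k=4)
#   for doc in results:
#       print(doc.metadata.get("source"), doc.page_content[:100])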