import pickle
import logging
import os
import shutil
from datetime import datetime
from concurrent.futures import ThreadPoolExecutor, as_completed
from pathlib import Path
from typing import List, Optional, Iterable

from langchain.schema import Document
from langchain_community.vectorstores import FAISS

from .config import get_embedding_model, VECTOR_STORE_DIR, CHUNKS_PATH, NEW_DATA, PROCESSED_DATA, settings
from .text_processors import markdown_splitter, recursive_splitter
from . import data_loaders

logger = logging.getLogger(__name__)

MAX_WORKERS = max(2, min(8, (os.cpu_count() or 4)))

def load_vector_store() -> Optional[FAISS]:
    """Load an existing vector store with proper error handling.

    Only attempts to load if the required FAISS index files are present.
    """
    try:
        store_dir = Path(VECTOR_STORE_DIR)
        index_file = store_dir / "index.faiss"
        meta_file = store_dir / "index.pkl"  # created by LangChain FAISS.save_local

        # If the directory exists but the index files are missing, do not attempt a load
        if not (index_file.exists() and meta_file.exists()):
            logger.info("Vector store not initialized yet; index files not found. Skipping load.")
            return None

        vector_store = FAISS.load_local(
            str(VECTOR_STORE_DIR),
            get_embedding_model(),
            allow_dangerous_deserialization=True,
        )
        logger.info("Successfully loaded existing vector store")
        return vector_store
    except Exception as e:
        logger.error(f"Failed to load vector store: {e}")
        return None

def load_chunks() -> Optional[List[Document]]:
    """Load pre-processed document chunks from cache with error handling."""
    try:
        if Path(CHUNKS_PATH).exists():
            with open(CHUNKS_PATH, 'rb') as f:
                chunks = pickle.load(f)
            logger.info(f"Successfully loaded {len(chunks)} chunks from cache")
            return chunks
        else:
            logger.info("No cached chunks found")
            return None
    except Exception as e:
        logger.error(f"Failed to load chunks: {e}")
        return None

def save_chunks(chunks: List[Document]) -> bool:
    """Save processed document chunks to the cache file.

    Args:
        chunks: List of document chunks to save

    Returns:
        True if successful, False otherwise
    """
    try:
        # Ensure the cache directory exists
        Path(CHUNKS_PATH).parent.mkdir(parents=True, exist_ok=True)
        with open(CHUNKS_PATH, 'wb') as f:
            pickle.dump(chunks, f)
        logger.info(f"Successfully saved {len(chunks)} chunks to {CHUNKS_PATH}")
        return True
    except Exception as e:
        logger.error(f"Failed to save chunks: {e}")
        return False

# ============================================================================
# DOCUMENT PROCESSING UTILITIES
# ============================================================================

def _iter_files(root: Path) -> Iterable[Path]:
    """Yield PDF and Markdown files under the given root directory recursively.

    Args:
        root: Root directory to search

    Yields:
        Path objects for PDF and Markdown files
    """
    if not root.exists():
        # Bare return is the idiomatic way to end a generator early
        return
    for p in root.rglob('*'):
        if p.is_file() and p.suffix.lower() in {'.pdf', '.md'}:
            yield p

def create_documents() -> List[Document]:
    """Load documents from the NEW_DATA directory.

    Returns:
        List of loaded documents

    Note:
        Use create_documents_and_files() if you need both documents and file paths.
    """
    docs, _ = create_documents_and_files()
    return docs

def _load_documents_for_file(file_path: Path) -> List[Document]:
    """Load documents from a single file (PDF or Markdown).

    Args:
        file_path: Path to the file to load

    Returns:
        List of documents loaded from the file
    """
    try:
        if file_path.suffix.lower() == '.pdf':
            # Use the advanced LlamaParse loader with settings from config
            api_key = settings.LLAMA_CLOUD_API_KEY
            premium_mode = settings.LLAMA_PREMIUM_MODE
            return data_loaders.load_pdf_documents_advanced(
                file_path,
                api_key=api_key,
                premium_mode=premium_mode,
            )
        return data_loaders.load_markdown_documents(file_path)
    except Exception as e:
        logger.error(f"Failed to load {file_path}: {e}")
        return []

def create_documents_and_files() -> tuple[List[Document], List[Path]]:
    """Load documents from the NEW_DATA directory and return both documents and file paths.

    Returns:
        Tuple of (documents, file_paths) where:
            - documents: List of loaded Document objects
            - file_paths: List of Path objects for files that were loaded
    """
    documents: List[Document] = []
    files = list(_iter_files(NEW_DATA))
    if not files:
        logger.info(f"No new files found under {NEW_DATA}")
        return documents, []

    worker_count = min(MAX_WORKERS, len(files)) or 1
    with ThreadPoolExecutor(max_workers=worker_count) as executor:
        futures = {executor.submit(_load_documents_for_file, file_path): file_path for file_path in files}
        for future in as_completed(futures):
            documents.extend(future.result())

    logger.info(f"Loaded {len(documents)} documents from {NEW_DATA}")
    return documents, files

def _segment_document(doc: Document) -> List[Document]:
    """Segment a document using markdown headers if applicable.

    Args:
        doc: Document to segment

    Returns:
        List of segmented documents (or the original if not markdown)
    """
    source_name = str(doc.metadata.get("source", "")).lower()
    if source_name.endswith('.md'):
        try:
            md_sections = markdown_splitter.split_text(doc.page_content)
            return [
                Document(page_content=section.page_content, metadata={**doc.metadata, **section.metadata})
                for section in md_sections
            ]
        except Exception:
            return [doc]
    return [doc]

def _split_chunk(doc: Document) -> List[Document]:
    """Split a document into smaller chunks using the recursive splitter.

    Args:
        doc: Document to split

    Returns:
        List of document chunks
    """
    try:
        return recursive_splitter.split_documents([doc])
    except Exception as exc:
        logger.error(f"Failed to split document {doc.metadata.get('source', 'unknown')}: {exc}")
        return []

def split_documents(documents: List[Document]) -> List[Document]:
    """Split documents into smaller chunks for vector store indexing.

    Process:
        1. Segment markdown files by headers (if applicable)
        2. Split all documents into uniform chunks using the recursive splitter

    Args:
        documents: List of documents to split

    Returns:
        List of document chunks ready for indexing
    """
    if not documents:
        return []

    # First pass: optional markdown header segmentation for .md sources
    worker_count = min(MAX_WORKERS, len(documents)) or 1
    with ThreadPoolExecutor(max_workers=worker_count) as executor:
        segmented_lists = list(executor.map(_segment_document, documents))
    segmented: List[Document] = [seg for sublist in segmented_lists for seg in sublist]
    if not segmented:
        return []

    # Second pass: split into uniform chunks
    split_worker_count = min(MAX_WORKERS, len(segmented)) or 1
    with ThreadPoolExecutor(max_workers=split_worker_count) as executor:
        chunk_lists = list(executor.map(_split_chunk, segmented))
    chunks = [chunk for chunk_list in chunk_lists for chunk in chunk_list]

    logger.info(f"Split {len(segmented)} documents into {len(chunks)} chunks")
    return chunks

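# Illustrative usage of the loading and splitting helpers above (a sketch, not an
# additional entry point of this module). It assumes NEW_DATA already contains
# at least one .pdf or .md file:
#
#   docs, files = create_documents_and_files()
#   chunks = split_documents(docs)
#   # `chunks` can then be passed to create_vector_store() or
#   # update_vector_store_with_chunks() defined below.
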
def create_vector_store(chunks: List[Document]) -> FAISS:
    """Create a new FAISS vector store from document chunks and persist it.

    Args:
        chunks: List of document chunks to index

    Returns:
        Created FAISS vector store

    Raises:
        ValueError: If the chunks list is empty
    """
    if not chunks:
        raise ValueError("Cannot create vector store from empty chunks")
    vector_store = FAISS.from_documents(chunks, get_embedding_model())
    vector_store.save_local(str(VECTOR_STORE_DIR))
    logger.info("Vector store created and saved")
    return vector_store

def update_vector_store_with_chunks(chunks: List[Document]) -> FAISS:
    """Update the vector store with new chunks, or create it if it doesn't exist.

    Args:
        chunks: List of new document chunks to add

    Returns:
        Updated or newly created FAISS vector store

    Raises:
        ValueError: If chunks is empty and no existing vector store is available
    """
    store = load_vector_store()
    if not chunks:
        if store is not None:
            return store
        raise ValueError("No chunks provided and no existing vector store to return")
    if store is None:
        store = create_vector_store(chunks)
    else:
        # Add to the existing store and persist
        store.add_documents(chunks)
        store.save_local(str(VECTOR_STORE_DIR))
        logger.info(f"Added {len(chunks)} new chunks to existing vector store")
    return store

def _move_to_processed(paths: List[Path]) -> None:
    """Move processed files to the processed_data folder, maintaining directory structure.

    Args:
        paths: List of file paths to move
    """
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    for p in paths:
        try:
            if p.exists() and p.is_file():
                # Calculate the path relative to NEW_DATA
                try:
                    rel_path = p.relative_to(NEW_DATA)
                except ValueError:
                    # File is not under NEW_DATA, skip it
                    logger.warning(f"File {p} is not under NEW_DATA directory, skipping")
                    continue
                # Create the destination path in PROCESSED_DATA with the same structure
                dest_dir = PROCESSED_DATA / rel_path.parent
                dest_dir.mkdir(parents=True, exist_ok=True)
                # Add a timestamp to the filename to avoid overwriting
                dest_file = dest_dir / f"{p.stem}_{timestamp}{p.suffix}"
                # Move the file
                shutil.move(str(p), str(dest_file))
                logger.info(f"📦 Moved processed file: {p.name} -> {dest_file.relative_to(PROCESSED_DATA)}")
        except Exception as e:
            logger.error(f"❌ Failed to move {p}: {e}")

def _cleanup_empty_dirs(root: Path) -> None:
    """Remove empty directories under the root directory (best-effort).

    Args:
        root: Root directory to clean up
    """
    try:
        # Walk bottom-up (deepest paths first) to remove nested empty directories
        dirs = [d for d in root.rglob('*') if d.is_dir()]
        for dirpath in sorted(dirs, key=lambda x: len(str(x)), reverse=True):
            try:
                if not any(dirpath.iterdir()):
                    dirpath.rmdir()
                    logger.info(f"Removed empty directory: {dirpath}")
            except Exception:
                pass
    except Exception:
        pass

def process_new_data_and_update_vector_store() -> Optional[FAISS]:
    """Process new documents and update the vector store.

    Workflow:
        1. Load documents from the NEW_DATA directory
        2. Split documents into chunks
        3. Update the chunks cache and the vector store
        4. Move processed files to PROCESSED_DATA and clean up empty directories

    Returns:
        Updated FAISS vector store, or None if processing failed
    """
    try:
        docs, files = create_documents_and_files()
        if not docs:
            logger.info("No new documents to process.")
            return load_vector_store()

        chunks = split_documents(docs)

        # Save/merge chunks first (durability)
        existing_chunks = load_chunks() or []
        merged_chunks = existing_chunks + chunks

        with ThreadPoolExecutor(max_workers=2) as executor:
            save_future = executor.submit(save_chunks, merged_chunks)
            store_future = executor.submit(update_vector_store_with_chunks, chunks)
            save_success = save_future.result()
            store = store_future.result()

        if not save_success:
            logger.warning("Chunk persistence reported failure; vector store was updated but cache may be stale.")

        # If we reached here, the store update succeeded; move processed source files
        _move_to_processed(files)
        _cleanup_empty_dirs(NEW_DATA)

        logger.info(
            f"✅ Processed {len(docs)} new documents into {len(chunks)} chunks, "
            "updated vector store, and moved files to processed_data."
        )
        return store
    except Exception as e:
        logger.error(f"Failed processing new data: {e}")
        return None
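

if __name__ == "__main__":
    # Minimal manual-run sketch (an assumption, not an original entry point of this
    # module): configure basic logging and run the full ingestion workflow once.
    logging.basicConfig(level=logging.INFO)
    result = process_new_data_and_update_vector_store()
    if result is None:
        logger.warning("Processing finished without an available vector store.")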