import pickle
import logging
import os
import shutil
from datetime import datetime
from concurrent.futures import ThreadPoolExecutor, as_completed
from pathlib import Path
from typing import List, Optional, Iterable

from langchain.schema import Document
from langchain_community.vectorstores import FAISS

from .config import get_embedding_model, VECTOR_STORE_DIR, CHUNKS_PATH, NEW_DATA, PROCESSED_DATA
from .text_processors import markdown_splitter, recursive_splitter
from . import data_loaders

logger = logging.getLogger(__name__)

MAX_WORKERS = max(2, min(8, (os.cpu_count() or 4)))


def load_vector_store() -> Optional[FAISS]:
    """Load the existing vector store with proper error handling.

    Only attempt to load if the required FAISS files are present.
    """
    try:
        store_dir = Path(VECTOR_STORE_DIR)
        index_file = store_dir / "index.faiss"
        meta_file = store_dir / "index.pkl"  # created by LangChain FAISS.save_local

        # If the directory exists but the files are missing, do not attempt a load
        if not (index_file.exists() and meta_file.exists()):
            logger.info("Vector store not initialized yet; index files not found. Skipping load.")
            return None

        vector_store = FAISS.load_local(
            str(VECTOR_STORE_DIR),
            get_embedding_model(),
            allow_dangerous_deserialization=True,
        )
        logger.info("Successfully loaded existing vector store")
        return vector_store
    except Exception as e:
        logger.error(f"Failed to load vector store: {e}")
        return None


def load_chunks() -> Optional[List[Document]]:
    """Load pre-processed document chunks from cache with error handling."""
    try:
        if Path(CHUNKS_PATH).exists():
            with open(CHUNKS_PATH, 'rb') as f:
                chunks = pickle.load(f)
            logger.info(f"Successfully loaded {len(chunks)} chunks from cache")
            return chunks
        else:
            logger.info("No cached chunks found")
            return None
    except Exception as e:
        logger.error(f"Failed to load chunks: {e}")
        return None


def save_chunks(chunks: List[Document]) -> bool:
    """Save processed document chunks to the cache file.

    Args:
        chunks: List of document chunks to save

    Returns:
        True if successful, False otherwise
    """
    try:
        # Ensure the cache directory exists
        Path(CHUNKS_PATH).parent.mkdir(parents=True, exist_ok=True)
        with open(CHUNKS_PATH, 'wb') as f:
            pickle.dump(chunks, f)
        logger.info(f"Successfully saved {len(chunks)} chunks to {CHUNKS_PATH}")
        return True
    except Exception as e:
        logger.error(f"Failed to save chunks: {e}")
        return False


# ============================================================================
# DOCUMENT PROCESSING UTILITIES
# ============================================================================

def _iter_files(root: Path) -> Iterable[Path]:
    """Yield PDF and Markdown files under the given root directory recursively.

    Args:
        root: Root directory to search

    Yields:
        Path objects for PDF and Markdown files
    """
    if not root.exists():
        return
    for p in root.rglob('*'):
        if p.is_file() and p.suffix.lower() in {'.pdf', '.md'}:
            yield p


def create_documents() -> List[Document]:
    """Load documents from the NEW_DATA directory.

    Returns:
        List of loaded documents

    Note:
        Use create_documents_and_files() if you need both documents and file paths.
    """
    docs, _ = create_documents_and_files()
    return docs


def _load_documents_for_file(file_path: Path) -> List[Document]:
    """Load documents from a single file (PDF or Markdown).

    Args:
        file_path: Path to the file to load

    Returns:
        List of documents loaded from the file
    """
    try:
        if file_path.suffix.lower() == '.pdf':
            return data_loaders.load_pdf_documents(file_path)
        return data_loaders.load_markdown_documents(file_path)
    except Exception as e:
        logger.error(f"Failed to load {file_path}: {e}")
        return []


def create_documents_and_files() -> tuple[List[Document], List[Path]]:
    """Load documents from the NEW_DATA directory and return both documents and file paths.

    Returns:
        Tuple of (documents, file_paths) where:
            - documents: List of loaded Document objects
            - file_paths: List of Path objects for files that were loaded
    """
    documents: List[Document] = []
    files = list(_iter_files(NEW_DATA))
    if not files:
        logger.info(f"No new files found under {NEW_DATA}")
        return documents, []

    worker_count = min(MAX_WORKERS, len(files)) or 1
    with ThreadPoolExecutor(max_workers=worker_count) as executor:
        futures = {executor.submit(_load_documents_for_file, file_path): file_path
                   for file_path in files}
        for future in as_completed(futures):
            documents.extend(future.result())

    logger.info(f"Loaded {len(documents)} documents from {NEW_DATA}")
    return documents, files


def _segment_document(doc: Document) -> List[Document]:
    """Segment a document using markdown headers if applicable.

    Args:
        doc: Document to segment

    Returns:
        List of segmented documents (or the original if not markdown)
    """
    source_name = str(doc.metadata.get("source", "")).lower()
    if source_name.endswith('.md'):
        try:
            md_sections = markdown_splitter.split_text(doc.page_content)
            return [Document(page_content=section.page_content,
                             metadata={**doc.metadata, **section.metadata})
                    for section in md_sections]
        except Exception:
            return [doc]
    return [doc]


def _split_chunk(doc: Document) -> List[Document]:
    """Split a document into smaller chunks using the recursive splitter.

    Args:
        doc: Document to split

    Returns:
        List of document chunks
    """
    try:
        return recursive_splitter.split_documents([doc])
    except Exception as exc:
        logger.error(f"Failed to split document {doc.metadata.get('source', 'unknown')}: {exc}")
        return []


def split_documents(documents: List[Document]) -> List[Document]:
    """Split documents into smaller chunks for vector store indexing.

    Process:
        1. Segment markdown files by headers (if applicable)
        2. Split all documents into uniform chunks using the recursive splitter

    Args:
        documents: List of documents to split

    Returns:
        List of document chunks ready for indexing
    """
    if not documents:
        return []

    # First pass: optional markdown header segmentation for .md sources
    worker_count = min(MAX_WORKERS, len(documents)) or 1
    with ThreadPoolExecutor(max_workers=worker_count) as executor:
        segmented_lists = list(executor.map(_segment_document, documents))
    segmented: List[Document] = [seg for sublist in segmented_lists for seg in sublist]
    if not segmented:
        return []

    # Second pass: split into uniform chunks
    split_worker_count = min(MAX_WORKERS, len(segmented)) or 1
    with ThreadPoolExecutor(max_workers=split_worker_count) as executor:
        chunk_lists = list(executor.map(_split_chunk, segmented))
    chunks = [chunk for chunk_list in chunk_lists for chunk in chunk_list]

    logger.info(f"Split {len(segmented)} documents into {len(chunks)} chunks")
    return chunks


def create_vector_store(chunks: List[Document]) -> FAISS:
    """Create a new FAISS vector store from document chunks and persist it.

    Args:
        chunks: List of document chunks to index

    Returns:
        Created FAISS vector store

    Raises:
        ValueError: If the chunks list is empty
    """
    if not chunks:
        raise ValueError("Cannot create vector store from empty chunks")
    vector_store = FAISS.from_documents(chunks, get_embedding_model())
    vector_store.save_local(str(VECTOR_STORE_DIR))
    logger.info("Vector store created and saved")
    return vector_store


def update_vector_store_with_chunks(chunks: List[Document]) -> FAISS:
    """Update the vector store with new chunks, or create one if it doesn't exist.

    Args:
        chunks: List of new document chunks to add

    Returns:
        Updated or newly created FAISS vector store
    """
    if not chunks:
        existing = load_vector_store()
        if existing is not None:
            return existing

    store = load_vector_store()
    if store is None:
        store = create_vector_store(chunks)
    else:
        # Add to the existing store and persist
        store.add_documents(chunks)
        store.save_local(str(VECTOR_STORE_DIR))
        logger.info(f"Added {len(chunks)} new chunks to existing vector store")
    return store


def _move_to_processed(paths: List[Path]) -> None:
    """Move processed files to the processed_data folder, preserving directory structure.

    Args:
        paths: List of file paths to move
    """
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    for p in paths:
        try:
            if p.exists() and p.is_file():
                # Calculate relative path from NEW_DATA
                try:
                    rel_path = p.relative_to(NEW_DATA)
                except ValueError:
                    # File is not under NEW_DATA, skip it
                    logger.warning(f"File {p} is not under NEW_DATA directory, skipping")
                    continue

                # Create destination path in PROCESSED_DATA with the same structure
                dest_dir = PROCESSED_DATA / rel_path.parent
                dest_dir.mkdir(parents=True, exist_ok=True)

                # Add a timestamp to the filename to avoid overwriting
                dest_file = dest_dir / f"{p.stem}_{timestamp}{p.suffix}"

                # Move the file
                shutil.move(str(p), str(dest_file))
                logger.info(f"📦 Moved processed file: {p.name} -> {dest_file.relative_to(PROCESSED_DATA)}")
        except Exception as e:
            logger.error(f"❌ Failed to move {p}: {e}")


def _cleanup_empty_dirs(root: Path) -> None:
    """Remove empty directories under the root directory (best-effort).

    Args:
        root: Root directory to clean up
    """
    try:
        # Walk bottom-up (deepest paths first) to remove empty directories
        dirs = [d for d in root.rglob('*') if d.is_dir()]
        for dirpath in sorted(dirs, key=lambda x: len(str(x)), reverse=True):
            try:
                if not any(dirpath.iterdir()):
                    dirpath.rmdir()
                    logger.info(f"Removed empty directory: {dirpath}")
            except Exception:
                pass
    except Exception:
        pass


def process_new_data_and_update_vector_store() -> Optional[FAISS]:
    """Process new documents and update the vector store.

    Workflow:
        1. Load documents from the NEW_DATA directory
        2. Split documents into chunks
        3. Update the chunks cache and the vector store
        4. Move processed source files to PROCESSED_DATA and clean up empty directories

    Returns:
        Updated FAISS vector store, or None if processing failed
    """
    try:
        docs, files = create_documents_and_files()
        if not docs:
            logger.info("No new documents to process.")
            return load_vector_store()

        chunks = split_documents(docs)

        # Merge new chunks into the cached set, then persist the cache and
        # update the vector store concurrently (durability)
        existing_chunks = load_chunks() or []
        merged_chunks = existing_chunks + chunks
        with ThreadPoolExecutor(max_workers=2) as executor:
            save_future = executor.submit(save_chunks, merged_chunks)
            store_future = executor.submit(update_vector_store_with_chunks, chunks)
            save_success = save_future.result()
            store = store_future.result()

        if not save_success:
            logger.warning("Chunk persistence reported failure; vector store was updated but cache may be stale.")

        # If we reached here, the store update succeeded; move processed source files
        _move_to_processed(files)
        _cleanup_empty_dirs(NEW_DATA)

        logger.info(
            f"✅ Processed {len(docs)} new documents into {len(chunks)} chunks, "
            f"updated vector store, and moved files to processed_data."
        )
        return store
    except Exception as e:
        logger.error(f"Failed processing new data: {e}")
        return None
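

# ----------------------------------------------------------------------------
# Usage sketch (assumption: not part of the original module). It shows the
# intended end-to-end flow: drop .pdf/.md files into NEW_DATA, run the
# ingestion pipeline, then query the resulting store via LangChain's standard
# FAISS.similarity_search. Because this module uses relative imports, it must
# be executed as part of its package (e.g. `python -m <package>.<this_module>`);
# the package name is not specified here, and the query string is illustrative.
# ----------------------------------------------------------------------------
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)

    store = process_new_data_and_update_vector_store()
    if store is not None:
        # Hypothetical query just to confirm the index is usable
        results = store.similarity_search("example query", k=3)
        for doc in results:
            logger.info(
                "Hit from %s: %s...",
                doc.metadata.get("source", "unknown"),
                doc.page_content[:80],
            )
    else:
        logger.info("No vector store available (nothing processed and no existing index).")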