moazx's picture
Update the Assessment Results
4a17bbc
import pickle
import logging
import os
import shutil
from datetime import datetime
from concurrent.futures import ThreadPoolExecutor, as_completed
from pathlib import Path
from typing import List, Optional, Iterable
from langchain.schema import Document
from langchain_community.vectorstores import FAISS
from .config import get_embedding_model, VECTOR_STORE_DIR, CHUNKS_PATH, NEW_DATA, PROCESSED_DATA, settings
from .text_processors import markdown_splitter, recursive_splitter
from . import data_loaders
logger = logging.getLogger(__name__)
MAX_WORKERS = max(2, min(8, (os.cpu_count() or 4)))
def load_vector_store() -> Optional[FAISS]:
"""Load existing vector store with proper error handling.
Only attempt to load if required FAISS files are present.
"""
try:
store_dir = Path(VECTOR_STORE_DIR)
index_file = store_dir / "index.faiss"
meta_file = store_dir / "index.pkl" # created by LangChain FAISS.save_local
# If directory exists but files are missing, do not attempt load
if not (index_file.exists() and meta_file.exists()):
logger.info("Vector store not initialized yet; index files not found. Skipping load.")
return None
vector_store = FAISS.load_local(
str(VECTOR_STORE_DIR),
get_embedding_model(),
allow_dangerous_deserialization=True,
)
logger.info("Successfully loaded existing vector store")
return vector_store
except Exception as e:
logger.error(f"Failed to load vector store: {e}")
return None
def load_chunks() -> Optional[List[Document]]:
"""Load pre-processed document chunks from cache with error handling."""
try:
if Path(CHUNKS_PATH).exists():
with open(CHUNKS_PATH, 'rb') as f:
chunks = pickle.load(f)
logger.info(f"Successfully loaded {len(chunks)} chunks from cache")
return chunks
else:
logger.info("No cached chunks found")
return None
except Exception as e:
logger.error(f"Failed to load chunks: {e}")
return None
def save_chunks(chunks: List[Document]) -> bool:
"""Save processed document chunks to cache file.
Args:
chunks: List of document chunks to save
Returns:
True if successful, False otherwise
"""
try:
# Ensure directory exists
Path(CHUNKS_PATH).parent.mkdir(parents=True, exist_ok=True)
with open(CHUNKS_PATH, 'wb') as f:
pickle.dump(chunks, f)
logger.info(f"Successfully saved {len(chunks)} chunks to {CHUNKS_PATH}")
return True
except Exception as e:
logger.error(f"Failed to save chunks: {e}")
return False
# ============================================================================
# DOCUMENT PROCESSING UTILITIES
# ============================================================================
def _iter_files(root: Path) -> Iterable[Path]:
"""Yield PDF and Markdown files under the given root directory recursively.
Args:
root: Root directory to search
Yields:
Path objects for PDF and Markdown files
"""
if not root.exists():
return []
for p in root.rglob('*'):
if p.is_file() and p.suffix.lower() in {'.pdf', '.md'}:
yield p
def create_documents() -> List[Document]:
"""Load documents from NEW_DATA directory.
Returns:
List of loaded documents
Note:
Use create_documents_and_files() if you need both documents and file paths.
"""
docs, _ = create_documents_and_files()
return docs
def _load_documents_for_file(file_path: Path) -> List[Document]:
"""Load documents from a single file (PDF or Markdown).
Args:
file_path: Path to the file to load
Returns:
List of documents loaded from the file
"""
try:
if file_path.suffix.lower() == '.pdf':
# Use advanced LlamaParse loader with settings from config
api_key = settings.LLAMA_CLOUD_API_KEY
premium_mode = settings.LLAMA_PREMIUM_MODE
return data_loaders.load_pdf_documents_advanced(
file_path,
api_key=api_key,
premium_mode=premium_mode
)
return data_loaders.load_markdown_documents(file_path)
except Exception as e:
logger.error(f"Failed to load {file_path}: {e}")
return []
def create_documents_and_files() -> tuple[List[Document], List[Path]]:
"""Load documents from NEW_DATA directory and return both documents and file paths.
Returns:
Tuple of (documents, file_paths) where:
- documents: List of loaded Document objects
- file_paths: List of Path objects for files that were loaded
"""
documents: List[Document] = []
files = list(_iter_files(NEW_DATA))
if not files:
logger.info(f"No new files found under {NEW_DATA}")
return documents, []
worker_count = min(MAX_WORKERS, len(files)) or 1
with ThreadPoolExecutor(max_workers=worker_count) as executor:
futures = {executor.submit(_load_documents_for_file, file_path): file_path for file_path in files}
for future in as_completed(futures):
documents.extend(future.result())
logger.info(f"Loaded {len(documents)} documents from {NEW_DATA}")
return documents, files
def _segment_document(doc: Document) -> List[Document]:
"""Segment a document using markdown headers if applicable.
Args:
doc: Document to segment
Returns:
List of segmented documents (or original if not markdown)
"""
source_name = str(doc.metadata.get("source", "")).lower()
if source_name.endswith('.md'):
try:
md_sections = markdown_splitter.split_text(doc.page_content)
return [Document(page_content=section.page_content, metadata={**doc.metadata, **section.metadata}) for section in md_sections]
except Exception:
return [doc]
return [doc]
def _split_chunk(doc: Document) -> List[Document]:
"""Split a document into smaller chunks using recursive splitter.
Args:
doc: Document to split
Returns:
List of document chunks
"""
try:
return recursive_splitter.split_documents([doc])
except Exception as exc:
logger.error(f"Failed to split document {doc.metadata.get('source', 'unknown')}: {exc}")
return []
def split_documents(documents: List[Document]) -> List[Document]:
"""Split documents into smaller chunks for vector store indexing.
Process:
1. Segment markdown files by headers (if applicable)
2. Split all documents into uniform chunks using recursive splitter
Args:
documents: List of documents to split
Returns:
List of document chunks ready for indexing
"""
if not documents:
return []
# First pass: optional markdown header segmentation for .md sources
worker_count = min(MAX_WORKERS, len(documents)) or 1
with ThreadPoolExecutor(max_workers=worker_count) as executor:
segmented_lists = list(executor.map(_segment_document, documents))
segmented: List[Document] = [seg for sublist in segmented_lists for seg in sublist]
if not segmented:
return []
# Second pass: split into uniform chunks
split_worker_count = min(MAX_WORKERS, len(segmented)) or 1
with ThreadPoolExecutor(max_workers=split_worker_count) as executor:
chunk_lists = list(executor.map(_split_chunk, segmented))
chunks = [chunk for chunk_list in chunk_lists for chunk in chunk_list]
logger.info(f"Split {len(segmented)} documents into {len(chunks)} chunks")
return chunks
def create_vector_store(chunks: List[Document]) -> FAISS:
"""Create a new FAISS vector store from document chunks and persist it.
Args:
chunks: List of document chunks to index
Returns:
Created FAISS vector store
Raises:
ValueError: If chunks list is empty
"""
if not chunks:
raise ValueError("Cannot create vector store from empty chunks")
vector_store = FAISS.from_documents(chunks, get_embedding_model())
vector_store.save_local(str(VECTOR_STORE_DIR))
logger.info("Vector store created and saved")
return vector_store
def update_vector_store_with_chunks(chunks: List[Document]) -> FAISS:
"""Update vector store with new chunks or create if doesn't exist.
Args:
chunks: List of new document chunks to add
Returns:
Updated or newly created FAISS vector store
"""
if not chunks:
existing = load_vector_store()
if existing:
return existing
store = load_vector_store()
if store is None:
store = create_vector_store(chunks)
else:
# Add to existing store and persist
store.add_documents(chunks)
store.save_local(str(VECTOR_STORE_DIR))
logger.info(f"Added {len(chunks)} new chunks to existing vector store")
return store
def _move_to_processed(paths: List[Path]) -> None:
"""Move processed files to processed_data folder maintaining directory structure.
Args:
paths: List of file paths to move
"""
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
for p in paths:
try:
if p.exists() and p.is_file():
# Calculate relative path from NEW_DATA
try:
rel_path = p.relative_to(NEW_DATA)
except ValueError:
# File is not under NEW_DATA, skip it
logger.warning(f"File {p} is not under NEW_DATA directory, skipping")
continue
# Create destination path in PROCESSED_DATA with same structure
dest_dir = PROCESSED_DATA / rel_path.parent
dest_dir.mkdir(parents=True, exist_ok=True)
# Add timestamp to filename to avoid overwriting
dest_file = dest_dir / f"{p.stem}_{timestamp}{p.suffix}"
# Move the file
shutil.move(str(p), str(dest_file))
logger.info(f"📦 Moved processed file: {p.name} -> {dest_file.relative_to(PROCESSED_DATA)}")
except Exception as e:
logger.error(f"❌ Failed to move {p}: {e}")
def _cleanup_empty_dirs(root: Path) -> None:
"""Remove empty directories under root directory (best-effort).
Args:
root: Root directory to clean up
"""
try:
# Walk bottom-up to remove empty directories
dirs = [d for d in root.rglob('*') if d.is_dir()]
for dirpath in sorted(dirs, key=lambda x: len(str(x)), reverse=True):
try:
if not any(dirpath.iterdir()):
dirpath.rmdir()
logger.info(f"Removed empty directory: {dirpath}")
except Exception:
pass
except Exception:
pass
def process_new_data_and_update_vector_store() -> Optional[FAISS]:
"""Process new documents and update the vector store.
Workflow:
1. Load documents from NEW_DATA directory
2. Split documents into chunks
3. Update chunks cache and vector store
4. Delete processed files and clean up empty directories
Returns:
Updated FAISS vector store, or None if processing failed
"""
try:
docs, files = create_documents_and_files()
if not docs:
logger.info("No new documents to process.")
return load_vector_store()
chunks = split_documents(docs)
# Save/merge chunks first (durability)
existing_chunks = load_chunks() or []
merged_chunks = existing_chunks + chunks
with ThreadPoolExecutor(max_workers=2) as executor:
save_future = executor.submit(save_chunks, merged_chunks)
store_future = executor.submit(update_vector_store_with_chunks, chunks)
save_success = save_future.result()
store = store_future.result()
if not save_success:
logger.warning("Chunk persistence reported failure; vector store was updated but cache may be stale.")
# If we reached here, store update succeeded; move processed source files
_move_to_processed(files)
_cleanup_empty_dirs(NEW_DATA)
logger.info(
f"✅ Processed {len(docs)} new documents into {len(chunks)} chunks, updated vector store, and moved files to processed_data."
)
return store
except Exception as e:
logger.error(f"Failed processing new data: {e}")
return None