"""
vector_store.py
-----------------------------------------------------
Maintains FAISS runtime index + metadata cache.

Features
--------
- Ensure local FAISS runtime index exists (download from HF if missing)
- FAISS semantic search and BM25 text access
- Automatic TTL reload
- Full cache clearing for Hugging Face Space
- Explicit "♻️ FAISS memory cache reset" logging on rebuild
"""

import os
import json
import time
import shutil
from typing import List, Dict, Any, Optional

import numpy as np
import faiss
from sentence_transformers import SentenceTransformer
from huggingface_hub import hf_hub_download

# ------------------------------------------------------------------
# 🔧 Paths & constants
# ------------------------------------------------------------------
PERSISTENT_DIR = "/home/user/app/persistent"
RUNTIME_DIR = "/home/user/app/runtime_faiss"
INDEX_NAME = "faiss.index"
META_NAME = "faiss.index.meta.json"
GLOSSARY_META = "glossary.json"
HF_INDEX_REPO = "essprasad/CT-Chat-Index"
EMBED_MODEL_NAME = "sentence-transformers/all-MiniLM-L6-v2"
EMBED_MODEL = None  # lazy loaded by _ensure_model()

# in-memory cache (module-level singletons; reloaded when the TTL expires)
_runtime_index: Optional[faiss.Index] = None
_runtime_meta: Optional[List[Dict[str, Any]]] = None
_meta_loaded_time = 0.0
_META_TTL_SECONDS = 300.0


# ------------------------------------------------------------------
# 🔹 Helpers
# ------------------------------------------------------------------
def _ensure_dirs():
    """Create the persistent and runtime cache directories if absent."""
    os.makedirs(PERSISTENT_DIR, exist_ok=True)
    os.makedirs(RUNTIME_DIR, exist_ok=True)


def _ensure_model():
    """Lazily load and cache the sentence-transformers embedding model.

    Returns
    -------
    SentenceTransformer
        The shared embedding model instance (loaded at most once).
    """
    global EMBED_MODEL
    if EMBED_MODEL is None:
        print("📥 Loading embedding model for FAISS retrieval…")
        EMBED_MODEL = SentenceTransformer(EMBED_MODEL_NAME)
        print("✅ Embedding model loaded.")
    return EMBED_MODEL


# ------------------------------------------------------------------
# 🔹 Cache control
# ------------------------------------------------------------------
def clear_local_faiss():
    """Delete all local FAISS + glossary caches (safe in HF Space).

    Removes the persistent index/metadata/glossary files and the whole
    runtime directory, then drops the in-memory cache so a TTL-fresh
    index cannot keep serving results after its backing files are gone.
    """
    global _runtime_index, _runtime_meta, _meta_loaded_time
    for p in [
        os.path.join(PERSISTENT_DIR, INDEX_NAME),
        os.path.join(PERSISTENT_DIR, META_NAME),
        os.path.join(PERSISTENT_DIR, GLOSSARY_META),
        RUNTIME_DIR,
    ]:
        try:
            if os.path.isdir(p):
                shutil.rmtree(p, ignore_errors=True)
            elif os.path.exists(p):
                os.remove(p)
            print(f"🗑️ Cleared: {p}")
        except Exception as e:
            # Best-effort cleanup: report and continue with the next path.
            print(f"⚠️ Failed to clear {p}: {e}")
    # BUGFIX: previously only the on-disk caches were removed; the
    # in-memory index survived and _ensure_faiss_index() would keep
    # serving it until the TTL expired, contradicting the log below.
    _runtime_index = None
    _runtime_meta = None
    _meta_loaded_time = 0.0
    print("♻️ FAISS memory cache reset (runtime + persistent cleared)")


# ------------------------------------------------------------------
# 🔹 Loaders
# ------------------------------------------------------------------
def _load_local_index() -> bool:
    """Load FAISS index + metadata from persistent into runtime.

    Copies the persistent artifacts into RUNTIME_DIR, reads the index and
    its JSON metadata into the module-level cache, and stamps the load
    time for TTL tracking.

    Returns
    -------
    bool
        True on success; False if the artifacts are missing or unreadable
        (in which case the in-memory cache is cleared).
    """
    global _runtime_index, _runtime_meta, _meta_loaded_time
    _ensure_dirs()
    idx_path = os.path.join(PERSISTENT_DIR, INDEX_NAME)
    meta_path = os.path.join(PERSISTENT_DIR, META_NAME)
    try:
        if not (os.path.exists(idx_path) and os.path.exists(meta_path)):
            return False
        os.makedirs(RUNTIME_DIR, exist_ok=True)
        shutil.copy2(idx_path, os.path.join(RUNTIME_DIR, INDEX_NAME))
        shutil.copy2(meta_path, os.path.join(RUNTIME_DIR, META_NAME))
        _runtime_index = faiss.read_index(os.path.join(RUNTIME_DIR, INDEX_NAME))
        with open(os.path.join(RUNTIME_DIR, META_NAME), "r", encoding="utf-8") as f:
            _runtime_meta = json.load(f)
        _meta_loaded_time = time.time()
        print(f"✅ Loaded FAISS index ({len(_runtime_meta)} vectors).")
        return True
    except Exception as e:
        print(f"⚠️ Could not load local FAISS index: {e}")
        _runtime_index = None
        _runtime_meta = None
        return False


def _download_index_from_hub() -> bool:
    """Download FAISS artifacts from the Hugging Face dataset repo.

    Fetches the index and metadata files from HF_INDEX_REPO, stores them
    in PERSISTENT_DIR, then delegates to _load_local_index().

    Returns
    -------
    bool
        True if the download and subsequent local load both succeeded.
    """
    _ensure_dirs()
    try:
        print("☁️ Downloading FAISS artifacts from HF dataset…")
        idx = hf_hub_download(
            repo_id=HF_INDEX_REPO,
            filename=f"persistent/{INDEX_NAME}",
            repo_type="dataset",
        )
        meta = hf_hub_download(
            repo_id=HF_INDEX_REPO,
            filename=f"persistent/{META_NAME}",
            repo_type="dataset",
        )
        shutil.copy2(idx, os.path.join(PERSISTENT_DIR, INDEX_NAME))
        shutil.copy2(meta, os.path.join(PERSISTENT_DIR, META_NAME))
        print("✅ FAISS artifacts downloaded and stored persistently.")
        return _load_local_index()
    except Exception as e:
        print(f"⚠️ HF download failed: {e}")
        return False


def _ensure_faiss_index(force_refresh: bool = False) -> bool:
    """Ensure a runtime FAISS index is available.

    Resolution order: in-memory cache (if within TTL) → persistent local
    copy → Hugging Face download. If ``force_refresh`` is True, the
    runtime directory and in-memory cache are discarded first.

    Parameters
    ----------
    force_refresh : bool
        Clear the runtime cache and reload from scratch.

    Returns
    -------
    bool
        True if an index is loaded and ready for search.
    """
    global _runtime_index, _runtime_meta, _meta_loaded_time
    _ensure_dirs()
    if force_refresh:
        try:
            shutil.rmtree(RUNTIME_DIR, ignore_errors=True)
            _runtime_index = None
            _runtime_meta = None
            print("♻️ Forced FAISS runtime reload requested.")
        except Exception as e:
            print(f"⚠️ Force refresh failed: {e}")
    # Serve the cached index while it is still TTL-fresh.
    if _runtime_index is not None and (time.time() - _meta_loaded_time) < _META_TTL_SECONDS:
        return True
    if _load_local_index():
        return True
    if _download_index_from_hub():
        return True
    print("⚠️ No FAISS index found locally or remotely.")
    return False


# ------------------------------------------------------------------
# 🔹 Accessors
# ------------------------------------------------------------------
def load_all_text_chunks() -> List[Dict[str, Any]]:
    """Return metadata list for BM25 fallback or analysis.

    Ensures the index/metadata are loaded, and re-reads the metadata JSON
    from the runtime directory when the TTL has expired (best-effort:
    failures leave the current cache in place).

    Returns
    -------
    list[dict]
        The cached chunk metadata, or [] if nothing could be loaded.
    """
    global _runtime_meta, _meta_loaded_time
    if _runtime_meta is None:
        if not _ensure_faiss_index():
            return []
    if (time.time() - _meta_loaded_time) > _META_TTL_SECONDS:
        try:
            meta_path = os.path.join(RUNTIME_DIR, META_NAME)
            with open(meta_path, "r", encoding="utf-8") as f:
                _runtime_meta = json.load(f)
            _meta_loaded_time = time.time()
        except Exception as e:
            # Best-effort TTL refresh: keep the stale cache but say why.
            # (Previously this failure was swallowed silently.)
            print(f"⚠️ Metadata TTL reload failed, using cached copy: {e}")
    return _runtime_meta or []


# ------------------------------------------------------------------
# 🔹 Core Search
# ------------------------------------------------------------------
def search_index(query: str, top_k: int = 5) -> List[Dict[str, Any]]:
    """Perform semantic FAISS search and return metadata hits.

    Parameters
    ----------
    query : str
        Natural-language query to embed and search for.
    top_k : int
        Maximum number of hits to return.

    Returns
    -------
    list[dict]
        Metadata dicts augmented with ``score`` (FAISS distance as float),
        ``file`` and ``text`` keys; [] on any failure.
    """
    if not _ensure_faiss_index():
        return []
    try:
        model = _ensure_model()
        q_emb = model.encode([query], convert_to_numpy=True).astype("float32")
        # Query vectors are L2-normalized; assumes the index was built
        # with normalized embeddings so inner product ≈ cosine similarity.
        faiss.normalize_L2(q_emb)
        D, I = _runtime_index.search(q_emb, top_k)
        results = []
        for dist, idx in zip(D[0], I[0]):
            # FAISS pads missing hits with -1; also guard against stale
            # indices that exceed the metadata list.
            if idx < 0 or idx >= len(_runtime_meta):
                continue
            meta = dict(_runtime_meta[idx])  # copy: never mutate the cache
            meta["score"] = float(dist)
            meta["file"] = meta.get("file") or meta.get("source") or "unknown"
            meta["text"] = meta.get("text") or meta.get("definition", "")
            results.append(meta)
        return results
    except Exception as e:
        print(f"⚠️ FAISS search failed: {e}")
        return []