| """ | |
| vector_store.py | |
| ----------------------------------------------------- | |
| Maintains FAISS runtime index + metadata cache. | |
| Features | |
| -------- | |
| - Ensure local FAISS runtime index exists (download from HF if missing) | |
| - FAISS semantic search and BM25 text access | |
| - Automatic TTL reload | |
| - Full cache clearing for Hugging Face Space | |
| - Explicit "♻️ FAISS memory cache reset" logging on rebuild | |
| """ | |
import os
import json
import time
import shutil
from typing import List, Dict, Any, Optional

import numpy as np
import faiss
from sentence_transformers import SentenceTransformer
from huggingface_hub import hf_hub_download
# ------------------------------------------------------------------
# 🔧 Paths & constants
# ------------------------------------------------------------------
PERSISTENT_DIR = "/home/user/app/persistent"
RUNTIME_DIR = "/home/user/app/runtime_faiss"
INDEX_NAME = "faiss.index"
META_NAME = "faiss.index.meta.json"
GLOSSARY_META = "glossary.json"

HF_INDEX_REPO = "essprasad/CT-Chat-Index"
EMBED_MODEL_NAME = "sentence-transformers/all-MiniLM-L6-v2"
EMBED_MODEL = None  # lazily loaded SentenceTransformer instance

# In-memory cache
_runtime_index: Optional[faiss.Index] = None
_runtime_meta: Optional[List[Dict[str, Any]]] = None
_meta_loaded_time = 0.0
_META_TTL_SECONDS = 300.0

# ------------------------------------------------------------------
# 🔹 Helpers
# ------------------------------------------------------------------
def _ensure_dirs():
    os.makedirs(PERSISTENT_DIR, exist_ok=True)
    os.makedirs(RUNTIME_DIR, exist_ok=True)


def _ensure_model():
    global EMBED_MODEL
    if EMBED_MODEL is None:
        print("📥 Loading embedding model for FAISS retrieval…")
        EMBED_MODEL = SentenceTransformer(EMBED_MODEL_NAME)
        print("✅ Embedding model loaded.")
    return EMBED_MODEL
# ------------------------------------------------------------------
# 🔹 Cache control
# ------------------------------------------------------------------
def clear_local_faiss():
    """Delete all local FAISS + glossary caches (safe in HF Space)."""
    global _runtime_index, _runtime_meta, _meta_loaded_time
    for p in [
        os.path.join(PERSISTENT_DIR, INDEX_NAME),
        os.path.join(PERSISTENT_DIR, META_NAME),
        os.path.join(PERSISTENT_DIR, GLOSSARY_META),
        RUNTIME_DIR,
    ]:
        try:
            if os.path.isdir(p):
                shutil.rmtree(p, ignore_errors=True)
            elif os.path.exists(p):
                os.remove(p)
            print(f"🗑️ Cleared: {p}")
        except Exception as e:
            print(f"⚠️ Failed to clear {p}: {e}")
    # Also drop the in-memory copies so the log line below is accurate and the
    # next call reloads from scratch instead of serving a stale cached index.
    _runtime_index = None
    _runtime_meta = None
    _meta_loaded_time = 0.0
    print("♻️ FAISS memory cache reset (runtime + persistent cleared)")
# ------------------------------------------------------------------
# 🔹 Loaders
# ------------------------------------------------------------------
def _load_local_index() -> bool:
    """Load FAISS index + metadata from persistent into runtime."""
    global _runtime_index, _runtime_meta, _meta_loaded_time
    _ensure_dirs()
    idx_path = os.path.join(PERSISTENT_DIR, INDEX_NAME)
    meta_path = os.path.join(PERSISTENT_DIR, META_NAME)
    try:
        if not (os.path.exists(idx_path) and os.path.exists(meta_path)):
            return False
        os.makedirs(RUNTIME_DIR, exist_ok=True)
        shutil.copy2(idx_path, os.path.join(RUNTIME_DIR, INDEX_NAME))
        shutil.copy2(meta_path, os.path.join(RUNTIME_DIR, META_NAME))
        _runtime_index = faiss.read_index(os.path.join(RUNTIME_DIR, INDEX_NAME))
        with open(os.path.join(RUNTIME_DIR, META_NAME), "r", encoding="utf-8") as f:
            _runtime_meta = json.load(f)
        _meta_loaded_time = time.time()
        print(f"✅ Loaded FAISS index ({len(_runtime_meta)} vectors).")
        return True
    except Exception as e:
        print(f"⚠️ Could not load local FAISS index: {e}")
        _runtime_index = None
        _runtime_meta = None
        return False
def _download_index_from_hub() -> bool:
    """Download FAISS artifacts from the Hugging Face dataset repo."""
    _ensure_dirs()
    try:
        print("☁️ Downloading FAISS artifacts from HF dataset…")
        idx = hf_hub_download(repo_id=HF_INDEX_REPO,
                              filename=f"persistent/{INDEX_NAME}",
                              repo_type="dataset")
        meta = hf_hub_download(repo_id=HF_INDEX_REPO,
                               filename=f"persistent/{META_NAME}",
                               repo_type="dataset")
        shutil.copy2(idx, os.path.join(PERSISTENT_DIR, INDEX_NAME))
        shutil.copy2(meta, os.path.join(PERSISTENT_DIR, META_NAME))
        print("✅ FAISS artifacts downloaded and stored persistently.")
        return _load_local_index()
    except Exception as e:
        print(f"⚠️ HF download failed: {e}")
        return False
def _ensure_faiss_index(force_refresh: bool = False) -> bool:
    """
    Ensure the runtime FAISS index is available.
    If force_refresh=True, clear the runtime copy and reload fresh.
    """
    global _runtime_index, _runtime_meta, _meta_loaded_time
    _ensure_dirs()
    if force_refresh:
        try:
            shutil.rmtree(RUNTIME_DIR, ignore_errors=True)
            _runtime_index = None
            _runtime_meta = None
            print("♻️ Forced FAISS runtime reload requested.")
        except Exception as e:
            print(f"⚠️ Force refresh failed: {e}")
    # Serve from memory while the TTL has not expired; otherwise fall back to
    # the persistent copy, then to a download from the HF dataset repo.
    if _runtime_index is not None and (time.time() - _meta_loaded_time) < _META_TTL_SECONDS:
        return True
    if _load_local_index():
        return True
    if _download_index_from_hub():
        return True
    print("⚠️ No FAISS index found locally or remotely.")
    return False
# ------------------------------------------------------------------
# 🔹 Accessors
# ------------------------------------------------------------------
def load_all_text_chunks() -> List[Dict[str, Any]]:
    """Return metadata list for BM25 fallback or analysis."""
    global _runtime_meta, _meta_loaded_time
    if _runtime_meta is None:
        if not _ensure_faiss_index():
            return []
    if (time.time() - _meta_loaded_time) > _META_TTL_SECONDS:
        try:
            meta_path = os.path.join(RUNTIME_DIR, META_NAME)
            with open(meta_path, "r", encoding="utf-8") as f:
                _runtime_meta = json.load(f)
            _meta_loaded_time = time.time()
        except Exception:
            pass
    return _runtime_meta or []
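# A minimal sketch of the BM25 fallback mentioned in the module docstring.
# It assumes the optional `rank_bm25` package (BM25Okapi) is installed; this
# module itself does not depend on it, so the example stays in comments:
#
#     from rank_bm25 import BM25Okapi
#
#     chunks = load_all_text_chunks()
#     corpus = [(c.get("text") or "") for c in chunks]
#     bm25 = BM25Okapi([doc.lower().split() for doc in corpus])
#     scores = bm25.get_scores("informed consent".lower().split())
#     best = sorted(zip(scores, chunks), key=lambda t: t[0], reverse=True)[:5]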
# ------------------------------------------------------------------
# 🔹 Core Search
# ------------------------------------------------------------------
def search_index(query: str, top_k: int = 5) -> List[Dict[str, Any]]:
    """Perform semantic FAISS search and return metadata hits."""
    if not _ensure_faiss_index():
        return []
    try:
        model = _ensure_model()
        q_emb = model.encode([query], convert_to_numpy=True).astype("float32")
        faiss.normalize_L2(q_emb)
        D, I = _runtime_index.search(q_emb, top_k)
        results = []
        for dist, idx in zip(D[0], I[0]):
            if idx < 0 or idx >= len(_runtime_meta):
                continue
            meta = dict(_runtime_meta[idx])
            meta["score"] = float(dist)
            meta["file"] = meta.get("file") or meta.get("source") or "unknown"
            meta["text"] = meta.get("text") or meta.get("definition", "")
            results.append(meta)
        return results
    except Exception as e:
        print(f"⚠️ FAISS search failed: {e}")
        return []
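

# A quick smoke test (a hedged sketch: it assumes the index artifacts can be
# fetched from essprasad/CT-Chat-Index or already sit under PERSISTENT_DIR,
# and the example query string is illustrative only).
if __name__ == "__main__":
    if _ensure_faiss_index(force_refresh=False):
        for hit in search_index("adverse event reporting", top_k=3):
            print(f"{hit['score']:.3f}  {hit['file']}  {hit['text'][:80]}")
    else:
        print("No index available; run the index build/upload step first.")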