| """ | |
| core/vector_sync.py | |
| ------------------------------------------------------------ | |
| Handles FAISS index rebuild + upload to Hugging Face dataset | |
| without caching, optimized for limited HF Space storage. | |
| """ | |
| import os | |
| import re | |
| import json | |
| import faiss | |
| import numpy as np | |
| from pathlib import Path | |
| from huggingface_hub import HfApi, hf_hub_download, upload_file, HfFolder | |
| from sentence_transformers import SentenceTransformer | |
| from nltk.stem import WordNetLemmatizer | |
| from core.van_normalizer import normalize_to_van | |
# ==========================================================
# Helper: Upload FAISS index + metadata to dataset safely
# ==========================================================
def _upload_to_dataset(index_path: str, meta_path: str, repo_id: str):
    """
    Upload FAISS index + metadata to a Hugging Face dataset safely.
    Used by rebuild_index() in app.py.
    """
    try:
        print(f"🚀 [vector_sync] Uploading FAISS index + metadata to {repo_id}...")
        api = HfApi()
        for path in [index_path, meta_path]:
            if not os.path.exists(path):
                print(f"⚠️ [vector_sync] Skipping {os.path.basename(path)} (not found locally).")
                continue
            api.upload_file(
                path_or_fileobj=path,
                path_in_repo=f"persistent/{os.path.basename(path)}",
                repo_id=repo_id,
                repo_type="dataset",
                commit_message=f"Auto-upload {os.path.basename(path)}",
            )
            print(f"✅ [vector_sync] Uploaded {os.path.basename(path)}")
    except Exception as e:
        print(f"⚠️ [vector_sync] Upload failed: {e}")
# --------------------------------------------------------------------
# ⚙️ CONFIGURATION
# --------------------------------------------------------------------
REPO_ID = "essprasad/CT-Chat-Index"
REPO_TYPE = "dataset"
REMOTE_DIR = "persistent/"
FILES = ["faiss.index", "faiss.index.meta.json"]

api = HfApi()
token = HfFolder.get_token() or os.getenv("HF_TOKEN")
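# Resulting layout on the dataset repo (follows from REMOTE_DIR + FILES above):
#
#     essprasad/CT-Chat-Index  (repo_type="dataset")
#     └── persistent/
#         ├── faiss.index
#         └── faiss.index.meta.json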
# --------------------------------------------------------------------
# 🔹 NORMALIZATION HELPERS
# --------------------------------------------------------------------
lemmatizer = WordNetLemmatizer()


def normalize_for_index(term: str) -> str:
    """Normalize term for embedding."""
    if not term:
        return ""
    s = term.lower().strip()
    s = re.sub(r"[\-_/\\.,;:()]+", " ", s)
    s = re.sub(r"\s+", " ", s)
    words = s.split()
    s = " ".join([lemmatizer.lemmatize(w) for w in words])
    return s.strip()
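# Note: WordNetLemmatizer needs the NLTK "wordnet" corpus at runtime. A minimal guard,
# assuming the Space does not pre-provision NLTK data (illustrative, not from the app):
#
#     import nltk
#     try:
#         nltk.data.find("corpora/wordnet")
#     except LookupError:
#         nltk.download("wordnet", quiet=True)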
def prepare_text_for_embedding(term: str, definition: str) -> str:
    """Prepare text for embedding with VAN normalization."""
    if not term:
        return ""
    t = term.lower().strip()
    t = re.sub(r"[^\w\s-]", " ", t)
    d = re.sub(r"\s+", " ", definition.strip())
    t_van = normalize_to_van(t)
    return f"{t_van}. {d}".strip()
# --------------------------------------------------------------------
# 🔹 1. IMPORT: Download FAISS from Hub (on-demand)
# --------------------------------------------------------------------
def auto_import_from_hub(force=False):
    """
    Download the FAISS index + metadata from the dataset into /home/user/app/tmp.

    Because the filename includes the "persistent/" prefix and local_dir is set,
    the files land under /home/user/app/tmp/persistent/. The `force` argument is
    accepted for API compatibility but unused; force_download=True is always passed.
    """
    print(f"📥 [vector_sync] Checking for FAISS index on {REPO_ID}...")
    try:
        for fname in FILES:
            print(f"⬇️ Downloading {fname} ...")
            hf_hub_download(
                repo_id=REPO_ID,
                filename=f"{REMOTE_DIR}{fname}",
                repo_type=REPO_TYPE,
                local_dir="/home/user/app/tmp",
                cache_dir="/home/user/app/tmp",
                local_dir_use_symlinks=False,  # deprecated/ignored on newer huggingface_hub versions
                token=token,
                force_download=True,
            )
        print("✅ FAISS index + metadata downloaded.")
    except Exception as e:
        print(f"⚠️ [vector_sync] Could not import FAISS files: {e}")
# --------------------------------------------------------------------
# 🔹 2. EXPORT: Upload FAISS to Hub
# --------------------------------------------------------------------
def auto_export_to_hub(commit_msg="Auto-sync after rebuild"):
    """Upload the FAISS index + metadata from /home/user/app/tmp to the dataset."""
    if not token:
        print("⚠️ [vector_sync] No HF token found. Skipping upload.")
        return
    print(f"🚀 [vector_sync] Uploading FAISS index + metadata to {REPO_ID}...")
    try:
        api.upload_file(
            path_or_fileobj="/home/user/app/tmp/faiss.index",
            path_in_repo="persistent/faiss.index",
            repo_id=REPO_ID,
            repo_type=REPO_TYPE,
            token=token,
            commit_message=commit_msg,
        )
        api.upload_file(
            path_or_fileobj="/home/user/app/tmp/faiss.index.meta.json",
            path_in_repo="persistent/faiss.index.meta.json",
            repo_id=REPO_ID,
            repo_type=REPO_TYPE,
            token=token,
            commit_message=commit_msg,
        )
        print("✅ [vector_sync] Upload complete.")
    except Exception as e:
        print(f"⚠️ [vector_sync] Upload failed: {e}")
# --------------------------------------------------------------------
# 🔹 3. REBUILD: Create FAISS from glossary.json
# --------------------------------------------------------------------
def rebuild_faiss_from_glossary(
    glossary_path="/home/user/app/persistent/glossary.json",
    model_name="all-MiniLM-L6-v2",
):
    """Rebuild FAISS index from glossary.json (no caching, low storage footprint)."""
    try:
        print(f"🧠 [vector_sync] Rebuilding FAISS from: {glossary_path}")
        if not os.path.isfile(glossary_path):
            print(f"⚠️ Glossary not found: {glossary_path}")
            return None, None

        with open(glossary_path, "r", encoding="utf-8") as f:
            glossary = json.load(f)
        print(f"📘 Loaded {len(glossary)} glossary entries.")

        model = SentenceTransformer(model_name)
        texts, metas = [], []
        for k, v in glossary.items():
            term = v.get("term", k)
            definition = v.get("definition", "")
            sources = v.get("sources", [])
            if not definition.strip():
                continue
            combined = prepare_text_for_embedding(term, definition)
            texts.append(combined)
            metas.append({"term": term, "definition": definition, "sources": sources})

        if not texts:
            print("⚠️ No valid glossary entries for embedding.")
            return None, None

        print(f"🧩 Encoding {len(texts)} entries with {model_name}...")
        embeddings = model.encode(texts, show_progress_bar=True, convert_to_numpy=True).astype("float32")
        faiss.normalize_L2(embeddings)  # L2-normalize so inner product == cosine similarity

        dim = embeddings.shape[1]
        index = faiss.IndexFlatIP(dim)
        index.add(embeddings)

        tmp_dir = "/home/user/app/tmp"
        os.makedirs(tmp_dir, exist_ok=True)
        tmp_index = os.path.join(tmp_dir, "faiss.index")
        tmp_meta = os.path.join(tmp_dir, "faiss.index.meta.json")
        faiss.write_index(index, tmp_index)
        with open(tmp_meta, "w", encoding="utf-8") as f:
            json.dump(metas, f, indent=2, ensure_ascii=False)

        # Upload to the dataset, then delete the local copies to save Space storage
        auto_export_to_hub("Glossary-based FAISS rebuild")
        os.remove(tmp_index)
        os.remove(tmp_meta)

        print(f"✅ [vector_sync] Rebuild complete — {len(texts)} vectors uploaded to dataset.")
        return index, metas
    except Exception as e:
        print(f"⚠️ Error in rebuild_faiss_from_glossary: {e}")
        return None, None