Spaces:
Sleeping
Sleeping
# rag_utils.py
import json
import os
import time
from typing import Dict, List, Optional, Tuple

import numpy as np

# Embedding / Vector index
import faiss
from sentence_transformers import SentenceTransformer
# -------- Paths --------
# Source data lives next to this module; generated artifacts go to CACHE_DIR.
ROOT_DIR = os.path.dirname(__file__)
DATA_DIR = os.path.join(ROOT_DIR, "data")
CACHE_DIR = "/tmp"  # writable on Spaces

# Source corpora.
TAROT_JSON = os.path.join(DATA_DIR, "tarot_data_full.json")
NUM_JSON = os.path.join(DATA_DIR, "numerology_data_full.json")
EXT_JSONL = os.path.join(DATA_DIR, "external_chunks.jsonl")  # optional (crawler output)

# One FAISS index file plus one JSON metadata file per corpus.
TAROT_IDX = os.path.join(CACHE_DIR, "faiss_tarot.index")
TAROT_META = os.path.join(CACHE_DIR, "faiss_tarot_meta.json")
NUM_IDX = os.path.join(CACHE_DIR, "faiss_num.index")
NUM_META = os.path.join(CACHE_DIR, "faiss_num_meta.json")
EXT_IDX = os.path.join(CACHE_DIR, "faiss_ext.index")
EXT_META = os.path.join(CACHE_DIR, "faiss_ext_meta.json")

# Name of the sentence-embedding model passed to SentenceTransformer.
EMBED_MODEL = "sentence-transformers/all-MiniLM-L6-v2"  # lightweight and works well
# Process-wide embedding model, created on first use by get_model().
_model = None


def get_model():
    """Return the shared SentenceTransformer, constructing it on the first call.

    The instance is stored in the module-level ``_model`` so that later calls
    reuse it instead of constructing the model again.
    """
    global _model
    if _model is not None:
        return _model
    _model = SentenceTransformer(EMBED_MODEL)
    return _model
# ---------- Utils ----------
def _mtime(path: str) -> float:
    """Return the modification time of *path*, or 0.0 when it cannot be read.

    Args:
        path: Filesystem path to inspect.

    Returns:
        The mtime in seconds since the epoch, or ``0.0`` if the path does not
        exist or cannot be stat'ed.
    """
    # Ask for the mtime directly instead of checking existence first: the
    # file could disappear between an exists() check and the getmtime() call.
    # (OSError, ValueError) are the errors os.path.exists() itself treats as
    # "does not exist", so the 0.0 fallback covers the same cases as before.
    try:
        return os.path.getmtime(path)
    except (OSError, ValueError):
        return 0.0
def _should_rebuild(src_paths: List[str], idx_paths: List[str]) -> bool:
    """Decide whether the index artifacts must be regenerated.

    A rebuild is needed when any path in *idx_paths* is missing, or when the
    newest existing source file is more recent than the oldest existing
    index artifact.
    """
    # A missing artifact always forces a rebuild.
    if any(not os.path.exists(artifact) for artifact in idx_paths):
        return True

    newest_source = max(
        (_mtime(src) for src in src_paths if src and os.path.exists(src)),
        default=0.0,
    )
    oldest_artifact = min(
        (_mtime(artifact) for artifact in idx_paths if artifact and os.path.exists(artifact)),
        default=0.0,
    )
    return newest_source > oldest_artifact
def _encode(texts: List[str]) -> np.ndarray:
    """Embed *texts* with the shared model and return them as a float32 array.

    Normalization is requested from the encoder (``normalize_embeddings=True``),
    which is what lets the inner-product index used in this module rank by
    cosine similarity.
    """
    vectors = get_model().encode(
        texts,
        normalize_embeddings=True,
        batch_size=64,
        show_progress_bar=False,
    )
    return np.asarray(vectors, dtype="float32")
def _build_index(texts: List[str], dim: Optional[int] = None) -> "faiss.IndexFlatIP":
    """Embed *texts* and return an inner-product FAISS index containing them.

    Args:
        texts: Non-empty list of documents to embed; row ``i`` of the index
            corresponds to ``texts[i]``.
        dim: Optional expected embedding width. When given it must equal the
            width produced by the encoder.

    Returns:
        A ``faiss.IndexFlatIP`` holding one vector per input text.

    Raises:
        ValueError: If *texts* is empty, or *dim* disagrees with the width of
            the encoded vectors.
    """
    # Fail early with a readable message; without this guard an empty corpus
    # only surfaces later as an IndexError from reading ``embs.shape[1]``.
    if len(texts) == 0:
        raise ValueError("cannot build an index from an empty list of texts")

    embs = _encode(texts)
    width = embs.shape[1]
    if dim is not None and dim != width:
        raise ValueError(
            f"dim={dim} does not match embedding dimension {width}"
        )

    index = faiss.IndexFlatIP(width)
    index.add(embs)
    return index
def _save_index(index: faiss.Index, idx_path: str, meta: List[Dict], meta_path: str):
    """Write *index* to *idx_path* and its metadata list to *meta_path*.

    The metadata is stored as indented UTF-8 JSON with non-ASCII characters
    kept as-is (``ensure_ascii=False``).
    """
    faiss.write_index(index, idx_path)
    with open(meta_path, "w", encoding="utf-8") as meta_file:
        json.dump(meta, meta_file, ensure_ascii=False, indent=2)
def _search(idx_path: str, meta_path: str, query: str, k: int) -> List[Dict]:
    """Return up to *k* nearest metadata records for *query*.

    Args:
        idx_path: Path of the FAISS index file.
        meta_path: Path of the JSON metadata list saved with the index; entry
            ``i`` describes vector ``i``.
        query: Free-text query to embed and search with.
        k: Maximum number of hits to return.

    Returns:
        A list of copies of the matching metadata dicts, each extended with
        ``"score"`` (inner-product similarity as a float) and ``"rank"``
        (1-based position in the FAISS result). The list is empty when
        ``k <= 0``, when either file is missing, or when the index is empty.
    """
    # Nothing to return for a non-positive k; the FAISS search wrapper
    # requires k to be positive.
    if k <= 0:
        return []
    if not (os.path.exists(idx_path) and os.path.exists(meta_path)):
        return []

    index = faiss.read_index(idx_path)
    if index.ntotal == 0:
        return []
    with open(meta_path, "r", encoding="utf-8") as f:
        meta = json.load(f)

    # Asking for more neighbours than stored vectors only yields -1 padding,
    # so clamp k to the index size.
    scores, ids = index.search(_encode([query]), min(k, index.ntotal))

    hits = []
    for rank, (score, raw_id) in enumerate(zip(scores[0], ids[0]), 1):
        pos = int(raw_id)
        # -1 marks "no result"; also ignore ids beyond the metadata list in
        # case index and metadata are out of sync.
        if pos < 0 or pos >= len(meta):
            continue
        hit = dict(meta[pos])
        hit["score"] = float(score)
        hit["rank"] = rank
        hits.append(hit)
    return hits
# ---------- Tarot ----------
def _tarot_entry(card: Dict, position: int) -> Dict:
    """Turn one tarot record into its ``{"card_name", "text"}`` metadata entry."""
    name = card.get("name") or card.get("card_name") or f"Card {position}"

    parts = []
    # Free-text fields: keep only non-blank strings.
    for key in ("meaning_upright", "meaning_reversed", "description", "advice"):
        val = card.get(key)
        if isinstance(val, str) and val.strip():
            parts.append(val.strip())
    # Keyword fields may be either a list or a plain string.
    for key in ("keywords_upright", "keywords_reversed", "keywords"):
        val = card.get(key)
        if isinstance(val, list) and val:
            # str() keeps a stray non-string keyword from raising TypeError
            # inside join().
            parts.append(" ".join(str(kw) for kw in val))
        elif isinstance(val, str) and val.strip():
            parts.append(val.strip())

    # f-string formatting also tolerates a non-string name.
    text = f"{name} - {' '.join(parts)}".strip()
    return {"card_name": name, "text": text}


def _build_tarot():
    """Build the tarot index and metadata from ``TAROT_JSON``.

    Each record in the JSON file becomes one searchable text made of the card
    name followed by its meaning/description/advice fields and keywords. The
    resulting index is written to ``TAROT_IDX`` and the metadata to
    ``TAROT_META``.
    """
    with open(TAROT_JSON, "r", encoding="utf-8") as f:
        data = json.load(f)

    items = [_tarot_entry(card, i) for i, card in enumerate(data)]
    index = _build_index([it["text"] for it in items])
    _save_index(index, TAROT_IDX, items, TAROT_META)
def search_tarot(query: str, k: int = 3) -> List[Dict]:
    """Search the tarot index for *query* and return up to *k* hits.

    The indexes are checked (and rebuilt if needed) before searching.
    """
    ensure_indexes()
    hits = _search(TAROT_IDX, TAROT_META, query, k)
    return hits
# ---------- Numerology ----------
def _build_num():
    """Build the numerology index and metadata from ``NUM_JSON``.

    Each record with a usable ``number`` becomes one searchable text made of
    the number followed by its name/description/advice fields and keywords.
    Records without a number, or whose number cannot be converted to an
    integer, are skipped. The index is written to ``NUM_IDX`` and the
    metadata to ``NUM_META``.
    """
    with open(NUM_JSON, "r", encoding="utf-8") as f:
        records = json.load(f)

    items = []
    for rec in records:
        raw_number = rec.get("number")
        if raw_number is None:
            continue
        # One malformed record must not abort the whole build; report it and
        # move on.
        try:
            number = int(raw_number)
        except (TypeError, ValueError):
            print("[rag_utils] skip numerology record with invalid number:", raw_number)
            continue

        parts = [str(number)]
        for key in ("name", "description", "advice"):
            val = rec.get(key)
            if isinstance(val, str) and val.strip():
                parts.append(val.strip())
        kws = rec.get("keywords", [])
        if isinstance(kws, list) and kws:
            # str() keeps a non-string keyword from raising TypeError inside
            # join().
            parts.append(" ".join(str(kw) for kw in kws))

        items.append({"number": number, "text": " ".join(parts)})

    index = _build_index([it["text"] for it in items])
    _save_index(index, NUM_IDX, items, NUM_META)
def search_numerology(query: str, k: int = 3) -> List[Dict]:
    """Search the numerology index for *query* and return up to *k* hits.

    The indexes are checked (and rebuilt if needed) before searching.
    """
    ensure_indexes()
    hits = _search(NUM_IDX, NUM_META, query, k)
    return hits
# ---------- External corpus (optional) ----------
def _build_external():
    """Build the external-corpus index from ``data/external_chunks.jsonl``.

    The file holds one JSON object per line: ``{id, url, title, text}``.
    Blank lines, lines that are not valid JSON, lines that are not JSON
    objects, and records with no usable title/text are skipped. If nothing
    usable remains, no index is written.
    """
    items = []
    with open(EXT_JSONL, "r", encoding="utf-8") as f:
        for raw_line in f:
            raw_line = raw_line.strip()
            if not raw_line:
                continue
            try:
                rec = json.loads(raw_line)
            except (ValueError, RecursionError):
                # json.JSONDecodeError is a ValueError; a bad line is skipped
                # rather than failing the whole build.
                continue
            # A valid JSON line that is not an object has no fields to read.
            if not isinstance(rec, dict):
                continue

            # title/text may be missing or null; treat anything that is not a
            # string as empty instead of calling .strip() on it.
            title = rec.get("title")
            body = rec.get("text")
            title_text = title.strip() if isinstance(title, str) else ""
            body_text = body.strip() if isinstance(body, str) else ""
            text = f"{title_text} {body_text}".strip()
            if not text:
                continue

            items.append({
                "id": rec.get("id"),
                "url": rec.get("url"),
                "title": title,
                "text": text,
            })

    if not items:
        # No content: skip building the index so nothing downstream errors.
        return
    index = _build_index([it["text"] for it in items])
    _save_index(index, EXT_IDX, items, EXT_META)
def search_external(query: str, k: int = 3) -> List[Dict]:
    """Search the optional external corpus for *query*.

    Returns an empty list, without raising, when the external corpus or its
    index is not available.

    Args:
        query: Free-text query.
        k: Maximum number of hits to return.
    """
    # Match search_tarot/search_numerology: make sure the index has been
    # built, so a direct call on a fresh cache directory still finds the
    # corpus. ensure_indexes() reports build failures without raising.
    ensure_indexes()
    # _search() itself returns [] when the index or metadata file is missing.
    return _search(EXT_IDX, EXT_META, query, k)
# ---------- Ensure / Union ----------
def ensure_indexes():
    """Create or refresh the tarot, numerology and external indexes as needed.

    Each corpus is rebuilt when ``_should_rebuild`` says its artifacts are
    missing or older than the source file. The external corpus is optional
    and is only considered when its source file exists. A failing build is
    reported with ``print`` and does not stop the remaining builds.
    """
    os.makedirs(CACHE_DIR, exist_ok=True)

    # (failure message, source file, index artifacts, builder, source is optional)
    jobs = (
        ("[rag_utils] build tarot failed:", TAROT_JSON, [TAROT_IDX, TAROT_META], _build_tarot, False),
        ("[rag_utils] build numerology failed:", NUM_JSON, [NUM_IDX, NUM_META], _build_num, False),
        ("[rag_utils] build external failed:", EXT_JSONL, [EXT_IDX, EXT_META], _build_external, True),
    )
    for failure_msg, source, artifacts, builder, optional in jobs:
        if optional and not os.path.exists(source):
            continue
        if not _should_rebuild([source], artifacts):
            continue
        try:
            builder()
        except Exception as exc:
            print(failure_msg, exc)
def search_union(query: str, k_each: int = 2) -> List[Dict]:
    """Search tarot, numerology and external corpora and merge the hits.

    Up to *k_each* hits are taken from each corpus, the combined list is
    sorted by ``score`` (highest first), and at most ``max(3, k_each)``
    entries are returned. Each hit keeps the ``rank`` it had within its own
    corpus.
    """
    ensure_indexes()
    merged = [
        *search_tarot(query, k_each),
        *search_numerology(query, k_each),
        *search_external(query, k_each),
    ]
    # Highest score first.
    merged.sort(key=lambda hit: hit.get("score", 0.0), reverse=True)
    limit = max(3, k_each)  # allow up to 3 results even when k_each is smaller
    return merged[:limit]