import os, json, re, shutil
from typing import List, Dict, Tuple
from functools import lru_cache

import faiss
import numpy as np
from huggingface_hub import hf_hub_download

from providers import embed, generate, rerank, qa_extract
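# `providers` is assumed to be the sibling module in this Space that wraps the
# embedding, text-generation, rerank, and extractive-QA backends.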

# Vector store artifacts
VSTORE_DIR = "vectorstore"
FAISS_FILE = "index.faiss"
META_JSONL = "meta.jsonl"

# Retrieval parameters
TOP_K_DEFAULT = 4
FETCH_K_DEFAULT = 16
HNSW_EFSEARCH = 32
HIGH_SCORE_THRES = 0.78
MARGIN_THRES = 0.06

# Extractive QA parameters
CTX_CHAR_LIMIT = 1400
QA_SCORE_THRES = 0.25
QA_PER_PASSAGES = 4

# Re-scoring weights applied on top of the vector similarity score
W_TITLE_BOOST = 0.25
W_LEXICAL = 0.15

# Matches Turkish dates: "10 Kasım 1938", "10.11.1938", or a bare year such as "1938".
DATE_RX = re.compile(
    r"\b(\d{1,2}\s+(Ocak|Şubat|Mart|Nisan|Mayıs|Haziran|Temmuz|Ağustos|Eylül|Ekim|Kasım|Aralık)\s+\d{3,4}"
    r"|\d{1,2}\.\d{1,2}\.\d{2,4}"
    r"|\d{4})\b",
    flags=re.IGNORECASE,
)

# Keyword lists for "when did ... die?" / "when was ... founded?" style questions.
DEATH_KEYS = ["öldü", "vefat", "ölümü", "hayatını kaybet", "ölüm"]
FOUND_KEYS = ["kuruldu", "kuruluş", "kurulmuştur", "kuruluş tarihi"]
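
# Illustrative example: for the question "Atatürk ne zaman öldü?", a passage sentence
# such as "Atatürk 10 Kasım 1938'de vefat etti." contains both a DEATH_KEYS term
# ("vefat") and a DATE_RX match ("10 Kasım 1938"), so the rule-based path in
# _extract_fact_sentence() below can return it without calling the QA model.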


def _detect_repo_id() -> str:
    """Auto-detect the Space repo id from the environment."""
    for key in ("SPACE_ID", "HF_SPACE_REPO_ID", "HF_REPO_ID"):
        v = os.getenv(key)
        if v:
            return v
    return ""


def _split_sentences(txt: str) -> List[str]:
    """Naive sentence splitter on terminal punctuation."""
    parts = re.split(r"(?<=[.!?])\s+", (txt or "").strip())
    return [p.strip() for p in parts if p.strip()]


def _extract_fact_sentence(query: str, hits: List[Dict]) -> Tuple[str, str]:
    """For "when did ... die / when was ... founded" questions, return the first
    sentence that contains both a matching keyword and a date, plus its source."""
    q = (query or "").lower()
    if "ne zaman" not in q:
        return "", ""

    if any(k in q for k in DEATH_KEYS):
        keylist = DEATH_KEYS
    elif any(k in q for k in FOUND_KEYS):
        keylist = FOUND_KEYS
    else:
        keylist = DEATH_KEYS + FOUND_KEYS

    for h in hits:
        sents = _split_sentences(h.get("text", ""))
        for s in sents:
            if any(k in s.lower() for k in keylist) and DATE_RX.search(s):
                return s, h.get("source", "")
    return "", ""


def load_vectorstore(vstore_dir: str = VSTORE_DIR) -> Tuple[faiss.Index, List[Dict]]:
    """Load the FAISS index and chunk metadata, downloading them from the Space repo if missing locally."""
    faiss_file = os.path.join(vstore_dir, FAISS_FILE)
    meta_file = os.path.join(vstore_dir, META_JSONL)

    have_local_faiss = os.path.exists(faiss_file)
    have_local_meta = os.path.exists(meta_file)

    if not (have_local_faiss and have_local_meta):
        try:
            os.environ.setdefault("HF_HUB_ENABLE_HF_TRANSFER", "1")
            repo_id = _detect_repo_id() or "ecceembusra/turkish-wikipedia-rag"

            if not have_local_faiss:
                local_faiss = hf_hub_download(
                    repo_id=repo_id,
                    repo_type="space",
                    filename="vectorstore/index.faiss",
                    local_dir=".",
                    local_dir_use_symlinks=False,
                )
                os.makedirs(vstore_dir, exist_ok=True)
                # With local_dir=".", the download may already land at faiss_file;
                # copying a file onto itself raises SameFileError, so guard the copy.
                if os.path.abspath(local_faiss) != os.path.abspath(faiss_file):
                    shutil.copy2(local_faiss, faiss_file)

            if not have_local_meta:
                local_meta = hf_hub_download(
                    repo_id=repo_id,
                    repo_type="space",
                    filename="vectorstore/meta.jsonl",
                    local_dir=".",
                    local_dir_use_symlinks=False,
                )
                os.makedirs(vstore_dir, exist_ok=True)
                if os.path.abspath(local_meta) != os.path.abspath(meta_file):
                    shutil.copy2(local_meta, meta_file)

        except Exception as e:
            raise FileNotFoundError(
                "'vectorstore/index.faiss' indirilemedi veya bulunamadı. "
                "Lütfen bu dosyaları Space deposunda 'vectorstore/' klasörüne yükleyin "
                "veya Settings > Variables kısmına 'HF_SPACE_REPO_ID' ekleyin.\n"
                f"Hata ayrıntısı: {e}"
            )

    if not (os.path.exists(faiss_file) and os.path.exists(meta_file)):
        raise FileNotFoundError(
            "Vektör deposu bulunamadı. 'vectorstore/index.faiss' ve 'vectorstore/meta.jsonl' mevcut olmalı."
        )

    index = faiss.read_index(faiss_file)
    try:
        # Only HNSW indexes expose efSearch; the except swallows the AttributeError for other types.
        index.hnsw.efSearch = HNSW_EFSEARCH
    except Exception:
        pass

    records: List[Dict] = []
    with open(meta_file, "r", encoding="utf-8") as f:
        for line in f:
            line = line.strip()
            if not line:
                continue
            obj = json.loads(line)
            records.append({"text": obj.get("text", ""), "metadata": obj.get("metadata", {})})

    if not records:
        raise RuntimeError("meta.jsonl boş görünüyor.")
    return index, records
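
# Expected meta.jsonl layout (one JSON object per line), e.g.:
#   {"text": "chunk text ...", "metadata": {"title": "...", "source": "https://..."}}
# The "title" and "source" metadata fields are what search_chunks() and generate_answer() read.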


# Capitalized (Turkish-aware) word sequences, used to pull likely proper nouns out of the query.
_CAP_WORD = re.compile(r"\b([A-ZÇĞİIÖŞÜ][a-zçğıiöşü]+(?:\s+[A-ZÇĞİIÖŞÜ][a-zçğıiöşü]+)*)\b")


def _keywords_from_query(q: str) -> List[str]:
    """Extract proper nouns, years, and lowercase tokens (>2 chars) from the query, de-duplicated in order."""
    q = (q or "").strip()
    caps = [m.group(1) for m in _CAP_WORD.finditer(q)]
    nums = re.findall(r"\b\d{3,4}\b", q)
    base = re.findall(r"[A-Za-zÇĞİIÖŞÜçğıiöşü]+", q)
    base = [w.lower() for w in base if len(w) > 2]
    return list(dict.fromkeys(caps + nums + base))


def _lexical_overlap(q_tokens: List[str], text: str) -> float:
    """Fraction of query tokens (>2 chars) that also appear in the passage text."""
    toks = re.findall(r"[A-Za-zÇĞİIÖŞÜçğıiöşü]+", (text or "").lower())
    if not toks:
        return 0.0
    qset = {t for t in q_tokens if len(t) > 2}
    tset = {t for t in toks if len(t) > 2}
    return len(qset & tset) / (len(qset) or 1)
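
# Example: with q_tokens = ["sabiha", "gökçen", "pilotu"] and a passage containing
# "sabiha" and "gökçen", _lexical_overlap returns 2/3 ≈ 0.67 (before the W_LEXICAL weight).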


@lru_cache(maxsize=256)
def _cached_query_vec(e5_query: str) -> np.ndarray:
    """Embed a query string once and cache the vector for repeated questions."""
    v = embed([e5_query]).astype("float32")
    return v


def search_chunks(query: str, index: faiss.Index, records: List[Dict], top_k: int = TOP_K_DEFAULT,
                  fetch_k: int = FETCH_K_DEFAULT) -> List[Dict]:
    """Retrieve fetch_k candidates by vector similarity, re-score them with title and
    lexical-overlap boosts, and return the top_k best passages."""
    q = (query or "").strip()
    q_e5 = "query: " + q  # "query: " prefix used by E5-style embedding models
    q_vec = _cached_query_vec(q_e5)
    faiss.normalize_L2(q_vec)

    scores, idxs = index.search(q_vec, fetch_k)
    pool = []
    for i, s in zip(idxs[0], scores[0]):
        if 0 <= i < len(records):
            md = records[i]["metadata"]
            pool.append({
                "text": records[i]["text"],
                "title": md.get("title", ""),
                "source": md.get("source", ""),
                "score_vec": float(s),
            })
    if not pool:
        return []

    q_tokens = _keywords_from_query(q)
    for p in pool:
        title_hit = any(tok.lower() in p.get("title", "").lower() for tok in q_tokens)
        lex = _lexical_overlap(q_tokens, p["text"]) * W_LEXICAL
        boost = W_TITLE_BOOST if title_hit else 0.0
        p["score_boosted"] = p["score_vec"] + boost + lex

    pool.sort(key=lambda x: x["score_boosted"], reverse=True)
    return pool[:top_k]
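
# Usage sketch (illustrative; assumes the vectorstore has been built and uploaded):
#   index, records = load_vectorstore()
#   for hit in search_chunks("Sabiha Gökçen kimdir?", index, records):
#       print(round(hit["score_boosted"], 3), hit["title"], hit["source"])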


def generate_answer(query: str, index: faiss.Index, records: List[Dict], top_k: int = TOP_K_DEFAULT) -> str:
    """Answer a question from the retrieved passages and append the source(s)."""
    hits = search_chunks(query, index, records, top_k=top_k)
    if not hits:
        return "Bilgi bulunamadı."

    # 1) Rule-based path: date questions answered directly from a matching sentence.
    rule_sent, rule_src = _extract_fact_sentence(query, hits)
    if rule_sent:
        return f"{rule_sent}\n\nKaynaklar:\n- {rule_src or hits[0].get('source', '')}"

    # 2) Extractive QA over the best passages.
    best = {"answer": None, "score": 0.0, "src": None}
    for h in hits[:QA_PER_PASSAGES]:
        try:
            qa = qa_extract(query, h["text"])
        except Exception:
            qa = None
        if qa and qa.get("answer"):
            score = float(qa.get("score", 0))
            ans = qa["answer"].strip()
            # Small bonus for spans that look like years or known entity names.
            if re.search(r"\b(19\d{2}|20\d{2}|Atatürk|Gökçen|Kemal|Ankara|Fenerbahçe)\b", ans):
                score += 0.3
            if score > best["score"]:
                best = {"answer": ans, "score": score, "src": h.get("source")}

    if best["answer"]:
        return f"{best['answer']}\n\nKaynaklar:\n- {best['src'] or hits[0].get('source', '')}"
    return "Verilen bağlamda bu sorunun cevabı bulunamadı."


if __name__ == "__main__":
    idx, recs = load_vectorstore()
    print(generate_answer("Türkiye'nin ilk kadın pilotu kimdir?", idx, recs))
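    # More smoke-test queries (illustrative; answers depend on the indexed corpus):
    # print(generate_answer("Atatürk ne zaman öldü?", idx, recs))
    # print(generate_answer("Fenerbahçe ne zaman kuruldu?", idx, recs))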