# turkish-wikipedia-rag / rag_pipeline.py
# ecceembusra's picture
# Update rag_pipeline.py
# 24d2727 verified
# raw
# history blame
# 8.28 kB
# rag_pipeline.py
import json
import logging
import os
import re
import shutil
from functools import lru_cache
from typing import List, Dict, Tuple

import faiss
import numpy as np
from huggingface_hub import hf_hub_download

from providers import embed, generate, rerank, qa_extract
# =========================
# Settings
# =========================
# Default directory and file names of the on-disk vector store
# (a FAISS index plus one JSON object per line of passage metadata).
VSTORE_DIR = "vectorstore"
FAISS_FILE = "index.faiss"
META_JSONL = "meta.jsonl"
# Retrieval sizes used as defaults by search_chunks(): FETCH_K_DEFAULT
# candidates are requested from the index, re-scored, and the best
# TOP_K_DEFAULT are returned.
TOP_K_DEFAULT = 4
FETCH_K_DEFAULT = 16
# Value assigned to index.hnsw.efSearch by load_vectorstore() when the
# loaded index exposes an `hnsw` attribute.
HNSW_EFSEARCH = 32
# NOTE(review): the next four values are not referenced by any code visible
# in this file — presumably tuning knobs left from an earlier version;
# confirm before relying on or removing them.
HIGH_SCORE_THRES = 0.78
MARGIN_THRES = 0.06
CTX_CHAR_LIMIT = 1400
QA_SCORE_THRES = 0.25
# Maximum number of retrieved passages generate_answer() hands to qa_extract().
QA_PER_PASSAGES = 4
# Re-scoring weights used by search_chunks(): a flat bonus when a query
# keyword occurs in the passage title, and the multiplier applied to the
# lexical-overlap ratio.
W_TITLE_BOOST = 0.25
W_LEXICAL = 0.15
# =========================
# Regex definitions
# =========================
# Matches one date-like token, delimited by word boundaries:
#   * "<1-2 digit day> <Turkish month name> <3-4 digit year>",
#   * a dotted numeric date such as "10.11.1938" (2-4 digit year), or
#   * any stand-alone 4-digit number.
# IGNORECASE lets the month names match regardless of capitalisation.
DATE_RX = re.compile(
r"\b(\d{1,2}\s+(Ocak|Şubat|Mart|Nisan|Mayıs|Haziran|Temmuz|Ağustos|Eylül|Ekim|Kasım|Aralık)\s+\d{3,4}"
r"|\d{1,2}\.\d{1,2}\.\d{2,4}"
r"|\d{4})\b",
flags=re.IGNORECASE,
)
# Substrings that _extract_fact_sentence() looks for (in the lower-cased
# query and in candidate sentences) to recognise death-related questions.
DEATH_KEYS = ["öldü", "vefat", "ölümü", "hayatını kaybet", "ölüm"]
# Same role as DEATH_KEYS, for founding/establishment questions.
FOUND_KEYS = ["kuruldu", "kuruluş", "kurulmuştur", "kuruluş tarihi"]
# =========================
# Yardımcı fonksiyonlar
# =========================
def _detect_repo_id() -> str:
    """Return the Space repo id found in the environment, or "" if none is set.

    The variables SPACE_ID, HF_SPACE_REPO_ID and HF_REPO_ID are consulted in
    that order; the first one holding a non-empty value wins.
    """
    env_names = ("SPACE_ID", "HF_SPACE_REPO_ID", "HF_REPO_ID")
    candidates = (os.environ.get(name) for name in env_names)
    return next((value for value in candidates if value), "")
def _split_sentences(txt: str) -> List[str]:
    """Split text into sentences.

    A boundary is any run of whitespace that directly follows '.', '!' or
    '?'. Each piece is stripped and empty pieces are dropped; None or an
    empty string yields an empty list.
    """
    cleaned = (txt or "").strip()
    sentences: List[str] = []
    for chunk in re.split(r"(?<=[.!?])\s+", cleaned):
        chunk = chunk.strip()
        if chunk:
            sentences.append(chunk)
    return sentences
def _extract_fact_sentence(query: str, hits: List[Dict]) -> Tuple[str, str]:
    """Find a dated sentence answering a "ne zaman ..." (when ...) question.

    Only queries containing "ne zaman" are handled. The keyword list is
    DEATH_KEYS if the query mentions one of them, otherwise FOUND_KEYS if it
    mentions one of those, otherwise both lists combined. Hits are scanned in
    order and the first sentence that contains one of the keywords and
    matches DATE_RX is returned.

    Returns:
        (sentence, source) for the first match, or ("", "") when the query is
        not a "ne zaman" question or nothing matches.
    """
    lowered_query = (query or "").lower()
    if "ne zaman" not in lowered_query:
        return "", ""

    # First keyword family mentioned by the query; fall back to both.
    wanted = next(
        (
            family
            for family in (DEATH_KEYS, FOUND_KEYS)
            if any(key in lowered_query for key in family)
        ),
        DEATH_KEYS + FOUND_KEYS,
    )

    for hit in hits:
        for sentence in _split_sentences(hit.get("text", "")):
            lowered_sentence = sentence.lower()
            has_keyword = any(key in lowered_sentence for key in wanted)
            if has_keyword and DATE_RX.search(sentence):
                return sentence, hit.get("source", "")
    return "", ""
# =========================
# Vectorstore yükleme (Otomatik indirici versiyon)
# =========================
def _fetch_vectorstore_file(repo_id: str, filename: str, dest_path: str) -> None:
    """Download one vector store file from the Space repo and place it at dest_path."""
    downloaded = hf_hub_download(
        repo_id=repo_id,
        repo_type="space",
        filename=f"{VSTORE_DIR}/{filename}",
        local_dir=".",
        local_dir_use_symlinks=False,
    )
    os.makedirs(os.path.dirname(dest_path) or ".", exist_ok=True)
    # hf_hub_download keeps the repo-relative path under local_dir, so with
    # the default vstore_dir the download already *is* dest_path. Copying a
    # file onto itself raises shutil.SameFileError, so skip the copy then.
    already_in_place = os.path.exists(dest_path) and os.path.samefile(downloaded, dest_path)
    if not already_in_place:
        shutil.copy2(downloaded, dest_path)


def load_vectorstore(vstore_dir: str = VSTORE_DIR) -> Tuple[faiss.Index, List[Dict]]:
    """Load the FAISS index and passage metadata, downloading them if missing.

    If either file is absent from ``vstore_dir`` it is fetched from the
    Hugging Face Space identified by ``_detect_repo_id()`` (falling back to
    "ecceembusra/turkish-wikipedia-rag").

    Args:
        vstore_dir: Directory expected to contain FAISS_FILE and META_JSONL.

    Returns:
        ``(index, records)`` where each record is
        ``{"text": str, "metadata": dict}``, in file order.

    Raises:
        FileNotFoundError: A missing file could not be downloaded, or the
            files are still absent afterwards.
        RuntimeError: The metadata file contains no records.
        json.JSONDecodeError: A non-blank metadata line is not valid JSON.
    """
    faiss_file = os.path.join(vstore_dir, FAISS_FILE)
    meta_file = os.path.join(vstore_dir, META_JSONL)

    missing = [
        (name, path)
        for name, path in ((FAISS_FILE, faiss_file), (META_JSONL, meta_file))
        if not os.path.exists(path)
    ]
    if missing:
        try:
            # NOTE(review): huggingface_hub may read this flag at import
            # time, in which case setting it here has no effect — confirm.
            os.environ.setdefault("HF_HUB_ENABLE_HF_TRANSFER", "1")
            repo_id = _detect_repo_id() or "ecceembusra/turkish-wikipedia-rag"
            for name, path in missing:
                _fetch_vectorstore_file(repo_id, name, path)
        except Exception as e:
            # Chain the cause so the real download error stays in the traceback.
            raise FileNotFoundError(
                "'vectorstore/index.faiss' indirilemedi veya bulunamadı. "
                "Lütfen bu dosyaları Space deposunda 'vectorstore/' klasörüne yükleyin "
                "veya Settings > Variables kısmına 'HF_SPACE_REPO_ID' ekleyin.\n"
                f"Hata ayrıntısı: {e}"
            ) from e

    if not (os.path.exists(faiss_file) and os.path.exists(meta_file)):
        raise FileNotFoundError(
            "Vektör deposu bulunamadı. 'vectorstore/index.faiss' ve 'vectorstore/meta.jsonl' mevcut olmalı."
        )

    index = faiss.read_index(faiss_file)
    try:
        index.hnsw.efSearch = HNSW_EFSEARCH
    except Exception:
        # Deliberate best-effort: an index without an `hnsw` attribute
        # simply keeps its own search settings.
        pass

    records: List[Dict] = []
    with open(meta_file, "r", encoding="utf-8") as f:
        for line in f:
            line = line.strip()
            if not line:
                # Tolerate blank lines (e.g. a trailing newline pair).
                continue
            obj = json.loads(line)
            records.append({
                "text": obj.get("text") or "",
                # A JSON null here would break later `.get` calls on metadata.
                "metadata": obj.get("metadata") or {},
            })
    if not records:
        raise RuntimeError("meta.jsonl boş görünüyor.")
    return index, records
# =========================
# Retrieval + QA
# =========================
# One or more consecutive capitalised words (Turkish letters included),
# captured as a single phrase, e.g. "Mustafa Kemal".
_CAP_WORD = re.compile(r"\b([A-ZÇĞİIÖŞÜ][a-zçğıiöşü]+(?:\s+[A-ZÇĞİIÖŞÜ][a-zçğıiöşü]+)*)\b")


def _keywords_from_query(q: str) -> List[str]:
    """Extract de-duplicated keywords from a query, preserving first-seen order.

    The result lists, in this order: capitalised phrases matched by
    _CAP_WORD (original casing), stand-alone 3-4 digit numbers, and every
    alphabetic word longer than two characters, lower-cased.
    """
    text = (q or "").strip()
    proper_names = [match.group(1) for match in _CAP_WORD.finditer(text)]
    year_like = re.findall(r"\b\d{3,4}\b", text)
    words = [
        word.lower()
        for word in re.findall(r"[A-Za-zÇĞİIÖŞÜçğıiöşü]+", text)
        if len(word) > 2
    ]

    seen = set()
    keywords: List[str] = []
    for token in (*proper_names, *year_like, *words):
        if token in seen:
            continue
        seen.add(token)
        keywords.append(token)
    return keywords
def _fold_case(s: str) -> str:
    """Lower-case s and drop U+0307 so that "İ" folds to a plain "i"."""
    # str.lower() turns "İ" into "i" followed by a combining dot (U+0307);
    # the word regex below would then split "İstanbul" into "i" + "stanbul".
    return s.lower().replace("\u0307", "")


def _lexical_overlap(q_tokens: List[str], text: str) -> float:
    """Return the share of distinct query tokens that also occur in text.

    Both sides are compared case-insensitively and only tokens longer than
    two characters count. Query tokens are case-folded the same way as the
    text; otherwise a capitalised query token could never match the
    lower-cased text tokens and would only deflate the ratio.

    Args:
        q_tokens: Query keywords (any casing).
        text: Passage text to compare against.

    Returns:
        A value in [0.0, 1.0]; 0.0 when the text has no alphabetic tokens or
        no query token qualifies.
    """
    toks = re.findall(r"[A-Za-zÇĞİIÖŞÜçğıiöşü]+", _fold_case(text or ""))
    if not toks:
        return 0.0
    folded_query = (_fold_case(t) for t in (q_tokens or []))
    qset = {t for t in folded_query if len(t) > 2}
    tset = {t for t in toks if len(t) > 2}
    return len(qset & tset) / (len(qset) or 1)
@lru_cache(maxsize=256)
def _cached_query_vec(e5_query: str) -> np.ndarray:
    """Embed one query string and memoise the float32 result.

    The string is passed to embed() as a one-element list, and the result is
    cast with ``astype("float32")``. Up to 256 distinct queries are cached.
    Repeated calls with the same string return the *same* array object, so
    in-place changes made by a caller are visible to later callers.
    """
    embedding = embed([e5_query])
    return embedding.astype("float32")
def search_chunks(query: str, index: faiss.Index, records: List[Dict], top_k: int = TOP_K_DEFAULT,
                  fetch_k: int = FETCH_K_DEFAULT) -> List[Dict]:
    """Retrieve the best-matching passages for a query.

    The query is embedded with the "query: " prefix, L2-normalised and
    searched in ``index``. Each candidate's vector score is then increased by
    W_TITLE_BOOST when any query keyword occurs in its title, and by
    W_LEXICAL times its lexical overlap with the query.

    Args:
        query: User question.
        index: FAISS index whose ids are positions in ``records``.
        records: Dicts with "text" and "metadata" keys.
        top_k: Number of passages to return.
        fetch_k: Number of candidates to request before re-scoring; raised
            to ``top_k`` if smaller.

    Returns:
        Up to ``top_k`` dicts with keys "text", "title", "source",
        "score_vec" and "score_boosted", best first. Empty when the query is
        blank, ``top_k`` is not positive, or nothing valid is retrieved.
    """
    q = (query or "").strip()
    if not q or top_k <= 0:
        return []
    # Fetching fewer candidates than requested would silently cap the result.
    fetch_k = max(fetch_k, top_k)

    # Copy before normalising: _cached_query_vec hands back the cached array
    # object and normalize_L2 modifies its argument in place.
    q_vec = np.array(_cached_query_vec("query: " + q), dtype="float32", order="C", copy=True)
    faiss.normalize_L2(q_vec)
    scores, idxs = index.search(q_vec, fetch_k)

    pool = []
    for raw_idx, s in zip(idxs[0], scores[0]):
        i = int(raw_idx)
        # Out-of-range ids (e.g. negative placeholders) are skipped.
        if not 0 <= i < len(records):
            continue
        md = records[i].get("metadata") or {}
        pool.append({
            "text": records[i]["text"],
            "title": md.get("title") or "",
            "source": md.get("source") or "",
            "score_vec": float(s),
        })
    if not pool:
        return []

    q_tokens = _keywords_from_query(q)
    lowered_tokens = [tok.lower() for tok in q_tokens]
    for p in pool:
        title = p["title"].lower()
        title_hit = any(tok in title for tok in lowered_tokens)
        lex = _lexical_overlap(q_tokens, p["text"]) * W_LEXICAL
        boost = W_TITLE_BOOST if title_hit else 0
        p["score_boosted"] = p["score_vec"] + boost + lex
    pool.sort(key=lambda x: x["score_boosted"], reverse=True)
    return pool[:top_k]
# =========================
# Nihai cevap
# =========================
def generate_answer(query: str, index: faiss.Index, records: List[Dict], top_k: int = TOP_K_DEFAULT) -> str:
    """Answer a question from the retrieved passages.

    Steps:
      1. Retrieve passages with search_chunks(); if none, report that no
         information was found.
      2. If _extract_fact_sentence() finds a dated sentence, return it.
      3. Otherwise run qa_extract() on the first QA_PER_PASSAGES passages
         and return the highest-scoring non-empty answer.

    Args:
        query: User question.
        index: FAISS index passed through to search_chunks().
        records: Passage records passed through to search_chunks().
        top_k: Number of passages to retrieve.

    Returns:
        The answer followed by a "Kaynaklar:" section naming its source, or
        a fixed "not found" message.
    """
    hits = search_chunks(query, index, records, top_k=top_k)
    if not hits:
        return "Bilgi bulunamadı."

    rule_sent, rule_src = _extract_fact_sentence(query, hits)
    if rule_sent:
        return f"{rule_sent}\n\nKaynaklar:\n- {rule_src or hits[0].get('source','')}"

    best_answer, best_score, best_src = "", 0.0, None
    for h in hits[:QA_PER_PASSAGES]:
        try:
            qa = qa_extract(query, h["text"])
        except Exception:
            # Best-effort per passage, but leave a trace instead of hiding it.
            logging.getLogger(__name__).warning(
                "qa_extract failed for a passage; skipping it", exc_info=True
            )
            continue
        if not qa:
            continue
        # Strip first: a whitespace-only answer must not win on score and
        # then shadow a real answer from another passage.
        ans = (qa.get("answer") or "").strip()
        if not ans:
            continue
        # A null score would make float() raise; treat it as zero.
        score = float(qa.get("score") or 0.0)
        # NOTE(review): hard-coded bonus for answers containing a 19xx/20xx
        # year or one of a few specific names — looks tuned to sample
        # questions; confirm it is still wanted.
        if re.search(r"\b(19\d{2}|20\d{2}|Atatürk|Gökçen|Kemal|Ankara|Fenerbahçe)\b", ans):
            score += 0.3
        if score > best_score:
            best_answer, best_score, best_src = ans, score, h.get("source")

    if best_answer:
        return f"{best_answer}\n\nKaynaklar:\n- {best_src or hits[0].get('source','')}"
    return "Verilen bağlamda bu sorunun cevabı bulunamadı."
# =========================
# Test
# =========================
if __name__ == "__main__":
    # Manual smoke test: load the store and answer one sample question.
    faiss_index, passage_records = load_vectorstore()
    sample_answer = generate_answer(
        "Türkiye'nin ilk kadın pilotu kimdir?", faiss_index, passage_records
    )
    print(sample_answer)