Spaces:
Running
Running
| # -*- coding: utf-8 -*- | |
| from __future__ import annotations | |
| import os, time, uuid, logging, random | |
| from typing import List, Optional, Dict, Any, Tuple | |
| import numpy as np | |
| import requests | |
| from fastapi import FastAPI, BackgroundTasks, Header, HTTPException, Query | |
| from pydantic import BaseModel, Field | |
| # Qdrant (optionnel si VECTOR_STORE=memory) | |
| try: | |
| from qdrant_client import QdrantClient | |
| from qdrant_client.http.models import VectorParams, Distance, PointStruct | |
| except Exception: # si non installé, on retombe en mémoire | |
| QdrantClient = None | |
| VectorParams = Distance = PointStruct = None | |
# ---------- logging ----------
logging.basicConfig(level=logging.INFO, format="%(levelname)s:%(name)s:%(message)s")
LOG = logging.getLogger("remote_indexer")

# ---------- ENV (config) ----------
_TRUTHY = ("1", "true", "yes", "on")


def _env_str(key, default=""):
    """Return environment variable *key* (or *default*) with surrounding whitespace removed."""
    return os.getenv(key, default).strip()


def _env_flag(key, default="true"):
    """Return True when environment variable *key* (or *default*) is one of the truthy tokens."""
    return os.getenv(key, default).lower() in _TRUTHY


# Store selection: "qdrant" (default) or "memory".
VECTOR_STORE = _env_str("VECTOR_STORE", "qdrant").lower()

# Order in which embedding backends are tried. Default: DeepInfra, then HF.
# EMB_BACKEND_ORDER takes precedence over EMB_BACKEND.
DEFAULT_BACKENDS = "deepinfra,hf"
_raw_backends = os.getenv("EMB_BACKEND_ORDER", os.getenv("EMB_BACKEND", DEFAULT_BACKENDS))
EMB_BACKEND_ORDER = [tok.strip().lower() for tok in _raw_backends.split(",") if tok.strip()]
ALLOW_DI_AUTOFALLBACK = _env_flag("ALLOW_DI_AUTOFALLBACK")

# HF Inference API
HF_TOKEN = _env_str("HF_API_TOKEN")
HF_MODEL = _env_str("HF_EMBED_MODEL", "sentence-transformers/all-MiniLM-L6-v2")
HF_API_URL_USER = _env_str("HF_API_URL")
HF_API_URL_PIPELINE = _env_str("HF_API_URL_PIPELINE")
HF_API_URL_MODELS = _env_str("HF_API_URL_MODELS")
if HF_API_URL_USER:
    # A user-supplied URL overrides whichever endpoint family it belongs to.
    if "/pipeline" in HF_API_URL_USER:
        HF_API_URL_PIPELINE = HF_API_URL_USER
    else:
        HF_API_URL_MODELS = HF_API_URL_USER
HF_URL_PIPELINE = (
    HF_API_URL_PIPELINE
    or f"https://api-inference.huggingface.co/pipeline/feature-extraction/{HF_MODEL}"
)
HF_URL_MODELS = HF_API_URL_MODELS or f"https://api-inference.huggingface.co/models/{HF_MODEL}"
HF_TIMEOUT = float(os.getenv("EMB_TIMEOUT_SEC", "120"))
HF_WAIT = _env_flag("HF_WAIT_FOR_MODEL")
HF_PIPELINE_FIRST = _env_flag("HF_PIPELINE_FIRST")

# DeepInfra (OpenAI-compatible embeddings)
DI_TOKEN = _env_str("DEEPINFRA_API_KEY")
DI_MODEL = _env_str("DEEPINFRA_EMBED_MODEL", "BAAI/bge-m3")
DI_URL = _env_str("DEEPINFRA_EMBED_URL", "https://api.deepinfra.com/v1/openai/embeddings")
DI_TIMEOUT = float(os.getenv("EMB_TIMEOUT_SEC", "120"))

# Embedding retries
RETRY_MAX = int(os.getenv("EMB_RETRY_MAX", "6"))
RETRY_BASE_SEC = float(os.getenv("EMB_RETRY_BASE", "1.5"))
RETRY_JITTER = float(os.getenv("EMB_RETRY_JITTER", "0.35"))

# Qdrant
QDRANT_URL = _env_str("QDRANT_URL", "http://localhost:6333")
QDRANT_API = _env_str("QDRANT_API_KEY")

# Service API auth (simple header token)
AUTH_TOKEN = _env_str("REMOTE_INDEX_TOKEN")

LOG.info(f"Embeddings backend order = {EMB_BACKEND_ORDER}")
LOG.info(f"HF pipeline URL = {HF_URL_PIPELINE}")
LOG.info(f"HF models URL = {HF_URL_MODELS}")
LOG.info(f"VECTOR_STORE = {VECTOR_STORE}")
if not HF_TOKEN and "hf" in EMB_BACKEND_ORDER:
    LOG.warning("HF_API_TOKEN manquant — tentatives HF échoueront.")
if not DI_TOKEN and "deepinfra" in EMB_BACKEND_ORDER:
    LOG.warning("DEEPINFRA_API_KEY manquant — tentatives DeepInfra échoueront.")
| # ---------- Vector store abstraction ---------- | |
class VectorStoreBase:
    """Interface shared by the vector-store backends.

    The methods here are placeholders that return None; the concrete stores
    in this module override them.
    """

    def ensure_collection(self, name: str, dim: int):
        """Prepare collection *name* for vectors of size *dim*."""

    def upsert(self, name: str, vectors: np.ndarray, payloads: List[dict]) -> int:
        """Store *vectors* with their *payloads* in *name*; subclasses return a count."""

    def search(self, name: str, query_vec: np.ndarray, limit: int):
        """return list of objects with .score and .payload"""

    def wipe(self, name: str):
        """Remove collection *name*."""
class MemoryHit:
    """Search result of the in-memory store, exposing `.score` and `.payload`."""

    def __init__(self, score: float, payload: dict):
        self.score, self.payload = score, payload
class MemoryStore(VectorStoreBase):
    """Simple in-memory store; data lives only as long as the process.

    Scores are dot products between the stored rows and the query vector.
    They equal cosine similarity only when both are L2-normalized, which the
    embedding helpers in this module do before returning vectors.
    """

    def __init__(self):
        # {collection: {"dim": d, "vecs": np.ndarray [N, d], "payloads": List[dict]}}
        self.data: Dict[str, Dict[str, Any]] = {}
        LOG.warning("Vector store: MEMORY (fallback). Les données sont volatiles (perdues au restart).")

    def ensure_collection(self, name: str, dim: int):
        """Create an empty collection *name* of width *dim* unless it already exists."""
        if name not in self.data:
            self.data[name] = {
                "dim": dim,
                "vecs": np.zeros((0, dim), dtype=np.float32),
                "payloads": [],
            }

    def upsert(self, name: str, vectors: np.ndarray, payloads: List[dict]) -> int:
        """Append *vectors* (shape [N, dim]) and their *payloads* to collection *name*.

        Returns the number of rows added.

        Raises:
            RuntimeError: if *vectors* is not 2-D, its width differs from the
                collection's, or the payload count differs from the row count.
        """
        existing = self.data.get(name)
        expected_dim = existing["dim"] if existing else "d"
        # Validate BEFORE touching the store so a malformed input can neither
        # raise IndexError on shape[1] nor create a bogus collection.
        if vectors.ndim != 2 or (existing and vectors.shape[1] != existing["dim"]):
            raise RuntimeError(f"MemoryStore: bad shape {vectors.shape}, expected (*,{expected_dim})")
        if len(payloads) != vectors.shape[0]:
            # Rows and payloads are matched by position in search().
            raise RuntimeError(
                f"MemoryStore: {len(payloads)} payloads for {vectors.shape[0]} vectors"
            )
        self.ensure_collection(name, vectors.shape[1])
        col = self.data[name]
        col["vecs"] = np.vstack([col["vecs"], vectors.astype(np.float32)])
        col["payloads"].extend(payloads)
        return vectors.shape[0]

    def search(self, name: str, query_vec: np.ndarray, limit: int):
        """Return up to *limit* MemoryHit objects, best score first.

        An unknown or empty collection, or a non-positive *limit*, yields [].
        """
        col = self.data.get(name)
        if limit <= 0 or not col or col["vecs"].shape[0] == 0:
            return []
        stored = col["vecs"]                 # [N, d]
        query = query_vec.reshape(1, -1)     # [1, d]
        scores = (stored @ query.T).ravel()
        best = np.argsort(-scores)[:limit]
        return [MemoryHit(float(scores[i]), col["payloads"][i]) for i in best]

    def wipe(self, name: str):
        """Drop collection *name*; a missing collection is ignored."""
        self.data.pop(name, None)
class QdrantStore(VectorStoreBase):
    """Vector store backed by a Qdrant server, accessed through qdrant-client."""

    def __init__(self, url: str, api_key: Optional[str]):
        """Connect to Qdrant at *url* and verify the connection.

        Raises:
            RuntimeError: if qdrant-client is not installed or the server
                cannot be reached.
        """
        if QdrantClient is None:
            raise RuntimeError("qdrant-client non installé.")
        self.client = QdrantClient(url=url, api_key=api_key if api_key else None)
        # Quick ping so that a bad URL/key fails here rather than on first use.
        try:
            self.client.get_collections()
            LOG.info("Connecté à Qdrant.")
        except Exception as e:
            raise RuntimeError(f"Connexion Qdrant impossible: {e}") from e

    def ensure_collection(self, name: str, dim: int):
        """Create collection *name* (cosine distance, size *dim*) unless it can be fetched."""
        try:
            self.client.get_collection(name)
            return
        except Exception:
            # Best effort: any lookup failure is treated as "collection missing".
            pass
        self.client.create_collection(
            collection_name=name,
            vectors_config=VectorParams(size=dim, distance=Distance.COSINE),
        )

    def upsert(self, name: str, vectors: np.ndarray, payloads: List[dict]) -> int:
        """Insert one point per row of *vectors*; returns the number of points sent.

        Each point gets a freshly generated UUID as its id, because Qdrant
        point ids must be unsigned integers or UUIDs (None is not accepted).
        """
        points = [
            PointStruct(id=str(uuid.uuid4()), vector=vec.tolist(), payload=payloads[i])
            for i, vec in enumerate(vectors)
        ]
        self.client.upsert(collection_name=name, points=points)
        return len(points)

    def search(self, name: str, query_vec: np.ndarray, limit: int):
        """Return the client's search results for *query_vec* (at most *limit*)."""
        return self.client.search(
            collection_name=name, query_vector=query_vec.tolist(), limit=limit
        )

    def wipe(self, name: str):
        """Delete collection *name* on the server."""
        self.client.delete_collection(name)
| # Sélection / auto-fallback du store | |
STORE: VectorStoreBase


def _init_store() -> VectorStoreBase:
    """Build the active vector store.

    Returns a MemoryStore when VECTOR_STORE is "memory". Otherwise tries a
    QdrantStore and falls back to a MemoryStore if its construction fails.
    """
    if VECTOR_STORE == "memory":
        return MemoryStore()
    # Any other value means "prefer Qdrant".
    api_key = QDRANT_API or None
    try:
        store = QdrantStore(QDRANT_URL, api_key)
    except Exception as exc:
        LOG.error(f"Qdrant indisponible ({exc}) — fallback en mémoire.")
        store = MemoryStore()
    return store


STORE = _init_store()
| # ---------- Pydantic ---------- | |
class FileIn(BaseModel):
    """One file submitted for indexing: a path label and the file's text."""

    path: str = Field(...)
    text: str = Field(...)
class IndexRequest(BaseModel):
    """Body of an indexing request.

    chunk_size and overlap are character counts used when splitting each
    file's text; batch_size is the number of chunks embedded per backend
    call; store_text controls whether the chunk text is kept in the payload.
    """

    project_id: str = Field(..., min_length=1)
    files: List[FileIn] = Field(...)
    chunk_size: int = Field(default=1200)
    overlap: int = Field(default=200)
    batch_size: int = Field(default=8)
    store_text: bool = Field(default=True)
class QueryRequest(BaseModel):
    """Body of a search request: target project, query text and result count."""

    project_id: str = Field(...)
    query: str = Field(...)
    top_k: int = Field(default=6)
| # ---------- Jobs store ---------- | |
# In-process job registry: {job_id: {"status": ..., "logs": [...], ...}}.
JOBS: Dict[str, Dict[str, Any]] = {}


def _append_log(job_id: str, line: str):
    """Append *line* to the log list of *job_id*; unknown ids are ignored."""
    entry = JOBS.get(job_id)
    if not entry:
        return
    entry["logs"].append(line)


def _set_status(job_id: str, status: str):
    """Set the status of *job_id* to *status*; unknown ids are ignored."""
    entry = JOBS.get(job_id)
    if not entry:
        return
    entry["status"] = status
def _auth(x_auth: Optional[str]):
    """Reject the request unless *x_auth* matches the configured AUTH_TOKEN.

    When AUTH_TOKEN is empty, authentication is disabled and every request
    passes.

    Raises:
        HTTPException: 401 when a token is configured and *x_auth* differs.
    """
    import hmac  # local import: only needed for the constant-time comparison

    if not AUTH_TOKEN:
        return
    supplied = (x_auth or "").encode("utf-8")
    # compare_digest avoids leaking how many leading characters matched
    # through response timing; bytes are used so non-ASCII input is accepted.
    if not hmac.compare_digest(supplied, AUTH_TOKEN.encode("utf-8")):
        raise HTTPException(status_code=401, detail="Unauthorized")
| # ---------- Helpers retry ---------- | |
def _retry_sleep(attempt: int):
    """Return the back-off delay, in seconds, to wait after failed try *attempt*.

    The delay is RETRY_BASE_SEC ** attempt, scaled by a random factor drawn
    from [1 - RETRY_JITTER, 1 + RETRY_JITTER], and never less than 0.25.
    """
    base_delay = pow(RETRY_BASE_SEC, attempt)
    factor = 1.0 + random.uniform(-RETRY_JITTER, RETRY_JITTER)
    return max(0.25, base_delay * factor)
def _with_task_param(url: str, task: str = "feature-extraction") -> str:
    """Return *url* with a `task=<task>` query parameter appended.

    Uses "&" when *url* already contains a "?", otherwise "?".
    """
    separator = "&" if "?" in url else "?"
    return f"{url}{separator}task={task}"
| # ---------- HF embeddings ---------- | |
def _hf_http(url: str, payload: Dict[str, Any], headers_extra: Optional[Dict[str, str]] = None) -> Tuple[np.ndarray, int]:
    """POST *payload* to the HF endpoint *url* and return normalized embeddings.

    Args:
        url: endpoint to call.
        payload: JSON body; it is copied, never modified in place.
        headers_extra: optional headers merged over the default ones.

    Returns:
        (embeddings, size): a float32 array of shape [N, dim] with
        L2-normalized rows, and the response's Content-Length header as an
        int (0 when the header is absent).

    Raises:
        RuntimeError: if HF_TOKEN is empty or the decoded array is not 2-D.
        requests.HTTPError: for HTTP status codes >= 400.
    """
    if not HF_TOKEN:
        raise RuntimeError("HF_API_TOKEN manquant (backend=hf).")
    headers = {
        "Authorization": f"Bearer {HF_TOKEN}",
        "Content-Type": "application/json",
        "Accept": "application/json",
    }
    # Work on a copy: the caller reuses the same payload dict for several
    # endpoint attempts and must not see it change.
    body = dict(payload)
    if HF_WAIT:
        options = dict(body.get("options") or {})
        options["wait_for_model"] = True
        body["options"] = options
        headers["X-Wait-For-Model"] = "true"
        headers["X-Use-Cache"] = "true"
    if headers_extra:
        headers.update(headers_extra)

    r = requests.post(url, headers=headers, json=body, timeout=HF_TIMEOUT)
    size = int(r.headers.get("Content-Length", "0"))
    if r.status_code >= 400:
        LOG.error(f"HF error {r.status_code}: {r.text[:1000]}")
        r.raise_for_status()

    arr = np.array(r.json(), dtype=np.float32)
    if arr.ndim == 3:
        # Averaged over axis 1 — presumably the per-token axis; TODO confirm.
        arr = arr.mean(axis=1)
    elif arr.ndim == 1:
        # A single vector becomes a one-row matrix.
        arr = arr.reshape(1, -1)
    if arr.ndim != 2:
        raise RuntimeError(f"HF: unexpected embeddings shape: {arr.shape}")
    # The epsilon keeps an all-zero row from dividing by zero.
    norms = np.linalg.norm(arr, axis=1, keepdims=True) + 1e-12
    arr = arr / norms
    return arr.astype(np.float32), size
def _hf_post_embeddings_once(batch: List[str]) -> Tuple[np.ndarray, int]:
    """Request embeddings for *batch* from HF, trying both endpoint families once.

    The pipeline URL is tried first when HF_PIPELINE_FIRST is set, otherwise
    the models URL. A 404/405/501 on the first endpoint moves on to the
    second. When a models endpoint answers with an error body mentioning
    "SentenceSimilarityPipeline", the call is repeated once with an explicit
    `task=feature-extraction` query parameter; if that also fails, the
    original HTTP error is re-raised.
    """
    inputs = batch if len(batch) > 1 else batch[0]
    payload: Dict[str, Any] = {"inputs": inputs}
    if HF_PIPELINE_FIRST:
        candidates = [HF_URL_PIPELINE, HF_URL_MODELS]
    else:
        candidates = [HF_URL_MODELS, HF_URL_PIPELINE]
    total = len(candidates)
    last_exc: Optional[Exception] = None

    for position, endpoint in enumerate(candidates, 1):
        is_models = "/models/" in endpoint
        try:
            extra = {"X-Task": "feature-extraction"} if is_models else None
            return _hf_http(endpoint, payload, headers_extra=extra)
        except requests.HTTPError as http_err:
            resp = http_err.response
            code = resp.status_code if resp is not None else 0
            body = resp.text if resp is not None else ""
            last_exc = http_err
            if code in (404, 405, 501) and position < total:
                LOG.warning(f"HF endpoint {endpoint} non dispo ({code}), fallback vers alternative ...")
                continue
            if is_models and "SentenceSimilarityPipeline" in (body or ""):
                try:
                    forced_url = _with_task_param(endpoint, "feature-extraction")
                    LOG.warning("HF MODELS a choisi Similarity -> retry avec %s + X-Task", forced_url)
                    return _hf_http(forced_url, payload, headers_extra={"X-Task": "feature-extraction"})
                except Exception as retry_err:
                    last_exc = retry_err
            # Re-raises the HTTP error caught by this handler.
            raise
        except Exception as other_err:
            last_exc = other_err
            raise
    raise RuntimeError(f"HF: aucun endpoint utilisable ({last_exc})")
| # ---------- DeepInfra embeddings ---------- | |
def _di_post_embeddings_once(batch: List[str]) -> Tuple[np.ndarray, int]:
    """Request embeddings for *batch* from DeepInfra in a single HTTP call.

    Returns:
        (embeddings, size): a float32 array of shape [N, dim] with
        L2-normalized rows, and the response's Content-Length header as an
        int (0 when the header is absent).

    Raises:
        RuntimeError: if DI_TOKEN is empty, the response has no non-empty
            "data" list, or the decoded array is not 2-D.
        requests.HTTPError: for HTTP status codes >= 400.
    """
    if not DI_TOKEN:
        raise RuntimeError("DEEPINFRA_API_KEY manquant (backend=deepinfra).")

    resp = requests.post(
        DI_URL,
        headers={
            "Authorization": f"Bearer {DI_TOKEN}",
            "Content-Type": "application/json",
            "Accept": "application/json",
        },
        json={"model": DI_MODEL, "input": batch},
        timeout=DI_TIMEOUT,
    )
    size = int(resp.headers.get("Content-Length", "0"))
    if resp.status_code >= 400:
        LOG.error(f"DeepInfra error {resp.status_code}: {resp.text[:1000]}")
        resp.raise_for_status()

    parsed = resp.json()
    items = parsed.get("data")
    if not (isinstance(items, list) and items):
        raise RuntimeError(f"DeepInfra embeddings: réponse invalide {parsed}")

    matrix = np.asarray([item.get("embedding") for item in items], dtype=np.float32)
    if matrix.ndim != 2:
        raise RuntimeError(f"DeepInfra: unexpected embeddings shape: {matrix.shape}")
    # The epsilon keeps an all-zero row from dividing by zero.
    matrix = matrix / (np.linalg.norm(matrix, axis=1, keepdims=True) + 1e-12)
    return matrix.astype(np.float32), size
| # ---------- Retry orchestrator ---------- | |
def _retry_sleep(attempt: int):
    """Return the jittered exponential back-off delay (seconds) for *attempt*.

    NOTE(review): this redefines `_retry_sleep`, which is already declared
    earlier in this module with the same body; this later definition is the
    one in effect at call time.
    """
    jitter = 1.0 + random.uniform(-RETRY_JITTER, RETRY_JITTER)
    delay = (RETRY_BASE_SEC ** attempt) * jitter
    # Never wait less than a quarter of a second.
    return max(0.25, delay)
def _call_with_retries(func, batch: List[str], label: str, job_id: Optional[str] = None) -> Tuple[np.ndarray, int]:
    """Call ``func(batch)`` up to RETRY_MAX times with back-off between tries.

    Args:
        func: callable taking the batch and returning (embeddings, size).
        batch: texts to embed.
        label: backend name used in log messages.
        job_id: optional job whose log receives progress lines.

    Returns:
        Whatever *func* returns on its first successful call.

    Raises:
        requests.HTTPError: immediately, for HTTP errors whose status is not
            one of 429, 500, 502, 503, 504.
        RuntimeError: when every attempt failed; the last error is chained.
    """
    last_exc = None
    for attempt in range(RETRY_MAX):
        try:
            if job_id:
                _append_log(job_id, f"{label}: try {attempt+1}/{RETRY_MAX} (batch={len(batch)})")
            return func(batch)
        except requests.HTTPError as he:
            code = he.response.status_code if he.response is not None else "HTTP"
            if code not in (429, 500, 502, 503, 504):
                raise
            last_exc = he
            reason = f"HTTP {code}"
        except Exception as e:
            last_exc = e
            reason = f"error {type(e).__name__}: {e}"

        if attempt == RETRY_MAX - 1:
            # No further try follows, so sleeping here would only delay the failure.
            msg = f"{label}: {reason}, no retries left"
            LOG.warning(msg); _append_log(job_id, msg)
            break
        sleep_s = _retry_sleep(attempt)
        msg = f"{label}: {reason}, retry in {sleep_s:.1f}s"
        LOG.warning(msg); _append_log(job_id, msg)
        time.sleep(sleep_s)
    raise RuntimeError(f"{label}: retries exhausted: {last_exc}") from last_exc
def _post_embeddings(batch: List[str], job_id: Optional[str] = None) -> Tuple[np.ndarray, int]:
    """Embed *batch* with the first backend of EMB_BACKEND_ORDER that succeeds.

    Each known backend ("hf", "deepinfra") is tried through
    `_call_with_retries`; unknown names are logged and skipped. Any failure
    of a backend, HTTP or otherwise, moves on to the next one. If HF failed
    because the request was routed to a sentence-similarity pipeline, and
    ALLOW_DI_AUTOFALLBACK and DI_TOKEN are set, DeepInfra is tried once more
    after the loop regardless of the configured order.

    Raises:
        RuntimeError: when no backend produced embeddings.
    """
    last_err = None
    similarity_misroute = False
    for b in EMB_BACKEND_ORDER:
        if b == "hf":
            try:
                return _call_with_retries(_hf_post_embeddings_once, batch, "HF", job_id)
            except requests.HTTPError as he:
                body = he.response.text if getattr(he, "response", None) is not None else ""
                if "SentenceSimilarityPipeline.__call__()" in (body or ""):
                    similarity_misroute = True
                last_err = he
                _append_log(job_id, f"HF failed: {he}.")
                LOG.error(f"HF failed: {he}")
            except Exception as e:
                # Exhausted retries and a missing token surface as RuntimeError;
                # they must not prevent the remaining backends from being tried.
                last_err = e
                _append_log(job_id, f"HF failed: {e}.")
                LOG.error(f"HF failed: {e}")
        elif b == "deepinfra":
            try:
                return _call_with_retries(_di_post_embeddings_once, batch, "DeepInfra", job_id)
            except Exception as e:
                last_err = e
                _append_log(job_id, f"DeepInfra failed: {e}.")
                LOG.error(f"DeepInfra failed: {e}")
        else:
            _append_log(job_id, f"Backend inconnu ignoré: {b}")

    if ALLOW_DI_AUTOFALLBACK and similarity_misroute and DI_TOKEN:
        LOG.warning("HF a routé sur SentenceSimilarity => auto-fallback DeepInfra (override ordre).")
        _append_log(job_id, "Auto-fallback DeepInfra (HF => SentenceSimilarity).")
        return _call_with_retries(_di_post_embeddings_once, batch, "DeepInfra", job_id)
    raise RuntimeError(f"Tous les backends ont échoué: {last_err}")
| # ---------- Chunking ---------- | |
def _chunk_with_spans(text: str, size: int, overlap: int):
    """Yield ``(start, end, chunk)`` windows of at most *size* characters.

    Consecutive windows share *overlap* characters. Iteration stops once a
    window reaches the end of the text, and every step moves forward by at
    least one character, so the generator always terminates, including when
    ``overlap >= size``, where windows simply follow one another without
    overlap.

    Args:
        text: text to split; None is treated as "".
        size: window length in characters; when <= 0 the whole text is
            yielded as a single chunk.
        overlap: number of characters shared by consecutive windows.
    """
    text = text or ""
    n = len(text)
    if size <= 0:
        yield (0, n, text)
        return
    i = 0
    while i < n:
        j = min(n, i + size)
        yield (i, j, text[i:j])
        if j >= n:
            # The tail has been emitted; stepping back by `overlap` would
            # re-yield it forever.
            break
        nxt = j - overlap
        # Guarantee forward progress when overlap >= size.
        i = nxt if nxt > i else j
| # ---------- Background task ---------- | |
def run_index_job(job_id: str, req: IndexRequest):
    """Index every file of *req* into the collection ``proj_<project_id>``.

    Steps: mark the job running, embed one warm-up text to learn the vector
    dimension, make sure the collection exists, then chunk each non-empty
    file and embed/store the chunks in batches of ``req.batch_size``.
    Progress is written to the job log. Any exception is logged and sets the
    job status to "error" instead of propagating.
    """
    try:
        _set_status(job_id, "running")
        total_chunks = 0
        n_files = len(req.files)
        _append_log(job_id, f"Start project={req.project_id} files={n_files} | backends={EMB_BACKEND_ORDER} | store={VECTOR_STORE}")
        LOG.info(f"[{job_id}] Index start project={req.project_id} files={n_files}")

        # Warm-up call: its only purpose is to discover the embedding dimension.
        # Uses the first non-blank chunk of the first file, else a fixed word.
        warm = "warmup"
        if req.files:
            first_spans = _chunk_with_spans(req.files[0].text or "", req.chunk_size, req.overlap)
            warm = next((piece for _, _, piece in first_spans if (piece or "").strip()), warm)
        embs, _ = _post_embeddings([warm], job_id=job_id)
        dim = embs.shape[1]
        col = f"proj_{req.project_id}"
        STORE.ensure_collection(col, dim)
        _append_log(job_id, f"Collection ready: {col} (dim={dim})")

        pending_txts, pending_metas = [], []

        def _flush(file_no):
            """Embed and store the pending chunks, then reset the buffers."""
            nonlocal pending_txts, pending_metas, total_chunks
            if not pending_txts:
                return
            vecs, _size = _post_embeddings(pending_txts, job_id=job_id)
            added = STORE.upsert(col, vecs, pending_metas)
            total_chunks += added
            _append_log(job_id, f"file {file_no}/{n_files}: +{added} chunks (total={total_chunks})")
            pending_txts, pending_metas = [], []

        for fi, f in enumerate(req.files, 1):
            if not (f.text or "").strip():
                _append_log(job_id, f"file {fi}: vide — ignoré")
                continue
            spans = _chunk_with_spans(f.text, req.chunk_size, req.overlap)
            for ci, (start, end, chunk_txt) in enumerate(spans):
                if not (chunk_txt or "").strip():
                    continue  # blank chunks are skipped but still count in `ci`
                meta = {"path": f.path, "chunk": ci, "start": start, "end": end}
                if req.store_text:
                    meta["text"] = chunk_txt
                pending_txts.append(chunk_txt)
                pending_metas.append(meta)
                if len(pending_txts) >= req.batch_size:
                    _flush(fi)
            # Store whatever is left for this file.
            _flush(fi)

        _append_log(job_id, f"Done. chunks={total_chunks}")
        _set_status(job_id, "done")
        LOG.info(f"[{job_id}] Index finished. chunks={total_chunks}")
    except Exception as e:
        LOG.exception("Index job failed")
        _append_log(job_id, f"ERROR: {e}")
        _set_status(job_id, "error")
| # ---------- API ---------- | |
app = FastAPI()


# NOTE(review): no route decorator is visible on this or the following
# handlers in the reviewed source — confirm how they are registered on `app`.
def root():
    """Return a summary of the service configuration.

    Backend-specific entries are None when that backend is not listed in
    EMB_BACKEND_ORDER.
    """
    uses_hf = "hf" in EMB_BACKEND_ORDER
    uses_di = "deepinfra" in EMB_BACKEND_ORDER
    info = {
        "ok": True,
        "service": "remote-indexer",
        "backends": EMB_BACKEND_ORDER,
    }
    info["hf_url_pipeline"] = HF_URL_PIPELINE if uses_hf else None
    info["hf_url_models"] = HF_URL_MODELS if uses_hf else None
    info["di_url"] = DI_URL if uses_di else None
    info["di_model"] = DI_MODEL if uses_di else None
    info["vector_store"] = VECTOR_STORE
    info["vector_store_active"] = type(STORE).__name__
    info["docs"] = "/health, /index, /status/{job_id}, /query, /wipe"
    return info
def health():
    """Return a constant liveness payload."""
    payload = {"ok": True}
    return payload
def _check_backend_ready():
    """Raise HTTP 400 when the configured embedding backends lack credentials.

    NOTE(review): the two checks are asymmetric — a missing HF token rejects
    the request whenever "hf" is listed, even alongside a usable DeepInfra
    key, whereas a missing DeepInfra key only rejects when DeepInfra is the
    sole backend. Confirm this is intended.
    """
    hf_unusable = "hf" in EMB_BACKEND_ORDER and not HF_TOKEN
    if hf_unusable:
        raise HTTPException(400, "HF_API_TOKEN manquant côté serveur (backend=hf).")
    deepinfra_only = EMB_BACKEND_ORDER == ["deepinfra"]
    if deepinfra_only and not DI_TOKEN:
        raise HTTPException(400, "DEEPINFRA_API_KEY manquant côté serveur (backend=deepinfra).")
def start_index(req: IndexRequest, background_tasks: BackgroundTasks, x_auth_token: Optional[str] = Header(default=None)):
    """Queue an indexing job for *req* and return its id.

    Files whose text is empty or whitespace are dropped from the request;
    if none remain, the request is rejected with HTTP 422. Responds 401 on a
    bad token and 400 when backend credentials are missing.
    """
    _auth(x_auth_token)
    _check_backend_ready()

    usable_files = [item for item in req.files if (item.text or "").strip()]
    if not usable_files:
        raise HTTPException(422, "Aucun fichier non vide à indexer.")
    req.files = usable_files

    job_id = uuid.uuid4().hex[:12]
    JOBS[job_id] = {"status": "queued", "logs": [], "created": time.time()}
    background_tasks.add_task(run_index_job, job_id, req)
    return {"job_id": job_id}
def status(job_id: str, x_auth_token: Optional[str] = Header(default=None)):
    """Return the status and the last 800 log lines of *job_id*.

    Responds 401 on a bad token and 404 when the job is unknown.
    """
    _auth(x_auth_token)
    job = JOBS.get(job_id)
    if not job:
        raise HTTPException(404, "job inconnu")
    recent_logs = job["logs"][-800:]
    return {"status": job["status"], "logs": recent_logs}
| # Legacy compat | |
def status_qp(job_id: str = Query(None), x_auth_token: Optional[str] = Header(default=None)):
    """Legacy variant of the status lookup taking *job_id* as a query parameter.

    Responds 401 on a bad token and 404 when *job_id* is missing or unknown.
    """
    _auth(x_auth_token)
    job = JOBS.get(job_id) if job_id else None
    if not job:
        raise HTTPException(404, "job inconnu")
    recent_logs = job["logs"][-800:]
    return {"status": job["status"], "logs": recent_logs}
class _StatusBody(BaseModel):
    """Request body carrying the id of the job to look up."""

    job_id: str = Field(...)
def status_post(body: _StatusBody, x_auth_token: Optional[str] = Header(default=None)):
    """Variant of the status lookup taking the job id from the request body.

    Responds 401 on a bad token and 404 when the job is unknown.
    """
    _auth(x_auth_token)
    job = JOBS.get(body.job_id)
    if not job:
        raise HTTPException(404, "job inconnu")
    recent_logs = job["logs"][-800:]
    return {"status": job["status"], "logs": recent_logs}
def query(req: QueryRequest, x_auth_token: Optional[str] = Header(default=None)):
    """Embed ``req.query`` and return the best matches from the project's collection.

    ``top_k`` is clamped to the range 1..50 (6 when falsy). Stored text longer
    than 800 characters is truncated with a trailing "...". Responds 401 on a
    bad token and 400 when credentials are missing or the store search fails.
    """
    _auth(x_auth_token)
    _check_backend_ready()

    k = int(max(1, min(50, req.top_k or 6)))
    vecs, _ = _post_embeddings([req.query])
    col = f"proj_{req.project_id}"

    # Search against whichever store is active.
    try:
        hits = STORE.search(col, vecs[0], k)
    except Exception as e:
        raise HTTPException(400, f"Search failed: {e}")

    def _as_result(hit):
        """Flatten one hit (an object with .score and .payload) into a dict."""
        pl = getattr(hit, "payload", None) or {}
        score = float(getattr(hit, "score", 0.0))
        txt = pl.get("text")
        if txt and len(txt) > 800:
            txt = txt[:800] + "..."
        return {
            "path": pl.get("path"),
            "chunk": pl.get("chunk"),
            "start": pl.get("start"),
            "end": pl.get("end"),
            "text": txt,
            "score": score,
        }

    return {"results": [_as_result(hit) for hit in hits]}
def wipe_collection(project_id: str, x_auth_token: Optional[str] = Header(default=None)):
    """Delete the collection ``proj_<project_id>`` from the active store.

    Responds 401 on a bad token and 400 when the store raises.
    """
    _auth(x_auth_token)
    col = f"proj_{project_id}"
    try:
        STORE.wipe(col)
    except Exception as e:
        raise HTTPException(400, f"wipe failed: {e}")
    return {"ok": True}
| # ---------- Entrypoint ---------- | |
if __name__ == "__main__":
    # Script entry point: serve the app with uvicorn on $PORT (default 7860).
    import uvicorn

    listen_port = int(os.environ.get("PORT", "7860"))
    LOG.info(f"===== Application Startup on PORT {listen_port} =====")
    uvicorn.run(app, host="0.0.0.0", port=listen_port)