chouchouvs committed
Commit 7a8bf2f · verified · 1 Parent(s): e08033d

Update main.py

Files changed (1)
  1. main.py +265 -300
main.py CHANGED
@@ -1,91 +1,87 @@
  # -*- coding: utf-8 -*-
  from __future__ import annotations
- import os, time, uuid, logging, random
  from typing import List, Optional, Dict, Any, Tuple

  import numpy as np
  import requests
- from fastapi import FastAPI, BackgroundTasks, Header, HTTPException, Query
  from pydantic import BaseModel, Field

- # Qdrant (optional if VECTOR_STORE=memory)
- try:
-     from qdrant_client import QdrantClient
-     from qdrant_client.http.models import VectorParams, Distance, PointStruct
- except Exception:  # if not installed, fall back to memory
-     QdrantClient = None
-     VectorParams = Distance = PointStruct = None
-
- # ---------- logging ----------
  logging.basicConfig(level=logging.INFO, format="%(levelname)s:%(name)s:%(message)s")
  LOG = logging.getLogger("remote_indexer")

- # ---------- ENV (config) ----------
- # Store choice: "qdrant" (default) or "memory"
- VECTOR_STORE = os.getenv("VECTOR_STORE", "qdrant").strip().lower()
-
- # Order of embedding backends to try. Default: DeepInfra, then HF.
- DEFAULT_BACKENDS = "deepinfra,hf"
- EMB_BACKEND_ORDER = [s.strip().lower()
-                      for s in os.getenv("EMB_BACKEND_ORDER", os.getenv("EMB_BACKEND", DEFAULT_BACKENDS)).split(",")
-                      if s.strip()]
-
- ALLOW_DI_AUTOFALLBACK = os.getenv("ALLOW_DI_AUTOFALLBACK", "true").lower() in ("1", "true", "yes", "on")
-
- # HF Inference API
- HF_TOKEN = os.getenv("HF_API_TOKEN", "").strip()
- HF_MODEL = os.getenv("HF_EMBED_MODEL", "sentence-transformers/all-MiniLM-L6-v2").strip()

- HF_API_URL_USER = os.getenv("HF_API_URL", "").strip()
- HF_API_URL_PIPELINE = os.getenv("HF_API_URL_PIPELINE", "").strip()
- HF_API_URL_MODELS = os.getenv("HF_API_URL_MODELS", "").strip()
- if HF_API_URL_USER:
-     if "/pipeline" in HF_API_URL_USER:
-         HF_API_URL_PIPELINE = HF_API_URL_USER
-     else:
-         HF_API_URL_MODELS = HF_API_URL_USER
-
- HF_URL_PIPELINE = (HF_API_URL_PIPELINE or f"https://api-inference.huggingface.co/pipeline/feature-extraction/{HF_MODEL}")
- HF_URL_MODELS = (HF_API_URL_MODELS or f"https://api-inference.huggingface.co/models/{HF_MODEL}")

- HF_TIMEOUT = float(os.getenv("EMB_TIMEOUT_SEC", "120"))
- HF_WAIT = os.getenv("HF_WAIT_FOR_MODEL", "true").lower() in ("1", "true", "yes", "on")
- HF_PIPELINE_FIRST = os.getenv("HF_PIPELINE_FIRST", "true").lower() in ("1", "true", "yes", "on")
-
- # DeepInfra (OpenAI-compatible embeddings)
  DI_TOKEN = os.getenv("DEEPINFRA_API_KEY", "").strip()
  DI_MODEL = os.getenv("DEEPINFRA_EMBED_MODEL", "BAAI/bge-m3").strip()
  DI_URL = os.getenv("DEEPINFRA_EMBED_URL", "https://api.deepinfra.com/v1/openai/embeddings").strip()
  DI_TIMEOUT = float(os.getenv("EMB_TIMEOUT_SEC", "120"))

- # Embedding retries
- RETRY_MAX = int(os.getenv("EMB_RETRY_MAX", "6"))
- RETRY_BASE_SEC = float(os.getenv("EMB_RETRY_BASE", "1.5"))
- RETRY_JITTER = float(os.getenv("EMB_RETRY_JITTER", "0.35"))
-
- # Qdrant
- QDRANT_URL = os.getenv("QDRANT_URL", "http://localhost:6333").strip()
- QDRANT_API = os.getenv("QDRANT_API_KEY", "").strip()

- # API auth for this service (simple header)
  AUTH_TOKEN = os.getenv("REMOTE_INDEX_TOKEN", "").strip()

  LOG.info(f"Embeddings backend order = {EMB_BACKEND_ORDER}")
- LOG.info(f"HF pipeline URL = {HF_URL_PIPELINE}")
- LOG.info(f"HF models URL = {HF_URL_MODELS}")
  LOG.info(f"VECTOR_STORE = {VECTOR_STORE}")
- if "hf" in EMB_BACKEND_ORDER and not HF_TOKEN:
-     LOG.warning("HF_API_TOKEN missing — HF attempts will fail.")
  if "deepinfra" in EMB_BACKEND_ORDER and not DI_TOKEN:
      LOG.warning("DEEPINFRA_API_KEY missing — DeepInfra attempts will fail.")

- # ---------- Vector store abstraction ----------
- # ---------- Vector Stores ----------
- from typing import Dict, Any, List, Optional, Tuple
- import numpy as np
- import logging
-
- LOG = logging.getLogger("remote_indexer")

  try:
      from qdrant_client import QdrantClient
@@ -94,19 +90,16 @@ except Exception:
      QdrantClient = None
      PointStruct = None

-
  class BaseStore:
      def ensure_collection(self, name: str, dim: int): ...
      def upsert(self, name: str, vectors: np.ndarray, payloads: List[Dict[str, Any]]) -> int: ...
      def search(self, name: str, query_vec: np.ndarray, top_k: int) -> List[Dict[str, Any]]: ...
      def wipe(self, name: str): ...

-
  class MemoryStore(BaseStore):
-     """In-memory store (volatile)."""
      def __init__(self):
-         # { collection: {"vecs": [np.ndarray], "payloads": [dict]} }
-         self.db: Dict[str, Dict[str, List[Any]]] = {}

      def ensure_collection(self, name: str, dim: int):
          self.db.setdefault(name, {"vecs": [], "payloads": [], "dim": dim})
@@ -116,18 +109,17 @@ class MemoryStore(BaseStore):
              raise RuntimeError(f"MemoryStore: unknown collection {name}")
          if len(vectors) != len(payloads):
              raise ValueError("MemoryStore.upsert: vectors/payloads length mismatch")
-         self.db[name]["vecs"].extend([v.astype(np.float32) for v in vectors])
          self.db[name]["payloads"].extend(payloads)
          return len(vectors)

      def search(self, name: str, query_vec: np.ndarray, top_k: int) -> List[Dict[str, Any]]:
          if name not in self.db or not self.db[name]["vecs"]:
              return []
-         mat = np.vstack(self.db[name]["vecs"])           # [N, dim]
-         q = query_vec.reshape(1, -1).astype(np.float32)  # [1, dim]
-         # cosine similarity on normalized vectors
-         # (embeddings are assumed to be normalized upstream)
-         sims = (mat @ q.T).ravel()                       # [N]
          top_idx = np.argsort(-sims)[:top_k]
          out = []
          for i in top_idx:
@@ -139,27 +131,22 @@ class MemoryStore(BaseStore):
      def wipe(self, name: str):
          self.db.pop(name, None)

-
  class QdrantStore(BaseStore):
-     """Qdrant store with per-collection sequential ID management."""
      def __init__(self, url: str, api_key: Optional[str] = None):
          if QdrantClient is None or PointStruct is None:
              raise RuntimeError("qdrant_client not available")
          self.client = QdrantClient(url=url, api_key=api_key if api_key else None)
-         # per-collection ID counter
          self._next_ids: Dict[str, int] = {}

      def _init_next_id(self, name: str):
-         # take the exact count of existing points so the next ID starts at count
          try:
              cnt = self.client.count(collection_name=name, exact=True).count
          except Exception:
-             # if count fails (freshly created, empty collection), start at 0
              cnt = 0
          self._next_ids[name] = int(cnt)

      def ensure_collection(self, name: str, dim: int):
-         # nothing to do if it already exists; otherwise create it
          try:
              self.client.get_collection(name)
          except Exception:
@@ -167,7 +154,6 @@ class QdrantStore(BaseStore):
              collection_name=name,
              vectors_config=VectorParams(size=dim, distance=Distance.COSINE),
          )
-         # initialize the next id if absent
          if name not in self._next_ids:
              self._init_next_id(name)

@@ -176,14 +162,14 @@ class QdrantStore(BaseStore):
              return 0
          if len(vectors) != len(payloads):
              raise ValueError("QdrantStore.upsert: vectors/payloads length mismatch")
-
          if name not in self._next_ids:
              self._init_next_id(name)

          start = self._next_ids[name]
-         # build the points with sequential (int) IDs
          pts = [
-             PointStruct(id=start + i, vector=v.astype(np.float32).tolist(), payload=payloads[i])
              for i, v in enumerate(vectors)
          ]
          self.client.upsert(collection_name=name, points=pts)
@@ -201,7 +187,10 @@ class QdrantStore(BaseStore):
          out = []
          for p in res:
              pl = p.payload or {}
-             pl["_score"] = float(p.score) if hasattr(p, "score") else None
              out.append(pl)
          return out

@@ -212,47 +201,25 @@ class QdrantStore(BaseStore):
              pass
          self._next_ids.pop(name, None)

-
- # ---------- Active store initialization ----------
- import os
-
- VECTOR_STORE = os.getenv("VECTOR_STORE", "qdrant").strip().lower()
- QDRANT_URL = os.getenv("QDRANT_URL", "").strip()
- QDRANT_API = os.getenv("QDRANT_API_KEY", "").strip()
-
  try:
      if VECTOR_STORE == "qdrant" and QDRANT_URL:
-         STORE: BaseStore = QdrantStore(QDRANT_URL, api_key=QDRANT_API)
-         # light check: list the collections
-         _ = STORE.client.get_collections()
          LOG.info("Connected to Qdrant.")
          VECTOR_STORE_ACTIVE = "QdrantStore"
      else:
          raise RuntimeError("Qdrant not configured, falling back to memory.")
  except Exception as e:
-     LOG.error(f"Qdrant unavailable ({e}) — falling back to memory.")
      STORE = MemoryStore()
      VECTOR_STORE_ACTIVE = "MemoryStore"
      LOG.warning("Vector store: MEMORY (fallback). Data is volatile (lost on restart).")

- # Store selection / auto-fallback
- STORE: VectorStoreBase
- def _init_store() -> VectorStoreBase:
-     prefer = VECTOR_STORE
-     if prefer == "memory":
-         return MemoryStore()
-
-     # prefer qdrant
-     try:
-         return QdrantStore(QDRANT_URL, QDRANT_API if QDRANT_API else None)
-     except Exception as e:
-         LOG.error(f"Qdrant unavailable ({e}) — falling back to memory.")
-         return MemoryStore()
-
- STORE = _init_store()
-
- # ---------- Pydantic ----------
  class FileIn(BaseModel):
      path: str
      text: str
@@ -270,103 +237,47 @@ class QueryRequest(BaseModel):
      query: str
      top_k: int = 6

- # ---------- Jobs store ----------
- JOBS: Dict[str, Dict[str, Any]] = {}

  def _append_log(job_id: str, line: str):
      job = JOBS.get(job_id)
-     if job: job["logs"].append(line)

  def _set_status(job_id: str, status: str):
      job = JOBS.get(job_id)
-     if job: job["status"] = status

  def _auth(x_auth: Optional[str]):
      if AUTH_TOKEN and (x_auth or "") != AUTH_TOKEN:
-         raise HTTPException(status_code=401, detail="Unauthorized")

- # ---------- Retry helpers ----------
- def _retry_sleep(attempt: int):
      back = (RETRY_BASE_SEC ** attempt)
      jitter = 1.0 + random.uniform(-RETRY_JITTER, RETRY_JITTER)
      return max(0.25, back * jitter)

- def _with_task_param(url: str, task: str = "feature-extraction") -> str:
-     return url + ("&" if "?" in url else "?") + f"task={task}"
-
- # ---------- HF embeddings ----------
- def _hf_http(url: str, payload: Dict[str, Any], headers_extra: Optional[Dict[str, str]] = None) -> Tuple[np.ndarray, int]:
-     if not HF_TOKEN:
-         raise RuntimeError("HF_API_TOKEN missing (backend=hf).")
-     headers = {
-         "Authorization": f"Bearer {HF_TOKEN}",
-         "Content-Type": "application/json",
-         "Accept": "application/json",
-     }
-     if HF_WAIT:
-         payload.setdefault("options", {})["wait_for_model"] = True
-         headers["X-Wait-For-Model"] = "true"
-         headers["X-Use-Cache"] = "true"
-     if headers_extra:
-         headers.update(headers_extra)
-
-     r = requests.post(url, headers=headers, json=payload, timeout=HF_TIMEOUT)
-     size = int(r.headers.get("Content-Length", "0"))
-     if r.status_code >= 400:
-         LOG.error(f"HF error {r.status_code}: {r.text[:1000]}")
-         r.raise_for_status()
-
-     data = r.json()
-     arr = np.array(data, dtype=np.float32)
-     if arr.ndim == 3:
-         arr = arr.mean(axis=1)
-     elif arr.ndim == 1:
-         arr = arr.reshape(1, -1)
-     if arr.ndim != 2:
-         raise RuntimeError(f"HF: unexpected embeddings shape: {arr.shape}")
-
      norms = np.linalg.norm(arr, axis=1, keepdims=True) + 1e-12
-     arr = arr / norms
-     return arr.astype(np.float32), size
-
- def _hf_post_embeddings_once(batch: List[str]) -> Tuple[np.ndarray, int]:
-     payload: Dict[str, Any] = {"inputs": (batch if len(batch) > 1 else batch[0])}
-     urls = [HF_URL_PIPELINE, HF_URL_MODELS] if HF_PIPELINE_FIRST else [HF_URL_MODELS, HF_URL_PIPELINE]
-     last_exc: Optional[Exception] = None
-     for idx, url in enumerate(urls, 1):
-         try:
-             if "/models/" in url:
-                 return _hf_http(url, payload, headers_extra={"X-Task": "feature-extraction"})
-             else:
-                 return _hf_http(url, payload, headers_extra=None)
-         except requests.HTTPError as he:
-             code = he.response.status_code if he.response is not None else 0
-             body = he.response.text if he.response is not None else ""
-             last_exc = he
-             if code in (404, 405, 501) and idx < len(urls):
-                 LOG.warning(f"HF endpoint {url} unavailable ({code}), falling back to the alternative ...")
-                 continue
-             if "/models/" in url and "SentenceSimilarityPipeline" in (body or ""):
-                 try:
-                     forced_url = _with_task_param(url, "feature-extraction")
-                     LOG.warning("HF MODELS picked Similarity -> retry with %s + X-Task", forced_url)
-                     return _hf_http(forced_url, payload, headers_extra={"X-Task": "feature-extraction"})
-                 except Exception as he2:
-                     last_exc = he2
-             raise
-         except Exception as e:
-             last_exc = e
-             raise
-     raise RuntimeError(f"HF: no usable endpoint ({last_exc})")

- # ---------- DeepInfra embeddings ----------
  def _di_post_embeddings_once(batch: List[str]) -> Tuple[np.ndarray, int]:
      if not DI_TOKEN:
          raise RuntimeError("DEEPINFRA_API_KEY missing (backend=deepinfra).")
-     headers = {"Authorization": f"Bearer {DI_TOKEN}", "Content-Type": "application/json", "Accept": "application/json"}
      payload = {"model": DI_MODEL, "input": batch}
      r = requests.post(DI_URL, headers=headers, json=payload, timeout=DI_TIMEOUT)
-     size = int(r.headers.get("Content-Length", "0"))
      if r.status_code >= 400:
          LOG.error(f"DeepInfra error {r.status_code}: {r.text[:1000]}")
          r.raise_for_status()
@@ -378,15 +289,92 @@ def _di_post_embeddings_once(batch: List[str]) -> Tuple[np.ndarray, int]:
      arr = np.asarray(embs, dtype=np.float32)
      if arr.ndim != 2:
          raise RuntimeError(f"DeepInfra: unexpected embeddings shape: {arr.shape}")
-     norms = np.linalg.norm(arr, axis=1, keepdims=True) + 1e-12
-     arr = arr / norms
-     return arr.astype(np.float32), size

- # ---------- Retry orchestrator ----------
- def _retry_sleep(attempt: int):
-     back = (RETRY_BASE_SEC ** attempt)
-     jitter = 1.0 + random.uniform(-RETRY_JITTER, RETRY_JITTER)
-     return max(0.25, back * jitter)

  def _call_with_retries(func, batch: List[str], label: str, job_id: Optional[str] = None) -> Tuple[np.ndarray, int]:
      last_exc = None
@@ -414,35 +402,43 @@ def _call_with_retries(func, batch: List[str], label: str, job_id: Optional[str]
      raise RuntimeError(f"{label}: retries exhausted: {last_exc}")

  def _post_embeddings(batch: List[str], job_id: Optional[str] = None) -> Tuple[np.ndarray, int]:
      last_err = None
-     similarity_misroute = False
      for b in EMB_BACKEND_ORDER:
-         if b == "hf":
-             try:
-                 return _call_with_retries(_hf_post_embeddings_once, batch, "HF", job_id)
-             except requests.HTTPError as he:
-                 body = he.response.text if getattr(he, "response", None) is not None else ""
-                 if "SentenceSimilarityPipeline.__call__()" in (body or ""):
-                     similarity_misroute = True
-                 last_err = he
-                 _append_log(job_id, f"HF failed: {he}.")
-                 LOG.error(f"HF failed: {he}")
-         elif b == "deepinfra":
              try:
                  return _call_with_retries(_di_post_embeddings_once, batch, "DeepInfra", job_id)
              except Exception as e:
                  last_err = e
                  _append_log(job_id, f"DeepInfra failed: {e}.")
                  LOG.error(f"DeepInfra failed: {e}")
          else:
              _append_log(job_id, f"Unknown backend ignored: {b}")
-     if ALLOW_DI_AUTOFALLBACK and similarity_misroute and DI_TOKEN:
-         LOG.warning("HF routed to SentenceSimilarity => auto-fallback to DeepInfra (overriding the order).")
-         _append_log(job_id, "Auto-fallback to DeepInfra (HF => SentenceSimilarity).")
-         return _call_with_retries(_di_post_embeddings_once, batch, "DeepInfra", job_id)
      raise RuntimeError(f"All backends failed: {last_err}")

- # ---------- Chunking ----------
  def _chunk_with_spans(text: str, size: int, overlap: int):
      n = len(text or "")
      if size <= 0:
@@ -452,66 +448,71 @@ def _chunk_with_spans(text: str, size: int, overlap: int):
          j = min(n, i + size)
          yield (i, j, text[i:j])
          i = max(0, j - overlap)
-         if i >= n: break

- # ---------- Background task ----------
  def run_index_job(job_id: str, req: IndexRequest):
      try:
          _set_status(job_id, "running")
-         total_chunks = 0
          _append_log(job_id, f"Start project={req.project_id} files={len(req.files)} | backends={EMB_BACKEND_ORDER} | store={VECTOR_STORE}")
          LOG.info(f"[{job_id}] Index start project={req.project_id} files={len(req.files)}")

          # Warmup -> dimension
-         warm = "warmup"
-         if req.files:
-             for _, _, chunk_txt in _chunk_with_spans(req.files[0].text or "", req.chunk_size, req.overlap):
-                 if (chunk_txt or "").strip():
-                     warm = chunk_txt; break
          embs, _ = _post_embeddings([warm], job_id=job_id)
          dim = embs.shape[1]
          col = f"proj_{req.project_id}"
          STORE.ensure_collection(col, dim)
          _append_log(job_id, f"Collection ready: {col} (dim={dim})")

-         # loop over the files
          for fi, f in enumerate(req.files, 1):
-             if not (f.text or "").strip():
-                 _append_log(job_id, f"file {fi}: empty — skipped")
-                 continue
-
-             batch_txts, metas = [], []
-             def _flush():
-                 nonlocal batch_txts, metas, total_chunks
-                 if not batch_txts: return
-                 vecs, sz = _post_embeddings(batch_txts, job_id=job_id)
-                 added = STORE.upsert(col, vecs, metas)
-                 total_chunks += added
-                 _append_log(job_id, f"file {fi}/{len(req.files)}: +{added} chunks (total={total_chunks})")
-                 batch_txts, metas = [], []
-
              for ci, (start, end, chunk_txt) in enumerate(_chunk_with_spans(f.text, req.chunk_size, req.overlap)):
-                 if not (chunk_txt or "").strip():
-                     continue
-                 batch_txts.append(chunk_txt)
                  meta = {"path": f.path, "chunk": ci, "start": start, "end": end}
                  if req.store_text:
                      meta["text"] = chunk_txt
-                 metas.append(meta)
-                 if len(batch_txts) >= req.batch_size:
                      _flush()
-
              _flush()

          _append_log(job_id, f"Done. chunks={total_chunks}")
          _set_status(job_id, "done")
          LOG.info(f"[{job_id}] Index finished. chunks={total_chunks}")
      except Exception as e:
          LOG.exception("Index job failed")
          _append_log(job_id, f"ERROR: {e}")
          _set_status(job_id, "error")

- # ---------- API ----------
  app = FastAPI()

  @app.get("/")
@@ -520,18 +521,18 @@ def root():
          "ok": True,
          "service": "remote-indexer",
          "backends": EMB_BACKEND_ORDER,
-         "hf_url_pipeline": HF_URL_PIPELINE if "hf" in EMB_BACKEND_ORDER else None,
-         "hf_url_models": HF_URL_MODELS if "hf" in EMB_BACKEND_ORDER else None,
          "di_url": DI_URL if "deepinfra" in EMB_BACKEND_ORDER else None,
          "di_model": DI_MODEL if "deepinfra" in EMB_BACKEND_ORDER else None,
          "vector_store": VECTOR_STORE,
-         "vector_store_active": type(STORE).__name__,
-         "docs": "/health, /index, /status/{job_id}, /query, /wipe"
      }

  @app.get("/health")
  def health():
-     return {"ok": True}

  def _check_backend_ready():
      if "hf" in EMB_BACKEND_ORDER and not HF_TOKEN:
@@ -541,14 +542,8 @@ def _check_backend_ready():

  @app.post("/index")
  def start_index(req: IndexRequest, background_tasks: BackgroundTasks, x_auth_token: Optional[str] = Header(default=None)):
-     if AUTH_TOKEN and (x_auth_token or "") != AUTH_TOKEN:
-         raise HTTPException(401, "Unauthorized")
      _check_backend_ready()
-     non_empty = [f for f in req.files if (f.text or "").strip()]
-     if not non_empty:
-         raise HTTPException(422, "No non-empty file to index.")
-     req.files = non_empty
-
      job_id = uuid.uuid4().hex[:12]
      JOBS[job_id] = {"status": "queued", "logs": [], "created": time.time()}
      background_tasks.add_task(run_index_job, job_id, req)
@@ -556,58 +551,25 @@ def start_index(req: IndexRequest, background_tasks: BackgroundTasks, x_auth_tok

  @app.get("/status/{job_id}")
  def status(job_id: str, x_auth_token: Optional[str] = Header(default=None)):
-     if AUTH_TOKEN and (x_auth_token or "") != AUTH_TOKEN:
-         raise HTTPException(401, "Unauthorized")
-     j = JOBS.get(job_id)
-     if not j:
-         raise HTTPException(404, "unknown job")
-     return {"status": j["status"], "logs": j["logs"][-800:]}
-
- # Legacy compat
- @app.get("/status")
- def status_qp(job_id: str = Query(None), x_auth_token: Optional[str] = Header(default=None)):
-     if AUTH_TOKEN and (x_auth_token or "") != AUTH_TOKEN:
-         raise HTTPException(401, "Unauthorized")
-     if not job_id:
-         raise HTTPException(404, "unknown job")
      j = JOBS.get(job_id)
      if not j:
          raise HTTPException(404, "unknown job")
-     return {"status": j["status"], "logs": j["logs"][-800:]}
-
- class _StatusBody(BaseModel):
-     job_id: str
-
- @app.post("/status")
- def status_post(body: _StatusBody, x_auth_token: Optional[str] = Header(default=None)):
-     if AUTH_TOKEN and (x_auth_token or "") != AUTH_TOKEN:
-         raise HTTPException(401, "Unauthorized")
-     j = JOBS.get(body.job_id)
-     if not j:
-         raise HTTPException(404, "unknown job")
-     return {"status": j["status"], "logs": j["logs"][-800:]}

  @app.post("/query")
  def query(req: QueryRequest, x_auth_token: Optional[str] = Header(default=None)):
-     if AUTH_TOKEN and (x_auth_token or "") != AUTH_TOKEN:
-         raise HTTPException(401, "Unauthorized")
      _check_backend_ready()
-     k = int(max(1, min(50, req.top_k or 6)))
-
      vecs, _ = _post_embeddings([req.query])
      col = f"proj_{req.project_id}"
-
-     # search against the active store
      try:
-         hits = STORE.search(col, vecs[0], k)
      except Exception as e:
          raise HTTPException(400, f"Search failed: {e}")
-
      out = []
-     # Qdrant returns objects with .score and .payload
-     for p in hits:
-         pl = getattr(p, "payload", None) or {}
-         score = float(getattr(p, "score", 0.0))
          txt = pl.get("text")
          if txt and len(txt) > 800:
              txt = txt[:800] + "..."
617
  "start": pl.get("start"),
618
  "end": pl.get("end"),
619
  "text": txt,
620
- "score": score,
621
  })
622
  return {"results": out}
623
 
624
  @app.post("/wipe")
625
  def wipe_collection(project_id: str, x_auth_token: Optional[str] = Header(default=None)):
626
- if AUTH_TOKEN and (x_auth_token or "") != AUTH_TOKEN:
627
- raise HTTPException(401, "Unauthorized")
628
  col = f"proj_{project_id}"
629
  try:
630
- STORE.wipe(col); return {"ok": True}
 
631
  except Exception as e:
632
  raise HTTPException(400, f"wipe failed: {e}")
633
 
634
- # ---------- Entrypoint ----------
 
 
 
635
  if __name__ == "__main__":
636
  import uvicorn
637
  port = int(os.getenv("PORT", "7860"))
 
main.py (new version)

  # -*- coding: utf-8 -*-
  from __future__ import annotations
+
+ import os
+ import time
+ import uuid
+ import math
+ import random
+ import logging
  from typing import List, Optional, Dict, Any, Tuple

  import numpy as np
  import requests
+ from fastapi import FastAPI, BackgroundTasks, Header, HTTPException
  from pydantic import BaseModel, Field

+ # ======================================================================================
+ # Logging
+ # ======================================================================================
  logging.basicConfig(level=logging.INFO, format="%(levelname)s:%(name)s:%(message)s")
  LOG = logging.getLogger("remote_indexer")

+ # ======================================================================================
+ # ENV (config)
+ # ======================================================================================

+ # Order of embedding backends to try (comma-separated). E.g.: "deepinfra,hf"
+ EMB_BACKEND_ORDER = [
+     s.strip().lower()
+     for s in os.getenv("EMB_BACKEND_ORDER", os.getenv("EMB_BACKEND", "deepinfra,hf")).split(",")
+     if s.strip()
+ ]
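For reference, a standalone sketch of how this env parsing resolves (the env value set here is hypothetical):

import os

os.environ["EMB_BACKEND_ORDER"] = " DeepInfra , hf ,"   # hypothetical setting
order = [s.strip().lower()
         for s in os.getenv("EMB_BACKEND_ORDER", os.getenv("EMB_BACKEND", "deepinfra,hf")).split(",")
         if s.strip()]
print(order)  # ['deepinfra', 'hf'] - trimmed, lowercased, empty items dropped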
 
+ # --- DeepInfra Embeddings (OpenAI-like) ---
+ # API:  POST https://api.deepinfra.com/v1/openai/embeddings
+ # Body: {"model":"BAAI/bge-m3","input":[text1,text2,...]}
  DI_TOKEN = os.getenv("DEEPINFRA_API_KEY", "").strip()
  DI_MODEL = os.getenv("DEEPINFRA_EMBED_MODEL", "BAAI/bge-m3").strip()
  DI_URL = os.getenv("DEEPINFRA_EMBED_URL", "https://api.deepinfra.com/v1/openai/embeddings").strip()
  DI_TIMEOUT = float(os.getenv("EMB_TIMEOUT_SEC", "120"))
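Since the endpoint is OpenAI-compatible, a minimal round-trip looks like this (a sketch; it assumes the standard OpenAI-style response shape, one {"embedding": [...]} entry per input):

import os
import requests

token = os.environ["DEEPINFRA_API_KEY"]
resp = requests.post(
    "https://api.deepinfra.com/v1/openai/embeddings",
    headers={"Authorization": f"Bearer {token}", "Content-Type": "application/json"},
    json={"model": "BAAI/bge-m3", "input": ["hello world"]},
    timeout=120,
)
resp.raise_for_status()
vectors = [item["embedding"] for item in resp.json()["data"]]  # one vector per input string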
 
+ # --- Hugging Face Inference API ---
+ # Two possible endpoints:
+ #   1) Pipeline feature-extraction (often 404, depending on the model)
+ #   2) Models (sometimes routes to SentenceSimilarity => feature-extraction must be forced)
+ HF_TOKEN = os.getenv("HF_API_TOKEN", "").strip()
+ HF_MODEL = os.getenv("HF_EMBED_MODEL", "sentence-transformers/all-MiniLM-L6-v2").strip()
+ HF_URL_PIPE = os.getenv("HF_API_URL_PIPELINE", "").strip() or (
+     f"https://api-inference.huggingface.co/pipeline/feature-extraction/{HF_MODEL}"
+ )
+ HF_URL_MODL = os.getenv("HF_API_URL_MODELS", "").strip() or (
+     f"https://api-inference.huggingface.co/models/{HF_MODEL}"
+ )
+ HF_TIMEOUT = float(os.getenv("EMB_TIMEOUT_SEC", "120"))
+ HF_WAIT = os.getenv("HF_WAIT_FOR_MODEL", "true").lower() in ("1", "true", "yes", "on")
+
+ # --- Retries / backoff ---
+ RETRY_MAX = int(os.getenv("EMB_RETRY_MAX", "6"))            # max attempts per backend
+ RETRY_BASE_SEC = float(os.getenv("EMB_RETRY_BASE", "1.6"))  # exponential backoff base
+ RETRY_JITTER = float(os.getenv("EMB_RETRY_JITTER", "0.35")) # jitter fraction (0..1)
+
+ # --- Vector store (Qdrant / Memory fallback) ---
+ VECTOR_STORE = os.getenv("VECTOR_STORE", "qdrant").strip().lower()
+ QDRANT_URL = os.getenv("QDRANT_URL", "").strip()
+ QDRANT_API = os.getenv("QDRANT_API_KEY", "").strip()

+ # --- API auth for this service (simple header) ---
+ # If set, the client must send X-Auth-Token:{REMOTE_INDEX_TOKEN}
  AUTH_TOKEN = os.getenv("REMOTE_INDEX_TOKEN", "").strip()

  LOG.info(f"Embeddings backend order = {EMB_BACKEND_ORDER}")
+ LOG.info(f"HF pipeline URL = {HF_URL_PIPE}")
+ LOG.info(f"HF models URL = {HF_URL_MODL}")
  LOG.info(f"VECTOR_STORE = {VECTOR_STORE}")
+
  if "deepinfra" in EMB_BACKEND_ORDER and not DI_TOKEN:
      LOG.warning("DEEPINFRA_API_KEY missing — DeepInfra attempts will fail.")
+ if "hf" in EMB_BACKEND_ORDER and not HF_TOKEN:
+     LOG.warning("HF_API_TOKEN missing — HF attempts will fail.")
 
+ # ======================================================================================
+ # Vector Stores (Memory + Qdrant)
+ # ======================================================================================
+ from typing import Iterable

  try:
      from qdrant_client import QdrantClient
⋮
      QdrantClient = None
      PointStruct = None

  class BaseStore:
      def ensure_collection(self, name: str, dim: int): ...
      def upsert(self, name: str, vectors: np.ndarray, payloads: List[Dict[str, Any]]) -> int: ...
      def search(self, name: str, query_vec: np.ndarray, top_k: int) -> List[Dict[str, Any]]: ...
      def wipe(self, name: str): ...

  class MemoryStore(BaseStore):
+     """In-memory store (volatile) — for fallback and tests."""
      def __init__(self):
+         self.db: Dict[str, Dict[str, Any]] = {}  # name -> {"vecs":[np.ndarray], "payloads":[dict], "dim":int}

      def ensure_collection(self, name: str, dim: int):
          self.db.setdefault(name, {"vecs": [], "payloads": [], "dim": dim})
⋮
              raise RuntimeError(f"MemoryStore: unknown collection {name}")
          if len(vectors) != len(payloads):
              raise ValueError("MemoryStore.upsert: vectors/payloads length mismatch")
+         self.db[name]["vecs"].extend([np.asarray(v, dtype=np.float32) for v in vectors])
          self.db[name]["payloads"].extend(payloads)
          return len(vectors)

      def search(self, name: str, query_vec: np.ndarray, top_k: int) -> List[Dict[str, Any]]:
          if name not in self.db or not self.db[name]["vecs"]:
              return []
+         mat = np.vstack(self.db[name]["vecs"]).astype(np.float32)  # [N, dim]
+         q = query_vec.reshape(1, -1).astype(np.float32)            # [1, dim]
+         # cosine similarity (embeddings are already normalized upstream)
+         sims = (mat @ q.T).ravel()
          top_idx = np.argsort(-sims)[:top_k]
          out = []
          for i in top_idx:
⋮
      def wipe(self, name: str):
          self.db.pop(name, None)

  class QdrantStore(BaseStore):
+     """Qdrant store with sequential ID management (required by PointStruct)."""
      def __init__(self, url: str, api_key: Optional[str] = None):
          if QdrantClient is None or PointStruct is None:
              raise RuntimeError("qdrant_client not available")
          self.client = QdrantClient(url=url, api_key=api_key if api_key else None)
          self._next_ids: Dict[str, int] = {}

      def _init_next_id(self, name: str):
          try:
              cnt = self.client.count(collection_name=name, exact=True).count
          except Exception:
              cnt = 0
          self._next_ids[name] = int(cnt)

      def ensure_collection(self, name: str, dim: int):
          try:
              self.client.get_collection(name)
          except Exception:
⋮
              collection_name=name,
              vectors_config=VectorParams(size=dim, distance=Distance.COSINE),
          )
          if name not in self._next_ids:
              self._init_next_id(name)
⋮
              return 0
          if len(vectors) != len(payloads):
              raise ValueError("QdrantStore.upsert: vectors/payloads length mismatch")
          if name not in self._next_ids:
              self._init_next_id(name)

          start = self._next_ids[name]
          pts = [
+             PointStruct(id=start + i,
+                         vector=np.asarray(v, dtype=np.float32).tolist(),
+                         payload=payloads[i])
              for i, v in enumerate(vectors)
          ]
          self.client.upsert(collection_name=name, points=pts)
⋮
          out = []
          for p in res:
              pl = p.payload or {}
+             try:
+                 pl["_score"] = float(p.score)
+             except Exception:
+                 pl["_score"] = None
              out.append(pl)
          return out
⋮
              pass
          self._next_ids.pop(name, None)
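As a quick illustration of the BaseStore contract (a sketch run in the context of this module, using only the in-memory backend and the classes defined above):

import numpy as np

store = MemoryStore()
store.ensure_collection("proj_demo", dim=3)

# Unit-length toy vectors, so the dot product equals cosine similarity.
vecs = np.array([[1.0, 0.0, 0.0], [0.0, 1.0, 0.0]], dtype=np.float32)
store.upsert("proj_demo", vecs, [{"path": "a.txt"}, {"path": "b.txt"}])

hits = store.search("proj_demo", np.array([0.9, 0.1, 0.0], dtype=np.float32), top_k=1)
# hits[0] is the payload of the nearest vector, i.e. the one for "a.txt"
store.wipe("proj_demo")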
 
+ # Initialize the active store (with a connection test)
  try:
      if VECTOR_STORE == "qdrant" and QDRANT_URL:
+         STORE: BaseStore = QdrantStore(QDRANT_URL, api_key=QDRANT_API if QDRANT_API else None)
+         _ = STORE.client.get_collections()  # ping
          LOG.info("Connected to Qdrant.")
          VECTOR_STORE_ACTIVE = "QdrantStore"
      else:
          raise RuntimeError("Qdrant not configured, falling back to memory.")
  except Exception as e:
+     LOG.error(f"Qdrant unavailable (connection failed: {e}) — falling back to memory.")
      STORE = MemoryStore()
      VECTOR_STORE_ACTIVE = "MemoryStore"
      LOG.warning("Vector store: MEMORY (fallback). Data is volatile (lost on restart).")

+ # ======================================================================================
+ # Pydantic I/O
+ # ======================================================================================

  class FileIn(BaseModel):
      path: str
      text: str
⋮
      query: str
      top_k: int = 6
 
+ # ======================================================================================
+ # Jobs store (in-memory)
+ # ======================================================================================
+ JOBS: Dict[str, Dict[str, Any]] = {}  # {job_id: {"status": "...", "logs": [...], "created": ts}}

  def _append_log(job_id: str, line: str):
      job = JOBS.get(job_id)
+     if job:
+         job["logs"].append(line)

  def _set_status(job_id: str, status: str):
      job = JOBS.get(job_id)
+     if job:
+         job["status"] = status

  def _auth(x_auth: Optional[str]):
      if AUTH_TOKEN and (x_auth or "") != AUTH_TOKEN:
+         raise HTTPException(401, "Unauthorized")
+
+ # ======================================================================================
+ # Embeddings backends + retry/fallback
+ # ======================================================================================

+ def _retry_sleep(attempt: int) -> float:
+     # exponential backoff + jitter
      back = (RETRY_BASE_SEC ** attempt)
      jitter = 1.0 + random.uniform(-RETRY_JITTER, RETRY_JITTER)
      return max(0.25, back * jitter)
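With the defaults above (RETRY_BASE_SEC=1.6, RETRY_JITTER=0.35), the nominal delay grows as 1.6**attempt seconds and jitter then scales it by a random factor in [0.65, 1.35]. The schedule before jitter:

for attempt in range(1, 7):  # RETRY_MAX = 6 attempts
    print(attempt, round(1.6 ** attempt, 2))
# 1 1.6
# 2 2.56
# 3 4.1
# 4 6.55
# 5 10.49
# 6 16.78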
 
+ def _normalize_rows(arr: np.ndarray) -> np.ndarray:
+     arr = np.asarray(arr, dtype=np.float32)
      norms = np.linalg.norm(arr, axis=1, keepdims=True) + 1e-12
+     return (arr / norms).astype(np.float32)
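Row-normalizing here is what lets both stores score with a plain dot product: for unit vectors, cos(u, v) = u · v, so vector length drops out. A small check (a sketch using _normalize_rows as defined above):

a = _normalize_rows(np.array([[3.0, 4.0]]))  # -> [[0.6, 0.8]]
b = _normalize_rows(np.array([[6.0, 8.0]]))  # same direction, twice the length
print((a @ b.T).item())                      # ~1.0: direction matches, magnitude is ignored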
 
 
  def _di_post_embeddings_once(batch: List[str]) -> Tuple[np.ndarray, int]:
      if not DI_TOKEN:
          raise RuntimeError("DEEPINFRA_API_KEY missing (backend=deepinfra).")
+     headers = {"Authorization": f"Bearer {DI_TOKEN}", "Content-Type": "application/json"}
      payload = {"model": DI_MODEL, "input": batch}
      r = requests.post(DI_URL, headers=headers, json=payload, timeout=DI_TIMEOUT)
+     size = int(r.headers.get("Content-Length", "0") or 0)
      if r.status_code >= 400:
          LOG.error(f"DeepInfra error {r.status_code}: {r.text[:1000]}")
          r.raise_for_status()
⋮
      arr = np.asarray(embs, dtype=np.float32)
      if arr.ndim != 2:
          raise RuntimeError(f"DeepInfra: unexpected embeddings shape: {arr.shape}")
+     return _normalize_rows(arr), size
 
+ def _hf_post_embeddings_once(batch: List[str]) -> Tuple[np.ndarray, int]:
+     """
+     1) Try the PIPELINE feature-extraction endpoint.
+     2) On 404 => try the MODELS endpoint.
+        2a) If that route picks SentenceSimilarity (the "missing 'sentences'" error),
+            force feature-extraction again via ?task=feature-extraction + X-Task.
+     """
+     if not HF_TOKEN:
+         raise RuntimeError("HF_API_TOKEN missing (backend=hf).")
+     headers = {
+         "Authorization": f"Bearer {HF_TOKEN}",
+         "Content-Type": "application/json",
+     }
+     if HF_WAIT:
+         headers["X-Wait-For-Model"] = "true"
+         headers["X-Use-Cache"] = "true"
+
+     # Internal helper
+     def _call(url: str, payload: Dict[str, Any], extra_headers: Optional[Dict[str, str]] = None):
+         h = dict(headers)
+         if extra_headers:
+             h.update(extra_headers)
+         r = requests.post(url, headers=h, json=payload, timeout=HF_TIMEOUT)
+         return r
+
+     # 1) Pipeline
+     payload = {"inputs": batch if len(batch) > 1 else batch[0]}
+     r = _call(HF_URL_PIPE, payload)
+     size = int(r.headers.get("Content-Length", "0") or 0)
+     if r.status_code == 404:
+         LOG.error("HF error 404: Not Found")
+         LOG.warning(f"HF endpoint {HF_URL_PIPE} unavailable (404), falling back to the alternative ...")
+     elif r.status_code >= 400:
+         LOG.error(f"HF error {r.status_code}: {r.text[:1000]}")
+         r.raise_for_status()
+         # if we get here there is no fallback (raise_for_status raised)
+     else:
+         data = r.json()
+         arr = np.array(data, dtype=np.float32)
+         if arr.ndim == 3:  # [batch, tokens, dim]
+             arr = arr.mean(axis=1)
+         if arr.ndim == 1:
+             arr = arr.reshape(1, -1)
+         if arr.ndim != 2:
+             raise RuntimeError(f"HF: unexpected embeddings shape: {arr.shape}")
+         return _normalize_rows(arr), size
+
+     # 2) MODELS
+     r2 = _call(HF_URL_MODL, payload)
+     size2 = int(r2.headers.get("Content-Length", "0") or 0)
+     if r2.status_code >= 400:
+         LOG.error(f"HF error {r2.status_code}: {r2.text[:1000]}")
+         # If it is the notorious Similarity error => retry with X-Task + query param
+         if r2.status_code == 400 and "SentenceSimilarityPipeline" in (r2.text or ""):
+             LOG.warning("HF MODELS picked Similarity -> retry with ?task=feature-extraction + X-Task")
+             r3 = _call(
+                 HF_URL_MODL + "?task=feature-extraction",
+                 payload,
+                 extra_headers={"X-Task": "feature-extraction"}
+             )
+             size3 = int(r3.headers.get("Content-Length", "0") or 0)
+             if r3.status_code >= 400:
+                 LOG.error(f"HF error {r3.status_code}: {r3.text[:1000]}")
+                 r3.raise_for_status()
+             data3 = r3.json()
+             arr3 = np.array(data3, dtype=np.float32)
+             if arr3.ndim == 3:
+                 arr3 = arr3.mean(axis=1)
+             if arr3.ndim == 1:
+                 arr3 = arr3.reshape(1, -1)
+             if arr3.ndim != 2:
+                 raise RuntimeError(f"HF: unexpected embeddings shape: {arr3.shape}")
+             return _normalize_rows(arr3), size3
+         else:
+             r2.raise_for_status()
+     data2 = r2.json()
+     arr2 = np.array(data2, dtype=np.float32)
+     if arr2.ndim == 3:  # [batch, tokens, dim]
+         arr2 = arr2.mean(axis=1)
+     if arr2.ndim == 1:
+         arr2 = arr2.reshape(1, -1)
+     if arr2.ndim != 2:
+         raise RuntimeError(f"HF: unexpected embeddings shape: {arr2.shape}")
+     return _normalize_rows(arr2), size2
 
  def _call_with_retries(func, batch: List[str], label: str, job_id: Optional[str] = None) -> Tuple[np.ndarray, int]:
      last_exc = None
⋮
      raise RuntimeError(f"{label}: retries exhausted: {last_exc}")

  def _post_embeddings(batch: List[str], job_id: Optional[str] = None) -> Tuple[np.ndarray, int]:
+     """
+     Try the backends in EMB_BACKEND_ORDER, with retries.
+     E.g.: EMB_BACKEND_ORDER=deepinfra,hf
+     """
      last_err = None
      for b in EMB_BACKEND_ORDER:
+         if b == "deepinfra":
              try:
                  return _call_with_retries(_di_post_embeddings_once, batch, "DeepInfra", job_id)
              except Exception as e:
                  last_err = e
                  _append_log(job_id, f"DeepInfra failed: {e}.")
                  LOG.error(f"DeepInfra failed: {e}")
+         elif b == "hf":
+             try:
+                 return _call_with_retries(_hf_post_embeddings_once, batch, "HF", job_id)
+             except Exception as e:
+                 last_err = e
+                 _append_log(job_id, f"HF failed: {e}.")
+                 LOG.error(f"HF failed: {e}")
+                 # If HF routed to SentenceSimilarity (the 'sentences' error), try DeepInfra as auto-fallback
+                 if "SentenceSimilarityPipeline" in str(e) and "deepinfra" not in EMB_BACKEND_ORDER:
+                     _append_log(job_id, "Auto-fallback to DeepInfra (HF => SentenceSimilarity).")
+                     try:
+                         return _call_with_retries(_di_post_embeddings_once, batch, "DeepInfra", job_id)
+                     except Exception as e2:
+                         last_err = e2
+                         _append_log(job_id, f"DeepInfra failed after HF: {e2}.")
+                         LOG.error(f"DeepInfra failed after HF: {e2}")
          else:
              _append_log(job_id, f"Unknown backend ignored: {b}")
      raise RuntimeError(f"All backends failed: {last_err}")
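Everything else in the file goes through this single entry point; a typical call (a sketch, with shapes as the functions above produce them):

vecs, _resp_size = _post_embeddings(["hello", "world"])
# vecs: np.float32 array of shape [2, dim], one L2-normalized row per input text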
 
+ # ======================================================================================
+ # Chunking helpers
+ # ======================================================================================
+
  def _chunk_with_spans(text: str, size: int, overlap: int):
      n = len(text or "")
      if size <= 0:
⋮
          j = min(n, i + size)
          yield (i, j, text[i:j])
          i = max(0, j - overlap)
+         if j >= n:  # stop at end of text; with overlap > 0, "i >= n" would never trigger
+             break
+
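Chunks overlap by `overlap` characters, and the (start, end) offsets are kept so each payload can point back into its source file. For example (a sketch; the generator as defined above):

for start, end, chunk in _chunk_with_spans("abcdefghij", size=4, overlap=1):
    print(start, end, chunk)
# 0 4 abcd
# 3 7 defg
# 6 10 ghij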
+ # ======================================================================================
+ # Background task: indexing
+ # ======================================================================================

  def run_index_job(job_id: str, req: IndexRequest):
      try:
          _set_status(job_id, "running")
          _append_log(job_id, f"Start project={req.project_id} files={len(req.files)} | backends={EMB_BACKEND_ORDER} | store={VECTOR_STORE}")
          LOG.info(f"[{job_id}] Index start project={req.project_id} files={len(req.files)}")

          # Warmup -> dimension
+         warm = next(_chunk_with_spans(req.files[0].text if req.files else "", req.chunk_size, req.overlap))[2] if req.files else "warmup"
          embs, _ = _post_embeddings([warm], job_id=job_id)
          dim = embs.shape[1]
          col = f"proj_{req.project_id}"
+
+         # Create/ensure the collection
          STORE.ensure_collection(col, dim)
          _append_log(job_id, f"Collection ready: {col} (dim={dim})")

+         total_chunks = 0
+         buf_chunks: List[str] = []
+         buf_metas: List[Dict[str, Any]] = []
+
+         def _flush():
+             nonlocal buf_chunks, buf_metas, total_chunks
+             if not buf_chunks:
+                 return
+             vecs, sz = _post_embeddings(buf_chunks, job_id=job_id)
+             added = STORE.upsert(col, vecs, buf_metas)
+             total_chunks += added
+             _append_log(job_id, f"+{added} chunks (total={total_chunks}) ~{(sz/1024.0):.1f}KiB")
+             buf_chunks, buf_metas = [], []
+
+         # Loop over files + chunks
          for fi, f in enumerate(req.files, 1):
              for ci, (start, end, chunk_txt) in enumerate(_chunk_with_spans(f.text, req.chunk_size, req.overlap)):
+                 buf_chunks.append(chunk_txt)
                  meta = {"path": f.path, "chunk": ci, "start": start, "end": end}
                  if req.store_text:
                      meta["text"] = chunk_txt
+                 buf_metas.append(meta)
+                 if len(buf_chunks) >= req.batch_size:
                      _flush()
+                     _append_log(job_id, f"file {fi}/{len(req.files)}: batch flushed (total={total_chunks})")
+             # end-of-file flush
              _flush()
+             _append_log(job_id, f"file {fi}/{len(req.files)} processed.")

          _append_log(job_id, f"Done. chunks={total_chunks}")
          _set_status(job_id, "done")
          LOG.info(f"[{job_id}] Index finished. chunks={total_chunks}")
+
      except Exception as e:
          LOG.exception("Index job failed")
          _append_log(job_id, f"ERROR: {e}")
          _set_status(job_id, "error")
+ # ======================================================================================
+ # API
+ # ======================================================================================

  app = FastAPI()

  @app.get("/")
⋮
          "ok": True,
          "service": "remote-indexer",
          "backends": EMB_BACKEND_ORDER,
+         "hf_url_pipeline": HF_URL_PIPE if "hf" in EMB_BACKEND_ORDER else None,
+         "hf_url_models": HF_URL_MODL if "hf" in EMB_BACKEND_ORDER else None,
          "di_url": DI_URL if "deepinfra" in EMB_BACKEND_ORDER else None,
          "di_model": DI_MODEL if "deepinfra" in EMB_BACKEND_ORDER else None,
          "vector_store": VECTOR_STORE,
+         "vector_store_active": VECTOR_STORE_ACTIVE,
+         "docs": "/health, /index, /status/{job_id}, /query, /wipe",
      }

  @app.get("/health")
  def health():
+     return {"ok": True, "store": VECTOR_STORE_ACTIVE}

  def _check_backend_ready():
      if "hf" in EMB_BACKEND_ORDER and not HF_TOKEN:
 
⋮
  @app.post("/index")
  def start_index(req: IndexRequest, background_tasks: BackgroundTasks, x_auth_token: Optional[str] = Header(default=None)):
+     _auth(x_auth_token)
      _check_backend_ready()
      job_id = uuid.uuid4().hex[:12]
      JOBS[job_id] = {"status": "queued", "logs": [], "created": time.time()}
      background_tasks.add_task(run_index_job, job_id, req)
⋮
  @app.get("/status/{job_id}")
  def status(job_id: str, x_auth_token: Optional[str] = Header(default=None)):
+     _auth(x_auth_token)
      j = JOBS.get(job_id)
      if not j:
          raise HTTPException(404, "unknown job")
+     # keep only the latest logs to avoid bloating the response
+     return {"status": j["status"], "logs": j["logs"][-1200:]}

  @app.post("/query")
  def query(req: QueryRequest, x_auth_token: Optional[str] = Header(default=None)):
+     _auth(x_auth_token)
      _check_backend_ready()
      vecs, _ = _post_embeddings([req.query])
      col = f"proj_{req.project_id}"
      try:
+         results = STORE.search(col, vecs[0], int(req.top_k))
      except Exception as e:
          raise HTTPException(400, f"Search failed: {e}")
      out = []
+     for pl in results:
          txt = pl.get("text")
          if txt and len(txt) > 800:
              txt = txt[:800] + "..."
⋮
              "start": pl.get("start"),
              "end": pl.get("end"),
              "text": txt,
+             "score": float(pl.get("_score")) if pl.get("_score") is not None else None
          })
      return {"results": out}

  @app.post("/wipe")
  def wipe_collection(project_id: str, x_auth_token: Optional[str] = Header(default=None)):
+     _auth(x_auth_token)
      col = f"proj_{project_id}"
      try:
+         STORE.wipe(col)
+         return {"ok": True}
      except Exception as e:
          raise HTTPException(400, f"wipe failed: {e}")
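Taken together, the endpoints form a small job-based workflow: POST /index queues a background job, GET /status/{job_id} reports progress, and POST /query searches the project's collection. A hypothetical client sketch (the base URL and token are placeholders; the X-Auth-Token header matches the _auth helper above; the exact /index response body falls outside the shown hunks, but the handler does create a job_id):

import time
import requests

BASE = "http://localhost:7860"             # placeholder service URL
HEADERS = {"X-Auth-Token": "my-secret"}    # REMOTE_INDEX_TOKEN, if configured

job = requests.post(f"{BASE}/index", headers=HEADERS, json={
    "project_id": "demo",
    "files": [{"path": "notes.txt", "text": "hello world " * 200}],
}).json()

while True:                                # poll the job until it settles
    st = requests.get(f"{BASE}/status/{job['job_id']}", headers=HEADERS).json()
    if st["status"] in ("done", "error"):
        break
    time.sleep(1)

hits = requests.post(f"{BASE}/query", headers=HEADERS,
                     json={"project_id": "demo", "query": "hello", "top_k": 3}).json()
print(st["status"], [h.get("score") for h in hits["results"]])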
 
+ # ======================================================================================
+ # Entrypoint
+ # ======================================================================================
+
  if __name__ == "__main__":
      import uvicorn
      port = int(os.getenv("PORT", "7860"))