chouchouvs committed
Commit f1ee128 · verified · 1 Parent(s): 903baeb

Update main.py

Files changed (1):
  1. main.py +257 -248
main.py CHANGED
@@ -1,51 +1,58 @@
  # -*- coding: utf-8 -*-
  """
- Remote Indexer (HF Space) Qdrant + embeddings (HF or dummy)
- Version: worker thread + ensure_collection_hard (retries + verify) + tolerant count.
-
- Endpoints:
- - GET /   redirects to UI_PATH (default: /ui)
- - GET /ui   Gradio UI
- - GET /health   healthcheck
- - GET /api → service info
- - GET /debug/env → config overview (without secrets)
- - POST /wipe?project_id=XXX
- - POST /index
- - GET /status/{job_id}
- - GET /collections/{project_id}/count
- - POST /query
- - POST /collections/{project_id}/ensure?dim=XXX (debug)

  ENV:
- - QDRANT_URL, QDRANT_API_KEY (required for upsert)
- - COLLECTION_PREFIX (default "proj_")
- - EMB_PROVIDER ("hf" | "dummy"; default "hf")
- - HF_EMBED_MODEL (default "BAAI/bge-m3")
- - HUGGINGFACEHUB_API_TOKEN (if EMB_PROVIDER=hf)
- - EMB_FALLBACK_TO_DUMMY (true/false) falls back to dummy if HF fails
- - LOG_LEVEL (default DEBUG)
- - UI_PATH (default "/ui")
- - PORT (default 7860)
  """

  from __future__ import annotations
  import os
  import time
  import uuid
  import hashlib
  import logging
- import threading
  import asyncio
  from typing import List, Dict, Any, Optional, Tuple

  import numpy as np
  import httpx
  import uvicorn
  from pydantic import BaseModel, Field, ValidationError
  from fastapi import FastAPI, HTTPException, Query
  from fastapi.middleware.cors import CORSMiddleware
- from fastapi.responses import RedirectResponse
- import gradio as gr

  # ------------------------------------------------------------------------------
  # Config & logs
@@ -55,26 +62,24 @@ logging.basicConfig(
      level=getattr(logging, LOG_LEVEL, logging.DEBUG),
      format="%(asctime)s - %(levelname)s - %(message)s",
  )
- LOG = logging.getLogger("remote_indexer_min")
-
- QDRANT_URL = os.getenv("QDRANT_URL", "").rstrip("/")
- QDRANT_API_KEY = os.getenv("QDRANT_API_KEY", "")
- COLLECTION_PREFIX = os.getenv("COLLECTION_PREFIX", "proj_").strip() or "proj_"

- EMB_PROVIDER = os.getenv("EMB_PROVIDER", "hf").lower()
  HF_EMBED_MODEL = os.getenv("HF_EMBED_MODEL", "BAAI/bge-m3")
  HF_TOKEN = os.getenv("HUGGINGFACEHUB_API_TOKEN", "")
  EMB_FALLBACK_TO_DUMMY = os.getenv("EMB_FALLBACK_TO_DUMMY", "false").lower() in ("1","true","yes","on")

  UI_PATH = os.getenv("UI_PATH", "/ui")

- if not QDRANT_URL or not QDRANT_API_KEY:
-     LOG.warning("QDRANT_URL / QDRANT_API_KEY non fournis : l'upsert échouera.")
  if EMB_PROVIDER == "hf" and not HF_TOKEN and not EMB_FALLBACK_TO_DUMMY:
-     LOG.warning("EMB_PROVIDER=hf sans HUGGINGFACEHUB_API_TOKEN (pas de fallback) préférer EMB_PROVIDER=dummy ou EMB_FALLBACK_TO_DUMMY=true.")

  # ------------------------------------------------------------------------------
- # Models
  # ------------------------------------------------------------------------------
  class FileItem(BaseModel):
      path: str
@@ -96,11 +101,11 @@ class QueryRequest(BaseModel):
  class JobState(BaseModel):
      job_id: str
      project_id: str
-     stage: str = "pending"  # pending -> embedding -> upserting -> done/failed
      total_files: int = 0
      total_chunks: int = 0
      embedded: int = 0
-     upserted: int = 0
      errors: List[str] = Field(default_factory=list)
      messages: List[str] = Field(default_factory=list)
      started_at: float = Field(default_factory=time.time)
@@ -114,6 +119,9 @@ class JobState(BaseModel):

  JOBS: Dict[str, JobState] = {}

  # ------------------------------------------------------------------------------
  # Utils
  # ------------------------------------------------------------------------------
@@ -151,90 +159,37 @@ def chunk_text(text: str, chunk_size: int, overlap: int) -> List[Tuple[int, int,
          i = j
      return res

- # ------------------------------------------------------------------------------
- # Qdrant
- # ------------------------------------------------------------------------------
- async def ensure_collection_hard(client: httpx.AsyncClient, coll: str, vector_size: int, job: Optional[JobState] = None) -> None:
-     """
-     Creates the collection if absent, or recreates it if the dimension differs, then
-     verifies that it really exists (GET polling with retries).
-     """
-     def _j(msg: str):
-         if job: job.log(msg)
-         else: LOG.debug(msg)
-
-     url = f"{QDRANT_URL}/collections/{coll}"
-     r = await client.get(url, headers={"api-key": QDRANT_API_KEY}, timeout=20)
-     recreate = False
-     if r.status_code == 200:
-         data = r.json()
-         existing_size = data.get("result", {}).get("vectors", {}).get("size")
-         _j(f"GET collection '{coll}' → 200 dim={existing_size}")
-         if existing_size and int(existing_size) != int(vector_size):
-             _j(f"Dimension ≠ ({existing_size} ≠ {vector_size}) → suppression + recréation")
-             rd = await client.delete(url, headers={"api-key": QDRANT_API_KEY}, timeout=20)
-             _j(f"DELETE collection '{coll}' → {rd.status_code}")
-             recreate = True
-     elif r.status_code == 404:
-         _j(f"GET collection '{coll}' → 404 (à créer)")
-     else:
-         _j(f"GET collection '{coll}' → {r.status_code} {r.text}")
-
-     if r.status_code != 200 or recreate:
-         body = {"vectors": {"size": vector_size, "distance": "Cosine"}}
-         r2 = await client.put(url, headers={"api-key": QDRANT_API_KEY}, json=body, timeout=30)
-         _j(f"PUT create collection '{coll}' dim={vector_size} → {r2.status_code}")
-         if r2.status_code not in (200, 201):
-             raise HTTPException(status_code=500, detail=f"Qdrant PUT collection a échoué: {r2.text}")
-
-     # Poll until the collection is visible
-     for i in range(10):
-         rg = await client.get(url, headers={"api-key": QDRANT_API_KEY}, timeout=20)
-         if rg.status_code == 200:
-             _j(f"Collection '{coll}' disponible (try={i+1})")
-             return
-         await asyncio.sleep(0.3)
-     raise HTTPException(status_code=500, detail=f"Collection '{coll}' non visible après création.")
-
- async def qdrant_upsert(client: httpx.AsyncClient, coll: str, points: List[Dict[str, Any]], job: Optional[JobState] = None) -> int:
-     if not points:
-         return 0
-     url = f"{QDRANT_URL}/collections/{coll}/points?wait=true"
-     body = {"points": points}
-     r = await client.put(url, headers={"api-key": QDRANT_API_KEY}, json=body, timeout=60)
-     if r.status_code not in (200, 202):
-         if job: job.log(f"Upsert → {r.status_code}: {r.text[:200]}")
-         raise HTTPException(status_code=500, detail=f"Qdrant upsert échoué: {r.text}")
-     return len(points)
-
- async def qdrant_count(client: httpx.AsyncClient, coll: str) -> int:
-     url = f"{QDRANT_URL}/collections/{coll}/points/count"
-     r = await client.post(url, headers={"api-key": QDRANT_API_KEY}, json={"exact": True}, timeout=20)
-     if r.status_code == 404 and "doesn't exist" in (r.text or ""):
-         # Tolerant: missing collection → 0 (useful for the UI)
-         return 0
-     if r.status_code != 200:
-         raise HTTPException(status_code=500, detail=f"Qdrant count échoué: {r.text}")
-     return int(r.json().get("result", {}).get("count", 0))
-
- async def qdrant_search(client: httpx.AsyncClient, coll: str, vector: List[float], limit: int = 5) -> Dict[str, Any]:
-     url = f"{QDRANT_URL}/collections/{coll}/points/search"
-     r = await client.post(
-         url,
-         headers={"api-key": QDRANT_API_KEY},
-         json={"vector": vector, "limit": limit, "with_payload": True},
-         timeout=30,
-     )
-     if r.status_code != 200:
-         raise HTTPException(status_code=500, detail=f"Qdrant search échoué: {r.text}")
-     return r.json()

  # ------------------------------------------------------------------------------
- # Embeddings
  # ------------------------------------------------------------------------------
  def _maybe_prefix_for_model(texts: List[str], model_name: str) -> List[str]:
      m = (model_name or "").lower()
      if "e5" in m:
          return [("query: " + t) for t in texts]
      return texts

@@ -284,9 +239,14 @@ async def embed_texts(client: httpx.AsyncClient, texts: List[str]) -> List[List[
      return embed_dummy(texts, dim=128)

  # ------------------------------------------------------------------------------
- # Core: run_index_job (async) + worker thread wrapper
  # ------------------------------------------------------------------------------
- async def run_index_job(job: JobState, req: IndexRequest) -> None:
      try:
          job.stage = "embedding"
          job.total_files = len(req.files)
@@ -306,7 +266,10 @@ async def run_index_job(job: JobState, req: IndexRequest) -> None:
                  payload = {"path": f.path, "chunk": idx, "start": start, "end": end}
                  if req.store_text:
                      payload["text"] = ch
-                 records.append({"payload": payload, "raw": ch})
          job.total_chunks = len(records)
          job.log(f"Total chunks = {job.total_chunks}")
          if job.total_chunks == 0:
@@ -315,53 +278,72 @@ async def run_index_job(job: JobState, req: IndexRequest) -> None:
              job.finished_at = time.time()
              return

          async with httpx.AsyncClient(timeout=180) as client:
-             # Warmup dim
-             warmup_vec = (await embed_texts(client, [records[0]["raw"]]))[0]
-             vec_dim = len(warmup_vec)
-             job.log(f"Warmup embeddings dim={vec_dim}")
-
-             # Collection (hard ensure)
-             coll = f"{COLLECTION_PREFIX}{req.project_id}"
-             await ensure_collection_hard(client, coll, vector_size=vec_dim, job=job)
-             job.log(f"Collection prête: {coll} (dim={vec_dim})")
-
-             # Upsert
-             job.stage = "upserting"
-             batch_points: List[Dict[str, Any]] = []
-
-             async def flush_batch():
-                 nonlocal batch_points
-                 if not batch_points:
-                     return 0
-                 added = await qdrant_upsert(client, coll, batch_points, job=job)
-                 job.upserted += added
-                 job.log(f"+{added} points upsert (total={job.upserted})")
-                 batch_points = []
-                 return added
-
-             EMB_BATCH = max(8, min(64, req.batch_size * 2))
              i = 0
              while i < len(records):
-                 sub = records[i : i + EMB_BATCH]
                  texts = [r["raw"] for r in sub]
                  vecs = await embed_texts(client, texts)
                  if len(vecs) != len(sub):
                      raise HTTPException(status_code=500, detail="Embedding batch size mismatch")
                  job.embedded += len(vecs)

-                 for r, v in zip(sub, vecs):
-                     point = {"id": str(uuid.uuid4()), "vector": v, "payload": r["payload"]}
-                     batch_points.append(point)
-                     if len(batch_points) >= req.batch_size:
-                         await flush_batch()
-                 i += EMB_BATCH

-             await flush_batch()

          job.stage = "done"
          job.finished_at = time.time()
-         job.log("Index job terminé.")
      except Exception as e:
          job.stage = "failed"
          job.errors.append(str(e))
@@ -369,10 +351,9 @@ async def run_index_job(job: JobState, req: IndexRequest) -> None:
          job.log(f"❌ Exception: {e}")

  def _run_job_in_thread(job: JobState, req: IndexRequest) -> None:
-     """Runs the async run_index_job in a dedicated thread with its own event loop."""
      def _runner():
          try:
-             asyncio.run(run_index_job(job, req))
          except Exception as e:
              job.stage = "failed"
              job.errors.append(str(e))
@@ -389,41 +370,49 @@ def create_and_start_job(req: IndexRequest) -> JobState:
      _run_job_in_thread(job, req)
      return job

  # ------------------------------------------------------------------------------
  # FastAPI app
  # ------------------------------------------------------------------------------
- fastapi_app = FastAPI(title="Remote Indexer - Minimal Test Space")
  fastapi_app.add_middleware(
-     CORSMiddleware,
-     allow_origins=["*"],
-     allow_methods=["*"],
-     allow_headers=["*"],
  )

  @fastapi_app.get("/health")
  async def health():
-     return {"status": "ok"}

  @fastapi_app.get("/api")
  async def api_info():
      return {
-         "ok": True, "service": "remote-indexer-min",
-         "qdrant": bool(QDRANT_URL),
          "emb_provider": EMB_PROVIDER, "hf_model": HF_EMBED_MODEL,
          "fallback_to_dummy": EMB_FALLBACK_TO_DUMMY,
-         "ui_path": UI_PATH,
-     }
-
- @fastapi_app.get("/debug/env")
- async def debug_env():
-     return {
-         "qdrant_url_set": bool(QDRANT_URL),
-         "qdrant_key_set": bool(QDRANT_API_KEY),
-         "emb_provider": EMB_PROVIDER,
-         "hf_model": HF_EMBED_MODEL,
-         "hf_token_set": bool(HF_TOKEN),
-         "fallback_to_dummy": EMB_FALLBACK_TO_DUMMY,
-         "collection_prefix": COLLECTION_PREFIX,
      }

  @fastapi_app.get("/")
@@ -432,19 +421,15 @@ async def root_redirect():

  @fastapi_app.post("/wipe")
  async def wipe(project_id: str = Query(..., min_length=1)):
-     if not QDRANT_URL or not QDRANT_API_KEY:
-         raise HTTPException(status_code=400, detail="QDRANT_URL / QDRANT_API_KEY requis")
-     coll = f"{COLLECTION_PREFIX}{project_id}"
-     async with httpx.AsyncClient() as client:
-         r = await client.delete(f"{QDRANT_URL}/collections/{coll}", headers={"api-key": QDRANT_API_KEY}, timeout=30)
-         if r.status_code not in (200, 202, 404):
-             raise HTTPException(status_code=500, detail=f"Echec wipe: {r.text}")
-     return {"ok": True, "collection": coll, "wiped": True}

  @fastapi_app.post("/index")
  async def index(req: IndexRequest):
-     if not QDRANT_URL or not QDRANT_API_KEY:
-         raise HTTPException(status_code=400, detail="QDRANT_URL / QDRANT_API_KEY requis")
      job = create_and_start_job(req)
      return {"job_id": job.job_id, "project_id": job.project_id}

@@ -457,39 +442,70 @@ async def status(job_id: str):

  @fastapi_app.get("/collections/{project_id}/count")
  async def coll_count(project_id: str):
-     if not QDRANT_URL or not QDRANT_API_KEY:
-         raise HTTPException(status_code=400, detail="QDRANT_URL / QDRANT_API_KEY requis")
-     coll = f"{COLLECTION_PREFIX}{project_id}"
-     async with httpx.AsyncClient() as client:
-         try:
-             cnt = await qdrant_count(client, coll)
-             return {"project_id": project_id, "collection": coll, "count": cnt}
-         except HTTPException as he:
-             # Re-raise the 500 (anything other than the 404 already handled in qdrant_count)
-             raise he

  @fastapi_app.post("/query")
  async def query(req: QueryRequest):
-     if not QDRANT_URL or not QDRANT_API_KEY:
-         raise HTTPException(status_code=400, detail="QDRANT_URL / QDRANT_API_KEY requis")
-     coll = f"{COLLECTION_PREFIX}{req.project_id}"
-     async with httpx.AsyncClient() as client:
-         vec = (await embed_texts(client, [req.text]))[0]
-         data = await qdrant_search(client, coll, vec, limit=req.top_k)
-     return data
-
- @fastapi_app.post("/collections/{project_id}/ensure")
- async def http_ensure(project_id: str, dim: int = Query(..., ge=16, le=8192)):
-     """Debug endpoint to force the creation of a collection with a given dimension."""
-     if not QDRANT_URL or not QDRANT_API_KEY:
-         raise HTTPException(status_code=400, detail="QDRANT_URL / QDRANT_API_KEY requis")
-     coll = f"{COLLECTION_PREFIX}{project_id}"
-     async with httpx.AsyncClient() as client:
-         await ensure_collection_hard(client, coll, vector_size=dim, job=None)
-     return {"ok": True, "collection": coll, "dim": dim}

  # ------------------------------------------------------------------------------
- # Gradio UI (with auto-refresh + a debug Ensure button)
  # ------------------------------------------------------------------------------
  def _default_two_docs() -> List[Dict[str, str]]:
      a = "Alpha bravo charlie delta echo foxtrot golf hotel india. " * 3
@@ -498,8 +514,8 @@ def _default_two_docs() -> List[Dict[str, str]]:

  async def ui_wipe(project: str):
      try:
-         resp = await wipe(project)
-         return f"✅ Wipe ok — collection {resp['collection']} supprimée."
      except Exception as e:
          LOG.exception("wipe UI error")
          return f"❌ Wipe erreur: {e}"
@@ -528,8 +544,8 @@ async def ui_status(job_id: str):
          return "⚠️ Renseigne un job_id"
      try:
          st = await status(job_id)
-         lines = [f"Job {st['job_id']} — stage={st['stage']} files={st['total_files']} chunks={st['total_chunks']} embedded={st['embedded']} upserted={st['upserted']}"]
-         lines += st.get("messages", [])[-50:]
          if st.get("errors"):
              lines.append("Erreurs:")
              lines += [f" - {e}" for e in st['errors']]
@@ -539,11 +555,8 @@ async def ui_status(job_id: str):

  async def ui_count(project: str):
      try:
-         # Tolerant count (0 if the collection is absent)
-         async with httpx.AsyncClient() as client:
-             coll = f"{COLLECTION_PREFIX}{project}"
-             cnt = await qdrant_count(client, coll)
-         return f"📊 Count — collection={coll} → {cnt} points"
      except Exception as e:
          LOG.exception("count UI error")
          return f"❌ Count erreur: {e}"
@@ -556,37 +569,31 @@ async def ui_query(project: str, text: str, topk: int):
              return "Aucun résultat."
          out = []
          for h in hits:
-             score = h.get("score")
-             payload = h.get("payload", {})
-             path = payload.get("path")
-             chunk = payload.get("chunk")
-             preview = (payload.get("text") or "")[:120].replace("\n", " ")
-             out.append(f"{score:.4f} — {path} [chunk {chunk}] — {preview}…")
          return "\n".join(out)
      except Exception as e:
          LOG.exception("query UI error")
          return f"❌ Query erreur: {e}"

- async def ui_ensure(project: str, dim: int):
      try:
-         resp = await http_ensure(project, dim)
-         return f"🛠️ Ensure collection={resp['collection']} dim={resp['dim']} OK"
      except Exception as e:
-         LOG.exception("ensure UI error")
-         return f"❌ Ensure erreur: {e}"

- with gr.Blocks(title="Remote Indexer — Tests sans console", analytics_enabled=False) as ui:
-     gr.Markdown("## 🔬 Remote Indexer — Tests sans console\n"
                  "Wipe → Index 2 docs → Status → Count → Query\n"
-                 f"- **Embeddings**: `{EMB_PROVIDER}` (model: `{HF_EMBED_MODEL}`)\n"
-                 f"- **Token HF présent**: `{'oui' if bool(HF_TOKEN) else 'non'}` — "
-                 f"**Fallback dummy**: `{'on' if EMB_FALLBACK_TO_DUMMY else 'off'}`\n"
-                 f"- **Qdrant**: `{'OK' if QDRANT_URL else 'ABSENT'}`")
      with gr.Row():
          project_tb = gr.Textbox(label="Project ID", value="DEEPWEB")
          jobid_tb = gr.Textbox(label="Job ID", value="", interactive=True)
      with gr.Row():
-         wipe_btn = gr.Button("🧨 Wipe collection", variant="stop")
          index_btn = gr.Button("🚀 Indexer 2 documents", variant="primary")
          count_btn = gr.Button("📊 Count points", variant="secondary")
      with gr.Row():
@@ -600,16 +607,16 @@ with gr.Blocks(title="Remote Indexer — Tests sans console", analytics_enabled=
          status_btn = gr.Button("📡 Status (refresh)")
          auto_chk = gr.Checkbox(False, label="⏱️ Auto-refresh status (2 s)")

-     with gr.Row():
-         ensure_dim = gr.Slider(16, 2048, value=128, step=16, label="ensure dim (debug)")
-         ensure_btn = gr.Button("🛠️ Ensure collection (debug)")
-
      with gr.Row():
          query_tb = gr.Textbox(label="Query text", value="alpha bravo")
          topk = gr.Slider(1, 20, value=5, step=1, label="top_k")
          query_btn = gr.Button("🔎 Query")
      query_out = gr.Textbox(lines=10, label="Résultats Query", interactive=False)
      wipe_btn.click(ui_wipe, inputs=[project_tb], outputs=[out_log])
      index_btn.click(ui_index_sample, inputs=[project_tb, chunk_size, overlap, batch_size, store_text], outputs=[out_log, jobid_tb])
      count_btn.click(ui_count, inputs=[project_tb], outputs=[out_log])
@@ -619,9 +626,11 @@ with gr.Blocks(title="Remote Indexer — Tests sans console", analytics_enabled=
      timer.tick(ui_status, inputs=[jobid_tb], outputs=[out_log])
      auto_chk.change(lambda x: gr.update(active=x), inputs=auto_chk, outputs=timer)

-     ensure_btn.click(ui_ensure, inputs=[project_tb, ensure_dim], outputs=[out_log])

- # Mount the Gradio UI
  app = gr.mount_gradio_app(fastapi_app, ui, path=UI_PATH)

  if __name__ == "__main__":

  # -*- coding: utf-8 -*-
  """
+ HF Space - Remote Indexer (No-Qdrant)
+ Vector storage & search with 🤗 datasets + FAISS (local), Gradio UI.
+
+ Pipeline:
+ - /index: chunk embeddings (HF Inference or dummy) → Dataset.from_dict → add_faiss_index(IP) → save_to_disk
+ - /count: reads the dataset from disk (if not already loaded), returns the number of rows
+ - /query: embed the query → dataset.get_nearest_examples('embedding', query, k)
+ - /wipe: removes the project folder
+ - /export_hub (optional): pushes the project folder to a Dataset repo on the Hub

  ENV:
+ - EMB_PROVIDER ("hf" | "dummy", default "hf")
+ - HF_EMBED_MODEL (e.g. "BAAI/bge-m3" | "intfloat/e5-base-v2")
+ - HUGGINGFACEHUB_API_TOKEN (required if EMB_PROVIDER=hf)
+ - EMB_FALLBACK_TO_DUMMY (true/false)
+ - DATA_DIR (default "/data") → local per-project storage
+ - HF_DATASET_REPO (optional "username/my_proj_vectors") for export
+ - LOG_LEVEL (DEBUG by default)
+ - UI_PATH ("/ui")
+ - PORT (7860)
  """

  from __future__ import annotations
  import os
+ import io
+ import re
+ import json
  import time
  import uuid
+ import shutil
  import hashlib
  import logging
  import asyncio
+ import threading
  from typing import List, Dict, Any, Optional, Tuple

  import numpy as np
  import httpx
  import uvicorn
+ import gradio as gr
+ import faiss  # type: ignore
  from pydantic import BaseModel, Field, ValidationError
  from fastapi import FastAPI, HTTPException, Query
  from fastapi.middleware.cors import CORSMiddleware
+ from fastapi.responses import RedirectResponse, StreamingResponse
+
+ from datasets import Dataset, Features, Sequence, Value, load_from_disk
+
+ try:
+     from huggingface_hub import HfApi, create_repo
+ except Exception:
+     HfApi = None
+     create_repo = None

  # ------------------------------------------------------------------------------
  # Config & logs
      level=getattr(logging, LOG_LEVEL, logging.DEBUG),
      format="%(asctime)s - %(levelname)s - %(message)s",
  )
+ LOG = logging.getLogger("remote_indexer_noqdrant")

+ EMB_PROVIDER = os.getenv("EMB_PROVIDER", "hf").lower()  # "hf" | "dummy"
  HF_EMBED_MODEL = os.getenv("HF_EMBED_MODEL", "BAAI/bge-m3")
  HF_TOKEN = os.getenv("HUGGINGFACEHUB_API_TOKEN", "")
  EMB_FALLBACK_TO_DUMMY = os.getenv("EMB_FALLBACK_TO_DUMMY", "false").lower() in ("1","true","yes","on")

+ DATA_DIR = os.getenv("DATA_DIR", "/data")
+ os.makedirs(DATA_DIR, exist_ok=True)
+
  UI_PATH = os.getenv("UI_PATH", "/ui")
+ HF_DATASET_REPO = os.getenv("HF_DATASET_REPO", "").strip()  # optional

  if EMB_PROVIDER == "hf" and not HF_TOKEN and not EMB_FALLBACK_TO_DUMMY:
+     LOG.warning("EMB_PROVIDER=hf sans HUGGINGFACEHUB_API_TOKEN (pas de fallback). Mets EMB_PROVIDER=dummy ou EMB_FALLBACK_TO_DUMMY=true pour tester.")

  # ------------------------------------------------------------------------------
+ # Pydantic models
  # ------------------------------------------------------------------------------
  class FileItem(BaseModel):
      path: str

  class JobState(BaseModel):
      job_id: str
      project_id: str
+     stage: str = "pending"  # pending -> embedding -> indexing -> done/failed
      total_files: int = 0
      total_chunks: int = 0
      embedded: int = 0
+     indexed: int = 0
      errors: List[str] = Field(default_factory=list)
      messages: List[str] = Field(default_factory=list)
      started_at: float = Field(default_factory=time.time)

  JOBS: Dict[str, JobState] = {}

+ # In-memory cache {project_id: (Dataset, dim)}
+ DATASETS: Dict[str, Tuple[Dataset, int]] = {}
+
  # ------------------------------------------------------------------------------
  # Utils
  # ------------------------------------------------------------------------------
          i = j
      return res

+ def project_paths(project_id: str) -> Dict[str, str]:
+     base = os.path.join(DATA_DIR, project_id)
+     return {
+         "base": base,
+         "ds_dir": os.path.join(base, "dataset"),
+         "faiss_dir": os.path.join(base, "faiss"),
+         "faiss_file": os.path.join(base, "faiss", "emb.faiss"),
+         "meta_file": os.path.join(base, "meta.json"),
+     }
+
+ def save_meta(meta_path: str, data: Dict[str, Any]) -> None:
+     os.makedirs(os.path.dirname(meta_path), exist_ok=True)
+     with open(meta_path, "w", encoding="utf-8") as f:
+         json.dump(data, f, indent=2, ensure_ascii=False)
+
+ def load_meta(meta_path: str) -> Dict[str, Any]:
+     if not os.path.exists(meta_path):
+         return {}
+     try:
+         with open(meta_path, "r", encoding="utf-8") as f:
+             return json.load(f)
+     except Exception:
+         return {}

  # ------------------------------------------------------------------------------
+ # Embeddings (HF Inference or dummy)
  # ------------------------------------------------------------------------------
  def _maybe_prefix_for_model(texts: List[str], model_name: str) -> List[str]:
      m = (model_name or "").lower()
      if "e5" in m:
+         # E5 expects "query: ..." / "passage: ..." prefixes; here we use the simple uniform form.
          return [("query: " + t) for t in texts]
      return texts

      return embed_dummy(texts, dim=128)

  # ------------------------------------------------------------------------------
+ # Indexing (datasets + FAISS)
  # ------------------------------------------------------------------------------
+ async def build_dataset_with_faiss(job: JobState, req: IndexRequest) -> None:
+     """
+     Builds a Hugging Face dataset with columns:
+     - path (str), text (optional), chunk (int), start (int), end (int), embedding (float32[])
+     Adds a FAISS index (Inner Product) and persists everything to disk.
+     """
+ """
250
  try:
251
  job.stage = "embedding"
252
  job.total_files = len(req.files)
 
266
  payload = {"path": f.path, "chunk": idx, "start": start, "end": end}
267
  if req.store_text:
268
  payload["text"] = ch
269
+ else:
270
+ payload["text"] = None
271
+ payload["raw"] = ch
272
+ records.append(payload)
273
  job.total_chunks = len(records)
274
  job.log(f"Total chunks = {job.total_chunks}")
275
  if job.total_chunks == 0:
 
278
  job.finished_at = time.time()
279
  return
280
 
281
+ # Embeddings par batch
282
  async with httpx.AsyncClient(timeout=180) as client:
283
+ all_vecs: List[List[float]] = []
284
+ B = max(8, min(64, req.batch_size * 2))
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
285
  i = 0
286
  while i < len(records):
287
+ sub = records[i : i + B]
288
  texts = [r["raw"] for r in sub]
289
  vecs = await embed_texts(client, texts)
290
  if len(vecs) != len(sub):
291
  raise HTTPException(status_code=500, detail="Embedding batch size mismatch")
292
+ all_vecs.extend(vecs)
293
  job.embedded += len(vecs)
294
+ job.log(f"Embeddings {job.embedded}/{job.total_chunks}")
295
+ i += B
296
+
297
+ vec_dim = len(all_vecs[0])
298
+ job.log(f"Embeddings dim={vec_dim}")
299
+
300
+ # Prépare colonnes du dataset
301
+ paths = [r["path"] for r in records]
302
+ chunks = [int(r["chunk"]) for r in records]
303
+ starts = [int(r["start"]) for r in records]
304
+ ends = [int(r["end"]) for r in records]
305
+ texts = [r.get("text") for r in records]
306
+
307
+ features = Features({
308
+ "path": Value("string"),
309
+ "chunk": Value("int32"),
310
+ "start": Value("int32"),
311
+ "end": Value("int32"),
312
+ "text": Value("string"), # peut contenir None -> sera "None" si None ; OK pour tests
313
+ "embedding": Sequence(Value("float32")),
314
+ })
315
+
316
+ ds = Dataset.from_dict(
317
+ {
318
+ "path": paths,
319
+ "chunk": chunks,
320
+ "start": starts,
321
+ "end": ends,
322
+ "text": texts,
323
+ "embedding": [np.array(v, dtype=np.float32) for v in all_vecs],
324
+ },
325
+ features=features,
326
+ )
327
 
328
+ # Ajoute index FAISS (Inner Product sur vecteurs normalisés ~ cosine)
329
+ job.stage = "indexing"
330
+ ds.add_faiss_index(column="embedding", metric_type=faiss.METRIC_INNER_PRODUCT)
331
+ job.indexed = ds.num_rows
332
+ job.log(f"FAISS index ajouté ({ds.num_rows} points)")
 
333
 
334
+ # Persistance disque
335
+ p = project_paths(req.project_id)
336
+ os.makedirs(p["faiss_dir"], exist_ok=True)
337
+ ds.save_to_disk(p["ds_dir"])
338
+ ds.save_faiss_index("embedding", p["faiss_file"])
339
+ save_meta(p["meta_file"], {"dim": vec_dim, "rows": ds.num_rows, "model": HF_EMBED_MODEL, "ts": time.time()})
340
+
341
+ # Cache mémoire
342
+ DATASETS[req.project_id] = (ds, vec_dim)
343
 
344
  job.stage = "done"
345
  job.finished_at = time.time()
346
+ job.log(f"Dataset sauvegardé dans {p['ds_dir']}, index FAISS → {p['faiss_file']}")
347
  except Exception as e:
348
  job.stage = "failed"
349
  job.errors.append(str(e))
 
351
  job.log(f"❌ Exception: {e}")

  def _run_job_in_thread(job: JobState, req: IndexRequest) -> None:
      def _runner():
          try:
+             asyncio.run(build_dataset_with_faiss(job, req))
          except Exception as e:
              job.stage = "failed"
              job.errors.append(str(e))

      _run_job_in_thread(job, req)
      return job

+ # ------------------------------------------------------------------------------
+ # Loading / Query helpers
+ # ------------------------------------------------------------------------------
+ def ensure_loaded(project_id: str) -> Tuple[Dataset, int]:
+     """Loads the dataset + faiss index from disk if not already in the in-memory cache."""
+     if project_id in DATASETS:
+         return DATASETS[project_id]
+     p = project_paths(project_id)
+     if not os.path.exists(p["ds_dir"]):
+         raise HTTPException(status_code=404, detail=f"Dataset absent pour projet {project_id}")
+     ds = load_from_disk(p["ds_dir"])
+     if os.path.exists(p["faiss_file"]):
+         ds.load_faiss_index("embedding", p["faiss_file"])
+     meta = load_meta(p["meta_file"])
+     vec_dim = int(meta.get("dim", 0)) or len(ds[0]["embedding"])
+     DATASETS[project_id] = (ds, vec_dim)
+     return ds, vec_dim
+
+ async def embed_query(text: str) -> List[float]:
+     async with httpx.AsyncClient(timeout=60) as client:
+         vec = (await embed_texts(client, [text]))[0]
+     return vec
+
  # ------------------------------------------------------------------------------
  # FastAPI app
  # ------------------------------------------------------------------------------
+ fastapi_app = FastAPI(title="Remote Indexer - NoQdrant (Datasets+FAISS)")
  fastapi_app.add_middleware(
+     CORSMiddleware, allow_origins=["*"], allow_methods=["*"], allow_headers=["*"]
  )

  @fastapi_app.get("/health")
  async def health():
+     return {"status": "ok", "emb_provider": EMB_PROVIDER, "model": HF_EMBED_MODEL}

  @fastapi_app.get("/api")
  async def api_info():
      return {
+         "ok": True, "service": "remote-indexer-noqdrant",
          "emb_provider": EMB_PROVIDER, "hf_model": HF_EMBED_MODEL,
          "fallback_to_dummy": EMB_FALLBACK_TO_DUMMY,
+         "data_dir": DATA_DIR, "ui_path": UI_PATH,
+         "hub_export_enabled": bool(HF_DATASET_REPO and HfApi),
      }

  @fastapi_app.get("/")

  @fastapi_app.post("/wipe")
  async def wipe(project_id: str = Query(..., min_length=1)):
+     p = project_paths(project_id)
+     if os.path.exists(p["base"]):
+         shutil.rmtree(p["base"], ignore_errors=True)
+     if project_id in DATASETS:
+         DATASETS.pop(project_id, None)
+     return {"ok": True, "project_id": project_id, "removed": True}

  @fastapi_app.post("/index")
  async def index(req: IndexRequest):
      job = create_and_start_job(req)
      return {"job_id": job.job_id, "project_id": job.project_id}

  @fastapi_app.get("/collections/{project_id}/count")
  async def coll_count(project_id: str):
+     try:
+         ds, _ = ensure_loaded(project_id)
+         return {"project_id": project_id, "count": ds.num_rows}
+     except Exception as e:
+         return {"project_id": project_id, "count": 0, "note": f"{e}"}

  @fastapi_app.post("/query")
  async def query(req: QueryRequest):
+     ds, vec_dim = ensure_loaded(req.project_id)
+     qvec = await embed_query(req.text)
+     if len(qvec) != vec_dim:
+         raise HTTPException(status_code=400, detail=f"Dim requête {len(qvec)} ≠ dim index {vec_dim}")
+     # get_nearest_examples returns (scores, examples)
+     scores, ex = ds.get_nearest_examples("embedding", np.array(qvec, dtype=np.float32), k=req.top_k)
+     results = []
+     for s, path, chunk, text in zip(scores, ex["path"], ex["chunk"], ex["text"]):
+         preview = ((text or "")[:160]).replace("\n", " ")
+         results.append({"score": float(s), "path": path, "chunk": int(chunk), "preview": preview})
+     return {"result": results, "k": req.top_k}
+
+ @fastapi_app.post("/export_hub")
+ async def export_hub(project_id: str = Query(..., min_length=1), repo_id: Optional[str] = None):
+     """
+     Optional: push the project folder (dataset + faiss + meta) to a Dataset repo on the Hub.
+     - HF_DATASET_REPO or ?repo_id=... (e.g. "chourmovs/deepweb_vectors")
+     """
+     if not HfApi or not HF_TOKEN:
+         raise HTTPException(status_code=400, detail="huggingface_hub non dispo ou HF token absent.")
+     p = project_paths(project_id)
+     if not os.path.exists(p["ds_dir"]):
+         raise HTTPException(status_code=404, detail="Aucun dataset local à exporter.")
+     rid = (repo_id or HF_DATASET_REPO or "").strip()
+     if not rid:
+         raise HTTPException(status_code=400, detail="repo_id requis (ou HF_DATASET_REPO).")
+
+     api = HfApi(token=HF_TOKEN)
+     try:
+         create_repo(rid, repo_type="dataset", exist_ok=True, token=HF_TOKEN)
+     except Exception:
+         pass
+
+     # Zip the project folder for a quick upload
+     buf = io.BytesIO()
+     base_dir = p["base"]
+     zip_name = f"{project_id}_vectors.zip"
+     import zipfile
+     with zipfile.ZipFile(buf, "w", compression=zipfile.ZIP_DEFLATED) as z:
+         for root, _, files in os.walk(base_dir):
+             for fn in files:
+                 full = os.path.join(root, fn)
+                 rel = os.path.relpath(full, base_dir)
+                 z.write(full, arcname=rel)
+     buf.seek(0)
+
+     api.upload_file(
+         path_or_fileobj=buf,
+         path_in_repo=zip_name,
+         repo_id=rid,
+         repo_type="dataset",
+     )
+     return {"ok": True, "repo_id": rid, "file": zip_name}

  # ------------------------------------------------------------------------------
+ # Gradio UI
  # ------------------------------------------------------------------------------
  def _default_two_docs() -> List[Dict[str, str]]:
      a = "Alpha bravo charlie delta echo foxtrot golf hotel india. " * 3

  async def ui_wipe(project: str):
      try:
+         resp = await wipe(project)  # calls the internal route
+         return f"✅ Wipe ok — projet {resp['project_id']} vidé."
      except Exception as e:
          LOG.exception("wipe UI error")
          return f"❌ Wipe erreur: {e}"

          return "⚠️ Renseigne un job_id"
      try:
          st = await status(job_id)
+         lines = [f"Job {st['job_id']} — stage={st['stage']} files={st['total_files']} chunks={st['total_chunks']} embedded={st['embedded']} indexed={st['indexed']}"]
+         lines += st.get("messages", [])[-80:]
          if st.get("errors"):
              lines.append("Erreurs:")
              lines += [f" - {e}" for e in st['errors']]

  async def ui_count(project: str):
      try:
+         data = await coll_count(project)
+         return f"📊 Count — project={project} → {data['count']} points" + (f" ({data.get('note')})" if 'note' in data else "")
      except Exception as e:
          LOG.exception("count UI error")
          return f"❌ Count erreur: {e}"

              return "Aucun résultat."
          out = []
          for h in hits:
+             out.append(f"{h['score']:.4f} {h['path']} [chunk {h['chunk']}] — {h['preview']}…")
          return "\n".join(out)
      except Exception as e:
          LOG.exception("query UI error")
          return f"❌ Query erreur: {e}"

+ async def ui_export(project: str, repo_id: str):
      try:
+         resp = await export_hub(project, repo_id or None)
+         return f"📤 Export dataset repo={resp['repo_id']} file={resp['file']}"
      except Exception as e:
+         LOG.exception("export UI error")
+         return f"❌ Export erreur: {e}"

+ with gr.Blocks(title="Remote Indexer — No-Qdrant (datasets+FAISS)", analytics_enabled=False) as ui:
+     gr.Markdown("## 🧪 Remote Indexer — No-Qdrant (datasets+FAISS)\n"
                  "Wipe → Index 2 docs → Status → Count → Query\n"
+                 f"- **Embeddings**: `{EMB_PROVIDER}` (model: `{HF_EMBED_MODEL}`)"
+                 f"HF token présent: `{'oui' if bool(HF_TOKEN) else 'non'}` — Fallback dummy: `{'on' if EMB_FALLBACK_TO_DUMMY else 'off'}`\n"
+                 f"- **Data dir**: `{DATA_DIR}` — **Hub export**: `{'on' if (HF_DATASET_REPO and HfApi) else 'off'}`")
      with gr.Row():
          project_tb = gr.Textbox(label="Project ID", value="DEEPWEB")
          jobid_tb = gr.Textbox(label="Job ID", value="", interactive=True)
      with gr.Row():
+         wipe_btn = gr.Button("🧨 Wipe project", variant="stop")
          index_btn = gr.Button("🚀 Indexer 2 documents", variant="primary")
          count_btn = gr.Button("📊 Count points", variant="secondary")
      with gr.Row():

          status_btn = gr.Button("📡 Status (refresh)")
          auto_chk = gr.Checkbox(False, label="⏱️ Auto-refresh status (2 s)")

      with gr.Row():
          query_tb = gr.Textbox(label="Query text", value="alpha bravo")
          topk = gr.Slider(1, 20, value=5, step=1, label="top_k")
          query_btn = gr.Button("🔎 Query")
      query_out = gr.Textbox(lines=10, label="Résultats Query", interactive=False)

+     with gr.Row():
+         repo_tb = gr.Textbox(label="Hub dataset repo (ex: user/deepweb_vectors)", value=os.getenv("HF_DATASET_REPO", ""))
+         export_btn = gr.Button("📤 Export to Hub", variant="secondary")
+
      wipe_btn.click(ui_wipe, inputs=[project_tb], outputs=[out_log])
      index_btn.click(ui_index_sample, inputs=[project_tb, chunk_size, overlap, batch_size, store_text], outputs=[out_log, jobid_tb])
      count_btn.click(ui_count, inputs=[project_tb], outputs=[out_log])

      timer.tick(ui_status, inputs=[jobid_tb], outputs=[out_log])
      auto_chk.change(lambda x: gr.update(active=x), inputs=auto_chk, outputs=timer)

+     query_btn.click(ui_query, inputs=[project_tb, query_tb, topk], outputs=[query_out])
+
+     export_btn.click(ui_export, inputs=[project_tb, repo_tb], outputs=[out_log])

+ # Mount the UI
  app = gr.mount_gradio_app(fastapi_app, ui, path=UI_PATH)

  if __name__ == "__main__":
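
For reference, a minimal sketch (not part of this commit) of the datasets + FAISS round-trip that the new build_dataset_with_faiss() / query() path relies on. It assumes `datasets` and `faiss-cpu` are installed; the toy `texts` column and random vectors stand in for real embeddings.

```python
# Illustrative only — shows Dataset.from_dict → add_faiss_index → get_nearest_examples.
import numpy as np
import faiss
from datasets import Dataset, Features, Sequence, Value

# Dummy vectors standing in for real embeddings (dim=4 for brevity).
texts = ["alpha bravo", "charlie delta"]
vecs = np.random.rand(2, 4).astype(np.float32)

ds = Dataset.from_dict(
    {"text": texts, "embedding": [v for v in vecs]},
    features=Features({"text": Value("string"),
                       "embedding": Sequence(Value("float32"))}),
)

# Inner-product index; with L2-normalised vectors this approximates cosine similarity.
ds.add_faiss_index(column="embedding", metric_type=faiss.METRIC_INNER_PRODUCT)

# Query: returns (scores, examples) ordered by similarity.
scores, examples = ds.get_nearest_examples("embedding", vecs[0], k=1)
print(scores[0], examples["text"][0])
```

With real embeddings, L2-normalising the vectors before indexing makes the inner-product scores behave like cosine similarity, which is what the commit's METRIC_INNER_PRODUCT choice appears to assume.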