chouchouvs committed on
Commit
903baeb
·
verified ·
1 Parent(s): 8bbbad5

Update main.py

Browse files
Files changed (1) hide show
  1. main.py +74 -15
main.py CHANGED
@@ -1,7 +1,7 @@
1
  # -*- coding: utf-8 -*-
2
  """
3
  Remote Indexer (HF Space) — Qdrant + embeddings (HF ou dummy)
4
- Version: worker-thread (pas d'asyncio.create_task dans l'UI), robust logging.
5
 
6
  Endpoints:
7
  - GET / → redirige vers UI_PATH (défaut: /ui)
@@ -14,6 +14,7 @@ Endpoints:
14
  - GET /status/{job_id}
15
  - GET /collections/{project_id}/count
16
  - POST /query
 
17
 
18
  ENV:
19
  - QDRANT_URL, QDRANT_API_KEY (requis pour upsert)
@@ -153,38 +154,65 @@ def chunk_text(text: str, chunk_size: int, overlap: int) -> List[Tuple[int, int,
153
  # ------------------------------------------------------------------------------
154
  # Qdrant
155
  # ------------------------------------------------------------------------------
156
- async def ensure_collection(client: httpx.AsyncClient, coll: str, vector_size: int) -> None:
 
 
 
 
 
 
 
 
157
  url = f"{QDRANT_URL}/collections/{coll}"
158
  r = await client.get(url, headers={"api-key": QDRANT_API_KEY}, timeout=20)
159
  recreate = False
160
  if r.status_code == 200:
161
  data = r.json()
162
  existing_size = data.get("result", {}).get("vectors", {}).get("size")
 
163
  if existing_size and int(existing_size) != int(vector_size):
164
- LOG.warning(f"Collection {coll} dim={existing_size} ≠ attendu {vector_size} → recréation")
165
- await client.delete(url, headers={"api-key": QDRANT_API_KEY}, timeout=20)
 
166
  recreate = True
167
- else:
168
- LOG.debug(f"Collection {coll} existante (dim={existing_size})")
 
 
 
169
  if r.status_code != 200 or recreate:
170
  body = {"vectors": {"size": vector_size, "distance": "Cosine"}}
171
  r2 = await client.put(url, headers={"api-key": QDRANT_API_KEY}, json=body, timeout=30)
 
172
  if r2.status_code not in (200, 201):
173
  raise HTTPException(status_code=500, detail=f"Qdrant PUT collection a échoué: {r2.text}")
174
 
175
- async def qdrant_upsert(client: httpx.AsyncClient, coll: str, points: List[Dict[str, Any]]) -> int:
 
 
 
 
 
 
 
 
 
176
  if not points:
177
  return 0
178
  url = f"{QDRANT_URL}/collections/{coll}/points?wait=true"
179
  body = {"points": points}
180
  r = await client.put(url, headers={"api-key": QDRANT_API_KEY}, json=body, timeout=60)
181
  if r.status_code not in (200, 202):
 
182
  raise HTTPException(status_code=500, detail=f"Qdrant upsert échoué: {r.text}")
183
  return len(points)
184
 
185
  async def qdrant_count(client: httpx.AsyncClient, coll: str) -> int:
186
  url = f"{QDRANT_URL}/collections/{coll}/points/count"
187
  r = await client.post(url, headers={"api-key": QDRANT_API_KEY}, json={"exact": True}, timeout=20)
 
 
 
188
  if r.status_code != 200:
189
  raise HTTPException(status_code=500, detail=f"Qdrant count échoué: {r.text}")
190
  return int(r.json().get("result", {}).get("count", 0))
@@ -293,9 +321,9 @@ async def run_index_job(job: JobState, req: IndexRequest) -> None:
293
  vec_dim = len(warmup_vec)
294
  job.log(f"Warmup embeddings dim={vec_dim}")
295
 
296
- # Collection
297
  coll = f"{COLLECTION_PREFIX}{req.project_id}"
298
- await ensure_collection(client, coll, vector_size=vec_dim)
299
  job.log(f"Collection prête: {coll} (dim={vec_dim})")
300
 
301
  # Upsert
@@ -306,7 +334,7 @@ async def run_index_job(job: JobState, req: IndexRequest) -> None:
306
  nonlocal batch_points
307
  if not batch_points:
308
  return 0
309
- added = await qdrant_upsert(client, coll, batch_points)
310
  job.upserted += added
311
  job.log(f"+{added} points upsert (total={job.upserted})")
312
  batch_points = []
@@ -433,8 +461,12 @@ async def coll_count(project_id: str):
433
  raise HTTPException(status_code=400, detail="QDRANT_URL / QDRANT_API_KEY requis")
434
  coll = f"{COLLECTION_PREFIX}{project_id}"
435
  async with httpx.AsyncClient() as client:
436
- cnt = await qdrant_count(client, coll)
437
- return {"project_id": project_id, "collection": coll, "count": cnt}
 
 
 
 
438
 
439
  @fastapi_app.post("/query")
440
  async def query(req: QueryRequest):
@@ -446,8 +478,18 @@ async def query(req: QueryRequest):
446
  data = await qdrant_search(client, coll, vec, limit=req.top_k)
447
  return data
448
 
 
 
 
 
 
 
 
 
 
 
449
  # ------------------------------------------------------------------------------
450
- # Gradio UI (avec auto-refresh)
451
  # ------------------------------------------------------------------------------
452
  def _default_two_docs() -> List[Dict[str, str]]:
453
  a = "Alpha bravo charlie delta echo foxtrot golf hotel india. " * 3
@@ -497,8 +539,11 @@ async def ui_status(job_id: str):
497
 
498
  async def ui_count(project: str):
499
  try:
500
- resp = await coll_count(project)
501
- return f"📊 Count collection={resp['collection']} → {resp['count']} points"
 
 
 
502
  except Exception as e:
503
  LOG.exception("count UI error")
504
  return f"❌ Count erreur: {e}"
@@ -522,6 +567,14 @@ async def ui_query(project: str, text: str, topk: int):
522
  LOG.exception("query UI error")
523
  return f"❌ Query erreur: {e}"
524
 
 
 
 
 
 
 
 
 
525
  with gr.Blocks(title="Remote Indexer — Tests sans console", analytics_enabled=False) as ui:
526
  gr.Markdown("## 🔬 Remote Indexer — Tests sans console\n"
527
  "Wipe → Index 2 docs → Status → Count → Query\n"
@@ -547,6 +600,10 @@ with gr.Blocks(title="Remote Indexer — Tests sans console", analytics_enabled=
547
  status_btn = gr.Button("📡 Status (refresh)")
548
  auto_chk = gr.Checkbox(False, label="⏱️ Auto-refresh status (2 s)")
549
 
 
 
 
 
550
  with gr.Row():
551
  query_tb = gr.Textbox(label="Query text", value="alpha bravo")
552
  topk = gr.Slider(1, 20, value=5, step=1, label="top_k")
@@ -562,6 +619,8 @@ with gr.Blocks(title="Remote Indexer — Tests sans console", analytics_enabled=
562
  timer.tick(ui_status, inputs=[jobid_tb], outputs=[out_log])
563
  auto_chk.change(lambda x: gr.update(active=x), inputs=auto_chk, outputs=timer)
564
 
 
 
565
  # Monte l'UI Gradio
566
  app = gr.mount_gradio_app(fastapi_app, ui, path=UI_PATH)
567
 
 
1
  # -*- coding: utf-8 -*-
2
  """
3
  Remote Indexer (HF Space) — Qdrant + embeddings (HF ou dummy)
4
+ Version: worker-thread + ensure_collection_hard (retries + verify) + count tolérant.
5
 
6
  Endpoints:
7
  - GET / → redirige vers UI_PATH (défaut: /ui)
 
14
  - GET /status/{job_id}
15
  - GET /collections/{project_id}/count
16
  - POST /query
17
+ - POST /collections/{project_id}/ensure?dim=XXX (debug)
18
 
19
  ENV:
20
  - QDRANT_URL, QDRANT_API_KEY (requis pour upsert)
 
154
  # ------------------------------------------------------------------------------
155
  # Qdrant
156
  # ------------------------------------------------------------------------------
157
async def ensure_collection_hard(client: httpx.AsyncClient, coll: str, vector_size: int, job: Optional[JobState] = None) -> None:
    """
    Create the collection if it is missing, recreate it when its vector
    dimension differs from ``vector_size``, then poll GET until the
    collection answers 200.

    Args:
        client: httpx async client used for every Qdrant call.
        coll: Qdrant collection name.
        vector_size: expected vector dimension.
        job: optional job whose ``log`` receives progress messages; when
            absent, messages go to ``LOG.debug``.

    Raises:
        HTTPException: status 500 when the PUT that creates the collection
            is rejected, or when the collection never answers 200 during
            the polling phase.
    """
    def _j(msg: str) -> None:
        # Progress goes to the job log when a job is attached, otherwise
        # to the module logger.
        if job:
            job.log(msg)
        else:
            LOG.debug(msg)

    def _existing_dim(payload: dict) -> Optional[int]:
        # Qdrant's collection-info response stores the dimension under
        # result.config.params.vectors.size. The legacy result.vectors.size
        # lookup is kept as a fallback. Named-vector configurations (a dict
        # of name -> params without a top-level "size") yield None.
        result = payload.get("result") or {}
        params = (result.get("config") or {}).get("params") or {}
        for vectors in (params.get("vectors"), result.get("vectors")):
            if isinstance(vectors, dict) and vectors.get("size"):
                return int(vectors["size"])
        return None

    url = f"{QDRANT_URL}/collections/{coll}"
    headers = {"api-key": QDRANT_API_KEY}

    r = await client.get(url, headers=headers, timeout=20)
    recreate = False
    if r.status_code == 200:
        existing_size = _existing_dim(r.json() or {})
        _j(f"GET collection '{coll}' → 200 dim={existing_size}")
        if existing_size and existing_size != int(vector_size):
            # Dimension mismatch: drop the collection so it can be
            # recreated with the expected size.
            _j(f"Dimension ({existing_size} ≠ {vector_size})suppression + recréation")
            rd = await client.delete(url, headers=headers, timeout=20)
            _j(f"DELETE collection '{coll}' → {rd.status_code}")
            recreate = True
    elif r.status_code == 404:
        _j(f"GET collection '{coll}' 404 (à créer)")
    else:
        _j(f"GET collection '{coll}' → {r.status_code} {r.text}")

    if r.status_code != 200 or recreate:
        body = {"vectors": {"size": vector_size, "distance": "Cosine"}}
        r2 = await client.put(url, headers=headers, json=body, timeout=30)
        _j(f"PUT create collection '{coll}' dim={vector_size} → {r2.status_code}")
        if r2.status_code not in (200, 201):
            raise HTTPException(status_code=500, detail=f"Qdrant PUT collection a échoué: {r2.text}")

    # Poll until the collection is visible: up to 10 GETs, 0.3 s apart.
    for i in range(10):
        rg = await client.get(url, headers=headers, timeout=20)
        if rg.status_code == 200:
            _j(f"Collection '{coll}' disponible (try={i+1})")
            return
        await asyncio.sleep(0.3)
    raise HTTPException(status_code=500, detail=f"Collection '{coll}' non visible après création.")
198
+
199
async def qdrant_upsert(client: httpx.AsyncClient, coll: str, points: List[Dict[str, Any]], job: Optional[JobState] = None) -> int:
    """
    Upsert ``points`` into collection ``coll`` with a single PUT
    (``wait=true``) and return how many points were sent.

    An empty ``points`` returns 0 without any HTTP call. Any answer other
    than 200 or 202 is logged on ``job`` (when given, body truncated to 200
    characters) and raised as ``HTTPException`` with status 500.
    """
    if not points:
        return 0

    response = await client.put(
        f"{QDRANT_URL}/collections/{coll}/points?wait=true",
        headers={"api-key": QDRANT_API_KEY},
        json={"points": points},
        timeout=60,
    )

    accepted = response.status_code in (200, 202)
    if not accepted:
        if job:
            job.log(f"Upsert → {response.status_code}: {response.text[:200]}")
        raise HTTPException(status_code=500, detail=f"Qdrant upsert échoué: {response.text}")

    return len(points)
209
 
210
  async def qdrant_count(client: httpx.AsyncClient, coll: str) -> int:
211
  url = f"{QDRANT_URL}/collections/{coll}/points/count"
212
  r = await client.post(url, headers={"api-key": QDRANT_API_KEY}, json={"exact": True}, timeout=20)
213
+ if r.status_code == 404 and "doesn't exist" in (r.text or ""):
214
+ # Tolérant: collection absente → 0 (utile pour l'UI)
215
+ return 0
216
  if r.status_code != 200:
217
  raise HTTPException(status_code=500, detail=f"Qdrant count échoué: {r.text}")
218
  return int(r.json().get("result", {}).get("count", 0))
 
321
  vec_dim = len(warmup_vec)
322
  job.log(f"Warmup embeddings dim={vec_dim}")
323
 
324
+ # Collection (hard ensure)
325
  coll = f"{COLLECTION_PREFIX}{req.project_id}"
326
+ await ensure_collection_hard(client, coll, vector_size=vec_dim, job=job)
327
  job.log(f"Collection prête: {coll} (dim={vec_dim})")
328
 
329
  # Upsert
 
334
  nonlocal batch_points
335
  if not batch_points:
336
  return 0
337
+ added = await qdrant_upsert(client, coll, batch_points, job=job)
338
  job.upserted += added
339
  job.log(f"+{added} points upsert (total={job.upserted})")
340
  batch_points = []
 
461
  raise HTTPException(status_code=400, detail="QDRANT_URL / QDRANT_API_KEY requis")
462
  coll = f"{COLLECTION_PREFIX}{project_id}"
463
  async with httpx.AsyncClient() as client:
464
+ try:
465
+ cnt = await qdrant_count(client, coll)
466
+ return {"project_id": project_id, "collection": coll, "count": cnt}
467
+ except HTTPException as he:
468
+ # On remonte le 500 (autre qu'un 404 not found géré dans qdrant_count)
469
+ raise he
470
 
471
  @fastapi_app.post("/query")
472
  async def query(req: QueryRequest):
 
478
  data = await qdrant_search(client, coll, vec, limit=req.top_k)
479
  return data
480
 
481
@fastapi_app.post("/collections/{project_id}/ensure")
async def http_ensure(project_id: str, dim: int = Query(..., ge=16, le=8192)):
    """
    Debug endpoint: force the creation of the project's collection with
    the given vector dimension.

    Responds 400 when QDRANT_URL or QDRANT_API_KEY is not configured;
    otherwise delegates to ``ensure_collection_hard`` and returns the
    collection name and dimension.
    """
    if not (QDRANT_URL and QDRANT_API_KEY):
        raise HTTPException(status_code=400, detail="QDRANT_URL / QDRANT_API_KEY requis")

    collection_name = f"{COLLECTION_PREFIX}{project_id}"
    async with httpx.AsyncClient() as http:
        await ensure_collection_hard(http, collection_name, vector_size=dim, job=None)
    return {"ok": True, "collection": collection_name, "dim": dim}
490
+
491
  # ------------------------------------------------------------------------------
492
+ # Gradio UI (avec auto-refresh + bouton Ensure debug)
493
  # ------------------------------------------------------------------------------
494
  def _default_two_docs() -> List[Dict[str, str]]:
495
  a = "Alpha bravo charlie delta echo foxtrot golf hotel india. " * 3
 
539
 
540
  async def ui_count(project: str):
541
  try:
542
+ # Count tolérant (0 si collection absente)
543
+ async with httpx.AsyncClient() as client:
544
+ coll = f"{COLLECTION_PREFIX}{project}"
545
+ cnt = await qdrant_count(client, coll)
546
+ return f"📊 Count — collection={coll} → {cnt} points"
547
  except Exception as e:
548
  LOG.exception("count UI error")
549
  return f"❌ Count erreur: {e}"
 
567
  LOG.exception("query UI error")
568
  return f"❌ Query erreur: {e}"
569
 
570
async def ui_ensure(project: str, dim: int):
    """
    UI handler for the debug "Ensure collection" button.

    Calls ``http_ensure`` directly and formats its result; any exception
    is logged and returned as an error string for display.
    """
    try:
        outcome = await http_ensure(project, dim)
        name, size = outcome['collection'], outcome['dim']
        return f"🛠️ Ensure — collection={name} dim={size} OK"
    except Exception as err:
        LOG.exception("ensure UI error")
        return f"❌ Ensure erreur: {err}"
577
+
578
  with gr.Blocks(title="Remote Indexer — Tests sans console", analytics_enabled=False) as ui:
579
  gr.Markdown("## 🔬 Remote Indexer — Tests sans console\n"
580
  "Wipe → Index 2 docs → Status → Count → Query\n"
 
600
  status_btn = gr.Button("📡 Status (refresh)")
601
  auto_chk = gr.Checkbox(False, label="⏱️ Auto-refresh status (2 s)")
602
 
603
+ with gr.Row():
604
+ ensure_dim = gr.Slider(16, 2048, value=128, step=16, label="ensure dim (debug)")
605
+ ensure_btn = gr.Button("🛠️ Ensure collection (debug)")
606
+
607
  with gr.Row():
608
  query_tb = gr.Textbox(label="Query text", value="alpha bravo")
609
  topk = gr.Slider(1, 20, value=5, step=1, label="top_k")
 
619
  timer.tick(ui_status, inputs=[jobid_tb], outputs=[out_log])
620
  auto_chk.change(lambda x: gr.update(active=x), inputs=auto_chk, outputs=timer)
621
 
622
+ ensure_btn.click(ui_ensure, inputs=[project_tb, ensure_dim], outputs=[out_log])
623
+
624
  # Monte l'UI Gradio
625
  app = gr.mount_gradio_app(fastapi_app, ui, path=UI_PATH)
626