chouchouvs commited on
Commit
a931c35
·
verified ·
1 Parent(s): a93f9b3

Update main.py

Browse files
Files changed (1) hide show
  1. main.py +134 -87
main.py CHANGED
@@ -1,30 +1,29 @@
1
  # -*- coding: utf-8 -*-
2
  """
3
- HF Space - main.py de substitution pour tests Qdrant / indexation minimale
4
 
5
  Endpoints:
6
  - GET / → redirige vers UI_PATH (défaut: /ui)
7
  - GET /ui (UI_PATH) → UI Gradio
8
  - GET /health → healthcheck
9
  - GET /api → infos service
 
10
  - POST /wipe?project_id=XXX → supprime la collection Qdrant
11
  - POST /index → lance un job d'indexation
12
  - GET /status/{job_id} → état + logs du job
13
  - GET /collections/{proj}/count → count points dans Qdrant
14
  - POST /query → recherche sémantique
15
 
16
- ENV attendues :
17
  - QDRANT_URL, QDRANT_API_KEY (requis pour upsert)
18
  - COLLECTION_PREFIX (défaut "proj_")
19
  - EMB_PROVIDER ("hf" par défaut, "dummy" sinon)
20
  - HF_EMBED_MODEL (défaut "BAAI/bge-m3")
21
  - HUGGINGFACEHUB_API_TOKEN (si EMB_PROVIDER=hf)
 
22
  - LOG_LEVEL (défaut DEBUG)
23
  - PORT (fourni par HF, défaut 7860)
24
  - UI_PATH (défaut "/ui")
25
-
26
- Dépendances suggérées :
27
- fastapi>=0.111, uvicorn>=0.30, httpx>=0.27, pydantic>=2.7, gradio>=4.43, numpy>=2.0
28
  """
29
 
30
  from __future__ import annotations
@@ -63,13 +62,17 @@ EMB_PROVIDER = os.getenv("EMB_PROVIDER", "hf").lower() # "hf" | "dummy"
63
  HF_EMBED_MODEL = os.getenv("HF_EMBED_MODEL", "BAAI/bge-m3")
64
  HF_TOKEN = os.getenv("HUGGINGFACEHUB_API_TOKEN", "")
65
 
 
 
66
  UI_PATH = os.getenv("UI_PATH", "/ui") # UI montée ici par défaut
67
 
68
  if not QDRANT_URL or not QDRANT_API_KEY:
69
  LOG.warning("QDRANT_URL / QDRANT_API_KEY non fournis : l'upsert échouera.")
70
 
71
  if EMB_PROVIDER == "hf" and not HF_TOKEN:
72
- LOG.warning("EMB_PROVIDER=hf sans HUGGINGFACEHUB_API_TOKEN. Utilise EMB_PROVIDER=dummy pour tester sans token.")
 
 
73
 
74
  # ------------------------------------------------------------------------------
75
  # Schémas Pydantic
@@ -154,6 +157,9 @@ def chunk_text(text: str, chunk_size: int, overlap: int) -> List[Tuple[int, int,
154
  i = j
155
  return res
156
 
 
 
 
157
  async def ensure_collection(client: httpx.AsyncClient, coll: str, vector_size: int) -> None:
158
  """Crée la collection Qdrant (distance=Cosine), ou la recrée si dim mismatch."""
159
  url = f"{QDRANT_URL}/collections/{coll}"
@@ -214,7 +220,9 @@ async def embed_hf(client: httpx.AsyncClient, texts: List[str], model: str = HF_
214
  payload = {"inputs": texts, "options": {"wait_for_model": True}}
215
  r = await client.post(url, headers=headers, json=payload, timeout=120)
216
  if r.status_code != 200:
217
- raise HTTPException(status_code=502, detail=f"HF Inference error: {r.text}")
 
 
218
  data = r.json()
219
  embeddings: List[List[float]] = []
220
  if isinstance(data, list):
@@ -237,89 +245,107 @@ def embed_dummy(texts: List[str], dim: int = 128) -> List[List[float]]:
237
  return out
238
 
239
  async def embed_texts(client: httpx.AsyncClient, texts: List[str]) -> List[List[float]]:
 
240
  if EMB_PROVIDER == "hf":
241
- return await embed_hf(client, texts)
 
 
 
 
 
 
242
  return embed_dummy(texts, dim=128)
243
 
244
  # ------------------------------------------------------------------------------
245
- # Pipeline d'indexation
246
  # ------------------------------------------------------------------------------
247
  async def run_index_job(job: JobState, req: IndexRequest) -> None:
248
- job.stage = "embedding"
249
- job.total_files = len(req.files)
250
- job.log(f"Index start project={req.project_id} files={len(req.files)} chunk_size={req.chunk_size} overlap={req.overlap} batch_size={req.batch_size} store_text={req.store_text}")
251
-
252
- # Dédup global par hash du texte de fichier
253
- file_hashes = [hash8(f.text) for f in req.files]
254
- uniq = len(set(file_hashes))
255
- if uniq != len(file_hashes):
256
- job.log(f"Attention: {len(file_hashes)-uniq} fichier(s) ont un texte identique (hash dupliqué).")
257
-
258
- # Chunking
259
- records: List[Dict[str, Any]] = []
260
- for f in req.files:
261
- chunks = chunk_text(f.text, req.chunk_size, req.overlap)
262
- if not chunks:
263
- job.log(f"{f.path}: 0 chunk (trop court ou vide)")
264
- for idx, (start, end, ch) in enumerate(chunks):
265
- payload = {"path": f.path, "chunk": idx, "start": start, "end": end}
266
- if req.store_text:
267
- payload["text"] = ch
268
- records.append({"payload": payload, "raw": ch})
269
- job.total_chunks = len(records)
270
- job.log(f"Total chunks = {job.total_chunks}")
271
-
272
- if job.total_chunks == 0:
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
273
  job.stage = "failed"
274
- job.errors.append("Aucun chunk à indexer.")
275
  job.finished_at = time.time()
276
- return
277
-
278
- async with httpx.AsyncClient(timeout=120) as client:
279
- # Warmup dim
280
- warmup_vec = (await embed_texts(client, [records[0]["raw"]]))[0]
281
- vec_dim = len(warmup_vec)
282
- job.log(f"Warmup embeddings dim={vec_dim} provider={EMB_PROVIDER}")
283
-
284
- # Collection Qdrant
285
- coll = f"{COLLECTION_PREFIX}{req.project_id}"
286
- await ensure_collection(client, coll, vector_size=vec_dim)
287
-
288
- job.stage = "upserting"
289
- batch_points: List[Dict[str, Any]] = []
290
-
291
- async def flush_batch():
292
- nonlocal batch_points
293
- if not batch_points:
294
- return 0
295
- added = await qdrant_upsert(client, coll, batch_points)
296
- job.upserted += added
297
- job.log(f"+{added} points upsert (total={job.upserted})")
298
- batch_points = []
299
- return added
300
-
301
- EMB_BATCH = max(8, min(64, req.batch_size * 2))
302
- i = 0
303
- while i < len(records):
304
- sub = records[i : i + EMB_BATCH]
305
- texts = [r["raw"] for r in sub]
306
- vecs = await embed_texts(client, texts)
307
- if len(vecs) != len(sub):
308
- raise HTTPException(status_code=500, detail="Embedding batch size mismatch")
309
- job.embedded += len(vecs)
310
-
311
- for r, v in zip(sub, vecs):
312
- point = {"id": str(uuid.uuid4()), "vector": v, "payload": r["payload"]}
313
- batch_points.append(point)
314
- if len(batch_points) >= req.batch_size:
315
- await flush_batch()
316
- i += EMB_BATCH
317
-
318
- await flush_batch()
319
-
320
- job.stage = "done"
321
- job.finished_at = time.time()
322
- job.log("Index job terminé.")
323
 
324
  # ------------------------------------------------------------------------------
325
  # FastAPI app + endpoints
@@ -338,9 +364,29 @@ async def health():
338
 
339
  @fastapi_app.get("/api")
340
  async def api_info():
341
- return {"ok": True, "service": "remote-indexer-min", "qdrant": bool(QDRANT_URL), "emb_provider": EMB_PROVIDER, "ui_path": UI_PATH}
342
-
343
- # Redirige "/" → UI_PATH (ex.: /ui). Ça évite tout conflit avec la route racine.
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
344
  @fastapi_app.get("/")
345
  async def root_redirect():
346
  return RedirectResponse(url=UI_PATH, status_code=307)
@@ -474,8 +520,9 @@ with gr.Blocks(title="Remote Indexer - Minimal Test", analytics_enabled=False) a
474
  gr.Markdown("## 🔬 Remote Indexer — Tests sans console\n"
475
  "Wipe → Index 2 docs → Status → Count → Query\n"
476
  f"- **Embeddings**: `{EMB_PROVIDER}` (model: `{HF_EMBED_MODEL}`)\n"
477
- f"- **Qdrant**: `{'OK' if QDRANT_URL else 'ABSENT'}`\n"
478
- "Astuce: si pas de token HF, mets `EMB_PROVIDER=dummy`.")
 
479
  with gr.Row():
480
  project_tb = gr.Textbox(label="Project ID", value="DEEPWEB")
481
  jobid_tb = gr.Textbox(label="Job ID (pour Status)", value="", interactive=True)
 
1
  # -*- coding: utf-8 -*-
2
  """
3
+ HF Space - main.py de substitution pour tests Qdrant / indexation minimale (robuste)
4
 
5
  Endpoints:
6
  - GET / → redirige vers UI_PATH (défaut: /ui)
7
  - GET /ui (UI_PATH) → UI Gradio
8
  - GET /health → healthcheck
9
  - GET /api → infos service
10
+ - GET /debug/env → aperçu config (sans secrets)
11
  - POST /wipe?project_id=XXX → supprime la collection Qdrant
12
  - POST /index → lance un job d'indexation
13
  - GET /status/{job_id} → état + logs du job
14
  - GET /collections/{proj}/count → count points dans Qdrant
15
  - POST /query → recherche sémantique
16
 
17
+ ENV:
18
  - QDRANT_URL, QDRANT_API_KEY (requis pour upsert)
19
  - COLLECTION_PREFIX (défaut "proj_")
20
  - EMB_PROVIDER ("hf" par défaut, "dummy" sinon)
21
  - HF_EMBED_MODEL (défaut "BAAI/bge-m3")
22
  - HUGGINGFACEHUB_API_TOKEN (si EMB_PROVIDER=hf)
23
+ - EMB_FALLBACK_TO_DUMMY (true/false) → si vrai, bascule dummy si HF indisponible
24
  - LOG_LEVEL (défaut DEBUG)
25
  - PORT (fourni par HF, défaut 7860)
26
  - UI_PATH (défaut "/ui")
 
 
 
27
  """
28
 
29
  from __future__ import annotations
 
62
  HF_EMBED_MODEL = os.getenv("HF_EMBED_MODEL", "BAAI/bge-m3")
63
  HF_TOKEN = os.getenv("HUGGINGFACEHUB_API_TOKEN", "")
64
 
65
+ EMB_FALLBACK_TO_DUMMY = os.getenv("EMB_FALLBACK_TO_DUMMY", "false").lower() in ("1","true","yes","on")
66
+
67
  UI_PATH = os.getenv("UI_PATH", "/ui") # UI montée ici par défaut
68
 
69
  if not QDRANT_URL or not QDRANT_API_KEY:
70
  LOG.warning("QDRANT_URL / QDRANT_API_KEY non fournis : l'upsert échouera.")
71
 
72
  if EMB_PROVIDER == "hf" and not HF_TOKEN:
73
+ LOG.warning("EMB_PROVIDER=hf sans HUGGINGFACEHUB_API_TOKEN. "
74
+ "→ soit définis le token, soit mets EMB_PROVIDER=dummy, "
75
+ "soit active EMB_FALLBACK_TO_DUMMY=true.")
76
 
77
  # ------------------------------------------------------------------------------
78
  # Schémas Pydantic
 
157
  i = j
158
  return res
159
 
160
+ # ------------------------------------------------------------------------------
161
+ # Qdrant helpers
162
+ # ------------------------------------------------------------------------------
163
  async def ensure_collection(client: httpx.AsyncClient, coll: str, vector_size: int) -> None:
164
  """Crée la collection Qdrant (distance=Cosine), ou la recrée si dim mismatch."""
165
  url = f"{QDRANT_URL}/collections/{coll}"
 
220
  payload = {"inputs": texts, "options": {"wait_for_model": True}}
221
  r = await client.post(url, headers=headers, json=payload, timeout=120)
222
  if r.status_code != 200:
223
+ detail = r.text
224
+ LOG.error(f"HF Inference error {r.status_code}: {detail[:400]}")
225
+ raise HTTPException(status_code=502, detail=f"HF Inference error {r.status_code}: {detail}")
226
  data = r.json()
227
  embeddings: List[List[float]] = []
228
  if isinstance(data, list):
 
245
  return out
246
 
247
  async def embed_texts(client: httpx.AsyncClient, texts: List[str]) -> List[List[float]]:
248
+ # Fallback optionnel si HF indisponible
249
  if EMB_PROVIDER == "hf":
250
+ try:
251
+ return await embed_hf(client, texts)
252
+ except Exception as e:
253
+ if EMB_FALLBACK_TO_DUMMY:
254
+ LOG.warning(f"Fallback embeddings → dummy (cause: {e})")
255
+ return embed_dummy(texts, dim=128)
256
+ raise
257
  return embed_dummy(texts, dim=128)
258
 
259
  # ------------------------------------------------------------------------------
260
+ # Pipeline d'indexation (robuste)
261
  # ------------------------------------------------------------------------------
262
  async def run_index_job(job: JobState, req: IndexRequest) -> None:
263
+ try:
264
+ job.stage = "embedding"
265
+ job.total_files = len(req.files)
266
+ job.log(
267
+ f"Index start project={req.project_id} files={len(req.files)} "
268
+ f"chunk_size={req.chunk_size} overlap={req.overlap} batch_size={req.batch_size} store_text={req.store_text} "
269
+ f"provider={EMB_PROVIDER} model={HF_EMBED_MODEL}"
270
+ )
271
+
272
+ # Dédup global par hash du texte de fichier
273
+ file_hashes = [hash8(f.text) for f in req.files]
274
+ uniq = len(set(file_hashes))
275
+ if uniq != len(file_hashes):
276
+ job.log(f"Attention: {len(file_hashes)-uniq} fichier(s) ont un texte identique (hash dupliqué).")
277
+
278
+ # Chunking
279
+ records: List[Dict[str, Any]] = []
280
+ for f in req.files:
281
+ chunks = chunk_text(f.text, req.chunk_size, req.overlap)
282
+ if not chunks:
283
+ job.log(f"{f.path}: 0 chunk (trop court ou vide)")
284
+ for idx, (start, end, ch) in enumerate(chunks):
285
+ payload = {"path": f.path, "chunk": idx, "start": start, "end": end}
286
+ if req.store_text:
287
+ payload["text"] = ch
288
+ records.append({"payload": payload, "raw": ch})
289
+ job.total_chunks = len(records)
290
+ job.log(f"Total chunks = {job.total_chunks}")
291
+
292
+ if job.total_chunks == 0:
293
+ job.stage = "failed"
294
+ job.errors.append("Aucun chunk à indexer.")
295
+ job.finished_at = time.time()
296
+ return
297
+
298
+ async with httpx.AsyncClient(timeout=120) as client:
299
+ # Warmup dim
300
+ warmup_vec = (await embed_texts(client, [records[0]["raw"]]))[0]
301
+ vec_dim = len(warmup_vec)
302
+ job.log(f"Warmup embeddings dim={vec_dim}")
303
+
304
+ # Collection Qdrant
305
+ coll = f"{COLLECTION_PREFIX}{req.project_id}"
306
+ await ensure_collection(client, coll, vector_size=vec_dim)
307
+ job.log(f"Collection prête: {coll} (dim={vec_dim})")
308
+
309
+ job.stage = "upserting"
310
+ batch_points: List[Dict[str, Any]] = []
311
+
312
+ async def flush_batch():
313
+ nonlocal batch_points
314
+ if not batch_points:
315
+ return 0
316
+ added = await qdrant_upsert(client, coll, batch_points)
317
+ job.upserted += added
318
+ job.log(f"+{added} points upsert (total={job.upserted})")
319
+ batch_points = []
320
+ return added
321
+
322
+ EMB_BATCH = max(8, min(64, req.batch_size * 2))
323
+ i = 0
324
+ while i < len(records):
325
+ sub = records[i : i + EMB_BATCH]
326
+ texts = [r["raw"] for r in sub]
327
+ vecs = await embed_texts(client, texts)
328
+ if len(vecs) != len(sub):
329
+ raise HTTPException(status_code=500, detail="Embedding batch size mismatch")
330
+ job.embedded += len(vecs)
331
+
332
+ for r, v in zip(sub, vecs):
333
+ point = {"id": str(uuid.uuid4()), "vector": v, "payload": r["payload"]}
334
+ batch_points.append(point)
335
+ if len(batch_points) >= req.batch_size:
336
+ await flush_batch()
337
+ i += EMB_BATCH
338
+
339
+ await flush_batch()
340
+
341
+ job.stage = "done"
342
+ job.finished_at = time.time()
343
+ job.log("Index job terminé.")
344
+ except Exception as e:
345
  job.stage = "failed"
346
+ job.errors.append(str(e))
347
  job.finished_at = time.time()
348
+ job.log(f"❌ Exception: {e}")
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
349
 
350
  # ------------------------------------------------------------------------------
351
  # FastAPI app + endpoints
 
364
 
365
  @fastapi_app.get("/api")
366
  async def api_info():
367
+ return {
368
+ "ok": True,
369
+ "service": "remote-indexer-min",
370
+ "qdrant": bool(QDRANT_URL),
371
+ "emb_provider": EMB_PROVIDER,
372
+ "hf_model": HF_EMBED_MODEL,
373
+ "ui_path": UI_PATH,
374
+ "fallback_to_dummy": EMB_FALLBACK_TO_DUMMY,
375
+ }
376
+
377
+ @fastapi_app.get("/debug/env")
378
+ async def debug_env():
379
+ return {
380
+ "qdrant_url_set": bool(QDRANT_URL),
381
+ "qdrant_key_set": bool(QDRANT_API_KEY),
382
+ "emb_provider": EMB_PROVIDER,
383
+ "hf_model": HF_EMBED_MODEL,
384
+ "hf_token_set": bool(HF_TOKEN),
385
+ "fallback_to_dummy": EMB_FALLBACK_TO_DUMMY,
386
+ "collection_prefix": COLLECTION_PREFIX,
387
+ }
388
+
389
+ # Redirige "/" → UI_PATH (ex.: /ui).
390
  @fastapi_app.get("/")
391
  async def root_redirect():
392
  return RedirectResponse(url=UI_PATH, status_code=307)
 
520
  gr.Markdown("## 🔬 Remote Indexer — Tests sans console\n"
521
  "Wipe → Index 2 docs → Status → Count → Query\n"
522
  f"- **Embeddings**: `{EMB_PROVIDER}` (model: `{HF_EMBED_MODEL}`)\n"
523
+ f"- **Token HF présent**: `{'oui' if bool(HF_TOKEN) else 'non'}` — "
524
+ f"**Fallback dummy**: `{'on' if EMB_FALLBACK_TO_DUMMY else 'off'}`\n"
525
+ f"- **Qdrant**: `{'OK' if QDRANT_URL else 'ABSENT'}`")
526
  with gr.Row():
527
  project_tb = gr.Textbox(label="Project ID", value="DEEPWEB")
528
  jobid_tb = gr.Textbox(label="Job ID (pour Status)", value="", interactive=True)