chouchouvs committed (verified)
Commit 75159bc · 1 Parent(s): edeec3c

Update main.py

Files changed (1)
  1. main.py +133 -68
main.py CHANGED
@@ -7,9 +7,8 @@ import uuid
 import random
 import logging
 import hashlib
+import re
 import json
-import tempfile
-import threading
 from typing import List, Optional, Dict, Any, Tuple
 
 import numpy as np
@@ -72,15 +71,10 @@ WIPE_BEFORE_INDEX = os.getenv("WIPE_BEFORE_INDEX", "false").lower() in ("1","tru
 # --- Auth d’API de ce service (simple header) ---
 AUTH_TOKEN = os.getenv("REMOTE_INDEX_TOKEN", "").strip()
 
-# --- Jobs persistence / runtime behavior ---
-JOBS_FILE = os.getenv("JOBS_FILE", "/tmp/remote_index_jobs.json")
-RUN_INDEX_BLOCKING = os.getenv("RUN_INDEX_BLOCKING", "false").lower() in ("1","true","yes","on")
-
 LOG.info(f"Embeddings backend order = {EMB_BACKEND_ORDER}")
 LOG.info(f"HF pipeline URL = {HF_URL_PIPE}")
 LOG.info(f"HF models URL = {HF_URL_MODL}")
 LOG.info(f"VECTOR_STORE = {VECTOR_STORE}")
-LOG.info(f"JOBS_FILE = {JOBS_FILE} RUN_INDEX_BLOCKING = {RUN_INDEX_BLOCKING}")
 
 if "deepinfra" in EMB_BACKEND_ORDER and not DI_TOKEN:
     LOG.warning("DEEPINFRA_API_KEY manquant — tentatives DeepInfra échoueront.")
@@ -301,59 +295,19 @@ class StatusBody(BaseModel):
     job_id: str
 
 # ======================================================================================
-# Jobs store (persistence simple)
+# Jobs store (mémoire)
 # ======================================================================================
 JOBS: Dict[str, Dict[str, Any]] = {} # {job_id: {"status": "...", "logs": [...], "created": ts}}
-_jobs_lock = threading.Lock()
-
-def _atomic_write(path: str, data: str):
-    dirn = os.path.dirname(path) or "/tmp"
-    fd, tmp_path = tempfile.mkstemp(dir=dirn)
-    try:
-        with os.fdopen(fd, "w", encoding="utf-8") as f:
-            f.write(data)
-        os.replace(tmp_path, path)
-    finally:
-        if os.path.exists(tmp_path):
-            try:
-                os.unlink(tmp_path)
-            except Exception:
-                pass
-
-def _load_jobs():
-    global JOBS
-    try:
-        if os.path.exists(JOBS_FILE):
-            with open(JOBS_FILE, "r", encoding="utf-8") as f:
-                JOBS = json.load(f)
-            LOG.info("Jobs loaded from %s (%d entries)", JOBS_FILE, len(JOBS))
-    except Exception as e:
-        LOG.warning("Could not load jobs file %s: %s", JOBS_FILE, e)
-
-def _save_jobs():
-    try:
-        with _jobs_lock:
-            _atomic_write(JOBS_FILE, json.dumps(JOBS, ensure_ascii=False))
-    except Exception as e:
-        LOG.warning("Could not save jobs file %s: %s", JOBS_FILE, e)
-
-# load existing jobs at startup
-_load_jobs()
 
 def _append_log(job_id: str, line: str):
     job = JOBS.get(job_id)
-    if job is not None:
-        job.setdefault("logs", []).append(line)
-        # keep logs bounded to avoid huge files
-        if len(job["logs"]) > 5000:
-            job["logs"] = job["logs"][-5000:]
-        _save_jobs()
+    if job:
+        job["logs"].append(line)
 
 def _set_status(job_id: str, status: str):
     job = JOBS.get(job_id)
-    if job is not None:
+    if job:
         job["status"] = status
-        _save_jobs()
 
 def _auth(x_auth: Optional[str]):
     if AUTH_TOKEN and (x_auth or "") != AUTH_TOKEN:
@@ -531,8 +485,50 @@ def _chunk_with_spans(text: str, size: int, overlap: int):
         if i >= n:
             break
 
+def _clean_chunk_text(text: str) -> str:
+    """
+    Nettoyage simple des fragments JSON / artefacts:
+    - supprime un champ "indexed_at" tronqué à la fin,
+    - supprime accolades/caractères isolés en début/fin,
+    - compacte sauts de ligne multiples,
+    - tente d'extraire des valeurs textuelles si le chunk ressemble fortement à du JSON.
+    """
+    if not text:
+        return text
+    t = text.strip()
+
+    # retirer un suffixe typique: , "indexed_at": "2025-..."}}
+    t = re.sub(r',\s*"indexed_at"\s*:\s*"[^"]*"\s*}+\s*$', '', t, flags=re.IGNORECASE)
+
+    # retirer d'autres clés timestamps communes à la fin si tronquées
+    t = re.sub(r',\s*"(created_at|timestamp|time|date)"\s*:\s*"[^"]*"\s*}+\s*$', '', t, flags=re.IGNORECASE)
+
+    # retirer accolades ou crochets seuls en début/fin
+    t = re.sub(r'^[\s\]\}\,]+', '', t)
+    t = re.sub(r'[\s\]\}\,]+$', '', t)
+
+    # si le chunk ressemble majoritairement à du JSON (beaucoup de ":" ou "{"), essayer d'en extraire les valeurs textuelles
+    if t.count(':') >= 3 and (t.count('{') + t.count('}')) >= 1:
+        try:
+            j = json.loads(t)
+            if isinstance(j, dict):
+                # concatène les valeurs textuelles pertinentes
+                vals = []
+                for v in j.values():
+                    if isinstance(v, (str, int, float)):
+                        vals.append(str(v))
+                if vals:
+                    t = " ".join(vals)
+        except Exception:
+            # ignore, on garde t tel quel
+            pass
+
+    # compacter sauts de ligne
+    t = re.sub(r'\n{3,}', '\n\n', t)
+    return t.strip()
+
 # ======================================================================================
-# Background task : indexation — VERSION CORRIGÉE (persistance JOBS)
+# Background task : indexation — VERSION CORRIGÉE
 # ======================================================================================
 
 def run_index_job(job_id: str, req: IndexRequest):
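A quick way to see what the new helper is meant to strip; a minimal sketch, assuming main.py is importable as a module named main (the module name and the sample strings are illustrative, not from the commit):

from main import _clean_chunk_text  # assumption: main.py is on the import path

# Truncated trailing "indexed_at" field is removed
assert _clean_chunk_text('Installation du projet, "indexed_at": "2025-06-01T12:00:00Z"}}') == "Installation du projet"

# Stray braces at the edges are dropped and blank-line runs are compacted
assert _clean_chunk_text('}} \nREADME content\n\n\n\nMore text') == "README content\n\nMore text"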
@@ -672,21 +668,97 @@ def run_index_job(job_id: str, req: IndexRequest):
                 continue
 
             ext = os.path.splitext(path)[1].lower()
-            if ext not in TEXT_EXTS:
-                _append_log(job_id, f"📁 Ignored: {path} (extension non supportée: {ext})")
-                continue
-
-            text = f.text.strip()
-            if len(text) < 50: # ✅ Ignorer les fichiers trop courts
-                _append_log(job_id, f"📄 Ignored: {path} (texte trop court: {len(text)} chars)")
+            text = f.text or ""
+            if len(text.strip()) < 50: # Ignorer les fichiers trop courts
+                _append_log(job_id, f"📄 Ignored: {path} (texte trop court: {len(text.strip())} chars)")
                 continue
 
             _append_log(job_id, f"📄 Processing: {path} ({len(text)} chars)")
 
+            # --- traitement spécial JSON / NDJSON ---
+            if ext in {".json"} or path.lower().endswith(".ndjson"):
+                # essayer JSON complet
+                handled = False
+                try:
+                    parsed = json.loads(text)
+                    # si c'est une liste -> indexer chaque entrée séparément
+                    if isinstance(parsed, list):
+                        for idx, obj in enumerate(parsed):
+                            if isinstance(obj, dict):
+                                s = " ".join(str(v) for v in obj.values() if isinstance(v, (str, int, float)))
+                            else:
+                                s = str(obj)
+                            s = _clean_chunk_text(s)
+                            if len(s) < 30:
+                                continue
+                            meta = {"path": path, "chunk": idx, "start": 0, "end": len(s)}
+                            if req.store_text:
+                                meta["text"] = s
+                            buf_chunks.append(s); buf_metas.append(meta)
+                            if len(buf_chunks) >= req.batch_size:
+                                _flush()
+                        handled = True
+                    elif isinstance(parsed, dict):
+                        s = " ".join(str(v) for v in parsed.values() if isinstance(v, (str, int, float)))
+                        s = _clean_chunk_text(s)
+                        if len(s) >= 30:
+                            meta = {"path": path, "chunk": 0, "start": 0, "end": len(s)}
+                            if req.store_text:
+                                meta["text"] = s
+                            buf_chunks.append(s); buf_metas.append(meta)
+                            if len(buf_chunks) >= req.batch_size:
+                                _flush()
+                        handled = True
+                except Exception:
+                    # fallback NDJSON: une ligne == un JSON
+                    try:
+                        lines = [L for L in text.splitlines() if L.strip()]
+                        for li, line in enumerate(lines):
+                            try:
+                                obj = json.loads(line)
+                                if isinstance(obj, dict):
+                                    s = " ".join(str(v) for v in obj.values() if isinstance(v, (str, int, float)))
+                                else:
+                                    s = str(obj)
+                                s = _clean_chunk_text(s)
+                                if len(s) < 30:
+                                    continue
+                                meta = {"path": path, "chunk": li, "start": 0, "end": len(s)}
+                                if req.store_text:
+                                    meta["text"] = s
+                                buf_chunks.append(s); buf_metas.append(meta)
+                                if len(buf_chunks) >= req.batch_size:
+                                    _flush()
+                            except Exception:
+                                # ligne non JSON -> indexer comme texte si longue
+                                sl = line.strip()
+                                if len(sl) >= 30:
+                                    sl = _clean_chunk_text(sl)
+                                    meta = {"path": path, "chunk": li, "start": 0, "end": len(sl)}
+                                    if req.store_text:
+                                        meta["text"] = sl
+                                    buf_chunks.append(sl); buf_metas.append(meta)
+                                    if len(buf_chunks) >= req.batch_size:
+                                        _flush()
+                        handled = True
+                    except Exception:
+                        handled = False
+
+                if handled:
+                    _flush()
+                    _append_log(job_id, f"File done: {path}")
+                    continue # passe au fichier suivant
+
+            # --- traitement normal pour fichiers texte ---
             for ci, (start, end, chunk_txt) in enumerate(_chunk_with_spans(text, req.chunk_size, req.overlap)):
                 chunk_txt = chunk_txt.strip()
                 if len(chunk_txt) < 30: # ✅ Ignorer les chunks trop courts
                     continue
+                # nettoyage pour éviter artefacts JSON / timestamps
+                chunk_txt = _clean_chunk_text(chunk_txt)
+                if len(chunk_txt) < 30:
+                    continue
+
                 buf_chunks.append(chunk_txt)
                 meta = {
                     "path": path,
@@ -754,16 +826,9 @@ def start_index(req: IndexRequest, background_tasks: BackgroundTasks, x_auth_tok
     _check_backend_ready()
     job_id = uuid.uuid4().hex[:12]
     JOBS[job_id] = {"status": "queued", "logs": [], "created": time.time()}
-    _save_jobs()
     LOG.info(f"Created job {job_id} for project {req.project_id}")
     _append_log(job_id, f"Job created: {job_id} project={req.project_id}")
-
-    if RUN_INDEX_BLOCKING:
-        # utile pour environnements où les BackgroundTasks ne persistent pas / sont killés
-        run_index_job(job_id, req)
-    else:
-        background_tasks.add_task(run_index_job, job_id, req)
-
+    background_tasks.add_task(run_index_job, job_id, req)
     return {"job_id": job_id}
 
 # --- 3 variantes pour /status ---
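For orientation, a hedged client-side sketch of creating a job against this endpoint. The base URL and token are placeholders, the x-auth-token header name follows FastAPI's default mapping of the x_auth_token parameter, and the payload fields other than project_id are inferred from the attributes run_index_job reads; the exact IndexRequest schema is not shown in this diff:

import requests  # assumption: requests is available on the client side

BASE = "http://localhost:7860"              # placeholder URL
HEADERS = {"x-auth-token": "my-secret"}     # only enforced when REMOTE_INDEX_TOKEN is set

payload = {
    "project_id": "demo",
    "files": [{"path": "docs/readme.md", "text": "contenu du fichier..."}],  # field names inferred, not confirmed by this diff
    "chunk_size": 1000,
    "overlap": 100,
    "batch_size": 64,
    "store_text": True,
}
r = requests.post(f"{BASE}/index", json=payload, headers=HEADERS, timeout=30)
job_id = r.json()["job_id"]   # response shape confirmed by the diff
print("job:", job_id)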
@@ -774,7 +839,7 @@ def status_path(job_id: str, x_auth_token: Optional[str] = Header(default=None))
     if not j:
         # Response JSON plus explicite pour faciliter le debug côté client
         raise HTTPException(status_code=404, detail={"error": "job inconnu", "advice": "POST /index to create a new job"})
-    return {"status": j["status"], "logs": j.get("logs", [])[-1500:]}
+    return {"status": j["status"], "logs": j["logs"][-1500:]}
 
 @app.get("/status")
 def status_query(job_id: str = Query(...), x_auth_token: Optional[str] = Header(default=None)):
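A matching polling sketch against the query-parameter variant shown above (GET /status with job_id in the query string). The "queued" status and the 1500-line log cap come from the diff; the other status names are assumptions:

import time
import requests  # assumption: requests is available on the client side

def wait_for_job(base: str, job_id: str, token: str = "") -> dict:
    headers = {"x-auth-token": token} if token else {}
    while True:
        r = requests.get(f"{base}/status", params={"job_id": job_id}, headers=headers, timeout=10)
        r.raise_for_status()
        body = r.json()  # {"status": "...", "logs": [...]} with logs capped to the last 1500 lines
        if body["status"] not in ("queued", "running"):  # "running" and terminal names are assumed
            return body
        time.sleep(2)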
 
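The JSON/NDJSON branch added to run_index_job above boils down to a three-step parse order: try the whole payload as JSON, then one JSON object per line, then keep raw text lines. A standalone sketch of that order only (it omits the value flattening and the _clean_chunk_text pass that the committed code applies):

import json

def iter_json_records(text: str):
    """Illustrative parse order: full JSON document, then NDJSON (one object per line), then raw lines."""
    try:
        parsed = json.loads(text)
        # a top-level list yields one record per entry, anything else is a single record
        yield from (parsed if isinstance(parsed, list) else [parsed])
        return
    except Exception:
        pass
    for line in text.splitlines():
        line = line.strip()
        if not line:
            continue
        try:
            yield json.loads(line)  # NDJSON record
        except Exception:
            yield line  # keep non-JSON lines as raw text

# Hypothetical NDJSON payload
sample = '{"title": "Doc A", "body": "Bonjour"}\n{"title": "Doc B", "body": "Monde"}'
print(list(iter_json_records(sample)))  # two dicts, one per line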