chouchouvs committed on
Commit
edeec3c
·
verified ·
1 Parent(s): cfba1b0

Update main.py

Browse files
Files changed (1) hide show
  1. main.py +62 -7
main.py CHANGED
@@ -7,6 +7,9 @@ import uuid
7
  import random
8
  import logging
9
  import hashlib
 
 
 
10
  from typing import List, Optional, Dict, Any, Tuple
11
 
12
  import numpy as np
@@ -69,10 +72,15 @@ WIPE_BEFORE_INDEX = os.getenv("WIPE_BEFORE_INDEX", "false").lower() in ("1","tru
69
  # --- Auth d’API de ce service (simple header) ---
70
  AUTH_TOKEN = os.getenv("REMOTE_INDEX_TOKEN", "").strip()
71
 
 
 
 
 
72
  LOG.info(f"Embeddings backend order = {EMB_BACKEND_ORDER}")
73
  LOG.info(f"HF pipeline URL = {HF_URL_PIPE}")
74
  LOG.info(f"HF models URL = {HF_URL_MODL}")
75
  LOG.info(f"VECTOR_STORE = {VECTOR_STORE}")
 
76
 
77
  if "deepinfra" in EMB_BACKEND_ORDER and not DI_TOKEN:
78
  LOG.warning("DEEPINFRA_API_KEY manquant — tentatives DeepInfra échoueront.")
@@ -293,19 +301,59 @@ class StatusBody(BaseModel):
293
  job_id: str
294
 
295
  # ======================================================================================
296
- # Jobs store (mémoire)
297
  # ======================================================================================
298
  JOBS: Dict[str, Dict[str, Any]] = {} # {job_id: {"status": "...", "logs": [...], "created": ts}}
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
299
 
300
  def _append_log(job_id: str, line: str):
301
  job = JOBS.get(job_id)
302
- if job:
303
- job["logs"].append(line)
 
 
 
 
304
 
305
  def _set_status(job_id: str, status: str):
306
  job = JOBS.get(job_id)
307
- if job:
308
  job["status"] = status
 
309
 
310
  def _auth(x_auth: Optional[str]):
311
  if AUTH_TOKEN and (x_auth or "") != AUTH_TOKEN:
@@ -484,7 +532,7 @@ def _chunk_with_spans(text: str, size: int, overlap: int):
484
  break
485
 
486
  # ======================================================================================
487
- # Background task : indexation — VERSION CORRIGÉE
488
  # ======================================================================================
489
 
490
  def run_index_job(job_id: str, req: IndexRequest):
@@ -706,9 +754,16 @@ def start_index(req: IndexRequest, background_tasks: BackgroundTasks, x_auth_tok
706
  _check_backend_ready()
707
  job_id = uuid.uuid4().hex[:12]
708
  JOBS[job_id] = {"status": "queued", "logs": [], "created": time.time()}
 
709
  LOG.info(f"Created job {job_id} for project {req.project_id}")
710
  _append_log(job_id, f"Job created: {job_id} project={req.project_id}")
711
- background_tasks.add_task(run_index_job, job_id, req)
 
 
 
 
 
 
712
  return {"job_id": job_id}
713
 
714
  # --- 3 variantes pour /status ---
@@ -719,7 +774,7 @@ def status_path(job_id: str, x_auth_token: Optional[str] = Header(default=None))
719
  if not j:
720
  # Response JSON plus explicite pour faciliter le debug côté client
721
  raise HTTPException(status_code=404, detail={"error": "job inconnu", "advice": "POST /index to create a new job"})
722
- return {"status": j["status"], "logs": j["logs"][-1500:]}
723
 
724
  @app.get("/status")
725
  def status_query(job_id: str = Query(...), x_auth_token: Optional[str] = Header(default=None)):
 
7
  import random
8
  import logging
9
  import hashlib
10
+ import json
11
+ import tempfile
12
+ import threading
13
  from typing import List, Optional, Dict, Any, Tuple
14
 
15
  import numpy as np
 
72
  # --- Auth d’API de ce service (simple header) ---
73
  AUTH_TOKEN = os.getenv("REMOTE_INDEX_TOKEN", "").strip()
74
 
75
+ # --- Jobs persistence / runtime behavior ---
76
+ JOBS_FILE = os.getenv("JOBS_FILE", "/tmp/remote_index_jobs.json")
77
+ RUN_INDEX_BLOCKING = os.getenv("RUN_INDEX_BLOCKING", "false").lower() in ("1","true","yes","on")
78
+
79
  LOG.info(f"Embeddings backend order = {EMB_BACKEND_ORDER}")
80
  LOG.info(f"HF pipeline URL = {HF_URL_PIPE}")
81
  LOG.info(f"HF models URL = {HF_URL_MODL}")
82
  LOG.info(f"VECTOR_STORE = {VECTOR_STORE}")
83
+ LOG.info(f"JOBS_FILE = {JOBS_FILE} RUN_INDEX_BLOCKING = {RUN_INDEX_BLOCKING}")
84
 
85
  if "deepinfra" in EMB_BACKEND_ORDER and not DI_TOKEN:
86
  LOG.warning("DEEPINFRA_API_KEY manquant — tentatives DeepInfra échoueront.")
 
301
  job_id: str
302
 
303
  # ======================================================================================
304
+ # Jobs store (persistence simple)
305
  # ======================================================================================
306
  JOBS: Dict[str, Dict[str, Any]] = {} # {job_id: {"status": "...", "logs": [...], "created": ts}}
307
+ _jobs_lock = threading.Lock()
308
+
309
+ def _atomic_write(path: str, data: str):
310
+ dirn = os.path.dirname(path) or "/tmp"
311
+ fd, tmp_path = tempfile.mkstemp(dir=dirn)
312
+ try:
313
+ with os.fdopen(fd, "w", encoding="utf-8") as f:
314
+ f.write(data)
315
+ os.replace(tmp_path, path)
316
+ finally:
317
+ if os.path.exists(tmp_path):
318
+ try:
319
+ os.unlink(tmp_path)
320
+ except Exception:
321
+ pass
322
+
323
def _load_jobs():
    """Restore the in-memory JOBS mapping from JOBS_FILE, best effort.

    Any failure (unreadable file, invalid JSON) is logged as a warning and
    the current in-memory JOBS is left untouched.
    """
    global JOBS
    try:
        if not os.path.exists(JOBS_FILE):
            return
        with open(JOBS_FILE, "r", encoding="utf-8") as fh:
            loaded = json.load(fh)
        JOBS = loaded
        LOG.info("Jobs loaded from %s (%d entries)", JOBS_FILE, len(JOBS))
    except Exception as exc:
        LOG.warning("Could not load jobs file %s: %s", JOBS_FILE, exc)
332
+
333
def _save_jobs():
    """Persist the JOBS mapping to JOBS_FILE atomically, best effort.

    Serialization and the write happen under `_jobs_lock` so concurrent
    mutations cannot interleave; any failure is logged as a warning.
    """
    try:
        with _jobs_lock:
            payload = json.dumps(JOBS, ensure_ascii=False)
            _atomic_write(JOBS_FILE, payload)
    except Exception as exc:
        LOG.warning("Could not save jobs file %s: %s", JOBS_FILE, exc)
339
+
340
+ # load existing jobs at startup
341
+ _load_jobs()
342
 
343
def _append_log(job_id: str, line: str):
    """Append *line* to a job's log, trim it, and persist the jobs store.

    Unknown job ids are ignored silently.
    """
    job = JOBS.get(job_id)
    if job is None:
        return
    logs = job.setdefault("logs", [])
    logs.append(line)
    # Bound the log so the persisted jobs file cannot grow without limit.
    if len(logs) > 5000:
        job["logs"] = logs[-5000:]
    _save_jobs()
351
 
352
def _set_status(job_id: str, status: str):
    """Update a job's status and persist the jobs store.

    Unknown job ids are ignored silently.
    """
    job = JOBS.get(job_id)
    if job is None:
        return
    job["status"] = status
    _save_jobs()
357
 
358
  def _auth(x_auth: Optional[str]):
359
  if AUTH_TOKEN and (x_auth or "") != AUTH_TOKEN:
 
532
  break
533
 
534
  # ======================================================================================
535
+ # Background task : indexation — VERSION CORRIGÉE (persistance JOBS)
536
  # ======================================================================================
537
 
538
  def run_index_job(job_id: str, req: IndexRequest):
 
754
  _check_backend_ready()
755
  job_id = uuid.uuid4().hex[:12]
756
  JOBS[job_id] = {"status": "queued", "logs": [], "created": time.time()}
757
+ _save_jobs()
758
  LOG.info(f"Created job {job_id} for project {req.project_id}")
759
  _append_log(job_id, f"Job created: {job_id} project={req.project_id}")
760
+
761
+ if RUN_INDEX_BLOCKING:
762
+ # utile pour environnements où les BackgroundTasks ne persistent pas / sont killés
763
+ run_index_job(job_id, req)
764
+ else:
765
+ background_tasks.add_task(run_index_job, job_id, req)
766
+
767
  return {"job_id": job_id}
768
 
769
  # --- 3 variantes pour /status ---
 
774
  if not j:
775
  # Response JSON plus explicite pour faciliter le debug côté client
776
  raise HTTPException(status_code=404, detail={"error": "job inconnu", "advice": "POST /index to create a new job"})
777
+ return {"status": j["status"], "logs": j.get("logs", [])[-1500:]}
778
 
779
  @app.get("/status")
780
  def status_query(job_id: str = Query(...), x_auth_token: Optional[str] = Header(default=None)):