# app/app.py
import os, sys, time, csv, re
from pathlib import Path

import yaml
import pandas as pd
from dotenv import load_dotenv

load_dotenv()
ANTHROPIC_API_KEY = os.getenv("ANTHROPIC_API_KEY")
print("ANTHROPIC in env:", bool(ANTHROPIC_API_KEY), flush=True)
if not ANTHROPIC_API_KEY:
    raise SystemExit("Missing ANTHROPIC_API_KEY in environment or .env file.")

# os.environ.setdefault("HF_HUB_DISABLE_TELEMETRY", "1")
# os.environ.setdefault("TRANSFORMERS_OFFLINE", "1")
# --- Resolve paths (works both as .py and as a PyInstaller .exe)
APP_DIR = Path(__file__).resolve().parent  # .../app
if hasattr(sys, "_MEIPASS"):
    ROOT = Path(sys._MEIPASS)  # PyInstaller unpacks bundled files here
else:
    ROOT = APP_DIR.parent

CFG_PATH = ROOT / "app" / "config" / "app.yaml"
MODELS_DIR = ROOT / "models"
INDEX_DIR = ROOT / "outputs" / "index"
LOGS_DIR = ROOT / "local_logs"
DOCS_DIR = ROOT / "docs"
LOGS_DIR.mkdir(parents=True, exist_ok=True)
# --- Read config
DEFAULT_CFG = {
    "retrieval": {"top_k": 12, "evidence_shown": 3, "answerability_threshold": 0.2},
    "generator": {
        "enabled_default": False,
        "use_top_evidence": 5,
        "temperature": 0.1,
        "max_answer_sentences": 20,
        # n_ctx/threads are leftovers from a local-LLM backend; unused on the Anthropic path
        "n_ctx": 4096,
        "threads": max(2, (os.cpu_count() or 4) - 1),
    },
    "models": {
        "embedding_model": "sentence-transformers/all-MiniLM-L6-v2",
        "embedding_local_dir": None,
        "anthropic_model": "claude-3-5-sonnet-latest",
    },
    "ui": {"show_online_badge": True, "performance_mode": "standard"},  # performance_mode: quick|standard
}

def _deep_merge(base, override):
    """Merge override into base recursively so a partial app.yaml keeps nested defaults."""
    out = dict(base)
    for k, v in (override or {}).items():
        if isinstance(v, dict) and isinstance(out.get(k), dict):
            out[k] = _deep_merge(out[k], v)
        else:
            out[k] = v
    return out

cfg = DEFAULT_CFG
if CFG_PATH.exists():
    cfg = _deep_merge(cfg, yaml.safe_load(CFG_PATH.read_text(encoding="utf-8")))
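# Illustrative app.yaml (hypothetical values; any omitted key keeps its
# DEFAULT_CFG value thanks to the recursive merge above):
#
#   retrieval:
#     top_k: 8
#     evidence_shown: 0
#   generator:
#     enabled_default: true
#   models:
#     embedding_local_dir: models/all-MiniLM-L6-v2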
# --- Load FAISS index & embeddings
import faiss
from sentence_transformers import SentenceTransformer

META_PATH = INDEX_DIR / "meta.parquet"
FAISS_PATH = INDEX_DIR / "chunks.faiss"
if not META_PATH.exists() or not FAISS_PATH.exists():
    raise SystemExit("Index not found. Ensure outputs/index/meta.parquet and chunks.faiss exist.")

df = pd.read_parquet(META_PATH)
df["text"] = df["text"].fillna("").astype(str)
# --- Load the sentence-transformer encoder (a local folder takes precedence)
emb_dir = cfg["models"].get("embedding_local_dir")
if emb_dir:
    EMBED_MODEL_PATH = Path(emb_dir)
    if not EMBED_MODEL_PATH.exists():
        raise SystemExit(f"Embedding model folder not found: {EMBED_MODEL_PATH}")
    embed_model = SentenceTransformer(str(EMBED_MODEL_PATH), trust_remote_code=False)
else:
    embed_model = SentenceTransformer(cfg["models"]["embedding_model"], trust_remote_code=False)

index = faiss.read_index(str(FAISS_PATH))
def _format_citation(row):
    p = int(row["page"]) if pd.notna(row.get("page")) else None
    return f"{row['title']} (p.{p})" if p is not None else f"{row['title']}"

def retrieve(query, top_k=6):
    """Embed the query and return the top_k nearest chunks with scores and citations."""
    qv = embed_model.encode([query], convert_to_numpy=True, normalize_embeddings=True).astype("float32")
    scores, idxs = index.search(qv, top_k)
    out = []
    for s, ix in zip(scores[0], idxs[0]):
        r = df.iloc[int(ix)]
        out.append({
            "score": float(s),
            "citation": _format_citation(r),
            "doc_id": r.get("doc_id", ""),
            "page": None if pd.isna(r.get("page")) else int(r["page"]),
            "chunk_id": int(r["chunk_id"]),
            "text": r["text"],
        })
    return out
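# Illustrative call (hypothetical query and values):
#   retrieve("When can confidentiality be broken?", top_k=3)
# -> [{"score": 0.71, "citation": "Code of Ethics (p.4)", "doc_id": "...",
#      "page": 4, "chunk_id": 128, "text": "..."}, ...]
# With normalize_embeddings=True, inner-product scores equal cosine similarity,
# assuming the index was built as an inner-product (IndexFlatIP-style) index.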
# --- Anthropic (Claude) LLM
from anthropic import Anthropic, APIError

anthropic_client = Anthropic(api_key=ANTHROPIC_API_KEY)  # key validated at startup above
CLAUDE_MODEL = cfg["models"].get("anthropic_model", "claude-3-5-sonnet-latest")

SYSTEM_PROMPT = (
    "You are a careful assistant for clinicians. "
    "Use ONLY the provided context to answer. "
    "Be concise. Add inline citations like [1], [2] matching the numbered context. "
    "If the context does not fully answer, provide the best supported guidance you can, "
    "and point to the closest relevant passages with citations."
)
def _citations_valid(text, k):
    """True if the answer cites at least one source and every [n] is within 1..k."""
    nums = set(int(n) for n in re.findall(r"\[(\d+)\]", text))
    return bool(nums) and all(1 <= n <= k for n in nums)

def _join_cites(nums):
    # Currently unused; formats citation lists like "[1], [2] and [3]".
    nums = [f"[{n}]" for n in nums]
    if not nums:
        return ""
    if len(nums) == 1:
        return nums[0]
    return ", ".join(nums[:-1]) + " and " + nums[-1]

def _make_context_block(ctx, use_n):
    """Render the top use_n chunks as a numbered context block for the prompt."""
    blocks = []
    for i, c in enumerate(ctx[:use_n], 1):
        blocks.append(f"[{i}] {c['citation']}\n{c['text']}\n")
    return "\n".join(blocks)
def generate_answer(question, ctx, use_n, temp=0.1, max_sentences=6):
    context_text = _make_context_block(ctx, use_n)
    user_prompt = (
        f"Context:\n\n{context_text}\n\n"
        f"Question: {question}\n"
        "Answer using ONLY the context above and cite with [1], [2], etc."
    )
    try:
        resp = anthropic_client.messages.create(
            model=CLAUDE_MODEL,
            system=SYSTEM_PROMPT,
            max_tokens=600,
            temperature=float(temp),
            messages=[{"role": "user", "content": [{"type": "text", "text": user_prompt}]}],
        )
    except APIError as e:
        return f"_API error from Anthropic: {e}_"
    # Claude returns a list of content blocks; keep only the text blocks
    parts = []
    for blk in resp.content:
        if blk.type == "text":
            parts.append(blk.text)
    full = ("\n".join(parts)).strip()
    # # Log raw model output to console for debugging
    # print("\n===== RAW MODEL OUTPUT =====\n", full, "\n============================\n", flush=True)
    # Trim to ~max_sentences sentences to keep answers short for testers
    sents = re.split(r"(?<=[.!?])\s+", full)
    short = " ".join(sents[:max_sentences]).strip()
    return short
# --- Gradio UI
import gradio as gr

ONLINE_BADGE = "Standards of Practice & Code of Ethics" if cfg["ui"].get("show_online_badge", True) else ""

def _top_sentences(text, n=3):
    sents = re.split(r"(?<=[.!?])\s+", text.strip())
    return [s for s in sents if s][:n]

def answer_extractive(query, k=6, per_chunk_sents=2):
    """Fallback: quote the top sentences from each retrieved chunk, with citations."""
    ctx = retrieve(query, top_k=k)
    bullets, refs = [], []
    for i, c in enumerate(ctx, 1):
        for s in _top_sentences(c["text"], per_chunk_sents):
            bullets.append(f"- {s} [{i}]")
        refs.append(f"[{i}] {c['citation']}")
    if not bullets:
        return "I couldn’t find relevant text in the corpus.", refs
    return "\n".join(bullets) + "\n\nSources:\n" + "\n".join(refs), refs
def app_infer(question, do_generate, mode):
    start = time.time()
    if not question or not question.strip():
        return "", "", "", f"{ONLINE_BADGE} Ready."

    # Retrieval
    top_k = int(cfg["retrieval"]["top_k"])
    shown = int(cfg["retrieval"]["evidence_shown"])
    use_n = int(cfg["generator"]["use_top_evidence"])
    if mode == "quick":
        shown = min(3, shown)
        use_n = min(3, use_n)
    ctx = retrieve(question, top_k=top_k)

    # Evidence panel (hidden when evidence_shown is 0 in config)
    if shown > 0:
        ev_md_lines = []
        for i, c in enumerate(ctx[:shown], 1):
            title = c["citation"]
            body = c["text"].strip()
            body_short = body if len(body) <= 1200 else body[:1200] + "..."
            ev_md_lines.append(f"**[{i}] {title}**\n\n{body_short}\n")
        evidence_md = "\n---\n".join(ev_md_lines)
    else:
        evidence_md = ""

    # Decide whether to generate
    answer = ""
    sources_md = ""
    conf = float(ctx[0]["score"]) if ctx else 0.0
    threshold = float(cfg["retrieval"].get("answerability_threshold", 0.2))
    if not ctx:
        status = f"{ONLINE_BADGE} No evidence found."
        return evidence_md, answer, sources_md, status
    if do_generate and conf >= threshold:
        draft = generate_answer(
            question=question,
            ctx=ctx,
            use_n=use_n,
            temp=float(cfg["generator"]["temperature"]),
            max_sentences=int(cfg["generator"]["max_answer_sentences"]),
        )
        # Fall back to extractive bullets when the draft's citations don't check out
        if not _citations_valid(draft, min(use_n, len(ctx))):
            answer, _ = answer_extractive(question, k=use_n, per_chunk_sents=2)
        else:
            answer = draft
    elif do_generate and conf < threshold:
        answer = "_Not enough evidence—see Evidence below._"

    # Sources list
    src_lines = [f"[{i}] {c['citation']}" for i, c in enumerate(ctx[:use_n], 1)]
    sources_md = "Sources:\n" + "\n".join(src_lines)
    if answer:
        a = answer.strip()
        if not a.lower().startswith("answer:"):
            answer = f"Answer: {a}"
    dur = time.time() - start
    status = f"{ONLINE_BADGE} Done in {dur:.1f}s (conf={conf:.2f})."
    return evidence_md, answer, sources_md, status
def save_feedback(question, rating, note, answer_shown):
    fpath = LOGS_DIR / "feedback.csv"
    new = not fpath.exists()
    with fpath.open("a", newline="", encoding="utf-8") as f:
        w = csv.writer(f)
        if new:
            w.writerow(["timestamp", "question", "rating", "note", "answer_shown"])
        w.writerow([time.strftime("%Y-%m-%d %H:%M:%S"), question, rating, note, "yes" if answer_shown else "no"])
    return "Feedback saved. Thank you!"
APP_CSS = """
:root{
  --app-font: system-ui, -apple-system, "Segoe UI", Roboto, Helvetica, Arial,
              "Apple Color Emoji","Segoe UI Emoji";
}
body, .gradio-container { font-family: var(--app-font) !important; }
/* make reading nicer */
.gr-markdown { font-size: 16px; line-height: 1.6; }
.gr-markdown h2 { font-size: 18px; margin-top: 0.6rem; }
.gr-textbox textarea { font-size: 16px; }
"""
with gr.Blocks(title="Clinician Q&A", theme="soft", css=APP_CSS) as demo:
    gr.Markdown(f"## Clinician Q&A {' ' + ONLINE_BADGE if ONLINE_BADGE else ''}")
    with gr.Row():
        with gr.Column(scale=1):
            q = gr.Textbox(label="Ask a question", placeholder="e.g., When can confidentiality be broken?")
            do_gen = gr.Checkbox(value=cfg["generator"]["enabled_default"], label="Use LLM")
            mode = gr.Radio(choices=["standard", "quick"], value=cfg["ui"].get("performance_mode", "standard"), label="Performance mode")
            run = gr.Button("Answer", variant="primary")
            rating = gr.Radio(choices=["Helpful", "Not sure", "Incorrect"], label="Feedback", value=None)
            note = gr.Textbox(label="Add a note (optional)")
            submit = gr.Button("Submit feedback")
            status = gr.Markdown("Ready.")
        with gr.Column(scale=1):
            ans = gr.Markdown(label="Answer")
            ev = gr.Markdown(label="Evidence")
            src = gr.Markdown(label="Sources")

    # app_infer returns (evidence, answer, sources, status); map outputs in that order
    run.click(app_infer, inputs=[q, do_gen, mode], outputs=[ev, ans, src, status])
    submit.click(
        lambda question, r, n, a: save_feedback(question, r, n, bool(a and a.strip())),
        inputs=[q, rating, note, ans],
        outputs=[status],
    )
# if __name__ == "__main__":
#     # Bind to localhost only; opens a browser tab automatically.
#     demo.launch(server_name="127.0.0.1", server_port=7860, inbrowser=True, show_error=True)

if __name__ == "__main__":
    # In the cloud (HF Spaces), bind to 0.0.0.0 and respect PORT if provided.
    port = int(os.getenv("PORT", "7860"))
    host = "0.0.0.0"
    demo.queue(max_size=32).launch(server_name=host, server_port=port, show_error=True)