somaapp / app /app.py
himanithakkar's picture
Update app/app.py
dd4bee8 verified
# app/app.py
import os, sys, time, csv, re
from pathlib import Path
import yaml
import pandas as pd
import numpy as np
from dotenv import load_dotenv
# Merge variables from a local .env file (if present) into the environment
# before the key is looked up.
load_dotenv()

ANTHROPIC_API_KEY = os.getenv("ANTHROPIC_API_KEY")

# Report only whether a key is present; the key itself is never printed.
print("ANTHROPIC in env:", bool(ANTHROPIC_API_KEY), flush=True)

# Nothing below can work without the key, so stop at import time.
if not ANTHROPIC_API_KEY:
    raise SystemExit("Missing ANTHROPIC_API_KEY in environment or .env file.")
# os.environ.setdefault("HF_HUB_DISABLE_TELEMETRY", "1")
# os.environ.setdefault("TRANSFORMERS_OFFLINE", "1")
# --- Resolve paths (works both as .py and PyInstaller .exe)
APP_DIR = Path(__file__).resolve().parent  # .../app

# When sys._MEIPASS is set (PyInstaller bundle) it is used as the root;
# otherwise the root is the folder that contains app/.
ROOT = Path(sys._MEIPASS) if hasattr(sys, "_MEIPASS") else APP_DIR.parent

CFG_PATH = ROOT.joinpath("app", "config", "app.yaml")
MODELS_DIR = ROOT.joinpath("models")
INDEX_DIR = ROOT.joinpath("outputs", "index")
LOGS_DIR = ROOT.joinpath("local_logs")
DOCS_DIR = ROOT.joinpath("docs")

# Create the log folder up front so later writes do not have to check for it.
LOGS_DIR.mkdir(parents=True, exist_ok=True)
# --- Read config
# Built-in defaults. Any key that app.yaml does not set falls back to these.
DEFAULT_CFG = {
    "retrieval": {"top_k": 12, "evidence_shown": 3, "answerability_threshold": 0.2},
    "generator": {
        "enabled_default": False,
        "use_top_evidence": 5,
        "temperature": 0.1,
        "max_answer_sentences": 20,
        "n_ctx": 4096,
        "threads": max(2, (os.cpu_count() or 4) - 1),
    },
    "models": {
        "embedding_model": "sentence-transformers/all-MiniLM-L6-v2",
        "embedding_local_dir": None,
        "anthropic_model": "claude-3-5-sonnet-latest",
    },
    "ui": {"show_online_badge": True, "performance_mode": "standard"},  # quick|standard
}


def _deep_merge(base, override):
    """Return a new dict: *base* with *override* applied recursively.

    Nested dicts are merged key by key, so overriding a single setting in a
    section keeps the other defaults of that section. A ``None`` override for
    a section that is a dict in *base* (e.g. a bare ``retrieval:`` line in
    YAML) is ignored and the defaults for that section are kept. Neither
    input is mutated.
    """
    merged = dict(base)
    for key, value in override.items():
        current = merged.get(key)
        if isinstance(current, dict):
            if value is None:
                continue
            if isinstance(value, dict):
                merged[key] = _deep_merge(current, value)
                continue
        merged[key] = value
    return merged


cfg = DEFAULT_CFG
if CFG_PATH.exists():
    # safe_load returns None for an empty document; treat that as "no overrides".
    _loaded_cfg = yaml.safe_load(CFG_PATH.read_text(encoding="utf-8"))
    if _loaded_cfg is None:
        _loaded_cfg = {}
    if not isinstance(_loaded_cfg, dict):
        raise SystemExit(f"Config file must contain a YAML mapping: {CFG_PATH}")
    # A shallow {**defaults, **loaded} merge would drop every default of a
    # section as soon as the file overrides one key in it, causing KeyErrors
    # later (e.g. cfg["retrieval"]["evidence_shown"]).
    cfg = _deep_merge(DEFAULT_CFG, _loaded_cfg)
# --- Load FAISS index & embeddings
import faiss
from sentence_transformers import SentenceTransformer

META_PATH = INDEX_DIR / "meta.parquet"
FAISS_PATH = INDEX_DIR / "chunks.faiss"

# Both artefacts are required; stop early with a readable message otherwise.
if not (META_PATH.exists() and FAISS_PATH.exists()):
    raise SystemExit("Index not found. Ensure outputs/index/meta.parquet and chunks.faiss exist.")

# Chunk metadata. Missing text is normalised to an empty string so later
# string operations never see NaN.
df = pd.read_parquet(META_PATH)
df["text"] = df["text"].fillna("").astype(str)

# Embedding model: a local folder takes precedence when configured, otherwise
# the model is loaded by name.
emb_dir = cfg["models"].get("embedding_local_dir")
if not emb_dir:
    embed_model = SentenceTransformer(cfg["models"]["embedding_model"], trust_remote_code=False)
else:
    EMBED_MODEL_PATH = Path(emb_dir)
    if not EMBED_MODEL_PATH.exists():
        raise SystemExit(f"Embedding model folder not found: {EMBED_MODEL_PATH}")
    embed_model = SentenceTransformer(str(EMBED_MODEL_PATH), trust_remote_code=False)

index = faiss.read_index(str(FAISS_PATH))
def _format_citation(row):
    """Return a display citation for one metadata row.

    Args:
        row: Mapping-like object (dict or pandas Series) with a ``title`` key
            and an optional ``page`` key.

    Returns:
        ``"<title> (p.<page>)"`` when a page value is present, otherwise just
        the title.

    Raises:
        KeyError: If ``title`` is missing.
    """
    page = row.get("page")
    # Test for presence rather than truthiness: page 0 is a real page number
    # and must not be dropped from the citation.
    if pd.notna(page):
        return f"{row['title']} (p.{int(page)})"
    return f"{row['title']}"
def retrieve(query, top_k=6):
    """Embed *query* and return the closest chunks from the FAISS index.

    Args:
        query: Question text to embed.
        top_k: Maximum number of hits to request from the index.

    Returns:
        A list of dicts in the order returned by the index, each with the keys
        ``score``, ``citation``, ``doc_id``, ``page``, ``chunk_id`` and
        ``text``. The list may be shorter than *top_k*.
    """
    qv = embed_model.encode(
        [query], convert_to_numpy=True, normalize_embeddings=True
    ).astype("float32")
    scores, idxs = index.search(qv, top_k)

    n_rows = len(df)
    out = []
    for s, ix in zip(scores[0], idxs[0]):
        ix = int(ix)
        # FAISS fills unused result slots with label -1 when it finds fewer
        # than top_k neighbours. df.iloc[-1] would silently return the last
        # chunk as if it were a hit, so skip anything that is not a valid row.
        if ix < 0 or ix >= n_rows:
            continue
        r = df.iloc[ix]
        page = r.get("page")
        out.append({
            "score": float(s),
            "citation": _format_citation(r),
            "doc_id": r.get("doc_id", ""),
            "page": None if pd.isna(page) else int(page),
            "chunk_id": int(r["chunk_id"]),
            "text": r["text"],
        })
    return out
# Anthropic (Claude) LLM
from anthropic import Anthropic, APIError

# Read the key from the process environment again and stop if it is absent.
ANTHROPIC_API_KEY = os.getenv("ANTHROPIC_API_KEY")
if not ANTHROPIC_API_KEY:
    raise SystemExit("Missing ANTHROPIC_API_KEY environment variable.")

anthropic_client = Anthropic(api_key=ANTHROPIC_API_KEY)

# Model id comes from config, with a built-in fallback.
CLAUDE_MODEL = cfg["models"].get("anthropic_model", "claude-3-5-sonnet-latest")

# System instructions sent with every request; the pieces are concatenated
# without any separator, so each piece carries its own trailing space.
SYSTEM_PROMPT = "".join([
    "You are a careful assistant for clinicians. ",
    "Use ONLY the provided context to answer. ",
    "Be concise. Add inline citations like [1], [2] matching the numbered context. ",
    "If the context does not fully answer, provide the best supported guidance you can, and point to the closest relevant passages with citations",
])
def _citations_valid(text, k):
    """Tell whether *text* cites at least one source and only sources 1..k.

    Citations are bracketed integers such as ``[2]``. Returns ``False`` when
    there are none, or when any cited number lies outside ``1..k``.
    """
    cited = {int(token) for token in re.findall(r"\[(\d+)\]", text)}
    if not cited:
        return False
    # Checking the extremes is enough: every other number lies between them.
    return min(cited) >= 1 and max(cited) <= k
def _join_cites(nums):
    """Render citation numbers as prose, e.g. ``[1], [2] and [3]``.

    An empty input gives an empty string and a single number gives just its
    bracketed form.
    """
    tags = [f"[{n}]" for n in nums]
    if len(tags) < 2:
        # Zero or one tag: nothing to join.
        return "".join(tags)
    *leading, final = tags
    return ", ".join(leading) + " and " + final
def _make_context_block(ctx, use_n):
    """Build the numbered context text for the prompt.

    The first *use_n* entries of *ctx* are rendered as ``[i] <citation>``
    followed by the chunk text, numbered from 1, with a blank line between
    entries.
    """
    numbered = enumerate(ctx[:use_n], start=1)
    return "\n".join(
        f"[{pos}] {entry['citation']}\n{entry['text']}\n" for pos, entry in numbered
    )
# Whitespace that follows sentence-ending punctuation.
_SENTENCE_BREAK_RE = re.compile(r'(?<=[.!?])\s+')


def _trim_to_sentences(text, max_sentences):
    """Return the leading *max_sentences* sentences of *text*, formatting intact.

    The text is cut at the start of the Nth sentence break instead of being
    split and re-joined with spaces, so newlines, paragraphs and list items
    inside the kept part are preserved. Text with fewer sentences is returned
    unchanged; a limit below 1 yields an empty string.
    """
    if max_sentences < 1:
        return ""
    for count, brk in enumerate(_SENTENCE_BREAK_RE.finditer(text), 1):
        if count == max_sentences:
            return text[:brk.start()]
    return text


def generate_answer(question, ctx, use_n, temp=0.1, max_sentences=6, max_tokens=600):
    """Ask Claude to answer *question* from the top retrieved chunks.

    Args:
        question: The user's question.
        ctx: Retrieved chunks (dicts with ``citation`` and ``text``).
        use_n: How many leading chunks of *ctx* go into the prompt.
        temp: Sampling temperature passed to the API.
        max_sentences: Upper bound on sentences kept from the model output.
        max_tokens: Token limit for the model response.

    Returns:
        The (possibly trimmed) answer text. If the Anthropic API raises
        ``APIError`` an italic markdown error message is returned instead of
        raising.
    """
    context_text = _make_context_block(ctx, use_n)
    user_prompt = (
        f"Context:\n\n{context_text}\n\n"
        f"Question: {question}\n"
        "Answer using ONLY the context above and cite with [1], [2], etc."
    )
    try:
        resp = anthropic_client.messages.create(
            model=CLAUDE_MODEL,
            system=SYSTEM_PROMPT,
            max_tokens=int(max_tokens),
            temperature=float(temp),
            messages=[{"role": "user", "content": [{"type": "text", "text": user_prompt}]}],
        )
    except APIError as e:
        return f"_API error from Anthropic: {e}_"

    # Claude returns a list of content blocks; keep only the text ones.
    full = "\n".join(blk.text for blk in resp.content if blk.type == "text").strip()

    # Trim to ~N sentences to keep it short for testers. Slicing the original
    # string (rather than split + " ".join) keeps line breaks, which matters
    # because the result is rendered as markdown.
    return _trim_to_sentences(full, max_sentences).strip()
# gradio
import gradio as gr

# Badge text shown in the page heading and status line; empty when the config
# switches it off.
ONLINE_BADGE = ""
if cfg["ui"].get("show_online_badge", True):
    ONLINE_BADGE = "Standards of Practice & Code of Ethics"
def _top_sentences(text, n=3):
    """Return the first *n* non-empty sentences of *text*.

    Sentences are split on whitespace that follows ``.``, ``!`` or ``?``;
    surrounding whitespace of *text* is ignored.
    """
    pieces = re.split(r'(?<=[.!?])\s+', text.strip())
    non_empty = list(filter(None, pieces))
    return non_empty[:n]
def answer_extractive(query, k=6, per_chunk_sents=2):
    """Build a bullet-point answer directly from retrieved text, without an LLM.

    Retrieves the top *k* chunks for *query* and quotes up to
    *per_chunk_sents* leading sentences from each, tagged with the chunk's
    number.

    Returns:
        ``(answer_markdown, refs)`` where *refs* is the list of
        ``"[i] <citation>"`` strings for every retrieved chunk.
    """
    hits = retrieve(query, top_k=k)
    refs = [f"[{num}] {hit['citation']}" for num, hit in enumerate(hits, 1)]
    bullets = [
        f"- {sentence} [{num}]"
        for num, hit in enumerate(hits, 1)
        for sentence in _top_sentences(hit["text"], per_chunk_sents)
    ]
    if bullets:
        return "\n".join(bullets) + "\n\nSources:\n" + "\n".join(refs), refs
    return "I couldn’t find relevant text in the corpus.", refs
def _render_evidence(ctx, shown, max_chars=1200):
    """Format the first *shown* chunks of *ctx* as a markdown evidence panel.

    Each chunk body is cut to *max_chars* characters (with a trailing "...").
    Returns an empty string when *shown* is not positive.
    """
    if shown <= 0:
        return ""
    sections = []
    for i, c in enumerate(ctx[:shown], 1):
        body = c["text"].strip()
        body_short = body if len(body) <= max_chars else body[:max_chars] + "..."
        # The citation string is used as the heading for the chunk.
        sections.append(f"**[{i}] {c['citation']}**\n\n{body_short}\n")
    return "\n---\n".join(sections)


def app_infer(question, do_generate, mode):
    """Run retrieval (and optionally generation) for one question.

    Args:
        question: Text typed by the user.
        do_generate: Whether an LLM answer should be attempted.
        mode: ``"quick"`` caps evidence shown and evidence used at 3; any
            other value uses the configured numbers.

    Returns:
        A 4-tuple ``(evidence_md, answer, sources_md, status)`` — in that
        order — of markdown strings.
    """
    start = time.time()
    if not question or not question.strip():
        return "", "", "", f"{ONLINE_BADGE} Ready."

    # Retrieval
    top_k = int(cfg["retrieval"]["top_k"])
    shown = int(cfg["retrieval"]["evidence_shown"])
    use_n = int(cfg["generator"]["use_top_evidence"])
    if mode == "quick":
        shown = min(3, shown)
        use_n = min(3, use_n)
    ctx = retrieve(question, top_k=top_k)

    # Evidence panel (empty when evidence_shown is 0).
    evidence_md = _render_evidence(ctx, shown)

    # Decide if we should generate.
    answer = ""
    sources_md = ""
    # Score of the best hit is used as the confidence value.
    conf = float(ctx[0]["score"]) if ctx else 0.0
    # NOTE(review): this fallback (0.01) differs from DEFAULT_CFG's 0.2 — confirm which is intended.
    threshold = float(cfg["retrieval"].get("answerability_threshold", 0.01))
    if not ctx:
        status = f"{ONLINE_BADGE} No evidence found."
        return evidence_md, answer, sources_md, status

    if do_generate and conf >= threshold:
        draft = generate_answer(
            question=question,
            ctx=ctx,
            use_n=use_n,
            temp=float(cfg["generator"]["temperature"]),
            max_sentences=int(cfg["generator"]["max_answer_sentences"]),
        )
        # If the draft has no citations, or cites numbers outside the context
        # it was given, fall back to an extractive answer.
        if _citations_valid(draft, min(use_n, len(ctx))):
            answer = draft
        else:
            answer, _ = answer_extractive(question, k=use_n, per_chunk_sents=2)
    elif do_generate and conf < threshold:
        answer = "_Not enough evidence—see Evidence below._"

    # Sources list
    src_lines = [f"[{i}] {c['citation']}" for i, c in enumerate(ctx[:use_n], 1)]
    sources_md = "Sources:\n" + "\n".join(src_lines)

    if answer:
        a = answer.strip()
        if not a.lower().startswith("answer:"):
            answer = f"Answer: {a}"

    dur = time.time() - start
    status = f"{ONLINE_BADGE} Done in {dur:.1f}s (conf={conf:.2f})."
    return evidence_md, answer, sources_md, status
def save_feedback(question, rating, note, answer_shown):
    """Append one feedback record to ``feedback.csv`` in the log directory.

    Args:
        question: The question the feedback refers to.
        rating: Selected rating label (may be ``None``).
        note: Free-text note.
        answer_shown: Truthy when an answer was displayed; stored as
            ``"yes"``/``"no"``.

    Returns:
        A confirmation message for the status area.
    """
    fpath = LOGS_DIR / "feedback.csv"
    # The directory is created at startup, but recreate it if it was removed
    # while the app was running so the write below cannot fail on that.
    fpath.parent.mkdir(parents=True, exist_ok=True)
    # Write the header for a missing file and also for an existing but empty
    # one, which would otherwise stay headerless.
    needs_header = not fpath.exists() or fpath.stat().st_size == 0
    with fpath.open("a", newline="", encoding="utf-8") as f:
        w = csv.writer(f)
        if needs_header:
            w.writerow(["timestamp", "question", "rating", "note", "answer_shown"])
        w.writerow([
            time.strftime("%Y-%m-%d %H:%M:%S"),
            question,
            rating,
            note,
            "yes" if answer_shown else "no",
        ])
    return "Feedback saved. Thank you!"
# Custom stylesheet handed to gr.Blocks(css=...): defines a system font stack
# and sets 16px text for markdown and textbox areas.
APP_CSS = """
:root{
--app-font: system-ui, -apple-system, "Segoe UI", Roboto, Helvetica, Arial,
"Apple Color Emoji","Segoe UI Emoji";
}
body, .gradio-container { font-family: var(--app-font) !important; }
/* make reading nicer */
.gr-markdown { font-size: 16px; line-height: 1.6; }
.gr-markdown h2 { font-size: 18px; margin-top: 0.6rem; }
.gr-textbox textarea { font-size: 16px; }
"""
# UI layout: inputs and feedback controls on the left, results on the right.
with gr.Blocks(title="Clinician Q&A", theme="soft", css=APP_CSS) as demo:
    gr.Markdown(f"## Clinician Q&A {'&nbsp;&nbsp;'+ONLINE_BADGE if ONLINE_BADGE else ''}")
    with gr.Row():
        with gr.Column(scale=1):
            q = gr.Textbox(label="Ask a question", placeholder="e.g., When can confidentiality be broken?")
            do_gen = gr.Checkbox(value=cfg["generator"]["enabled_default"], label="Use LLM")
            mode = gr.Radio(choices=["standard","quick"], value=cfg["ui"].get("performance_mode","standard"), label="Performance mode")
            run = gr.Button("Answer", variant="primary")
            rating = gr.Radio(choices=["Helpful","Not sure","Incorrect"], label="Feedback", value=None)
            note = gr.Textbox(label="Add a note (optional)")
            submit = gr.Button("Submit feedback")
            status = gr.Markdown("Ready.")
        with gr.Column(scale=1):
            ans = gr.Markdown(label="Answer")
            ev = gr.Markdown(label="Evidence")
            src = gr.Markdown(label="Sources")

    # app_infer returns (evidence_md, answer, sources_md, status). Outputs are
    # filled positionally, so the component list must follow that same order;
    # otherwise evidence lands in the answer panel and vice versa.
    run.click(app_infer, inputs=[q, do_gen, mode], outputs=[ev, ans, src, status])

    # Feedback records whether an answer was on screen, judged by the content
    # of the answer panel.
    submit.click(
        lambda question, r, n, a: save_feedback(question, r, n, bool(a and a.strip())),
        inputs=[q, rating, note, ans],
        outputs=[status],
    )
# Local alternative (kept for reference): bind to 127.0.0.1 on port 7860 with
# inbrowser=True to open a browser tab automatically.
if __name__ == "__main__":
    # In cloud (HF Spaces), bind to 0.0.0.0 and respect PORT if provided.
    port = int(os.environ.get("PORT", "7860"))
    queued_app = demo.queue(max_size=32)
    queued_app.launch(server_name="0.0.0.0", server_port=port, show_error=True)