# ==========================================================
# SAFE-MODE PRELAUNCH CLEANUP (runs before any heavy imports)
# ==========================================================
import os, shutil, time, glob
def _prelaunch_cleanup(threshold_gb=45.0):
"""Early cleanup to prevent Hugging Face Space eviction (50 GB limit)."""
def _used_gb(path="/home/user/app"):
try:
total, used, free = shutil.disk_usage(path)
                # Report actual usage in GB (no artificial cap that could mask real usage).
                return used / (1024**3)
except Exception:
return 0.0
used = _used_gb()
print(f"\nπŸ’Ύ Startup disk usage: {used:.2f} GB")
cache_paths = [
os.path.expanduser("~/.cache/huggingface"),
os.path.expanduser("~/.cache/hfhub"),
"/home/user/.cache/huggingface",
"/home/user/.cache",
"/home/user/app/__pycache__",
"/home/user/app/data/__pycache__",
]
for p in cache_paths:
if os.path.exists(p):
shutil.rmtree(p, ignore_errors=True)
if used > threshold_gb:
print(f"⚠️ Usage {used:.2f} GB > {threshold_gb} GB β€” performing aggressive cleanup.")
preserve = {"faiss.index", "faiss.index.meta.json", "glossary.json"}
folders = ["/home/user/app/data/docs_cache", "/home/user/app/tmp_docs", "/home/user/app/persistent"]
for folder in folders:
if os.path.exists(folder):
for f in glob.glob(os.path.join(folder, "*")):
if os.path.basename(f) in preserve:
continue
try:
if os.path.isfile(f):
os.remove(f)
else:
shutil.rmtree(f, ignore_errors=True)
except Exception:
pass
print("🧹 Aggressive cleanup complete.")
print(f"✨ Disk after cleanup: {_used_gb():.2f} GB\n")
shutil.rmtree("/home/user/app/runtime_faiss", ignore_errors=True)
_prelaunch_cleanup()
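# NOTE: the cleanup above runs at import time, before any heavy imports below,
# so disk space is freed before model and dataset downloads begin.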
# ==========================================================
# MAIN APP β€” Clinical Trial Chatbot
# ==========================================================
import gradio as gr
import pandas as pd
import json, faiss, numpy as np
from sentence_transformers import SentenceTransformer
from core.hybrid_retriever import summarize_combined
from core import vector_store, vector_sync
APP_TITLE = "🧠 Clinical Research Chatbot"
APP_DESC = (
"Ask any clinical research or GCP-related question. "
"Retrieves and summarizes from ICH, GCDMP, EMA, FDA, Excel, and Web datasets."
)
DATA_PATHS = [
"/home/user/app/persistent/faiss.index",
"/home/user/app/persistent/faiss.index.meta.json",
"/home/user/app/data/docs_cache",
]
# ----------------------------------------------------------
# CLEAR INDEX / CACHE
# ----------------------------------------------------------
def clear_index():
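    """Delete the local FAISS index, its metadata file, and the docs cache listed in DATA_PATHS."""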
removed = []
for p in DATA_PATHS:
if os.path.isdir(p):
shutil.rmtree(p, ignore_errors=True)
removed.append(f"πŸ—‘οΈ Deleted folder: {p}")
elif os.path.exists(p):
os.remove(p)
removed.append(f"πŸ—‘οΈ Deleted file: {p}")
msg = "\n".join(removed) if removed else "ℹ️ No cache files found."
print(msg)
return msg
# ----------------------------------------------------------
# EMBEDDER HELPER
# ----------------------------------------------------------
def _load_embedder():
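    """Load the all-MiniLM-L6-v2 embedding model (the same model used by rebuild_index())."""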
print("πŸ“¦ Loading embedding model: sentence-transformers/all-MiniLM-L6-v2 ...")
model = SentenceTransformer("sentence-transformers/all-MiniLM-L6-v2")
print("βœ… Model loaded.")
return model
# ----------------------------------------------------------
# WEB CRAWLER with LOCAL CACHE (Optimized & Safe)
# ----------------------------------------------------------
def web_crawler_loader(
urls_file="/home/user/app/data/urls.txt",
cache_path="/home/user/app/persistent/web_cache.json",
max_pages=3,
timeout=20,
force_refresh=False,
):
"""
Loads readable text content from URLs listed in urls.txt.
Uses a local cache (web_cache.json) to skip re-downloading.
Returns list of dicts: [{ 'source': URL, 'type': 'Website', 'text': text }]
"""
import requests, re, time, json
from bs4 import BeautifulSoup
# --- Load existing cache (if any) ---
cache = {}
if os.path.exists(cache_path) and not force_refresh:
try:
with open(cache_path, "r", encoding="utf-8") as f:
cache = json.load(f)
print(f"πŸ—‚οΈ Loaded cached web content ({len(cache)} entries).")
except Exception as e:
print(f"⚠️ Cache read error ({e}) β€” starting fresh.")
cache = {}
# --- Validate URL list ---
if not os.path.exists(urls_file):
print(f"⚠️ URLs file not found: {urls_file}")
return list(cache.values())
with open(urls_file, "r", encoding="utf-8") as f:
urls = [u.strip() for u in f if u.strip() and not u.startswith("#")]
print(f"🌐 Found {len(urls)} URLs in {urls_file}")
new_entries = {}
    selected_urls = urls[: max_pages * 10]  # cap on how many pages are fetched per rebuild
    for i, url in enumerate(selected_urls):
        if url in cache and not force_refresh:
            print(f"♻️ Using cached content for {url}")
            new_entries[url] = cache[url]
            continue
        try:
            print(f"🌐 Fetching ({i+1}/{len(selected_urls)}): {url}")
resp = requests.get(
url,
timeout=timeout,
headers={"User-Agent": "ClinicalTrialChatBot/1.0 (+https://huggingface.co/essprasad)"}
)
if resp.status_code != 200:
print(f"⚠️ Skipped {url}: HTTP {resp.status_code}")
continue
soup = BeautifulSoup(resp.text, "html.parser")
# Remove unwanted elements
for tag in soup(["script", "style", "nav", "header", "footer", "noscript", "iframe"]):
tag.decompose()
# Extract visible text
text = " ".join(t.strip() for t in soup.get_text().split())
text = re.sub(r"\s+", " ", text).strip()
if len(text) < 500:
print(f"⚠️ Skipped {url}: too little readable text ({len(text)} chars).")
continue
# Keep first 3000 chars to reduce vector size
entry_text = f"Source URL: {url}. {text[:3000]}"
new_entries[url] = {"source": url, "type": "Website", "text": entry_text}
print(f"βœ… Cached: {url}")
time.sleep(1) # polite delay
except Exception as e:
print(f"⚠️ Failed to fetch {url}: {e}")
# --- Merge & Save updated cache ---
if new_entries:
cache.update(new_entries)
try:
os.makedirs(os.path.dirname(cache_path), exist_ok=True)
with open(cache_path, "w", encoding="utf-8") as f:
json.dump(cache, f, indent=2)
print(f"πŸ’Ύ Web cache updated ({len(cache)} total URLs).")
except Exception as e:
print(f"⚠️ Failed to write cache: {e}")
return list(cache.values())
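# For reference, a cached entry in web_cache.json has roughly this shape
# (the URL shown here is illustrative only):
#   "https://example.org/page": {"source": "...", "type": "Website", "text": "Source URL: ... <first 3000 chars>"}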
def rebuild_index():
"""Fully rebuild FAISS index using glossary + Excel + web sources (fresh start)."""
print("🧠 Rebuilding FAISS index (Glossary + Excel + Web)...")
import os, json, re, shutil, pandas as pd, faiss, numpy as np
from huggingface_hub import hf_hub_download, list_repo_files
from core.vector_sync import rebuild_faiss_from_glossary, _upload_to_dataset
from sentence_transformers import SentenceTransformer
repo_id_index = "essprasad/CT-Chat-Index"
repo_id_docs = "essprasad/CT-Chat-Docs"
local_dir = "/home/user/app/persistent"
os.makedirs(local_dir, exist_ok=True)
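    # The persistent folder holds the working copies of faiss.index,
    # faiss.index.meta.json, glossary.json and web_cache.json used below.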
# --- STEP 0: CLEAN OLD INDEX ---
for old_file in ["faiss.index", "faiss.index.meta.json"]:
old_path = os.path.join(local_dir, old_file)
if os.path.exists(old_path):
os.remove(old_path)
print(f"πŸ—‘οΈ Removed old FAISS artifact: {old_path}")
# --- STEP 1: LOAD GLOSSARY BASE ---
glossary_path = os.path.join(local_dir, "glossary.json")
if not os.path.exists(glossary_path):
print(f"πŸ“₯ Downloading glossary.json from {repo_id_index}...")
downloaded_path = hf_hub_download(
repo_id=repo_id_index,
filename="persistent/glossary.json",
repo_type="dataset",
force_download=True,
)
shutil.copy2(downloaded_path, glossary_path)
print(f"βœ… glossary.json copied to {glossary_path}")
index, metas = rebuild_faiss_from_glossary(glossary_path=glossary_path)
print(f"πŸ“˜ Loaded {len(metas)} glossary entries.")
# --- STEP 2: INDEX EXCEL FILES ---
print("πŸ“‘ Scanning Excel files...")
repo_files = list_repo_files(repo_id_docs, repo_type="dataset")
excel_files = [f for f in repo_files if f.lower().endswith((".xlsx", ".xls"))]
model = SentenceTransformer("sentence-transformers/all-MiniLM-L6-v2")
excel_entries = []
for file_name in excel_files:
print(f"πŸ“„ Processing Excel: {file_name}")
path = hf_hub_download(repo_id_docs, filename=file_name, repo_type="dataset")
xls = pd.read_excel(path, sheet_name=None)
for sheet_name, df in xls.items():
            df = df.dropna(how="all").fillna("")  # drop fully empty rows first, then blank-fill remaining NaNs
df.columns = [str(c).strip().lower() for c in df.columns]
term_col = next((c for c in df.columns if "term" in c or "word" in c), None)
if not term_col:
print(f"⚠️ No 'term' column in {file_name}:{sheet_name}")
continue
for _, row in df.iterrows():
term = str(row.get(term_col, "")).strip()
if not term:
continue
# Combine all columns with values
parts = [
f"{c.capitalize()}: {str(row[c]).strip()}"
for c in df.columns if str(row[c]).strip()
]
joined = " ".join(parts)
if len(joined) < 80: # Skip tiny entries
continue
entry_text = f"Definition of {term}: {joined}"
excel_entries.append({
"source": file_name,
"sheet": sheet_name,
"term": term,
"type": "Excel",
"file": file_name,
"text": entry_text,
})
if excel_entries:
print(f"βœ… Loaded {len(excel_entries)} Excel rows.")
texts = [e["text"] for e in excel_entries]
embeddings = model.encode(texts, show_progress_bar=True, convert_to_numpy=True).astype("float32")
faiss.normalize_L2(embeddings)
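        # L2-normalising the embeddings makes inner-product search behave like cosine
        # similarity (assuming the index from rebuild_faiss_from_glossary uses inner products).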
index.add(embeddings)
metas.extend(excel_entries)
print("βœ… Excel content added to FAISS.")
# --- STEP 3: WEB CONTENT ---
try:
print("🌐 Loading and embedding web content...")
web_entries = web_crawler_loader(
urls_file="/home/user/app/data/urls.txt",
cache_path="/home/user/app/persistent/web_cache.json",
max_pages=3,
timeout=20,
force_refresh=False,
)
if web_entries:
web_entries = [e for e in web_entries if len(e.get("text", "")) > 200]
print(f"βœ… Retrieved {len(web_entries)} web entries.")
web_texts = [e["text"] for e in web_entries]
web_emb = model.encode(web_texts, show_progress_bar=True, convert_to_numpy=True).astype("float32")
faiss.normalize_L2(web_emb)
index.add(web_emb)
metas.extend(web_entries)
print("βœ… Web content added to FAISS.")
else:
print("⚠️ No web entries found.")
except Exception as e:
print(f"⚠️ Web content embedding failed: {e}")
# --- STEP 4: SAVE & UPLOAD ---
faiss_path = os.path.join(local_dir, "faiss.index")
meta_path = os.path.join(local_dir, "faiss.index.meta.json")
faiss.write_index(index, faiss_path)
with open(meta_path, "w", encoding="utf-8") as f:
json.dump(metas, f, indent=2)
print(f"πŸ’Ύ Local FAISS index saved ({len(metas)} entries).")
try:
_upload_to_dataset(faiss_path, meta_path, repo_id_index)
print(f"☁️ Uploaded latest FAISS index ({len(metas)} entries) to {repo_id_index}.")
except Exception as e:
print(f"⚠️ Upload to Hugging Face failed: {e}")
print("βœ… Glossary + Excel + Web FAISS rebuilt successfully.")
return f"βœ… Rebuild complete: {len(metas)} entries (including Excel + Web)."
# ----------------------------------------------------------
# 4. REBUILD GLOSSARY
# ----------------------------------------------------------
def rebuild_glossary():
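    """Rebuild glossary.json via core.glossary_builder.rebuild_and_upload(), which also uploads the result."""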
try:
from core.glossary_builder import rebuild_and_upload
rebuild_and_upload()
return "βœ… Glossary rebuilt and uploaded successfully."
except Exception as e:
return f"⚠️ Glossary rebuild failed: {e}"
# ----------------------------------------------------------
# 5. CHATBOT LOGIC
# ----------------------------------------------------------
def chat_answer(query, mode=None):
    """Answer a user query via the hybrid retriever; `mode` is optional and falls back to the retriever's default."""
    try:
        query_clean = query.strip()
        if not query_clean:
            return "<i>⚠️ Please enter a valid query.</i>"
        if mode is None:
            return summarize_combined(query_clean)
        return summarize_combined(query_clean, mode=mode)
    except Exception as e:
        print("❌ Chatbot error:", e)
        return f"<i>⚠️ Error: {e}</i>"
# ----------------------------------------------------------
# 6. GRADIO UI (Simplified + Keyboard Support)
# ----------------------------------------------------------
with gr.Blocks(theme="gradio/soft") as demo:
gr.Markdown(f"# {APP_TITLE}")
gr.Markdown(APP_DESC)
    # 🔹 Main input + output areas
query_box = gr.Textbox(
label="Ask your clinical trial question",
placeholder="e.g. What is an eCRF?",
lines=2,
show_label=True
)
output_box = gr.HTML(label="Answer")
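    # chat_answer returns HTML fragments (e.g. <i>...</i> errors), hence gr.HTML rather than a Textbox.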
    # 🔹 Control buttons row
    with gr.Row():
        submit_btn = gr.Button("🚀 Submit", variant="primary")
        rebuild_btn = gr.Button("🔁 Rebuild Index")
        rebuild_glossary_btn = gr.Button("📘 Rebuild Glossary")
        clear_btn = gr.Button("🧹 Clear Cache / Index")
    # 🔹 Event bindings
    submit_btn.click(fn=chat_answer, inputs=[query_box], outputs=output_box)
    query_box.submit(fn=chat_answer, inputs=[query_box], outputs=output_box)  # ↵ Press Enter = Submit
rebuild_btn.click(fn=rebuild_index, outputs=output_box)
rebuild_glossary_btn.click(fn=rebuild_glossary, outputs=output_box)
clear_btn.click(fn=clear_index, outputs=output_box)
# ----------------------------------------------------------
# 7. LAUNCH APP
# ----------------------------------------------------------
if __name__ == "__main__":
print("πŸš€ Starting Clinical Trial Chatbot...")
print("🧠 Initializing retriever warm-up...")
demo.launch(server_name="0.0.0.0", server_port=7860, share=False)