# ==========================================================
# SAFE-MODE PRELAUNCH CLEANUP (runs before any heavy imports)
# ==========================================================
import os, shutil, time, glob

def _prelaunch_cleanup(threshold_gb=45.0):
    """Early cleanup to prevent Hugging Face Space eviction (50 GB limit)."""
    def _used_gb(path="/home/user/app"):
        try:
            total, used, free = shutil.disk_usage(path)
            used_gb = max(0.0, min(used / (1024**3), 49.9))
            return used_gb
        except Exception:
            return 0.0

    used = _used_gb()
    print(f"\n💾 Startup disk usage: {used:.2f} GB")

    cache_paths = [
        os.path.expanduser("~/.cache/huggingface"),
        os.path.expanduser("~/.cache/hfhub"),
        "/home/user/.cache/huggingface",
        "/home/user/.cache",
        "/home/user/app/__pycache__",
        "/home/user/app/data/__pycache__",
    ]
    for p in cache_paths:
        if os.path.exists(p):
            shutil.rmtree(p, ignore_errors=True)

    if used > threshold_gb:
        print(f"⚠️ Usage {used:.2f} GB > {threshold_gb} GB → performing aggressive cleanup.")
        preserve = {"faiss.index", "faiss.index.meta.json", "glossary.json"}
        folders = ["/home/user/app/data/docs_cache", "/home/user/app/tmp_docs", "/home/user/app/persistent"]
        for folder in folders:
            if os.path.exists(folder):
                for f in glob.glob(os.path.join(folder, "*")):
                    if os.path.basename(f) in preserve:
                        continue
                    try:
                        if os.path.isfile(f):
                            os.remove(f)
                        else:
                            shutil.rmtree(f, ignore_errors=True)
                    except Exception:
                        pass
        print("🧹 Aggressive cleanup complete.")

    print(f"✨ Disk after cleanup: {_used_gb():.2f} GB\n")

shutil.rmtree("/home/user/app/runtime_faiss", ignore_errors=True)
_prelaunch_cleanup()
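
# Note: the hard-coded paths above assume the standard Hugging Face Spaces layout, where
# the app's working copy lives under /home/user/app; adjust them when running locally.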
# ==========================================================
# MAIN APP – Clinical Trial Chatbot
# ==========================================================
import gradio as gr
import pandas as pd
import json, faiss, numpy as np, shutil
from sentence_transformers import SentenceTransformer
from core.hybrid_retriever import summarize_combined
from core import vector_store, vector_sync

APP_TITLE = "🧠 Clinical Research Chatbot"
APP_DESC = (
    "Ask any clinical research or GCP-related question. "
    "Retrieves and summarizes from ICH, GCDMP, EMA, FDA, Excel, and Web datasets."
)

DATA_PATHS = [
    "/home/user/app/persistent/faiss.index",
    "/home/user/app/persistent/faiss.index.meta.json",
    "/home/user/app/data/docs_cache",
]
# ----------------------------------------------------------
# CLEAR INDEX / CACHE
# ----------------------------------------------------------
def clear_index():
    removed = []
    for p in DATA_PATHS:
        if os.path.isdir(p):
            shutil.rmtree(p, ignore_errors=True)
            removed.append(f"🗑️ Deleted folder: {p}")
        elif os.path.exists(p):
            os.remove(p)
            removed.append(f"🗑️ Deleted file: {p}")
    msg = "\n".join(removed) if removed else "ℹ️ No cache files found."
    print(msg)
    return msg

# ----------------------------------------------------------
# EMBEDDER HELPER
# ----------------------------------------------------------
def _load_embedder():
    print("📦 Loading embedding model: sentence-transformers/all-MiniLM-L6-v2 ...")
    model = SentenceTransformer("sentence-transformers/all-MiniLM-L6-v2")
    print("✅ Model loaded.")
    return model
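
# Note: _prelaunch_cleanup() removes ~/.cache/huggingface at startup, so this model is
# typically re-downloaded from the Hub on each cold start rather than served from cache.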
# ----------------------------------------------------------
# WEB CRAWLER with LOCAL CACHE (Optimized & Safe)
# ----------------------------------------------------------
def web_crawler_loader(
    urls_file="/home/user/app/data/urls.txt",
    cache_path="/home/user/app/persistent/web_cache.json",
    max_pages=3,
    timeout=20,
    force_refresh=False,
):
    """
    Loads readable text content from URLs listed in urls.txt.
    Uses a local cache (web_cache.json) to skip re-downloading.
    Returns list of dicts: [{'source': URL, 'type': 'Website', 'text': text}]
    """
    import requests, re, time, json
    from bs4 import BeautifulSoup

    # --- Load existing cache (if any) ---
    cache = {}
    if os.path.exists(cache_path) and not force_refresh:
        try:
            with open(cache_path, "r", encoding="utf-8") as f:
                cache = json.load(f)
            print(f"🗃️ Loaded cached web content ({len(cache)} entries).")
        except Exception as e:
            print(f"⚠️ Cache read error ({e}) → starting fresh.")
            cache = {}
    # --- Validate URL list ---
    if not os.path.exists(urls_file):
        print(f"⚠️ URLs file not found: {urls_file}")
        return list(cache.values())
    with open(urls_file, "r", encoding="utf-8") as f:
        # Skip blank lines and lines commented out with "#"
        urls = [u.strip() for u in f if u.strip() and not u.strip().startswith("#")]
    print(f"🌐 Found {len(urls)} URLs in {urls_file}")
    new_entries = {}
    for i, url in enumerate(urls[: max_pages * 10]):
        if url in cache and not force_refresh:
            print(f"♻️ Using cached content for {url}")
            new_entries[url] = cache[url]
            continue
        try:
            print(f"🌐 Fetching ({i+1}/{len(urls)}): {url}")
            resp = requests.get(
                url,
                timeout=timeout,
                headers={"User-Agent": "ClinicalTrialChatBot/1.0 (+https://huggingface.co/essprasad)"},
            )
            if resp.status_code != 200:
                print(f"⚠️ Skipped {url}: HTTP {resp.status_code}")
                continue
            soup = BeautifulSoup(resp.text, "html.parser")
            # Remove unwanted elements
            for tag in soup(["script", "style", "nav", "header", "footer", "noscript", "iframe"]):
                tag.decompose()
            # Extract visible text
            text = " ".join(t.strip() for t in soup.get_text().split())
            text = re.sub(r"\s+", " ", text).strip()
            if len(text) < 500:
                print(f"⚠️ Skipped {url}: too little readable text ({len(text)} chars).")
                continue
            # Keep first 3000 chars to reduce vector size
            entry_text = f"Source URL: {url}. {text[:3000]}"
            new_entries[url] = {"source": url, "type": "Website", "text": entry_text}
            print(f"✅ Cached: {url}")
            time.sleep(1)  # polite delay
        except Exception as e:
            print(f"⚠️ Failed to fetch {url}: {e}")

    # --- Merge & Save updated cache ---
    if new_entries:
        cache.update(new_entries)
        try:
            os.makedirs(os.path.dirname(cache_path), exist_ok=True)
            with open(cache_path, "w", encoding="utf-8") as f:
                json.dump(cache, f, indent=2)
            print(f"💾 Web cache updated ({len(cache)} total URLs).")
        except Exception as e:
            print(f"⚠️ Failed to write cache: {e}")

    return list(cache.values())
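
# Usage sketch (hypothetical, e.g. for a local test outside the Space):
#     entries = web_crawler_loader(force_refresh=True)   # re-download every URL
#     print(f"{len(entries)} pages in web_cache.json")
# With force_refresh=False (the default), previously cached URLs are reused and only
# new URLs listed in urls.txt are fetched.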
def rebuild_index():
    """Fully rebuild FAISS index using glossary + Excel + web sources (fresh start)."""
    print("🔧 Rebuilding FAISS index (Glossary + Excel + Web)...")
    import os, json, re, shutil, pandas as pd, faiss, numpy as np
    from huggingface_hub import hf_hub_download, list_repo_files
    from core.vector_sync import rebuild_faiss_from_glossary, _upload_to_dataset
    from sentence_transformers import SentenceTransformer

    repo_id_index = "essprasad/CT-Chat-Index"
    repo_id_docs = "essprasad/CT-Chat-Docs"
    local_dir = "/home/user/app/persistent"
    os.makedirs(local_dir, exist_ok=True)

    # --- STEP 0: CLEAN OLD INDEX ---
    for old_file in ["faiss.index", "faiss.index.meta.json"]:
        old_path = os.path.join(local_dir, old_file)
        if os.path.exists(old_path):
            os.remove(old_path)
            print(f"🗑️ Removed old FAISS artifact: {old_path}")

    # --- STEP 1: LOAD GLOSSARY BASE ---
    glossary_path = os.path.join(local_dir, "glossary.json")
    if not os.path.exists(glossary_path):
        print(f"📥 Downloading glossary.json from {repo_id_index}...")
        downloaded_path = hf_hub_download(
            repo_id=repo_id_index,
            filename="persistent/glossary.json",
            repo_type="dataset",
            force_download=True,
        )
        shutil.copy2(downloaded_path, glossary_path)
        print(f"✅ glossary.json copied to {glossary_path}")

    index, metas = rebuild_faiss_from_glossary(glossary_path=glossary_path)
    print(f"📘 Loaded {len(metas)} glossary entries.")

    # --- STEP 2: INDEX EXCEL FILES ---
    print("📊 Scanning Excel files...")
    repo_files = list_repo_files(repo_id_docs, repo_type="dataset")
    excel_files = [f for f in repo_files if f.lower().endswith((".xlsx", ".xls"))]
    model = SentenceTransformer("sentence-transformers/all-MiniLM-L6-v2")
    excel_entries = []
    for file_name in excel_files:
        print(f"📄 Processing Excel: {file_name}")
        path = hf_hub_download(repo_id_docs, filename=file_name, repo_type="dataset")
        xls = pd.read_excel(path, sheet_name=None)
        for sheet_name, df in xls.items():
            # Drop fully empty rows first, then blank out remaining NaNs
            df = df.dropna(how="all").fillna("")
            df.columns = [str(c).strip().lower() for c in df.columns]
            term_col = next((c for c in df.columns if "term" in c or "word" in c), None)
            if not term_col:
                print(f"⚠️ No 'term' column in {file_name}:{sheet_name}")
                continue
            for _, row in df.iterrows():
                term = str(row.get(term_col, "")).strip()
                if not term:
                    continue
                # Combine all columns with values
                parts = [
                    f"{c.capitalize()}: {str(row[c]).strip()}"
                    for c in df.columns if str(row[c]).strip()
                ]
                joined = " ".join(parts)
                if len(joined) < 80:  # Skip tiny entries
                    continue
                entry_text = f"Definition of {term}: {joined}"
                excel_entries.append({
                    "source": file_name,
                    "sheet": sheet_name,
                    "term": term,
                    "type": "Excel",
                    "file": file_name,
                    "text": entry_text,
                })

    if excel_entries:
        print(f"✅ Loaded {len(excel_entries)} Excel rows.")
        texts = [e["text"] for e in excel_entries]
        embeddings = model.encode(texts, show_progress_bar=True, convert_to_numpy=True).astype("float32")
        faiss.normalize_L2(embeddings)
        index.add(embeddings)
        metas.extend(excel_entries)
        print("✅ Excel content added to FAISS.")
    # --- STEP 3: WEB CONTENT ---
    try:
        print("🌐 Loading and embedding web content...")
        web_entries = web_crawler_loader(
            urls_file="/home/user/app/data/urls.txt",
            cache_path="/home/user/app/persistent/web_cache.json",
            max_pages=3,
            timeout=20,
            force_refresh=False,
        )
        if web_entries:
            web_entries = [e for e in web_entries if len(e.get("text", "")) > 200]
            print(f"✅ Retrieved {len(web_entries)} web entries.")
            web_texts = [e["text"] for e in web_entries]
            web_emb = model.encode(web_texts, show_progress_bar=True, convert_to_numpy=True).astype("float32")
            faiss.normalize_L2(web_emb)
            index.add(web_emb)
            metas.extend(web_entries)
            print("✅ Web content added to FAISS.")
        else:
            print("⚠️ No web entries found.")
    except Exception as e:
        print(f"⚠️ Web content embedding failed: {e}")

    # --- STEP 4: SAVE & UPLOAD ---
    faiss_path = os.path.join(local_dir, "faiss.index")
    meta_path = os.path.join(local_dir, "faiss.index.meta.json")
    faiss.write_index(index, faiss_path)
    with open(meta_path, "w", encoding="utf-8") as f:
        json.dump(metas, f, indent=2)
    print(f"💾 Local FAISS index saved ({len(metas)} entries).")
    try:
        _upload_to_dataset(faiss_path, meta_path, repo_id_index)
        print(f"☁️ Uploaded latest FAISS index ({len(metas)} entries) to {repo_id_index}.")
    except Exception as e:
        print(f"⚠️ Upload to Hugging Face failed: {e}")

    print("✅ Glossary + Excel + Web FAISS rebuilt successfully.")
    return f"✅ Rebuild complete: {len(metas)} entries (including Excel + Web)."
# ----------------------------------------------------------
# REBUILD GLOSSARY
# ----------------------------------------------------------
def rebuild_glossary():
    try:
        from core.glossary_builder import rebuild_and_upload
        rebuild_and_upload()
        return "✅ Glossary rebuilt and uploaded successfully."
    except Exception as e:
        return f"⚠️ Glossary rebuild failed: {e}"

# ----------------------------------------------------------
# CHATBOT LOGIC
# ----------------------------------------------------------
def chat_answer(query, mode=None):
    # mode defaults to None so the Gradio bindings below, which pass only the query,
    # can call this directly; summarize_combined is assumed to accept mode=None.
    try:
        query_clean = query.strip()
        if not query_clean:
            return "<i>⚠️ Please enter a valid query.</i>"
        from core.hybrid_retriever import summarize_combined
        return summarize_combined(query_clean, mode=mode)
    except Exception as e:
        print("❌ Chatbot error:", e)
        return f"<i>⚠️ Error: {e}</i>"
# ----------------------------------------------------------
# GRADIO UI (Simplified + Keyboard Support)
# ----------------------------------------------------------
with gr.Blocks(theme="gradio/soft") as demo:
    gr.Markdown(f"# {APP_TITLE}")
    gr.Markdown(APP_DESC)

    # 🔹 Main input + output areas
    query_box = gr.Textbox(
        label="Ask your clinical trial question",
        placeholder="e.g. What is an eCRF?",
        lines=2,
        show_label=True,
    )
    output_box = gr.HTML(label="Answer")

    # 🔹 Control buttons row
    with gr.Row():
        submit_btn = gr.Button("🔍 Submit", variant="primary")
        rebuild_btn = gr.Button("🔁 Rebuild Index")
        rebuild_glossary_btn = gr.Button("📖 Rebuild Glossary")
        clear_btn = gr.Button("🧹 Clear Cache / Index")

    # 🔹 Event bindings
    submit_btn.click(fn=chat_answer, inputs=[query_box], outputs=output_box)
    query_box.submit(fn=chat_answer, inputs=[query_box], outputs=output_box)  # ⏎ Press Enter = Submit
    rebuild_btn.click(fn=rebuild_index, outputs=output_box)
    rebuild_glossary_btn.click(fn=rebuild_glossary, outputs=output_box)
    clear_btn.click(fn=clear_index, outputs=output_box)
# ----------------------------------------------------------
# LAUNCH APP
# ----------------------------------------------------------
if __name__ == "__main__":
    print("🚀 Starting Clinical Trial Chatbot...")
    print("🧠 Initializing retriever warm-up...")
    demo.launch(server_name="0.0.0.0", server_port=7860, share=False)