Spaces:
Sleeping
Sleeping
| import os | |
| import time | |
| import json | |
| import requests | |
| from pathlib import Path | |
| API_URL = os.environ.get("API_URL", "http://127.0.0.1:8000") | |
| ROOT = Path(__file__).resolve().parents[1] | |
| IMG1 = ROOT / "image-test1.jpg" | |
| IMG2 = ROOT / "image-test2.jpg" | |
| AUDIO1 = ROOT / "test-audio1.wav" | |
| def wait_for_health(timeout=60): | |
| url = f"{API_URL}/health" | |
| start = time.time() | |
| while time.time() - start < timeout: | |
| try: | |
| r = requests.get(url, timeout=3) | |
| if r.status_code == 200: | |
| return True | |
| except Exception: | |
| pass | |
| time.sleep(1) | |
| return False | |
| def pretty(obj): | |
| try: | |
| return json.dumps(obj, indent=2, ensure_ascii=False) | |
| except Exception: | |
| return str(obj) | |
| def post_json(path, payload): | |
| url = f"{API_URL}{path}" | |
| r = requests.post(url, json=payload, timeout=180) | |
| try: | |
| return r.status_code, r.json() | |
| except Exception: | |
| return r.status_code, {"text": r.text} | |
| def post_form(path, data, files): | |
| url = f"{API_URL}{path}" | |
| r = requests.post(url, data=data, files=files, timeout=300) | |
| try: | |
| return r.status_code, r.json() | |
| except Exception: | |
| return r.status_code, {"text": r.text} | |
| def test_unified_text(): | |
| payload = { | |
| "message": "Patient with rash and fever for 2 days.", | |
| "message_type": "text", | |
| } | |
| return post_json("/api/chat/unified", payload) | |
| def test_unified_image(path: Path): | |
| payload = { | |
| "message": str(path), # local path supported by server (converted to data URL) | |
| "message_type": "image", | |
| } | |
| return post_json("/api/chat/unified", payload) | |
| def test_unified_audio(path: Path): | |
| payload = { | |
| "message": str(path), # local path supported by server | |
| "message_type": "audio", | |
| } | |
| return post_json("/api/chat/unified", payload) | |
| def test_form(text: str | None = None, images: list[Path] | None = None, audios: list[Path] | None = None): | |
| data = {} | |
| if text is not None: | |
| data["text"] = text | |
| data["want_stats"] = "true" | |
| files = [] | |
| for img in images or []: | |
| mime = "image/jpeg" if img.suffix.lower() in {".jpg", ".jpeg"} else "image/png" | |
| files.append(("images", (img.name, open(img, "rb"), mime))) | |
| for au in audios or []: | |
| files.append(("audios", (au.name, open(au, "rb"), "audio/wav"))) | |
| try: | |
| return post_form("/api/ai/form", data, files) | |
| finally: | |
| # Close any file handles we opened | |
| for _, (name, fh, _mime) in files: | |
| try: | |
| fh.close() | |
| except Exception: | |
| pass | |
| def main(): | |
| print(f"API_URL: {API_URL}") | |
| # Validate media presence | |
| missing = [p for p in [IMG1, IMG2, AUDIO1] if not p.exists()] | |
| if missing: | |
| print("Missing local files:", ", ".join(map(str, missing))) | |
| return 2 | |
| print("Waiting for API /health ...") | |
| if not wait_for_health(timeout=90): | |
| print("Server not ready within timeout.") | |
| return 3 | |
| results = [] | |
| print("\n=== Unified - TEXT only ===") | |
| results.append(("unified_text",) + test_unified_text()) | |
| print("\n=== Unified - IMAGE only (image-test1.jpg) ===") | |
| results.append(("unified_image_img1",) + test_unified_image(IMG1)) | |
| print("\n=== Unified - IMAGE only (image-test2.jpg) ===") | |
| results.append(("unified_image_img2",) + test_unified_image(IMG2)) | |
| print("\n=== Unified - AUDIO only (test-audio1.wav) ===") | |
| results.append(("unified_audio_audio1",) + test_unified_audio(AUDIO1)) | |
| print("\n=== Form - TEXT + IMAGE (img1) ===") | |
| results.append(("form_text_img1",) + test_form( | |
| text="Patient with rash and fever for 2 days.", images=[IMG1], audios=[] | |
| )) | |
| print("\n=== Form - TEXT + AUDIO (audio1) ===") | |
| results.append(("form_text_audio1",) + test_form( | |
| text="Patient with cough and sore throat.", images=[], audios=[AUDIO1] | |
| )) | |
| print("\n=== Form - TEXT + IMAGES (img1, img2) + AUDIO (audio1) ===") | |
| results.append(("form_text_imgs_audio",) + test_form( | |
| text="Patient with rash, fever and mild headache.", images=[IMG1, IMG2], audios=[AUDIO1] | |
| )) | |
| # Print a compact summary at the end | |
| print("\n==== SUMMARY ====") | |
| for label, status, payload in results: | |
| status_str = "OK" if status == 200 else f"ERR({status})" | |
| print(f"- {label}: {status_str}") | |
| # Optionally dump detailed JSON for failures | |
| failures = [(l, s, p) for (l, s, p) in results if s != 200] | |
| if failures: | |
| print("\n==== FAILURES (detailed) ====") | |
| for label, status, payload in failures: | |
| print(f"\n## {label} -> HTTP {status}") | |
| print(pretty(payload)) | |
| else: | |
| print("\nAll tests returned HTTP 200.") | |
| return 0 | |
| if __name__ == "__main__": | |
| raise SystemExit(main()) | |