Spaces:
Paused
Paused
File size: 10,016 Bytes
29577c9 755d25a 29577c9 e530e72 29577c9 e768a4d 29577c9 19facf9 29577c9 1a5a976 29577c9 e768a4d 29577c9 e768a4d 29577c9 e768a4d 29577c9 e768a4d 29577c9 e768a4d 29577c9 6f09ee6 29577c9 6f09ee6 29577c9 755d25a 29577c9 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 |
"""Unified FastAPI application for Mirage system (face swap pipeline).
All obsolete LivePortrait / reenactment / Gradio demo code removed.
This file supersedes original_fastapi_app.py and prior app.py stub.
"""
from fastapi import FastAPI, HTTPException, File, UploadFile
from fastapi.responses import HTMLResponse
from fastapi.staticfiles import StaticFiles
from pathlib import Path
import os
import cv2
import numpy as np
from typing import Any, Dict
from metrics import metrics as _metrics_singleton, Metrics
from config import config
from voice_processor import voice_processor
from swap_pipeline import get_pipeline
# --- Application object and shared singletons -------------------------------
app = FastAPI(title="Mirage Real-time AI Avatar System")
# Face-swap pipeline singleton (see swap_pipeline.get_pipeline).
pipeline = get_pipeline()
# Since get_pipeline() now initializes immediately, mark initialized if pipeline.initialized True
pipeline_initialized = getattr(pipeline, 'initialized', False)
# Use the shared metrics singleton unless config overrides the FPS window;
# in that case build a dedicated Metrics instance with the configured window.
if config.metrics_fps_window != 30:
    metrics = Metrics(fps_window=config.metrics_fps_window)
else:
    metrics = _metrics_singleton
# Static assets (index.html, JS client) are served under /static.
static_dir = Path(__file__).parent / "static"
app.mount("/static", StaticFiles(directory=str(static_dir)), name="static")
# Import status flags surfaced via /health for diagnosing WebRTC availability.
WEBRTC_ROUTER_LOADED = False
WEBRTC_IMPORT_ERROR = None
try:
    from webrtc_server import router as webrtc_router  # type: ignore
    # webrtc_server router is defined WITHOUT a prefix; mount under /webrtc here so
    # endpoints resolve at /webrtc/ping, /webrtc/offer, etc.
    app.include_router(webrtc_router, prefix="/webrtc")
    WEBRTC_ROUTER_LOADED = True
    # --- Compatibility layer ---
    # In some deployed revisions the underlying router still carried an internal
    # '/webrtc' prefix, yielding effective paths like /webrtc/webrtc/ping.
    # The production JS client calls /webrtc/ping, /webrtc/token, /webrtc/ice_config, /webrtc/offer.
    # To avoid breaking older images (or double-prefix drift during hot reload),
    # expose lightweight pass-through wrappers at the expected single-prefix paths
    # ONLY if those paths are currently missing (best-effort). Since FastAPI does
    # not provide a simple public API to query registered routes before definition
    # without introspection, we always register wrappers; duplicates are avoided
    # because underlying double-prefixed versions have different paths.
    from typing import Optional as _Opt
    from fastapi import Body as _Body, Header as _Header
    import inspect as _inspect
    try:  # pragma: no cover - defensive
        # Import underlying handlers for direct reuse; any failure leaves all
        # four as None so the wrappers below degrade gracefully.
        from webrtc_server import webrtc_ping as _rt_webrtc_ping, _ice_configuration as _rt_ice_conf, webrtc_offer as _rt_webrtc_offer, _mint_token as _rt_mint_token  # type: ignore
    except Exception:  # noqa: BLE001
        _rt_webrtc_ping = None  # type: ignore
        _rt_ice_conf = None  # type: ignore
        _rt_webrtc_offer = None  # type: ignore
        _rt_mint_token = None  # type: ignore

    @app.get("/webrtc/ping")
    async def _compat_webrtc_ping():  # type: ignore
        # Delegate to the router's ping handler when available.
        if _rt_webrtc_ping:
            return await _rt_webrtc_ping()  # type: ignore
        return {"router": False, "error": "webrtc_ping unavailable"}

    @app.get("/webrtc/ice_config")
    async def _compat_webrtc_ice_config():  # type: ignore
        if _rt_ice_conf:
            cfg = _rt_ice_conf()  # returns RTCConfiguration
            # Convert to serializable form similar to original endpoint;
            # username/credential are only included when present (TURN servers).
            servers = []
            for s in getattr(cfg, 'iceServers', []) or []:
                entry = {"urls": s.urls}
                if getattr(s, 'username', None):
                    entry["username"] = s.username
                if getattr(s, 'credential', None):
                    entry["credential"] = s.credential
                servers.append(entry)
            return {"iceServers": servers}
        return {"iceServers": []}

    @app.get("/webrtc/token")
    async def _compat_webrtc_token():  # type: ignore
        if _rt_mint_token and WEBRTC_ROUTER_LOADED:
            try:
                return {"token": _rt_mint_token()}
            except Exception as e:  # noqa: BLE001
                return {"error": str(e)}
        return {"error": "token_unavailable"}

    @app.post("/webrtc/offer")
    async def _compat_webrtc_offer(
        offer: dict = _Body(...),
        x_api_key: _Opt[str] = _Header(default=None, alias="x-api-key"),
        x_auth_token: _Opt[str] = _Header(default=None, alias="x-auth-token"),
    ):  # type: ignore
        # Pass auth headers through unchanged so the underlying handler can
        # apply its own validation.
        if _rt_webrtc_offer:
            return await _rt_webrtc_offer(offer=offer, x_api_key=x_api_key, x_auth_token=x_auth_token)  # type: ignore
        raise HTTPException(status_code=503, detail="WebRTC offer handler unavailable")
except Exception as e:  # pragma: no cover
    # Router import failed entirely: record the error (exposed via /health)
    # and register 503 fallbacks at the paths the JS client depends on.
    WEBRTC_IMPORT_ERROR = str(e)
    from fastapi import HTTPException as _HTTPException

    @app.get("/webrtc/token")
    async def _fallback_token():  # type: ignore[override]
        raise _HTTPException(status_code=503, detail=f"WebRTC unavailable: {WEBRTC_IMPORT_ERROR or 'router not loaded'}")

    @app.post("/webrtc/offer")
    async def _fallback_offer():  # type: ignore[override]
        raise _HTTPException(status_code=503, detail=f"WebRTC unavailable: {WEBRTC_IMPORT_ERROR or 'router not loaded'}")
@app.get("/", response_class=HTMLResponse)
async def root():
    """Serve the SPA entry point; fall back to a minimal inline page if absent."""
    page = static_dir / "index.html"
    try:
        html = page.read_text(encoding="utf-8")
    except FileNotFoundError:
        # Missing static bundle (e.g. stripped-down image) — serve a stub page.
        html = "<html><body><h1>Mirage AI Avatar System</h1><p>FastAPI unified app.</p></body></html>"
    return HTMLResponse(html)
@app.get("/health")
async def health():
    """Liveness/status probe summarizing pipeline and WebRTC availability."""
    report = {"status": "ok", "system": "real-time-ai-avatar"}
    report["pipeline_loaded"] = pipeline_initialized
    # pipeline.app being non-None is only a coarse indicator of model/GPU readiness.
    report["gpu_available"] = pipeline.app is not None
    report["webrtc_router_loaded"] = WEBRTC_ROUTER_LOADED
    report["webrtc_import_error"] = WEBRTC_IMPORT_ERROR
    return report
@app.post("/initialize")
async def initialize_pipeline():
    """Idempotently initialize the swap pipeline, reporting the outcome as JSON."""
    global pipeline_initialized
    if pipeline_initialized:
        return {"status": "already_initialized"}
    try:
        succeeded = pipeline.initialize()
    except Exception as exc:
        # Surface the failure message instead of a bare 500.
        return {"status": "error", "message": str(exc)}
    pipeline_initialized = succeeded
    return {"status": "success" if succeeded else "error"}
@app.post("/set_reference")
async def set_reference_image(file: UploadFile = File(...)):
    """Set (or reset) the source reference face.

    Behavior:
      - Decodes the uploaded image bytes (400 on undecodable input).
      - Ensures pipeline initialized (synchronous) before setting reference.
      - Returns detailed status including reference metadata when available.

    Raises:
        HTTPException: 400 for an invalid image, 500 for unexpected failures.
    """
    global pipeline_initialized
    try:
        contents = await file.read()
        # cv2/np are already imported at module level; no local re-import needed.
        arr = np.frombuffer(contents, np.uint8)
        frame = cv2.imdecode(arr, cv2.IMREAD_COLOR)
        if frame is None:
            raise HTTPException(status_code=400, detail="Invalid image")
        if not pipeline_initialized or not getattr(pipeline, 'initialized', False):
            # Honor initialize()'s return value rather than assuming success.
            pipeline_initialized = bool(pipeline.initialize())
        ok = pipeline.set_source_image(frame)
        meta = getattr(pipeline, 'source_img_meta', {}) if ok else {}
        return {"status": "success" if ok else "error", "meta": meta}
    except HTTPException:
        # Bug fix: previously the 400 above was caught by the blanket handler
        # below and re-raised as a 500; re-raise it unchanged instead.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
@app.get("/metrics")
async def get_metrics():
    """Return a metrics snapshot, augmented with pipeline stats once initialized."""
    data = metrics.snapshot()
    if pipeline_initialized:
        data["ai_pipeline"] = pipeline.get_stats()
    return data
@app.get("/pipeline_status")
async def pipeline_status():
    """Report initialization state, pipeline stats, and whether a source face is set."""
    if not pipeline_initialized:
        return {"initialized": False}
    return {
        "initialized": True,
        "stats": pipeline.get_stats(),
        "source_set": pipeline.source_face is not None,
    }
@app.get("/debug/pipeline")
async def debug_pipeline():
    """Return detailed pipeline diagnostics for debugging."""
    if pipeline is None:
        return {"exists": False}
    try:
        stats = pipeline.get_stats()
    except Exception as exc:  # pragma: no cover - diagnostics must not crash
        stats = {"error": str(exc)}
    return {
        "exists": True,
        "initialized": getattr(pipeline, 'initialized', False),
        "loaded": getattr(pipeline, 'loaded', False),
        "source_set": getattr(pipeline, 'source_face', None) is not None,
        "stats": stats,
    }
@app.get("/gpu")
async def gpu():
    """Coarse CUDA presence probe; includes device 0's name when available."""
    available = False
    device_name = None
    try:
        import torch  # type: ignore
        available = bool(torch.cuda.is_available())
        if available:
            device_name = torch.cuda.get_device_name(0)
    except Exception:
        # torch missing or CUDA query failed — treat as no GPU.
        available = False
        device_name = None
    if available:
        return {"available": True, "name": device_name}
    return {"available": False}
@app.get("/debug/models")
async def debug_models():
    """Report on-disk model artifacts, provisioning sentinels, and storage layout."""
    models_root = Path(__file__).parent / 'models'
    inswapper_path = models_root / 'inswapper' / 'inswapper_128_fp16.onnx'
    codeformer_path = models_root / 'codeformer' / 'codeformer.pth'
    sentinel_path = models_root / '.provisioned'
    meta_path = models_root / '.provisioned_meta.json'
    # Detect whether the models root is a symlink and, if so, where it points.
    link = models_root.is_symlink()
    resolved = None
    if link:
        try:
            resolved = models_root.resolve()
        except Exception:
            resolved = None

    def _file_info(p):
        # Size reported as 0 when the file is absent.
        return {'exists': p.exists(), 'size': p.stat().st_size if p.exists() else 0}

    return {
        'inswapper': _file_info(inswapper_path),
        'codeformer': _file_info(codeformer_path),
        'sentinel': {'exists': sentinel_path.exists(), 'meta_exists': meta_path.exists()},
        'storage': {
            'root_is_symlink': link,
            'root_path': str(models_root),
            'target': str(resolved) if resolved else None,
            'persist_mode_env': os.environ.get('MIRAGE_PERSIST_MODELS', '1')
        },
        'pipeline_initialized': pipeline_initialized
    }
@app.on_event("startup")
async def startup_log():
    """Emit a single marker line so deployment logs show which app variant booted."""
    print("[startup] unified app starting")
if __name__ == "__main__":
    # Local/dev entry point; hosted deployments launch uvicorn externally.
    # Fix: removed a stray trailing '|' artifact after the run() call that
    # made this line a syntax error as written.
    import uvicorn
    uvicorn.run("app:app", host="0.0.0.0", port=7860)