"""
Testing and Validation Suite for Mirage AI Avatar System
Tests end-to-end functionality, latency, and performance
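
Usage: run this module directly; an optional first command-line argument sets
the server base URL (defaults to http://localhost:7860). Results are written
to test_results.json and the exit code reflects the overall pass/fail status.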
"""
import asyncio
import time
import aiohttp
import json
import numpy as np
import cv2
import logging
from pathlib import Path
import psutil
from typing import Dict, Any

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class MirageSystemTester:
    """Comprehensive testing suite for the AI avatar system"""
    
    def __init__(self, base_url: str = "http://localhost:7860"):
        self.base_url = base_url
        self.session = None
        self.test_results = {}
        
    async def __aenter__(self):
        self.session = aiohttp.ClientSession()
        return self
    
    async def __aexit__(self, exc_type, exc_val, exc_tb):
        if self.session:
            await self.session.close()
    
    async def test_health_endpoint(self) -> bool:
        """Test basic health endpoint"""
        try:
            async with self.session.get(f"{self.base_url}/health") as response:
                data = await response.json()
                
                success = (
                    response.status == 200 and
                    data.get("status") == "ok" and
                    data.get("system") == "real-time-ai-avatar"
                )
                
                self.test_results["health"] = {
                    "success": success,
                    "status": response.status,
                    "data": data
                }
                
                logger.info(f"Health check: {'βœ… PASS' if success else '❌ FAIL'}")
                return success
                
        except Exception as e:
            logger.error(f"Health check failed: {e}")
            self.test_results["health"] = {"success": False, "error": str(e)}
            return False
    
    async def test_pipeline_initialization(self) -> bool:
        """Test AI pipeline initialization"""
        try:
            start_time = time.time()
            async with self.session.post(f"{self.base_url}/initialize") as response:
                data = await response.json()
                init_time = time.time() - start_time
                
                success = (
                    response.status == 200 and
                    data.get("status") in ["success", "already_initialized"]
                )
                
                self.test_results["initialization"] = {
                    "success": success,
                    "status": response.status,
                    "data": data,
                    "init_time_seconds": init_time
                }
                
                logger.info(f"Pipeline init: {'βœ… PASS' if success else '❌ FAIL'} ({init_time:.1f}s)")
                return success
                
        except Exception as e:
            logger.error(f"Pipeline initialization failed: {e}")
            self.test_results["initialization"] = {"success": False, "error": str(e)}
            return False
    
    async def test_reference_image_upload(self) -> bool:
        """Test reference image upload functionality"""
        try:
            # Create a test image
            test_image = np.zeros((512, 512, 3), dtype=np.uint8)
            cv2.circle(test_image, (256, 200), 50, (255, 255, 255), -1)  # Face-like circle
            cv2.circle(test_image, (230, 180), 10, (0, 0, 0), -1)  # Eye
            cv2.circle(test_image, (280, 180), 10, (0, 0, 0), -1)  # Eye
            cv2.ellipse(test_image, (256, 220), (20, 10), 0, 0, 180, (0, 0, 0), 2)  # Mouth
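            # NOTE: this synthetic pattern is only face-like; if the server runs a
            # strict face detector it may reject the upload, in which case a real
            # photo should be substituted for this test.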
            
            # Encode as JPEG
            _, encoded = cv2.imencode('.jpg', test_image)
            image_data = encoded.tobytes()
            
            # Upload test image
            form_data = aiohttp.FormData()
            form_data.add_field('file', image_data, filename='test_face.jpg', content_type='image/jpeg')
            
            async with self.session.post(f"{self.base_url}/set_reference", data=form_data) as response:
                data = await response.json()
                
                success = (
                    response.status == 200 and
                    data.get("status") == "success"
                )
                
                self.test_results["reference_upload"] = {
                    "success": success,
                    "status": response.status,
                    "data": data
                }
                
                logger.info(f"Reference upload: {'βœ… PASS' if success else '❌ FAIL'}")
                return success
                
        except Exception as e:
            logger.error(f"Reference image upload failed: {e}")
            self.test_results["reference_upload"] = {"success": False, "error": str(e)}
            return False
    
    async def test_websocket_connections(self) -> bool:
        """Test WebSocket connections for audio and video"""
        try:
            import websockets
            
            # Test audio WebSocket
            audio_success = await self._test_websocket_endpoint("/audio")
            
            # Test video WebSocket
            video_success = await self._test_websocket_endpoint("/video")
            
            success = audio_success and video_success
            
            self.test_results["websockets"] = {
                "success": success,
                "audio_success": audio_success,
                "video_success": video_success
            }
            
            logger.info(f"WebSocket connections: {'βœ… PASS' if success else '❌ FAIL'}")
            return success
            
        except Exception as e:
            logger.error(f"WebSocket test failed: {e}")
            self.test_results["websockets"] = {"success": False, "error": str(e)}
            return False
    
    async def _test_websocket_endpoint(self, endpoint: str) -> bool:
        """Test a specific WebSocket endpoint"""
        try:
            import websockets
            
            # Map the HTTP(S) base URL onto the matching WebSocket scheme
            ws_url = self.base_url.replace("https://", "wss://").replace("http://", "ws://") + endpoint
            
            async with websockets.connect(ws_url) as websocket:
                # Send test data
                if endpoint == "/audio":
                    # Send 160ms of silence (16kHz, 16-bit)
                    test_audio = np.zeros(int(16000 * 0.160), dtype=np.int16)
                    await websocket.send(test_audio.tobytes())
                else:  # video
                    # Send a small test JPEG
                    test_frame = np.zeros((256, 256, 3), dtype=np.uint8)
                    _, encoded = cv2.imencode('.jpg', test_frame, [cv2.IMWRITE_JPEG_QUALITY, 50])
                    await websocket.send(encoded.tobytes())
                
                # Wait for response
                response = await asyncio.wait_for(websocket.recv(), timeout=5.0)
                return len(response) > 0
                
        except Exception as e:
            logger.error(f"WebSocket {endpoint} test failed: {e}")
            return False
    
    async def test_performance_metrics(self) -> bool:
        """Test performance metrics endpoint"""
        try:
            async with self.session.get(f"{self.base_url}/pipeline_status") as response:
                data = await response.json()
                
                success = response.status == 200 and data.get("initialized", False)
                
                self.test_results["performance_metrics"] = {
                    "success": success,
                    "status": response.status,
                    "data": data
                }
                
                if success:
                    stats = data.get("stats", {})
                    logger.info(f"Performance metrics: βœ… PASS")
                    logger.info(f"  GPU Memory: {stats.get('gpu_memory_used', 0):.1f} GB")
                    logger.info(f"  Video FPS: {stats.get('video_fps', 0):.1f}")
                    logger.info(f"  Avg Latency: {stats.get('avg_video_latency_ms', 0):.1f} ms")
                else:
                    logger.info("Performance metrics: ❌ FAIL")
                
                return success
                
        except Exception as e:
            logger.error(f"Performance metrics test failed: {e}")
            self.test_results["performance_metrics"] = {"success": False, "error": str(e)}
            return False
    
    async def test_latency_benchmark(self) -> Dict[str, float]:
        """Benchmark system latency"""
        latencies = []
        
        try:
            # Warm up the connection (warm-up timings are discarded)
            for _ in range(5):
                async with self.session.get(f"{self.base_url}/health") as response:
                    await response.json()
            
            # Actual benchmark
            for _ in range(20):
                start_time = time.time()
                async with self.session.get(f"{self.base_url}/pipeline_status") as response:
                    await response.json()
                latencies.append((time.time() - start_time) * 1000)
            
            results = {
                "avg_latency_ms": np.mean(latencies),
                "min_latency_ms": np.min(latencies),
                "max_latency_ms": np.max(latencies),
                "p95_latency_ms": np.percentile(latencies, 95),
                "p99_latency_ms": np.percentile(latencies, 99)
            }
            
            self.test_results["latency_benchmark"] = results
            
            logger.info("Latency benchmark results:")
            logger.info(f"  Average: {results['avg_latency_ms']:.1f} ms")
            logger.info(f"  P95: {results['p95_latency_ms']:.1f} ms")
            logger.info(f"  P99: {results['p99_latency_ms']:.1f} ms")
            
            return results
            
        except Exception as e:
            logger.error(f"Latency benchmark failed: {e}")
            return {}
    
    def test_system_requirements(self) -> Dict[str, Any]:
        """Test system requirements and capabilities"""
        results = {}
        
        try:
            # Check GPU availability
            try:
                import torch
                results["gpu_available"] = torch.cuda.is_available()
                if torch.cuda.is_available():
                    results["gpu_name"] = torch.cuda.get_device_name(0)
                    results["gpu_memory_gb"] = torch.cuda.get_device_properties(0).total_memory / 1024**3
                    results["cuda_version"] = torch.version.cuda
            except ImportError:
                results["gpu_available"] = False
            
            # Check system resources
            memory = psutil.virtual_memory()
            results["system_memory_gb"] = memory.total / 1024**3
            results["cpu_count"] = psutil.cpu_count()
            
            # Check disk space
            disk = psutil.disk_usage('/')
            results["disk_free_gb"] = disk.free / 1024**3
            
            # Check required packages (distribution name -> import name;
            # e.g. the opencv-python wheel is imported as cv2)
            required_packages = {
                "torch": "torch",
                "torchvision": "torchvision",
                "torchaudio": "torchaudio",
                "opencv-python": "cv2",
                "numpy": "numpy",
                "fastapi": "fastapi",
                "websockets": "websockets",
            }
            
            missing_packages = []
            for package, import_name in required_packages.items():
                try:
                    __import__(import_name)
                except ImportError:
                    missing_packages.append(package)
            
            results["missing_packages"] = missing_packages
            results["requirements_met"] = len(missing_packages) == 0
            
            self.test_results["system_requirements"] = results
            
            logger.info("System requirements:")
            logger.info(f"  GPU: {'βœ…' if results['gpu_available'] else '❌'}")
            logger.info(f"  Memory: {results['system_memory_gb']:.1f} GB")
            logger.info(f"  CPU: {results['cpu_count']} cores")
            logger.info(f"  Packages: {'βœ…' if results['requirements_met'] else '❌'}")
            
            return results
            
        except Exception as e:
            logger.error(f"System requirements check failed: {e}")
            return {"error": str(e)}
    
    async def run_comprehensive_test(self) -> Dict[str, Any]:
        """Run all tests and return comprehensive results"""
        logger.info("πŸ§ͺ Starting comprehensive system test...")
        
        # System requirements (runs first, no server needed)
        self.test_system_requirements()
        
        # Server-dependent tests (store the bound methods; each coroutine is
        # created only when its test actually runs)
        tests = [
            ("Health Check", self.test_health_endpoint),
            ("Pipeline Initialization", self.test_pipeline_initialization),
            ("Reference Image Upload", self.test_reference_image_upload),
            ("WebSocket Connections", self.test_websocket_connections),
            ("Performance Metrics", self.test_performance_metrics),
        ]
        
        # Run tests sequentially
        for test_name, test_func in tests:
            logger.info(f"Running: {test_name}...")
            try:
                result = await test_func()
                if not result:
                    logger.warning(f"{test_name} failed - may affect subsequent tests")
            except Exception as e:
                logger.error(f"{test_name} threw exception: {e}")
        
        # Latency benchmark (runs last)
        logger.info("Running latency benchmark...")
        await self.test_latency_benchmark()
        
        # Calculate overall success rate
        successful_tests = sum(1 for result in self.test_results.values() 
                             if isinstance(result, dict) and result.get("success", False))
        total_tests = len([r for r in self.test_results.values() if isinstance(r, dict) and "success" in r])
        
        overall_success = successful_tests / max(total_tests, 1) >= 0.8  # 80% success rate
        
        summary = {
            "overall_success": overall_success,
            "successful_tests": successful_tests,
            "total_tests": total_tests,
            "success_rate": successful_tests / max(total_tests, 1),
            "detailed_results": self.test_results
        }
        
        logger.info(f"🏁 Test completed: {successful_tests}/{total_tests} tests passed")
        logger.info(f"Overall result: {'βœ… PASS' if overall_success else '❌ FAIL'}")
        
        return summary

async def main():
    """Main test runner"""
    import sys
    
    base_url = sys.argv[1] if len(sys.argv) > 1 else "http://localhost:7860"
    
    async with MirageSystemTester(base_url) as tester:
        results = await tester.run_comprehensive_test()
        
        # Save results to file
        results_file = Path("test_results.json")
        with open(results_file, "w") as f:
            json.dump(results, f, indent=2, default=str)
        
        logger.info(f"πŸ“Š Detailed results saved to: {results_file}")
        
        # Exit with appropriate code
        sys.exit(0 if results["overall_success"] else 1)

if __name__ == "__main__":
    asyncio.run(main())