"""
Device Management Module

Handles hardware detection, optimization, and device switching.
"""

import torch
import logging
import platform
import subprocess
from typing import Optional, Dict, Any, List

from exceptions import DeviceError

logger = logging.getLogger(__name__)


class DeviceManager:
    """
    Manages device detection, validation, and optimization for video processing.
    """

    def __init__(self):
        self._optimal_device = None
        self._device_info = {}
        self._cuda_tested = False
        self._mps_tested = False
        self._initialize_device_info()

    def _initialize_device_info(self):
        """Initialize comprehensive device information."""
        self._device_info = {
            'platform': platform.system(),
            'python_version': platform.python_version(),
            'pytorch_version': torch.__version__,
            'cuda_available': torch.cuda.is_available(),
            'cuda_version': torch.version.cuda if torch.cuda.is_available() else None,
            'mps_available': self._check_mps_availability(),
            # Note: this is PyTorch's intra-op thread count, not the physical core count.
            'cpu_count': torch.get_num_threads(),
        }

        if self._device_info['cuda_available']:
            self._device_info.update(self._get_cuda_info())

        if self._device_info['mps_available']:
            self._device_info.update(self._get_mps_info())

        logger.debug(f"Device info initialized: {self._device_info}")

    def _check_mps_availability(self) -> bool:
        """Check if Metal Performance Shaders (MPS) is available on macOS."""
        try:
            if platform.system() == 'Darwin':
                return hasattr(torch.backends, 'mps') and torch.backends.mps.is_available()
        except Exception:
            pass
        return False

    def _get_cuda_info(self) -> Dict[str, Any]:
        """Get detailed CUDA information."""
        cuda_info = {}
        try:
            if torch.cuda.is_available():
                cuda_info.update({
                    'cuda_device_count': torch.cuda.device_count(),
                    'cuda_current_device': torch.cuda.current_device(),
                    'cuda_devices': []
                })

                for i in range(torch.cuda.device_count()):
                    device_props = torch.cuda.get_device_properties(i)
                    device_info = {
                        'index': i,
                        'name': device_props.name,
                        'memory_total_gb': device_props.total_memory / (1024**3),
                        'memory_total_mb': device_props.total_memory / (1024**2),
                        'multiprocessor_count': device_props.multiprocessor_count,
                        'compute_capability': f"{device_props.major}.{device_props.minor}"
                    }

                    # Runtime memory statistics can fail independently of the
                    # static properties, so they are collected in their own block.
                    # 'memory_free_gb' means total minus memory reserved by this
                    # process, not device-wide free memory.
                    try:
                        memory_allocated = torch.cuda.memory_allocated(i) / (1024**3)
                        memory_reserved = torch.cuda.memory_reserved(i) / (1024**3)
                        device_info.update({
                            'memory_allocated_gb': memory_allocated,
                            'memory_reserved_gb': memory_reserved,
                            'memory_free_gb': device_info['memory_total_gb'] - memory_reserved
                        })
                    except Exception as e:
                        logger.warning(f"Could not get memory info for CUDA device {i}: {e}")

                    cuda_info['cuda_devices'].append(device_info)

        except Exception as e:
            logger.error(f"Error getting CUDA info: {e}")

        return cuda_info

    def _get_mps_info(self) -> Dict[str, Any]:
        """Get Metal Performance Shaders information."""
        mps_info = {}
        try:
            if self._device_info['mps_available']:
                # MPS uses unified memory, so total system RAM is the relevant figure.
                try:
                    result = subprocess.run(['sysctl', 'hw.memsize'],
                                            capture_output=True, text=True, timeout=5)
                    if result.returncode == 0:
                        memory_bytes = int(result.stdout.split(':')[1].strip())
                        mps_info['mps_system_memory_gb'] = memory_bytes / (1024**3)
                except Exception as e:
                    logger.warning(f"Could not get system memory info: {e}")

                mps_info['mps_device'] = 'Apple Silicon GPU'

        except Exception as e:
            logger.error(f"Error getting MPS info: {e}")

        return mps_info

    def get_optimal_device(self) -> torch.device:
        """
        Get the optimal device for video processing, with functionality testing.
        """
        if self._optimal_device is not None:
            return self._optimal_device

        logger.info("Determining optimal device for video processing...")

        # Prefer CUDA; skip it if a previous test already failed.
        if self._device_info['cuda_available'] and not self._cuda_tested:
            cuda_device = self._test_cuda_device()
            if cuda_device is not None:
                self._optimal_device = cuda_device
                logger.info(f"Selected CUDA device: {self._get_device_name(cuda_device)}")
                return self._optimal_device

        # Next, try MPS on Apple Silicon.
        if self._device_info['mps_available'] and not self._mps_tested:
            mps_device = self._test_mps_device()
            if mps_device is not None:
                self._optimal_device = mps_device
                logger.info(f"Selected MPS device: {self._get_device_name(mps_device)}")
                return self._optimal_device

        # Last resort: CPU always works.
        self._optimal_device = torch.device("cpu")
        logger.info("Using CPU device (no suitable GPU found or GPU tests failed)")
        return self._optimal_device

    def _test_cuda_device(self) -> Optional[torch.device]:
        """Test CUDA device functionality."""
        self._cuda_tested = True

        try:
            # Pick the CUDA device with the most free memory.
            best_device_idx = 0
            best_memory = 0

            for device_info in self._device_info.get('cuda_devices', []):
                # 'memory_free_gb' may be missing if the memory query failed,
                # so fall back to 0 rather than raising a KeyError.
                free_gb = device_info.get('memory_free_gb', 0)
                if free_gb > best_memory:
                    best_memory = free_gb
                    best_device_idx = device_info['index']

            device = torch.device(f"cuda:{best_device_idx}")

            # Smoke test: a trivial computation...
            test_tensor = torch.tensor([1.0], device=device)
            result = test_tensor * 2

            # ...and a larger allocation to surface out-of-memory or driver faults.
            large_tensor = torch.randn(1000, 1000, device=device)
            del large_tensor, test_tensor, result
            torch.cuda.empty_cache()
            torch.cuda.synchronize()

            logger.info(f"CUDA device {best_device_idx} passed functionality tests")
            return device

        except Exception as e:
            logger.warning(f"CUDA device test failed: {e}")
            return None

    def _test_mps_device(self) -> Optional[torch.device]:
        """Test MPS device functionality."""
        self._mps_tested = True

        try:
            device = torch.device("mps")

            # Smoke test: a trivial computation...
            test_tensor = torch.tensor([1.0], device=device)
            result = test_tensor * 2

            # ...and a larger allocation.
            large_tensor = torch.randn(1000, 1000, device=device)
            del large_tensor, test_tensor, result

            logger.info("MPS device passed functionality tests")
            return device

        except Exception as e:
            logger.warning(f"MPS device test failed: {e}")
            return None

    def _get_device_name(self, device: torch.device) -> str:
        """Get a human-readable device name."""
        if device.type == 'cuda':
            device_idx = device.index if device.index is not None else 0
            for cuda_device in self._device_info.get('cuda_devices', []):
                if cuda_device['index'] == device_idx:
                    return cuda_device['name']
            return f"CUDA Device {device_idx}"
        elif device.type == 'mps':
            return "Apple Silicon GPU (MPS)"
        else:
            return "CPU"

    def get_device_capabilities(self, device: Optional[torch.device] = None) -> Dict[str, Any]:
        """Get capabilities of the specified device."""
        if device is None:
            device = self.get_optimal_device()

        capabilities = {
            'device_type': device.type,
            'device_name': self._get_device_name(device),
            'supports_mixed_precision': False,
            'recommended_batch_size': 1,
            'memory_efficiency': 'medium'
        }

        if device.type == 'cuda':
            device_idx = device.index if device.index is not None else 0
            for cuda_device in self._device_info.get('cuda_devices', []):
                if cuda_device['index'] == device_idx:
                    # Tensor cores (and therefore efficient mixed precision) require
                    # compute capability 7.0 (Volta) or newer. Parse the "major.minor"
                    # string numerically rather than as a float, which would misorder
                    # hypothetical versions like "7.10".
                    major, minor = (int(part) for part in
                                    cuda_device.get('compute_capability', '0.0').split('.'))
                    capabilities['supports_mixed_precision'] = (major, minor) >= (7, 0)

                    # Scale batch size and efficiency rating with free memory.
                    memory_gb = cuda_device.get('memory_free_gb', 0)
                    if memory_gb >= 24:
                        capabilities['recommended_batch_size'] = 4
                        capabilities['memory_efficiency'] = 'high'
                    elif memory_gb >= 12:
                        capabilities['recommended_batch_size'] = 2
                        capabilities['memory_efficiency'] = 'high'
                    elif memory_gb >= 6:
                        capabilities['recommended_batch_size'] = 1
                        capabilities['memory_efficiency'] = 'medium'
                    else:
                        capabilities['memory_efficiency'] = 'low'

                    capabilities['memory_available_gb'] = memory_gb
                    break

        elif device.type == 'mps':
            capabilities['supports_mixed_precision'] = True
            capabilities['memory_efficiency'] = 'high'
            # MPS shares unified memory; assume roughly 70% is usable by the GPU.
            system_memory = self._device_info.get('mps_system_memory_gb', 8)
            if system_memory >= 16:
                capabilities['recommended_batch_size'] = 2
            capabilities['memory_available_gb'] = system_memory * 0.7

        else:
            capabilities['memory_efficiency'] = 'low'
            capabilities['supports_mixed_precision'] = False

        return capabilities

    def switch_device(self, device_type: str) -> torch.device:
        """
        Switch to a specific device type.

        Args:
            device_type: 'cuda', 'mps', or 'cpu'
        """
        try:
            if device_type.lower() == 'cuda':
                if not self._device_info['cuda_available']:
                    raise DeviceError('cuda', 'CUDA not available on this system')

                device = self._test_cuda_device()
                if device is None:
                    raise DeviceError('cuda', 'CUDA device failed functionality tests')

            elif device_type.lower() == 'mps':
                if not self._device_info['mps_available']:
                    raise DeviceError('mps', 'MPS not available on this system')

                device = self._test_mps_device()
                if device is None:
                    raise DeviceError('mps', 'MPS device failed functionality tests')

            elif device_type.lower() == 'cpu':
                device = torch.device('cpu')

            else:
                raise DeviceError('unknown', f'Unknown device type: {device_type}')

            self._optimal_device = device
            logger.info(f"Switched to device: {self._get_device_name(device)}")
            return device

        except DeviceError:
            raise
        except Exception as e:
            raise DeviceError(device_type, f"Failed to switch to {device_type}: {str(e)}") from e

    def get_available_devices(self) -> List[str]:
        """Get the list of available device types."""
        devices = ['cpu']

        if self._device_info['cuda_available']:
            devices.append('cuda')

        if self._device_info['mps_available']:
            devices.append('mps')

        return devices

    def get_device_status(self) -> Dict[str, Any]:
        """Get comprehensive device status."""
        current_device = self.get_optimal_device()

        status = {
            'current_device': str(current_device),
            'current_device_name': self._get_device_name(current_device),
            'available_devices': self.get_available_devices(),
            'device_info': self._device_info.copy(),
            'capabilities': self.get_device_capabilities(current_device)
        }

        # Live memory statistics are only available for CUDA devices.
        if current_device.type == 'cuda':
            try:
                device_idx = current_device.index if current_device.index is not None else 0
                status['current_memory_usage'] = {
                    'allocated_gb': torch.cuda.memory_allocated(device_idx) / (1024**3),
                    'reserved_gb': torch.cuda.memory_reserved(device_idx) / (1024**3),
                    'max_allocated_gb': torch.cuda.max_memory_allocated(device_idx) / (1024**3),
                    'max_reserved_gb': torch.cuda.max_memory_reserved(device_idx) / (1024**3)
                }
            except Exception as e:
                logger.warning(f"Could not get current memory usage: {e}")

        return status

    def optimize_for_processing(self) -> Dict[str, Any]:
        """Optimize device settings for video processing."""
        device = self.get_optimal_device()
        optimizations = {
            'device': str(device),
            'optimizations_applied': []
        }

        try:
            if device.type == 'cuda':
                # Let cuDNN auto-tune convolution algorithms for fixed input sizes.
                torch.backends.cudnn.benchmark = True
                optimizations['optimizations_applied'].append('cudnn_benchmark')

                # The original memory-strategy step was lost; as a minimal
                # assumption, start processing from an empty allocator cache.
                torch.cuda.empty_cache()
                optimizations['optimizations_applied'].append('cuda_memory_strategy')

            elif device.type == 'mps':
                # MPS needs no explicit tuning here; record that it was considered.
                optimizations['optimizations_applied'].append('mps_optimized')

            else:
                # Cap CPU threads to avoid oversubscription on many-core machines.
                torch.set_num_threads(min(torch.get_num_threads(), 8))
                optimizations['optimizations_applied'].append('cpu_thread_optimization')

            logger.info(f"Applied optimizations for {device}: {optimizations['optimizations_applied']}")

        except Exception as e:
            logger.warning(f"Some optimizations failed: {e}")
            optimizations['optimization_errors'] = str(e)

        return optimizations

    def cleanup_device_memory(self):
        """Clean up device memory."""
        device = self.get_optimal_device()

        if device.type == 'cuda':
            try:
                torch.cuda.empty_cache()
                torch.cuda.synchronize()
                logger.debug("CUDA memory cache cleared")
            except Exception as e:
                logger.warning(f"CUDA memory cleanup failed: {e}")

        elif device.type == 'mps':
            try:
                # torch.mps.empty_cache() exists only on newer PyTorch builds,
                # so guard the call; garbage collection is the portable fallback.
                if hasattr(torch, 'mps') and hasattr(torch.mps, 'empty_cache'):
                    torch.mps.empty_cache()
                import gc
                gc.collect()
                logger.debug("MPS memory cleanup completed")
            except Exception as e:
                logger.warning(f"MPS memory cleanup failed: {e}")

        else:
            try:
                import gc
                gc.collect()
                logger.debug("CPU memory cleanup completed")
            except Exception as e:
                logger.warning(f"CPU memory cleanup failed: {e}")