"""
Shared LLM Model Manager
Single Qwen2.5-Coder-1.5B instance shared by NL translator and AI analysis
Prevents duplicate model loading and memory waste
"""
import threading
import queue
import time
from typing import Optional, Dict, List
from pathlib import Path
try:
from llama_cpp import Llama
except ImportError:
Llama = None
class SharedModelManager:
"""Thread-safe singleton manager for shared LLM model"""
_instance = None
_lock = threading.Lock()
def __new__(cls):
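        # Double-checked locking: skip taking the lock on the common path
        # where the singleton instance already exists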
if cls._instance is None:
with cls._lock:
if cls._instance is None:
cls._instance = super().__new__(cls)
return cls._instance
def __init__(self):
# Only initialize once
if hasattr(self, '_initialized'):
return
self._initialized = True
self.model = None # type: Optional[Llama]
self.model_path = None # type: Optional[str]
self.model_loaded = False
self.last_error = None # type: Optional[str]
# Request queue for sequential access
self._request_queue = queue.Queue() # type: queue.Queue
self._result_queues = {} # type: Dict[int, queue.Queue]
self._queue_lock = threading.Lock()
self._worker_thread = None # type: Optional[threading.Thread]
self._stop_worker = False
def load_model(self, model_path: str = "qwen2.5-coder-1.5b-instruct-q4_0.gguf") -> tuple[bool, Optional[str]]:
"""Load the shared model (thread-safe)"""
with self._lock:
if self.model_loaded and self.model_path == model_path:
return True, None
if Llama is None:
self.last_error = "llama-cpp-python not installed"
return False, self.last_error
try:
# Unload previous model if different
if self.model is not None and self.model_path != model_path:
del self.model
self.model = None
self.model_loaded = False
# Load new model
full_path = Path(__file__).parent / model_path
if not full_path.exists():
self.last_error = f"Model file not found: {model_path}"
return False, self.last_error
self.model = Llama(
model_path=str(full_path),
n_ctx=4096,
n_threads=4,
verbose=False,
chat_format='qwen2'
)
self.model_path = model_path
self.model_loaded = True
self.last_error = None
# Start worker thread if not running
if self._worker_thread is None or not self._worker_thread.is_alive():
self._stop_worker = False
self._worker_thread = threading.Thread(target=self._process_requests, daemon=True)
self._worker_thread.start()
return True, None
except Exception as e:
self.last_error = f"Failed to load model: {str(e)}"
self.model_loaded = False
return False, self.last_error
def _process_requests(self):
"""Worker thread to process model requests sequentially"""
while not self._stop_worker:
try:
# Get request with timeout to check stop flag
try:
request = self._request_queue.get(timeout=0.5)
except queue.Empty:
continue
request_id = request['id']
messages = request['messages']
max_tokens = request.get('max_tokens', 512)
temperature = request.get('temperature', 0.7)
# Get result queue for this request
with self._queue_lock:
result_queue = self._result_queues.get(request_id)
if result_queue is None:
continue
try:
# Check model is loaded
if not self.model_loaded or self.model is None:
result_queue.put({
'status': 'error',
'message': 'Model not loaded'
})
continue
# Process request
response = self.model.create_chat_completion(
messages=messages,
max_tokens=max_tokens,
temperature=temperature,
stream=False
)
# Extract text from response
if response and 'choices' in response and len(response['choices']) > 0:
text = response['choices'][0].get('message', {}).get('content', '')
result_queue.put({
'status': 'success',
'text': text
})
else:
result_queue.put({
'status': 'error',
'message': 'Empty response from model'
})
except Exception as e:
result_queue.put({
'status': 'error',
'message': f"Model inference error: {str(e)}"
})
except Exception as e:
print(f"Worker thread error: {e}")
time.sleep(0.1)
def generate(self, messages: List[Dict[str, str]], max_tokens: int = 512,
temperature: float = 0.7, timeout: float = 30.0) -> tuple[bool, Optional[str], Optional[str]]:
"""
Generate response from model (thread-safe, queued)
Args:
messages: List of {role, content} dicts
max_tokens: Maximum tokens to generate
temperature: Sampling temperature
timeout: Maximum wait time in seconds
Returns:
(success, response_text, error_message)
"""
if not self.model_loaded:
return False, None, "Model not loaded. Call load_model() first."
# Create request
request_id = id(threading.current_thread()) + int(time.time() * 1000000)
result_queue: queue.Queue = queue.Queue()
# Register result queue
with self._queue_lock:
self._result_queues[request_id] = result_queue
try:
# Submit request
self._request_queue.put({
'id': request_id,
'messages': messages,
'max_tokens': max_tokens,
'temperature': temperature
})
# Wait for result
try:
result = result_queue.get(timeout=timeout)
except queue.Empty:
return False, None, f"Request timeout after {timeout}s"
if result['status'] == 'success':
return True, result['text'], None
else:
return False, None, result.get('message', 'Unknown error')
finally:
# Cleanup result queue
with self._queue_lock:
self._result_queues.pop(request_id, None)
def shutdown(self):
"""Cleanup resources"""
self._stop_worker = True
if self._worker_thread is not None:
self._worker_thread.join(timeout=2.0)
with self._lock:
if self.model is not None:
del self.model
self.model = None
self.model_loaded = False
# Global singleton instance
_shared_model_manager = SharedModelManager()
def get_shared_model() -> SharedModelManager:
"""Get the shared model manager singleton"""
return _shared_model_manager
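

# Minimal usage sketch (illustrative only, not part of the manager's API):
# it assumes llama-cpp-python is installed and the default GGUF file named in
# load_model() sits next to this module. The prompt text below is made up.
if __name__ == "__main__":
    manager = get_shared_model()
    ok, err = manager.load_model()
    if not ok:
        print(f"Model load failed: {err}")
    else:
        # Messages use the {role, content} structure expected by generate()
        success, text, error = manager.generate(
            messages=[
                {"role": "system", "content": "You are a helpful coding assistant."},
                {"role": "user", "content": "Write a one-line Python hello world."},
            ],
            max_tokens=64,
            temperature=0.2,
            timeout=60.0,
        )
        print(text if success else f"Generation failed: {error}")
        manager.shutdown()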