"""
Shared LLM Model Manager
Single Qwen2.5-Coder-1.5B instance shared by NL translator and AI analysis
Prevents duplicate model loading and memory waste

OPTIMIZED FOR NON-BLOCKING OPERATION:
- Async request submission (returns immediately)
- Result polling (check if ready)
- Request cancellation if game loop needs to continue
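
Typical non-blocking usage (an illustrative sketch; the prompt text is a
placeholder and `handle` is a hypothetical consumer):

    manager = get_shared_model()
    ok, err = manager.load_model()  # default GGUF path
    request_id = manager.submit_async(
        [{"role": "user", "content": "Describe the current battle state."}]
    )
    # Later, once per game-loop tick:
    status, text, err = manager.get_result(request_id)
    if status == RequestStatus.COMPLETED:
        handle(text)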
"""
import threading
import queue
import time
from typing import Optional, Dict, Any, List, Tuple
from pathlib import Path
from enum import Enum

try:
    from llama_cpp import Llama
except ImportError:
    Llama = None

class RequestStatus(Enum):
    """Status of an async request"""
    PENDING = "pending"      # In queue, not yet processed
    PROCESSING = "processing"  # Currently being processed
    COMPLETED = "completed"    # Done, result available
    FAILED = "failed"          # Error occurred
    CANCELLED = "cancelled"    # Request was cancelled

class AsyncRequest:
    """Represents an async LLM request"""
    def __init__(self, request_id: str, messages: List[Dict[str, str]], 
                 max_tokens: int, temperature: float):
        self.request_id = request_id
        self.messages = messages
        self.max_tokens = max_tokens
        self.temperature = temperature
        self.status = RequestStatus.PENDING
        self.result_text: Optional[str] = None
        self.error_message: Optional[str] = None
        self.submitted_at = time.time()
        self.completed_at: Optional[float] = None

class SharedModelManager:
    """Thread-safe singleton manager for shared LLM model"""
    
    _instance = None
    _lock = threading.Lock()
    
    def __new__(cls):
        if cls._instance is None:
            with cls._lock:
                if cls._instance is None:
                    cls._instance = super().__new__(cls)
        return cls._instance
    
    def __init__(self):
        # Only initialize once
        if hasattr(self, '_initialized'):
            return
        
        self._initialized = True
        self.model = None  # type: Optional[Llama]
        self.model_path = None  # type: Optional[str]
        self.model_loaded = False
        self.last_error = None  # type: Optional[str]
        
        # Async request management
        self._request_queue = queue.Queue()  # type: queue.Queue[AsyncRequest]
        self._requests = {}  # type: Dict[str, AsyncRequest]
        self._requests_lock = threading.Lock()
        self._worker_thread = None  # type: Optional[threading.Thread]
        self._stop_worker = False
        self._current_request_id: Optional[str] = None  # Track what's being processed
        
    def load_model(self, model_path: str = "qwen2.5-coder-1.5b-instruct-q4_0.gguf") -> Tuple[bool, Optional[str]]:
        """Load the shared model (thread-safe)"""
        with self._lock:
            if self.model_loaded and self.model_path == model_path:
                return True, None
            
            if Llama is None:
                self.last_error = "llama-cpp-python not installed"
                return False, self.last_error
            
            try:
                # Unload previous model if different
                if self.model is not None and self.model_path != model_path:
                    del self.model
                    self.model = None
                    self.model_loaded = False
                
                # Load new model
                # Try /tmp/rts first (HuggingFace Space download location)
                tmp_path = Path("/tmp/rts") / model_path
                local_path = Path(__file__).parent / model_path
                
                if tmp_path.exists():
                    full_path = tmp_path
                elif local_path.exists():
                    full_path = local_path
                else:
                    self.last_error = f"Model file not found: {model_path} (checked /tmp/rts/ and {Path(__file__).parent})"
                    return False, self.last_error
                
                self.model = Llama(
                    model_path=str(full_path),
                    n_ctx=2048,         # Reduced from 4096 for faster processing
                    n_threads=1,        # Prompt processing: 1 thread
                    n_threads_batch=1,  # Token generation: 1 thread (CRITICAL!)
                    n_batch=256,        # Increased from 128 for better throughput
                    verbose=False,
                    chat_format='qwen'
                )
                
                self.model_path = model_path
                self.model_loaded = True
                self.last_error = None
                
                # Start worker thread if not running
                if self._worker_thread is None or not self._worker_thread.is_alive():
                    self._stop_worker = False
                    self._worker_thread = threading.Thread(target=self._process_requests, daemon=True)
                    self._worker_thread.start()
                
                return True, None
                
            except Exception as e:
                self.last_error = f"Failed to load model: {str(e)}"
                self.model_loaded = False
                return False, self.last_error
    
    def _process_requests(self):
        """Worker thread to process model requests sequentially (async-friendly)"""
        # Lower thread priority so game gets CPU preference
        import os
        try:
            os.nice(10)  # Lower priority (0=normal, 19=lowest)
            print("📉 LLM worker thread priority lowered (nice +10)")
        except Exception as e:
            print(f"⚠️ Could not lower thread priority: {e}")
        
        while not self._stop_worker:
            try:
                # Get request with timeout to check stop flag
                try:
                    request = self._request_queue.get(timeout=0.5)
                except queue.Empty:
                    continue
                
                if not isinstance(request, AsyncRequest):
                    continue
                
                # Mark as processing (skip requests cancelled while still queued)
                with self._requests_lock:
                    if request.status == RequestStatus.CANCELLED:
                        continue
                    self._current_request_id = request.request_id
                    request.status = RequestStatus.PROCESSING
                
                try:
                    # Check model is loaded
                    if not self.model_loaded or self.model is None:
                        request.status = RequestStatus.FAILED
                        request.error_message = 'Model not loaded'
                        request.completed_at = time.time()
                        continue
                    
                    # Process request (this is the blocking part)
                    start_time = time.time()
                    response = self.model.create_chat_completion(
                        messages=request.messages,
                        max_tokens=request.max_tokens,
                        temperature=request.temperature,
                        stream=False
                    )
                    elapsed = time.time() - start_time
                    
                    # Extract text from response
                    if response and 'choices' in response and len(response['choices']) > 0:
                        text = response['choices'][0].get('message', {}).get('content', '')
                        request.status = RequestStatus.COMPLETED
                        request.result_text = text
                        request.completed_at = time.time()
                        print(f"✅ LLM request completed in {elapsed:.2f}s")
                    else:
                        request.status = RequestStatus.FAILED
                        request.error_message = 'Empty response from model'
                        request.completed_at = time.time()
                    
                except Exception as e:
                    request.status = RequestStatus.FAILED
                    request.error_message = f"Model inference error: {str(e)}"
                    request.completed_at = time.time()
                    print(f"❌ LLM request failed: {e}")
                
                finally:
                    with self._requests_lock:
                        self._current_request_id = None
                
            except Exception as e:
                print(f"❌ Worker thread error: {e}")
                time.sleep(0.1)
    
    def submit_async(self, messages: List[Dict[str, str]], max_tokens: int = 256, 
                     temperature: float = 0.7) -> str:
        """
        Submit request asynchronously (non-blocking)
        
        Args:
            messages: List of {role, content} dicts
            max_tokens: Maximum tokens to generate
            temperature: Sampling temperature
            
        Returns:
            request_id: Use this to poll for results with get_result()
        """
        if not self.model_loaded:
            raise RuntimeError("Model not loaded. Call load_model() first.")
        
        # Create unique request ID
        request_id = f"req_{int(time.time() * 1000000)}_{id(threading.current_thread())}"
        
        # Create request object
        request = AsyncRequest(
            request_id=request_id,
            messages=messages,
            max_tokens=max_tokens,
            temperature=temperature
        )
        
        # Register and submit
        with self._requests_lock:
            self._requests[request_id] = request
        
        self._request_queue.put(request)
        print(f"📤 LLM request submitted: {request_id}")
        
        return request_id
    
    def get_result(self, request_id: str, remove: bool = True) -> Tuple[RequestStatus, Optional[str], Optional[str]]:
        """
        Check result of async request (non-blocking)
        
        Args:
            request_id: ID returned by submit_async()
            remove: If True, remove request after getting result
            
        Returns:
            (status, result_text, error_message)
        """
        with self._requests_lock:
            request = self._requests.get(request_id)
        
        if request is None:
            return RequestStatus.FAILED, None, "Request not found (may have been cleaned up)"
        
        # Return current status
        status = request.status
        result_text = request.result_text
        error_message = request.error_message
        
        # Cleanup if requested and completed
        if remove and status in [RequestStatus.COMPLETED, RequestStatus.FAILED, RequestStatus.CANCELLED]:
            with self._requests_lock:
                self._requests.pop(request_id, None)
        
        return status, result_text, error_message
    
    def cancel_request(self, request_id: str) -> bool:
        """
        Cancel a pending request (cannot cancel if already processing)
        
        Returns:
            True if cancelled, False if already processing/completed
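
        Example (sketch, assuming ``manager = get_shared_model()``; the
        variable names are illustrative):

            if previous_request_id is not None:
                manager.cancel_request(previous_request_id)
            previous_request_id = manager.submit_async(new_messages)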
        """
        with self._requests_lock:
            request = self._requests.get(request_id)
            if request is None:
                return False
            
            # Can only cancel pending requests
            if request.status == RequestStatus.PENDING:
                request.status = RequestStatus.CANCELLED
                request.completed_at = time.time()
                return True
            
            return False
    
    def generate(self, messages: List[Dict[str, str]], max_tokens: int = 256, 
                 temperature: float = 0.7, max_wait: float = 300.0) -> Tuple[bool, Optional[str], Optional[str]]:
        """
        Generate a response from the model (blocking; kept for backward compatibility).

        There is no hard timeout: this call polls until inference finishes.
        A request only ends early if a caller cancels it (e.g. when superseded
        by a newer request of the same type); max_wait is a safety ceiling only.
        
        Args:
            messages: List of {role, content} dicts
            max_tokens: Maximum tokens to generate
            temperature: Sampling temperature
            max_wait: Safety limit in seconds (default 5min)
            
        Returns:
            (success, response_text, error_message)
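
        Example (sketch, assuming ``manager = get_shared_model()``; the prompt
        is a placeholder):

            ok, text, err = manager.generate(
                [{"role": "user", "content": "Suggest a build order."}],
                max_tokens=128,
            )
            if not ok:
                print(f"LLM error: {err}")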
        """
        try:
            # Submit async
            request_id = self.submit_async(messages, max_tokens, temperature)
            
            # Poll for result (no timeout, wait for completion)
            start_time = time.time()
            while time.time() - start_time < max_wait:  # Safety limit only
                status, result_text, error_message = self.get_result(request_id, remove=False)
                
                if status == RequestStatus.COMPLETED:
                    # Cleanup and return
                    self.get_result(request_id, remove=True)
                    return True, result_text, None
                
                elif status == RequestStatus.FAILED:
                    # Cleanup and return
                    self.get_result(request_id, remove=True)
                    return False, None, error_message
                
                elif status == RequestStatus.CANCELLED:
                    self.get_result(request_id, remove=True)
                    return False, None, "Request was cancelled by newer request"
                
                # Still pending/processing, wait a bit
                time.sleep(0.1)
            
            # Safety limit reached (model may be stuck)
            return False, None, f"Request exceeded safety limit ({max_wait}s) - model may be stuck"
        
        except Exception as e:
            return False, None, f"Error: {str(e)}"
    
    def cleanup_old_requests(self, max_age: float = 300.0):
        """
        Remove completed/failed requests older than max_age seconds
        
        Args:
            max_age: Maximum age in seconds (default 5 minutes)
        """
        now = time.time()
        with self._requests_lock:
            to_remove = []
            for request_id, request in self._requests.items():
                if request.completed_at is not None:
                    age = now - request.completed_at
                    if age > max_age:
                        to_remove.append(request_id)
            
            for request_id in to_remove:
                self._requests.pop(request_id, None)
            
            if to_remove:
                print(f"🧹 Cleaned up {len(to_remove)} old LLM requests")
    
    def get_queue_status(self) -> Dict[str, Any]:
        """Get current queue status for monitoring"""
        with self._requests_lock:
            pending = sum(1 for r in self._requests.values() if r.status == RequestStatus.PENDING)
            processing = sum(1 for r in self._requests.values() if r.status == RequestStatus.PROCESSING)
            completed = sum(1 for r in self._requests.values() if r.status == RequestStatus.COMPLETED)
            failed = sum(1 for r in self._requests.values() if r.status == RequestStatus.FAILED)
            
            return {
                'queue_size': self._request_queue.qsize(),
                'total_requests': len(self._requests),
                'pending': pending,
                'processing': processing,
                'completed': completed,
                'failed': failed,
                'current_request': self._current_request_id
            }
    
    def shutdown(self):
        """Cleanup resources"""
        self._stop_worker = True
        if self._worker_thread is not None:
            self._worker_thread.join(timeout=2.0)
        
        with self._lock:
            if self.model is not None:
                del self.model
                self.model = None
            self.model_loaded = False

# Global singleton instance
_shared_model_manager = SharedModelManager()

def get_shared_model() -> SharedModelManager:
    """Get the shared model manager singleton"""
    return _shared_model_manager
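
# Minimal self-test sketch (not part of the game loop): exercises the async API
# end to end. It assumes a compatible GGUF file exists at the default path;
# adjust the path for your environment.
if __name__ == "__main__":
    manager = get_shared_model()
    loaded, error = manager.load_model()
    if not loaded:
        print(f"Model not available: {error}")
    else:
        req_id = manager.submit_async(
            [{"role": "user", "content": "Say hello in one short sentence."}],
            max_tokens=32,
        )
        # Poll without blocking, as a game loop would.
        while True:
            status, text, err = manager.get_result(req_id, remove=False)
            if status == RequestStatus.COMPLETED:
                print(f"Response: {text}")
                break
            if status in (RequestStatus.FAILED, RequestStatus.CANCELLED):
                print(f"Request ended without a result: {err}")
                break
            print(f"Queue status: {manager.get_queue_status()}")
            time.sleep(0.5)
        manager.get_result(req_id, remove=True)  # drop the finished request
        manager.shutdown()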