|
|
|
|
|
import asyncio |
|
|
from collections import defaultdict |
|
|
import json |
|
|
import os |
|
|
import re |
|
|
import time |
|
|
import uuid |
|
|
from typing import List, Dict, Any, Optional |
|
|
from dataclasses import dataclass |
|
|
from threading import Lock |
|
|
import threading |
|
|
import json |
|
|
import os |
|
|
import queue |
|
|
import traceback |
|
|
import uuid |
|
|
from typing import Coroutine, Dict, List, Any, Optional, Callable |
|
|
from dataclasses import dataclass |
|
|
from queue import Queue, Empty |
|
|
from threading import Lock, Event, Thread |
|
|
import threading |
|
|
from concurrent.futures import ThreadPoolExecutor |
|
|
import time |
|
|
import gradio as gr |
|
|
from openai import AsyncOpenAI, OpenAI |
|
|
import pyttsx3 |
|
|
from rich.console import Console |
|
|
|
|
|
BASE_URL="http://localhost:1234/v1" |
|
|
BASE_API_KEY="not-needed" |
|
|
BASE_CLIENT = AsyncOpenAI( |
|
|
base_url=BASE_URL, |
|
|
api_key=BASE_API_KEY |
|
|
) |
|
|
BASEMODEL_ID = "leroydyer/qwen/qwen3-0.6b-q4_k_m.gguf" |
|
|
CLIENT =OpenAI( |
|
|
base_url=BASE_URL, |
|
|
api_key=BASE_API_KEY |
|
|
) |
|
|
|
|
|
console = Console() |
|
|
|
|
|
LOCAL_BASE_URL = "http://localhost:1234/v1" |
|
|
LOCAL_API_KEY = "not-needed" |
|
|
|
|
|
MODEL_OPTIONS = { |
|
|
"Local LM Studio": LOCAL_BASE_URL, |
|
|
} |
|
|
DEFAULT_TEMPERATURE = 0.7 |
|
|
DEFAULT_MAX_TOKENS = 5000 |
|
|
console = Console() |
|
|
|
|
|
|
|
|
@dataclass |
|
|
class CanvasArtifact: |
|
|
id: str |
|
|
type: str |
|
|
content: str |
|
|
title: str |
|
|
timestamp: float |
|
|
metadata: Dict[str, Any] |
|
|
|
|
|
@dataclass |
|
|
class LLMMessage: |
|
|
role: str |
|
|
content: str |
|
|
message_id: str = None |
|
|
conversation_id: str = None |
|
|
timestamp: float = None |
|
|
metadata: Dict[str, Any] = None |
|
|
def __post_init__(self): |
|
|
if self.message_id is None: |
|
|
self.message_id = str(uuid.uuid4()) |
|
|
if self.timestamp is None: |
|
|
self.timestamp = time.time() |
|
|
if self.metadata is None: |
|
|
self.metadata = {} |
|
|
|
|
|
@dataclass |
|
|
class LLMRequest: |
|
|
message: LLMMessage |
|
|
response_event: str = None |
|
|
callback: Callable = None |
|
|
def __post_init__(self): |
|
|
if self.response_event is None: |
|
|
self.response_event = f"llm_response_{self.message.message_id}" |
|
|
|
|
|
@dataclass |
|
|
class LLMResponse: |
|
|
message: LLMMessage |
|
|
request_id: str |
|
|
success: bool = True |
|
|
error: str = None |
|
|
|
|
|
|
|
|
class EventManager: |
|
|
def __init__(self): |
|
|
self._handlers = defaultdict(list) |
|
|
self._lock = threading.Lock() |
|
|
def register(self, event: str, handler: Callable): |
|
|
with self._lock: |
|
|
self._handlers[event].append(handler) |
|
|
def unregister(self, event: str, handler: Callable): |
|
|
with self._lock: |
|
|
if event in self._handlers and handler in self._handlers[event]: |
|
|
self._handlers[event].remove(handler) |
|
|
def raise_event(self, event: str, data: Any): |
|
|
with self._lock: |
|
|
handlers = self._handlers[event][:] |
|
|
for handler in handlers: |
|
|
try: |
|
|
handler(data) |
|
|
except Exception as e: |
|
|
console.log(f"Error in event handler for {event}: {e}", style="bold red") |
|
|
|
|
|
EVENT_MANAGER = EventManager() |
|
|
def RegisterEvent(event: str, handler: Callable): |
|
|
EVENT_MANAGER.register(event, handler) |
|
|
|
|
|
def RaiseEvent(event: str, data: Any): |
|
|
EVENT_MANAGER.raise_event(event, data) |
|
|
|
|
|
def UnregisterEvent(event: str, handler: Callable): |
|
|
EVENT_MANAGER.unregister(event, handler) |
|
|
|
|
|
|
|
|
class LLMAgent: |
|
|
"""Main Agent Driver ! |
|
|
Agent For Multiple messages at once , |
|
|
has a message queing service as well as agenerator method for easy intergration with console |
|
|
applications as well as ui !""" |
|
|
def __init__( |
|
|
self, |
|
|
model_id: str = BASEMODEL_ID, |
|
|
system_prompt: str = None, |
|
|
max_queue_size: int = 1000, |
|
|
max_retries: int = 3, |
|
|
timeout: int = 30000, |
|
|
max_tokens: int = 5000, |
|
|
temperature: float = 0.3, |
|
|
base_url: str = "http://localhost:1234/v1", |
|
|
api_key: str = "not-needed", |
|
|
generate_fn: Callable[[List[Dict[str, str]]], Coroutine[Any, Any, str]] = None, |
|
|
): |
|
|
self.model_id = model_id |
|
|
self.system_prompt = system_prompt or "You are a helpful AI assistant." |
|
|
self.request_queue = Queue(maxsize=max_queue_size) |
|
|
self.max_retries = max_retries |
|
|
self.timeout = timeout |
|
|
self.is_running = False |
|
|
self._stop_event = Event() |
|
|
self.processing_thread = None |
|
|
|
|
|
self.canvas_artifacts: Dict[str, List[CanvasArtifact]] = defaultdict(list) |
|
|
self.max_canvas_artifacts = 1000 |
|
|
|
|
|
self.conversations: Dict[str, List[LLMMessage]] = {} |
|
|
self.max_history_length = 100 |
|
|
self._generate = generate_fn or self._default_generate |
|
|
self.api_key = api_key |
|
|
self.base_url = base_url |
|
|
self.max_tokens = max_tokens |
|
|
self.temperature = temperature |
|
|
self.async_client = self.CreateClient(base_url, api_key) |
|
|
self.current_conversation = "default" |
|
|
|
|
|
|
|
|
self.pending_requests: Dict[str, LLMRequest] = {} |
|
|
self.pending_requests_lock = Lock() |
|
|
|
|
|
|
|
|
self._register_event_handlers() |
|
|
|
|
|
self._register_event_handlers() |
|
|
|
|
|
try: |
|
|
self.tts_engine = pyttsx3.init() |
|
|
self.setup_tts() |
|
|
self.speech_enabled = True |
|
|
except Exception as e: |
|
|
console.log(f"[yellow]TTS not available: {e}[/yellow]") |
|
|
self.speech_enabled = False |
|
|
|
|
|
console.log("[bold green]π Enhanced LLM Agent Initialized[/bold green]") |
|
|
|
|
|
|
|
|
self.start() |
|
|
def setup_tts(self): |
|
|
"""Configure text-to-speech engine""" |
|
|
if hasattr(self, 'tts_engine'): |
|
|
voices = self.tts_engine.getProperty('voices') |
|
|
if voices: |
|
|
self.tts_engine.setProperty('voice', voices[0].id) |
|
|
self.tts_engine.setProperty('rate', 150) |
|
|
self.tts_engine.setProperty('volume', 0.8) |
|
|
|
|
|
def speak(self, text: str): |
|
|
"""Convert text to speech in a non-blocking way""" |
|
|
if not hasattr(self, 'speech_enabled') or not self.speech_enabled: |
|
|
return |
|
|
|
|
|
def _speak(): |
|
|
try: |
|
|
|
|
|
clean_text = re.sub(r'```.*?```', '', text, flags=re.DOTALL) |
|
|
clean_text = re.sub(r'`.*?`', '', clean_text) |
|
|
clean_text = clean_text.strip() |
|
|
if clean_text: |
|
|
self.tts_engine.say(clean_text) |
|
|
self.tts_engine.runAndWait() |
|
|
else: |
|
|
self.tts_engine.say(text) |
|
|
self.tts_engine.runAndWait() |
|
|
except Exception as e: |
|
|
console.log(f"[red]TTS Error: {e}[/red]") |
|
|
|
|
|
thread = threading.Thread(target=_speak, daemon=True) |
|
|
thread.start() |
|
|
|
|
|
async def _default_generate(self, messages: List[Dict[str, str]]) -> str: |
|
|
"""Default generate function if none provided""" |
|
|
return await self.openai_generate(messages) |
|
|
def create_interface(self): |
|
|
"""Create the full LCARS-styled interface""" |
|
|
lcars_css = """ |
|
|
:root { |
|
|
--lcars-orange: #FF9900; |
|
|
--lcars-red: #FF0033; |
|
|
--lcars-blue: #6699FF; |
|
|
--lcars-purple: #CC99FF; |
|
|
--lcars-pale-blue: #99CCFF; |
|
|
--lcars-black: #000000; |
|
|
--lcars-dark-blue: #3366CC; |
|
|
--lcars-gray: #424242; |
|
|
--lcars-yellow: #FFFF66; |
|
|
} |
|
|
body { |
|
|
background: var(--lcars-black); |
|
|
color: var(--lcars-orange); |
|
|
font-family: 'Antonio', 'LCD', 'Courier New', monospace; |
|
|
margin: 0; |
|
|
padding: 0; |
|
|
} |
|
|
.gradio-container { |
|
|
background: var(--lcars-black) !important; |
|
|
min-height: 100vh; |
|
|
} |
|
|
.lcars-container { |
|
|
background: var(--lcars-black); |
|
|
border: 4px solid var(--lcars-orange); |
|
|
border-radius: 0 30px 0 0; |
|
|
min-height: 100vh; |
|
|
padding: 20px; |
|
|
} |
|
|
.lcars-header { |
|
|
background: linear-gradient(90deg, var(--lcars-red), var(--lcars-orange)); |
|
|
padding: 20px 40px; |
|
|
border-radius: 0 60px 0 0; |
|
|
margin: -20px -20px 20px -20px; |
|
|
border-bottom: 6px solid var(--lcars-blue); |
|
|
} |
|
|
.lcars-title { |
|
|
font-size: 2.5em; |
|
|
font-weight: bold; |
|
|
color: var(--lcars-black); |
|
|
margin: 0; |
|
|
} |
|
|
.lcars-subtitle { |
|
|
font-size: 1.2em; |
|
|
color: var(--lcars-black); |
|
|
margin: 10px 0 0 0; |
|
|
} |
|
|
.lcars-panel { |
|
|
background: rgba(66, 66, 66, 0.9); |
|
|
border: 2px solid var(--lcars-orange); |
|
|
border-radius: 0 20px 0 20px; |
|
|
padding: 15px; |
|
|
margin-bottom: 15px; |
|
|
} |
|
|
.lcars-button { |
|
|
background: var(--lcars-orange); |
|
|
color: var(--lcars-black) !important; |
|
|
border: none !important; |
|
|
border-radius: 0 15px 0 15px !important; |
|
|
padding: 10px 20px !important; |
|
|
font-family: inherit !important; |
|
|
font-weight: bold !important; |
|
|
margin: 5px !important; |
|
|
} |
|
|
.lcars-button:hover { |
|
|
background: var(--lcars-red) !important; |
|
|
} |
|
|
.lcars-input { |
|
|
background: var(--lcars-black) !important; |
|
|
color: var(--lcars-orange) !important; |
|
|
border: 2px solid var(--lcars-blue) !important; |
|
|
border-radius: 0 10px 0 10px !important; |
|
|
padding: 10px !important; |
|
|
} |
|
|
.lcars-chatbot { |
|
|
background: var(--lcars-black) !important; |
|
|
border: 2px solid var(--lcars-purple) !important; |
|
|
border-radius: 0 15px 0 15px !important; |
|
|
} |
|
|
.status-indicator { |
|
|
display: inline-block; |
|
|
width: 12px; |
|
|
height: 12px; |
|
|
border-radius: 50%; |
|
|
background: var(--lcars-red); |
|
|
margin-right: 8px; |
|
|
} |
|
|
.status-online { |
|
|
background: var(--lcars-blue); |
|
|
animation: pulse 2s infinite; |
|
|
} |
|
|
@keyframes pulse { |
|
|
0% { opacity: 1; } |
|
|
50% { opacity: 0.5; } |
|
|
100% { opacity: 1; } |
|
|
} |
|
|
""" |
|
|
with gr.Blocks(css=lcars_css, theme=gr.themes.Default(), title="LCARS Terminal") as interface: |
|
|
with gr.Column(elem_classes="lcars-container"): |
|
|
|
|
|
with gr.Row(elem_classes="lcars-header"): |
|
|
gr.Markdown(""" |
|
|
<div style="text-align: center; width: 100%;"> |
|
|
<div class="lcars-title">π LCARS TERMINAL</div> |
|
|
<div class="lcars-subtitle">STARFLEET AI DEVELOPMENT CONSOLE</div> |
|
|
<div style="margin-top: 10px;"> |
|
|
<span class="status-indicator status-online"></span> |
|
|
<span style="color: var(--lcars-black); font-weight: bold;">SYSTEM ONLINE</span> |
|
|
</div> |
|
|
</div> |
|
|
""") |
|
|
|
|
|
with gr.Row(): |
|
|
|
|
|
with gr.Column(scale=1): |
|
|
|
|
|
with gr.Column(elem_classes="lcars-panel"): |
|
|
gr.Markdown("### π§ LM STUDIO CONFIGURATION") |
|
|
|
|
|
with gr.Row(): |
|
|
base_url = gr.Textbox( |
|
|
value=BASE_URL, |
|
|
label="LM Studio URL", |
|
|
elem_classes="lcars-input" |
|
|
) |
|
|
api_key = gr.Textbox( |
|
|
value=BASE_API_KEY, |
|
|
label="API Key", |
|
|
type="password", |
|
|
elem_classes="lcars-input" |
|
|
) |
|
|
with gr.Row(): |
|
|
model_dropdown = gr.Dropdown( |
|
|
choices=["Fetching models..."], |
|
|
value="Fetching models...", |
|
|
label="AI Model", |
|
|
elem_classes="lcars-input" |
|
|
) |
|
|
fetch_models_btn = gr.Button("π‘ Fetch Models", elem_classes="lcars-button") |
|
|
with gr.Row(): |
|
|
temperature = gr.Slider(0.0, 2.0, value=0.7, label="Temperature") |
|
|
max_tokens = gr.Slider(128, 8192, value=2000, step=128, label="Max Tokens") |
|
|
with gr.Row(): |
|
|
update_config_btn = gr.Button("πΎ Apply Config", elem_classes="lcars-button") |
|
|
speech_toggle = gr.Checkbox(value=True, label="π Speech Output") |
|
|
|
|
|
with gr.Column(elem_classes="lcars-panel"): |
|
|
gr.Markdown("### π¨ CANVAS ARTIFACTS") |
|
|
artifact_display = gr.JSON(label="") |
|
|
with gr.Row(): |
|
|
refresh_artifacts_btn = gr.Button("π Refresh", elem_classes="lcars-button") |
|
|
clear_canvas_btn = gr.Button("ποΈ Clear Canvas", elem_classes="lcars-button") |
|
|
|
|
|
with gr.Column(scale=2): |
|
|
|
|
|
with gr.Accordion("π» COLLABORATIVE CODE CANVAS", open=False): |
|
|
code_editor = gr.Code( |
|
|
value="# Welcome to LCARS Collaborative Canvas\\nprint('Hello, Starfleet!')", |
|
|
language="python", |
|
|
lines=15, |
|
|
label="" |
|
|
) |
|
|
with gr.Row(): |
|
|
load_to_chat_btn = gr.Button("π¬ Discuss Code", elem_classes="lcars-button") |
|
|
analyze_btn = gr.Button("π Analyze", elem_classes="lcars-button") |
|
|
optimize_btn = gr.Button("β‘ Optimize", elem_classes="lcars-button") |
|
|
|
|
|
with gr.Column(elem_classes="lcars-panel"): |
|
|
gr.Markdown("### π¬ MISSION LOG") |
|
|
chatbot = gr.Chatbot(label="", height=300) |
|
|
with gr.Row(): |
|
|
message_input = gr.Textbox( |
|
|
placeholder="Enter your command or query...", |
|
|
show_label=False, |
|
|
lines=2, |
|
|
scale=4 |
|
|
) |
|
|
send_btn = gr.Button("π SEND", elem_classes="lcars-button", scale=1) |
|
|
|
|
|
with gr.Row(): |
|
|
status_display = gr.Textbox( |
|
|
value="LCARS terminal operational. Awaiting commands.", |
|
|
label="Status", |
|
|
max_lines=2 |
|
|
) |
|
|
with gr.Column(scale=0): |
|
|
clear_chat_btn = gr.Button("ποΈ Clear Chat", elem_classes="lcars-button") |
|
|
new_session_btn = gr.Button("π New Session", elem_classes="lcars-button") |
|
|
|
|
|
|
|
|
async def fetch_models_updated(base_url_val, api_key_val): |
|
|
models = await LLMAgent.fetch_available_models(base_url_val, api_key_val) |
|
|
if models: |
|
|
return gr.update(choices=models, value=models[0]) |
|
|
return gr.update(choices=["No models found"]) |
|
|
|
|
|
def update_agent_connection(model_id, base_url_val, api_key_val): |
|
|
self.agent = LLMAgent( |
|
|
model_id=model_id, |
|
|
base_url=base_url_val, |
|
|
api_key=api_key_val, |
|
|
generate_fn=LLMAgent.openai_generate |
|
|
) |
|
|
return f"β
Connected to LM Studio: {model_id}" |
|
|
|
|
|
async def process_message(message, history, speech_enabled): |
|
|
if not message.strip(): |
|
|
return "", history, "Please enter a message" |
|
|
history = history + [[message, None]] |
|
|
try: |
|
|
response = await self.chat_with_canvas( |
|
|
message, self.current_conversation, include_canvas=True |
|
|
) |
|
|
history[-1][1] = response |
|
|
if speech_enabled and self.speech_enabled: |
|
|
self.speak(response) |
|
|
artifacts = self.get_canvas_summary(self.current_conversation) |
|
|
status = f"β
Response received. Canvas artifacts: {len(artifacts)}" |
|
|
return "", history, status, artifacts |
|
|
except Exception as e: |
|
|
error_msg = f"β Error: {str(e)}" |
|
|
history[-1][1] = error_msg |
|
|
return "", history, error_msg, self.get_canvas_summary(self.current_conversation) |
|
|
|
|
|
def get_artifacts(): |
|
|
return self.get_canvas_summary(self.current_conversation) |
|
|
|
|
|
def clear_canvas(): |
|
|
self.clear_canvas(self.current_conversation) |
|
|
return [], "β
Canvas cleared" |
|
|
|
|
|
def clear_chat(): |
|
|
self.clear_conversation(self.current_conversation) |
|
|
return [], "β
Chat cleared" |
|
|
|
|
|
def new_session(): |
|
|
self.clear_conversation(self.current_conversation) |
|
|
self.clear_canvas(self.current_conversation) |
|
|
return [], "# New session started\\nprint('Ready!')", "π New session started", [] |
|
|
|
|
|
|
|
|
fetch_models_btn.click(fetch_models_updated, |
|
|
inputs=[base_url, api_key], |
|
|
outputs=model_dropdown) |
|
|
update_config_btn.click(update_agent_connection, |
|
|
inputs=[model_dropdown, base_url, api_key], |
|
|
outputs=status_display) |
|
|
send_btn.click(process_message, |
|
|
inputs=[message_input, chatbot, speech_toggle], |
|
|
outputs=[message_input, chatbot, status_display, artifact_display]) |
|
|
message_input.submit(process_message, |
|
|
inputs=[message_input, chatbot, speech_toggle], |
|
|
outputs=[message_input, chatbot, status_display, artifact_display]) |
|
|
refresh_artifacts_btn.click(get_artifacts, outputs=artifact_display) |
|
|
clear_canvas_btn.click(clear_canvas, outputs=[artifact_display, status_display]) |
|
|
clear_chat_btn.click(clear_chat, outputs=[chatbot, status_display]) |
|
|
new_session_btn.click(new_session, outputs=[chatbot, code_editor, status_display, artifact_display]) |
|
|
interface.load(get_artifacts, outputs=artifact_display) |
|
|
return interface |
|
|
|
|
|
def _register_event_handlers(self): |
|
|
"""Register internal event handlers for response routing""" |
|
|
RegisterEvent("llm_internal_response", self._handle_internal_response) |
|
|
|
|
|
def _handle_internal_response(self, response: LLMResponse): |
|
|
"""Route responses to the appropriate request handlers""" |
|
|
console.log(f"[bold cyan]Handling internal response for: {response.request_id}[/bold cyan]") |
|
|
|
|
|
request = None |
|
|
with self.pending_requests_lock: |
|
|
if response.request_id in self.pending_requests: |
|
|
request = self.pending_requests[response.request_id] |
|
|
del self.pending_requests[response.request_id] |
|
|
console.log(f"Found pending request for: {response.request_id}") |
|
|
else: |
|
|
console.log(f"No pending request found for: {response.request_id}", style="yellow") |
|
|
return |
|
|
|
|
|
|
|
|
if request.response_event: |
|
|
console.log(f"[bold green]Raising event: {request.response_event}[/bold green]") |
|
|
RaiseEvent(request.response_event, response) |
|
|
|
|
|
|
|
|
if request.callback: |
|
|
try: |
|
|
console.log(f"[bold yellow]Calling callback for: {response.request_id}[/bold yellow]") |
|
|
request.callback(response) |
|
|
except Exception as e: |
|
|
console.log(f"Error in callback: {e}", style="bold red") |
|
|
|
|
|
def _add_to_conversation_history(self, conversation_id: str, message: LLMMessage): |
|
|
"""Add message to conversation history""" |
|
|
if conversation_id not in self.conversations: |
|
|
self.conversations[conversation_id] = [] |
|
|
|
|
|
self.conversations[conversation_id].append(message) |
|
|
|
|
|
|
|
|
if len(self.conversations[conversation_id]) > self.max_history_length * 2: |
|
|
self.conversations[conversation_id] = self.conversations[conversation_id][-(self.max_history_length * 2):] |
|
|
|
|
|
def _build_messages_from_conversation(self, conversation_id: str, new_message: LLMMessage) -> List[Dict[str, str]]: |
|
|
"""Build message list from conversation history""" |
|
|
messages = [] |
|
|
|
|
|
|
|
|
if self.system_prompt: |
|
|
messages.append({"role": "system", "content": self.system_prompt}) |
|
|
|
|
|
|
|
|
if conversation_id in self.conversations: |
|
|
for msg in self.conversations[conversation_id][-self.max_history_length:]: |
|
|
messages.append({"role": msg.role, "content": msg.content}) |
|
|
|
|
|
|
|
|
messages.append({"role": new_message.role, "content": new_message.content}) |
|
|
|
|
|
return messages |
|
|
|
|
|
def _process_llm_request(self, request: LLMRequest): |
|
|
"""Process a single LLM request""" |
|
|
console.log(f"[bold green]Processing LLM request: {request.message.message_id}[/bold green]") |
|
|
try: |
|
|
|
|
|
messages = self._build_messages_from_conversation( |
|
|
request.message.conversation_id or "default", |
|
|
request.message |
|
|
) |
|
|
|
|
|
console.log(f"Calling LLM with {len(messages)} messages") |
|
|
|
|
|
|
|
|
response_content = self._call_llm_sync(messages) |
|
|
|
|
|
console.log(f"[bold green]LLM response received: {response_content}...[/bold green]") |
|
|
|
|
|
|
|
|
response_message = LLMMessage( |
|
|
role="assistant", |
|
|
content=response_content, |
|
|
conversation_id=request.message.conversation_id, |
|
|
metadata={"request_id": request.message.message_id} |
|
|
) |
|
|
|
|
|
|
|
|
self._add_to_conversation_history( |
|
|
request.message.conversation_id or "default", |
|
|
request.message |
|
|
) |
|
|
self._add_to_conversation_history( |
|
|
request.message.conversation_id or "default", |
|
|
response_message |
|
|
) |
|
|
|
|
|
|
|
|
response = LLMResponse( |
|
|
message=response_message, |
|
|
request_id=request.message.message_id, |
|
|
success=True |
|
|
) |
|
|
|
|
|
console.log(f"[bold blue]Sending internal response for: {request.message.message_id}[/bold blue]") |
|
|
RaiseEvent("llm_internal_response", response) |
|
|
|
|
|
except Exception as e: |
|
|
console.log(f"[bold red]Error processing LLM request: {e}[/bold red]") |
|
|
traceback.print_exc() |
|
|
|
|
|
error_response = LLMResponse( |
|
|
message=LLMMessage( |
|
|
role="system", |
|
|
content=f"Error: {str(e)}", |
|
|
conversation_id=request.message.conversation_id |
|
|
), |
|
|
request_id=request.message.message_id, |
|
|
success=False, |
|
|
error=str(e) |
|
|
) |
|
|
|
|
|
RaiseEvent("llm_internal_response", error_response) |
|
|
|
|
|
def _call_llm_sync(self, messages: List[Dict[str, str]]) -> str: |
|
|
"""Sync call to the LLM with retry logic""" |
|
|
console.log(f"Making LLM call to {self.model_id}") |
|
|
for attempt in range(self.max_retries): |
|
|
try: |
|
|
response = CLIENT.chat.completions.create( |
|
|
model=self.model_id, |
|
|
messages=messages, |
|
|
temperature=self.temperature, |
|
|
max_tokens=self.max_tokens |
|
|
) |
|
|
content = response.choices[0].message.content |
|
|
console.log(f"LLM call successful, response length: {len(content)}") |
|
|
return content |
|
|
except Exception as e: |
|
|
console.log(f"LLM call attempt {attempt + 1} failed: {e}") |
|
|
if attempt == self.max_retries - 1: |
|
|
raise e |
|
|
|
|
|
|
|
|
def _process_queue(self): |
|
|
"""Main queue processing loop""" |
|
|
console.log("[bold cyan]LLM Agent queue processor started[/bold cyan]") |
|
|
while not self._stop_event.is_set(): |
|
|
try: |
|
|
request = self.request_queue.get(timeout=1.0) |
|
|
if request: |
|
|
console.log(f"Got request from queue: {request.message.message_id}") |
|
|
self._process_llm_request(request) |
|
|
self.request_queue.task_done() |
|
|
except Empty: |
|
|
continue |
|
|
except Exception as e: |
|
|
console.log(f"Error in queue processing: {e}", style="bold red") |
|
|
traceback.print_exc() |
|
|
console.log("[bold cyan]LLM Agent queue processor stopped[/bold cyan]") |
|
|
|
|
|
def send_message( |
|
|
self, |
|
|
content: str, |
|
|
role: str = "user", |
|
|
conversation_id: str = None, |
|
|
response_event: str = None, |
|
|
callback: Callable = None, |
|
|
metadata: Dict = None |
|
|
) -> str: |
|
|
"""Send a message to the LLM and get response via events""" |
|
|
if not self.is_running: |
|
|
raise RuntimeError("LLM Agent is not running. Call start() first.") |
|
|
|
|
|
|
|
|
message = LLMMessage( |
|
|
role=role, |
|
|
content=content, |
|
|
conversation_id=conversation_id, |
|
|
metadata=metadata or {} |
|
|
) |
|
|
|
|
|
|
|
|
request = LLMRequest( |
|
|
message=message, |
|
|
response_event=response_event, |
|
|
callback=callback |
|
|
) |
|
|
|
|
|
|
|
|
with self.pending_requests_lock: |
|
|
self.pending_requests[message.message_id] = request |
|
|
console.log(f"Added to pending requests: {message.message_id}") |
|
|
|
|
|
|
|
|
try: |
|
|
self.request_queue.put(request, timeout=5.0) |
|
|
console.log(f"[bold magenta]Message queued: {message.message_id}, Content: {content[:50]}...[/bold magenta]") |
|
|
return message.message_id |
|
|
except queue.Full: |
|
|
console.log(f"[bold red]Queue full, cannot send message[/bold red]") |
|
|
with self.pending_requests_lock: |
|
|
if message.message_id in self.pending_requests: |
|
|
del self.pending_requests[message.message_id] |
|
|
raise RuntimeError("LLM Agent queue is full") |
|
|
|
|
|
async def chat(self, messages: List[Dict[str, str]]) -> str: |
|
|
""" |
|
|
Async chat method that sends message via queue and returns response string. |
|
|
This is the main method you should use. |
|
|
""" |
|
|
|
|
|
loop = asyncio.get_event_loop() |
|
|
response_future = loop.create_future() |
|
|
|
|
|
def chat_callback(response: LLMResponse): |
|
|
"""Callback when LLM responds - thread-safe""" |
|
|
console.log(f"[bold yellow]β CHAT CALLBACK TRIGGERED![/bold yellow]") |
|
|
|
|
|
if not response_future.done(): |
|
|
if response.success: |
|
|
content = response.message.content |
|
|
console.log(f"Callback received content: {content}...") |
|
|
|
|
|
loop.call_soon_threadsafe(response_future.set_result, content) |
|
|
else: |
|
|
console.log(f"Error in response: {response.error}") |
|
|
error_msg = f"β Error: {response.error}" |
|
|
loop.call_soon_threadsafe(response_future.set_result, error_msg) |
|
|
else: |
|
|
console.log(f"[bold red]Future already done, ignoring callback[/bold red]") |
|
|
|
|
|
console.log(f"Sending message to LLM agent...") |
|
|
|
|
|
|
|
|
user_message = "" |
|
|
for msg in messages: |
|
|
if msg.get("role") == "user": |
|
|
user_message = msg.get("content", "") |
|
|
break |
|
|
|
|
|
if not user_message.strip(): |
|
|
return "" |
|
|
|
|
|
|
|
|
try: |
|
|
message_id = self.send_message( |
|
|
content=user_message, |
|
|
conversation_id="default", |
|
|
callback=chat_callback |
|
|
) |
|
|
|
|
|
console.log(f"Message sent with ID: {message_id}, waiting for response...") |
|
|
|
|
|
|
|
|
try: |
|
|
response = await asyncio.wait_for(response_future, timeout=self.timeout) |
|
|
console.log(f"[bold green]β Chat complete! Response length: {len(response)}[/bold green]") |
|
|
return response |
|
|
|
|
|
except asyncio.TimeoutError: |
|
|
console.log("[bold red]Response timeout[/bold red]") |
|
|
|
|
|
with self.pending_requests_lock: |
|
|
if message_id in self.pending_requests: |
|
|
del self.pending_requests[message_id] |
|
|
return "β Response timeout - check if LLM server is running" |
|
|
|
|
|
except Exception as e: |
|
|
console.log(f"[bold red]Error sending message: {e}[/bold red]") |
|
|
traceback.print_exc() |
|
|
return f"β Error sending message: {e}" |
|
|
|
|
|
def start(self): |
|
|
"""Start the LLM agent""" |
|
|
if not self.is_running: |
|
|
self.is_running = True |
|
|
self._stop_event.clear() |
|
|
self.processing_thread = Thread(target=self._process_queue, daemon=True) |
|
|
self.processing_thread.start() |
|
|
console.log("[bold green]LLM Agent started[/bold green]") |
|
|
|
|
|
def stop(self): |
|
|
"""Stop the LLM agent""" |
|
|
console.log("Stopping LLM Agent...") |
|
|
self._stop_event.set() |
|
|
if self.processing_thread and self.processing_thread.is_alive(): |
|
|
self.processing_thread.join(timeout=10) |
|
|
self.is_running = False |
|
|
console.log("LLM Agent stopped") |
|
|
|
|
|
def get_conversation_history(self, conversation_id: str = "default") -> List[LLMMessage]: |
|
|
"""Get conversation history""" |
|
|
return self.conversations.get(conversation_id, [])[:] |
|
|
|
|
|
def clear_conversation(self, conversation_id: str = "default"): |
|
|
"""Clear conversation history""" |
|
|
if conversation_id in self.conversations: |
|
|
del self.conversations[conversation_id] |
|
|
|
|
|
|
|
|
async def _chat(self, messages: List[Dict[str, str]]) -> str: |
|
|
return await self._generate(messages) |
|
|
|
|
|
@staticmethod |
|
|
async def openai_generate(messages: List[Dict[str, str]], max_tokens: int = 8096, temperature: float = 0.4, model: str = BASEMODEL_ID,tools=None) -> str: |
|
|
"""Static method for generating responses using OpenAI API""" |
|
|
try: |
|
|
resp = await BASE_CLIENT.chat.completions.create( |
|
|
model=model, |
|
|
messages=messages, |
|
|
temperature=temperature, |
|
|
max_tokens=max_tokens, |
|
|
tools=tools |
|
|
) |
|
|
response_text = resp.choices[0].message.content or "" |
|
|
return response_text |
|
|
except Exception as e: |
|
|
console.log(f"[bold red]Error in openai_generate: {e}[/bold red]") |
|
|
return f"[LLM_Agent Error - openai_generate: {str(e)}]" |
|
|
|
|
|
async def _call_(self, messages: List[Dict[str, str]]) -> str: |
|
|
"""Internal call method using instance client""" |
|
|
try: |
|
|
resp = await self.async_client.chat.completions.create( |
|
|
model=self.model_id, |
|
|
messages=messages, |
|
|
temperature=self.temperature, |
|
|
max_tokens=self.max_tokens |
|
|
) |
|
|
response_text = resp.choices[0].message.content or "" |
|
|
return response_text |
|
|
except Exception as e: |
|
|
console.log(f"[bold red]Error in _call_: {e}[/bold red]") |
|
|
return f"[LLM_Agent Error - _call_: {str(e)}]" |
|
|
|
|
|
@staticmethod |
|
|
def CreateClient(base_url: str, api_key: str) -> AsyncOpenAI: |
|
|
'''Create async OpenAI Client required for multi tasking''' |
|
|
return AsyncOpenAI( |
|
|
base_url=base_url, |
|
|
api_key=api_key |
|
|
) |
|
|
|
|
|
@staticmethod |
|
|
async def fetch_available_models(base_url: str, api_key: str) -> List[str]: |
|
|
"""Fetches available models from the OpenAI API.""" |
|
|
try: |
|
|
async_client = AsyncOpenAI(base_url=base_url, api_key=api_key) |
|
|
models = await async_client.models.list() |
|
|
model_choices = [model.id for model in models.data] |
|
|
return model_choices |
|
|
except Exception as e: |
|
|
console.log(f"[bold red]LLM_Agent Error fetching models: {e}[/bold red]") |
|
|
return ["LLM_Agent Error fetching models"] |
|
|
|
|
|
def get_models(self) -> List[str]: |
|
|
"""Get available models using instance credentials""" |
|
|
return asyncio.run(self.fetch_available_models(self.base_url, self.api_key)) |
|
|
|
|
|
|
|
|
def get_queue_size(self) -> int: |
|
|
"""Get current queue size""" |
|
|
return self.request_queue.qsize() |
|
|
|
|
|
def get_pending_requests_count(self) -> int: |
|
|
"""Get number of pending requests""" |
|
|
with self.pending_requests_lock: |
|
|
return len(self.pending_requests) |
|
|
|
|
|
def get_status(self) : |
|
|
"""Get agent status information""" |
|
|
return str({ |
|
|
"is_running": self.is_running, |
|
|
"queue_size": self.get_queue_size(), |
|
|
"pending_requests": self.get_pending_requests_count(), |
|
|
"conversations_count": len(self.conversations), |
|
|
"model": self.model_id, "BaseURL": self.base_url |
|
|
}) |
|
|
|
|
|
|
|
|
def direct_chat(self, user_message: str, conversation_id: str = "default") -> str: |
|
|
""" |
|
|
Send a message and get a response using direct API call. |
|
|
""" |
|
|
try: |
|
|
|
|
|
message = LLMMessage(role="user", content=user_message, conversation_id=conversation_id) |
|
|
|
|
|
|
|
|
messages = self._build_messages_from_conversation(conversation_id, message) |
|
|
console.log(f"Calling LLM at {self.base_url} with {len(messages)} messages") |
|
|
|
|
|
|
|
|
response = CLIENT.chat.completions.create( |
|
|
model=self.model_id, |
|
|
messages=messages, |
|
|
temperature=self.temperature, |
|
|
max_tokens=self.max_tokens |
|
|
) |
|
|
response_content = response.choices[0].message.content |
|
|
console.log(f"[bold green]LLM response received: {response_content[:50]}...[/bold green]") |
|
|
|
|
|
|
|
|
self._add_to_conversation_history(conversation_id, message) |
|
|
response_message = LLMMessage(role="assistant", content=response_content, conversation_id=conversation_id) |
|
|
self._add_to_conversation_history(conversation_id, response_message) |
|
|
|
|
|
return response_content |
|
|
|
|
|
except Exception as e: |
|
|
console.log(f"[bold red]Error in chat: {e}[/bold red]") |
|
|
traceback.print_exc() |
|
|
return f"β Error communicating with LLM: {str(e)}" |
|
|
|
|
|
|
|
|
|
|
|
def add_artifact(self, conversation_id: str, artifact_type: str, content: str, title: str = "", metadata: Dict = None): |
|
|
artifact = CanvasArtifact( |
|
|
id=str(uuid.uuid4()), |
|
|
type=artifact_type, |
|
|
content=content, |
|
|
title=title, |
|
|
timestamp=time.time(), |
|
|
metadata=metadata or {} |
|
|
) |
|
|
self.canvas_artifacts[conversation_id].append(artifact) |
|
|
|
|
|
def get_canvas_artifacts(self, conversation_id: str = "default") -> List[CanvasArtifact]: |
|
|
return self.canvas_artifacts.get(conversation_id, []) |
|
|
|
|
|
def get_canvas_summary(self, conversation_id: str = "default") -> List[Dict[str, Any]]: |
|
|
artifacts = self.get_canvas_artifacts(conversation_id) |
|
|
return [{"id": a.id, "type": a.type, "title": a.title, "timestamp": a.timestamp} for a in artifacts] |
|
|
|
|
|
def clear_canvas(self, conversation_id: str = "default"): |
|
|
if conversation_id in self.canvas_artifacts: |
|
|
self.canvas_artifacts[conversation_id] = [] |
|
|
|
|
|
def clear_conversation(self, conversation_id: str = "default"): |
|
|
if conversation_id in self.conversations: |
|
|
del self.conversations[conversation_id] |
|
|
|
|
|
def get_latest_code_artifact(self, conversation_id: str) -> Optional[str]: |
|
|
"""Get the most recent code artifact content""" |
|
|
if conversation_id not in self.canvas_artifacts: |
|
|
return None |
|
|
|
|
|
for artifact in reversed(self.canvas_artifacts[conversation_id]): |
|
|
if artifact.type == "code": |
|
|
return artifact.content |
|
|
return None |
|
|
|
|
|
def get_canvas_context(self, conversation_id: str) -> str: |
|
|
"""Get formatted canvas context for LLM prompts""" |
|
|
if conversation_id not in self.canvas_artifacts or not self.canvas_artifacts[conversation_id]: |
|
|
return "" |
|
|
|
|
|
context_lines = ["\n=== COLLABORATIVE CANVAS ARTIFACTS ==="] |
|
|
for artifact in self.canvas_artifacts[conversation_id][-10:]: |
|
|
context_lines.append(f"\n--- {artifact.title} [{artifact.type.upper()}] ---") |
|
|
preview = artifact.content[:500] + "..." if len(artifact.content) > 500 else artifact.content |
|
|
context_lines.append(preview) |
|
|
|
|
|
return "\n".join(context_lines) + "\n=================================\n" |
|
|
def get_artifact_by_id(self, conversation_id: str, artifact_id: str) -> Optional[CanvasArtifact]: |
|
|
"""Get specific artifact by ID""" |
|
|
if conversation_id not in self.canvas_artifacts: |
|
|
return None |
|
|
|
|
|
for artifact in self.canvas_artifacts[conversation_id]: |
|
|
if artifact.id == artifact_id: |
|
|
return artifact |
|
|
return None |
|
|
def _extract_artifacts_to_canvas(self, response: str, conversation_id: str): |
|
|
"""Automatically extract code blocks and add to canvas""" |
|
|
|
|
|
code_blocks = re.findall(r'```(?:(\w+)\n)?(.*?)```', response, re.DOTALL) |
|
|
for i, (lang, code_block) in enumerate(code_blocks): |
|
|
if len(code_block.strip()) > 10: |
|
|
self.add_artifact_to_canvas( |
|
|
conversation_id, |
|
|
code_block.strip(), |
|
|
"code", |
|
|
f"code_snippet_{lang or 'unknown'}_{len(self.canvas_artifacts.get(conversation_id, [])) + 1}" |
|
|
) |
|
|
|
|
|
async def chat_with_canvas(self, message: str, conversation_id: str, include_canvas: bool = False): |
|
|
"""Chat method that can optionally include canvas context.""" |
|
|
messages = [{"role": "user", "content": message}] |
|
|
|
|
|
if include_canvas: |
|
|
artifacts = self.get_canvas_summary(conversation_id) |
|
|
if artifacts: |
|
|
canvas_context = "Current Canvas Context:\\n" + "\\n".join([ |
|
|
f"- [{art['type'].upper()}] {art['title'] or 'Untitled'}: {art['content_preview']}" |
|
|
for art in artifacts |
|
|
]) |
|
|
messages.insert(0, {"role": "system", "content": canvas_context}) |
|
|
|
|
|
return await self.chat(messages) |
|
|
|
|
|
|
|
|
|
|
|
console = Console() |
|
|
|
|
|
|
|
|
@dataclass |
|
|
class CanvasArtifact: |
|
|
id: str |
|
|
type: str |
|
|
content: str |
|
|
title: str |
|
|
timestamp: float |
|
|
metadata: Dict[str, Any] |
|
|
|
|
|
class EnhancedAIAgent: |
|
|
""" |
|
|
Wrapper around your AI_Agent that adds canvas/artifact management |
|
|
without modifying the original agent. |
|
|
""" |
|
|
def __init__(self, ai_agent): |
|
|
self.agent = ai_agent |
|
|
self.canvas_artifacts: Dict[str, List[CanvasArtifact]] = {} |
|
|
self.max_canvas_artifacts = 50 |
|
|
console.log("[bold green]β Enhanced AI Agent wrapper initialized[/bold green]") |
|
|
|
|
|
def add_artifact_to_canvas(self, conversation_id: str, content: str, |
|
|
artifact_type: str = "code", title: str = None): |
|
|
"""Add artifacts to the collaborative canvas""" |
|
|
if conversation_id not in self.canvas_artifacts: |
|
|
self.canvas_artifacts[conversation_id] = [] |
|
|
|
|
|
artifact = CanvasArtifact( |
|
|
id=str(uuid.uuid4())[:8], |
|
|
type=artifact_type, |
|
|
content=content, |
|
|
title=title or f"{artifact_type}_{len(self.canvas_artifacts[conversation_id]) + 1}", |
|
|
timestamp=time.time(), |
|
|
metadata={"conversation_id": conversation_id} |
|
|
) |
|
|
|
|
|
self.canvas_artifacts[conversation_id].append(artifact) |
|
|
|
|
|
|
|
|
if len(self.canvas_artifacts[conversation_id]) > self.max_canvas_artifacts: |
|
|
self.canvas_artifacts[conversation_id] = self.canvas_artifacts[conversation_id][-self.max_canvas_artifacts:] |
|
|
|
|
|
console.log(f"[green]Added artifact to canvas: {artifact.title}[/green]") |
|
|
return artifact |
|
|
|
|
|
def get_canvas_context(self, conversation_id: str) -> str: |
|
|
"""Get formatted canvas context for LLM prompts""" |
|
|
if conversation_id not in self.canvas_artifacts or not self.canvas_artifacts[conversation_id]: |
|
|
return "" |
|
|
|
|
|
context_lines = ["\n=== COLLABORATIVE CANVAS ARTIFACTS ==="] |
|
|
for artifact in self.canvas_artifacts[conversation_id][-10:]: |
|
|
context_lines.append(f"\n--- {artifact.title} [{artifact.type.upper()}] ---") |
|
|
preview = artifact.content[:500] + "..." if len(artifact.content) > 500 else artifact.content |
|
|
context_lines.append(preview) |
|
|
|
|
|
return "\n".join(context_lines) + "\n=================================\n" |
|
|
|
|
|
async def chat_with_canvas(self, message: str, conversation_id: str = "default", |
|
|
include_canvas: bool = True) -> str: |
|
|
"""Enhanced chat that includes canvas context""" |
|
|
|
|
|
full_message = message |
|
|
if include_canvas: |
|
|
canvas_context = self.get_canvas_context(conversation_id) |
|
|
if canvas_context: |
|
|
full_message = f"{canvas_context}\n\nUser Query: {message}" |
|
|
|
|
|
try: |
|
|
|
|
|
response = await self.agent.multi_turn_chat(full_message) |
|
|
|
|
|
|
|
|
self._extract_artifacts_to_canvas(response, conversation_id) |
|
|
|
|
|
return response |
|
|
|
|
|
except Exception as e: |
|
|
error_msg = f"Error in chat_with_canvas: {str(e)}" |
|
|
console.log(f"[red]{error_msg}[/red]") |
|
|
return error_msg |
|
|
|
|
|
def _extract_artifacts_to_canvas(self, response: str, conversation_id: str): |
|
|
"""Automatically extract code blocks and add to canvas""" |
|
|
|
|
|
code_blocks = re.findall(r'```(?:(\w+)\n)?(.*?)```', response, re.DOTALL) |
|
|
for i, (lang, code_block) in enumerate(code_blocks): |
|
|
if len(code_block.strip()) > 10: |
|
|
self.add_artifact_to_canvas( |
|
|
conversation_id, |
|
|
code_block.strip(), |
|
|
"code", |
|
|
f"code_snippet_{lang or 'unknown'}_{len(self.canvas_artifacts.get(conversation_id, [])) + 1}" |
|
|
) |
|
|
|
|
|
def get_canvas_summary(self, conversation_id: str) -> List[Dict]: |
|
|
"""Get summary of canvas artifacts for display""" |
|
|
if conversation_id not in self.canvas_artifacts: |
|
|
return [] |
|
|
|
|
|
artifacts = [] |
|
|
for artifact in reversed(self.canvas_artifacts[conversation_id]): |
|
|
artifacts.append({ |
|
|
"id": artifact.id, |
|
|
"type": artifact.type.upper(), |
|
|
"title": artifact.title, |
|
|
"preview": artifact.content[:100] + "..." if len(artifact.content) > 100 else artifact.content, |
|
|
"timestamp": time.strftime("%H:%M:%S", time.localtime(artifact.timestamp)) |
|
|
}) |
|
|
|
|
|
return artifacts |
|
|
|
|
|
def get_artifact_by_id(self, conversation_id: str, artifact_id: str) -> Optional[CanvasArtifact]: |
|
|
"""Get specific artifact by ID""" |
|
|
if conversation_id not in self.canvas_artifacts: |
|
|
return None |
|
|
|
|
|
for artifact in self.canvas_artifacts[conversation_id]: |
|
|
if artifact.id == artifact_id: |
|
|
return artifact |
|
|
return None |
|
|
|
|
|
def clear_canvas(self, conversation_id: str = "default"): |
|
|
"""Clear canvas artifacts""" |
|
|
if conversation_id in self.canvas_artifacts: |
|
|
self.canvas_artifacts[conversation_id] = [] |
|
|
console.log(f"[yellow]Cleared canvas: {conversation_id}[/yellow]") |
|
|
|
|
|
def get_latest_code_artifact(self, conversation_id: str) -> Optional[str]: |
|
|
"""Get the most recent code artifact content""" |
|
|
if conversation_id not in self.canvas_artifacts: |
|
|
return None |
|
|
|
|
|
for artifact in reversed(self.canvas_artifacts[conversation_id]): |
|
|
if artifact.type == "code": |
|
|
return artifact.content |
|
|
return None |
|
|
|
|
|
|
|
|
console = Console() |
|
|
|
|
|
|
|
|
class LcarsInterface: |
|
|
def __init__(self): |
|
|
|
|
|
self.use_huggingface = False |
|
|
self.agent = LLMAgent(generate_fn=LLMAgent.openai_generate) |
|
|
self.current_conversation = "default" |
|
|
|
|
|
def create_interface(self): |
|
|
"""Create the full LCARS-styled interface""" |
|
|
lcars_css = """ |
|
|
:root { |
|
|
--lcars-orange: #FF9900; |
|
|
--lcars-red: #FF0033; |
|
|
--lcars-blue: #6699FF; |
|
|
--lcars-purple: #CC99FF; |
|
|
--lcars-pale-blue: #99CCFF; |
|
|
--lcars-black: #000000; |
|
|
--lcars-dark-blue: #3366CC; |
|
|
--lcars-gray: #424242; |
|
|
--lcars-yellow: #FFFF66; |
|
|
} |
|
|
body { |
|
|
background: var(--lcars-black); |
|
|
color: var(--lcars-orange); |
|
|
font-family: 'Antonio', 'LCD', 'Courier New', monospace; |
|
|
margin: 0; |
|
|
padding: 0; |
|
|
} |
|
|
.gradio-container { |
|
|
background: var(--lcars-black) !important; |
|
|
min-height: 100vh; |
|
|
} |
|
|
.lcars-container { |
|
|
background: var(--lcars-black); |
|
|
border: 4px solid var(--lcars-orange); |
|
|
border-radius: 0 30px 0 0; |
|
|
min-height: 100vh; |
|
|
padding: 20px; |
|
|
} |
|
|
.lcars-header { |
|
|
background: linear-gradient(90deg, var(--lcars-red), var(--lcars-orange)); |
|
|
padding: 20px 40px; |
|
|
border-radius: 0 60px 0 0; |
|
|
margin: -20px -20px 20px -20px; |
|
|
border-bottom: 6px solid var(--lcars-blue); |
|
|
} |
|
|
.lcars-title { |
|
|
font-size: 2.5em; |
|
|
font-weight: bold; |
|
|
color: var(--lcars-black); |
|
|
margin: 0; |
|
|
} |
|
|
.lcars-subtitle { |
|
|
font-size: 1.2em; |
|
|
color: var(--lcars-black); |
|
|
margin: 10px 0 0 0; |
|
|
} |
|
|
.lcars-panel { |
|
|
background: rgba(66, 66, 66, 0.9); |
|
|
border: 2px solid var(--lcars-orange); |
|
|
border-radius: 0 20px 0 20px; |
|
|
padding: 15px; |
|
|
margin-bottom: 15px; |
|
|
} |
|
|
.lcars-button { |
|
|
background: var(--lcars-orange); |
|
|
color: var(--lcars-black) !important; |
|
|
border: none !important; |
|
|
border-radius: 0 15px 0 15px !important; |
|
|
padding: 10px 20px !important; |
|
|
font-family: inherit !important; |
|
|
font-weight: bold !important; |
|
|
margin: 5px !important; |
|
|
} |
|
|
.lcars-button:hover { |
|
|
background: var(--lcars-red) !important; |
|
|
} |
|
|
.lcars-input { |
|
|
background: var(--lcars-black) !important; |
|
|
color: var(--lcars-orange) !important; |
|
|
border: 2px solid var(--lcars-blue) !important; |
|
|
border-radius: 0 10px 0 10px !important; |
|
|
padding: 10px !important; |
|
|
} |
|
|
.lcars-chatbot { |
|
|
background: var(--lcars-black) !important; |
|
|
border: 2px solid var(--lcars-purple) !important; |
|
|
border-radius: 0 15px 0 15px !important; |
|
|
} |
|
|
.status-indicator { |
|
|
display: inline-block; |
|
|
width: 12px; |
|
|
height: 12px; |
|
|
border-radius: 50%; |
|
|
background: var(--lcars-red); |
|
|
margin-right: 8px; |
|
|
} |
|
|
.status-online { |
|
|
background: var(--lcars-blue); |
|
|
animation: pulse 2s infinite; |
|
|
} |
|
|
@keyframes pulse { |
|
|
0% { opacity: 1; } |
|
|
50% { opacity: 0.5; } |
|
|
100% { opacity: 1; } |
|
|
} |
|
|
""" |
|
|
with gr.Blocks(css=lcars_css, theme=gr.themes.Default(), title="LCARS Terminal") as interface: |
|
|
with gr.Column(elem_classes="lcars-container"): |
|
|
|
|
|
with gr.Row(elem_classes="lcars-header"): |
|
|
gr.Markdown(""" |
|
|
<div style="text-align: center; width: 100%;"> |
|
|
<div class="lcars-title">π LCARS TERMINAL</div> |
|
|
<div class="lcars-subtitle">STARFLEET AI DEVELOPMENT CONSOLE</div> |
|
|
<div style="margin-top: 10px;"> |
|
|
<span class="status-indicator status-online"></span> |
|
|
<span style="color: var(--lcars-black); font-weight: bold;">SYSTEM ONLINE</span> |
|
|
</div> |
|
|
</div> |
|
|
""") |
|
|
|
|
|
with gr.Row(): |
|
|
|
|
|
with gr.Column(scale=1): |
|
|
|
|
|
with gr.Column(elem_classes="lcars-panel"): |
|
|
gr.Markdown("### π§ LM STUDIO CONFIGURATION") |
|
|
|
|
|
with gr.Row(): |
|
|
base_url = gr.Textbox( |
|
|
value=LOCAL_BASE_URL, |
|
|
label="LM Studio URL", |
|
|
elem_classes="lcars-input" |
|
|
) |
|
|
api_key = gr.Textbox( |
|
|
value=LOCAL_API_KEY, |
|
|
label="API Key", |
|
|
type="password", |
|
|
elem_classes="lcars-input" |
|
|
) |
|
|
with gr.Row(): |
|
|
model_dropdown = gr.Dropdown( |
|
|
choices=["Fetching models..."], |
|
|
value="Fetching models...", |
|
|
label="AI Model", |
|
|
elem_classes="lcars-input" |
|
|
) |
|
|
fetch_models_btn = gr.Button("π‘ Fetch Models", elem_classes="lcars-button") |
|
|
with gr.Row(): |
|
|
temperature = gr.Slider(0.0, 2.0, value=0.7, label="Temperature") |
|
|
max_tokens = gr.Slider(128, 8192, value=2000, step=128, label="Max Tokens") |
|
|
with gr.Row(): |
|
|
update_config_btn = gr.Button("πΎ Apply Config", elem_classes="lcars-button") |
|
|
speech_toggle = gr.Checkbox(value=True, label="π Speech Output") |
|
|
|
|
|
with gr.Column(elem_classes="lcars-panel"): |
|
|
gr.Markdown("### π¨ CANVAS ARTIFACTS") |
|
|
artifact_display = gr.JSON(label="") |
|
|
with gr.Row(): |
|
|
refresh_artifacts_btn = gr.Button("π Refresh", elem_classes="lcars-button") |
|
|
clear_canvas_btn = gr.Button("ποΈ Clear Canvas", elem_classes="lcars-button") |
|
|
|
|
|
with gr.Column(scale=2): |
|
|
|
|
|
with gr.Accordion("π» COLLABORATIVE CODE CANVAS", open=False): |
|
|
code_editor = gr.Code( |
|
|
value="# Welcome to LCARS Collaborative Canvas\\nprint('Hello, Starfleet!')", |
|
|
language="python", |
|
|
lines=15, |
|
|
label="" |
|
|
) |
|
|
with gr.Row(): |
|
|
load_to_chat_btn = gr.Button("π¬ Discuss Code", elem_classes="lcars-button") |
|
|
analyze_btn = gr.Button("π Analyze", elem_classes="lcars-button") |
|
|
optimize_btn = gr.Button("β‘ Optimize", elem_classes="lcars-button") |
|
|
|
|
|
with gr.Column(elem_classes="lcars-panel"): |
|
|
gr.Markdown("### π¬ MISSION LOG") |
|
|
chatbot = gr.Chatbot(label="", height=300) |
|
|
with gr.Row(): |
|
|
message_input = gr.Textbox( |
|
|
placeholder="Enter your command or query...", |
|
|
show_label=False, |
|
|
lines=2, |
|
|
scale=4 |
|
|
) |
|
|
send_btn = gr.Button("π SEND", elem_classes="lcars-button", scale=1) |
|
|
|
|
|
with gr.Row(): |
|
|
status_display = gr.Textbox( |
|
|
value="LCARS terminal operational. Awaiting commands.", |
|
|
label="Status", |
|
|
max_lines=2 |
|
|
) |
|
|
with gr.Column(scale=0): |
|
|
clear_chat_btn = gr.Button("ποΈ Clear Chat", elem_classes="lcars-button") |
|
|
new_session_btn = gr.Button("π New Session", elem_classes="lcars-button") |
|
|
|
|
|
|
|
|
async def fetch_models_updated(base_url_val, api_key_val): |
|
|
models = await LLMAgent.fetch_available_models(base_url_val, api_key_val) |
|
|
if models: |
|
|
return gr.update(choices=models, value=models[0]) |
|
|
return gr.update(choices=["No models found"]) |
|
|
|
|
|
def update_agent_connection(model_id, base_url_val, api_key_val): |
|
|
self.agent = LLMAgent( |
|
|
model_id=model_id, |
|
|
base_url=base_url_val, |
|
|
api_key=api_key_val, |
|
|
generate_fn=LLMAgent.openai_generate |
|
|
) |
|
|
return f"β
Connected to LM Studio: {model_id}" |
|
|
|
|
|
async def process_message(message, history, speech_enabled): |
|
|
if not message.strip(): |
|
|
return "", history, "Please enter a message" |
|
|
history = history + [[message, None]] |
|
|
try: |
|
|
response = await self.agent.chat_with_canvas( |
|
|
message, self.current_conversation, include_canvas=True |
|
|
) |
|
|
history[-1][1] = response |
|
|
if speech_enabled and self.agent.speech_enabled: |
|
|
self.agent.speak(response) |
|
|
artifacts = self.agent.get_canvas_summary(self.current_conversation) |
|
|
status = f"β
Response received. Canvas artifacts: {len(artifacts)}" |
|
|
return "", history, status, artifacts |
|
|
except Exception as e: |
|
|
error_msg = f"β Error: {str(e)}" |
|
|
history[-1][1] = error_msg |
|
|
return "", history, error_msg, self.agent.get_canvas_summary(self.current_conversation) |
|
|
|
|
|
def get_artifacts(): |
|
|
return self.agent.get_canvas_summary(self.current_conversation) |
|
|
|
|
|
def clear_canvas(): |
|
|
self.agent.clear_canvas(self.current_conversation) |
|
|
return [], "β
Canvas cleared" |
|
|
|
|
|
def clear_chat(): |
|
|
self.agent.clear_conversation(self.current_conversation) |
|
|
return [], "β
Chat cleared" |
|
|
|
|
|
def new_session(): |
|
|
self.agent.clear_conversation(self.current_conversation) |
|
|
self.agent.clear_canvas(self.current_conversation) |
|
|
return [], "# New session started\\nprint('Ready!')", "π New session started", [] |
|
|
|
|
|
|
|
|
fetch_models_btn.click(fetch_models_updated, |
|
|
inputs=[base_url, api_key], |
|
|
outputs=model_dropdown) |
|
|
update_config_btn.click(update_agent_connection, |
|
|
inputs=[model_dropdown, base_url, api_key], |
|
|
outputs=status_display) |
|
|
send_btn.click(process_message, |
|
|
inputs=[message_input, chatbot, speech_toggle], |
|
|
outputs=[message_input, chatbot, status_display, artifact_display]) |
|
|
message_input.submit(process_message, |
|
|
inputs=[message_input, chatbot, speech_toggle], |
|
|
outputs=[message_input, chatbot, status_display, artifact_display]) |
|
|
refresh_artifacts_btn.click(get_artifacts, outputs=artifact_display) |
|
|
clear_canvas_btn.click(clear_canvas, outputs=[artifact_display, status_display]) |
|
|
clear_chat_btn.click(clear_chat, outputs=[chatbot, status_display]) |
|
|
new_session_btn.click(new_session, outputs=[chatbot, code_editor, status_display, artifact_display]) |
|
|
interface.load(get_artifacts, outputs=artifact_display) |
|
|
return interface |
|
|
|
|
|
|
|
|
def main(): |
|
|
console.log("[bold blue]π Starting LCARS Terminal...[/bold blue]") |
|
|
interface = LcarsInterface() |
|
|
demo = interface.create_interface() |
|
|
demo.launch( |
|
|
share=False |
|
|
) |
|
|
|
|
|
if __name__ == "__main__": |
|
|
main() |