import os
import time
import json
import hashlib
from datetime import datetime, timedelta
from typing import List, Dict, Generator, Optional, Any, Tuple
from dataclasses import dataclass, field, asdict
from functools import wraps, lru_cache
from contextlib import contextmanager
from collections import deque, defaultdict
import threading
from concurrent.futures import ThreadPoolExecutor

from dotenv import load_dotenv
from groq import Groq
import groq

from config import logger, AppConfig, ReasoningMode, ModelConfig

class ResponseCache:
    """Thread-safe response cache with TTL expiry; evicts the oldest entry when full."""

    def __init__(self, maxsize: int = 100, ttl: int = 3600):
        self.cache: Dict[str, Tuple[Any, float]] = {}
        self.maxsize = maxsize
        self.ttl = ttl
        self.lock = threading.Lock()
        self.hits = 0
        self.misses = 0

    def get(self, key: str) -> Optional[Any]:
        """Get cached value if not expired"""
        with self.lock:
            if key in self.cache:
                value, timestamp = self.cache[key]
                if time.time() - timestamp < self.ttl:
                    self.hits += 1
                    logger.debug(f"Cache hit for key: {key[:20]}...")
                    return value
                else:
                    del self.cache[key]
            self.misses += 1
            return None

    def set(self, key: str, value: Any) -> None:
        """Set cached value with timestamp"""
        with self.lock:
            if len(self.cache) >= self.maxsize:
                oldest_key = min(self.cache.keys(), key=lambda k: self.cache[k][1])
                del self.cache[oldest_key]
            self.cache[key] = (value, time.time())
            logger.debug(f"Cached response for key: {key[:20]}...")

    def clear(self) -> None:
        """Clear cache"""
        with self.lock:
            self.cache.clear()
            self.hits = 0
            self.misses = 0
            logger.info("Cache cleared")

    def get_stats(self) -> Dict[str, int]:
        """Get cache statistics"""
        with self.lock:
            total = self.hits + self.misses
            hit_rate = (self.hits / total * 100) if total > 0 else 0
            return {
                "hits": self.hits,
                "misses": self.misses,
                "hit_rate": round(hit_rate, 2),
                "size": len(self.cache),
            }
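
# Usage sketch (illustrative only; nothing in this module calls it). Shows the
# hit/miss accounting with hypothetical keys -- real keys come from
# AdvancedReasoner._generate_cache_key below.
def _demo_response_cache() -> None:
    cache = ResponseCache(maxsize=2, ttl=60)
    cache.set("key-a", "response A")
    cache.set("key-b", "response B")
    assert cache.get("key-a") == "response A"   # hit
    assert cache.get("key-missing") is None     # miss
    cache.set("key-c", "response C")            # evicts "key-a", the oldest insertion
    print(cache.get_stats())                    # {'hits': 1, 'misses': 1, 'hit_rate': 50.0, 'size': 2}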

class RateLimiter:
    """Sliding-window rate limiter (tracks request timestamps in a deque)."""

    def __init__(self, max_requests: int = 50, window: int = 60):
        self.max_requests = max_requests
        self.window = window
        self.requests = deque()
        self.lock = threading.Lock()

    def is_allowed(self) -> Tuple[bool, Optional[float]]:
        """Check if a request is allowed; if not, return the wait time in seconds."""
        with self.lock:
            now = time.time()
            while self.requests and self.requests[0] < now - self.window:
                self.requests.popleft()
            if len(self.requests) < self.max_requests:
                self.requests.append(now)
                return True, None
            else:
                wait_time = self.window - (now - self.requests[0])
                return False, wait_time

    def reset(self) -> None:
        """Reset rate limiter"""
        with self.lock:
            self.requests.clear()
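
# Usage sketch (illustrative only): a limiter allowing 2 requests per 60-second
# window. The third call within the window is rejected with a wait time.
def _demo_rate_limiter() -> None:
    limiter = RateLimiter(max_requests=2, window=60)
    assert limiter.is_allowed() == (True, None)
    assert limiter.is_allowed() == (True, None)
    allowed, wait = limiter.is_allowed()
    assert not allowed and wait is not None and wait <= 60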

@dataclass
class ConversationMetrics:
    """Enhanced metrics with thread-safe operations"""

    reasoning_depth: int = 0
    self_corrections: int = 0
    confidence_score: float = 0.0
    inference_time: float = 0.0
    tokens_used: int = 0
    tokens_per_second: float = 0.0
    reasoning_paths_explored: int = 0
    total_conversations: int = 0
    avg_response_time: float = 0.0
    cache_hits: int = 0
    cache_misses: int = 0
    error_count: int = 0
    retry_count: int = 0
    last_updated: str = field(default_factory=lambda: datetime.now().strftime("%H:%M:%S"))
    session_start: str = field(default_factory=lambda: datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
    model_switches: int = 0
    mode_switches: int = 0
    peak_tokens: int = 0
    total_latency: float = 0.0
    _lock: threading.Lock = field(default_factory=threading.Lock, init=False, repr=False)

    def update_confidence(self) -> None:
        """Calculate confidence based on multiple factors"""
        with self._lock:
            depth_score = min(30, self.reasoning_depth * 5)
            correction_score = min(20, self.self_corrections * 10)
            speed_score = min(25, 25 / max(1, self.avg_response_time))
            consistency_score = 25
            self.confidence_score = min(95.0, depth_score + correction_score + speed_score + consistency_score)

    def update_tokens_per_second(self, tokens: int, time_taken: float) -> None:
        """Calculate tokens per second"""
        with self._lock:
            if time_taken > 0:
                self.tokens_per_second = tokens / time_taken

    def increment_field(self, field_name: str, value: Any = 1) -> None:
        """Thread-safe field increment"""
        with self._lock:
            current = getattr(self, field_name)
            setattr(self, field_name, current + value)

    def set_field(self, field_name: str, value: Any) -> None:
        """Thread-safe field setter"""
        with self._lock:
            setattr(self, field_name, value)

    def reset(self) -> None:
        """Reset metrics for new session"""
        with self._lock:
            self.__init__()

    def to_dict(self) -> Dict[str, Any]:
        """Convert to dictionary (asdict would fail trying to deep-copy the lock)"""
        with self._lock:
            return {name: value for name, value in self.__dict__.items() if name != "_lock"}
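
# Usage sketch (illustrative only): the thread-safe helpers are the intended
# write path; direct attribute writes would bypass the lock.
def _demo_metrics() -> None:
    metrics = ConversationMetrics()
    metrics.increment_field("tokens_used", 128)
    metrics.set_field("avg_response_time", 1.5)
    metrics.update_confidence()
    snapshot = metrics.to_dict()               # lock excluded from the snapshot
    assert snapshot["tokens_used"] == 128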

@dataclass
class ConversationEntry:
    """Enhanced conversation entry with metadata"""

    timestamp: str
    user_message: str
    ai_response: str
    model: str
    reasoning_mode: str
    inference_time: float
    tokens: int
    feedback: str = ""
    tags: List[str] = field(default_factory=list)
    rating: Optional[int] = None
    session_id: str = ""
    conversation_id: str = ""
    parent_id: Optional[str] = None
    temperature: float = 0.7
    max_tokens: int = 4000
    cache_hit: bool = False
    error_occurred: bool = False
    retry_count: int = 0
    tokens_per_second: float = 0.0

    def __post_init__(self):
        """Generate unique IDs"""
        if not self.conversation_id:
            self.conversation_id = self._generate_id()

    def _generate_id(self) -> str:
        """Generate unique conversation ID"""
        content = f"{self.timestamp}{self.user_message[:100]}"
        return hashlib.md5(content.encode()).hexdigest()[:12]

    def to_dict(self) -> Dict[str, Any]:
        """Convert to dictionary"""
        return asdict(self)

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> 'ConversationEntry':
        """Create instance from dictionary"""
        return cls(**data)

    def add_tag(self, tag: str) -> None:
        """Add tag to conversation"""
        if tag not in self.tags:
            self.tags.append(tag)

    def set_rating(self, rating: int) -> None:
        """Set user rating (1-5)"""
        if 1 <= rating <= 5:
            self.rating = rating
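
# Usage sketch (illustrative only): entries round-trip through dicts, which is
# what the JSON export/backup paths below rely on. The model and mode strings
# here are hypothetical placeholders.
def _demo_entry_roundtrip() -> None:
    entry = ConversationEntry(
        timestamp="2024-01-01 12:00:00",
        user_message="What is 2 + 2?",
        ai_response="4",
        model="example-model",              # hypothetical model name
        reasoning_mode="chain_of_thought",  # hypothetical mode value
        inference_time=0.42,
        tokens=12,
    )
    entry.add_tag("math")
    entry.set_rating(5)
    restored = ConversationEntry.from_dict(entry.to_dict())
    assert restored.conversation_id == entry.conversation_id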

def error_handler(func):
    """Enhanced error handling decorator for generator functions"""

    @wraps(func)
    def wrapper(*args, **kwargs):
        max_retries = AppConfig.MAX_RETRIES
        retry_delay = AppConfig.RETRY_DELAY
        for attempt in range(max_retries):
            try:
                result = func(*args, **kwargs)
                if hasattr(result, '__iter__') and hasattr(result, '__next__'):
                    # It's a generator: stream its output through
                    yield from result
                else:
                    # Regular function: pass its result along
                    return result
                return  # Exit after successful completion
            except groq.APIConnectionError as e:
                error_msg = ("🔌 **Connection Error**: Cannot reach Groq API.\n\n"
                             "Please check your internet connection and try again.")
                logger.error(f"API Connection Error in {func.__name__}: {str(e)}")
            except groq.RateLimitError as e:
                error_msg = ("⏱️ **Rate Limit Exceeded**: Too many requests.\n\n"
                             "Please wait a moment and try again.")
                logger.error(f"Rate Limit Error in {func.__name__}: {str(e)}")
            except groq.AuthenticationError as e:
                error_msg = ("🔐 **Authentication Error**: Invalid API key.\n\n"
                             "Please verify your GROQ_API_KEY in the .env file.")
                logger.error(f"Authentication Error in {func.__name__}: {str(e)}")
                yield error_msg
                return  # Don't retry authentication errors
            except groq.APIStatusError as e:
                error_msg = (f"⚠️ **API Error** (Status {e.status_code}):\n\n"
                             f"{str(e)}\n\nPlease try again or select a different model.")
                logger.error(f"API Status Error in {func.__name__}: {str(e)}")
            except Exception as e:
                error_msg = (f"❌ **System Error**: {str(e)}\n\n"
                             "Please try again or contact support if the issue persists.")
                logger.error(f"Unexpected error in {func.__name__}: {str(e)}", exc_info=True)
            if attempt < max_retries - 1:
                logger.info(f"Retrying in {retry_delay}s... (attempt {attempt + 1}/{max_retries})")
                time.sleep(retry_delay)
                retry_delay *= 2  # exponential backoff
            else:
                yield error_msg
                return

    return wrapper
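
# Usage sketch (illustrative only): the decorator wraps a generator and turns
# Groq API failures into user-facing error strings with retry/backoff. This
# stand-in just yields text; a real use streams from the API.
@error_handler
def _demo_streaming_call() -> Generator[str, None, None]:
    yield "partial "
    yield "response"
    # list(_demo_streaming_call()) -> ["partial ", "response"]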

@contextmanager
def timer(operation: str = "Operation"):
    """Enhanced context manager for timing operations"""
    start = time.time()
    logger.info(f"Starting: {operation}")
    try:
        yield
    finally:
        duration = time.time() - start
        logger.info(f"Completed: {operation} in {duration:.3f}s")

def validate_input(text: str, max_length: int = 10000) -> Tuple[bool, Optional[str]]:
    """Validate user input"""
    if not text or not text.strip():
        return False, "Input cannot be empty"
    if len(text) > max_length:
        return False, f"Input too long (max {max_length} characters)"
    suspicious_patterns = ["<script", "javascript:", "onerror=", "onclick="]
    text_lower = text.lower()
    for pattern in suspicious_patterns:
        if pattern in text_lower:
            return False, "Input contains potentially unsafe content"
    return True, None

class GroqClientManager:
    """Enhanced singleton manager for Groq client"""

    _instance: Optional[Groq] = None
    _lock = threading.Lock()
    _initialized = False
    _health_check_time: Optional[float] = None
    _health_check_interval = 300

    @classmethod
    def get_client(cls) -> Groq:
        """Get or create Groq client instance with health check"""
        if cls._instance is None:
            with cls._lock:
                if cls._instance is None:
                    cls._initialize_client()
        if cls._should_health_check():
            cls._perform_health_check()
        return cls._instance

    @classmethod
    def _initialize_client(cls) -> None:
        """Initialize Groq client"""
        load_dotenv()
        api_key = os.environ.get("GROQ_API_KEY")
        if not api_key:
            logger.error("GROQ_API_KEY not found in environment")
            raise ValueError("GROQ_API_KEY not found. Please set it in your .env file.")
        try:
            cls._instance = Groq(api_key=api_key, timeout=AppConfig.REQUEST_TIMEOUT)
            cls._initialized = True
            cls._health_check_time = time.time()
            logger.info("Groq client initialized successfully")
        except Exception as e:
            logger.error(f"Failed to initialize Groq client: {e}")
            raise

    @classmethod
    def _should_health_check(cls) -> bool:
        """Check if health check is needed"""
        if not cls._health_check_time:
            return True
        return time.time() - cls._health_check_time > cls._health_check_interval

    @classmethod
    def _perform_health_check(cls) -> None:
        """Perform health check on client (currently just refreshes the timestamp)"""
        try:
            if cls._instance:
                cls._health_check_time = time.time()
                logger.debug("Health check passed")
        except Exception as e:
            logger.warning(f"Health check failed: {e}")
            cls._instance = None
            cls._initialized = False
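
# Usage sketch (illustrative only; requires GROQ_API_KEY in the environment or
# a .env file, otherwise get_client raises ValueError).
def _demo_client_manager() -> None:
    client = GroqClientManager.get_client()
    # The singleton always hands back the same instance:
    assert client is GroqClientManager.get_client()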

class PromptEngine:
    """Enhanced centralized prompt management"""

    SYSTEM_PROMPTS = {
        ReasoningMode.TREE_OF_THOUGHTS: """You are an advanced reasoning system using Tree of Thoughts methodology.
Explore multiple reasoning paths systematically before converging on the best solution.
Always show your thought process explicitly.""",
        ReasoningMode.CHAIN_OF_THOUGHT: """You are a systematic problem solver using Chain of Thought reasoning.
Break down complex problems into clear, logical steps with explicit reasoning.""",
        ReasoningMode.SELF_CONSISTENCY: """You are a consistency-focused reasoning system.
Generate multiple independent solutions and identify the most consistent answer.""",
        ReasoningMode.REFLEXION: """You are a self-reflective AI system.
Solve problems, critique your own reasoning, and refine your solutions iteratively.""",
        ReasoningMode.DEBATE: """You are a multi-agent debate system.
Present multiple perspectives and synthesize the strongest arguments.""",
        ReasoningMode.ANALOGICAL: """You are an analogical reasoning system.
Find similar problems and apply their solutions.""",
    }

    TEMPLATES = {
        "Code Review": {
            "prompt": "Analyze the following code for bugs, performance issues, and best practices:\n\n{query}",
            "context": "code_analysis",
        },
        "Research Summary": {
            "prompt": "Provide a comprehensive research summary on:\n\n{query}\n\nInclude key findings, methodologies, and implications.",
            "context": "research",
        },
        "Problem Solving": {
            "prompt": "Solve this problem step-by-step with detailed explanations:\n\n{query}",
            "context": "problem_solving",
        },
        "Creative Writing": {
            "prompt": "Generate creative content based on:\n\n{query}\n\nBe imaginative and engaging.",
            "context": "creative",
        },
        "Data Analysis": {
            "prompt": "Analyze this data/scenario and provide insights:\n\n{query}",
            "context": "analysis",
        },
        "Debugging": {
            "prompt": "Debug this code/issue systematically:\n\n{query}",
            "context": "debugging",
        },
        "Custom": {
            "prompt": "{query}",
            "context": "general",
        },
    }

    REASONING_PROMPTS = {
        ReasoningMode.TREE_OF_THOUGHTS: """
**Tree of Thoughts Analysis**

Problem: {query}

**Exploration Phase:**
PATH A (Analytical): [Examine from first principles]
PATH B (Alternative): [Consider different angle]
PATH C (Synthesis): [Integrate insights]

**Evaluation Phase:**
- Assess each path's validity
- Identify strongest reasoning chain
- Converge on optimal solution

**Final Solution:** [Most robust answer with justification]""",
        ReasoningMode.CHAIN_OF_THOUGHT: """
**Step-by-Step Reasoning**

Problem: {query}

Step 1: Understand the question
Step 2: Identify key components
Step 3: Apply relevant logic/principles
Step 4: Derive solution
Step 5: Validate and verify

Final Answer: [Clear, justified conclusion]""",
        ReasoningMode.SELF_CONSISTENCY: """
**Multi-Path Consistency Check**

Problem: {query}

**Attempt 1:** [First independent solution]
**Attempt 2:** [Alternative approach]
**Attempt 3:** [Third perspective]

**Consensus:** [Most consistent answer across attempts]""",
        ReasoningMode.REFLEXION: """
**Reflexion with Self-Correction**

Problem: {query}

**Initial Solution:** [First attempt]

**Self-Critique:**
- Assumptions made?
- Logical flaws?
- Missing elements?

**Refined Solution:** [Improved answer based on reflection]""",
        ReasoningMode.DEBATE: """
**Multi-Agent Debate**

Problem: {query}

**Position A:** [Strongest case for one approach]
**Position B:** [Critical examination]

**Synthesis:** [Balanced conclusion]""",
        ReasoningMode.ANALOGICAL: """
**Analogical Reasoning**

Problem: {query}

**Similar Problems:** [Identify analogous situations]
**Solution Transfer:** [Adapt known solutions]

**Final Answer:** [Solution derived from analogy]""",
    }

    @classmethod
    def build_prompt(cls, query: str, mode: ReasoningMode, template: str) -> str:
        """Build enhanced reasoning prompt"""
        template_data = cls.TEMPLATES.get(template, cls.TEMPLATES["Custom"])
        formatted_query = template_data["prompt"].format(query=query)
        return cls.REASONING_PROMPTS[mode].format(query=formatted_query)

    @classmethod
    def build_critique_prompt(cls) -> str:
        """Build validation prompt for self-critique"""
        return """
**Validation Check:**
Review the previous response for:
1. Factual accuracy
2. Logical consistency
3. Completeness
4. Potential biases or errors

Provide brief validation or corrections if needed."""

    @classmethod
    def get_template_context(cls, template: str) -> str:
        """Get context for template"""
        return cls.TEMPLATES.get(template, {}).get("context", "general")
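
# Usage sketch (illustrative only): a query is first wrapped by the selected
# template, then embedded into the reasoning-mode scaffold.
def _demo_prompt_engine() -> None:
    prompt = PromptEngine.build_prompt(
        query="Why is the sky blue?",
        mode=ReasoningMode.CHAIN_OF_THOUGHT,
        template="Problem Solving",
    )
    assert "Step-by-Step Reasoning" in prompt
    assert "Why is the sky blue?" in prompt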

class ConversationExporter:
    """Enhanced conversation export with multiple formats including PDF"""

    @staticmethod
    def to_json(entries: List[ConversationEntry], pretty: bool = True) -> str:
        """Export to JSON format"""
        data = [entry.to_dict() for entry in entries]
        indent = 2 if pretty else None
        return json.dumps(data, indent=indent, ensure_ascii=False)

    @staticmethod
    def to_markdown(entries: List[ConversationEntry], include_metadata: bool = True) -> str:
        """Export to Markdown format"""
        md = "# Conversation History\n\n"
        md += f"*Exported on {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}*\n\n"
        md += "---\n\n"
        for i, entry in enumerate(entries, 1):
            md += f"## Conversation {i}\n\n"
            md += f"**Timestamp:** {entry.timestamp} \n"
            md += f"**Model:** {entry.model} \n"
            md += f"**Mode:** {entry.reasoning_mode} \n"
            md += f"**Performance:** {entry.inference_time:.2f}s | {entry.tokens} tokens\n\n"
            md += f"### User\n\n{entry.user_message}\n\n"
            md += f"### Assistant\n\n{entry.ai_response}\n\n"
            md += "---\n\n"
        return md

    @staticmethod
    def to_text(entries: List[ConversationEntry]) -> str:
        """Export to plain text format"""
        txt = "=" * 70 + "\n"
        txt += "CONVERSATION HISTORY\n"
        txt += f"Exported: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n"
        txt += "=" * 70 + "\n\n"
        for i, entry in enumerate(entries, 1):
            txt += f"Conversation {i}\n"
            txt += f"Time: {entry.timestamp}\n"
            txt += f"Model: {entry.model} | Mode: {entry.reasoning_mode}\n"
            txt += f"Performance: {entry.inference_time:.2f}s | {entry.tokens} tokens\n"
            txt += "\n"
            txt += f"USER:\n{entry.user_message}\n\n"
            txt += f"ASSISTANT:\n{entry.ai_response}\n"
            txt += "\n" + "-" * 70 + "\n\n"
        return txt

    @staticmethod
    def to_pdf(entries: List[ConversationEntry], filename: str) -> str:
        """Export to PDF format with memory optimization"""
        try:
            from reportlab.lib.pagesizes import letter
            from reportlab.lib.styles import getSampleStyleSheet, ParagraphStyle
            from reportlab.lib.units import inch
            from reportlab.platypus import SimpleDocTemplate, Paragraph, Spacer, PageBreak
            from reportlab.lib.enums import TA_LEFT, TA_CENTER
            from reportlab.lib.colors import HexColor

            doc = SimpleDocTemplate(filename, pagesize=letter)
            story = []
            styles = getSampleStyleSheet()
            title_style = ParagraphStyle(
                'CustomTitle',
                parent=styles['Heading1'],
                fontSize=24,
                textColor=HexColor('#667eea'),
                spaceAfter=30,
                alignment=TA_CENTER,
            )
            heading_style = ParagraphStyle(
                'CustomHeading',
                parent=styles['Heading2'],
                fontSize=14,
                textColor=HexColor('#764ba2'),
                spaceAfter=12,
                spaceBefore=12,
            )
            user_style = ParagraphStyle(
                'UserStyle',
                parent=styles['Normal'],
                fontSize=11,
                textColor=HexColor('#2c3e50'),
                leftIndent=20,
                spaceAfter=10,
            )
            ai_style = ParagraphStyle(
                'AIStyle',
                parent=styles['Normal'],
                fontSize=11,
                textColor=HexColor('#34495e'),
                leftIndent=20,
                spaceAfter=10,
            )
            meta_style = ParagraphStyle(
                'MetaStyle',
                parent=styles['Normal'],
                fontSize=9,
                textColor=HexColor('#7f8c8d'),
                spaceAfter=6,
            )
            story.append(Paragraph("AI Reasoning Chat History", title_style))
            story.append(Paragraph(
                f"Exported on {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}",
                meta_style,
            ))
            story.append(Spacer(1, 0.3 * inch))
            for i, entry in enumerate(entries, 1):
                story.append(Paragraph(f"Conversation {i}", heading_style))
                meta_text = f"<b>Time:</b> {entry.timestamp} | <b>Model:</b> {entry.model} | <b>Mode:</b> {entry.reasoning_mode}"
                story.append(Paragraph(meta_text, meta_style))
                perf_text = f"<b>Performance:</b> {entry.inference_time:.2f}s | {entry.tokens} tokens | {entry.tokens_per_second:.1f} tok/s"
                story.append(Paragraph(perf_text, meta_style))
                story.append(Spacer(1, 0.1 * inch))
                story.append(Paragraph("<b>User:</b>", user_style))
                # Escape markup characters and truncate for memory efficiency
                user_msg = entry.user_message.replace('<', '&lt;').replace('>', '&gt;').replace('\n', '<br/>')[:3000]
                if len(entry.user_message) > 3000:
                    user_msg += "... (truncated)"
                story.append(Paragraph(user_msg, user_style))
                story.append(Spacer(1, 0.15 * inch))
                story.append(Paragraph("<b>Assistant:</b>", ai_style))
                # Escape markup characters and truncate for memory efficiency
                ai_resp = entry.ai_response.replace('<', '&lt;').replace('>', '&gt;').replace('\n', '<br/>')[:5000]
                if len(entry.ai_response) > 5000:
                    ai_resp += "... (truncated)"
                story.append(Paragraph(ai_resp, ai_style))
                if i < len(entries):
                    story.append(PageBreak())
            doc.build(story)
            logger.info(f"PDF exported successfully to {filename}")
            return filename
        except ImportError:
            error_msg = "reportlab library not installed. Run: pip install reportlab"
            logger.error(error_msg)
            return ""
        except Exception as e:
            logger.error(f"PDF export failed: {e}", exc_info=True)
            return ""

    @classmethod
    def export(cls, entries: List[ConversationEntry], format_type: str,
               include_metadata: bool = True) -> Tuple[str, str]:
        """Export conversation and return content and filename"""
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        if format_type == "pdf":
            ext = "pdf"
            filename = AppConfig.EXPORT_DIR / f"conversation_{timestamp}.{ext}"
            result = cls.to_pdf(entries, str(filename))
            if result:
                return f"✅ PDF exported successfully! File: conversation_{timestamp}.pdf", str(filename)
            else:
                return "❌ PDF export failed. Install reportlab: pip install reportlab", ""
        exporters = {
            "json": lambda: cls.to_json(entries),
            "markdown": lambda: cls.to_markdown(entries, include_metadata),
            "txt": lambda: cls.to_text(entries),
        }
        if format_type not in exporters:
            format_type = "markdown"
        content = exporters[format_type]()
        ext = "md" if format_type == "markdown" else format_type
        filename = AppConfig.EXPORT_DIR / f"conversation_{timestamp}.{ext}"
        try:
            with open(filename, 'w', encoding='utf-8') as f:
                f.write(content)
            logger.info(f"Conversation exported to {filename}")
            return content, str(filename)
        except Exception as e:
            logger.error(f"Failed to export conversation: {e}")
            return f"Error: {str(e)}", ""

    @staticmethod
    def create_backup(entries: List[ConversationEntry]) -> str:
        """Create automatic backup"""
        if not entries:
            return ""
        try:
            timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
            filename = AppConfig.BACKUP_DIR / f"backup_{timestamp}.json"
            data = [entry.to_dict() for entry in entries]
            with open(filename, 'w', encoding='utf-8') as f:
                json.dump(data, f, indent=2, ensure_ascii=False)
            logger.info(f"Backup created: {filename}")
            return str(filename)
        except Exception as e:
            logger.error(f"Backup failed: {e}")
            return ""

class AdvancedReasoner:
    """Enhanced reasoning engine with caching, rate limiting, and advanced features"""

    def __init__(self):
        self.client = GroqClientManager.get_client()
        self.metrics = ConversationMetrics()
        self.conversation_history: List[ConversationEntry] = []
        self.response_times: List[float] = []
        self.prompt_engine = PromptEngine()
        self.exporter = ConversationExporter()
        self.cache = ResponseCache(maxsize=AppConfig.CACHE_SIZE, ttl=AppConfig.CACHE_TTL)
        self.rate_limiter = RateLimiter(
            max_requests=AppConfig.RATE_LIMIT_REQUESTS,
            window=AppConfig.RATE_LIMIT_WINDOW,
        )
        self.session_id = hashlib.md5(str(time.time()).encode()).hexdigest()[:12]
        self.executor = ThreadPoolExecutor(max_workers=3)
        self.model_usage: Dict[str, int] = defaultdict(int)
        self.mode_usage: Dict[str, int] = defaultdict(int)
        self.error_log: List[Dict[str, Any]] = []
        logger.info(f"AdvancedReasoner initialized with session ID: {self.session_id}")

    def _generate_cache_key(self, query: str, model: str, mode: str,
                            temp: float, template: str) -> str:
        """Generate stable cache key for request"""
        # Normalize inputs for consistent key generation; cap query length
        normalized_query = query.strip().lower()[:500]
        content = f"{normalized_query}|{model}|{mode}|{temp:.2f}|{template}"
        return hashlib.sha256(content.encode('utf-8')).hexdigest()

    def _calculate_reasoning_depth(self, response: str) -> int:
        """Calculate reasoning depth from response"""
        indicators = {
            "Step": 3, "PATH": 4, "Attempt": 3, "Phase": 3,
            "Analysis": 2, "Consider": 1, "Therefore": 2,
            "Conclusion": 2, "Evidence": 2, "Reasoning": 1,
        }
        depth = 0
        for indicator, weight in indicators.items():
            depth += response.count(indicator) * weight
        return min(depth, 100)

    def _build_messages(
        self,
        query: str,
        history: List[Dict],
        mode: ReasoningMode,
        template: str,
    ) -> List[Dict[str, str]]:
        """Build message list for API call with validation"""
        messages = [
            {"role": "system", "content": self.prompt_engine.SYSTEM_PROMPTS[mode]}
        ]
        recent_history = history[-AppConfig.MAX_HISTORY_LENGTH:] if history else []
        for msg in recent_history:
            # Validate message structure before forwarding it to the API
            if isinstance(msg, dict) and "role" in msg and "content" in msg:
                role = msg.get("role")
                content = msg.get("content", "")
                # Only add valid user/assistant messages
                if role in ["user", "assistant"] and content:
                    messages.append({"role": role, "content": str(content)})
        enhanced_query = self.prompt_engine.build_prompt(query, mode, template)
        messages.append({"role": "user", "content": enhanced_query})
        return messages

    def _log_error(self, error: Exception, context: Dict[str, Any]) -> None:
        """Log error with context"""
        error_entry = {
            "timestamp": datetime.now().isoformat(),
            "error": str(error),
            "type": type(error).__name__,
            "context": context,
        }
        self.error_log.append(error_entry)
        self.metrics.increment_field("error_count")
        logger.error(f"Error logged: {error_entry}")

    @error_handler  # retries and user-facing error messages handled by the decorator above
    def generate_response(
        self,
        query: str,
        history: List[Dict],
        model: str,
        reasoning_mode: ReasoningMode,
        enable_critique: bool,
        temperature: float,
        max_tokens: int,
        prompt_template: str = "Custom",
        use_cache: bool = True,
    ) -> Generator[str, None, None]:
        """Generate a streaming response with caching, rate limiting, and optional self-critique"""
        is_valid, error_msg = validate_input(query)
        if not is_valid:
            yield f"⚠️ Validation Error: {error_msg}"
            return
        allowed, wait_time = self.rate_limiter.is_allowed()
        if not allowed:
            yield f"⏱️ Rate Limit: Please wait {wait_time:.1f} seconds."
            return
        cache_key = self._generate_cache_key(query, model, reasoning_mode.value, temperature, prompt_template)
        if use_cache:
            cached_response = self.cache.get(cache_key)
            if cached_response:
                self.metrics.increment_field("cache_hits")
                logger.info("Returning cached response")
                yield cached_response
                return
            self.metrics.increment_field("cache_misses")
        with timer(f"Response generation for {model}"):
            start_time = time.time()
            messages = self._build_messages(query, history, reasoning_mode, prompt_template)
            full_response = ""
            token_count = 0
            try:
                stream = self.client.chat.completions.create(
                    messages=messages,
                    model=model,
                    temperature=temperature,
                    max_tokens=max_tokens,
                    stream=True,
                )
                # Stream by yielding the accumulated response so far; the UI
                # replaces the whole message on each update.
                for chunk in stream:
                    if chunk.choices[0].delta.content:
                        content = chunk.choices[0].delta.content
                        full_response += content
                        token_count += 1
                        self.metrics.increment_field("tokens_used")
                        yield full_response
            except Exception as e:
                self._log_error(e, {
                    "query": query[:100],
                    "model": model,
                    "mode": reasoning_mode.value,
                })
                raise
            inference_time = time.time() - start_time
            self.metrics.set_field("reasoning_depth", self._calculate_reasoning_depth(full_response))
            self.metrics.update_tokens_per_second(token_count, inference_time)
            self.metrics.set_field("peak_tokens", max(self.metrics.peak_tokens, token_count))
            if enable_critique and len(full_response) > 150:
                messages.append({"role": "assistant", "content": full_response})
                messages.append({
                    "role": "user",
                    "content": self.prompt_engine.build_critique_prompt(),
                })
                full_response += "\n\n---\n### Validation & Self-Critique\n"
                yield full_response
                try:
                    critique_stream = self.client.chat.completions.create(
                        messages=messages,
                        model=model,
                        temperature=temperature * 0.7,
                        max_tokens=max_tokens // 3,
                        stream=True,
                    )
                    for chunk in critique_stream:
                        if chunk.choices[0].delta.content:
                            content = chunk.choices[0].delta.content
                            full_response += content
                            token_count += 1
                            yield full_response
                    self.metrics.increment_field("self_corrections")
                except Exception as e:
                    logger.warning(f"Critique phase failed: {e}")
            final_inference_time = time.time() - start_time
            self.metrics.set_field("inference_time", final_inference_time)
            self.metrics.increment_field("total_latency", final_inference_time)
            self.response_times.append(final_inference_time)
            self.metrics.set_field("avg_response_time", sum(self.response_times) / len(self.response_times))
            self.metrics.set_field("last_updated", datetime.now().strftime("%H:%M:%S"))
            self.metrics.update_confidence()
            self.metrics.increment_field("total_conversations")
            self.model_usage[model] += 1
            self.mode_usage[reasoning_mode.value] += 1
            tokens_per_sec = token_count / final_inference_time if final_inference_time > 0 else 0
            entry = ConversationEntry(
                timestamp=datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
                user_message=query,
                ai_response=full_response,
                model=model,
                reasoning_mode=reasoning_mode.value,
                inference_time=final_inference_time,
                tokens=token_count,
                session_id=self.session_id,
                temperature=temperature,
                max_tokens=max_tokens,
                cache_hit=False,
                tokens_per_second=tokens_per_sec,
            )
            self.conversation_history.append(entry)
            if use_cache:
                self.cache.set(cache_key, full_response)
            if len(self.conversation_history) % 10 == 0:
                try:
                    self.executor.submit(self.exporter.create_backup, self.conversation_history.copy())
                except Exception as e:
                    logger.warning(f"Auto-backup failed: {e}")
            if len(self.conversation_history) > AppConfig.MAX_CONVERSATION_STORAGE:
                self.conversation_history = self.conversation_history[-AppConfig.MAX_CONVERSATION_STORAGE:]
                logger.info(f"Trimmed history to {AppConfig.MAX_CONVERSATION_STORAGE} entries")

    def export_conversation(self, format_type: str, include_metadata: bool = True) -> Tuple[str, str]:
        """Export conversation history"""
        if not self.conversation_history:
            return "No conversations to export.", ""
        try:
            return self.exporter.export(self.conversation_history, format_type, include_metadata)
        except Exception as e:
            logger.error(f"Export failed: {e}")
            return f"Export failed: {str(e)}", ""

    def export_current_chat_pdf(self) -> Optional[str]:
        """Export current chat as PDF - for quick download button"""
        if not self.conversation_history:
            return None
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        filename = AppConfig.EXPORT_DIR / f"chat_{timestamp}.pdf"
        result = self.exporter.to_pdf(self.conversation_history, str(filename))
        return result if result else None

    def search_conversations(self, keyword: str) -> List[Tuple[int, ConversationEntry]]:
        """Search through conversation history"""
        keyword_lower = keyword.lower()
        return [
            (i, entry) for i, entry in enumerate(self.conversation_history)
            if keyword_lower in entry.user_message.lower()
            or keyword_lower in entry.ai_response.lower()
        ]

    def get_analytics(self) -> Optional[Dict[str, Any]]:
        """Generate analytics data"""
        if not self.conversation_history:
            return None
        models = [e.model for e in self.conversation_history]
        modes = [e.reasoning_mode for e in self.conversation_history]
        total_time = sum(e.inference_time for e in self.conversation_history)
        total_tokens = sum(e.tokens for e in self.conversation_history)
        return {
            "session_id": self.session_id,
            "total_conversations": len(self.conversation_history),
            "total_tokens": total_tokens,
            "total_time": total_time,
            "avg_inference_time": self.metrics.avg_response_time,
            "peak_tokens": self.metrics.peak_tokens,
            "most_used_model": max(set(models), key=models.count) if models else "N/A",
            "most_used_mode": max(set(modes), key=modes.count) if modes else "N/A",
            "cache_hits": self.metrics.cache_hits,
            "cache_misses": self.metrics.cache_misses,
            "error_count": self.metrics.error_count,
        }

    def clear_history(self) -> None:
        """Clear conversation history and reset metrics"""
        if self.conversation_history:
            try:
                self.executor.submit(self.exporter.create_backup, self.conversation_history.copy())
            except Exception as e:
                logger.warning(f"Failed to backup before clearing: {e}")
        self.conversation_history.clear()
        self.response_times.clear()
        self.metrics.reset()
        self.cache.clear()
        self.rate_limiter.reset()
        self.model_usage.clear()
        self.mode_usage.clear()
        logger.info("History cleared and metrics reset")

    def __del__(self):
        """Cleanup on deletion"""
        try:
            self.executor.shutdown(wait=False)
            logger.info("AdvancedReasoner cleanup completed")
        except Exception:
            pass
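
# End-to-end usage sketch (illustrative only; requires GROQ_API_KEY, and the
# model id "llama-3.3-70b-versatile" is an assumption -- substitute one that is
# available to your account). Each yield is the accumulated response so far.
if __name__ == "__main__":
    reasoner = AdvancedReasoner()
    final = ""
    for partial in reasoner.generate_response(
        query="Explain recursion in one paragraph.",
        history=[],
        model="llama-3.3-70b-versatile",
        reasoning_mode=ReasoningMode.CHAIN_OF_THOUGHT,
        enable_critique=False,
        temperature=0.7,
        max_tokens=512,
    ):
        final = partial  # keep the latest accumulated text
    print(final)
    print(reasoner.get_analytics())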