Spaces:
Runtime error
Runtime error
| """Unified reasoning engine that combines multiple reasoning strategies.""" | |
| import logging | |
| from typing import Dict, Any, List, Optional, Set, Union, Type | |
| import json | |
| from dataclasses import dataclass, field | |
| from enum import Enum | |
| from datetime import datetime | |
| import asyncio | |
| from collections import defaultdict | |
| from .base import ReasoningStrategy | |
| from .chain_of_thought import ChainOfThoughtStrategy | |
| from .tree_of_thoughts import TreeOfThoughtsStrategy | |
| from .meta_learning import MetaLearningStrategy | |
| from .recursive import RecursiveReasoning | |
| from .analogical import AnalogicalReasoning | |
| from .local_llm import LocalLLMStrategy | |
| from .agentic import ( | |
| TaskDecompositionStrategy, | |
| ResourceManagementStrategy, | |
| ContextualPlanningStrategy, | |
| AdaptiveExecutionStrategy, | |
| FeedbackIntegrationStrategy | |
| ) | |
class StrategyType(str, Enum):
    """Types of reasoning strategies.

    Members are also ``str`` instances; each value is the snake_case
    identifier of the strategy. The engine in this file iterates the enum
    when scoring and ranking strategies, so the definition order below is
    the order in which strategies are visited.
    """
    # Core reasoning strategies.
    CHAIN_OF_THOUGHT = "chain_of_thought"
    TREE_OF_THOUGHTS = "tree_of_thoughts"
    META_LEARNING = "meta_learning"
    RECURSIVE = "recursive"
    ANALOGICAL = "analogical"
    # Strategies whose implementations are imported from the .agentic module.
    TASK_DECOMPOSITION = "task_decomposition"
    RESOURCE_MANAGEMENT = "resource_management"
    CONTEXTUAL_PLANNING = "contextual_planning"
    ADAPTIVE_EXECUTION = "adaptive_execution"
    FEEDBACK_INTEGRATION = "feedback_integration"
    # Strategy implemented by LocalLLMStrategy.
    LOCAL_LLM = "local_llm"
@dataclass
class StrategyResult:
    """Result from a reasoning strategy.

    Declared as a dataclass so that the annotated fields become keyword
    constructor arguments and ``field(default_factory=...)`` is honoured;
    without the decorator the class accepts no arguments at all.

    Attributes:
        strategy_type: Which strategy produced this result.
        success: Whether the strategy reported success.
        answer: The strategy's answer, or ``None`` when it failed.
        confidence: Confidence reported by the strategy (0.0 on failure).
        reasoning_trace: Step records emitted by the strategy; on failure
            a single ``{"error": ...}`` entry.
        metadata: Free-form extra data returned by the strategy.
        performance_metrics: Metrics for the run; the engine adds
            ``execution_time`` in seconds.
        timestamp: Creation time of the result (naive local time).
    """
    strategy_type: StrategyType
    success: bool
    answer: Optional[str]
    confidence: float
    reasoning_trace: List[Dict[str, Any]]
    metadata: Dict[str, Any]
    performance_metrics: Dict[str, Any]
    timestamp: datetime = field(default_factory=datetime.now)
@dataclass
class UnifiedResult:
    """Combined result from multiple strategies.

    Declared as a dataclass so that the annotated fields become keyword
    constructor arguments and ``field(default_factory=...)`` is honoured;
    without the decorator the class accepts no arguments at all.

    Attributes:
        success: Whether the combined answer is considered successful.
        answer: The synthesized answer, or an error description.
        confidence: Confidence assigned to the synthesized answer.
        strategy_results: Individual results keyed by strategy type.
        synthesis_method: Name of the method used to combine results
            (``"failed"`` when the pipeline raised).
        meta_insights: Observations produced during synthesis.
        performance_metrics: Metrics reported by the synthesis step.
        timestamp: Creation time of the result (naive local time).
    """
    success: bool
    answer: str
    confidence: float
    strategy_results: Dict[StrategyType, StrategyResult]
    synthesis_method: str
    meta_insights: List[str]
    performance_metrics: Dict[str, Any]
    timestamp: datetime = field(default_factory=datetime.now)
class UnifiedReasoningEngine:
    """
    Advanced unified reasoning engine that:
    1. Combines multiple reasoning strategies
    2. Dynamically selects and weights strategies
    3. Synthesizes results from different approaches
    4. Learns from experience
    5. Adapts to different types of tasks

    The entry point is the ``reason`` coroutine. The ``context`` mapping
    passed to it must contain a ``"groq_api"`` entry exposing an awaitable
    ``predict(prompt)`` whose result is indexed with ``["answer"]``.
    """
def __init__(self,
             min_confidence: float = 0.7,
             strategy_weights: Optional[Dict[StrategyType, float]] = None,
             parallel_threshold: int = 3,
             learning_rate: float = 0.1):
    """Create the engine and instantiate every known strategy.

    Args:
        min_confidence: Minimum synthesized confidence for a unified
            result to count as successful.
        strategy_weights: Optional per-strategy base weights. Entries
            override the defaults (1.0 for every strategy, 2.0 for
            ``LOCAL_LLM``); strategies that are not listed keep their
            default weight.
        parallel_threshold: Maximum number of strategies selected for a
            single query.
        learning_rate: Smoothing factor of the exponential moving average
            used to adapt strategy weights.
    """
    self.min_confidence = min_confidence
    self.parallel_threshold = parallel_threshold
    self.learning_rate = learning_rate

    # Initialize strategies
    self.strategies: Dict[StrategyType, ReasoningStrategy] = {
        StrategyType.CHAIN_OF_THOUGHT: ChainOfThoughtStrategy(),
        StrategyType.TREE_OF_THOUGHTS: TreeOfThoughtsStrategy(),
        StrategyType.META_LEARNING: MetaLearningStrategy(),
        StrategyType.RECURSIVE: RecursiveReasoning(),
        StrategyType.ANALOGICAL: AnalogicalReasoning(),
        StrategyType.TASK_DECOMPOSITION: TaskDecompositionStrategy(),
        StrategyType.RESOURCE_MANAGEMENT: ResourceManagementStrategy(),
        StrategyType.CONTEXTUAL_PLANNING: ContextualPlanningStrategy(),
        StrategyType.ADAPTIVE_EXECUTION: AdaptiveExecutionStrategy(),
        StrategyType.FEEDBACK_INTEGRATION: FeedbackIntegrationStrategy(),
        StrategyType.LOCAL_LLM: LocalLLMStrategy(),  # Add local LLM strategy
    }

    # Strategy weights with higher weight for LOCAL_LLM. Caller-supplied
    # weights are merged over the defaults into a fresh dict: a partial
    # mapping no longer causes KeyError during scoring, and the caller's
    # dict is not mutated when weights are adapted later.
    weights: Dict[StrategyType, float] = {
        strategy_type: 1.0 for strategy_type in StrategyType
    }
    weights[StrategyType.LOCAL_LLM] = 2.0  # Higher weight for local LLM
    if strategy_weights:
        weights.update(strategy_weights)
    self.strategy_weights = weights

    # Performance tracking
    self.strategy_performance: Dict[StrategyType, List[float]] = defaultdict(list)
    self.task_type_performance: Dict[str, Dict[StrategyType, float]] = defaultdict(lambda: defaultdict(float))
    self.synthesis_performance: Dict[str, List[float]] = defaultdict(list)
async def reason(self, query: str, context: Dict[str, Any]) -> UnifiedResult:
    """Main reasoning method combining multiple strategies.

    Runs the pipeline in order: task analysis, strategy selection,
    strategy execution, result synthesis, and performance update.

    Args:
        query: The question or task to reason about.
        context: Shared context handed to every pipeline stage.

    Returns:
        The synthesized ``UnifiedResult``. If any stage raises, the error
        is logged and a failed result (``success=False``, confidence 0.0,
        ``synthesis_method="failed"``) describing the error is returned
        instead of propagating the exception.
    """
    try:
        # Analyze task
        task_analysis = await self._analyze_task(query, context)
        # Select strategies
        selected_strategies = await self._select_strategies(task_analysis, context)
        # Execute strategies
        strategy_results = await self._execute_strategies(
            selected_strategies, query, context)
        # Synthesize results
        unified_result = await self._synthesize_results(
            strategy_results, task_analysis, context)
        # Learn from experience
        self._update_performance(unified_result)
        return unified_result
    except Exception as e:
        # The exception is swallowed here, so record the traceback too;
        # the message alone is rarely enough to locate the failing stage.
        logging.exception("Error in unified reasoning: %s", e)
        return UnifiedResult(
            success=False,
            answer=f"Error: {str(e)}",
            confidence=0.0,
            strategy_results={},
            synthesis_method="failed",
            meta_insights=[f"Error occurred: {str(e)}"],
            performance_metrics={}
        )
| async def _analyze_task(self, query: str, context: Dict[str, Any]) -> Dict[str, Any]: | |
| """Analyze the task to determine optimal strategy selection.""" | |
| prompt = f""" | |
| Analyze reasoning task: | |
| Query: {query} | |
| Context: {json.dumps(context)} | |
| Determine: | |
| 1. Task type and complexity | |
| 2. Required reasoning capabilities | |
| 3. Resource requirements | |
| 4. Success criteria | |
| 5. Risk factors | |
| Format as: | |
| [Analysis] | |
| Type: ... | |
| Complexity: ... | |
| Capabilities: ... | |
| Resources: ... | |
| Criteria: ... | |
| Risks: ... | |
| """ | |
| response = await context["groq_api"].predict(prompt) | |
| return self._parse_task_analysis(response["answer"]) | |
async def _select_strategies(self, task_analysis: Dict[str, Any], context: Dict[str, Any]) -> List[StrategyType]:
    """Select appropriate strategies based on task analysis.

    Every strategy type receives a weighted score made of its base
    weight (30%), its recorded performance on this task type (20%), the
    mean of its most recent confidences (20%), and resource and
    capability match (15% each). The highest-scoring strategies are
    returned, at most ``parallel_threshold`` of them.

    Args:
        task_analysis: Parsed analysis with ``"type"``, ``"resources"``
            and ``"capabilities"`` entries.
        context: Shared context (currently unused here).

    Returns:
        Selected strategy types, best first.
    """
    # These lookups do not depend on the strategy, so do them once.
    task_type = task_analysis["type"]
    required_resources = task_analysis["resources"]
    required_capabilities = task_analysis["capabilities"]

    # Calculate strategy scores
    scores: Dict[StrategyType, float] = {}
    for strategy_type in StrategyType:
        # A strategy without an explicit weight gets the neutral 1.0
        # instead of aborting selection with KeyError.
        base_score = self.strategy_weights.get(strategy_type, 1.0)
        # Task type performance
        type_score = self.task_type_performance[task_type][strategy_type]
        # Recent performance: mean over the samples actually present in
        # the window; dividing by a fixed 5 understated strategies with
        # fewer than five recorded runs.
        recent_scores = self.strategy_performance[strategy_type][-5:]
        recent_performance = (
            sum(recent_scores) / len(recent_scores) if recent_scores else 0.5
        )
        # Resource match
        resource_match = self._calculate_resource_match(
            strategy_type, required_resources)
        # Capability match
        capability_match = self._calculate_capability_match(
            strategy_type, required_capabilities)
        # Combined score
        scores[strategy_type] = (
            0.3 * base_score +
            0.2 * type_score +
            0.2 * recent_performance +
            0.15 * resource_match +
            0.15 * capability_match
        )

    # Select top strategies
    selected = sorted(
        StrategyType,
        key=lambda x: scores[x],
        reverse=True
    )[:self.parallel_threshold]
    return selected
async def _execute_strategies(self,
                              strategies: List[StrategyType],
                              query: str,
                              context: Dict[str, Any]) -> Dict[StrategyType, StrategyResult]:
    """Run the given strategies concurrently and collect their results.

    Each strategy's ``reason(query, context)`` is awaited; its mapping
    result is converted into a ``StrategyResult``. A strategy that raises
    is logged and represented by a failed result carrying the error text.

    Args:
        strategies: Strategy types to run.
        query: The question or task passed to every strategy.
        context: Shared context passed to every strategy.

    Returns:
        Results keyed by strategy type.
    """
    async def run_one(kind: StrategyType) -> StrategyResult:
        runner = self.strategies[kind]
        began = datetime.now()

        def elapsed_seconds() -> float:
            # Wall-clock seconds since this strategy was started.
            return (datetime.now() - began).total_seconds()

        try:
            outcome = await runner.reason(query, context)
        except Exception as e:
            logging.error(f"Error in strategy {kind}: {str(e)}")
            return StrategyResult(
                strategy_type=kind,
                success=False,
                answer=None,
                confidence=0.0,
                reasoning_trace=[{"error": str(e)}],
                metadata={},
                performance_metrics={"execution_time": elapsed_seconds()},
            )

        try:
            # Metrics reported by the strategy may override the
            # engine-measured execution time, as they are merged last.
            return StrategyResult(
                strategy_type=kind,
                success=outcome.get("success", False),
                answer=outcome.get("answer"),
                confidence=outcome.get("confidence", 0.0),
                reasoning_trace=outcome.get("reasoning_trace", []),
                metadata=outcome.get("metadata", {}),
                performance_metrics={
                    "execution_time": elapsed_seconds(),
                    **outcome.get("performance_metrics", {}),
                },
            )
        except Exception as e:
            logging.error(f"Error in strategy {kind}: {str(e)}")
            return StrategyResult(
                strategy_type=kind,
                success=False,
                answer=None,
                confidence=0.0,
                reasoning_trace=[{"error": str(e)}],
                metadata={},
                performance_metrics={"execution_time": elapsed_seconds()},
            )

    # Execute strategies in parallel
    gathered = await asyncio.gather(*[run_one(kind) for kind in strategies])
    collected: Dict[StrategyType, StrategyResult] = {}
    for item in gathered:
        collected[item.strategy_type] = item
    return collected
async def _synthesize_results(self,
                              strategy_results: Dict[StrategyType, StrategyResult],
                              task_analysis: Dict[str, Any],
                              context: Dict[str, Any]) -> UnifiedResult:
    """Synthesize results from multiple strategies.

    Builds a synthesis prompt from the strategy results, the task
    analysis and the context, sends it to ``context["groq_api"]`` and
    parses the ``"answer"`` field of the response.

    Args:
        strategy_results: Individual results keyed by strategy type.
        task_analysis: Parsed task analysis.
        context: Shared context; must contain ``"groq_api"``, an object
            with an awaitable ``predict(prompt)`` method.

    Returns:
        A ``UnifiedResult`` whose ``success`` flag is true when the
        parsed confidence reaches ``min_confidence``.

    Raises:
        KeyError: If ``context`` has no ``"groq_api"`` entry or the
            response has no ``"answer"`` entry.
    """
    # Strategy traces and metadata are free-form, so fall back to str()
    # for values json cannot encode rather than failing the synthesis.
    results_json = json.dumps(
        {str(k): self._strategy_result_to_dict(v)
         for k, v in strategy_results.items()},
        default=str,
    )
    task_json = json.dumps(task_analysis)
    # The API client lives in the same dict that is embedded in the
    # prompt; serializing it verbatim raises TypeError, so leave it out.
    context_json = json.dumps(
        {key: value for key, value in context.items() if key != "groq_api"},
        default=str,
    )
    prompt = (
        "\n"
        "Synthesize reasoning results:\n"
        f"Results: {results_json}\n"
        f"Task Analysis: {task_json}\n"
        f"Context: {context_json}\n"
        "Provide:\n"
        "1. Optimal synthesis method\n"
        "2. Combined answer\n"
        "3. Confidence assessment\n"
        "4. Meta-insights\n"
        "5. Performance analysis\n"
        "Format as:\n"
        "[Synthesis]\n"
        "Method: ...\n"
        "Answer: ...\n"
        "Confidence: ...\n"
        "Insights: ...\n"
        "Performance: ...\n"
    )
    response = await context["groq_api"].predict(prompt)
    synthesis = self._parse_synthesis(response["answer"])
    return UnifiedResult(
        success=synthesis["confidence"] >= self.min_confidence,
        answer=synthesis["answer"],
        confidence=synthesis["confidence"],
        strategy_results=strategy_results,
        synthesis_method=synthesis["method"],
        meta_insights=synthesis["insights"],
        performance_metrics=synthesis["performance"]
    )
def _update_performance(self, result: UnifiedResult):
    """Update performance metrics and strategy weights.

    For every strategy in ``result.strategy_results`` the reported
    confidence is appended to that strategy's history and blended into
    its weight with an exponential moving average controlled by
    ``learning_rate``. The unified confidence is appended to the history
    of the synthesis method that was used.

    Args:
        result: The unified result of one reasoning run.
    """
    # Update strategy performance
    for strategy_type, strategy_result in result.strategy_results.items():
        performance = strategy_result.confidence
        self.strategy_performance[strategy_type].append(performance)
        # Update weights using exponential moving average. A strategy
        # with no stored weight starts from the neutral 1.0 instead of
        # raising KeyError.
        current_weight = self.strategy_weights.get(strategy_type, 1.0)
        self.strategy_weights[strategy_type] = (
            (1 - self.learning_rate) * current_weight +
            self.learning_rate * performance
        )
    # Update synthesis performance
    self.synthesis_performance[result.synthesis_method].append(result.confidence)
def _calculate_resource_match(self, strategy_type: StrategyType, required_resources: Dict[str, Any]) -> float:
    """Calculate how well a strategy matches required resources.

    Currently a placeholder: both arguments are ignored and the same
    constant score of 0.8 is returned for every strategy.
    """
    # Implementation-specific resource matching logic
    return 0.8  # Placeholder
def _calculate_capability_match(self, strategy_type: StrategyType, required_capabilities: List[str]) -> float:
    """Calculate how well a strategy matches required capabilities.

    Currently a placeholder: both arguments are ignored and the same
    constant score of 0.8 is returned for every strategy.
    """
    # Implementation-specific capability matching logic
    return 0.8  # Placeholder
| def _parse_task_analysis(self, response: str) -> Dict[str, Any]: | |
| """Parse task analysis from response.""" | |
| analysis = { | |
| "type": "", | |
| "complexity": 0.0, | |
| "capabilities": [], | |
| "resources": {}, | |
| "criteria": [], | |
| "risks": [] | |
| } | |
| for line in response.split('\n'): | |
| line = line.strip() | |
| if line.startswith('Type:'): | |
| analysis["type"] = line[5:].strip() | |
| elif line.startswith('Complexity:'): | |
| try: | |
| analysis["complexity"] = float(line[11:].strip()) | |
| except: | |
| pass | |
| elif line.startswith('Capabilities:'): | |
| analysis["capabilities"] = [c.strip() for c in line[13:].split(',')] | |
| elif line.startswith('Resources:'): | |
| try: | |
| analysis["resources"] = json.loads(line[10:].strip()) | |
| except: | |
| analysis["resources"] = {"raw": line[10:].strip()} | |
| elif line.startswith('Criteria:'): | |
| analysis["criteria"] = [c.strip() for c in line[9:].split(',')] | |
| elif line.startswith('Risks:'): | |
| analysis["risks"] = [r.strip() for r in line[7:].split(',')] | |
| return analysis | |
| def _parse_synthesis(self, response: str) -> Dict[str, Any]: | |
| """Parse synthesis result from response.""" | |
| synthesis = { | |
| "method": "", | |
| "answer": "", | |
| "confidence": 0.0, | |
| "insights": [], | |
| "performance": {} | |
| } | |
| for line in response.split('\n'): | |
| line = line.strip() | |
| if line.startswith('Method:'): | |
| synthesis["method"] = line[7:].strip() | |
| elif line.startswith('Answer:'): | |
| synthesis["answer"] = line[7:].strip() | |
| elif line.startswith('Confidence:'): | |
| try: | |
| synthesis["confidence"] = float(line[11:].strip()) | |
| except: | |
| pass | |
| elif line.startswith('Insights:'): | |
| synthesis["insights"] = [i.strip() for i in line[9:].split(',')] | |
| elif line.startswith('Performance:'): | |
| try: | |
| synthesis["performance"] = json.loads(line[12:].strip()) | |
| except: | |
| synthesis["performance"] = {"raw": line[12:].strip()} | |
| return synthesis | |
def _strategy_result_to_dict(self, result: StrategyResult) -> Dict[str, Any]:
    """Flatten a strategy result into a plain dict.

    The strategy type is replaced by its enum value and the timestamp by
    its ISO-8601 string; all other fields are copied by reference.
    """
    flattened: Dict[str, Any] = {"strategy_type": result.strategy_type.value}
    # Fields that are passed through unchanged, in output order.
    for attribute in (
        "success",
        "answer",
        "confidence",
        "reasoning_trace",
        "metadata",
        "performance_metrics",
    ):
        flattened[attribute] = getattr(result, attribute)
    flattened["timestamp"] = result.timestamp.isoformat()
    return flattened
def get_performance_metrics(self) -> Dict[str, Any]:
    """Return a snapshot of weights and recorded performance.

    The snapshot contains a copy of the strategy weights, the mean
    confidence per strategy (keyed by the strategy's enum value), the
    mean confidence per synthesis method, and the per-task-type scores.
    An empty history is reported as ``0``.
    """
    def mean_or_zero(values):
        # Arithmetic mean, with 0 standing in for "no samples".
        if not values:
            return 0
        return sum(values) / len(values)

    weights_snapshot = dict(self.strategy_weights)

    average_by_strategy = {}
    for kind, history in self.strategy_performance.items():
        average_by_strategy[kind.value] = mean_or_zero(history)

    average_by_method = {}
    for method_name, history in self.synthesis_performance.items():
        average_by_method[method_name] = mean_or_zero(history)

    by_task_type = {}
    for task_name, per_strategy in self.task_type_performance.items():
        by_task_type[task_name] = dict(per_strategy)

    return {
        "strategy_weights": weights_snapshot,
        "average_performance": average_by_strategy,
        "synthesis_success": average_by_method,
        "task_type_performance": by_task_type,
    }
def clear_performance_history(self):
    """Clear performance history and reset weights.

    All recorded strategy, task-type and synthesis histories are
    discarded and the strategy weights return to the constructor
    defaults: 1.0 for every strategy and 2.0 for ``LOCAL_LLM``.
    """
    self.strategy_performance.clear()
    self.task_type_performance.clear()
    self.synthesis_performance.clear()
    self.strategy_weights = {
        strategy_type: 1.0 for strategy_type in StrategyType
    }
    # Keep the reset state consistent with the constructor, which gives
    # the local LLM strategy a higher default weight.
    self.strategy_weights[StrategyType.LOCAL_LLM] = 2.0