Spaces:
Runtime error
Runtime error
| """Unified reasoning engine that combines multiple reasoning strategies.""" | |
| import logging | |
| from typing import ( | |
| Dict, Any, List, Optional, Set, Union, Type, | |
| AsyncGenerator, Callable, Tuple, Generator | |
| ) | |
| import json | |
| from dataclasses import dataclass, field | |
| from enum import Enum | |
| from datetime import datetime | |
| import asyncio | |
| from collections import defaultdict | |
| import numpy as np | |
| from .base import ReasoningStrategy, StrategyResult | |
| from .groq_strategy import GroqStrategy | |
| from .chain_of_thought import ChainOfThoughtStrategy | |
| from .tree_of_thoughts import TreeOfThoughtsStrategy | |
| from .meta_learning import MetaLearningStrategy | |
| from .recursive import RecursiveStrategy | |
| from .analogical import AnalogicalStrategy | |
| from .local_llm import LocalLLMStrategy | |
| from .agentic import ( | |
| TaskDecompositionStrategy, | |
| ResourceManagementStrategy, | |
| ContextualPlanningStrategy, | |
| AdaptiveExecutionStrategy, | |
| FeedbackIntegrationStrategy | |
| ) | |
| # Import additional strategies | |
| from .bayesian import BayesianStrategy | |
| from .market_analysis import MarketAnalysisStrategy | |
| from .monetization import MonetizationStrategy | |
| from .multimodal import MultimodalStrategy | |
| from .neurosymbolic import NeurosymbolicStrategy | |
| from .portfolio_optimization import PortfolioOptimizationStrategy | |
| from .specialized import SpecializedStrategy | |
| from .venture_strategies import VentureStrategy | |
| from .venture_types import ( | |
| AIInfrastructureStrategy, | |
| AIConsultingStrategy, | |
| AIProductStrategy, | |
| FinTechStrategy, | |
| HealthTechStrategy, | |
| EdTechStrategy, | |
| BlockchainStrategy, | |
| AIMarketplaceStrategy | |
| ) | |
class StrategyType(str, Enum):
    """Types of reasoning strategies.

    Members mix in ``str``, so each member compares equal to its string
    value. Definition order is also the iteration order of the enum.
    """
    # Primary strategy (Groq)
    GROQ = "groq"
    # Core strategies
    CHAIN_OF_THOUGHT = "chain_of_thought"
    TREE_OF_THOUGHTS = "tree_of_thoughts"
    META_LEARNING = "meta_learning"
    RECURSIVE = "recursive"
    ANALOGICAL = "analogical"
    LOCAL_LLM = "local_llm"
    # Agentic strategies
    TASK_DECOMPOSITION = "task_decomposition"
    RESOURCE_MANAGEMENT = "resource_management"
    CONTEXTUAL_PLANNING = "contextual_planning"
    ADAPTIVE_EXECUTION = "adaptive_execution"
    FEEDBACK_INTEGRATION = "feedback_integration"
    # Additional specialized strategies
    BAYESIAN = "bayesian"
    MARKET_ANALYSIS = "market_analysis"
    MONETIZATION = "monetization"
    MULTIMODAL = "multimodal"
    NEUROSYMBOLIC = "neurosymbolic"
    PORTFOLIO_OPTIMIZATION = "portfolio_optimization"
    SPECIALIZED = "specialized"
    VENTURE = "venture"
    # NOTE(review): UnifiedReasoningEngine.__init__ registers neither a
    # strategy instance nor a default weight for VENTURE_TYPE -- confirm
    # whether this member is still needed.
    VENTURE_TYPE = "venture_type"
    # Venture-type strategies (classes imported from .venture_types)
    AI_INFRASTRUCTURE = "ai_infrastructure"
    AI_CONSULTING = "ai_consulting"
    AI_PRODUCT = "ai_product"
    FINTECH = "fintech"
    HEALTHTECH = "healthtech"
    EDTECH = "edtech"
    BLOCKCHAIN = "blockchain"
    AI_MARKETPLACE = "ai_marketplace"
@dataclass
class UnifiedResult:
    """Combined result from multiple strategies.

    Declared as a dataclass so that it can be built with keyword arguments
    (as the engine does) and so that ``timestamp`` is filled in per instance
    by its ``default_factory``.
    """
    # Whether the synthesized answer is considered successful.
    success: bool
    # Final synthesized answer text (an error message on failure paths).
    answer: str
    # Confidence attached to the synthesized answer.
    confidence: float
    # Individual strategy outcomes, keyed by the strategy that produced them.
    strategy_results: Dict[StrategyType, StrategyResult]
    # Name of the synthesis method used, e.g. "weighted_voting", "none", "failed".
    synthesis_method: str
    # Human-readable observations about the synthesis run.
    meta_insights: List[str]
    # Aggregate metrics describing the synthesis run.
    performance_metrics: Dict[str, Any]
    # Creation time; defaults to the moment the instance is built.
    timestamp: datetime = field(default_factory=datetime.now)
| class UnifiedReasoningEngine: | |
| """ | |
| Advanced unified reasoning engine that: | |
| 1. Combines multiple reasoning strategies | |
| 2. Dynamically selects and weights strategies | |
| 3. Synthesizes results from different approaches | |
| 4. Learns from experience | |
| 5. Adapts to different types of tasks | |
| """ | |
def __init__(self,
             min_confidence: float = 0.7,
             strategy_weights: Optional[Dict[StrategyType, float]] = None,
             parallel_threshold: int = 3,
             learning_rate: float = 0.1):
    """Create the engine, instantiate every strategy and seed the weights.

    Args:
        min_confidence: Confidence a synthesized answer must reach to be
            reported as successful.
        strategy_weights: Optional per-strategy weight overrides. The
            mapping is copied and layered over the built-in defaults, so a
            partial mapping is accepted and the caller's dict is never
            mutated when the engine later adapts its weights.
        parallel_threshold: Maximum number of strategies selected per query.
        learning_rate: Step size of the moving-average weight update.
    """
    self.min_confidence = min_confidence
    self.parallel_threshold = parallel_threshold
    self.learning_rate = learning_rate

    # Initialize strategies
    self.strategies: Dict[StrategyType, ReasoningStrategy] = {
        # Primary strategy (Groq)
        StrategyType.GROQ: GroqStrategy(),
        # Core strategies
        StrategyType.CHAIN_OF_THOUGHT: ChainOfThoughtStrategy(),
        StrategyType.TREE_OF_THOUGHTS: TreeOfThoughtsStrategy(),
        StrategyType.META_LEARNING: MetaLearningStrategy(),
        StrategyType.RECURSIVE: RecursiveStrategy(),
        StrategyType.ANALOGICAL: AnalogicalStrategy(),
        StrategyType.LOCAL_LLM: LocalLLMStrategy(),
        # Agentic strategies
        StrategyType.TASK_DECOMPOSITION: TaskDecompositionStrategy(),
        StrategyType.RESOURCE_MANAGEMENT: ResourceManagementStrategy(),
        StrategyType.CONTEXTUAL_PLANNING: ContextualPlanningStrategy(),
        StrategyType.ADAPTIVE_EXECUTION: AdaptiveExecutionStrategy(),
        StrategyType.FEEDBACK_INTEGRATION: FeedbackIntegrationStrategy(),
        # Additional specialized strategies
        StrategyType.BAYESIAN: BayesianStrategy(),
        StrategyType.MARKET_ANALYSIS: MarketAnalysisStrategy(),
        StrategyType.MONETIZATION: MonetizationStrategy(),
        StrategyType.MULTIMODAL: MultimodalStrategy(),
        StrategyType.NEUROSYMBOLIC: NeurosymbolicStrategy(),
        StrategyType.PORTFOLIO_OPTIMIZATION: PortfolioOptimizationStrategy(),
        StrategyType.SPECIALIZED: SpecializedStrategy(),
        StrategyType.VENTURE: VentureStrategy(),
        StrategyType.AI_INFRASTRUCTURE: AIInfrastructureStrategy(),
        StrategyType.AI_CONSULTING: AIConsultingStrategy(),
        StrategyType.AI_PRODUCT: AIProductStrategy(),
        StrategyType.FINTECH: FinTechStrategy(),
        StrategyType.HEALTHTECH: HealthTechStrategy(),
        StrategyType.EDTECH: EdTechStrategy(),
        StrategyType.BLOCKCHAIN: BlockchainStrategy(),
        StrategyType.AI_MARKETPLACE: AIMarketplaceStrategy(),
    }

    # Default strategy weights with Groq as primary
    default_weights: Dict[StrategyType, float] = {
        # Primary strategy (highest weight)
        StrategyType.GROQ: 2.5,
        # Core strategies (high weights)
        StrategyType.CHAIN_OF_THOUGHT: 1.5,
        StrategyType.TREE_OF_THOUGHTS: 1.5,
        StrategyType.META_LEARNING: 1.5,
        # Agentic strategies (medium-high weights)
        StrategyType.TASK_DECOMPOSITION: 1.3,
        StrategyType.RESOURCE_MANAGEMENT: 1.3,
        StrategyType.CONTEXTUAL_PLANNING: 1.3,
        StrategyType.ADAPTIVE_EXECUTION: 1.3,
        StrategyType.FEEDBACK_INTEGRATION: 1.3,
        # Domain-specific strategies (context-dependent weights)
        StrategyType.BAYESIAN: 1.2,
        StrategyType.MARKET_ANALYSIS: 1.2,
        StrategyType.PORTFOLIO_OPTIMIZATION: 1.2,
        StrategyType.VENTURE: 1.2,
        # Other specialized strategies (base weights)
        StrategyType.MONETIZATION: 1.0,
        StrategyType.MULTIMODAL: 1.0,
        StrategyType.NEUROSYMBOLIC: 1.0,
        StrategyType.SPECIALIZED: 1.0,
        StrategyType.RECURSIVE: 1.0,
        StrategyType.ANALOGICAL: 1.0,
        StrategyType.LOCAL_LLM: 1.0,  # Reduced weight since using Groq
        StrategyType.AI_INFRASTRUCTURE: 1.0,
        StrategyType.AI_CONSULTING: 1.0,
        StrategyType.AI_PRODUCT: 1.0,
        StrategyType.FINTECH: 1.0,
        StrategyType.HEALTHTECH: 1.0,
        StrategyType.EDTECH: 1.0,
        StrategyType.BLOCKCHAIN: 1.0,
        StrategyType.AI_MARKETPLACE: 1.0,
    }
    # Build a fresh dict: the weights are rewritten in place while learning,
    # which must not leak into the caller's mapping, and any strategy the
    # caller left out still needs a weight to be scored.
    self.strategy_weights: Dict[StrategyType, float] = {
        **default_weights,
        **(strategy_weights or {}),
    }

    # Performance tracking
    self.strategy_performance: Dict[StrategyType, List[float]] = defaultdict(list)
    self.task_type_performance: Dict[str, Dict[StrategyType, float]] = defaultdict(lambda: defaultdict(float))
    self.synthesis_performance: Dict[str, List[float]] = defaultdict(list)
async def reason(self, query: str, context: Dict[str, Any]) -> UnifiedResult:
    """Run the full pipeline for one query and return the unified result.

    The stages are: task analysis, strategy selection, strategy execution,
    result synthesis and finally the performance update. Any exception
    raised along the way is logged and turned into a failed
    ``UnifiedResult`` instead of propagating to the caller.
    """
    try:
        analysis = await self._analyze_task(query, context)
        chosen = await self._select_strategies(analysis, context)
        per_strategy = await self._execute_strategies(chosen, query, context)
        outcome = await self._synthesize_results(per_strategy, analysis, context)
        # Feed the outcome back into the running statistics and weights.
        self._update_performance(outcome)
    except Exception as e:
        logging.error(f"Error in unified reasoning: {str(e)}")
        return UnifiedResult(
            success=False,
            answer=f"Error: {str(e)}",
            confidence=0.0,
            strategy_results={},
            synthesis_method="failed",
            meta_insights=[f"Error occurred: {str(e)}"],
            performance_metrics={}
        )
    else:
        return outcome
async def reason_stream(
    self,
    query: str,
    context: Dict[str, Any] = None,
    strategy_type: Optional[StrategyType] = None,
    chunk_handler: Optional[callable] = None
) -> AsyncGenerator[str, None]:
    """
    Stream reasoning output chunk by chunk from a single strategy.

    Problems (unknown strategy, strategy without streaming support, or an
    exception while streaming) are reported as an "Error: ..." chunk rather
    than raised.

    Args:
        query: Query to reason about
        context: Additional context for reasoning (empty dict when omitted)
        strategy_type: Specific strategy to use; Groq when omitted
        chunk_handler: Optional callback forwarded to the strategy
    """
    context = context or {}
    # With no explicit choice, streaming goes through Groq.
    strategy_type = strategy_type or StrategyType.GROQ
    strategy = self.strategies.get(strategy_type)

    if not strategy:
        yield f"Error: Strategy {strategy_type} not found"
    elif not hasattr(strategy, 'reason_stream'):
        yield f"Error: Strategy {strategy_type} does not support streaming"
    else:
        try:
            chunks = strategy.reason_stream(
                query=query,
                context=context,
                chunk_handler=chunk_handler
            )
            async for chunk in chunks:
                yield chunk
        except Exception as e:
            logging.error(f"Streaming error: {str(e)}")
            yield f"Error: {str(e)}"
| async def _analyze_task(self, query: str, context: Dict[str, Any]) -> Dict[str, Any]: | |
| """Analyze the task to determine optimal strategy selection.""" | |
| prompt = f""" | |
| Analyze reasoning task: | |
| Query: {query} | |
| Context: {json.dumps(context)} | |
| Determine: | |
| 1. Task type and complexity | |
| 2. Required reasoning capabilities | |
| 3. Resource requirements | |
| 4. Success criteria | |
| 5. Risk factors | |
| Format as: | |
| [Analysis] | |
| Type: ... | |
| Complexity: ... | |
| Capabilities: ... | |
| Resources: ... | |
| Criteria: ... | |
| Risks: ... | |
| """ | |
| response = await context["groq_api"].predict(prompt) | |
| return self._parse_task_analysis(response["answer"]) | |
async def _select_strategies(self, task_analysis: Dict[str, Any], context: Dict[str, Any]) -> List[StrategyType]:
    """Rank the registered strategies for a task and return the best ones.

    Each strategy's score is a weighted blend of its configured weight
    (0.3), its recorded performance on this task type (0.2), the mean of
    its most recent confidences (0.2), and the resource and capability
    match scores (0.15 each).

    Args:
        task_analysis: Parsed task analysis; the ``"type"``,
            ``"resources"`` and ``"capabilities"`` entries are used.
        context: Request context (currently unused here).

    Returns:
        At most ``parallel_threshold`` strategy types, best first.
    """
    task_type = task_analysis.get("type", "")
    required_resources = task_analysis.get("resources", {})
    required_capabilities = task_analysis.get("capabilities", [])

    scores: Dict[StrategyType, float] = {}
    # Only strategies that are actually registered can be executed, so rank
    # those rather than every enum member (some members have no instance).
    for strategy_type in self.strategies:
        # A strategy without a configured weight gets the neutral base weight.
        base_score = self.strategy_weights.get(strategy_type, 1.0)
        type_score = self.task_type_performance[task_type][strategy_type]

        # Average over the samples that exist (up to the last five); with no
        # history, start from a neutral 0.5.
        recent = self.strategy_performance[strategy_type][-5:]
        recent_performance = sum(recent) / len(recent) if recent else 0.5

        resource_match = self._calculate_resource_match(
            strategy_type, required_resources)
        capability_match = self._calculate_capability_match(
            strategy_type, required_capabilities)

        scores[strategy_type] = (
            0.3 * base_score +
            0.2 * type_score +
            0.2 * recent_performance +
            0.15 * resource_match +
            0.15 * capability_match
        )

    # sorted() is stable, so ties keep registration order.
    ranked = sorted(scores, key=scores.get, reverse=True)
    return ranked[:self.parallel_threshold]
async def _execute_strategies(self,
                              strategies: List[StrategyType],
                              query: str,
                              context: Dict[str, Any]) -> Dict[StrategyType, StrategyResult]:
    """Run the given strategies concurrently and collect their results.

    Every strategy's ``reason`` output is wrapped in a ``StrategyResult``
    that also records the execution time. A strategy that raises is logged
    and represented by a failed result carrying the error in its trace.

    Returns:
        The results keyed by strategy type.
    """
    async def run_one(kind: StrategyType) -> StrategyResult:
        runner = self.strategies[kind]
        began = datetime.now()

        def elapsed() -> float:
            return (datetime.now() - began).total_seconds()

        try:
            raw = await runner.reason(query, context)
            return StrategyResult(
                strategy_type=kind,
                success=raw.get("success", False),
                answer=raw.get("answer"),
                confidence=raw.get("confidence", 0.0),
                reasoning_trace=raw.get("reasoning_trace", []),
                metadata=raw.get("metadata", {}),
                # Metrics reported by the strategy override the measured time.
                performance_metrics={
                    "execution_time": elapsed(),
                    **raw.get("performance_metrics", {})
                }
            )
        except Exception as e:
            logging.error(f"Error in strategy {kind}: {str(e)}")
            return StrategyResult(
                strategy_type=kind,
                success=False,
                answer=None,
                confidence=0.0,
                reasoning_trace=[{"error": str(e)}],
                metadata={},
                performance_metrics={"execution_time": elapsed()}
            )

    # Fan out: one coroutine per selected strategy, awaited together.
    gathered = await asyncio.gather(*[run_one(kind) for kind in strategies])
    return {item.strategy_type: item for item in gathered}
async def _synthesize_results(self,
                              strategy_results: Dict[StrategyType, StrategyResult],
                              task_analysis: Dict[str, Any],
                              context: Dict[str, Any]) -> UnifiedResult:
    """Synthesize results from multiple strategies with specialized combination methods.

    Args:
        strategy_results: Results keyed by the strategy that produced them.
        task_analysis: Parsed task analysis. The task type is read from the
            ``"type"`` entry (the key ``_parse_task_analysis`` fills in);
            a ``"task_type"`` entry is honoured as a fallback.
        context: Request context (currently unused here).

    Returns:
        A ``UnifiedResult`` combining the individual results; a failed
        result with method ``"none"`` when there is nothing to combine.
    """
    if not strategy_results:
        return UnifiedResult(
            success=False,
            answer="No strategy results available",
            confidence=0.0,
            strategy_results={},
            synthesis_method="none",
            meta_insights=[],
            performance_metrics={}
        )

    def subset(kinds: Set[StrategyType]) -> Dict[StrategyType, StrategyResult]:
        """Keep only the results produced by the given strategy types."""
        return {k: v for k, v in strategy_results.items() if k in kinds}

    # Group results by strategy category
    core_results = subset({
        StrategyType.CHAIN_OF_THOUGHT, StrategyType.TREE_OF_THOUGHTS,
        StrategyType.META_LEARNING, StrategyType.LOCAL_LLM})
    agentic_results = subset({
        StrategyType.TASK_DECOMPOSITION, StrategyType.RESOURCE_MANAGEMENT,
        StrategyType.CONTEXTUAL_PLANNING, StrategyType.ADAPTIVE_EXECUTION,
        StrategyType.FEEDBACK_INTEGRATION})
    market_results = subset({
        StrategyType.MARKET_ANALYSIS, StrategyType.PORTFOLIO_OPTIMIZATION,
        StrategyType.VENTURE, StrategyType.MONETIZATION})
    analytical_results = subset({
        StrategyType.BAYESIAN, StrategyType.NEUROSYMBOLIC,
        StrategyType.SPECIALIZED, StrategyType.MULTIMODAL})

    # The analysis parser stores the task type under "type"; reading only
    # "task_type" would always yield "general" and disable every
    # specialized synthesis path.
    raw_type = (task_analysis.get('type')
                or task_analysis.get('task_type')
                or 'general')
    # The synthesis rules compare against lowercase identifiers.
    task_type = raw_type.strip().lower() if isinstance(raw_type, str) else raw_type
    synthesis_method = self._determine_synthesis_method(task_type, strategy_results.keys())

    # Apply specialized synthesis based on method
    if synthesis_method == "weighted_voting":
        final_result = await self._weighted_voting_synthesis(strategy_results)
    elif synthesis_method == "market_focused":
        final_result = await self._market_focused_synthesis(market_results, core_results)
    elif synthesis_method == "analytical_consensus":
        final_result = await self._analytical_consensus_synthesis(analytical_results, core_results)
    elif synthesis_method == "agentic_orchestration":
        final_result = await self._agentic_orchestration_synthesis(agentic_results, strategy_results)
    else:
        # NOTE(review): _ensemble_synthesis is not defined in the visible
        # part of this class -- confirm it exists before relying on this path.
        final_result = await self._ensemble_synthesis(strategy_results)

    # Generate meta-insights about the synthesis process
    meta_insights = self._generate_meta_insights(strategy_results, synthesis_method)
    # Calculate aggregate performance metrics
    performance_metrics = self._calculate_synthesis_metrics(strategy_results, final_result)

    return UnifiedResult(
        success=final_result['success'],
        answer=final_result['answer'],
        confidence=final_result['confidence'],
        strategy_results=strategy_results,
        synthesis_method=synthesis_method,
        meta_insights=meta_insights,
        performance_metrics=performance_metrics
    )
def _determine_synthesis_method(self, task_type: str, available_strategies: Set[StrategyType]) -> str:
    """Pick the synthesis method that fits the task type and strategy mix.

    A specialized method is chosen when the task type belongs to its family
    and at least two strategies of that family are available; the families
    are tried in the order market, analytical, agentic. Anything else
    falls back to "weighted_voting".
    """
    families = {
        "market_focused": {
            StrategyType.MARKET_ANALYSIS, StrategyType.PORTFOLIO_OPTIMIZATION,
            StrategyType.VENTURE, StrategyType.MONETIZATION},
        "analytical_consensus": {
            StrategyType.BAYESIAN, StrategyType.NEUROSYMBOLIC},
        "agentic_orchestration": {
            StrategyType.TASK_DECOMPOSITION, StrategyType.RESOURCE_MANAGEMENT,
            StrategyType.CONTEXTUAL_PLANNING},
    }
    # How many strategies of each family took part in this run.
    coverage = {
        method: len(members.intersection(available_strategies))
        for method, members in families.items()
    }
    task_types_for = (
        ("market_focused", ('market_analysis', 'investment')),
        ("analytical_consensus", ('analysis', 'prediction')),
        ("agentic_orchestration", ('planning', 'execution')),
    )
    for method, task_types in task_types_for:
        if task_type in task_types and coverage[method] >= 2:
            return method
    return "weighted_voting"
async def _weighted_voting_synthesis(self, strategy_results: Dict[StrategyType, StrategyResult]) -> Dict[str, Any]:
    """Pick the answer with the largest total vote across strategies.

    A strategy's vote is its configured weight times its confidence times
    the mean of its recorded performance (1.0 when it has no history).
    The returned confidence is the winning answer's share of all votes;
    when no vote carries any weight a failed, empty result is returned.
    """
    tally = defaultdict(float)
    weight_sum = 0
    for kind, outcome in strategy_results.items():
        history = self.strategy_performance[kind]
        track_record = np.mean(history) if history else 1.0
        vote = self.strategy_weights[kind] * outcome.confidence * track_record
        # Identical answers pool their votes.
        tally[outcome.answer] += vote
        weight_sum += vote

    if not weight_sum:
        return {'success': False, 'answer': '', 'confidence': 0.0}

    winner, winner_weight = max(tally.items(), key=lambda pair: pair[1])
    share = winner_weight / weight_sum
    return {
        'success': share >= self.min_confidence,
        'answer': winner,
        'confidence': share
    }
async def _market_focused_synthesis(self, market_results: Dict[StrategyType, StrategyResult],
                                    core_results: Dict[StrategyType, StrategyResult]) -> Dict[str, Any]:
    """Combine market and core votes, giving the market side the larger say.

    Both groups are reduced with weighted voting. If the market vote reaches
    ``min_confidence`` its answer is returned, annotated with the core
    answer, at a 70/30 blended confidence; otherwise the core vote is
    returned unchanged.
    """
    market_vote = await self._weighted_voting_synthesis(market_results)
    core_vote = await self._weighted_voting_synthesis(core_results)

    market_is_trusted = market_vote['confidence'] >= self.min_confidence
    if not market_is_trusted:
        return core_vote

    market_answer = market_vote['answer']
    core_answer = core_vote['answer']
    blended = 0.7 * market_vote['confidence'] + 0.3 * core_vote['confidence']
    return {
        'success': True,
        'answer': f"{market_answer} (Supported by core analysis: {core_answer})",
        'confidence': blended
    }
async def _analytical_consensus_synthesis(self, analytical_results: Dict[StrategyType, StrategyResult],
                                          core_results: Dict[StrategyType, StrategyResult]) -> Dict[str, Any]:
    """Combine analytical and core votes, favouring the analytical side.

    Both groups are reduced with weighted voting. If the analytical vote
    reaches ``min_confidence`` its answer is returned, annotated with that
    confidence, at a 60/40 blended confidence; otherwise the core vote is
    returned unchanged.
    """
    analytical_vote = await self._weighted_voting_synthesis(analytical_results)
    core_vote = await self._weighted_voting_synthesis(core_results)

    analytical_is_trusted = analytical_vote['confidence'] >= self.min_confidence
    if not analytical_is_trusted:
        return core_vote

    analytical_answer = analytical_vote['answer']
    analytical_confidence = analytical_vote['confidence']
    blended = 0.6 * analytical_confidence + 0.4 * core_vote['confidence']
    return {
        'success': True,
        'answer': f"{analytical_answer} (Confidence interval: {analytical_confidence:.2f})",
        'confidence': blended
    }
async def _agentic_orchestration_synthesis(self, agentic_results: Dict[StrategyType, StrategyResult],
                                           all_results: Dict[StrategyType, StrategyResult]) -> Dict[str, Any]:
    """Synthesize results with emphasis on task decomposition and execution planning.

    The agentic results are turned into a task structure, then an execution
    plan, and the plan is executed over all results. When the executed plan
    does not provide both an answer and a confidence (as the placeholder
    helpers currently do by returning an empty dict), the method falls back
    to weighted voting over all results instead of raising ``KeyError``.

    Returns:
        A dict with ``success``, ``answer`` and ``confidence`` entries.
    """
    # Extract task decomposition and planning insights
    task_structure = self._extract_task_structure(agentic_results)
    execution_plan = self._create_execution_plan(task_structure, all_results)
    # Combine results according to the execution plan
    synthesized_result = self._execute_synthesis_plan(execution_plan, all_results) or {}

    if 'answer' not in synthesized_result or 'confidence' not in synthesized_result:
        # The plan produced nothing usable; keep the pipeline working with
        # the general-purpose synthesis.
        return await self._weighted_voting_synthesis(all_results)

    confidence = synthesized_result['confidence']
    return {
        'success': confidence >= self.min_confidence,
        'answer': synthesized_result['answer'],
        'confidence': confidence
    }
def _generate_meta_insights(self, strategy_results: Dict[StrategyType, StrategyResult],
                            synthesis_method: str) -> List[str]:
    """Generate meta-insights about the synthesis process and strategy performance.

    Reports the strategy agreement rate, the most confident strategy (when
    there is at least one result) and the synthesis method that was used.

    Returns:
        The insights as human-readable strings, in that order.
    """
    insights = []

    # Analyze strategy agreement
    agreement_rate = self._calculate_strategy_agreement(strategy_results)
    insights.append(f"Strategy agreement rate: {agreement_rate:.2f}")

    # Identify the strongest strategy; max() would raise on an empty mapping.
    if strategy_results:
        best_type, best_result = max(
            strategy_results.items(), key=lambda item: item[1].confidence)
        insights.append(f"Most confident strategy: {best_type} ({best_result.confidence:.2f})")

    insights.append(f"Synthesis method used: {synthesis_method}")
    return insights
def _calculate_synthesis_metrics(self, strategy_results: Dict[StrategyType, StrategyResult],
                                 final_result: Dict[str, Any]) -> Dict[str, Any]:
    """Summarize one synthesis run as a dict of metrics.

    Reports how many strategies took part, the mean and standard deviation
    of their confidences, the confidence of the final result and the
    strategy agreement rate.
    """
    confidences = [outcome.confidence for outcome in strategy_results.values()]

    metrics: Dict[str, Any] = {}
    metrics['strategy_count'] = len(strategy_results)
    metrics['average_confidence'] = np.mean(confidences)
    metrics['confidence_std'] = np.std(confidences)
    metrics['final_confidence'] = final_result['confidence']
    metrics['strategy_agreement'] = self._calculate_strategy_agreement(strategy_results)
    return metrics
def _update_performance(self, result: UnifiedResult):
    """Record the outcome of a run and nudge the strategy weights.

    Each participating strategy's confidence is appended to its history and
    blended into its weight as an exponential moving average controlled by
    ``learning_rate``. The unified confidence is appended to the history of
    the synthesis method that was used.
    """
    rate = self.learning_rate
    for kind, outcome in result.strategy_results.items():
        observed = outcome.confidence
        self.strategy_performance[kind].append(observed)

        # Exponential moving average of the weight toward the observation.
        previous = self.strategy_weights[kind]
        self.strategy_weights[kind] = (1 - rate) * previous + rate * observed

    method_history = self.synthesis_performance[result.synthesis_method]
    method_history.append(result.confidence)
def _calculate_resource_match(self, strategy_type: StrategyType, required_resources: Dict[str, Any]) -> float:
    """Score how well a strategy fits the required resources.

    Placeholder: both arguments are ignored and a fixed score is returned.
    """
    placeholder_score = 0.8  # Real resource matching is not implemented yet
    return placeholder_score
def _calculate_capability_match(self, strategy_type: StrategyType, required_capabilities: List[str]) -> float:
    """Score how well a strategy fits the required capabilities.

    Placeholder: both arguments are ignored and a fixed score is returned.
    """
    placeholder_score = 0.8  # Real capability matching is not implemented yet
    return placeholder_score
| def _parse_task_analysis(self, response: str) -> Dict[str, Any]: | |
| """Parse task analysis from response.""" | |
| analysis = { | |
| "type": "", | |
| "complexity": 0.0, | |
| "capabilities": [], | |
| "resources": {}, | |
| "criteria": [], | |
| "risks": [] | |
| } | |
| for line in response.split('\n'): | |
| line = line.strip() | |
| if line.startswith('Type:'): | |
| analysis["type"] = line[5:].strip() | |
| elif line.startswith('Complexity:'): | |
| try: | |
| analysis["complexity"] = float(line[11:].strip()) | |
| except: | |
| pass | |
| elif line.startswith('Capabilities:'): | |
| analysis["capabilities"] = [c.strip() for c in line[13:].split(',')] | |
| elif line.startswith('Resources:'): | |
| try: | |
| analysis["resources"] = json.loads(line[10:].strip()) | |
| except: | |
| analysis["resources"] = {"raw": line[10:].strip()} | |
| elif line.startswith('Criteria:'): | |
| analysis["criteria"] = [c.strip() for c in line[9:].split(',')] | |
| elif line.startswith('Risks:'): | |
| analysis["risks"] = [r.strip() for r in line[7:].split(',')] | |
| return analysis | |
| def _parse_synthesis(self, response: str) -> Dict[str, Any]: | |
| """Parse synthesis result from response.""" | |
| synthesis = { | |
| "method": "", | |
| "answer": "", | |
| "confidence": 0.0, | |
| "insights": [], | |
| "performance": {} | |
| } | |
| for line in response.split('\n'): | |
| line = line.strip() | |
| if line.startswith('Method:'): | |
| synthesis["method"] = line[7:].strip() | |
| elif line.startswith('Answer:'): | |
| synthesis["answer"] = line[7:].strip() | |
| elif line.startswith('Confidence:'): | |
| try: | |
| synthesis["confidence"] = float(line[11:].strip()) | |
| except: | |
| pass | |
| elif line.startswith('Insights:'): | |
| synthesis["insights"] = [i.strip() for i in line[9:].split(',')] | |
| elif line.startswith('Performance:'): | |
| try: | |
| synthesis["performance"] = json.loads(line[12:].strip()) | |
| except: | |
| synthesis["performance"] = {"raw": line[12:].strip()} | |
| return synthesis | |
def _strategy_result_to_dict(self, result: StrategyResult) -> Dict[str, Any]:
    """Flatten a strategy result into a plain dict for serialization.

    The strategy type is replaced by its enum value and the timestamp by
    its ISO-8601 string; all other fields are copied as they are.
    """
    copied_fields = (
        "success",
        "answer",
        "confidence",
        "reasoning_trace",
        "metadata",
        "performance_metrics",
    )
    payload: Dict[str, Any] = {"strategy_type": result.strategy_type.value}
    for name in copied_fields:
        payload[name] = getattr(result, name)
    payload["timestamp"] = result.timestamp.isoformat()
    return payload
def get_performance_metrics(self) -> Dict[str, Any]:
    """Return a snapshot of the engine's learned statistics.

    The snapshot holds a copy of the strategy weights, the mean recorded
    performance per strategy (keyed by the strategy's value) and per
    synthesis method (0 where nothing was recorded), and the per-task-type
    performance tables as plain dicts.
    """
    def mean_or_zero(values):
        return sum(values) / len(values) if values else 0

    weights_copy = dict(self.strategy_weights)
    strategy_means = {
        kind.value: mean_or_zero(history)
        for kind, history in self.strategy_performance.items()
    }
    method_means = {
        method: mean_or_zero(history)
        for method, history in self.synthesis_performance.items()
    }
    per_task_type = {
        task_type: dict(table)
        for task_type, table in self.task_type_performance.items()
    }
    return {
        "strategy_weights": weights_copy,
        "average_performance": strategy_means,
        "synthesis_success": method_means,
        "task_type_performance": per_task_type
    }
def clear_performance_history(self):
    """Forget all recorded performance and reset every weight to 1.0.

    The three history containers are emptied in place, while the weight
    mapping is replaced by a new dict covering every ``StrategyType``.
    """
    histories = (
        self.strategy_performance,
        self.task_type_performance,
        self.synthesis_performance,
    )
    for history in histories:
        history.clear()
    self.strategy_weights = dict.fromkeys(StrategyType, 1.0)
def _extract_task_structure(self, agentic_results: Dict[StrategyType, StrategyResult]) -> Dict[str, Any]:
    """Derive the task structure from the agentic strategy results.

    Placeholder: the argument is ignored and an empty dict is returned.
    """
    task_structure: Dict[str, Any] = {}  # Extraction logic not implemented yet
    return task_structure
def _create_execution_plan(self, task_structure: Dict[str, Any], all_results: Dict[StrategyType, StrategyResult]) -> Dict[str, Any]:
    """Build an execution plan from the task structure and strategy results.

    Placeholder: both arguments are ignored and an empty dict is returned.
    """
    execution_plan: Dict[str, Any] = {}  # Planning logic not implemented yet
    return execution_plan
def _execute_synthesis_plan(self, execution_plan: Dict[str, Any], all_results: Dict[StrategyType, StrategyResult]) -> Dict[str, Any]:
    """Carry out the synthesis plan over the strategy results.

    Placeholder: both arguments are ignored and an empty dict is returned.
    """
    combined: Dict[str, Any] = {}  # Plan execution not implemented yet
    return combined
def _calculate_strategy_agreement(self, strategy_results: Dict[StrategyType, StrategyResult]) -> float:
    """Measure how much the strategies agree with each other.

    Placeholder: the argument is ignored and 0.0 is returned.
    """
    agreement_rate = 0.0  # Agreement calculation not implemented yet
    return agreement_rate