Spaces:
Runtime error
Runtime error
| """Recursive reasoning strategy implementation.""" | |
| import logging | |
| from typing import Dict, Any, List, Optional, Set, Tuple, Callable | |
| import json | |
| from dataclasses import dataclass, field | |
| from enum import Enum | |
| from datetime import datetime | |
| import asyncio | |
| from collections import defaultdict | |
| from .base import ReasoningStrategy, StrategyResult | |
class SubproblemType(Enum):
    """Kinds of subproblem that recursive reasoning distinguishes."""

    # Solved directly; every other kind is decomposed into children first.
    ATOMIC = "atomic"
    COMPOSITE = "composite"
    PARALLEL = "parallel"
    SEQUENTIAL = "sequential"
    CONDITIONAL = "conditional"
    ITERATIVE = "iterative"
class SolutionStatus(Enum):
    """Lifecycle states a subproblem's solution can be in."""

    # Initial state of every newly created subproblem.
    PENDING = "pending"
    IN_PROGRESS = "in_progress"
    SOLVED = "solved"
    FAILED = "failed"
    BLOCKED = "blocked"
    OPTIMIZING = "optimizing"
@dataclass
class Subproblem:
    """One node of the problem tree built during recursive reasoning.

    Declared as a dataclass so that it can be constructed with keyword
    arguments and so that ``metadata`` and ``timestamp`` get a fresh default
    per instance.
    """

    # Key under which the node is stored in the strategy's subproblem map.
    id: str
    type: SubproblemType
    # Text of the (sub)question this node has to answer.
    query: str
    context: Dict[str, Any]
    # None for the root problem.
    parent_id: Optional[str]
    # Ids of the subproblems this node was decomposed into.
    children: List[str]
    status: SolutionStatus
    # Solution dictionary once the node has been solved, otherwise None.
    solution: Optional[Dict[str, Any]]
    confidence: float
    # Presumably ids of subproblems that must be solved first; nothing in
    # this file reads the field -- TODO confirm intended use.
    dependencies: List[str]
    # Free-form extras; the strategy stores the tree depth under 'depth'.
    metadata: Dict[str, Any] = field(default_factory=dict)
    # ISO-8601 creation time (naive local time from datetime.now()).
    timestamp: str = field(default_factory=lambda: datetime.now().isoformat())
@dataclass
class RecursiveStep:
    """One recorded action taken on a subproblem during recursive reasoning.

    Declared as a dataclass so that it can be constructed with keyword
    arguments and so that ``timestamp`` is generated per instance.
    """

    id: str
    # Id of the subproblem the action was applied to.
    subproblem_id: str
    # Short action label such as "initialize", "decompose" or "synthesize".
    action: str
    # Action-specific payload describing the outcome.
    result: Dict[str, Any]
    # ISO-8601 creation time (naive local time from datetime.now()).
    timestamp: str = field(default_factory=lambda: datetime.now().isoformat())
class RecursiveStrategy(ReasoningStrategy):
    """Recursive reasoning strategy.

    The strategy:
    1. Breaks a complex problem down into subproblems
    2. Solves the subproblems recursively
    3. Combines their solutions
    4. Handles base cases (atomic problems)
    5. Runs optimization rounds over the final solution

    The problem tree, recorded steps and solution cache are rebuilt on every
    ``reason`` call; ``performance_metrics`` accumulates over the lifetime of
    the instance.
    """

    def __init__(self, config: Optional[Dict[str, Any]] = None):
        """Initialize recursive reasoning.

        Args:
            config: Optional settings. Recognized keys are
                ``min_confidence`` (default 0.7), ``max_depth`` (default 5)
                and ``optimization_rounds`` (default 2).
        """
        super().__init__()
        self.config = config or {}

        # Standard reasoning parameters
        self.min_confidence = self.config.get('min_confidence', 0.7)
        self.max_depth = self.config.get('max_depth', 5)
        self.optimization_rounds = self.config.get('optimization_rounds', 2)

        # Problem tracking (per-run state, reset by reason())
        self.subproblems: Dict[str, Subproblem] = {}
        self.steps: List[RecursiveStep] = []
        self.solution_cache: Dict[str, Dict[str, Any]] = {}
        self.cycle_detection: Set[str] = set()

        # Performance metrics (cumulative across reason() calls)
        self.performance_metrics = {
            'depth_distribution': defaultdict(int),
            'type_distribution': defaultdict(int),
            'success_rate': defaultdict(float),
            'total_subproblems': 0,
            'solved_subproblems': 0,
            'failed_subproblems': 0,
            'optimization_rounds': 0,
            'cache_hits': 0,
            'cycles_detected': 0,
            # Cumulative numerator for 'success_rate'; kept alongside the
            # cumulative 'type_distribution' denominator.
            'solved_by_type': defaultdict(int)
        }

    async def reason(
        self,
        query: str,
        context: Dict[str, Any]
    ) -> StrategyResult:
        """
        Apply recursive reasoning to analyze the query.

        Each call starts from a clean per-run state (problem tree, recorded
        steps, solution cache, cycle-detection set). ``performance_metrics``
        is not reset and keeps accumulating.

        Args:
            query: The query to reason about
            context: Additional context and parameters

        Returns:
            StrategyResult containing the reasoning output and metadata.
            An exception raised while reasoning is logged and reported as an
            unsuccessful result with confidence 0.0 rather than propagated.
        """
        self._reset_run_state()

        try:
            # Initialize root problem
            root_problem = await self._initialize_problem(query, context)
            root_id = root_problem.id

            # Solve recursively
            solution = await self._solve_recursive(root_id, depth=0)

            # Optimize solution
            if solution and solution.get('success', False):
                solution = await self._optimize_solution(solution, root_problem, context)

            # Update metrics
            self._update_metrics(root_id)

            # Build solution trace
            solution_trace = self._get_solution_trace(root_id)

            # Calculate overall confidence
            confidence = self._calculate_confidence(solution_trace)

            return StrategyResult(
                strategy_type="recursive",
                success=bool(solution and solution.get('success', False)),
                answer=solution.get('answer') if solution else None,
                confidence=confidence,
                reasoning_trace=solution_trace,
                metadata={
                    'problem_tree': self._get_problem_tree(root_id),
                    'steps': [self._step_to_dict(step) for step in self.steps],
                    'solution_details': solution if solution else {}
                },
                performance_metrics=self.performance_metrics
            )
        except Exception as e:
            # Top-level boundary: report the failure as a result object and
            # keep the traceback in the log.
            logging.exception("Recursive reasoning error: %s", e)
            return StrategyResult(
                strategy_type="recursive",
                success=False,
                answer=None,
                confidence=0.0,
                reasoning_trace=[{
                    'step': 'error',
                    'error': str(e),
                    'timestamp': datetime.now().isoformat()
                }],
                metadata={'error': str(e)},
                performance_metrics=self.performance_metrics
            )

    def _reset_run_state(self) -> None:
        """Discard the problem tree, steps and cache of the previous run.

        Problem ids are derived from tree position ("root", "root_sub0",
        ...), so they repeat for every query. Without this reset a later
        query would find the previous query's answer cached under "root".
        """
        self.subproblems.clear()
        self.steps.clear()
        self.solution_cache.clear()
        self.cycle_detection.clear()

    async def _initialize_problem(
        self,
        query: str,
        context: Dict[str, Any]
    ) -> Subproblem:
        """Create, register and return the root problem for a query.

        The root has the fixed id "root", is COMPOSITE and sits at depth 0.
        An "initialize" step is recorded for it.
        """
        problem = Subproblem(
            id="root",
            type=SubproblemType.COMPOSITE,
            query=query,
            context=context,
            parent_id=None,
            children=[],
            status=SolutionStatus.PENDING,
            solution=None,
            confidence=1.0,
            dependencies=[],
            metadata={'depth': 0}
        )
        self.subproblems[problem.id] = problem
        self._record_step(RecursiveStep(
            id=f"init_{problem.id}",
            subproblem_id=problem.id,
            action="initialize",
            result={'type': problem.type.value, 'query': query}
        ))
        return problem

    async def _solve_recursive(
        self,
        problem_id: str,
        depth: int
    ) -> Optional[Dict[str, Any]]:
        """Recursively solve a problem and its subproblems.

        Args:
            problem_id: Id of a problem registered in ``self.subproblems``.
            depth: Recursion depth of this call (0 for the root).

        Returns:
            The solution dictionary, or None when the depth limit is
            exceeded, a cycle is detected, or no solution could be produced.

        Raises:
            KeyError: If ``problem_id`` is unknown and the depth limit has
                not been exceeded.
        """
        if depth > self.max_depth:
            # Every other give-up path marks the node FAILED; do the same
            # here so the tree does not report the node as still pending.
            too_deep = self.subproblems.get(problem_id)
            if too_deep is not None:
                too_deep.status = SolutionStatus.FAILED
            return None

        problem = self.subproblems[problem_id]

        # A problem already being solved further up the call stack would
        # recurse forever.
        if problem_id in self.cycle_detection:
            self.performance_metrics['cycles_detected'] += 1
            return None

        self.cycle_detection.add(problem_id)
        try:
            # Check cache
            if problem_id in self.solution_cache:
                self.performance_metrics['cache_hits'] += 1
                return self.solution_cache[problem_id]

            # Base case: atomic problems are solved directly.
            if problem.type == SubproblemType.ATOMIC:
                solution = await self._solve_atomic(problem)
                if not solution:
                    problem.status = SolutionStatus.FAILED
                    return None
                self._mark_solved(problem, solution)
                return solution

            # Decompose only once; a repeated attempt on the same node would
            # otherwise append duplicate child ids.
            if not problem.children:
                await self._decompose_problem(problem, problem.context)

            # Solve subproblems, keeping only the ones that produced a result.
            subsolutions = []
            for child_id in problem.children:
                child_solution = await self._solve_recursive(child_id, depth + 1)
                if child_solution:
                    subsolutions.append(child_solution)

            # Synthesize solutions
            if subsolutions:
                solution = await self._synthesize_solutions(
                    subsolutions, problem, problem.context
                )
                if solution:
                    self._mark_solved(problem, solution)
                    self.solution_cache[problem_id] = solution
                    return solution

            problem.status = SolutionStatus.FAILED
            return None
        finally:
            # Always leave the in-progress set, including on early returns.
            self.cycle_detection.remove(problem_id)

    def _mark_solved(self, problem: Subproblem, solution: Dict[str, Any]) -> None:
        """Store a solution on its problem and mark the problem SOLVED.

        The solution's 'confidence', when present, is copied onto the
        problem so the problem tree reports the achieved confidence instead
        of the value the node was created with.
        """
        problem.solution = solution
        problem.status = SolutionStatus.SOLVED
        problem.confidence = solution.get('confidence', problem.confidence)

    async def _decompose_problem(
        self,
        problem: Subproblem,
        context: Dict[str, Any]
    ) -> None:
        """Decompose a problem into subproblems.

        The generated subproblems are registered in ``self.subproblems``,
        their ids are appended to ``problem.children`` and a "decompose"
        step is recorded.
        """
        subproblems = self._generate_subproblems(problem, context)
        for subproblem in subproblems:
            self.subproblems[subproblem.id] = subproblem
            problem.children.append(subproblem.id)
        self._record_step(RecursiveStep(
            id=f"decompose_{problem.id}",
            subproblem_id=problem.id,
            action="decompose",
            result={'num_subproblems': len(subproblems)}
        ))

    def _generate_subproblems(
        self,
        parent: Subproblem,
        context: Dict[str, Any],
        max_subproblems: int = 3
    ) -> List[Subproblem]:
        """Generate subproblems for a composite problem.

        Placeholder decomposition: the parent's query is split on '.' and
        each non-blank fragment becomes one ATOMIC child.

        Args:
            parent: The problem to decompose.
            context: Context handed to every child (the same object, not a
                copy).
            max_subproblems: Upper bound on the number of children.

        Returns:
            The new, not yet registered, child subproblems.
        """
        # Drop blank fragments *before* applying the limit so that runs of
        # dots (e.g. "...") do not use up child slots. The original fragment
        # index is kept so child ids stay stable.
        fragments = [
            (index, text.strip())
            for index, text in enumerate(parent.query.split('.'))
            if text.strip()
        ][:max(max_subproblems, 0)]

        # Metadata defaults to {} on Subproblem, so 'depth' may be absent.
        child_depth = parent.metadata.get('depth', 0) + 1

        return [
            Subproblem(
                id=f"{parent.id}_sub{index}",
                type=SubproblemType.ATOMIC,
                query=text,
                context=context,
                parent_id=parent.id,
                children=[],
                status=SolutionStatus.PENDING,
                solution=None,
                confidence=0.0,
                dependencies=[],
                metadata={'depth': child_depth}
            )
            for index, text in fragments
        ]

    async def _solve_atomic(
        self,
        problem: Subproblem
    ) -> Optional[Dict[str, Any]]:
        """Solve an atomic problem.

        Placeholder implementation: always succeeds with a templated answer
        and confidence 0.8, and records a "solve_atomic" step.
        """
        solution = {
            'success': True,
            'answer': f"Solution for {problem.query}",
            'confidence': 0.8
        }
        self._record_step(RecursiveStep(
            id=f"solve_{problem.id}",
            subproblem_id=problem.id,
            action="solve_atomic",
            result=solution
        ))
        return solution

    async def _synthesize_solutions(
        self,
        subsolutions: List[Dict[str, Any]],
        problem: Subproblem,
        context: Dict[str, Any]
    ) -> Optional[Dict[str, Any]]:
        """Synthesize solutions from subproblems.

        Args:
            subsolutions: Solutions of the problem's children.
            problem: The parent problem being solved.
            context: Reasoning context (currently unused here).

        Returns:
            None when there is nothing to combine. Otherwise a solution
            whose answer is the space-joined non-empty child answers and
            whose confidence is the mean child confidence.
        """
        if not subsolutions:
            return None

        # Combine answers; str() keeps non-string answers from breaking join.
        combined_answer = " ".join(
            str(sol['answer']) for sol in subsolutions if sol.get('answer')
        )

        # Average confidence; a solution without one counts as 0.0, matching
        # how _calculate_confidence reads the key.
        avg_confidence = sum(
            sol.get('confidence', 0.0) for sol in subsolutions
        ) / len(subsolutions)

        synthesis = {
            'success': True,
            'answer': combined_answer,
            'confidence': avg_confidence,
            'subsolutions': subsolutions
        }
        self._record_step(RecursiveStep(
            id=f"synthesize_{problem.id}",
            subproblem_id=problem.id,
            action="synthesize",
            result={'num_solutions': len(subsolutions)}
        ))
        return synthesis

    async def _optimize_solution(
        self,
        solution: Dict[str, Any],
        problem: Subproblem,
        context: Dict[str, Any]
    ) -> Dict[str, Any]:
        """Optimize the final solution.

        Works on a shallow copy, so ``solution`` itself is not modified.
        Placeholder optimization: in each of ``self.optimization_rounds``
        rounds a confidence below 0.9 is multiplied by 1.1.

        Returns:
            The optimized copy of ``solution``.
        """
        optimized = solution.copy()
        for _ in range(self.optimization_rounds):
            self.performance_metrics['optimization_rounds'] += 1
            # Example optimization: Improve confidence
            if optimized['confidence'] < 0.9:
                optimized['confidence'] *= 1.1

        # Recorded once, after the final round, with the total improvement.
        self._record_step(RecursiveStep(
            id=f"optimize_{problem.id}",
            subproblem_id=problem.id,
            action="optimize",
            result={'confidence_improvement': optimized['confidence'] - solution['confidence']}
        ))
        return optimized

    def _calculate_confidence(
        self,
        solution_trace: List[Dict[str, Any]]
    ) -> float:
        """Calculate overall confidence from solution trace.

        Returns:
            The mean of all numeric 'confidence' entries in the trace, or
            0.0 when the trace is empty or has no numeric confidence.
        """
        if not solution_trace:
            return 0.0
        confidences = [
            step.get('confidence', 0.0)
            for step in solution_trace
            if isinstance(step.get('confidence'), (int, float))
        ]
        return sum(confidences) / len(confidences) if confidences else 0.0

    def _update_metrics(self, root_id: str) -> None:
        """Fold the problem tree rooted at ``root_id`` into the metrics.

        All counters are cumulative. ``success_rate`` is the cumulative
        number of solved problems of a type divided by the cumulative number
        of problems of that type, so both sides cover the same runs.
        """
        metrics = self.performance_metrics

        def update_recursive(problem_id: str) -> None:
            problem = self.subproblems[problem_id]
            depth = problem.metadata.get('depth', 0)
            metrics['depth_distribution'][depth] += 1
            metrics['type_distribution'][problem.type] += 1
            metrics['total_subproblems'] += 1
            if problem.status == SolutionStatus.SOLVED:
                metrics['solved_subproblems'] += 1
                metrics['solved_by_type'][problem.type] += 1
            elif problem.status == SolutionStatus.FAILED:
                metrics['failed_subproblems'] += 1
            for child_id in problem.children:
                update_recursive(child_id)

        update_recursive(root_id)

        # Calculate success rates
        for problem_type in SubproblemType:
            type_count = metrics['type_distribution'][problem_type]
            if type_count > 0:
                metrics['success_rate'][problem_type] = (
                    metrics['solved_by_type'][problem_type] / type_count
                )

    def _get_problem_tree(self, root_id: str) -> Dict[str, Any]:
        """Get the problem decomposition tree.

        Returns:
            A nested dictionary with 'id', 'type', 'status', 'confidence'
            and 'children' for the problem and all of its descendants.
        """
        def build_tree(problem_id: str) -> Dict[str, Any]:
            problem = self.subproblems[problem_id]
            return {
                'id': problem.id,
                'type': problem.type.value,
                'status': problem.status.value,
                'confidence': problem.confidence,
                'children': [build_tree(child_id) for child_id in problem.children]
            }

        return build_tree(root_id)

    def _get_solution_trace(self, root_id: str) -> List[Dict[str, Any]]:
        """Get the solution trace for a problem.

        Returns:
            One entry per problem in depth-first, parent-before-children
            order. Each entry holds the problem's summary fields; when the
            problem has a solution, the solution's keys are merged on top
            and override summary fields of the same name.
        """
        trace: List[Dict[str, Any]] = []

        def build_trace(problem_id: str) -> None:
            problem = self.subproblems[problem_id]
            step = {
                'id': problem.id,
                'type': problem.type.value,
                'status': problem.status.value,
                'confidence': problem.confidence,
                'timestamp': problem.timestamp
            }
            if problem.solution:
                step.update(problem.solution)
            trace.append(step)
            for child_id in problem.children:
                build_trace(child_id)

        build_trace(root_id)
        return trace

    def _record_step(self, step: RecursiveStep) -> None:
        """Record a reasoning step."""
        self.steps.append(step)

    def _step_to_dict(self, step: RecursiveStep) -> Dict[str, Any]:
        """Convert step to dictionary for serialization."""
        return {
            'id': step.id,
            'subproblem_id': step.subproblem_id,
            'action': step.action,
            'result': step.result,
            'timestamp': step.timestamp
        }

    def clear_cache(self) -> None:
        """Clear solution cache and reset the cache-hit counter."""
        self.solution_cache.clear()
        self.performance_metrics['cache_hits'] = 0