Spaces:
Runtime error
Runtime error
| """Recursive reasoning implementation with advanced decomposition and synthesis.""" | |
| import logging | |
| from typing import Dict, Any, List, Optional, Set, Tuple, Callable | |
| import json | |
| from dataclasses import dataclass, field | |
| from enum import Enum | |
| from datetime import datetime | |
| import asyncio | |
| from collections import defaultdict | |
| from .base import ReasoningStrategy | |
| class SubproblemType(Enum): | |
| """Types of subproblems in recursive reasoning.""" | |
| ATOMIC = "atomic" | |
| COMPOSITE = "composite" | |
| PARALLEL = "parallel" | |
| SEQUENTIAL = "sequential" | |
| CONDITIONAL = "conditional" | |
| ITERATIVE = "iterative" | |
| class SolutionStatus(Enum): | |
| """Status of subproblem solutions.""" | |
| PENDING = "pending" | |
| IN_PROGRESS = "in_progress" | |
| SOLVED = "solved" | |
| FAILED = "failed" | |
| BLOCKED = "blocked" | |
| OPTIMIZING = "optimizing" | |
| class Subproblem: | |
| """Represents a subproblem in recursive reasoning.""" | |
| id: str | |
| type: SubproblemType | |
| query: str | |
| context: Dict[str, Any] | |
| parent_id: Optional[str] | |
| children: List[str] | |
| status: SolutionStatus | |
| solution: Optional[Dict[str, Any]] | |
| confidence: float | |
| dependencies: List[str] | |
| metadata: Dict[str, Any] = field(default_factory=dict) | |
| class RecursiveStep: | |
| """Represents a step in recursive reasoning.""" | |
| id: str | |
| subproblem_id: str | |
| action: str | |
| timestamp: datetime | |
| result: Optional[Dict[str, Any]] | |
| metrics: Dict[str, float] | |
| metadata: Dict[str, Any] = field(default_factory=dict) | |
| class RecursiveReasoning(ReasoningStrategy): | |
| """ | |
| Advanced Recursive Reasoning implementation with: | |
| - Dynamic problem decomposition | |
| - Parallel subproblem solving | |
| - Solution synthesis | |
| - Cycle detection | |
| - Optimization strategies | |
| """ | |
| def __init__(self, config: Optional[Dict[str, Any]] = None): | |
| """Initialize recursive reasoning.""" | |
| super().__init__() | |
| self.config = config or {} | |
| # Standard reasoning parameters | |
| self.min_confidence = self.config.get('min_confidence', 0.7) | |
| self.parallel_threshold = self.config.get('parallel_threshold', 3) | |
| self.learning_rate = self.config.get('learning_rate', 0.1) | |
| self.strategy_weights = self.config.get('strategy_weights', { | |
| "LOCAL_LLM": 0.8, | |
| "CHAIN_OF_THOUGHT": 0.6, | |
| "TREE_OF_THOUGHTS": 0.5, | |
| "META_LEARNING": 0.4 | |
| }) | |
| # Recursive reasoning specific parameters | |
| self.max_depth = self.config.get('max_depth', 5) | |
| self.optimization_rounds = self.config.get('optimization_rounds', 2) | |
| # Problem tracking | |
| self.subproblems: Dict[str, Subproblem] = {} | |
| self.steps: List[RecursiveStep] = [] | |
| self.solution_cache: Dict[str, Dict[str, Any]] = {} | |
| self.cycle_detection: Set[str] = set() | |
| # Performance metrics | |
| self.depth_distribution: Dict[int, int] = defaultdict(int) | |
| self.type_distribution: Dict[SubproblemType, int] = defaultdict(int) | |
| self.success_rate: Dict[SubproblemType, float] = defaultdict(float) | |
| async def reason(self, query: str, context: Dict[str, Any]) -> Dict[str, Any]: | |
| """Main reasoning method implementing recursive reasoning.""" | |
| try: | |
| # Initialize root problem | |
| root = await self._initialize_problem(query, context) | |
| self.subproblems[root.id] = root | |
| # Recursively solve | |
| solution = await self._solve_recursive(root.id, depth=0) | |
| # Optimize solution | |
| optimized = await self._optimize_solution(solution, root, context) | |
| # Update metrics | |
| self._update_metrics(root.id) | |
| return { | |
| "success": True, | |
| "answer": optimized["answer"], | |
| "confidence": optimized["confidence"], | |
| "decomposition": self._get_problem_tree(root.id), | |
| "solution_trace": self._get_solution_trace(root.id), | |
| "performance_metrics": self._get_performance_metrics(), | |
| "meta_insights": optimized["meta_insights"] | |
| } | |
| except Exception as e: | |
| logging.error(f"Error in recursive reasoning: {str(e)}") | |
| return {"success": False, "error": str(e)} | |
| async def _initialize_problem(self, query: str, context: Dict[str, Any]) -> Subproblem: | |
| """Initialize the root problem.""" | |
| prompt = f""" | |
| Initialize recursive reasoning problem: | |
| Query: {query} | |
| Context: {json.dumps(context)} | |
| Analyze for: | |
| 1. Problem type classification | |
| 2. Initial decomposition strategy | |
| 3. Key dependencies | |
| 4. Solution approach | |
| Format as: | |
| [Problem] | |
| Type: ... | |
| Strategy: ... | |
| Dependencies: ... | |
| Approach: ... | |
| """ | |
| response = await context["groq_api"].predict(prompt) | |
| return self._parse_problem_init(response["answer"], query, context) | |
| async def _decompose_problem(self, problem: Subproblem, context: Dict[str, Any]) -> List[Subproblem]: | |
| """Decompose a problem into subproblems.""" | |
| prompt = f""" | |
| Decompose problem into subproblems: | |
| Problem: {json.dumps(self._problem_to_dict(problem))} | |
| Context: {json.dumps(context)} | |
| For each subproblem specify: | |
| 1. [Type]: {" | ".join([t.value for t in SubproblemType])} | |
| 2. [Query]: Specific question | |
| 3. [Dependencies]: Required solutions | |
| 4. [Approach]: Solution strategy | |
| Format as: | |
| [S1] | |
| Type: ... | |
| Query: ... | |
| Dependencies: ... | |
| Approach: ... | |
| """ | |
| response = await context["groq_api"].predict(prompt) | |
| return self._parse_subproblems(response["answer"], problem.id, context) | |
| async def _solve_recursive(self, problem_id: str, depth: int) -> Dict[str, Any]: | |
| """Recursively solve a problem and its subproblems.""" | |
| if depth > self.max_depth: | |
| return {"success": False, "error": "Maximum recursion depth exceeded"} | |
| if problem_id in self.cycle_detection: | |
| return {"success": False, "error": "Cycle detected in recursive solving"} | |
| problem = self.subproblems[problem_id] | |
| self.cycle_detection.add(problem_id) | |
| self.depth_distribution[depth] += 1 | |
| try: | |
| # Check cache | |
| cache_key = f"{problem.query}:{json.dumps(problem.context)}" | |
| if cache_key in self.solution_cache: | |
| return self.solution_cache[cache_key] | |
| # Check if atomic | |
| if problem.type == SubproblemType.ATOMIC: | |
| solution = await self._solve_atomic(problem) | |
| else: | |
| # Decompose | |
| subproblems = await self._decompose_problem(problem, problem.context) | |
| for sub in subproblems: | |
| self.subproblems[sub.id] = sub | |
| problem.children.append(sub.id) | |
| # Solve subproblems | |
| if problem.type == SubproblemType.PARALLEL and len(subproblems) >= self.parallel_threshold: | |
| # Solve in parallel | |
| tasks = [self._solve_recursive(sub.id, depth + 1) for sub in subproblems] | |
| subsolutions = await asyncio.gather(*tasks) | |
| else: | |
| # Solve sequentially | |
| subsolutions = [] | |
| for sub in subproblems: | |
| subsolution = await self._solve_recursive(sub.id, depth + 1) | |
| subsolutions.append(subsolution) | |
| # Synthesize solutions | |
| solution = await self._synthesize_solutions(subsolutions, problem, problem.context) | |
| # Cache solution | |
| self.solution_cache[cache_key] = solution | |
| problem.solution = solution | |
| problem.status = SolutionStatus.SOLVED if solution["success"] else SolutionStatus.FAILED | |
| return solution | |
| finally: | |
| self.cycle_detection.remove(problem_id) | |
| async def _solve_atomic(self, problem: Subproblem) -> Dict[str, Any]: | |
| """Solve an atomic problem.""" | |
| prompt = f""" | |
| Solve atomic problem: | |
| Problem: {json.dumps(self._problem_to_dict(problem))} | |
| Provide: | |
| 1. Direct solution | |
| 2. Confidence level | |
| 3. Supporting evidence | |
| 4. Alternative approaches | |
| Format as: | |
| [Solution] | |
| Answer: ... | |
| Confidence: ... | |
| Evidence: ... | |
| Alternatives: ... | |
| """ | |
| response = await problem.context["groq_api"].predict(prompt) | |
| solution = self._parse_atomic_solution(response["answer"]) | |
| self._record_step(RecursiveStep( | |
| id=f"step_{len(self.steps)}", | |
| subproblem_id=problem.id, | |
| action="atomic_solve", | |
| timestamp=datetime.now(), | |
| result=solution, | |
| metrics={"confidence": solution.get("confidence", 0.0)}, | |
| metadata={} | |
| )) | |
| return solution | |
| async def _synthesize_solutions(self, subsolutions: List[Dict[str, Any]], problem: Subproblem, context: Dict[str, Any]) -> Dict[str, Any]: | |
| """Synthesize solutions from subproblems.""" | |
| prompt = f""" | |
| Synthesize solutions: | |
| Problem: {json.dumps(self._problem_to_dict(problem))} | |
| Solutions: {json.dumps(subsolutions)} | |
| Context: {json.dumps(context)} | |
| Provide: | |
| 1. Integrated solution | |
| 2. Confidence assessment | |
| 3. Integration method | |
| 4. Quality metrics | |
| Format as: | |
| [Synthesis] | |
| Solution: ... | |
| Confidence: ... | |
| Method: ... | |
| Metrics: ... | |
| """ | |
| response = await context["groq_api"].predict(prompt) | |
| synthesis = self._parse_synthesis(response["answer"]) | |
| self._record_step(RecursiveStep( | |
| id=f"step_{len(self.steps)}", | |
| subproblem_id=problem.id, | |
| action="synthesize", | |
| timestamp=datetime.now(), | |
| result=synthesis, | |
| metrics={"confidence": synthesis.get("confidence", 0.0)}, | |
| metadata={"num_subsolutions": len(subsolutions)} | |
| )) | |
| return synthesis | |
| async def _optimize_solution(self, solution: Dict[str, Any], problem: Subproblem, context: Dict[str, Any]) -> Dict[str, Any]: | |
| """Optimize the final solution.""" | |
| prompt = f""" | |
| Optimize recursive solution: | |
| Original: {json.dumps(solution)} | |
| Problem: {json.dumps(self._problem_to_dict(problem))} | |
| Context: {json.dumps(context)} | |
| Optimize for: | |
| 1. Completeness | |
| 2. Consistency | |
| 3. Efficiency | |
| 4. Clarity | |
| Format as: | |
| [Optimization] | |
| Answer: ... | |
| Improvements: ... | |
| Metrics: ... | |
| Insights: ... | |
| """ | |
| response = await context["groq_api"].predict(prompt) | |
| return self._parse_optimization(response["answer"]) | |
| def _update_metrics(self, root_id: str): | |
| """Update performance metrics.""" | |
| def update_recursive(problem_id: str): | |
| problem = self.subproblems[problem_id] | |
| self.type_distribution[problem.type] += 1 | |
| if problem.status == SolutionStatus.SOLVED: | |
| self.success_rate[problem.type] = ( | |
| self.success_rate[problem.type] * (self.type_distribution[problem.type] - 1) + | |
| problem.confidence | |
| ) / self.type_distribution[problem.type] | |
| for child_id in problem.children: | |
| update_recursive(child_id) | |
| update_recursive(root_id) | |
| def _get_problem_tree(self, root_id: str) -> Dict[str, Any]: | |
| """Get the problem decomposition tree.""" | |
| def build_tree(problem_id: str) -> Dict[str, Any]: | |
| problem = self.subproblems[problem_id] | |
| return { | |
| "id": problem.id, | |
| "type": problem.type.value, | |
| "query": problem.query, | |
| "status": problem.status.value, | |
| "confidence": problem.confidence, | |
| "children": [build_tree(child_id) for child_id in problem.children] | |
| } | |
| return build_tree(root_id) | |
| def _get_solution_trace(self, root_id: str) -> List[Dict[str, Any]]: | |
| """Get the solution trace for a problem.""" | |
| return [self._step_to_dict(step) for step in self.steps | |
| if step.subproblem_id == root_id or | |
| any(step.subproblem_id == sub_id for sub_id in self.subproblems[root_id].children)] | |
| def _get_performance_metrics(self) -> Dict[str, Any]: | |
| """Get current performance metrics.""" | |
| return { | |
| "depth_distribution": dict(self.depth_distribution), | |
| "type_distribution": {t.value: c for t, c in self.type_distribution.items()}, | |
| "success_rate": {t.value: r for t, r in self.success_rate.items()}, | |
| "cache_hits": len(self.solution_cache), | |
| "total_steps": len(self.steps) | |
| } | |
| def _record_step(self, step: RecursiveStep): | |
| """Record a reasoning step.""" | |
| self.steps.append(step) | |
| def _parse_problem_init(self, response: str, query: str, context: Dict[str, Any]) -> Subproblem: | |
| """Parse initial problem configuration.""" | |
| problem_type = SubproblemType.COMPOSITE # default | |
| dependencies = [] | |
| metadata = {} | |
| for line in response.split('\n'): | |
| line = line.strip() | |
| if line.startswith('Type:'): | |
| try: | |
| problem_type = SubproblemType(line[5:].strip().lower()) | |
| except ValueError: | |
| pass | |
| elif line.startswith('Dependencies:'): | |
| dependencies = [d.strip() for d in line[13:].split(',')] | |
| elif line.startswith('Strategy:') or line.startswith('Approach:'): | |
| metadata["strategy"] = line.split(':', 1)[1].strip() | |
| return Subproblem( | |
| id="root", | |
| type=problem_type, | |
| query=query, | |
| context=context, | |
| parent_id=None, | |
| children=[], | |
| status=SolutionStatus.PENDING, | |
| solution=None, | |
| confidence=0.0, | |
| dependencies=dependencies, | |
| metadata=metadata | |
| ) | |
| def _parse_subproblems(self, response: str, parent_id: str, context: Dict[str, Any]) -> List[Subproblem]: | |
| """Parse subproblems from response.""" | |
| subproblems = [] | |
| current = None | |
| for line in response.split('\n'): | |
| line = line.strip() | |
| if not line: | |
| continue | |
| if line.startswith('[S'): | |
| if current: | |
| subproblems.append(current) | |
| current = None | |
| elif line.startswith('Type:'): | |
| try: | |
| problem_type = SubproblemType(line[5:].strip().lower()) | |
| current = Subproblem( | |
| id=f"{parent_id}_{len(subproblems)}", | |
| type=problem_type, | |
| query="", | |
| context=context, | |
| parent_id=parent_id, | |
| children=[], | |
| status=SolutionStatus.PENDING, | |
| solution=None, | |
| confidence=0.0, | |
| dependencies=[], | |
| metadata={} | |
| ) | |
| except ValueError: | |
| current = None | |
| elif current: | |
| if line.startswith('Query:'): | |
| current.query = line[6:].strip() | |
| elif line.startswith('Dependencies:'): | |
| current.dependencies = [d.strip() for d in line[13:].split(',')] | |
| elif line.startswith('Approach:'): | |
| current.metadata["approach"] = line[9:].strip() | |
| if current: | |
| subproblems.append(current) | |
| return subproblems | |
| def _parse_atomic_solution(self, response: str) -> Dict[str, Any]: | |
| """Parse atomic solution from response.""" | |
| solution = { | |
| "success": True, | |
| "answer": "", | |
| "confidence": 0.0, | |
| "evidence": [], | |
| "alternatives": [] | |
| } | |
| for line in response.split('\n'): | |
| line = line.strip() | |
| if line.startswith('Answer:'): | |
| solution["answer"] = line[7:].strip() | |
| elif line.startswith('Confidence:'): | |
| try: | |
| solution["confidence"] = float(line[11:].strip()) | |
| except: | |
| pass | |
| elif line.startswith('Evidence:'): | |
| solution["evidence"] = [e.strip() for e in line[9:].split(',')] | |
| elif line.startswith('Alternatives:'): | |
| solution["alternatives"] = [a.strip() for a in line[13:].split(',')] | |
| return solution | |
| def _parse_synthesis(self, response: str) -> Dict[str, Any]: | |
| """Parse synthesis result from response.""" | |
| synthesis = { | |
| "success": True, | |
| "solution": "", | |
| "confidence": 0.0, | |
| "method": "", | |
| "metrics": {} | |
| } | |
| for line in response.split('\n'): | |
| line = line.strip() | |
| if line.startswith('Solution:'): | |
| synthesis["solution"] = line[9:].strip() | |
| elif line.startswith('Confidence:'): | |
| try: | |
| synthesis["confidence"] = float(line[11:].strip()) | |
| except: | |
| pass | |
| elif line.startswith('Method:'): | |
| synthesis["method"] = line[7:].strip() | |
| elif line.startswith('Metrics:'): | |
| try: | |
| synthesis["metrics"] = json.loads(line[8:].strip()) | |
| except: | |
| pass | |
| return synthesis | |
| def _parse_optimization(self, response: str) -> Dict[str, Any]: | |
| """Parse optimization result from response.""" | |
| optimization = { | |
| "answer": "", | |
| "confidence": 0.0, | |
| "improvements": [], | |
| "metrics": {}, | |
| "meta_insights": [] | |
| } | |
| for line in response.split('\n'): | |
| line = line.strip() | |
| if line.startswith('Answer:'): | |
| optimization["answer"] = line[7:].strip() | |
| elif line.startswith('Improvements:'): | |
| optimization["improvements"] = [i.strip() for i in line[13:].split(',')] | |
| elif line.startswith('Metrics:'): | |
| try: | |
| optimization["metrics"] = json.loads(line[8:].strip()) | |
| except: | |
| pass | |
| elif line.startswith('Insights:'): | |
| optimization["meta_insights"] = [i.strip() for i in line[9:].split(',')] | |
| return optimization | |
| def _problem_to_dict(self, problem: Subproblem) -> Dict[str, Any]: | |
| """Convert problem to dictionary for serialization.""" | |
| return { | |
| "id": problem.id, | |
| "type": problem.type.value, | |
| "query": problem.query, | |
| "parent_id": problem.parent_id, | |
| "children": problem.children, | |
| "status": problem.status.value, | |
| "confidence": problem.confidence, | |
| "dependencies": problem.dependencies, | |
| "metadata": problem.metadata | |
| } | |
| def _step_to_dict(self, step: RecursiveStep) -> Dict[str, Any]: | |
| """Convert step to dictionary for serialization.""" | |
| return { | |
| "id": step.id, | |
| "subproblem_id": step.subproblem_id, | |
| "action": step.action, | |
| "timestamp": step.timestamp.isoformat(), | |
| "result": step.result, | |
| "metrics": step.metrics, | |
| "metadata": step.metadata | |
| } | |
| def clear_cache(self): | |
| """Clear solution cache.""" | |
| self.solution_cache.clear() | |
| def get_statistics(self) -> Dict[str, Any]: | |
| """Get detailed statistics about the reasoning process.""" | |
| return { | |
| "total_problems": len(self.subproblems), | |
| "total_steps": len(self.steps), | |
| "cache_size": len(self.solution_cache), | |
| "type_distribution": dict(self.type_distribution), | |
| "depth_distribution": dict(self.depth_distribution), | |
| "success_rates": dict(self.success_rate), | |
| "average_confidence": sum(p.confidence for p in self.subproblems.values()) / len(self.subproblems) if self.subproblems else 0.0 | |
| } | |