Spaces:
Runtime error
Runtime error
| """Advanced Bayesian reasoning for probabilistic analysis.""" | |
| import logging | |
| from typing import Dict, Any, List, Optional, Set, Union, Type, Tuple | |
| import json | |
| from dataclasses import dataclass, field | |
| from enum import Enum | |
| from datetime import datetime | |
| import numpy as np | |
| from collections import defaultdict | |
| from .base import ReasoningStrategy | |
@dataclass
class BayesianHypothesis:
    """Bayesian hypothesis with probabilities.

    Missing-decorator fix: the class declares annotated fields and uses
    ``field(default_factory=list)``, which only work under ``@dataclass``;
    without it the class had no usable ``__init__`` and ``evidence`` was a
    shared ``Field`` object instead of a per-instance list.
    """

    # Identifier of the hypothesis (e.g. 'primary', or a caller-supplied option).
    name: str
    # Prior probability assigned before evidence is considered.
    prior: float
    # Likelihood score associated with the hypothesis.
    likelihood: float
    # Posterior probability; starts at 0.0 until updated.
    posterior: float = 0.0
    # Evidence items accumulated for this hypothesis; fresh list per instance.
    evidence: List[Dict[str, Any]] = field(default_factory=list)
class BayesianReasoning(ReasoningStrategy):
    """
    Advanced Bayesian reasoning that:
    1. Generates hypotheses
    2. Calculates prior probabilities
    3. Updates with evidence
    4. Computes posteriors
    5. Provides probabilistic analysis
    """

    def __init__(self, config: Optional[Dict[str, Any]] = None):
        """Initialize Bayesian reasoning with optional configuration overrides."""
        super().__init__()
        self.config = config or {}

        # Tunable Bayesian parameters, read from config with defaults.
        cfg = self.config
        self.prior_weight = cfg.get('prior_weight', 0.3)
        self.evidence_threshold = cfg.get('evidence_threshold', 0.1)
        self.min_likelihood = cfg.get('min_likelihood', 0.01)
| async def reason(self, query: str, context: Dict[str, Any]) -> Dict[str, Any]: | |
| """ | |
| Apply Bayesian reasoning to analyze probabilities and update beliefs. | |
| Args: | |
| query: The input query to reason about | |
| context: Additional context and parameters | |
| Returns: | |
| Dict containing reasoning results and confidence scores | |
| """ | |
| try: | |
| # Generate hypotheses | |
| hypotheses = await self._generate_hypotheses(query, context) | |
| # Calculate priors | |
| priors = await self._calculate_priors(hypotheses, context) | |
| # Update with evidence | |
| posteriors = await self._update_with_evidence( | |
| hypotheses, | |
| priors, | |
| context | |
| ) | |
| # Generate analysis | |
| analysis = await self._generate_analysis(posteriors, context) | |
| return { | |
| 'answer': self._format_analysis(analysis), | |
| 'confidence': self._calculate_confidence(posteriors), | |
| 'hypotheses': hypotheses, | |
| 'priors': priors, | |
| 'posteriors': posteriors, | |
| 'analysis': analysis | |
| } | |
| except Exception as e: | |
| logging.error(f"Bayesian reasoning failed: {str(e)}") | |
| return { | |
| 'error': f"Bayesian reasoning failed: {str(e)}", | |
| 'confidence': 0.0 | |
| } | |
| async def _generate_hypotheses( | |
| self, | |
| query: str, | |
| context: Dict[str, Any] | |
| ) -> List[Dict[str, Any]]: | |
| """Generate plausible hypotheses.""" | |
| hypotheses = [] | |
| # Extract key terms for hypothesis generation | |
| terms = set(query.lower().split()) | |
| # Generate hypotheses based on context and terms | |
| if 'options' in context: | |
| # Use provided options as hypotheses | |
| for option in context['options']: | |
| hypotheses.append({ | |
| 'name': option, | |
| 'description': f"Hypothesis based on option: {option}", | |
| 'factors': self._extract_factors(option, terms) | |
| }) | |
| else: | |
| # Generate default hypotheses | |
| hypotheses.extend([ | |
| { | |
| 'name': 'primary', | |
| 'description': "Primary hypothesis based on direct interpretation", | |
| 'factors': self._extract_factors(query, terms) | |
| }, | |
| { | |
| 'name': 'alternative', | |
| 'description': "Alternative hypothesis considering other factors", | |
| 'factors': self._generate_alternative_factors(terms) | |
| } | |
| ]) | |
| return hypotheses | |
| async def _calculate_priors( | |
| self, | |
| hypotheses: List[Dict[str, Any]], | |
| context: Dict[str, Any] | |
| ) -> Dict[str, float]: | |
| """Calculate prior probabilities.""" | |
| priors = {} | |
| # Get historical data if available | |
| history = context.get('history', {}) | |
| total_cases = sum(history.values()) if history else len(hypotheses) | |
| for hypothesis in hypotheses: | |
| name = hypothesis['name'] | |
| # Calculate prior from history or use uniform prior | |
| if name in history: | |
| priors[name] = history[name] / total_cases | |
| else: | |
| priors[name] = 1.0 / len(hypotheses) | |
| # Adjust prior based on factors | |
| factor_weight = len(hypothesis['factors']) / 10 # Normalize factor count | |
| priors[name] = ( | |
| priors[name] * (1 - self.prior_weight) + | |
| factor_weight * self.prior_weight | |
| ) | |
| # Normalize priors | |
| total_prior = sum(priors.values()) | |
| if total_prior > 0: | |
| priors = { | |
| name: prob / total_prior | |
| for name, prob in priors.items() | |
| } | |
| return priors | |
| async def _update_with_evidence( | |
| self, | |
| hypotheses: List[Dict[str, Any]], | |
| priors: Dict[str, float], | |
| context: Dict[str, Any] | |
| ) -> Dict[str, float]: | |
| """Update probabilities with evidence.""" | |
| posteriors = priors.copy() | |
| # Get evidence from context | |
| evidence = context.get('evidence', []) | |
| if not evidence: | |
| return posteriors | |
| for e in evidence: | |
| # Calculate likelihood for each hypothesis | |
| likelihoods = {} | |
| for hypothesis in hypotheses: | |
| name = hypothesis['name'] | |
| likelihood = self._calculate_likelihood(hypothesis, e) | |
| likelihoods[name] = max(likelihood, self.min_likelihood) | |
| # Update posteriors using Bayes' rule | |
| total_probability = sum( | |
| likelihoods[name] * posteriors[name] | |
| for name in posteriors | |
| ) | |
| if total_probability > 0: | |
| posteriors = { | |
| name: (likelihoods[name] * posteriors[name]) / total_probability | |
| for name in posteriors | |
| } | |
| return posteriors | |
| def _calculate_likelihood( | |
| self, | |
| hypothesis: Dict[str, Any], | |
| evidence: Dict[str, Any] | |
| ) -> float: | |
| """Calculate likelihood of evidence given hypothesis.""" | |
| # Extract evidence factors | |
| evidence_factors = set( | |
| str(v).lower() | |
| for v in evidence.values() | |
| if isinstance(v, (str, int, float)) | |
| ) | |
| # Compare with hypothesis factors | |
| common_factors = evidence_factors.intersection(hypothesis['factors']) | |
| if not evidence_factors: | |
| return 0.5 # Neutral likelihood if no factors | |
| return len(common_factors) / len(evidence_factors) | |
| async def _generate_analysis( | |
| self, | |
| posteriors: Dict[str, float], | |
| context: Dict[str, Any] | |
| ) -> Dict[str, Any]: | |
| """Generate probabilistic analysis.""" | |
| # Sort hypotheses by posterior probability | |
| ranked_hypotheses = sorted( | |
| posteriors.items(), | |
| key=lambda x: x[1], | |
| reverse=True | |
| ) | |
| # Calculate statistics | |
| mean = np.mean(list(posteriors.values())) | |
| std = np.std(list(posteriors.values())) | |
| entropy = -sum( | |
| p * np.log2(p) if p > 0 else 0 | |
| for p in posteriors.values() | |
| ) | |
| return { | |
| 'top_hypothesis': ranked_hypotheses[0][0], | |
| 'probability': ranked_hypotheses[0][1], | |
| 'alternatives': [ | |
| {'name': name, 'probability': prob} | |
| for name, prob in ranked_hypotheses[1:] | |
| ], | |
| 'statistics': { | |
| 'mean': mean, | |
| 'std': std, | |
| 'entropy': entropy | |
| } | |
| } | |
| def _format_analysis(self, analysis: Dict[str, Any]) -> str: | |
| """Format analysis into readable text.""" | |
| sections = [] | |
| # Top hypothesis | |
| sections.append( | |
| f"Most likely hypothesis: {analysis['top_hypothesis']} " | |
| f"(probability: {analysis['probability']:.2%})" | |
| ) | |
| # Alternative hypotheses | |
| if analysis['alternatives']: | |
| sections.append("\nAlternative hypotheses:") | |
| for alt in analysis['alternatives']: | |
| sections.append( | |
| f"- {alt['name']}: {alt['probability']:.2%}" | |
| ) | |
| # Statistics | |
| stats = analysis['statistics'] | |
| sections.append("\nDistribution statistics:") | |
| sections.append(f"- Mean probability: {stats['mean']:.2%}") | |
| sections.append(f"- Standard deviation: {stats['std']:.2%}") | |
| sections.append(f"- Entropy: {stats['entropy']:.2f} bits") | |
| return "\n".join(sections) | |
| def _calculate_confidence(self, posteriors: Dict[str, float]) -> float: | |
| """Calculate overall confidence score.""" | |
| if not posteriors: | |
| return 0.0 | |
| # Base confidence | |
| confidence = 0.5 | |
| # Adjust based on probability distribution | |
| probs = list(posteriors.values()) | |
| # Strong leading hypothesis increases confidence | |
| max_prob = max(probs) | |
| if max_prob > 0.8: | |
| confidence += 0.3 | |
| elif max_prob > 0.6: | |
| confidence += 0.2 | |
| elif max_prob > 0.4: | |
| confidence += 0.1 | |
| # Low entropy (clear distinction) increases confidence | |
| entropy = -sum(p * np.log2(p) if p > 0 else 0 for p in probs) | |
| max_entropy = -np.log2(1/len(probs)) # Maximum possible entropy | |
| if entropy < 0.3 * max_entropy: | |
| confidence += 0.2 | |
| elif entropy < 0.6 * max_entropy: | |
| confidence += 0.1 | |
| return min(confidence, 1.0) | |
| def _extract_factors(self, text: str, terms: Set[str]) -> Set[str]: | |
| """Extract relevant factors from text.""" | |
| return set(word.lower() for word in text.split() if word.lower() in terms) | |
| def _generate_alternative_factors(self, terms: Set[str]) -> Set[str]: | |
| """Generate factors for alternative hypothesis.""" | |
| # Simple approach: use terms not in primary hypothesis | |
| return set( | |
| word for word in terms | |
| if not any( | |
| similar in word or word in similar | |
| for similar in terms | |
| ) | |
| ) | |