""" Feedback System Collect and learn from user ratings and corrections """ import json from datetime import datetime from pathlib import Path from typing import Optional, Dict, Any, List from enum import Enum class FeedbackType(str, Enum): """Types of feedback""" RATING = "rating" CORRECTION = "correction" THUMBS_UP = "thumbs_up" THUMBS_DOWN = "thumbs_down" REPORT = "report" class FeedbackCategory(str, Enum): """Feedback categories""" ACCURACY = "accuracy" HELPFULNESS = "helpfulness" TONE = "tone" COMPLETENESS = "completeness" SAFETY = "safety" OTHER = "other" class FeedbackCollector: """Collect user feedback on agent responses""" def __init__(self, storage_dir: str = "feedback/data"): self.storage_dir = Path(storage_dir) self.storage_dir.mkdir(parents=True, exist_ok=True) # Create subdirectories (self.storage_dir / "ratings").mkdir(exist_ok=True) (self.storage_dir / "corrections").mkdir(exist_ok=True) (self.storage_dir / "reports").mkdir(exist_ok=True) def collect_rating( self, user_id: str, agent_name: str, user_message: str, agent_response: str, rating: int, category: Optional[FeedbackCategory] = None, comment: Optional[str] = None, metadata: Optional[Dict[str, Any]] = None ) -> str: """ Collect user rating for an agent response Args: user_id: User identifier agent_name: Name of the agent user_message: User's original message agent_response: Agent's response rating: Rating (1-5 stars) category: Feedback category comment: Optional user comment metadata: Additional metadata Returns: Feedback ID """ feedback_id = f"{user_id}_{datetime.now().strftime('%Y%m%d_%H%M%S')}" feedback_data = { 'feedback_id': feedback_id, 'user_id': user_id, 'agent_name': agent_name, 'feedback_type': FeedbackType.RATING, 'rating': rating, 'category': category.value if category else None, 'user_message': user_message, 'agent_response': agent_response, 'comment': comment, 'metadata': metadata or {}, 'timestamp': datetime.now().isoformat() } # Save to file file_path = self.storage_dir / "ratings" / f"{feedback_id}.json" with open(file_path, 'w', encoding='utf-8') as f: json.dump(feedback_data, f, ensure_ascii=False, indent=2) return feedback_id def collect_correction( self, user_id: str, agent_name: str, user_message: str, agent_response: str, corrected_response: str, correction_reason: str, metadata: Optional[Dict[str, Any]] = None ) -> str: """ Collect user correction for an agent response Args: user_id: User identifier agent_name: Name of the agent user_message: User's original message agent_response: Agent's incorrect response corrected_response: User's corrected response correction_reason: Why the correction was needed metadata: Additional metadata Returns: Feedback ID """ feedback_id = f"{user_id}_{datetime.now().strftime('%Y%m%d_%H%M%S')}" feedback_data = { 'feedback_id': feedback_id, 'user_id': user_id, 'agent_name': agent_name, 'feedback_type': FeedbackType.CORRECTION, 'user_message': user_message, 'agent_response': agent_response, 'corrected_response': corrected_response, 'correction_reason': correction_reason, 'metadata': metadata or {}, 'timestamp': datetime.now().isoformat() } # Save to file file_path = self.storage_dir / "corrections" / f"{feedback_id}.json" with open(file_path, 'w', encoding='utf-8') as f: json.dump(feedback_data, f, ensure_ascii=False, indent=2) return feedback_id def collect_thumbs( self, user_id: str, agent_name: str, user_message: str, agent_response: str, is_positive: bool, comment: Optional[str] = None ) -> str: """ Collect thumbs up/down feedback Args: user_id: User identifier agent_name: Name of the agent user_message: User's original message agent_response: Agent's response is_positive: True for thumbs up, False for thumbs down comment: Optional comment Returns: Feedback ID """ feedback_type = FeedbackType.THUMBS_UP if is_positive else FeedbackType.THUMBS_DOWN return self.collect_rating( user_id=user_id, agent_name=agent_name, user_message=user_message, agent_response=agent_response, rating=5 if is_positive else 1, comment=comment, metadata={'feedback_type': feedback_type} ) def report_issue( self, user_id: str, agent_name: str, user_message: str, agent_response: str, issue_type: str, description: str, severity: str = "medium" ) -> str: """ Report an issue with agent response Args: user_id: User identifier agent_name: Name of the agent user_message: User's original message agent_response: Agent's problematic response issue_type: Type of issue (harmful/incorrect/inappropriate/other) description: Detailed description severity: low/medium/high/critical Returns: Report ID """ report_id = f"report_{user_id}_{datetime.now().strftime('%Y%m%d_%H%M%S')}" report_data = { 'report_id': report_id, 'user_id': user_id, 'agent_name': agent_name, 'feedback_type': FeedbackType.REPORT, 'user_message': user_message, 'agent_response': agent_response, 'issue_type': issue_type, 'description': description, 'severity': severity, 'status': 'pending', 'timestamp': datetime.now().isoformat() } # Save to file file_path = self.storage_dir / "reports" / f"{report_id}.json" with open(file_path, 'w', encoding='utf-8') as f: json.dump(report_data, f, ensure_ascii=False, indent=2) return report_id def get_feedback_stats(self, agent_name: Optional[str] = None) -> Dict[str, Any]: """ Get feedback statistics Args: agent_name: Filter by agent name (optional) Returns: Statistics dictionary """ stats = { 'total_ratings': 0, 'total_corrections': 0, 'total_reports': 0, 'average_rating': 0.0, 'rating_distribution': {1: 0, 2: 0, 3: 0, 4: 0, 5: 0}, 'by_agent': {}, 'by_category': {} } # Count ratings ratings = [] for file_path in (self.storage_dir / "ratings").glob("*.json"): with open(file_path, 'r', encoding='utf-8') as f: data = json.load(f) if agent_name and data.get('agent_name') != agent_name: continue rating = data.get('rating', 0) ratings.append(rating) stats['rating_distribution'][rating] += 1 # By agent agent = data.get('agent_name', 'unknown') if agent not in stats['by_agent']: stats['by_agent'][agent] = {'count': 0, 'total_rating': 0} stats['by_agent'][agent]['count'] += 1 stats['by_agent'][agent]['total_rating'] += rating # By category category = data.get('category', 'other') if category not in stats['by_category']: stats['by_category'][category] = 0 stats['by_category'][category] += 1 stats['total_ratings'] = len(ratings) stats['average_rating'] = sum(ratings) / len(ratings) if ratings else 0.0 # Calculate average per agent for agent in stats['by_agent']: count = stats['by_agent'][agent]['count'] total = stats['by_agent'][agent]['total_rating'] stats['by_agent'][agent]['average'] = total / count if count > 0 else 0.0 # Count corrections stats['total_corrections'] = len(list((self.storage_dir / "corrections").glob("*.json"))) # Count reports stats['total_reports'] = len(list((self.storage_dir / "reports").glob("*.json"))) return stats def get_low_rated_responses( self, min_rating: int = 2, agent_name: Optional[str] = None, limit: int = 50 ) -> List[Dict[str, Any]]: """ Get low-rated responses for improvement Args: min_rating: Maximum rating to include (1-5) agent_name: Filter by agent name limit: Maximum number of results Returns: List of low-rated responses """ low_rated = [] for file_path in (self.storage_dir / "ratings").glob("*.json"): with open(file_path, 'r', encoding='utf-8') as f: data = json.load(f) if data.get('rating', 5) <= min_rating: if agent_name is None or data.get('agent_name') == agent_name: low_rated.append(data) # Sort by rating (lowest first) low_rated.sort(key=lambda x: x.get('rating', 5)) return low_rated[:limit] def get_corrections( self, agent_name: Optional[str] = None, limit: int = 100 ) -> List[Dict[str, Any]]: """ Get user corrections for learning Args: agent_name: Filter by agent name limit: Maximum number of results Returns: List of corrections """ corrections = [] for file_path in (self.storage_dir / "corrections").glob("*.json"): with open(file_path, 'r', encoding='utf-8') as f: data = json.load(f) if agent_name is None or data.get('agent_name') == agent_name: corrections.append(data) # Sort by timestamp (newest first) corrections.sort(key=lambda x: x.get('timestamp', ''), reverse=True) return corrections[:limit] def export_for_fine_tuning( self, agent_name: str, min_rating: int = 4, include_corrections: bool = True, output_file: Optional[str] = None ) -> str: """ Export high-quality feedback for fine-tuning Args: agent_name: Agent to export for min_rating: Minimum rating to include include_corrections: Include user corrections output_file: Output file path Returns: Path to exported file """ if output_file is None: output_file = f"feedback_training_{agent_name}_{datetime.now().strftime('%Y%m%d')}.jsonl" output_path = self.storage_dir / output_file training_data = [] # Add high-rated responses for file_path in (self.storage_dir / "ratings").glob("*.json"): with open(file_path, 'r', encoding='utf-8') as f: data = json.load(f) if data.get('agent_name') == agent_name and data.get('rating', 0) >= min_rating: training_data.append({ 'messages': [ {'role': 'user', 'content': data['user_message']}, {'role': 'assistant', 'content': data['agent_response']} ], 'metadata': { 'rating': data['rating'], 'source': 'user_rating' } }) # Add corrections if include_corrections: for file_path in (self.storage_dir / "corrections").glob("*.json"): with open(file_path, 'r', encoding='utf-8') as f: data = json.load(f) if data.get('agent_name') == agent_name: training_data.append({ 'messages': [ {'role': 'user', 'content': data['user_message']}, {'role': 'assistant', 'content': data['corrected_response']} ], 'metadata': { 'source': 'user_correction', 'reason': data.get('correction_reason') } }) # Write to JSONL with open(output_path, 'w', encoding='utf-8') as f: for item in training_data: f.write(json.dumps(item, ensure_ascii=False) + '\n') return str(output_path) # Global instance _feedback_collector = None def get_feedback_collector() -> FeedbackCollector: """Get global feedback collector instance""" global _feedback_collector if _feedback_collector is None: _feedback_collector = FeedbackCollector() return _feedback_collector