Spaces:
Runtime error
Runtime error
| """ | |
| Feedback System | |
| Collect and learn from user ratings and corrections | |
| """ | |
| import json | |
| from datetime import datetime | |
| from pathlib import Path | |
| from typing import Optional, Dict, Any, List | |
| from enum import Enum | |
class FeedbackType(str, Enum):
    """Kinds of feedback a user can submit.

    Members are also ``str`` instances, so they compare equal to their
    plain-string values.
    """

    # Numeric star rating.
    RATING = "rating"
    # User-supplied replacement for an agent response.
    CORRECTION = "correction"
    # Binary positive / negative reactions.
    THUMBS_UP = "thumbs_up"
    THUMBS_DOWN = "thumbs_down"
    # Issue report about a response.
    REPORT = "report"
class FeedbackCategory(str, Enum):
    """Aspect of a response that a piece of feedback is about.

    Members are also ``str`` instances, so they compare equal to their
    plain-string values.
    """

    ACCURACY = "accuracy"
    HELPFULNESS = "helpfulness"
    TONE = "tone"
    COMPLETENESS = "completeness"
    SAFETY = "safety"
    # Catch-all for feedback that fits none of the categories above.
    OTHER = "other"
class FeedbackCollector:
    """Collect user feedback on agent responses and persist it as JSON files.

    Records are stored one file per record under ``storage_dir``:
    ``ratings/<id>.json``, ``corrections/<id>.json`` and
    ``reports/<id>.json``. Read-side methods scan those directories.
    """

    # Inclusive bounds of the star-rating scale accepted by collect_rating.
    _MIN_RATING = 1
    _MAX_RATING = 5

    def __init__(self, storage_dir: str = "feedback/data"):
        """Create the storage directory tree if it does not exist yet.

        Args:
            storage_dir: Root directory for all feedback files
        """
        self.storage_dir = Path(storage_dir)
        self.storage_dir.mkdir(parents=True, exist_ok=True)
        # Create subdirectories
        for subdir in ("ratings", "corrections", "reports"):
            (self.storage_dir / subdir).mkdir(exist_ok=True)

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _safe_name(value: str) -> str:
        """Return *value* with path separators and NUL replaced by ``_``.

        IDs end up in file names; without this a value such as
        ``"../../x"`` would point outside the storage directory.
        """
        return "".join("_" if ch in "/\\\0" else ch for ch in str(value))

    def _save_record(
        self,
        subdir: str,
        base_id: str,
        id_key: str,
        record: Dict[str, Any]
    ) -> str:
        """Write *record* to ``<subdir>/<id>.json`` without overwriting.

        The ID is *base_id*; if a file with that ID already exists (two
        submissions within the same second) ``_1``, ``_2``, ... is appended
        until a free name is found. The final ID is stored in
        ``record[id_key]`` and returned.
        """
        directory = self.storage_dir / subdir
        attempt = 0
        while True:
            record_id = base_id if attempt == 0 else f"{base_id}_{attempt}"
            record[id_key] = record_id
            # Serialise before opening so a non-serialisable record does not
            # leave an empty or truncated file behind.
            payload = json.dumps(record, ensure_ascii=False, indent=2)
            try:
                # 'x' = exclusive creation: fails instead of overwriting.
                with open(directory / f"{record_id}.json", 'x', encoding='utf-8') as f:
                    f.write(payload)
            except FileExistsError:
                attempt += 1
            else:
                return record_id

    def _load_records(self, subdir: str) -> List[Dict[str, Any]]:
        """Load every ``*.json`` record in *subdir*, in file-name order.

        Files whose top-level JSON value is not an object are ignored.
        """
        records = []
        for file_path in sorted((self.storage_dir / subdir).glob("*.json")):
            with open(file_path, 'r', encoding='utf-8') as f:
                data = json.load(f)
            if isinstance(data, dict):
                records.append(data)
        return records

    def _count_records(self, subdir: str, agent_name: Optional[str] = None) -> int:
        """Count records in *subdir*, optionally only those of *agent_name*."""
        if not agent_name:
            # No filter: counting files is enough, no need to parse them.
            return sum(1 for _ in (self.storage_dir / subdir).glob("*.json"))
        return sum(
            1 for record in self._load_records(subdir)
            if record.get('agent_name') == agent_name
        )

    @staticmethod
    def _rating_of(record: Dict[str, Any], default: Any = None) -> Any:
        """Return the numeric rating stored in *record*, else *default*."""
        rating = record.get('rating')
        if isinstance(rating, bool) or not isinstance(rating, (int, float)):
            return default
        return rating

    # ------------------------------------------------------------------
    # Collection
    # ------------------------------------------------------------------

    def collect_rating(
        self,
        user_id: str,
        agent_name: str,
        user_message: str,
        agent_response: str,
        rating: int,
        category: Optional["FeedbackCategory"] = None,
        comment: Optional[str] = None,
        metadata: Optional[Dict[str, Any]] = None
    ) -> str:
        """
        Collect user rating for an agent response

        Args:
            user_id: User identifier
            agent_name: Name of the agent
            user_message: User's original message
            agent_response: Agent's response
            rating: Rating (1-5 stars)
            category: Feedback category (enum member or its string value)
            comment: Optional user comment
            metadata: Additional metadata

        Returns:
            Feedback ID

        Raises:
            ValueError: If rating is not an integer between 1 and 5
        """
        if (
            isinstance(rating, bool)
            or not isinstance(rating, int)
            or not self._MIN_RATING <= rating <= self._MAX_RATING
        ):
            raise ValueError(
                f"rating must be an integer from {self._MIN_RATING} "
                f"to {self._MAX_RATING}, got {rating!r}"
            )

        # One clock reading so the ID and the timestamp agree.
        now = datetime.now()
        base_id = f"{self._safe_name(user_id)}_{now.strftime('%Y%m%d_%H%M%S')}"
        feedback_data = {
            'feedback_id': base_id,
            'user_id': user_id,
            'agent_name': agent_name,
            'feedback_type': FeedbackType.RATING.value,
            'rating': rating,
            'category': getattr(category, 'value', category) if category else None,
            'user_message': user_message,
            'agent_response': agent_response,
            'comment': comment,
            'metadata': metadata or {},
            'timestamp': now.isoformat()
        }
        # Save to file
        return self._save_record("ratings", base_id, 'feedback_id', feedback_data)

    def collect_correction(
        self,
        user_id: str,
        agent_name: str,
        user_message: str,
        agent_response: str,
        corrected_response: str,
        correction_reason: str,
        metadata: Optional[Dict[str, Any]] = None
    ) -> str:
        """
        Collect user correction for an agent response

        Args:
            user_id: User identifier
            agent_name: Name of the agent
            user_message: User's original message
            agent_response: Agent's incorrect response
            corrected_response: User's corrected response
            correction_reason: Why the correction was needed
            metadata: Additional metadata

        Returns:
            Feedback ID
        """
        now = datetime.now()
        base_id = f"{self._safe_name(user_id)}_{now.strftime('%Y%m%d_%H%M%S')}"
        feedback_data = {
            'feedback_id': base_id,
            'user_id': user_id,
            'agent_name': agent_name,
            'feedback_type': FeedbackType.CORRECTION.value,
            'user_message': user_message,
            'agent_response': agent_response,
            'corrected_response': corrected_response,
            'correction_reason': correction_reason,
            'metadata': metadata or {},
            'timestamp': now.isoformat()
        }
        # Save to file
        return self._save_record("corrections", base_id, 'feedback_id', feedback_data)

    def collect_thumbs(
        self,
        user_id: str,
        agent_name: str,
        user_message: str,
        agent_response: str,
        is_positive: bool,
        comment: Optional[str] = None
    ) -> str:
        """
        Collect thumbs up/down feedback

        Stored as a rating record (5 for thumbs up, 1 for thumbs down) with
        the thumbs type recorded under ``metadata['feedback_type']``.

        Args:
            user_id: User identifier
            agent_name: Name of the agent
            user_message: User's original message
            agent_response: Agent's response
            is_positive: True for thumbs up, False for thumbs down
            comment: Optional comment

        Returns:
            Feedback ID
        """
        feedback_type = FeedbackType.THUMBS_UP if is_positive else FeedbackType.THUMBS_DOWN
        return self.collect_rating(
            user_id=user_id,
            agent_name=agent_name,
            user_message=user_message,
            agent_response=agent_response,
            rating=self._MAX_RATING if is_positive else self._MIN_RATING,
            comment=comment,
            metadata={'feedback_type': feedback_type.value}
        )

    def report_issue(
        self,
        user_id: str,
        agent_name: str,
        user_message: str,
        agent_response: str,
        issue_type: str,
        description: str,
        severity: str = "medium"
    ) -> str:
        """
        Report an issue with agent response

        Args:
            user_id: User identifier
            agent_name: Name of the agent
            user_message: User's original message
            agent_response: Agent's problematic response
            issue_type: Type of issue (harmful/incorrect/inappropriate/other)
            description: Detailed description
            severity: low/medium/high/critical

        Returns:
            Report ID
        """
        now = datetime.now()
        base_id = f"report_{self._safe_name(user_id)}_{now.strftime('%Y%m%d_%H%M%S')}"
        report_data = {
            'report_id': base_id,
            'user_id': user_id,
            'agent_name': agent_name,
            'feedback_type': FeedbackType.REPORT.value,
            'user_message': user_message,
            'agent_response': agent_response,
            'issue_type': issue_type,
            'description': description,
            'severity': severity,
            'status': 'pending',
            'timestamp': now.isoformat()
        }
        # Save to file
        return self._save_record("reports", base_id, 'report_id', report_data)

    # ------------------------------------------------------------------
    # Reading / analysis
    # ------------------------------------------------------------------

    def get_feedback_stats(self, agent_name: Optional[str] = None) -> Dict[str, Any]:
        """
        Get feedback statistics

        Rating records without a numeric rating are ignored. When
        agent_name is given, every figure (including the correction and
        report totals) is restricted to that agent.

        Args:
            agent_name: Filter by agent name (optional)

        Returns:
            Statistics dictionary
        """
        distribution = {
            star: 0 for star in range(self._MIN_RATING, self._MAX_RATING + 1)
        }
        by_agent: Dict[str, Dict[str, Any]] = {}
        by_category: Dict[str, int] = {}
        ratings = []

        for data in self._load_records("ratings"):
            if agent_name and data.get('agent_name') != agent_name:
                continue
            rating = self._rating_of(data)
            if rating is None:
                continue
            ratings.append(rating)
            # .get() so an out-of-scale rating on disk cannot raise KeyError.
            distribution[rating] = distribution.get(rating, 0) + 1

            # By agent
            agent = data.get('agent_name') or 'unknown'
            bucket = by_agent.setdefault(agent, {'count': 0, 'total_rating': 0})
            bucket['count'] += 1
            bucket['total_rating'] += rating

            # By category: the key is always written, as null when no
            # category was given, so a .get() default would never apply.
            category = data.get('category') or 'other'
            by_category[category] = by_category.get(category, 0) + 1

        # Calculate average per agent
        for bucket in by_agent.values():
            bucket['average'] = bucket['total_rating'] / bucket['count']

        return {
            'total_ratings': len(ratings),
            'total_corrections': self._count_records("corrections", agent_name),
            'total_reports': self._count_records("reports", agent_name),
            'average_rating': sum(ratings) / len(ratings) if ratings else 0.0,
            'rating_distribution': distribution,
            'by_agent': by_agent,
            'by_category': by_category
        }

    def get_low_rated_responses(
        self,
        min_rating: int = 2,
        agent_name: Optional[str] = None,
        limit: int = 50
    ) -> List[Dict[str, Any]]:
        """
        Get low-rated responses for improvement

        Args:
            min_rating: Maximum rating to include (1-5); despite its name
                this is an upper threshold, records rated at or below it
                are returned
            agent_name: Filter by agent name
            limit: Maximum number of results

        Returns:
            List of low-rated responses, lowest rating first
        """
        low_rated = [
            data for data in self._load_records("ratings")
            if (agent_name is None or data.get('agent_name') == agent_name)
            # Records without a usable rating count as 5, i.e. not low.
            and self._rating_of(data, self._MAX_RATING) <= min_rating
        ]
        # Sort by rating (lowest first)
        low_rated.sort(key=lambda x: self._rating_of(x, self._MAX_RATING))
        return low_rated[:max(limit, 0)]

    def get_corrections(
        self,
        agent_name: Optional[str] = None,
        limit: int = 100
    ) -> List[Dict[str, Any]]:
        """
        Get user corrections for learning

        Args:
            agent_name: Filter by agent name
            limit: Maximum number of results

        Returns:
            List of corrections, newest first
        """
        corrections = [
            data for data in self._load_records("corrections")
            if agent_name is None or data.get('agent_name') == agent_name
        ]
        # Sort by timestamp (newest first); ISO strings sort chronologically.
        corrections.sort(key=lambda x: x.get('timestamp') or '', reverse=True)
        return corrections[:max(limit, 0)]

    def export_for_fine_tuning(
        self,
        agent_name: str,
        min_rating: int = 4,
        include_corrections: bool = True,
        output_file: Optional[str] = None
    ) -> str:
        """
        Export high-quality feedback for fine-tuning

        Writes one JSON object per line (JSONL), each with a ``messages``
        list (user turn, assistant turn) and a ``metadata`` dict. Records
        missing either message are skipped.

        Args:
            agent_name: Agent to export for
            min_rating: Minimum rating to include
            include_corrections: Include user corrections
            output_file: Output file path, relative to the storage directory
                (defaults to a dated name derived from agent_name)

        Returns:
            Path to exported file
        """
        if output_file is None:
            output_file = (
                f"feedback_training_{self._safe_name(agent_name)}_"
                f"{datetime.now().strftime('%Y%m%d')}.jsonl"
            )
        output_path = self.storage_dir / output_file

        training_data = []

        # Add high-rated responses
        for data in self._load_records("ratings"):
            if data.get('agent_name') != agent_name:
                continue
            rating = self._rating_of(data, 0)
            if rating < min_rating:
                continue
            user_message = data.get('user_message')
            agent_response = data.get('agent_response')
            if user_message is None or agent_response is None:
                continue
            training_data.append({
                'messages': [
                    {'role': 'user', 'content': user_message},
                    {'role': 'assistant', 'content': agent_response}
                ],
                'metadata': {
                    'rating': rating,
                    'source': 'user_rating'
                }
            })

        # Add corrections
        if include_corrections:
            for data in self._load_records("corrections"):
                if data.get('agent_name') != agent_name:
                    continue
                user_message = data.get('user_message')
                corrected_response = data.get('corrected_response')
                if user_message is None or corrected_response is None:
                    continue
                training_data.append({
                    'messages': [
                        {'role': 'user', 'content': user_message},
                        {'role': 'assistant', 'content': corrected_response}
                    ],
                    'metadata': {
                        'source': 'user_correction',
                        'reason': data.get('correction_reason')
                    }
                })

        # Write to JSONL
        with open(output_path, 'w', encoding='utf-8') as f:
            f.writelines(
                json.dumps(item, ensure_ascii=False) + '\n' for item in training_data
            )

        return str(output_path)
# Process-wide collector, built on first request.
_feedback_collector = None


def get_feedback_collector() -> FeedbackCollector:
    """Return the shared FeedbackCollector, creating it on first use.

    The instance is constructed with the default storage directory and
    reused by every subsequent call.
    """
    global _feedback_collector
    if _feedback_collector is not None:
        return _feedback_collector
    _feedback_collector = FeedbackCollector()
    return _feedback_collector