| """ | |
| Data Collector for Fine-tuning | |
| Collects and stores conversation data for training custom models | |
| """ | |
| import json | |
| import os | |
| from datetime import datetime | |
| from pathlib import Path | |
| from typing import List, Dict, Any, Optional | |
class ConversationDataCollector:
    """Collects conversation data for fine-tuning.

    Each supported agent gets its own subdirectory under ``data_dir``.
    Conversation turns are appended to date-stamped JSON Lines files
    (one line per entry), so logging is cheap and files stay append-only.
    """

    def __init__(self, data_dir: str = "fine_tuning/data"):
        """Create the data directory tree, one subdirectory per agent.

        Args:
            data_dir: Root directory for all collected data; created
                (with parents) if it does not exist.
        """
        self.data_dir = Path(data_dir)
        self.data_dir.mkdir(parents=True, exist_ok=True)
        # Create subdirectories for each agent
        self.agent_dirs = {
            'nutrition': self.data_dir / 'nutrition',
            'exercise': self.data_dir / 'exercise',
            'symptom': self.data_dir / 'symptom',
            'mental_health': self.data_dir / 'mental_health',
            'general_health': self.data_dir / 'general_health'
        }
        for agent_dir in self.agent_dirs.values():
            agent_dir.mkdir(exist_ok=True)

    @staticmethod
    def _agent_key(agent_name: str) -> str:
        """Normalize an agent name: 'nutrition_agent' -> 'nutrition'."""
        return agent_name.replace('_agent', '')

    def _append_entry(
        self,
        agent_name: str,
        file_prefix: str,
        entry: Dict[str, Any]
    ) -> None:
        """Append *entry* as one JSONL line to the agent's daily log file.

        Entries for unknown agents are silently dropped (no directory
        exists for them), matching the original logging behavior.
        """
        agent_key = self._agent_key(agent_name)
        if agent_key in self.agent_dirs:
            filename = f"{file_prefix}_{datetime.now().strftime('%Y%m%d')}.jsonl"
            filepath = self.agent_dirs[agent_key] / filename
            with open(filepath, 'a', encoding='utf-8') as f:
                f.write(json.dumps(entry, ensure_ascii=False) + '\n')

    @staticmethod
    def _passes_rating(
        entry: Dict[str, Any],
        min_quality_rating: Optional[float]
    ) -> bool:
        """Return True if *entry* meets the minimum quality rating.

        When no minimum is set every entry passes; otherwise unrated
        entries (no 'rating' in metadata) are excluded.
        """
        if min_quality_rating is None:
            return True
        rating = entry.get('metadata', {}).get('rating')
        return rating is not None and rating >= min_quality_rating

    def log_conversation(
        self,
        agent_name: str,
        user_message: str,
        agent_response: str,
        user_data: Optional[Dict[str, Any]] = None,
        metadata: Optional[Dict[str, Any]] = None
    ) -> None:
        """
        Log a conversation turn for fine-tuning

        Args:
            agent_name: Name of the agent (nutrition, exercise, etc.)
            user_message: User's message
            agent_response: Agent's response
            user_data: User profile data (age, gender, etc.)
            metadata: Additional metadata (rating, feedback, etc.)
        """
        conversation_entry = {
            'timestamp': datetime.now().isoformat(),
            'agent': agent_name,
            'user_message': user_message,
            'agent_response': agent_response,
            'user_data': user_data or {},
            'metadata': metadata or {}
        }
        self._append_entry(agent_name, 'conversations', conversation_entry)

    def log_multi_turn_conversation(
        self,
        agent_name: str,
        conversation_history: List[tuple],
        user_data: Optional[Dict[str, Any]] = None,
        metadata: Optional[Dict[str, Any]] = None
    ) -> None:
        """
        Log a multi-turn conversation

        Args:
            agent_name: Name of the agent
            conversation_history: List of (user_msg, agent_msg) tuples
            user_data: User profile data
            metadata: Additional metadata
        """
        multi_turn_entry = {
            'timestamp': datetime.now().isoformat(),
            'agent': agent_name,
            'conversation': [
                {'user': user_msg, 'agent': agent_msg}
                for user_msg, agent_msg in conversation_history
            ],
            'user_data': user_data or {},
            'metadata': metadata or {}
        }
        self._append_entry(agent_name, 'multi_turn', multi_turn_entry)

    def get_conversation_count(self, agent_name: Optional[str] = None) -> Dict[str, int]:
        """
        Get count of logged conversations

        NOTE: counts only single-turn logs (conversations_*.jsonl);
        multi_turn_* files are not included.

        Args:
            agent_name: Optional agent name to filter by

        Returns:
            Dict with agent names and conversation counts
        """
        counts = {}
        agents_to_check = [self._agent_key(agent_name)] if agent_name else self.agent_dirs.keys()
        for agent_key in agents_to_check:
            if agent_key in self.agent_dirs:
                agent_dir = self.agent_dirs[agent_key]
                count = 0
                for file in agent_dir.glob('conversations_*.jsonl'):
                    with open(file, 'r', encoding='utf-8') as f:
                        count += sum(1 for _ in f)
                counts[agent_key] = count
        return counts

    def export_for_openai_finetuning(
        self,
        agent_name: str,
        output_file: Optional[str] = None,
        min_quality_rating: Optional[float] = None
    ) -> str:
        """
        Export conversations in OpenAI chat fine-tuning format

        Args:
            agent_name: Agent to export data for
            output_file: Output file path (defaults to a date-stamped
                file under the data directory)
            min_quality_rating: Minimum quality rating to include.
                When set, entries without a rating are excluded.
                (Fixed: a truthiness check previously made a 0.0
                threshold disable filtering entirely.)

        Returns:
            Path to exported file

        Raises:
            ValueError: If agent_name is not a known agent.
        """
        agent_key = self._agent_key(agent_name)
        if agent_key not in self.agent_dirs:
            raise ValueError(f"Unknown agent: {agent_name}")
        if output_file is None:
            output_file = self.data_dir / f"{agent_key}_finetuning_{datetime.now().strftime('%Y%m%d')}.jsonl"
        agent_dir = self.agent_dirs[agent_key]
        # Shared system prompt for every exported example.
        system_message = {"role": "system", "content": f"You are a {agent_key} specialist."}
        exported_count = 0
        with open(output_file, 'w', encoding='utf-8') as out_f:
            # Process single-turn conversations
            for file in agent_dir.glob('conversations_*.jsonl'):
                with open(file, 'r', encoding='utf-8') as in_f:
                    for line in in_f:
                        entry = json.loads(line)
                        if not self._passes_rating(entry, min_quality_rating):
                            continue
                        # Convert to OpenAI format
                        openai_format = {
                            "messages": [
                                system_message,
                                {"role": "user", "content": entry['user_message']},
                                {"role": "assistant", "content": entry['agent_response']}
                            ]
                        }
                        out_f.write(json.dumps(openai_format, ensure_ascii=False) + '\n')
                        exported_count += 1
            # Process multi-turn conversations
            for file in agent_dir.glob('multi_turn_*.jsonl'):
                with open(file, 'r', encoding='utf-8') as in_f:
                    for line in in_f:
                        entry = json.loads(line)
                        if not self._passes_rating(entry, min_quality_rating):
                            continue
                        # Flatten each turn into alternating user/assistant messages
                        messages = [system_message]
                        for turn in entry['conversation']:
                            messages.append({"role": "user", "content": turn['user']})
                            messages.append({"role": "assistant", "content": turn['agent']})
                        openai_format = {"messages": messages}
                        out_f.write(json.dumps(openai_format, ensure_ascii=False) + '\n')
                        exported_count += 1
        print(f"✅ Exported {exported_count} conversations to {output_file}")
        return str(output_file)
# Module-level singleton, created lazily on first access.
_collector = None


def get_data_collector() -> ConversationDataCollector:
    """Return the shared data collector, building it on first use."""
    global _collector
    if _collector is None:
        _collector = ConversationDataCollector()
    return _collector