Spaces:
Runtime error
Runtime error
File size: 8,412 Bytes
eeb0f9c |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 |
"""
Data Collector for Fine-tuning
Collects and stores conversation data for training custom models
"""
import json
import os
from datetime import datetime
from pathlib import Path
from typing import List, Dict, Any, Optional
class ConversationDataCollector:
    """Collect conversation data on disk for later fine-tuning.

    Each known agent gets its own sub-directory below ``data_dir``. Every
    logged entry is appended as one JSON object per line (JSONL) to a file
    named after the current local date:

    * ``conversations_YYYYMMDD.jsonl`` for single-turn entries
    * ``multi_turn_YYYYMMDD.jsonl`` for multi-turn entries

    Agent names may be passed with or without the ``_agent`` marker
    (``'nutrition_agent'`` and ``'nutrition'`` address the same directory).
    """

    def __init__(self, data_dir: str = "fine_tuning/data"):
        """
        Create the data directory and one sub-directory per known agent.

        Args:
            data_dir: Root directory for all collected data; created
                (including parents) if it does not exist.
        """
        self.data_dir = Path(data_dir)
        self.data_dir.mkdir(parents=True, exist_ok=True)

        # One sub-directory per agent; the dict keys are the agent keys
        # accepted by every other method of this class.
        self.agent_dirs = {
            'nutrition': self.data_dir / 'nutrition',
            'exercise': self.data_dir / 'exercise',
            'symptom': self.data_dir / 'symptom',
            'mental_health': self.data_dir / 'mental_health',
            'general_health': self.data_dir / 'general_health'
        }
        for agent_dir in self.agent_dirs.values():
            agent_dir.mkdir(exist_ok=True)

    @staticmethod
    def _agent_key(agent_name: str) -> str:
        """Map an agent name to its ``agent_dirs`` key by dropping '_agent'."""
        # NOTE: str.replace removes every occurrence of '_agent', not only a
        # trailing one; kept as-is so existing callers resolve identically.
        return agent_name.replace('_agent', '')

    def _append_entry(
        self,
        agent_name: str,
        file_prefix: str,
        entry: Dict[str, Any],
        now: datetime
    ) -> None:
        """Append ``entry`` as one JSON line to the agent's dated file.

        Entries for agents that are not in ``agent_dirs`` are dropped
        silently (best-effort logging must not break the caller).
        """
        agent_dir = self.agent_dirs.get(self._agent_key(agent_name))
        if agent_dir is None:
            return
        # Serialize before opening so a non-serializable entry does not
        # leave an empty file behind.
        line = json.dumps(entry, ensure_ascii=False) + '\n'
        filepath = agent_dir / f"{file_prefix}_{now.strftime('%Y%m%d')}.jsonl"
        with open(filepath, 'a', encoding='utf-8') as f:
            f.write(line)

    @staticmethod
    def _iter_entries(directory: Path, pattern: str):
        """Yield the parsed JSON object of every non-blank line in matching files.

        Files are visited in sorted name order so the result is deterministic
        (``Path.glob`` itself does not promise any order). Blank lines are
        skipped instead of being fed to ``json.loads``.
        """
        for path in sorted(directory.glob(pattern)):
            with open(path, 'r', encoding='utf-8') as f:
                for line in f:
                    if line.strip():
                        yield json.loads(line)

    @staticmethod
    def _meets_rating(entry: Dict[str, Any], min_quality_rating: Optional[float]) -> bool:
        """Return True if ``entry`` passes the optional minimum-rating filter."""
        # Compare against None explicitly: a threshold of 0 is a real filter.
        if min_quality_rating is None:
            return True
        rating = entry.get('metadata', {}).get('rating')
        if rating is None or rating < min_quality_rating:
            return False
        return True

    def log_conversation(
        self,
        agent_name: str,
        user_message: str,
        agent_response: str,
        user_data: Optional[Dict[str, Any]] = None,
        metadata: Optional[Dict[str, Any]] = None
    ) -> None:
        """
        Log a conversation turn for fine-tuning

        Args:
            agent_name: Name of the agent (nutrition, exercise, etc.)
            user_message: User's message
            agent_response: Agent's response
            user_data: User profile data (age, gender, etc.)
            metadata: Additional metadata (rating, feedback, etc.)
        """
        # Read the clock once so the timestamp and the dated file name agree
        # even when the call straddles midnight.
        now = datetime.now()
        conversation_entry = {
            'timestamp': now.isoformat(),
            'agent': agent_name,
            'user_message': user_message,
            'agent_response': agent_response,
            'user_data': user_data or {},
            'metadata': metadata or {}
        }
        self._append_entry(agent_name, 'conversations', conversation_entry, now)

    def log_multi_turn_conversation(
        self,
        agent_name: str,
        conversation_history: List[tuple],
        user_data: Optional[Dict[str, Any]] = None,
        metadata: Optional[Dict[str, Any]] = None
    ) -> None:
        """
        Log a multi-turn conversation

        Args:
            agent_name: Name of the agent
            conversation_history: List of (user_msg, agent_msg) tuples
            user_data: User profile data
            metadata: Additional metadata
        """
        now = datetime.now()
        multi_turn_entry = {
            'timestamp': now.isoformat(),
            'agent': agent_name,
            'conversation': [
                {'user': user_msg, 'agent': agent_msg}
                for user_msg, agent_msg in conversation_history
            ],
            'user_data': user_data or {},
            'metadata': metadata or {}
        }
        self._append_entry(agent_name, 'multi_turn', multi_turn_entry, now)

    def get_conversation_count(self, agent_name: Optional[str] = None) -> Dict[str, int]:
        """
        Get count of logged conversations

        Only single-turn files (``conversations_*.jsonl``) are counted;
        blank lines in those files are ignored.

        Args:
            agent_name: Optional agent name to filter by

        Returns:
            Dict with agent names and conversation counts. Unknown agent
            names produce no entry in the result.
        """
        if agent_name:
            agents_to_check = [self._agent_key(agent_name)]
        else:
            agents_to_check = list(self.agent_dirs)

        counts = {}
        for agent_key in agents_to_check:
            agent_dir = self.agent_dirs.get(agent_key)
            if agent_dir is None:
                continue
            count = 0
            for file in agent_dir.glob('conversations_*.jsonl'):
                with open(file, 'r', encoding='utf-8') as f:
                    # Lines are counted without parsing them.
                    count += sum(1 for line in f if line.strip())
            counts[agent_key] = count
        return counts

    def export_for_openai_finetuning(
        self,
        agent_name: str,
        output_file: Optional[str] = None,
        min_quality_rating: Optional[float] = None
    ) -> str:
        """
        Export conversations in OpenAI fine-tuning format

        Every exported line is a JSON object ``{"messages": [...]}`` that
        starts with a system message followed by alternating user/assistant
        messages. Single-turn entries are written first, then multi-turn ones.

        Args:
            agent_name: Agent to export data for
            output_file: Output file path; defaults to
                ``<data_dir>/<agent>_finetuning_YYYYMMDD.jsonl``
            min_quality_rating: Minimum quality rating to include. When given
                (including ``0``), entries without a ``metadata['rating']`` or
                with a lower rating are skipped.

        Returns:
            Path to exported file

        Raises:
            ValueError: If ``agent_name`` does not map to a known agent.
        """
        agent_key = self._agent_key(agent_name)
        if agent_key not in self.agent_dirs:
            raise ValueError(f"Unknown agent: {agent_name}")

        if output_file is None:
            output_file = self.data_dir / f"{agent_key}_finetuning_{datetime.now().strftime('%Y%m%d')}.jsonl"

        agent_dir = self.agent_dirs[agent_key]
        system_message = {"role": "system", "content": f"You are a {agent_key} specialist."}
        exported_count = 0

        with open(output_file, 'w', encoding='utf-8') as out_f:
            # Process single-turn conversations
            for entry in self._iter_entries(agent_dir, 'conversations_*.jsonl'):
                if not self._meets_rating(entry, min_quality_rating):
                    continue
                messages = [
                    system_message,
                    {"role": "user", "content": entry['user_message']},
                    {"role": "assistant", "content": entry['agent_response']}
                ]
                out_f.write(json.dumps({"messages": messages}, ensure_ascii=False) + '\n')
                exported_count += 1

            # Process multi-turn conversations
            for entry in self._iter_entries(agent_dir, 'multi_turn_*.jsonl'):
                if not self._meets_rating(entry, min_quality_rating):
                    continue
                messages = [system_message]
                for turn in entry['conversation']:
                    messages.append({"role": "user", "content": turn['user']})
                    messages.append({"role": "assistant", "content": turn['agent']})
                out_f.write(json.dumps({"messages": messages}, ensure_ascii=False) + '\n')
                exported_count += 1

        print(f"✅ Exported {exported_count} conversations to {output_file}")
        return str(output_file)
# Global instance: module-level slot for the shared ConversationDataCollector.
# Starts as None and is filled in by get_data_collector() on first use.
_collector = None
def get_data_collector() -> ConversationDataCollector:
    """Return the shared collector, creating it with default settings on first call."""
    global _collector
    # Fast path: an instance was already created by an earlier call.
    if _collector is not None:
        return _collector
    _collector = ConversationDataCollector()
    return _collector
|