my-gradio-app / fine_tuning /data_collector.py
Nguyen Trong Lap
Recreate history without binary blobs
eeb0f9c
"""
Data Collector for Fine-tuning
Collects and stores conversation data for training custom models
"""
import json
import os
from datetime import datetime
from pathlib import Path
from typing import List, Dict, Any, Optional
class ConversationDataCollector:
"""Collects conversation data for fine-tuning"""
def __init__(self, data_dir: str = "fine_tuning/data"):
self.data_dir = Path(data_dir)
self.data_dir.mkdir(parents=True, exist_ok=True)
# Create subdirectories for each agent
self.agent_dirs = {
'nutrition': self.data_dir / 'nutrition',
'exercise': self.data_dir / 'exercise',
'symptom': self.data_dir / 'symptom',
'mental_health': self.data_dir / 'mental_health',
'general_health': self.data_dir / 'general_health'
}
for agent_dir in self.agent_dirs.values():
agent_dir.mkdir(exist_ok=True)
def log_conversation(
self,
agent_name: str,
user_message: str,
agent_response: str,
user_data: Optional[Dict[str, Any]] = None,
metadata: Optional[Dict[str, Any]] = None
) -> None:
"""
Log a conversation turn for fine-tuning
Args:
agent_name: Name of the agent (nutrition, exercise, etc.)
user_message: User's message
agent_response: Agent's response
user_data: User profile data (age, gender, etc.)
metadata: Additional metadata (rating, feedback, etc.)
"""
conversation_entry = {
'timestamp': datetime.now().isoformat(),
'agent': agent_name,
'user_message': user_message,
'agent_response': agent_response,
'user_data': user_data or {},
'metadata': metadata or {}
}
# Save to agent-specific file
agent_key = agent_name.replace('_agent', '')
if agent_key in self.agent_dirs:
filename = f"conversations_{datetime.now().strftime('%Y%m%d')}.jsonl"
filepath = self.agent_dirs[agent_key] / filename
with open(filepath, 'a', encoding='utf-8') as f:
f.write(json.dumps(conversation_entry, ensure_ascii=False) + '\n')
def log_multi_turn_conversation(
self,
agent_name: str,
conversation_history: List[tuple],
user_data: Optional[Dict[str, Any]] = None,
metadata: Optional[Dict[str, Any]] = None
) -> None:
"""
Log a multi-turn conversation
Args:
agent_name: Name of the agent
conversation_history: List of (user_msg, agent_msg) tuples
user_data: User profile data
metadata: Additional metadata
"""
multi_turn_entry = {
'timestamp': datetime.now().isoformat(),
'agent': agent_name,
'conversation': [
{'user': user_msg, 'agent': agent_msg}
for user_msg, agent_msg in conversation_history
],
'user_data': user_data or {},
'metadata': metadata or {}
}
agent_key = agent_name.replace('_agent', '')
if agent_key in self.agent_dirs:
filename = f"multi_turn_{datetime.now().strftime('%Y%m%d')}.jsonl"
filepath = self.agent_dirs[agent_key] / filename
with open(filepath, 'a', encoding='utf-8') as f:
f.write(json.dumps(multi_turn_entry, ensure_ascii=False) + '\n')
def get_conversation_count(self, agent_name: Optional[str] = None) -> Dict[str, int]:
"""
Get count of logged conversations
Args:
agent_name: Optional agent name to filter by
Returns:
Dict with agent names and conversation counts
"""
counts = {}
agents_to_check = [agent_name.replace('_agent', '')] if agent_name else self.agent_dirs.keys()
for agent_key in agents_to_check:
if agent_key in self.agent_dirs:
agent_dir = self.agent_dirs[agent_key]
count = 0
for file in agent_dir.glob('conversations_*.jsonl'):
with open(file, 'r', encoding='utf-8') as f:
count += sum(1 for _ in f)
counts[agent_key] = count
return counts
def export_for_openai_finetuning(
self,
agent_name: str,
output_file: Optional[str] = None,
min_quality_rating: Optional[float] = None
) -> str:
"""
Export conversations in OpenAI fine-tuning format
Args:
agent_name: Agent to export data for
output_file: Output file path
min_quality_rating: Minimum quality rating to include
Returns:
Path to exported file
"""
agent_key = agent_name.replace('_agent', '')
if agent_key not in self.agent_dirs:
raise ValueError(f"Unknown agent: {agent_name}")
if output_file is None:
output_file = self.data_dir / f"{agent_key}_finetuning_{datetime.now().strftime('%Y%m%d')}.jsonl"
agent_dir = self.agent_dirs[agent_key]
exported_count = 0
with open(output_file, 'w', encoding='utf-8') as out_f:
# Process single-turn conversations
for file in agent_dir.glob('conversations_*.jsonl'):
with open(file, 'r', encoding='utf-8') as in_f:
for line in in_f:
entry = json.loads(line)
# Filter by quality rating if specified
if min_quality_rating:
rating = entry.get('metadata', {}).get('rating')
if rating is None or rating < min_quality_rating:
continue
# Convert to OpenAI format
openai_format = {
"messages": [
{"role": "system", "content": f"You are a {agent_key} specialist."},
{"role": "user", "content": entry['user_message']},
{"role": "assistant", "content": entry['agent_response']}
]
}
out_f.write(json.dumps(openai_format, ensure_ascii=False) + '\n')
exported_count += 1
# Process multi-turn conversations
for file in agent_dir.glob('multi_turn_*.jsonl'):
with open(file, 'r', encoding='utf-8') as in_f:
for line in in_f:
entry = json.loads(line)
# Filter by quality rating if specified
if min_quality_rating:
rating = entry.get('metadata', {}).get('rating')
if rating is None or rating < min_quality_rating:
continue
# Convert to OpenAI format
messages = [{"role": "system", "content": f"You are a {agent_key} specialist."}]
for turn in entry['conversation']:
messages.append({"role": "user", "content": turn['user']})
messages.append({"role": "assistant", "content": turn['agent']})
openai_format = {"messages": messages}
out_f.write(json.dumps(openai_format, ensure_ascii=False) + '\n')
exported_count += 1
print(f"✅ Exported {exported_count} conversations to {output_file}")
return str(output_file)
# Global instance
_collector = None
def get_data_collector() -> ConversationDataCollector:
"""Get global data collector instance"""
global _collector
if _collector is None:
_collector = ConversationDataCollector()
return _collector