my-gradio-app / health_data /record_merger.py
Nguyen Trong Lap
Recreate history without binary blobs
eeb0f9c
"""
Health Record Merger
Merge and aggregate health records from multiple days
"""
from datetime import datetime, timedelta
from typing import List, Dict, Any, Optional
from collections import defaultdict
from .pydantic_models import (
HealthRecord, NutritionRecord, ExerciseRecord,
SymptomRecord, MentalHealthRecord, RecordType
)
class HealthRecordMerger:
    """Merge and aggregate health records.

    Every method is static, so the class acts as a namespace. Records are
    only read through ``record_type``, ``timestamp``, ``data`` (a mapping),
    ``weight``, ``height``, ``bmi`` and ``model_dump()``.
    """

    @staticmethod
    def merge_records(
        records: List[HealthRecord],
        merge_strategy: str = 'latest'
    ) -> Dict[str, Any]:
        """
        Merge multiple health records into aggregated data

        Args:
            records: List of health records to merge
            merge_strategy: 'latest', 'average', 'all'

        Returns:
            Merged data dictionary with the keys 'total_records',
            'date_range', 'by_type' and 'health_metrics', or an empty
            dict when ``records`` is empty.
        """
        if not records:
            return {}

        # Group records by type
        records_by_type = defaultdict(list)
        for record in records:
            records_by_type[record.record_type].append(record)

        # Collect timestamps once instead of scanning the records twice.
        timestamps = [r.timestamp for r in records]
        merged = {
            'total_records': len(records),
            'date_range': {
                'start': min(timestamps).isoformat(),
                'end': max(timestamps).isoformat(),
            },
            'by_type': {},
        }

        # (record type, output key, merge function). Types are matched with
        # ``==`` rather than a dict lookup so that a value which merely
        # compares equal to a RecordType member is still dispatched.
        mergers = (
            (RecordType.NUTRITION, 'nutrition',
             HealthRecordMerger._merge_nutrition_records),
            (RecordType.EXERCISE, 'exercise',
             HealthRecordMerger._merge_exercise_records),
            (RecordType.SYMPTOM, 'symptom',
             HealthRecordMerger._merge_symptom_records),
            (RecordType.MENTAL_HEALTH, 'mental_health',
             HealthRecordMerger._merge_mental_health_records),
        )

        # Merge each type; record types without a merger are left out of
        # 'by_type' but still count towards 'total_records'.
        for record_type, type_records in records_by_type.items():
            for known_type, key, merge in mergers:
                if record_type == known_type:
                    merged['by_type'][key] = merge(type_records, merge_strategy)
                    break

        # Extract common health metrics
        merged['health_metrics'] = HealthRecordMerger._extract_health_metrics(
            records, merge_strategy
        )
        return merged

    @staticmethod
    def _latest_summary(records: List[HealthRecord]) -> Dict[str, Any]:
        """Return the dump of the record with the greatest timestamp.

        Raises:
            ValueError: if ``records`` is empty.
        """
        latest = max(records, key=lambda r: r.timestamp)
        return {
            'latest_record': latest.model_dump(),
            'total_records': len(records),
        }

    @staticmethod
    def _all_summary(records: List[HealthRecord]) -> Dict[str, Any]:
        """Return the dump of every record, in input order."""
        return {
            'all_records': [r.model_dump() for r in records],
            'total_records': len(records),
        }

    @staticmethod
    def _average(values: List[Any]) -> Optional[float]:
        """Return the mean of ``values`` rounded to 1 decimal, or None if empty."""
        return round(sum(values) / len(values), 1) if values else None

    @staticmethod
    def _merge_nutrition_records(
        records: List[HealthRecord],
        strategy: str
    ) -> Dict[str, Any]:
        """Merge nutrition records.

        Args:
            records: Nutrition records to merge
            strategy: 'latest', 'average'; any other value is treated as 'all'

        Returns:
            Latest record, per-record averages plus totals, or all records,
            depending on ``strategy``.
        """
        if strategy == 'latest':
            return HealthRecordMerger._latest_summary(records)
        if strategy != 'average':  # 'all'
            return HealthRecordMerger._all_summary(records)

        # Missing or falsy values (None, 0) contribute nothing to the totals.
        totals = {
            field: sum(r.data.get(field) or 0 for r in records)
            for field in ('calories', 'protein', 'carbs', 'fat')
        }
        count = len(records)
        # NOTE(review): the averages divide by the number of records, so
        # 'average_daily' is only a per-day figure when there is exactly one
        # nutrition record per day -- confirm against how records are logged.
        return {
            'average_daily': {
                field: round(total / count, 1) if count > 0 else 0
                for field, total in totals.items()
            },
            'total': {
                field: round(total, 1) for field, total in totals.items()
            },
            'total_records': count,
        }

    @staticmethod
    def _merge_exercise_records(
        records: List[HealthRecord],
        strategy: str
    ) -> Dict[str, Any]:
        """Merge exercise records.

        Args:
            records: Exercise records to merge
            strategy: 'latest', 'average'; any other value is treated as 'all'

        Returns:
            Latest record, workout totals/averages with a per-type count,
            or all records, depending on ``strategy``.
        """
        if strategy == 'latest':
            return HealthRecordMerger._latest_summary(records)
        if strategy != 'average':  # 'all'
            return HealthRecordMerger._all_summary(records)

        # ``or 0`` also covers keys that are present but set to None, which
        # would otherwise make sum() raise TypeError.
        total_duration = sum(
            r.data.get('duration_minutes') or 0 for r in records
        )
        total_calories = sum(
            r.data.get('calories_burned') or 0 for r in records
        )

        # Count by exercise type
        exercise_types = defaultdict(int)
        for r in records:
            exercise_types[r.data.get('exercise_type', 'unknown')] += 1

        count = len(records)
        return {
            'total_workouts': count,
            'total_duration_minutes': total_duration,
            'total_calories_burned': round(total_calories, 1),
            'average_duration': round(total_duration / count, 1) if count > 0 else 0,
            'average_calories': round(total_calories / count, 1) if count > 0 else 0,
            'exercise_types': dict(exercise_types),
        }

    @staticmethod
    def _merge_symptom_records(
        records: List[HealthRecord],
        strategy: str
    ) -> Dict[str, Any]:
        """Merge symptom records.

        Args:
            records: Symptom records to merge
            strategy: 'latest' returns the latest record; every other value
                (including 'all') returns the aggregated symptom statistics

        Returns:
            Latest record, or report count, unique-symptom count, the five
            most frequent symptoms as (symptom, count) pairs and the list of
            distinct symptoms in first-seen order.
        """
        if strategy == 'latest':
            return HealthRecordMerger._latest_summary(records)

        # Insertion-ordered counts give both the frequencies and a
        # deterministic list of distinct symptoms.
        symptom_counts = defaultdict(int)
        for r in records:
            symptoms = r.data.get('symptoms', [])
            # Anything that is not a list is ignored.
            if isinstance(symptoms, list):
                for symptom in symptoms:
                    symptom_counts[symptom] += 1

        return {
            'total_reports': len(records),
            'unique_symptoms': len(symptom_counts),
            'most_common_symptoms': sorted(
                symptom_counts.items(),
                key=lambda item: item[1],
                reverse=True
            )[:5],
            'all_symptoms': list(symptom_counts),
        }

    @staticmethod
    def _merge_mental_health_records(
        records: List[HealthRecord],
        strategy: str
    ) -> Dict[str, Any]:
        """Merge mental health records.

        Args:
            records: Mental health records to merge
            strategy: 'latest' returns the latest record; every other value
                (including 'all') returns the averages

        Returns:
            Latest record, or average stress level, sleep hours and sleep
            quality (None when no values exist) plus a 'stress_trend' of
            'improving' or 'stable'.
        """
        if strategy == 'latest':
            return HealthRecordMerger._latest_summary(records)

        # Order chronologically so the trend compares the earliest reading
        # with the most recent one regardless of input order.
        ordered = sorted(records, key=lambda r: r.timestamp)

        # Falsy readings (None, 0) are skipped.
        # NOTE(review): confirm that 0 is never a meaningful reading.
        stress_levels = [
            r.data.get('stress_level') for r in ordered
            if r.data.get('stress_level')
        ]
        sleep_hours = [
            r.data.get('sleep_hours') for r in ordered
            if r.data.get('sleep_hours')
        ]
        sleep_quality = [
            r.data.get('sleep_quality') for r in ordered
            if r.data.get('sleep_quality')
        ]

        improving = (
            len(stress_levels) >= 2 and stress_levels[-1] < stress_levels[0]
        )
        return {
            'total_records': len(records),
            'average_stress_level': HealthRecordMerger._average(stress_levels),
            'average_sleep_hours': HealthRecordMerger._average(sleep_hours),
            'average_sleep_quality': HealthRecordMerger._average(sleep_quality),
            'stress_trend': 'improving' if improving else 'stable',
        }

    @staticmethod
    def _extract_health_metrics(
        records: List[HealthRecord],
        strategy: str
    ) -> Dict[str, Any]:
        """Extract common health metrics from records.

        Args:
            records: Health records of any type
            strategy: Accepted for signature symmetry; not used

        Returns:
            Dict with optional 'weight', 'height' and 'bmi' entries; an
            entry is present only when at least one record has a truthy
            value for it. 'latest' and 'change' are chronological.
        """
        # Sort by time so 'latest' and 'change' do not depend on input order.
        ordered = sorted(records, key=lambda r: r.timestamp)
        weights = [r.weight for r in ordered if r.weight]
        heights = [r.height for r in ordered if r.height]
        bmis = [r.bmi for r in ordered if r.bmi]

        metrics = {}
        if weights:
            metrics['weight'] = {
                'latest': weights[-1],
                'average': round(sum(weights) / len(weights), 1),
                'min': min(weights),
                'max': max(weights),
                'change': round(weights[-1] - weights[0], 1) if len(weights) >= 2 else 0,
            }
        if heights:
            metrics['height'] = {
                'latest': heights[-1],
                'average': round(sum(heights) / len(heights), 1),
            }
        if bmis:
            metrics['bmi'] = {
                'latest': bmis[-1],
                'average': round(sum(bmis) / len(bmis), 1),
                'change': round(bmis[-1] - bmis[0], 1) if len(bmis) >= 2 else 0,
            }
        return metrics

    @staticmethod
    def merge_by_date_range(
        records: List[HealthRecord],
        start_date: datetime,
        end_date: datetime,
        merge_strategy: str = 'average'
    ) -> Dict[str, Any]:
        """
        Merge records within a specific date range

        Args:
            records: All health records
            start_date: Start of date range (inclusive)
            end_date: End of date range (inclusive)
            merge_strategy: How to merge data

        Returns:
            Merged data for the date range; an empty dict when no record
            falls inside it.
        """
        # Filter records by date range
        filtered = [
            r for r in records
            if start_date <= r.timestamp <= end_date
        ]
        return HealthRecordMerger.merge_records(filtered, merge_strategy)

    @staticmethod
    def get_weekly_summary(
        records: List[HealthRecord],
        weeks_back: int = 1
    ) -> Dict[str, Any]:
        """
        Get weekly summary of health records

        Args:
            records: All health records
            weeks_back: Number of weeks to look back

        Returns:
            'average'-strategy merge of the records between
            ``weeks_back`` weeks ago and now.
        """
        # datetime.now() is naive local time.
        # NOTE(review): assumes record timestamps are naive as well --
        # confirm, since comparing naive with aware datetimes raises.
        end_date = datetime.now()
        start_date = end_date - timedelta(weeks=weeks_back)
        return HealthRecordMerger.merge_by_date_range(
            records,
            start_date,
            end_date,
            merge_strategy='average'
        )

    @staticmethod
    def get_monthly_summary(
        records: List[HealthRecord],
        months_back: int = 1
    ) -> Dict[str, Any]:
        """
        Get monthly summary of health records

        Args:
            records: All health records
            months_back: Number of months to look back; a month is
                counted as 30 days

        Returns:
            'average'-strategy merge of the records between
            ``30 * months_back`` days ago and now.
        """
        # datetime.now() is naive local time.
        # NOTE(review): assumes record timestamps are naive as well --
        # confirm, since comparing naive with aware datetimes raises.
        end_date = datetime.now()
        start_date = end_date - timedelta(days=30 * months_back)
        return HealthRecordMerger.merge_by_date_range(
            records,
            start_date,
            end_date,
            merge_strategy='average'
        )
def merge_records(
    records: List[HealthRecord],
    strategy: str = 'latest'
) -> Dict[str, Any]:
    """
    Module-level shortcut for ``HealthRecordMerger.merge_records``

    Args:
        records: Health records to merge
        strategy: Merge strategy, one of 'latest', 'average' or 'all'

    Returns:
        The dictionary produced by ``HealthRecordMerger.merge_records``
    """
    merged = HealthRecordMerger.merge_records(
        records,
        merge_strategy=strategy,
    )
    return merged