Spaces:
Runtime error
Runtime error
| """ | |
| Health Record Merger | |
| Merge and aggregate health records from multiple days | |
| """ | |
| from datetime import datetime, timedelta | |
| from typing import List, Dict, Any, Optional | |
| from collections import defaultdict | |
| from .pydantic_models import ( | |
| HealthRecord, NutritionRecord, ExerciseRecord, | |
| SymptomRecord, MentalHealthRecord, RecordType | |
| ) | |
| class HealthRecordMerger: | |
| """Merge and aggregate health records""" | |
| def merge_records( | |
| records: List[HealthRecord], | |
| merge_strategy: str = 'latest' | |
| ) -> Dict[str, Any]: | |
| """ | |
| Merge multiple health records into aggregated data | |
| Args: | |
| records: List of health records to merge | |
| merge_strategy: 'latest', 'average', 'all' | |
| Returns: | |
| Merged data dictionary | |
| """ | |
| if not records: | |
| return {} | |
| # Group records by type | |
| records_by_type = defaultdict(list) | |
| for record in records: | |
| records_by_type[record.record_type].append(record) | |
| merged = { | |
| 'total_records': len(records), | |
| 'date_range': { | |
| 'start': min(r.timestamp for r in records).isoformat(), | |
| 'end': max(r.timestamp for r in records).isoformat() | |
| }, | |
| 'by_type': {} | |
| } | |
| # Merge each type | |
| for record_type, type_records in records_by_type.items(): | |
| if record_type == RecordType.NUTRITION: | |
| merged['by_type']['nutrition'] = HealthRecordMerger._merge_nutrition_records( | |
| type_records, merge_strategy | |
| ) | |
| elif record_type == RecordType.EXERCISE: | |
| merged['by_type']['exercise'] = HealthRecordMerger._merge_exercise_records( | |
| type_records, merge_strategy | |
| ) | |
| elif record_type == RecordType.SYMPTOM: | |
| merged['by_type']['symptom'] = HealthRecordMerger._merge_symptom_records( | |
| type_records, merge_strategy | |
| ) | |
| elif record_type == RecordType.MENTAL_HEALTH: | |
| merged['by_type']['mental_health'] = HealthRecordMerger._merge_mental_health_records( | |
| type_records, merge_strategy | |
| ) | |
| # Extract common health metrics | |
| merged['health_metrics'] = HealthRecordMerger._extract_health_metrics( | |
| records, merge_strategy | |
| ) | |
| return merged | |
| def _merge_nutrition_records( | |
| records: List[HealthRecord], | |
| strategy: str | |
| ) -> Dict[str, Any]: | |
| """Merge nutrition records""" | |
| if strategy == 'latest': | |
| latest = max(records, key=lambda r: r.timestamp) | |
| return { | |
| 'latest_record': latest.model_dump(), | |
| 'total_records': len(records) | |
| } | |
| elif strategy == 'average': | |
| # Calculate averages | |
| total_calories = sum(r.data.get('calories', 0) for r in records if r.data.get('calories')) | |
| total_protein = sum(r.data.get('protein', 0) for r in records if r.data.get('protein')) | |
| total_carbs = sum(r.data.get('carbs', 0) for r in records if r.data.get('carbs')) | |
| total_fat = sum(r.data.get('fat', 0) for r in records if r.data.get('fat')) | |
| count = len(records) | |
| return { | |
| 'average_daily': { | |
| 'calories': round(total_calories / count, 1) if count > 0 else 0, | |
| 'protein': round(total_protein / count, 1) if count > 0 else 0, | |
| 'carbs': round(total_carbs / count, 1) if count > 0 else 0, | |
| 'fat': round(total_fat / count, 1) if count > 0 else 0 | |
| }, | |
| 'total': { | |
| 'calories': round(total_calories, 1), | |
| 'protein': round(total_protein, 1), | |
| 'carbs': round(total_carbs, 1), | |
| 'fat': round(total_fat, 1) | |
| }, | |
| 'total_records': count | |
| } | |
| else: # 'all' | |
| return { | |
| 'all_records': [r.model_dump() for r in records], | |
| 'total_records': len(records) | |
| } | |
| def _merge_exercise_records( | |
| records: List[HealthRecord], | |
| strategy: str | |
| ) -> Dict[str, Any]: | |
| """Merge exercise records""" | |
| if strategy == 'latest': | |
| latest = max(records, key=lambda r: r.timestamp) | |
| return { | |
| 'latest_record': latest.model_dump(), | |
| 'total_records': len(records) | |
| } | |
| elif strategy == 'average': | |
| total_duration = sum(r.data.get('duration_minutes', 0) for r in records) | |
| total_calories = sum(r.data.get('calories_burned', 0) for r in records) | |
| # Count by exercise type | |
| exercise_types = defaultdict(int) | |
| for r in records: | |
| ex_type = r.data.get('exercise_type', 'unknown') | |
| exercise_types[ex_type] += 1 | |
| count = len(records) | |
| return { | |
| 'total_workouts': count, | |
| 'total_duration_minutes': total_duration, | |
| 'total_calories_burned': round(total_calories, 1), | |
| 'average_duration': round(total_duration / count, 1) if count > 0 else 0, | |
| 'average_calories': round(total_calories / count, 1) if count > 0 else 0, | |
| 'exercise_types': dict(exercise_types) | |
| } | |
| else: # 'all' | |
| return { | |
| 'all_records': [r.model_dump() for r in records], | |
| 'total_records': len(records) | |
| } | |
| def _merge_symptom_records( | |
| records: List[HealthRecord], | |
| strategy: str | |
| ) -> Dict[str, Any]: | |
| """Merge symptom records""" | |
| if strategy == 'latest': | |
| latest = max(records, key=lambda r: r.timestamp) | |
| return { | |
| 'latest_record': latest.model_dump(), | |
| 'total_records': len(records) | |
| } | |
| # Collect all symptoms | |
| all_symptoms = [] | |
| symptom_counts = defaultdict(int) | |
| for r in records: | |
| symptoms = r.data.get('symptoms', []) | |
| if isinstance(symptoms, list): | |
| all_symptoms.extend(symptoms) | |
| for symptom in symptoms: | |
| symptom_counts[symptom] += 1 | |
| return { | |
| 'total_reports': len(records), | |
| 'unique_symptoms': len(set(all_symptoms)), | |
| 'most_common_symptoms': sorted( | |
| symptom_counts.items(), | |
| key=lambda x: x[1], | |
| reverse=True | |
| )[:5], | |
| 'all_symptoms': list(set(all_symptoms)) | |
| } | |
| def _merge_mental_health_records( | |
| records: List[HealthRecord], | |
| strategy: str | |
| ) -> Dict[str, Any]: | |
| """Merge mental health records""" | |
| if strategy == 'latest': | |
| latest = max(records, key=lambda r: r.timestamp) | |
| return { | |
| 'latest_record': latest.model_dump(), | |
| 'total_records': len(records) | |
| } | |
| # Calculate averages | |
| stress_levels = [r.data.get('stress_level') for r in records if r.data.get('stress_level')] | |
| sleep_hours = [r.data.get('sleep_hours') for r in records if r.data.get('sleep_hours')] | |
| sleep_quality = [r.data.get('sleep_quality') for r in records if r.data.get('sleep_quality')] | |
| return { | |
| 'total_records': len(records), | |
| 'average_stress_level': round(sum(stress_levels) / len(stress_levels), 1) if stress_levels else None, | |
| 'average_sleep_hours': round(sum(sleep_hours) / len(sleep_hours), 1) if sleep_hours else None, | |
| 'average_sleep_quality': round(sum(sleep_quality) / len(sleep_quality), 1) if sleep_quality else None, | |
| 'stress_trend': 'improving' if len(stress_levels) >= 2 and stress_levels[-1] < stress_levels[0] else 'stable' | |
| } | |
| def _extract_health_metrics( | |
| records: List[HealthRecord], | |
| strategy: str | |
| ) -> Dict[str, Any]: | |
| """Extract common health metrics from records""" | |
| weights = [r.weight for r in records if r.weight] | |
| heights = [r.height for r in records if r.height] | |
| bmis = [r.bmi for r in records if r.bmi] | |
| metrics = {} | |
| if weights: | |
| metrics['weight'] = { | |
| 'latest': weights[-1], | |
| 'average': round(sum(weights) / len(weights), 1), | |
| 'min': min(weights), | |
| 'max': max(weights), | |
| 'change': round(weights[-1] - weights[0], 1) if len(weights) >= 2 else 0 | |
| } | |
| if heights: | |
| metrics['height'] = { | |
| 'latest': heights[-1], | |
| 'average': round(sum(heights) / len(heights), 1) | |
| } | |
| if bmis: | |
| metrics['bmi'] = { | |
| 'latest': bmis[-1], | |
| 'average': round(sum(bmis) / len(bmis), 1), | |
| 'change': round(bmis[-1] - bmis[0], 1) if len(bmis) >= 2 else 0 | |
| } | |
| return metrics | |
| def merge_by_date_range( | |
| records: List[HealthRecord], | |
| start_date: datetime, | |
| end_date: datetime, | |
| merge_strategy: str = 'average' | |
| ) -> Dict[str, Any]: | |
| """ | |
| Merge records within a specific date range | |
| Args: | |
| records: All health records | |
| start_date: Start of date range | |
| end_date: End of date range | |
| merge_strategy: How to merge data | |
| Returns: | |
| Merged data for the date range | |
| """ | |
| # Filter records by date range | |
| filtered = [ | |
| r for r in records | |
| if start_date <= r.timestamp <= end_date | |
| ] | |
| return HealthRecordMerger.merge_records(filtered, merge_strategy) | |
| def get_weekly_summary( | |
| records: List[HealthRecord], | |
| weeks_back: int = 1 | |
| ) -> Dict[str, Any]: | |
| """ | |
| Get weekly summary of health records | |
| Args: | |
| records: All health records | |
| weeks_back: Number of weeks to look back | |
| Returns: | |
| Weekly summary | |
| """ | |
| end_date = datetime.now() | |
| start_date = end_date - timedelta(weeks=weeks_back) | |
| return HealthRecordMerger.merge_by_date_range( | |
| records, | |
| start_date, | |
| end_date, | |
| merge_strategy='average' | |
| ) | |
| def get_monthly_summary( | |
| records: List[HealthRecord], | |
| months_back: int = 1 | |
| ) -> Dict[str, Any]: | |
| """ | |
| Get monthly summary of health records | |
| Args: | |
| records: All health records | |
| months_back: Number of months to look back | |
| Returns: | |
| Monthly summary | |
| """ | |
| end_date = datetime.now() | |
| start_date = end_date - timedelta(days=30 * months_back) | |
| return HealthRecordMerger.merge_by_date_range( | |
| records, | |
| start_date, | |
| end_date, | |
| merge_strategy='average' | |
| ) | |
| def merge_records( | |
| records: List[HealthRecord], | |
| strategy: str = 'latest' | |
| ) -> Dict[str, Any]: | |
| """ | |
| Convenience function to merge health records | |
| Args: | |
| records: List of health records | |
| strategy: 'latest', 'average', or 'all' | |
| Returns: | |
| Merged data dictionary | |
| """ | |
| return HealthRecordMerger.merge_records(records, strategy) | |