from typing import Dict, List, Optional, Any, Tuple

import numpy as np
from scipy.fft import rfft, rfftfreq
from scipy.signal import find_peaks

# Minimum number of samples each public function requires before it
# attempts a spectral computation.
_MIN_SAMPLES_FOR_ANALYSIS = 20
_MIN_SAMPLES_FOR_PLOTTING = 10

# Normalised spectral bins at or below this probability are dropped before
# the entropy is computed, which also keeps log2(0) out of the sum.
_MIN_PROBABILITY = 1e-12

# Frequencies at or below this value are not converted into a period.
_MIN_FREQUENCY = 1e-9


def _mean_removed_power_spectrum(
    state_deltas: np.ndarray, sampling_rate: float
) -> Tuple[np.ndarray, np.ndarray]:
    """Return ``(frequencies, power)`` of the mean-removed real FFT."""
    samples = np.asarray(state_deltas)
    # Subtracting the mean removes the constant offset, so bin 0 (DC) is ~0.
    spectrum = rfft(samples - np.mean(samples))
    frequencies = rfftfreq(len(samples), 1 / sampling_rate)
    return frequencies, np.abs(spectrum) ** 2


def analyze_cognitive_signal(
    state_deltas: np.ndarray,
    sampling_rate: float = 1.0,
    num_peaks: int = 3
) -> Dict[str, Any]:
    """Run a spectral analysis with a two-stage peak threshold.

    The mean-removed power spectrum of ``state_deltas`` is computed, its
    Shannon entropy (base 2) is derived from the normalised spectrum, and
    the strongest spectral peaks are converted into periods.

    Args:
        state_deltas: One-dimensional sequence of samples.
        sampling_rate: Samples per unit of time; must be non-zero. Periods
            are computed as ``1 / frequency``, so they are expressed in
            sample steps only when this is ``1.0``.
        num_peaks: Maximum number of peaks to report. Values <= 0 yield
            no periods.

    Returns:
        A dict with two keys:

        * ``"dominant_periods_steps"``: list of periods rounded to two
          decimals, ordered by descending peak power, or ``None`` when no
          peak passes both thresholds.
        * ``"spectral_entropy"``: entropy of the normalised power spectrum
          as a float, or ``None`` when fewer than 20 samples are given.
          A signal with no (or non-finite) spectral power reports ``0.0``.
    """
    analysis_results: Dict[str, Any] = {
        "dominant_periods_steps": None,
        "spectral_entropy": None,
    }
    if len(state_deltas) < _MIN_SAMPLES_FOR_ANALYSIS:
        return analysis_results

    frequencies, power_spectrum = _mean_removed_power_spectrum(
        state_deltas, sampling_rate
    )

    total_power = float(np.sum(power_spectrum))
    if not (np.isfinite(total_power) and total_power > 0.0):
        # A constant (or non-finite) signal has no usable spectrum.
        # Normalising would divide by zero, and no peak can exist, so
        # report zero entropy and leave the periods unset.
        analysis_results["spectral_entropy"] = 0.0
        return analysis_results

    prob_dist = power_spectrum / total_power
    prob_dist = prob_dist[prob_dist > _MIN_PROBABILITY]
    spectral_entropy: Optional[float] = float(
        -np.sum(prob_dist * np.log2(prob_dist))
    )
    analysis_results["spectral_entropy"] = spectral_entropy

    # Two-stage threshold:
    # 1. Absolute height: a peak must rise clearly above the median level.
    # 2. Relative prominence: a peak must stand out from its surroundings.
    spread = np.std(power_spectrum)
    min_height = np.median(power_spectrum) + spread
    min_prominence = spread * 0.5

    # The DC bin is skipped, so index i here corresponds to bin i + 1.
    peaks, properties = find_peaks(
        power_spectrum[1:], height=min_height, prominence=min_prominence
    )
    strongest_first = peaks[np.argsort(properties["peak_heights"])[::-1]]

    dominant_periods: List[float] = []
    for peak_index in strongest_first[:max(num_peaks, 0)]:
        frequency = frequencies[peak_index + 1]
        if frequency > _MIN_FREQUENCY:
            dominant_periods.append(float(round(1 / frequency, 2)))

    if dominant_periods:
        analysis_results["dominant_periods_steps"] = dominant_periods

    return analysis_results


def get_power_spectrum_for_plotting(
    state_deltas: np.ndarray,
    sampling_rate: float = 1.0
) -> Tuple[np.ndarray, np.ndarray]:
    """Compute the mean-removed power spectrum for plotting.

    Args:
        state_deltas: One-dimensional sequence of samples.
        sampling_rate: Samples per unit of time; must be non-zero. The
            default of ``1.0`` gives frequencies in cycles per sample.

    Returns:
        ``(frequencies, power)`` arrays of equal length, or two empty
        arrays when fewer than 10 samples are given.
    """
    if len(state_deltas) < _MIN_SAMPLES_FOR_PLOTTING:
        return np.array([]), np.array([])

    return _mean_removed_power_spectrum(state_deltas, sampling_rate)