neuralworm's picture
update tests
866c3f0
raw
history blame
2.72 kB
import numpy as np
from scipy.fft import rfft, rfftfreq
from scipy.signal import find_peaks
from typing import Dict, List, Optional, Any, Tuple
def analyze_cognitive_signal(
state_deltas: np.ndarray,
sampling_rate: float = 1.0,
num_peaks: int = 3
) -> Dict[str, Any]:
"""
Führt eine polyrhythmische Spektralanalyse mit einer robusten,
zweistufigen Schwellenwert-Methode durch.
"""
analysis_results: Dict[str, Any] = {
"dominant_periods_steps": None,
"spectral_entropy": None,
}
if len(state_deltas) < 20:
return analysis_results
n = len(state_deltas)
yf = rfft(state_deltas - np.mean(state_deltas))
xf = rfftfreq(n, 1 / sampling_rate)
power_spectrum = np.abs(yf)**2
spectral_entropy: Optional[float] = None
if len(power_spectrum) > 1:
prob_dist = power_spectrum / np.sum(power_spectrum)
prob_dist = prob_dist[prob_dist > 1e-12]
spectral_entropy = -np.sum(prob_dist * np.log2(prob_dist))
analysis_results["spectral_entropy"] = float(spectral_entropy)
# FINALE KORREKTUR: Robuste, zweistufige Schwellenwert-Bestimmung
if len(power_spectrum) > 1:
# 1. Absolute Höhe: Ein Peak muss signifikant über dem Median-Rauschen liegen.
min_height = np.median(power_spectrum) + np.std(power_spectrum)
# 2. Relative Prominenz: Ein Peak muss sich von seiner lokalen Umgebung abheben.
min_prominence = np.std(power_spectrum) * 0.5
else:
min_height = 1.0
min_prominence = 1.0
peaks, properties = find_peaks(power_spectrum[1:], height=min_height, prominence=min_prominence)
if peaks.size > 0 and "peak_heights" in properties:
sorted_peak_indices = peaks[np.argsort(properties["peak_heights"])[::-1]]
dominant_periods = []
for i in range(min(num_peaks, len(sorted_peak_indices))):
peak_index = sorted_peak_indices[i]
frequency = xf[peak_index + 1]
if frequency > 1e-9:
period = 1 / frequency
dominant_periods.append(round(period, 2))
if dominant_periods:
analysis_results["dominant_periods_steps"] = dominant_periods
return analysis_results
def get_power_spectrum_for_plotting(state_deltas: np.ndarray) -> Tuple[np.ndarray, np.ndarray]:
"""
Berechnet das Leistungsspektrum und gibt Frequenzen und Power zurück.
"""
if len(state_deltas) < 10:
return np.array([]), np.array([])
n = len(state_deltas)
yf = rfft(state_deltas - np.mean(state_deltas))
xf = rfftfreq(n, 1.0)
power_spectrum = np.abs(yf)**2
return xf, power_spectrum