|
|
import numpy as np |
|
|
from scipy.fft import rfft, rfftfreq |
|
|
from scipy.signal import find_peaks |
|
|
from typing import Dict, List, Optional, Any, Tuple |
|
|
|
|
|
def analyze_cognitive_signal( |
|
|
state_deltas: np.ndarray, |
|
|
sampling_rate: float = 1.0, |
|
|
num_peaks: int = 3 |
|
|
) -> Dict[str, Any]: |
|
|
""" |
|
|
Führt eine polyrhythmische Spektralanalyse mit einer robusten, |
|
|
zweistufigen Schwellenwert-Methode durch. |
|
|
""" |
|
|
analysis_results: Dict[str, Any] = { |
|
|
"dominant_periods_steps": None, |
|
|
"spectral_entropy": None, |
|
|
} |
|
|
|
|
|
if len(state_deltas) < 20: |
|
|
return analysis_results |
|
|
|
|
|
n = len(state_deltas) |
|
|
yf = rfft(state_deltas - np.mean(state_deltas)) |
|
|
xf = rfftfreq(n, 1 / sampling_rate) |
|
|
|
|
|
power_spectrum = np.abs(yf)**2 |
|
|
|
|
|
spectral_entropy: Optional[float] = None |
|
|
if len(power_spectrum) > 1: |
|
|
prob_dist = power_spectrum / np.sum(power_spectrum) |
|
|
prob_dist = prob_dist[prob_dist > 1e-12] |
|
|
spectral_entropy = -np.sum(prob_dist * np.log2(prob_dist)) |
|
|
analysis_results["spectral_entropy"] = float(spectral_entropy) |
|
|
|
|
|
|
|
|
if len(power_spectrum) > 1: |
|
|
|
|
|
min_height = np.median(power_spectrum) + np.std(power_spectrum) |
|
|
|
|
|
min_prominence = np.std(power_spectrum) * 0.5 |
|
|
else: |
|
|
min_height = 1.0 |
|
|
min_prominence = 1.0 |
|
|
|
|
|
peaks, properties = find_peaks(power_spectrum[1:], height=min_height, prominence=min_prominence) |
|
|
|
|
|
if peaks.size > 0 and "peak_heights" in properties: |
|
|
sorted_peak_indices = peaks[np.argsort(properties["peak_heights"])[::-1]] |
|
|
|
|
|
dominant_periods = [] |
|
|
for i in range(min(num_peaks, len(sorted_peak_indices))): |
|
|
peak_index = sorted_peak_indices[i] |
|
|
frequency = xf[peak_index + 1] |
|
|
if frequency > 1e-9: |
|
|
period = 1 / frequency |
|
|
dominant_periods.append(round(period, 2)) |
|
|
|
|
|
if dominant_periods: |
|
|
analysis_results["dominant_periods_steps"] = dominant_periods |
|
|
|
|
|
return analysis_results |
|
|
|
|
|
def get_power_spectrum_for_plotting(state_deltas: np.ndarray) -> Tuple[np.ndarray, np.ndarray]: |
|
|
""" |
|
|
Berechnet das Leistungsspektrum und gibt Frequenzen und Power zurück. |
|
|
""" |
|
|
if len(state_deltas) < 10: |
|
|
return np.array([]), np.array([]) |
|
|
|
|
|
n = len(state_deltas) |
|
|
yf = rfft(state_deltas - np.mean(state_deltas)) |
|
|
xf = rfftfreq(n, 1.0) |
|
|
|
|
|
power_spectrum = np.abs(yf)**2 |
|
|
return xf, power_spectrum |
|
|
|