Update cognitive_mapping_probe/signal_analysis.py
Browse files
cognitive_mapping_probe/signal_analysis.py
CHANGED
|
@@ -1,31 +1,31 @@
|
|
| 1 |
import numpy as np
|
| 2 |
from scipy.fft import rfft, rfftfreq
|
| 3 |
from scipy.signal import find_peaks
|
| 4 |
-
from typing import Dict, List, Optional, Any
|
| 5 |
|
| 6 |
def analyze_cognitive_signal(
|
| 7 |
-
state_deltas: np.ndarray,
|
| 8 |
sampling_rate: float = 1.0,
|
| 9 |
num_peaks: int = 3
|
| 10 |
) -> Dict[str, Any]:
|
| 11 |
"""
|
| 12 |
-
Führt eine polyrhythmische Spektralanalyse
|
| 13 |
-
|
| 14 |
"""
|
| 15 |
analysis_results: Dict[str, Any] = {
|
| 16 |
"dominant_periods_steps": None,
|
| 17 |
"spectral_entropy": None,
|
| 18 |
}
|
| 19 |
-
|
| 20 |
if len(state_deltas) < 20:
|
| 21 |
return analysis_results
|
| 22 |
|
| 23 |
n = len(state_deltas)
|
| 24 |
yf = rfft(state_deltas - np.mean(state_deltas))
|
| 25 |
xf = rfftfreq(n, 1 / sampling_rate)
|
| 26 |
-
|
| 27 |
power_spectrum = np.abs(yf)**2
|
| 28 |
-
|
| 29 |
spectral_entropy: Optional[float] = None
|
| 30 |
if len(power_spectrum) > 1:
|
| 31 |
prob_dist = power_spectrum / np.sum(power_spectrum)
|
|
@@ -33,12 +33,21 @@ def analyze_cognitive_signal(
|
|
| 33 |
spectral_entropy = -np.sum(prob_dist * np.log2(prob_dist))
|
| 34 |
analysis_results["spectral_entropy"] = float(spectral_entropy)
|
| 35 |
|
| 36 |
-
|
| 37 |
-
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 38 |
|
| 39 |
-
|
|
|
|
|
|
|
| 40 |
sorted_peak_indices = peaks[np.argsort(properties["peak_heights"])[::-1]]
|
| 41 |
-
|
| 42 |
dominant_periods = []
|
| 43 |
for i in range(min(num_peaks, len(sorted_peak_indices))):
|
| 44 |
peak_index = sorted_peak_indices[i]
|
|
@@ -46,22 +55,22 @@ def analyze_cognitive_signal(
|
|
| 46 |
if frequency > 1e-9:
|
| 47 |
period = 1 / frequency
|
| 48 |
dominant_periods.append(round(period, 2))
|
| 49 |
-
|
| 50 |
if dominant_periods:
|
| 51 |
analysis_results["dominant_periods_steps"] = dominant_periods
|
| 52 |
|
| 53 |
return analysis_results
|
| 54 |
|
| 55 |
-
def get_power_spectrum_for_plotting(state_deltas: np.ndarray) ->
|
| 56 |
"""
|
| 57 |
Berechnet das Leistungsspektrum und gibt Frequenzen und Power zurück.
|
| 58 |
"""
|
| 59 |
if len(state_deltas) < 10:
|
| 60 |
return np.array([]), np.array([])
|
| 61 |
-
|
| 62 |
n = len(state_deltas)
|
| 63 |
yf = rfft(state_deltas - np.mean(state_deltas))
|
| 64 |
xf = rfftfreq(n, 1.0)
|
| 65 |
-
|
| 66 |
power_spectrum = np.abs(yf)**2
|
| 67 |
-
return xf, power_spectrum
|
|
|
|
| 1 |
import numpy as np
|
| 2 |
from scipy.fft import rfft, rfftfreq
|
| 3 |
from scipy.signal import find_peaks
|
| 4 |
+
from typing import Dict, List, Optional, Any, Tuple
|
| 5 |
|
| 6 |
def analyze_cognitive_signal(
    state_deltas: np.ndarray,
    sampling_rate: float = 1.0,
    num_peaks: int = 3
) -> Dict[str, Any]:
    """
    Perform a polyrhythmic spectral analysis of a 1-D delta signal using a
    robust, two-stage threshold scheme (absolute height + local prominence)
    for peak detection.

    Args:
        state_deltas: 1-D array of per-step state deltas to analyze.
        sampling_rate: Samples per step (default 1.0, so periods are in steps).
        num_peaks: Maximum number of dominant periods to report.

    Returns:
        Dict with keys:
            "dominant_periods_steps": list of up to ``num_peaks`` periods
                (strongest first, rounded to 2 decimals), or None if no
                significant peak was found or the signal is too short.
            "spectral_entropy": Shannon entropy (bits) of the normalized power
                spectrum, or None if it could not be computed.
    """
    analysis_results: Dict[str, Any] = {
        "dominant_periods_steps": None,
        "spectral_entropy": None,
    }

    # Too few samples for a meaningful spectrum.
    if len(state_deltas) < 20:
        return analysis_results

    n = len(state_deltas)
    # Remove the DC component before the FFT so bin 0 does not dominate.
    yf = rfft(state_deltas - np.mean(state_deltas))
    xf = rfftfreq(n, 1 / sampling_rate)

    power_spectrum = np.abs(yf)**2

    spectral_entropy: Optional[float] = None
    total_power = np.sum(power_spectrum)
    if len(power_spectrum) > 1 and total_power > 0:
        prob_dist = power_spectrum / total_power
        # Drop (near-)zero probabilities: 0 * log2(0) would evaluate to NaN
        # and poison the entropy sum (e.g. for a pure sinusoid).
        prob_dist = prob_dist[prob_dist > 1e-12]
        spectral_entropy = -np.sum(prob_dist * np.log2(prob_dist))
        analysis_results["spectral_entropy"] = float(spectral_entropy)

    # Robust two-stage threshold determination:
    if len(power_spectrum) > 1:
        # 1. Absolute height: a peak must rise clearly above the median noise floor.
        min_height = np.median(power_spectrum) + np.std(power_spectrum)
        # 2. Relative prominence: a peak must stand out from its local surroundings.
        min_prominence = np.std(power_spectrum) * 0.5
    else:
        min_height = 1.0
        min_prominence = 1.0

    # Skip bin 0 (DC); peak indices into the slice are therefore offset by 1.
    peaks, properties = find_peaks(power_spectrum[1:], height=min_height, prominence=min_prominence)

    if peaks.size > 0 and "peak_heights" in properties:
        # Strongest peaks first.
        sorted_peak_indices = peaks[np.argsort(properties["peak_heights"])[::-1]]

        dominant_periods = []
        for i in range(min(num_peaks, len(sorted_peak_indices))):
            peak_index = sorted_peak_indices[i]
            # +1 compensates for the DC bin excluded from the find_peaks input.
            frequency = xf[peak_index + 1]
            if frequency > 1e-9:
                period = 1 / frequency
                dominant_periods.append(round(period, 2))

        if dominant_periods:
            analysis_results["dominant_periods_steps"] = dominant_periods

    return analysis_results
|
| 63 |
|
| 64 |
+
def get_power_spectrum_for_plotting(state_deltas: np.ndarray) -> Tuple[np.ndarray, np.ndarray]:
    """
    Compute the power spectrum of a delta signal for visualization.

    Args:
        state_deltas: 1-D array of per-step state deltas.

    Returns:
        A ``(frequencies, power)`` pair of arrays; both are empty when the
        signal has fewer than 10 samples.
    """
    num_samples = len(state_deltas)
    if num_samples < 10:
        return np.array([]), np.array([])

    # Center the signal so the DC bin does not swamp the plot.
    centered = state_deltas - np.mean(state_deltas)
    frequencies = rfftfreq(num_samples, 1.0)
    power = np.abs(rfft(centered)) ** 2
    return frequencies, power
|