Update cognitive_mapping_probe/signal_analysis.py
Browse files
cognitive_mapping_probe/signal_analysis.py
CHANGED
|
@@ -4,58 +4,63 @@ from scipy.signal import find_peaks
|
|
| 4 |
from typing import Dict, List, Optional, Any, Tuple
|
| 5 |
|
| 6 |
def analyze_cognitive_signal(
|
| 7 |
-
state_deltas: np.ndarray,
|
| 8 |
sampling_rate: float = 1.0,
|
| 9 |
num_peaks: int = 3
|
| 10 |
) -> Dict[str, Any]:
|
| 11 |
"""
|
| 12 |
Führt eine polyrhythmische Spektralanalyse mit einer robusten,
|
| 13 |
-
zweistufigen Schwellenwert-Methode durch.
|
| 14 |
"""
|
| 15 |
analysis_results: Dict[str, Any] = {
|
| 16 |
"dominant_periods_steps": None,
|
| 17 |
"spectral_entropy": None,
|
| 18 |
}
|
| 19 |
-
|
|
|
|
| 20 |
if len(state_deltas) < 20:
|
| 21 |
return analysis_results
|
| 22 |
|
| 23 |
n = len(state_deltas)
|
|
|
|
| 24 |
yf = rfft(state_deltas - np.mean(state_deltas))
|
| 25 |
xf = rfftfreq(n, 1 / sampling_rate)
|
| 26 |
-
|
| 27 |
power_spectrum = np.abs(yf)**2
|
| 28 |
-
|
|
|
|
| 29 |
spectral_entropy: Optional[float] = None
|
| 30 |
-
if
|
| 31 |
prob_dist = power_spectrum / np.sum(power_spectrum)
|
|
|
|
| 32 |
prob_dist = prob_dist[prob_dist > 1e-12]
|
| 33 |
spectral_entropy = -np.sum(prob_dist * np.log2(prob_dist))
|
| 34 |
analysis_results["spectral_entropy"] = float(spectral_entropy)
|
| 35 |
|
| 36 |
-
#
|
| 37 |
if len(power_spectrum) > 1:
|
| 38 |
# 1. Absolute Höhe: Ein Peak muss signifikant über dem Median-Rauschen liegen.
|
| 39 |
min_height = np.median(power_spectrum) + np.std(power_spectrum)
|
| 40 |
# 2. Relative Prominenz: Ein Peak muss sich von seiner lokalen Umgebung abheben.
|
| 41 |
min_prominence = np.std(power_spectrum) * 0.5
|
| 42 |
else:
|
| 43 |
-
min_height = 1.0
|
| 44 |
-
min_prominence = 1.0
|
| 45 |
|
|
|
|
| 46 |
peaks, properties = find_peaks(power_spectrum[1:], height=min_height, prominence=min_prominence)
|
| 47 |
-
|
| 48 |
if peaks.size > 0 and "peak_heights" in properties:
|
|
|
|
| 49 |
sorted_peak_indices = peaks[np.argsort(properties["peak_heights"])[::-1]]
|
| 50 |
-
|
| 51 |
dominant_periods = []
|
| 52 |
for i in range(min(num_peaks, len(sorted_peak_indices))):
|
| 53 |
peak_index = sorted_peak_indices[i]
|
| 54 |
-
frequency = xf[peak_index + 1]
|
| 55 |
if frequency > 1e-9:
|
| 56 |
period = 1 / frequency
|
| 57 |
dominant_periods.append(round(period, 2))
|
| 58 |
-
|
| 59 |
if dominant_periods:
|
| 60 |
analysis_results["dominant_periods_steps"] = dominant_periods
|
| 61 |
|
|
@@ -63,14 +68,15 @@ def analyze_cognitive_signal(
|
|
| 63 |
|
| 64 |
def get_power_spectrum_for_plotting(state_deltas: np.ndarray) -> Tuple[np.ndarray, np.ndarray]:
|
| 65 |
"""
|
| 66 |
-
Berechnet das Leistungsspektrum und gibt Frequenzen und Power zurück
|
|
|
|
| 67 |
"""
|
| 68 |
if len(state_deltas) < 10:
|
| 69 |
return np.array([]), np.array([])
|
| 70 |
-
|
| 71 |
n = len(state_deltas)
|
| 72 |
yf = rfft(state_deltas - np.mean(state_deltas))
|
| 73 |
xf = rfftfreq(n, 1.0)
|
| 74 |
-
|
| 75 |
power_spectrum = np.abs(yf)**2
|
| 76 |
-
return xf, power_spectrum
|
|
|
|
| 4 |
from typing import Dict, List, Optional, Any, Tuple
|
| 5 |
|
| 6 |
def analyze_cognitive_signal(
|
| 7 |
+
state_deltas: np.ndarray,
|
| 8 |
sampling_rate: float = 1.0,
|
| 9 |
num_peaks: int = 3
|
| 10 |
) -> Dict[str, Any]:
|
| 11 |
"""
|
| 12 |
Führt eine polyrhythmische Spektralanalyse mit einer robusten,
|
| 13 |
+
zweistufigen Schwellenwert-Methode durch, um die Signifikanz von Frequenz-Peaks zu gewährleisten.
|
| 14 |
"""
|
| 15 |
analysis_results: Dict[str, Any] = {
|
| 16 |
"dominant_periods_steps": None,
|
| 17 |
"spectral_entropy": None,
|
| 18 |
}
|
| 19 |
+
|
| 20 |
+
# Eine robuste Analyse erfordert eine minimale Signallänge.
|
| 21 |
if len(state_deltas) < 20:
|
| 22 |
return analysis_results
|
| 23 |
|
| 24 |
n = len(state_deltas)
|
| 25 |
+
# Entferne den DC-Anteil (Mittelwert) für eine saubere Frequenzanalyse.
|
| 26 |
yf = rfft(state_deltas - np.mean(state_deltas))
|
| 27 |
xf = rfftfreq(n, 1 / sampling_rate)
|
| 28 |
+
|
| 29 |
power_spectrum = np.abs(yf)**2
|
| 30 |
+
|
| 31 |
+
# Berechnung der spektralen Entropie zur Messung der Signal-Komplexität.
|
| 32 |
spectral_entropy: Optional[float] = None
|
| 33 |
+
if np.sum(power_spectrum) > 1e-9:
|
| 34 |
prob_dist = power_spectrum / np.sum(power_spectrum)
|
| 35 |
+
# Ignoriere numerisch instabile Nullen.
|
| 36 |
prob_dist = prob_dist[prob_dist > 1e-12]
|
| 37 |
spectral_entropy = -np.sum(prob_dist * np.log2(prob_dist))
|
| 38 |
analysis_results["spectral_entropy"] = float(spectral_entropy)
|
| 39 |
|
| 40 |
+
# Robuste, zweistufige Schwellenwert-Bestimmung für die Peak-Detektion.
|
| 41 |
if len(power_spectrum) > 1:
|
| 42 |
# 1. Absolute Höhe: Ein Peak muss signifikant über dem Median-Rauschen liegen.
|
| 43 |
min_height = np.median(power_spectrum) + np.std(power_spectrum)
|
| 44 |
# 2. Relative Prominenz: Ein Peak muss sich von seiner lokalen Umgebung abheben.
|
| 45 |
min_prominence = np.std(power_spectrum) * 0.5
|
| 46 |
else:
|
| 47 |
+
min_height, min_prominence = 1.0, 1.0
|
|
|
|
| 48 |
|
| 49 |
+
# Finde Peaks im Spektrum (ignoriere die Frequenz 0).
|
| 50 |
peaks, properties = find_peaks(power_spectrum[1:], height=min_height, prominence=min_prominence)
|
| 51 |
+
|
| 52 |
if peaks.size > 0 and "peak_heights" in properties:
|
| 53 |
+
# Sortiere die gefundenen Peaks nach ihrer Höhe (Power).
|
| 54 |
sorted_peak_indices = peaks[np.argsort(properties["peak_heights"])[::-1]]
|
| 55 |
+
|
| 56 |
dominant_periods = []
|
| 57 |
for i in range(min(num_peaks, len(sorted_peak_indices))):
|
| 58 |
peak_index = sorted_peak_indices[i]
|
| 59 |
+
frequency = xf[peak_index + 1] # +1, da wir den DC-Offset ignoriert haben.
|
| 60 |
if frequency > 1e-9:
|
| 61 |
period = 1 / frequency
|
| 62 |
dominant_periods.append(round(period, 2))
|
| 63 |
+
|
| 64 |
if dominant_periods:
|
| 65 |
analysis_results["dominant_periods_steps"] = dominant_periods
|
| 66 |
|
|
|
|
| 68 |
|
| 69 |
def get_power_spectrum_for_plotting(state_deltas: np.ndarray) -> Tuple[np.ndarray, np.ndarray]:
|
| 70 |
"""
|
| 71 |
+
Berechnet das Leistungsspektrum und gibt Frequenzen und Power als separate Arrays zurück,
|
| 72 |
+
die direkt für die Visualisierung verwendet werden können.
|
| 73 |
"""
|
| 74 |
if len(state_deltas) < 10:
|
| 75 |
return np.array([]), np.array([])
|
| 76 |
+
|
| 77 |
n = len(state_deltas)
|
| 78 |
yf = rfft(state_deltas - np.mean(state_deltas))
|
| 79 |
xf = rfftfreq(n, 1.0)
|
| 80 |
+
|
| 81 |
power_spectrum = np.abs(yf)**2
|
| 82 |
+
return xf, power_spectrum
|