File size: 2,716 Bytes
27d7ce9
 
c0f4adf
a4f24f3
27d7ce9
 
a4f24f3
c0f4adf
 
 
27d7ce9
a4f24f3
 
27d7ce9
c0f4adf
 
 
 
a4f24f3
c0f4adf
 
27d7ce9
 
c03af22
27d7ce9
a4f24f3
27d7ce9
a4f24f3
c0f4adf
27d7ce9
 
c03af22
27d7ce9
c0f4adf
27d7ce9
a4f24f3
 
 
 
 
 
 
 
 
27d7ce9
a4f24f3
 
 
c0f4adf
a4f24f3
c0f4adf
 
 
 
 
 
 
a4f24f3
c0f4adf
 
 
 
 
a4f24f3
27d7ce9
c0f4adf
27d7ce9
c03af22
27d7ce9
a4f24f3
27d7ce9
 
 
a4f24f3
27d7ce9
866c3f0
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
import numpy as np
from scipy.fft import rfft, rfftfreq
from scipy.signal import find_peaks
from typing import Dict, List, Optional, Any, Tuple

def analyze_cognitive_signal(
    state_deltas: np.ndarray,
    sampling_rate: float = 1.0,
    num_peaks: int = 3
) -> Dict[str, Any]:
    """
    Run a spectral analysis of a signal and report its strongest periodicities.

    The mean is removed from the signal, a real FFT is taken, and the power
    spectrum (squared magnitude) is used to compute a spectral entropy and to
    search for peaks with a two-stage threshold (absolute height plus
    prominence).

    Args:
        state_deltas: One-dimensional sequence of samples. Array-likes such as
            lists are accepted and converted with ``np.asarray``.
        sampling_rate: Samples per unit time; the FFT sample spacing passed to
            ``rfftfreq`` is ``1 / sampling_rate``.
        num_peaks: Maximum number of peaks (ranked by power, strongest first)
            to convert into periods.

    Returns:
        A dict with two keys:

        * ``"dominant_periods_steps"``: list of periods (``1 / frequency``,
          rounded to two decimals) for the strongest peaks, or ``None`` when
          the input has fewer than 20 samples or no peak passes the
          thresholds. With the default ``sampling_rate`` of 1.0 the periods
          are expressed in samples.
        * ``"spectral_entropy"``: Shannon entropy in bits of the normalised
          power spectrum, or ``None`` when the input has fewer than 20
          samples. A spectrum without any power (e.g. a constant signal)
          yields ``0.0``.
    """
    analysis_results: Dict[str, Any] = {
        "dominant_periods_steps": None,
        "spectral_entropy": None,
    }

    signal = np.asarray(state_deltas)
    n = len(signal)
    if n < 20:
        # Too few samples for a meaningful spectrum.
        return analysis_results

    # Remove the mean so the DC bin does not dominate the spectrum.
    yf = rfft(signal - np.mean(signal))
    xf = rfftfreq(n, 1 / sampling_rate)
    power_spectrum = np.abs(yf) ** 2

    total_power = np.sum(power_spectrum)
    if not (total_power > 0):
        # Zero (or NaN) total power: normalising would divide by zero and the
        # spectrum cannot contain a peak, so report zero entropy and stop.
        analysis_results["spectral_entropy"] = 0.0
        return analysis_results

    # Normalise to a probability distribution; drop negligible bins so that
    # log2 is never evaluated at (or extremely close to) zero.
    prob_dist = power_spectrum / total_power
    prob_dist = prob_dist[prob_dist > 1e-12]
    analysis_results["spectral_entropy"] = float(
        -np.sum(prob_dist * np.log2(prob_dist))
    )

    # Two-stage threshold:
    # 1. Absolute height: a peak must lie clearly above the median noise level.
    spectrum_std = np.std(power_spectrum)
    min_height = np.median(power_spectrum) + spectrum_std
    # 2. Relative prominence: a peak must stand out from its local surroundings.
    min_prominence = spectrum_std * 0.5

    # The DC bin (index 0) is excluded from the search, so every index
    # returned by find_peaks is shifted by one relative to xf.
    peaks, properties = find_peaks(
        power_spectrum[1:], height=min_height, prominence=min_prominence
    )
    if peaks.size == 0:
        return analysis_results

    # Rank the detected peaks by power, strongest first.
    ranked_peaks = peaks[np.argsort(properties["peak_heights"])[::-1]]

    dominant_periods = []
    for peak_index in ranked_peaks[:max(num_peaks, 0)]:
        frequency = xf[peak_index + 1]
        # Skip (near-)zero frequencies to avoid dividing by zero.
        if frequency > 1e-9:
            dominant_periods.append(round(1 / frequency, 2))

    if dominant_periods:
        analysis_results["dominant_periods_steps"] = dominant_periods

    return analysis_results

def get_power_spectrum_for_plotting(
    state_deltas: np.ndarray,
    sampling_rate: float = 1.0
) -> Tuple[np.ndarray, np.ndarray]:
    """
    Compute the power spectrum of a signal and return it with its frequencies.

    Args:
        state_deltas: One-dimensional sequence of samples. Array-likes such as
            lists are accepted and converted with ``np.asarray``.
        sampling_rate: Samples per unit time; the FFT sample spacing passed to
            ``rfftfreq`` is ``1 / sampling_rate``. The default of 1.0 yields
            frequencies in cycles per sample.

    Returns:
        A tuple ``(frequencies, power)`` of equally long arrays, where
        ``power`` is the squared magnitude of the real FFT of the
        mean-removed signal. Two empty arrays are returned when the input has
        fewer than 10 samples.
    """
    signal = np.asarray(state_deltas)
    n = len(signal)
    if n < 10:
        # Too few samples to produce a useful plot.
        return np.array([]), np.array([])

    # Remove the mean so the DC bin does not dominate the plot.
    yf = rfft(signal - np.mean(signal))
    xf = rfftfreq(n, 1 / sampling_rate)

    power_spectrum = np.abs(yf) ** 2
    return xf, power_spectrum