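"""Shout / raised-voice detection over an audio file.

Pipeline: band-pass the signal to the voice band, extract frame-level features
(RMS, onset strength, spectral rolloff, harmonics-to-noise ratio), combine them
into a normalized intensity curve, segment it with hysteresis thresholds, label
each segment with a voice-volume level, and report only the "elevado" /
"gritando" segments as hh:mm:ss ranges.
"""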
import librosa
from scipy.signal import butter, sosfilt
import numpy as np
from settings import DEBUG_MODE, RESAMPLING_FREQ
from audio_utils import sec_to_hhmmss

def bandpass_filter(y, sr, low=300, high=3400):
    # 4th-order Butterworth band-pass limited to the voice band (300-3400 Hz by default).
    sos = butter(4, [low / (sr / 2), high / (sr / 2)], btype="band", output="sos")
    return sosfilt(sos, y)


def extract_features(y, sr, frame=0.05):
    # Frame-level features on a fixed hop (default 50 ms per frame).
    hop = int(sr * frame)
    rms = librosa.feature.rms(y=y, hop_length=hop)[0]
    flux = librosa.onset.onset_strength(y=y, sr=sr, hop_length=hop)
    rolloff = librosa.feature.spectral_rolloff(y=y, sr=sr, hop_length=hop)[0]
    # Harmonic/percussive split used as a rough harmonics-to-noise ratio (HNR).
    harmonic = librosa.effects.harmonic(y)
    percussive = y - harmonic
    hnr = librosa.feature.rms(y=harmonic, hop_length=hop)[0] / (librosa.feature.rms(y=percussive, hop_length=hop)[0] + 1e-6)

    times = librosa.frames_to_time(np.arange(len(rms)), sr=sr, hop_length=hop)
    return rms, flux, rolloff, hnr, times

def compute_intensity(rms, flux, rolloff, hnr):
    # Relative weights of each cue in the combined intensity score.
    rms_w, flux_w, roll_w, hnr_w = 3.0, 1.3, 1.0, 0.8

    # Normalize each feature: RMS as a z-score against the first 30 frames
    # (treated as a quiet baseline), the rest scaled to roughly [0, 1].
    r = (rms - np.mean(rms[:30])) / (np.std(rms[:30]) + 1e-5)
    f = flux / (np.percentile(flux, 90) + 1e-6)
    ro = rolloff / (np.max(rolloff) + 1e-6)
    hn = hnr / (np.max(hnr) + 1e-6)

    # A low harmonics-to-noise ratio (rougher voice) raises the score, hence (1 - hn).
    intensity = (
        rms_w * np.clip(r, 0, None)
        + flux_w * f
        + roll_w * ro
        + hnr_w * (1 - hn)
    )

    intensity = np.maximum(intensity, 0)
    intensity = librosa.util.normalize(intensity)
    return intensity


def segment_intensity(times, intensity, thr=0.25):
    ema_alpha = 0.45
    # Keep a segment alive for ~150 ms after it drops below the off-threshold.
    hangover = max(1, int(0.15 / (times[1] - times[0])))

    # Exponential moving average to smooth frame-to-frame jitter.
    smooth = np.copy(intensity)
    for i in range(1, len(intensity)):
        smooth[i] = ema_alpha * intensity[i] + (1 - ema_alpha) * smooth[i - 1]

    # Hysteresis: enter a segment at thr, leave it only below 0.6 * thr.
    on_thr, off_thr = thr, thr * 0.6
    active = False
    counter = 0
    events = []
    start = None

    for i, val in enumerate(smooth):
        if not active and val >= on_thr:
            active = True
            start = times[i]

        if active and val >= off_thr:
            counter = hangover
        elif active:
            counter -= 1
            if counter <= 0:
                active = False
                events.append((start, times[i]))
                start = None

    # Close a segment that is still open at the end of the file.
    if active and start is not None:
        events.append((start, times[-1]))
    return events, smooth


def assign_levels(events, intensity, times):
    # Map each event to a voice-volume level by its median intensity:
    # "4 gritando" (shouting), "3 elevado" (raised), "2 intermedio" (medium), "1 bajo" (low).
    results = []
    for st, en in events:
        mask = (times >= st) & (times <= en)
        if np.sum(mask) == 0:
            continue

        med = np.median(intensity[mask])
        max_val = np.max(intensity[mask])

        if med > 0.8:
            lvl = "4 gritando"
        elif med > 0.6:
            lvl = "3 elevado"
        elif med > 0.4:
            lvl = "2 intermedio"
        else:
            lvl = "1 bajo"

        results.append((st, en, lvl, med, max_val))
    return results

def merge_adjacent_segments(results, gap_threshold=0.3):
    # Merge consecutive segments with the same level that are at most
    # gap_threshold seconds apart; medians are averaged pairwise, maxima kept.
    if not results:
        return []

    merged = []
    cur_st, cur_en, cur_lvl, cur_med, cur_max = results[0]

    for st, en, lvl, med, mx in results[1:]:
        if lvl == cur_lvl and st - cur_en <= gap_threshold:
            cur_en = en
            cur_med = (cur_med + med) / 2
            cur_max = max(cur_max, mx)
        else:
            merged.append((cur_st, cur_en, cur_lvl, cur_med, cur_max))
            cur_st, cur_en, cur_lvl, cur_med, cur_max = st, en, lvl, med, mx

    merged.append((cur_st, cur_en, cur_lvl, cur_med, cur_max))
    return merged


def shout(audio_path):
    if DEBUG_MODE:
        print("[MODEL LOADING] Loading shout model")

    # Load, band-limit to the voice band, and run the intensity pipeline.
    y, sr = librosa.load(audio_path, sr=RESAMPLING_FREQ, mono=True)
    y = bandpass_filter(y, sr)

    rms, flux, rolloff, hnr, times = extract_features(y, sr)
    intensity = compute_intensity(rms, flux, rolloff, hnr)
    events, _ = segment_intensity(times, intensity, thr=0.18)
    results = assign_levels(events, intensity, times)
    results = merge_adjacent_segments(results, gap_threshold=1.0)

    # Keep only raised-voice ("elevado") and shouting ("gritando") segments.
    results = [
        (st, en, lvl, med, max_val)
        for st, en, lvl, med, max_val in results
        if "elevado" in lvl or "gritando" in lvl
    ]

    formatted = []
    for st, en, lvl, med, max_val in results:
        formatted.append(f"{sec_to_hhmmss(st)} - {sec_to_hhmmss(en)} | volumen de voz: {lvl}")

    if not formatted:
        return "No se detectaron gritos o voces elevadas"

    return "\n".join(formatted)
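
# Minimal usage sketch (illustrative assumption, not part of the original module):
# run this file directly with an audio path to print the detected segments.
if __name__ == "__main__":
    import sys

    if len(sys.argv) > 1:
        print(shout(sys.argv[1]))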