# Spaces: Running on Zero
import librosa
import numpy as np
from scipy.signal import butter, sosfilt

from audio_utils import sec_to_hhmmss
from settings import DEBUG_MODE, RESAMPLING_FREQ
def bandpass_filter(audio_path, RESAMPLING_FREQ, low=300, high=3400):
    """Apply a 4th-order Butterworth band-pass filter to an audio signal.

    NOTE(review): despite its name, ``audio_path`` is the waveform itself
    (a 1-D sample array) — it is passed straight to ``sosfilt``. The default
    300–3400 Hz band is the classic telephone speech band.

    Args:
        audio_path: 1-D array of audio samples.
        RESAMPLING_FREQ: sample rate of the signal in Hz.
        low: lower cutoff frequency in Hz.
        high: upper cutoff frequency in Hz.

    Returns:
        Filtered signal, same shape as the input.
    """
    nyquist = RESAMPLING_FREQ / 2
    band = [low / nyquist, high / nyquist]
    sos = butter(4, band, btype="band", output="sos")
    return sosfilt(sos, audio_path)
def extract_features(audio_path, RESAMPLING_FREQ, frame=0.05):
    """Compute per-frame loudness/spectral features from a waveform.

    NOTE(review): ``audio_path`` is the sample array itself, not a file
    path — it is fed directly to the librosa feature extractors.

    Args:
        audio_path: 1-D audio signal.
        RESAMPLING_FREQ: sample rate in Hz.
        frame: frame length in seconds; the hop size is one full frame.

    Returns:
        Tuple ``(rms, flux, rolloff, hnr, times)`` of 1-D arrays:
        frame-wise RMS energy, onset strength (spectral flux), spectral
        rolloff, harmonic-to-percussive energy ratio, and frame times (s).
    """
    hop = int(RESAMPLING_FREQ * frame)

    rms = librosa.feature.rms(y=audio_path, hop_length=hop)[0]
    flux = librosa.onset.onset_strength(y=audio_path, sr=RESAMPLING_FREQ, hop_length=hop)
    rolloff = librosa.feature.spectral_rolloff(y=audio_path, sr=RESAMPLING_FREQ, hop_length=hop)[0]

    # Harmonic/percussive split; the percussive part is the residual.
    harmonic = librosa.effects.harmonic(audio_path)
    percussive = audio_path - harmonic
    harmonic_energy = librosa.feature.rms(y=harmonic, hop_length=hop)[0]
    percussive_energy = librosa.feature.rms(y=percussive, hop_length=hop)[0]
    hnr = harmonic_energy / (percussive_energy + 1e-6)  # epsilon avoids /0

    times = librosa.frames_to_time(np.arange(len(rms)), sr=RESAMPLING_FREQ, hop_length=hop)
    return rms, flux, rolloff, hnr, times
def compute_intensity(rms, flux, rolloff, hnr):
    """Combine frame-wise features into a normalized [0, 1] intensity curve.

    The RMS term is z-scored against a baseline taken from the opening
    frames, flux is scaled by its 90th percentile, rolloff and HNR are
    peak-scaled; a weighted sum is clipped at 0 and peak-normalized.

    Fixes over the previous version:
    - empty input no longer produces NaNs (returns an empty array);
    - all divisions carry an epsilon, so an all-zero rolloff/flux/HNR
      cannot divide by zero;
    - peak normalization is done in plain NumPy instead of
      ``librosa.util.normalize`` (identical inf-norm behavior for this
      non-negative 1-D signal).

    Args:
        rms, flux, rolloff, hnr: equal-length 1-D feature arrays.

    Returns:
        1-D float array in [0, 1], same length as the inputs.
    """
    rms = np.asarray(rms, dtype=float)
    flux = np.asarray(flux, dtype=float)
    rolloff = np.asarray(rolloff, dtype=float)
    hnr = np.asarray(hnr, dtype=float)
    if rms.size == 0:
        return np.zeros(0)

    rms_w, flux_w, roll_w, hnr_w = 3.0, 1.3, 1.0, 0.8

    # Baseline = first 30 frames (fewer if the clip is shorter).
    baseline = rms[:30]
    r = (rms - np.mean(baseline)) / (np.std(baseline) + 1e-5)
    f = flux / (np.percentile(flux, 90) + 1e-6)
    ro = rolloff / (np.max(rolloff) + 1e-6)
    hn = hnr / (np.max(hnr) + 1e-6)

    intensity = (
        rms_w * np.clip(r, 0, None)
        + flux_w * f
        + roll_w * ro
        + hnr_w * (1 - hn)  # low harmonic-to-percussive ratio scores higher
    )
    intensity = np.maximum(intensity, 0)

    # Peak (inf-norm) normalization into [0, 1]; all-zero stays all-zero.
    peak = np.max(intensity)
    if peak > 0:
        intensity = intensity / peak
    return intensity
def segment_intensity(times, intensity, thr=0.25):
    """Segment the intensity curve into active (loud) events.

    The curve is smoothed with an exponential moving average, then a
    hysteresis detector (on at ``thr``, off at ``0.6 * thr``) with a
    ~150 ms hangover turns it into (start, end) intervals.

    Args:
        times: frame timestamps in seconds (assumed uniformly spaced,
            at least two entries).
        intensity: intensity curve aligned with ``times``.
        thr: activation threshold on the smoothed curve.

    Returns:
        Tuple ``(events, smooth)``: list of (start_s, end_s) pairs and
        the smoothed curve.
    """
    alpha = 0.45
    frame_dt = times[1] - times[0]
    hangover_frames = int(0.15 / frame_dt)  # grace frames before closing

    # Exponential moving average; the first sample is kept as-is.
    smooth = np.copy(intensity)
    for idx in range(1, len(intensity)):
        smooth[idx] = alpha * intensity[idx] + (1 - alpha) * smooth[idx - 1]

    on_level = thr
    off_level = thr * 0.6
    events = []
    segment_start = None
    in_segment = False
    remaining = 0

    for idx, level in enumerate(smooth):
        if not in_segment and level >= on_level:
            in_segment = True
            segment_start = times[idx]
        if in_segment:
            if level >= off_level:
                remaining = hangover_frames  # refresh the hangover
            else:
                remaining -= 1
                if remaining <= 0:
                    in_segment = False
                    events.append((segment_start, times[idx]))
                    segment_start = None

    # Close an event still open at the end of the signal.
    if in_segment and segment_start is not None:
        events.append((segment_start, times[-1]))
    return events, smooth
def assign_levels(events, intensity, times):
    """Label each event with a Spanish loudness level (1–4).

    The level is decided by the median intensity inside the event:
    > 0.8 → "4 gritando", > 0.6 → "3 elevado", > 0.4 → "2 intermedio",
    otherwise "1 bajo".

    Args:
        events: list of (start_s, end_s) pairs.
        intensity: frame-wise intensity curve.
        times: frame timestamps aligned with ``intensity``.

    Returns:
        List of (start, end, level, median_intensity, max_intensity).
    """
    thresholds = (
        (0.8, "4 gritando"),
        (0.6, "3 elevado"),
        (0.4, "2 intermedio"),
    )
    labeled = []
    for start, end in events:
        inside = (times >= start) & (times <= end)
        if not np.any(inside):  # event covers no frame: skip it
            continue
        values = intensity[inside]
        med = np.median(values)
        peak = np.max(values)
        for cutoff, name in thresholds:
            if med > cutoff:
                level = name
                break
        else:
            level = "1 bajo"
        labeled.append((start, end, level, med, peak))
    return labeled
def merge_adjacent_segments(results, gap_threshold=0.3):
    """Fuse consecutive segments of the same level separated by small gaps.

    Two neighboring segments are merged when they share the same level and
    the silence between them is at most ``gap_threshold`` seconds. The
    merged median is the running average of the two medians; the max is
    the larger of the two.

    Args:
        results: list of (start, end, level, median, max) tuples, ordered
            by start time.
        gap_threshold: maximum gap (s) bridged by a merge.

    Returns:
        Merged list in the same tuple format (empty list for empty input).
    """
    if not results:
        return []

    merged = []
    current = list(results[0])
    for segment in results[1:]:
        st, en, lvl, med, mx = segment
        same_level = lvl == current[2]
        close_enough = st - current[1] <= gap_threshold
        if same_level and close_enough:
            current[1] = en
            current[3] = (current[3] + med) / 2  # running average of medians
            current[4] = max(current[4], mx)
        else:
            merged.append(tuple(current))
            current = list(segment)
    merged.append(tuple(current))
    return merged
def shout(audio_path):
    """Detect raised-voice / shouting segments in an audio file.

    Pipeline: load mono audio at RESAMPLING_FREQ, band-pass to the speech
    band, extract frame features, build a normalized intensity curve,
    segment it with hysteresis, label levels, merge neighbors, then keep
    only "elevado"/"gritando" segments.

    Args:
        audio_path: path to the audio file (anything ``librosa.load``
            accepts).

    Returns:
        One line per detected segment
        ("HH:MM:SS – HH:MM:SS | volumen de voz: <level>"),
        or a Spanish "nothing detected" message.
    """
    if DEBUG_MODE:
        # Fixed: was an f-string with no placeholders (F541).
        print("[MODEL LOADING] Loading shout model")

    y, sr = librosa.load(audio_path, sr=RESAMPLING_FREQ, mono=True)
    y = bandpass_filter(y, sr)

    rms, flux, rolloff, hnr, times = extract_features(y, sr)
    intensity = compute_intensity(rms, flux, rolloff, hnr)

    events, _ = segment_intensity(times, intensity, thr=0.18)
    results = assign_levels(events, intensity, times)
    results = merge_adjacent_segments(results, gap_threshold=1)

    # Keep only levels 3 ("elevado") and 4 ("gritando").
    loud = [seg for seg in results if "elevado" in seg[2] or "gritando" in seg[2]]

    formatted = [
        f"{sec_to_hhmmss(st)} – {sec_to_hhmmss(en)} | volumen de voz: {lvl}"
        for st, en, lvl, _med, _max in loud
    ]
    if not formatted:
        return "No se detectaron gritos o voces elevadas"
    return "\n".join(formatted)