File size: 9,917 Bytes
a2e831a
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
#!/usr/bin/env python3
"""
NeuCodec Test - Gradio App
Equivalent to nemo and snac test spaces, but for NeuCodec used in NeuTTS-Air models.
Allows testing encode/decode cycles with the neuphonic/neucodec model.
"""

import gradio as gr
import torch
import librosa
import numpy as np
import traceback
import time

# NeuCodec is a hard requirement: report the underlying import failure on
# stdout, then abort module import with a chained, more actionable error.
try:
    from neucodec import NeuCodec, DistillNeuCodec
except ImportError as e:
    print(f"Error importing NeuCodec: {e}")
    raise ImportError("Could not import NeuCodec. Make sure 'neucodec' is installed correctly.") from e
else:
    # Only reached when both classes were imported.
    print("NeuCodec modules imported successfully.")

# --- Configuration ---
# Sample rate (Hz) the input waveform is resampled to before encoding.
TARGET_SR = 16_000  # NeuCodec operates at 16kHz for encoding
# Sample rate (Hz) attached to the decoded waveform for playback and metrics.
OUTPUT_SR = 24_000  # NeuCodec outputs at 24kHz

# Use CUDA whenever torch reports it as available; otherwise stay on the CPU.
if torch.cuda.is_available():
    DEVICE = "cuda"
else:
    DEVICE = "cpu"

# Identifier handed to `from_pretrained` when the model is loaded.
# Options: neuphonic/neucodec, neuphonic/distill-neucodec
MODEL_NAME = "neuphonic/neucodec"
print(f"Using device: {DEVICE}")

# --- Load Model (Load once globally) ---
# `neucodec` is the module-level handle read by process_audio and by the launch
# guard. It stays None unless *every* loading step below succeeds, so a failure
# part-way (e.g. while moving the weights to DEVICE) can never leave a
# half-initialised model published as if it were usable.
neucodec = None
try:
    print(f"Loading NeuCodec model: {MODEL_NAME}...")
    start_time = time.time()

    # Pick the wrapper class that matches the configured checkpoint name.
    if MODEL_NAME == "neuphonic/distill-neucodec":
        _loaded_model = DistillNeuCodec.from_pretrained(MODEL_NAME)
    else:
        _loaded_model = NeuCodec.from_pretrained(MODEL_NAME)

    _loaded_model = _loaded_model.to(DEVICE)
    _loaded_model.eval()  # Set model to evaluation mode
    end_time = time.time()
    print(f"NeuCodec loaded successfully to {DEVICE}. Time taken: {end_time - start_time:.2f} seconds.")
except Exception as e:
    # Deliberately broad: this is the top-level boundary. The module must still
    # import so the UI can report "model could not be loaded" instead of crashing.
    print(f"FATAL: Error loading NeuCodec: {e}")
    print(traceback.format_exc())
else:
    # Publish the model only after load, device move and eval() all succeeded.
    neucodec = _loaded_model

# --- Main Processing Function ---
def _log_quality_metrics(logs, reference, reconstructed):
    """Append MSE and SNR of *reconstructed* against *reference* to *logs*.

    Both arguments are 1-D NumPy waveforms at the same sample rate. They are
    compared over their common prefix because the codec output length usually
    differs slightly from the input length.
    """
    min_len = min(len(reference), len(reconstructed))
    reference_trimmed = reference[:min_len]
    reconstructed_trimmed = reconstructed[:min_len]

    # Simple MSE calculation
    mse = np.mean((reference_trimmed - reconstructed_trimmed) ** 2)

    if len(reference) != len(reconstructed):
        logs.append(f"Audio length difference: Original {len(reference)} samples, Reconstructed {len(reconstructed)} samples")

    logs.append(f"MSE (first {min_len} samples at 24kHz): {mse:.6f}")

    # Signal-to-Noise Ratio: only defined when both powers are strictly
    # positive. Guarding both avoids log10(0) (-inf plus a RuntimeWarning) for
    # silent input and makes the "no SNR" case visible instead of silent.
    signal_power = np.mean(reference_trimmed ** 2)
    noise_power = mse
    if noise_power > 0 and signal_power > 0:
        snr_db = 10 * np.log10(signal_power / noise_power)
        logs.append(f"SNR: {snr_db:.2f} dB")
    else:
        logs.append("SNR: undefined (signal or noise power is zero)")


def process_audio(audio_filepath):
    """
    Run one encode/decode round trip through NeuCodec for an audio file.

    The file is loaded with librosa at its native sample rate, mixed down to
    mono if it has several channels, resampled to ``TARGET_SR``, encoded with
    ``neucodec.encode_code`` and reconstructed with ``neucodec.decode_code``.
    MSE and SNR against the original (resampled to ``OUTPUT_SR``) plus timing
    figures are written to the log.

    Args:
        audio_filepath: Path of the audio file to process, or ``None`` when
            nothing was uploaded.

    Returns:
        A 4-tuple ``(original, resampled, reconstructed, log_text)``. The first
        three items are ``(sample_rate, numpy_waveform)`` pairs. If the model is
        not loaded, no file was given, or any step raises, the three audio
        items are ``None`` and ``log_text`` holds the message (including the
        traceback for exceptions). This function never raises to the caller.
    """
    if neucodec is None:
        return None, None, None, "Error: NeuCodec could not be loaded. Cannot process audio."

    if audio_filepath is None:
        return None, None, None, "Please upload an audio file."

    logs = ["--- Starting Audio Processing with NeuCodec ---"]
    try:
        # 1. Load Audio
        logs.append(f"Loading audio file: {audio_filepath}")
        # perf_counter is monotonic, so the measured intervals cannot go
        # negative or jump if the system clock is adjusted mid-run.
        load_start = time.perf_counter()

        # Load original audio (for playback reference)
        original_waveform, original_sr = librosa.load(audio_filepath, sr=None, mono=False)
        logs.append(f"Audio loaded. Original SR: {original_sr} Hz, Shape: {original_waveform.shape}")

        # Convert to mono if stereo
        if original_waveform.ndim > 1:
            logs.append(f"Warning: Input audio has {original_waveform.shape[0]} channels. Converting to mono.")
            original_waveform = librosa.to_mono(original_waveform)

        # Fail early with a clear message instead of an obscure error from the
        # resampler or the encoder further down.
        if original_waveform.size == 0:
            raise ValueError("Loaded audio contains no samples.")

        load_end = time.perf_counter()
        logs.append(f"Loading time: {load_end - load_start:.2f}s")

        # --- Prepare Original for Playback ---
        original_audio_playback = (original_sr, original_waveform)
        logs.append("Prepared original audio for playback.")

        # 2. Resample to 16kHz for encoding (NeuCodec expects 16kHz input)
        resample_start = time.perf_counter()
        logs.append(f"Resampling waveform to {TARGET_SR} Hz for encoding...")
        waveform_16k = librosa.resample(original_waveform, orig_sr=original_sr, target_sr=TARGET_SR)
        logs.append(f"Resampling complete. New Shape: {waveform_16k.shape}")
        resample_end = time.perf_counter()
        logs.append(f"Resampling time: {resample_end - resample_start:.2f}s")

        # --- Prepare 16kHz version for Playback ---
        resampled_audio_playback = (TARGET_SR, waveform_16k)
        logs.append("Prepared 16kHz audio for playback.")

        # 3. Prepare for NeuCodec Encoding
        # NeuCodec expects [batch, channels, samples] format.
        # encode_code is fed a CPU tensor, so the tensor is built on the CPU
        # and passed as-is; copying it to DEVICE and straight back was wasted
        # work. The log reports the tensor's real device.
        waveform_tensor = torch.from_numpy(waveform_16k).float().unsqueeze(0).unsqueeze(0)  # [1, 1, samples]

        logs.append(f"Waveform prepared for encoding. Shape: {waveform_tensor.shape}, Device: {waveform_tensor.device}")

        # 4. Encode Audio using NeuCodec
        logs.append("Encoding audio with NeuCodec...")
        encode_start = time.perf_counter()
        with torch.no_grad():
            encoded_codes = neucodec.encode_code(audio_or_path=waveform_tensor)
        encode_end = time.perf_counter()

        if encoded_codes is None:
            log_msg = "Encoding failed: encoded_codes is None"
            logs.append(log_msg)
            raise ValueError(log_msg)

        logs.append(f"Encoding complete. Time: {encode_end - encode_start:.2f}s")
        logs.append(f"Encoded codes shape: {encoded_codes.shape}")
        logs.append(f"Encoded codes device: {encoded_codes.device}")

        # Log some statistics about the codes
        logs.append(f"Code sequence length: {encoded_codes.shape[-1]}")
        logs.append(f"Code range: [{encoded_codes.min().item():.0f}, {encoded_codes.max().item():.0f}]")

        # Calculate compression ratio (input samples per emitted code element)
        original_samples = waveform_16k.shape[0]
        code_elements = encoded_codes.numel()
        compression_ratio = original_samples / code_elements if code_elements > 0 else 0
        logs.append(f"Compression ratio: ~{compression_ratio:.1f}:1 ({original_samples} samples -> {code_elements} codes)")

        # 5. Decode the Codes using NeuCodec
        logs.append("Decoding the generated codes with NeuCodec...")
        decode_start = time.perf_counter()
        with torch.no_grad():
            reconstructed_waveform = neucodec.decode_code(encoded_codes)
        decode_end = time.perf_counter()
        logs.append(f"Decoding complete. Reconstructed waveform shape: {reconstructed_waveform.shape}, Device: {reconstructed_waveform.device}. Time: {decode_end - decode_start:.2f}s")

        # 6. Prepare Reconstructed Audio for Playback
        # Output is at 24kHz. Move to CPU, remove batch and channel dims, convert to NumPy.
        reconstructed_audio_np = reconstructed_waveform.cpu().squeeze().numpy()
        logs.append(f"Reconstructed audio prepared for playback at {OUTPUT_SR} Hz. Shape: {reconstructed_audio_np.shape}")
        reconstructed_audio_playback = (OUTPUT_SR, reconstructed_audio_np)

        # 7. Calculate quality metrics
        # For comparison, we need to resample original to 24kHz to match reconstructed output
        logs.append("Calculating quality metrics...")
        original_24k = librosa.resample(original_waveform, orig_sr=original_sr, target_sr=OUTPUT_SR)
        _log_quality_metrics(logs, original_24k, reconstructed_audio_np)

        logs.append("\n--- Audio Processing Completed Successfully ---")

        # Summary statistics (metric computation is intentionally not timed)
        total_time = (load_end - load_start) + (resample_end - resample_start) + (encode_end - encode_start) + (decode_end - decode_start)
        audio_duration = len(original_waveform) / original_sr
        logs.append(f"Total processing time: {total_time:.2f}s")
        logs.append(f"Audio duration: {audio_duration:.2f}s")
        if total_time > 0:
            logs.append(f"Real-time factor: {audio_duration / total_time:.2f}x")
        else:
            # Guard the division: a zero elapsed time would otherwise turn a
            # successful run into an error result.
            logs.append("Real-time factor: n/a (elapsed time too small to measure)")

        return original_audio_playback, resampled_audio_playback, reconstructed_audio_playback, "\n".join(logs)

    except Exception as e:
        # UI boundary: surface every failure in the log box rather than raising.
        logs.append("\n--- An Error Occurred ---")
        logs.append(f"Error Type: {type(e).__name__}")
        logs.append(f"Error Details: {e}")
        logs.append("\n--- Traceback ---")
        logs.append(traceback.format_exc())
        return None, None, None, "\n".join(logs)

# --- Gradio Interface ---
# User-facing text passed to gr.Interface through its `description` argument.
# It is a runtime string (written with Markdown-style `**bold**` markers), so
# any wording change here changes what the app displays.
# The 16kHz / 24kHz figures quoted in it restate TARGET_SR and OUTPUT_SR by
# hand; keep them in sync if those constants change.
DESCRIPTION = """
This app demonstrates the **NeuCodec** model (`neuphonic/neucodec`) used in NeuTTS-Air.

**How it works:**
1. Upload an audio file (wav, mp3, flac, etc.).
2. The audio will be automatically resampled to 16kHz for encoding.
3. The 16kHz audio is encoded into discrete codes by NeuCodec.
4. These codes are then decoded back into 24kHz audio by NeuCodec.
5. You can listen to the original, the 16kHz version, and the final reconstructed 24kHz audio.

**Technical details:**
- Input sample rate: 16kHz (for encoding)
- Output sample rate: 24kHz (after decoding)
- Architecture: 50Hz neural audio codec with single codebook
- Hop length: 480 samples

**Note:** If the input is stereo, it will be converted to mono.
"""

# Input widget: configured with type="filepath"; its value is what
# process_audio receives as `audio_filepath`.
_upload_widget = gr.Audio(type="filepath", label="Upload Audio File")

# Output widgets, listed in the same order as the 4-tuple returned by
# process_audio: three audio players followed by the text log.
_result_widgets = [
    gr.Audio(label=caption)
    for caption in (
        "Original Audio",
        "16kHz Audio (Input to NeuCodec)",
        "Reconstructed Audio (24kHz Output from NeuCodec)",
    )
]
_result_widgets.append(gr.Textbox(label="Log Output", lines=20))

# No bundled examples yet.
# TODO
# ["examples/example1.wav"],
_example_rows = []

iface = gr.Interface(
    fn=process_audio,
    inputs=_upload_widget,
    outputs=_result_widgets,
    title="NeuCodec Demo (16kHz -> 24kHz)",
    description=DESCRIPTION,
    examples=_example_rows,
    cache_examples=False,
)

if __name__ == "__main__":
    # Start the UI only when the model handle was populated at import time.
    if neucodec is not None:
        startup_banner = (
            "Launching Gradio Interface...",
            f"Model: {MODEL_NAME}",
            f"Input sample rate: {TARGET_SR} Hz",
            f"Output sample rate: {OUTPUT_SR} Hz",
            f"Device: {DEVICE}",
        )
        for banner_line in startup_banner:
            print(banner_line)
        # launch() is called with share=True, exactly as configured here.
        iface.launch(share=True)
    else:
        print("Cannot launch Gradio interface because NeuCodec failed to load.")