import argparse
import json
import os
import subprocess
import tempfile
import zipfile
from pathlib import Path

import cog
import kaldiio
import numpy as np
import pyworld as pw
import resampy
import soundfile as sf
import torch

from model_decoder import Decoder_ac
from model_encoder import Encoder, Encoder_lf0
from model_encoder import SpeakerEncoder as Encoder_spk
from spectrogram import logmelspectrogram


def extract_logmel(wav_path, mean, std, sr=16000):
    """Load a wav file and return (normalized log-mel spectrogram, normalized log-F0)."""
    # wav, fs = librosa.load(wav_path, sr=sr)
    wav, fs = sf.read(wav_path)
    if fs != sr:
        wav = resampy.resample(wav, fs, sr, axis=0)
        fs = sr
    # wav, _ = librosa.effects.trim(wav, top_db=15)
    # duration = len(wav)/fs
    assert fs == 16000
    # Normalize the peak amplitude to at most 1.0.
    peak = np.abs(wav).max()
    if peak > 1.0:
        wav /= peak
    mel = logmelspectrogram(
        x=wav,
        fs=fs,
        n_mels=80,
        n_fft=400,
        n_shift=160,
        win_length=400,
        window="hann",
        fmin=80,
        fmax=7600,
    )
    # Normalize the mel spectrogram with the precomputed training statistics.
    mel = (mel - mean) / (std + 1e-8)
    tlen = mel.shape[0]
    frame_period = 160 / fs * 1000
    # Extract F0 with DIO and refine it with StoneMask (pyworld).
    f0, timeaxis = pw.dio(wav.astype("float64"), fs, frame_period=frame_period)
    f0 = pw.stonemask(wav.astype("float64"), f0, timeaxis, fs)
    f0 = f0[:tlen].reshape(-1).astype("float32")
    nonzeros_indices = np.nonzero(f0)
    lf0 = f0.copy()
    lf0[nonzeros_indices] = np.log(
        f0[nonzeros_indices]
    )  # for f0 (Hz), lf0 > 0 when f0 != 0
    # Standardize log-F0 over voiced frames only.
    mean, std = np.mean(lf0[nonzeros_indices]), np.std(lf0[nonzeros_indices])
    lf0[nonzeros_indices] = (lf0[nonzeros_indices] - mean) / (std + 1e-8)
    return mel, lf0
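
# Example usage (illustrative sketch; "example.wav" is an assumed filename, and
# the stats file is the one loaded in Predictor.setup() below):
#   mel_stats = np.load("./mel_stats/stats.npy")
#   mel, lf0 = extract_logmel("example.wav", mel_stats[0], mel_stats[1])
#   # mel: (T, 80) normalized log-mel frames; lf0: (T,) normalized log-F0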


class Predictor(cog.Predictor):
    def setup(self):
        """Load models"""
        device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        checkpoint_path = "VQMIVC-pretrained models/checkpoints/useCSMITrue_useCPMITrue_usePSMITrue_useAmpTrue/VQMIVC-model.ckpt-500.pt"
        mel_stats = np.load("./mel_stats/stats.npy")

        # Build the VQMIVC modules: content encoder, pitch (lf0) encoder,
        # speaker encoder, and decoder.
        encoder = Encoder(
            in_channels=80, channels=512, n_embeddings=512, z_dim=64, c_dim=256
        )
        encoder_lf0 = Encoder_lf0()
        encoder_spk = Encoder_spk()
        decoder = Decoder_ac(dim_neck=64)
        encoder.to(device)
        encoder_lf0.to(device)
        encoder_spk.to(device)
        decoder.to(device)

        # Load the pretrained checkpoint (mapped to CPU) and restore the weights.
        checkpoint = torch.load(
            checkpoint_path, map_location=lambda storage, loc: storage
        )
        encoder.load_state_dict(checkpoint["encoder"])
        encoder_spk.load_state_dict(checkpoint["encoder_spk"])
        decoder.load_state_dict(checkpoint["decoder"])

        encoder.eval()
        encoder_spk.eval()
        decoder.eval()

        self.mean = mel_stats[0]
        self.std = mel_stats[1]
        self.encoder = encoder
        self.encoder_spk = encoder_spk
        self.encoder_lf0 = encoder_lf0
        self.decoder = decoder
        self.device = device

    def predict(self, input_source, input_reference):
        """Compute prediction"""
        # inference
        out_dir = Path(tempfile.mkdtemp())
        out_path = out_dir / Path(
            os.path.basename(str(input_source)).split(".")[0] + "_converted_gen.wav"
        )
        src_wav_path = input_source
        ref_wav_path = input_reference

        # Features are written in Kaldi ark/scp format for the vocoder.
        feat_writer = kaldiio.WriteHelper(
            "ark,scp:{o}.ark,{o}.scp".format(o=str(out_dir) + "/feats.1")
        )
        src_mel, src_lf0 = extract_logmel(src_wav_path, self.mean, self.std)
        ref_mel, _ = extract_logmel(ref_wav_path, self.mean, self.std)
        src_mel = torch.FloatTensor(src_mel.T).unsqueeze(0).to(self.device)
        src_lf0 = torch.FloatTensor(src_lf0).unsqueeze(0).to(self.device)
        ref_mel = torch.FloatTensor(ref_mel.T).unsqueeze(0).to(self.device)
        out_filename = os.path.basename(src_wav_path).split(".")[0]

        with torch.no_grad():
            # Content and pitch come from the source utterance; speaker
            # identity comes from the reference utterance.
            z, _, _, _ = self.encoder.encode(src_mel)
            lf0_embs = self.encoder_lf0(src_lf0)
            spk_emb = self.encoder_spk(ref_mel)
            output = self.decoder(z, lf0_embs, spk_emb)

            feat_writer[out_filename + "_converted"] = output.squeeze(0).cpu().numpy()
            feat_writer[out_filename + "_source"] = src_mel.squeeze(0).cpu().numpy().T
            feat_writer[out_filename + "_reference"] = (
                ref_mel.squeeze(0).cpu().numpy().T
            )
        feat_writer.close()

        # Synthesize the waveform from the converted mel spectrogram with the
        # pretrained Parallel WaveGAN vocoder.
        print("synthesize waveform...")
        cmd = [
            "parallel-wavegan-decode",
            "--checkpoint",
            "./vocoder/checkpoint-3000000steps.pkl",
            "--feats-scp",
            f"{str(out_dir)}/feats.1.scp",
            "--outdir",
            str(out_dir),
        ]
        subprocess.call(cmd)
        return out_path
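

# Hedged local-run sketch (not part of the cog interface): the wav paths below
# are illustrative assumptions, and the pretrained checkpoint, mel stats, and
# vocoder files must already be in place for this to run.
if __name__ == "__main__":
    predictor = Predictor()
    predictor.setup()
    converted_path = predictor.predict("source.wav", "reference.wav")
    print(f"Converted waveform written to: {converted_path}")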