Spaces:
Configuration error
Configuration error
| import argparse | |
| import glob | |
| import json | |
| import logging | |
| import os | |
| import re | |
| import subprocess | |
| import sys | |
| import traceback | |
| from multiprocessing import cpu_count | |
| import faiss | |
| import librosa | |
| import numpy as np | |
| import torch | |
| from scipy.io.wavfile import read | |
| from sklearn.cluster import MiniBatchKMeans | |
| from torch.nn import functional as F | |
| MATPLOTLIB_FLAG = False | |
| logging.basicConfig(stream=sys.stdout, level=logging.WARN) | |
| logger = logging | |
| f0_bin = 256 | |
| f0_max = 1100.0 | |
| f0_min = 50.0 | |
| f0_mel_min = 1127 * np.log(1 + f0_min / 700) | |
| f0_mel_max = 1127 * np.log(1 + f0_max / 700) | |
| def normalize_f0(f0, x_mask, uv, random_scale=True): | |
| # calculate means based on x_mask | |
| uv_sum = torch.sum(uv, dim=1, keepdim=True) | |
| uv_sum[uv_sum == 0] = 9999 | |
| means = torch.sum(f0[:, 0, :] * uv, dim=1, keepdim=True) / uv_sum | |
| if random_scale: | |
| factor = torch.Tensor(f0.shape[0], 1).uniform_(0.8, 1.2).to(f0.device) | |
| else: | |
| factor = torch.ones(f0.shape[0], 1).to(f0.device) | |
| # normalize f0 based on means and factor | |
| f0_norm = (f0 - means.unsqueeze(-1)) * factor.unsqueeze(-1) | |
| if torch.isnan(f0_norm).any(): | |
| exit(0) | |
| return f0_norm * x_mask | |
| def plot_data_to_numpy(x, y): | |
| global MATPLOTLIB_FLAG | |
| if not MATPLOTLIB_FLAG: | |
| import matplotlib | |
| matplotlib.use("Agg") | |
| MATPLOTLIB_FLAG = True | |
| mpl_logger = logging.getLogger('matplotlib') | |
| mpl_logger.setLevel(logging.WARNING) | |
| import matplotlib.pylab as plt | |
| import numpy as np | |
| fig, ax = plt.subplots(figsize=(10, 2)) | |
| plt.plot(x) | |
| plt.plot(y) | |
| plt.tight_layout() | |
| fig.canvas.draw() | |
| data = np.fromstring(fig.canvas.tostring_rgb(), dtype=np.uint8, sep='') | |
| data = data.reshape(fig.canvas.get_width_height()[::-1] + (3,)) | |
| plt.close() | |
| return data | |
| def f0_to_coarse(f0): | |
| f0_mel = 1127 * (1 + f0 / 700).log() | |
| a = (f0_bin - 2) / (f0_mel_max - f0_mel_min) | |
| b = f0_mel_min * a - 1. | |
| f0_mel = torch.where(f0_mel > 0, f0_mel * a - b, f0_mel) | |
| # torch.clip_(f0_mel, min=1., max=float(f0_bin - 1)) | |
| f0_coarse = torch.round(f0_mel).long() | |
| f0_coarse = f0_coarse * (f0_coarse > 0) | |
| f0_coarse = f0_coarse + ((f0_coarse < 1) * 1) | |
| f0_coarse = f0_coarse * (f0_coarse < f0_bin) | |
| f0_coarse = f0_coarse + ((f0_coarse >= f0_bin) * (f0_bin - 1)) | |
| return f0_coarse | |
| def get_content(cmodel, y): | |
| with torch.no_grad(): | |
| c = cmodel.extract_features(y.squeeze(1))[0] | |
| c = c.transpose(1, 2) | |
| return c | |
| def get_f0_predictor(f0_predictor,hop_length,sampling_rate,**kargs): | |
| if f0_predictor == "pm": | |
| from modules.F0Predictor.PMF0Predictor import PMF0Predictor | |
| f0_predictor_object = PMF0Predictor(hop_length=hop_length,sampling_rate=sampling_rate) | |
| elif f0_predictor == "crepe": | |
| from modules.F0Predictor.CrepeF0Predictor import CrepeF0Predictor | |
| f0_predictor_object = CrepeF0Predictor(hop_length=hop_length,sampling_rate=sampling_rate,device=kargs["device"],threshold=kargs["threshold"]) | |
| elif f0_predictor == "harvest": | |
| from modules.F0Predictor.HarvestF0Predictor import HarvestF0Predictor | |
| f0_predictor_object = HarvestF0Predictor(hop_length=hop_length,sampling_rate=sampling_rate) | |
| elif f0_predictor == "dio": | |
| from modules.F0Predictor.DioF0Predictor import DioF0Predictor | |
| f0_predictor_object = DioF0Predictor(hop_length=hop_length,sampling_rate=sampling_rate) | |
| elif f0_predictor == "rmvpe": | |
| from modules.F0Predictor.RMVPEF0Predictor import RMVPEF0Predictor | |
| f0_predictor_object = RMVPEF0Predictor(hop_length=hop_length,sampling_rate=sampling_rate,dtype=torch.float32 ,device=kargs["device"],threshold=kargs["threshold"]) | |
| elif f0_predictor == "fcpe": | |
| from modules.F0Predictor.FCPEF0Predictor import FCPEF0Predictor | |
| f0_predictor_object = FCPEF0Predictor(hop_length=hop_length,sampling_rate=sampling_rate,dtype=torch.float32 ,device=kargs["device"],threshold=kargs["threshold"]) | |
| else: | |
| raise Exception("Unknown f0 predictor") | |
| return f0_predictor_object | |
| def get_speech_encoder(speech_encoder,device=None,**kargs): | |
| if speech_encoder == "vec768l12": | |
| from vencoder.ContentVec768L12 import ContentVec768L12 | |
| speech_encoder_object = ContentVec768L12(device = device) | |
| elif speech_encoder == "vec256l9": | |
| from vencoder.ContentVec256L9 import ContentVec256L9 | |
| speech_encoder_object = ContentVec256L9(device = device) | |
| elif speech_encoder == "vec256l9-onnx": | |
| from vencoder.ContentVec256L9_Onnx import ContentVec256L9_Onnx | |
| speech_encoder_object = ContentVec256L9_Onnx(device = device) | |
| elif speech_encoder == "vec256l12-onnx": | |
| from vencoder.ContentVec256L12_Onnx import ContentVec256L12_Onnx | |
| speech_encoder_object = ContentVec256L12_Onnx(device = device) | |
| elif speech_encoder == "vec768l9-onnx": | |
| from vencoder.ContentVec768L9_Onnx import ContentVec768L9_Onnx | |
| speech_encoder_object = ContentVec768L9_Onnx(device = device) | |
| elif speech_encoder == "vec768l12-onnx": | |
| from vencoder.ContentVec768L12_Onnx import ContentVec768L12_Onnx | |
| speech_encoder_object = ContentVec768L12_Onnx(device = device) | |
| elif speech_encoder == "hubertsoft-onnx": | |
| from vencoder.HubertSoft_Onnx import HubertSoft_Onnx | |
| speech_encoder_object = HubertSoft_Onnx(device = device) | |
| elif speech_encoder == "hubertsoft": | |
| from vencoder.HubertSoft import HubertSoft | |
| speech_encoder_object = HubertSoft(device = device) | |
| elif speech_encoder == "whisper-ppg": | |
| from vencoder.WhisperPPG import WhisperPPG | |
| speech_encoder_object = WhisperPPG(device = device) | |
| elif speech_encoder == "cnhubertlarge": | |
| from vencoder.CNHubertLarge import CNHubertLarge | |
| speech_encoder_object = CNHubertLarge(device = device) | |
| elif speech_encoder == "dphubert": | |
| from vencoder.DPHubert import DPHubert | |
| speech_encoder_object = DPHubert(device = device) | |
| elif speech_encoder == "whisper-ppg-large": | |
| from vencoder.WhisperPPGLarge import WhisperPPGLarge | |
| speech_encoder_object = WhisperPPGLarge(device = device) | |
| elif speech_encoder == "wavlmbase+": | |
| from vencoder.WavLMBasePlus import WavLMBasePlus | |
| speech_encoder_object = WavLMBasePlus(device = device) | |
| else: | |
| raise Exception("Unknown speech encoder") | |
| return speech_encoder_object | |
| def load_checkpoint(checkpoint_path, model, optimizer=None, skip_optimizer=False): | |
| assert os.path.isfile(checkpoint_path) | |
| checkpoint_dict = torch.load(checkpoint_path, map_location='cpu') | |
| iteration = checkpoint_dict['iteration'] | |
| learning_rate = checkpoint_dict['learning_rate'] | |
| if optimizer is not None and not skip_optimizer and checkpoint_dict['optimizer'] is not None: | |
| optimizer.load_state_dict(checkpoint_dict['optimizer']) | |
| saved_state_dict = checkpoint_dict['model'] | |
| model = model.to(list(saved_state_dict.values())[0].dtype) | |
| if hasattr(model, 'module'): | |
| state_dict = model.module.state_dict() | |
| else: | |
| state_dict = model.state_dict() | |
| new_state_dict = {} | |
| for k, v in state_dict.items(): | |
| try: | |
| # assert "dec" in k or "disc" in k | |
| # print("load", k) | |
| new_state_dict[k] = saved_state_dict[k] | |
| assert saved_state_dict[k].shape == v.shape, (saved_state_dict[k].shape, v.shape) | |
| except Exception: | |
| if "enc_q" not in k or "emb_g" not in k: | |
| print("%s is not in the checkpoint,please check your checkpoint.If you're using pretrain model,just ignore this warning." % k) | |
| logger.info("%s is not in the checkpoint" % k) | |
| new_state_dict[k] = v | |
| if hasattr(model, 'module'): | |
| model.module.load_state_dict(new_state_dict) | |
| else: | |
| model.load_state_dict(new_state_dict) | |
| print("load ") | |
| logger.info("Loaded checkpoint '{}' (iteration {})".format( | |
| checkpoint_path, iteration)) | |
| return model, optimizer, learning_rate, iteration | |
| def save_checkpoint(model, optimizer, learning_rate, iteration, checkpoint_path): | |
| logger.info("Saving model and optimizer state at iteration {} to {}".format( | |
| iteration, checkpoint_path)) | |
| if hasattr(model, 'module'): | |
| state_dict = model.module.state_dict() | |
| else: | |
| state_dict = model.state_dict() | |
| torch.save({'model': state_dict, | |
| 'iteration': iteration, | |
| 'optimizer': optimizer.state_dict(), | |
| 'learning_rate': learning_rate}, checkpoint_path) | |
| def clean_checkpoints(path_to_models='logs/44k/', n_ckpts_to_keep=2, sort_by_time=True): | |
| """Freeing up space by deleting saved ckpts | |
| Arguments: | |
| path_to_models -- Path to the model directory | |
| n_ckpts_to_keep -- Number of ckpts to keep, excluding G_0.pth and D_0.pth | |
| sort_by_time -- True -> chronologically delete ckpts | |
| False -> lexicographically delete ckpts | |
| """ | |
| ckpts_files = [f for f in os.listdir(path_to_models) if os.path.isfile(os.path.join(path_to_models, f))] | |
| def name_key(_f): | |
| return int(re.compile("._(\\d+)\\.pth").match(_f).group(1)) | |
| def time_key(_f): | |
| return os.path.getmtime(os.path.join(path_to_models, _f)) | |
| sort_key = time_key if sort_by_time else name_key | |
| def x_sorted(_x): | |
| return sorted([f for f in ckpts_files if f.startswith(_x) and not f.endswith("_0.pth")], key=sort_key) | |
| to_del = [os.path.join(path_to_models, fn) for fn in | |
| (x_sorted('G')[:-n_ckpts_to_keep] + x_sorted('D')[:-n_ckpts_to_keep])] | |
| def del_info(fn): | |
| return logger.info(f".. Free up space by deleting ckpt {fn}") | |
| def del_routine(x): | |
| return [os.remove(x), del_info(x)] | |
| [del_routine(fn) for fn in to_del] | |
| def summarize(writer, global_step, scalars={}, histograms={}, images={}, audios={}, audio_sampling_rate=22050): | |
| for k, v in scalars.items(): | |
| writer.add_scalar(k, v, global_step) | |
| for k, v in histograms.items(): | |
| writer.add_histogram(k, v, global_step) | |
| for k, v in images.items(): | |
| writer.add_image(k, v, global_step, dataformats='HWC') | |
| for k, v in audios.items(): | |
| writer.add_audio(k, v, global_step, audio_sampling_rate) | |
| def latest_checkpoint_path(dir_path, regex="G_*.pth"): | |
| f_list = glob.glob(os.path.join(dir_path, regex)) | |
| f_list.sort(key=lambda f: int("".join(filter(str.isdigit, f)))) | |
| x = f_list[-1] | |
| print(x) | |
| return x | |
| def plot_spectrogram_to_numpy(spectrogram): | |
| global MATPLOTLIB_FLAG | |
| if not MATPLOTLIB_FLAG: | |
| import matplotlib | |
| matplotlib.use("Agg") | |
| MATPLOTLIB_FLAG = True | |
| mpl_logger = logging.getLogger('matplotlib') | |
| mpl_logger.setLevel(logging.WARNING) | |
| import matplotlib.pylab as plt | |
| import numpy as np | |
| fig, ax = plt.subplots(figsize=(10,2)) | |
| im = ax.imshow(spectrogram, aspect="auto", origin="lower", | |
| interpolation='none') | |
| plt.colorbar(im, ax=ax) | |
| plt.xlabel("Frames") | |
| plt.ylabel("Channels") | |
| plt.tight_layout() | |
| fig.canvas.draw() | |
| data = np.fromstring(fig.canvas.tostring_rgb(), dtype=np.uint8, sep='') | |
| data = data.reshape(fig.canvas.get_width_height()[::-1] + (3,)) | |
| plt.close() | |
| return data | |
| def plot_alignment_to_numpy(alignment, info=None): | |
| global MATPLOTLIB_FLAG | |
| if not MATPLOTLIB_FLAG: | |
| import matplotlib | |
| matplotlib.use("Agg") | |
| MATPLOTLIB_FLAG = True | |
| mpl_logger = logging.getLogger('matplotlib') | |
| mpl_logger.setLevel(logging.WARNING) | |
| import matplotlib.pylab as plt | |
| import numpy as np | |
| fig, ax = plt.subplots(figsize=(6, 4)) | |
| im = ax.imshow(alignment.transpose(), aspect='auto', origin='lower', | |
| interpolation='none') | |
| fig.colorbar(im, ax=ax) | |
| xlabel = 'Decoder timestep' | |
| if info is not None: | |
| xlabel += '\n\n' + info | |
| plt.xlabel(xlabel) | |
| plt.ylabel('Encoder timestep') | |
| plt.tight_layout() | |
| fig.canvas.draw() | |
| data = np.fromstring(fig.canvas.tostring_rgb(), dtype=np.uint8, sep='') | |
| data = data.reshape(fig.canvas.get_width_height()[::-1] + (3,)) | |
| plt.close() | |
| return data | |
| def load_wav_to_torch(full_path): | |
| sampling_rate, data = read(full_path) | |
| return torch.FloatTensor(data.astype(np.float32)), sampling_rate | |
| def load_filepaths_and_text(filename, split="|"): | |
| with open(filename, encoding='utf-8') as f: | |
| filepaths_and_text = [line.strip().split(split) for line in f] | |
| return filepaths_and_text | |
| def get_hparams(init=True): | |
| parser = argparse.ArgumentParser() | |
| parser.add_argument('-c', '--config', type=str, default="./configs/config.json", | |
| help='JSON file for configuration') | |
| parser.add_argument('-m', '--model', type=str, required=True, | |
| help='Model name') | |
| args = parser.parse_args() | |
| model_dir = os.path.join("./logs", args.model) | |
| if not os.path.exists(model_dir): | |
| os.makedirs(model_dir) | |
| config_path = args.config | |
| config_save_path = os.path.join(model_dir, "config.json") | |
| if init: | |
| with open(config_path, "r") as f: | |
| data = f.read() | |
| with open(config_save_path, "w") as f: | |
| f.write(data) | |
| else: | |
| with open(config_save_path, "r") as f: | |
| data = f.read() | |
| config = json.loads(data) | |
| hparams = HParams(**config) | |
| hparams.model_dir = model_dir | |
| return hparams | |
| def get_hparams_from_dir(model_dir): | |
| config_save_path = os.path.join(model_dir, "config.json") | |
| with open(config_save_path, "r") as f: | |
| data = f.read() | |
| config = json.loads(data) | |
| hparams =HParams(**config) | |
| hparams.model_dir = model_dir | |
| return hparams | |
| def get_hparams_from_file(config_path, infer_mode = False): | |
| with open(config_path, "r") as f: | |
| data = f.read() | |
| config = json.loads(data) | |
| hparams =HParams(**config) if not infer_mode else InferHParams(**config) | |
| return hparams | |
| def check_git_hash(model_dir): | |
| source_dir = os.path.dirname(os.path.realpath(__file__)) | |
| if not os.path.exists(os.path.join(source_dir, ".git")): | |
| logger.warn("{} is not a git repository, therefore hash value comparison will be ignored.".format( | |
| source_dir | |
| )) | |
| return | |
| cur_hash = subprocess.getoutput("git rev-parse HEAD") | |
| path = os.path.join(model_dir, "githash") | |
| if os.path.exists(path): | |
| saved_hash = open(path).read() | |
| if saved_hash != cur_hash: | |
| logger.warn("git hash values are different. {}(saved) != {}(current)".format( | |
| saved_hash[:8], cur_hash[:8])) | |
| else: | |
| open(path, "w").write(cur_hash) | |
| def get_logger(model_dir, filename="train.log"): | |
| global logger | |
| logger = logging.getLogger(os.path.basename(model_dir)) | |
| logger.setLevel(logging.DEBUG) | |
| formatter = logging.Formatter("%(asctime)s\t%(name)s\t%(levelname)s\t%(message)s") | |
| if not os.path.exists(model_dir): | |
| os.makedirs(model_dir) | |
| h = logging.FileHandler(os.path.join(model_dir, filename)) | |
| h.setLevel(logging.DEBUG) | |
| h.setFormatter(formatter) | |
| logger.addHandler(h) | |
| return logger | |
| def repeat_expand_2d(content, target_len, mode = 'left'): | |
| # content : [h, t] | |
| return repeat_expand_2d_left(content, target_len) if mode == 'left' else repeat_expand_2d_other(content, target_len, mode) | |
| def repeat_expand_2d_left(content, target_len): | |
| # content : [h, t] | |
| src_len = content.shape[-1] | |
| target = torch.zeros([content.shape[0], target_len], dtype=torch.float).to(content.device) | |
| temp = torch.arange(src_len+1) * target_len / src_len | |
| current_pos = 0 | |
| for i in range(target_len): | |
| if i < temp[current_pos+1]: | |
| target[:, i] = content[:, current_pos] | |
| else: | |
| current_pos += 1 | |
| target[:, i] = content[:, current_pos] | |
| return target | |
| # mode : 'nearest'| 'linear'| 'bilinear'| 'bicubic'| 'trilinear'| 'area' | |
| def repeat_expand_2d_other(content, target_len, mode = 'nearest'): | |
| # content : [h, t] | |
| content = content[None,:,:] | |
| target = F.interpolate(content,size=target_len,mode=mode)[0] | |
| return target | |
| def mix_model(model_paths,mix_rate,mode): | |
| mix_rate = torch.FloatTensor(mix_rate)/100 | |
| model_tem = torch.load(model_paths[0]) | |
| models = [torch.load(path)["model"] for path in model_paths] | |
| if mode == 0: | |
| mix_rate = F.softmax(mix_rate,dim=0) | |
| for k in model_tem["model"].keys(): | |
| model_tem["model"][k] = torch.zeros_like(model_tem["model"][k]) | |
| for i,model in enumerate(models): | |
| model_tem["model"][k] += model[k]*mix_rate[i] | |
| torch.save(model_tem,os.path.join(os.path.curdir,"output.pth")) | |
| return os.path.join(os.path.curdir,"output.pth") | |
| def change_rms(data1, sr1, data2, sr2, rate): # 1是输入音频,2是输出音频,rate是2的占比 from RVC | |
| # print(data1.max(),data2.max()) | |
| rms1 = librosa.feature.rms( | |
| y=data1, frame_length=sr1 // 2 * 2, hop_length=sr1 // 2 | |
| ) # 每半秒一个点 | |
| rms2 = librosa.feature.rms(y=data2.detach().cpu().numpy(), frame_length=sr2 // 2 * 2, hop_length=sr2 // 2) | |
| rms1 = torch.from_numpy(rms1).to(data2.device) | |
| rms1 = F.interpolate( | |
| rms1.unsqueeze(0), size=data2.shape[0], mode="linear" | |
| ).squeeze() | |
| rms2 = torch.from_numpy(rms2).to(data2.device) | |
| rms2 = F.interpolate( | |
| rms2.unsqueeze(0), size=data2.shape[0], mode="linear" | |
| ).squeeze() | |
| rms2 = torch.max(rms2, torch.zeros_like(rms2) + 1e-6) | |
| data2 *= ( | |
| torch.pow(rms1, torch.tensor(1 - rate)) | |
| * torch.pow(rms2, torch.tensor(rate - 1)) | |
| ) | |
| return data2 | |
| def train_index(spk_name,root_dir = "dataset/44k/"): #from: RVC https://github.com/RVC-Project/Retrieval-based-Voice-Conversion-WebUI | |
| n_cpu = cpu_count() | |
| print("The feature index is constructing.") | |
| exp_dir = os.path.join(root_dir,spk_name) | |
| listdir_res = [] | |
| for file in os.listdir(exp_dir): | |
| if ".wav.soft.pt" in file: | |
| listdir_res.append(os.path.join(exp_dir,file)) | |
| if len(listdir_res) == 0: | |
| raise Exception("You need to run preprocess_hubert_f0.py!") | |
| npys = [] | |
| for name in sorted(listdir_res): | |
| phone = torch.load(name)[0].transpose(-1,-2).numpy() | |
| npys.append(phone) | |
| big_npy = np.concatenate(npys, 0) | |
| big_npy_idx = np.arange(big_npy.shape[0]) | |
| np.random.shuffle(big_npy_idx) | |
| big_npy = big_npy[big_npy_idx] | |
| if big_npy.shape[0] > 2e5: | |
| # if(1): | |
| info = "Trying doing kmeans %s shape to 10k centers." % big_npy.shape[0] | |
| print(info) | |
| try: | |
| big_npy = ( | |
| MiniBatchKMeans( | |
| n_clusters=10000, | |
| verbose=True, | |
| batch_size=256 * n_cpu, | |
| compute_labels=False, | |
| init="random", | |
| ) | |
| .fit(big_npy) | |
| .cluster_centers_ | |
| ) | |
| except Exception: | |
| info = traceback.format_exc() | |
| print(info) | |
| n_ivf = min(int(16 * np.sqrt(big_npy.shape[0])), big_npy.shape[0] // 39) | |
| index = faiss.index_factory(big_npy.shape[1] , "IVF%s,Flat" % n_ivf) | |
| index_ivf = faiss.extract_index_ivf(index) # | |
| index_ivf.nprobe = 1 | |
| index.train(big_npy) | |
| batch_size_add = 8192 | |
| for i in range(0, big_npy.shape[0], batch_size_add): | |
| index.add(big_npy[i : i + batch_size_add]) | |
| # faiss.write_index( | |
| # index, | |
| # f"added_{spk_name}.index" | |
| # ) | |
| print("Successfully build index") | |
| return index | |
| class HParams(): | |
| def __init__(self, **kwargs): | |
| for k, v in kwargs.items(): | |
| if type(v) == dict: | |
| v = HParams(**v) | |
| self[k] = v | |
| def keys(self): | |
| return self.__dict__.keys() | |
| def items(self): | |
| return self.__dict__.items() | |
| def values(self): | |
| return self.__dict__.values() | |
| def __len__(self): | |
| return len(self.__dict__) | |
| def __getitem__(self, key): | |
| return getattr(self, key) | |
| def __setitem__(self, key, value): | |
| return setattr(self, key, value) | |
| def __contains__(self, key): | |
| return key in self.__dict__ | |
| def __repr__(self): | |
| return self.__dict__.__repr__() | |
| def get(self,index): | |
| return self.__dict__.get(index) | |
| class InferHParams(HParams): | |
| def __init__(self, **kwargs): | |
| for k, v in kwargs.items(): | |
| if type(v) == dict: | |
| v = InferHParams(**v) | |
| self[k] = v | |
| def __getattr__(self,index): | |
| return self.get(index) | |
| class Volume_Extractor: | |
| def __init__(self, hop_size = 512): | |
| self.hop_size = hop_size | |
| def extract(self, audio): # audio: 2d tensor array | |
| if not isinstance(audio,torch.Tensor): | |
| audio = torch.Tensor(audio) | |
| n_frames = int(audio.size(-1) // self.hop_size) | |
| audio2 = audio ** 2 | |
| audio2 = torch.nn.functional.pad(audio2, (int(self.hop_size // 2), int((self.hop_size + 1) // 2)), mode = 'reflect') | |
| volume = torch.nn.functional.unfold(audio2[:,None,None,:],(1,self.hop_size),stride=self.hop_size)[:,:,:n_frames].mean(dim=1)[0] | |
| volume = torch.sqrt(volume) | |
| return volume | |