| """Main loading function. | |
| Copyright PolyAI Limited. | |
| """ | |
| import json | |
| import os | |
| import random | |
| from pathlib import Path | |
| import numpy as np | |
| import soundfile as sf | |
| import torch | |
| from librosa.util import normalize | |
| from pyannote.audio import Inference | |
| from torch.utils import data | |
| import constants as c | |


def random_crop(x, maxseqlen):
    """Randomly crop the first axis of `x` to at most `maxseqlen` frames."""
    if x.shape[0] >= maxseqlen:
        offset = random.randrange(x.shape[0] - maxseqlen + 1)
        x = x[offset: offset + maxseqlen]
    else:
        offset = 0
    return x, offset
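
# Illustrative usage sketch (not exercised at runtime): a (10, 3) array
# cropped to maxseqlen=4 keeps 4 consecutive rows, and `offset` reports the
# random start position (here in [0, 6]) so callers can align other streams.
#   feats = np.zeros((10, 3))
#   cropped, offset = random_crop(feats, maxseqlen=4)
#   assert cropped.shape == (4, 3) and 0 <= offset <= 6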


def dynamic_range_compression(x, C=0.3, M=6.5, clip_val=1e-5):
    """Map linear magnitudes to a compressed log scale: (log(x) + M) * C."""
    return (np.log(np.clip(x, a_min=clip_val, a_max=None)) + M) * C


def dynamic_range_decompression(x, C=0.3, M=6.5):
    """Invert `dynamic_range_compression`: exp(x / C - M)."""
    return np.exp(x / C - M)
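
# Round-trip sketch: for inputs above `clip_val` the two functions are exact
# inverses, since exp(((log(x) + M) * C) / C - M) == x.
#   x = np.array([1e-3, 0.1, 1.0])
#   y = dynamic_range_decompression(dynamic_range_compression(x))
#   assert np.allclose(x, y)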


class QuantizeDataset(data.Dataset):
    def __init__(self, hp, metapath, datadir=None, speaker_embedding_dir=None):
        self.hp = hp
        self.datadir = Path(datadir)
        self.speaker_embedding_dir = speaker_embedding_dir
        self.sem_mask_id = hp.n_semantic_codes

        print(f"Loading metadata in {metapath}...")
        with open(metapath, "r") as f:
            self.text = json.load(f)

        # Optionally cap the dataset size, keeping the first
        # `max_dataset_samples` entries in metadata order.
        if 0 < self.hp.max_dataset_samples < len(self.text):
            self.new_text = {}
            num = 0
            for k, v in self.text.items():
                if num >= self.hp.max_dataset_samples:
                    break
                self.new_text[k] = v
                num += 1
            self.text = self.new_text

        self.datasetbase = list(self.text.keys())
        self.dataset = [
            os.path.join(self.datadir, x) for x in self.datasetbase]

        # Without a precomputed embedding directory, speaker embeddings are
        # extracted on the fly with pyannote (requires the
        # HUGGING_FACE_HUB_TOKEN environment variable to be set).
        if self.speaker_embedding_dir is None:
            self.spkr_embedding = Inference(
                "pyannote/embedding",
                window="whole",
                use_auth_token=os.environ["HUGGING_FACE_HUB_TOKEN"],
            )

        # Print statistics:
        n = len(self.dataset)
        print(f"Total {n} examples")

        self.lengths = [float(v["duration"]) for v in self.text.values()]
        total_duration = sum(self.lengths)
        avglen = total_duration / len(self.lengths)
        maxlen = max(self.lengths)
        minlen = min(self.lengths)
        print(
            f"Average duration of audio: {avglen} sec, "
            f"Maximum duration: {maxlen} sec, Minimum duration: {minlen} sec"
        )

    def __len__(self):
        return len(self.dataset)

    def load_quantization(self, _name):
        """Load acoustic codes for `_name`, transposed to (frames, codebooks)."""
        if self.hp.vocoder_type == 'NATIVE':
            metadata = self.text[_name]
            quantization = np.array(metadata["quantization"]).T  # ..., 4
        elif self.hp.vocoder_type == 'DAC':
            codes_path = self.datadir.parent / 'audios-dac' / (os.path.splitext(_name)[0] + ".npy")  # noqa
            quantization = np.load(codes_path).T  # ..., 12
        elif self.hp.vocoder_type == 'ENCODEC':
            codes_path = self.datadir.parent / 'audios-encodec' / (os.path.splitext(_name)[0] + ".npy")  # noqa
            quantization = np.load(codes_path).squeeze(0).T  # ..., 8
        elif self.hp.vocoder_type == 'SPEECHTOKENIZER':
            codes_path = self.datadir.parent / 'audios-speech-tokenizer/acoustic' / (os.path.splitext(_name)[0] + ".npy")  # noqa
            quantization = np.load(codes_path).T  # ..., 7
        else:
            raise ValueError(f"Unknown vocoder_type {self.hp.vocoder_type}")
        return quantization
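
    # On-disk layout implied by the paths above (a sketch; the raw audio
    # directory name is whatever `datadir` points at, and stored shapes are
    # inferred from the transposes applied after np.load):
    #   <datadir>/                                        raw .wav files
    #   <datadir>/../audios-dac/<clip>.npy                DAC codes, (12, T)
    #   <datadir>/../audios-encodec/<clip>.npy            EnCodec codes, (1, 8, T)
    #   <datadir>/../audios-speech-tokenizer/acoustic/    acoustic codes, (7, T)
    #   <datadir>/../audios-speech-tokenizer/semantic/    semantic codes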

    def __getitem__(self, i):
        dataname = self.dataset[i]
        _name = self.datasetbase[i]
        metadata = self.text[_name]

        # Speaker 1: acoustic codes, semantic codes and a speaker embedding,
        # each padded at the front with the SPKR_1 marker token.
        acoustic_tokens = self.load_quantization(_name)
        acoustic_tokens = np.pad(
            acoustic_tokens, [[1, 0], [0, 0]], constant_values=c.SPKR_1)

        npy_path = self.datadir.parent / 'audios-speech-tokenizer/semantic' / (os.path.splitext(_name)[0] + ".npy")  # noqa
        semantic_tokens = np.load(npy_path)[None]
        semantic_tokens = np.pad(
            semantic_tokens, [[0, 0], [1, 0]], constant_values=c.SPKR_1)

        # Two-speaker clips store speaker 1's audio in a separate "_1" file.
        if "name_2" in metadata:
            wav, _ = sf.read(os.path.splitext(dataname)[0] + "_1.wav")
        else:
            wav, _ = sf.read(dataname)
        audio = normalize(wav) * 0.95
        speaker_embedding = self.spkr_embedding(
            {"waveform": torch.FloatTensor(audio).unsqueeze(0),
             "sample_rate": self.hp.sample_rate}
        ).reshape(1, -1)
        speaker_embedding = np.repeat(
            speaker_embedding, semantic_tokens.shape[1], axis=0)

        # Speaker 2 (only present for two-speaker clips): same features,
        # marked with SPKR_2 and concatenated after speaker 1 along time.
        if "text_2" in metadata:
            _name = os.path.splitext(_name)[0] + "_2.wav"

            acoustic_tokens_2 = self.load_quantization(_name)
            acoustic_tokens_2 = np.pad(
                acoustic_tokens_2, [[1, 0], [0, 0]], constant_values=c.SPKR_2)

            npy_path = self.datadir.parent / 'audios-speech-tokenizer/semantic' / (os.path.splitext(_name)[0] + ".npy")  # noqa
            semantic_tokens_2 = np.load(npy_path)[None]
            semantic_tokens_2 = np.pad(
                semantic_tokens_2, [[0, 0], [1, 0]], constant_values=c.SPKR_2)

            wav, _ = sf.read(os.path.splitext(dataname)[0] + "_2.wav")
            audio = normalize(wav) * 0.95
            speaker_embedding_2 = self.spkr_embedding(
                {"waveform": torch.FloatTensor(audio).unsqueeze(0),
                 "sample_rate": self.hp.sample_rate}
            ).reshape(1, -1)
            speaker_embedding_2 = np.repeat(
                speaker_embedding_2, semantic_tokens_2.shape[1], axis=0)

            # Merge both speakers
            acoustic_tokens = np.concatenate(
                (acoustic_tokens, acoustic_tokens_2), axis=0)
            semantic_tokens = np.concatenate(
                (semantic_tokens, semantic_tokens_2), axis=1)
            speaker_embedding = np.concatenate(
                (speaker_embedding, speaker_embedding_2), axis=0)

        # Truncate everything to the model's maximum sequence length.
        speaker_embedding = speaker_embedding[:self.hp.max_length, :]
        acoustic_tokens = acoustic_tokens[:self.hp.max_length, :]
        semantic_tokens = semantic_tokens[:, :self.hp.max_length]

        # # HACK - we have no 8 lvls pfb30
        # acoustic_tokens = np.concatenate((semantic_tokens.T, acoustic_tokens), axis=1)  # noqa
        # # END HACK

        return speaker_embedding, acoustic_tokens, acoustic_tokens, dataname, semantic_tokens  # noqa
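

if __name__ == "__main__":
    # Minimal smoke-test sketch. Assumptions: the metadata JSON and directory
    # layout described above exist locally (the paths below are placeholders,
    # not shipped with the repo), HUGGING_FACE_HUB_TOKEN is set for pyannote,
    # and the `hp` field values match your configuration.
    from types import SimpleNamespace

    hp = SimpleNamespace(
        n_semantic_codes=1024,       # assumed semantic codebook size
        max_dataset_samples=2,       # keep the sketch fast
        vocoder_type='SPEECHTOKENIZER',
        sample_rate=16000,           # assumed sample rate
        max_length=1600,             # assumed max sequence length
    )
    dataset = QuantizeDataset(
        hp,
        "datasets/example/metadata.json",   # placeholder path
        datadir="datasets/example/audios",  # placeholder path
    )
    spkr_emb, acoustic, _, name, semantic = dataset[0]
    print(name, spkr_emb.shape, acoustic.shape, semantic.shape)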