Spaces:
Running
on
Zero
Running
on
Zero
| from dataclasses import dataclass | |
| from pathlib import Path | |
| import os | |
| import librosa | |
| import torch | |
| import perth | |
| import torch.nn.functional as F | |
| from safetensors.torch import load_file as load_safetensors | |
| from huggingface_hub import snapshot_download | |
| from .models.t3 import T3 | |
| from .models.t3.modules.t3_config import T3Config | |
| from .models.s3tokenizer import S3_SR, drop_invalid_tokens | |
| from .models.s3gen import S3GEN_SR, S3Gen | |
| from .models.tokenizers import MTLTokenizer | |
| from .models.voice_encoder import VoiceEncoder | |
| from .models.t3.modules.cond_enc import T3Cond | |
| REPO_ID = "ResembleAI/chatterbox" | |
| # Supported languages for the multilingual model | |
| SUPPORTED_LANGUAGES = { | |
| "ar": "Arabic", | |
| "da": "Danish", | |
| "de": "German", | |
| "el": "Greek", | |
| "en": "English", | |
| "es": "Spanish", | |
| "fi": "Finnish", | |
| "fr": "French", | |
| "he": "Hebrew", | |
| "hi": "Hindi", | |
| "it": "Italian", | |
| "ja": "Japanese", | |
| "ko": "Korean", | |
| "ms": "Malay", | |
| "nl": "Dutch", | |
| "no": "Norwegian", | |
| "pl": "Polish", | |
| "pt": "Portuguese", | |
| "ru": "Russian", | |
| "sv": "Swedish", | |
| "sw": "Swahili", | |
| "tr": "Turkish", | |
| "zh": "Chinese", | |
| } | |
| def punc_norm(text: str) -> str: | |
| """ | |
| Quick cleanup func for punctuation from LLMs or | |
| containing chars not seen often in the dataset | |
| """ | |
| if len(text) == 0: | |
| return "You need to add some text for me to talk." | |
| # Capitalise first letter | |
| if text[0].islower(): | |
| text = text[0].upper() + text[1:] | |
| # Remove multiple space chars | |
| text = " ".join(text.split()) | |
| # Replace uncommon/llm punc | |
| punc_to_replace = [ | |
| ("...", ", "), | |
| ("…", ", "), | |
| (":", ","), | |
| (" - ", ", "), | |
| (";", ", "), | |
| ("—", "-"), | |
| ("–", "-"), | |
| (" ,", ","), | |
| ("“", "\""), | |
| ("”", "\""), | |
| ("‘", "'"), | |
| ("’", "'"), | |
| ] | |
| for old_char_sequence, new_char in punc_to_replace: | |
| text = text.replace(old_char_sequence, new_char) | |
| # Add full stop if no ending punc | |
| text = text.rstrip(" ") | |
| sentence_enders = {".", "!", "?", "-", ",","、",",","。","?","!"} | |
| if not any(text.endswith(p) for p in sentence_enders): | |
| text += "." | |
| return text | |
| class Conditionals: | |
| """ | |
| Conditionals for T3 and S3Gen | |
| - T3 conditionals: | |
| - speaker_emb | |
| - clap_emb | |
| - cond_prompt_speech_tokens | |
| - cond_prompt_speech_emb | |
| - emotion_adv | |
| - S3Gen conditionals: | |
| - prompt_token | |
| - prompt_token_len | |
| - prompt_feat | |
| - prompt_feat_len | |
| - embedding | |
| """ | |
| t3: T3Cond | |
| gen: dict | |
| def to(self, device): | |
| self.t3 = self.t3.to(device=device) | |
| for k, v in self.gen.items(): | |
| if torch.is_tensor(v): | |
| self.gen[k] = v.to(device=device) | |
| return self | |
| def save(self, fpath: Path): | |
| arg_dict = dict( | |
| t3=self.t3.__dict__, | |
| gen=self.gen | |
| ) | |
| torch.save(arg_dict, fpath) | |
| def load(cls, fpath, map_location="cpu"): | |
| kwargs = torch.load(fpath, map_location=map_location, weights_only=True) | |
| return cls(T3Cond(**kwargs['t3']), kwargs['gen']) | |
| class ChatterboxMultilingualTTS: | |
| ENC_COND_LEN = 6 * S3_SR | |
| DEC_COND_LEN = 10 * S3GEN_SR | |
| def __init__( | |
| self, | |
| t3: T3, | |
| s3gen: S3Gen, | |
| ve: VoiceEncoder, | |
| tokenizer: MTLTokenizer, | |
| device: str, | |
| conds: Conditionals = None, | |
| ): | |
| self.sr = S3GEN_SR # sample rate of synthesized audio | |
| self.t3 = t3 | |
| self.s3gen = s3gen | |
| self.ve = ve | |
| self.tokenizer = tokenizer | |
| self.device = device | |
| self.conds = conds | |
| self.watermarker = perth.PerthImplicitWatermarker() | |
| def get_supported_languages(cls): | |
| """Return dictionary of supported language codes and names.""" | |
| return SUPPORTED_LANGUAGES.copy() | |
| def from_local(cls, ckpt_dir, device) -> 'ChatterboxMultilingualTTS': | |
| ckpt_dir = Path(ckpt_dir) | |
| ve = VoiceEncoder() | |
| ve.load_state_dict( | |
| torch.load(ckpt_dir / "ve.pt", weights_only=True) | |
| ) | |
| ve.to(device).eval() | |
| t3 = T3(T3Config.multilingual()) | |
| t3_state = load_safetensors(ckpt_dir / "t3_23lang.safetensors") | |
| if "model" in t3_state.keys(): | |
| t3_state = t3_state["model"][0] | |
| t3.load_state_dict(t3_state) | |
| t3.to(device).eval() | |
| s3gen = S3Gen() | |
| s3gen.load_state_dict( | |
| torch.load(ckpt_dir / "s3gen.pt", weights_only=True) | |
| ) | |
| s3gen.to(device).eval() | |
| tokenizer = MTLTokenizer( | |
| str(ckpt_dir / "mtl_tokenizer.json") | |
| ) | |
| conds = None | |
| if (builtin_voice := ckpt_dir / "conds.pt").exists(): | |
| conds = Conditionals.load(builtin_voice).to(device) | |
| return cls(t3, s3gen, ve, tokenizer, device, conds=conds) | |
| def from_pretrained(cls, device: torch.device) -> 'ChatterboxMultilingualTTS': | |
| ckpt_dir = Path( | |
| snapshot_download( | |
| repo_id=REPO_ID, | |
| repo_type="model", | |
| revision="main", | |
| allow_patterns=["ve.pt", "t3_23lang.safetensors", "s3gen.pt", "mtl_tokenizer.json", "conds.pt", "Cangjie5_TC.json"], | |
| token=os.getenv("HF_TOKEN"), | |
| ) | |
| ) | |
| return cls.from_local(ckpt_dir, device) | |
| def prepare_conditionals(self, wav_fpath, exaggeration=0.5): | |
| ## Load reference wav | |
| s3gen_ref_wav, _sr = librosa.load(wav_fpath, sr=S3GEN_SR) | |
| ref_16k_wav = librosa.resample(s3gen_ref_wav, orig_sr=S3GEN_SR, target_sr=S3_SR) | |
| s3gen_ref_wav = s3gen_ref_wav[:self.DEC_COND_LEN] | |
| s3gen_ref_dict = self.s3gen.embed_ref(s3gen_ref_wav, S3GEN_SR, device=self.device) | |
| # Speech cond prompt tokens | |
| t3_cond_prompt_tokens = None | |
| if plen := self.t3.hp.speech_cond_prompt_len: | |
| s3_tokzr = self.s3gen.tokenizer | |
| t3_cond_prompt_tokens, _ = s3_tokzr.forward([ref_16k_wav[:self.ENC_COND_LEN]], max_len=plen) | |
| t3_cond_prompt_tokens = torch.atleast_2d(t3_cond_prompt_tokens).to(self.device) | |
| # Voice-encoder speaker embedding | |
| ve_embed = torch.from_numpy(self.ve.embeds_from_wavs([ref_16k_wav], sample_rate=S3_SR)) | |
| ve_embed = ve_embed.mean(axis=0, keepdim=True).to(self.device) | |
| t3_cond = T3Cond( | |
| speaker_emb=ve_embed, | |
| cond_prompt_speech_tokens=t3_cond_prompt_tokens, | |
| emotion_adv=exaggeration * torch.ones(1, 1, 1), | |
| ).to(device=self.device) | |
| self.conds = Conditionals(t3_cond, s3gen_ref_dict) | |
| def generate( | |
| self, | |
| text, | |
| language_id, | |
| audio_prompt_path=None, | |
| exaggeration=0.5, | |
| cfg_weight=0.5, | |
| temperature=0.8, | |
| repetition_penalty=2.0, | |
| min_p=0.05, | |
| top_p=1.0, | |
| ): | |
| # Validate language_id | |
| if language_id and language_id.lower() not in SUPPORTED_LANGUAGES: | |
| supported_langs = ", ".join(SUPPORTED_LANGUAGES.keys()) | |
| raise ValueError( | |
| f"Unsupported language_id '{language_id}'. " | |
| f"Supported languages: {supported_langs}" | |
| ) | |
| if audio_prompt_path: | |
| self.prepare_conditionals(audio_prompt_path, exaggeration=exaggeration) | |
| else: | |
| assert self.conds is not None, "Please `prepare_conditionals` first or specify `audio_prompt_path`" | |
| # Update exaggeration if needed | |
| if float(exaggeration) != float(self.conds.t3.emotion_adv[0, 0, 0].item()): | |
| _cond: T3Cond = self.conds.t3 | |
| self.conds.t3 = T3Cond( | |
| speaker_emb=_cond.speaker_emb, | |
| cond_prompt_speech_tokens=_cond.cond_prompt_speech_tokens, | |
| emotion_adv=exaggeration * torch.ones(1, 1, 1), | |
| ).to(device=self.device) | |
| # Norm and tokenize text | |
| text = punc_norm(text) | |
| text_tokens = self.tokenizer.text_to_tokens(text, language_id=language_id.lower() if language_id else None).to(self.device) | |
| text_tokens = torch.cat([text_tokens, text_tokens], dim=0) # Need two seqs for CFG | |
| sot = self.t3.hp.start_text_token | |
| eot = self.t3.hp.stop_text_token | |
| text_tokens = F.pad(text_tokens, (1, 0), value=sot) | |
| text_tokens = F.pad(text_tokens, (0, 1), value=eot) | |
| with torch.inference_mode(): | |
| speech_tokens = self.t3.inference( | |
| t3_cond=self.conds.t3, | |
| text_tokens=text_tokens, | |
| max_new_tokens=1000, # TODO: use the value in config | |
| temperature=temperature, | |
| cfg_weight=cfg_weight, | |
| repetition_penalty=repetition_penalty, | |
| min_p=min_p, | |
| top_p=top_p, | |
| ) | |
| # Extract only the conditional batch. | |
| speech_tokens = speech_tokens[0] | |
| # TODO: output becomes 1D | |
| speech_tokens = drop_invalid_tokens(speech_tokens) | |
| speech_tokens = speech_tokens.to(self.device) | |
| wav, _ = self.s3gen.inference( | |
| speech_tokens=speech_tokens, | |
| ref_dict=self.conds.gen, | |
| ) | |
| wav = wav.squeeze(0).detach().cpu().numpy() | |
| watermarked_wav = self.watermarker.apply_watermark(wav, sample_rate=self.sr) | |
| return torch.from_numpy(watermarked_wav).unsqueeze(0) | |