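# MaskGCT zero-shot TTS demo (Gradio app).
# Pipeline: prompt speech + target text -> phonemes (g2p) -> semantic tokens
# (T2S masked diffusion) -> acoustic codec tokens (S2A masked diffusion) ->
# waveform via the Amphion acoustic codec decoder.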
import spaces  # imported first for Hugging Face ZeroGPU Spaces

import os

import accelerate
import gradio as gr
import librosa
import numpy as np
import py3langid as langid
import safetensors.torch
import soundfile as sf
import torch
from huggingface_hub import hf_hub_download
from transformers import SeamlessM4TFeatureExtractor, Wav2Vec2BertModel

from models.codec.amphion_codec.codec import CodecDecoder, CodecEncoder
from models.codec.kmeans.repcodec_model import RepCodec
from models.tts.maskgct.g2p.g2p_generation import chn_eng_g2p, g2p
from models.tts.maskgct.maskgct_s2a import MaskGCT_S2A
from models.tts.maskgct.maskgct_t2s import MaskGCT_T2S
from utils.util import load_config
processor = SeamlessM4TFeatureExtractor.from_pretrained("facebook/w2v-bert-2.0")

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

whisper_model = None  # loaded lazily on first use
output_file_name_idx = 0
def detect_text_language(text):
    return langid.classify(text)[0]
def detect_speech_language(speech_file):
    """Detect the spoken language of an audio file with Whisper."""
    import whisper

    global whisper_model
    if whisper_model is None:
        whisper_model = whisper.load_model("turbo")
    # Load audio and pad/trim it to fit 30 seconds.
    audio = whisper.load_audio(speech_file)
    audio = whisper.pad_or_trim(audio)
    # Make a log-Mel spectrogram and move it to the same device as the model.
    mel = whisper.log_mel_spectrogram(audio, n_mels=128).to(whisper_model.device)
    # Detect the spoken language.
    _, probs = whisper_model.detect_language(mel)
    return max(probs, key=probs.get)
def is_chinese(string):
    """Check whether the string contains any Chinese character."""
    for ch in string:
        if "\u4e00" <= ch <= "\u9fff":
            return True
    return False
def is_english(string):
    """Check whether the string contains any English (ASCII) letter."""
    for ch in string:
        # str.isalpha() alone is true for any Unicode letter (including
        # Chinese), so restrict the check to ASCII letters.
        if ch.isascii() and ch.isalpha():
            return True
    return False
def preprocess(sentence):
    # Ensure the sentence ends with terminal punctuation, normalized to
    # full-width, before g2p.
    if is_chinese(sentence[-1]) or is_english(sentence[-1]):
        sentence = sentence + "。"
    if sentence[-1] == "!":
        sentence = sentence[:-1] + "！"
    elif sentence[-1] == "?":
        sentence = sentence[:-1] + "？"
    elif sentence[-1] not in ["？", "！"]:
        sentence = sentence[:-1] + "。"
    return sentence
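# split_paragraph chops the input text into synthesizable chunks. Each
# character adds a length cost (Chinese 1.0, English 0.3, other 0.5); splits
# happen at strong punctuation once the cost passes 5, at commas past 40, at
# closing quotes/brackets/spaces past 60, and are forced past 80.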
def split_paragraph(text):
    sentences = []
    first_punc_list = ";!?。！？；…"
    second_punc_list = first_punc_list + "，,"
    third_punc_list = second_punc_list + "」）》”’』])>\"']】 "
    first_punc_check_start = 5
    second_punc_check_start = 40
    third_punc_check_start = 60
    force_seg_len = 80
    cur_length = 0.0
    temp_sent = ""
    for char in text:
        temp_sent = temp_sent + char
        # Weight characters by how much speech they roughly produce.
        if is_chinese(char):
            cur_length = cur_length + 1
        elif is_english(char):
            cur_length = cur_length + 0.3
        else:
            cur_length = cur_length + 0.5
        if cur_length < first_punc_check_start:
            continue
        do_split = False
        if char in first_punc_list:
            do_split = True
        elif cur_length > second_punc_check_start and char in second_punc_list:
            do_split = True
        elif cur_length > third_punc_check_start and char in third_punc_list:
            do_split = True
        elif cur_length > force_seg_len:
            do_split = True
        if do_split:
            sentences.append(temp_sent)
            cur_length = 0
            temp_sent = ""
    if len(temp_sent):
        sentences.append(temp_sent)
    return sentences
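# get_prompt_text transcribes the prompt audio with Whisper and keeps the
# leading segments up to roughly 4 seconds as a short prompt, so a long
# prompt recording does not dominate generation.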
def get_prompt_text(speech_path, language):
    import whisper

    global whisper_model
    if whisper_model is None:
        whisper_model = whisper.load_model("turbo")
    asr_result = whisper_model.transcribe(speech_path, language=language)
    full_prompt_text = asr_result["text"]  # whisper asr result
    # text = asr_result["segments"][0]["text"]  # whisperx asr result
    short_prompt_text = ""
    short_prompt_end_ts = 0.0
    for segment in asr_result["segments"]:
        short_prompt_text = short_prompt_text + segment["text"]
        short_prompt_end_ts = segment["end"]
        if short_prompt_end_ts >= 4:
            break
    return full_prompt_text, short_prompt_text, short_prompt_end_ts
def g2p_(text, language):
    if language in ["zh", "en"]:
        return chn_eng_g2p(text)
    else:
        return g2p(text, sentence=None, language=language)
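# Model builders: each constructs one module from its config section and
# moves it to the target device in eval mode.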
def build_t2s_model(cfg, device):
    t2s_model = MaskGCT_T2S(cfg=cfg)
    t2s_model.eval()
    t2s_model.to(device)
    return t2s_model


def build_s2a_model(cfg, device):
    soundstorm_model = MaskGCT_S2A(cfg=cfg)
    soundstorm_model.eval()
    soundstorm_model.to(device)
    return soundstorm_model


def build_semantic_model(device):
    semantic_model = Wav2Vec2BertModel.from_pretrained("facebook/w2v-bert-2.0")
    semantic_model.eval()
    semantic_model.to(device)
    stat_mean_var = torch.load("./models/tts/maskgct/ckpt/wav2vec2bert_stats.pt")
    semantic_mean = stat_mean_var["mean"].to(device)
    semantic_std = torch.sqrt(stat_mean_var["var"]).to(device)
    return semantic_model, semantic_mean, semantic_std


def build_semantic_codec(cfg, device):
    semantic_codec = RepCodec(cfg=cfg)
    semantic_codec.eval()
    semantic_codec.to(device)
    return semantic_codec


def build_acoustic_codec(cfg, device):
    codec_encoder = CodecEncoder(cfg=cfg.encoder)
    codec_decoder = CodecDecoder(cfg=cfg.decoder)
    codec_encoder.eval()
    codec_decoder.eval()
    codec_encoder.to(device)
    codec_decoder.to(device)
    return codec_encoder, codec_decoder
def extract_features(speech, processor):
    inputs = processor(speech, sampling_rate=16000, return_tensors="pt")
    input_features = inputs["input_features"][0]
    attention_mask = inputs["attention_mask"][0]
    return input_features, attention_mask
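# Semantic tokens are derived from layer 17 of w2v-bert-2.0 hidden states,
# normalized with precomputed mean/std and quantized by the RepCodec semantic
# codec. semantic_model, semantic_codec, codec_encoder, and codec_decoder are
# module-level globals assigned by load_models() below.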
@torch.no_grad()  # inference only; no autograd bookkeeping needed
def extract_semantic_code(semantic_mean, semantic_std, input_features, attention_mask):
    vq_emb = semantic_model(
        input_features=input_features,
        attention_mask=attention_mask,
        output_hidden_states=True,
    )
    feat = vq_emb.hidden_states[17]  # (B, T, C)
    feat = (feat - semantic_mean.to(feat)) / semantic_std.to(feat)
    semantic_code, rec_feat = semantic_codec.quantize(feat)  # (B, T)
    return semantic_code, rec_feat
@torch.no_grad()
def extract_acoustic_code(speech):
    vq_emb = codec_encoder(speech.unsqueeze(1))
    _, vq, _, _, _ = codec_decoder.quantizer(vq_emb)
    acoustic_code = vq.permute(1, 2, 0)  # (num_q, B, T) -> (B, T, num_q)
    return acoustic_code
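# text2semantic runs the T2S stage: concatenated prompt + target phoneme IDs
# condition a masked-diffusion model that continues the prompt's semantic
# token sequence. Semantic tokens run at 50 per second, so, for example, a
# 3-second prompt with 30 phonemes and a 60-phoneme target yields an estimate
# of int(3 * (60 / 30) * 50) = 300 target tokens when no duration is given.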
def text2semantic(
    device,
    prompt_speech,
    prompt_text,
    prompt_language,
    target_text,
    target_language,
    target_len=None,
    n_timesteps=50,
    cfg=2.5,
    rescale_cfg=0.75,
):
    prompt_phone_id = g2p_(prompt_text, prompt_language)[1]
    target_phone_id = g2p_(target_text, target_language)[1]

    if target_len is None or target_len < 0:
        # Estimate the target length from the prompt's speech rate:
        # (prompt samples / 16 kHz) gives prompt seconds; scale by the
        # phoneme-count ratio and convert to 50 Hz semantic tokens.
        target_len = int(
            (len(prompt_speech) * len(target_phone_id) / len(prompt_phone_id))
            / 16000
            * 50
        )
    else:
        target_len = int(target_len * 50)

    prompt_phone_id = torch.tensor(prompt_phone_id, dtype=torch.long).to(device)
    target_phone_id = torch.tensor(target_phone_id, dtype=torch.long).to(device)
    phone_id = torch.cat([prompt_phone_id, target_phone_id])

    input_features, attention_mask = extract_features(prompt_speech, processor)
    input_features = input_features.unsqueeze(0).to(device)
    attention_mask = attention_mask.unsqueeze(0).to(device)
    semantic_code, _ = extract_semantic_code(
        semantic_mean, semantic_std, input_features, attention_mask
    )

    predict_semantic = t2s_model.reverse_diffusion(
        semantic_code,
        target_len,
        phone_id.unsqueeze(0),
        n_timesteps=n_timesteps,
        cfg=cfg,
        rescale_cfg=rescale_cfg,
    )

    combine_semantic_code = torch.cat([semantic_code, predict_semantic], dim=-1)
    prompt_semantic_code = semantic_code

    return combine_semantic_code, prompt_semantic_code
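# semantic2acoustic runs the S2A stage in two passes: a 1-layer model predicts
# the first (coarse) codebook, then the full model fills in all 12 codebooks
# conditioned on that prediction; the codec decoder renders codes to audio.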
def semantic2acoustic(
    device,
    combine_semantic_code,
    acoustic_code,
    n_timesteps=[25, 10, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1],
    cfg=2.5,
    rescale_cfg=0.75,
):
    semantic_code = combine_semantic_code

    # First pass: predict the first codebook only.
    cond = s2a_model_1layer.cond_emb(semantic_code)
    prompt = acoustic_code[:, :, :]
    predict_1layer = s2a_model_1layer.reverse_diffusion(
        cond=cond,
        prompt=prompt,
        temp=1.5,
        filter_thres=0.98,
        n_timesteps=n_timesteps[:1],
        cfg=cfg,
        rescale_cfg=rescale_cfg,
    )

    # Second pass: predict all codebooks, reusing the first pass as gt_code.
    cond = s2a_model_full.cond_emb(semantic_code)
    prompt = acoustic_code[:, :, :]
    predict_full = s2a_model_full.reverse_diffusion(
        cond=cond,
        prompt=prompt,
        temp=1.5,
        filter_thres=0.98,
        n_timesteps=n_timesteps,
        cfg=cfg,
        rescale_cfg=rescale_cfg,
        gt_code=predict_1layer,
    )

    # Decode both the generated codes and the prompt codes to waveforms.
    vq_emb = codec_decoder.vq2emb(predict_full.permute(2, 0, 1), n_quantizers=12)
    recovered_audio = codec_decoder(vq_emb)
    prompt_vq_emb = codec_decoder.vq2emb(prompt.permute(2, 0, 1), n_quantizers=12)
    recovered_prompt_audio = codec_decoder(prompt_vq_emb)
    recovered_prompt_audio = recovered_prompt_audio[0][0].cpu().numpy()
    recovered_audio = recovered_audio[0][0].cpu().numpy()
    combine_audio = np.concatenate([recovered_prompt_audio, recovered_audio])

    return combine_audio, recovered_audio
# Load the models and checkpoints.
def load_models():
    cfg_path = "./models/tts/maskgct/config/maskgct.json"
    cfg = load_config(cfg_path)

    semantic_model, semantic_mean, semantic_std = build_semantic_model(device)
    semantic_codec = build_semantic_codec(cfg.model.semantic_codec, device)
    codec_encoder, codec_decoder = build_acoustic_codec(
        cfg.model.acoustic_codec, device
    )
    t2s_model = build_t2s_model(cfg.model.t2s_model, device)
    s2a_model_1layer = build_s2a_model(cfg.model.s2a_model.s2a_1layer, device)
    s2a_model_full = build_s2a_model(cfg.model.s2a_model.s2a_full, device)

    # Download checkpoints from the Hugging Face Hub.
    semantic_code_ckpt = hf_hub_download(
        "amphion/MaskGCT", filename="semantic_codec/model.safetensors"
    )
    # codec_encoder_ckpt = hf_hub_download(
    #     "amphion/MaskGCT", filename="acoustic_codec/model.safetensors"
    # )
    # codec_decoder_ckpt = hf_hub_download(
    #     "amphion/MaskGCT", filename="acoustic_codec/model_1.safetensors"
    # )
    t2s_model_ckpt = hf_hub_download(
        "amphion/MaskGCT", filename="t2s_model/model.safetensors"
    )
    s2a_1layer_ckpt = hf_hub_download(
        "amphion/MaskGCT", filename="s2a_model/s2a_model_1layer/model.safetensors"
    )
    s2a_full_ckpt = hf_hub_download(
        "amphion/MaskGCT", filename="s2a_model/s2a_model_full/model.safetensors"
    )

    # Load weights; the acoustic codec is loaded from local safetensors files
    # via accelerate instead of the downloaded checkpoints above.
    safetensors.torch.load_model(semantic_codec, semantic_code_ckpt)
    # safetensors.torch.load_model(codec_encoder, codec_encoder_ckpt)
    # safetensors.torch.load_model(codec_decoder, codec_decoder_ckpt)
    accelerate.load_checkpoint_and_dispatch(
        codec_encoder, "./acoustic_codec/model.safetensors"
    )
    accelerate.load_checkpoint_and_dispatch(
        codec_decoder, "./acoustic_codec/model_1.safetensors"
    )
    safetensors.torch.load_model(t2s_model, t2s_model_ckpt)
    safetensors.torch.load_model(s2a_model_1layer, s2a_1layer_ckpt)
    safetensors.torch.load_model(s2a_model_full, s2a_full_ckpt)

    return (
        semantic_model,
        semantic_mean,
        semantic_std,
        semantic_codec,
        codec_encoder,
        codec_decoder,
        t2s_model,
        s2a_model_1layer,
        s2a_model_full,
    )
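# maskgct_inference ties the pipeline together for one request: split the
# target text, prepare the (roughly 4-second) prompt once, then synthesize
# each chunk and concatenate the audio.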
@torch.no_grad()
def maskgct_inference(
    prompt_speech_path,
    target_text,
    target_len=None,
    n_timesteps=25,
    cfg=2.5,
    rescale_cfg=0.75,
    n_timesteps_s2a=[25, 10, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1],
    cfg_s2a=2.5,
    rescale_cfg_s2a=0.75,
    device=torch.device("cuda:0"),
):
    sentences = split_paragraph(target_text)
    print("split_paragraph: before:", target_text, "\nafter:", sentences)

    # Prepare the prompt once; it is identical for every sentence.
    speech_16k = librosa.load(prompt_speech_path, sr=16000)[0]
    speech = librosa.load(prompt_speech_path, sr=24000)[0]
    prompt_language = detect_speech_language(prompt_speech_path)
    full_prompt_text, short_prompt_text, short_prompt_end_ts = get_prompt_text(
        prompt_speech_path, prompt_language
    )
    # Use only the first ~4 seconds as the prompt in case the wav is too long.
    speech = speech[0 : int(short_prompt_end_ts * 24000)]
    speech_16k = speech_16k[0 : int(short_prompt_end_ts * 16000)]
    acoustic_code = extract_acoustic_code(torch.tensor(speech).unsqueeze(0).to(device))

    total_recovered_audio = None
    for sentence in sentences:
        sentence = preprocess(sentence)
        target_language = detect_text_language(sentence)
        combine_semantic_code, _ = text2semantic(
            device,
            speech_16k,
            short_prompt_text,
            prompt_language,
            sentence,
            target_language,
            target_len,
            n_timesteps,
            cfg,
            rescale_cfg,
        )
        _, recovered_audio = semantic2acoustic(
            device,
            combine_semantic_code,
            acoustic_code,
            n_timesteps=n_timesteps_s2a,
            cfg=cfg_s2a,
            rescale_cfg=rescale_cfg_s2a,
        )
        print("finish text:", sentence)
        if total_recovered_audio is None:
            total_recovered_audio = recovered_audio
        else:
            total_recovered_audio = np.concatenate(
                [total_recovered_audio, recovered_audio]
            )
    return total_recovered_audio
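# inference is the thin Gradio callback: it runs maskgct_inference and writes
# the result into a rotating pool of ten wav files under ./output.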
def inference(
    prompt_wav,
    target_text,
    target_len,
    n_timesteps,
):
    global output_file_name_idx
    save_path = f"./output/output_{output_file_name_idx}.wav"
    os.makedirs("./output", exist_ok=True)
    recovered_audio = maskgct_inference(
        prompt_wav,
        target_text,
        target_len=target_len,
        n_timesteps=int(n_timesteps),
        device=device,
    )
    sf.write(save_path, recovered_audio, 24000)
    # Rotate through 10 output files so the output directory stays small.
    output_file_name_idx = (output_file_name_idx + 1) % 10
    return save_path
# Load models once at startup.
(
    semantic_model,
    semantic_mean,
    semantic_std,
    semantic_codec,
    codec_encoder,
    codec_decoder,
    t2s_model,
    s2a_model_1layer,
    s2a_model_full,
) = load_models()

# Supported languages.
language_list = ["en", "zh", "ja", "ko", "fr", "de"]
# Gradio interface
iface = gr.Interface(
    fn=inference,
    inputs=[
        gr.Audio(label="Upload Prompt Wav", type="filepath"),
        gr.Textbox(label="Target Text (1024 characters at most)"),
        gr.Number(
            label="Target Duration (in seconds); a value below 0 lets the system estimate the duration.",
            value=-1,
        ),
        gr.Slider(
            label="Number of Timesteps", minimum=15, maximum=100, value=25, step=1
        ),
    ],
    outputs=gr.Audio(label="Generated Audio"),
    title="MaskGCT TTS Demo",
    description="""
    [arXiv Paper](https://arxiv.org/abs/2409.00750) [HuggingFace Model](https://huggingface.co/amphion/maskgct) [HuggingFace Space](https://huggingface.co/spaces/amphion/maskgct) [GitHub README](https://github.com/open-mmlab/Amphion/tree/main/models/tts/maskgct)
    """,
)

# Launch the interface.
iface.launch(allowed_paths=["./output"])
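# A typical local run (assuming the Amphion repo layout, the local acoustic
# codec checkpoints, and the configs referenced above are in place):
#   python app.py
# then open the printed URL and upload a short prompt wav.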