import gradio as gr
import random
import torch
import logging
from slam_llm.utils.model_utils import get_custom_model_factory
from slam_llm.utils.dataset_utils import get_preprocessed_dataset
from examples.tts.utils.codec_utils import audio_decode_cosyvoice
from examples.tts.speech_dataset_tts_gradio import SpeechDatasetOnline
import os
import soundfile as sf
import torchaudio
import hydra
from omegaconf import DictConfig, ListConfig, OmegaConf
import time
import pdb
import json
from pathlib import Path
from huggingface_hub import snapshot_download as hf_snapshot_download
# from modelscope import snapshot_download as ms_snapshot_download
import spaces
import tempfile
import uuid

# Global model cache to avoid reloading
MODEL_CACHE = {}
TOKENIZER_CACHE = {}

# def download_and_load_model_cache():
config_path = "config/"
train_config = OmegaConf.load(os.path.join(config_path, "train_config.yaml"))
model_config = OmegaConf.load(os.path.join(config_path, "model_config.yaml"))
dataset_config = OmegaConf.load(os.path.join(config_path, "dataset_config.yaml"))
decode_config = OmegaConf.load(os.path.join(config_path, "decode_config.yaml"))
kwargs = OmegaConf.load(os.path.join(config_path, "kwargs.yaml"))

# if not Path(model_config.llm_path).exists():
#     hf_snapshot_download(repo_id="Qwen/Qwen2.5-1.5B", local_dir="./ckpts/Qwen")
# if not Path(model_config.codec_decoder_path).exists():
#     ms_snapshot_download('iic/CosyVoice-300M-SFT', local_dir="./ckpts/CosyVoice")
if not Path(kwargs.ckpt_path).exists():
    hf_snapshot_download(repo_id='yhaha/EmoVoice', local_dir="./ckpts/")

# Seed everything for reproducible generation.
torch.cuda.manual_seed(train_config.seed)
torch.manual_seed(train_config.seed)
random.seed(train_config.seed)

model_factory = get_custom_model_factory(model_config)  # , logger)
model, tokenizer = model_factory(train_config, model_config, **kwargs)
codec_decoder = model.codec_decoder
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# FIX(MZY): put the whole model to device.
model.to(device)
model.eval()
print(model)

MODEL_CACHE["EmoVoice_1.5B"] = model
TOKENIZER_CACHE["EmoVoice_1.5B"] = tokenizer


@spaces.GPU(duration=60)
@torch.inference_mode()
def infer_tts(text, emotion_text_prompt, neutral_speaker_wav):
    """
    Online inputs (example):
    text = "The kettle SCREAMED as it reached boiling point, mirroring my inner tension."
    emotion_text_prompt = "Parallel emotions with rising heat, an audible cry of pent emotion."
    neutral_speaker_wav = "/nfs/yangguanrou.ygr/data/gpt4o_third/output_gpt4o/neutral/gpt4o_23948_neutral_ash.wav"
    """
    input_path = "input/"
    output_path = "output/"

    # Data to be stored (legacy jsonl + dataloader path):
    # data = {
    #     "key": "random_key",
    #     "source_text": text,
    #     "target_text": text,
    #     "emotion_text_prompt": emotion_text_prompt,
    #     "neutral_speaker_wav": neutral_speaker_wav,
    # }
    # with open(os.path.join(input_path, 'test.jsonl'), 'a') as f:
    #     f.write(json.dumps(data) + '\n')
    # dataset_config.train_data_path = os.path.join(input_path, 'test.jsonl')
    # dataset_config.val_data_path = os.path.join(input_path, 'test.jsonl')
    # dataset_test = get_preprocessed_dataset(
    #     tokenizer,
    #     dataset_config,
    #     split="test",
    # )
    # test_dataloader = torch.utils.data.DataLoader(
    #     dataset_test,
    #     num_workers=train_config.num_workers_dataloader,
    #     pin_memory=True,
    #     shuffle=False,
    #     batch_size=train_config.val_batch_size,
    #     drop_last=False,
    #     collate_fn=dataset_test.collator
    # )

    # New online data processing: build a single batch directly from the UI inputs.
    dataset = SpeechDatasetOnline(dataset_config, TOKENIZER_CACHE["EmoVoice_1.5B"])
    batch = dataset.prepare_input(text, emotion_text_prompt, neutral_speaker_wav)
    print(batch)

    task_type = decode_config.task_type
    code_layer = model_config.vocab_config.code_layer
    code_type = model_config.code_type
    num_latency_tokens = dataset_config.num_latency_tokens
    modeling_paradigm = dataset_config.modeling_paradigm
    interleaved_text_token_num = dataset_config.interleaved_text_token_num
    interleaved_audio_token_num = dataset_config.interleaved_audio_token_num

    # decode_log_dir = kwargs.get('decode_log')
    output_text_only = kwargs.get('output_text_only', False)
    speech_sample_rate = kwargs.get('speech_sample_rate', 24000)
    audio_prompt_path = kwargs.get('audio_prompt_path', None)
    tone_dir = "neutral_prompt_speech"

    # for step, batch in enumerate(test_dataloader):
    for key in batch.keys():
        batch[key] = batch[key].to(device) if isinstance(batch[key], torch.Tensor) else batch[key]
    audio_prompt_path = batch["neutral_speaker_wav"][0]

    if modeling_paradigm == "parallel" or modeling_paradigm == "interleaved":
        model_outputs = model.generate(**batch, **decode_config)
    if modeling_paradigm == "parallel" or modeling_paradigm == "serial":
        text_outputs = model_outputs[code_layer]
        audio_outputs = model_outputs[:code_layer]

    if modeling_paradigm != "serial":
        if audio_outputs[0].shape[0] == decode_config.max_new_tokens:
            # If the audio token sequence hit the generation limit, treat it as a bad case and skip.
            return "default.wav"
            # logger.warning(f"Audio token is too long, skip. You can try to increase the max_new_tokens in the decode_config.")
            # continue
        # else:
        #     if audio_outputs[0] == []:
        #         logger.warning(f"Text token never stop")
        #         continue

    # for i, key in enumerate(batch["keys"]):
    audio_tokens = [audio_outputs[layer] for layer in range(code_layer)] if code_layer > 0 else audio_outputs
    print(audio_tokens)
    audio_hat = audio_decode_cosyvoice(audio_tokens, model_config, codec_decoder, audio_prompt_path, code_layer, num_latency_tokens, speed=1.0)
    if audio_hat is None:
        # logger.info(f"Error in decoding {key}: eoa at start! or No eoa!")
or No eoa!") return "default.wav" # if key[-4:] == ".wav": # # key = key[:-4] unique_filename = f"{uuid.uuid4()}.wav" save_path = os.path.join(output_path, unique_filename) print(save_path) # with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as tmp_file: # save_path = tmp_file.name # save_path = f"{output_path}/{unique_filename}.wav" sf.write(save_path, audio_hat.squeeze().cpu().numpy(), speech_sample_rate) return save_path # Gradio input and output components text = gr.Textbox(lines=1, label="Text", placeholder="e.g., 'Wobbly tables ruin everything!'") emotion_text_prompt = gr.Textbox(lines=1, label="Emotion Text Prompt", placeholder="e.g., 'Expressing aggravated displeasure and discontent.'") neutral_speaker_wav = gr.Audio(label="Neutral Speaker Prompt WAV", type="filepath") description_text = """ ### **EmoVoice** is a emotion-controllable TTS model that exploits large language models (LLMs) to enable fine-grained freestyle natural language emotion control. ### [📖 **Arxiv**](https://arxiv.org/abs/2504.12867) | [💻 **GitHub**](https://github.com/yanghaha0908/EmoVoice) | [🤗 **Model**](https://huggingface.co/yhaha/EmoVoice) | [🚀 **Space**](https://huggingface.co/spaces/chenxie95/EmoVoice) | [🌐 **Demo Page**](https://yanghaha0908.github.io/EmoVoice/) """ gr_interface = gr.Interface( fn=infer_tts, inputs=[text, emotion_text_prompt, neutral_speaker_wav], outputs=[ gr.Audio(label="🎵 Emotional Speech", type="filepath") ], title="EmoVoice: LLM-based Emotional Text-To-Speech Model with Freestyle Text Prompting", description=description_text, flagging_mode="never", cache_examples="lazy", ) # 表示用户无法对应用的输出进行标记或报告问题。 # cache_examples="lazy" 表示只有在用户真正交互时,示例的输出才会被计算和缓存,避免不必要的计算。 if __name__ == "__main__": # download_and_load_model_cache() gr_interface.queue(15).launch()