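"""Wav2Vec2 speech-to-text inference utilities.

Provides a PyTorch backend (Wave2Vec2Inference), an ONNX Runtime backend
(Wave2Vec2ONNXInference), helpers for exporting and quantizing ONNX models,
and a create_inference() factory that selects between the two.
"""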
import torch
from transformers import AutoModelForCTC, AutoProcessor, Wav2Vec2Processor, Wav2Vec2ForCTC
import onnxruntime as rt
import numpy as np
import librosa
import warnings
import os

warnings.filterwarnings("ignore")


class Wave2Vec2Inference:
    """CTC speech recognition with a Hugging Face Wav2Vec2 model (PyTorch backend)."""

    def __init__(self, model_name, use_gpu=True):
        # Auto-detect device: prefer Apple MPS, then CUDA, then fall back to CPU
        if use_gpu:
            if torch.backends.mps.is_available():
                self.device = "mps"
            elif torch.cuda.is_available():
                self.device = "cuda"
            else:
                self.device = "cpu"
        else:
            self.device = "cpu"
        print(f"Using device: {self.device}")

        # Load model and processor
        self.processor = AutoProcessor.from_pretrained(model_name)
        self.model = AutoModelForCTC.from_pretrained(model_name)
        self.model.to(self.device)
        self.model.eval()

        # Disable gradients globally for inference
        torch.set_grad_enabled(False)
    def buffer_to_text(self, audio_buffer):
        if len(audio_buffer) == 0:
            return ""

        # Convert to tensor
        if isinstance(audio_buffer, np.ndarray):
            audio_tensor = torch.from_numpy(audio_buffer).float()
        else:
            audio_tensor = torch.tensor(audio_buffer, dtype=torch.float32)

        # Process audio
        inputs = self.processor(
            audio_tensor,
            sampling_rate=16_000,
            return_tensors="pt",
            padding=True,
        )

        # Move to device
        input_values = inputs.input_values.to(self.device)
        attention_mask = inputs.attention_mask.to(self.device) if "attention_mask" in inputs else None

        # Inference
        with torch.no_grad():
            if attention_mask is not None:
                logits = self.model(input_values, attention_mask=attention_mask).logits
            else:
                logits = self.model(input_values).logits

        # Decode
        predicted_ids = torch.argmax(logits, dim=-1)
        if self.device != "cpu":
            predicted_ids = predicted_ids.cpu()
        transcription = self.processor.batch_decode(predicted_ids)[0]
        return transcription.lower().strip()
    def file_to_text(self, filename):
        try:
            audio_input, _ = librosa.load(filename, sr=16000, dtype=np.float32)
            return self.buffer_to_text(audio_input)
        except Exception as e:
            print(f"Error loading audio file {filename}: {e}")
            return ""


class Wave2Vec2ONNXInference:
    """CTC speech recognition with an exported Wav2Vec2 model (ONNX Runtime backend)."""

    def __init__(self, model_name, onnx_path, use_gpu=True):
        self.processor = Wav2Vec2Processor.from_pretrained(model_name)

        # Setup ONNX Runtime
        options = rt.SessionOptions()
        options.graph_optimization_level = rt.GraphOptimizationLevel.ORT_ENABLE_ALL

        # Choose providers based on GPU availability, keeping CPU as a fallback
        providers = []
        if use_gpu and 'CUDAExecutionProvider' in rt.get_available_providers():
            providers.append('CUDAExecutionProvider')
        providers.append('CPUExecutionProvider')

        self.model = rt.InferenceSession(onnx_path, options, providers=providers)
        self.input_name = self.model.get_inputs()[0].name
        print(f"ONNX model loaded with providers: {self.model.get_providers()}")
    def buffer_to_text(self, audio_buffer):
        if len(audio_buffer) == 0:
            return ""

        # Convert to tensor
        if isinstance(audio_buffer, np.ndarray):
            audio_tensor = torch.from_numpy(audio_buffer).float()
        else:
            audio_tensor = torch.tensor(audio_buffer, dtype=torch.float32)

        # Process audio
        inputs = self.processor(
            audio_tensor,
            sampling_rate=16_000,
            return_tensors="np",
            padding=True,
        )

        # ONNX inference
        input_values = inputs.input_values.astype(np.float32)
        onnx_outputs = self.model.run(None, {self.input_name: input_values})[0]

        # Decode
        prediction = np.argmax(onnx_outputs, axis=-1)
        transcription = self.processor.decode(prediction.squeeze().tolist())
        return transcription.lower().strip()
    def file_to_text(self, filename):
        try:
            audio_input, _ = librosa.load(filename, sr=16000, dtype=np.float32)
            return self.buffer_to_text(audio_input)
        except Exception as e:
            print(f"Error loading audio file {filename}: {e}")
            return ""


def convert_to_onnx(model_id_or_path, onnx_model_name):
    """Convert PyTorch model to ONNX format"""
    print(f"Converting {model_id_or_path} to ONNX...")
    model = Wav2Vec2ForCTC.from_pretrained(model_id_or_path)
    model.eval()

    # Create dummy input
    audio_len = 250000
    dummy_input = torch.randn(1, audio_len, requires_grad=True)

    torch.onnx.export(
        model,
        dummy_input,
        onnx_model_name,
        export_params=True,
        opset_version=14,
        do_constant_folding=True,
        input_names=["input"],
        output_names=["output"],
        dynamic_axes={
            "input": {1: "audio_len"},
            "output": {1: "audio_len"},
        },
    )
    print(f"ONNX model saved to: {onnx_model_name}")


def quantize_onnx_model(onnx_model_path, quantized_model_path):
    """Quantize ONNX model for faster inference"""
    print("Starting quantization...")
    from onnxruntime.quantization import quantize_dynamic, QuantType

    quantize_dynamic(
        onnx_model_path,
        quantized_model_path,
        weight_type=QuantType.QUInt8
    )
    print(f"Quantized model saved to: {quantized_model_path}")


def export_to_onnx(model_name, quantize=False):
    """
    Export model to ONNX format with optional quantization

    Args:
        model_name: HuggingFace model name
        quantize: Whether to also create quantized version

    Returns:
        tuple: (onnx_path, quantized_path or None)
    """
    onnx_filename = f"{model_name.split('/')[-1]}.onnx"
    convert_to_onnx(model_name, onnx_filename)

    quantized_path = None
    if quantize:
        quantized_path = onnx_filename.replace('.onnx', '.quantized.onnx')
        quantize_onnx_model(onnx_filename, quantized_path)

    return onnx_filename, quantized_path
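

# Example (illustrative): export the model and also produce a dynamically
# quantized copy alongside it:
#   onnx_path, quantized_path = export_to_onnx(
#       "facebook/wav2vec2-large-960h-lv60-self", quantize=True)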


def create_inference(model_name, use_onnx=False, onnx_path=None, use_gpu=True, use_onnx_quantize=False):
    """
    Create optimized inference instance

    Args:
        model_name: HuggingFace model name
        use_onnx: Whether to use ONNX runtime
        onnx_path: Path to ONNX model file
        use_gpu: Whether to use GPU if available
        use_onnx_quantize: Whether to use quantized ONNX model

    Returns:
        Inference instance
    """
    if use_onnx:
        if not onnx_path or not os.path.exists(onnx_path):
            # Convert to ONNX if path not provided or doesn't exist
            onnx_filename = f"{model_name.split('/')[-1]}.onnx"
            convert_to_onnx(model_name, onnx_filename)
            onnx_path = onnx_filename

        if use_onnx_quantize:
            quantized_path = onnx_path.replace('.onnx', '.quantized.onnx')
            if not os.path.exists(quantized_path):
                quantize_onnx_model(onnx_path, quantized_path)
            onnx_path = quantized_path

        print(f"Using ONNX model: {onnx_path}")
        return Wave2Vec2ONNXInference(model_name, onnx_path, use_gpu)
    else:
        print("Using PyTorch model")
        return Wave2Vec2Inference(model_name, use_gpu)


if __name__ == "__main__":
    import time

    model_name = "facebook/wav2vec2-large-960h-lv60-self"
    test_file = "test.wav"

    if not os.path.exists(test_file):
        print(f"Test file {test_file} not found. Please provide a valid audio file.")
        exit(1)

    # Test different configurations
    configs = [
        {"use_onnx": False, "use_gpu": True},
        {"use_onnx": True, "use_gpu": True, "use_onnx_quantize": False},
        {"use_onnx": True, "use_gpu": True, "use_onnx_quantize": True},
    ]

    for config in configs:
        print(f"\n=== Testing config: {config} ===")

        # Create inference instance
        asr = create_inference(model_name, **config)

        # Warm up
        asr.file_to_text(test_file)

        # Test performance
        times = []
        for i in range(5):
            start_time = time.time()
            text = asr.file_to_text(test_file)
            end_time = time.time()
            execution_time = end_time - start_time
            times.append(execution_time)
            print(f"Run {i+1}: {execution_time:.3f}s - {text[:50]}...")

        avg_time = sum(times) / len(times)
        print(f"Average time: {avg_time:.3f}s")