Spaces:
Sleeping
Sleeping
| from typing import List, Dict, Tuple, Optional | |
| import numpy as np | |
| import librosa | |
| import nltk | |
| import eng_to_ipa as ipa | |
| import re | |
| from collections import defaultdict | |
| from loguru import logger | |
| import time | |
| import Levenshtein | |
| from dataclasses import dataclass | |
| from enum import Enum | |
| from src.AI_Models.wave2vec_inference import ( | |
| Wave2Vec2Inference, | |
| Wave2Vec2ONNXInference, | |
| export_to_onnx, | |
| ) | |
| # Download required NLTK data | |
| try: | |
| nltk.download("cmudict", quiet=True) | |
| from nltk.corpus import cmudict | |
| except: | |
| print("Warning: NLTK data not available") | |
| class AssessmentMode(Enum): | |
| WORD = "word" | |
| SENTENCE = "sentence" | |
| AUTO = "auto" | |
| class ErrorType(Enum): | |
| CORRECT = "correct" | |
| SUBSTITUTION = "substitution" | |
| DELETION = "deletion" | |
| INSERTION = "insertion" | |
| ACCEPTABLE = "acceptable" | |
| class CharacterError: | |
| """Character-level error information for UI mapping""" | |
| character: str | |
| position: int | |
| error_type: str | |
| expected_sound: str | |
| actual_sound: str | |
| severity: float | |
| color: str | |
| class EnhancedWav2Vec2CharacterASR: | |
| """Enhanced Wav2Vec2 ASR with prosody analysis support""" | |
| def __init__( | |
| self, | |
| model_name: str = "facebook/wav2vec2-large-960h-lv60-self", | |
| onnx: bool = False, | |
| quantized: bool = False, | |
| ): | |
| self.use_onnx = onnx | |
| self.sample_rate = 16000 | |
| self.model_name = model_name | |
| if onnx: | |
| import os | |
| model_path = f"wav2vec2-large-960h-lv60-self{'.quant' if quantized else ''}.onnx" | |
| if not os.path.exists(model_path): | |
| export_to_onnx(model_name, quantize=quantized) | |
| self.model = ( | |
| Wave2Vec2Inference(model_name) | |
| if not onnx | |
| else Wave2Vec2ONNXInference(model_name, model_path) | |
| ) | |
| def transcribe_with_features(self, audio_path: str) -> Dict: | |
| """Enhanced transcription with audio features for prosody analysis""" | |
| try: | |
| start_time = time.time() | |
| # Basic transcription | |
| character_transcript = self.model.file_to_text(audio_path) | |
| character_transcript = self._clean_character_transcript(character_transcript) | |
| # Convert to phonemes | |
| phoneme_representation = self._characters_to_phoneme_representation(character_transcript) | |
| # Extract audio features for prosody | |
| audio_features = self._extract_enhanced_audio_features(audio_path) | |
| logger.info(f"Enhanced transcription time: {time.time() - start_time:.2f}s") | |
| return { | |
| "character_transcript": character_transcript, | |
| "phoneme_representation": phoneme_representation, | |
| "audio_features": audio_features, | |
| "confidence": self._estimate_confidence(character_transcript) | |
| } | |
| except Exception as e: | |
| logger.error(f"Enhanced ASR error: {e}") | |
| return self._empty_result() | |
| def _extract_enhanced_audio_features(self, audio_path: str) -> Dict: | |
| """Extract comprehensive audio features for prosody analysis""" | |
| try: | |
| y, sr = librosa.load(audio_path, sr=self.sample_rate) | |
| duration = len(y) / sr | |
| # Pitch analysis | |
| pitches, magnitudes = librosa.piptrack(y=y, sr=sr) | |
| pitch_values = [] | |
| for t in range(pitches.shape[1]): | |
| index = magnitudes[:, t].argmax() | |
| pitch = pitches[index, t] | |
| if pitch > 0: | |
| pitch_values.append(pitch) | |
| # Rhythm and timing features | |
| tempo, beats = librosa.beat.beat_track(y=y, sr=sr) | |
| # Intensity features | |
| rms = librosa.feature.rms(y=y)[0] | |
| zcr = librosa.feature.zero_crossing_rate(y)[0] | |
| # Spectral features | |
| spectral_centroids = librosa.feature.spectral_centroid(y=y, sr=sr)[0] | |
| return { | |
| "duration": duration, | |
| "pitch": { | |
| "values": pitch_values, | |
| "mean": np.mean(pitch_values) if pitch_values else 0, | |
| "std": np.std(pitch_values) if pitch_values else 0, | |
| "range": np.max(pitch_values) - np.min(pitch_values) if pitch_values else 0, | |
| "cv": np.std(pitch_values) / np.mean(pitch_values) if pitch_values and np.mean(pitch_values) > 0 else 0 | |
| }, | |
| "rhythm": { | |
| "tempo": tempo, | |
| "beats_per_second": len(beats) / duration if duration > 0 else 0 | |
| }, | |
| "intensity": { | |
| "rms_mean": np.mean(rms), | |
| "rms_std": np.std(rms), | |
| "zcr_mean": np.mean(zcr) | |
| }, | |
| "spectral": { | |
| "centroid_mean": np.mean(spectral_centroids), | |
| "centroid_std": np.std(spectral_centroids) | |
| } | |
| } | |
| except Exception as e: | |
| logger.error(f"Audio feature extraction error: {e}") | |
| return {"duration": 0, "error": str(e)} | |
| def _clean_character_transcript(self, transcript: str) -> str: | |
| """Clean and standardize character transcript""" | |
| logger.info(f"Raw transcript before cleaning: {transcript}") | |
| cleaned = re.sub(r'\s+', ' ', transcript) | |
| return cleaned.strip().lower() | |
| def _characters_to_phoneme_representation(self, text: str) -> str: | |
| """Convert character-based transcript to phoneme representation""" | |
| if not text: | |
| return "" | |
| words = text.split() | |
| phoneme_words = [] | |
| g2p = EnhancedG2P() | |
| for word in words: | |
| try: | |
| if g2p: | |
| word_phonemes = g2p.word_to_phonemes(word) | |
| phoneme_words.extend(word_phonemes) | |
| else: | |
| phoneme_words.extend(self._simple_letter_to_phoneme(word)) | |
| except: | |
| phoneme_words.extend(self._simple_letter_to_phoneme(word)) | |
| return " ".join(phoneme_words) | |
| def _simple_letter_to_phoneme(self, word: str) -> List[str]: | |
| """Fallback letter-to-phoneme conversion""" | |
| letter_to_phoneme = { | |
| "a": "æ", "b": "b", "c": "k", "d": "d", "e": "ɛ", "f": "f", | |
| "g": "ɡ", "h": "h", "i": "ɪ", "j": "dʒ", "k": "k", "l": "l", | |
| "m": "m", "n": "n", "o": "ʌ", "p": "p", "q": "k", "r": "r", | |
| "s": "s", "t": "t", "u": "ʌ", "v": "v", "w": "w", "x": "ks", | |
| "y": "j", "z": "z" | |
| } | |
| return [letter_to_phoneme.get(letter, letter) for letter in word.lower() if letter in letter_to_phoneme] | |
| def _estimate_confidence(self, transcript: str) -> float: | |
| """Estimate transcription confidence""" | |
| if not transcript or len(transcript.strip()) < 2: | |
| return 0.0 | |
| repeated_chars = len(re.findall(r'(.)\1{2,}', transcript)) | |
| return max(0.0, 1.0 - (repeated_chars * 0.2)) | |
| def _empty_result(self) -> Dict: | |
| """Empty result for error cases""" | |
| return { | |
| "character_transcript": "", | |
| "phoneme_representation": "", | |
| "audio_features": {"duration": 0}, | |
| "confidence": 0.0 | |
| } | |
| class EnhancedG2P: | |
| """Enhanced Grapheme-to-Phoneme converter with visualization support""" | |
| def __init__(self): | |
| try: | |
| self.cmu_dict = cmudict.dict() | |
| except: | |
| self.cmu_dict = {} | |
| logger.warning("CMU dictionary not available") | |
| # Vietnamese speaker substitution patterns (enhanced) | |
| self.vn_substitutions = { | |
| "θ": ["f", "s", "t", "d"], | |
| "ð": ["d", "z", "v", "t"], | |
| "v": ["w", "f", "b"], | |
| "w": ["v", "b"], | |
| "r": ["l", "n"], | |
| "l": ["r", "n"], | |
| "z": ["s", "j"], | |
| "ʒ": ["ʃ", "z", "s"], | |
| "ʃ": ["s", "ʒ"], | |
| "ŋ": ["n", "m"], | |
| "tʃ": ["ʃ", "s", "k"], | |
| "dʒ": ["ʒ", "j", "g"], | |
| "æ": ["ɛ", "a"], | |
| "ɪ": ["i"], | |
| "ʊ": ["u"] | |
| } | |
| # Difficulty scores for Vietnamese speakers | |
| self.difficulty_scores = { | |
| "θ": 0.9, "ð": 0.9, "v": 0.8, "z": 0.8, "ʒ": 0.9, | |
| "r": 0.7, "l": 0.6, "w": 0.5, "æ": 0.7, "ɪ": 0.6, | |
| "ʊ": 0.6, "ŋ": 0.3, "f": 0.2, "s": 0.2, "ʃ": 0.5, | |
| "tʃ": 0.4, "dʒ": 0.5 | |
| } | |
| def word_to_phonemes(self, word: str) -> List[str]: | |
| """Convert word to phoneme list""" | |
| word_lower = word.lower().strip() | |
| if word_lower in self.cmu_dict: | |
| cmu_phonemes = self.cmu_dict[word_lower][0] | |
| return self._convert_cmu_to_ipa(cmu_phonemes) | |
| else: | |
| return self._estimate_phonemes(word_lower) | |
| def get_phoneme_string(self, text: str) -> str: | |
| """Get space-separated phoneme string""" | |
| words = self._clean_text(text).split() | |
| all_phonemes = [] | |
| for word in words: | |
| if word: | |
| phonemes = self.word_to_phonemes(word) | |
| all_phonemes.extend(phonemes) | |
| return " ".join(all_phonemes) | |
| def text_to_phonemes(self, text: str) -> List[Dict]: | |
| """Convert text to phoneme sequence with visualization data""" | |
| words = self._clean_text(text).split() | |
| phoneme_sequence = [] | |
| for word in words: | |
| word_phonemes = self.word_to_phonemes(word) | |
| phoneme_sequence.append({ | |
| "word": word, | |
| "phonemes": word_phonemes, | |
| "ipa": self._get_ipa(word), | |
| "phoneme_string": " ".join(word_phonemes), | |
| "visualization": self._create_phoneme_visualization(word_phonemes) | |
| }) | |
| return phoneme_sequence | |
| def _convert_cmu_to_ipa(self, cmu_phonemes: List[str]) -> List[str]: | |
| """Convert CMU phonemes to IPA""" | |
| cmu_to_ipa = { | |
| "AA": "ɑ", "AE": "æ", "AH": "ʌ", "AO": "ɔ", "AW": "aʊ", | |
| "AY": "aɪ", "EH": "ɛ", "ER": "ɝ", "EY": "eɪ", "IH": "ɪ", | |
| "IY": "i", "OW": "oʊ", "OY": "ɔɪ", "UH": "ʊ", "UW": "u", | |
| "B": "b", "CH": "tʃ", "D": "d", "DH": "ð", "F": "f", | |
| "G": "ɡ", "HH": "h", "JH": "dʒ", "K": "k", "L": "l", | |
| "M": "m", "N": "n", "NG": "ŋ", "P": "p", "R": "r", | |
| "S": "s", "SH": "ʃ", "T": "t", "TH": "θ", "V": "v", | |
| "W": "w", "Y": "j", "Z": "z", "ZH": "ʒ" | |
| } | |
| ipa_phonemes = [] | |
| for phoneme in cmu_phonemes: | |
| clean_phoneme = re.sub(r'[0-9]', '', phoneme) | |
| ipa_phoneme = cmu_to_ipa.get(clean_phoneme, clean_phoneme.lower()) | |
| ipa_phonemes.append(ipa_phoneme) | |
| return ipa_phonemes | |
| def _estimate_phonemes(self, word: str) -> List[str]: | |
| """Estimate phonemes for unknown words""" | |
| phoneme_map = { | |
| "ch": "tʃ", "sh": "ʃ", "th": "θ", "ph": "f", "ck": "k", | |
| "ng": "ŋ", "qu": "kw", "a": "æ", "e": "ɛ", "i": "ɪ", | |
| "o": "ʌ", "u": "ʌ", "b": "b", "c": "k", "d": "d", | |
| "f": "f", "g": "ɡ", "h": "h", "j": "dʒ", "k": "k", | |
| "l": "l", "m": "m", "n": "n", "p": "p", "r": "r", | |
| "s": "s", "t": "t", "v": "v", "w": "w", "x": "ks", | |
| "y": "j", "z": "z" | |
| } | |
| phonemes = [] | |
| i = 0 | |
| while i < len(word): | |
| if i <= len(word) - 2: | |
| two_char = word[i:i+2] | |
| if two_char in phoneme_map: | |
| phonemes.append(phoneme_map[two_char]) | |
| i += 2 | |
| continue | |
| char = word[i] | |
| if char in phoneme_map: | |
| phonemes.append(phoneme_map[char]) | |
| i += 1 | |
| return phonemes | |
| def _clean_text(self, text: str) -> str: | |
| """Clean text for processing""" | |
| text = re.sub(r"[^\w\s']", " ", text) | |
| text = re.sub(r'\s+', ' ', text) | |
| return text.lower().strip() | |
| def _get_ipa(self, word: str) -> str: | |
| """Get IPA transcription""" | |
| try: | |
| return ipa.convert(word) | |
| except: | |
| return f"/{word}/" | |
| def _create_phoneme_visualization(self, phonemes: List[str]) -> List[Dict]: | |
| """Create visualization data for phonemes""" | |
| visualization = [] | |
| for phoneme in phonemes: | |
| color_category = self._get_phoneme_color_category(phoneme) | |
| visualization.append({ | |
| "phoneme": phoneme, | |
| "color_category": color_category, | |
| "description": self._get_phoneme_description(phoneme), | |
| "difficulty": self.difficulty_scores.get(phoneme, 0.3) | |
| }) | |
| return visualization | |
| def _get_phoneme_color_category(self, phoneme: str) -> str: | |
| """Categorize phonemes by color for visualization""" | |
| vowel_phonemes = {"ɑ", "æ", "ʌ", "ɔ", "aʊ", "aɪ", "ɛ", "ɝ", "eɪ", "ɪ", "i", "oʊ", "ɔɪ", "ʊ", "u"} | |
| difficult_consonants = {"θ", "ð", "v", "z", "ʒ", "r", "w"} | |
| if phoneme in vowel_phonemes: | |
| return "vowel" | |
| elif phoneme in difficult_consonants: | |
| return "difficult" | |
| else: | |
| return "consonant" | |
| def _get_phoneme_description(self, phoneme: str) -> str: | |
| """Get description for a phoneme""" | |
| descriptions = { | |
| "θ": "Voiceless dental fricative (like 'th' in 'think')", | |
| "ð": "Voiced dental fricative (like 'th' in 'this')", | |
| "v": "Voiced labiodental fricative (like 'v' in 'van')", | |
| "z": "Voiced alveolar fricative (like 'z' in 'zip')", | |
| "ʒ": "Voiced postalveolar fricative (like 's' in 'measure')", | |
| "r": "Alveolar approximant (like 'r' in 'red')", | |
| "w": "Labial-velar approximant (like 'w' in 'wet')", | |
| "æ": "Near-open front unrounded vowel (like 'a' in 'cat')", | |
| "ɪ": "Near-close near-front unrounded vowel (like 'i' in 'sit')", | |
| "ʊ": "Near-close near-back rounded vowel (like 'u' in 'put')" | |
| } | |
| return descriptions.get(phoneme, f"Phoneme: {phoneme}") | |
| def is_acceptable_substitution(self, reference: str, predicted: str) -> bool: | |
| """Check if substitution is acceptable for Vietnamese speakers""" | |
| acceptable = self.vn_substitutions.get(reference, []) | |
| return predicted in acceptable | |
| def get_difficulty_score(self, phoneme: str) -> float: | |
| """Get difficulty score for phoneme""" | |
| return self.difficulty_scores.get(phoneme, 0.3) | |
| class AdvancedPhonemeComparator: | |
| """Enhanced phoneme comparator using Levenshtein distance""" | |
| def __init__(self): | |
| self.g2p = EnhancedG2P() | |
| def compare_with_levenshtein(self, reference: str, predicted: str) -> List[Dict]: | |
| """Compare phonemes using Levenshtein distance for accurate alignment""" | |
| ref_phones = reference.split() if reference else [] | |
| pred_phones = predicted.split() if predicted else [] | |
| if not ref_phones: | |
| return [] | |
| # Use Levenshtein editops for precise alignment | |
| ops = Levenshtein.editops(ref_phones, pred_phones) | |
| comparisons = [] | |
| ref_idx = 0 | |
| pred_idx = 0 | |
| # Process equal parts first | |
| for op_type, ref_pos, pred_pos in ops: | |
| # Add equal characters before this operation | |
| while ref_idx < ref_pos and pred_idx < pred_pos: | |
| comparison = self._create_comparison( | |
| ref_phones[ref_idx], pred_phones[pred_idx], | |
| ErrorType.CORRECT, 1.0, len(comparisons) | |
| ) | |
| comparisons.append(comparison) | |
| ref_idx += 1 | |
| pred_idx += 1 | |
| # Process the operation | |
| if op_type == 'replace': | |
| ref_phoneme = ref_phones[ref_pos] | |
| pred_phoneme = pred_phones[pred_pos] | |
| if self.g2p.is_acceptable_substitution(ref_phoneme, pred_phoneme): | |
| error_type = ErrorType.ACCEPTABLE | |
| score = 0.7 | |
| else: | |
| error_type = ErrorType.SUBSTITUTION | |
| score = 0.2 | |
| comparison = self._create_comparison( | |
| ref_phoneme, pred_phoneme, error_type, score, len(comparisons) | |
| ) | |
| comparisons.append(comparison) | |
| ref_idx = ref_pos + 1 | |
| pred_idx = pred_pos + 1 | |
| elif op_type == 'delete': | |
| comparison = self._create_comparison( | |
| ref_phones[ref_pos], "", ErrorType.DELETION, 0.0, len(comparisons) | |
| ) | |
| comparisons.append(comparison) | |
| ref_idx = ref_pos + 1 | |
| elif op_type == 'insert': | |
| comparison = self._create_comparison( | |
| "", pred_phones[pred_pos], ErrorType.INSERTION, 0.0, len(comparisons) | |
| ) | |
| comparisons.append(comparison) | |
| pred_idx = pred_pos + 1 | |
| # Add remaining equal characters | |
| while ref_idx < len(ref_phones) and pred_idx < len(pred_phones): | |
| comparison = self._create_comparison( | |
| ref_phones[ref_idx], pred_phones[pred_idx], | |
| ErrorType.CORRECT, 1.0, len(comparisons) | |
| ) | |
| comparisons.append(comparison) | |
| ref_idx += 1 | |
| pred_idx += 1 | |
| return comparisons | |
| def _create_comparison(self, ref_phoneme: str, pred_phoneme: str, | |
| error_type: ErrorType, score: float, position: int) -> Dict: | |
| """Create comparison dictionary""" | |
| return { | |
| "position": position, | |
| "reference_phoneme": ref_phoneme, | |
| "learner_phoneme": pred_phoneme, | |
| "status": error_type.value, | |
| "score": score, | |
| "difficulty": self.g2p.get_difficulty_score(ref_phoneme), | |
| "error_type": error_type.value | |
| } | |
| class EnhancedWordAnalyzer: | |
| """Enhanced word analyzer with character-level error mapping""" | |
| def __init__(self): | |
| self.g2p = EnhancedG2P() | |
| self.comparator = AdvancedPhonemeComparator() | |
| def analyze_words_enhanced(self, reference_text: str, learner_phonemes: str, | |
| mode: AssessmentMode) -> Dict: | |
| """Enhanced word analysis with character-level mapping""" | |
| # Get reference phonemes by word | |
| reference_words = self.g2p.text_to_phonemes(reference_text) | |
| # Get overall phoneme comparison using Levenshtein | |
| reference_phoneme_string = self.g2p.get_phoneme_string(reference_text) | |
| phoneme_comparisons = self.comparator.compare_with_levenshtein( | |
| reference_phoneme_string, learner_phonemes | |
| ) | |
| # Create enhanced word highlights | |
| word_highlights = self._create_enhanced_word_highlights( | |
| reference_words, phoneme_comparisons, mode | |
| ) | |
| # Identify wrong words with character-level errors | |
| wrong_words = self._identify_wrong_words_enhanced(word_highlights, phoneme_comparisons) | |
| return { | |
| "word_highlights": word_highlights, | |
| "phoneme_differences": phoneme_comparisons, | |
| "wrong_words": wrong_words, | |
| "reference_phonemes": reference_phoneme_string, | |
| "phoneme_pairs": self._create_phoneme_pairs(reference_phoneme_string, learner_phonemes) | |
| } | |
| def _create_enhanced_word_highlights(self, reference_words: List[Dict], | |
| phoneme_comparisons: List[Dict], | |
| mode: AssessmentMode) -> List[Dict]: | |
| """Create enhanced word highlights with character-level error mapping""" | |
| word_highlights = [] | |
| phoneme_index = 0 | |
| for word_data in reference_words: | |
| word = word_data["word"] | |
| word_phonemes = word_data["phonemes"] | |
| num_phonemes = len(word_phonemes) | |
| # Get phoneme scores for this word | |
| word_phoneme_scores = [] | |
| word_comparisons = [] | |
| for j in range(num_phonemes): | |
| if phoneme_index + j < len(phoneme_comparisons): | |
| comparison = phoneme_comparisons[phoneme_index + j] | |
| word_phoneme_scores.append(comparison["score"]) | |
| word_comparisons.append(comparison) | |
| # Calculate word score | |
| word_score = np.mean(word_phoneme_scores) if word_phoneme_scores else 0.0 | |
| # Map phoneme errors to character positions (enhanced for word mode) | |
| character_errors = [] | |
| if mode == AssessmentMode.WORD: | |
| character_errors = self._map_phonemes_to_characters(word, word_comparisons) | |
| # Create enhanced word highlight | |
| highlight = { | |
| "word": word, | |
| "score": float(word_score), | |
| "status": self._get_word_status(word_score), | |
| "color": self._get_word_color(word_score), | |
| "phonemes": word_phonemes, | |
| "ipa": word_data["ipa"], | |
| "phoneme_scores": word_phoneme_scores, | |
| "phoneme_start_index": phoneme_index, | |
| "phoneme_end_index": phoneme_index + num_phonemes - 1, | |
| "phoneme_visualization": word_data["visualization"], | |
| "character_errors": character_errors, # New feature | |
| "detailed_analysis": mode == AssessmentMode.WORD # Flag for UI | |
| } | |
| word_highlights.append(highlight) | |
| phoneme_index += num_phonemes | |
| return word_highlights | |
| def _map_phonemes_to_characters(self, word: str, phoneme_comparisons: List[Dict]) -> List[CharacterError]: | |
| """Map phoneme errors to character positions in word""" | |
| character_errors = [] | |
| # Simple mapping strategy: distribute phonemes across characters | |
| if not phoneme_comparisons or not word: | |
| return character_errors | |
| chars_per_phoneme = len(word) / len(phoneme_comparisons) | |
| for i, comparison in enumerate(phoneme_comparisons): | |
| if comparison["status"] in ["substitution", "deletion", "wrong"]: | |
| # Calculate character position | |
| char_pos = min(int(i * chars_per_phoneme), len(word) - 1) | |
| severity = 1.0 - comparison["score"] | |
| color = self._get_error_color(severity) | |
| error = CharacterError( | |
| character=word[char_pos], | |
| position=char_pos, | |
| error_type=comparison["status"], | |
| expected_sound=comparison["reference_phoneme"], | |
| actual_sound=comparison["learner_phoneme"], | |
| severity=severity, | |
| color=color | |
| ) | |
| character_errors.append(error) | |
| return character_errors | |
| def _get_error_color(self, severity: float) -> str: | |
| """Get color code for character errors""" | |
| if severity >= 0.8: | |
| return "#ef4444" # Red - severe error | |
| elif severity >= 0.6: | |
| return "#f97316" # Orange - moderate error | |
| elif severity >= 0.4: | |
| return "#eab308" # Yellow - mild error | |
| else: | |
| return "#84cc16" # Light green - minor error | |
| def _identify_wrong_words_enhanced(self, word_highlights: List[Dict], | |
| phoneme_comparisons: List[Dict]) -> List[Dict]: | |
| """Enhanced wrong word identification with detailed error analysis""" | |
| wrong_words = [] | |
| for word_highlight in word_highlights: | |
| if word_highlight["score"] < 0.6: | |
| start_idx = word_highlight["phoneme_start_index"] | |
| end_idx = word_highlight["phoneme_end_index"] | |
| wrong_phonemes = [] | |
| missing_phonemes = [] | |
| for i in range(start_idx, min(end_idx + 1, len(phoneme_comparisons))): | |
| comparison = phoneme_comparisons[i] | |
| if comparison["status"] in ["wrong", "substitution"]: | |
| wrong_phonemes.append({ | |
| "expected": comparison["reference_phoneme"], | |
| "actual": comparison["learner_phoneme"], | |
| "difficulty": comparison["difficulty"], | |
| "description": self.g2p._get_phoneme_description(comparison["reference_phoneme"]) | |
| }) | |
| elif comparison["status"] in ["missing", "deletion"]: | |
| missing_phonemes.append({ | |
| "phoneme": comparison["reference_phoneme"], | |
| "difficulty": comparison["difficulty"], | |
| "description": self.g2p._get_phoneme_description(comparison["reference_phoneme"]) | |
| }) | |
| wrong_word = { | |
| "word": word_highlight["word"], | |
| "score": word_highlight["score"], | |
| "expected_phonemes": word_highlight["phonemes"], | |
| "ipa": word_highlight["ipa"], | |
| "wrong_phonemes": wrong_phonemes, | |
| "missing_phonemes": missing_phonemes, | |
| "tips": self._get_enhanced_vietnamese_tips(wrong_phonemes, missing_phonemes), | |
| "phoneme_visualization": word_highlight["phoneme_visualization"], | |
| "character_errors": word_highlight.get("character_errors", []) | |
| } | |
| wrong_words.append(wrong_word) | |
| return wrong_words | |
| def _create_phoneme_pairs(self, reference: str, learner: str) -> List[Dict]: | |
| """Create phoneme pairs for visualization""" | |
| ref_phones = reference.split() if reference else [] | |
| learner_phones = learner.split() if learner else [] | |
| # Use difflib for alignment visualization | |
| import difflib | |
| matcher = difflib.SequenceMatcher(None, ref_phones, learner_phones) | |
| pairs = [] | |
| for tag, i1, i2, j1, j2 in matcher.get_opcodes(): | |
| if tag == 'equal': | |
| for k in range(i2 - i1): | |
| pairs.append({ | |
| "reference": ref_phones[i1 + k], | |
| "learner": learner_phones[j1 + k], | |
| "match": True, | |
| "type": "correct" | |
| }) | |
| elif tag == 'replace': | |
| max_len = max(i2 - i1, j2 - j1) | |
| for k in range(max_len): | |
| ref_phoneme = ref_phones[i1 + k] if i1 + k < i2 else "" | |
| learner_phoneme = learner_phones[j1 + k] if j1 + k < j2 else "" | |
| pairs.append({ | |
| "reference": ref_phoneme, | |
| "learner": learner_phoneme, | |
| "match": False, | |
| "type": "substitution" | |
| }) | |
| elif tag == 'delete': | |
| for k in range(i1, i2): | |
| pairs.append({ | |
| "reference": ref_phones[k], | |
| "learner": "", | |
| "match": False, | |
| "type": "deletion" | |
| }) | |
| elif tag == 'insert': | |
| for k in range(j1, j2): | |
| pairs.append({ | |
| "reference": "", | |
| "learner": learner_phones[k], | |
| "match": False, | |
| "type": "insertion" | |
| }) | |
| return pairs | |
| def _get_word_status(self, score: float) -> str: | |
| """Get word status from score""" | |
| if score >= 0.8: | |
| return "excellent" | |
| elif score >= 0.6: | |
| return "good" | |
| elif score >= 0.4: | |
| return "needs_practice" | |
| else: | |
| return "poor" | |
| def _get_word_color(self, score: float) -> str: | |
| """Get color for word highlighting""" | |
| if score >= 0.8: | |
| return "#22c55e" # Green | |
| elif score >= 0.6: | |
| return "#84cc16" # Light green | |
| elif score >= 0.4: | |
| return "#eab308" # Yellow | |
| else: | |
| return "#ef4444" # Red | |
| def _get_enhanced_vietnamese_tips(self, wrong_phonemes: List[Dict], | |
| missing_phonemes: List[Dict]) -> List[str]: | |
| """Enhanced Vietnamese-specific pronunciation tips""" | |
| tips = [] | |
| vietnamese_tips = { | |
| "θ": "Đặt lưỡi giữa răng trên và dưới, thổi nhẹ (think, three)", | |
| "ð": "Giống θ nhưng rung dây thanh âm (this, that)", | |
| "v": "Chạm môi dưới vào răng trên, không dùng cả hai môi như tiếng Việt", | |
| "r": "Cuộn lưỡi nhưng không chạm vào vòm miệng, không lăn lưỡi", | |
| "l": "Đầu lưỡi chạm vào vòm miệng sau răng", | |
| "z": "Giống âm 's' nhưng có rung dây thanh âm", | |
| "ʒ": "Giống âm 'ʃ' (sh) nhưng có rung dây thanh âm", | |
| "w": "Tròn môi như âm 'u', không dùng răng như âm 'v'", | |
| "æ": "Mở miệng rộng hơn khi phát âm 'a'", | |
| "ɪ": "Âm 'i' ngắn, không kéo dài như tiếng Việt" | |
| } | |
| for wrong in wrong_phonemes: | |
| expected = wrong["expected"] | |
| if expected in vietnamese_tips: | |
| tips.append(f"Âm /{expected}/: {vietnamese_tips[expected]}") | |
| for missing in missing_phonemes: | |
| phoneme = missing["phoneme"] | |
| if phoneme in vietnamese_tips: | |
| tips.append(f"Thiếu âm /{phoneme}/: {vietnamese_tips[phoneme]}") | |
| return tips | |
| class EnhancedProsodyAnalyzer: | |
| """Enhanced prosody analyzer for sentence-level assessment""" | |
| def __init__(self): | |
| # Expected values for English prosody | |
| self.expected_speech_rate = 4.0 # syllables per second | |
| self.expected_pitch_range = 100 # Hz | |
| self.expected_pitch_cv = 0.3 # coefficient of variation | |
| def analyze_prosody_enhanced(self, audio_features: Dict, reference_text: str) -> Dict: | |
| """Enhanced prosody analysis with detailed scoring""" | |
| if "error" in audio_features: | |
| return self._empty_prosody_result() | |
| duration = audio_features.get("duration", 1) | |
| pitch_data = audio_features.get("pitch", {}) | |
| rhythm_data = audio_features.get("rhythm", {}) | |
| intensity_data = audio_features.get("intensity", {}) | |
| # Calculate syllables | |
| num_syllables = self._estimate_syllables(reference_text) | |
| actual_speech_rate = num_syllables / duration if duration > 0 else 0 | |
| # Calculate individual prosody scores | |
| pace_score = self._calculate_pace_score(actual_speech_rate) | |
| intonation_score = self._calculate_intonation_score(pitch_data) | |
| rhythm_score = self._calculate_rhythm_score(rhythm_data, intensity_data) | |
| stress_score = self._calculate_stress_score(pitch_data, intensity_data) | |
| # Overall prosody score | |
| overall_prosody = (pace_score + intonation_score + rhythm_score + stress_score) / 4 | |
| # Generate prosody feedback | |
| feedback = self._generate_prosody_feedback( | |
| pace_score, intonation_score, rhythm_score, stress_score, | |
| actual_speech_rate, pitch_data | |
| ) | |
| return { | |
| "pace_score": pace_score, | |
| "intonation_score": intonation_score, | |
| "rhythm_score": rhythm_score, | |
| "stress_score": stress_score, | |
| "overall_prosody": overall_prosody, | |
| "details": { | |
| "speech_rate": actual_speech_rate, | |
| "expected_speech_rate": self.expected_speech_rate, | |
| "syllable_count": num_syllables, | |
| "duration": duration, | |
| "pitch_analysis": pitch_data, | |
| "rhythm_analysis": rhythm_data, | |
| "intensity_analysis": intensity_data | |
| }, | |
| "feedback": feedback | |
| } | |
| def _calculate_pace_score(self, actual_rate: float) -> float: | |
| """Calculate pace score based on speech rate""" | |
| if self.expected_speech_rate == 0: | |
| return 0.5 | |
| ratio = actual_rate / self.expected_speech_rate | |
| if 0.8 <= ratio <= 1.2: | |
| return 1.0 | |
| elif 0.6 <= ratio < 0.8 or 1.2 < ratio <= 1.5: | |
| return 0.7 | |
| elif 0.4 <= ratio < 0.6 or 1.5 < ratio <= 2.0: | |
| return 0.4 | |
| else: | |
| return 0.1 | |
| def _calculate_intonation_score(self, pitch_data: Dict) -> float: | |
| """Calculate intonation score based on pitch variation""" | |
| pitch_range = pitch_data.get("range", 0) | |
| if self.expected_pitch_range == 0: | |
| return 0.5 | |
| ratio = pitch_range / self.expected_pitch_range | |
| if 0.7 <= ratio <= 1.3: | |
| return 1.0 | |
| elif 0.5 <= ratio < 0.7 or 1.3 < ratio <= 1.8: | |
| return 0.7 | |
| elif 0.3 <= ratio < 0.5 or 1.8 < ratio <= 2.5: | |
| return 0.4 | |
| else: | |
| return 0.2 | |
| def _calculate_rhythm_score(self, rhythm_data: Dict, intensity_data: Dict) -> float: | |
| """Calculate rhythm score based on tempo and intensity patterns""" | |
| tempo = rhythm_data.get("tempo", 120) | |
| intensity_std = intensity_data.get("rms_std", 0) | |
| intensity_mean = intensity_data.get("rms_mean", 0) | |
| # Tempo score (60-180 BPM is good for speech) | |
| if 60 <= tempo <= 180: | |
| tempo_score = 1.0 | |
| elif 40 <= tempo < 60 or 180 < tempo <= 220: | |
| tempo_score = 0.6 | |
| else: | |
| tempo_score = 0.3 | |
| # Intensity consistency score | |
| if intensity_mean > 0: | |
| intensity_consistency = max(0, 1.0 - (intensity_std / intensity_mean)) | |
| else: | |
| intensity_consistency = 0.5 | |
| return (tempo_score + intensity_consistency) / 2 | |
| def _calculate_stress_score(self, pitch_data: Dict, intensity_data: Dict) -> float: | |
| """Calculate stress score based on pitch and intensity variation""" | |
| pitch_cv = pitch_data.get("cv", 0) | |
| intensity_std = intensity_data.get("rms_std", 0) | |
| intensity_mean = intensity_data.get("rms_mean", 0) | |
| # Pitch coefficient of variation score | |
| if 0.2 <= pitch_cv <= 0.4: | |
| pitch_score = 1.0 | |
| elif 0.1 <= pitch_cv < 0.2 or 0.4 < pitch_cv <= 0.6: | |
| pitch_score = 0.7 | |
| else: | |
| pitch_score = 0.4 | |
| # Intensity variation score | |
| if intensity_mean > 0: | |
| intensity_cv = intensity_std / intensity_mean | |
| if 0.1 <= intensity_cv <= 0.3: | |
| intensity_score = 1.0 | |
| elif 0.05 <= intensity_cv < 0.1 or 0.3 < intensity_cv <= 0.5: | |
| intensity_score = 0.7 | |
| else: | |
| intensity_score = 0.4 | |
| else: | |
| intensity_score = 0.5 | |
| return (pitch_score + intensity_score) / 2 | |
| def _generate_prosody_feedback(self, pace_score: float, intonation_score: float, | |
| rhythm_score: float, stress_score: float, | |
| speech_rate: float, pitch_data: Dict) -> List[str]: | |
| """Generate detailed prosody feedback""" | |
| feedback = [] | |
| if pace_score < 0.5: | |
| if speech_rate < self.expected_speech_rate * 0.8: | |
| feedback.append("Tốc độ nói hơi chậm, thử nói nhanh hơn một chút") | |
| else: | |
| feedback.append("Tốc độ nói hơi nhanh, thử nói chậm lại để rõ ràng hơn") | |
| elif pace_score >= 0.8: | |
| feedback.append("Tốc độ nói rất tự nhiên") | |
| if intonation_score < 0.5: | |
| feedback.append("Cần cải thiện ngữ điệu - thay đổi cao độ giọng nhiều hơn") | |
| elif intonation_score >= 0.8: | |
| feedback.append("Ngữ điệu rất tự nhiên và sinh động") | |
| if rhythm_score < 0.5: | |
| feedback.append("Nhịp điệu cần đều hơn - chú ý đến trọng âm của từ") | |
| elif rhythm_score >= 0.8: | |
| feedback.append("Nhịp điệu rất tốt") | |
| if stress_score < 0.5: | |
| feedback.append("Cần nhấn mạnh trọng âm rõ ràng hơn") | |
| elif stress_score >= 0.8: | |
| feedback.append("Trọng âm được nhấn rất tốt") | |
| return feedback | |
| def _estimate_syllables(self, text: str) -> int: | |
| """Estimate number of syllables in text""" | |
| vowels = "aeiouy" | |
| text = text.lower() | |
| syllable_count = 0 | |
| prev_was_vowel = False | |
| for char in text: | |
| if char in vowels: | |
| if not prev_was_vowel: | |
| syllable_count += 1 | |
| prev_was_vowel = True | |
| else: | |
| prev_was_vowel = False | |
| if text.endswith('e'): | |
| syllable_count -= 1 | |
| return max(1, syllable_count) | |
| def _empty_prosody_result(self) -> Dict: | |
| """Return empty prosody result for error cases""" | |
| return { | |
| "pace_score": 0.5, | |
| "intonation_score": 0.5, | |
| "rhythm_score": 0.5, | |
| "stress_score": 0.5, | |
| "overall_prosody": 0.5, | |
| "details": {}, | |
| "feedback": ["Không thể phân tích ngữ điệu"] | |
| } | |
| class EnhancedFeedbackGenerator: | |
| """Enhanced feedback generator with detailed analysis""" | |
| def generate_enhanced_feedback(self, overall_score: float, wrong_words: List[Dict], | |
| phoneme_comparisons: List[Dict], mode: AssessmentMode, | |
| prosody_analysis: Dict = None) -> List[str]: | |
| """Generate comprehensive feedback based on assessment mode""" | |
| feedback = [] | |
| # Overall score feedback | |
| if overall_score >= 0.9: | |
| feedback.append("Phát âm xuất sắc! Bạn đã làm rất tốt.") | |
| elif overall_score >= 0.8: | |
| feedback.append("Phát âm rất tốt! Chỉ còn một vài điểm nhỏ cần cải thiện.") | |
| elif overall_score >= 0.6: | |
| feedback.append("Phát âm khá tốt, còn một số điểm cần luyện tập thêm.") | |
| elif overall_score >= 0.4: | |
| feedback.append("Cần luyện tập thêm. Tập trung vào những từ được đánh dấu.") | |
| else: | |
| feedback.append("Hãy luyện tập chậm rãi và rõ ràng hơn.") | |
| # Mode-specific feedback | |
| if mode == AssessmentMode.WORD: | |
| feedback.extend(self._generate_word_mode_feedback(wrong_words, phoneme_comparisons)) | |
| elif mode == AssessmentMode.SENTENCE: | |
| feedback.extend(self._generate_sentence_mode_feedback(wrong_words, prosody_analysis)) | |
| # Common error patterns | |
| error_patterns = self._analyze_error_patterns(phoneme_comparisons) | |
| if error_patterns: | |
| feedback.extend(error_patterns) | |
| return feedback | |
| def _generate_word_mode_feedback(self, wrong_words: List[Dict], | |
| phoneme_comparisons: List[Dict]) -> List[str]: | |
| """Generate feedback specific to word mode""" | |
| feedback = [] | |
| if wrong_words: | |
| if len(wrong_words) == 1: | |
| word = wrong_words[0]["word"] | |
| feedback.append(f"Từ '{word}' cần luyện tập thêm") | |
| # Character-level feedback | |
| char_errors = wrong_words[0].get("character_errors", []) | |
| if char_errors: | |
| error_chars = [err.character for err in char_errors[:3]] | |
| feedback.append(f"Chú ý các âm: {', '.join(error_chars)}") | |
| else: | |
| word_list = [w["word"] for w in wrong_words[:3]] | |
| feedback.append(f"Các từ cần luyện: {', '.join(word_list)}") | |
| return feedback | |
| def _generate_sentence_mode_feedback(self, wrong_words: List[Dict], | |
| prosody_analysis: Dict) -> List[str]: | |
| """Generate feedback specific to sentence mode""" | |
| feedback = [] | |
| # Word-level feedback | |
| if wrong_words: | |
| if len(wrong_words) <= 2: | |
| word_list = [w["word"] for w in wrong_words] | |
| feedback.append(f"Cần cải thiện: {', '.join(word_list)}") | |
| else: | |
| feedback.append(f"Có {len(wrong_words)} từ cần luyện tập") | |
| # Prosody feedback | |
| if prosody_analysis and "feedback" in prosody_analysis: | |
| feedback.extend(prosody_analysis["feedback"][:2]) # Limit prosody feedback | |
| return feedback | |
| def _analyze_error_patterns(self, phoneme_comparisons: List[Dict]) -> List[str]: | |
| """Analyze common error patterns across phonemes""" | |
| feedback = [] | |
| # Count error types | |
| error_counts = defaultdict(int) | |
| difficult_phonemes = defaultdict(int) | |
| for comparison in phoneme_comparisons: | |
| if comparison["status"] in ["wrong", "substitution"]: | |
| phoneme = comparison["reference_phoneme"] | |
| difficult_phonemes[phoneme] += 1 | |
| error_counts[comparison["status"]] += 1 | |
| # Most problematic phoneme | |
| if difficult_phonemes: | |
| most_difficult = max(difficult_phonemes.items(), key=lambda x: x[1]) | |
| if most_difficult[1] >= 2: | |
| phoneme = most_difficult[0] | |
| phoneme_tips = { | |
| "θ": "Lưỡi giữa răng, thổi nhẹ", | |
| "ð": "Lưỡi giữa răng, rung dây thanh", | |
| "v": "Môi dưới chạm răng trên", | |
| "r": "Cuộn lưỡi nhẹ", | |
| "z": "Như 's' nhưng rung dây thanh" | |
| } | |
| if phoneme in phoneme_tips: | |
| feedback.append(f"Âm khó nhất /{phoneme}/: {phoneme_tips[phoneme]}") | |
| return feedback | |
| class ProductionPronunciationAssessor: | |
| """Production-ready pronunciation assessor - Enhanced version of the current system""" | |
| def __init__(self, onnx: bool = False, quantized: bool = False): | |
| """Initialize the production-ready pronunciation assessment system""" | |
| logger.info("Initializing Production Pronunciation Assessment System...") | |
| self.asr = EnhancedWav2Vec2CharacterASR(onnx=onnx, quantized=quantized) | |
| self.word_analyzer = EnhancedWordAnalyzer() | |
| self.prosody_analyzer = EnhancedProsodyAnalyzer() | |
| self.feedback_generator = EnhancedFeedbackGenerator() | |
| self.g2p = EnhancedG2P() | |
| logger.info("Production system initialization completed") | |
| def assess_pronunciation(self, audio_path: str, reference_text: str, | |
| mode: str = "auto") -> Dict: | |
| """ | |
| Main assessment function with enhanced features | |
| Args: | |
| audio_path: Path to audio file | |
| reference_text: Reference text to compare against | |
| mode: Assessment mode ("word", "sentence", "auto", or legacy modes) | |
| Returns: | |
| Enhanced assessment results with backward compatibility | |
| """ | |
| logger.info(f"Starting production assessment in {mode} mode...") | |
| start_time = time.time() | |
| try: | |
| # Normalize and validate mode | |
| assessment_mode = self._normalize_mode(mode, reference_text) | |
| logger.info(f"Using assessment mode: {assessment_mode.value}") | |
| # Step 1: Enhanced ASR transcription with features | |
| asr_result = self.asr.transcribe_with_features(audio_path) | |
| if not asr_result["character_transcript"]: | |
| return self._create_error_result("No speech detected in audio") | |
| # Step 2: Enhanced word analysis | |
| analysis_result = self.word_analyzer.analyze_words_enhanced( | |
| reference_text, | |
| asr_result["phoneme_representation"], | |
| assessment_mode | |
| ) | |
| # Step 3: Calculate overall score | |
| overall_score = self._calculate_overall_score(analysis_result["phoneme_differences"]) | |
| # Step 4: Prosody analysis for sentence mode | |
| prosody_analysis = {} | |
| if assessment_mode == AssessmentMode.SENTENCE: | |
| prosody_analysis = self.prosody_analyzer.analyze_prosody_enhanced( | |
| asr_result["audio_features"], | |
| reference_text | |
| ) | |
| # Step 5: Generate enhanced feedback | |
| feedback = self.feedback_generator.generate_enhanced_feedback( | |
| overall_score, | |
| analysis_result["wrong_words"], | |
| analysis_result["phoneme_differences"], | |
| assessment_mode, | |
| prosody_analysis | |
| ) | |
| # Step 6: Create phoneme comparison summary | |
| phoneme_comparison_summary = self._create_phoneme_comparison_summary( | |
| analysis_result["phoneme_pairs"] | |
| ) | |
| # Step 7: Assemble result with backward compatibility | |
| result = self._create_enhanced_result( | |
| asr_result, analysis_result, overall_score, feedback, | |
| prosody_analysis, phoneme_comparison_summary, assessment_mode | |
| ) | |
| # Add processing metadata | |
| processing_time = time.time() - start_time | |
| result["processing_info"] = { | |
| "processing_time": round(processing_time, 2), | |
| "mode": assessment_mode.value, | |
| "model_used": "Wav2Vec2-Enhanced", | |
| "onnx_enabled": self.asr.use_onnx, | |
| "confidence": asr_result["confidence"], | |
| "enhanced_features": True, | |
| "character_level_analysis": assessment_mode == AssessmentMode.WORD, | |
| "prosody_analysis": assessment_mode == AssessmentMode.SENTENCE | |
| } | |
| logger.info(f"Production assessment completed in {processing_time:.2f}s") | |
| return result | |
| except Exception as e: | |
| logger.error(f"Production assessment error: {e}") | |
| return self._create_error_result(f"Assessment failed: {str(e)}") | |
| def _normalize_mode(self, mode: str, reference_text: str) -> AssessmentMode: | |
| """Normalize mode parameter with backward compatibility""" | |
| # Legacy mode mapping | |
| legacy_mapping = { | |
| "normal": AssessmentMode.AUTO, | |
| "advanced": AssessmentMode.AUTO | |
| } | |
| if mode in legacy_mapping: | |
| normalized_mode = legacy_mapping[mode] | |
| logger.info(f"Mapped legacy mode '{mode}' to '{normalized_mode.value}'") | |
| mode = normalized_mode.value | |
| # Validate mode | |
| try: | |
| assessment_mode = AssessmentMode(mode) | |
| except ValueError: | |
| logger.warning(f"Invalid mode '{mode}', defaulting to AUTO") | |
| assessment_mode = AssessmentMode.AUTO | |
| # Auto-detect mode based on text length | |
| if assessment_mode == AssessmentMode.AUTO: | |
| word_count = len(reference_text.strip().split()) | |
| assessment_mode = AssessmentMode.WORD if word_count <= 3 else AssessmentMode.SENTENCE | |
| logger.info(f"Auto-detected mode: {assessment_mode.value} (word count: {word_count})") | |
| return assessment_mode | |
| def _calculate_overall_score(self, phoneme_comparisons: List[Dict]) -> float: | |
| """Calculate weighted overall score""" | |
| if not phoneme_comparisons: | |
| return 0.0 | |
| total_weighted_score = 0.0 | |
| total_weight = 0.0 | |
| for comparison in phoneme_comparisons: | |
| weight = comparison.get("difficulty", 0.5) # Use difficulty as weight | |
| score = comparison["score"] | |
| total_weighted_score += score * weight | |
| total_weight += weight | |
| return total_weighted_score / total_weight if total_weight > 0 else 0.0 | |
| def _create_phoneme_comparison_summary(self, phoneme_pairs: List[Dict]) -> Dict: | |
| """Create phoneme comparison summary statistics""" | |
| total = len(phoneme_pairs) | |
| if total == 0: | |
| return {"total_phonemes": 0, "accuracy_percentage": 0} | |
| correct = sum(1 for pair in phoneme_pairs if pair["match"]) | |
| substitutions = sum(1 for pair in phoneme_pairs if pair["type"] == "substitution") | |
| deletions = sum(1 for pair in phoneme_pairs if pair["type"] == "deletion") | |
| insertions = sum(1 for pair in phoneme_pairs if pair["type"] == "insertion") | |
| return { | |
| "total_phonemes": total, | |
| "correct": correct, | |
| "substitutions": substitutions, | |
| "deletions": deletions, | |
| "insertions": insertions, | |
| "accuracy_percentage": round((correct / total) * 100, 1), | |
| "error_rate": round(((substitutions + deletions + insertions) / total) * 100, 1) | |
| } | |
| def _create_enhanced_result(self, asr_result: Dict, analysis_result: Dict, | |
| overall_score: float, feedback: List[str], | |
| prosody_analysis: Dict, phoneme_summary: Dict, | |
| assessment_mode: AssessmentMode) -> Dict: | |
| """Create enhanced result with backward compatibility""" | |
| # Base result structure (backward compatible) | |
| result = { | |
| "transcript": asr_result["character_transcript"], | |
| "transcript_phonemes": asr_result["phoneme_representation"], | |
| "user_phonemes": asr_result["phoneme_representation"], | |
| "character_transcript": asr_result["character_transcript"], | |
| "overall_score": overall_score, | |
| "word_highlights": analysis_result["word_highlights"], | |
| "phoneme_differences": analysis_result["phoneme_differences"], | |
| "wrong_words": analysis_result["wrong_words"], | |
| "feedback": feedback, | |
| } | |
| # Enhanced features | |
| result.update({ | |
| "reference_phonemes": analysis_result["reference_phonemes"], | |
| "phoneme_pairs": analysis_result["phoneme_pairs"], | |
| "phoneme_comparison": phoneme_summary, | |
| "assessment_mode": assessment_mode.value, | |
| }) | |
| # Add prosody analysis for sentence mode | |
| if prosody_analysis: | |
| result["prosody_analysis"] = prosody_analysis | |
| # Add character-level analysis for word mode | |
| if assessment_mode == AssessmentMode.WORD: | |
| result["character_level_analysis"] = True | |
| # Add character errors to word highlights if available | |
| for word_highlight in result["word_highlights"]: | |
| if "character_errors" in word_highlight: | |
| # Convert CharacterError objects to dicts for JSON serialization | |
| char_errors = [] | |
| for error in word_highlight["character_errors"]: | |
| if isinstance(error, CharacterError): | |
| char_errors.append({ | |
| "character": error.character, | |
| "position": error.position, | |
| "error_type": error.error_type, | |
| "expected_sound": error.expected_sound, | |
| "actual_sound": error.actual_sound, | |
| "severity": error.severity, | |
| "color": error.color | |
| }) | |
| else: | |
| char_errors.append(error) | |
| word_highlight["character_errors"] = char_errors | |
| return result | |
| def _create_error_result(self, error_message: str) -> Dict: | |
| """Create error result structure""" | |
| return { | |
| "transcript": "", | |
| "transcript_phonemes": "", | |
| "user_phonemes": "", | |
| "character_transcript": "", | |
| "overall_score": 0.0, | |
| "word_highlights": [], | |
| "phoneme_differences": [], | |
| "wrong_words": [], | |
| "feedback": [f"Lỗi: {error_message}"], | |
| "error": error_message, | |
| "assessment_mode": "error", | |
| "processing_info": { | |
| "processing_time": 0, | |
| "mode": "error", | |
| "model_used": "Wav2Vec2-Enhanced", | |
| "confidence": 0.0, | |
| "enhanced_features": False | |
| } | |
| } | |
| def get_system_info(self) -> Dict: | |
| """Get comprehensive system information""" | |
| return { | |
| "version": "2.1.0-production", | |
| "name": "Production Pronunciation Assessment System", | |
| "modes": [mode.value for mode in AssessmentMode], | |
| "features": [ | |
| "Enhanced Levenshtein distance phoneme alignment", | |
| "Character-level error detection (word mode)", | |
| "Advanced prosody analysis (sentence mode)", | |
| "Vietnamese speaker-specific error patterns", | |
| "Real-time confidence scoring", | |
| "IPA phonetic representation with visualization", | |
| "Backward compatibility with legacy APIs", | |
| "Production-ready error handling" | |
| ], | |
| "model_info": { | |
| "asr_model": self.asr.model_name, | |
| "onnx_enabled": self.asr.use_onnx, | |
| "sample_rate": self.asr.sample_rate | |
| }, | |
| "assessment_modes": { | |
| "word": "Detailed character and phoneme level analysis for single words or short phrases", | |
| "sentence": "Word-level analysis with prosody evaluation for complete sentences", | |
| "auto": "Automatically selects mode based on text length (≤3 words = word mode)" | |
| } | |
| } | |
| # Backward compatibility wrapper | |
| class SimplePronunciationAssessor: | |
| """Backward compatible wrapper for the enhanced system""" | |
| def __init__(self): | |
| print("Initializing Simple Pronunciation Assessor (Enhanced)...") | |
| self.enhanced_assessor = ProductionPronunciationAssessor() | |
| print("Enhanced Simple Pronunciation Assessor initialization completed") | |
| def assess_pronunciation(self, audio_path: str, reference_text: str, | |
| mode: str = "normal") -> Dict: | |
| """ | |
| Backward compatible assessment function | |
| Args: | |
| audio_path: Path to audio file | |
| reference_text: Reference text to compare | |
| mode: Assessment mode (supports legacy modes) | |
| """ | |
| return self.enhanced_assessor.assess_pronunciation(audio_path, reference_text, mode) | |
| # Example usage | |
| if __name__ == "__main__": | |
| # Initialize production system | |
| system = ProductionPronunciationAssessor(onnx=False, quantized=False) | |
| # Example word mode assessment | |
| print("=== WORD MODE EXAMPLE ===") | |
| word_result = system.assess_pronunciation( | |
| audio_path="./hello_world.wav", | |
| reference_text="hello", | |
| mode="word" | |
| ) | |
| # print(f"Word mode result keys: {list(word_result.keys())}") | |
| print("Word result", word_result) | |
| # Example sentence mode assessment | |
| print("\n=== SENTENCE MODE EXAMPLE ===") | |
| sentence_result = system.assess_pronunciation( | |
| audio_path="./hello_how_are_you_today.wav", | |
| reference_text="Hello, how are you today?", | |
| mode="sentence" | |
| ) | |
| print(f"Sentence mode result keys: {list(sentence_result.keys())}") | |
| print("Sentence result", sentence_result) | |
| # Example auto mode assessment | |
| print("\n=== AUTO MODE EXAMPLE ===") | |
| auto_result = system.assess_pronunciation( | |
| audio_path="./hello_how_are_you_today.wav", | |
| reference_text="world", # Single word - should auto-select word mode | |
| mode="auto" | |
| ) | |
| print(f"Auto mode result: {auto_result['assessment_mode']}") | |
| print("Auto result", auto_result) | |
| # Backward compatibility test | |
| print("\n=== BACKWARD COMPATIBILITY TEST ===") | |
| legacy_assessor = SimplePronunciationAssessor() | |
| legacy_result = legacy_assessor.assess_pronunciation( | |
| audio_path="./hello_world.wav", | |
| reference_text="pronunciation", | |
| mode="normal" # Legacy mode | |
| ) | |
| print(f"Legacy mode result: {legacy_result}") | |
| print(f"Legacy mode mapped to: {legacy_result.get('assessment_mode', 'N/A')}") | |
| # System info | |
| print(f"\n=== SYSTEM INFO ===") | |
| system_info = system.get_system_info() | |
| print(f"System version: {system_info['version']}") | |
| print(f"Available modes: {system_info['modes']}") | |
| print(f"Key features: {len(system_info['features'])} enhanced features") |