from typing import List, Dict, Tuple, Optional import numpy as np import librosa import nltk import eng_to_ipa as ipa import re from collections import defaultdict from loguru import logger import time import Levenshtein from dataclasses import dataclass from enum import Enum from src.AI_Models.wave2vec_inference import ( Wave2Vec2Inference, Wave2Vec2ONNXInference, export_to_onnx, ) # Download required NLTK data try: nltk.download("cmudict", quiet=True) from nltk.corpus import cmudict except: print("Warning: NLTK data not available") class AssessmentMode(Enum): WORD = "word" SENTENCE = "sentence" AUTO = "auto" class ErrorType(Enum): CORRECT = "correct" SUBSTITUTION = "substitution" DELETION = "deletion" INSERTION = "insertion" ACCEPTABLE = "acceptable" @dataclass class CharacterError: """Character-level error information for UI mapping""" character: str position: int error_type: str expected_sound: str actual_sound: str severity: float color: str class EnhancedWav2Vec2CharacterASR: """Enhanced Wav2Vec2 ASR with prosody analysis support""" def __init__( self, model_name: str = "facebook/wav2vec2-large-960h-lv60-self", onnx: bool = False, quantized: bool = False, ): self.use_onnx = onnx self.sample_rate = 16000 self.model_name = model_name if onnx: import os model_path = f"wav2vec2-large-960h-lv60-self{'.quant' if quantized else ''}.onnx" if not os.path.exists(model_path): export_to_onnx(model_name, quantize=quantized) self.model = ( Wave2Vec2Inference(model_name) if not onnx else Wave2Vec2ONNXInference(model_name, model_path) ) def transcribe_with_features(self, audio_path: str) -> Dict: """Enhanced transcription with audio features for prosody analysis""" try: start_time = time.time() # Basic transcription character_transcript = self.model.file_to_text(audio_path) character_transcript = self._clean_character_transcript(character_transcript) # Convert to phonemes phoneme_representation = self._characters_to_phoneme_representation(character_transcript) # Extract audio features for prosody audio_features = self._extract_enhanced_audio_features(audio_path) logger.info(f"Enhanced transcription time: {time.time() - start_time:.2f}s") return { "character_transcript": character_transcript, "phoneme_representation": phoneme_representation, "audio_features": audio_features, "confidence": self._estimate_confidence(character_transcript) } except Exception as e: logger.error(f"Enhanced ASR error: {e}") return self._empty_result() def _extract_enhanced_audio_features(self, audio_path: str) -> Dict: """Extract comprehensive audio features for prosody analysis""" try: y, sr = librosa.load(audio_path, sr=self.sample_rate) duration = len(y) / sr # Pitch analysis pitches, magnitudes = librosa.piptrack(y=y, sr=sr) pitch_values = [] for t in range(pitches.shape[1]): index = magnitudes[:, t].argmax() pitch = pitches[index, t] if pitch > 0: pitch_values.append(pitch) # Rhythm and timing features tempo, beats = librosa.beat.beat_track(y=y, sr=sr) # Intensity features rms = librosa.feature.rms(y=y)[0] zcr = librosa.feature.zero_crossing_rate(y)[0] # Spectral features spectral_centroids = librosa.feature.spectral_centroid(y=y, sr=sr)[0] return { "duration": duration, "pitch": { "values": pitch_values, "mean": np.mean(pitch_values) if pitch_values else 0, "std": np.std(pitch_values) if pitch_values else 0, "range": np.max(pitch_values) - np.min(pitch_values) if pitch_values else 0, "cv": np.std(pitch_values) / np.mean(pitch_values) if pitch_values and np.mean(pitch_values) > 0 else 0 }, "rhythm": { "tempo": tempo, "beats_per_second": len(beats) / duration if duration > 0 else 0 }, "intensity": { "rms_mean": np.mean(rms), "rms_std": np.std(rms), "zcr_mean": np.mean(zcr) }, "spectral": { "centroid_mean": np.mean(spectral_centroids), "centroid_std": np.std(spectral_centroids) } } except Exception as e: logger.error(f"Audio feature extraction error: {e}") return {"duration": 0, "error": str(e)} def _clean_character_transcript(self, transcript: str) -> str: """Clean and standardize character transcript""" logger.info(f"Raw transcript before cleaning: {transcript}") cleaned = re.sub(r'\s+', ' ', transcript) return cleaned.strip().lower() def _characters_to_phoneme_representation(self, text: str) -> str: """Convert character-based transcript to phoneme representation""" if not text: return "" words = text.split() phoneme_words = [] g2p = EnhancedG2P() for word in words: try: if g2p: word_phonemes = g2p.word_to_phonemes(word) phoneme_words.extend(word_phonemes) else: phoneme_words.extend(self._simple_letter_to_phoneme(word)) except: phoneme_words.extend(self._simple_letter_to_phoneme(word)) return " ".join(phoneme_words) def _simple_letter_to_phoneme(self, word: str) -> List[str]: """Fallback letter-to-phoneme conversion""" letter_to_phoneme = { "a": "æ", "b": "b", "c": "k", "d": "d", "e": "ɛ", "f": "f", "g": "ɡ", "h": "h", "i": "ɪ", "j": "dʒ", "k": "k", "l": "l", "m": "m", "n": "n", "o": "ʌ", "p": "p", "q": "k", "r": "r", "s": "s", "t": "t", "u": "ʌ", "v": "v", "w": "w", "x": "ks", "y": "j", "z": "z" } return [letter_to_phoneme.get(letter, letter) for letter in word.lower() if letter in letter_to_phoneme] def _estimate_confidence(self, transcript: str) -> float: """Estimate transcription confidence""" if not transcript or len(transcript.strip()) < 2: return 0.0 repeated_chars = len(re.findall(r'(.)\1{2,}', transcript)) return max(0.0, 1.0 - (repeated_chars * 0.2)) def _empty_result(self) -> Dict: """Empty result for error cases""" return { "character_transcript": "", "phoneme_representation": "", "audio_features": {"duration": 0}, "confidence": 0.0 } class EnhancedG2P: """Enhanced Grapheme-to-Phoneme converter with visualization support""" def __init__(self): try: self.cmu_dict = cmudict.dict() except: self.cmu_dict = {} logger.warning("CMU dictionary not available") # Vietnamese speaker substitution patterns (enhanced) self.vn_substitutions = { "θ": ["f", "s", "t", "d"], "ð": ["d", "z", "v", "t"], "v": ["w", "f", "b"], "w": ["v", "b"], "r": ["l", "n"], "l": ["r", "n"], "z": ["s", "j"], "ʒ": ["ʃ", "z", "s"], "ʃ": ["s", "ʒ"], "ŋ": ["n", "m"], "tʃ": ["ʃ", "s", "k"], "dʒ": ["ʒ", "j", "g"], "æ": ["ɛ", "a"], "ɪ": ["i"], "ʊ": ["u"] } # Difficulty scores for Vietnamese speakers self.difficulty_scores = { "θ": 0.9, "ð": 0.9, "v": 0.8, "z": 0.8, "ʒ": 0.9, "r": 0.7, "l": 0.6, "w": 0.5, "æ": 0.7, "ɪ": 0.6, "ʊ": 0.6, "ŋ": 0.3, "f": 0.2, "s": 0.2, "ʃ": 0.5, "tʃ": 0.4, "dʒ": 0.5 } def word_to_phonemes(self, word: str) -> List[str]: """Convert word to phoneme list""" word_lower = word.lower().strip() if word_lower in self.cmu_dict: cmu_phonemes = self.cmu_dict[word_lower][0] return self._convert_cmu_to_ipa(cmu_phonemes) else: return self._estimate_phonemes(word_lower) def get_phoneme_string(self, text: str) -> str: """Get space-separated phoneme string""" words = self._clean_text(text).split() all_phonemes = [] for word in words: if word: phonemes = self.word_to_phonemes(word) all_phonemes.extend(phonemes) return " ".join(all_phonemes) def text_to_phonemes(self, text: str) -> List[Dict]: """Convert text to phoneme sequence with visualization data""" words = self._clean_text(text).split() phoneme_sequence = [] for word in words: word_phonemes = self.word_to_phonemes(word) phoneme_sequence.append({ "word": word, "phonemes": word_phonemes, "ipa": self._get_ipa(word), "phoneme_string": " ".join(word_phonemes), "visualization": self._create_phoneme_visualization(word_phonemes) }) return phoneme_sequence def _convert_cmu_to_ipa(self, cmu_phonemes: List[str]) -> List[str]: """Convert CMU phonemes to IPA""" cmu_to_ipa = { "AA": "ɑ", "AE": "æ", "AH": "ʌ", "AO": "ɔ", "AW": "aʊ", "AY": "aɪ", "EH": "ɛ", "ER": "ɝ", "EY": "eɪ", "IH": "ɪ", "IY": "i", "OW": "oʊ", "OY": "ɔɪ", "UH": "ʊ", "UW": "u", "B": "b", "CH": "tʃ", "D": "d", "DH": "ð", "F": "f", "G": "ɡ", "HH": "h", "JH": "dʒ", "K": "k", "L": "l", "M": "m", "N": "n", "NG": "ŋ", "P": "p", "R": "r", "S": "s", "SH": "ʃ", "T": "t", "TH": "θ", "V": "v", "W": "w", "Y": "j", "Z": "z", "ZH": "ʒ" } ipa_phonemes = [] for phoneme in cmu_phonemes: clean_phoneme = re.sub(r'[0-9]', '', phoneme) ipa_phoneme = cmu_to_ipa.get(clean_phoneme, clean_phoneme.lower()) ipa_phonemes.append(ipa_phoneme) return ipa_phonemes def _estimate_phonemes(self, word: str) -> List[str]: """Estimate phonemes for unknown words""" phoneme_map = { "ch": "tʃ", "sh": "ʃ", "th": "θ", "ph": "f", "ck": "k", "ng": "ŋ", "qu": "kw", "a": "æ", "e": "ɛ", "i": "ɪ", "o": "ʌ", "u": "ʌ", "b": "b", "c": "k", "d": "d", "f": "f", "g": "ɡ", "h": "h", "j": "dʒ", "k": "k", "l": "l", "m": "m", "n": "n", "p": "p", "r": "r", "s": "s", "t": "t", "v": "v", "w": "w", "x": "ks", "y": "j", "z": "z" } phonemes = [] i = 0 while i < len(word): if i <= len(word) - 2: two_char = word[i:i+2] if two_char in phoneme_map: phonemes.append(phoneme_map[two_char]) i += 2 continue char = word[i] if char in phoneme_map: phonemes.append(phoneme_map[char]) i += 1 return phonemes def _clean_text(self, text: str) -> str: """Clean text for processing""" text = re.sub(r"[^\w\s']", " ", text) text = re.sub(r'\s+', ' ', text) return text.lower().strip() def _get_ipa(self, word: str) -> str: """Get IPA transcription""" try: return ipa.convert(word) except: return f"/{word}/" def _create_phoneme_visualization(self, phonemes: List[str]) -> List[Dict]: """Create visualization data for phonemes""" visualization = [] for phoneme in phonemes: color_category = self._get_phoneme_color_category(phoneme) visualization.append({ "phoneme": phoneme, "color_category": color_category, "description": self._get_phoneme_description(phoneme), "difficulty": self.difficulty_scores.get(phoneme, 0.3) }) return visualization def _get_phoneme_color_category(self, phoneme: str) -> str: """Categorize phonemes by color for visualization""" vowel_phonemes = {"ɑ", "æ", "ʌ", "ɔ", "aʊ", "aɪ", "ɛ", "ɝ", "eɪ", "ɪ", "i", "oʊ", "ɔɪ", "ʊ", "u"} difficult_consonants = {"θ", "ð", "v", "z", "ʒ", "r", "w"} if phoneme in vowel_phonemes: return "vowel" elif phoneme in difficult_consonants: return "difficult" else: return "consonant" def _get_phoneme_description(self, phoneme: str) -> str: """Get description for a phoneme""" descriptions = { "θ": "Voiceless dental fricative (like 'th' in 'think')", "ð": "Voiced dental fricative (like 'th' in 'this')", "v": "Voiced labiodental fricative (like 'v' in 'van')", "z": "Voiced alveolar fricative (like 'z' in 'zip')", "ʒ": "Voiced postalveolar fricative (like 's' in 'measure')", "r": "Alveolar approximant (like 'r' in 'red')", "w": "Labial-velar approximant (like 'w' in 'wet')", "æ": "Near-open front unrounded vowel (like 'a' in 'cat')", "ɪ": "Near-close near-front unrounded vowel (like 'i' in 'sit')", "ʊ": "Near-close near-back rounded vowel (like 'u' in 'put')" } return descriptions.get(phoneme, f"Phoneme: {phoneme}") def is_acceptable_substitution(self, reference: str, predicted: str) -> bool: """Check if substitution is acceptable for Vietnamese speakers""" acceptable = self.vn_substitutions.get(reference, []) return predicted in acceptable def get_difficulty_score(self, phoneme: str) -> float: """Get difficulty score for phoneme""" return self.difficulty_scores.get(phoneme, 0.3) class AdvancedPhonemeComparator: """Enhanced phoneme comparator using Levenshtein distance""" def __init__(self): self.g2p = EnhancedG2P() def compare_with_levenshtein(self, reference: str, predicted: str) -> List[Dict]: """Compare phonemes using Levenshtein distance for accurate alignment""" ref_phones = reference.split() if reference else [] pred_phones = predicted.split() if predicted else [] if not ref_phones: return [] # Use Levenshtein editops for precise alignment ops = Levenshtein.editops(ref_phones, pred_phones) comparisons = [] ref_idx = 0 pred_idx = 0 # Process equal parts first for op_type, ref_pos, pred_pos in ops: # Add equal characters before this operation while ref_idx < ref_pos and pred_idx < pred_pos: comparison = self._create_comparison( ref_phones[ref_idx], pred_phones[pred_idx], ErrorType.CORRECT, 1.0, len(comparisons) ) comparisons.append(comparison) ref_idx += 1 pred_idx += 1 # Process the operation if op_type == 'replace': ref_phoneme = ref_phones[ref_pos] pred_phoneme = pred_phones[pred_pos] if self.g2p.is_acceptable_substitution(ref_phoneme, pred_phoneme): error_type = ErrorType.ACCEPTABLE score = 0.7 else: error_type = ErrorType.SUBSTITUTION score = 0.2 comparison = self._create_comparison( ref_phoneme, pred_phoneme, error_type, score, len(comparisons) ) comparisons.append(comparison) ref_idx = ref_pos + 1 pred_idx = pred_pos + 1 elif op_type == 'delete': comparison = self._create_comparison( ref_phones[ref_pos], "", ErrorType.DELETION, 0.0, len(comparisons) ) comparisons.append(comparison) ref_idx = ref_pos + 1 elif op_type == 'insert': comparison = self._create_comparison( "", pred_phones[pred_pos], ErrorType.INSERTION, 0.0, len(comparisons) ) comparisons.append(comparison) pred_idx = pred_pos + 1 # Add remaining equal characters while ref_idx < len(ref_phones) and pred_idx < len(pred_phones): comparison = self._create_comparison( ref_phones[ref_idx], pred_phones[pred_idx], ErrorType.CORRECT, 1.0, len(comparisons) ) comparisons.append(comparison) ref_idx += 1 pred_idx += 1 return comparisons def _create_comparison(self, ref_phoneme: str, pred_phoneme: str, error_type: ErrorType, score: float, position: int) -> Dict: """Create comparison dictionary""" return { "position": position, "reference_phoneme": ref_phoneme, "learner_phoneme": pred_phoneme, "status": error_type.value, "score": score, "difficulty": self.g2p.get_difficulty_score(ref_phoneme), "error_type": error_type.value } class EnhancedWordAnalyzer: """Enhanced word analyzer with character-level error mapping""" def __init__(self): self.g2p = EnhancedG2P() self.comparator = AdvancedPhonemeComparator() def analyze_words_enhanced(self, reference_text: str, learner_phonemes: str, mode: AssessmentMode) -> Dict: """Enhanced word analysis with character-level mapping""" # Get reference phonemes by word reference_words = self.g2p.text_to_phonemes(reference_text) # Get overall phoneme comparison using Levenshtein reference_phoneme_string = self.g2p.get_phoneme_string(reference_text) phoneme_comparisons = self.comparator.compare_with_levenshtein( reference_phoneme_string, learner_phonemes ) # Create enhanced word highlights word_highlights = self._create_enhanced_word_highlights( reference_words, phoneme_comparisons, mode ) # Identify wrong words with character-level errors wrong_words = self._identify_wrong_words_enhanced(word_highlights, phoneme_comparisons) return { "word_highlights": word_highlights, "phoneme_differences": phoneme_comparisons, "wrong_words": wrong_words, "reference_phonemes": reference_phoneme_string, "phoneme_pairs": self._create_phoneme_pairs(reference_phoneme_string, learner_phonemes) } def _create_enhanced_word_highlights(self, reference_words: List[Dict], phoneme_comparisons: List[Dict], mode: AssessmentMode) -> List[Dict]: """Create enhanced word highlights with character-level error mapping""" word_highlights = [] phoneme_index = 0 for word_data in reference_words: word = word_data["word"] word_phonemes = word_data["phonemes"] num_phonemes = len(word_phonemes) # Get phoneme scores for this word word_phoneme_scores = [] word_comparisons = [] for j in range(num_phonemes): if phoneme_index + j < len(phoneme_comparisons): comparison = phoneme_comparisons[phoneme_index + j] word_phoneme_scores.append(comparison["score"]) word_comparisons.append(comparison) # Calculate word score word_score = np.mean(word_phoneme_scores) if word_phoneme_scores else 0.0 # Map phoneme errors to character positions (enhanced for word mode) character_errors = [] if mode == AssessmentMode.WORD: character_errors = self._map_phonemes_to_characters(word, word_comparisons) # Create enhanced word highlight highlight = { "word": word, "score": float(word_score), "status": self._get_word_status(word_score), "color": self._get_word_color(word_score), "phonemes": word_phonemes, "ipa": word_data["ipa"], "phoneme_scores": word_phoneme_scores, "phoneme_start_index": phoneme_index, "phoneme_end_index": phoneme_index + num_phonemes - 1, "phoneme_visualization": word_data["visualization"], "character_errors": character_errors, # New feature "detailed_analysis": mode == AssessmentMode.WORD # Flag for UI } word_highlights.append(highlight) phoneme_index += num_phonemes return word_highlights def _map_phonemes_to_characters(self, word: str, phoneme_comparisons: List[Dict]) -> List[CharacterError]: """Map phoneme errors to character positions in word""" character_errors = [] # Simple mapping strategy: distribute phonemes across characters if not phoneme_comparisons or not word: return character_errors chars_per_phoneme = len(word) / len(phoneme_comparisons) for i, comparison in enumerate(phoneme_comparisons): if comparison["status"] in ["substitution", "deletion", "wrong"]: # Calculate character position char_pos = min(int(i * chars_per_phoneme), len(word) - 1) severity = 1.0 - comparison["score"] color = self._get_error_color(severity) error = CharacterError( character=word[char_pos], position=char_pos, error_type=comparison["status"], expected_sound=comparison["reference_phoneme"], actual_sound=comparison["learner_phoneme"], severity=severity, color=color ) character_errors.append(error) return character_errors def _get_error_color(self, severity: float) -> str: """Get color code for character errors""" if severity >= 0.8: return "#ef4444" # Red - severe error elif severity >= 0.6: return "#f97316" # Orange - moderate error elif severity >= 0.4: return "#eab308" # Yellow - mild error else: return "#84cc16" # Light green - minor error def _identify_wrong_words_enhanced(self, word_highlights: List[Dict], phoneme_comparisons: List[Dict]) -> List[Dict]: """Enhanced wrong word identification with detailed error analysis""" wrong_words = [] for word_highlight in word_highlights: if word_highlight["score"] < 0.6: start_idx = word_highlight["phoneme_start_index"] end_idx = word_highlight["phoneme_end_index"] wrong_phonemes = [] missing_phonemes = [] for i in range(start_idx, min(end_idx + 1, len(phoneme_comparisons))): comparison = phoneme_comparisons[i] if comparison["status"] in ["wrong", "substitution"]: wrong_phonemes.append({ "expected": comparison["reference_phoneme"], "actual": comparison["learner_phoneme"], "difficulty": comparison["difficulty"], "description": self.g2p._get_phoneme_description(comparison["reference_phoneme"]) }) elif comparison["status"] in ["missing", "deletion"]: missing_phonemes.append({ "phoneme": comparison["reference_phoneme"], "difficulty": comparison["difficulty"], "description": self.g2p._get_phoneme_description(comparison["reference_phoneme"]) }) wrong_word = { "word": word_highlight["word"], "score": word_highlight["score"], "expected_phonemes": word_highlight["phonemes"], "ipa": word_highlight["ipa"], "wrong_phonemes": wrong_phonemes, "missing_phonemes": missing_phonemes, "tips": self._get_enhanced_vietnamese_tips(wrong_phonemes, missing_phonemes), "phoneme_visualization": word_highlight["phoneme_visualization"], "character_errors": word_highlight.get("character_errors", []) } wrong_words.append(wrong_word) return wrong_words def _create_phoneme_pairs(self, reference: str, learner: str) -> List[Dict]: """Create phoneme pairs for visualization""" ref_phones = reference.split() if reference else [] learner_phones = learner.split() if learner else [] # Use difflib for alignment visualization import difflib matcher = difflib.SequenceMatcher(None, ref_phones, learner_phones) pairs = [] for tag, i1, i2, j1, j2 in matcher.get_opcodes(): if tag == 'equal': for k in range(i2 - i1): pairs.append({ "reference": ref_phones[i1 + k], "learner": learner_phones[j1 + k], "match": True, "type": "correct" }) elif tag == 'replace': max_len = max(i2 - i1, j2 - j1) for k in range(max_len): ref_phoneme = ref_phones[i1 + k] if i1 + k < i2 else "" learner_phoneme = learner_phones[j1 + k] if j1 + k < j2 else "" pairs.append({ "reference": ref_phoneme, "learner": learner_phoneme, "match": False, "type": "substitution" }) elif tag == 'delete': for k in range(i1, i2): pairs.append({ "reference": ref_phones[k], "learner": "", "match": False, "type": "deletion" }) elif tag == 'insert': for k in range(j1, j2): pairs.append({ "reference": "", "learner": learner_phones[k], "match": False, "type": "insertion" }) return pairs def _get_word_status(self, score: float) -> str: """Get word status from score""" if score >= 0.8: return "excellent" elif score >= 0.6: return "good" elif score >= 0.4: return "needs_practice" else: return "poor" def _get_word_color(self, score: float) -> str: """Get color for word highlighting""" if score >= 0.8: return "#22c55e" # Green elif score >= 0.6: return "#84cc16" # Light green elif score >= 0.4: return "#eab308" # Yellow else: return "#ef4444" # Red def _get_enhanced_vietnamese_tips(self, wrong_phonemes: List[Dict], missing_phonemes: List[Dict]) -> List[str]: """Enhanced Vietnamese-specific pronunciation tips""" tips = [] vietnamese_tips = { "θ": "Đặt lưỡi giữa răng trên và dưới, thổi nhẹ (think, three)", "ð": "Giống θ nhưng rung dây thanh âm (this, that)", "v": "Chạm môi dưới vào răng trên, không dùng cả hai môi như tiếng Việt", "r": "Cuộn lưỡi nhưng không chạm vào vòm miệng, không lăn lưỡi", "l": "Đầu lưỡi chạm vào vòm miệng sau răng", "z": "Giống âm 's' nhưng có rung dây thanh âm", "ʒ": "Giống âm 'ʃ' (sh) nhưng có rung dây thanh âm", "w": "Tròn môi như âm 'u', không dùng răng như âm 'v'", "æ": "Mở miệng rộng hơn khi phát âm 'a'", "ɪ": "Âm 'i' ngắn, không kéo dài như tiếng Việt" } for wrong in wrong_phonemes: expected = wrong["expected"] if expected in vietnamese_tips: tips.append(f"Âm /{expected}/: {vietnamese_tips[expected]}") for missing in missing_phonemes: phoneme = missing["phoneme"] if phoneme in vietnamese_tips: tips.append(f"Thiếu âm /{phoneme}/: {vietnamese_tips[phoneme]}") return tips class EnhancedProsodyAnalyzer: """Enhanced prosody analyzer for sentence-level assessment""" def __init__(self): # Expected values for English prosody self.expected_speech_rate = 4.0 # syllables per second self.expected_pitch_range = 100 # Hz self.expected_pitch_cv = 0.3 # coefficient of variation def analyze_prosody_enhanced(self, audio_features: Dict, reference_text: str) -> Dict: """Enhanced prosody analysis with detailed scoring""" if "error" in audio_features: return self._empty_prosody_result() duration = audio_features.get("duration", 1) pitch_data = audio_features.get("pitch", {}) rhythm_data = audio_features.get("rhythm", {}) intensity_data = audio_features.get("intensity", {}) # Calculate syllables num_syllables = self._estimate_syllables(reference_text) actual_speech_rate = num_syllables / duration if duration > 0 else 0 # Calculate individual prosody scores pace_score = self._calculate_pace_score(actual_speech_rate) intonation_score = self._calculate_intonation_score(pitch_data) rhythm_score = self._calculate_rhythm_score(rhythm_data, intensity_data) stress_score = self._calculate_stress_score(pitch_data, intensity_data) # Overall prosody score overall_prosody = (pace_score + intonation_score + rhythm_score + stress_score) / 4 # Generate prosody feedback feedback = self._generate_prosody_feedback( pace_score, intonation_score, rhythm_score, stress_score, actual_speech_rate, pitch_data ) return { "pace_score": pace_score, "intonation_score": intonation_score, "rhythm_score": rhythm_score, "stress_score": stress_score, "overall_prosody": overall_prosody, "details": { "speech_rate": actual_speech_rate, "expected_speech_rate": self.expected_speech_rate, "syllable_count": num_syllables, "duration": duration, "pitch_analysis": pitch_data, "rhythm_analysis": rhythm_data, "intensity_analysis": intensity_data }, "feedback": feedback } def _calculate_pace_score(self, actual_rate: float) -> float: """Calculate pace score based on speech rate""" if self.expected_speech_rate == 0: return 0.5 ratio = actual_rate / self.expected_speech_rate if 0.8 <= ratio <= 1.2: return 1.0 elif 0.6 <= ratio < 0.8 or 1.2 < ratio <= 1.5: return 0.7 elif 0.4 <= ratio < 0.6 or 1.5 < ratio <= 2.0: return 0.4 else: return 0.1 def _calculate_intonation_score(self, pitch_data: Dict) -> float: """Calculate intonation score based on pitch variation""" pitch_range = pitch_data.get("range", 0) if self.expected_pitch_range == 0: return 0.5 ratio = pitch_range / self.expected_pitch_range if 0.7 <= ratio <= 1.3: return 1.0 elif 0.5 <= ratio < 0.7 or 1.3 < ratio <= 1.8: return 0.7 elif 0.3 <= ratio < 0.5 or 1.8 < ratio <= 2.5: return 0.4 else: return 0.2 def _calculate_rhythm_score(self, rhythm_data: Dict, intensity_data: Dict) -> float: """Calculate rhythm score based on tempo and intensity patterns""" tempo = rhythm_data.get("tempo", 120) intensity_std = intensity_data.get("rms_std", 0) intensity_mean = intensity_data.get("rms_mean", 0) # Tempo score (60-180 BPM is good for speech) if 60 <= tempo <= 180: tempo_score = 1.0 elif 40 <= tempo < 60 or 180 < tempo <= 220: tempo_score = 0.6 else: tempo_score = 0.3 # Intensity consistency score if intensity_mean > 0: intensity_consistency = max(0, 1.0 - (intensity_std / intensity_mean)) else: intensity_consistency = 0.5 return (tempo_score + intensity_consistency) / 2 def _calculate_stress_score(self, pitch_data: Dict, intensity_data: Dict) -> float: """Calculate stress score based on pitch and intensity variation""" pitch_cv = pitch_data.get("cv", 0) intensity_std = intensity_data.get("rms_std", 0) intensity_mean = intensity_data.get("rms_mean", 0) # Pitch coefficient of variation score if 0.2 <= pitch_cv <= 0.4: pitch_score = 1.0 elif 0.1 <= pitch_cv < 0.2 or 0.4 < pitch_cv <= 0.6: pitch_score = 0.7 else: pitch_score = 0.4 # Intensity variation score if intensity_mean > 0: intensity_cv = intensity_std / intensity_mean if 0.1 <= intensity_cv <= 0.3: intensity_score = 1.0 elif 0.05 <= intensity_cv < 0.1 or 0.3 < intensity_cv <= 0.5: intensity_score = 0.7 else: intensity_score = 0.4 else: intensity_score = 0.5 return (pitch_score + intensity_score) / 2 def _generate_prosody_feedback(self, pace_score: float, intonation_score: float, rhythm_score: float, stress_score: float, speech_rate: float, pitch_data: Dict) -> List[str]: """Generate detailed prosody feedback""" feedback = [] if pace_score < 0.5: if speech_rate < self.expected_speech_rate * 0.8: feedback.append("Tốc độ nói hơi chậm, thử nói nhanh hơn một chút") else: feedback.append("Tốc độ nói hơi nhanh, thử nói chậm lại để rõ ràng hơn") elif pace_score >= 0.8: feedback.append("Tốc độ nói rất tự nhiên") if intonation_score < 0.5: feedback.append("Cần cải thiện ngữ điệu - thay đổi cao độ giọng nhiều hơn") elif intonation_score >= 0.8: feedback.append("Ngữ điệu rất tự nhiên và sinh động") if rhythm_score < 0.5: feedback.append("Nhịp điệu cần đều hơn - chú ý đến trọng âm của từ") elif rhythm_score >= 0.8: feedback.append("Nhịp điệu rất tốt") if stress_score < 0.5: feedback.append("Cần nhấn mạnh trọng âm rõ ràng hơn") elif stress_score >= 0.8: feedback.append("Trọng âm được nhấn rất tốt") return feedback def _estimate_syllables(self, text: str) -> int: """Estimate number of syllables in text""" vowels = "aeiouy" text = text.lower() syllable_count = 0 prev_was_vowel = False for char in text: if char in vowels: if not prev_was_vowel: syllable_count += 1 prev_was_vowel = True else: prev_was_vowel = False if text.endswith('e'): syllable_count -= 1 return max(1, syllable_count) def _empty_prosody_result(self) -> Dict: """Return empty prosody result for error cases""" return { "pace_score": 0.5, "intonation_score": 0.5, "rhythm_score": 0.5, "stress_score": 0.5, "overall_prosody": 0.5, "details": {}, "feedback": ["Không thể phân tích ngữ điệu"] } class EnhancedFeedbackGenerator: """Enhanced feedback generator with detailed analysis""" def generate_enhanced_feedback(self, overall_score: float, wrong_words: List[Dict], phoneme_comparisons: List[Dict], mode: AssessmentMode, prosody_analysis: Dict = None) -> List[str]: """Generate comprehensive feedback based on assessment mode""" feedback = [] # Overall score feedback if overall_score >= 0.9: feedback.append("Phát âm xuất sắc! Bạn đã làm rất tốt.") elif overall_score >= 0.8: feedback.append("Phát âm rất tốt! Chỉ còn một vài điểm nhỏ cần cải thiện.") elif overall_score >= 0.6: feedback.append("Phát âm khá tốt, còn một số điểm cần luyện tập thêm.") elif overall_score >= 0.4: feedback.append("Cần luyện tập thêm. Tập trung vào những từ được đánh dấu.") else: feedback.append("Hãy luyện tập chậm rãi và rõ ràng hơn.") # Mode-specific feedback if mode == AssessmentMode.WORD: feedback.extend(self._generate_word_mode_feedback(wrong_words, phoneme_comparisons)) elif mode == AssessmentMode.SENTENCE: feedback.extend(self._generate_sentence_mode_feedback(wrong_words, prosody_analysis)) # Common error patterns error_patterns = self._analyze_error_patterns(phoneme_comparisons) if error_patterns: feedback.extend(error_patterns) return feedback def _generate_word_mode_feedback(self, wrong_words: List[Dict], phoneme_comparisons: List[Dict]) -> List[str]: """Generate feedback specific to word mode""" feedback = [] if wrong_words: if len(wrong_words) == 1: word = wrong_words[0]["word"] feedback.append(f"Từ '{word}' cần luyện tập thêm") # Character-level feedback char_errors = wrong_words[0].get("character_errors", []) if char_errors: error_chars = [err.character for err in char_errors[:3]] feedback.append(f"Chú ý các âm: {', '.join(error_chars)}") else: word_list = [w["word"] for w in wrong_words[:3]] feedback.append(f"Các từ cần luyện: {', '.join(word_list)}") return feedback def _generate_sentence_mode_feedback(self, wrong_words: List[Dict], prosody_analysis: Dict) -> List[str]: """Generate feedback specific to sentence mode""" feedback = [] # Word-level feedback if wrong_words: if len(wrong_words) <= 2: word_list = [w["word"] for w in wrong_words] feedback.append(f"Cần cải thiện: {', '.join(word_list)}") else: feedback.append(f"Có {len(wrong_words)} từ cần luyện tập") # Prosody feedback if prosody_analysis and "feedback" in prosody_analysis: feedback.extend(prosody_analysis["feedback"][:2]) # Limit prosody feedback return feedback def _analyze_error_patterns(self, phoneme_comparisons: List[Dict]) -> List[str]: """Analyze common error patterns across phonemes""" feedback = [] # Count error types error_counts = defaultdict(int) difficult_phonemes = defaultdict(int) for comparison in phoneme_comparisons: if comparison["status"] in ["wrong", "substitution"]: phoneme = comparison["reference_phoneme"] difficult_phonemes[phoneme] += 1 error_counts[comparison["status"]] += 1 # Most problematic phoneme if difficult_phonemes: most_difficult = max(difficult_phonemes.items(), key=lambda x: x[1]) if most_difficult[1] >= 2: phoneme = most_difficult[0] phoneme_tips = { "θ": "Lưỡi giữa răng, thổi nhẹ", "ð": "Lưỡi giữa răng, rung dây thanh", "v": "Môi dưới chạm răng trên", "r": "Cuộn lưỡi nhẹ", "z": "Như 's' nhưng rung dây thanh" } if phoneme in phoneme_tips: feedback.append(f"Âm khó nhất /{phoneme}/: {phoneme_tips[phoneme]}") return feedback class ProductionPronunciationAssessor: """Production-ready pronunciation assessor - Enhanced version of the current system""" def __init__(self, onnx: bool = False, quantized: bool = False): """Initialize the production-ready pronunciation assessment system""" logger.info("Initializing Production Pronunciation Assessment System...") self.asr = EnhancedWav2Vec2CharacterASR(onnx=onnx, quantized=quantized) self.word_analyzer = EnhancedWordAnalyzer() self.prosody_analyzer = EnhancedProsodyAnalyzer() self.feedback_generator = EnhancedFeedbackGenerator() self.g2p = EnhancedG2P() logger.info("Production system initialization completed") def assess_pronunciation(self, audio_path: str, reference_text: str, mode: str = "auto") -> Dict: """ Main assessment function with enhanced features Args: audio_path: Path to audio file reference_text: Reference text to compare against mode: Assessment mode ("word", "sentence", "auto", or legacy modes) Returns: Enhanced assessment results with backward compatibility """ logger.info(f"Starting production assessment in {mode} mode...") start_time = time.time() try: # Normalize and validate mode assessment_mode = self._normalize_mode(mode, reference_text) logger.info(f"Using assessment mode: {assessment_mode.value}") # Step 1: Enhanced ASR transcription with features asr_result = self.asr.transcribe_with_features(audio_path) if not asr_result["character_transcript"]: return self._create_error_result("No speech detected in audio") # Step 2: Enhanced word analysis analysis_result = self.word_analyzer.analyze_words_enhanced( reference_text, asr_result["phoneme_representation"], assessment_mode ) # Step 3: Calculate overall score overall_score = self._calculate_overall_score(analysis_result["phoneme_differences"]) # Step 4: Prosody analysis for sentence mode prosody_analysis = {} if assessment_mode == AssessmentMode.SENTENCE: prosody_analysis = self.prosody_analyzer.analyze_prosody_enhanced( asr_result["audio_features"], reference_text ) # Step 5: Generate enhanced feedback feedback = self.feedback_generator.generate_enhanced_feedback( overall_score, analysis_result["wrong_words"], analysis_result["phoneme_differences"], assessment_mode, prosody_analysis ) # Step 6: Create phoneme comparison summary phoneme_comparison_summary = self._create_phoneme_comparison_summary( analysis_result["phoneme_pairs"] ) # Step 7: Assemble result with backward compatibility result = self._create_enhanced_result( asr_result, analysis_result, overall_score, feedback, prosody_analysis, phoneme_comparison_summary, assessment_mode ) # Add processing metadata processing_time = time.time() - start_time result["processing_info"] = { "processing_time": round(processing_time, 2), "mode": assessment_mode.value, "model_used": "Wav2Vec2-Enhanced", "onnx_enabled": self.asr.use_onnx, "confidence": asr_result["confidence"], "enhanced_features": True, "character_level_analysis": assessment_mode == AssessmentMode.WORD, "prosody_analysis": assessment_mode == AssessmentMode.SENTENCE } logger.info(f"Production assessment completed in {processing_time:.2f}s") return result except Exception as e: logger.error(f"Production assessment error: {e}") return self._create_error_result(f"Assessment failed: {str(e)}") def _normalize_mode(self, mode: str, reference_text: str) -> AssessmentMode: """Normalize mode parameter with backward compatibility""" # Legacy mode mapping legacy_mapping = { "normal": AssessmentMode.AUTO, "advanced": AssessmentMode.AUTO } if mode in legacy_mapping: normalized_mode = legacy_mapping[mode] logger.info(f"Mapped legacy mode '{mode}' to '{normalized_mode.value}'") mode = normalized_mode.value # Validate mode try: assessment_mode = AssessmentMode(mode) except ValueError: logger.warning(f"Invalid mode '{mode}', defaulting to AUTO") assessment_mode = AssessmentMode.AUTO # Auto-detect mode based on text length if assessment_mode == AssessmentMode.AUTO: word_count = len(reference_text.strip().split()) assessment_mode = AssessmentMode.WORD if word_count <= 3 else AssessmentMode.SENTENCE logger.info(f"Auto-detected mode: {assessment_mode.value} (word count: {word_count})") return assessment_mode def _calculate_overall_score(self, phoneme_comparisons: List[Dict]) -> float: """Calculate weighted overall score""" if not phoneme_comparisons: return 0.0 total_weighted_score = 0.0 total_weight = 0.0 for comparison in phoneme_comparisons: weight = comparison.get("difficulty", 0.5) # Use difficulty as weight score = comparison["score"] total_weighted_score += score * weight total_weight += weight return total_weighted_score / total_weight if total_weight > 0 else 0.0 def _create_phoneme_comparison_summary(self, phoneme_pairs: List[Dict]) -> Dict: """Create phoneme comparison summary statistics""" total = len(phoneme_pairs) if total == 0: return {"total_phonemes": 0, "accuracy_percentage": 0} correct = sum(1 for pair in phoneme_pairs if pair["match"]) substitutions = sum(1 for pair in phoneme_pairs if pair["type"] == "substitution") deletions = sum(1 for pair in phoneme_pairs if pair["type"] == "deletion") insertions = sum(1 for pair in phoneme_pairs if pair["type"] == "insertion") return { "total_phonemes": total, "correct": correct, "substitutions": substitutions, "deletions": deletions, "insertions": insertions, "accuracy_percentage": round((correct / total) * 100, 1), "error_rate": round(((substitutions + deletions + insertions) / total) * 100, 1) } def _create_enhanced_result(self, asr_result: Dict, analysis_result: Dict, overall_score: float, feedback: List[str], prosody_analysis: Dict, phoneme_summary: Dict, assessment_mode: AssessmentMode) -> Dict: """Create enhanced result with backward compatibility""" # Base result structure (backward compatible) result = { "transcript": asr_result["character_transcript"], "transcript_phonemes": asr_result["phoneme_representation"], "user_phonemes": asr_result["phoneme_representation"], "character_transcript": asr_result["character_transcript"], "overall_score": overall_score, "word_highlights": analysis_result["word_highlights"], "phoneme_differences": analysis_result["phoneme_differences"], "wrong_words": analysis_result["wrong_words"], "feedback": feedback, } # Enhanced features result.update({ "reference_phonemes": analysis_result["reference_phonemes"], "phoneme_pairs": analysis_result["phoneme_pairs"], "phoneme_comparison": phoneme_summary, "assessment_mode": assessment_mode.value, }) # Add prosody analysis for sentence mode if prosody_analysis: result["prosody_analysis"] = prosody_analysis # Add character-level analysis for word mode if assessment_mode == AssessmentMode.WORD: result["character_level_analysis"] = True # Add character errors to word highlights if available for word_highlight in result["word_highlights"]: if "character_errors" in word_highlight: # Convert CharacterError objects to dicts for JSON serialization char_errors = [] for error in word_highlight["character_errors"]: if isinstance(error, CharacterError): char_errors.append({ "character": error.character, "position": error.position, "error_type": error.error_type, "expected_sound": error.expected_sound, "actual_sound": error.actual_sound, "severity": error.severity, "color": error.color }) else: char_errors.append(error) word_highlight["character_errors"] = char_errors return result def _create_error_result(self, error_message: str) -> Dict: """Create error result structure""" return { "transcript": "", "transcript_phonemes": "", "user_phonemes": "", "character_transcript": "", "overall_score": 0.0, "word_highlights": [], "phoneme_differences": [], "wrong_words": [], "feedback": [f"Lỗi: {error_message}"], "error": error_message, "assessment_mode": "error", "processing_info": { "processing_time": 0, "mode": "error", "model_used": "Wav2Vec2-Enhanced", "confidence": 0.0, "enhanced_features": False } } def get_system_info(self) -> Dict: """Get comprehensive system information""" return { "version": "2.1.0-production", "name": "Production Pronunciation Assessment System", "modes": [mode.value for mode in AssessmentMode], "features": [ "Enhanced Levenshtein distance phoneme alignment", "Character-level error detection (word mode)", "Advanced prosody analysis (sentence mode)", "Vietnamese speaker-specific error patterns", "Real-time confidence scoring", "IPA phonetic representation with visualization", "Backward compatibility with legacy APIs", "Production-ready error handling" ], "model_info": { "asr_model": self.asr.model_name, "onnx_enabled": self.asr.use_onnx, "sample_rate": self.asr.sample_rate }, "assessment_modes": { "word": "Detailed character and phoneme level analysis for single words or short phrases", "sentence": "Word-level analysis with prosody evaluation for complete sentences", "auto": "Automatically selects mode based on text length (≤3 words = word mode)" } } # Backward compatibility wrapper class SimplePronunciationAssessor: """Backward compatible wrapper for the enhanced system""" def __init__(self): print("Initializing Simple Pronunciation Assessor (Enhanced)...") self.enhanced_assessor = ProductionPronunciationAssessor() print("Enhanced Simple Pronunciation Assessor initialization completed") def assess_pronunciation(self, audio_path: str, reference_text: str, mode: str = "normal") -> Dict: """ Backward compatible assessment function Args: audio_path: Path to audio file reference_text: Reference text to compare mode: Assessment mode (supports legacy modes) """ return self.enhanced_assessor.assess_pronunciation(audio_path, reference_text, mode) # Example usage if __name__ == "__main__": # Initialize production system system = ProductionPronunciationAssessor(onnx=False, quantized=False) # Example word mode assessment print("=== WORD MODE EXAMPLE ===") word_result = system.assess_pronunciation( audio_path="./hello_world.wav", reference_text="hello", mode="word" ) # print(f"Word mode result keys: {list(word_result.keys())}") print("Word result", word_result) # Example sentence mode assessment print("\n=== SENTENCE MODE EXAMPLE ===") sentence_result = system.assess_pronunciation( audio_path="./hello_how_are_you_today.wav", reference_text="Hello, how are you today?", mode="sentence" ) print(f"Sentence mode result keys: {list(sentence_result.keys())}") print("Sentence result", sentence_result) # Example auto mode assessment print("\n=== AUTO MODE EXAMPLE ===") auto_result = system.assess_pronunciation( audio_path="./hello_how_are_you_today.wav", reference_text="world", # Single word - should auto-select word mode mode="auto" ) print(f"Auto mode result: {auto_result['assessment_mode']}") print("Auto result", auto_result) # Backward compatibility test print("\n=== BACKWARD COMPATIBILITY TEST ===") legacy_assessor = SimplePronunciationAssessor() legacy_result = legacy_assessor.assess_pronunciation( audio_path="./hello_world.wav", reference_text="pronunciation", mode="normal" # Legacy mode ) print(f"Legacy mode result: {legacy_result}") print(f"Legacy mode mapped to: {legacy_result.get('assessment_mode', 'N/A')}") # System info print(f"\n=== SYSTEM INFO ===") system_info = system.get_system_info() print(f"System version: {system_info['version']}") print(f"Available modes: {system_info['modes']}") print(f"Key features: {len(system_info['features'])} enhanced features")