# Run_code_api / evalution.py
from typing import List, Dict, Tuple, Optional
import numpy as np
import librosa
import nltk
import eng_to_ipa as ipa
import re
from collections import defaultdict
from loguru import logger
import time
import Levenshtein
from dataclasses import dataclass
from enum import Enum
from src.AI_Models.wave2vec_inference import (
Wave2Vec2Inference,
Wave2Vec2ONNXInference,
export_to_onnx,
)
# Download required NLTK data
try:
nltk.download("cmudict", quiet=True)
from nltk.corpus import cmudict
except Exception:
    logger.warning("NLTK cmudict not available; falling back to rule-based phoneme estimation")
class AssessmentMode(Enum):
WORD = "word"
SENTENCE = "sentence"
AUTO = "auto"
class ErrorType(Enum):
CORRECT = "correct"
SUBSTITUTION = "substitution"
DELETION = "deletion"
INSERTION = "insertion"
ACCEPTABLE = "acceptable"
@dataclass
class CharacterError:
"""Character-level error information for UI mapping"""
character: str
position: int
error_type: str
expected_sound: str
actual_sound: str
severity: float
color: str
class EnhancedWav2Vec2CharacterASR:
"""Enhanced Wav2Vec2 ASR with prosody analysis support"""
def __init__(
self,
model_name: str = "facebook/wav2vec2-large-960h-lv60-self",
onnx: bool = False,
quantized: bool = False,
):
self.use_onnx = onnx
self.sample_rate = 16000
self.model_name = model_name
if onnx:
import os
model_path = f"wav2vec2-large-960h-lv60-self{'.quant' if quantized else ''}.onnx"
if not os.path.exists(model_path):
export_to_onnx(model_name, quantize=quantized)
self.model = (
Wave2Vec2Inference(model_name)
if not onnx
else Wave2Vec2ONNXInference(model_name, model_path)
)
def transcribe_with_features(self, audio_path: str) -> Dict:
"""Enhanced transcription with audio features for prosody analysis"""
try:
start_time = time.time()
# Basic transcription
character_transcript = self.model.file_to_text(audio_path)
character_transcript = self._clean_character_transcript(character_transcript)
# Convert to phonemes
phoneme_representation = self._characters_to_phoneme_representation(character_transcript)
# Extract audio features for prosody
audio_features = self._extract_enhanced_audio_features(audio_path)
logger.info(f"Enhanced transcription time: {time.time() - start_time:.2f}s")
return {
"character_transcript": character_transcript,
"phoneme_representation": phoneme_representation,
"audio_features": audio_features,
"confidence": self._estimate_confidence(character_transcript)
}
except Exception as e:
logger.error(f"Enhanced ASR error: {e}")
return self._empty_result()
def _extract_enhanced_audio_features(self, audio_path: str) -> Dict:
"""Extract comprehensive audio features for prosody analysis"""
try:
y, sr = librosa.load(audio_path, sr=self.sample_rate)
duration = len(y) / sr
# Pitch analysis
pitches, magnitudes = librosa.piptrack(y=y, sr=sr)
pitch_values = []
for t in range(pitches.shape[1]):
index = magnitudes[:, t].argmax()
pitch = pitches[index, t]
if pitch > 0:
pitch_values.append(pitch)
# Rhythm and timing features
tempo, beats = librosa.beat.beat_track(y=y, sr=sr)
# Intensity features
rms = librosa.feature.rms(y=y)[0]
zcr = librosa.feature.zero_crossing_rate(y)[0]
# Spectral features
spectral_centroids = librosa.feature.spectral_centroid(y=y, sr=sr)[0]
return {
"duration": duration,
"pitch": {
"values": pitch_values,
"mean": np.mean(pitch_values) if pitch_values else 0,
"std": np.std(pitch_values) if pitch_values else 0,
"range": np.max(pitch_values) - np.min(pitch_values) if pitch_values else 0,
"cv": np.std(pitch_values) / np.mean(pitch_values) if pitch_values and np.mean(pitch_values) > 0 else 0
},
"rhythm": {
"tempo": tempo,
"beats_per_second": len(beats) / duration if duration > 0 else 0
},
"intensity": {
"rms_mean": np.mean(rms),
"rms_std": np.std(rms),
"zcr_mean": np.mean(zcr)
},
"spectral": {
"centroid_mean": np.mean(spectral_centroids),
"centroid_std": np.std(spectral_centroids)
}
}
except Exception as e:
logger.error(f"Audio feature extraction error: {e}")
return {"duration": 0, "error": str(e)}
def _clean_character_transcript(self, transcript: str) -> str:
"""Clean and standardize character transcript"""
logger.info(f"Raw transcript before cleaning: {transcript}")
cleaned = re.sub(r'\s+', ' ', transcript)
return cleaned.strip().lower()
def _characters_to_phoneme_representation(self, text: str) -> str:
"""Convert character-based transcript to phoneme representation"""
if not text:
return ""
words = text.split()
phoneme_words = []
        g2p = EnhancedG2P()
        for word in words:
            try:
                phoneme_words.extend(g2p.word_to_phonemes(word))
            except Exception:
                phoneme_words.extend(self._simple_letter_to_phoneme(word))
return " ".join(phoneme_words)
def _simple_letter_to_phoneme(self, word: str) -> List[str]:
"""Fallback letter-to-phoneme conversion"""
letter_to_phoneme = {
"a": "æ", "b": "b", "c": "k", "d": "d", "e": "ɛ", "f": "f",
"g": "ɡ", "h": "h", "i": "ɪ", "j": "dʒ", "k": "k", "l": "l",
"m": "m", "n": "n", "o": "ʌ", "p": "p", "q": "k", "r": "r",
"s": "s", "t": "t", "u": "ʌ", "v": "v", "w": "w", "x": "ks",
"y": "j", "z": "z"
}
return [letter_to_phoneme.get(letter, letter) for letter in word.lower() if letter in letter_to_phoneme]
def _estimate_confidence(self, transcript: str) -> float:
"""Estimate transcription confidence"""
if not transcript or len(transcript.strip()) < 2:
return 0.0
repeated_chars = len(re.findall(r'(.)\1{2,}', transcript))
return max(0.0, 1.0 - (repeated_chars * 0.2))
def _empty_result(self) -> Dict:
"""Empty result for error cases"""
return {
"character_transcript": "",
"phoneme_representation": "",
"audio_features": {"duration": 0},
"confidence": 0.0
}
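

# --- Illustrative usage sketch (assumption: a 16 kHz mono WAV exists at the given path) ---
# Not part of the original API; shows how EnhancedWav2Vec2CharacterASR can be driven on its
# own. Constructing the class may download Wav2Vec2 weights on first use.
def _demo_asr_usage(audio_path: str = "./sample_16khz.wav") -> Dict:
    """Run one enhanced transcription and return the raw result dict."""
    asr = EnhancedWav2Vec2CharacterASR(onnx=False, quantized=False)
    result = asr.transcribe_with_features(audio_path)
    # result holds: character_transcript, phoneme_representation,
    # audio_features (duration, pitch, rhythm, intensity, spectral) and confidence.
    return result
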
class EnhancedG2P:
"""Enhanced Grapheme-to-Phoneme converter with visualization support"""
def __init__(self):
try:
self.cmu_dict = cmudict.dict()
        except Exception:
self.cmu_dict = {}
logger.warning("CMU dictionary not available")
# Vietnamese speaker substitution patterns (enhanced)
self.vn_substitutions = {
"θ": ["f", "s", "t", "d"],
"ð": ["d", "z", "v", "t"],
"v": ["w", "f", "b"],
"w": ["v", "b"],
"r": ["l", "n"],
"l": ["r", "n"],
"z": ["s", "j"],
"ʒ": ["ʃ", "z", "s"],
"ʃ": ["s", "ʒ"],
"ŋ": ["n", "m"],
"tʃ": ["ʃ", "s", "k"],
"dʒ": ["ʒ", "j", "g"],
"æ": ["ɛ", "a"],
"ɪ": ["i"],
"ʊ": ["u"]
}
# Difficulty scores for Vietnamese speakers
self.difficulty_scores = {
"θ": 0.9, "ð": 0.9, "v": 0.8, "z": 0.8, "ʒ": 0.9,
"r": 0.7, "l": 0.6, "w": 0.5, "æ": 0.7, "ɪ": 0.6,
"ʊ": 0.6, "ŋ": 0.3, "f": 0.2, "s": 0.2, "ʃ": 0.5,
"tʃ": 0.4, "dʒ": 0.5
}
def word_to_phonemes(self, word: str) -> List[str]:
"""Convert word to phoneme list"""
word_lower = word.lower().strip()
if word_lower in self.cmu_dict:
cmu_phonemes = self.cmu_dict[word_lower][0]
return self._convert_cmu_to_ipa(cmu_phonemes)
else:
return self._estimate_phonemes(word_lower)
def get_phoneme_string(self, text: str) -> str:
"""Get space-separated phoneme string"""
words = self._clean_text(text).split()
all_phonemes = []
for word in words:
if word:
phonemes = self.word_to_phonemes(word)
all_phonemes.extend(phonemes)
return " ".join(all_phonemes)
def text_to_phonemes(self, text: str) -> List[Dict]:
"""Convert text to phoneme sequence with visualization data"""
words = self._clean_text(text).split()
phoneme_sequence = []
for word in words:
word_phonemes = self.word_to_phonemes(word)
phoneme_sequence.append({
"word": word,
"phonemes": word_phonemes,
"ipa": self._get_ipa(word),
"phoneme_string": " ".join(word_phonemes),
"visualization": self._create_phoneme_visualization(word_phonemes)
})
return phoneme_sequence
def _convert_cmu_to_ipa(self, cmu_phonemes: List[str]) -> List[str]:
"""Convert CMU phonemes to IPA"""
cmu_to_ipa = {
"AA": "ɑ", "AE": "æ", "AH": "ʌ", "AO": "ɔ", "AW": "aʊ",
"AY": "aɪ", "EH": "ɛ", "ER": "ɝ", "EY": "eɪ", "IH": "ɪ",
"IY": "i", "OW": "oʊ", "OY": "ɔɪ", "UH": "ʊ", "UW": "u",
"B": "b", "CH": "tʃ", "D": "d", "DH": "ð", "F": "f",
"G": "ɡ", "HH": "h", "JH": "dʒ", "K": "k", "L": "l",
"M": "m", "N": "n", "NG": "ŋ", "P": "p", "R": "r",
"S": "s", "SH": "ʃ", "T": "t", "TH": "θ", "V": "v",
"W": "w", "Y": "j", "Z": "z", "ZH": "ʒ"
}
ipa_phonemes = []
for phoneme in cmu_phonemes:
clean_phoneme = re.sub(r'[0-9]', '', phoneme)
ipa_phoneme = cmu_to_ipa.get(clean_phoneme, clean_phoneme.lower())
ipa_phonemes.append(ipa_phoneme)
return ipa_phonemes
def _estimate_phonemes(self, word: str) -> List[str]:
"""Estimate phonemes for unknown words"""
phoneme_map = {
"ch": "tʃ", "sh": "ʃ", "th": "θ", "ph": "f", "ck": "k",
"ng": "ŋ", "qu": "kw", "a": "æ", "e": "ɛ", "i": "ɪ",
"o": "ʌ", "u": "ʌ", "b": "b", "c": "k", "d": "d",
"f": "f", "g": "ɡ", "h": "h", "j": "dʒ", "k": "k",
"l": "l", "m": "m", "n": "n", "p": "p", "r": "r",
"s": "s", "t": "t", "v": "v", "w": "w", "x": "ks",
"y": "j", "z": "z"
}
phonemes = []
i = 0
while i < len(word):
if i <= len(word) - 2:
two_char = word[i:i+2]
if two_char in phoneme_map:
phonemes.append(phoneme_map[two_char])
i += 2
continue
char = word[i]
if char in phoneme_map:
phonemes.append(phoneme_map[char])
i += 1
return phonemes
def _clean_text(self, text: str) -> str:
"""Clean text for processing"""
text = re.sub(r"[^\w\s']", " ", text)
text = re.sub(r'\s+', ' ', text)
return text.lower().strip()
def _get_ipa(self, word: str) -> str:
"""Get IPA transcription"""
try:
return ipa.convert(word)
        except Exception:
return f"/{word}/"
def _create_phoneme_visualization(self, phonemes: List[str]) -> List[Dict]:
"""Create visualization data for phonemes"""
visualization = []
for phoneme in phonemes:
color_category = self._get_phoneme_color_category(phoneme)
visualization.append({
"phoneme": phoneme,
"color_category": color_category,
"description": self._get_phoneme_description(phoneme),
"difficulty": self.difficulty_scores.get(phoneme, 0.3)
})
return visualization
def _get_phoneme_color_category(self, phoneme: str) -> str:
"""Categorize phonemes by color for visualization"""
vowel_phonemes = {"ɑ", "æ", "ʌ", "ɔ", "aʊ", "aɪ", "ɛ", "ɝ", "eɪ", "ɪ", "i", "oʊ", "ɔɪ", "ʊ", "u"}
difficult_consonants = {"θ", "ð", "v", "z", "ʒ", "r", "w"}
if phoneme in vowel_phonemes:
return "vowel"
elif phoneme in difficult_consonants:
return "difficult"
else:
return "consonant"
def _get_phoneme_description(self, phoneme: str) -> str:
"""Get description for a phoneme"""
descriptions = {
"θ": "Voiceless dental fricative (like 'th' in 'think')",
"ð": "Voiced dental fricative (like 'th' in 'this')",
"v": "Voiced labiodental fricative (like 'v' in 'van')",
"z": "Voiced alveolar fricative (like 'z' in 'zip')",
"ʒ": "Voiced postalveolar fricative (like 's' in 'measure')",
"r": "Alveolar approximant (like 'r' in 'red')",
"w": "Labial-velar approximant (like 'w' in 'wet')",
"æ": "Near-open front unrounded vowel (like 'a' in 'cat')",
"ɪ": "Near-close near-front unrounded vowel (like 'i' in 'sit')",
"ʊ": "Near-close near-back rounded vowel (like 'u' in 'put')"
}
return descriptions.get(phoneme, f"Phoneme: {phoneme}")
def is_acceptable_substitution(self, reference: str, predicted: str) -> bool:
"""Check if substitution is acceptable for Vietnamese speakers"""
acceptable = self.vn_substitutions.get(reference, [])
return predicted in acceptable
def get_difficulty_score(self, phoneme: str) -> float:
"""Get difficulty score for phoneme"""
return self.difficulty_scores.get(phoneme, 0.3)
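

# --- Illustrative usage sketch (not part of the original module) ---
# Minimal example of the EnhancedG2P conversions used by the analyzers below.
# Printed outputs are indicative; exact phonemes depend on the CMU dictionary.
def _demo_g2p_usage() -> None:
    g2p = EnhancedG2P()
    # CMU lookup first, rule-based estimation as fallback for unknown words.
    print(g2p.word_to_phonemes("think"))            # e.g. ['θ', 'ɪ', 'ŋ', 'k']
    # Space-separated phoneme string for a whole reference text.
    print(g2p.get_phoneme_string("hello world"))
    # Vietnamese-speaker substitution check: /θ/ -> /t/ counts as acceptable.
    print(g2p.is_acceptable_substitution("θ", "t"))  # True
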
class AdvancedPhonemeComparator:
"""Enhanced phoneme comparator using Levenshtein distance"""
def __init__(self):
self.g2p = EnhancedG2P()
def compare_with_levenshtein(self, reference: str, predicted: str) -> List[Dict]:
"""Compare phonemes using Levenshtein distance for accurate alignment"""
ref_phones = reference.split() if reference else []
pred_phones = predicted.split() if predicted else []
if not ref_phones:
return []
# Use Levenshtein editops for precise alignment
ops = Levenshtein.editops(ref_phones, pred_phones)
comparisons = []
ref_idx = 0
pred_idx = 0
        # Walk the edit operations, emitting matching phonemes before each edit
        for op_type, ref_pos, pred_pos in ops:
            # Emit the phonemes that still match before applying this operation
while ref_idx < ref_pos and pred_idx < pred_pos:
comparison = self._create_comparison(
ref_phones[ref_idx], pred_phones[pred_idx],
ErrorType.CORRECT, 1.0, len(comparisons)
)
comparisons.append(comparison)
ref_idx += 1
pred_idx += 1
# Process the operation
if op_type == 'replace':
ref_phoneme = ref_phones[ref_pos]
pred_phoneme = pred_phones[pred_pos]
if self.g2p.is_acceptable_substitution(ref_phoneme, pred_phoneme):
error_type = ErrorType.ACCEPTABLE
score = 0.7
else:
error_type = ErrorType.SUBSTITUTION
score = 0.2
comparison = self._create_comparison(
ref_phoneme, pred_phoneme, error_type, score, len(comparisons)
)
comparisons.append(comparison)
ref_idx = ref_pos + 1
pred_idx = pred_pos + 1
elif op_type == 'delete':
comparison = self._create_comparison(
ref_phones[ref_pos], "", ErrorType.DELETION, 0.0, len(comparisons)
)
comparisons.append(comparison)
ref_idx = ref_pos + 1
elif op_type == 'insert':
comparison = self._create_comparison(
"", pred_phones[pred_pos], ErrorType.INSERTION, 0.0, len(comparisons)
)
comparisons.append(comparison)
pred_idx = pred_pos + 1
        # Append any remaining matching phonemes at the tail
while ref_idx < len(ref_phones) and pred_idx < len(pred_phones):
comparison = self._create_comparison(
ref_phones[ref_idx], pred_phones[pred_idx],
ErrorType.CORRECT, 1.0, len(comparisons)
)
comparisons.append(comparison)
ref_idx += 1
pred_idx += 1
return comparisons
def _create_comparison(self, ref_phoneme: str, pred_phoneme: str,
error_type: ErrorType, score: float, position: int) -> Dict:
"""Create comparison dictionary"""
return {
"position": position,
"reference_phoneme": ref_phoneme,
"learner_phoneme": pred_phoneme,
"status": error_type.value,
"score": score,
"difficulty": self.g2p.get_difficulty_score(ref_phoneme),
"error_type": error_type.value
}
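

# --- Illustrative usage sketch (not part of the original module) ---
# Shows the shape of the Levenshtein-based comparison output on two hand-written
# phoneme strings (space-separated IPA, as produced by EnhancedG2P.get_phoneme_string).
def _demo_phoneme_comparison() -> List[Dict]:
    comparator = AdvancedPhonemeComparator()
    reference = "θ ɪ ŋ k"   # "think"
    predicted = "t ɪ ŋ"     # /θ/ replaced by /t/, final /k/ dropped
    comparisons = comparator.compare_with_levenshtein(reference, predicted)
    # Each entry carries position, reference/learner phoneme, status
    # (correct/acceptable/substitution/deletion/insertion), score and difficulty.
    return comparisons
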
class EnhancedWordAnalyzer:
"""Enhanced word analyzer with character-level error mapping"""
def __init__(self):
self.g2p = EnhancedG2P()
self.comparator = AdvancedPhonemeComparator()
def analyze_words_enhanced(self, reference_text: str, learner_phonemes: str,
mode: AssessmentMode) -> Dict:
"""Enhanced word analysis with character-level mapping"""
# Get reference phonemes by word
reference_words = self.g2p.text_to_phonemes(reference_text)
# Get overall phoneme comparison using Levenshtein
reference_phoneme_string = self.g2p.get_phoneme_string(reference_text)
phoneme_comparisons = self.comparator.compare_with_levenshtein(
reference_phoneme_string, learner_phonemes
)
# Create enhanced word highlights
word_highlights = self._create_enhanced_word_highlights(
reference_words, phoneme_comparisons, mode
)
# Identify wrong words with character-level errors
wrong_words = self._identify_wrong_words_enhanced(word_highlights, phoneme_comparisons)
return {
"word_highlights": word_highlights,
"phoneme_differences": phoneme_comparisons,
"wrong_words": wrong_words,
"reference_phonemes": reference_phoneme_string,
"phoneme_pairs": self._create_phoneme_pairs(reference_phoneme_string, learner_phonemes)
}
def _create_enhanced_word_highlights(self, reference_words: List[Dict],
phoneme_comparisons: List[Dict],
mode: AssessmentMode) -> List[Dict]:
"""Create enhanced word highlights with character-level error mapping"""
word_highlights = []
phoneme_index = 0
for word_data in reference_words:
word = word_data["word"]
word_phonemes = word_data["phonemes"]
num_phonemes = len(word_phonemes)
# Get phoneme scores for this word
word_phoneme_scores = []
word_comparisons = []
for j in range(num_phonemes):
if phoneme_index + j < len(phoneme_comparisons):
comparison = phoneme_comparisons[phoneme_index + j]
word_phoneme_scores.append(comparison["score"])
word_comparisons.append(comparison)
# Calculate word score
word_score = np.mean(word_phoneme_scores) if word_phoneme_scores else 0.0
# Map phoneme errors to character positions (enhanced for word mode)
character_errors = []
if mode == AssessmentMode.WORD:
character_errors = self._map_phonemes_to_characters(word, word_comparisons)
# Create enhanced word highlight
highlight = {
"word": word,
"score": float(word_score),
"status": self._get_word_status(word_score),
"color": self._get_word_color(word_score),
"phonemes": word_phonemes,
"ipa": word_data["ipa"],
"phoneme_scores": word_phoneme_scores,
"phoneme_start_index": phoneme_index,
"phoneme_end_index": phoneme_index + num_phonemes - 1,
"phoneme_visualization": word_data["visualization"],
"character_errors": character_errors, # New feature
"detailed_analysis": mode == AssessmentMode.WORD # Flag for UI
}
word_highlights.append(highlight)
phoneme_index += num_phonemes
return word_highlights
def _map_phonemes_to_characters(self, word: str, phoneme_comparisons: List[Dict]) -> List[CharacterError]:
"""Map phoneme errors to character positions in word"""
character_errors = []
# Simple mapping strategy: distribute phonemes across characters
if not phoneme_comparisons or not word:
return character_errors
chars_per_phoneme = len(word) / len(phoneme_comparisons)
for i, comparison in enumerate(phoneme_comparisons):
if comparison["status"] in ["substitution", "deletion", "wrong"]:
# Calculate character position
char_pos = min(int(i * chars_per_phoneme), len(word) - 1)
severity = 1.0 - comparison["score"]
color = self._get_error_color(severity)
error = CharacterError(
character=word[char_pos],
position=char_pos,
error_type=comparison["status"],
expected_sound=comparison["reference_phoneme"],
actual_sound=comparison["learner_phoneme"],
severity=severity,
color=color
)
character_errors.append(error)
return character_errors
def _get_error_color(self, severity: float) -> str:
"""Get color code for character errors"""
if severity >= 0.8:
return "#ef4444" # Red - severe error
elif severity >= 0.6:
return "#f97316" # Orange - moderate error
elif severity >= 0.4:
return "#eab308" # Yellow - mild error
else:
return "#84cc16" # Light green - minor error
def _identify_wrong_words_enhanced(self, word_highlights: List[Dict],
phoneme_comparisons: List[Dict]) -> List[Dict]:
"""Enhanced wrong word identification with detailed error analysis"""
wrong_words = []
for word_highlight in word_highlights:
if word_highlight["score"] < 0.6:
start_idx = word_highlight["phoneme_start_index"]
end_idx = word_highlight["phoneme_end_index"]
wrong_phonemes = []
missing_phonemes = []
for i in range(start_idx, min(end_idx + 1, len(phoneme_comparisons))):
comparison = phoneme_comparisons[i]
if comparison["status"] in ["wrong", "substitution"]:
wrong_phonemes.append({
"expected": comparison["reference_phoneme"],
"actual": comparison["learner_phoneme"],
"difficulty": comparison["difficulty"],
"description": self.g2p._get_phoneme_description(comparison["reference_phoneme"])
})
elif comparison["status"] in ["missing", "deletion"]:
missing_phonemes.append({
"phoneme": comparison["reference_phoneme"],
"difficulty": comparison["difficulty"],
"description": self.g2p._get_phoneme_description(comparison["reference_phoneme"])
})
wrong_word = {
"word": word_highlight["word"],
"score": word_highlight["score"],
"expected_phonemes": word_highlight["phonemes"],
"ipa": word_highlight["ipa"],
"wrong_phonemes": wrong_phonemes,
"missing_phonemes": missing_phonemes,
"tips": self._get_enhanced_vietnamese_tips(wrong_phonemes, missing_phonemes),
"phoneme_visualization": word_highlight["phoneme_visualization"],
"character_errors": word_highlight.get("character_errors", [])
}
wrong_words.append(wrong_word)
return wrong_words
def _create_phoneme_pairs(self, reference: str, learner: str) -> List[Dict]:
"""Create phoneme pairs for visualization"""
ref_phones = reference.split() if reference else []
learner_phones = learner.split() if learner else []
# Use difflib for alignment visualization
import difflib
matcher = difflib.SequenceMatcher(None, ref_phones, learner_phones)
pairs = []
for tag, i1, i2, j1, j2 in matcher.get_opcodes():
if tag == 'equal':
for k in range(i2 - i1):
pairs.append({
"reference": ref_phones[i1 + k],
"learner": learner_phones[j1 + k],
"match": True,
"type": "correct"
})
elif tag == 'replace':
max_len = max(i2 - i1, j2 - j1)
for k in range(max_len):
ref_phoneme = ref_phones[i1 + k] if i1 + k < i2 else ""
learner_phoneme = learner_phones[j1 + k] if j1 + k < j2 else ""
pairs.append({
"reference": ref_phoneme,
"learner": learner_phoneme,
"match": False,
"type": "substitution"
})
elif tag == 'delete':
for k in range(i1, i2):
pairs.append({
"reference": ref_phones[k],
"learner": "",
"match": False,
"type": "deletion"
})
elif tag == 'insert':
for k in range(j1, j2):
pairs.append({
"reference": "",
"learner": learner_phones[k],
"match": False,
"type": "insertion"
})
return pairs
def _get_word_status(self, score: float) -> str:
"""Get word status from score"""
if score >= 0.8:
return "excellent"
elif score >= 0.6:
return "good"
elif score >= 0.4:
return "needs_practice"
else:
return "poor"
def _get_word_color(self, score: float) -> str:
"""Get color for word highlighting"""
if score >= 0.8:
return "#22c55e" # Green
elif score >= 0.6:
return "#84cc16" # Light green
elif score >= 0.4:
return "#eab308" # Yellow
else:
return "#ef4444" # Red
def _get_enhanced_vietnamese_tips(self, wrong_phonemes: List[Dict],
missing_phonemes: List[Dict]) -> List[str]:
"""Enhanced Vietnamese-specific pronunciation tips"""
tips = []
vietnamese_tips = {
"θ": "Đặt lưỡi giữa răng trên và dưới, thổi nhẹ (think, three)",
"ð": "Giống θ nhưng rung dây thanh âm (this, that)",
"v": "Chạm môi dưới vào răng trên, không dùng cả hai môi như tiếng Việt",
"r": "Cuộn lưỡi nhưng không chạm vào vòm miệng, không lăn lưỡi",
"l": "Đầu lưỡi chạm vào vòm miệng sau răng",
"z": "Giống âm 's' nhưng có rung dây thanh âm",
"ʒ": "Giống âm 'ʃ' (sh) nhưng có rung dây thanh âm",
"w": "Tròn môi như âm 'u', không dùng răng như âm 'v'",
"æ": "Mở miệng rộng hơn khi phát âm 'a'",
"ɪ": "Âm 'i' ngắn, không kéo dài như tiếng Việt"
}
for wrong in wrong_phonemes:
expected = wrong["expected"]
if expected in vietnamese_tips:
tips.append(f"Âm /{expected}/: {vietnamese_tips[expected]}")
for missing in missing_phonemes:
phoneme = missing["phoneme"]
if phoneme in vietnamese_tips:
tips.append(f"Thiếu âm /{phoneme}/: {vietnamese_tips[phoneme]}")
return tips
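

# --- Illustrative usage sketch (not part of the original module) ---
# Ties the G2P and comparator together the same way assess_pronunciation does, but
# with a hand-written learner phoneme string instead of real ASR output (assumed input).
def _demo_word_analysis() -> Dict:
    analyzer = EnhancedWordAnalyzer()
    reference_text = "hello"
    learner_phonemes = "h ʌ r oʊ"  # made-up learner output with an /l/ -> /r/ slip
    analysis = analyzer.analyze_words_enhanced(
        reference_text, learner_phonemes, AssessmentMode.WORD
    )
    # analysis holds word_highlights (with character_errors in WORD mode),
    # phoneme_differences, wrong_words, reference_phonemes and phoneme_pairs.
    return analysis
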
class EnhancedProsodyAnalyzer:
"""Enhanced prosody analyzer for sentence-level assessment"""
def __init__(self):
# Expected values for English prosody
self.expected_speech_rate = 4.0 # syllables per second
self.expected_pitch_range = 100 # Hz
self.expected_pitch_cv = 0.3 # coefficient of variation
def analyze_prosody_enhanced(self, audio_features: Dict, reference_text: str) -> Dict:
"""Enhanced prosody analysis with detailed scoring"""
if "error" in audio_features:
return self._empty_prosody_result()
duration = audio_features.get("duration", 1)
pitch_data = audio_features.get("pitch", {})
rhythm_data = audio_features.get("rhythm", {})
intensity_data = audio_features.get("intensity", {})
# Calculate syllables
num_syllables = self._estimate_syllables(reference_text)
actual_speech_rate = num_syllables / duration if duration > 0 else 0
# Calculate individual prosody scores
pace_score = self._calculate_pace_score(actual_speech_rate)
intonation_score = self._calculate_intonation_score(pitch_data)
rhythm_score = self._calculate_rhythm_score(rhythm_data, intensity_data)
stress_score = self._calculate_stress_score(pitch_data, intensity_data)
# Overall prosody score
overall_prosody = (pace_score + intonation_score + rhythm_score + stress_score) / 4
# Generate prosody feedback
feedback = self._generate_prosody_feedback(
pace_score, intonation_score, rhythm_score, stress_score,
actual_speech_rate, pitch_data
)
return {
"pace_score": pace_score,
"intonation_score": intonation_score,
"rhythm_score": rhythm_score,
"stress_score": stress_score,
"overall_prosody": overall_prosody,
"details": {
"speech_rate": actual_speech_rate,
"expected_speech_rate": self.expected_speech_rate,
"syllable_count": num_syllables,
"duration": duration,
"pitch_analysis": pitch_data,
"rhythm_analysis": rhythm_data,
"intensity_analysis": intensity_data
},
"feedback": feedback
}
def _calculate_pace_score(self, actual_rate: float) -> float:
"""Calculate pace score based on speech rate"""
if self.expected_speech_rate == 0:
return 0.5
ratio = actual_rate / self.expected_speech_rate
if 0.8 <= ratio <= 1.2:
return 1.0
elif 0.6 <= ratio < 0.8 or 1.2 < ratio <= 1.5:
return 0.7
elif 0.4 <= ratio < 0.6 or 1.5 < ratio <= 2.0:
return 0.4
else:
return 0.1
def _calculate_intonation_score(self, pitch_data: Dict) -> float:
"""Calculate intonation score based on pitch variation"""
pitch_range = pitch_data.get("range", 0)
if self.expected_pitch_range == 0:
return 0.5
ratio = pitch_range / self.expected_pitch_range
if 0.7 <= ratio <= 1.3:
return 1.0
elif 0.5 <= ratio < 0.7 or 1.3 < ratio <= 1.8:
return 0.7
elif 0.3 <= ratio < 0.5 or 1.8 < ratio <= 2.5:
return 0.4
else:
return 0.2
def _calculate_rhythm_score(self, rhythm_data: Dict, intensity_data: Dict) -> float:
"""Calculate rhythm score based on tempo and intensity patterns"""
tempo = rhythm_data.get("tempo", 120)
intensity_std = intensity_data.get("rms_std", 0)
intensity_mean = intensity_data.get("rms_mean", 0)
# Tempo score (60-180 BPM is good for speech)
if 60 <= tempo <= 180:
tempo_score = 1.0
elif 40 <= tempo < 60 or 180 < tempo <= 220:
tempo_score = 0.6
else:
tempo_score = 0.3
# Intensity consistency score
if intensity_mean > 0:
intensity_consistency = max(0, 1.0 - (intensity_std / intensity_mean))
else:
intensity_consistency = 0.5
return (tempo_score + intensity_consistency) / 2
def _calculate_stress_score(self, pitch_data: Dict, intensity_data: Dict) -> float:
"""Calculate stress score based on pitch and intensity variation"""
pitch_cv = pitch_data.get("cv", 0)
intensity_std = intensity_data.get("rms_std", 0)
intensity_mean = intensity_data.get("rms_mean", 0)
# Pitch coefficient of variation score
if 0.2 <= pitch_cv <= 0.4:
pitch_score = 1.0
elif 0.1 <= pitch_cv < 0.2 or 0.4 < pitch_cv <= 0.6:
pitch_score = 0.7
else:
pitch_score = 0.4
# Intensity variation score
if intensity_mean > 0:
intensity_cv = intensity_std / intensity_mean
if 0.1 <= intensity_cv <= 0.3:
intensity_score = 1.0
elif 0.05 <= intensity_cv < 0.1 or 0.3 < intensity_cv <= 0.5:
intensity_score = 0.7
else:
intensity_score = 0.4
else:
intensity_score = 0.5
return (pitch_score + intensity_score) / 2
def _generate_prosody_feedback(self, pace_score: float, intonation_score: float,
rhythm_score: float, stress_score: float,
speech_rate: float, pitch_data: Dict) -> List[str]:
"""Generate detailed prosody feedback"""
feedback = []
if pace_score < 0.5:
if speech_rate < self.expected_speech_rate * 0.8:
feedback.append("Tốc độ nói hơi chậm, thử nói nhanh hơn một chút")
else:
feedback.append("Tốc độ nói hơi nhanh, thử nói chậm lại để rõ ràng hơn")
elif pace_score >= 0.8:
feedback.append("Tốc độ nói rất tự nhiên")
if intonation_score < 0.5:
feedback.append("Cần cải thiện ngữ điệu - thay đổi cao độ giọng nhiều hơn")
elif intonation_score >= 0.8:
feedback.append("Ngữ điệu rất tự nhiên và sinh động")
if rhythm_score < 0.5:
feedback.append("Nhịp điệu cần đều hơn - chú ý đến trọng âm của từ")
elif rhythm_score >= 0.8:
feedback.append("Nhịp điệu rất tốt")
if stress_score < 0.5:
feedback.append("Cần nhấn mạnh trọng âm rõ ràng hơn")
elif stress_score >= 0.8:
feedback.append("Trọng âm được nhấn rất tốt")
return feedback
def _estimate_syllables(self, text: str) -> int:
"""Estimate number of syllables in text"""
vowels = "aeiouy"
text = text.lower()
syllable_count = 0
prev_was_vowel = False
for char in text:
if char in vowels:
if not prev_was_vowel:
syllable_count += 1
prev_was_vowel = True
else:
prev_was_vowel = False
if text.endswith('e'):
syllable_count -= 1
return max(1, syllable_count)
def _empty_prosody_result(self) -> Dict:
"""Return empty prosody result for error cases"""
return {
"pace_score": 0.5,
"intonation_score": 0.5,
"rhythm_score": 0.5,
"stress_score": 0.5,
"overall_prosody": 0.5,
"details": {},
"feedback": ["Không thể phân tích ngữ điệu"]
}
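

# --- Illustrative usage sketch (not part of the original module) ---
# analyze_prosody_enhanced expects the feature dict produced by
# EnhancedWav2Vec2CharacterASR._extract_enhanced_audio_features; the numbers below
# are made up purely to illustrate the expected structure.
def _demo_prosody_analysis() -> Dict:
    analyzer = EnhancedProsodyAnalyzer()
    fake_features = {
        "duration": 2.0,
        "pitch": {"values": [110, 130, 150], "mean": 130, "std": 16, "range": 40, "cv": 0.12},
        "rhythm": {"tempo": 120, "beats_per_second": 2.0},
        "intensity": {"rms_mean": 0.08, "rms_std": 0.02, "zcr_mean": 0.1},
        "spectral": {"centroid_mean": 1500, "centroid_std": 300},
    }
    return analyzer.analyze_prosody_enhanced(fake_features, "hello how are you today")
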
class EnhancedFeedbackGenerator:
"""Enhanced feedback generator with detailed analysis"""
def generate_enhanced_feedback(self, overall_score: float, wrong_words: List[Dict],
phoneme_comparisons: List[Dict], mode: AssessmentMode,
prosody_analysis: Dict = None) -> List[str]:
"""Generate comprehensive feedback based on assessment mode"""
feedback = []
# Overall score feedback
if overall_score >= 0.9:
feedback.append("Phát âm xuất sắc! Bạn đã làm rất tốt.")
elif overall_score >= 0.8:
feedback.append("Phát âm rất tốt! Chỉ còn một vài điểm nhỏ cần cải thiện.")
elif overall_score >= 0.6:
feedback.append("Phát âm khá tốt, còn một số điểm cần luyện tập thêm.")
elif overall_score >= 0.4:
feedback.append("Cần luyện tập thêm. Tập trung vào những từ được đánh dấu.")
else:
feedback.append("Hãy luyện tập chậm rãi và rõ ràng hơn.")
# Mode-specific feedback
if mode == AssessmentMode.WORD:
feedback.extend(self._generate_word_mode_feedback(wrong_words, phoneme_comparisons))
elif mode == AssessmentMode.SENTENCE:
feedback.extend(self._generate_sentence_mode_feedback(wrong_words, prosody_analysis))
# Common error patterns
error_patterns = self._analyze_error_patterns(phoneme_comparisons)
if error_patterns:
feedback.extend(error_patterns)
return feedback
def _generate_word_mode_feedback(self, wrong_words: List[Dict],
phoneme_comparisons: List[Dict]) -> List[str]:
"""Generate feedback specific to word mode"""
feedback = []
if wrong_words:
if len(wrong_words) == 1:
word = wrong_words[0]["word"]
feedback.append(f"Từ '{word}' cần luyện tập thêm")
# Character-level feedback
char_errors = wrong_words[0].get("character_errors", [])
if char_errors:
error_chars = [err.character for err in char_errors[:3]]
feedback.append(f"Chú ý các âm: {', '.join(error_chars)}")
else:
word_list = [w["word"] for w in wrong_words[:3]]
feedback.append(f"Các từ cần luyện: {', '.join(word_list)}")
return feedback
def _generate_sentence_mode_feedback(self, wrong_words: List[Dict],
prosody_analysis: Dict) -> List[str]:
"""Generate feedback specific to sentence mode"""
feedback = []
# Word-level feedback
if wrong_words:
if len(wrong_words) <= 2:
word_list = [w["word"] for w in wrong_words]
feedback.append(f"Cần cải thiện: {', '.join(word_list)}")
else:
feedback.append(f"Có {len(wrong_words)} từ cần luyện tập")
# Prosody feedback
if prosody_analysis and "feedback" in prosody_analysis:
feedback.extend(prosody_analysis["feedback"][:2]) # Limit prosody feedback
return feedback
def _analyze_error_patterns(self, phoneme_comparisons: List[Dict]) -> List[str]:
"""Analyze common error patterns across phonemes"""
feedback = []
# Count error types
error_counts = defaultdict(int)
difficult_phonemes = defaultdict(int)
for comparison in phoneme_comparisons:
if comparison["status"] in ["wrong", "substitution"]:
phoneme = comparison["reference_phoneme"]
difficult_phonemes[phoneme] += 1
error_counts[comparison["status"]] += 1
# Most problematic phoneme
if difficult_phonemes:
most_difficult = max(difficult_phonemes.items(), key=lambda x: x[1])
if most_difficult[1] >= 2:
phoneme = most_difficult[0]
phoneme_tips = {
"θ": "Lưỡi giữa răng, thổi nhẹ",
"ð": "Lưỡi giữa răng, rung dây thanh",
"v": "Môi dưới chạm răng trên",
"r": "Cuộn lưỡi nhẹ",
"z": "Như 's' nhưng rung dây thanh"
}
if phoneme in phoneme_tips:
feedback.append(f"Âm khó nhất /{phoneme}/: {phoneme_tips[phoneme]}")
return feedback
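

# --- Illustrative usage sketch (not part of the original module) ---
# Feedback generation works purely on analyzer outputs; with no wrong words and a
# high overall score it returns only the top-level praise message.
def _demo_feedback_generation() -> List[str]:
    generator = EnhancedFeedbackGenerator()
    return generator.generate_enhanced_feedback(
        overall_score=0.85,
        wrong_words=[],
        phoneme_comparisons=[],
        mode=AssessmentMode.SENTENCE,
        prosody_analysis={"feedback": []},
    )
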
class ProductionPronunciationAssessor:
"""Production-ready pronunciation assessor - Enhanced version of the current system"""
def __init__(self, onnx: bool = False, quantized: bool = False):
"""Initialize the production-ready pronunciation assessment system"""
logger.info("Initializing Production Pronunciation Assessment System...")
self.asr = EnhancedWav2Vec2CharacterASR(onnx=onnx, quantized=quantized)
self.word_analyzer = EnhancedWordAnalyzer()
self.prosody_analyzer = EnhancedProsodyAnalyzer()
self.feedback_generator = EnhancedFeedbackGenerator()
self.g2p = EnhancedG2P()
logger.info("Production system initialization completed")
def assess_pronunciation(self, audio_path: str, reference_text: str,
mode: str = "auto") -> Dict:
"""
Main assessment function with enhanced features
Args:
audio_path: Path to audio file
reference_text: Reference text to compare against
mode: Assessment mode ("word", "sentence", "auto", or legacy modes)
Returns:
Enhanced assessment results with backward compatibility
"""
logger.info(f"Starting production assessment in {mode} mode...")
start_time = time.time()
try:
# Normalize and validate mode
assessment_mode = self._normalize_mode(mode, reference_text)
logger.info(f"Using assessment mode: {assessment_mode.value}")
# Step 1: Enhanced ASR transcription with features
asr_result = self.asr.transcribe_with_features(audio_path)
if not asr_result["character_transcript"]:
return self._create_error_result("No speech detected in audio")
# Step 2: Enhanced word analysis
analysis_result = self.word_analyzer.analyze_words_enhanced(
reference_text,
asr_result["phoneme_representation"],
assessment_mode
)
# Step 3: Calculate overall score
overall_score = self._calculate_overall_score(analysis_result["phoneme_differences"])
# Step 4: Prosody analysis for sentence mode
prosody_analysis = {}
if assessment_mode == AssessmentMode.SENTENCE:
prosody_analysis = self.prosody_analyzer.analyze_prosody_enhanced(
asr_result["audio_features"],
reference_text
)
# Step 5: Generate enhanced feedback
feedback = self.feedback_generator.generate_enhanced_feedback(
overall_score,
analysis_result["wrong_words"],
analysis_result["phoneme_differences"],
assessment_mode,
prosody_analysis
)
# Step 6: Create phoneme comparison summary
phoneme_comparison_summary = self._create_phoneme_comparison_summary(
analysis_result["phoneme_pairs"]
)
# Step 7: Assemble result with backward compatibility
result = self._create_enhanced_result(
asr_result, analysis_result, overall_score, feedback,
prosody_analysis, phoneme_comparison_summary, assessment_mode
)
# Add processing metadata
processing_time = time.time() - start_time
result["processing_info"] = {
"processing_time": round(processing_time, 2),
"mode": assessment_mode.value,
"model_used": "Wav2Vec2-Enhanced",
"onnx_enabled": self.asr.use_onnx,
"confidence": asr_result["confidence"],
"enhanced_features": True,
"character_level_analysis": assessment_mode == AssessmentMode.WORD,
"prosody_analysis": assessment_mode == AssessmentMode.SENTENCE
}
logger.info(f"Production assessment completed in {processing_time:.2f}s")
return result
except Exception as e:
logger.error(f"Production assessment error: {e}")
return self._create_error_result(f"Assessment failed: {str(e)}")
def _normalize_mode(self, mode: str, reference_text: str) -> AssessmentMode:
"""Normalize mode parameter with backward compatibility"""
# Legacy mode mapping
legacy_mapping = {
"normal": AssessmentMode.AUTO,
"advanced": AssessmentMode.AUTO
}
if mode in legacy_mapping:
normalized_mode = legacy_mapping[mode]
logger.info(f"Mapped legacy mode '{mode}' to '{normalized_mode.value}'")
mode = normalized_mode.value
# Validate mode
try:
assessment_mode = AssessmentMode(mode)
except ValueError:
logger.warning(f"Invalid mode '{mode}', defaulting to AUTO")
assessment_mode = AssessmentMode.AUTO
# Auto-detect mode based on text length
if assessment_mode == AssessmentMode.AUTO:
word_count = len(reference_text.strip().split())
assessment_mode = AssessmentMode.WORD if word_count <= 3 else AssessmentMode.SENTENCE
logger.info(f"Auto-detected mode: {assessment_mode.value} (word count: {word_count})")
return assessment_mode
def _calculate_overall_score(self, phoneme_comparisons: List[Dict]) -> float:
"""Calculate weighted overall score"""
if not phoneme_comparisons:
return 0.0
total_weighted_score = 0.0
total_weight = 0.0
for comparison in phoneme_comparisons:
weight = comparison.get("difficulty", 0.5) # Use difficulty as weight
score = comparison["score"]
total_weighted_score += score * weight
total_weight += weight
return total_weighted_score / total_weight if total_weight > 0 else 0.0
def _create_phoneme_comparison_summary(self, phoneme_pairs: List[Dict]) -> Dict:
"""Create phoneme comparison summary statistics"""
total = len(phoneme_pairs)
if total == 0:
return {"total_phonemes": 0, "accuracy_percentage": 0}
correct = sum(1 for pair in phoneme_pairs if pair["match"])
substitutions = sum(1 for pair in phoneme_pairs if pair["type"] == "substitution")
deletions = sum(1 for pair in phoneme_pairs if pair["type"] == "deletion")
insertions = sum(1 for pair in phoneme_pairs if pair["type"] == "insertion")
return {
"total_phonemes": total,
"correct": correct,
"substitutions": substitutions,
"deletions": deletions,
"insertions": insertions,
"accuracy_percentage": round((correct / total) * 100, 1),
"error_rate": round(((substitutions + deletions + insertions) / total) * 100, 1)
}
def _create_enhanced_result(self, asr_result: Dict, analysis_result: Dict,
overall_score: float, feedback: List[str],
prosody_analysis: Dict, phoneme_summary: Dict,
assessment_mode: AssessmentMode) -> Dict:
"""Create enhanced result with backward compatibility"""
# Base result structure (backward compatible)
result = {
"transcript": asr_result["character_transcript"],
"transcript_phonemes": asr_result["phoneme_representation"],
"user_phonemes": asr_result["phoneme_representation"],
"character_transcript": asr_result["character_transcript"],
"overall_score": overall_score,
"word_highlights": analysis_result["word_highlights"],
"phoneme_differences": analysis_result["phoneme_differences"],
"wrong_words": analysis_result["wrong_words"],
"feedback": feedback,
}
# Enhanced features
result.update({
"reference_phonemes": analysis_result["reference_phonemes"],
"phoneme_pairs": analysis_result["phoneme_pairs"],
"phoneme_comparison": phoneme_summary,
"assessment_mode": assessment_mode.value,
})
# Add prosody analysis for sentence mode
if prosody_analysis:
result["prosody_analysis"] = prosody_analysis
# Add character-level analysis for word mode
if assessment_mode == AssessmentMode.WORD:
result["character_level_analysis"] = True
# Add character errors to word highlights if available
for word_highlight in result["word_highlights"]:
if "character_errors" in word_highlight:
# Convert CharacterError objects to dicts for JSON serialization
char_errors = []
for error in word_highlight["character_errors"]:
if isinstance(error, CharacterError):
char_errors.append({
"character": error.character,
"position": error.position,
"error_type": error.error_type,
"expected_sound": error.expected_sound,
"actual_sound": error.actual_sound,
"severity": error.severity,
"color": error.color
})
else:
char_errors.append(error)
word_highlight["character_errors"] = char_errors
return result
def _create_error_result(self, error_message: str) -> Dict:
"""Create error result structure"""
return {
"transcript": "",
"transcript_phonemes": "",
"user_phonemes": "",
"character_transcript": "",
"overall_score": 0.0,
"word_highlights": [],
"phoneme_differences": [],
"wrong_words": [],
"feedback": [f"Lỗi: {error_message}"],
"error": error_message,
"assessment_mode": "error",
"processing_info": {
"processing_time": 0,
"mode": "error",
"model_used": "Wav2Vec2-Enhanced",
"confidence": 0.0,
"enhanced_features": False
}
}
def get_system_info(self) -> Dict:
"""Get comprehensive system information"""
return {
"version": "2.1.0-production",
"name": "Production Pronunciation Assessment System",
"modes": [mode.value for mode in AssessmentMode],
"features": [
"Enhanced Levenshtein distance phoneme alignment",
"Character-level error detection (word mode)",
"Advanced prosody analysis (sentence mode)",
"Vietnamese speaker-specific error patterns",
"Real-time confidence scoring",
"IPA phonetic representation with visualization",
"Backward compatibility with legacy APIs",
"Production-ready error handling"
],
"model_info": {
"asr_model": self.asr.model_name,
"onnx_enabled": self.asr.use_onnx,
"sample_rate": self.asr.sample_rate
},
"assessment_modes": {
"word": "Detailed character and phoneme level analysis for single words or short phrases",
"sentence": "Word-level analysis with prosody evaluation for complete sentences",
"auto": "Automatically selects mode based on text length (≤3 words = word mode)"
}
}
# Backward compatibility wrapper
class SimplePronunciationAssessor:
"""Backward compatible wrapper for the enhanced system"""
def __init__(self):
print("Initializing Simple Pronunciation Assessor (Enhanced)...")
self.enhanced_assessor = ProductionPronunciationAssessor()
print("Enhanced Simple Pronunciation Assessor initialization completed")
def assess_pronunciation(self, audio_path: str, reference_text: str,
mode: str = "normal") -> Dict:
"""
Backward compatible assessment function
Args:
audio_path: Path to audio file
reference_text: Reference text to compare
mode: Assessment mode (supports legacy modes)
"""
return self.enhanced_assessor.assess_pronunciation(audio_path, reference_text, mode)
# Example usage
if __name__ == "__main__":
# Initialize production system
system = ProductionPronunciationAssessor(onnx=False, quantized=False)
# Example word mode assessment
print("=== WORD MODE EXAMPLE ===")
word_result = system.assess_pronunciation(
audio_path="./hello_world.wav",
reference_text="hello",
mode="word"
)
# print(f"Word mode result keys: {list(word_result.keys())}")
print("Word result", word_result)
# Example sentence mode assessment
print("\n=== SENTENCE MODE EXAMPLE ===")
sentence_result = system.assess_pronunciation(
audio_path="./hello_how_are_you_today.wav",
reference_text="Hello, how are you today?",
mode="sentence"
)
print(f"Sentence mode result keys: {list(sentence_result.keys())}")
print("Sentence result", sentence_result)
# Example auto mode assessment
print("\n=== AUTO MODE EXAMPLE ===")
auto_result = system.assess_pronunciation(
audio_path="./hello_how_are_you_today.wav",
reference_text="world", # Single word - should auto-select word mode
mode="auto"
)
print(f"Auto mode result: {auto_result['assessment_mode']}")
print("Auto result", auto_result)
# Backward compatibility test
print("\n=== BACKWARD COMPATIBILITY TEST ===")
legacy_assessor = SimplePronunciationAssessor()
legacy_result = legacy_assessor.assess_pronunciation(
audio_path="./hello_world.wav",
reference_text="pronunciation",
mode="normal" # Legacy mode
)
print(f"Legacy mode result: {legacy_result}")
print(f"Legacy mode mapped to: {legacy_result.get('assessment_mode', 'N/A')}")
# System info
print(f"\n=== SYSTEM INFO ===")
system_info = system.get_system_info()
print(f"System version: {system_info['version']}")
print(f"Available modes: {system_info['modes']}")
print(f"Key features: {len(system_info['features'])} enhanced features")