Run_code_api / src /apis /controllers /speaking_controller.py
ABAO77's picture
Implement enhanced pronunciation assessment system with Wav2Vec2 support
aa2c910
raw
history blame
49 kB
from typing import List, Dict
import numpy as np
import librosa
import nltk
import eng_to_ipa as ipa
import torch
import re
from collections import defaultdict
from transformers import WhisperProcessor, WhisperForConditionalGeneration
from optimum.onnxruntime import ORTModelForSpeechSeq2Seq
from loguru import logger
import time
from src.AI_Models.wave2vec_inference import (
Wave2Vec2Inference,
Wave2Vec2ONNXInference,
export_to_onnx,
)
# Download required NLTK data
# Best-effort fetch of the CMU Pronouncing Dictionary at import time.
# Failure is deliberately non-fatal: SimpleG2P falls back to an empty
# dictionary when ``cmudict`` cannot be used.
try:
    nltk.download("cmudict", quiet=True)
    from nltk.corpus import cmudict
except Exception:
    # Catch Exception rather than using a bare ``except`` so that
    # KeyboardInterrupt / SystemExit still propagate during import.
    print("Warning: NLTK data not available")
class Wav2Vec2CharacterASR:
    """Wav2Vec2 character-level ASR with support for both ONNX and Transformers inference.

    Besides the cleaned character transcript, every transcription also carries
    a phoneme-like rendering of that transcript (produced with ``SimpleG2P``)
    so it can be compared with reference phonemes.
    """

    # Naive one-letter-per-sound table, used only when G2P conversion of a
    # word fails.
    _LETTER_TO_PHONEME = {
        "a": "æ",
        "b": "b",
        "c": "k",
        "d": "d",
        "e": "ɛ",
        "f": "f",
        "g": "ɡ",
        "h": "h",
        "i": "ɪ",
        "j": "dʒ",
        "k": "k",
        "l": "l",
        "m": "m",
        "n": "n",
        "o": "ʌ",
        "p": "p",
        "q": "k",
        "r": "r",
        "s": "s",
        "t": "t",
        "u": "ʌ",
        "v": "v",
        "w": "w",
        "x": "ks",
        "y": "j",
        "z": "z",
    }

    def __init__(
        self,
        model_name: str = "facebook/wav2vec2-large-960h-lv60-self",
        onnx: bool = False,
        quantized: bool = False,
    ):
        """
        Initialize Wav2Vec2 character-level model

        Args:
            model_name: HuggingFace model name
            onnx: If True, use ONNX runtime for inference. If False, use Transformers
            quantized: If True (and onnx=True), use/export the quantized ONNX file
        """
        self.use_onnx = onnx
        self.sample_rate = 16000
        self.model_name = model_name
        # G2P converter is created lazily on first transcription and reused.
        self._g2p = None
        # Recorded so get_model_info() can report it; None in Transformers mode.
        self.onnx_model_path = None

        if onnx:
            import os

            # NOTE(review): the ONNX file name is fixed and does not depend on
            # ``model_name`` -- confirm this is intended for other checkpoints.
            self.onnx_model_path = (
                "wav2vec2-large-960h-lv60-self"
                + (".quant" if quantized else "")
                + ".onnx"
            )
            # Export the model only when the ONNX file does not exist yet.
            if not os.path.exists(self.onnx_model_path):
                export_to_onnx(model_name, quantize=quantized)
            self.model = Wave2Vec2ONNXInference(model_name, self.onnx_model_path)
        else:
            self.model = Wave2Vec2Inference(model_name)

    def transcribe_to_characters(self, audio_path: str) -> Dict:
        """Transcribe an audio file.

        Returns a dict with ``character_transcript`` and
        ``phoneme_representation``. Any exception is reported and replaced by
        the structure from ``_empty_result()`` (best-effort behaviour).
        """
        try:
            start_time = time.time()
            character_transcript = self._clean_character_transcript(
                self.model.file_to_text(audio_path)
            )
            phoneme_like_transcript = self._characters_to_phoneme_representation(
                character_transcript
            )
            logger.info(f"Transcription time: {time.time() - start_time:.2f}s")
            return {
                "character_transcript": character_transcript,
                "phoneme_representation": phoneme_like_transcript,
            }
        except Exception as e:
            print(f"Transformers transcription error: {e}")
            return self._empty_result()

    def _calculate_confidence_scores(self, logits: np.ndarray) -> List[float]:
        """Calculate confidence scores from logits using numpy.

        Applies a numerically stable softmax over the last axis and returns
        the per-step maximum probability of the first item along axis 0.
        """
        # Subtract the row maximum before exponentiating to avoid overflow.
        exp_logits = np.exp(logits - np.max(logits, axis=-1, keepdims=True))
        softmax_probs = exp_logits / np.sum(exp_logits, axis=-1, keepdims=True)
        max_probs = np.max(softmax_probs, axis=-1)[0]
        return max_probs.tolist()

    def _clean_character_transcript(self, transcript: str) -> str:
        """Collapse whitespace runs, trim, and lowercase the transcript."""
        logger.info(f"Raw transcript before cleaning: {transcript}")
        cleaned = re.sub(r"\s+", " ", transcript)
        return cleaned.strip().lower()

    def _characters_to_phoneme_representation(self, text: str) -> str:
        """Convert character-based transcript to phoneme-like representation for comparison"""
        if not text:
            return ""

        # Reuse a single SimpleG2P instead of building one per call: its
        # constructor loads the CMU dictionary.
        g2p = getattr(self, "_g2p", None)
        if g2p is None:
            g2p = self._g2p = SimpleG2P()

        phoneme_words = []
        for word in text.split():
            try:
                word_data = g2p.text_to_phonemes(word)[0]
                phoneme_words.extend(word_data["phonemes"])
            except Exception:
                # Fallback: simple letter-to-sound mapping (e.g. when the word
                # cleans down to nothing and no entry is returned).
                phoneme_words.extend(self._simple_letter_to_phoneme(word))
        return " ".join(phoneme_words)

    def _simple_letter_to_phoneme(self, word: str) -> List[str]:
        """Simple fallback letter-to-phoneme conversion; unknown characters are skipped."""
        table = self._LETTER_TO_PHONEME
        return [table[letter] for letter in word.lower() if letter in table]

    def _empty_result(self) -> Dict:
        """Return empty result structure"""
        return {
            "character_transcript": "",
            "phoneme_representation": "",
            "raw_predicted_ids": [],
            "confidence_scores": [],
        }

    def get_model_info(self) -> Dict:
        """Get information about the loaded model.

        In ONNX mode the ONNX-specific fields are looked up defensively: this
        class never assigns ``input_name``, ``output_name`` or ``session``, so
        unconditional attribute access raised ``AttributeError``. Missing
        values are reported as ``None`` (or ``[]`` for providers).
        """
        info = {
            "model_name": self.model_name,
            "sample_rate": self.sample_rate,
            "inference_method": "ONNX" if self.use_onnx else "Transformers",
        }
        if self.use_onnx:
            # NOTE(review): presumably the inference wrapper holds the ONNX
            # session details -- verify the attribute names against
            # Wave2Vec2ONNXInference.
            backend = getattr(self, "model", None)

            def lookup(name):
                value = getattr(self, name, None)
                return value if value is not None else getattr(backend, name, None)

            session = lookup("session")
            info.update(
                {
                    "onnx_model_path": lookup("onnx_model_path"),
                    "input_name": lookup("input_name"),
                    "output_name": lookup("output_name"),
                    "session_providers": (
                        session.get_providers() if session is not None else []
                    ),
                }
            )
        return info
class SimpleG2P:
    """Simple Grapheme-to-Phoneme converter for reference text.

    Words found in the CMU dictionary use its first pronunciation (stress
    digits removed, symbols mapped to IPA); other words go through a small
    rule-based letter mapping.
    """

    # Mapping from CMU (ARPAbet) symbols to Wav2Vec2/eSpeak-style IPA phonemes.
    _CMU_TO_ESPEAK = {
        "AA": "ɑ",
        "AE": "æ",
        "AH": "ʌ",
        "AO": "ɔ",
        "AW": "aʊ",
        "AY": "aɪ",
        "EH": "ɛ",
        "ER": "ɝ",
        "EY": "eɪ",
        "IH": "ɪ",
        "IY": "i",
        "OW": "oʊ",
        "OY": "ɔɪ",
        "UH": "ʊ",
        "UW": "u",
        "B": "b",
        "CH": "tʃ",
        "D": "d",
        "DH": "ð",
        "F": "f",
        "G": "ɡ",
        "HH": "h",
        "JH": "dʒ",
        "K": "k",
        "L": "l",
        "M": "m",
        "N": "n",
        "NG": "ŋ",
        "P": "p",
        "R": "r",
        "S": "s",
        "SH": "ʃ",
        "T": "t",
        "TH": "θ",
        "V": "v",
        "W": "w",
        "Y": "j",
        "Z": "z",
        "ZH": "ʒ",
    }

    # Spelling-to-sound rules for words missing from the CMU dictionary.
    # Two-letter keys are tried before single letters.
    _GRAPHEME_TO_PHONEMES = {
        "ch": ["tʃ"],
        "sh": ["ʃ"],
        "th": ["θ"],
        "ph": ["f"],
        "ck": ["k"],
        "ng": ["ŋ"],
        "qu": ["k", "w"],
        "a": ["æ"],
        "e": ["ɛ"],
        "i": ["ɪ"],
        "o": ["ʌ"],
        "u": ["ʌ"],
        "b": ["b"],
        "c": ["k"],
        "d": ["d"],
        "f": ["f"],
        "g": ["ɡ"],
        "h": ["h"],
        "j": ["dʒ"],
        "k": ["k"],
        "l": ["l"],
        "m": ["m"],
        "n": ["n"],
        "p": ["p"],
        "r": ["r"],
        "s": ["s"],
        "t": ["t"],
        "v": ["v"],
        "w": ["w"],
        "x": ["k", "s"],
        "y": ["j"],
        "z": ["z"],
    }

    _VOWEL_PHONEMES = frozenset(
        {"ɑ", "æ", "ʌ", "ɔ", "aʊ", "aɪ", "ɛ", "ɝ", "eɪ", "ɪ", "i", "oʊ", "ɔɪ", "ʊ", "u"}
    )

    _CONSONANT_PHONEMES = frozenset(
        {
            # Plosives
            "p", "b", "t", "d", "k", "ɡ",
            # Nasals
            "m", "n", "ŋ",
            # Fricatives
            "f", "v", "θ", "ð", "s", "z", "ʃ", "ʒ", "h",
            # Affricates
            "tʃ", "dʒ",
            # Liquids
            "l", "r",
            # Glides
            "w", "j",
        }
    )

    _PHONEME_DESCRIPTIONS = {
        # Vowels
        "ɑ": "Open back unrounded vowel (like 'a' in 'father')",
        "æ": "Near-open front unrounded vowel (like 'a' in 'cat')",
        "ʌ": "Open-mid back unrounded vowel (like 'u' in 'cup')",
        "ɔ": "Open-mid back rounded vowel (like 'o' in 'thought')",
        "aʊ": "Diphthong (like 'ow' in 'cow')",
        "aɪ": "Diphthong (like 'i' in 'bike')",
        "ɛ": "Open-mid front unrounded vowel (like 'e' in 'bed')",
        "ɝ": "R-colored vowel (like 'er' in 'her')",
        "eɪ": "Diphthong (like 'a' in 'cake')",
        "ɪ": "Near-close near-front unrounded vowel (like 'i' in 'sit')",
        "i": "Close front unrounded vowel (like 'ee' in 'see')",
        "oʊ": "Diphthong (like 'o' in 'go')",
        "ɔɪ": "Diphthong (like 'oy' in 'boy')",
        "ʊ": "Near-close near-back rounded vowel (like 'u' in 'put')",
        "u": "Close back rounded vowel (like 'oo' in 'food')",
        # Consonants
        "p": "Voiceless bilabial plosive (like 'p' in 'pen')",
        "b": "Voiced bilabial plosive (like 'b' in 'bat')",
        "t": "Voiceless alveolar plosive (like 't' in 'top')",
        "d": "Voiced alveolar plosive (like 'd' in 'dog')",
        "k": "Voiceless velar plosive (like 'c' in 'cat')",
        "ɡ": "Voiced velar plosive (like 'g' in 'go')",
        "m": "Bilabial nasal (like 'm' in 'man')",
        "n": "Alveolar nasal (like 'n' in 'net')",
        "ŋ": "Velar nasal (like 'ng' in 'sing')",
        "f": "Voiceless labiodental fricative (like 'f' in 'fan')",
        "v": "Voiced labiodental fricative (like 'v' in 'van')",
        "θ": "Voiceless dental fricative (like 'th' in 'think')",
        "ð": "Voiced dental fricative (like 'th' in 'this')",
        "s": "Voiceless alveolar fricative (like 's' in 'sit')",
        "z": "Voiced alveolar fricative (like 'z' in 'zip')",
        "ʃ": "Voiceless postalveolar fricative (like 'sh' in 'ship')",
        "ʒ": "Voiced postalveolar fricative (like 's' in 'measure')",
        "h": "Voiceless glottal fricative (like 'h' in 'hat')",
        "tʃ": "Voiceless postalveolar affricate (like 'ch' in 'chat')",
        "dʒ": "Voiced postalveolar affricate (like 'j' in 'jet')",
        "l": "Alveolar lateral approximant (like 'l' in 'let')",
        "r": "Alveolar approximant (like 'r' in 'red')",
        "w": "Labial-velar approximant (like 'w' in 'wet')",
        "j": "Palatal approximant (like 'y' in 'yes')",
    }

    def __init__(self):
        """Load the CMU dictionary, falling back to an empty one on any failure."""
        try:
            self.cmu_dict = cmudict.dict()
        except Exception:
            # Covers both a missing corpus and ``cmudict`` never having been
            # imported (NameError); every word is then estimated by rule.
            self.cmu_dict = {}
            print("Warning: CMU dictionary not available")

    def text_to_phonemes(self, text: str) -> List[Dict]:
        """Convert text to phoneme sequence.

        Returns one dict per word with keys ``word``, ``phonemes``, ``ipa``
        and ``phoneme_string``.
        """
        phoneme_sequence = []
        for word in self._clean_text(text).split():
            word_phonemes = self._get_word_phonemes(word)
            phoneme_sequence.append(
                {
                    "word": word,
                    "phonemes": word_phonemes,
                    "ipa": self._get_ipa(word),
                    "phoneme_string": " ".join(word_phonemes),
                }
            )
        return phoneme_sequence

    def get_reference_phoneme_string(self, text: str) -> str:
        """Get reference phoneme string for comparison (space-separated phonemes)."""
        return " ".join(
            phoneme
            for word_data in self.text_to_phonemes(text)
            for phoneme in word_data["phonemes"]
        )

    def _clean_text(self, text: str) -> str:
        """Replace punctuation (except apostrophes) with spaces, collapse whitespace, lowercase."""
        text = re.sub(r"[^\w\s\']", " ", text)
        text = re.sub(r"\s+", " ", text)
        return text.lower().strip()

    def _get_word_phonemes(self, word: str) -> List[str]:
        """Get phonemes for a word from the CMU dictionary, or estimate them."""
        pronunciations = self.cmu_dict.get(word.lower())
        if pronunciations:
            # Use the first listed pronunciation; drop stress digits (AH0 -> AH).
            clean_phonemes = [re.sub(r"[0-9]", "", p) for p in pronunciations[0]]
            return self._convert_to_wav2vec_format(clean_phonemes)
        return self._estimate_phonemes(word)

    def _convert_to_wav2vec_format(self, cmu_phonemes: List[str]) -> List[str]:
        """Convert CMU phonemes to Wav2Vec2 format; unknown symbols are lowercased."""
        table = self._CMU_TO_ESPEAK
        return [table.get(phoneme, phoneme.lower()) for phoneme in cmu_phonemes]

    def _get_ipa(self, word: str) -> str:
        """Get IPA transcription, or ``/word/`` when the converter is unavailable."""
        try:
            return ipa.convert(word)
        except Exception:
            return f"/{word}/"

    def _estimate_phonemes(self, word: str) -> List[str]:
        """Estimate phonemes for unknown words with greedy left-to-right rules."""
        rules = self._GRAPHEME_TO_PHONEMES
        word = word.lower()
        phonemes = []
        i = 0
        while i < len(word):
            # Digraphs take priority over single letters.
            two_char = word[i : i + 2]
            if len(two_char) == 2 and two_char in rules:
                phonemes.extend(rules[two_char])
                i += 2
                continue
            # Single character; characters without a rule are dropped.
            phonemes.extend(rules.get(word[i], ()))
            i += 1
        return phonemes

    def get_visualization_data(self, text: str) -> List[Dict]:
        """Get visualization data for IPA representation.

        Same entries as ``text_to_phonemes`` plus a ``visualization`` list.
        """
        visualization_data = self.text_to_phonemes(text)
        for entry in visualization_data:
            entry["visualization"] = self._create_phoneme_visualization(
                entry["phonemes"]
            )
        return visualization_data

    def _create_phoneme_visualization(self, phonemes: List[str]) -> List[Dict]:
        """Create visualization data (category and description) for phonemes"""
        return [
            {
                "phoneme": phoneme,
                "color_category": self._get_phoneme_color_category(phoneme),
                "description": self._get_phoneme_description(phoneme),
            }
            for phoneme in phonemes
        ]

    def _get_phoneme_color_category(self, phoneme: str) -> str:
        """Categorize phonemes by color for visualization: vowel, consonant or other."""
        if phoneme in self._VOWEL_PHONEMES:
            return "vowel"
        if phoneme in self._CONSONANT_PHONEMES:
            return "consonant"
        return "other"

    def _get_phoneme_description(self, phoneme: str) -> str:
        """Get description for a phoneme, with a generic fallback for unknown ones."""
        return self._PHONEME_DESCRIPTIONS.get(phoneme, f"Phoneme: {phoneme}")
class PhonemeComparator:
    """Compare reference and learner phoneme sequences.

    Comparison is purely positional (index i against index i); no alignment
    is attempted, so one inserted or dropped phoneme shifts every later pair.
    """

    def __init__(self):
        # Vietnamese speakers' common phoneme substitutions
        self.substitution_patterns = {
            "θ": ["f", "s", "t"],  # TH → F, S, T
            "ð": ["d", "z", "v"],  # DH → D, Z, V
            "v": ["w", "f"],  # V → W, F
            "r": ["l"],  # R → L
            "l": ["r"],  # L → R
            "z": ["s"],  # Z → S
            "ʒ": ["ʃ", "z"],  # ZH → SH, Z
            "ŋ": ["n"],  # NG → N
        }
        # Difficulty levels for Vietnamese speakers
        self.difficulty_map = {
            "θ": 0.9,  # th (think)
            "ð": 0.9,  # th (this)
            "v": 0.8,  # v
            "z": 0.8,  # z
            "ʒ": 0.9,  # zh (measure)
            "r": 0.7,  # r
            "l": 0.6,  # l
            "w": 0.5,  # w
            "f": 0.4,  # f
            "s": 0.3,  # s
            "ʃ": 0.5,  # sh
            "tʃ": 0.4,  # ch
            "dʒ": 0.5,  # j
            "ŋ": 0.3,  # ng
        }
        # Additional Vietnamese substitution patterns; this is the table
        # consulted by _is_acceptable_substitution.
        self.extended_substitution_patterns = {
            "θ": ["f", "s", "t", "d"],  # TH sound
            "ð": ["d", "z", "v", "t"],  # DH sound
            "v": ["w", "f", "b"],  # V sound
            "w": ["v", "b"],  # W sound
            "r": ["l", "n"],  # R sound
            "l": ["r", "n"],  # L sound
            "z": ["s", "j"],  # Z sound
            "ʒ": ["ʃ", "z", "s"],  # ZH sound
            "ʃ": ["s", "ʒ"],  # SH sound
            "ŋ": ["n", "m"],  # NG sound
            "tʃ": ["ʃ", "s", "k"],  # CH sound
            # J sound. The phoneme tables in this file emit the IPA script
            # "ɡ" (U+0261), never ASCII "g", so the ASCII-only entry could not
            # match; both spellings are accepted.
            "dʒ": ["ʒ", "j", "g", "\u0261"],
        }

    def compare_phoneme_sequences(
        self, reference_phonemes: str, learner_phonemes: str
    ) -> List[Dict]:
        """Compare reference and learner phoneme sequences position by position.

        Args:
            reference_phonemes: Space-separated expected phonemes.
            learner_phonemes: Space-separated phonemes attributed to the learner.

        Returns:
            One dict per position up to the longer sequence, with keys
            ``position``, ``reference_phoneme``, ``learner_phoneme``,
            ``status`` (correct / acceptable / wrong / missing / extra),
            ``score`` and ``difficulty``.
        """
        ref_phones = reference_phonemes.split()
        learner_phones = learner_phonemes.split()
        print(f"Reference phonemes: {ref_phones}")
        print(f"Learner phonemes: {learner_phones}")

        comparisons = []
        for i in range(max(len(ref_phones), len(learner_phones))):
            # An empty string marks "nothing at this position" on that side.
            ref_phoneme = ref_phones[i] if i < len(ref_phones) else ""
            learner_phoneme = learner_phones[i] if i < len(learner_phones) else ""

            if not learner_phoneme:
                status, score = "missing", 0.0
            elif not ref_phoneme:
                status, score = "extra", 0.0
            elif ref_phoneme == learner_phoneme:
                status, score = "correct", 1.0
            elif self._is_acceptable_substitution(ref_phoneme, learner_phoneme):
                status, score = "acceptable", 0.7
            else:
                status, score = "wrong", 0.2

            comparisons.append(
                {
                    "position": i,
                    "reference_phoneme": ref_phoneme,
                    "learner_phoneme": learner_phoneme,
                    "status": status,
                    "score": score,
                    # Phonemes absent from the map (and "extra" rows) get 0.3.
                    "difficulty": self.difficulty_map.get(ref_phoneme, 0.3),
                }
            )
        return comparisons

    def _is_acceptable_substitution(self, reference: str, learner: str) -> bool:
        """Check if learner phoneme is acceptable substitution for Vietnamese speakers"""
        return learner in self.extended_substitution_patterns.get(reference, [])
# =============================================================================
# WORD ANALYZER
# =============================================================================
class WordAnalyzer:
    """Analyze word-level pronunciation accuracy using character-based ASR"""

    # Vietnamese-language articulation tips keyed by IPA phoneme.
    _VIETNAMESE_TIPS = {
        "θ": "Đặt lưỡi giữa răng trên và dưới, thổi nhẹ (think, three)",
        "ð": "Giống θ nhưng rung dây thanh âm (this, that)",
        "v": "Chạm môi dưới vào răng trên, không dùng cả hai môi như tiếng Việt",
        "r": "Cuộn lưỡi nhưng không chạm vào vòm miệng, không lăn lưỡi",
        "l": "Đầu lưỡi chạm vào vòm miệng sau răng",
        "z": "Giống âm 's' nhưng có rung dây thanh âm",
        "ʒ": "Giống âm 'ʃ' (sh) nhưng có rung dây thanh âm",
        "w": "Tròn môi như âm 'u', không dùng răng như âm 'v'",
    }

    def __init__(self):
        self.g2p = SimpleG2P()
        self.comparator = PhonemeComparator()

    def analyze_words(self, reference_text: str, learner_phonemes: str) -> Dict:
        """Analyze word-level pronunciation using phoneme representation from character ASR.

        Args:
            reference_text: The text the learner was asked to read.
            learner_phonemes: Space-separated phonemes derived from the ASR output.

        Returns:
            Dict with ``word_highlights``, ``phoneme_differences`` and
            ``wrong_words``.
        """
        # Run G2P once and derive the flat reference string from it, instead
        # of converting the same text a second time.
        reference_words = self.g2p.text_to_phonemes(reference_text)
        reference_phoneme_string = " ".join(
            phoneme
            for word_data in reference_words
            for phoneme in word_data["phonemes"]
        )
        phoneme_comparisons = self.comparator.compare_phoneme_sequences(
            reference_phoneme_string, learner_phonemes
        )
        word_highlights = self._create_word_highlights(
            reference_words, phoneme_comparisons
        )
        wrong_words = self._identify_wrong_words(word_highlights, phoneme_comparisons)
        return {
            "word_highlights": word_highlights,
            "phoneme_differences": phoneme_comparisons,
            "wrong_words": wrong_words,
        }

    def _create_word_highlights(
        self, reference_words: List[Dict], phoneme_comparisons: List[Dict]
    ) -> List[Dict]:
        """Create word highlighting data with enhanced visualization.

        Comparison entries are consumed in order: each word takes as many
        entries as it has reference phonemes, starting where the previous
        word stopped. A word with no entries available scores 0.0.
        """
        word_highlights = []
        phoneme_index = 0
        for word_data in reference_words:
            word_phonemes = word_data["phonemes"]
            num_phonemes = len(word_phonemes)

            # Slicing clamps to the available comparisons automatically.
            window = phoneme_comparisons[phoneme_index : phoneme_index + num_phonemes]
            word_phoneme_scores = [comparison["score"] for comparison in window]
            word_score = np.mean(word_phoneme_scores) if word_phoneme_scores else 0.0

            word_highlights.append(
                {
                    "word": word_data["word"],
                    "score": float(word_score),
                    "status": self._get_word_status(word_score),
                    "color": self._get_word_color(word_score),
                    "phonemes": word_phonemes,
                    "ipa": word_data["ipa"],
                    "phoneme_scores": word_phoneme_scores,
                    "phoneme_start_index": phoneme_index,
                    "phoneme_end_index": phoneme_index + num_phonemes - 1,
                    # Enhanced visualization data
                    "phoneme_visualization": self.g2p._create_phoneme_visualization(
                        word_phonemes
                    ),
                }
            )
            phoneme_index += num_phonemes
        return word_highlights

    def _identify_wrong_words(
        self, word_highlights: List[Dict], phoneme_comparisons: List[Dict]
    ) -> List[Dict]:
        """Identify words that were pronounced incorrectly (score below 0.6)."""
        wrong_words = []
        for word_highlight in word_highlights:
            if word_highlight["score"] >= 0.6:  # Threshold for wrong pronunciation
                continue

            start_idx = word_highlight["phoneme_start_index"]
            end_idx = word_highlight["phoneme_end_index"]
            wrong_phonemes = []
            missing_phonemes = []
            # Only "wrong" and "missing" entries are reported per word.
            for comparison in phoneme_comparisons[start_idx : end_idx + 1]:
                status = comparison["status"]
                if status not in ("wrong", "missing"):
                    continue
                reference = comparison["reference_phoneme"]
                visualization = self.g2p._create_phoneme_visualization([reference])[0]
                if status == "wrong":
                    wrong_phonemes.append(
                        {
                            "expected": reference,
                            "actual": comparison["learner_phoneme"],
                            "difficulty": comparison["difficulty"],
                            "visualization": visualization,
                        }
                    )
                else:
                    missing_phonemes.append(
                        {
                            "phoneme": reference,
                            "difficulty": comparison["difficulty"],
                            "visualization": visualization,
                        }
                    )

            wrong_words.append(
                {
                    "word": word_highlight["word"],
                    "score": word_highlight["score"],
                    "expected_phonemes": word_highlight["phonemes"],
                    "ipa": word_highlight["ipa"],
                    "wrong_phonemes": wrong_phonemes,
                    "missing_phonemes": missing_phonemes,
                    "tips": self._get_vietnamese_tips(wrong_phonemes, missing_phonemes),
                    # Enhanced visualization data
                    "phoneme_visualization": word_highlight["phoneme_visualization"],
                }
            )
        return wrong_words

    def _get_word_status(self, score: float) -> str:
        """Get word status from score"""
        if score >= 0.8:
            return "excellent"
        if score >= 0.6:
            return "good"
        if score >= 0.4:
            return "needs_practice"
        return "poor"

    def _get_word_color(self, score: float) -> str:
        """Get color for word highlighting"""
        if score >= 0.8:
            return "#22c55e"  # Green
        if score >= 0.6:
            return "#84cc16"  # Light green
        if score >= 0.4:
            return "#eab308"  # Yellow
        return "#ef4444"  # Red

    def _get_vietnamese_tips(
        self, wrong_phonemes: List[Dict], missing_phonemes: List[Dict]
    ) -> List[str]:
        """Get Vietnamese-specific pronunciation tips.

        Every wrong phoneme yields a tip (specific when one exists, generic
        otherwise); a missing phoneme yields a tip only when one exists.
        """
        vietnamese_tips = WordAnalyzer._VIETNAMESE_TIPS
        tips = []
        for wrong in wrong_phonemes:
            expected = wrong["expected"]
            actual = wrong["actual"]
            if expected in vietnamese_tips:
                tips.append(f"Âm '{expected}': {vietnamese_tips[expected]}")
            else:
                tips.append(f"Luyện âm '{expected}' thay vì '{actual}'")
        for missing in missing_phonemes:
            phoneme = missing["phoneme"]
            if phoneme in vietnamese_tips:
                tips.append(f"Thiếu âm '{phoneme}': {vietnamese_tips[phoneme]}")
        return tips
class SimpleFeedbackGenerator:
    """Produce short, actionable feedback messages written in Vietnamese."""

    def generate_feedback(
        self,
        overall_score: float,
        wrong_words: List[Dict],
        phoneme_comparisons: List[Dict],
    ) -> List[str]:
        """Build the list of Vietnamese feedback lines.

        The list holds, in order: one headline chosen from the overall score,
        an optional line about the words that need practice, and an optional
        tip for the reference phoneme that was most often wrong or missing
        (only when a tip exists for it).
        """
        # Headline chosen by score band.
        if overall_score >= 0.8:
            headline = "Phát âm rất tốt! Bạn đã làm xuất sắc."
        elif overall_score >= 0.6:
            headline = "Phát âm khá tốt, còn một vài điểm cần cải thiện."
        elif overall_score >= 0.4:
            headline = "Cần luyện tập thêm. Tập trung vào những từ được đánh dấu đỏ."
        else:
            headline = "Hãy luyện tập chậm và rõ ràng hơn."
        messages = [headline]

        # Name the words when there are few; otherwise just report the count.
        if wrong_words:
            if len(wrong_words) > 3:
                messages.append(
                    f"Có {len(wrong_words)} từ cần luyện tập. Tập trung vào từng từ một."
                )
            else:
                listed = ", ".join(entry["word"] for entry in wrong_words)
                messages.append(f"Các từ cần luyện tập: {listed}")

        # Tally how often each reference phoneme was wrong or missing.
        error_counts = {}
        for item in phoneme_comparisons:
            if item["status"] in ("wrong", "missing"):
                key = item["reference_phoneme"]
                error_counts[key] = error_counts.get(key, 0) + 1

        if error_counts:
            # max() returns the first key reaching the highest count, i.e. the
            # earliest-seen phoneme wins ties.
            worst = max(error_counts.items(), key=lambda pair: pair[1])[0]
            short_tips = {
                "θ": "Lưỡi giữa răng, thổi nhẹ",
                "ð": "Lưỡi giữa răng, rung dây thanh",
                "v": "Môi dưới chạm răng trên",
                "r": "Cuộn lưỡi, không chạm vòm miệng",
                "l": "Lưỡi chạm vòm miệng",
                "z": "Như 's' nhưng rung dây thanh",
            }
            if worst in short_tips:
                messages.append(f"Âm khó nhất '{worst}': {short_tips[worst]}")

        return messages
class SimplePronunciationAssessor:
    """Backward-compatible facade over EnhancedPronunciationAssessor.

    Keeps the older entry point and result shape while delegating all work to
    the enhanced assessor.
    """

    def __init__(self):
        print("Initializing Simple Pronunciation Assessor...")
        self.enhanced_assessor = EnhancedPronunciationAssessor()
        print("Simple Pronunciation Assessor initialization completed")

    def assess_pronunciation(
        self, audio_path: str, reference_text: str, mode: str = "normal"
    ) -> Dict:
        """
        Run an assessment through the enhanced assessor and return a
        backward-compatible result.

        Args:
            audio_path: Path to audio file
            reference_text: Reference text to compare
            mode: Legacy values 'normal' and 'advanced' are both translated to
                'auto'; 'word', 'sentence' and 'auto' are passed through; any
                other value falls back to 'auto'.

        Returns:
            Dict with the legacy keys, plus the newer keys
            (reference_phonemes, phoneme_pairs, phoneme_comparison,
            prosody_analysis) whenever the enhanced result contains them.
        """
        print(f"Starting pronunciation assessment in {mode} mode...")

        # Legacy mode names and the mode each one now stands for.
        legacy_modes = {"normal": "auto", "advanced": "auto"}
        if mode in legacy_modes:
            new_mode = legacy_modes[mode]
            print(f"Mapping old mode '{mode}' to new mode '{new_mode}' for backward compatibility")
        elif mode in ("word", "sentence", "auto"):
            new_mode = mode
        else:
            new_mode = "auto"
            print(f"Invalid mode '{mode}' provided, defaulting to 'auto'")

        result = self.enhanced_assessor.assess_pronunciation(
            audio_path, reference_text, new_mode
        )

        # Keys that older clients rely on; each must be present in the result.
        required_keys = (
            "transcript",
            "transcript_phonemes",
            "user_phonemes",
            "character_transcript",
            "overall_score",
            "word_highlights",
            "phoneme_differences",
            "wrong_words",
            "feedback",
            "processing_info",
        )
        compatible_result = {}
        for key in required_keys:
            compatible_result[key] = result[key]

        # Newer fields are copied only when available (for newer clients).
        optional_keys = (
            "reference_phonemes",
            "phoneme_pairs",
            "phoneme_comparison",
            "prosody_analysis",
        )
        for key in optional_keys:
            if key in result:
                compatible_result[key] = result[key]

        print("Assessment completed successfully")
        return compatible_result

    def _calculate_overall_score(self, phoneme_comparisons: List[Dict]) -> float:
        """Return the mean of the per-phoneme scores, or 0.0 for an empty list."""
        count = len(phoneme_comparisons)
        if count == 0:
            return 0.0
        score_sum = sum(entry["score"] for entry in phoneme_comparisons)
        return score_sum / count
class EnhancedPronunciationAssessor:
    """Enhanced pronunciation assessor with word mode and sentence mode support"""

    def __init__(self):
        print("Initializing Enhanced Pronunciation Assessor...")
        self.wav2vec2_asr = Wav2Vec2CharacterASR()  # Advanced mode
        self.whisper_asr = None  # Normal mode (not used by this class)
        self.word_analyzer = WordAnalyzer()
        self.feedback_generator = SimpleFeedbackGenerator()
        self.g2p = SimpleG2P()
        self.comparator = PhonemeComparator()
        print("Enhanced Pronunciation Assessor initialization completed")

    def assess_pronunciation(
        self, audio_path: str, reference_text: str, mode: str = "auto"
    ) -> Dict:
        """
        Enhanced assessment function with mode selection

        Args:
            audio_path: Path to audio file
            reference_text: Reference text to compare
            mode: 'word', 'sentence', or 'auto' (automatically determined based on
                text length: up to 3 words -> 'word', otherwise 'sentence')

        Returns:
            Enhanced assessment results; ``prosody_analysis`` is populated only
            in sentence mode and is an empty dict otherwise.
        """
        print(f"Starting enhanced pronunciation assessment in {mode} mode...")

        # Validate and normalize mode parameter
        if mode not in ("word", "sentence", "auto"):
            print(f"Invalid mode '{mode}' provided, defaulting to 'auto'")
            mode = "auto"

        # Determine mode based on text length if auto
        if mode == "auto":
            word_count = len(reference_text.strip().split())
            mode = "word" if word_count <= 3 else "sentence"
            print(f"Auto-selected mode: {mode} (word count: {word_count})")

        # Step 1: Transcription using Wav2Vec2 character model
        print("Step 1: Using Wav2Vec2 character transcription...")
        asr_result = self.wav2vec2_asr.transcribe_to_characters(audio_path)
        # Report the model *name*; interpolating the inference object itself
        # produced an unhelpful object repr in the API response.
        model_info = f"Wav2Vec2-Character ({self.wav2vec2_asr.model_name})"
        character_transcript = asr_result["character_transcript"]
        phoneme_representation = asr_result["phoneme_representation"]
        print(f"Character transcript: {character_transcript}")
        print(f"Phoneme representation: {phoneme_representation}")

        # Step 2: Word analysis using phoneme representation
        print("Step 2: Analyzing words...")
        analysis_result = self.word_analyzer.analyze_words(
            reference_text, phoneme_representation
        )

        # Step 3: Calculate overall score.
        # NOTE(review): the score comes from the word analyzer's comparison,
        # while the "phoneme_differences" returned below come from the aligned
        # comparison in step 5 -- confirm this mix is intended.
        phoneme_comparisons = analysis_result["phoneme_differences"]
        overall_score = self._calculate_overall_score(phoneme_comparisons)

        # Step 4: Generate feedback
        print("Step 3: Generating feedback...")
        feedback = self.feedback_generator.generate_feedback(
            overall_score, analysis_result["wrong_words"], phoneme_comparisons
        )

        # Step 5: Enhanced phoneme comparison using sequence alignment
        print("Step 4: Performing advanced phoneme comparison...")
        reference_phoneme_string = self.g2p.get_reference_phoneme_string(reference_text)
        enhanced_comparisons = self._enhanced_phoneme_comparison(
            reference_phoneme_string, phoneme_representation
        )

        # Step 6: Prosody analysis for sentence mode
        prosody_analysis = {}
        if mode == "sentence":
            print("Step 5: Performing prosody analysis...")
            prosody_analysis = self._analyze_prosody(audio_path, reference_text)

        # Step 7: Create phoneme pairs for visualization
        phoneme_pairs = self._create_phoneme_pairs(
            reference_phoneme_string, phoneme_representation
        )

        # Step 8: Create phoneme comparison summary
        phoneme_comparison_summary = self._create_phoneme_comparison_summary(
            phoneme_pairs
        )

        result = {
            "transcript": character_transcript,  # What user actually said
            "transcript_phonemes": phoneme_representation,
            "user_phonemes": phoneme_representation,  # Alias for UI clarity
            "character_transcript": character_transcript,
            "overall_score": overall_score,
            "word_highlights": analysis_result["word_highlights"],
            "phoneme_differences": enhanced_comparisons,
            "wrong_words": analysis_result["wrong_words"],
            "feedback": feedback,
            "processing_info": {
                "model_used": model_info,
                "mode": mode,
                "character_based": True,
                "language_model_correction": False,
                "raw_output": True,
            },
            # Enhanced features
            "reference_phonemes": reference_phoneme_string,
            "phoneme_pairs": phoneme_pairs,
            "phoneme_comparison": phoneme_comparison_summary,
            "prosody_analysis": prosody_analysis,
        }
        print("Enhanced assessment completed successfully")
        return result

    def _calculate_overall_score(self, phoneme_comparisons: List[Dict]) -> float:
        """Calculate overall pronunciation score (mean phoneme score, 0.0 if empty)."""
        if not phoneme_comparisons:
            return 0.0
        total_score = sum(comparison["score"] for comparison in phoneme_comparisons)
        return total_score / len(phoneme_comparisons)

    def _enhanced_phoneme_comparison(self, reference: str, learner: str) -> List[Dict]:
        """Compare phoneme strings after aligning them with difflib.SequenceMatcher.

        Unlike a positional comparison, an inserted or dropped phoneme does
        not shift every later pair. Note this is SequenceMatcher's matching-
        block alignment, not a Levenshtein edit distance.

        Returns:
            Dicts with ``position``, ``reference_phoneme``, ``learner_phoneme``,
            ``status``, ``score`` and ``difficulty``.
        """
        import difflib

        ref_phones = reference.split()
        learner_phones = learner.split()
        difficulty_map = self.comparator.difficulty_map
        comparisons = []

        def record(ref_phoneme, learner_phoneme, status, score):
            comparisons.append(
                {
                    "position": len(comparisons),
                    "reference_phoneme": ref_phoneme,
                    "learner_phoneme": learner_phoneme,
                    "status": status,
                    "score": score,
                    # Unlisted phonemes and "extra" rows default to 0.3.
                    "difficulty": difficulty_map.get(ref_phoneme, 0.3),
                }
            )

        matcher = difflib.SequenceMatcher(None, ref_phones, learner_phones)
        for tag, i1, i2, j1, j2 in matcher.get_opcodes():
            if tag == "equal":
                for ref_phoneme, learner_phoneme in zip(
                    ref_phones[i1:i2], learner_phones[j1:j2]
                ):
                    record(ref_phoneme, learner_phoneme, "correct", 1.0)
            elif tag == "delete":
                for ref_phoneme in ref_phones[i1:i2]:
                    record(ref_phoneme, "", "missing", 0.0)
            elif tag == "insert":
                for learner_phoneme in learner_phones[j1:j2]:
                    record("", learner_phoneme, "extra", 0.0)
            elif tag == "replace":
                # Pair the two spans index by index; whatever is left over on
                # the longer side counts as missing or extra.
                for k in range(max(i2 - i1, j2 - j1)):
                    ref_phoneme = ref_phones[i1 + k] if i1 + k < i2 else ""
                    learner_phoneme = learner_phones[j1 + k] if j1 + k < j2 else ""
                    if not learner_phoneme:
                        record(ref_phoneme, "", "missing", 0.0)
                    elif not ref_phoneme:
                        record("", learner_phoneme, "extra", 0.0)
                    elif self.comparator._is_acceptable_substitution(
                        ref_phoneme, learner_phoneme
                    ):
                        record(ref_phoneme, learner_phoneme, "acceptable", 0.7)
                    else:
                        record(ref_phoneme, learner_phoneme, "wrong", 0.2)
        return comparisons

    def _create_phoneme_pairs(self, reference: str, learner: str) -> List[Dict]:
        """Create aligned (reference, learner) phoneme pairs for visualization.

        Each pair has ``reference``, ``learner``, ``match`` and ``type``
        (correct / substitution / deletion / insertion).
        """
        import difflib

        ref_phones = reference.split()
        learner_phones = learner.split()
        matcher = difflib.SequenceMatcher(None, ref_phones, learner_phones)

        pairs = []
        for tag, i1, i2, j1, j2 in matcher.get_opcodes():
            if tag == "equal":
                pairs.extend(
                    {"reference": r, "learner": l, "match": True, "type": "correct"}
                    for r, l in zip(ref_phones[i1:i2], learner_phones[j1:j2])
                )
            elif tag == "replace":
                # Uneven spans are padded with "" on the shorter side; every
                # row of a replace block is labelled "substitution".
                for k in range(max(i2 - i1, j2 - j1)):
                    pairs.append(
                        {
                            "reference": ref_phones[i1 + k] if i1 + k < i2 else "",
                            "learner": learner_phones[j1 + k] if j1 + k < j2 else "",
                            "match": False,
                            "type": "substitution",
                        }
                    )
            elif tag == "delete":
                pairs.extend(
                    {"reference": r, "learner": "", "match": False, "type": "deletion"}
                    for r in ref_phones[i1:i2]
                )
            elif tag == "insert":
                pairs.extend(
                    {"reference": "", "learner": l, "match": False, "type": "insertion"}
                    for l in learner_phones[j1:j2]
                )
        return pairs

    def _create_phoneme_comparison_summary(self, phoneme_pairs: List[Dict]) -> Dict:
        """Create a summary of phoneme comparison statistics (percentages are 0 when empty)."""
        total = len(phoneme_pairs)
        correct = sum(1 for pair in phoneme_pairs if pair["match"])
        substitutions = sum(1 for pair in phoneme_pairs if pair["type"] == "substitution")
        deletions = sum(1 for pair in phoneme_pairs if pair["type"] == "deletion")
        insertions = sum(1 for pair in phoneme_pairs if pair["type"] == "insertion")
        errors = substitutions + deletions + insertions
        return {
            "total_phonemes": total,
            "correct": correct,
            "substitutions": substitutions,
            "deletions": deletions,
            "insertions": insertions,
            "accuracy_percentage": (correct / total * 100) if total > 0 else 0,
            "error_rate": (errors / total * 100) if total > 0 else 0,
        }

    def _analyze_prosody(self, audio_path: str, reference_text: str) -> Dict:
        """Analyze prosody features (pitch, rhythm, intensity).

        Loads the audio at 16 kHz and reports average pitch and its standard
        deviation, mean zero-crossing rate, mean RMS energy, and a speaking
        rate computed from the *reference text* word count over the audio
        duration. Any failure yields a zeroed structure with an ``error`` key.
        """
        try:
            y, sr = librosa.load(audio_path, sr=16000)

            # Pitch: per frame, take the pitch of the strongest bin and keep
            # only frames where that pitch is non-zero.
            pitches, magnitudes = librosa.piptrack(y=y, sr=sr)
            strongest_bins = magnitudes.argmax(axis=0)
            frame_pitches = pitches[strongest_bins, np.arange(pitches.shape[1])]
            voiced = frame_pitches[frame_pitches > 0]
            avg_pitch = float(np.mean(voiced)) if voiced.size else 0.0
            pitch_variability = float(np.std(voiced)) if voiced.size else 0.0

            # Rhythm analysis (using zero-crossing rate as a proxy)
            avg_zcr = float(np.mean(librosa.feature.zero_crossing_rate(y)))

            # Intensity analysis (RMS energy)
            avg_rms = float(np.mean(librosa.feature.rms(y=y)))

            # Speaking rate in words per minute
            duration = len(y) / sr  # in seconds
            word_count = len(reference_text.split())
            speaking_rate = (word_count / duration) * 60 if duration > 0 else 0

            prosody_feedback = []
            if speaking_rate < 100:
                prosody_feedback.append("Speaking rate is quite slow. Try to speak at a more natural pace.")
            elif speaking_rate > 200:
                prosody_feedback.append("Speaking rate is quite fast. Try to slow down for better clarity.")
            else:
                prosody_feedback.append("Speaking rate is good.")

            if pitch_variability < 50:
                prosody_feedback.append("Pitch variability is low. Try to use more intonation to make speech more expressive.")
            else:
                prosody_feedback.append("Good pitch variability, which makes speech more engaging.")

            return {
                "pitch": {
                    "average": avg_pitch,
                    "variability": pitch_variability,
                },
                "rhythm": {
                    "zero_crossing_rate": avg_zcr,
                },
                "intensity": {
                    "rms_energy": avg_rms,
                },
                "speaking_rate": {
                    "words_per_minute": speaking_rate,
                    "duration_seconds": duration,
                },
                "feedback": prosody_feedback,
            }
        except Exception as e:
            # Prosody is an optional extra: report the failure in the result
            # rather than failing the whole assessment.
            print(f"Prosody analysis error: {e}")
            return {
                "error": f"Prosody analysis failed: {str(e)}",
                "pitch": {"average": 0, "variability": 0},
                "rhythm": {"zero_crossing_rate": 0},
                "intensity": {"rms_energy": 0},
                "speaking_rate": {"words_per_minute": 0, "duration_seconds": 0},
                "feedback": ["Prosody analysis unavailable"],
            }