File size: 10,632 Bytes
f70e266
3856f8c
 
 
 
 
 
f70e266
3856f8c
 
f70e266
 
 
 
 
 
 
 
3856f8c
 
f70e266
3856f8c
a6526f0
 
f70e266
 
 
3856f8c
f70e266
 
3856f8c
 
a6526f0
f70e266
 
3856f8c
f70e266
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
3856f8c
a6526f0
 
f70e266
 
a6526f0
f70e266
a6526f0
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
3856f8c
f70e266
 
3856f8c
 
f70e266
 
 
 
3856f8c
f70e266
 
3856f8c
f70e266
3856f8c
f70e266
 
 
 
3856f8c
f70e266
 
 
 
 
3856f8c
f70e266
 
3856f8c
 
f70e266
 
 
3856f8c
 
 
 
 
 
 
 
 
 
f70e266
 
 
 
3856f8c
a6526f0
f70e266
 
3856f8c
 
f70e266
 
 
3856f8c
 
 
 
 
 
 
f70e266
 
 
 
 
 
 
 
 
 
 
 
 
 
 
3856f8c
f70e266
3856f8c
f70e266
 
 
 
 
3856f8c
f70e266
 
3856f8c
f70e266
 
 
 
 
 
3856f8c
f70e266
 
3856f8c
a6526f0
f70e266
 
3856f8c
 
f70e266
 
 
 
 
 
 
 
 
 
3856f8c
 
 
f70e266
 
 
 
 
 
3856f8c
 
 
f70e266
3856f8c
f70e266
 
 
3856f8c
f70e266
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
a6526f0
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
# =======================================================================
# analyzer/ASR_pt_br.py
# 巴西葡萄牙語發音分析器
# 版本:v2.0 (與 en_us.py 邏輯對齊)
# 描述:此版本完全遵循 en_us.py 的程式碼結構和算法實現,
#       僅在語言特定配置(模型名稱、G2P語言)上有所不同,
#       並採用了更健壯的、基於 Unicode 的 IPA 切分方法以適應葡萄牙語。
# =======================================================================

# --- 1. 匯入區 (與 en_us.py 保持一致) ---
import torch
import soundfile as sf
import librosa
from transformers import Wav2Vec2Processor, Wav2Vec2ForCTC
import os
from phonemizer import phonemize
import numpy as np
from datetime import datetime, timezone
import unicodedata # 【保留】這是處理葡萄牙語鼻音等音素的更優方案
import re # 【保留】用於更準確地切分單詞

# --- 2. 全域設定與模型載入 ---
#    【已修改】移除了全域的 processor 和 model 變數。
#    【已修改】刪除了舊的 load_model() 函數。
DEVICE = "cuda" if torch.cuda.is_available() else "cpu"
print(f"INFO: ASR_pt_br.py is configured to use device: {DEVICE}")

# 【關鍵修改 1:設定為葡萄牙語 ASR 模型】
MODEL_NAME = "caiocrocha/wav2vec2-large-xlsr-53-phoneme-portuguese"

# --- 3. 智能 IPA 切分函數 ---
# 【關鍵修改 2:保留更優越的通用切分邏輯】
# 【保持不變】
def _tokenize_ipa(ipa_string: str) -> list:
    """
    將 IPA 字串智能地切分為音素列表,能正確處理帶有附加符號的組合字符。
    """
    phonemes = []
    s = ipa_string.replace(' ', '')
    i = 0
    while i < len(s):
        if i + 1 < len(s) and s[i:i+2] in {'dʒ', 'tʃ'}:
            phonemes.append(s[i:i+2])
            i += 2
            continue

        current_char = s[i]
        i += 1
        while i < len(s) and unicodedata.category(s[i]) == 'Mn':
            current_char += s[i]
            i += 1
        phonemes.append(current_char)
    return phonemes

# --- 4. 核心分析函數 (主入口) ---
#    【已修改】將模型載入和快取邏輯合併至此。
def analyze(audio_file_path: str, target_sentence: str, cache: dict = {}) -> dict:
    """
    接收音訊檔案路徑和目標葡萄牙語句子,回傳詳細的發音分析字典。
    模型會被載入並儲存在此函數獨立的 'cache' 中,實現狀態隔離。
    """
    # 檢查快取中是否已有模型,如果沒有則載入
    if "model" not in cache:
        print(f"快取未命中 (ASR_pt_br)。正在載入模型 '{MODEL_NAME}'...")
        try:
            # 載入模型並存入此函數的快取字典
            cache["processor"] = Wav2Vec2Processor.from_pretrained(MODEL_NAME)
            cache["model"] = Wav2Vec2ForCTC.from_pretrained(MODEL_NAME)
            cache["model"].to(DEVICE)
            print(f"模型 '{MODEL_NAME}' 已載入並快取。")
        except Exception as e:
            print(f"處理或載入模型 '{MODEL_NAME}' 時發生錯誤: {e}")
            raise RuntimeError(f"Failed to load model '{MODEL_NAME}': {e}")

    # 從此函數的獨立快取中獲取模型和處理器
    processor = cache["processor"]
    model = cache["model"]

    # --- 以下為原始分析邏輯,保持不變 ---
    # 1. 準備目標音素 (G2P)
    target_words_original = re.findall(r"[\w'-]+", target_sentence)
    cleaned_sentence = " ".join(target_words_original)

    # 【關鍵修改 3:設定 G2P 語言為 'pt-br'】
    target_ipa_by_word_str = phonemize(
        cleaned_sentence,
        language='pt-br',
        backend='espeak',
        with_stress=True,
        strip=True
    ).split()
    
    if len(target_words_original) != len(target_ipa_by_word_str):
        print(f"警告: G2P 後單詞數量 ({len(target_ipa_by_word_str)}) 與原始單詞數量 ({len(target_words_original)}) 不匹配。將進行截斷。")
        min_len = min(len(target_words_original), len(target_ipa_by_word_str))
        target_words_original = target_words_original[:min_len]
        target_ipa_by_word_str = target_ipa_by_word_str[:min_len]

    # 【關鍵修改 4:與 en_us.py 對齊,在準備目標音素時就清除所有不比較的符號】
    target_ipa_by_word = [
        _tokenize_ipa(word.replace('ˈ', '').replace('ˌ', '').replace('ː', ''))
        for word in target_ipa_by_word_str
    ]

    # 2. 處理音訊並進行語音辨識 (ASR)
    try:
        speech, sample_rate = sf.read(audio_file_path)
        if sample_rate != 16000:
            speech = librosa.resample(y=speech, orig_sr=sample_rate, target_sr=16000)
    except Exception as e:
        raise IOError(f"讀取或處理音訊時發生錯誤: {e}")
    
    input_values = processor(speech, sampling_rate=16000, return_tensors="pt").input_values
    input_values = input_values.to(DEVICE)
    with torch.no_grad():
        logits = model(input_values).logits
    predicted_ids = torch.argmax(logits, dim=-1)
    
    # 【關鍵修改 5:與 en_us.py 對齊,清理模型輸出以匹配目標音素的處理方式】
    user_ipa_full = processor.decode(predicted_ids[0]).replace('|', '').replace('ː', '')

    # 3. 執行對齊並格式化輸出
    word_alignments = _get_phoneme_alignments_by_word(user_ipa_full, target_ipa_by_word)
    return _format_to_json_structure(word_alignments, target_sentence, target_words_original)


# --- 5. 對齊函數 (與 en_us.py 的實現邏輯完全對齊) ---
# 【保持不變】
def _get_phoneme_alignments_by_word(user_phoneme_str, target_words_ipa_tokenized):
    """
    使用動態規劃執行音素對齊。
    (此函數實現與 en_us.py 完全相同)
    """
    user_phonemes = _tokenize_ipa(user_phoneme_str)
    
    target_phonemes_flat = []
    word_boundaries_indices = [] 
    current_idx = 0
    for word_ipa_tokens in target_words_ipa_tokenized:
        target_phonemes_flat.extend(word_ipa_tokens)
        current_idx += len(word_ipa_tokens)
        word_boundaries_indices.append(current_idx - 1)

    dp = np.zeros((len(user_phonemes) + 1, len(target_phonemes_flat) + 1))
    for i in range(1, len(user_phonemes) + 1): dp[i][0] = i
    for j in range(1, len(target_phonemes_flat) + 1): dp[0][j] = j
    for i in range(1, len(user_phonemes) + 1):
        for j in range(1, len(target_phonemes_flat) + 1):
            cost = 0 if user_phonemes[i-1] == target_phonemes_flat[j-1] else 1
            dp[i][j] = min(dp[i-1][j] + 1, dp[i][j-1] + 1, dp[i-1][j-1] + cost)

    i, j = len(user_phonemes), len(target_phonemes_flat)
    user_path, target_path = [], []
    while i > 0 or j > 0:
        cost = float('inf') if i == 0 or j == 0 else (0 if user_phonemes[i-1] == target_phonemes_flat[j-1] else 1)
        if i > 0 and j > 0 and dp[i][j] == dp[i-1][j-1] + cost:
            user_path.insert(0, user_phonemes[i-1]); target_path.insert(0, target_phonemes_flat[j-1]); i -= 1; j -= 1
        elif i > 0 and dp[i][j] == dp[i-1][j] + 1:
            user_path.insert(0, user_phonemes[i-1]); target_path.insert(0, '-'); i -= 1
        else:
            user_path.insert(0, '-'); target_path.insert(0, target_phonemes_flat[j-1]); j -= 1
    
    alignments_by_word = []
    word_start_idx_in_path = 0
    target_phoneme_counter_in_path = 0

    for path_idx, p in enumerate(target_path):
        if p != '-':
            if target_phoneme_counter_in_path in word_boundaries_indices:
                alignments_by_word.append({
                    "target": target_path[word_start_idx_in_path : path_idx + 1],
                    "user": user_path[word_start_idx_in_path : path_idx + 1]
                })
                word_start_idx_in_path = path_idx + 1
            target_phoneme_counter_in_path += 1
            
    return alignments_by_word

# --- 6. 格式化函數 (與 en_us.py 的實現邏輯完全對齊) ---
# 【保持不變】
def _format_to_json_structure(alignments, sentence, original_words) -> dict:
    """
    將對齊結果格式化為最終的 JSON 結構。
    (此函數實現與 en_us.py 完全相同,僅 G2P 語言設定不同)
    """
    total_phonemes, total_errors, correct_words_count = 0, 0, 0
    words_data = []
    num_words_to_process = min(len(alignments), len(original_words))

    for i in range(num_words_to_process):
        alignment = alignments[i]
        word_is_correct = True
        phonemes_data = []
        
        for j in range(len(alignment['target'])):
            target_phoneme = alignment['target'][j]
            user_phoneme = alignment['user'][j]
            is_match = (user_phoneme == target_phoneme)
            phonemes_data.append({"target": target_phoneme, "user": user_phoneme, "isMatch": is_match})
            if not is_match:
                word_is_correct = False
                if not (user_phoneme == '-' and target_phoneme == '-'): total_errors += 1
        
        if word_is_correct:
            correct_words_count += 1
            
        words_data.append({"word": original_words[i], "isCorrect": word_is_correct, "phonemes": phonemes_data})
        total_phonemes += sum(1 for p in alignment['target'] if p != '-')

    if len(alignments) < len(original_words):
        for i in range(len(alignments), len(original_words)):
            # 【關鍵修改 6:確保此處的 G2P 語言和符號清理也保持一致】
            missed_word_ipa_str = phonemize(original_words[i], language='pt-br', backend='espeak', strip=True).replace('ː', '')
            missed_word_ipa = _tokenize_ipa(missed_word_ipa_str)
            phonemes_data = []
            for p_ipa in missed_word_ipa:
                phonemes_data.append({"target": p_ipa, "user": "-", "isMatch": False})
                total_errors += 1
                total_phonemes += 1
            words_data.append({"word": original_words[i], "isCorrect": False, "phonemes": phonemes_data})

    total_words = len(original_words)
    overall_score = (correct_words_count / total_words) * 100 if total_words > 0 else 0
    phoneme_error_rate = (total_errors / total_phonemes) * 100 if total_phonemes > 0 else 0

    return {
        "sentence": sentence,
        "analysisTimestampUTC": datetime.now(timezone.utc).strftime('%Y-%m-%d %H:%M:%S (UTC)'),
        "summary": {
            "overallScore": round(overall_score, 1),
            "totalWords": total_words,
            "correctWords": correct_words_count,
            "phonemeErrorRate": round(phoneme_error_rate, 2),
            "total_errors": total_errors,
            "total_target_phonemes": total_phonemes
        },
        "words": words_data
    }