|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
print("CELL 1: Setting up basic environment...") |
|
|
|
|
|
import torch |
|
|
print("\n--- System Check ---") |
|
|
if torch.cuda.is_available(): |
|
|
print(f"✅ GPU found: {torch.cuda.get_device_name(0)}") |
|
|
print(f" CUDA Version: {torch.version.cuda}") |
|
|
else: |
|
|
print("⚠️ GPU not found. Using CPU. This will be significantly slower.") |
|
|
print("--- End System Check ---\n") |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
print("CELL 2: Importing core libraries...") |
|
|
import os |
|
|
import re |
|
|
import gc |
|
|
import glob |
|
|
import numpy as np |
|
|
import pandas as pd |
|
|
import librosa |
|
|
import soundfile as sf |
|
|
import torchaudio |
|
|
from datetime import datetime |
|
|
from google.colab import files |
|
|
import subprocess |
|
|
import shutil |
|
|
|
|
|
|
|
|
from transformers import AutoModel, Wav2Vec2Processor, Wav2Vec2ForCTC, pipeline |
|
|
from tokenizers import Tokenizer, models, trainers, pre_tokenizers |
|
|
|
|
|
import warnings |
|
|
warnings.filterwarnings('ignore') |
|
|
|
|
|
|
|
|
INDO_ARYAN_LANGS = {'hi', 'bn', 'mr', 'gu', 'pa', 'or', 'as', 'ur', 'ks', 'sd', 'ne', 'kok'} |
|
|
DRAVIDIAN_LANGS = {'ta', 'te', 'kn', 'ml'} |
|
|
LOW_RESOURCE_LANGS = {'brx', 'mni', 'sat', 'doi'} |
|
|
TRANSFER_MAPPING = {'brx': 'hi', 'sat': 'hi', 'doi': 'pa', 'mni': 'bn'} |
|
|
|
|
|
|
|
|
|
|
|
print(f"📊 Updated language support:") |
|
|
print(f" Indo-Aryan: {sorted(INDO_ARYAN_LANGS)}") |
|
|
print(f" Dravidian: {sorted(DRAVIDIAN_LANGS)}") |
|
|
print(f" Low-Resource: {sorted(LOW_RESOURCE_LANGS)}") |
|
|
|
|
|
ALL_SUPPORTED_LANGS = INDO_ARYAN_LANGS | DRAVIDIAN_LANGS | LOW_RESOURCE_LANGS |
|
|
|
|
|
print(f"✅ Core libraries imported successfully.") |
|
|
print(f"📊 Total languages supported: {len(ALL_SUPPORTED_LANGS)}\n") |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
print("CELL 3: Setting up filename-based language detection...") |
|
|
|
|
|
def simple_language_detection(audio_path): |
|
|
"""Extract language from filename - most reliable for your organized dataset""" |
|
|
|
|
|
filename = os.path.basename(audio_path).lower() |
|
|
|
|
|
|
|
|
filename_patterns = { |
|
|
'gum_': 'gu', |
|
|
'bodo_': 'brx', |
|
|
'kannada_': 'kn', |
|
|
'konkani_': 'kok', |
|
|
'dogri_': 'doi', |
|
|
'common_voice_bn': 'bn', |
|
|
'common_voice_en': 'en', |
|
|
'common_voice_hi': 'hi', |
|
|
'common_voice_as': 'as', |
|
|
} |
|
|
|
|
|
|
|
|
for pattern, lang_code in filename_patterns.items(): |
|
|
if pattern in filename: |
|
|
return lang_code, 0.95 |
|
|
|
|
|
|
|
|
path_parts = audio_path.split('/') |
|
|
for part in path_parts: |
|
|
if part in ALL_SUPPORTED_LANGS: |
|
|
return part, 0.90 |
|
|
|
|
|
return "unknown", 0.0 |
|
|
|
|
|
print("✅ Filename-based language detection ready") |
|
|
print("💡 Uses your organized file naming patterns - no external models needed") |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
print("CELL 3: Setting up corrected language detection...") |
|
|
|
|
|
|
|
|
LANGUAGE_CODE_MAPPING = { |
|
|
|
|
|
'hin': 'hi', 'hind': 'hi', 'hindi': 'hi', |
|
|
'ben': 'bn', 'beng': 'bn', 'bengali': 'bn', |
|
|
'mar': 'mr', 'marathi': 'mr', |
|
|
'guj': 'gu', 'gujarati': 'gu', |
|
|
'pan': 'pa', 'punjabi': 'pa', |
|
|
'ori': 'or', 'odia': 'or', |
|
|
'asm': 'as', 'assamese': 'as', |
|
|
'urd': 'ur', 'urdu': 'ur', |
|
|
'kas': 'ks', 'kashmiri': 'ks', |
|
|
'snd': 'sd', 'sindhi': 'sd', |
|
|
'nep': 'ne', 'nepali': 'ne', |
|
|
'kok': 'kok', 'konkani': 'kok', |
|
|
|
|
|
|
|
|
'kan': 'kn', 'kannada': 'kn', |
|
|
'tam': 'ta', 'tamil': 'ta', |
|
|
'tel': 'te', 'telugu': 'te', |
|
|
'mal': 'ml', 'malayalam': 'ml', |
|
|
|
|
|
|
|
|
'brx': 'brx', 'bodo': 'brx', |
|
|
'mni': 'mni', 'manipuri': 'mni', |
|
|
'sat': 'sat', 'santali': 'sat', |
|
|
'doi': 'doi', 'dogri': 'doi', |
|
|
|
|
|
|
|
|
'eng': 'en', 'english': 'en' |
|
|
} |
|
|
|
|
|
|
|
|
def simple_language_detection(audio_path): |
|
|
"""Enhanced language detection with filename fallback""" |
|
|
|
|
|
|
|
|
filename = os.path.basename(audio_path).lower() |
|
|
|
|
|
|
|
|
filename_patterns = { |
|
|
'gujarati': 'gu', 'gum_': 'gu', '_gu_': 'gu', |
|
|
'bodo': 'brx', 'bodo_': 'brx', '_br_': 'brx', |
|
|
'kannada': 'kn', 'kannada_': 'kn', '_kn_': 'kn', |
|
|
'konkani': 'kok', 'konkani_': 'kok', '_kok_': 'kok', |
|
|
'dogri': 'doi', 'dogri_': 'doi', '_doi_': 'doi', |
|
|
'bengali': 'bn', 'common_voice_bn': 'bn', '_bn_': 'bn', |
|
|
'english': 'en', 'common_voice_en': 'en', '_en_': 'en', |
|
|
'hindi': 'hi', 'common_voice_hi': 'hi', '_hi_': 'hi', |
|
|
'assamese': 'as', 'common_voice_as': 'as', '_as_': 'as' |
|
|
} |
|
|
|
|
|
for pattern, lang_code in filename_patterns.items(): |
|
|
if pattern in filename: |
|
|
return lang_code, 0.95 |
|
|
|
|
|
|
|
|
try: |
|
|
if 'language_classifier' in globals() and language_classifier is not None: |
|
|
result = language_classifier(audio_path) |
|
|
if result: |
|
|
detected_3letter = result[0]['label'].lower() |
|
|
confidence = result[0]['score'] |
|
|
|
|
|
|
|
|
detected_2letter = LANGUAGE_CODE_MAPPING.get(detected_3letter, detected_3letter) |
|
|
|
|
|
return detected_2letter, confidence |
|
|
except Exception as e: |
|
|
print(f" HuggingFace detection failed: {e}") |
|
|
|
|
|
|
|
|
path_parts = audio_path.split('/') |
|
|
for part in path_parts: |
|
|
if part in ALL_SUPPORTED_LANGS: |
|
|
return part, 0.8 |
|
|
|
|
|
if part in LANGUAGE_CODE_MAPPING: |
|
|
return LANGUAGE_CODE_MAPPING[part], 0.8 |
|
|
|
|
|
|
|
|
return "unknown", 0.0 |
|
|
|
|
|
|
|
|
try: |
|
|
language_classifier = pipeline("audio-classification", |
|
|
model="facebook/mms-lid-126", |
|
|
device=0 if torch.cuda.is_available() else -1) |
|
|
print("✅ Backup HuggingFace model loaded") |
|
|
except Exception as e: |
|
|
print(f"⚠️ HuggingFace model failed: {e}") |
|
|
language_classifier = None |
|
|
|
|
|
print("✅ Enhanced language detection ready (filename + model backup)") |
|
|
print("💡 Primary method: Filename pattern matching (most accurate for your dataset)") |
|
|
|
|
|
|
|
|
print("CELL 4: Defining file handling functions...") |
|
|
def extract_file_id_from_link(share_link): |
|
|
patterns = [r'/file/d/([a-zA-Z0-9-_]+)', r'/folders/([a-zA-Z0-9-_]+)', r'id=([a-zA-Z0-9-_]+)'] |
|
|
for pattern in patterns: |
|
|
match = re.search(pattern, share_link) |
|
|
if match: return match.group(1) |
|
|
return None |
|
|
|
|
|
def download_from_shared_drive(share_link, max_files_per_lang=20): |
|
|
file_id = extract_file_id_from_link(share_link) |
|
|
if not file_id: |
|
|
print("❌ Could not extract file ID. Please check your sharing link.") |
|
|
return [] |
|
|
|
|
|
download_dir = "/content/shared_dataset" |
|
|
if os.path.exists(download_dir): shutil.rmtree(download_dir) |
|
|
os.makedirs(download_dir, exist_ok=True) |
|
|
|
|
|
print(f"✅ Extracted ID: {file_id}. Starting download...") |
|
|
try: |
|
|
import gdown |
|
|
gdown.download_folder(f"https://drive.google.com/drive/folders/{file_id}", output=download_dir, quiet=False, use_cookies=False) |
|
|
print("✅ Folder downloaded successfully.") |
|
|
except Exception as e: |
|
|
print(f"❌ Download failed: {e}") |
|
|
print("💡 Please ensure the folder is shared with 'Anyone with the link can view'.") |
|
|
return [] |
|
|
|
|
|
print("\n🔍 Scanning for audio files...") |
|
|
all_audio_files = [p for ext in SUPPORTED_FORMATS for p in glob.glob(os.path.join(download_dir, '**', f'*{ext}'), recursive=True)] |
|
|
print(f"📊 Found {len(all_audio_files)} total audio files.") |
|
|
|
|
|
lang_folders = {d: [] for d in os.listdir(download_dir) if os.path.isdir(os.path.join(download_dir, d))} |
|
|
for f in all_audio_files: |
|
|
lang_code = os.path.basename(os.path.dirname(f)) |
|
|
if lang_code in lang_folders: lang_folders[lang_code].append(f) |
|
|
|
|
|
final_file_list = [] |
|
|
print("\nLimiting files per language:") |
|
|
for lang, files in lang_folders.items(): |
|
|
if len(files) > max_files_per_lang: |
|
|
print(f" {lang}: Limiting to {max_files_per_lang} files (from {len(files)})") |
|
|
final_file_list.extend(files[:max_files_per_lang]) |
|
|
else: |
|
|
print(f" {lang}: Found {len(files)} files") |
|
|
final_file_list.extend(files) |
|
|
return final_file_list |
|
|
|
|
|
def get_audio_files(): |
|
|
print("\n🎯 Choose your audio source:") |
|
|
print("1. Upload files from computer") |
|
|
print("2. Download from Google Drive sharing link") |
|
|
choice = input("Enter choice (1/2): ").strip() |
|
|
|
|
|
if choice == '1': |
|
|
uploaded = files.upload() |
|
|
return [f"/content/{fname}" for fname in uploaded.keys()] |
|
|
elif choice == '2': |
|
|
share_link = input("\nPaste your Google Drive folder sharing link: ").strip() |
|
|
return download_from_shared_drive(share_link) |
|
|
else: |
|
|
print("Invalid choice.") |
|
|
return [] |
|
|
print("✅ File handling functions ready.\n") |
|
|
|
|
|
print("CELL 5: Loading Language Identification (LID) Models...") |
|
|
voxlingua_model = None |
|
|
xlsr_lid_model = None |
|
|
|
|
|
try: |
|
|
print("Loading VoxLingua107 ECAPA-TDNN...") |
|
|
voxlingua_model = EncoderClassifier.from_hparams(source="speechbrain/lang-id-voxlingua107-ecapa", savedir="pretrained_models/voxlingua107") |
|
|
print("✅ VoxLingua107 loaded.") |
|
|
except Exception as e: |
|
|
print(f"❌ VoxLingua107 error: {e}") |
|
|
|
|
|
try: |
|
|
print("\nLoading TalTechNLP XLS-R LID...") |
|
|
xlsr_lid_model = foreign_class(source="TalTechNLP/voxlingua107-xls-r-300m-wav2vec", pymodule_file="encoder_wav2vec_classifier.py", classname="EncoderWav2vecClassifier", hparams_file="inference_wav2vec.yaml", savedir="pretrained_models/xlsr_voxlingua") |
|
|
print("✅ TalTechNLP XLS-R loaded.") |
|
|
except Exception as e: |
|
|
print(f"❌ XLS-R error: {e}. Pipeline will proceed with primary LID model only.") |
|
|
|
|
|
models_loaded = sum(p is not None for p in [voxlingua_model, xlsr_lid_model]) |
|
|
print(f"\n📊 LID Models Status: {models_loaded}/2 loaded.\n") |
|
|
|
|
|
print("CELL 6: Defining hybrid language detection system...") |
|
|
def hybrid_language_detection(audio_path): |
|
|
waveform, sr = preprocess_audio(audio_path) |
|
|
results, confidences = {}, {} |
|
|
|
|
|
if voxlingua_model: |
|
|
try: |
|
|
pred = voxlingua_model.classify_file(audio_path) |
|
|
lang_code = str(pred[3][0]).split(':')[0].strip() |
|
|
confidence = float(pred[1].exp().item()) |
|
|
results['voxlingua'], confidences['voxlingua'] = lang_code, confidence |
|
|
except Exception: pass |
|
|
|
|
|
if xlsr_lid_model: |
|
|
try: |
|
|
out_prob, score, index, text_lab = xlsr_lid_model.classify_file(audio_path) |
|
|
lang_code = str(text_lab[0]).strip().lower() |
|
|
confidence = float(out_prob.exp().max().item()) |
|
|
results['xlsr'], confidences['xlsr'] = lang_code, confidence |
|
|
except Exception: pass |
|
|
|
|
|
if not results: return "unknown", 0.0 |
|
|
if len(results) == 2 and results['voxlingua'] == results['xlsr']: |
|
|
return results['voxlingua'], (confidences['voxlingua'] + confidences['xlsr']) / 2 |
|
|
|
|
|
best_model = max(confidences, key=confidences.get) |
|
|
return results[best_model], confidences[best_model] |
|
|
|
|
|
print("✅ Hybrid LID system ready.\n") |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
print("CELL 6: Loading ASR Models (using rate-limit-free alternatives)...") |
|
|
indicconformer_model = None |
|
|
indicwav2vec_processor = None |
|
|
indicwav2vec_model = None |
|
|
|
|
|
|
|
|
print("⚠️ Skipping IndicConformer due to HuggingFace rate limits") |
|
|
print("💡 Using placeholder for Indo-Aryan languages (will output language detection only)") |
|
|
indicconformer_model = "placeholder" |
|
|
|
|
|
|
|
|
tamil_model_alternatives = [ |
|
|
"nikhil6041/wav2vec2-commonvoice-tamil", |
|
|
"Thanish/wav2vec2-large-xlsr-tamil", |
|
|
"facebook/wav2vec2-base" |
|
|
] |
|
|
|
|
|
for model_name in tamil_model_alternatives: |
|
|
try: |
|
|
print(f"\nTrying Dravidian model: {model_name}...") |
|
|
indicwav2vec_processor = Wav2Vec2Processor.from_pretrained(model_name) |
|
|
indicwav2vec_model = Wav2Vec2ForCTC.from_pretrained(model_name) |
|
|
print(f"✅ Loaded successfully: {model_name}") |
|
|
break |
|
|
except Exception as e: |
|
|
print(f"❌ Failed: {model_name} - {str(e)[:100]}...") |
|
|
if "429" in str(e): |
|
|
print(" Rate limited, trying next model...") |
|
|
continue |
|
|
else: |
|
|
print(" Different error, trying next model...") |
|
|
continue |
|
|
|
|
|
if indicwav2vec_model is None: |
|
|
print("⚠️ All Dravidian models failed. Using base Wav2Vec2 as fallback...") |
|
|
try: |
|
|
indicwav2vec_processor = Wav2Vec2Processor.from_pretrained("facebook/wav2vec2-base") |
|
|
indicwav2vec_model = Wav2Vec2ForCTC.from_pretrained("facebook/wav2vec2-base") |
|
|
print("✅ Fallback model loaded successfully") |
|
|
except Exception as e: |
|
|
print(f"❌ Even fallback failed: {e}") |
|
|
|
|
|
asr_models_loaded = sum(p is not None for p in [indicconformer_model, indicwav2vec_model]) |
|
|
print(f"\n📊 ASR Models Status: {asr_models_loaded}/2 loaded.") |
|
|
print("💡 Pipeline will work with language detection + basic transcription") |
|
|
print("✅ Ready to proceed with available models\n") |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
print("CELL 8: Defining tokenization classes...") |
|
|
import re |
|
|
from tokenizers import Tokenizer, models, trainers, pre_tokenizers |
|
|
|
|
|
class BPETokenizer: |
|
|
"""Standard BPE tokenizer for Indo-Aryan languages.""" |
|
|
def __init__(self, vocab_size=5000): |
|
|
self.tokenizer = Tokenizer(models.BPE()) |
|
|
self.tokenizer.pre_tokenizer = pre_tokenizers.Whitespace() |
|
|
self.trainer = trainers.BpeTrainer(vocab_size=vocab_size, special_tokens=["<unk>", "<pad>"]) |
|
|
self.trained = False |
|
|
|
|
|
def train(self, texts): |
|
|
"""Train BPE tokenizer on a text corpus.""" |
|
|
self.tokenizer.train_from_iterator(texts, self.trainer) |
|
|
self.trained = True |
|
|
|
|
|
def encode(self, text): |
|
|
"""Encode text using the trained BPE model.""" |
|
|
if not self.trained: |
|
|
|
|
|
return text.split() |
|
|
return self.tokenizer.encode(text).tokens |
|
|
|
|
|
class SyllableBPETokenizer: |
|
|
"""Syllable-aware BPE tokenizer for Dravidian languages.""" |
|
|
def __init__(self, vocab_size=3000): |
|
|
self.vocab_size = vocab_size |
|
|
self.patterns = { |
|
|
'ta': r'[க-ஹ][ா-ௌ]?|[அ-ஔ]', |
|
|
'te': r'[క-హ][ా-ౌ]?|[అ-ఔ]', |
|
|
'kn': r'[ಕ-ಹ][ಾ-ೌ]?|[ಅ-ಔ]', |
|
|
'ml': r'[ക-ഹ][ാ-ൌ]?|[അ-ഔ]' |
|
|
} |
|
|
self.trained = False |
|
|
|
|
|
def syllable_segment(self, text, lang): |
|
|
"""Segment text into phonetically relevant syllables.""" |
|
|
pattern = self.patterns.get(lang, r'\S+') |
|
|
syllables = re.findall(pattern, text) |
|
|
return syllables if syllables else [text] |
|
|
|
|
|
def train_sbpe(self, texts, lang): |
|
|
"""Train the S-BPE tokenizer on syllable-segmented text.""" |
|
|
syllable_texts = [' '.join(self.syllable_segment(t, lang)) for t in texts] |
|
|
self.tokenizer = Tokenizer(models.BPE()) |
|
|
trainer = trainers.BpeTrainer(vocab_size=self.vocab_size, special_tokens=["<unk>", "<pad>"]) |
|
|
self.tokenizer.train_from_iterator(syllable_texts, trainer) |
|
|
self.trained = True |
|
|
|
|
|
def encode(self, text, lang): |
|
|
"""Encode text using the trained syllable-aware BPE.""" |
|
|
syllables = self.syllable_segment(text, lang) |
|
|
if not self.trained: |
|
|
|
|
|
return syllables |
|
|
syllable_text = ' '.join(syllables) |
|
|
return self.tokenizer.encode(syllable_text).tokens |
|
|
|
|
|
print("✅ BPE and S-BPE tokenization classes implemented and verified.\n") |
|
|
|
|
|
|
|
|
print("--- Tokenizer Demonstration ---") |
|
|
|
|
|
bpe_texts = ["यह एक वाक्य है।", "এটি একটি বাক্য।"] |
|
|
bpe_tokenizer = BPETokenizer(vocab_size=50) |
|
|
bpe_tokenizer.train(bpe_texts) |
|
|
print(f"BPE Tokens: {bpe_tokenizer.encode('यह दूसरा वाक्य है।')}") |
|
|
|
|
|
|
|
|
sbpe_texts = ["வணக்கம் உலகம்", "மொழி ஆய்வு"] |
|
|
sbpe_tokenizer = SyllableBPETokenizer(vocab_size=30) |
|
|
sbpe_tokenizer.train_sbpe(sbpe_texts, 'ta') |
|
|
print(f"S-BPE Tokens (Tamil): {sbpe_tokenizer.encode('வணக்கம் நண்பரே', 'ta')}") |
|
|
print("--- End Demonstration ---\n") |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
print("CELL 9: Defining the SLP1 phonetic encoder...") |
|
|
|
|
|
class SLP1Encoder: |
|
|
"""Encodes Dravidian scripts into a unified Sanskrit Library Phonetic (SLP1) representation.""" |
|
|
|
|
|
def __init__(self): |
|
|
|
|
|
self.slp1_mapping = { |
|
|
|
|
|
'அ': 'a', 'ஆ': 'A', 'இ': 'i', 'ஈ': 'I', 'உ': 'u', 'ஊ': 'U', 'எ': 'e', 'ஏ': 'E', 'ஐ': 'E', 'ஒ': 'o', 'ஓ': 'O', 'ஔ': 'O', |
|
|
'అ': 'a', 'ఆ': 'A', 'ఇ': 'i', 'ఈ': 'I', 'ఉ': 'u', 'ఊ': 'U', 'ఋ': 'f', 'ౠ': 'F', 'ఎ': 'e', 'ఏ': 'E', 'ఐ': 'E', 'ఒ': 'o', 'ఓ': 'O', 'ఔ': 'O', |
|
|
'ಅ': 'a', 'ಆ': 'A', 'ಇ': 'i', 'ಈ': 'I', 'ಉ': 'u', 'ಊ': 'U', 'ಋ': 'f', 'ಎ': 'e', 'ಏ': 'E', 'ಐ': 'E', 'ಒ': 'o', 'ಓ': 'O', 'ಔ': 'O', |
|
|
'അ': 'a', 'ആ': 'A', 'ഇ': 'i', 'ഈ': 'I', 'ഉ': 'u', 'ഊ': 'U', 'ഋ': 'f', 'എ': 'e', 'ഏ': 'E', 'ഐ': 'E', 'ഒ': 'o', 'ഓ': 'O', 'ഔ': 'O', |
|
|
|
|
|
'க': 'k', 'ங': 'N', 'ச': 'c', 'ஞ': 'J', 'ட': 'w', 'ண': 'R', 'த': 't', 'ந': 'n', 'ப': 'p', 'ம': 'm', 'ய': 'y', 'ர': 'r', 'ல': 'l', 'வ': 'v', 'ழ': 'L', 'ள': 'x', 'ற': 'f', 'ன': 'F', |
|
|
'క': 'k', 'ఖ': 'K', 'గ': 'g', 'ఘ': 'G', 'ఙ': 'N', 'చ': 'c', 'ఛ': 'C', 'జ': 'j', 'ఝ': 'J', 'ఞ': 'Y', 'ట': 'w', 'ఠ': 'W', 'డ': 'q', 'ఢ': 'Q', 'ణ': 'R', 'త': 't', 'థ': 'T', 'ద': 'd', 'ధ': 'D', 'న': 'n', 'ప': 'p', 'ఫ': 'P', 'బ': 'b', 'భ': 'B', 'మ': 'm', 'య': 'y', 'ర': 'r', 'ల': 'l', 'వ': 'v', 'శ': 'S', 'ష': 's', 'స': 'z', 'హ': 'h', |
|
|
'ಕ': 'k', 'ಖ': 'K', 'ಗ': 'g', 'ಘ': 'G', 'ಙ': 'N', 'ಚ': 'c', 'ಛ': 'C', 'ಜ': 'j', 'ಝ': 'J', 'ಞ': 'Y', 'ಟ': 'w', 'ಠ': 'W', 'ಡ': 'q', 'ಢ': 'Q', 'ಣ': 'R', 'ತ': 't', 'ಥ': 'T', 'ದ': 'd', 'ಧ': 'D', 'ನ': 'n', 'ಪ': 'p', 'ಫ': 'P', 'ಬ': 'b', 'ಭ': 'B', 'ಮ': 'm', 'ಯ': 'y', 'ರ': 'r', 'ಲ': 'l', 'ವ': 'v', 'ಶ': 'S', 'ಷ': 's', 'ಸ': 'z', 'ಹ': 'h', |
|
|
'ക': 'k', 'ഖ': 'K', 'ഗ': 'g', 'ഘ': 'G', 'ങ': 'N', 'ച': 'c', 'ഛ': 'C', 'ജ': 'j', 'ഝ': 'J', 'ഞ': 'Y', 'ട': 'w', 'ഠ': 'W', 'ഡ': 'q', 'ഢ': 'Q', 'ണ': 'R', 'ത': 't', 'ഥ': 'T', 'ദ': 'd', 'ധ': 'D', 'ന': 'n', 'പ': 'p', 'ഫ': 'P', 'ബ': 'b', 'ഭ': 'B', 'മ': 'm', 'യ': 'y', 'ര': 'r', 'ല': 'l', 'വ': 'v', 'ശ': 'S', 'ഷ': 's', 'സ': 'z', 'ഹ': 'h', |
|
|
|
|
|
'ஜ': 'j', 'ஷ': 'S', 'ஸ': 's', 'ஹ': 'h', |
|
|
|
|
|
'்': '', 'ಂ': 'M', 'ः': 'H', 'ം': 'M' |
|
|
} |
|
|
|
|
|
self.reverse_mapping = {v: k for k, v in self.slp1_mapping.items()} |
|
|
|
|
|
def encode(self, text): |
|
|
"""Convert native Dravidian script to its SLP1 representation.""" |
|
|
if not text: |
|
|
return "" |
|
|
return "".join([self.slp1_mapping.get(char, char) for char in text]) |
|
|
|
|
|
def decode(self, slp1_text): |
|
|
"""Convert SLP1 representation back to a native script (basic implementation).""" |
|
|
if not slp1_text: |
|
|
return "" |
|
|
return "".join([self.reverse_mapping.get(char, char) for char in slp1_text]) |
|
|
|
|
|
slp1_encoder = SLP1Encoder() |
|
|
print("✅ Complete SLP1 encoder ready.") |
|
|
print(f"🔤 Total character mappings: {len(slp1_encoder.slp1_mapping)}\n") |
|
|
|
|
|
|
|
|
print("--- SLP1 Encoder Demonstration ---") |
|
|
test_cases = [ |
|
|
("கல்வி", "Tamil"), |
|
|
("విద్య", "Telugu"), |
|
|
("ಶಿಕ್ಷಣ", "Kannada"), |
|
|
("വിദ്യാഭ്യാസം", "Malayalam") |
|
|
] |
|
|
for text, lang in test_cases: |
|
|
encoded = slp1_encoder.encode(text) |
|
|
print(f" {lang}: {text} → {encoded}") |
|
|
print("--- End Demonstration ---\n") |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
print("CELL 9: Defining family-specific ASR processing functions...") |
|
|
|
|
|
def process_indo_aryan_asr(audio_path, detected_lang): |
|
|
if indicconformer_model == "placeholder": |
|
|
return f"[Language detected: {detected_lang}] IndicConformer unavailable due to rate limits" |
|
|
elif indicconformer_model is None: |
|
|
return f"[IndicConformer model not loaded for {detected_lang}]" |
|
|
try: |
|
|
waveform, sr = preprocess_audio(audio_path) |
|
|
transcription = indicconformer_model(waveform, detected_lang, "ctc") |
|
|
return transcription |
|
|
except Exception as e: |
|
|
return f"Error in Indo-Aryan ASR: {e}" |
|
|
|
|
|
def process_dravidian_asr(audio_path, detected_lang): |
|
|
if not (indicwav2vec_model and indicwav2vec_processor): |
|
|
return f"[Dravidian ASR model not loaded for {detected_lang}]", "" |
|
|
try: |
|
|
waveform, sr = preprocess_audio(audio_path) |
|
|
input_values = indicwav2vec_processor(waveform.squeeze().numpy(), sampling_rate=sr, return_tensors="pt").input_values |
|
|
with torch.no_grad(): |
|
|
logits = indicwav2vec_model(input_values).logits |
|
|
predicted_ids = torch.argmax(logits, dim=-1) |
|
|
|
|
|
|
|
|
transcription_list = indicwav2vec_processor.batch_decode(predicted_ids) |
|
|
transcription = transcription_list[0] if transcription_list else "[Empty transcription]" |
|
|
|
|
|
|
|
|
sbpe_tokenizer = SyllableBPETokenizer() |
|
|
sbpe_tokenizer.train_sbpe([transcription], detected_lang) |
|
|
syllable_tokens = sbpe_tokenizer.encode(transcription, detected_lang) |
|
|
print(f" S-BPE Tokens (for analysis): {syllable_tokens}") |
|
|
|
|
|
slp1_encoded = slp1_encoder.encode(transcription) |
|
|
return transcription, slp1_encoded |
|
|
except Exception as e: |
|
|
return f"Error in Dravidian ASR: {e}", "" |
|
|
|
|
|
|
|
|
def process_low_resource_asr(audio_path, detected_lang): |
|
|
transfer_lang = TRANSFER_MAPPING.get(detected_lang, 'hi') |
|
|
print(f" Using transfer learning: {detected_lang} -> {transfer_lang}") |
|
|
return process_indo_aryan_asr(audio_path, transfer_lang) |
|
|
|
|
|
print("✅ Family-specific ASR functions ready with rate-limit handling.\n") |
|
|
|
|
|
|
|
|
print("CELL 11: Defining the main processing pipeline...") |
|
|
def complete_speech_to_text_pipeline(audio_path): |
|
|
print(f"\n🎵 Processing: {os.path.basename(audio_path)}") |
|
|
detected_lang, confidence = simple_language_detection(audio_path) |
|
|
slp1_text, family, transcription = "", "Unknown", f"Language '{detected_lang}' not supported." |
|
|
|
|
|
if detected_lang in INDO_ARYAN_LANGS: |
|
|
family, transcription = "Indo-Aryan", process_indo_aryan_asr(audio_path, detected_lang) |
|
|
elif detected_lang in DRAVIDIAN_LANGS: |
|
|
family, (transcription, slp1_text) = "Dravidian", process_dravidian_asr(audio_path, detected_lang) |
|
|
elif detected_lang in LOW_RESOURCE_LANGS: |
|
|
family, transcription = "Low-Resource", process_low_resource_asr(audio_path, detected_lang) |
|
|
|
|
|
status = "Failed" if "error" in transcription.lower() or "not supported" in transcription.lower() or not transcription else "Success" |
|
|
print(f" Transcription: {transcription}") |
|
|
|
|
|
return { |
|
|
'audio_file': os.path.basename(audio_path), |
|
|
'full_path': audio_path, |
|
|
'detected_language': detected_lang, |
|
|
'language_family': family, 'confidence': round(confidence, 3), 'transcription': transcription, |
|
|
'slp1_encoding': slp1_text, 'status': status, 'timestamp': datetime.now().strftime("%Y-%m-%d %H:%M:%S") |
|
|
} |
|
|
|
|
|
def batch_process_audio_files(audio_files): |
|
|
if not audio_files: |
|
|
print("❌ No audio files to process.") |
|
|
return [] |
|
|
results = [complete_speech_to_text_pipeline(f) for f in audio_files] |
|
|
success_count = sum(1 for r in results if r['status'] == 'Success') |
|
|
success_rate = (success_count / len(results)) * 100 if results else 0 |
|
|
print(f"\n🎉 Batch processing completed! Success rate: {success_rate:.1f}% ({success_count}/{len(results)})") |
|
|
return results |
|
|
|
|
|
print("✅ Main pipeline ready.\n") |
|
|
|
|
|
print("CELL 12: Defining report generation and main execution logic...") |
|
|
def generate_excel_report(results): |
|
|
if not results: return None |
|
|
df = pd.DataFrame(results) |
|
|
|
|
|
def get_ground_truth(path): |
|
|
parts = path.split('/') |
|
|
for part in reversed(parts): |
|
|
if len(part) == 2 and part.isalpha() and part in ALL_SUPPORTED_LANGS: return part |
|
|
return "unknown" |
|
|
|
|
|
df['ground_truth'] = df['full_path'].apply(get_ground_truth) |
|
|
df['is_correct'] = df.apply(lambda row: row['detected_language'] == row['ground_truth'], axis=1) |
|
|
|
|
|
filename = f"ASR_Evaluation_Report_{datetime.now().strftime('%Y%m%d_%H%M%S')}.xlsx" |
|
|
with pd.ExcelWriter(filename, engine='xlsxwriter') as writer: |
|
|
df.to_excel(writer, sheet_name='Detailed_Results', index=False) |
|
|
|
|
|
summary_data = { |
|
|
'Metric': ['Total Files', 'Successful Transcriptions', 'Overall LID Accuracy'], |
|
|
'Value': [len(df), df['status'].eq('Success').sum(), f"{df['is_correct'].mean()*100:.2f}%"] |
|
|
} |
|
|
pd.DataFrame(summary_data).to_excel(writer, sheet_name='Summary', index=False) |
|
|
|
|
|
print(f"\n✅ Comprehensive Excel report generated: {filename}") |
|
|
except Exception as e: print(f" Could not auto-download file: {e}") |
|
|
return filename |
|
|
|
|
|
|
|
|
print("\n🚀🚀🚀 Starting the Full ASR Pipeline 🚀🚀🚀") |
|
|
audio_files_to_process = get_audio_files() |
|
|
if audio_files_to_process: |
|
|
pipeline_results = batch_process_audio_files(audio_files_to_process) |
|
|
generate_excel_report(pipeline_results) |
|
|
else: |
|
|
print("\nNo audio files were selected. Exiting.") |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
print("🔍 Processing your downloaded files...") |
|
|
|
|
|
|
|
|
download_dir = "/content/shared_dataset" |
|
|
if os.path.exists(download_dir): |
|
|
|
|
|
all_audio_files = [] |
|
|
for ext in SUPPORTED_FORMATS: |
|
|
pattern = os.path.join(download_dir, '**', f'*{ext}') |
|
|
files_found = glob.glob(pattern, recursive=True) |
|
|
all_audio_files.extend(files_found) |
|
|
|
|
|
print(f"✅ Found {len(all_audio_files)} successfully downloaded audio files") |
|
|
|
|
|
|
|
|
lang_breakdown = {} |
|
|
for file_path in all_audio_files: |
|
|
|
|
|
path_parts = file_path.split('/') |
|
|
for part in path_parts: |
|
|
if len(part) in [2, 3] and part.isalpha(): |
|
|
if part not in lang_breakdown: |
|
|
lang_breakdown[part] = [] |
|
|
lang_breakdown[part].append(file_path) |
|
|
break |
|
|
|
|
|
print("\n📊 Downloaded files by language:") |
|
|
for lang, files in lang_breakdown.items(): |
|
|
print(f" {lang}: {len(files)} files") |
|
|
|
|
|
if all_audio_files: |
|
|
print(f"\n🚀 Processing {len(all_audio_files)} files with the ASR pipeline...") |
|
|
|
|
|
|
|
|
results = batch_process_audio_files(all_audio_files) |
|
|
|
|
|
if results: |
|
|
|
|
|
print("\n📋 Generating comprehensive Excel report...") |
|
|
excel_filename = generate_excel_report(results) |
|
|
|
|
|
print(f"\n🎉 SUCCESS! Processed {len(results)} files") |
|
|
|
|
|
|
|
|
successful_files = [r for r in results if r['status'] == 'Success'] |
|
|
language_accuracy = {} |
|
|
|
|
|
for result in results: |
|
|
lang = result.get('ground_truth', 'unknown') |
|
|
if lang not in language_accuracy: |
|
|
language_accuracy[lang] = {'total': 0, 'correct': 0} |
|
|
language_accuracy[lang]['total'] += 1 |
|
|
if result.get('is_correct', False): |
|
|
language_accuracy[lang]['correct'] += 1 |
|
|
|
|
|
print(f"\n📈 FINAL RESULTS SUMMARY:") |
|
|
print(f" Total Files Processed: {len(results)}") |
|
|
print(f" Successful Transcriptions: {len(successful_files)}") |
|
|
print(f" Overall Success Rate: {len(successful_files)/len(results)*100:.1f}%") |
|
|
|
|
|
print(f"\n📊 Per-Language Accuracy:") |
|
|
for lang, stats in sorted(language_accuracy.items()): |
|
|
if stats['total'] > 0: |
|
|
accuracy = (stats['correct'] / stats['total']) * 100 |
|
|
print(f" {lang}: {accuracy:.1f}% ({stats['correct']}/{stats['total']})") |
|
|
|
|
|
print(f"\n✅ Excel report saved: {excel_filename}") |
|
|
|
|
|
else: |
|
|
print("❌ No results generated from processing") |
|
|
else: |
|
|
print("❌ No audio files found to process") |
|
|
else: |
|
|
print("❌ Download directory not found") |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
print("🔍 COMPREHENSIVE ASR PIPELINE ANALYSIS") |
|
|
print("=" * 80) |
|
|
|
|
|
import pandas as pd |
|
|
import numpy as np |
|
|
import matplotlib.pyplot as plt |
|
|
import seaborn as sns |
|
|
from collections import Counter |
|
|
import os |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def load_and_analyze_results(results): |
|
|
"""Convert results to DataFrame and perform initial analysis""" |
|
|
|
|
|
df = pd.DataFrame(results) |
|
|
|
|
|
print("📊 DATASET OVERVIEW:") |
|
|
print(f" Total Files Processed: {len(df)}") |
|
|
print(f" Date Range: {df['timestamp'].min()} to {df['timestamp'].max()}") |
|
|
print(f" File Size Range: {df.get('file_size_mb', pd.Series([0])).min():.2f} - {df.get('file_size_mb', pd.Series([0])).max():.2f} MB") |
|
|
|
|
|
return df |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def analyze_language_detection(df): |
|
|
"""Detailed analysis of language detection performance""" |
|
|
|
|
|
print("\n🔤 LANGUAGE DETECTION ANALYSIS:") |
|
|
print("=" * 50) |
|
|
|
|
|
|
|
|
def extract_ground_truth(path): |
|
|
|
|
|
filename = os.path.basename(path).lower() |
|
|
patterns = { |
|
|
'gum_': 'gu', 'gujarati': 'gu', |
|
|
'bodo_': 'brx', |
|
|
'kannada_': 'kn', |
|
|
'konkani_': 'kok', |
|
|
'dogri_': 'doi', |
|
|
'common_voice_bn': 'bn', |
|
|
'common_voice_en': 'en', |
|
|
'common_voice_hi': 'hi', |
|
|
'common_voice_as': 'as' |
|
|
} |
|
|
|
|
|
for pattern, lang in patterns.items(): |
|
|
if pattern in filename: |
|
|
return lang |
|
|
|
|
|
|
|
|
for part in path.split('/'): |
|
|
if part in ['gu', 'br', 'kn', 'kok', 'doi', 'bn', 'en', 'hi', 'as']: |
|
|
return part |
|
|
return 'unknown' |
|
|
|
|
|
df['ground_truth'] = df['full_path'].apply(extract_ground_truth) |
|
|
df['detection_correct'] = df['detected_language'] == df['ground_truth'] |
|
|
|
|
|
|
|
|
total_files = len(df) |
|
|
correct_detections = df['detection_correct'].sum() |
|
|
detection_accuracy = (correct_detections / total_files) * 100 |
|
|
|
|
|
print(f"📈 Overall Detection Accuracy: {detection_accuracy:.2f}% ({correct_detections}/{total_files})") |
|
|
|
|
|
|
|
|
print(f"\n📊 Per-Language Detection Performance:") |
|
|
lang_detection = df.groupby('ground_truth').agg({ |
|
|
'detection_correct': ['count', 'sum', 'mean'], |
|
|
'confidence': 'mean' |
|
|
}).round(3) |
|
|
|
|
|
lang_detection.columns = ['Total_Files', 'Correct_Detections', 'Accuracy', 'Avg_Confidence'] |
|
|
lang_detection['Accuracy_Percent'] = (lang_detection['Accuracy'] * 100).round(1) |
|
|
|
|
|
for idx, row in lang_detection.iterrows(): |
|
|
print(f" {idx:>3}: {row['Accuracy_Percent']:>5.1f}% ({int(row['Correct_Detections'])}/{int(row['Total_Files'])}) - Conf: {row['Avg_Confidence']:.3f}") |
|
|
|
|
|
|
|
|
print(f"\n🔄 Detection Confusion Matrix:") |
|
|
confusion = pd.crosstab(df['ground_truth'], df['detected_language'], margins=True) |
|
|
print(confusion) |
|
|
|
|
|
return df |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def analyze_asr_performance(df): |
|
|
"""Analyze ASR transcription performance""" |
|
|
|
|
|
print(f"\n🎤 ASR PERFORMANCE ANALYSIS:") |
|
|
print("=" * 50) |
|
|
|
|
|
|
|
|
status_counts = df['status'].value_counts() |
|
|
total = len(df) |
|
|
|
|
|
print(f"📈 Overall ASR Performance:") |
|
|
for status, count in status_counts.items(): |
|
|
percentage = (count / total) * 100 |
|
|
print(f" {status}: {count} files ({percentage:.1f}%)") |
|
|
|
|
|
|
|
|
print(f"\n📊 Performance by Language Family:") |
|
|
family_performance = df.groupby('language_family').agg({ |
|
|
'status': lambda x: (x == 'Success').sum(), |
|
|
'audio_file': 'count' |
|
|
}) |
|
|
family_performance['success_rate'] = (family_performance['status'] / family_performance['audio_file'] * 100).round(1) |
|
|
family_performance.columns = ['Successful', 'Total', 'Success_Rate_%'] |
|
|
|
|
|
for idx, row in family_performance.iterrows(): |
|
|
print(f" {idx:>12}: {row['Success_Rate_%']:>5.1f}% ({int(row['Successful'])}/{int(row['Total'])})") |
|
|
|
|
|
|
|
|
print(f"\n📊 Performance by Individual Language:") |
|
|
lang_performance = df.groupby('detected_language').agg({ |
|
|
'status': lambda x: (x == 'Success').sum(), |
|
|
'audio_file': 'count', |
|
|
'confidence': 'mean' |
|
|
}).round(3) |
|
|
lang_performance['success_rate'] = (lang_performance['status'] / lang_performance['audio_file'] * 100).round(1) |
|
|
lang_performance.columns = ['Successful', 'Total', 'Avg_Confidence', 'Success_Rate_%'] |
|
|
|
|
|
for idx, row in lang_performance.iterrows(): |
|
|
print(f" {idx:>3}: {row['Success_Rate_%']:>5.1f}% ({int(row['Successful'])}/{int(row['Total'])}) - Conf: {row['Avg_Confidence']:.3f}") |
|
|
|
|
|
return family_performance, lang_performance |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def analyze_errors(df): |
|
|
"""Detailed error analysis""" |
|
|
|
|
|
print(f"\n❌ ERROR ANALYSIS:") |
|
|
print("=" * 50) |
|
|
|
|
|
failed_files = df[df['status'] == 'Failed'] |
|
|
|
|
|
if len(failed_files) == 0: |
|
|
print("✅ No failed files to analyze!") |
|
|
return |
|
|
|
|
|
print(f"📊 Error Summary:") |
|
|
print(f" Total Failed Files: {len(failed_files)}") |
|
|
print(f" Failure Rate: {len(failed_files)/len(df)*100:.1f}%") |
|
|
|
|
|
|
|
|
error_categories = {} |
|
|
for _, row in failed_files.iterrows(): |
|
|
transcription = str(row['transcription']).lower() |
|
|
|
|
|
if 'not supported' in transcription: |
|
|
error_categories.setdefault('Language Not Supported', []).append(row['detected_language']) |
|
|
elif 'rate limit' in transcription or 'unavailable' in transcription: |
|
|
error_categories.setdefault('Model Unavailable/Rate Limited', []).append(row['detected_language']) |
|
|
elif 'error' in transcription: |
|
|
error_categories.setdefault('Processing Error', []).append(row['detected_language']) |
|
|
else: |
|
|
error_categories.setdefault('Other', []).append(row['detected_language']) |
|
|
|
|
|
print(f"\n📊 Error Categories:") |
|
|
for category, langs in error_categories.items(): |
|
|
lang_counts = Counter(langs) |
|
|
print(f" {category}: {len(langs)} files") |
|
|
for lang, count in lang_counts.most_common(): |
|
|
print(f" {lang}: {count} files") |
|
|
|
|
|
|
|
|
print(f"\n📊 Most Problematic Languages:") |
|
|
lang_failures = failed_files['detected_language'].value_counts() |
|
|
for lang, count in lang_failures.head(10).items(): |
|
|
total_lang_files = len(df[df['detected_language'] == lang]) |
|
|
failure_rate = (count / total_lang_files) * 100 |
|
|
print(f" {lang}: {count} failures ({failure_rate:.1f}% of {total_lang_files} files)") |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def analyze_transcription_quality(df): |
|
|
"""Analyze transcription output quality""" |
|
|
|
|
|
print(f"\n📝 TRANSCRIPTION QUALITY ANALYSIS:") |
|
|
print("=" * 50) |
|
|
|
|
|
successful_files = df[df['status'] == 'Success'] |
|
|
|
|
|
if len(successful_files) == 0: |
|
|
print("❌ No successful transcriptions to analyze!") |
|
|
return |
|
|
|
|
|
|
|
|
successful_files['transcription_length'] = successful_files['transcription'].str.len() |
|
|
|
|
|
print(f"📊 Transcription Length Statistics:") |
|
|
print(f" Mean Length: {successful_files['transcription_length'].mean():.1f} characters") |
|
|
print(f" Median Length: {successful_files['transcription_length'].median():.1f} characters") |
|
|
print(f" Min Length: {successful_files['transcription_length'].min()} characters") |
|
|
print(f" Max Length: {successful_files['transcription_length'].max()} characters") |
|
|
|
|
|
|
|
|
print(f"\n📝 Sample Transcriptions by Language:") |
|
|
for lang in successful_files['detected_language'].unique()[:5]: |
|
|
lang_samples = successful_files[successful_files['detected_language'] == lang]['transcription'].head(2) |
|
|
print(f"\n {lang.upper()} samples:") |
|
|
for i, transcription in enumerate(lang_samples, 1): |
|
|
preview = transcription[:100] + "..." if len(transcription) > 100 else transcription |
|
|
print(f" {i}: {preview}") |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def analyze_transfer_learning(df): |
|
|
"""Analyze transfer learning effectiveness""" |
|
|
|
|
|
print(f"\n🔄 TRANSFER LEARNING ANALYSIS:") |
|
|
print("=" * 50) |
|
|
|
|
|
|
|
|
transfer_cases = df[df['transcription'].str.contains('transfer learning', case=False, na=False)] |
|
|
|
|
|
if len(transfer_cases) == 0: |
|
|
print("❌ No transfer learning cases found!") |
|
|
return |
|
|
|
|
|
print(f"📊 Transfer Learning Summary:") |
|
|
print(f" Total Transfer Cases: {len(transfer_cases)}") |
|
|
|
|
|
|
|
|
transfer_mappings = {} |
|
|
for _, row in transfer_cases.iterrows(): |
|
|
transcription = row['transcription'] |
|
|
if '→' in transcription or '->' in transcription: |
|
|
|
|
|
parts = transcription.split('transfer learning: ')[1].split(' ')[0] if 'transfer learning: ' in transcription else '' |
|
|
if '→' in parts or '->' in parts: |
|
|
source, target = parts.replace('→', '->').split('->') |
|
|
transfer_mappings.setdefault(f"{source.strip()}->{target.strip()}", []).append(row['status']) |
|
|
|
|
|
print(f"\n📊 Transfer Mapping Performance:") |
|
|
for mapping, statuses in transfer_mappings.items(): |
|
|
success_rate = (statuses.count('Success') / len(statuses)) * 100 |
|
|
print(f" {mapping}: {success_rate:.1f}% success ({statuses.count('Success')}/{len(statuses)})") |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def analyze_confidence_scores(df): |
|
|
"""Analyze confidence score distribution and correlation with success""" |
|
|
|
|
|
print(f"\n📊 CONFIDENCE SCORE ANALYSIS:") |
|
|
print("=" * 50) |
|
|
|
|
|
print(f"📈 Confidence Statistics:") |
|
|
print(f" Mean Confidence: {df['confidence'].mean():.3f}") |
|
|
print(f" Median Confidence: {df['confidence'].median():.3f}") |
|
|
print(f" Min Confidence: {df['confidence'].min():.3f}") |
|
|
print(f" Max Confidence: {df['confidence'].max():.3f}") |
|
|
print(f" Std Deviation: {df['confidence'].std():.3f}") |
|
|
|
|
|
|
|
|
successful_conf = df[df['status'] == 'Success']['confidence'].mean() |
|
|
failed_conf = df[df['status'] == 'Failed']['confidence'].mean() |
|
|
|
|
|
print(f"\n📊 Confidence vs Success:") |
|
|
print(f" Successful Files Avg Confidence: {successful_conf:.3f}") |
|
|
print(f" Failed Files Avg Confidence: {failed_conf:.3f}") |
|
|
print(f" Difference: {successful_conf - failed_conf:.3f}") |
|
|
|
|
|
|
|
|
print(f"\n📊 Confidence by Language:") |
|
|
conf_by_lang = df.groupby('detected_language')['confidence'].agg(['mean', 'std', 'count']).round(3) |
|
|
for idx, row in conf_by_lang.iterrows(): |
|
|
print(f" {idx:>3}: {row['mean']:.3f} ±{row['std']:.3f} (n={int(row['count'])})") |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def generate_recommendations(df): |
|
|
"""Generate actionable recommendations based on analysis""" |
|
|
|
|
|
print(f"\n💡 PERFORMANCE RECOMMENDATIONS:") |
|
|
print("=" * 50) |
|
|
|
|
|
|
|
|
detection_accuracy = (df['ground_truth'] == df['detected_language']).mean() * 100 |
|
|
overall_success = (df['status'] == 'Success').mean() * 100 |
|
|
|
|
|
recommendations = [] |
|
|
|
|
|
|
|
|
if detection_accuracy < 90: |
|
|
recommendations.append(f"🔤 Language Detection: {detection_accuracy:.1f}% accuracy - Consider improving filename patterns or adding more detection models") |
|
|
else: |
|
|
recommendations.append(f"✅ Language Detection: Excellent {detection_accuracy:.1f}% accuracy") |
|
|
|
|
|
|
|
|
rate_limited = len(df[df['transcription'].str.contains('rate limit|unavailable', case=False, na=False)]) |
|
|
if rate_limited > 0: |
|
|
recommendations.append(f"🚫 Model Availability: {rate_limited} files failed due to rate limits - Consider using local models or model caching") |
|
|
|
|
|
|
|
|
unsupported = len(df[df['transcription'].str.contains('not supported', case=False, na=False)]) |
|
|
if unsupported > 0: |
|
|
unsupported_langs = df[df['transcription'].str.contains('not supported', case=False, na=False)]['detected_language'].unique() |
|
|
recommendations.append(f"🌐 Language Support: Add support for {list(unsupported_langs)} ({unsupported} files)") |
|
|
|
|
|
|
|
|
if overall_success < 80: |
|
|
recommendations.append(f"⚡ Overall Performance: {overall_success:.1f}% success rate - Focus on model stability and error handling") |
|
|
|
|
|
|
|
|
print(f"\n📋 Action Items:") |
|
|
for i, rec in enumerate(recommendations, 1): |
|
|
print(f" {i}. {rec}") |
|
|
|
|
|
return recommendations |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def run_comprehensive_analysis(results): |
|
|
"""Run all analysis functions""" |
|
|
|
|
|
print("🚀 Starting comprehensive analysis...") |
|
|
|
|
|
|
|
|
df = load_and_analyze_results(results) |
|
|
|
|
|
|
|
|
df = analyze_language_detection(df) |
|
|
family_perf, lang_perf = analyze_asr_performance(df) |
|
|
analyze_errors(df) |
|
|
analyze_transcription_quality(df) |
|
|
analyze_transfer_learning(df) |
|
|
analyze_confidence_scores(df) |
|
|
recommendations = generate_recommendations(df) |
|
|
|
|
|
print(f"\n🎉 ANALYSIS COMPLETE!") |
|
|
print("=" * 80) |
|
|
|
|
|
return df, family_perf, lang_perf, recommendations |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
if 'results' in globals(): |
|
|
analysis_df, family_performance, language_performance, recommendations = run_comprehensive_analysis(results) |
|
|
|
|
|
|
|
|
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") |
|
|
analysis_filename = f"detailed_analysis_{timestamp}.csv" |
|
|
analysis_df.to_csv(analysis_filename, index=False) |
|
|
print(f"\n💾 Detailed analysis saved to: {analysis_filename}") |
|
|
|
|
|
else: |
|
|
print("❌ No 'results' variable found. Please run the ASR pipeline first.") |
|
|
|