# asrlid / app.py
# kasimali's picture
# Upload folder using huggingface_hub
# b1852d8 verified
# ASRLID
# ==============================================================================
# Cell 1: Simplified Environment Setup - Skip SpeechBrain for now
# ==============================================================================
# Announce the cell, then report which accelerator torch can see.
print("CELL 1: Setting up basic environment...")
import torch

print("\n--- System Check ---")
if not torch.cuda.is_available():
    print("⚠️ GPU not found. Using CPU. This will be significantly slower.")
else:
    print(f"✅ GPU found: {torch.cuda.get_device_name(0)}")
    print(f" CUDA Version: {torch.version.cuda}")
print("--- End System Check ---\n")
# ==============================================================================
# Cell 2: Basic Imports - Skip SpeechBrain models for now
# ==============================================================================
print("CELL 2: Importing core libraries...")
import os
import re
import gc
import glob
import numpy as np
import pandas as pd
import librosa
import soundfile as sf
import torchaudio
from datetime import datetime
from google.colab import files
import subprocess
import shutil
# Core ML libraries that work
from transformers import AutoModel, Wav2Vec2Processor, Wav2Vec2ForCTC, pipeline
from tokenizers import Tokenizer, models, trainers, pre_tokenizers
import warnings
warnings.filterwarnings('ignore')
# Language codes grouped by the family-specific ASR path that handles them.
INDO_ARYAN_LANGS = {'hi', 'bn', 'mr', 'gu', 'pa', 'or', 'as', 'ur', 'ks', 'sd', 'ne', 'kok'}
DRAVIDIAN_LANGS = {'ta', 'te', 'kn', 'ml'}
LOW_RESOURCE_LANGS = {'brx', 'mni', 'sat', 'doi'}
# Low-resource code -> code of the language whose model is used in its place.
TRANSFER_MAPPING = {'brx': 'hi', 'sat': 'hi', 'doi': 'pa', 'mni': 'bn'}
# Every code the pipeline knows how to route.
ALL_SUPPORTED_LANGS = INDO_ARYAN_LANGS.union(DRAVIDIAN_LANGS, LOW_RESOURCE_LANGS)
print("📊 Updated language support:")
for _family_label, _family_codes in (
    ("Indo-Aryan", INDO_ARYAN_LANGS),
    ("Dravidian", DRAVIDIAN_LANGS),
    ("Low-Resource", LOW_RESOURCE_LANGS),
):
    print(f" {_family_label}: {sorted(_family_codes)}")
print("✅ Core libraries imported successfully.")
print(f"📊 Total languages supported: {len(ALL_SUPPORTED_LANGS)}\n")
# ==============================================================================
# Cell 3: Simple Filename-Based Language Detection (Original Design Intent)
# ==============================================================================
print("CELL 3: Setting up filename-based language detection...")


def simple_language_detection(audio_path):
    """Guess the language of an audio file from its name or its folders.

    Returns a ``(language_code, confidence)`` tuple: 0.95 when a filename
    marker matches, 0.90 when a folder is named after a supported code, and
    ``("unknown", 0.0)`` when neither applies.
    """
    # (marker, language code) pairs, checked in order against the lowercased name.
    markers = (
        ('gum_', 'gu'),  # Gujarati files
        ('bodo_', 'brx'),  # Bodo files
        ('kannada_', 'kn'),  # Kannada files
        ('konkani_', 'kok'),  # Konkani files
        ('dogri_', 'doi'),  # Dogri files
        ('common_voice_bn', 'bn'),  # Bengali files
        ('common_voice_en', 'en'),  # English files
        ('common_voice_hi', 'hi'),  # Hindi files
        ('common_voice_as', 'as'),  # Assamese files
    )
    name = os.path.basename(audio_path).lower()
    matched = next((code for marker, code in markers if marker in name), None)
    if matched is not None:
        return matched, 0.95  # High confidence since filenames are organized
    # No marker matched: look for a supported code among the path components.
    for segment in audio_path.split('/'):
        if segment in ALL_SUPPORTED_LANGS:
            return segment, 0.90
    return "unknown", 0.0


print("✅ Filename-based language detection ready")
print("💡 Uses your organized file naming patterns - no external models needed")
# ==============================================================================
# Cell 3: FIXED Language Detection with Proper Code Mapping
# ==============================================================================
print("CELL 3: Setting up corrected language detection...")
# Aliases (three-letter codes and English names) grouped by the code the
# pipeline uses internally; flattened into LANGUAGE_CODE_MAPPING below.
_LANGUAGE_ALIASES = {
    # Indo-Aryan languages
    'hi': ('hin', 'hind', 'hindi'),
    'bn': ('ben', 'beng', 'bengali'),
    'mr': ('mar', 'marathi'),
    'gu': ('guj', 'gujarati'),
    'pa': ('pan', 'punjabi'),
    'or': ('ori', 'odia'),
    'as': ('asm', 'assamese'),
    'ur': ('urd', 'urdu'),
    'ks': ('kas', 'kashmiri'),
    'sd': ('snd', 'sindhi'),
    'ne': ('nep', 'nepali'),
    'kok': ('kok', 'konkani'),
    # Dravidian languages
    'kn': ('kan', 'kannada'),
    'ta': ('tam', 'tamil'),
    'te': ('tel', 'telugu'),
    'ml': ('mal', 'malayalam'),
    # Low-resource languages
    'brx': ('brx', 'bodo'),
    'mni': ('mni', 'manipuri'),
    'sat': ('sat', 'santali'),
    'doi': ('doi', 'dogri'),
    # Common misdetections to handle
    'en': ('eng', 'english'),
}
# alias -> internal language code
LANGUAGE_CODE_MAPPING = {
    alias: code
    for code, aliases in _LANGUAGE_ALIASES.items()
    for alias in aliases
}
# Use a simpler, more accurate model or fallback to filename detection
def simple_language_detection(audio_path):
    """Detect the language of an audio file, trying three methods in order.

    1. Filename markers (confidence 0.95).
    2. The module-level ``language_classifier``, when it exists and is not
       None; its top label is translated through ``LANGUAGE_CODE_MAPPING``
       and returned with the classifier's own score.
    3. Path components that are a supported code or a known alias
       (confidence 0.8).

    Returns ``(language_code, confidence)``, or ``("unknown", 0.0)`` when
    every method fails.
    """
    # (marker, language code) pairs, checked in order against the lowercased name.
    markers = (
        ('gujarati', 'gu'), ('gum_', 'gu'), ('_gu_', 'gu'),
        ('bodo', 'brx'), ('bodo_', 'brx'), ('_br_', 'brx'),
        ('kannada', 'kn'), ('kannada_', 'kn'), ('_kn_', 'kn'),
        ('konkani', 'kok'), ('konkani_', 'kok'), ('_kok_', 'kok'),
        ('dogri', 'doi'), ('dogri_', 'doi'), ('_doi_', 'doi'),
        ('bengali', 'bn'), ('common_voice_bn', 'bn'), ('_bn_', 'bn'),
        ('english', 'en'), ('common_voice_en', 'en'), ('_en_', 'en'),
        ('hindi', 'hi'), ('common_voice_hi', 'hi'), ('_hi_', 'hi'),
        ('assamese', 'as'), ('common_voice_as', 'as'), ('_as_', 'as'),
    )
    name = os.path.basename(audio_path).lower()
    for marker, code in markers:
        if marker in name:
            return code, 0.95  # High confidence for filename detection

    # Backup: the optional audio classifier, if one was loaded.
    try:
        classifier = globals().get('language_classifier')
        if classifier is not None:
            predictions = classifier(audio_path)
            if predictions:
                best = predictions[0]
                label = best['label'].lower()
                score = best['score']
                # Translate the classifier's label to the internal code when known.
                return LANGUAGE_CODE_MAPPING.get(label, label), score
    except Exception as e:
        print(f" HuggingFace detection failed: {e}")

    # Last resort: a folder named after a supported code or one of its aliases.
    for segment in audio_path.split('/'):
        if segment in ALL_SUPPORTED_LANGS:
            return segment, 0.8
        if segment in LANGUAGE_CODE_MAPPING:
            return LANGUAGE_CODE_MAPPING[segment], 0.8
    return "unknown", 0.0
# Try to load HuggingFace model (optional backup)
# Optional backup classifier; it is left as None when loading fails.
try:
    _lid_device = 0 if torch.cuda.is_available() else -1
    language_classifier = pipeline(
        "audio-classification",
        model="facebook/mms-lid-126",
        device=_lid_device,
    )
    print("✅ Backup HuggingFace model loaded")
except Exception as e:
    print(f"⚠️ HuggingFace model failed: {e}")
    language_classifier = None
print("✅ Enhanced language detection ready (filename + model backup)")
print("💡 Primary method: Filename pattern matching (most accurate for your dataset)")
print("CELL 4: Defining file handling functions...")


def extract_file_id_from_link(share_link):
    """Return the Google Drive ID embedded in ``share_link``, or None.

    Three URL shapes are tried in order: ``/file/d/<id>``, ``/folders/<id>``
    and ``id=<id>``; the first one found wins.
    """
    id_patterns = (
        r'/file/d/([a-zA-Z0-9-_]+)',
        r'/folders/([a-zA-Z0-9-_]+)',
        r'id=([a-zA-Z0-9-_]+)',
    )
    hits = (re.search(candidate, share_link) for candidate in id_patterns)
    return next((hit.group(1) for hit in hits if hit), None)
def download_from_shared_drive(share_link, max_files_per_lang=20):
    """Download a shared Drive folder and return a capped list of audio paths.

    The folder is fetched with ``gdown`` into ``/content/shared_dataset``
    (any previous copy is removed first). Audio files are grouped by the name
    of their immediate parent directory, restricted to the first-level
    sub-folders of the download directory, and at most ``max_files_per_lang``
    files are kept per group. Returns ``[]`` when the link has no ID or the
    download fails.

    NOTE(review): SUPPORTED_FORMATS is not defined in the visible part of
    this file; it is assumed to be an iterable of extensions such as ".wav".
    """
    folder_id = extract_file_id_from_link(share_link)
    if not folder_id:
        print("❌ Could not extract file ID. Please check your sharing link.")
        return []

    target_dir = "/content/shared_dataset"
    if os.path.exists(target_dir):
        shutil.rmtree(target_dir)
    os.makedirs(target_dir, exist_ok=True)
    print(f"✅ Extracted ID: {folder_id}. Starting download...")
    try:
        import gdown
        gdown.download_folder(f"https://drive.google.com/drive/folders/{folder_id}", output=target_dir, quiet=False, use_cookies=False)
        print("✅ Folder downloaded successfully.")
    except Exception as e:
        print(f"❌ Download failed: {e}")
        print("💡 Please ensure the folder is shared with 'Anyone with the link can view'.")
        return []

    print("\n🔍 Scanning for audio files...")
    discovered = []
    for ext in SUPPORTED_FORMATS:
        discovered.extend(glob.glob(os.path.join(target_dir, '**', f'*{ext}'), recursive=True))
    print(f"📊 Found {len(discovered)} total audio files.")

    # One bucket per first-level sub-folder of the download directory.
    per_language = {}
    for entry in os.listdir(target_dir):
        if os.path.isdir(os.path.join(target_dir, entry)):
            per_language[entry] = []
    for audio_path in discovered:
        parent_name = os.path.basename(os.path.dirname(audio_path))
        if parent_name in per_language:
            per_language[parent_name].append(audio_path)

    selected = []
    print("\nLimiting files per language:")
    for lang, paths in per_language.items():
        available = len(paths)
        if available > max_files_per_lang:
            print(f" {lang}: Limiting to {max_files_per_lang} files (from {available})")
            selected.extend(paths[:max_files_per_lang])
        else:
            print(f" {lang}: Found {available} files")
            selected.extend(paths)
    return selected
def get_audio_files():
    """Ask the user where the audio comes from and return the file paths.

    Choice ``1`` uploads files through ``files.upload()`` and returns them as
    ``/content/<name>`` paths; choice ``2`` downloads a shared Drive folder.
    Any other answer prints a message and returns ``[]``.
    """
    print("\n🎯 Choose your audio source:")
    print("1. Upload files from computer")
    print("2. Download from Google Drive sharing link")
    selection = input("Enter choice (1/2): ").strip()
    if selection == '1':
        uploads = files.upload()
        return [f"/content/{name}" for name in uploads.keys()]
    if selection == '2':
        link = input("\nPaste your Google Drive folder sharing link: ").strip()
        return download_from_shared_drive(link)
    print("Invalid choice.")
    return []


print("✅ File handling functions ready.\n")
print("CELL 5: Loading Language Identification (LID) Models...")
# Both models start as None and stay None when loading fails.
voxlingua_model = None
xlsr_lid_model = None
# NOTE(review): EncoderClassifier and foreign_class are not imported in the
# visible part of this file; if they are undefined, the resulting NameError is
# caught by the handlers below and the corresponding model stays None.
try:
    print("Loading VoxLingua107 ECAPA-TDNN...")
    voxlingua_model = EncoderClassifier.from_hparams(
        source="speechbrain/lang-id-voxlingua107-ecapa",
        savedir="pretrained_models/voxlingua107",
    )
    print("✅ VoxLingua107 loaded.")
except Exception as e:
    print(f"❌ VoxLingua107 error: {e}")
try:
    print("\nLoading TalTechNLP XLS-R LID...")
    xlsr_lid_model = foreign_class(
        source="TalTechNLP/voxlingua107-xls-r-300m-wav2vec",
        pymodule_file="encoder_wav2vec_classifier.py",
        classname="EncoderWav2vecClassifier",
        hparams_file="inference_wav2vec.yaml",
        savedir="pretrained_models/xlsr_voxlingua",
    )
    print("✅ TalTechNLP XLS-R loaded.")
except Exception as e:
    print(f"❌ XLS-R error: {e}. Pipeline will proceed with primary LID model only.")
# Count how many of the two models are available.
models_loaded = len([m for m in (voxlingua_model, xlsr_lid_model) if m is not None])
print(f"\n📊 LID Models Status: {models_loaded}/2 loaded.\n")
print("CELL 6: Defining hybrid language detection system...")


def hybrid_language_detection(audio_path):
    """Identify the spoken language using whichever LID models are loaded.

    Each available model (``voxlingua_model``, ``xlsr_lid_model``) classifies
    the file directly from its path, so no waveform is loaded here.

    Returns a ``(language_label, confidence)`` tuple:
    - ``("unknown", 0.0)`` when no model produced a prediction;
    - the shared label with the mean confidence when both models agree;
    - otherwise the prediction of the more confident model.
    """
    results, confidences = {}, {}
    if voxlingua_model:
        try:
            pred = voxlingua_model.classify_file(audio_path)
            # The label is the text before the first ':' of the predicted entry.
            lang_code = str(pred[3][0]).split(':')[0].strip()
            confidence = float(pred[1].exp().item())
            results['voxlingua'], confidences['voxlingua'] = lang_code, confidence
        except Exception as e:
            # Best effort: report the failure and rely on the other model.
            print(f" VoxLingua107 detection failed: {e}")
    if xlsr_lid_model:
        try:
            out_prob, score, index, text_lab = xlsr_lid_model.classify_file(audio_path)
            lang_code = str(text_lab[0]).strip().lower()
            confidence = float(out_prob.exp().max().item())
            results['xlsr'], confidences['xlsr'] = lang_code, confidence
        except Exception as e:
            # Best effort: report the failure and rely on the other model.
            print(f" XLS-R detection failed: {e}")
    if not results:
        return "unknown", 0.0
    if len(results) == 2 and results['voxlingua'] == results['xlsr']:
        # Agreement: keep the label and average the two confidences.
        return results['voxlingua'], (confidences['voxlingua'] + confidences['xlsr']) / 2
    best_model = max(confidences, key=confidences.get)
    return results[best_model], confidences[best_model]


print("✅ Hybrid LID system ready.\n")
# ==============================================================================
# Cell 6: ASR Model Loading with Rate-Limit-Free Alternatives
# ==============================================================================
print("CELL 6: Loading ASR Models (using rate-limit-free alternatives)...")
indicconformer_model = None
indicwav2vec_processor = None
indicwav2vec_model = None
# IndicConformer is not loaded; a sentinel string stands in for it.
print("⚠️ Skipping IndicConformer due to HuggingFace rate limits")
print("💡 Using placeholder for Indo-Aryan languages (will output language detection only)")
indicconformer_model = "placeholder"  # Functional placeholder
# Candidate checkpoints, tried in order until one loads.
tamil_model_alternatives = [
    "nikhil6041/wav2vec2-commonvoice-tamil",  # Smaller, less popular
    "Thanish/wav2vec2-large-xlsr-tamil",  # Alternative option
    "facebook/wav2vec2-base",  # Fallback base model
]
for model_name in tamil_model_alternatives:
    try:
        print(f"\nTrying Dravidian model: {model_name}...")
        indicwav2vec_processor = Wav2Vec2Processor.from_pretrained(model_name)
        indicwav2vec_model = Wav2Vec2ForCTC.from_pretrained(model_name)
    except Exception as e:
        reason = str(e)
        print(f"❌ Failed: {model_name} - {reason[:100]}...")
        if "429" in reason:
            print(" Rate limited, trying next model...")
        else:
            print(" Different error, trying next model...")
    else:
        print(f"✅ Loaded successfully: {model_name}")
        break
# Nothing loaded from the list: make one more attempt with the base checkpoint.
if indicwav2vec_model is None:
    print("⚠️ All Dravidian models failed. Using base Wav2Vec2 as fallback...")
    try:
        indicwav2vec_processor = Wav2Vec2Processor.from_pretrained("facebook/wav2vec2-base")
        indicwav2vec_model = Wav2Vec2ForCTC.from_pretrained("facebook/wav2vec2-base")
        print("✅ Fallback model loaded successfully")
    except Exception as e:
        print(f"❌ Even fallback failed: {e}")
# The placeholder string counts as "loaded" because it is not None.
asr_models_loaded = len([m for m in (indicconformer_model, indicwav2vec_model) if m is not None])
print(f"\n📊 ASR Models Status: {asr_models_loaded}/2 loaded.")
print("💡 Pipeline will work with language detection + basic transcription")
print("✅ Ready to proceed with available models\n")
# ==============================================================================
# Cell 8: BPE and Syllable-BPE Tokenization Classes
#
# This version correctly handles untrained tokenizers and has improved
# regex for more accurate syllable segmentation.
# ==============================================================================
print("CELL 8: Defining tokenization classes...")
import re
from tokenizers import Tokenizer, models, trainers, pre_tokenizers
class BPETokenizer:
    """Plain byte-pair-encoding tokenizer, used for Indo-Aryan text."""

    def __init__(self, vocab_size=5000):
        """Create an untrained BPE model with whitespace pre-tokenization."""
        bpe = Tokenizer(models.BPE())
        bpe.pre_tokenizer = pre_tokenizers.Whitespace()
        self.tokenizer = bpe
        self.trainer = trainers.BpeTrainer(
            vocab_size=vocab_size,
            special_tokens=["<unk>", "<pad>"],
        )
        self.trained = False

    def train(self, texts):
        """Fit the BPE vocabulary on an iterable of strings."""
        self.tokenizer.train_from_iterator(texts, self.trainer)
        self.trained = True

    def encode(self, text):
        """Return the token strings for ``text``.

        Before ``train`` has been called this falls back to a plain
        whitespace split.
        """
        if self.trained:
            return self.tokenizer.encode(text).tokens
        return text.split()
class SyllableBPETokenizer:
    """Syllable-aware BPE tokenizer for Dravidian languages.

    Text is first cut into orthographic syllables with a per-language regex;
    BPE is then trained on, and applied to, the space-joined syllables.
    """

    def __init__(self, vocab_size=3000):
        """Store the vocabulary size and the per-language syllable patterns."""
        self.vocab_size = vocab_size
        # Each pattern reads: consonant + optional dependent sign, or an
        # independent vowel. The sign range ends at the virama (one past the
        # AU vowel sign) so that a vowel-less consonant keeps its virama
        # instead of being emitted as a bare consonant, which would read as
        # consonant + inherent vowel.
        self.patterns = {
            'ta': r'[\u0B95-\u0BB9][\u0BBE-\u0BCD]?|[\u0B85-\u0B94]',  # Tamil
            'te': r'[\u0C15-\u0C39][\u0C3E-\u0C4D]?|[\u0C05-\u0C14]',  # Telugu
            'kn': r'[\u0C95-\u0CB9][\u0CBE-\u0CCD]?|[\u0C85-\u0C94]',  # Kannada
            'ml': r'[\u0D15-\u0D39][\u0D3E-\u0D4D]?|[\u0D05-\u0D14]',  # Malayalam
        }
        self.tokenizer = None  # created by train_sbpe
        self.trained = False

    def syllable_segment(self, text, lang):
        """Split ``text`` into syllables for ``lang``.

        Languages without a dedicated pattern are split into runs of
        non-whitespace characters. When nothing matches, the whole text is
        returned as a single element.
        """
        pattern = self.patterns.get(lang, r'\S+')
        syllables = re.findall(pattern, text)
        return syllables if syllables else [text]

    def train_sbpe(self, texts, lang):
        """Train a fresh BPE model on the syllable-segmented ``texts``."""
        syllable_texts = [' '.join(self.syllable_segment(t, lang)) for t in texts]
        self.tokenizer = Tokenizer(models.BPE())
        trainer = trainers.BpeTrainer(vocab_size=self.vocab_size, special_tokens=["<unk>", "<pad>"])
        self.tokenizer.train_from_iterator(syllable_texts, trainer)
        self.trained = True

    def encode(self, text, lang):
        """Return tokens for ``text``; the raw syllables when untrained."""
        syllables = self.syllable_segment(text, lang)
        if not self.trained:
            # Untrained fallback: the syllables themselves.
            return syllables
        syllable_text = ' '.join(syllables)
        return self.tokenizer.encode(syllable_text).tokens
print("✅ BPE and S-BPE tokenization classes implemented and verified.\n")
# --- Demonstration: train each tokenizer on a tiny corpus and encode one sentence ---
print("--- Tokenizer Demonstration ---")
# Standard BPE on two short sentences
bpe_texts = ["यह एक वाक्य है।", "এটি একটি বাক্য।"]
bpe_tokenizer = BPETokenizer(vocab_size=50)
bpe_tokenizer.train(bpe_texts)
_bpe_demo_tokens = bpe_tokenizer.encode('यह दूसरा वाक्य है।')
print(f"BPE Tokens: {_bpe_demo_tokens}")
# Syllable-aware BPE on two short Tamil phrases
sbpe_texts = ["வணக்கம் உலகம்", "மொழி ஆய்வு"]
sbpe_tokenizer = SyllableBPETokenizer(vocab_size=30)
sbpe_tokenizer.train_sbpe(sbpe_texts, 'ta')
_sbpe_demo_tokens = sbpe_tokenizer.encode('வணக்கம் நண்பரே', 'ta')
print(f"S-BPE Tokens (Tamil): {_sbpe_demo_tokens}")
print("--- End Demonstration ---\n")
# ==============================================================================
# Cell 9: Complete SLP1 Phonetic Encoder
#
# This version includes a comprehensive mapping for all target Dravidian
# languages and a reverse mapping for decoding.
# ==============================================================================
print("CELL 9: Defining the SLP1 phonetic encoder...")


class SLP1Encoder:
    """Encode Tamil, Telugu, Kannada and Malayalam text as SLP1-style ASCII.

    The encoder treats the scripts as abugidas: a consonant is written as its
    base letter followed by the inherent vowel ``a`` unless a dependent vowel
    sign or a virama follows it. Characters without a mapping pass through
    unchanged.

    Symbol choices follow SLP1 where it defines one (``S`` = sha, ``z`` =
    retroflex ssa, ``s`` = sa, ``Y`` = nya). Letters that SLP1 does not cover
    keep this file's conventions: short/long e and o are ``e``/``E`` and
    ``o``/``O``, ``L`` is LLLA, ``x`` is LLA, ``f`` is RRA and ``F`` is NNNA.
    As a result ``E`` also stands for ai, ``O`` for au, and ``f``/``F`` for
    the vocalic r vowels, so the encoding is not reversible.

    Attributes:
        slp1_mapping: native character -> SLP1 symbol (viramas map to "").
        reverse_mapping: SLP1 symbol -> one native letter, used by ``decode``.
    """

    # First code point of the Tamil, Telugu, Kannada and Malayalam blocks.
    # The blocks share one layout, so a letter sits at the same offset in each.
    _SCRIPT_BASES = (0x0B80, 0x0C00, 0x0C80, 0x0D00)

    # Independent vowels, by offset inside a script block.
    _VOWEL_OFFSETS = {
        0x05: 'a', 0x06: 'A', 0x07: 'i', 0x08: 'I', 0x09: 'u', 0x0A: 'U',
        0x0B: 'f', 0x60: 'F', 0x0E: 'e', 0x0F: 'E', 0x10: 'E',
        0x12: 'o', 0x13: 'O', 0x14: 'O',
    }
    # Consonants, by offset inside a script block.
    _CONSONANT_OFFSETS = {
        0x15: 'k', 0x16: 'K', 0x17: 'g', 0x18: 'G', 0x19: 'N',
        0x1A: 'c', 0x1B: 'C', 0x1C: 'j', 0x1D: 'J', 0x1E: 'Y',
        0x1F: 'w', 0x20: 'W', 0x21: 'q', 0x22: 'Q', 0x23: 'R',
        0x24: 't', 0x25: 'T', 0x26: 'd', 0x27: 'D', 0x28: 'n', 0x29: 'F',
        0x2A: 'p', 0x2B: 'P', 0x2C: 'b', 0x2D: 'B', 0x2E: 'm',
        0x2F: 'y', 0x30: 'r', 0x31: 'f', 0x32: 'l', 0x33: 'x', 0x34: 'L',
        0x35: 'v', 0x36: 'S', 0x37: 'z', 0x38: 's', 0x39: 'h',
    }
    # Dependent vowel signs, by offset inside a script block.
    _VOWEL_SIGN_OFFSETS = {
        0x3E: 'A', 0x3F: 'i', 0x40: 'I', 0x41: 'u', 0x42: 'U',
        0x43: 'f', 0x44: 'F', 0x46: 'e', 0x47: 'E', 0x48: 'E',
        0x4A: 'o', 0x4B: 'O', 0x4C: 'O',
    }
    _ANUSVARA_OFFSET = 0x02
    _VISARGA_OFFSET = 0x03
    _VIRAMA_OFFSET = 0x4D

    def __init__(self):
        """Build the per-script lookup tables from the shared block layout."""
        import unicodedata

        def assigned(code_point):
            # Skip offsets that a given script does not define.
            return bool(unicodedata.name(chr(code_point), ''))

        def expand(offsets):
            table = {}
            for base in self._SCRIPT_BASES:
                for offset, symbol in offsets.items():
                    if assigned(base + offset):
                        table[chr(base + offset)] = symbol
            return table

        self._vowels = expand(self._VOWEL_OFFSETS)
        self._consonants = expand(self._CONSONANT_OFFSETS)
        self._vowel_signs = expand(self._VOWEL_SIGN_OFFSETS)
        self._viramas = {chr(base + self._VIRAMA_OFFSET) for base in self._SCRIPT_BASES}

        # Devanagari visarga is kept from the original table.
        marks = {'\u0903': 'H'}
        for base in self._SCRIPT_BASES:
            marks[chr(base + self._ANUSVARA_OFFSET)] = 'M'
        # Tamil has no visarga at this offset (U+0B83 is a different sign).
        for base in self._SCRIPT_BASES[1:]:
            marks[chr(base + self._VISARGA_OFFSET)] = 'H'
        self._marks = marks

        self.slp1_mapping = {}
        self.slp1_mapping.update(self._vowels)
        self.slp1_mapping.update(self._consonants)
        self.slp1_mapping.update(self._vowel_signs)
        self.slp1_mapping.update({virama: '' for virama in self._viramas})
        self.slp1_mapping.update(self._marks)

        # Reverse table for decode: standalone letters only; on a symbol
        # collision the last script in _SCRIPT_BASES wins.
        self.reverse_mapping = {}
        for table in (self._vowels, self._consonants, self._marks):
            for native, symbol in table.items():
                self.reverse_mapping[symbol] = native

    def encode(self, text):
        """Convert native Dravidian script to its SLP1-style representation.

        Returns "" for empty or None input.
        """
        if not text:
            return ""
        pieces = []
        last_index = len(text) - 1
        for position, char in enumerate(text):
            if char in self._consonants:
                pieces.append(self._consonants[char])
                follower = text[position + 1] if position < last_index else ''
                # A consonant carries the inherent vowel unless a vowel sign
                # or a virama follows it.
                if follower not in self._vowel_signs and follower not in self._viramas:
                    pieces.append('a')
            elif char in self._viramas:
                continue
            else:
                pieces.append(self.slp1_mapping.get(char, char))
        return "".join(pieces)

    def decode(self, slp1_text):
        """Map SLP1 symbols back to native letters, one character at a time.

        This is a basic, lossy inverse: it does not rebuild vowel signs or
        viramas and cannot tell scripts apart. Returns "" for empty or None
        input.
        """
        if not slp1_text:
            return ""
        return "".join([self.reverse_mapping.get(char, char) for char in slp1_text])
# Shared encoder instance used by the Dravidian ASR path.
slp1_encoder = SLP1Encoder()
print("✅ Complete SLP1 encoder ready.")
print(f"🔤 Total character mappings: {len(slp1_encoder.slp1_mapping)}\n")
# --- Example Usage (Demonstration) ---
print("--- SLP1 Encoder Demonstration ---")
test_cases = [
    ("கல்வி", "Tamil"),
    ("విద్య", "Telugu"),
    ("ಶಿಕ್ಷಣ", "Kannada"),
    ("വിദ്യാഭ്യാസം", "Malayalam"),
]
for text, lang in test_cases:
    encoded = slp1_encoder.encode(text)
    # Keep the native text and its encoding apart so both are readable.
    print(f" {lang}: {text} → {encoded}")
print("--- End Demonstration ---\n")
# ==============================================================================
# Cell 9: Updated ASR Processing Functions (Handle placeholders)
# ==============================================================================
print("CELL 9: Defining family-specific ASR processing functions...")


def process_indo_aryan_asr(audio_path, detected_lang):
    """Transcribe an Indo-Aryan recording with ``indicconformer_model``.

    Returns the transcription, or a diagnostic string when the model is the
    "placeholder" sentinel, is None, or raises during processing.
    """
    model = indicconformer_model
    if model == "placeholder":
        return f"[Language detected: {detected_lang}] IndicConformer unavailable due to rate limits"
    if model is None:
        return f"[IndicConformer model not loaded for {detected_lang}]"
    try:
        waveform, sr = preprocess_audio(audio_path)
        return model(waveform, detected_lang, "ctc")
    except Exception as e:
        return f"Error in Indo-Aryan ASR: {e}"
def process_dravidian_asr(audio_path, detected_lang):
    """Transcribe a Dravidian recording with the Wav2Vec2 CTC model.

    Returns ``(transcription, slp1_encoding)``. When the model or processor
    is missing, or processing raises, the first element is a diagnostic
    string and the second is "".
    """
    if not indicwav2vec_model or not indicwav2vec_processor:
        return f"[Dravidian ASR model not loaded for {detected_lang}]", ""
    try:
        waveform, sr = preprocess_audio(audio_path)
        features = indicwav2vec_processor(
            waveform.squeeze().numpy(),
            sampling_rate=sr,
            return_tensors="pt",
        )
        input_values = features.input_values
        with torch.no_grad():
            logits = indicwav2vec_model(input_values).logits
        # Greedy CTC decoding: most likely token at every frame.
        predicted_ids = torch.argmax(logits, dim=-1)
        decoded = indicwav2vec_processor.batch_decode(predicted_ids)
        if decoded:
            transcription = decoded[0]
        else:
            transcription = "[Empty transcription]"
        # Syllable tokens are printed for inspection only; they are not returned.
        analyzer = SyllableBPETokenizer()
        analyzer.train_sbpe([transcription], detected_lang)
        syllable_tokens = analyzer.encode(transcription, detected_lang)
        print(f" S-BPE Tokens (for analysis): {syllable_tokens}")
        return transcription, slp1_encoder.encode(transcription)
    except Exception as e:
        return f"Error in Dravidian ASR: {e}", ""
def process_low_resource_asr(audio_path, detected_lang):
    """Transcribe a low-resource language through a related language's model.

    The stand-in language comes from ``TRANSFER_MAPPING`` ('hi' when the code
    has no entry) and the audio is handed to the Indo-Aryan ASR path.
    """
    proxy_lang = TRANSFER_MAPPING.get(detected_lang, 'hi')
    print(f" Using transfer learning: {detected_lang} -> {proxy_lang}")
    return process_indo_aryan_asr(audio_path, proxy_lang)


print("✅ Family-specific ASR functions ready with rate-limit handling.\n")
print("CELL 11: Defining the main processing pipeline...")


def complete_speech_to_text_pipeline(audio_path):
    """Run language detection and family-specific ASR on one audio file.

    Returns a dict with the keys ``audio_file``, ``full_path``,
    ``detected_language``, ``language_family``, ``confidence``,
    ``transcription``, ``slp1_encoding``, ``status`` and ``timestamp``.

    ``status`` is "Failed" when the transcription is empty or is one of the
    diagnostic strings produced by the ASR helpers (error, unsupported
    language, model unavailable or not loaded); otherwise it is "Success".
    """
    print(f"\n🎵 Processing: {os.path.basename(audio_path)}")
    detected_lang, confidence = simple_language_detection(audio_path)
    slp1_text, family = "", "Unknown"
    transcription = f"Language '{detected_lang}' not supported."
    if detected_lang in INDO_ARYAN_LANGS:
        family, transcription = "Indo-Aryan", process_indo_aryan_asr(audio_path, detected_lang)
    elif detected_lang in DRAVIDIAN_LANGS:
        family, (transcription, slp1_text) = "Dravidian", process_dravidian_asr(audio_path, detected_lang)
    elif detected_lang in LOW_RESOURCE_LANGS:
        family, transcription = "Low-Resource", process_low_resource_asr(audio_path, detected_lang)

    # Placeholder and "model missing" messages are not transcriptions, so they
    # must not be reported as successes.
    failure_markers = ("error", "not supported", "unavailable", "not loaded")
    lowered = transcription.lower() if transcription else ""
    failed = not transcription or any(marker in lowered for marker in failure_markers)
    status = "Failed" if failed else "Success"
    print(f" Transcription: {transcription}")
    return {
        'audio_file': os.path.basename(audio_path),
        'full_path': audio_path,
        'detected_language': detected_lang,
        'language_family': family,
        'confidence': round(confidence, 3),
        'transcription': transcription,
        'slp1_encoding': slp1_text,
        'status': status,
        'timestamp': datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
    }
def batch_process_audio_files(audio_files):
    """Run the full pipeline on every path and print the success rate.

    Returns the list of per-file result dicts, or ``[]`` (after printing a
    notice) when ``audio_files`` is empty or None.
    """
    if not audio_files:
        print("❌ No audio files to process.")
        return []
    results = []
    for path in audio_files:
        results.append(complete_speech_to_text_pipeline(path))
    success_count = len([entry for entry in results if entry['status'] == 'Success'])
    if results:
        success_rate = (success_count / len(results)) * 100
    else:
        success_rate = 0
    print(f"\n🎉 Batch processing completed! Success rate: {success_rate:.1f}% ({success_count}/{len(results)})")
    return results


print("✅ Main pipeline ready.\n")
print("CELL 12: Defining report generation and main execution logic...")


def generate_excel_report(results):
    """Write pipeline results to a timestamped Excel workbook.

    The workbook has a ``Detailed_Results`` sheet (one row per file, plus the
    derived ``ground_truth`` and ``is_correct`` columns) and a ``Summary``
    sheet. After writing, a browser download is attempted through the
    ``google.colab`` ``files`` helper imported at the top of the file; a
    failure there is reported but does not abort.

    Returns the workbook filename, or None when ``results`` is empty.
    """
    if not results:
        return None
    df = pd.DataFrame(results)

    def get_ground_truth(path):
        # Walk from the file towards the root; the nearest folder named after
        # a supported code (two- or three-letter) is the ground truth.
        for part in reversed(path.split('/')):
            if part in ALL_SUPPORTED_LANGS:
                return part
        return "unknown"

    df['ground_truth'] = df['full_path'].apply(get_ground_truth)
    df['is_correct'] = df['detected_language'] == df['ground_truth']
    filename = f"ASR_Evaluation_Report_{datetime.now().strftime('%Y%m%d_%H%M%S')}.xlsx"
    with pd.ExcelWriter(filename, engine='xlsxwriter') as writer:
        df.to_excel(writer, sheet_name='Detailed_Results', index=False)
        # Summary Sheet
        summary_data = {
            'Metric': ['Total Files', 'Successful Transcriptions', 'Overall LID Accuracy'],
            'Value': [len(df), df['status'].eq('Success').sum(), f"{df['is_correct'].mean()*100:.2f}%"]
        }
        pd.DataFrame(summary_data).to_excel(writer, sheet_name='Summary', index=False)
    print(f"\n✅ Comprehensive Excel report generated: {filename}")
    # Best effort: the report is already on disk even if the download fails.
    try:
        files.download(filename)
    except Exception as e:
        print(f" Could not auto-download file: {e}")
    return filename
# --- MAIN EXECUTION ---
print("\n🚀🚀🚀 Starting the Full ASR Pipeline 🚀🚀🚀")
# Ask for the audio source, then process and report when any file was chosen.
audio_files_to_process = get_audio_files()
if not audio_files_to_process:
    print("\nNo audio files were selected. Exiting.")
else:
    pipeline_results = batch_process_audio_files(audio_files_to_process)
    generate_excel_report(pipeline_results)
# ==============================================================================
# Process the Downloaded Files and Generate Excel Report
# ==============================================================================
print("🔍 Processing your downloaded files...")
# Check what files were actually downloaded
download_dir = "/content/shared_dataset"
if os.path.exists(download_dir):
    # Scan for all audio files that were downloaded
    all_audio_files = []
    for ext in SUPPORTED_FORMATS:
        pattern = os.path.join(download_dir, '**', f'*{ext}')
        all_audio_files.extend(glob.glob(pattern, recursive=True))
    print(f"✅ Found {len(all_audio_files)} successfully downloaded audio files")
    # Group files under the first 2-3 letter alphabetic path component.
    lang_breakdown = {}
    for file_path in all_audio_files:
        for part in file_path.split('/'):
            if len(part) in [2, 3] and part.isalpha():  # Language codes
                lang_breakdown.setdefault(part, []).append(file_path)
                break
    print("\n📊 Downloaded files by language:")
    # The loop variable is not called `files`, so the google.colab `files`
    # module imported at the top of the file is not overwritten.
    for lang, lang_files in lang_breakdown.items():
        print(f" {lang}: {len(lang_files)} files")
    if all_audio_files:
        print(f"\n🚀 Processing {len(all_audio_files)} files with the ASR pipeline...")
        # Process all downloaded files
        results = batch_process_audio_files(all_audio_files)
        if results:
            # Generate comprehensive Excel report
            print("\n📋 Generating comprehensive Excel report...")
            excel_filename = generate_excel_report(results)
            print(f"\n🎉 SUCCESS! Processed {len(results)} files")
            # Summary statistics
            successful_files = [r for r in results if r['status'] == 'Success']
            language_accuracy = {}
            for result in results:
                # Pipeline results carry no ground-truth fields, so derive the
                # expected language from the folder names in the file path.
                truth = result.get('ground_truth')
                if truth is None:
                    path_parts = result.get('full_path', '').split('/')
                    truth = next(
                        (part for part in reversed(path_parts) if part in ALL_SUPPORTED_LANGS),
                        'unknown',
                    )
                stats = language_accuracy.setdefault(truth, {'total': 0, 'correct': 0})
                stats['total'] += 1
                if result.get('is_correct', result.get('detected_language') == truth):
                    stats['correct'] += 1
            print(f"\n📈 FINAL RESULTS SUMMARY:")
            print(f" Total Files Processed: {len(results)}")
            print(f" Successful Transcriptions: {len(successful_files)}")
            print(f" Overall Success Rate: {len(successful_files)/len(results)*100:.1f}%")
            print(f"\n📊 Per-Language Accuracy:")
            for lang, stats in sorted(language_accuracy.items()):
                if stats['total'] > 0:
                    accuracy = (stats['correct'] / stats['total']) * 100
                    print(f" {lang}: {accuracy:.1f}% ({stats['correct']}/{stats['total']})")
            print(f"\n✅ Excel report saved: {excel_filename}")
        else:
            print("❌ No results generated from processing")
    else:
        print("❌ No audio files found to process")
else:
    print("❌ Download directory not found")
# ==============================================================================
# DETAILED ANALYSIS OF ASR PIPELINE RESULTS
# ==============================================================================
print("🔍 COMPREHENSIVE ASR PIPELINE ANALYSIS")
print("=" * 80)
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from collections import Counter
import os
# ==============================================================================
# 1. DATA LOADING AND INITIAL ANALYSIS
# ==============================================================================
def load_and_analyze_results(results):
    """Build a DataFrame from pipeline results and print a short overview.

    Prints the row count, the min/max of the ``timestamp`` column and the
    min/max of ``file_size_mb`` (a single 0 is used when that column is
    absent). Returns the DataFrame.
    """
    frame = pd.DataFrame(results)
    print("📊 DATASET OVERVIEW:")
    print(f" Total Files Processed: {len(frame)}")
    stamps = frame['timestamp']
    print(f" Date Range: {stamps.min()} to {stamps.max()}")
    sizes = frame.get('file_size_mb', pd.Series([0]))
    print(f" File Size Range: {sizes.min():.2f} - {sizes.max():.2f} MB")
    return frame
# ==============================================================================
# 2. LANGUAGE DETECTION ANALYSIS
# ==============================================================================
def analyze_language_detection(df):
    """Print language-detection accuracy statistics and annotate ``df``.

    Adds two columns to ``df`` in place: ``ground_truth`` (derived from the
    file name, then from folder names) and ``detection_correct``. Prints the
    overall accuracy, a per-language table and a confusion matrix.

    Returns the annotated DataFrame.
    """
    print("\n🔤 LANGUAGE DETECTION ANALYSIS:")
    print("=" * 50)

    # Filename markers, checked in order against the lowercased file name.
    name_markers = {
        'gum_': 'gu', 'gujarati': 'gu',
        'bodo_': 'brx',
        'kannada_': 'kn',
        'konkani_': 'kok',
        'dogri_': 'doi',
        'common_voice_bn': 'bn',
        'common_voice_en': 'en',
        'common_voice_hi': 'hi',
        'common_voice_as': 'as'
    }
    # Folder name -> language code as emitted by the detector. The Bodo
    # folder 'br' must map to 'brx', otherwise it can never count as correct.
    folder_codes = {
        'gu': 'gu', 'br': 'brx', 'brx': 'brx', 'kn': 'kn', 'kok': 'kok',
        'doi': 'doi', 'bn': 'bn', 'en': 'en', 'hi': 'hi', 'as': 'as',
    }

    def extract_ground_truth(path):
        filename = os.path.basename(path).lower()
        for pattern, lang in name_markers.items():
            if pattern in filename:
                return lang
        for part in path.split('/'):
            if part in folder_codes:
                return folder_codes[part]
        return 'unknown'

    df['ground_truth'] = df['full_path'].apply(extract_ground_truth)
    df['detection_correct'] = df['detected_language'] == df['ground_truth']
    # Language Detection Accuracy
    total_files = len(df)
    correct_detections = df['detection_correct'].sum()
    # Guard against an empty frame.
    detection_accuracy = (correct_detections / total_files) * 100 if total_files else 0.0
    print(f"📈 Overall Detection Accuracy: {detection_accuracy:.2f}% ({correct_detections}/{total_files})")
    # Per-language detection performance
    print(f"\n📊 Per-Language Detection Performance:")
    lang_detection = df.groupby('ground_truth').agg({
        'detection_correct': ['count', 'sum', 'mean'],
        'confidence': 'mean'
    }).round(3)
    lang_detection.columns = ['Total_Files', 'Correct_Detections', 'Accuracy', 'Avg_Confidence']
    lang_detection['Accuracy_Percent'] = (lang_detection['Accuracy'] * 100).round(1)
    for idx, row in lang_detection.iterrows():
        print(f" {idx:>3}: {row['Accuracy_Percent']:>5.1f}% ({int(row['Correct_Detections'])}/{int(row['Total_Files'])}) - Conf: {row['Avg_Confidence']:.3f}")
    # Detection confusion analysis
    print(f"\n🔄 Detection Confusion Matrix:")
    confusion = pd.crosstab(df['ground_truth'], df['detected_language'], margins=True)
    print(confusion)
    return df
# ==============================================================================
# 3. ASR PERFORMANCE ANALYSIS
# ==============================================================================
def analyze_asr_performance(df):
    """Print ASR success statistics overall, per family and per language.

    Returns ``(family_performance, lang_performance)``: DataFrames indexed by
    ``language_family`` and ``detected_language`` with the columns
    ``Successful``, ``Total`` and ``Success_Rate_%`` (the per-language frame
    also has ``Avg_Confidence``).
    """
    def successes(statuses):
        # Number of rows in the group whose status is 'Success'.
        return (statuses == 'Success').sum()

    print(f"\n🎤 ASR PERFORMANCE ANALYSIS:")
    print("=" * 50)

    # Overall ASR success rates
    total = len(df)
    print(f"📈 Overall ASR Performance:")
    for status, count in df['status'].value_counts().items():
        share = (count / total) * 100
        print(f" {status}: {count} files ({share:.1f}%)")

    # Performance by language family
    print(f"\n📊 Performance by Language Family:")
    family_performance = df.groupby('language_family').agg({
        'status': successes,
        'audio_file': 'count',
    })
    family_rate = family_performance['status'] / family_performance['audio_file'] * 100
    family_performance['success_rate'] = family_rate.round(1)
    family_performance.columns = ['Successful', 'Total', 'Success_Rate_%']
    for family, row in family_performance.iterrows():
        print(f" {family:>12}: {row['Success_Rate_%']:>5.1f}% ({int(row['Successful'])}/{int(row['Total'])})")

    # Performance by individual language
    print(f"\n📊 Performance by Individual Language:")
    lang_performance = df.groupby('detected_language').agg({
        'status': successes,
        'audio_file': 'count',
        'confidence': 'mean',
    }).round(3)
    lang_rate = lang_performance['status'] / lang_performance['audio_file'] * 100
    lang_performance['success_rate'] = lang_rate.round(1)
    lang_performance.columns = ['Successful', 'Total', 'Avg_Confidence', 'Success_Rate_%']
    for code, row in lang_performance.iterrows():
        print(f" {code:>3}: {row['Success_Rate_%']:>5.1f}% ({int(row['Successful'])}/{int(row['Total'])}) - Conf: {row['Avg_Confidence']:.3f}")
    return family_performance, lang_performance
# ==============================================================================
# 4. ERROR ANALYSIS
# ==============================================================================
def analyze_errors(df):
    """Print a breakdown of the rows whose status is 'Failed'.

    Reads the 'status', 'transcription' and 'detected_language' columns of
    ``df``. Each failed row is placed in one category based on keywords found
    in its lower-cased transcription text, then per-category and per-language
    failure counts are printed. Returns None.
    """
    def classify(message):
        # First matching keyword wins; the order mirrors the report's priority.
        if 'not supported' in message:
            return 'Language Not Supported'
        if 'rate limit' in message or 'unavailable' in message:
            return 'Model Unavailable/Rate Limited'
        if 'error' in message:
            return 'Processing Error'
        return 'Other'

    print(f"\n❌ ERROR ANALYSIS:")
    print("=" * 50)

    failures = df[df['status'] == 'Failed']
    n_failed = len(failures)
    if n_failed == 0:
        print("✅ No failed files to analyze!")
        return

    print(f"📊 Error Summary:")
    print(f" Total Failed Files: {n_failed}")
    print(f" Failure Rate: {n_failed/len(df)*100:.1f}%")

    # Group the detected languages of failed rows by error category.
    error_categories = {}
    for raw_text, language in zip(failures['transcription'], failures['detected_language']):
        label = classify(str(raw_text).lower())
        error_categories.setdefault(label, []).append(language)

    print(f"\n📊 Error Categories:")
    for category, langs in error_categories.items():
        print(f" {category}: {len(langs)} files")
        for lang, count in Counter(langs).most_common():
            print(f" {lang}: {count} files")

    # Ten languages with the most failures, relative to all of their files.
    print(f"\n📊 Most Problematic Languages:")
    worst = failures['detected_language'].value_counts().head(10)
    for lang, count in worst.items():
        total_lang_files = int((df['detected_language'] == lang).sum())
        failure_rate = (count / total_lang_files) * 100
        print(f" {lang}: {count} failures ({failure_rate:.1f}% of {total_lang_files} files)")
# ==============================================================================
# 5. TRANSCRIPTION QUALITY ANALYSIS
# ==============================================================================
def analyze_transcription_quality(df):
    """Print length statistics and sample previews of successful transcriptions.

    Reads the 'status', 'detected_language' and 'transcription' columns of
    ``df``; only rows whose status equals 'Success' are considered. ``df``
    itself is never modified. Rows with a missing transcription are excluded
    from the length statistics and from the samples. Returns None.
    """
    print(f"\n📝 TRANSCRIPTION QUALITY ANALYSIS:")
    print("=" * 50)

    successful_files = df[df['status'] == 'Success']
    if successful_files.empty:
        print("❌ No successful transcriptions to analyze!")
        return

    # Keep the lengths in a standalone Series rather than assigning a new
    # column onto the filtered frame: writing to a slice of ``df`` is the
    # chained-assignment pattern pandas warns about. Missing transcriptions
    # are dropped so min/max always print as whole numbers.
    lengths = successful_files['transcription'].str.len().dropna().astype(int)
    print(f"📊 Transcription Length Statistics:")
    print(f" Mean Length: {lengths.mean():.1f} characters")
    print(f" Median Length: {lengths.median():.1f} characters")
    print(f" Min Length: {lengths.min()} characters")
    print(f" Max Length: {lengths.max()} characters")

    # Up to two samples for each of the first five languages encountered.
    print(f"\n📝 Sample Transcriptions by Language:")
    languages = successful_files['detected_language'].dropna().unique()[:5]
    for lang in languages:
        in_language = successful_files['detected_language'] == lang
        # dropna/astype(str) guard against missing or non-string values,
        # which would otherwise break len() and slicing below.
        lang_samples = (
            successful_files.loc[in_language, 'transcription']
            .dropna()
            .astype(str)
            .head(2)
        )
        print(f"\n {str(lang).upper()} samples:")
        for i, transcription in enumerate(lang_samples, 1):
            preview = transcription[:100] + "..." if len(transcription) > 100 else transcription
            print(f" {i}: {preview}")
# ==============================================================================
# 6. TRANSFER LEARNING ANALYSIS
# ==============================================================================
def analyze_transfer_learning(df):
    """Print per-mapping success rates for transfer-learning transcriptions.

    Reads the 'transcription' and 'status' columns of ``df``. Rows whose
    transcription contains 'transfer learning' (case-insensitive) are
    selected. For each one, the first space-delimited token after
    'transfer learning: ' is taken as a ``source->target`` mapping ('→' is
    accepted as an arrow too) and split at its first arrow. Rows without such
    a token are counted in the total but not attributed to any mapping.
    Returns None.
    """
    print(f"\n🔄 TRANSFER LEARNING ANALYSIS:")
    print("=" * 50)

    # Identify transfer learning cases
    transfer_cases = df[df['transcription'].str.contains('transfer learning', case=False, na=False)]
    if transfer_cases.empty:
        print("❌ No transfer learning cases found!")
        return

    print(f"📊 Transfer Learning Summary:")
    print(f" Total Transfer Cases: {len(transfer_cases)}")

    # Match the marker case-insensitively, like the row filter above, so rows
    # such as "Transfer Learning: x->y" are parsed instead of silently dropped.
    marker = re.compile(r'transfer learning: ', re.IGNORECASE)

    transfer_mappings = {}
    for transcription, status in zip(transfer_cases['transcription'], transfer_cases['status']):
        found = marker.search(transcription)
        if found is None:
            continue
        token = transcription[found.end():].split(' ', 1)[0].replace('→', '->')
        # partition() splits at the first arrow only, so a token with several
        # arrows no longer raises ValueError on tuple unpacking.
        source, arrow, target = token.partition('->')
        if not arrow:
            continue
        transfer_mappings.setdefault(f"{source.strip()}->{target.strip()}", []).append(status)

    print(f"\n📊 Transfer Mapping Performance:")
    for mapping, statuses in transfer_mappings.items():
        successes = statuses.count('Success')
        success_rate = (successes / len(statuses)) * 100
        print(f" {mapping}: {success_rate:.1f}% success ({successes}/{len(statuses)})")
# ==============================================================================
# 7. CONFIDENCE ANALYSIS
# ==============================================================================
def analyze_confidence_scores(df):
    """Print summary statistics for the 'confidence' column of ``df``.

    Reports the overall distribution, the mean confidence of rows whose
    status is 'Success' versus 'Failed', and mean / standard deviation /
    count per detected language. Returns None.
    """
    print(f"\n📊 CONFIDENCE SCORE ANALYSIS:")
    print("=" * 50)

    print(f"📈 Confidence Statistics:")
    scores = df['confidence']
    print(f" Mean Confidence: {scores.mean():.3f}")
    print(f" Median Confidence: {scores.median():.3f}")
    print(f" Min Confidence: {scores.min():.3f}")
    print(f" Max Confidence: {scores.max():.3f}")
    print(f" Std Deviation: {scores.std():.3f}")

    # Mean confidence of successful rows compared with failed rows.
    is_success = df['status'] == 'Success'
    is_failed = df['status'] == 'Failed'
    successful_conf = df.loc[is_success, 'confidence'].mean()
    failed_conf = df.loc[is_failed, 'confidence'].mean()
    print(f"\n📊 Confidence vs Success:")
    print(f" Successful Files Avg Confidence: {successful_conf:.3f}")
    print(f" Failed Files Avg Confidence: {failed_conf:.3f}")
    print(f" Difference: {successful_conf - failed_conf:.3f}")

    # Per-language mean, spread and sample size.
    print(f"\n📊 Confidence by Language:")
    per_language = df.groupby('detected_language')['confidence'].agg(['mean', 'std', 'count']).round(3)
    for lang, avg, spread, n_rows in zip(
        per_language.index,
        per_language['mean'],
        per_language['std'],
        per_language['count'],
    ):
        print(f" {lang:>3}: {avg:.3f} ±{spread:.3f} (n={int(n_rows)})")
# ==============================================================================
# 8. PERFORMANCE RECOMMENDATIONS
# ==============================================================================
def generate_recommendations(df):
    """Build, print and return a list of recommendation strings.

    Reads the 'ground_truth', 'detected_language', 'status' and
    'transcription' columns of ``df``. A language-detection entry is always
    produced; entries for rate-limit/unavailable messages, unsupported
    languages and a success rate under 80% are added only when they apply.

    Returns:
        The recommendation strings, in the order they are printed.
    """
    print(f"\n💡 PERFORMANCE RECOMMENDATIONS:")
    print("=" * 50)

    # Headline percentages driving the thresholds below.
    detection_accuracy = (df['ground_truth'] == df['detected_language']).mean() * 100
    overall_success = (df['status'] == 'Success').mean() * 100

    # Keyword masks over the transcription text (case-insensitive, NaN -> False).
    transcripts = df['transcription']
    availability_mask = transcripts.str.contains('rate limit|unavailable', case=False, na=False)
    unsupported_mask = transcripts.str.contains('not supported', case=False, na=False)

    # Language detection: always reported, wording depends on the 90% bar.
    detection_note = (
        f"🔤 Language Detection: {detection_accuracy:.1f}% accuracy - Consider improving filename patterns or adding more detection models"
        if detection_accuracy < 90
        else f"✅ Language Detection: Excellent {detection_accuracy:.1f}% accuracy"
    )
    recommendations = [detection_note]

    # Model availability
    rate_limited = int(availability_mask.sum())
    if rate_limited > 0:
        recommendations.append(f"🚫 Model Availability: {rate_limited} files failed due to rate limits - Consider using local models or model caching")

    # Language support
    unsupported = int(unsupported_mask.sum())
    if unsupported > 0:
        unsupported_langs = df[unsupported_mask]['detected_language'].unique()
        recommendations.append(f"🌐 Language Support: Add support for {list(unsupported_langs)} ({unsupported} files)")

    # Overall success rate
    if overall_success < 80:
        recommendations.append(f"⚡ Overall Performance: {overall_success:.1f}% success rate - Focus on model stability and error handling")

    print(f"\n📋 Action Items:")
    for position, item in enumerate(recommendations, start=1):
        print(f" {position}. {item}")

    return recommendations
# ==============================================================================
# 9. MAIN ANALYSIS FUNCTION
# ==============================================================================
def run_comprehensive_analysis(results):
    """Run every analysis stage in order over ``results``.

    ``results`` is handed to ``load_and_analyze_results`` to obtain the
    working DataFrame; each later stage receives that frame (as returned by
    ``analyze_language_detection``).

    Returns:
        ``(df, family_perf, lang_perf, recommendations)``.
    """
    print("🚀 Starting comprehensive analysis...")

    # Build the frame, then let the language-detection stage return its version.
    df = analyze_language_detection(load_and_analyze_results(results))

    family_perf, lang_perf = analyze_asr_performance(df)

    # These stages only print their report; their return values are not used.
    report_only_stages = (
        analyze_errors,
        analyze_transcription_quality,
        analyze_transfer_learning,
        analyze_confidence_scores,
    )
    for stage in report_only_stages:
        stage(df)

    recommendations = generate_recommendations(df)

    print(f"\n🎉 ANALYSIS COMPLETE!")
    print("=" * 80)
    return df, family_perf, lang_perf, recommendations
# ==============================================================================
# 10. EXECUTE ANALYSIS
# ==============================================================================
# Run the comprehensive analysis on your results
# The analysis needs a module-level `results` object; bail out with a hint
# when it has not been defined.
if 'results' not in globals():
    print("❌ No 'results' variable found. Please run the ASR pipeline first.")
else:
    (
        analysis_df,
        family_performance,
        language_performance,
        recommendations,
    ) = run_comprehensive_analysis(results)

    # Write the analysed frame to a timestamped CSV in the working directory.
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    analysis_filename = f"detailed_analysis_{timestamp}.csv"
    analysis_df.to_csv(analysis_filename, index=False)
    print(f"\n💾 Detailed analysis saved to: {analysis_filename}")