Spaces:
Runtime error
Runtime error
| """ | |
| LocaleNLP Translation Service | |
| ============================ | |
| A multi-language translation application supporting English, Wolof, Hausa, Darija, Swahili, and Bambara. | |
| Features text, audio, and document translation with automatic chaining for all language pairs. | |
| Author: LocaleNLP | |
| """ | |
| import os | |
| import re | |
| import logging | |
| import tempfile | |
| import csv | |
| import requests | |
| import json | |
| from typing import Optional, Dict, Tuple, Any, Union | |
| from pathlib import Path | |
| from dataclasses import dataclass | |
| from enum import Enum | |
| import gradio as gr | |
| import torch | |
| import whisper | |
| import fitz # PyMuPDF | |
| import docx | |
| from bs4 import BeautifulSoup | |
| from markdown import markdown | |
| import chardet | |
| from transformers import pipeline, MarianTokenizer, AutoModelForSeq2SeqLM | |
| from huggingface_hub import login | |
| import base64 | |
# ================================
# Configuration & Constants
# ================================
class Language(str, Enum):
    """Languages the service can translate from or into.

    Each member is also a ``str`` whose value is the display name, so a member
    compares equal to that name and can be rebuilt from it with
    ``Language("Wolof")``.
    """

    ENGLISH = "English"
    WOLOF = "Wolof"
    HAUSA = "Hausa"
    DARIJA = "Darija"
    SWAHILI = "Swahili"
    BAMBARA = "Bambara"
class InputMode(str, Enum):
    """Ways a user can hand content to the application.

    Members are ``str`` instances whose value is the label used for the
    matching radio-button choice.
    """

    TEXT = "Text"
    AUDIO = "Audio"
    FILE = "File"
@dataclass
class ModelConfig:
    """Settings needed to run one translation direction.

    The class must be a dataclass: the model table builds instances with two
    positional arguments, which a plain class with bare annotations rejects
    with ``TypeError``.
    """

    # Model identifier passed to ``from_pretrained`` when the model is loaded.
    model_name: str
    # Target-language token (e.g. ">>wol<<") prepended to every input sentence.
    language_tag: str
# Language pair configurations, one row per direct model:
# (source language, target language, model name, target-language tag).
# NOTE(review): there is no (DARIJA, ENGLISH) row, so Darija is only usable as
# a target language; any request with Darija as the source ends in ValueError.
_MODEL_TABLE = (
    (Language.ENGLISH, Language.WOLOF, "LocaleNLP/localenlp-eng-wol-0.03", ">>wol<<"),
    (Language.WOLOF, Language.ENGLISH, "LocaleNLP/localenlp-wol-eng-0.03", ">>eng<<"),
    (Language.ENGLISH, Language.HAUSA, "LocaleNLP/localenlp-eng-hau-0.01", ">>hau<<"),
    (Language.HAUSA, Language.ENGLISH, "LocaleNLP/localenlp-hau-eng-0.01", ">>eng<<"),
    (Language.ENGLISH, Language.DARIJA, "LocaleNLP/english_darija", ">>dar<<"),
    (Language.ENGLISH, Language.BAMBARA, "LocaleNLP/localenlp-eng-bam-0.03", ">>bam<<"),
    (Language.BAMBARA, Language.ENGLISH, "LocaleNLP/localenlp-bam-eng-0.03", ">>eng<<"),
    (Language.SWAHILI, Language.ENGLISH, "LocaleNLP/localenlp-swa-eng-0.03", ">>eng<<"),
    (Language.ENGLISH, Language.SWAHILI, "LocaleNLP/localenlp-eng-swa-0.03", ">>swa<<"),
)

# Lookup used by the model manager and the translation service:
# (source, target) -> ModelConfig.
TRANSLATION_MODELS: Dict[Tuple[Language, Language], ModelConfig] = {
    (source, target): ModelConfig(model_name, language_tag)
    for source, target, model_name, language_tag in _MODEL_TABLE
}
# Document extensions offered by the upload widget and handled by the extractor
SUPPORTED_FILE_TYPES = [".pdf", ".docx", ".html", ".htm", ".md", ".srt", ".txt", ".text"]

# Audio file extensions
AUDIO_EXTENSIONS = [".wav", ".mp3", ".m4a"]

# GitHub repository that receives evaluation rows
GITHUB_REPO = "mgolomanta/Models_Evaluation"
EVALUATION_FILE = "evaluation.csv"
# The GitHub token is read from the ``git_tk`` environment variable (None when unset)
GITHUB_TOKEN = os.environ.get("git_tk")

# Local fallback file used when GitHub is unavailable
LOCAL_EVALUATION_FILE = "evaluation.csv"
# ================================
# Logging Configuration
# ================================
# Timestamp, logger name, level and message on every record.
_LOG_FORMAT = '%(asctime)s - %(name)s - %(levelname)s - %(message)s'

logging.basicConfig(format=_LOG_FORMAT, level=logging.INFO)

# Module-level logger shared by every component in this file.
logger = logging.getLogger(__name__)
# ================================
# Model Management
# ================================
class ModelManager:
    """Centralized model management for translation and transcription.

    At most one translation pipeline is kept at a time; asking for a different
    language pair replaces it. The Whisper model is loaded once on first use.
    """

    # Checkpoint name passed to ``whisper.load_model``.
    WHISPER_MODEL_SIZE = "large"

    def __init__(self):
        self._translation_pipeline = None
        self._whisper_model = None
        self._current_model_name = None

    def get_translation_pipeline(
        self,
        source_lang: Language,
        target_lang: Language
    ) -> Tuple[Any, str]:
        """
        Load and return translation pipeline for given language pair.

        Args:
            source_lang: Source language
            target_lang: Target language

        Returns:
            Tuple of (pipeline, language_tag)

        Raises:
            ValueError: If language pair is not supported
        """
        config = TRANSLATION_MODELS.get((source_lang, target_lang))
        if config is None:
            raise ValueError(f"Unsupported translation pair: {source_lang} -> {target_lang}")

        needs_load = (
            self._translation_pipeline is None
            or self._current_model_name != config.model_name
        )
        if needs_load:
            logger.info(f"Loading translation model: {config.model_name}")

            # Drop the previous pipeline first so the old and new models are
            # not both held while the new one loads. Clearing the name as well
            # guarantees a retry if loading fails half-way.
            self._translation_pipeline = None
            self._current_model_name = None

            # Authenticate with Hugging Face if token provided
            hf_token = os.getenv("final_tk")
            if hf_token:
                login(token=hf_token)

            device = self._get_device()
            model = AutoModelForSeq2SeqLM.from_pretrained(
                config.model_name,
                token=hf_token
            ).to(device)
            tokenizer = MarianTokenizer.from_pretrained(
                config.model_name,
                token=hf_token
            )
            self._translation_pipeline = pipeline(
                "translation",
                model=model,
                tokenizer=tokenizer,
                device=0 if device.type == "cuda" else -1
            )
            self._current_model_name = config.model_name

        return self._translation_pipeline, config.language_tag

    def get_whisper_model(self) -> Any:
        """
        Load and return Whisper transcription model.

        Returns:
            Whisper model instance
        """
        if self._whisper_model is None:
            # Log the size that is really loaded (the old message said "base").
            logger.info(f"Loading Whisper {self.WHISPER_MODEL_SIZE} model...")
            self._whisper_model = whisper.load_model(self.WHISPER_MODEL_SIZE)
        return self._whisper_model

    def _get_device(self) -> torch.device:
        """Get appropriate device for model execution (CUDA when available, else CPU)."""
        return torch.device("cuda" if torch.cuda.is_available() else "cpu")
# ================================
# Content Processing
# ================================
class ContentProcessor:
    """Handles extraction and processing of content from various sources.

    Every method is a static method, so the extractors work whether they are
    called on the class or on an instance.
    """

    # One SRT cue header: the numeric index line plus the timestamp line.
    # ``\r?`` makes files with Windows line endings match as well.
    _SRT_CUE_HEADER = re.compile(
        r"\d+\r?\n\d{2}:\d{2}:\d{2},\d{3} --> [^\r\n]*\r?\n"
    )

    @staticmethod
    def extract_text_from_file(file_path: Union[str, Path]) -> str:
        """
        Extract text content from various file formats.

        Args:
            file_path: Path to the file

        Returns:
            Extracted text content

        Raises:
            ValueError: If file type is unsupported
            Exception: If file processing fails (logged, then re-raised)
        """
        file_path = Path(file_path)
        extension = file_path.suffix.lower()

        # Extractors that work on the raw bytes of the file.
        byte_extractors = {
            ".pdf": ContentProcessor._extract_pdf_text,
            ".html": ContentProcessor._extract_html_text,
            ".htm": ContentProcessor._extract_html_text,
            ".md": ContentProcessor._extract_markdown_text,
            ".srt": ContentProcessor._extract_srt_text,
            ".txt": ContentProcessor._extract_plain_text,
            ".text": ContentProcessor._extract_plain_text,
        }

        try:
            if extension == ".docx":
                # python-docx opens the file itself, so the bytes are not needed.
                return ContentProcessor._extract_docx_text(file_path)
            extractor = byte_extractors.get(extension)
            if extractor is None:
                raise ValueError(f"Unsupported file type: {extension}")
            return extractor(file_path.read_bytes())
        except Exception as e:
            logger.error(f"Failed to extract text from {file_path}: {e}")
            raise

    @staticmethod
    def _decode(content: bytes) -> str:
        """Decode bytes with the detected encoding, falling back to UTF-8."""
        encoding = chardet.detect(content)["encoding"] or "utf-8"
        try:
            return content.decode(encoding, errors="ignore")
        except LookupError:
            # The detector named a codec this Python build does not know.
            return content.decode("utf-8", errors="ignore")

    @staticmethod
    def _strip_srt_timestamps(text: str) -> str:
        """Remove every cue index + timestamp header from SRT text."""
        return ContentProcessor._SRT_CUE_HEADER.sub("", text)

    @staticmethod
    def _extract_pdf_text(content: bytes) -> str:
        """Extract text from PDF file."""
        with fitz.open(stream=content, filetype="pdf") as doc:
            return "\n".join(page.get_text() for page in doc)

    @staticmethod
    def _extract_docx_text(file_path: Path) -> str:
        """Extract text from DOCX file."""
        doc = docx.Document(str(file_path))
        return "\n".join(paragraph.text for paragraph in doc.paragraphs)

    @staticmethod
    def _extract_html_text(content: bytes) -> str:
        """Extract text from HTML file."""
        soup = BeautifulSoup(ContentProcessor._decode(content), "html.parser")
        return soup.get_text()

    @staticmethod
    def _extract_markdown_text(content: bytes) -> str:
        """Extract text from Markdown file by rendering it to HTML first."""
        html = markdown(ContentProcessor._decode(content))
        soup = BeautifulSoup(html, "html.parser")
        return soup.get_text()

    @staticmethod
    def _extract_srt_text(content: bytes) -> str:
        """Extract text from SRT subtitle file."""
        return ContentProcessor._strip_srt_timestamps(ContentProcessor._decode(content))

    @staticmethod
    def _extract_plain_text(content: bytes) -> str:
        """Extract text from plain text file."""
        return ContentProcessor._decode(content)
# ================================
# Translation Service
# ================================
class TranslationService:
    """Core translation service with automatic chaining through English."""

    def __init__(self, model_manager: ModelManager):
        self.model_manager = model_manager

    def translate(
        self,
        text: str,
        source_lang: Language,
        target_lang: Language
    ) -> str:
        """
        Translate text from source to target language with automatic chaining.

        Args:
            text: Input text to translate
            source_lang: Source language
            target_lang: Target language

        Returns:
            Translated text. The input is returned unchanged when source and
            target are the same language.

        Raises:
            ValueError: If a required translation direction has no model
        """
        if not text.strip():
            return "No input text to translate."

        # Nothing to do for identical languages. Without this guard the request
        # would be chained through English, or fail for English -> English.
        if source_lang == target_lang:
            return text

        # Direct translation if model exists
        if (source_lang, target_lang) in TRANSLATION_MODELS:
            return self._direct_translate(text, source_lang, target_lang)

        # Automatic chaining through English
        return self._chained_translate(text, source_lang, target_lang)

    def _direct_translate(
        self,
        text: str,
        source_lang: Language,
        target_lang: Language
    ) -> str:
        """Perform direct translation using available model."""
        pipeline_obj, lang_tag = self.model_manager.get_translation_pipeline(
            source_lang, target_lang
        )
        return self._process_text_with_pipeline(text, pipeline_obj, lang_tag)

    def _chained_translate(
        self,
        text: str,
        source_lang: Language,
        target_lang: Language
    ) -> str:
        """
        Perform chained translation through English as intermediate language.

        Args:
            text: Input text to translate
            source_lang: Source language
            target_lang: Target language

        Returns:
            Translated text through chaining
        """
        # First: source_lang -> English
        intermediate_text = self._direct_translate(
            text, source_lang, Language.ENGLISH
        )
        # Second: English -> target_lang
        return self._direct_translate(
            intermediate_text, Language.ENGLISH, target_lang
        )

    @staticmethod
    def _capitalize_first(sentence: str) -> str:
        """Upper-case the first character only, leaving the rest untouched.

        ``str.capitalize`` would also lower-case every following character,
        which destroys proper nouns and acronyms in the model output.
        """
        return sentence[:1].upper() + sentence[1:]

    def _process_text_with_pipeline(
        self,
        text: str,
        pipeline_obj: Any,
        lang_tag: str
    ) -> str:
        """Translate text line by line, sentence by sentence.

        Each line is split on ". ", every sentence is prefixed with
        ``lang_tag``, and the translated sentences are joined back with ". ".
        Blank lines are preserved.
        """
        translated_paragraphs = []
        with torch.no_grad():
            for paragraph in text.splitlines():
                if not paragraph.strip():
                    translated_paragraphs.append("")
                    continue

                # Split into sentences and translate
                sentences = [
                    s.strip() for s in paragraph.split(". ")
                    if s.strip()
                ]
                if not sentences:
                    # e.g. a line holding only ". ": keep it as-is rather than
                    # sending an empty batch to the pipeline.
                    translated_paragraphs.append(paragraph)
                    continue

                # Add language tag to each sentence
                formatted_sentences = [
                    f"{lang_tag} {sentence}"
                    for sentence in sentences
                ]

                # Perform translation
                results = pipeline_obj(
                    formatted_sentences,
                    max_length=10000,
                    num_beams=5,
                    early_stopping=True,
                    no_repeat_ngram_size=3,
                    repetition_penalty=1.5,
                    length_penalty=1.2
                )

                translated_sentences = [
                    self._capitalize_first(result["translation_text"])
                    for result in results
                ]
                translated_paragraphs.append(". ".join(translated_sentences))

        return "\n".join(translated_paragraphs)
# ================================
# Audio Processing
# ================================
class AudioProcessor:
    """Turns audio files into text with the Whisper model from the model manager."""

    def __init__(self, model_manager: ModelManager):
        self.model_manager = model_manager

    def transcribe(self, audio_file_path: str) -> str:
        """
        Run Whisper on an audio file and return what was said.

        Args:
            audio_file_path: Path to audio file

        Returns:
            The "text" entry of the transcription result
        """
        whisper_model = self.model_manager.get_whisper_model()
        transcription = whisper_model.transcribe(audio_file_path)
        return transcription["text"]
# ================================
# Evaluation Service
# ================================
class EvaluationService:
    """Handles evaluation submissions with GitHub and local fallback.

    All methods are static, so they can be called on the class or on an instance.
    """

    # Header row written when an evaluation CSV is created.
    _CSV_HEADER = (
        "source_language_name,target_language_name,user_input,"
        "model_output,notation_value,correct_answer\n"
    )
    # Seconds to wait for GitHub before giving up and saving locally.
    _REQUEST_TIMEOUT = 15

    @staticmethod
    def escape_csv_field(text):
        """Escape text for CSV format.

        ``None`` becomes an empty field. Double quotes are doubled, and the
        field is wrapped in quotes when it contains a comma, a quote, or a
        line break (``\\n`` or ``\\r``).
        """
        if text is None:
            return ""
        text = str(text)
        needs_quoting = any(ch in text for ch in (',', '"', '\n', '\r'))
        if needs_quoting:
            text = '"' + text.replace('"', '""') + '"'
        return text

    @staticmethod
    def _format_row(
        source_lang,
        target_lang,
        user_input,
        model_output,
        notation=None,
        correct_answer=None
    ) -> str:
        """Build one newline-terminated CSV row from the evaluation fields."""
        fields = (
            source_lang, target_lang, user_input,
            model_output, notation, correct_answer,
        )
        return ",".join(EvaluationService.escape_csv_field(f) for f in fields) + "\n"

    @staticmethod
    def ensure_local_csv_exists():
        """Ensure local CSV file exists with headers."""
        if not os.path.exists(LOCAL_EVALUATION_FILE):
            with open(LOCAL_EVALUATION_FILE, 'w', encoding='utf-8', newline='') as f:
                f.write(EvaluationService._CSV_HEADER)

    @staticmethod
    def save_evaluation_locally(
        source_lang: str,
        target_lang: str,
        user_input: str,
        model_output: str,
        notation: Optional[str] = None,
        correct_answer: Optional[str] = None
    ) -> str:
        """Append one evaluation to the local CSV file.

        Returns:
            Status message. Errors are logged and reported in the message
            instead of being raised.
        """
        try:
            EvaluationService.ensure_local_csv_exists()
            new_row = EvaluationService._format_row(
                source_lang, target_lang, user_input, model_output, notation, correct_answer
            )
            with open(LOCAL_EVALUATION_FILE, 'a', encoding='utf-8', newline='') as f:
                f.write(new_row)
            return "✅ Evaluation saved locally!"
        except Exception as e:
            logger.error(f"Failed to save evaluation locally: {e}")
            return f"❌ Error saving evaluation locally: {str(e)}"

    @staticmethod
    def save_evaluation_to_github(
        source_lang: str,
        target_lang: str,
        user_input: str,
        model_output: str,
        notation: Optional[str] = None,
        correct_answer: Optional[str] = None
    ) -> str:
        """
        Save evaluation to GitHub CSV file with fallback to local storage.

        The remote file is downloaded, the new row is appended, and the whole
        file is uploaded again. Any problem (no token, network error,
        unreadable remote file, rejected upload) falls back to the local file,
        so the remote file is never replaced by a copy that is missing its
        earlier rows.

        Args:
            source_lang: Source language name
            target_lang: Target language name
            user_input: User input text
            model_output: Model output text
            notation: Optional notation value
            correct_answer: Optional correct answer

        Returns:
            Status message
        """
        fields = (source_lang, target_lang, user_input, model_output, notation, correct_answer)

        if not GITHUB_TOKEN:
            # Fallback to local if no token
            return EvaluationService.save_evaluation_locally(*fields)

        try:
            new_row = EvaluationService._format_row(*fields)
            url = f"https://api.github.com/repos/{GITHUB_REPO}/contents/{EVALUATION_FILE}"
            request_headers = {
                "Authorization": f"token {GITHUB_TOKEN}",
                "Accept": "application/vnd.github.v3+json"
            }

            existing_content = ""
            file_sha = None
            response = requests.get(
                url, headers=request_headers, timeout=EvaluationService._REQUEST_TIMEOUT
            )
            if response.status_code == 200:
                file_data = response.json()
                file_sha = file_data.get("sha")
                existing_content = base64.b64decode(
                    file_data.get("content", "")
                ).decode('utf-8')
                # A non-empty file reported without inline content must not be
                # treated as empty, or the upload below would wipe it.
                if file_data.get("size") and not existing_content:
                    raise ValueError("GitHub returned no inline content for a non-empty file")
            elif response.status_code != 404:
                # 404 means "create the file"; anything else is a read failure.
                raise RuntimeError(
                    f"Unexpected status {response.status_code} while reading {EVALUATION_FILE}"
                )

            if existing_content.strip():
                # Make sure the new row starts on its own line.
                if not existing_content.endswith("\n"):
                    existing_content += "\n"
                csv_content = existing_content + new_row
            else:
                # File is missing or empty: start it with the header row.
                csv_content = EvaluationService._CSV_HEADER + new_row

            payload = {
                "message": "Add new evaluation",
                "content": base64.b64encode(csv_content.encode('utf-8')).decode('utf-8')
            }
            # The blob SHA is required when updating an existing file.
            if file_sha:
                payload["sha"] = file_sha

            response = requests.put(
                url,
                headers=request_headers,
                json=payload,
                timeout=EvaluationService._REQUEST_TIMEOUT
            )
            if response.status_code in [200, 201]:
                return "✅ Evaluation submitted successfully to GitHub!"

            logger.error(f"GitHub API error: {response.status_code} - {response.text}")
            return EvaluationService.save_evaluation_locally(*fields)
        except Exception as e:
            logger.error(f"Failed to save evaluation to GitHub: {e}")
            # Fallback to local storage
            return EvaluationService.save_evaluation_locally(*fields)
# ================================
# Main Application
# ================================
class TranslationApp:
    """Main application orchestrating all components."""

    def __init__(self):
        self.model_manager = ModelManager()
        self.content_processor = ContentProcessor()
        self.translation_service = TranslationService(self.model_manager)
        self.audio_processor = AudioProcessor(self.model_manager)
        self.evaluation_service = EvaluationService()

    def process_input(
        self,
        mode: InputMode,
        source_lang: Language,
        text_input: str,
        audio_file: Optional[str],
        file_obj: Optional[gr.FileData]
    ) -> str:
        """
        Process input based on selected mode.

        Args:
            mode: Input mode
            source_lang: Source language (accepted for interface stability;
                not used by any mode at the moment)
            text_input: Text input
            audio_file: Audio file path
            file_obj: Uploaded file, either an object with a ``name``
                attribute holding the path or the path itself

        Returns:
            Processed text content

        Raises:
            ValueError: If the selected mode has no audio/file input
        """
        if mode == InputMode.TEXT:
            return text_input or ""

        if mode == InputMode.AUDIO:
            if not audio_file:
                raise ValueError("No audio file provided.")
            return self.audio_processor.transcribe(audio_file)

        if mode == InputMode.FILE:
            if not file_obj:
                raise ValueError("No file uploaded.")
            path = getattr(file_obj, "name", file_obj)
            # Call through the class: the extractor takes no ``self``, so an
            # instance call would pass the instance as the path.
            return ContentProcessor.extract_text_from_file(path)

        return ""

    def submit_evaluation(
        self,
        source_lang: str,
        target_lang: str,
        user_input: str,
        model_output: str,
        notation: Optional[str],
        correct_answer: Optional[str]
    ) -> str:
        """Submit evaluation data and return the status message."""
        if not user_input.strip() or not model_output.strip():
            return "⚠️ Please translate text before submitting evaluation."
        # Call through the class for the same reason as in process_input.
        return EvaluationService.save_evaluation_to_github(
            source_lang, target_lang, user_input, model_output, notation, correct_answer
        )

    def create_interface(self) -> gr.Blocks:
        """Create and return the Gradio interface."""
        # NOTE(review): container nesting (Row/Group contents) was reconstructed
        # from the component order; confirm against the intended layout.
        with gr.Blocks(
            title="LocaleNLP Translation Service",
            theme=gr.themes.Monochrome()
        ) as interface:
            # Header
            gr.Markdown("""
# 🌍 LocaleNLP Translation Service
Translate between English, Wolof, Hausa,Bambara, Swahili and Darija with support for text, audio, and documents.
""")

            # Input controls
            with gr.Row():
                input_mode = gr.Radio(
                    choices=[mode.value for mode in InputMode],
                    label="Input Type",
                    value=InputMode.TEXT.value
                )
                input_lang = gr.Dropdown(
                    choices=[lang.value for lang in Language],
                    label="Input Language",
                    value=Language.ENGLISH.value
                )
                output_lang = gr.Dropdown(
                    choices=[lang.value for lang in Language],
                    label="Output Language",
                    value=Language.WOLOF.value
                )

            # Input components (only the one matching the mode is visible)
            input_text = gr.Textbox(
                label="Enter Text",
                lines=8,
                visible=True,
                placeholder="Type or paste your text here..."
            )
            audio_input = gr.Audio(
                label="Upload Audio",
                type="filepath",
                visible=False
            )
            file_input = gr.File(
                file_types=SUPPORTED_FILE_TYPES,
                label="Upload Document",
                visible=False
            )

            # Processing area
            extracted_text = gr.Textbox(
                label="Extracted / Transcribed Text",
                lines=8,
                interactive=False
            )
            translate_btn = gr.Button(
                "🔄 Process & Translate",
                variant="secondary"
            )
            output_text = gr.Textbox(
                label="Translated Text",
                lines=10,
                interactive=False
            )

            # Store the last translation data for evaluation
            last_input_state = gr.State("")
            last_output_state = gr.State("")

            # Evaluation section
            gr.Markdown("### 📝 Model Evaluation")
            with gr.Group():
                with gr.Row():
                    notation = gr.Radio(
                        choices=["1", "2", "3", "4", "5"],
                        label="Notation (1-5 stars)",
                        value=None
                    )
                    correct_translation = gr.Textbox(
                        label="Correct Translation (if incorrect)",
                        lines=3,
                        placeholder="Enter the correct translation if the model output is wrong..."
                    )
                submit_evaluation_btn = gr.Button("Submit Evaluation", variant="primary")
                evaluation_status = gr.Textbox(
                    label="Evaluation Status",
                    interactive=False
                )

            # Event handlers
            def update_visibility(mode: str) -> Dict[str, Any]:
                """Update component visibility based on input mode and clear old results."""
                return {
                    input_text: gr.update(visible=(mode == InputMode.TEXT.value)),
                    audio_input: gr.update(visible=(mode == InputMode.AUDIO.value)),
                    file_input: gr.update(visible=(mode == InputMode.FILE.value)),
                    extracted_text: gr.update(value="", visible=True),
                    output_text: gr.update(value="")
                }

            def handle_process(
                mode: str,
                source_lang: str,
                text_input: str,
                audio_file: Optional[str],
                file_obj: Optional[gr.FileData]
            ) -> Tuple[str, str, str, str]:
                """Handle initial input processing.

                Returns values for (extracted_text, output_text,
                last_input_state, last_output_state).
                """
                try:
                    processed_text = self.process_input(
                        InputMode(mode),
                        Language(source_lang),
                        text_input,
                        audio_file,
                        file_obj
                    )
                    return processed_text, "", processed_text, ""
                except Exception as e:
                    logger.error(f"Processing error: {e}")
                    return "", f"❌ Error: {str(e)}", "", ""

            def handle_translate(
                extracted_text: str,
                source_lang: str,
                target_lang: str,
                current_output: str = ""
            ) -> Tuple[str, str, str]:
                """Handle translation of processed text.

                ``current_output`` is what the processing step left in the
                output box. When processing failed, that error is kept instead
                of being replaced by the generic "no text" message.
                """
                if not extracted_text.strip():
                    if (current_output or "").startswith("❌"):
                        return current_output, extracted_text, ""
                    return "📝 No text to translate.", extracted_text, ""
                try:
                    result = self.translation_service.translate(
                        extracted_text,
                        Language(source_lang),
                        Language(target_lang)
                    )
                    return result, extracted_text, result
                except Exception as e:
                    logger.error(f"Translation error: {e}")
                    return f"❌ Translation error: {str(e)}", extracted_text, ""

            def handle_evaluation(
                source_lang: str,
                target_lang: str,
                user_input: str,
                model_output: str,
                notation_value: Optional[str],
                correct_answer: Optional[str]
            ) -> str:
                """Handle evaluation submission."""
                return self.submit_evaluation(
                    source_lang,
                    target_lang,
                    user_input,
                    model_output,
                    notation_value,
                    correct_answer
                )

            # Connect events
            input_mode.change(
                fn=update_visibility,
                inputs=input_mode,
                outputs=[input_text, audio_input, file_input, extracted_text, output_text]
            )

            # Step 1 extracts/transcribes, step 2 translates what step 1 produced.
            translate_btn.click(
                fn=handle_process,
                inputs=[input_mode, input_lang, input_text, audio_input, file_input],
                outputs=[extracted_text, output_text, last_input_state, last_output_state]
            ).then(
                fn=handle_translate,
                inputs=[extracted_text, input_lang, output_lang, output_text],
                outputs=[output_text, last_input_state, last_output_state]
            )

            submit_evaluation_btn.click(
                fn=handle_evaluation,
                inputs=[
                    input_lang,
                    output_lang,
                    last_input_state,
                    last_output_state,
                    notation,
                    correct_translation
                ],
                outputs=evaluation_status
            )

        return interface
# ================================
# Application Entry Point
# ================================
def main():
    """Main application entry point.

    Warns when no GitHub token is configured, then builds the interface and
    serves it on all network interfaces, on the port named by ``PORT``
    (default 7860).

    Raises:
        Exception: Any start-up failure is logged as critical and re-raised.
    """
    # The token is read from the ``git_tk`` environment variable, so the
    # warning names that variable rather than "GITHUB_TOKEN".
    if not os.getenv("git_tk"):
        logger.warning(
            "'git_tk' environment variable (GitHub token) not set. "
            "Evaluations will be saved locally."
        )
        print("⚠️ WARNING: 'git_tk' environment variable (GitHub token) not set!")
        print(" Evaluations will be saved to local file only.")

    try:
        app = TranslationApp()
        interface = app.create_interface()
        interface.launch(
            server_name="0.0.0.0",
            server_port=int(os.getenv("PORT", 7860)),
            share=False
        )
    except Exception as e:
        logger.critical(f"Failed to start application: {e}")
        raise


if __name__ == "__main__":
    main()