import os
import streamlit as st
import logging
from dotenv import load_dotenv

# Load environment variables first, before any other code
load_dotenv()

# Import configuration defaults (after loading .env to prioritize environment variables)
from config import ENV_DEFAULTS, DEFAULT_CONFIG

# Configure logging based on configuration
log_level = os.environ.get('LOGLEVEL', DEFAULT_CONFIG['LOGLEVEL']).upper()
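# NOTE: LOGLEVEL is read here, before the ENV_DEFAULTS backfill below, so a
# value that exists only in ENV_DEFAULTS would not affect logging.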
logging.basicConfig(
    level=getattr(logging, log_level),
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        # Log to the console at every level except WARNING, which is silenced
        logging.StreamHandler() if log_level != 'WARNING' else logging.NullHandler()
    ]
)
# Configure app
st.set_page_config(page_title="Translator & Readability", layout="wide")

# Check for missing environment variables and use defaults from config
for var, default in ENV_DEFAULTS.items():
    if var not in os.environ:
        logging.debug(f"Environment variable {var} not found, using default: {default}")
        os.environ[var] = default

# Model configuration from default config
MODEL_CONFIG = {
    "max_parallel_models": DEFAULT_CONFIG["MAX_PARALLEL_MODELS"],
    "session_timeout": DEFAULT_CONFIG["SESSION_TIMEOUT"],
    "allow_gpu": DEFAULT_CONFIG["ALLOW_GPU"]
}
# Initialize model semaphore for limiting concurrent model usage.
# st.cache_resource keeps a single semaphore per process; a bare module-level
# Semaphore would be re-created on every Streamlit rerun and limit nothing.
import threading

@st.cache_resource
def _get_model_semaphore():
    return threading.Semaphore(MODEL_CONFIG["max_parallel_models"])

model_semaphore = _get_model_semaphore()
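# A slot is acquired non-blocking before each translation or analysis below,
# so excess requests fail fast with a warning instead of queueing.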

import tempfile
import io
from docx import Document
import uuid
import traceback

from models.nltk_resources import setup_nltk
from utils.file_readers import read_file
from utils.text_processing import detect_language
from utils.readability_indices import (
    flesch_reading_ease,
    flesch_kincaid_grade_level,
    gunning_fog_index,
    smog_index,
    highlight_complex_text
)
from utils.formatting import color_code_index
from utils.tilmash_translation import tilmash_translate, display_tilmash_streaming_translation

# Initialize session state for user identification
if 'session_id' not in st.session_state:
    st.session_state.session_id = str(uuid.uuid4())
if 'translation_lock' not in st.session_state:
    st.session_state.translation_lock = False
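# NOTE: st.session_state is scoped to one browser session and survives reruns,
# so these flags act as per-user locks, while the semaphore above is the
# process-wide limit on simultaneous model use.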


def handle_translation():
    st.header("Перевод (Kazakh, Russian, English)")

    # Show session ID in sidebar for debugging
    with st.sidebar.expander("Session Info", expanded=False):
        st.write(f"Session ID: {st.session_state.session_id}")

    # Add GPU usage option
    if MODEL_CONFIG["allow_gpu"]:
        st.session_state.use_gpu = st.checkbox("Использовать GPU (быстрее)", value=True)
        if st.session_state.use_gpu:
            try:
                import torch
                if torch.cuda.is_available():
                    gpu_info = f"CUDA: {torch.cuda.get_device_name(0)}"
                    st.success(f"Доступен GPU: {gpu_info}")
                elif hasattr(torch.backends, "mps") and torch.backends.mps.is_available():
                    st.success("Доступен Apple Silicon GPU (MPS)")
                else:
                    st.warning("GPU не обнаружен, будет использован CPU")
                    st.session_state.use_gpu = False
            except ImportError:
                st.warning("PyTorch не установлен, будет использован CPU")
                st.session_state.use_gpu = False
    else:
        st.session_state.use_gpu = False
        st.write("GPU отключен в конфигурации")
    translate_input_method = st.radio("Способ ввода текста:", ["Загрузить файл", "Вставить текст"])
    input_text = ""
    if translate_input_method == "Загрузить файл":
        uploaded_file = st.file_uploader("Выберите файл (.txt, .docx, .pdf)", type=["txt", "docx", "pdf"])
        if uploaded_file is not None:
            suffix = os.path.splitext(uploaded_file.name)[1]
            with tempfile.NamedTemporaryFile(delete=False, suffix=suffix) as tmp_file:
                tmp_file.write(uploaded_file.getbuffer())
                temp_file_path = tmp_file.name
            input_text = read_file(temp_file_path)
            os.remove(temp_file_path)
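            # The upload is written to a NamedTemporaryFile because read_file
            # presumably expects a filesystem path; the file is removed as soon
            # as its text has been extracted.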
            # Hidden preview: only shown on click
            with st.expander("Показать предварительный просмотр файла", expanded=False):
                st.text_area(
                    "Содержимое (только просмотр)",
                    value=input_text,
                    height=160,
                    disabled=True
                )
    else:
        input_text = st.text_area("Вставьте ваш текст здесь", height=200)
    if input_text:
        auto_detect = st.checkbox("Автоматически определить язык", value=True)
        src_lang = None
        if auto_detect:
            detected_lang = detect_language(input_text)
            if detected_lang in ['ru', 'en', 'kk']:
                st.info(f"Определён язык: {detected_lang}")
                src_lang = detected_lang
            else:
                st.warning("Не удалось определить язык. Выберите вручную.")
                src_lang = st.selectbox("Язык текста", ["ru", "en", "kk"])
        else:
            src_lang = st.selectbox("Язык текста", ["ru", "en", "kk"])

        if src_lang == "ru":
            tgt_options = ["en", "kk"]
        elif src_lang == "en":
            tgt_options = ["ru", "kk"]
        else:
            tgt_options = ["ru", "en"]
        tgt_lang = st.selectbox("Перевод на:", tgt_options)
| if st.button("Перевести"): | |
| # Prevent multiple concurrent translations from same session | |
| if st.session_state.translation_lock: | |
| st.warning("Перевод уже выполняется. Пожалуйста, дождитесь завершения.") | |
| return | |
| # Set translation lock | |
| st.session_state.translation_lock = True | |
| try: | |
| # Use the model semaphore to limit concurrent model access | |
| acquired = model_semaphore.acquire(blocking=False) | |
| if not acquired: | |
| st.warning("Максимальное количество параллельных моделей достигнуто. Пожалуйста, попробуйте позже.") | |
| st.session_state.translation_lock = False | |
| return | |
| try: | |
| st.subheader("Результат перевода:") | |
| # Get the approximate size of the text to determine if chunking is needed | |
| approx_text_size = len(input_text) / 4 # rough approximation (4 chars ≈ 1 token) | |
| needs_chunking = approx_text_size > 500 # If text is likely over 500 tokens | |
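                    # NOTE: this heuristic only selects the spinner message shown
                    # below; any actual chunking presumably happens inside the
                    # translator itself.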
                    # Display appropriate spinner message
                    spinner_message = "Processing text in chunks..." if needs_chunking else "Processing translation..."

                    # Create a dedicated translator instance for this session
                    from utils.tilmash_translation import TilmashTranslator

                    # Use GPU if enabled in the settings
                    use_gpu = getattr(st.session_state, 'use_gpu', False)
                    translator = TilmashTranslator(use_gpu=use_gpu)
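                    # NOTE: a per-request translator keeps sessions isolated at the
                    # cost of re-initializing the model; it is unloaded again below.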
                    with st.spinner(spinner_message):
                        try:
                            # Use direct streaming approach with session-specific translator
                            result = ""
                            translation_placeholder = st.empty()
                            # Stream translation
                            for chunk in translator.translate_streaming(input_text, src_lang, tgt_lang):
                                result += chunk
                                translation_placeholder.markdown(result)
                        except Exception as e:
                            st.error(f"Translation error: {str(e)}")
                            logging.error(f"Tilmash translation error: {traceback.format_exc()}")
                            result = None
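                    # On a mid-stream failure, result is reset to None so partial
                    # output is never offered for download (the placeholder may
                    # still show the partial text).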
                    if result:
                        # Prepare download capability
                        doc = Document()
                        doc.add_paragraph(result)
                        doc_io = io.BytesIO()
                        doc.save(doc_io)
                        doc_io.seek(0)
                        st.download_button(
                            label="Скачать переведённый текст (.docx)",
                            data=doc_io,
                            file_name="translated_text.docx",
                            mime="application/vnd.openxmlformats-officedocument.wordprocessingml.document"
                        )
                    else:
                        st.warning("Не удалось выполнить перевод.")

                    # Unload Tilmash model after use
                    try:
                        if translator.initialized:
                            translator.unload_model()
                    except Exception as unload_error:
                        logging.error(f"Error unloading Tilmash model: {str(unload_error)}")
                except Exception as tilmash_error:
                    st.error(f"Tilmash model error: {str(tilmash_error)}")
                    logging.error(f"Tilmash model error: {traceback.format_exc()}")
                finally:
                    # Release the semaphore
                    model_semaphore.release()
            except Exception as outer_error:
                st.error(f"Unexpected error: {str(outer_error)}")
                logging.error(f"Unexpected error: {traceback.format_exc()}")
            finally:
                # Release translation lock
                st.session_state.translation_lock = False


def handle_readability_analysis():
    st.header("Анализ удобочитаемости текста")
    input_method = st.radio("Способ ввода текста:", ["Загрузить файл", "Вставить текст"])
    text = ""
    if input_method == "Загрузить файл":
        uploaded_file = st.file_uploader("Выберите файл (.txt, .docx, .pdf)", type=["txt", "docx", "pdf"])
        if uploaded_file is not None:
            suffix = os.path.splitext(uploaded_file.name)[1]
            with tempfile.NamedTemporaryFile(delete=False, suffix=suffix) as tmp_file:
                tmp_file.write(uploaded_file.getbuffer())
                temp_file_path = tmp_file.name
            text = read_file(temp_file_path)
            os.remove(temp_file_path)
            # Hidden preview: only shown on click
            with st.expander("Показать предварительный просмотр файла", expanded=False):
                st.text_area(
                    "Содержимое (только просмотр)",
                    value=text,
                    height=160,
                    disabled=True
                )
    else:
        text = st.text_area("Вставьте ваш текст здесь", height=200)
    if text:
        auto_detect = st.checkbox("Определить язык автоматически", value=True)
        if auto_detect:
            detected_lang = detect_language(text)
            st.info(f"Определён язык: {detected_lang}")
            lang_code = detected_lang if detected_lang in ['ru', 'en', 'kk'] else 'en'
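            # Detections outside ru/en/kk fall back to the English formulas,
            # which may skew the scores for other languages.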
        else:
            lang_code = st.selectbox("Язык текста", ["ru", "en", "kk"])

        if st.button("Анализировать"):
            # Prevent multiple concurrent analyses
            if 'analysis_lock' in st.session_state and st.session_state.analysis_lock:
                st.warning("Анализ уже выполняется. Пожалуйста, дождитесь завершения.")
                return

            # Set analysis lock
            st.session_state.analysis_lock = True
            try:
                # Use the model semaphore for consistency with translation
                acquired = model_semaphore.acquire(blocking=False)
                if not acquired:
                    st.warning("Система загружена. Пожалуйста, попробуйте позже.")
                    st.session_state.analysis_lock = False
                    return

                try:
                    with st.spinner("Выполняется анализ..."):
                        fre = flesch_reading_ease(text, lang_code)
                        fkgl = flesch_kincaid_grade_level(text, lang_code)
                        fog = gunning_fog_index(text, lang_code)
                        smog = smog_index(text, lang_code)
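                        # Flesch Reading Ease is higher-is-easier; the other three
                        # indices approximate the school grade level needed to
                        # follow the text.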
                        highlighted_text, complex_words_list = highlight_complex_text(text, lang_code)

                        st.subheader("Результаты удобочитаемости")
                        st.markdown(
                            f"**Индекс удобочитаемости Флеша:** {color_code_index('Flesch Reading Ease', fre)}",
                            unsafe_allow_html=True
                        )
                        st.markdown(
                            f"**Индекс Флеша-Кинкейда:** {color_code_index('Flesch-Kincaid Grade Level', fkgl)}",
                            unsafe_allow_html=True
                        )
                        st.markdown(
                            f"**Индекс тумана Ганнинга:** {color_code_index('Gunning Fog Index', fog)}",
                            unsafe_allow_html=True
                        )
                        st.markdown(
                            f"**Индекс SMOG:** {color_code_index('SMOG Index', smog)}",
                            unsafe_allow_html=True
                        )

                        st.subheader("Сложные слова")
                        st.write(", ".join(set(complex_words_list)))
                finally:
                    # Release the semaphore
                    model_semaphore.release()
            finally:
                # Release analysis lock
                st.session_state.analysis_lock = False


def main():
    setup_nltk()

    # Log the model configuration only once per session
    if 'model_config_logged' not in st.session_state:
        logging.info(f"Using model configuration: {MODEL_CONFIG}")
        st.session_state.model_config_logged = True

    # Check GPU availability at startup
    try:
        import torch
        if torch.cuda.is_available():
            gpu_name = torch.cuda.get_device_name(0)
            cuda_ver = torch.version.cuda if hasattr(torch.version, "cuda") else "N/A"
            logging.info(f"Обнаружен GPU: {gpu_name}, CUDA {cuda_ver}")
        elif hasattr(torch.backends, "mps") and torch.backends.mps.is_available():
            logging.info("Обнаружен Apple Silicon GPU (MPS)")
        else:
            logging.warning("GPU не обнаружен. Устанавливаем устройство на CPU")
            # Print CUDA diagnostics
            logging.warning("Диагностика CUDA:")
            logging.warning(f"torch.__version__: {torch.__version__}")
            if hasattr(torch.version, "cuda"):
                logging.warning(f"torch.version.cuda: {torch.version.cuda}")
            logging.warning(f"torch.cuda.is_available(): {torch.cuda.is_available()}")
    except ImportError:
        logging.warning("PyTorch не установлен, будет использован CPU")
    except Exception as e:
        logging.warning(f"Ошибка при проверке GPU: {str(e)}")
    st.title("Translation & Readability Analysis")
    st.sidebar.header("Функциональность")
    functionality = st.sidebar.radio("Выберите режим:", ["Перевод", "Анализ удобочитаемости"])
    if functionality == "Перевод":
        handle_translation()
    elif functionality == "Анализ удобочитаемости":
        handle_readability_analysis()


if __name__ == "__main__":
    main()