| """ | |
| File Service - Handles file processing and chunked text analysis | |
| """ | |
| import os | |
| import uuid | |
| from typing import Dict, Any, List, Tuple | |
| from werkzeug.utils import secure_filename | |
| from flask import current_app | |
| from .tokenizer_service import tokenizer_service | |
| from .stats_service import stats_service | |


class FileService:
    """Service for handling file uploads and processing."""

    # Allowed file extensions for security
    ALLOWED_EXTENSIONS = {'.txt', '.md', '.py', '.js', '.html', '.css', '.json', '.csv', '.log'}

    @staticmethod
    def is_allowed_file(filename: str) -> bool:
        """Check if the uploaded file has an allowed extension."""
        if not filename:
            return False
        _, ext = os.path.splitext(filename.lower())
        return ext in FileService.ALLOWED_EXTENSIONS

    @staticmethod
    def generate_secure_filename(original_filename: str) -> str:
        """Generate a secure filename with a UUID prefix."""
        if not original_filename:
            return f"{uuid.uuid4().hex}.txt"
        # Secure the filename and add a UUID prefix to avoid conflicts
        secure_name = secure_filename(original_filename)
        name, ext = os.path.splitext(secure_name)
        return f"{uuid.uuid4().hex}_{name}{ext}"

    @staticmethod
    def save_uploaded_file(uploaded_file, upload_folder: str) -> str:
        """
        Save an uploaded file to the upload folder under a secure filename.

        Returns:
            str: Path to the saved file
        """
        # Ensure the upload folder exists
        os.makedirs(upload_folder, exist_ok=True)

        # Generate a secure filename and build the destination path
        secure_filename_str = FileService.generate_secure_filename(uploaded_file.filename)
        file_path = os.path.join(upload_folder, secure_filename_str)

        # Save the file
        uploaded_file.save(file_path)
        return file_path

    @staticmethod
    def process_file_for_tokenization(
        file_path: str,
        model_id_or_name: str,
        preview_char_limit: int = 8096,
        max_display_tokens: int = 50000,
        chunk_size: int = 1024 * 1024
    ) -> Dict[str, Any]:
        """
        Process a file for tokenization, reading large files in chunks.

        Args:
            file_path: Path to the file to process
            model_id_or_name: Tokenizer model to use
            preview_char_limit: Character limit for the preview display
            max_display_tokens: Maximum number of tokens to display
            chunk_size: Chunk size (in characters) for reading large files

        Returns:
            Dict containing tokenization results
        """
        # Load the tokenizer
        tokenizer, tokenizer_info, error = tokenizer_service.load_tokenizer(model_id_or_name)
        if error:
            raise Exception(error)

        # Read only a preview of the file for display
        with open(file_path, 'r', errors='replace') as f:
            preview_text = f.read(preview_char_limit)

        # Tokenize the preview for display
        preview_tokens = tokenizer.tokenize(preview_text)
        display_tokens = preview_tokens[:max_display_tokens]

        # Tokenize the full file for stats in chunks to avoid memory issues
        total_tokens = []
        token_set = set()
        total_length = 0

        with open(file_path, 'r', errors='replace') as f:
            while True:
                chunk = f.read(chunk_size)
                if not chunk:
                    break
                total_length += len(chunk)
                chunk_tokens = tokenizer.tokenize(chunk)
                total_tokens.extend(chunk_tokens)
                token_set.update(chunk_tokens)

        # Calculate stats; the original text is approximated by a
        # whitespace string of the same length to avoid re-reading the file
        stats = stats_service.get_token_stats(total_tokens, ' ' * total_length)

        # Format tokens for display
        token_data = stats_service.format_tokens_for_display(display_tokens, tokenizer)

        return {
            'tokens': token_data,
            'stats': stats,
            'display_limit_reached': len(total_tokens) > max_display_tokens,
            'total_tokens': len(total_tokens),
            'is_full_file': True,
            'preview_only': True,
            'tokenizer_info': tokenizer_info
        }
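
    # Typical usage from an upload route (illustrative sketch only; the request
    # field name, config key, and error handling below are assumptions, not
    # part of this module):
    #
    #   uploaded = request.files['file']
    #   if not file_service.is_allowed_file(uploaded.filename):
    #       abort(400)
    #   path = file_service.save_uploaded_file(uploaded, current_app.config['UPLOAD_FOLDER'])
    #   try:
    #       result = file_service.process_file_for_tokenization(path, model_id)
    #   finally:
    #       file_service.cleanup_file(path)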

    @staticmethod
    def process_text_for_tokenization(
        text: str,
        model_id_or_name: str,
        is_preview: bool = False,
        preview_char_limit: int = 8096,
        max_display_tokens: int = 50000
    ) -> Dict[str, Any]:
        """
        Process regular text input for tokenization.

        Args:
            text: Input text to tokenize
            model_id_or_name: Tokenizer model to use
            is_preview: Whether this is a preview of a larger text
            preview_char_limit: Character limit for the preview
            max_display_tokens: Maximum number of tokens to display

        Returns:
            Dict containing tokenization results
        """
        # Load the tokenizer
        tokenizer, tokenizer_info, error = tokenizer_service.load_tokenizer(model_id_or_name)
        if error:
            raise Exception(error)

        # Tokenize the full text for stats
        all_tokens = tokenizer.tokenize(text)
        total_token_count = len(all_tokens)

        # For display: if this is a preview, only take the first preview_char_limit chars
        preview_text = text[:preview_char_limit] if is_preview else text
        preview_tokens = tokenizer.tokenize(preview_text)
        display_tokens = preview_tokens[:max_display_tokens]

        # Calculate stats on the full text
        stats = stats_service.get_token_stats(all_tokens, text)

        # Format tokens for display
        token_data = stats_service.format_tokens_for_display(display_tokens, tokenizer)

        return {
            'tokens': token_data,
            'stats': stats,
            'display_limit_reached': total_token_count > max_display_tokens and not is_preview,
            'total_tokens': total_token_count,
            'is_full_file': False,
            'preview_only': is_preview,
            'tokenizer_info': tokenizer_info
        }

    @staticmethod
    def cleanup_file(file_path: str):
        """Safely remove a file if it exists."""
        try:
            if os.path.exists(file_path):
                os.remove(file_path)
        except OSError:
            pass  # Ignore errors during cleanup


# Global instance
file_service = FileService()
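

# Minimal sketch of exercising the service directly, assuming the package is run
# as a module (the relative imports require e.g. `python -m services.file_service`)
# and that tokenizer_service can resolve the model name used here ("gpt2" is an
# assumption for illustration, not a project default).
if __name__ == "__main__":
    sample = "Tokenizers split text into subword units."
    result = file_service.process_text_for_tokenization(sample, "gpt2")
    print(f"Total tokens: {result['total_tokens']}")
    print(f"Stats: {result['stats']}")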