Spaces:
Running
Running
milwright
improve url grounding by including source urls in context and citation instructions
b3be50c
| """ | |
| Refactored Space Template for HuggingFace Gradio Chat Interface | |
| Implements Gradio 5.x best practices with clean tab structure | |
| """ | |
| SPACE_TEMPLATE = '''import gradio as gr | |
| import tempfile | |
| import os | |
| import requests | |
| import json | |
| import re | |
| from bs4 import BeautifulSoup | |
| from datetime import datetime | |
| import urllib.parse | |
| from pathlib import Path | |
| from typing import List, Dict, Optional, Any, Tuple | |
| # Configuration | |
| SPACE_NAME = {name} | |
| SPACE_DESCRIPTION = {description} | |
| # Default configuration values | |
| DEFAULT_CONFIG = {{ | |
| 'name': SPACE_NAME, | |
| 'description': SPACE_DESCRIPTION, | |
| 'system_prompt': {system_prompt}, | |
| 'temperature': {temperature}, | |
| 'max_tokens': {max_tokens}, | |
| 'model': {model}, | |
| 'api_key_var': {api_key_var}, | |
| 'theme': {theme}, | |
| 'grounding_urls': {grounding_urls}, | |
| 'enable_dynamic_urls': {enable_dynamic_urls}, | |
| 'enable_file_upload': {enable_file_upload}, | |
| 'examples': {examples}, | |
| 'language': {language}, | |
| 'locked': False | |
| }} | |
| # Available themes with proper instantiation | |
| AVAILABLE_THEMES = {{ | |
| "Default": gr.themes.Default(), | |
| "Soft": gr.themes.Soft(), | |
| "Glass": gr.themes.Glass(), | |
| "Monochrome": gr.themes.Monochrome(), | |
| "Base": gr.themes.Base() | |
| }} | |
| class ConfigurationManager: | |
| """Manage configuration with validation and persistence""" | |
| def __init__(self): | |
| self.config_path = "config.json" | |
| self.backup_dir = "config_backups" | |
| self._config = None | |
| def load(self) -> Dict[str, Any]: | |
| """Load configuration from file with fallback to defaults""" | |
| try: | |
| with open(self.config_path, 'r') as f: | |
| self._config = json.load(f) | |
| print("β Loaded configuration from config.json") | |
| return self._config | |
| except FileNotFoundError: | |
| print("βΉοΈ No config.json found, using default configuration") | |
| self._config = DEFAULT_CONFIG.copy() | |
| self.save(self._config) | |
| return self._config | |
| except Exception as e: | |
| print(f"β οΈ Error loading config.json: {{e}}, using defaults") | |
| self._config = DEFAULT_CONFIG.copy() | |
| return self._config | |
| def save(self, config: Dict[str, Any]) -> bool: | |
| """Save configuration with automatic backup""" | |
| try: | |
| # Create backup if config exists | |
| if os.path.exists(self.config_path): | |
| self._create_backup() | |
| # Save new configuration | |
| with open(self.config_path, 'w') as f: | |
| json.dump(config, f, indent=2) | |
| self._config = config | |
| return True | |
| except Exception as e: | |
| print(f"β Error saving configuration: {{e}}") | |
| return False | |
| def _create_backup(self): | |
| """Create timestamped backup""" | |
| try: | |
| os.makedirs(self.backup_dir, exist_ok=True) | |
| timestamp = datetime.now().strftime('%Y%m%d_%H%M%S') | |
| backup_path = os.path.join(self.backup_dir, f"config_{{timestamp}}.json") | |
| with open(self.config_path, 'r') as source: | |
| config_data = json.load(source) | |
| with open(backup_path, 'w') as backup: | |
| json.dump(config_data, backup, indent=2) | |
| self._cleanup_old_backups() | |
| except Exception as e: | |
| print(f"β οΈ Error creating backup: {{e}}") | |
| def _cleanup_old_backups(self, keep=10): | |
| """Keep only the most recent backups""" | |
| try: | |
| backups = sorted([ | |
| f for f in os.listdir(self.backup_dir) | |
| if f.startswith('config_') and f.endswith('.json') | |
| ]) | |
| if len(backups) > keep: | |
| for old_backup in backups[:-keep]: | |
| os.remove(os.path.join(self.backup_dir, old_backup)) | |
| except Exception as e: | |
| print(f"β οΈ Error cleaning up backups: {{e}}") | |
| def get(self, key: str, default: Any = None) -> Any: | |
| """Get configuration value""" | |
| if self._config is None: | |
| self.load() | |
| return self._config.get(key, default) | |
| # Initialize configuration manager | |
| config_manager = ConfigurationManager() | |
| config = config_manager.load() | |
| # Load configuration values | |
| SPACE_NAME = config.get('name', DEFAULT_CONFIG['name']) | |
| SPACE_DESCRIPTION = config.get('description', DEFAULT_CONFIG['description']) | |
| SYSTEM_PROMPT = config.get('system_prompt', DEFAULT_CONFIG['system_prompt']) | |
| temperature = config.get('temperature', DEFAULT_CONFIG['temperature']) | |
| max_tokens = config.get('max_tokens', DEFAULT_CONFIG['max_tokens']) | |
| MODEL = config.get('model', DEFAULT_CONFIG['model']) | |
| THEME = config.get('theme', DEFAULT_CONFIG['theme']) | |
| GROUNDING_URLS = config.get('grounding_urls', DEFAULT_CONFIG['grounding_urls']) | |
| ENABLE_DYNAMIC_URLS = config.get('enable_dynamic_urls', DEFAULT_CONFIG['enable_dynamic_urls']) | |
| ENABLE_FILE_UPLOAD = config.get('enable_file_upload', DEFAULT_CONFIG.get('enable_file_upload', True)) | |
| LANGUAGE = config.get('language', DEFAULT_CONFIG.get('language', 'English')) | |
| # Environment variables | |
| ACCESS_CODE = os.environ.get("ACCESS_CODE") | |
| API_KEY_VAR = config.get('api_key_var', DEFAULT_CONFIG['api_key_var']) | |
| API_KEY = os.environ.get(API_KEY_VAR, "").strip() or None | |
| HF_TOKEN = os.environ.get('HF_TOKEN', '') | |
| SPACE_ID = os.environ.get('SPACE_ID', '') | |
| # Utility functions | |
| def validate_api_key() -> bool: | |
| """Validate API key configuration""" | |
| if not API_KEY: | |
| print(f"β οΈ API KEY CONFIGURATION ERROR:") | |
| print(f" Variable name: {{API_KEY_VAR}}") | |
| print(f" Status: Not set or empty") | |
| print(f" Action needed: Set '{{API_KEY_VAR}}' in HuggingFace Space secrets") | |
| return False | |
| elif not API_KEY.startswith('sk-or-'): | |
| print(f"β οΈ API KEY FORMAT WARNING:") | |
| print(f" Variable name: {{API_KEY_VAR}}") | |
| print(f" Note: OpenRouter keys should start with 'sk-or-'") | |
| return True | |
| else: | |
| print(f"β API Key configured successfully") | |
| return True | |
| def validate_url_domain(url: str) -> bool: | |
| """Validate URL domain""" | |
| try: | |
| from urllib.parse import urlparse | |
| parsed = urlparse(url) | |
| return bool(parsed.netloc and parsed.scheme in ['http', 'https']) | |
| except: | |
| return False | |
| def fetch_url_content(url: str, max_length: int = 3000) -> str: | |
| """Fetch and convert URL content to text""" | |
| try: | |
| if not validate_url_domain(url): | |
| return f"β Invalid URL format: {{url}}" | |
| headers = {{ | |
| 'User-Agent': 'Mozilla/5.0 (compatible; HuggingFace-Space/1.0)' | |
| }} | |
| response = requests.get(url, headers=headers, timeout=5) | |
| response.raise_for_status() | |
| content_type = response.headers.get('content-type', '').lower() | |
| if 'text/html' in content_type: | |
| soup = BeautifulSoup(response.text, 'html.parser') | |
| # Remove script and style elements | |
| for script in soup(["script", "style"]): | |
| script.extract() | |
| # Get text content | |
| text = soup.get_text(separator=' ', strip=True) | |
| # Clean up whitespace | |
| text = ' '.join(text.split()) | |
| # Limit content length | |
| if len(text) > max_length: | |
| text = text[:max_length] + "... [truncated]" | |
| return f"π **Content from:** {{url}}\\n\\n{{text}}\\n" | |
| elif any(ct in content_type for ct in ['text/plain', 'application/json']): | |
| text = response.text | |
| if len(text) > max_length: | |
| text = text[:max_length] + "... [truncated]" | |
| return f"π **Content from:** {{url}}\\n\\n{{text}}\\n" | |
| else: | |
| return f"β οΈ Unsupported content type at {{url}}: {{content_type}}" | |
| except requests.exceptions.Timeout: | |
| return f"β±οΈ Timeout accessing {{url}}" | |
| except requests.exceptions.RequestException as e: | |
| return f"β Error accessing {{url}}: {{str(e)}}" | |
| except Exception as e: | |
| return f"β Unexpected error with {{url}}: {{str(e)}}" | |
| def extract_urls_from_text(text: str) -> List[str]: | |
| """Extract URLs from message text""" | |
| url_pattern = r'https?://[^\\s<>"{{}}|\\\\^`\\[\\]]+(?:\\.[^\\s<>"{{}}|\\\\^`\\[\\]])*' | |
| urls = re.findall(url_pattern, text) | |
| return [url.rstrip('.,;:)?!') for url in urls] | |
| def process_file_upload(file_path: str) -> str: | |
| """Process uploaded file with Gradio best practices""" | |
| if not file_path or not os.path.exists(file_path): | |
| return "β File not found" | |
| try: | |
| file_size = os.path.getsize(file_path) | |
| file_name = os.path.basename(file_path) | |
| _, ext = os.path.splitext(file_path.lower()) | |
| # Text file extensions | |
| text_extensions = {{ | |
| '.txt', '.md', '.markdown', '.rst', | |
| '.py', '.js', '.jsx', '.ts', '.tsx', '.json', '.yaml', '.yml', | |
| '.html', '.htm', '.xml', '.css', '.scss', | |
| '.java', '.c', '.cpp', '.h', '.cs', '.go', '.rs', | |
| '.sh', '.bash', '.log', '.csv', '.sql' | |
| }} | |
| max_chars = 5000 # Define max_chars limit for file reading | |
| if ext in text_extensions: | |
| with open(file_path, 'r', encoding='utf-8', errors='ignore') as f: | |
| content = f.read(max_chars) | |
| if len(content) == max_chars: | |
| content += "\\n... [truncated]" | |
| return f"π **{{file_name}}** ({{file_size:,}} bytes)\\n```{{ext[1:]}}\\n{{content}}\\n```" | |
| # Special file types | |
| elif ext == '.pdf': | |
| return f"π **{{file_name}}** (PDF, {{file_size:,}} bytes)\\nβ οΈ PDF support requires PyPDF2" | |
| elif ext in {{'.jpg', '.jpeg', '.png', '.gif', '.webp'}}: | |
| return f"πΌοΈ **{{file_name}}** (Image, {{file_size:,}} bytes)" | |
| elif ext in {{'.xlsx', '.xls'}}: | |
| return f"π **{{file_name}}** (Spreadsheet, {{file_size:,}} bytes)" | |
| elif ext in {{'.zip', '.tar', '.gz', '.rar'}}: | |
| return f"ποΈ **{{file_name}}** (Archive, {{file_size:,}} bytes)" | |
| else: | |
| return f"π **{{file_name}}** ({{ext or 'no extension'}}, {{file_size:,}} bytes)" | |
| except Exception as e: | |
| return f"β Error processing file: {{str(e)}}" | |
| # URL content cache | |
| _url_content_cache = {{}} | |
| def get_grounding_context() -> str: | |
| """Get grounding context from configured URLs with caching""" | |
| urls = GROUNDING_URLS | |
| if isinstance(urls, str): | |
| try: | |
| urls = json.loads(urls) | |
| except: | |
| return "" | |
| if not urls: | |
| return "" | |
| context_parts = [] | |
| # Process primary sources (first 2 URLs with 8000 char limit) | |
| primary_urls = urls[:2] | |
| if primary_urls: | |
| context_parts.append("π **PRIMARY SOURCES:**\\n") | |
| for i, url in enumerate(primary_urls, 1): | |
| if url in _url_content_cache: | |
| content = _url_content_cache[url] | |
| else: | |
| content = fetch_url_content(url, max_length=8000) | |
| _url_content_cache[url] = content | |
| if not content.startswith("β") and not content.startswith("β±οΈ"): | |
| context_parts.append(f"\\n**Primary Source {{i}} - {{url}}:**\\n{{content}}") | |
| # Process secondary sources (URLs 3+ with 2500 char limit) | |
| secondary_urls = urls[2:] | |
| if secondary_urls: | |
| context_parts.append("\\n\\nπ **SECONDARY SOURCES:**\\n") | |
| for i, url in enumerate(secondary_urls, 1): | |
| if url in _url_content_cache: | |
| content = _url_content_cache[url] | |
| else: | |
| content = fetch_url_content(url, max_length=2500) | |
| _url_content_cache[url] = content | |
| if not content.startswith("β") and not content.startswith("β±οΈ"): | |
| context_parts.append(f"\\n**Secondary Source {{i}} - {{url}}:**\\n{{content}}") | |
| if len(context_parts) > 0: | |
| return "\\n".join(context_parts) | |
| return "" | |
| def export_conversation_to_markdown(history: List[Dict[str, str]]) -> str: | |
| """Export conversation history to markdown""" | |
| if not history: | |
| return "No conversation to export." | |
| markdown_content = f"""# Conversation Export | |
| Generated on: {{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}} | |
| Space: {{SPACE_NAME}} | |
| Model: {{MODEL}} | |
| --- | |
| """ | |
| message_count = 0 | |
| for message in history: | |
| if isinstance(message, dict): | |
| role = message.get('role', 'unknown') | |
| content = message.get('content', '') | |
| if role == 'user': | |
| message_count += 1 | |
| markdown_content += f"## User Message {{message_count}}\\n\\n{{content}}\\n\\n" | |
| elif role == 'assistant': | |
| markdown_content += f"## Assistant Response {{message_count}}\\n\\n{{content}}\\n\\n---\\n\\n" | |
| return markdown_content | |
| def generate_response(message: str, history: List[Dict[str, str]], files: Optional[List] = None) -> str: | |
| """Generate response using OpenRouter API with file support""" | |
| # API key validation | |
| if not API_KEY: | |
| return f"""π **API Key Required** | |
| Please configure your OpenRouter API key: | |
| 1. Go to Settings (βοΈ) in your HuggingFace Space | |
| 2. Click 'Variables and secrets' | |
| 3. Add secret: **{{API_KEY_VAR}}** | |
| 4. Value: Your OpenRouter API key (starts with `sk-or-`) | |
| Get your API key at: https://openrouter.ai/keys""" | |
| # Process files if provided | |
| file_context = "" | |
| file_notification = "" | |
| if files: | |
| file_contents = [] | |
| file_names = [] | |
| for file_info in files: | |
| if isinstance(file_info, dict): | |
| file_path = file_info.get('path', file_info.get('name', '')) | |
| else: | |
| file_path = str(file_info) | |
| if file_path and os.path.exists(file_path): | |
| try: | |
| content = process_file_upload(file_path) | |
| file_contents.append(content) | |
| file_names.append(os.path.basename(file_path)) | |
| print(f"π Processed file: {{os.path.basename(file_path)}}") | |
| except Exception as e: | |
| print(f"β Error processing file: {{e}}") | |
| if file_contents: | |
| file_context = "\\n\\n[UPLOADED FILES]\\n" + "\\n\\n".join(file_contents) + "\\n" | |
| file_notification = f"\\n\\n[Note: Uploaded files: {{', '.join(file_names)}}]" | |
| # Get grounding context | |
| grounding_context = get_grounding_context() | |
| # Check for dynamic URLs in message | |
| if ENABLE_DYNAMIC_URLS: | |
| urls_in_message = extract_urls_from_text(message) | |
| if urls_in_message: | |
| print(f"π Found {{len(urls_in_message)}} URLs in message") | |
| dynamic_context = "\\nπ **Dynamic Context:**\\n" | |
| for url in urls_in_message[:3]: # Limit to 3 URLs | |
| content = fetch_url_content(url) | |
| if not content.startswith("β"): | |
| dynamic_context += f"\\n{{content}}" | |
| grounding_context += dynamic_context | |
| # Build messages with grounding context and file context in system prompt | |
| system_content = SYSTEM_PROMPT | |
| # Add language instruction if not English | |
| if LANGUAGE != 'English': | |
| system_content += f"\\n\\nIMPORTANT: You must respond EXCLUSIVELY in {{LANGUAGE}}. All your responses should be written entirely in {{LANGUAGE}}, even when user input is in a different language, particularly English." | |
| if grounding_context: | |
| system_content += "\\n\\nIMPORTANT: When providing information from the reference sources below, please cite the specific URL(s) where the information can be found." | |
| system_content = f"{{system_content}}\\n\\n{{grounding_context}}" | |
| if file_context: | |
| system_content = f"{{system_content}}\\n\\n{{file_context}}" | |
| messages = [{{"role": "system", "content": system_content}}] | |
| # Add conversation history | |
| for msg in history: | |
| if isinstance(msg, dict) and 'role' in msg and 'content' in msg: | |
| messages.append({{ | |
| "role": msg['role'], | |
| "content": msg['content'] | |
| }}) | |
| # Add current message | |
| messages.append({{ | |
| "role": "user", | |
| "content": message | |
| }}) | |
| # Make API request | |
| try: | |
| # Make API request | |
| headers = {{ | |
| "Authorization": f"Bearer {{API_KEY}}", | |
| "Content-Type": "application/json", | |
| "HTTP-Referer": f"https://huggingface.co/spaces/{{SPACE_ID}}" if SPACE_ID else "https://huggingface.co", | |
| "X-Title": SPACE_NAME | |
| }} | |
| data = {{ | |
| "model": MODEL, | |
| "messages": messages, | |
| "temperature": temperature, | |
| "max_tokens": max_tokens, | |
| "stream": False | |
| }} | |
| response = requests.post( | |
| "https://openrouter.ai/api/v1/chat/completions", | |
| headers=headers, | |
| json=data, | |
| timeout=30 | |
| ) | |
| if response.status_code == 200: | |
| result = response.json() | |
| ai_response = result['choices'][0]['message']['content'] | |
| # Add file notification if files were uploaded | |
| if file_notification: | |
| ai_response += file_notification | |
| return ai_response | |
| else: | |
| error_data = response.json() | |
| error_message = error_data.get('error', {{}}).get('message', 'Unknown error') | |
| return f"β API Error ({{response.status_code}}): {{error_message}}" | |
| except requests.exceptions.Timeout: | |
| return "β° Request timeout (30s limit). Try a shorter message or different model." | |
| except requests.exceptions.ConnectionError: | |
| return "π Connection error. Check your internet connection and try again." | |
| except Exception as e: | |
| return f"β Error: {{str(e)}}" | |
| # Chat history for export | |
| chat_history_store = [] | |
| def verify_hf_token_access() -> Tuple[bool, str]: | |
| """Verify HuggingFace token and access""" | |
| if not HF_TOKEN: | |
| return False, "No HF_TOKEN found" | |
| if not SPACE_ID: | |
| return False, "No SPACE_ID found - running locally?" | |
| try: | |
| headers = {{"Authorization": f"Bearer {{HF_TOKEN}}"}} | |
| response = requests.get( | |
| f"https://huggingface.co/api/spaces/{{SPACE_ID}}", | |
| headers=headers, | |
| timeout=5 | |
| ) | |
| if response.status_code == 200: | |
| return True, f"HF Token valid for {{SPACE_ID}}" | |
| else: | |
| return False, f"HF Token invalid or no access to {{SPACE_ID}}" | |
| except Exception as e: | |
| return False, f"Error verifying HF token: {{str(e)}}" | |
| # Create main interface with clean tab structure | |
| def create_interface(): | |
| """Create the Gradio interface with clean tab structure""" | |
| # Get theme | |
| theme = AVAILABLE_THEMES.get(THEME, gr.themes.Default()) | |
| # Validate API key on startup | |
| API_KEY_VALID = validate_api_key() | |
| # Check HuggingFace access | |
| HF_ACCESS_VALID, HF_ACCESS_MESSAGE = verify_hf_token_access() | |
| # Access control check | |
| has_access = ACCESS_CODE is None # No access code required | |
| with gr.Blocks(title=SPACE_NAME, theme=theme) as demo: | |
| # State for access control | |
| access_granted = gr.State(has_access) | |
| # Header - always visible | |
| gr.Markdown(f"# {{SPACE_NAME}}") | |
| gr.Markdown(SPACE_DESCRIPTION) | |
| # Access control panel (visible when access not granted) | |
| with gr.Column(visible=(not has_access)) as access_panel: | |
| gr.Markdown("### π Access Required") | |
| gr.Markdown("Please enter the access code:") | |
| with gr.Row(): | |
| access_input = gr.Textbox( | |
| label="Access Code", | |
| placeholder="Enter access code...", | |
| type="password", | |
| scale=3 | |
| ) | |
| access_btn = gr.Button("Submit", variant="primary", scale=1) | |
| access_status = gr.Markdown() | |
| # Main interface (visible when access granted) | |
| with gr.Column(visible=has_access) as main_panel: | |
| with gr.Tabs() as tabs: | |
| # Chat Tab | |
| with gr.Tab("π¬ Chat"): | |
| # Get examples | |
| examples = config.get('examples', []) | |
| if isinstance(examples, str): | |
| try: | |
| examples = json.loads(examples) | |
| except: | |
| examples = [] | |
| # State to hold uploaded files | |
| uploaded_files = gr.State([]) | |
| # Create chat interface | |
| chatbot = gr.Chatbot(type="messages", height=400) | |
| msg = gr.Textbox(label="Message", placeholder="Type your message here...", lines=2) | |
| with gr.Row(): | |
| submit_btn = gr.Button("Send", variant="primary") | |
| clear_btn = gr.Button("Clear") | |
| # Export functionality | |
| with gr.Row(): | |
| export_btn = gr.DownloadButton( | |
| "π₯ Export Conversation", | |
| variant="secondary", | |
| size="sm" | |
| ) | |
| # Export handler | |
| def prepare_export(): | |
| if not chat_history_store: | |
| return None | |
| content = export_conversation_to_markdown(chat_history_store) | |
| # Create filename | |
| space_name_safe = re.sub(r'[^a-zA-Z0-9]+', '_', SPACE_NAME).lower() | |
| timestamp = datetime.now().strftime('%Y%m%d_%H%M%S') | |
| filename = f"{{space_name_safe}}_conversation_{{timestamp}}.md" | |
| # Save to temp file | |
| temp_path = Path(tempfile.gettempdir()) / filename | |
| temp_path.write_text(content, encoding='utf-8') | |
| return str(temp_path) | |
| export_btn.click( | |
| prepare_export, | |
| outputs=[export_btn] | |
| ) | |
| # Examples section | |
| if examples: | |
| gr.Examples(examples=examples, inputs=msg) | |
| # Chat functionality | |
| def respond(message, chat_history, files_state, is_granted): | |
| if not is_granted: | |
| return chat_history, "", is_granted | |
| if not message: | |
| return chat_history, "", is_granted | |
| # Format history for the generate_response function | |
| formatted_history = [] | |
| for h in chat_history: | |
| if isinstance(h, dict): | |
| formatted_history.append(h) | |
| # Get response | |
| response = generate_response(message, formatted_history, files_state) | |
| # Update chat history | |
| chat_history = chat_history + [ | |
| {{"role": "user", "content": message}}, | |
| {{"role": "assistant", "content": response}} | |
| ] | |
| # Update stored history for export | |
| global chat_history_store | |
| chat_history_store = chat_history | |
| return chat_history, "", is_granted | |
| # Wire up the interface | |
| msg.submit(respond, [msg, chatbot, uploaded_files, access_granted], [chatbot, msg, access_granted]) | |
| submit_btn.click(respond, [msg, chatbot, uploaded_files, access_granted], [chatbot, msg, access_granted]) | |
| clear_btn.click(lambda: ([], ""), outputs=[chatbot, msg]) | |
| # File upload accordion | |
| if ENABLE_FILE_UPLOAD: | |
| with gr.Accordion("π Upload Files", open=False): | |
| file_upload = gr.File( | |
| label="Upload Files", | |
| file_types=None, | |
| file_count="multiple", | |
| visible=True, | |
| interactive=True | |
| ) | |
| clear_files_btn = gr.Button("Clear Files", size="sm", variant="secondary") | |
| uploaded_files_display = gr.Markdown("", visible=False) | |
| def handle_file_upload(files): | |
| if not files: | |
| return [], "", gr.update(visible=False) | |
| file_names = [] | |
| for file_info in files: | |
| if isinstance(file_info, dict): | |
| file_path = file_info.get('path', file_info.get('name', '')) | |
| else: | |
| file_path = str(file_info) | |
| if file_path and os.path.exists(file_path): | |
| file_names.append(os.path.basename(file_path)) | |
| if file_names: | |
| display_text = f"π **Uploaded files:** {{', '.join(file_names)}}" | |
| return files, display_text, gr.update(visible=True) | |
| return [], "", gr.update(visible=False) | |
| def clear_files(): | |
| return None, [], "", gr.update(visible=False) | |
| file_upload.change( | |
| handle_file_upload, | |
| inputs=[file_upload], | |
| outputs=[uploaded_files, uploaded_files_display, uploaded_files_display] | |
| ) | |
| clear_files_btn.click( | |
| clear_files, | |
| outputs=[file_upload, uploaded_files, uploaded_files_display, uploaded_files_display] | |
| ) | |
| # Configuration accordion | |
| with gr.Accordion("βΉοΈ Configuration", open=False): | |
| gr.JSON( | |
| value=config, | |
| label="config.json", | |
| show_label=True | |
| ) | |
| # Configuration Tab | |
| with gr.Tab("βοΈ Configuration"): | |
| gr.Markdown("## Configuration Management") | |
| # State for config tab authentication | |
| config_authenticated = gr.State(False) | |
| # Authentication panel | |
| with gr.Column(visible=True) as config_auth_panel: | |
| gr.Markdown("### π Authentication Required") | |
| gr.Markdown("Enter your HF_TOKEN to access configuration settings:") | |
| with gr.Row(): | |
| config_password = gr.Textbox( | |
| label="HF Token", | |
| placeholder="Enter your HF_TOKEN...", | |
| type="password", | |
| scale=3 | |
| ) | |
| config_auth_btn = gr.Button("Authenticate", variant="primary", scale=1) | |
| config_auth_status = gr.Markdown() | |
| # Configuration panel (hidden until authenticated) | |
| with gr.Column(visible=False) as config_panel: | |
| # Show authentication status | |
| if HF_ACCESS_VALID: | |
| gr.Markdown(f"β {{HF_ACCESS_MESSAGE}}") | |
| gr.Markdown("Configuration changes will be saved to the HuggingFace repository.") | |
| else: | |
| gr.Markdown(f"βΉοΈ {{HF_ACCESS_MESSAGE}}") | |
| gr.Markdown("Set HF_TOKEN in Space secrets to enable auto-save.") | |
| # Configuration editor | |
| gr.Markdown("### βοΈ Configuration Editor") | |
| # Show lock status if locked | |
| if config.get('locked', False): | |
| gr.Markdown("β οΈ **Note:** Configuration is locked.") | |
| # Basic settings | |
| with gr.Column(): | |
| edit_name = gr.Textbox( | |
| label="Space Name", | |
| value=config.get('name', ''), | |
| max_lines=1 | |
| ) | |
| edit_model = gr.Dropdown( | |
| label="Model", | |
| choices=[ | |
| # Google models | |
| "google/gemini-2.0-flash-001", | |
| "google/gemma-3-27b-it", | |
| # Anthropic models | |
| "anthropic/claude-3.5-sonnet", | |
| "anthropic/claude-3.5-haiku", | |
| # OpenAI models | |
| "openai/gpt-4o-mini-search-preview", | |
| "openai/gpt-4.1-nano", | |
| # MistralAI models | |
| "mistralai/mistral-medium-3", | |
| # DeepSeek models | |
| "deepseek/deepseek-r1-distill-qwen-32b", | |
| # NVIDIA models | |
| "nvidia/llama-3.1-nemotron-70b-instruct", | |
| # Qwen models | |
| "qwen/qwen3-30b-a3b-instruct-2507" | |
| ], | |
| value=config.get('model', ''), | |
| allow_custom_value=True | |
| ) | |
| edit_language = gr.Dropdown( | |
| label="Language", | |
| choices=[ | |
| "Arabic", | |
| "Bengali", | |
| "English", | |
| "French", | |
| "German", | |
| "Hindi", | |
| "Italian", | |
| "Japanese", | |
| "Korean", | |
| "Mandarin", | |
| "Portuguese", | |
| "Russian", | |
| "Spanish", | |
| "Turkish" | |
| ], | |
| value=config.get('language', 'English') | |
| ) | |
| edit_description = gr.Textbox( | |
| label="Description", | |
| value=config.get('description', ''), | |
| max_lines=2 | |
| ) | |
| edit_system_prompt = gr.Textbox( | |
| label="System Prompt", | |
| value=config.get('system_prompt', ''), | |
| lines=5 | |
| ) | |
| with gr.Row(): | |
| edit_temperature = gr.Slider( | |
| label="Temperature", | |
| minimum=0, | |
| maximum=2, | |
| value=config.get('temperature', 0.7), | |
| step=0.1 | |
| ) | |
| edit_max_tokens = gr.Slider( | |
| label="Max Tokens", | |
| minimum=50, | |
| maximum=4096, | |
| value=config.get('max_tokens', 750), | |
| step=50 | |
| ) | |
| edit_examples = gr.Textbox( | |
| label="Example Prompts (one per line)", | |
| value='\\n'.join(config.get('examples', [])), | |
| lines=3 | |
| ) | |
| # URL Grounding | |
| gr.Markdown("### URL Grounding") | |
| edit_grounding_urls = gr.Textbox( | |
| label="Grounding URLs (one per line)", | |
| placeholder="https://example.com/docs\\nhttps://example.com/api", | |
| value='\\n'.join(config.get('grounding_urls', [])), | |
| lines=5, | |
| info="First 2 URLs: Primary sources (8000 chars). URLs 3+: Secondary sources (2500 chars)." | |
| ) | |
| with gr.Row(): | |
| edit_enable_dynamic_urls = gr.Checkbox( | |
| label="Enable Dynamic URL Extraction", | |
| value=config.get('enable_dynamic_urls', True), | |
| info="Extract and fetch URLs from user messages" | |
| ) | |
| edit_enable_file_upload = gr.Checkbox( | |
| label="Enable File Upload", | |
| value=config.get('enable_file_upload', True), | |
| info="Allow users to upload files for context" | |
| ) | |
| # Configuration actions | |
| with gr.Row(): | |
| save_btn = gr.Button("πΎ Save Configuration", variant="primary") | |
| reset_btn = gr.Button("β©οΈ Reset to Defaults", variant="secondary") | |
| config_status = gr.Markdown() | |
| def save_configuration(name, description, system_prompt, model, language, temp, tokens, examples, grounding_urls, enable_dynamic_urls, enable_file_upload): | |
| """Save updated configuration""" | |
| try: | |
| updated_config = config.copy() | |
| updated_config.update({{ | |
| 'name': name, | |
| 'description': description, | |
| 'system_prompt': system_prompt, | |
| 'model': model, | |
| 'language': language, | |
| 'temperature': temp, | |
| 'max_tokens': int(tokens), | |
| 'examples': [ex.strip() for ex in examples.split('\\n') if ex.strip()], | |
| 'grounding_urls': [url.strip() for url in grounding_urls.split('\\n') if url.strip()], | |
| 'enable_dynamic_urls': enable_dynamic_urls, | |
| 'enable_file_upload': enable_file_upload, | |
| 'locked': config.get('locked', False) | |
| }}) | |
| if config_manager.save(updated_config): | |
| # Auto-commit if HF token is available | |
| if HF_TOKEN and SPACE_ID: | |
| try: | |
| from huggingface_hub import HfApi, CommitOperationAdd | |
| api = HfApi(token=HF_TOKEN) | |
| operations = [ | |
| CommitOperationAdd( | |
| path_or_fileobj=config_manager.config_path, | |
| path_in_repo="config.json" | |
| ) | |
| ] | |
| api.create_commit( | |
| repo_id=SPACE_ID, | |
| operations=operations, | |
| commit_message="Update configuration via web UI", | |
| commit_description=f"Configuration update at {{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}}", | |
| repo_type="space", | |
| token=HF_TOKEN | |
| ) | |
| return "β Configuration saved and committed to repository!" | |
| except Exception as e: | |
| return f"β Configuration saved locally. β οΈ Auto-commit failed: {{str(e)}}" | |
| else: | |
| return "β Configuration saved locally (no HF token for auto-commit)" | |
| else: | |
| return "β Failed to save configuration" | |
| except Exception as e: | |
| return f"β Error: {{str(e)}}" | |
| save_btn.click( | |
| save_configuration, | |
| inputs=[edit_name, edit_description, edit_system_prompt, edit_model, edit_language, | |
| edit_temperature, edit_max_tokens, edit_examples, edit_grounding_urls, | |
| edit_enable_dynamic_urls, edit_enable_file_upload], | |
| outputs=[config_status] | |
| ) | |
| def reset_configuration(): | |
| """Reset to default configuration""" | |
| try: | |
| if config_manager.save(DEFAULT_CONFIG): | |
| return ( | |
| DEFAULT_CONFIG['name'], | |
| DEFAULT_CONFIG['description'], | |
| DEFAULT_CONFIG['system_prompt'], | |
| DEFAULT_CONFIG['model'], | |
| DEFAULT_CONFIG.get('language', 'English'), | |
| DEFAULT_CONFIG['temperature'], | |
| DEFAULT_CONFIG['max_tokens'], | |
| '\\n'.join(DEFAULT_CONFIG['examples']), | |
| '\\n'.join(DEFAULT_CONFIG['grounding_urls']), | |
| DEFAULT_CONFIG['enable_dynamic_urls'], | |
| DEFAULT_CONFIG['enable_file_upload'], | |
| "β Reset to default configuration" | |
| ) | |
| else: | |
| return (*[gr.update() for _ in range(11)], "β Failed to reset") | |
| except Exception as e: | |
| return (*[gr.update() for _ in range(11)], f"β Error: {{str(e)}}") | |
| reset_btn.click( | |
| reset_configuration, | |
| outputs=[edit_name, edit_description, edit_system_prompt, edit_model, edit_language, | |
| edit_temperature, edit_max_tokens, edit_examples, edit_grounding_urls, | |
| edit_enable_dynamic_urls, edit_enable_file_upload, config_status] | |
| ) | |
| # Configuration tab authentication handler | |
| def handle_config_auth(password): | |
| """Handle configuration tab authentication""" | |
| if not HF_TOKEN: | |
| return ( | |
| gr.update(visible=True), # Keep auth panel visible | |
| gr.update(visible=False), # Keep config panel hidden | |
| gr.update(value="β No HF_TOKEN is set in Space secrets. Configuration cannot be enabled."), | |
| False | |
| ) | |
| if password == HF_TOKEN: | |
| return ( | |
| gr.update(visible=False), # Hide auth panel | |
| gr.update(visible=True), # Show config panel | |
| gr.update(value="β Authentication successful!"), | |
| True | |
| ) | |
| else: | |
| return ( | |
| gr.update(visible=True), # Keep auth panel visible | |
| gr.update(visible=False), # Keep config panel hidden | |
| gr.update(value="β Invalid HF_TOKEN. Please try again."), | |
| False | |
| ) | |
| config_auth_btn.click( | |
| handle_config_auth, | |
| inputs=[config_password], | |
| outputs=[config_auth_panel, config_panel, config_auth_status, config_authenticated] | |
| ) | |
| config_password.submit( | |
| handle_config_auth, | |
| inputs=[config_password], | |
| outputs=[config_auth_panel, config_panel, config_auth_status, config_authenticated] | |
| ) | |
| # Access control handler | |
| if ACCESS_CODE: | |
| def handle_access(code, current_state): | |
| if code == ACCESS_CODE: | |
| return ( | |
| gr.update(visible=False), # Hide access panel | |
| gr.update(visible=True), # Show main panel | |
| gr.update(value="β Access granted!"), # Status message | |
| True # Update state | |
| ) | |
| else: | |
| return ( | |
| gr.update(visible=True), # Keep access panel visible | |
| gr.update(visible=False), # Keep main panel hidden | |
| gr.update(value="β Invalid access code. Please try again."), # Status message | |
| False # State remains false | |
| ) | |
| access_btn.click( | |
| handle_access, | |
| inputs=[access_input, access_granted], | |
| outputs=[access_panel, main_panel, access_status, access_granted] | |
| ) | |
| access_input.submit( | |
| handle_access, | |
| inputs=[access_input, access_granted], | |
| outputs=[access_panel, main_panel, access_status, access_granted] | |
| ) | |
| return demo | |
| # Create and launch the interface | |
| if __name__ == "__main__": | |
| demo = create_interface() | |
| demo.launch() | |
| ''' | |
| def get_template(): | |
| """Return the space template string""" | |
| return SPACE_TEMPLATE | |
| def validate_template(): | |
| """Validate that the template has all required placeholders""" | |
| required_placeholders = [ | |
| 'name', 'description', 'system_prompt', 'temperature', 'max_tokens', | |
| 'model', 'api_key_var', 'theme', 'grounding_urls', 'enable_dynamic_urls', | |
| 'enable_file_upload', 'examples', 'language' | |
| ] | |
| missing = [] | |
| for placeholder in required_placeholders: | |
| if f'{{{placeholder}}}' not in SPACE_TEMPLATE: | |
| missing.append(placeholder) | |
| if missing: | |
| raise ValueError(f"Template missing required placeholders: {missing}") | |
| return True |