import gradio as gr import time import os import re from typing import List, Tuple, Optional from datetime import datetime import shutil import asyncio from io import BytesIO import numpy as np import websockets from dotenv import load_dotenv from PIL import Image # Load environment variables first load_dotenv() # Import your existing modules with error handling try: from pipeQuery import process_query, clean_pipeline_result except ImportError as e: print(f"Warning: pipeQuery import failed: {e}") def process_query(query): return "Pipeline not available" def clean_pipeline_result(result): return str(result) try: from audio_utils import generate_tts_response, get_transcription_or_text, GeminiHandler except ImportError as e: print(f"Warning: audio_utils import failed: {e}") def generate_tts_response(text, voice): return None, "TTS not available" def get_transcription_or_text(text, audio): return text or "No input", "Text used" class GeminiHandler: def __init__(self): pass def copy(self): return GeminiHandler() def stop(self): pass try: from rag_steps import ingest_file except ImportError as e: print(f"Warning: rag_steps import failed: {e}") def ingest_file(file_path): return f"File ingestion not available for {file_path}" from logger.custom_logger import CustomLoggerTracker try: from docs_utils import old_doc_ingestion, old_doc_qa, user_doc_ingest, user_doc_qa, rag_dom_ingest, rag_dom_qa except ImportError as e: print(f"Warning: docs_utils import failed: {e}") def old_doc_ingestion(file_path): return f"Old doc ingestion not available for {file_path}" def old_doc_qa(query): return "Old doc QA not available" def user_doc_ingest(file_path): return f"User doc ingestion not available for {file_path}" def user_doc_qa(query): return "User doc QA not available" def rag_dom_ingest(file_path): return f"RAG domain ingestion not available for {file_path}" def rag_dom_qa(query): return "RAG domain QA not available" try: from fastrtc import ( WebRTC, get_cloudflare_turn_credentials_async, wait_for_item, ) except ImportError as e: print(f"Warning: fastrtc import failed: {e}") # Create fallback WebRTC class class WebRTC: def __init__(self, **kwargs): pass def stream(self, *args, **kwargs): pass def get_cloudflare_turn_credentials_async(): return {} def wait_for_item(*args): pass try: from google import genai except ImportError as e: print(f"Warning: google.genai import failed: {e}") genai = None from gradio.utils import get_space # Initialize logger custom_log = CustomLoggerTracker() logger = custom_log.get_logger("gradio_demo") # Global state management class DemoState: def __init__(self): self.query_count = 0 self.session_stats = {"total_queries": 0, "avg_response_time": 0} self.recent_queries = [] self.last_uploaded_old_doc = None self.document_stats = { "old_documents": 0, "user_specific": 0, "domain_documents": 0, "new_documents": 0 } def update_stats(self, query: str, response_time: float): self.query_count += 1 self.session_stats["total_queries"] += 1 # Update average response time current_avg = self.session_stats["avg_response_time"] new_avg = ((current_avg * (self.query_count - 1)) + response_time) / self.query_count self.session_stats["avg_response_time"] = new_avg # Keep recent queries (last 5) self.recent_queries.insert(0, {"query": query[:50] + "...", "time": datetime.now().strftime("%H:%M:%S")}) if len(self.recent_queries) > 5: self.recent_queries.pop() demo_state = DemoState() # Enhanced CSS with dark theme and animations ENHANCED_CSS = """ /* Modern Dark Theme with Animations */ .gradio-container { background: linear-gradient(135deg, #0a0a0a 0%, #1a1a2e 25%, #16213e 75%, #0f1419 100%) !important; color: #e0e6ed !important; min-height: 100vh; font-family: 'Inter', -apple-system, BlinkMacSystemFont, sans-serif; } /* Animated gradient background */ .gradio-container::before { content: ''; position: fixed; top: 0; left: 0; width: 100%; height: 100%; background: linear-gradient(45deg, #667eea, #764ba2, #4facfe, #00f2fe); background-size: 400% 400%; animation: gradientShift 15s ease infinite; opacity: 0.03; z-index: -1; } @keyframes gradientShift { 0% { background-position: 0% 50%; } 50% { background-position: 100% 50%; } 100% { background-position: 0% 50%; } } /* Header styling with glow effect */ .main-header { text-align: center; padding: 2rem 0; background: rgba(255, 255, 255, 0.02); border-radius: 20px; margin-bottom: 2rem; border: 1px solid rgba(255, 255, 255, 0.1); position: relative; overflow: hidden; } .main-header::before { content: ''; position: absolute; top: 0; left: -100%; width: 100%; height: 100%; background: linear-gradient(90deg, transparent, rgba(255, 255, 255, 0.1), transparent); animation: shimmer 3s infinite; } @keyframes shimmer { 0% { left: -100%; } 100% { left: 100%; } } .main-title { font-size: 3rem; font-weight: 700; background: linear-gradient(135deg, #4facfe 0%, #00f2fe 50%, #667eea 100%); -webkit-background-clip: text; -webkit-text-fill-color: transparent; background-clip: text; margin-bottom: 0.5rem; text-shadow: 0 0 30px rgba(79, 172, 254, 0.3); } .subtitle { font-size: 1.2rem; color: #a0aec0; margin-bottom: 1rem; } .status-indicator { display: inline-flex; align-items: center; gap: 0.5rem; padding: 0.5rem 1rem; background: rgba(79, 172, 254, 0.1); border: 1px solid rgba(79, 172, 254, 0.3); border-radius: 25px; font-size: 0.9rem; } .status-dot { width: 8px; height: 8px; background: #00f2fe; border-radius: 50%; animation: pulse 2s infinite; } @keyframes pulse { 0%, 100% { opacity: 1; transform: scale(1); } 50% { opacity: 0.5; transform: scale(1.2); } } /* Enhanced chatbot styling */ .chatbot-container { background: rgba(0, 0, 0, 0.3) !important; border: 1px solid rgba(255, 255, 255, 0.1) !important; border-radius: 20px !important; backdrop-filter: blur(20px) !important; box-shadow: 0 8px 32px rgba(0, 0, 0, 0.3) !important; } .message { animation: fadeInUp 0.5s ease-out; margin: 0.5rem 0; padding: 1rem; border-radius: 15px; position: relative; overflow: hidden; } .message.user { background: linear-gradient(135deg, #667eea 0%, #764ba2 100%) !important; color: white !important; margin-left: 2rem; box-shadow: 0 4px 15px rgba(102, 126, 234, 0.3); } .message.assistant { background: linear-gradient(135deg, rgba(79, 172, 254, 0.1) 0%, rgba(0, 242, 254, 0.1) 100%) !important; border: 1px solid rgba(79, 172, 254, 0.3); color: #e0e6ed !important; margin-right: 2rem; } @keyframes fadeInUp { from { opacity: 0; transform: translateY(20px); } to { opacity: 1; transform: translateY(0); } } /* Enhanced input styling */ .input-container { background: rgba(255, 255, 255, 0.05) !important; border: 2px solid rgba(255, 255, 255, 0.1) !important; border-radius: 15px !important; backdrop-filter: blur(10px) !important; transition: all 0.3s ease; } .input-container:focus-within { border-color: rgba(79, 172, 254, 0.5) !important; box-shadow: 0 0 20px rgba(79, 172, 254, 0.2) !important; } /* Enhanced buttons with hover effects */ .enhanced-button { border: none !important; border-radius: 12px !important; font-weight: 600 !important; text-transform: uppercase !important; letter-spacing: 0.5px !important; padding: 12px 24px !important; transition: all 0.3s cubic-bezier(0.4, 0, 0.2, 1) !important; position: relative !important; overflow: hidden !important; box-shadow: 0 4px 15px rgba(0, 0, 0, 0.2) !important; } .enhanced-button::before { content: ''; position: absolute; top: 0; left: -100%; width: 100%; height: 100%; background: linear-gradient(90deg, transparent, rgba(255, 255, 255, 0.2), transparent); transition: left 0.6s; } .enhanced-button:hover::before { left: 100%; } .primary-button { background: linear-gradient(135deg, #4facfe 0%, #00f2fe 100%) !important; color: white !important; } .primary-button:hover { transform: translateY(-2px) !important; box-shadow: 0 8px 25px rgba(79, 172, 254, 0.4) !important; } .secondary-button { background: linear-gradient(135deg, #667eea 0%, #764ba2 100%) !important; color: white !important; } .secondary-button:hover { transform: translateY(-2px) !important; box-shadow: 0 8px 25px rgba(102, 126, 234, 0.4) !important; } .danger-button { background: linear-gradient(135deg, #ff6b6b 0%, #ee5a52 100%) !important; color: white !important; } .danger-button:hover { transform: translateY(-2px) !important; box-shadow: 0 8px 25px rgba(255, 107, 107, 0.4) !important; } /* Stats panel */ .stats-panel { background: rgba(255, 255, 255, 0.03); border: 1px solid rgba(255, 255, 255, 0.1); border-radius: 15px; padding: 1.5rem; margin: 1rem 0; } .stat-item { display: flex; justify-content: space-between; align-items: center; padding: 0.5rem 0; border-bottom: 1px solid rgba(255, 255, 255, 0.1); } .stat-value { font-weight: 600; color: #4facfe; } /* Page content */ .page-content { padding: 1rem; background: rgba(255, 255, 255, 0.02); border-radius: 15px; border: 1px solid rgba(255, 255, 255, 0.1); min-height: 60vh; } /* Tab styling */ .tab-nav { background: rgba(255, 255, 255, 0.05) !important; border-radius: 10px !important; border: 1px solid rgba(255, 255, 255, 0.1) !important; } /* WebRTC styling */ .webrtc-container { background: rgba(0, 0, 0, 0.3) !important; border: 1px solid rgba(255, 255, 255, 0.1) !important; border-radius: 20px !important; backdrop-filter: blur(20px) !important; box-shadow: 0 8px 32px rgba(0, 0, 0, 0.3) !important; padding: 1rem; } /* Responsive design */ @media (max-width: 768px) { .main-title { font-size: 2rem; } .gradio-container { padding: 10px; } .message.user { margin-left: 1rem; } .message.assistant { margin-right: 1rem; } } /* Scrollbar styling */ ::-webkit-scrollbar { width: 8px; } ::-webkit-scrollbar-track { background: rgba(255, 255, 255, 0.05); border-radius: 4px; } ::-webkit-scrollbar-thumb { background: linear-gradient(135deg, #4facfe 0%, #00f2fe 100%); border-radius: 4px; } ::-webkit-scrollbar-thumb:hover { background: linear-gradient(135deg, #667eea 0%, #764ba2 100%); } """ # 2. Fix the document type choices to match the functions DOCUMENT_TYPES = { "old_documents": { "label": "šŸ“š Old Documents", "description": "Historical documents, legacy materials (uses old_doc_ingestion)", "color": "#8B4513", }, "user_specific": { "label": "šŸ‘¤ User-Specific Documents", "description": "Personal files, user manuals (uses user_doc_ingest)", "color": "#4facfe", }, "domain_documents": { "label": "šŸ„ Domain Documents", "description": "Medical papers, research articles (uses rag_dom_ingest)", "color": "#00f2fe", }, "new_documents": { "label": "šŸ†• New Documents", "description": "Recent uploads using standard RAG pipeline", "color": "#667eea", } } def format_response_time(seconds: float) -> str: """Format response time for display""" if seconds < 1: return f"{int(seconds * 1000)}ms" elif seconds < 60: return f"{seconds:.1f}s" else: minutes = int(seconds // 60) remaining_seconds = seconds % 60 return f"{minutes}m {remaining_seconds:.1f}s" def create_stats_display(): """Create dynamic statistics display""" total_queries = demo_state.session_stats["total_queries"] avg_time = demo_state.session_stats["avg_response_time"] stats_html = f"""

šŸ“Š Session Statistics

Total Queries: {total_queries}
Average Response Time: {format_response_time(avg_time)}
Session Started: {datetime.now().strftime('%H:%M')}
""" # Document statistics doc_stats = demo_state.document_stats if any(doc_stats.values()): stats_html += """

šŸ“ Document Statistics

""" for doc_type, count in doc_stats.items(): if count > 0: type_info = DOCUMENT_TYPES.get(doc_type, {}) color = type_info.get("color", "#4facfe") label = type_info.get("label", doc_type.replace("_", " ").title()) stats_html += f"""
{label}: {count}
""" stats_html += "
" if demo_state.recent_queries: stats_html += """

šŸ•’ Recent Queries

""" for query_info in demo_state.recent_queries: stats_html += f"""
{query_info['query']} {query_info['time']}
""" stats_html += "
" return stats_html # Document processing functions def process_old_document(file_path: str, query: str = None) -> str: """Process old documents using existing old_doc_ingestion""" try: # Update stats demo_state.document_stats["old_documents"] += 1 demo_state.last_uploaded_old_doc = file_path # Use existing old document processing status = old_doc_ingestion(file_path) if query: answer = old_doc_qa(query) return f"šŸ“š Old Document processed: {status}\n\n{answer}" return f"šŸ“š Old Document ingested successfully: {status}" except Exception as e: logger.error(f"Error processing old document: {e}") return f"āŒ Error processing old document: {str(e)}" def process_user_document(file_path: str, query: str = None) -> str: """Process user-specific documents""" try: demo_state.document_stats["user_specific"] += 1 result = ingest_file(file_path) if query: response = process_query(query) cleaned_response = clean_pipeline_result(response[0] if isinstance(response, tuple) else str(response)) return f"šŸ‘¤ User Document processed: {result}\n\n{cleaned_response}" return f"šŸ‘¤ User-specific document ingested successfully: {result}" except Exception as e: logger.error(f"Error processing user document: {e}") return f"āŒ Error processing user document: {str(e)}" # 5. Create fixed voice chat functions def start_voice_chat(): """Start voice chat with proper initialization""" try: # Initialize or reset the handler global voice_handler voice_handler = GeminiHandler() return "šŸŽ¤ Voice chat started... Speak now!" except Exception as e: logger.error(f"Error starting voice chat: {e}") return f"āŒ Failed to start voice chat: {str(e)}" def stop_voice_chat(): """Stop voice chat properly""" try: global voice_handler if 'voice_handler' in globals() and voice_handler: voice_handler.stop() return "ā¹ļø Voice chat stopped." except Exception as e: logger.error(f"Error stopping voice chat: {e}") return f"āŒ Error stopping voice chat: {str(e)}" def process_domain_document(file_path: str, query: str = None) -> str: """Process domain-specific documents""" try: demo_state.document_stats["domain_documents"] += 1 result = ingest_file(file_path) if query: response = process_query(query) cleaned_response = clean_pipeline_result(response[0] if isinstance(response, tuple) else str(response)) return f"šŸ„ Domain Document processed: {result}\n\n{cleaned_response}" return f"šŸ„ Domain document ingested successfully: {result}" except Exception as e: logger.error(f"Error processing domain document: {e}") return f"āŒ Error processing domain document: {str(e)}" def process_new_document(file_path: str, query: str = None) -> str: """Process new documents""" try: demo_state.document_stats["new_documents"] += 1 result = ingest_file(file_path) if query: response = process_query(query) cleaned_response = clean_pipeline_result(response[0] if isinstance(response, tuple) else str(response)) return f"šŸ†• New Document processed: {result}\n\n{cleaned_response}" return f"šŸ†• New document ingested successfully: {result}" except Exception as e: logger.error(f"Error processing new document: {e}") return f"āŒ Error processing new document: {str(e)}" # Main processing functions def process_with_stats(user_input: str, audio_input, chat_history: List) -> Tuple[List, str, str, str, Optional[tuple]]: """Process query with timing and statistics""" start_time = time.time() try: # Get input text query = user_input.strip() if user_input else "" if audio_input and not query: query, status = get_transcription_or_text("", audio_input) if query.startswith("[ERROR]"): return chat_history, "", f"āŒ {status}", create_stats_display(), None if not query: return chat_history, "", "āš ļø Please provide text or audio input.", create_stats_display(), None logger.info(f"Processing query: {query[:50]}...") # Process the query response = process_query(query) response_time = time.time() - start_time # Update statistics demo_state.update_stats(query, response_time) # Clean response if isinstance(response, tuple): cleaned_response = clean_pipeline_result(response[0] if response[0] else response[1]) else: cleaned_response = clean_pipeline_result(str(response)) # Update chat history new_history = chat_history.copy() new_history.append({"role": "user", "content": query}) new_history.append({"role": "assistant", "content": cleaned_response}) # Create status message status_msg = f"āœ… Response generated in {format_response_time(response_time)}" # Update stats display stats_display = create_stats_display() logger.info(f"Query processed successfully in {response_time:.2f}s") return new_history, "", status_msg, stats_display, None except Exception as e: logger.error(f"Error processing query: {e}") error_msg = f"āŒ Error: {str(e)[:100]}..." return chat_history, "", error_msg, create_stats_display(), None def process_with_audio(user_input: str, audio_input, voice_dropdown: str, chat_history: List) -> Tuple[List, Optional[tuple], str, str, str, Optional[tuple]]: """Fixed audio processing function""" start_time = time.time() try: # Get input text with better audio handling query = user_input.strip() if user_input else "" if audio_input and not query: try: # Check if audio_utils functions exist and work query, status = get_transcription_or_text("", audio_input) if not query or query.startswith("[ERROR]"): return chat_history, None, "", f"āŒ Audio transcription failed: {status}", create_stats_display(), None except Exception as audio_error: logger.error(f"Audio transcription error: {audio_error}") return chat_history, None, "", f"āŒ Audio processing error: {str(audio_error)}", create_stats_display(), None if not query: return chat_history, None, "", "āš ļø Please provide text or audio input.", create_stats_display(), None logger.info(f"Processing query with audio: {query[:50]}...") # Process the query try: response = process_query(query) except Exception as query_error: logger.error(f"Query processing error: {query_error}") return chat_history, None, "", f"āŒ Query processing failed: {str(query_error)}", create_stats_display(), None # Clean response if isinstance(response, tuple): cleaned_response = clean_pipeline_result(response[0] if response[0] else response[1]) else: cleaned_response = clean_pipeline_result(str(response)) # Update chat history new_history = chat_history.copy() new_history.append({"role": "user", "content": query}) new_history.append({"role": "assistant", "content": cleaned_response}) # Generate audio with better error handling audio_response = None tts_status = "Audio generation skipped" try: # Check if TTS functions are available if 'generate_tts_response' in globals(): audio_data, tts_status = generate_tts_response(cleaned_response, voice_dropdown) audio_response = audio_data if audio_data else None else: tts_status = "TTS function not available" except Exception as audio_error: logger.error(f"Audio generation error: {audio_error}") audio_response = None tts_status = f"Audio generation failed: {str(audio_error)[:50]}..." response_time = time.time() - start_time demo_state.update_stats(query, response_time) status_msg = f"āœ… Response generated in {format_response_time(response_time)}" if audio_response: status_msg += " | šŸŽµ Audio ready" else: status_msg += f" | āš ļø {tts_status}" stats_display = create_stats_display() return new_history, audio_response, "", status_msg, stats_display, None except Exception as e: logger.error(f"Error processing audio query: {e}") error_msg = f"āŒ Error: {str(e)[:100]}..." return chat_history, None, "", error_msg, create_stats_display(), None # Global handler instance # 8. Update event handlers with fixed functions def setup_fixed_event_handlers(components): """Setup event handlers with the fixed functions""" # Fixed upload handler that includes query processing components['upload_btn'].click( fn=lambda file, doc_type, query: handle_document_upload(file, doc_type, query), inputs=[components['doc_file'], components['doc_type'], components['doc_query']], outputs=[components['upload_status']] ) # Fixed audio processing components['both_btn'].click( fn=process_with_audio, inputs=[components['user_input'], components['audio_input'], components['voice_dropdown'], components['chat_history']], outputs=[components['chatbot'], components['audio_output'], components['user_input'], components['status_output'], components['stats_display'], components['audio_input']]) # Fixed voice chat handlers components['start_voice_btn'].click( fn=start_voice_chat, outputs=[components['voice_status']]) components['stop_voice_btn'].click( fn=stop_voice_chat, outputs=[components['voice_status']]) global voice_handler voice_handler = GeminiHandler() components['webrtc_audio'].stream( voice_handler, inputs=[components['webrtc_audio']], outputs=[components['webrtc_audio']], time_limit=180 if get_space() else None, concurrency_limit=2 if get_space() else None, ) # 8. Audio fallback function if TTS is not working def safe_audio_processing(text_response: str, voice: str) -> Optional[tuple]: """Safe audio processing with fallback""" try: # Check if audio_utils is available if 'generate_tts_response' in globals(): return generate_tts_response(text_response, voice) else: logger.warning("TTS function not available, skipping audio generation") return None except Exception as e: logger.error(f"Audio processing failed: {e}") return None # 9. Add proper imports check def check_required_imports(): """Check if all required modules are available""" missing_modules = [] try: from docs_utils import old_doc_ingestion, old_doc_qa, user_doc_ingest, user_doc_qa, rag_dom_ingest, rag_dom_qa except ImportError as e: missing_modules.append(f"docs_utils functions: {e}") try: from audio_utils import generate_tts_response, get_transcription_or_text except ImportError as e: missing_modules.append(f"audio_utils functions: {e}") try: from pipeQuery import process_query, clean_pipeline_result except ImportError as e: missing_modules.append(f"pipeQuery functions: {e}") if missing_modules: logger.warning(f"Missing modules: {missing_modules}") return False, missing_modules return True, [] # 8. Audio fallback function if TTS is not working def safe_audio_processing(text_response: str, voice: str) -> Optional[tuple]: """Safe audio processing with fallback""" try: # Check if audio_utils is available if 'generate_tts_response' in globals(): return generate_tts_response(text_response, voice) else: logger.warning("TTS function not available, skipping audio generation") return None except Exception as e: logger.error(f"Audio processing failed: {e}") return None # 9. Add proper imports check def handle_document_upload(file, doc_type: str, query: str = "") -> str: """Handle document upload with proper function mapping to docs_utils.py""" if not file: return "āš ļø Please select a file to upload." if doc_type == "None" or not doc_type: return "āš ļø Please select a document type." try: logger.info(f"Processing file upload: {file.name} as {doc_type}") # Map document types to actual functions from docs_utils.py if doc_type == "old_documents": from docs_utils import old_doc_ingestion, old_doc_qa # First ingest the document ingest_result = old_doc_ingestion(file.name) demo_state.document_stats["old_documents"] += 1 # If query provided, also get answer if query and query.strip(): answer = old_doc_qa(query) return f"šŸ“š Old Document processed: {ingest_result}\n\nQuery Answer:\n{answer}" return f"šŸ“š Old Document ingested: {ingest_result}" elif doc_type == "user_specific": from docs_utils import user_doc_ingest, user_doc_qa # Ingest the document ingest_result = user_doc_ingest(file.name) demo_state.document_stats["user_specific"] += 1 # If query provided, also get answer if query and query.strip(): answer = user_doc_qa(query) return f"šŸ‘¤ User Document processed: {ingest_result}\n\nQuery Answer:\n{answer}" return f"šŸ‘¤ User-specific document ingested: {ingest_result}" elif doc_type == "domain_documents": from docs_utils import rag_dom_ingest, rag_dom_qa # Ingest the document ingest_result = rag_dom_ingest(file.name) demo_state.document_stats["domain_documents"] += 1 # If query provided, also get answer if query and query.strip(): answer = rag_dom_qa(query) return f"šŸ„ Domain Document processed: {ingest_result}\n\nQuery Answer:\n{answer}" return f"šŸ„ Domain document ingested: {ingest_result}" elif doc_type == "new_documents": # Use the standard RAG pipeline for new documents from rag_steps import ingest_file ingest_result = ingest_file(file.name) demo_state.document_stats["new_documents"] += 1 # If query provided, process through main pipeline if query and query.strip(): response = process_query(query) cleaned_response = clean_pipeline_result(response[0] if isinstance(response, tuple) else str(response)) return f"šŸ†• New Document processed: {ingest_result}\n\nQuery Answer:\n{cleaned_response}" return f"šŸ†• New document ingested: {ingest_result}" else: return f"āŒ Unknown document type: {doc_type}" except Exception as e: logger.error(f"File upload error: {e}") return f"āŒ File upload failed: {str(e)}" def process_old_doc_query(query: str, doc_file) -> str: """Process query for old documents specifically""" global demo_state if doc_file: try: upload_dir = os.path.join("assets", "uploaded_docs") os.makedirs(upload_dir, exist_ok=True) safe_filename = os.path.basename(doc_file.name) save_path = os.path.join(upload_dir, safe_filename) shutil.copy2(doc_file.name, save_path) demo_state.last_uploaded_old_doc = save_path return process_old_document(save_path, query) except Exception as e: return f"āŒ Error processing file: {str(e)}" else: if demo_state.last_uploaded_old_doc and os.path.exists(demo_state.last_uploaded_old_doc): try: answer = old_doc_qa(query) return f"[Using previously uploaded document: {os.path.basename(demo_state.last_uploaded_old_doc)}]\n\n{answer}" except Exception as e: return f"āŒ Error querying document: {str(e)}" else: return "āŒ No document uploaded. Please upload an old document to proceed." def clear_chat(): """Clear chat history and reset stats""" demo_state.__init__() # Reset stats return [], None, "šŸ”„ Chat cleared and stats reset!", create_stats_display() def create_main_chat_tab(): """Create the main chat interface tab""" with gr.Row(): with gr.Column(scale=3): # Main chatbot interface with gr.Group(): chatbot = gr.Chatbot( type='messages', label="šŸ’¬ Conversation with Wisal", height=500, show_copy_button=True, elem_classes="chatbot-container", ) # Status and audio output with gr.Row(): with gr.Column(): status_output = gr.Textbox( label="šŸ“Š System Status", interactive=False, max_lines=2, elem_classes="input-container" ) with gr.Column(): audio_output = gr.Audio( label="šŸŽµ Audio Response", interactive=False, show_download_button=True, elem_classes="input-container" ) # Input section with gr.Group(): with gr.Row(): user_input = gr.Textbox( placeholder="Ask me anything about autism...", label="šŸ’­ Your Message", lines=2, scale=3, elem_classes="input-container" ) audio_input = gr.Audio( sources=["microphone", "upload"], type="filepath", label="šŸŽ¤ Voice Input", scale=1, elem_classes="input-container" ) voice_dropdown = gr.Dropdown( label="šŸŽ™ļø Voice Selection", choices=["Kore", "Puck", "Zephyr", "Leda", "Fenrir", "Charon", "Orus", "Aoede", "Callirrhoe"], value="Kore", elem_classes="input-container" ) # Action buttons with gr.Row(): text_btn = gr.Button( "šŸ’¬ Text Response", variant="secondary", elem_classes="enhanced-button secondary-button" ) audio_btn = gr.Button( "šŸŽµ Audio Response", variant="secondary", elem_classes="enhanced-button secondary-button" ) both_btn = gr.Button( "šŸš€ Text + Audio", variant="primary", elem_classes="enhanced-button primary-button" ) with gr.Row(): clear_btn = gr.Button( "šŸ—‘ļø Clear Chat", variant="stop", elem_classes="enhanced-button danger-button" ) with gr.Column(scale=1): # Statistics panel stats_display = gr.HTML( create_stats_display(), label="šŸ“ˆ Statistics" ) # Enhanced Document upload section with gr.Group(): # Fixed Document upload section gr.Markdown("### šŸ“ Document Upload") doc_file = gr.File( label="Upload Document", file_types=[".pdf", ".docx", ".txt"], elem_classes="input-container" ) # Fixed dropdown with proper choices doc_type = gr.Dropdown( label="Document Type", choices=[ ("šŸ“š Old Documents", "old_documents"), ("šŸ‘¤ User-Specific Documents", "user_specific"), ("šŸ„ Domain Documents", "domain_documents"), ("šŸ†• New Documents", "new_documents") ], value="user_specific", elem_classes="input-container" ) # Optional query field for immediate Q&A doc_query = gr.Textbox( label="Optional: Ask about this document", placeholder="What does this document say about...", lines=2, elem_classes="input-container" ) upload_btn = gr.Button( "šŸ“¤ Upload & Process", elem_classes="enhanced-button primary-button" ) upload_status = gr.Textbox( label="Upload Status", interactive=False, lines=4, elem_classes="input-container" ) # Document type info display doc_info = gr.HTML( """

šŸ“‹ Document Types

šŸ“š Old Documents: Historical documents, legacy materials

šŸ‘¤ User-Specific: Personal files, user manuals

šŸ„ Domain Documents: Medical papers, research articles

šŸ†• New Documents: Recent uploads, latest materials

""" ) upload_btn = gr.Button( "šŸ“¤ Upload", elem_classes="enhanced-button primary-button" ) upload_status = gr.Textbox( label="Upload Status", interactive=False, lines=3, elem_classes="input-container" ) # Quick actions with gr.Group(): gr.Markdown("### ⚔ Quick Actions") sample_queries = [ "What is autism?", "Early signs of autism", "Autism therapy options", "Supporting autistic children" ] for query in sample_queries: btn = gr.Button( query, elem_classes="enhanced-button secondary-button", size="sm" ) btn.click( lambda q=query: q, outputs=[user_input] ) return { 'chatbot': chatbot, 'user_input': user_input, 'audio_input': audio_input, 'doc_query': doc_query, 'voice_dropdown': voice_dropdown, 'audio_output': audio_output, 'status_output': status_output, 'stats_display': stats_display, 'doc_file': doc_file, 'doc_type': doc_type, 'upload_status': upload_status, 'text_btn': text_btn, 'audio_btn': audio_btn, 'both_btn': both_btn, 'upload_btn': upload_btn, 'clear_btn': clear_btn } def create_old_documents_tab(): """Create the old documents Q&A tab""" with gr.Row(): with gr.Column(scale=2): gr.Markdown("## šŸ“š Old Documents Q&A") gr.Markdown("Upload historical documents, legacy materials, and archived content for specialized querying.") query = gr.Textbox( placeholder="Ask about old documents...", lines=3, label="šŸ’­ Your Question about Old Documents", elem_classes="input-container" ) doc_file = gr.File( label="šŸ“ Upload Old Document (PDF, DOCX, TXT)", file_types=[".pdf", ".docx", ".txt"], elem_classes="input-container" ) with gr.Row(): submit_btn = gr.Button( "šŸ” Submit Query", variant="primary", elem_classes="enhanced-button primary-button" ) clear_old_btn = gr.Button( "šŸ—‘ļø Clear", variant="secondary", elem_classes="enhanced-button danger-button" ) output = gr.Textbox( label="šŸ“„ Answer from Old Documents", lines=12, interactive=False, elem_classes="input-container" ) with gr.Column(scale=1): # Old documents info panel gr.HTML("""

šŸ“š Old Documents Info

Purpose: Process historical and legacy documents that require specialized handling.

Best for:

Features:

""") # Recent old document queries old_doc_history = gr.HTML( """

šŸ“‹ Recent Queries

No recent queries yet.

""" ) return { 'query': query, 'doc_file': doc_file, 'output': output, 'submit_btn': submit_btn, 'clear_old_btn': clear_old_btn, 'old_doc_history': old_doc_history } def create_voice_chat_tab(): """Create the voice chat tab with WebRTC""" with gr.Row(): with gr.Column(scale=2): gr.Markdown("## šŸŽ™ļø Real-time Voice Chat") gr.Markdown("Experience real-time speech-to-speech conversation with Wisal using advanced voice AI.") # WebRTC component for real-time audio webrtc_audio = WebRTC( label="šŸŽ¤ Voice Chat Interface", modality="audio", mode="send-receive", elem_id="voice-chat-source", rtc_configuration=get_cloudflare_turn_credentials_async, icon="https://www.gstatic.com/lamda/images/gemini_favicon_f069958c85030456e93de685481c559f160ea06b.png", pulse_color="rgb(79, 172, 254)", icon_button_color="rgb(255, 255, 255)", elem_classes="webrtc-container" ) # Voice chat controls with gr.Row(): voice_status = gr.Textbox( label="šŸ”Š Voice Chat Status", value="Ready to start voice conversation...", interactive=False, elem_classes="input-container" ) with gr.Row(): start_voice_btn = gr.Button( "šŸŽ¤ Start Voice Chat", variant="primary", elem_classes="enhanced-button primary-button" ) stop_voice_btn = gr.Button( "ā¹ļø Stop Voice Chat", variant="secondary", elem_classes="enhanced-button danger-button" ) with gr.Column(scale=1): # Voice chat info gr.HTML("""

šŸŽ™ļø Voice Chat Features

Real-time Features:

Voice Quality:

""") # Audio visualizer placeholder gr.HTML("""

šŸŽµ Audio Visualizer

""") return { 'webrtc_audio': webrtc_audio, 'voice_status': voice_status, 'start_voice_btn': start_voice_btn, 'stop_voice_btn': stop_voice_btn } def create_document_management_tab(): """Create the enhanced document management tab""" with gr.Row(): with gr.Column(scale=2): gr.Markdown("## šŸ“ Advanced Document Management") gr.Markdown("Upload and manage different types of documents with specialized processing pipelines.") # Document upload section with gr.Group(): gr.Markdown("### šŸ“¤ Upload Documents") upload_file = gr.File( label="šŸ“Ž Select Document", file_types=[".pdf", ".docx", ".txt", ".md"], elem_classes="input-container" ) upload_doc_type = gr.Dropdown( label="šŸ“‹ Document Category", choices=[(v["label"], k) for k, v in DOCUMENT_TYPES.items()], value="user_specific", elem_classes="input-container" ) upload_query = gr.Textbox( placeholder="Optional: Ask a question about this document...", label="ā“ Optional Query", lines=2, elem_classes="input-container" ) process_doc_btn = gr.Button( "šŸš€ Process Document", variant="primary", elem_classes="enhanced-button primary-button" ) doc_result = gr.Textbox( label="šŸ“Š Processing Result", lines=8, interactive=False, elem_classes="input-container" ) # Batch processing section with gr.Group(): gr.Markdown("### šŸ“¦ Batch Processing") batch_files = gr.File( label="šŸ“š Upload Multiple Documents", file_count="multiple", file_types=[".pdf", ".docx", ".txt"], elem_classes="input-container" ) batch_type = gr.Dropdown( label="šŸ“‹ Batch Category", choices=[(v["label"], k) for k, v in DOCUMENT_TYPES.items()], value="domain_documents", elem_classes="input-container" ) batch_process_btn = gr.Button( "⚔ Process Batch", variant="secondary", elem_classes="enhanced-button secondary-button" ) batch_result = gr.Textbox( label="šŸ“ˆ Batch Results", lines=6, interactive=False, elem_classes="input-container" ) with gr.Column(scale=1): # Document statistics doc_stats_display = gr.HTML( create_stats_display(), label="šŸ“Š Document Statistics" ) # Document type details gr.HTML(f"""

šŸ“‹ Document Categories

{"".join([f'''
{info["label"]}
{info["description"]}
''' for info in DOCUMENT_TYPES.values()])}
""") # Processing guidelines gr.HTML("""

šŸ’” Processing Tips

File Formats: PDF, DOCX, TXT, MD

Size Limit: Up to 50MB per file

Batch Processing: Up to 10 files at once

Processing Time: 10-30 seconds per document

""") return { 'upload_file': upload_file, 'upload_doc_type': upload_doc_type, 'upload_query': upload_query, 'doc_result': doc_result, 'batch_files': batch_files, 'batch_type': batch_type, 'batch_result': batch_result, 'process_doc_btn': process_doc_btn, 'batch_process_btn': batch_process_btn, 'doc_stats_display': doc_stats_display } def process_batch_documents(files, doc_type: str) -> str: """Process multiple documents in batch""" if not files: return "āš ļø No files selected for batch processing." if doc_type == "None" or not doc_type: return "āš ļø Please select a document type for batch processing." results = [] successful = 0 failed = 0 for file in files: try: result = handle_document_upload(file, doc_type) if result.startswith("āŒ"): failed += 1 results.append(f"āŒ {os.path.basename(file.name)}: Failed") else: successful += 1 results.append(f"āœ… {os.path.basename(file.name)}: Success") except Exception as e: failed += 1 results.append(f"āŒ {os.path.basename(file.name)}: {str(e)[:50]}...") summary = f"šŸ“¦ Batch Processing Complete:\nāœ… Successful: {successful}\nāŒ Failed: {failed}\n\n" summary += "\n".join(results) return summary # def create_demo(): # """Create the enhanced integrated Gradio demo""" # # Custom theme # theme = gr.themes.Base( # primary_hue="blue", # secondary_hue="purple", # neutral_hue="slate", # font=gr.themes.GoogleFont("Inter") # ) # with gr.Blocks( # title="Wisal - Advanced Autism AI Assistant", # theme=theme, # css=ENHANCED_CSS, # head=""" # # # """ # ) as demo: # # State management # chat_history = gr.State([]) # # Header # gr.HTML(""" #
#

🧠 Wisal

#

Advanced AI Assistant for Autism Spectrum Disorders

#
#
# AI Assistant Active - All Services Online #
#
# """) # # Main tabbed interface # with gr.Tabs(elem_classes="tab-nav") as tabs: # # Main Chat Tab # with gr.TabItem("šŸ’¬ Main Chat", elem_id="main-chat"): # main_components = create_main_chat_tab() # # Old Documents Tab # with gr.TabItem("šŸ“š Old Documents", elem_id="old-docs"): # old_doc_components = create_old_documents_tab() # # Voice Chat Tab # with gr.TabItem("šŸŽ™ļø Voice Chat", elem_id="voice-chat"): # voice_components = create_voice_chat_tab() # # Document Management Tab # with gr.TabItem("šŸ“ Document Management", elem_id="doc-management"): # doc_mgmt_components = create_document_management_tab() # # Event handlers for Main Chat Tab # main_components['text_btn'].click( # fn=process_with_stats, # inputs=[main_components['user_input'], main_components['audio_input'], chat_history], # outputs=[main_components['chatbot'], main_components['user_input'], main_components['status_output'], main_components['stats_display'], main_components['audio_input']] # ) # main_components['both_btn'].click( # fn=process_with_audio, # inputs=[main_components['user_input'], main_components['audio_input'], main_components['voice_dropdown'], chat_history], # outputs=[main_components['chatbot'], main_components['audio_output'], main_components['user_input'], main_components['status_output'], main_components['stats_display'], main_components['audio_input']] # ) # main_components['user_input'].submit( # fn=process_with_audio, # inputs=[main_components['user_input'], main_components['audio_input'], main_components['voice_dropdown'], chat_history], # outputs=[main_components['chatbot'], main_components['audio_output'], main_components['user_input'], main_components['status_output'], main_components['stats_display'], main_components['audio_input']] # ) # main_components['upload_btn'].click( # fn=handle_document_upload, # inputs=[main_components['doc_file'], main_components['doc_type']], # outputs=[main_components['upload_status']] # ) # main_components['clear_btn'].click( # fn=clear_chat, # outputs=[main_components['chatbot'], main_components['audio_output'], main_components['status_output'], main_components['stats_display']] # ) # # Event handlers for Old Documents Tab # old_doc_components['submit_btn'].click( # fn=process_old_doc_query, # inputs=[old_doc_components['query'], old_doc_components['doc_file']], # outputs=[old_doc_components['output']] # ) # old_doc_components['clear_old_btn'].click( # fn=lambda: ("", None, ""), # outputs=[old_doc_components['query'], old_doc_components['doc_file'], old_doc_components['output']] # ) # # Event handlers for Voice Chat Tab # voice_components['webrtc_audio'].stream( # GeminiHandler(), # inputs=[voice_components['webrtc_audio']], # outputs=[voice_components['webrtc_audio']], # time_limit=180 if get_space() else None, # concurrency_limit=2 if get_space() else None, # ) # voice_components['start_voice_btn'].click( # fn=lambda: "šŸŽ¤ Voice chat started... Speak now!", # outputs=[voice_components['voice_status']] # ) # voice_components['stop_voice_btn'].click( # fn=lambda: "ā¹ļø Voice chat stopped.", # outputs=[voice_components['voice_status']] # ) # # Event handlers for Document Management Tab # doc_mgmt_components['process_doc_btn'].click( # fn=handle_document_upload, # inputs=[doc_mgmt_components['upload_file'], doc_mgmt_components['upload_doc_type'], doc_mgmt_components['upload_query']], # outputs=[doc_mgmt_components['doc_result']] # ) # doc_mgmt_components['batch_process_btn'].click( # fn=process_batch_documents, # inputs=[doc_mgmt_components['batch_files'], doc_mgmt_components['batch_type']], # outputs=[doc_mgmt_components['batch_result']] # ) # # Footer with usage instructions # gr.HTML(""" #
#

šŸ’” How to Use Wisal

#
#

šŸ’¬ Main Chat: Primary interface for text and audio conversations with comprehensive features

#

šŸ“š Old Documents: Specialized processing for historical and legacy documents

#

šŸŽ™ļø Voice Chat: Real-time speech-to-speech conversation with advanced voice AI

#

šŸ“ Document Management: Advanced document upload, processing, and batch operations

#

šŸŽ¤ Audio Features: Support for voice input, multiple voice options, and audio responses

#

šŸ“Š Statistics: Real-time tracking of usage, performance, and document processing metrics

#
#
# """) # return demo """ specific_utils.py - Enhanced Document Processing with Gradio Integration This module integrates with gradio_utils.py to provide comprehensive document processing and audio handling for the Wisal application. """ import os import gradio as gr from typing import Tuple, Optional, Dict, Any, List import shutil from datetime import datetime # Import from gradio_utils.py for enhanced functionality from gradio_utils import ( DOCUMENT_TYPES, handle_document_upload, process_with_audio, process_with_stats, create_stats_display, demo_state, safe_audio_processing, process_old_doc_query, clear_chat, ENHANCED_CSS ) # Import document functions from docs_utils from docs_utils import ( old_doc_ingestion, old_doc_qa, user_doc_ingest, user_doc_qa, rag_dom_ingest, rag_dom_qa ) # Import audio utilities from audio_utils import get_transcription_or_text, generate_tts_response # Import pipeline functions from pipeQuery import process_query, clean_pipeline_result from logger.custom_logger import CustomLoggerTracker # Initialize logger custom_log = CustomLoggerTracker() logger = custom_log.get_logger("specific_utils") logger.info("Logger initialized for specific utilities module") def get_all_document_choices() -> List[Tuple[str, str]]: """Get all 4 document type choices for dropdown""" return [ ("šŸ“š Old Documents", "old_documents"), ("šŸ‘¤ User-Specific Documents", "user_specific"), ("šŸ„ Domain Documents", "domain_documents"), ("šŸ†• New Documents", "new_documents") ] def enhanced_document_upload_handler(file, doc_type: str, query: str = "") -> str: """ Enhanced document upload handler that properly routes to correct functions """ if not file: return "āš ļø Please select a file to upload." if not doc_type or doc_type == "None": return "āš ļø Please select a document type." try: logger.info(f"Processing document upload: {file.name} as {doc_type}") # Route to appropriate processing function based on document type if doc_type == "old_documents": # Use old documents processing from docs_utils demo_state.document_stats["old_documents"] += 1 ingest_result = old_doc_ingestion(file.name) if query and query.strip(): answer = old_doc_qa(query) return f"šŸ“š Old Document processed: {ingest_result}\n\nQuery Answer:\n{answer}" return f"šŸ“š Old Document ingested: {ingest_result}" elif doc_type == "user_specific": # Use user-specific processing from docs_utils demo_state.document_stats["user_specific"] += 1 ingest_result = user_doc_ingest(file.name) if query and query.strip(): answer = user_doc_qa(query) return f"šŸ‘¤ User Document processed: {ingest_result}\n\nQuery Answer:\n{answer}" return f"šŸ‘¤ User-specific document ingested: {ingest_result}" elif doc_type == "domain_documents": # Use domain processing from docs_utils demo_state.document_stats["domain_documents"] += 1 ingest_result = rag_dom_ingest(file.name) if query and query.strip(): answer = rag_dom_qa(query) return f"šŸ„ Domain Document processed: {ingest_result}\n\nQuery Answer:\n{answer}" return f"šŸ„ Domain document ingested: {ingest_result}" elif doc_type == "new_documents": # Use standard pipeline for new documents from rag_steps import ingest_file demo_state.document_stats["new_documents"] += 1 ingest_result = ingest_file(file.name) if query and query.strip(): response = process_query(query) cleaned_response = clean_pipeline_result( response[0] if isinstance(response, tuple) else str(response) ) return f"šŸ†• New Document processed: {ingest_result}\n\nQuery Answer:\n{cleaned_response}" return f"šŸ†• New document ingested: {ingest_result}" else: return f"āŒ Unknown document type: {doc_type}" except Exception as e: logger.error(f"Document upload error: {e}") return f"āŒ Document processing failed: {str(e)}" def enhanced_audio_transcription(audio_file) -> Tuple[str, str]: """ Enhanced audio transcription with better error handling """ if not audio_file: return "", "No audio file provided" try: logger.info(f"Processing audio transcription...") # Use the get_transcription_or_text function with proper error handling transcribed_text, status = get_transcription_or_text("", audio_file) if not transcribed_text or transcribed_text.startswith("[ERROR]"): logger.warning(f"Transcription failed: {status}") return "", f"āŒ Audio transcription failed: {status}" logger.info(f"Transcription successful: {transcribed_text[:50]}...") return transcribed_text.strip(), "āœ… Audio transcribed successfully" except Exception as e: logger.error(f"Audio transcription error: {e}") return "", f"āŒ Transcription error: {str(e)}" def process_text_with_audio_support(user_input: str, audio_input, chat_history: List) -> Tuple[List, str, str, str]: """ Process input (text or audio) and return only text response with chat history """ try: # Get input text query = user_input.strip() if user_input else "" # Process audio if no text provided if audio_input and not query: transcribed_text, transcription_status = enhanced_audio_transcription(audio_input) if transcribed_text: query = transcribed_text else: return chat_history, "", f"āŒ {transcription_status}", create_stats_display() if not query: return chat_history, "", "āš ļø Please provide text or audio input.", create_stats_display() # Use the enhanced processing from gradio_utils new_history, _, cleared_input, status_msg, stats_display, _ = process_with_stats(query, None, chat_history) return new_history, cleared_input, status_msg, stats_display except Exception as e: logger.error(f"Text processing error: {e}") return chat_history, "", f"āŒ Processing error: {str(e)}", create_stats_display() def process_audio_only_response(user_input: str, audio_input, voice_dropdown: str, chat_history: List) -> Tuple[Optional[Any], str, str, str]: """ Process input and return only audio response """ try: # Get input text query = user_input.strip() if user_input else "" # Process audio if no text provided if audio_input and not query: transcribed_text, transcription_status = enhanced_audio_transcription(audio_input) if transcribed_text: query = transcribed_text else: return None, "", f"āŒ {transcription_status}", create_stats_display() if not query: return None, "", "āš ļø Please provide text or audio input.", create_stats_display() # Generate text response first response = process_query(query) if isinstance(response, tuple): text_response = clean_pipeline_result(response[0] if response[0] else response[1]) else: text_response = clean_pipeline_result(str(response)) # Generate audio from text response audio_response = safe_audio_processing(text_response, voice_dropdown) status_msg = "āœ… Audio response generated successfully" if audio_response else "āš ļø Audio generation failed" return audio_response, "", status_msg, create_stats_display() except Exception as e: logger.error(f"Audio processing error: {e}") return None, "", f"āŒ Audio processing error: {str(e)}", create_stats_display() def process_both_text_and_audio_response( user_input: str, audio_input, voice_dropdown: str, chat_history: List ) -> Tuple[List, Optional[Any], str, str, str]: """ Process input and return both text (in chat) and audio response """ try: # Use the enhanced processing from gradio_utils return process_with_audio(user_input, audio_input, voice_dropdown, chat_history) except Exception as e: logger.error(f"Combined processing error: {e}") return chat_history, None, "", f"āŒ Processing error: {str(e)}", create_stats_display() def create_document_info_panel() -> str: """Create HTML info panel for document types""" return """

šŸ“‹ Document Processing Options

šŸ“š Old Documents
Historical documents, legacy materials, archived content
šŸ‘¤ User-Specific Documents
Personal files, user manuals, individual documents
šŸ„ Domain Documents
Medical papers, research articles, domain knowledge
šŸ†• New Documents
Recent uploads, latest materials, standard processing
šŸ’” Tip: Each document type uses specialized processing optimized for that content category.
""" def get_enhanced_css() -> str: """Get enhanced CSS from gradio_utils""" return ENHANCED_CSS def create_enhanced_file_upload_interface(): """Create an enhanced file upload interface with all 4 document types""" with gr.Group(): gr.Markdown("### šŸ“ Enhanced Document Upload") doc_file = gr.File( label="šŸ“Ž Upload Document (PDF, DOCX, TXT)", file_types=[".pdf", ".docx", ".txt"], elem_classes="input-container" ) # Fixed dropdown with all 4 options doc_type = gr.Dropdown( label="šŸ“‹ Document Type", choices=get_all_document_choices(), value="user_specific", elem_classes="input-container" ) # Optional query field doc_query = gr.Textbox( label="šŸ’­ Optional: Ask about this document", placeholder="What does this document say about...", lines=2, elem_classes="input-container" ) upload_btn = gr.Button( "šŸ“¤ Upload & Process", variant="primary", elem_classes="enhanced-button primary-button" ) upload_status = gr.Textbox( label="šŸ“Š Upload Status", interactive=False, lines=5, elem_classes="input-container" ) # Document info panel doc_info = gr.HTML(create_document_info_panel()) return { 'doc_file': doc_file, 'doc_type': doc_type, 'doc_query': doc_query, 'upload_btn': upload_btn, 'upload_status': upload_status, 'doc_info': doc_info } # Export main functions for use in main.py __all__ = [ 'get_all_document_choices', 'enhanced_document_upload_handler', 'enhanced_audio_transcription', 'process_text_with_audio_support', 'process_audio_only_response', 'process_both_text_and_audio_response', 'create_document_info_panel', 'create_enhanced_file_upload_interface', 'get_enhanced_css', 'ENHANCED_CSS', 'demo_state' ]