import gradio as gr
import time
import os
import re
from typing import List, Tuple, Optional
from datetime import datetime
import shutil
import asyncio
from io import BytesIO
import numpy as np
import websockets
from dotenv import load_dotenv
from PIL import Image
# Load environment variables first, before any project imports that may
# read API keys or config from the environment at import time.
load_dotenv()
# Import your existing modules with error handling.
# Each optional project module below gets an inert stub on ImportError so
# the Gradio UI still loads (degraded) when a dependency is missing.
try:
    from pipeQuery import process_query, clean_pipeline_result
except ImportError as e:
    print(f"Warning: pipeQuery import failed: {e}")
    # Stubs mirror the real call shapes: process_query(query) -> str,
    # clean_pipeline_result(result) -> str.
    def process_query(query): return "Pipeline not available"
    def clean_pipeline_result(result): return str(result)
# Optional audio stack (TTS, transcription, realtime Gemini handler).
try:
    from audio_utils import generate_tts_response, get_transcription_or_text, GeminiHandler
except ImportError as e:
    print(f"Warning: audio_utils import failed: {e}")
    # Stubs keep the (result, status) tuple shapes used by callers below.
    def generate_tts_response(text, voice): return None, "TTS not available"
    def get_transcription_or_text(text, audio): return text or "No input", "Text used"
    class GeminiHandler:
        # Minimal interface used by the voice-chat tab: copy() and stop().
        def __init__(self): pass
        def copy(self): return GeminiHandler()
        def stop(self): pass
# Standard RAG ingestion entry point; stub echoes the path when missing.
try:
    from rag_steps import ingest_file
except ImportError as e:
    print(f"Warning: rag_steps import failed: {e}")
    def ingest_file(file_path): return f"File ingestion not available for {file_path}"
from logger.custom_logger import CustomLoggerTracker
# Per-category document ingestion/QA helpers (old / user / domain docs).
try:
    from docs_utils import old_doc_ingestion, old_doc_qa, user_doc_ingest, user_doc_qa, rag_dom_ingest, rag_dom_qa
except ImportError as e:
    print(f"Warning: docs_utils import failed: {e}")
    def old_doc_ingestion(file_path): return f"Old doc ingestion not available for {file_path}"
    def old_doc_qa(query): return "Old doc QA not available"
    def user_doc_ingest(file_path): return f"User doc ingestion not available for {file_path}"
    def user_doc_qa(query): return "User doc QA not available"
    def rag_dom_ingest(file_path): return f"RAG domain ingestion not available for {file_path}"
    def rag_dom_qa(query): return "RAG domain QA not available"
# fastrtc powers the real-time voice tab. When it is unavailable, install
# inert stand-ins with the same call shapes so the UI still builds.
try:
    from fastrtc import (
        WebRTC,
        get_cloudflare_turn_credentials_async,
        wait_for_item,
    )
except ImportError as e:
    print(f"Warning: fastrtc import failed: {e}")

    class WebRTC:
        """No-op stand-in for fastrtc.WebRTC; accepts and ignores all args."""
        def __init__(self, **kwargs): pass
        def stream(self, *args, **kwargs): pass

    # Fix: the real fastrtc helpers are coroutine functions, so the
    # fallbacks must be awaitable too — a plain sync stub would raise
    # TypeError wherever the result is awaited (or when gradio invokes
    # an async rtc_configuration callback).
    async def get_cloudflare_turn_credentials_async():
        return {}

    async def wait_for_item(*args):
        return None
# Optional Gemini SDK; `genai is None` is the "unavailable" sentinel that
# callers should check before use.
try:
    from google import genai
except ImportError as e:
    print(f"Warning: google.genai import failed: {e}")
    genai = None
from gradio.utils import get_space
# Initialize the module-wide logger via the project's custom tracker.
custom_log = CustomLoggerTracker()
logger = custom_log.get_logger("gradio_demo")
# Global state management
class DemoState:
    """Process-wide mutable session state for the demo UI.

    Tracks the query count, a running average response time, the most
    recent query previews (capped), the last uploaded "old" document path,
    and per-category document ingestion counters.
    """

    MAX_RECENT_QUERIES = 5   # cap on the recent-queries list
    QUERY_PREVIEW_LEN = 50   # characters of a query shown in the stats panel

    def __init__(self):
        self.query_count = 0
        self.session_stats = {"total_queries": 0, "avg_response_time": 0}
        self.recent_queries = []
        self.last_uploaded_old_doc = None
        # Keys match DOCUMENT_TYPES / handle_document_upload categories.
        self.document_stats = {
            "old_documents": 0,
            "user_specific": 0,
            "domain_documents": 0,
            "new_documents": 0
        }

    def update_stats(self, query: str, response_time: float) -> None:
        """Record one completed query.

        Bumps counters, folds *response_time* into the running mean, and
        remembers a short preview of *query* with a timestamp.
        """
        self.query_count += 1
        self.session_stats["total_queries"] += 1
        # Incremental running mean: new = old + (x - old) / n
        current_avg = self.session_stats["avg_response_time"]
        self.session_stats["avg_response_time"] = (
            current_avg + (response_time - current_avg) / self.query_count
        )
        # Bug fix: only append an ellipsis when the query is actually
        # truncated (the original suffixed "..." unconditionally).
        if len(query) > self.QUERY_PREVIEW_LEN:
            preview = query[:self.QUERY_PREVIEW_LEN] + "..."
        else:
            preview = query
        self.recent_queries.insert(
            0, {"query": preview, "time": datetime.now().strftime("%H:%M:%S")}
        )
        # Keep only the most recent entries.
        del self.recent_queries[self.MAX_RECENT_QUERIES:]


# Module-level singleton shared by every handler in this file.
demo_state = DemoState()
# Enhanced CSS with dark theme and animations
ENHANCED_CSS = """
/* Modern Dark Theme with Animations */
.gradio-container {
background: linear-gradient(135deg, #0a0a0a 0%, #1a1a2e 25%, #16213e 75%, #0f1419 100%) !important;
color: #e0e6ed !important;
min-height: 100vh;
font-family: 'Inter', -apple-system, BlinkMacSystemFont, sans-serif;
}
/* Animated gradient background */
.gradio-container::before {
content: '';
position: fixed;
top: 0;
left: 0;
width: 100%;
height: 100%;
background: linear-gradient(45deg, #667eea, #764ba2, #4facfe, #00f2fe);
background-size: 400% 400%;
animation: gradientShift 15s ease infinite;
opacity: 0.03;
z-index: -1;
}
@keyframes gradientShift {
0% { background-position: 0% 50%; }
50% { background-position: 100% 50%; }
100% { background-position: 0% 50%; }
}
/* Header styling with glow effect */
.main-header {
text-align: center;
padding: 2rem 0;
background: rgba(255, 255, 255, 0.02);
border-radius: 20px;
margin-bottom: 2rem;
border: 1px solid rgba(255, 255, 255, 0.1);
position: relative;
overflow: hidden;
}
.main-header::before {
content: '';
position: absolute;
top: 0;
left: -100%;
width: 100%;
height: 100%;
background: linear-gradient(90deg, transparent, rgba(255, 255, 255, 0.1), transparent);
animation: shimmer 3s infinite;
}
@keyframes shimmer {
0% { left: -100%; }
100% { left: 100%; }
}
.main-title {
font-size: 3rem;
font-weight: 700;
background: linear-gradient(135deg, #4facfe 0%, #00f2fe 50%, #667eea 100%);
-webkit-background-clip: text;
-webkit-text-fill-color: transparent;
background-clip: text;
margin-bottom: 0.5rem;
text-shadow: 0 0 30px rgba(79, 172, 254, 0.3);
}
.subtitle {
font-size: 1.2rem;
color: #a0aec0;
margin-bottom: 1rem;
}
.status-indicator {
display: inline-flex;
align-items: center;
gap: 0.5rem;
padding: 0.5rem 1rem;
background: rgba(79, 172, 254, 0.1);
border: 1px solid rgba(79, 172, 254, 0.3);
border-radius: 25px;
font-size: 0.9rem;
}
.status-dot {
width: 8px;
height: 8px;
background: #00f2fe;
border-radius: 50%;
animation: pulse 2s infinite;
}
@keyframes pulse {
0%, 100% { opacity: 1; transform: scale(1); }
50% { opacity: 0.5; transform: scale(1.2); }
}
/* Enhanced chatbot styling */
.chatbot-container {
background: rgba(0, 0, 0, 0.3) !important;
border: 1px solid rgba(255, 255, 255, 0.1) !important;
border-radius: 20px !important;
backdrop-filter: blur(20px) !important;
box-shadow: 0 8px 32px rgba(0, 0, 0, 0.3) !important;
}
.message {
animation: fadeInUp 0.5s ease-out;
margin: 0.5rem 0;
padding: 1rem;
border-radius: 15px;
position: relative;
overflow: hidden;
}
.message.user {
background: linear-gradient(135deg, #667eea 0%, #764ba2 100%) !important;
color: white !important;
margin-left: 2rem;
box-shadow: 0 4px 15px rgba(102, 126, 234, 0.3);
}
.message.assistant {
background: linear-gradient(135deg, rgba(79, 172, 254, 0.1) 0%, rgba(0, 242, 254, 0.1) 100%) !important;
border: 1px solid rgba(79, 172, 254, 0.3);
color: #e0e6ed !important;
margin-right: 2rem;
}
@keyframes fadeInUp {
from {
opacity: 0;
transform: translateY(20px);
}
to {
opacity: 1;
transform: translateY(0);
}
}
/* Enhanced input styling */
.input-container {
background: rgba(255, 255, 255, 0.05) !important;
border: 2px solid rgba(255, 255, 255, 0.1) !important;
border-radius: 15px !important;
backdrop-filter: blur(10px) !important;
transition: all 0.3s ease;
}
.input-container:focus-within {
border-color: rgba(79, 172, 254, 0.5) !important;
box-shadow: 0 0 20px rgba(79, 172, 254, 0.2) !important;
}
/* Enhanced buttons with hover effects */
.enhanced-button {
border: none !important;
border-radius: 12px !important;
font-weight: 600 !important;
text-transform: uppercase !important;
letter-spacing: 0.5px !important;
padding: 12px 24px !important;
transition: all 0.3s cubic-bezier(0.4, 0, 0.2, 1) !important;
position: relative !important;
overflow: hidden !important;
box-shadow: 0 4px 15px rgba(0, 0, 0, 0.2) !important;
}
.enhanced-button::before {
content: '';
position: absolute;
top: 0;
left: -100%;
width: 100%;
height: 100%;
background: linear-gradient(90deg, transparent, rgba(255, 255, 255, 0.2), transparent);
transition: left 0.6s;
}
.enhanced-button:hover::before {
left: 100%;
}
.primary-button {
background: linear-gradient(135deg, #4facfe 0%, #00f2fe 100%) !important;
color: white !important;
}
.primary-button:hover {
transform: translateY(-2px) !important;
box-shadow: 0 8px 25px rgba(79, 172, 254, 0.4) !important;
}
.secondary-button {
background: linear-gradient(135deg, #667eea 0%, #764ba2 100%) !important;
color: white !important;
}
.secondary-button:hover {
transform: translateY(-2px) !important;
box-shadow: 0 8px 25px rgba(102, 126, 234, 0.4) !important;
}
.danger-button {
background: linear-gradient(135deg, #ff6b6b 0%, #ee5a52 100%) !important;
color: white !important;
}
.danger-button:hover {
transform: translateY(-2px) !important;
box-shadow: 0 8px 25px rgba(255, 107, 107, 0.4) !important;
}
/* Stats panel */
.stats-panel {
background: rgba(255, 255, 255, 0.03);
border: 1px solid rgba(255, 255, 255, 0.1);
border-radius: 15px;
padding: 1.5rem;
margin: 1rem 0;
}
.stat-item {
display: flex;
justify-content: space-between;
align-items: center;
padding: 0.5rem 0;
border-bottom: 1px solid rgba(255, 255, 255, 0.1);
}
.stat-value {
font-weight: 600;
color: #4facfe;
}
/* Page content */
.page-content {
padding: 1rem;
background: rgba(255, 255, 255, 0.02);
border-radius: 15px;
border: 1px solid rgba(255, 255, 255, 0.1);
min-height: 60vh;
}
/* Tab styling */
.tab-nav {
background: rgba(255, 255, 255, 0.05) !important;
border-radius: 10px !important;
border: 1px solid rgba(255, 255, 255, 0.1) !important;
}
/* WebRTC styling */
.webrtc-container {
background: rgba(0, 0, 0, 0.3) !important;
border: 1px solid rgba(255, 255, 255, 0.1) !important;
border-radius: 20px !important;
backdrop-filter: blur(20px) !important;
box-shadow: 0 8px 32px rgba(0, 0, 0, 0.3) !important;
padding: 1rem;
}
/* Responsive design */
@media (max-width: 768px) {
.main-title { font-size: 2rem; }
.gradio-container { padding: 10px; }
.message.user { margin-left: 1rem; }
.message.assistant { margin-right: 1rem; }
}
/* Scrollbar styling */
::-webkit-scrollbar {
width: 8px;
}
::-webkit-scrollbar-track {
background: rgba(255, 255, 255, 0.05);
border-radius: 4px;
}
::-webkit-scrollbar-thumb {
background: linear-gradient(135deg, #4facfe 0%, #00f2fe 100%);
border-radius: 4px;
}
::-webkit-scrollbar-thumb:hover {
background: linear-gradient(135deg, #667eea 0%, #764ba2 100%);
}
"""
# 2. Fix the document type choices to match the functions
# Registry of the four upload categories shown in the UI. Keys match
# demo_state.document_stats and the dispatch in handle_document_upload;
# "label"/"description" feed the dropdown and info panels, "color" is used
# when rendering the stats HTML. (Labels contain mojibake'd emoji from
# extraction — preserved byte-for-byte.)
DOCUMENT_TYPES = {
    "old_documents": {
        "label": "š Old Documents",
        "description": "Historical documents, legacy materials (uses old_doc_ingestion)",
        "color": "#8B4513",
    },
    "user_specific": {
        "label": "š¤ User-Specific Documents",
        "description": "Personal files, user manuals (uses user_doc_ingest)",
        "color": "#4facfe",
    },
    "domain_documents": {
        "label": "š„ Domain Documents",
        "description": "Medical papers, research articles (uses rag_dom_ingest)",
        "color": "#00f2fe",
    },
    "new_documents": {
        "label": "š New Documents",
        "description": "Recent uploads using standard RAG pipeline",
        "color": "#667eea",
    }
}
def format_response_time(seconds: float) -> str:
    """Render a duration in a compact human-friendly unit.

    < 1 s   -> whole milliseconds, e.g. "250ms"
    < 60 s  -> one-decimal seconds, e.g. "5.4s"
    else    -> whole minutes plus one-decimal seconds, e.g. "2m 5.0s"
    """
    if seconds < 1:
        return f"{int(seconds * 1000)}ms"
    if seconds < 60:
        return f"{seconds:.1f}s"
    whole_minutes, leftover_seconds = divmod(seconds, 60)
    return f"{int(whole_minutes)}m {leftover_seconds:.1f}s"
def create_stats_display():
    """Create dynamic statistics display.

    Builds an HTML fragment from demo_state: session totals, per-category
    document counts (only categories with count > 0), and the recent-query
    list.

    NOTE(review): the markup in the literals below appears to have had its
    HTML tags stripped by an extraction step — only text content survives.
    Restore the original tags from version control before shipping.
    """
    total_queries = demo_state.session_stats["total_queries"]
    avg_time = demo_state.session_stats["avg_response_time"]
    stats_html = f"""
š Session Statistics
Total Queries:
{total_queries}
Average Response Time:
{format_response_time(avg_time)}
Session Started:
{datetime.now().strftime('%H:%M')}
"""
    # Document statistics
    doc_stats = demo_state.document_stats
    if any(doc_stats.values()):
        stats_html += """
š Document Statistics
"""
        for doc_type, count in doc_stats.items():
            if count > 0:
                type_info = DOCUMENT_TYPES.get(doc_type, {})
                color = type_info.get("color", "#4facfe")
                label = type_info.get("label", doc_type.replace("_", " ").title())
                stats_html += f"""
{label}:
{count}
"""
        # NOTE(review): closing markup looks stripped here; the original
        # most likely appended a closing tag — confirm against history.
        stats_html += "\n"
    if demo_state.recent_queries:
        stats_html += """
š Recent Queries
"""
        for query_info in demo_state.recent_queries:
            stats_html += f"""
{query_info['query']}
{query_info['time']}
"""
        # NOTE(review): same stripped closing markup as above.
        stats_html += "\n"
    return stats_html
# Document processing functions
def process_old_document(file_path: str, query: str = None) -> str:
    """Ingest an "old" document and optionally answer a question about it.

    Bumps the old-document counter, remembers *file_path* as the last
    uploaded old document, runs old_doc_ingestion, and — when *query* is
    given — answers it via old_doc_qa. Returns a user-facing status string;
    errors are caught and reported rather than raised.
    """
    try:
        # Update stats
        demo_state.document_stats["old_documents"] += 1
        demo_state.last_uploaded_old_doc = file_path
        # Use existing old document processing
        status = old_doc_ingestion(file_path)
        if not query:
            return f"š Old Document ingested successfully: {status}"
        answer = old_doc_qa(query)
        return f"š Old Document processed: {status}\n\n{answer}"
    except Exception as exc:
        logger.error(f"Error processing old document: {exc}")
        return f"ā Error processing old document: {str(exc)}"
def process_user_document(file_path: str, query: str = None) -> str:
    """Ingest a user-specific document and optionally answer a question.

    Uses the standard ingest_file pipeline; with a *query*, routes it
    through process_query/clean_pipeline_result. Returns a user-facing
    status string; errors are caught and reported rather than raised.
    """
    try:
        demo_state.document_stats["user_specific"] += 1
        result = ingest_file(file_path)
        if not query:
            return f"š¤ User-specific document ingested successfully: {result}"
        response = process_query(query)
        raw = response[0] if isinstance(response, tuple) else str(response)
        cleaned_response = clean_pipeline_result(raw)
        return f"š¤ User Document processed: {result}\n\n{cleaned_response}"
    except Exception as exc:
        logger.error(f"Error processing user document: {exc}")
        return f"ā Error processing user document: {str(exc)}"
# 5. Create fixed voice chat functions
def start_voice_chat():
    """(Re)create the module-level GeminiHandler and report readiness.

    Returns a user-facing status string; failures are reported, not raised.
    """
    global voice_handler
    try:
        # A fresh handler per start keeps previous sessions from leaking in.
        voice_handler = GeminiHandler()
    except Exception as exc:
        logger.error(f"Error starting voice chat: {exc}")
        return f"ā Failed to start voice chat: {str(exc)}"
    return "š¤ Voice chat started... Speak now!"
def stop_voice_chat():
    """Stop the active voice handler, if any, and report the outcome.

    Returns a user-facing status string; failures are reported, not raised.
    """
    try:
        global voice_handler
        # The handler may never have been created; look it up defensively.
        handler = globals().get('voice_handler')
        if handler:
            handler.stop()
        return "ā¹ļø Voice chat stopped."
    except Exception as exc:
        logger.error(f"Error stopping voice chat: {exc}")
        return f"ā Error stopping voice chat: {str(exc)}"
def process_domain_document(file_path: str, query: str = None) -> str:
    """Ingest a domain document and optionally answer a question about it.

    Mirrors process_user_document but tallies under "domain_documents".
    Returns a user-facing status string; errors are caught, not raised.
    """
    try:
        demo_state.document_stats["domain_documents"] += 1
        result = ingest_file(file_path)
        if not query:
            return f"š„ Domain document ingested successfully: {result}"
        response = process_query(query)
        raw = response[0] if isinstance(response, tuple) else str(response)
        cleaned_response = clean_pipeline_result(raw)
        return f"š„ Domain Document processed: {result}\n\n{cleaned_response}"
    except Exception as exc:
        logger.error(f"Error processing domain document: {exc}")
        return f"ā Error processing domain document: {str(exc)}"
def process_new_document(file_path: str, query: str = None) -> str:
    """Ingest a new document and optionally answer a question about it.

    Mirrors process_user_document but tallies under "new_documents".
    Returns a user-facing status string; errors are caught, not raised.
    """
    try:
        demo_state.document_stats["new_documents"] += 1
        result = ingest_file(file_path)
        if not query:
            return f"š New document ingested successfully: {result}"
        response = process_query(query)
        raw = response[0] if isinstance(response, tuple) else str(response)
        cleaned_response = clean_pipeline_result(raw)
        return f"š New Document processed: {result}\n\n{cleaned_response}"
    except Exception as exc:
        logger.error(f"Error processing new document: {exc}")
        return f"ā Error processing new document: {str(exc)}"
# Main processing functions
def process_with_stats(user_input: str, audio_input, chat_history: List) -> Tuple[List, str, str, str, Optional[tuple]]:
    """Process query with timing and statistics.

    Prefers typed text; falls back to transcribing *audio_input*. Returns
    (new_history, cleared_input, status_message, stats_html, cleared_audio).
    On any failure the original history is returned with an error status.
    """
    start_time = time.time()
    try:
        # Get input text
        query = user_input.strip() if user_input else ""
        if audio_input and not query:
            query, status = get_transcription_or_text("", audio_input)
            # Transcription errors are signalled in-band with an "[ERROR]" prefix.
            if query.startswith("[ERROR]"):
                return chat_history, "", f"ā {status}", create_stats_display(), None
        if not query:
            return chat_history, "", "ā ļø Please provide text or audio input.", create_stats_display(), None
        logger.info(f"Processing query: {query[:50]}...")
        # Process the query
        response = process_query(query)
        response_time = time.time() - start_time
        # Update statistics
        demo_state.update_stats(query, response_time)
        # Clean response; a tuple response carries (primary, fallback).
        if isinstance(response, tuple):
            cleaned_response = clean_pipeline_result(response[0] if response[0] else response[1])
        else:
            cleaned_response = clean_pipeline_result(str(response))
        # Update chat history (copy so the caller's list is not mutated)
        new_history = chat_history.copy()
        new_history.append({"role": "user", "content": query})
        new_history.append({"role": "assistant", "content": cleaned_response})
        # Create status message
        # NOTE(review): this literal arrived split/mojibake'd from extraction;
        # rejoined on one line here — confirm the original emoji text.
        status_msg = f"ā Response generated in {format_response_time(response_time)}"
        # Update stats display
        stats_display = create_stats_display()
        logger.info(f"Query processed successfully in {response_time:.2f}s")
        return new_history, "", status_msg, stats_display, None
    except Exception as e:
        logger.error(f"Error processing query: {e}")
        error_msg = f"ā Error: {str(e)[:100]}..."
        return chat_history, "", error_msg, create_stats_display(), None
def process_with_audio(user_input: str, audio_input, voice_dropdown: str, chat_history: List) -> Tuple[List, Optional[tuple], str, str, str, Optional[tuple]]:
    """Fixed audio processing function.

    Like process_with_stats, but additionally synthesizes a TTS reply with
    the selected voice. Returns (new_history, audio_response, cleared_input,
    status_message, stats_html, cleared_audio_input). TTS failures degrade
    to a text-only answer rather than failing the whole request.
    """
    start_time = time.time()
    try:
        # Get input text with better audio handling
        query = user_input.strip() if user_input else ""
        if audio_input and not query:
            try:
                # Check if audio_utils functions exist and work
                query, status = get_transcription_or_text("", audio_input)
                if not query or query.startswith("[ERROR]"):
                    return chat_history, None, "", f"ā Audio transcription failed: {status}", create_stats_display(), None
            except Exception as audio_error:
                logger.error(f"Audio transcription error: {audio_error}")
                return chat_history, None, "", f"ā Audio processing error: {str(audio_error)}", create_stats_display(), None
        if not query:
            return chat_history, None, "", "ā ļø Please provide text or audio input.", create_stats_display(), None
        logger.info(f"Processing query with audio: {query[:50]}...")
        # Process the query
        try:
            response = process_query(query)
        except Exception as query_error:
            logger.error(f"Query processing error: {query_error}")
            return chat_history, None, "", f"ā Query processing failed: {str(query_error)}", create_stats_display(), None
        # Clean response; a tuple response carries (primary, fallback).
        if isinstance(response, tuple):
            cleaned_response = clean_pipeline_result(response[0] if response[0] else response[1])
        else:
            cleaned_response = clean_pipeline_result(str(response))
        # Update chat history (copy so the caller's list is not mutated)
        new_history = chat_history.copy()
        new_history.append({"role": "user", "content": query})
        new_history.append({"role": "assistant", "content": cleaned_response})
        # Generate audio with better error handling
        audio_response = None
        tts_status = "Audio generation skipped"
        try:
            # Check if TTS functions are available
            if 'generate_tts_response' in globals():
                audio_data, tts_status = generate_tts_response(cleaned_response, voice_dropdown)
                audio_response = audio_data if audio_data else None
            else:
                tts_status = "TTS function not available"
        except Exception as audio_error:
            logger.error(f"Audio generation error: {audio_error}")
            audio_response = None
            tts_status = f"Audio generation failed: {str(audio_error)[:50]}..."
        response_time = time.time() - start_time
        demo_state.update_stats(query, response_time)
        # NOTE(review): this literal arrived split/mojibake'd from extraction;
        # rejoined on one line here — confirm the original emoji text.
        status_msg = f"ā Response generated in {format_response_time(response_time)}"
        if audio_response:
            status_msg += " | šµ Audio ready"
        else:
            status_msg += f" | ā ļø {tts_status}"
        stats_display = create_stats_display()
        return new_history, audio_response, "", status_msg, stats_display, None
    except Exception as e:
        logger.error(f"Error processing audio query: {e}")
        error_msg = f"ā Error: {str(e)[:100]}..."
        return chat_history, None, "", error_msg, create_stats_display(), None
# Global handler instance
# 8. Update event handlers with fixed functions
def setup_fixed_event_handlers(components):
    """Wire UI events to their handlers.

    *components* is the dict returned by the tab-builder functions. Also
    creates the module-level `voice_handler` and attaches it to the WebRTC
    stream with conservative limits when running on a hosted Space.
    """
    # Fixed upload handler that includes query processing
    components['upload_btn'].click(
        fn=lambda file, doc_type, query: handle_document_upload(file, doc_type, query),
        inputs=[components['doc_file'], components['doc_type'], components['doc_query']],
        outputs=[components['upload_status']]
    )
    # Fixed audio processing
    # NOTE(review): 'chat_history' is passed as an input component — confirm
    # it exists in the components dict (the main tab returns 'chatbot').
    components['both_btn'].click(
        fn=process_with_audio,
        inputs=[components['user_input'], components['audio_input'], components['voice_dropdown'], components['chat_history']],
        outputs=[components['chatbot'], components['audio_output'], components['user_input'], components['status_output'], components['stats_display'], components['audio_input']])
    # Fixed voice chat handlers
    components['start_voice_btn'].click(
        fn=start_voice_chat,
        outputs=[components['voice_status']])
    components['stop_voice_btn'].click(
        fn=stop_voice_chat,
        outputs=[components['voice_status']])
    global voice_handler
    voice_handler = GeminiHandler()
    # Tighter time/concurrency limits only when hosted on a Space.
    components['webrtc_audio'].stream(
        voice_handler,
        inputs=[components['webrtc_audio']],
        outputs=[components['webrtc_audio']],
        time_limit=180 if get_space() else None,
        concurrency_limit=2 if get_space() else None,
    )
# 8. Audio fallback function if TTS is not working
def safe_audio_processing(text_response: str, voice: str) -> Optional[tuple]:
    """Generate TTS audio for *text_response*, returning None on any failure.

    Acts as a guard around generate_tts_response so audio problems never
    propagate to the caller.
    """
    try:
        if 'generate_tts_response' not in globals():
            logger.warning("TTS function not available, skipping audio generation")
            return None
        return generate_tts_response(text_response, voice)
    except Exception as exc:
        logger.error(f"Audio processing failed: {exc}")
        return None
# 9. Add proper imports check
def check_required_imports():
    """Probe the optional project modules and report what is missing.

    Returns (True, []) when everything imports, otherwise
    (False, [description, ...]) and logs a warning.
    """
    missing_modules = []
    try:
        from docs_utils import (old_doc_ingestion, old_doc_qa, user_doc_ingest,
                                user_doc_qa, rag_dom_ingest, rag_dom_qa)
    except ImportError as err:
        missing_modules.append(f"docs_utils functions: {err}")
    try:
        from audio_utils import generate_tts_response, get_transcription_or_text
    except ImportError as err:
        missing_modules.append(f"audio_utils functions: {err}")
    try:
        from pipeQuery import process_query, clean_pipeline_result
    except ImportError as err:
        missing_modules.append(f"pipeQuery functions: {err}")
    if not missing_modules:
        return True, []
    logger.warning(f"Missing modules: {missing_modules}")
    return False, missing_modules
# 8. Audio fallback function if TTS is not working
# NOTE(review): this is an exact duplicate of safe_audio_processing defined
# earlier in this file; it silently shadows the first definition and should
# be deleted.
def safe_audio_processing(text_response: str, voice: str) -> Optional[tuple]:
    """Safe audio processing with fallback (duplicate definition)."""
    try:
        # Check if audio_utils is available
        if 'generate_tts_response' in globals():
            return generate_tts_response(text_response, voice)
        else:
            logger.warning("TTS function not available, skipping audio generation")
            return None
    except Exception as e:
        logger.error(f"Audio processing failed: {e}")
        return None
# NOTE(review): the section comment below ("imports check") mislabels this
# function — it is the upload dispatcher.
# 9. Add proper imports check
def handle_document_upload(file, doc_type: str, query: str = "") -> str:
    """Handle document upload with proper function mapping to docs_utils.py.

    Dispatches on *doc_type* to the matching ingestion function, bumps the
    per-category counter, and — when *query* is non-blank — answers it
    against the freshly ingested material. Returns a user-facing status
    string; errors are caught and reported rather than raised.
    """
    if not file:
        return "ā ļø Please select a file to upload."
    # Gradio dropdowns can yield the literal string "None" when unset.
    if doc_type == "None" or not doc_type:
        return "ā ļø Please select a document type."
    try:
        logger.info(f"Processing file upload: {file.name} as {doc_type}")
        # Map document types to actual functions from docs_utils.py
        if doc_type == "old_documents":
            from docs_utils import old_doc_ingestion, old_doc_qa
            # First ingest the document
            ingest_result = old_doc_ingestion(file.name)
            demo_state.document_stats["old_documents"] += 1
            # If query provided, also get answer
            if query and query.strip():
                answer = old_doc_qa(query)
                return f"š Old Document processed: {ingest_result}\n\nQuery Answer:\n{answer}"
            return f"š Old Document ingested: {ingest_result}"
        elif doc_type == "user_specific":
            from docs_utils import user_doc_ingest, user_doc_qa
            # Ingest the document
            ingest_result = user_doc_ingest(file.name)
            demo_state.document_stats["user_specific"] += 1
            # If query provided, also get answer
            if query and query.strip():
                answer = user_doc_qa(query)
                return f"š¤ User Document processed: {ingest_result}\n\nQuery Answer:\n{answer}"
            return f"š¤ User-specific document ingested: {ingest_result}"
        elif doc_type == "domain_documents":
            from docs_utils import rag_dom_ingest, rag_dom_qa
            # Ingest the document
            ingest_result = rag_dom_ingest(file.name)
            demo_state.document_stats["domain_documents"] += 1
            # If query provided, also get answer
            if query and query.strip():
                answer = rag_dom_qa(query)
                return f"š„ Domain Document processed: {ingest_result}\n\nQuery Answer:\n{answer}"
            return f"š„ Domain document ingested: {ingest_result}"
        elif doc_type == "new_documents":
            # Use the standard RAG pipeline for new documents
            from rag_steps import ingest_file
            ingest_result = ingest_file(file.name)
            demo_state.document_stats["new_documents"] += 1
            # If query provided, process through main pipeline
            if query and query.strip():
                response = process_query(query)
                cleaned_response = clean_pipeline_result(response[0] if isinstance(response, tuple) else str(response))
                return f"š New Document processed: {ingest_result}\n\nQuery Answer:\n{cleaned_response}"
            return f"š New document ingested: {ingest_result}"
        else:
            return f"ā Unknown document type: {doc_type}"
    except Exception as e:
        logger.error(f"File upload error: {e}")
        return f"ā File upload failed: {str(e)}"
def process_old_doc_query(query: str, doc_file) -> str:
    """Answer *query* against an old document.

    With *doc_file*, copies it into assets/uploaded_docs, records it as the
    last uploaded old document and ingests+answers via process_old_document.
    Without one, falls back to the previously uploaded document, if it still
    exists on disk. Returns a user-facing string; errors are reported, not
    raised.
    """
    if doc_file:
        try:
            upload_dir = os.path.join("assets", "uploaded_docs")
            os.makedirs(upload_dir, exist_ok=True)
            # basename() strips any path components from the client filename.
            destination = os.path.join(upload_dir, os.path.basename(doc_file.name))
            shutil.copy2(doc_file.name, destination)
            demo_state.last_uploaded_old_doc = destination
            return process_old_document(destination, query)
        except Exception as exc:
            return f"ā Error processing file: {str(exc)}"
    previous = demo_state.last_uploaded_old_doc
    if not (previous and os.path.exists(previous)):
        return "ā No document uploaded. Please upload an old document to proceed."
    try:
        answer = old_doc_qa(query)
        return f"[Using previously uploaded document: {os.path.basename(previous)}]\n\n{answer}"
    except Exception as exc:
        return f"ā Error querying document: {str(exc)}"
def clear_chat():
    """Reset all session state and return cleared UI values.

    Returns (empty history, no audio, confirmation message, fresh stats HTML).
    """
    # Re-running __init__ on the singleton zeroes every counter in place,
    # so all references to demo_state stay valid.
    demo_state.__init__()
    return [], None, "š Chat cleared and stats reset!", create_stats_display()
def create_main_chat_tab():
    """Create the main chat interface tab.

    Builds the chatbot, text/audio inputs, action buttons, the statistics
    panel and the document-upload group, and returns a dict of components
    for the event-wiring code.

    Fix: the original defined `upload_btn`/`upload_status` twice; the second
    pair shadowed the first in the returned dict, leaving the visible
    "Upload & Process" button permanently unwired. The duplicate pair is
    removed so the wired button is the one the user sees.
    """
    with gr.Row():
        with gr.Column(scale=3):
            # Main chatbot interface
            with gr.Group():
                chatbot = gr.Chatbot(
                    type='messages',
                    label="š¬ Conversation with Wisal",
                    height=500,
                    show_copy_button=True,
                    elem_classes="chatbot-container",
                )
            # Status and audio output
            with gr.Row():
                with gr.Column():
                    status_output = gr.Textbox(
                        label="š System Status",
                        interactive=False,
                        max_lines=2,
                        elem_classes="input-container"
                    )
                with gr.Column():
                    audio_output = gr.Audio(
                        label="šµ Audio Response",
                        interactive=False,
                        show_download_button=True,
                        elem_classes="input-container"
                    )
            # Input section
            with gr.Group():
                with gr.Row():
                    user_input = gr.Textbox(
                        placeholder="Ask me anything about autism...",
                        label="š Your Message",
                        lines=2,
                        scale=3,
                        elem_classes="input-container"
                    )
                    audio_input = gr.Audio(
                        sources=["microphone", "upload"],
                        type="filepath",
                        label="š¤ Voice Input",
                        scale=1,
                        elem_classes="input-container"
                    )
                voice_dropdown = gr.Dropdown(
                    label="šļø Voice Selection",
                    choices=["Kore", "Puck", "Zephyr", "Leda", "Fenrir", "Charon", "Orus", "Aoede", "Callirrhoe"],
                    value="Kore",
                    elem_classes="input-container"
                )
            # Action buttons
            with gr.Row():
                text_btn = gr.Button(
                    "š¬ Text Response",
                    variant="secondary",
                    elem_classes="enhanced-button secondary-button"
                )
                audio_btn = gr.Button(
                    "šµ Audio Response",
                    variant="secondary",
                    elem_classes="enhanced-button secondary-button"
                )
                both_btn = gr.Button(
                    "š Text + Audio",
                    variant="primary",
                    elem_classes="enhanced-button primary-button"
                )
            with gr.Row():
                clear_btn = gr.Button(
                    "šļø Clear Chat",
                    variant="stop",
                    elem_classes="enhanced-button danger-button"
                )
        with gr.Column(scale=1):
            # Statistics panel (refreshed by the query handlers)
            stats_display = gr.HTML(
                create_stats_display(),
                label="š Statistics"
            )
            # Document upload section
            with gr.Group():
                gr.Markdown("### š Document Upload")
                doc_file = gr.File(
                    label="Upload Document",
                    file_types=[".pdf", ".docx", ".txt"],
                    elem_classes="input-container"
                )
                # (display label, value) pairs; values match handle_document_upload
                doc_type = gr.Dropdown(
                    label="Document Type",
                    choices=[
                        ("š Old Documents", "old_documents"),
                        ("š¤ User-Specific Documents", "user_specific"),
                        ("š„ Domain Documents", "domain_documents"),
                        ("š New Documents", "new_documents")
                    ],
                    value="user_specific",
                    elem_classes="input-container"
                )
                # Optional query field for immediate Q&A on the uploaded file
                doc_query = gr.Textbox(
                    label="Optional: Ask about this document",
                    placeholder="What does this document say about...",
                    lines=2,
                    elem_classes="input-container"
                )
                upload_btn = gr.Button(
                    "š¤ Upload & Process",
                    elem_classes="enhanced-button primary-button"
                )
                upload_status = gr.Textbox(
                    label="Upload Status",
                    interactive=False,
                    lines=4,
                    elem_classes="input-container"
                )
                # Document type info display
                doc_info = gr.HTML(
                    """
š Document Types
š Old Documents: Historical documents, legacy materials
š¤ User-Specific: Personal files, user manuals
š„ Domain Documents: Medical papers, research articles
š New Documents: Recent uploads, latest materials
"""
                )
            # Quick actions
            with gr.Group():
                gr.Markdown("### ā” Quick Actions")
                sample_queries = [
                    "What is autism?",
                    "Early signs of autism",
                    "Autism therapy options",
                    "Supporting autistic children"
                ]
                for query in sample_queries:
                    btn = gr.Button(
                        query,
                        elem_classes="enhanced-button secondary-button",
                        size="sm"
                    )
                    # Default-arg binding avoids the late-binding closure pitfall.
                    btn.click(
                        lambda q=query: q,
                        outputs=[user_input]
                    )
    return {
        'chatbot': chatbot,
        'user_input': user_input,
        'audio_input': audio_input,
        'doc_query': doc_query,
        'voice_dropdown': voice_dropdown,
        'audio_output': audio_output,
        'status_output': status_output,
        'stats_display': stats_display,
        'doc_file': doc_file,
        'doc_type': doc_type,
        'upload_status': upload_status,
        'text_btn': text_btn,
        'audio_btn': audio_btn,
        'both_btn': both_btn,
        'upload_btn': upload_btn,
        'clear_btn': clear_btn
    }
def create_old_documents_tab():
    """Create the old documents Q&A tab.

    Returns a dict of the components needed by the event-wiring code.
    NOTE(review): HTML tags in the gr.HTML literals below appear stripped by
    extraction; restore them from version control before shipping.
    """
    with gr.Row():
        with gr.Column(scale=2):
            gr.Markdown("## š Old Documents Q&A")
            gr.Markdown("Upload historical documents, legacy materials, and archived content for specialized querying.")
            query = gr.Textbox(
                placeholder="Ask about old documents...",
                lines=3,
                label="š Your Question about Old Documents",
                elem_classes="input-container"
            )
            doc_file = gr.File(
                label="š Upload Old Document (PDF, DOCX, TXT)",
                file_types=[".pdf", ".docx", ".txt"],
                elem_classes="input-container"
            )
            with gr.Row():
                submit_btn = gr.Button(
                    "š Submit Query",
                    variant="primary",
                    elem_classes="enhanced-button primary-button"
                )
                clear_old_btn = gr.Button(
                    "šļø Clear",
                    variant="secondary",
                    elem_classes="enhanced-button danger-button"
                )
            output = gr.Textbox(
                label="š Answer from Old Documents",
                lines=12,
                interactive=False,
                elem_classes="input-container"
            )
        with gr.Column(scale=1):
            # Old documents info panel
            gr.HTML("""
š Old Documents Info
Purpose: Process historical and legacy documents that require specialized handling.
Best for:
- Archived medical records
- Historical research papers
- Legacy documentation
- Older format materials
Features:
- Specialized processing pipeline
- Historical context awareness
- Legacy format support
""")
            # Recent old document queries
            old_doc_history = gr.HTML(
                """
š Recent Queries
No recent queries yet.
"""
            )
    return {
        'query': query,
        'doc_file': doc_file,
        'output': output,
        'submit_btn': submit_btn,
        'clear_old_btn': clear_old_btn,
        'old_doc_history': old_doc_history
    }
def create_voice_chat_tab():
    """Build the real-time voice chat tab around a fastrtc ``WebRTC`` widget.

    Must be called inside an active ``gr.Blocks`` / ``gr.TabItem`` context.

    Returns:
        dict: Components keyed by name ('webrtc_audio', 'voice_status',
        'start_voice_btn', 'stop_voice_btn') for later event wiring.
    """
    with gr.Row():
        with gr.Column(scale=2):
            gr.Markdown("## šļø Real-time Voice Chat")
            gr.Markdown("Experience real-time speech-to-speech conversation with Wisal using advanced voice AI.")
            # WebRTC component for real-time audio
            # NOTE(review): rtc_configuration is passed the async credential
            # provider itself (not its result) — fastrtc resolves it lazily;
            # confirm against the installed fastrtc version.
            webrtc_audio = WebRTC(
                label="š¤ Voice Chat Interface",
                modality="audio",
                mode="send-receive",
                elem_id="voice-chat-source",
                rtc_configuration=get_cloudflare_turn_credentials_async,
                icon="https://www.gstatic.com/lamda/images/gemini_favicon_f069958c85030456e93de685481c559f160ea06b.png",
                pulse_color="rgb(79, 172, 254)",
                icon_button_color="rgb(255, 255, 255)",
                elem_classes="webrtc-container"
            )
            # Voice chat controls
            with gr.Row():
                voice_status = gr.Textbox(
                    label="š Voice Chat Status",
                    value="Ready to start voice conversation...",
                    interactive=False,
                    elem_classes="input-container"
                )
            with gr.Row():
                start_voice_btn = gr.Button(
                    "š¤ Start Voice Chat",
                    variant="primary",
                    elem_classes="enhanced-button primary-button"
                )
                stop_voice_btn = gr.Button(
                    "ā¹ļø Stop Voice Chat",
                    variant="secondary",
                    elem_classes="enhanced-button danger-button"
                )
        with gr.Column(scale=1):
            # Voice chat info
            gr.HTML("""
šļø Voice Chat Features
Real-time Features:
- Live speech recognition
- Instant AI responses
- Natural conversation flow
- Low-latency processing
Voice Quality:
- High-quality audio
- Noise cancellation
- Multiple voice options
- Emotional tone support
""")
            # Audio visualizer placeholder
            gr.HTML("""
""")
    return {
        'webrtc_audio': webrtc_audio,
        'voice_status': voice_status,
        'start_voice_btn': start_voice_btn,
        'stop_voice_btn': stop_voice_btn
    }
def create_document_management_tab():
    """Build the document management tab (single upload + batch processing).

    Must be called inside an active ``gr.Blocks`` / ``gr.TabItem`` context.
    Dropdown choices and the info panels are derived from the module-level
    ``DOCUMENT_TYPES`` mapping; the stats panel is seeded from
    ``create_stats_display()`` at build time.

    Returns:
        dict: Components keyed by name (upload_file, upload_doc_type,
        upload_query, doc_result, batch_files, batch_type, batch_result,
        process_doc_btn, batch_process_btn, doc_stats_display).
    """
    with gr.Row():
        with gr.Column(scale=2):
            gr.Markdown("## š Advanced Document Management")
            gr.Markdown("Upload and manage different types of documents with specialized processing pipelines.")
            # Document upload section
            with gr.Group():
                gr.Markdown("### š¤ Upload Documents")
                upload_file = gr.File(
                    label="š Select Document",
                    file_types=[".pdf", ".docx", ".txt", ".md"],
                    elem_classes="input-container"
                )
                # Choices are (label, value) pairs built from DOCUMENT_TYPES.
                upload_doc_type = gr.Dropdown(
                    label="š Document Category",
                    choices=[(v["label"], k) for k, v in DOCUMENT_TYPES.items()],
                    value="user_specific",
                    elem_classes="input-container"
                )
                upload_query = gr.Textbox(
                    placeholder="Optional: Ask a question about this document...",
                    label="ā Optional Query",
                    lines=2,
                    elem_classes="input-container"
                )
                process_doc_btn = gr.Button(
                    "š Process Document",
                    variant="primary",
                    elem_classes="enhanced-button primary-button"
                )
                doc_result = gr.Textbox(
                    label="š Processing Result",
                    lines=8,
                    interactive=False,
                    elem_classes="input-container"
                )
            # Batch processing section
            with gr.Group():
                gr.Markdown("### š¦ Batch Processing")
                batch_files = gr.File(
                    label="š Upload Multiple Documents",
                    file_count="multiple",
                    file_types=[".pdf", ".docx", ".txt"],
                    elem_classes="input-container"
                )
                batch_type = gr.Dropdown(
                    label="š Batch Category",
                    choices=[(v["label"], k) for k, v in DOCUMENT_TYPES.items()],
                    value="domain_documents",
                    elem_classes="input-container"
                )
                batch_process_btn = gr.Button(
                    "ā” Process Batch",
                    variant="secondary",
                    elem_classes="enhanced-button secondary-button"
                )
                batch_result = gr.Textbox(
                    label="š Batch Results",
                    lines=6,
                    interactive=False,
                    elem_classes="input-container"
                )
        with gr.Column(scale=1):
            # Document statistics (HTML snapshot taken at tab build time)
            doc_stats_display = gr.HTML(
                create_stats_display(),
                label="š Document Statistics"
            )
            # Document type details
            gr.HTML(f"""
š Document Categories
{"".join([f'''
{info["label"]}
{info["description"]}
''' for info in DOCUMENT_TYPES.values()])}
""")
            # Processing guidelines
            gr.HTML("""
š” Processing Tips
File Formats: PDF, DOCX, TXT, MD
Size Limit: Up to 50MB per file
Batch Processing: Up to 10 files at once
Processing Time: 10-30 seconds per document
""")
    return {
        'upload_file': upload_file,
        'upload_doc_type': upload_doc_type,
        'upload_query': upload_query,
        'doc_result': doc_result,
        'batch_files': batch_files,
        'batch_type': batch_type,
        'batch_result': batch_result,
        'process_doc_btn': process_doc_btn,
        'batch_process_btn': batch_process_btn,
        'doc_stats_display': doc_stats_display
    }
def process_batch_documents(files, doc_type: str) -> str:
    """Process multiple uploaded documents in one batch.

    Args:
        files: Sequence of uploaded file objects (each exposing a ``.name``
            path), as produced by a Gradio ``File`` component with
            ``file_count="multiple"``. May be ``None`` or empty.
        doc_type: Document category key applied to every file in the batch.

    Returns:
        Human-readable summary: overall success/failure counts followed by
        one line per file. Guard messages are returned when input is missing.
    """
    if not files:
        return "⚠️ No files selected for batch processing."
    if doc_type == "None" or not doc_type:
        return "⚠️ Please select a document type for batch processing."

    results = []
    successful = 0
    failed = 0
    for file in files:
        try:
            result = handle_document_upload(file, doc_type)
            # NOTE(review): handle_document_upload (gradio_utils) is assumed
            # to flag failures with a leading "❌" marker — confirm there.
            if result.startswith("❌"):
                failed += 1
                results.append(f"❌ {os.path.basename(file.name)}: Failed")
            else:
                successful += 1
                results.append(f"✅ {os.path.basename(file.name)}: Success")
        except Exception as e:
            failed += 1
            # Truncate long error messages so the summary stays readable.
            results.append(f"❌ {os.path.basename(file.name)}: {str(e)[:50]}...")

    summary = f"📦 Batch Processing Complete:\n✅ Successful: {successful}\n❌ Failed: {failed}\n\n"
    summary += "\n".join(results)
    return summary
# def create_demo():
# """Create the enhanced integrated Gradio demo"""
# # Custom theme
# theme = gr.themes.Base(
# primary_hue="blue",
# secondary_hue="purple",
# neutral_hue="slate",
# font=gr.themes.GoogleFont("Inter")
# )
# with gr.Blocks(
# title="Wisal - Advanced Autism AI Assistant",
# theme=theme,
# css=ENHANCED_CSS,
# head="""
#
#
# """
# ) as demo:
# # State management
# chat_history = gr.State([])
# # Header
# gr.HTML("""
#
#
š§ Wisal
#
Advanced AI Assistant for Autism Spectrum Disorders
#
#
#
AI Assistant Active - All Services Online
#
#
# """)
# # Main tabbed interface
# with gr.Tabs(elem_classes="tab-nav") as tabs:
# # Main Chat Tab
# with gr.TabItem("š¬ Main Chat", elem_id="main-chat"):
# main_components = create_main_chat_tab()
# # Old Documents Tab
# with gr.TabItem("š Old Documents", elem_id="old-docs"):
# old_doc_components = create_old_documents_tab()
# # Voice Chat Tab
# with gr.TabItem("šļø Voice Chat", elem_id="voice-chat"):
# voice_components = create_voice_chat_tab()
# # Document Management Tab
# with gr.TabItem("š Document Management", elem_id="doc-management"):
# doc_mgmt_components = create_document_management_tab()
# # Event handlers for Main Chat Tab
# main_components['text_btn'].click(
# fn=process_with_stats,
# inputs=[main_components['user_input'], main_components['audio_input'], chat_history],
# outputs=[main_components['chatbot'], main_components['user_input'], main_components['status_output'], main_components['stats_display'], main_components['audio_input']]
# )
# main_components['both_btn'].click(
# fn=process_with_audio,
# inputs=[main_components['user_input'], main_components['audio_input'], main_components['voice_dropdown'], chat_history],
# outputs=[main_components['chatbot'], main_components['audio_output'], main_components['user_input'], main_components['status_output'], main_components['stats_display'], main_components['audio_input']]
# )
# main_components['user_input'].submit(
# fn=process_with_audio,
# inputs=[main_components['user_input'], main_components['audio_input'], main_components['voice_dropdown'], chat_history],
# outputs=[main_components['chatbot'], main_components['audio_output'], main_components['user_input'], main_components['status_output'], main_components['stats_display'], main_components['audio_input']]
# )
# main_components['upload_btn'].click(
# fn=handle_document_upload,
# inputs=[main_components['doc_file'], main_components['doc_type']],
# outputs=[main_components['upload_status']]
# )
# main_components['clear_btn'].click(
# fn=clear_chat,
# outputs=[main_components['chatbot'], main_components['audio_output'], main_components['status_output'], main_components['stats_display']]
# )
# # Event handlers for Old Documents Tab
# old_doc_components['submit_btn'].click(
# fn=process_old_doc_query,
# inputs=[old_doc_components['query'], old_doc_components['doc_file']],
# outputs=[old_doc_components['output']]
# )
# old_doc_components['clear_old_btn'].click(
# fn=lambda: ("", None, ""),
# outputs=[old_doc_components['query'], old_doc_components['doc_file'], old_doc_components['output']]
# )
# # Event handlers for Voice Chat Tab
# voice_components['webrtc_audio'].stream(
# GeminiHandler(),
# inputs=[voice_components['webrtc_audio']],
# outputs=[voice_components['webrtc_audio']],
# time_limit=180 if get_space() else None,
# concurrency_limit=2 if get_space() else None,
# )
# voice_components['start_voice_btn'].click(
# fn=lambda: "š¤ Voice chat started... Speak now!",
# outputs=[voice_components['voice_status']]
# )
# voice_components['stop_voice_btn'].click(
# fn=lambda: "ā¹ļø Voice chat stopped.",
# outputs=[voice_components['voice_status']]
# )
# # Event handlers for Document Management Tab
# doc_mgmt_components['process_doc_btn'].click(
# fn=handle_document_upload,
# inputs=[doc_mgmt_components['upload_file'], doc_mgmt_components['upload_doc_type'], doc_mgmt_components['upload_query']],
# outputs=[doc_mgmt_components['doc_result']]
# )
# doc_mgmt_components['batch_process_btn'].click(
# fn=process_batch_documents,
# inputs=[doc_mgmt_components['batch_files'], doc_mgmt_components['batch_type']],
# outputs=[doc_mgmt_components['batch_result']]
# )
# # Footer with usage instructions
# gr.HTML("""
#
#
š” How to Use Wisal
#
#
š¬ Main Chat: Primary interface for text and audio conversations with comprehensive features
#
š Old Documents: Specialized processing for historical and legacy documents
#
šļø Voice Chat: Real-time speech-to-speech conversation with advanced voice AI
#
š Document Management: Advanced document upload, processing, and batch operations
#
š¤ Audio Features: Support for voice input, multiple voice options, and audio responses
#
š Statistics: Real-time tracking of usage, performance, and document processing metrics
#
#
# """)
# return demo
"""
specific_utils.py - Enhanced Document Processing with Gradio Integration
This module integrates with gradio_utils.py to provide comprehensive document processing
and audio handling for the Wisal application.
"""
import os
import gradio as gr
from typing import Tuple, Optional, Dict, Any, List
import shutil
from datetime import datetime
# Import from gradio_utils.py for enhanced functionality
from gradio_utils import (
DOCUMENT_TYPES,
handle_document_upload,
process_with_audio,
process_with_stats,
create_stats_display,
demo_state,
safe_audio_processing,
process_old_doc_query,
clear_chat,
ENHANCED_CSS
)
# Import document functions from docs_utils
from docs_utils import (
old_doc_ingestion, old_doc_qa,
user_doc_ingest, user_doc_qa,
rag_dom_ingest, rag_dom_qa
)
# Import audio utilities
from audio_utils import get_transcription_or_text, generate_tts_response
# Import pipeline functions
from pipeQuery import process_query, clean_pipeline_result
from logger.custom_logger import CustomLoggerTracker
# Initialize logger
# Module-scoped logger; every handler in this module reports through it.
custom_log = CustomLoggerTracker()
logger = custom_log.get_logger("specific_utils")
logger.info("Logger initialized for specific utilities module")
def get_all_document_choices() -> List[Tuple[str, str]]:
    """Return the four supported document categories as (label, value) pairs.

    The labels are display strings for a dropdown; the values are the keys
    the upload handlers dispatch on.
    """
    # Insertion order defines dropdown order.
    categories = {
        "old_documents": "š Old Documents",
        "user_specific": "š¤ User-Specific Documents",
        "domain_documents": "š„ Domain Documents",
        "new_documents": "š New Documents",
    }
    return [(label, value) for value, label in categories.items()]
def enhanced_document_upload_handler(file, doc_type: str, query: str = "") -> str:
    """Ingest an uploaded document via the pipeline matching ``doc_type``.

    Args:
        file: Uploaded file object exposing a ``.name`` path; falsy when
            nothing was selected.
        doc_type: One of the category keys from ``get_all_document_choices``.
        query: Optional question to answer against the ingested document.

    Returns:
        Human-readable status string (ingestion result, optionally followed
        by the query answer), or a warning/error message.
    """
    if not file:
        return "ā ļø Please select a file to upload."
    if not doc_type or doc_type == "None":
        return "ā ļø Please select a document type."

    try:
        logger.info(f"Processing document upload: {file.name} as {doc_type}")

        if doc_type == "new_documents":
            # New documents go through the standard RAG pipeline.
            from rag_steps import ingest_file
            demo_state.document_stats["new_documents"] += 1
            ingest_result = ingest_file(file.name)
            if query and query.strip():
                response = process_query(query)
                raw_answer = response[0] if isinstance(response, tuple) else str(response)
                cleaned_response = clean_pipeline_result(raw_answer)
                return f"š New Document processed: {ingest_result}\n\nQuery Answer:\n{cleaned_response}"
            return f"š New document ingested: {ingest_result}"

        # The remaining categories share one shape: bump the stat counter,
        # ingest, then optionally answer the query. Dispatch via a table.
        routes = {
            "old_documents": (
                old_doc_ingestion,
                old_doc_qa,
                "š Old Document processed: {res}\n\nQuery Answer:\n{ans}",
                "š Old Document ingested: {res}",
            ),
            "user_specific": (
                user_doc_ingest,
                user_doc_qa,
                "š¤ User Document processed: {res}\n\nQuery Answer:\n{ans}",
                "š¤ User-specific document ingested: {res}",
            ),
            "domain_documents": (
                rag_dom_ingest,
                rag_dom_qa,
                "š„ Domain Document processed: {res}\n\nQuery Answer:\n{ans}",
                "š„ Domain document ingested: {res}",
            ),
        }
        route = routes.get(doc_type)
        if route is None:
            return f"ā Unknown document type: {doc_type}"

        ingest_fn, qa_fn, processed_msg, ingested_msg = route
        demo_state.document_stats[doc_type] += 1
        ingest_result = ingest_fn(file.name)
        if query and query.strip():
            return processed_msg.format(res=ingest_result, ans=qa_fn(query))
        return ingested_msg.format(res=ingest_result)
    except Exception as e:
        logger.error(f"Document upload error: {e}")
        return f"ā Document processing failed: {str(e)}"
def enhanced_audio_transcription(audio_file) -> Tuple[str, str]:
    """Transcribe a recorded/uploaded audio payload to text.

    Args:
        audio_file: Audio payload from a Gradio audio component; falsy when
            no recording was provided.

    Returns:
        ``(text, status)`` — the stripped transcription (empty string on
        failure) plus a human-readable status message. Never raises.
    """
    if not audio_file:
        return "", "No audio file provided"
    try:
        logger.info("Processing audio transcription...")
        # Empty text argument forces audio_utils to transcribe the audio.
        transcribed_text, status = get_transcription_or_text("", audio_file)
        # The helper signals failure via an empty result or an
        # "[ERROR]"-prefixed string rather than raising.
        if not transcribed_text or transcribed_text.startswith("[ERROR]"):
            logger.warning(f"Transcription failed: {status}")
            return "", f"❌ Audio transcription failed: {status}"
        logger.info(f"Transcription successful: {transcribed_text[:50]}...")
        return transcribed_text.strip(), "✅ Audio transcribed successfully"
    except Exception as e:
        logger.error(f"Audio transcription error: {e}")
        return "", f"❌ Transcription error: {str(e)}"
def process_text_with_audio_support(user_input: str, audio_input, chat_history: List) -> Tuple[List, str, str, str]:
    """Handle a text-or-audio turn and produce a text-only response.

    Typed text takes priority; when absent, the audio recording is
    transcribed and used as the query instead.

    Returns:
        ``(chat_history, cleared_input, status_message, stats_html)``.
    """
    try:
        text = (user_input or "").strip()
        # No typed text: fall back to transcribing the recording.
        if not text and audio_input:
            spoken, note = enhanced_audio_transcription(audio_input)
            if not spoken:
                return chat_history, "", f"ā {note}", create_stats_display()
            text = spoken
        if not text:
            return chat_history, "", "ā ļø Please provide text or audio input.", create_stats_display()
        # process_with_stats yields a 6-tuple; only four fields matter here.
        updated_history, _, cleared, status, stats, _ = process_with_stats(text, None, chat_history)
        return updated_history, cleared, status, stats
    except Exception as exc:
        logger.error(f"Text processing error: {exc}")
        return chat_history, "", f"ā Processing error: {str(exc)}", create_stats_display()
def process_audio_only_response(user_input: str, audio_input, voice_dropdown: str, chat_history: List) -> Tuple[Optional[Any], str, str, str]:
    """Handle a turn and return only an audio (TTS) response.

    Args:
        user_input: Typed text; may be empty when audio is supplied.
        audio_input: Recorded audio, transcribed when no text was typed.
        voice_dropdown: Selected TTS voice identifier.
        chat_history: Current chat history (unused here; kept so all turn
            handlers share a uniform signature).

    Returns:
        ``(audio_response, cleared_input, status_message, stats_html)``.
    """
    try:
        query = user_input.strip() if user_input else ""
        # Fall back to transcription when only audio was provided.
        if audio_input and not query:
            transcribed_text, transcription_status = enhanced_audio_transcription(audio_input)
            if transcribed_text:
                query = transcribed_text
            else:
                return None, "", f"ā {transcription_status}", create_stats_display()
        if not query:
            return None, "", "ā ļø Please provide text or audio input.", create_stats_display()
        # Generate the text answer first, then synthesize speech from it.
        response = process_query(query)
        if isinstance(response, tuple):
            text_response = clean_pipeline_result(response[0] if response[0] else response[1])
        else:
            text_response = clean_pipeline_result(str(response))
        audio_response = safe_audio_processing(text_response, voice_dropdown)
        status_msg = (
            "✅ Audio response generated successfully"
            if audio_response
            else "ā ļø Audio generation failed"
        )
        return audio_response, "", status_msg, create_stats_display()
    except Exception as e:
        logger.error(f"Audio processing error: {e}")
        return None, "", f"ā Audio processing error: {str(e)}", create_stats_display()
def process_both_text_and_audio_response(
    user_input: str,
    audio_input,
    voice_dropdown: str,
    chat_history: List
) -> Tuple[List, Optional[Any], str, str, str, Optional[Any]]:
    """Handle a turn, returning both a chat (text) and an audio response.

    Thin wrapper around ``gradio_utils.process_with_audio`` that converts an
    unexpected exception into a user-visible error instead of crashing the
    Gradio callback.

    Returns:
        The 6-tuple expected by the Gradio outputs list:
        ``(chat_history, audio_response, cleared_input, status_message,
        stats_html, cleared_audio_input)``.
    """
    try:
        # Use the enhanced processing from gradio_utils
        return process_with_audio(user_input, audio_input, voice_dropdown, chat_history)
    except Exception as e:
        logger.error(f"Combined processing error: {e}")
        # NOTE(review): process_with_audio is wired to six Gradio outputs in
        # the demo; the original error path returned only five values, which
        # would make Gradio fail on the arity mismatch. Pad with a trailing
        # None so the audio_input component is cleared on error too.
        return chat_history, None, "", f"ā Processing error: {str(e)}", create_stats_display(), None
def create_document_info_panel() -> str:
    """Return the static HTML/info text describing the four document types.

    The content is fixed at definition time; it is rendered into a
    ``gr.HTML`` component by ``create_enhanced_file_upload_interface``.
    """
    return """
š Document Processing Options
š Old Documents
Historical documents, legacy materials, archived content
š¤ User-Specific Documents
Personal files, user manuals, individual documents
š„ Domain Documents
Medical papers, research articles, domain knowledge
š New Documents
Recent uploads, latest materials, standard processing
š” Tip: Each document type uses specialized processing optimized for that content category.
"""
def get_enhanced_css() -> str:
    """Expose the shared ENHANCED_CSS stylesheet imported from gradio_utils."""
    stylesheet = ENHANCED_CSS
    return stylesheet
def create_enhanced_file_upload_interface():
    """Build the enhanced upload UI covering all four document types.

    Must be called inside an active ``gr.Blocks`` context. The dropdown is
    populated from ``get_all_document_choices`` and the info panel from
    ``create_document_info_panel``.

    Returns:
        dict: Components keyed by name ('doc_file', 'doc_type', 'doc_query',
        'upload_btn', 'upload_status', 'doc_info') for later event wiring.
    """
    with gr.Group():
        gr.Markdown("### š Enhanced Document Upload")
        doc_file = gr.File(
            label="š Upload Document (PDF, DOCX, TXT)",
            file_types=[".pdf", ".docx", ".txt"],
            elem_classes="input-container"
        )
        # Fixed dropdown with all 4 options
        doc_type = gr.Dropdown(
            label="š Document Type",
            choices=get_all_document_choices(),
            value="user_specific",
            elem_classes="input-container"
        )
        # Optional query field
        doc_query = gr.Textbox(
            label="š Optional: Ask about this document",
            placeholder="What does this document say about...",
            lines=2,
            elem_classes="input-container"
        )
        upload_btn = gr.Button(
            "š¤ Upload & Process",
            variant="primary",
            elem_classes="enhanced-button primary-button"
        )
        # Read-only status area filled in by the upload handler.
        upload_status = gr.Textbox(
            label="š Upload Status",
            interactive=False,
            lines=5,
            elem_classes="input-container"
        )
        # Document info panel
        doc_info = gr.HTML(create_document_info_panel())
    return {
        'doc_file': doc_file,
        'doc_type': doc_type,
        'doc_query': doc_query,
        'upload_btn': upload_btn,
        'upload_status': upload_status,
        'doc_info': doc_info
    }
# Export main functions for use in main.py
__all__ = [
    'get_all_document_choices',
    'enhanced_document_upload_handler',
    'enhanced_audio_transcription',
    'process_text_with_audio_support',
    'process_audio_only_response',
    'process_both_text_and_audio_response',
    'create_document_info_panel',
    'create_enhanced_file_upload_interface',
    'get_enhanced_css',
    # Re-exported names that originate in gradio_utils:
    'ENHANCED_CSS',
    'demo_state'
]