|
|
import gradio as gr |
|
|
import time |
|
|
import os |
|
|
import re |
|
|
from typing import List, Tuple, Optional |
|
|
from datetime import datetime |
|
|
import shutil |
|
|
import asyncio |
|
|
from io import BytesIO |
|
|
import numpy as np |
|
|
import websockets |
|
|
from dotenv import load_dotenv |
|
|
from PIL import Image |
|
|
|
|
|
|
|
|
load_dotenv() |
|
|
|
|
|
|
|
|
# Optional dependency: the main RAG query pipeline. If it cannot be
# imported, install no-op stand-ins so the UI still loads in degraded mode.
try:
    from pipeQuery import process_query, clean_pipeline_result
except ImportError as e:
    print(f"Warning: pipeQuery import failed: {e}")
    # Fallback stubs keep the rest of the app callable without the pipeline.
    def process_query(query): return "Pipeline not available"
    def clean_pipeline_result(result): return str(result)
|
|
|
|
|
# Optional dependency: TTS, transcription and the live-voice handler.
try:
    from audio_utils import generate_tts_response, get_transcription_or_text, GeminiHandler
except ImportError as e:
    print(f"Warning: audio_utils import failed: {e}")
    # Fallback stubs: text-only operation, voice features report unavailability.
    def generate_tts_response(text, voice): return None, "TTS not available"
    def get_transcription_or_text(text, audio): return text or "No input", "Text used"
    class GeminiHandler:
        # Inert stand-in matching the interface used by the WebRTC stream.
        def __init__(self): pass
        def copy(self): return GeminiHandler()
        def stop(self): pass
|
|
|
|
|
# Optional dependency: generic RAG file ingestion used by several upload paths.
try:
    from rag_steps import ingest_file
except ImportError as e:
    print(f"Warning: rag_steps import failed: {e}")
    def ingest_file(file_path): return f"File ingestion not available for {file_path}"
|
|
|
|
|
from logger.custom_logger import CustomLoggerTracker |
|
|
|
|
|
# Optional dependency: category-specific ingestion/QA helpers. Stubs keep
# the document tabs usable (with explanatory messages) when absent.
try:
    from docs_utils import old_doc_ingestion, old_doc_qa, user_doc_ingest, user_doc_qa, rag_dom_ingest, rag_dom_qa
except ImportError as e:
    print(f"Warning: docs_utils import failed: {e}")
    def old_doc_ingestion(file_path): return f"Old doc ingestion not available for {file_path}"
    def old_doc_qa(query): return "Old doc QA not available"
    def user_doc_ingest(file_path): return f"User doc ingestion not available for {file_path}"
    def user_doc_qa(query): return "User doc QA not available"
    def rag_dom_ingest(file_path): return f"RAG domain ingestion not available for {file_path}"
    def rag_dom_qa(query): return "RAG domain QA not available"
|
|
|
|
|
# Optional dependency: fastrtc powers the live WebRTC voice tab.
try:
    from fastrtc import (
        WebRTC,
        get_cloudflare_turn_credentials_async,
        wait_for_item,
    )
except ImportError as e:
    print(f"Warning: fastrtc import failed: {e}")

    # Fallback stubs are defined INSIDE the except block so they cannot
    # shadow the real fastrtc symbols when the import succeeds.
    class WebRTC:
        """Inert stand-in for fastrtc.WebRTC; accepts and ignores everything."""
        def __init__(self, **kwargs): pass
        def stream(self, *args, **kwargs): pass

    # NOTE(review): the real helper appears to be async (per its name);
    # this stub returns {} synchronously — confirm callers tolerate that.
    def get_cloudflare_turn_credentials_async(): return {}

    def wait_for_item(*args): pass
|
|
|
|
|
# Optional dependency: Google GenAI SDK. When absent, genai is set to None
# so callers can feature-detect before use.
try:
    from google import genai
except ImportError as e:
    print(f"Warning: google.genai import failed: {e}")
    genai = None
|
|
|
|
|
from gradio.utils import get_space |
|
|
|
|
|
|
|
|
# Module-wide logger routed through the project's custom tracker.
custom_log = CustomLoggerTracker()
logger = custom_log.get_logger("gradio_demo")
|
|
|
|
|
|
|
|
class DemoState:
    """Mutable per-session bookkeeping for the demo UI.

    Tracks total query count, a running mean of response times, the five
    most recent queries (newest first), per-category document counts, and
    the path of the last uploaded "old document".
    """

    def __init__(self):
        self.query_count = 0
        self.session_stats = {"total_queries": 0, "avg_response_time": 0}
        self.recent_queries = []
        self.last_uploaded_old_doc = None
        self.document_stats = {
            "old_documents": 0,
            "user_specific": 0,
            "domain_documents": 0,
            "new_documents": 0,
        }

    def update_stats(self, query: str, response_time: float):
        """Record one completed query and fold its latency into the running mean."""
        self.query_count += 1
        self.session_stats["total_queries"] += 1

        # Incremental mean: new_avg = (old_avg * (n - 1) + x) / n
        prev_avg = self.session_stats["avg_response_time"]
        self.session_stats["avg_response_time"] = (
            (prev_avg * (self.query_count - 1)) + response_time
        ) / self.query_count

        # Keep at most the five newest entries, newest first. The query is
        # truncated to 50 chars with a trailing ellipsis marker.
        entry = {
            "query": query[:50] + "...",
            "time": datetime.now().strftime("%H:%M:%S"),
        }
        self.recent_queries = [entry] + self.recent_queries[:4]


demo_state = DemoState()
|
|
|
|
|
|
|
|
ENHANCED_CSS = """ |
|
|
/* Modern Dark Theme with Animations */ |
|
|
.gradio-container { |
|
|
background: linear-gradient(135deg, #0a0a0a 0%, #1a1a2e 25%, #16213e 75%, #0f1419 100%) !important; |
|
|
color: #e0e6ed !important; |
|
|
min-height: 100vh; |
|
|
font-family: 'Inter', -apple-system, BlinkMacSystemFont, sans-serif; |
|
|
} |
|
|
|
|
|
/* Animated gradient background */ |
|
|
.gradio-container::before { |
|
|
content: ''; |
|
|
position: fixed; |
|
|
top: 0; |
|
|
left: 0; |
|
|
width: 100%; |
|
|
height: 100%; |
|
|
background: linear-gradient(45deg, #667eea, #764ba2, #4facfe, #00f2fe); |
|
|
background-size: 400% 400%; |
|
|
animation: gradientShift 15s ease infinite; |
|
|
opacity: 0.03; |
|
|
z-index: -1; |
|
|
} |
|
|
|
|
|
@keyframes gradientShift { |
|
|
0% { background-position: 0% 50%; } |
|
|
50% { background-position: 100% 50%; } |
|
|
100% { background-position: 0% 50%; } |
|
|
} |
|
|
|
|
|
/* Header styling with glow effect */ |
|
|
.main-header { |
|
|
text-align: center; |
|
|
padding: 2rem 0; |
|
|
background: rgba(255, 255, 255, 0.02); |
|
|
border-radius: 20px; |
|
|
margin-bottom: 2rem; |
|
|
border: 1px solid rgba(255, 255, 255, 0.1); |
|
|
position: relative; |
|
|
overflow: hidden; |
|
|
} |
|
|
|
|
|
.main-header::before { |
|
|
content: ''; |
|
|
position: absolute; |
|
|
top: 0; |
|
|
left: -100%; |
|
|
width: 100%; |
|
|
height: 100%; |
|
|
background: linear-gradient(90deg, transparent, rgba(255, 255, 255, 0.1), transparent); |
|
|
animation: shimmer 3s infinite; |
|
|
} |
|
|
|
|
|
@keyframes shimmer { |
|
|
0% { left: -100%; } |
|
|
100% { left: 100%; } |
|
|
} |
|
|
|
|
|
.main-title { |
|
|
font-size: 3rem; |
|
|
font-weight: 700; |
|
|
background: linear-gradient(135deg, #4facfe 0%, #00f2fe 50%, #667eea 100%); |
|
|
-webkit-background-clip: text; |
|
|
-webkit-text-fill-color: transparent; |
|
|
background-clip: text; |
|
|
margin-bottom: 0.5rem; |
|
|
text-shadow: 0 0 30px rgba(79, 172, 254, 0.3); |
|
|
} |
|
|
|
|
|
.subtitle { |
|
|
font-size: 1.2rem; |
|
|
color: #a0aec0; |
|
|
margin-bottom: 1rem; |
|
|
} |
|
|
|
|
|
.status-indicator { |
|
|
display: inline-flex; |
|
|
align-items: center; |
|
|
gap: 0.5rem; |
|
|
padding: 0.5rem 1rem; |
|
|
background: rgba(79, 172, 254, 0.1); |
|
|
border: 1px solid rgba(79, 172, 254, 0.3); |
|
|
border-radius: 25px; |
|
|
font-size: 0.9rem; |
|
|
} |
|
|
|
|
|
.status-dot { |
|
|
width: 8px; |
|
|
height: 8px; |
|
|
background: #00f2fe; |
|
|
border-radius: 50%; |
|
|
animation: pulse 2s infinite; |
|
|
} |
|
|
|
|
|
@keyframes pulse { |
|
|
0%, 100% { opacity: 1; transform: scale(1); } |
|
|
50% { opacity: 0.5; transform: scale(1.2); } |
|
|
} |
|
|
|
|
|
/* Enhanced chatbot styling */ |
|
|
.chatbot-container { |
|
|
background: rgba(0, 0, 0, 0.3) !important; |
|
|
border: 1px solid rgba(255, 255, 255, 0.1) !important; |
|
|
border-radius: 20px !important; |
|
|
backdrop-filter: blur(20px) !important; |
|
|
box-shadow: 0 8px 32px rgba(0, 0, 0, 0.3) !important; |
|
|
} |
|
|
|
|
|
.message { |
|
|
animation: fadeInUp 0.5s ease-out; |
|
|
margin: 0.5rem 0; |
|
|
padding: 1rem; |
|
|
border-radius: 15px; |
|
|
position: relative; |
|
|
overflow: hidden; |
|
|
} |
|
|
|
|
|
.message.user { |
|
|
background: linear-gradient(135deg, #667eea 0%, #764ba2 100%) !important; |
|
|
color: white !important; |
|
|
margin-left: 2rem; |
|
|
box-shadow: 0 4px 15px rgba(102, 126, 234, 0.3); |
|
|
} |
|
|
|
|
|
.message.assistant { |
|
|
background: linear-gradient(135deg, rgba(79, 172, 254, 0.1) 0%, rgba(0, 242, 254, 0.1) 100%) !important; |
|
|
border: 1px solid rgba(79, 172, 254, 0.3); |
|
|
color: #e0e6ed !important; |
|
|
margin-right: 2rem; |
|
|
} |
|
|
|
|
|
@keyframes fadeInUp { |
|
|
from { |
|
|
opacity: 0; |
|
|
transform: translateY(20px); |
|
|
} |
|
|
to { |
|
|
opacity: 1; |
|
|
transform: translateY(0); |
|
|
} |
|
|
} |
|
|
|
|
|
/* Enhanced input styling */ |
|
|
.input-container { |
|
|
background: rgba(255, 255, 255, 0.05) !important; |
|
|
border: 2px solid rgba(255, 255, 255, 0.1) !important; |
|
|
border-radius: 15px !important; |
|
|
backdrop-filter: blur(10px) !important; |
|
|
transition: all 0.3s ease; |
|
|
} |
|
|
|
|
|
.input-container:focus-within { |
|
|
border-color: rgba(79, 172, 254, 0.5) !important; |
|
|
box-shadow: 0 0 20px rgba(79, 172, 254, 0.2) !important; |
|
|
} |
|
|
|
|
|
/* Enhanced buttons with hover effects */ |
|
|
.enhanced-button { |
|
|
border: none !important; |
|
|
border-radius: 12px !important; |
|
|
font-weight: 600 !important; |
|
|
text-transform: uppercase !important; |
|
|
letter-spacing: 0.5px !important; |
|
|
padding: 12px 24px !important; |
|
|
transition: all 0.3s cubic-bezier(0.4, 0, 0.2, 1) !important; |
|
|
position: relative !important; |
|
|
overflow: hidden !important; |
|
|
box-shadow: 0 4px 15px rgba(0, 0, 0, 0.2) !important; |
|
|
} |
|
|
|
|
|
.enhanced-button::before { |
|
|
content: ''; |
|
|
position: absolute; |
|
|
top: 0; |
|
|
left: -100%; |
|
|
width: 100%; |
|
|
height: 100%; |
|
|
background: linear-gradient(90deg, transparent, rgba(255, 255, 255, 0.2), transparent); |
|
|
transition: left 0.6s; |
|
|
} |
|
|
|
|
|
.enhanced-button:hover::before { |
|
|
left: 100%; |
|
|
} |
|
|
|
|
|
.primary-button { |
|
|
background: linear-gradient(135deg, #4facfe 0%, #00f2fe 100%) !important; |
|
|
color: white !important; |
|
|
} |
|
|
|
|
|
.primary-button:hover { |
|
|
transform: translateY(-2px) !important; |
|
|
box-shadow: 0 8px 25px rgba(79, 172, 254, 0.4) !important; |
|
|
} |
|
|
|
|
|
.secondary-button { |
|
|
background: linear-gradient(135deg, #667eea 0%, #764ba2 100%) !important; |
|
|
color: white !important; |
|
|
} |
|
|
|
|
|
.secondary-button:hover { |
|
|
transform: translateY(-2px) !important; |
|
|
box-shadow: 0 8px 25px rgba(102, 126, 234, 0.4) !important; |
|
|
} |
|
|
|
|
|
.danger-button { |
|
|
background: linear-gradient(135deg, #ff6b6b 0%, #ee5a52 100%) !important; |
|
|
color: white !important; |
|
|
} |
|
|
|
|
|
.danger-button:hover { |
|
|
transform: translateY(-2px) !important; |
|
|
box-shadow: 0 8px 25px rgba(255, 107, 107, 0.4) !important; |
|
|
} |
|
|
|
|
|
/* Stats panel */ |
|
|
.stats-panel { |
|
|
background: rgba(255, 255, 255, 0.03); |
|
|
border: 1px solid rgba(255, 255, 255, 0.1); |
|
|
border-radius: 15px; |
|
|
padding: 1.5rem; |
|
|
margin: 1rem 0; |
|
|
} |
|
|
|
|
|
.stat-item { |
|
|
display: flex; |
|
|
justify-content: space-between; |
|
|
align-items: center; |
|
|
padding: 0.5rem 0; |
|
|
border-bottom: 1px solid rgba(255, 255, 255, 0.1); |
|
|
} |
|
|
|
|
|
.stat-value { |
|
|
font-weight: 600; |
|
|
color: #4facfe; |
|
|
} |
|
|
|
|
|
/* Page content */ |
|
|
.page-content { |
|
|
padding: 1rem; |
|
|
background: rgba(255, 255, 255, 0.02); |
|
|
border-radius: 15px; |
|
|
border: 1px solid rgba(255, 255, 255, 0.1); |
|
|
min-height: 60vh; |
|
|
} |
|
|
|
|
|
/* Tab styling */ |
|
|
.tab-nav { |
|
|
background: rgba(255, 255, 255, 0.05) !important; |
|
|
border-radius: 10px !important; |
|
|
border: 1px solid rgba(255, 255, 255, 0.1) !important; |
|
|
} |
|
|
|
|
|
/* WebRTC styling */ |
|
|
.webrtc-container { |
|
|
background: rgba(0, 0, 0, 0.3) !important; |
|
|
border: 1px solid rgba(255, 255, 255, 0.1) !important; |
|
|
border-radius: 20px !important; |
|
|
backdrop-filter: blur(20px) !important; |
|
|
box-shadow: 0 8px 32px rgba(0, 0, 0, 0.3) !important; |
|
|
padding: 1rem; |
|
|
} |
|
|
|
|
|
/* Responsive design */ |
|
|
@media (max-width: 768px) { |
|
|
.main-title { font-size: 2rem; } |
|
|
.gradio-container { padding: 10px; } |
|
|
.message.user { margin-left: 1rem; } |
|
|
.message.assistant { margin-right: 1rem; } |
|
|
} |
|
|
|
|
|
/* Scrollbar styling */ |
|
|
::-webkit-scrollbar { |
|
|
width: 8px; |
|
|
} |
|
|
|
|
|
::-webkit-scrollbar-track { |
|
|
background: rgba(255, 255, 255, 0.05); |
|
|
border-radius: 4px; |
|
|
} |
|
|
|
|
|
::-webkit-scrollbar-thumb { |
|
|
background: linear-gradient(135deg, #4facfe 0%, #00f2fe 100%); |
|
|
border-radius: 4px; |
|
|
} |
|
|
|
|
|
::-webkit-scrollbar-thumb:hover { |
|
|
background: linear-gradient(135deg, #667eea 0%, #764ba2 100%); |
|
|
} |
|
|
""" |
|
|
|
|
|
|
|
|
|
|
|
# Registry of supported document categories. Keys match
# DemoState.document_stats and the doc_type values used by
# handle_document_upload; each entry carries the UI label, a description of
# which ingestion helper backs it, and an accent color for the stats panel.
DOCUMENT_TYPES = {
    "old_documents": {
        "label": "π Old Documents",
        "description": "Historical documents, legacy materials (uses old_doc_ingestion)",
        "color": "#8B4513",
    },
    "user_specific": {
        "label": "π€ User-Specific Documents",
        "description": "Personal files, user manuals (uses user_doc_ingest)",
        "color": "#4facfe",
    },
    "domain_documents": {
        "label": "π₯ Domain Documents",
        "description": "Medical papers, research articles (uses rag_dom_ingest)",
        "color": "#00f2fe",
    },
    "new_documents": {
        "label": "π New Documents",
        "description": "Recent uploads using standard RAG pipeline",
        "color": "#667eea",
    }
}
|
|
def format_response_time(seconds: float) -> str:
    """Render a duration in its most natural unit.

    Sub-second values become whole milliseconds, values under a minute
    become seconds with one decimal, and anything longer becomes
    "<m>m <s.s>s".
    """
    if seconds >= 60:
        whole_minutes, leftover = divmod(seconds, 60)
        return f"{int(whole_minutes)}m {leftover:.1f}s"
    if seconds >= 1:
        return f"{seconds:.1f}s"
    return f"{int(seconds * 1000)}ms"
|
|
|
|
|
def create_stats_display():
    """Assemble the HTML for the statistics sidebar.

    Renders up to three panels: session-level counters, per-category
    document counts (only categories with at least one document), and the
    most recent queries. Reads the module-level ``demo_state`` and
    ``DOCUMENT_TYPES``.
    """
    totals = demo_state.session_stats
    pieces = [f"""
    <div class="stats-panel">
        <h4 style="color: #4facfe; margin-bottom: 1rem;">π Session Statistics</h4>
        <div class="stat-item">
            <span>Total Queries:</span>
            <span class="stat-value">{totals['total_queries']}</span>
        </div>
        <div class="stat-item">
            <span>Average Response Time:</span>
            <span class="stat-value">{format_response_time(totals['avg_response_time'])}</span>
        </div>
        <div class="stat-item">
            <span>Session Started:</span>
            <span class="stat-value">{datetime.now().strftime('%H:%M')}</span>
        </div>
    </div>
    """]

    # Document panel only appears once something has been ingested.
    doc_counts = demo_state.document_stats
    if any(doc_counts.values()):
        pieces.append("""
    <div class="stats-panel">
        <h4 style="color: #4facfe; margin-bottom: 1rem;">π Document Statistics</h4>
    """)
        for category, count in doc_counts.items():
            if count <= 0:
                continue
            meta = DOCUMENT_TYPES.get(category, {})
            color = meta.get("color", "#4facfe")
            label = meta.get("label", category.replace("_", " ").title())
            pieces.append(f"""
            <div class="stat-item">
                <span>{label}:</span>
                <span class="stat-value" style="color: {color};">{count}</span>
            </div>
            """)
        pieces.append("</div>")

    # Recent-queries panel, newest first (as maintained by DemoState).
    if demo_state.recent_queries:
        pieces.append("""
    <div class="stats-panel">
        <h4 style="color: #4facfe; margin-bottom: 1rem;">π Recent Queries</h4>
    """)
        for item in demo_state.recent_queries:
            pieces.append(f"""
            <div class="stat-item">
                <span>{item['query']}</span>
                <span class="stat-value">{item['time']}</span>
            </div>
            """)
        pieces.append("</div>")

    return "".join(pieces)
|
|
|
|
|
|
|
|
def process_old_document(file_path: str, query: str = None) -> str:
    """Ingest an "old document" and optionally answer a query against it.

    Args:
        file_path: Path of the uploaded document.
        query: Optional question answered via old_doc_qa after ingestion.

    Returns:
        A user-facing status string describing the ingestion result and,
        when a query was given, the answer.
    """
    try:
        status = old_doc_ingestion(file_path)

        # Only bump the counter / remember the file AFTER ingestion
        # succeeded; previously these ran first, inflating stats and
        # recording a bad path whenever ingestion raised.
        demo_state.document_stats["old_documents"] += 1
        demo_state.last_uploaded_old_doc = file_path

        if query:
            answer = old_doc_qa(query)
            return f"π Old Document processed: {status}\n\n{answer}"
        return f"π Old Document ingested successfully: {status}"
    except Exception as e:
        logger.error(f"Error processing old document: {e}")
        return f"β Error processing old document: {str(e)}"
|
|
|
|
|
def process_user_document(file_path: str, query: str = None) -> str:
    """Ingest a user-specific document and optionally answer a query.

    Args:
        file_path: Path of the uploaded document.
        query: Optional question routed through the main pipeline.

    Returns:
        A user-facing status string, including the pipeline answer when a
        query was supplied.
    """
    try:
        result = ingest_file(file_path)

        # Count only after successful ingestion (previously bumped before,
        # inflating stats when ingestion raised).
        demo_state.document_stats["user_specific"] += 1

        if query:
            response = process_query(query)
            # process_query may return a tuple (text, error) or a bare value.
            cleaned_response = clean_pipeline_result(response[0] if isinstance(response, tuple) else str(response))
            return f"π€ User Document processed: {result}\n\n{cleaned_response}"
        return f"π€ User-specific document ingested successfully: {result}"
    except Exception as e:
        logger.error(f"Error processing user document: {e}")
        return f"β Error processing user document: {str(e)}"
|
|
|
|
|
|
|
|
|
|
|
def start_voice_chat():
    """Create a fresh GeminiHandler and publish it as the module-global
    ``voice_handler`` used by the WebRTC stream."""
    global voice_handler
    try:
        voice_handler = GeminiHandler()
    except Exception as e:
        logger.error(f"Error starting voice chat: {e}")
        return f"β Failed to start voice chat: {str(e)}"
    return "π€ Voice chat started... Speak now!"
|
|
|
|
|
def stop_voice_chat():
    """Stop the active voice chat handler, if any.

    Returns a status string in every path; the no-handler case now gets an
    explicit message instead of an ambiguous/empty status.
    """
    try:
        global voice_handler
        if 'voice_handler' in globals() and voice_handler:
            voice_handler.stop()
            return "βΉοΈ Voice chat stopped."
        return "βΉοΈ No active voice chat."
    except Exception as e:
        logger.error(f"Error stopping voice chat: {e}")
        return f"β Error stopping voice chat: {str(e)}"
|
|
|
|
|
|
|
|
|
|
|
def process_domain_document(file_path: str, query: str = None) -> str:
    """Ingest a domain (e.g. medical/research) document, optionally answer a query.

    Args:
        file_path: Path of the uploaded document.
        query: Optional question routed through the main pipeline.

    Returns:
        A user-facing status string, including the pipeline answer when a
        query was supplied.
    """
    try:
        result = ingest_file(file_path)

        # Count only after successful ingestion (previously bumped before,
        # inflating stats when ingestion raised).
        demo_state.document_stats["domain_documents"] += 1

        if query:
            response = process_query(query)
            # process_query may return a tuple (text, error) or a bare value.
            cleaned_response = clean_pipeline_result(response[0] if isinstance(response, tuple) else str(response))
            return f"π₯ Domain Document processed: {result}\n\n{cleaned_response}"
        return f"π₯ Domain document ingested successfully: {result}"
    except Exception as e:
        logger.error(f"Error processing domain document: {e}")
        return f"β Error processing domain document: {str(e)}"
|
|
|
|
|
def process_new_document(file_path: str, query: str = None) -> str:
    """Ingest a new document via the standard RAG pipeline, optionally answer a query.

    Args:
        file_path: Path of the uploaded document.
        query: Optional question routed through the main pipeline.

    Returns:
        A user-facing status string, including the pipeline answer when a
        query was supplied.
    """
    try:
        result = ingest_file(file_path)

        # Count only after successful ingestion (previously bumped before,
        # inflating stats when ingestion raised).
        demo_state.document_stats["new_documents"] += 1

        if query:
            response = process_query(query)
            # process_query may return a tuple (text, error) or a bare value.
            cleaned_response = clean_pipeline_result(response[0] if isinstance(response, tuple) else str(response))
            return f"π New Document processed: {result}\n\n{cleaned_response}"
        return f"π New document ingested successfully: {result}"
    except Exception as e:
        logger.error(f"Error processing new document: {e}")
        return f"β Error processing new document: {str(e)}"
|
|
|
|
|
|
|
|
def process_with_stats(user_input: str, audio_input, chat_history: List) -> Tuple[List, str, str, str, Optional[tuple]]:
    """Handle one text/audio chat turn with timing and statistics.

    Transcribes audio only when no text was typed, runs the pipeline,
    appends the exchange to the messages-format history, and refreshes the
    stats panel.

    Args:
        user_input: Raw text from the textbox (may be empty).
        audio_input: Optional audio filepath.
        chat_history: Existing list of {"role", "content"} dicts.

    Returns:
        (new_history, cleared_textbox, status_message, stats_html,
         cleared_audio_input)
    """
    start_time = time.time()
    try:
        query = user_input.strip() if user_input else ""
        if audio_input and not query:
            query, status = get_transcription_or_text("", audio_input)
            if query.startswith("[ERROR]"):
                return chat_history, "", f"β {status}", create_stats_display(), None

        if not query:
            return chat_history, "", "β οΈ Please provide text or audio input.", create_stats_display(), None

        logger.info(f"Processing query: {query[:50]}...")

        response = process_query(query)
        response_time = time.time() - start_time
        demo_state.update_stats(query, response_time)

        # process_query may return a tuple (text, error) or a bare value.
        if isinstance(response, tuple):
            cleaned_response = clean_pipeline_result(response[0] if response[0] else response[1])
        else:
            cleaned_response = clean_pipeline_result(str(response))

        new_history = chat_history.copy()
        new_history.append({"role": "user", "content": query})
        new_history.append({"role": "assistant", "content": cleaned_response})

        # FIX: this status f-string was previously split across two source
        # lines, leaving an unterminated string literal.
        status_msg = f"β Response generated in {format_response_time(response_time)}"

        stats_display = create_stats_display()
        logger.info(f"Query processed successfully in {response_time:.2f}s")
        return new_history, "", status_msg, stats_display, None

    except Exception as e:
        logger.error(f"Error processing query: {e}")
        error_msg = f"β Error: {str(e)[:100]}..."
        return chat_history, "", error_msg, create_stats_display(), None
|
|
|
|
|
|
|
|
|
|
|
def process_with_audio(user_input: str, audio_input, voice_dropdown: str, chat_history: List) -> Tuple[List, Optional[tuple], str, str, str, Optional[tuple]]:
    """Handle one chat turn and additionally synthesize a TTS audio reply.

    Args:
        user_input: Text from the textbox (may be empty).
        audio_input: Optional audio filepath, transcribed when no text given.
        voice_dropdown: Name of the TTS voice to use.
        chat_history: Messages-format history list.

    Returns:
        (new_history, audio_response, cleared_textbox, status_message,
         stats_html, cleared_audio_input)
    """
    start_time = time.time()
    try:
        query = user_input.strip() if user_input else ""

        if audio_input and not query:
            try:
                query, status = get_transcription_or_text("", audio_input)
                if not query or query.startswith("[ERROR]"):
                    return chat_history, None, "", f"β Audio transcription failed: {status}", create_stats_display(), None
            except Exception as audio_error:
                logger.error(f"Audio transcription error: {audio_error}")
                return chat_history, None, "", f"β Audio processing error: {str(audio_error)}", create_stats_display(), None

        if not query:
            return chat_history, None, "", "β οΈ Please provide text or audio input.", create_stats_display(), None

        logger.info(f"Processing query with audio: {query[:50]}...")

        try:
            response = process_query(query)
        except Exception as query_error:
            logger.error(f"Query processing error: {query_error}")
            return chat_history, None, "", f"β Query processing failed: {str(query_error)}", create_stats_display(), None

        # process_query may return a tuple (text, error) or a bare value.
        if isinstance(response, tuple):
            cleaned_response = clean_pipeline_result(response[0] if response[0] else response[1])
        else:
            cleaned_response = clean_pipeline_result(str(response))

        new_history = chat_history.copy()
        new_history.append({"role": "user", "content": query})
        new_history.append({"role": "assistant", "content": cleaned_response})

        # Best-effort TTS: a failure here must not lose the text response.
        audio_response = None
        tts_status = "Audio generation skipped"
        try:
            if 'generate_tts_response' in globals():
                audio_data, tts_status = generate_tts_response(cleaned_response, voice_dropdown)
                audio_response = audio_data if audio_data else None
            else:
                tts_status = "TTS function not available"
        except Exception as audio_error:
            logger.error(f"Audio generation error: {audio_error}")
            audio_response = None
            tts_status = f"Audio generation failed: {str(audio_error)[:50]}..."

        response_time = time.time() - start_time
        demo_state.update_stats(query, response_time)

        # FIX: this status f-string was previously split across two source
        # lines, leaving an unterminated string literal.
        status_msg = f"β Response generated in {format_response_time(response_time)}"
        if audio_response:
            status_msg += " | π΅ Audio ready"
        else:
            status_msg += f" | β οΈ {tts_status}"

        stats_display = create_stats_display()
        return new_history, audio_response, "", status_msg, stats_display, None

    except Exception as e:
        logger.error(f"Error processing audio query: {e}")
        error_msg = f"β Error: {str(e)[:100]}..."
        return chat_history, None, "", error_msg, create_stats_display(), None
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def setup_fixed_event_handlers(components):
    """Setup event handlers with the fixed functions.

    Args:
        components: Dict of Gradio components keyed by name (buttons,
            inputs, outputs, and the WebRTC audio widget).
    """
    # Document upload: route file + type + optional query through
    # handle_document_upload and show the result in the status box.
    components['upload_btn'].click(
        fn=lambda file, doc_type, query: handle_document_upload(file, doc_type, query),
        inputs=[components['doc_file'], components['doc_type'], components['doc_query']],
        outputs=[components['upload_status']]
    )

    # "Text + Audio" button: full pipeline plus TTS response.
    components['both_btn'].click(
        fn=process_with_audio,
        inputs=[components['user_input'], components['audio_input'], components['voice_dropdown'], components['chat_history']],
        outputs=[components['chatbot'], components['audio_output'], components['user_input'], components['status_output'], components['stats_display'], components['audio_input']])

    # Live voice chat start/stop controls update the voice status text.
    components['start_voice_btn'].click(
        fn=start_voice_chat,
        outputs=[components['voice_status']])
    components['stop_voice_btn'].click(
        fn=stop_voice_chat,
        outputs=[components['voice_status']])

    # Shared handler instance for the WebRTC audio stream; time/concurrency
    # limits are applied only when running on a hosted Space.
    global voice_handler
    voice_handler = GeminiHandler()
    components['webrtc_audio'].stream(
        voice_handler,
        inputs=[components['webrtc_audio']],
        outputs=[components['webrtc_audio']],
        time_limit=180 if get_space() else None,
        concurrency_limit=2 if get_space() else None,
    )
|
|
|
|
|
|
|
|
def safe_audio_processing(text_response: str, voice: str) -> Optional[tuple]:
    """Generate TTS audio for *text_response*, returning None on any failure
    or when the TTS backend is not loaded."""
    try:
        if 'generate_tts_response' not in globals():
            logger.warning("TTS function not available, skipping audio generation")
            return None
        return generate_tts_response(text_response, voice)
    except Exception as e:
        logger.error(f"Audio processing failed: {e}")
        return None
|
|
|
|
|
|
|
|
def check_required_imports():
    """Probe the optional project modules and report which are missing.

    Returns:
        (ok, missing): ok is True when every module imported cleanly;
        missing lists human-readable descriptions of each failure.
    """
    missing = []

    try:
        from docs_utils import (old_doc_ingestion, old_doc_qa, user_doc_ingest,
                                user_doc_qa, rag_dom_ingest, rag_dom_qa)
    except ImportError as e:
        missing.append(f"docs_utils functions: {e}")

    try:
        from audio_utils import generate_tts_response, get_transcription_or_text
    except ImportError as e:
        missing.append(f"audio_utils functions: {e}")

    try:
        from pipeQuery import process_query, clean_pipeline_result
    except ImportError as e:
        missing.append(f"pipeQuery functions: {e}")

    if missing:
        logger.warning(f"Missing modules: {missing}")
        return False, missing
    return True, []
|
|
|
|
|
|
|
|
def safe_audio_processing(text_response: str, voice: str) -> Optional[tuple]:
    """Safe audio processing with fallback.

    Args:
        text_response: Text to synthesize.
        voice: TTS voice name.

    Returns:
        The generate_tts_response result, or None when TTS is unavailable
        or fails.
    """
    # NOTE(review): this is an exact duplicate of the safe_audio_processing
    # defined earlier in this module; being later, this definition wins at
    # import time. Consider removing one copy.
    try:
        if 'generate_tts_response' in globals():
            return generate_tts_response(text_response, voice)
        else:
            logger.warning("TTS function not available, skipping audio generation")
            return None
    except Exception as e:
        logger.error(f"Audio processing failed: {e}")
        return None
|
|
|
|
|
|
|
|
def handle_document_upload(file, doc_type: str, query: str = "") -> str:
    """Handle document upload with proper function mapping to docs_utils.py.

    Args:
        file: Gradio file object (``file.name`` is the temp path).
        doc_type: One of the DOCUMENT_TYPES keys (or "None" when unset).
        query: Optional question to answer right after ingestion.

    Returns:
        A user-facing status string describing ingestion (and the query
        answer, when one was given).
    """
    # Guard clauses: both a file and a concrete document type are required.
    if not file:
        return "β οΈ Please select a file to upload."

    if doc_type == "None" or not doc_type:
        return "β οΈ Please select a document type."

    try:
        logger.info(f"Processing file upload: {file.name} as {doc_type}")

        # Each category uses its own ingestion/QA pair; imports are local so
        # a missing backend only affects the branch that needs it.
        if doc_type == "old_documents":
            from docs_utils import old_doc_ingestion, old_doc_qa

            ingest_result = old_doc_ingestion(file.name)
            demo_state.document_stats["old_documents"] += 1

            if query and query.strip():
                answer = old_doc_qa(query)
                return f"π Old Document processed: {ingest_result}\n\nQuery Answer:\n{answer}"
            return f"π Old Document ingested: {ingest_result}"

        elif doc_type == "user_specific":
            from docs_utils import user_doc_ingest, user_doc_qa

            ingest_result = user_doc_ingest(file.name)
            demo_state.document_stats["user_specific"] += 1

            if query and query.strip():
                answer = user_doc_qa(query)
                return f"π€ User Document processed: {ingest_result}\n\nQuery Answer:\n{answer}"
            return f"π€ User-specific document ingested: {ingest_result}"

        elif doc_type == "domain_documents":
            from docs_utils import rag_dom_ingest, rag_dom_qa

            ingest_result = rag_dom_ingest(file.name)
            demo_state.document_stats["domain_documents"] += 1

            if query and query.strip():
                answer = rag_dom_qa(query)
                return f"π₯ Domain Document processed: {ingest_result}\n\nQuery Answer:\n{answer}"
            return f"π₯ Domain document ingested: {ingest_result}"

        elif doc_type == "new_documents":
            # New documents go through the standard RAG pipeline instead of
            # a category-specific QA helper.
            from rag_steps import ingest_file
            ingest_result = ingest_file(file.name)
            demo_state.document_stats["new_documents"] += 1

            if query and query.strip():
                response = process_query(query)
                cleaned_response = clean_pipeline_result(response[0] if isinstance(response, tuple) else str(response))
                return f"π New Document processed: {ingest_result}\n\nQuery Answer:\n{cleaned_response}"
            return f"π New document ingested: {ingest_result}"

        else:
            return f"β Unknown document type: {doc_type}"

    except Exception as e:
        logger.error(f"File upload error: {e}")
        return f"β File upload failed: {str(e)}"
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def process_old_doc_query(query: str, doc_file) -> str:
    """Answer *query* against an old document.

    A freshly uploaded file is first copied into assets/uploaded_docs and
    ingested; otherwise the most recently uploaded old document is queried
    directly via old_doc_qa.
    """
    if doc_file:
        try:
            target_dir = os.path.join("assets", "uploaded_docs")
            os.makedirs(target_dir, exist_ok=True)
            # basename() strips any directory components from the temp path.
            destination = os.path.join(target_dir, os.path.basename(doc_file.name))
            shutil.copy2(doc_file.name, destination)
            demo_state.last_uploaded_old_doc = destination
            return process_old_document(destination, query)
        except Exception as e:
            return f"β Error processing file: {str(e)}"

    previous = demo_state.last_uploaded_old_doc
    if previous and os.path.exists(previous):
        try:
            answer = old_doc_qa(query)
            return f"[Using previously uploaded document: {os.path.basename(previous)}]\n\n{answer}"
        except Exception as e:
            return f"β Error querying document: {str(e)}"
    return "β No document uploaded. Please upload an old document to proceed."
|
|
|
|
|
def clear_chat():
    """Reset all session state and return fresh UI values:
    (history, audio, status message, stats panel)."""
    # Re-running __init__ resets the shared instance in place, so every
    # module-level reference to demo_state stays valid.
    demo_state.__init__()
    return [], None, "π Chat cleared and stats reset!", create_stats_display()
|
|
|
|
|
|
|
|
def create_main_chat_tab():
    """Create the main chat interface tab.

    Builds the two-column layout (chat + side panel) and returns a dict of the
    interactive components so the caller can wire event handlers.

    Fix: the original defined ``upload_btn`` and ``upload_status`` twice; the
    second pair shadowed the first, leaving orphaned widgets in the UI whose
    events were never wired. Only one pair is created now.
    """
    with gr.Row():
        # --- Left column: conversation, status, inputs -------------------
        with gr.Column(scale=3):
            with gr.Group():
                chatbot = gr.Chatbot(
                    type='messages',
                    label="π¬ Conversation with Wisal",
                    height=500,
                    show_copy_button=True,
                    elem_classes="chatbot-container",
                )

            with gr.Row():
                with gr.Column():
                    status_output = gr.Textbox(
                        label="π System Status",
                        interactive=False,
                        max_lines=2,
                        elem_classes="input-container"
                    )
                with gr.Column():
                    audio_output = gr.Audio(
                        label="π΅ Audio Response",
                        interactive=False,
                        show_download_button=True,
                        elem_classes="input-container"
                    )

            with gr.Group():
                with gr.Row():
                    user_input = gr.Textbox(
                        placeholder="Ask me anything about autism...",
                        label="π Your Message",
                        lines=2,
                        scale=3,
                        elem_classes="input-container"
                    )
                    audio_input = gr.Audio(
                        sources=["microphone", "upload"],
                        type="filepath",
                        label="π€ Voice Input",
                        scale=1,
                        elem_classes="input-container"
                    )

                voice_dropdown = gr.Dropdown(
                    label="ποΈ Voice Selection",
                    choices=["Kore", "Puck", "Zephyr", "Leda", "Fenrir", "Charon", "Orus", "Aoede", "Callirrhoe"],
                    value="Kore",
                    elem_classes="input-container"
                )

                with gr.Row():
                    text_btn = gr.Button(
                        "π¬ Text Response",
                        variant="secondary",
                        elem_classes="enhanced-button secondary-button"
                    )
                    audio_btn = gr.Button(
                        "π΅ Audio Response",
                        variant="secondary",
                        elem_classes="enhanced-button secondary-button"
                    )
                    both_btn = gr.Button(
                        "π Text + Audio",
                        variant="primary",
                        elem_classes="enhanced-button primary-button"
                    )

                with gr.Row():
                    clear_btn = gr.Button(
                        "ποΈ Clear Chat",
                        variant="stop",
                        elem_classes="enhanced-button danger-button"
                    )

        # --- Right column: stats, document upload, quick actions ---------
        with gr.Column(scale=1):
            stats_display = gr.HTML(
                create_stats_display(),
                label="π Statistics"
            )

            with gr.Group():
                gr.Markdown("### π Document Upload")
                doc_file = gr.File(
                    label="Upload Document",
                    file_types=[".pdf", ".docx", ".txt"],
                    elem_classes="input-container"
                )

                doc_type = gr.Dropdown(
                    label="Document Type",
                    choices=[
                        ("π Old Documents", "old_documents"),
                        ("π€ User-Specific Documents", "user_specific"),
                        ("π₯ Domain Documents", "domain_documents"),
                        ("π New Documents", "new_documents")
                    ],
                    value="user_specific",
                    elem_classes="input-container"
                )

                doc_query = gr.Textbox(
                    label="Optional: Ask about this document",
                    placeholder="What does this document say about...",
                    lines=2,
                    elem_classes="input-container"
                )

                # Single upload button/status pair (duplicate removed).
                upload_btn = gr.Button(
                    "π€ Upload & Process",
                    elem_classes="enhanced-button primary-button"
                )
                upload_status = gr.Textbox(
                    label="Upload Status",
                    interactive=False,
                    lines=4,
                    elem_classes="input-container"
                )

                # Static legend describing the four document categories.
                doc_info = gr.HTML(
                    """
                    <div class="stats-panel">
                        <h4 style="color: #4facfe; margin-bottom: 1rem;">π Document Types</h4>
                        <div style="font-size: 0.9em; line-height: 1.5;">
                            <p><strong style="color: #8B4513;">π Old Documents:</strong> Historical documents, legacy materials</p>
                            <p><strong style="color: #4facfe;">π€ User-Specific:</strong> Personal files, user manuals</p>
                            <p><strong style="color: #00f2fe;">π₯ Domain Documents:</strong> Medical papers, research articles</p>
                            <p><strong style="color: #667eea;">π New Documents:</strong> Recent uploads, latest materials</p>
                        </div>
                    </div>
                    """
                )

            with gr.Group():
                gr.Markdown("### β‘ Quick Actions")
                sample_queries = [
                    "What is autism?",
                    "Early signs of autism",
                    "Autism therapy options",
                    "Supporting autistic children"
                ]
                for query in sample_queries:
                    btn = gr.Button(
                        query,
                        elem_classes="enhanced-button secondary-button",
                        size="sm"
                    )
                    # q=query binds the current value (avoids the
                    # late-binding-closure pitfall in the loop).
                    btn.click(
                        lambda q=query: q,
                        outputs=[user_input]
                    )

    return {
        'chatbot': chatbot,
        'user_input': user_input,
        'audio_input': audio_input,
        'doc_query': doc_query,
        'voice_dropdown': voice_dropdown,
        'audio_output': audio_output,
        'status_output': status_output,
        'stats_display': stats_display,
        'doc_file': doc_file,
        'doc_type': doc_type,
        'upload_status': upload_status,
        'text_btn': text_btn,
        'audio_btn': audio_btn,
        'both_btn': both_btn,
        'upload_btn': upload_btn,
        'clear_btn': clear_btn
    }
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def create_old_documents_tab():
    """Create the old documents Q&A tab.

    Two-column layout: query/upload controls on the left, static info panels
    on the right. Returns the interactive components for event wiring.
    Widget creation order defines the rendered layout — do not reorder.
    """
    with gr.Row():
        # Left column: question, file upload, action buttons, answer box.
        with gr.Column(scale=2):
            gr.Markdown("## π Old Documents Q&A")
            gr.Markdown("Upload historical documents, legacy materials, and archived content for specialized querying.")

            query = gr.Textbox(
                placeholder="Ask about old documents...",
                lines=3,
                label="π Your Question about Old Documents",
                elem_classes="input-container"
            )

            doc_file = gr.File(
                label="π Upload Old Document (PDF, DOCX, TXT)",
                file_types=[".pdf", ".docx", ".txt"],
                elem_classes="input-container"
            )

            with gr.Row():
                submit_btn = gr.Button(
                    "π Submit Query",
                    variant="primary",
                    elem_classes="enhanced-button primary-button"
                )
                clear_old_btn = gr.Button(
                    "ποΈ Clear",
                    variant="secondary",
                    elem_classes="enhanced-button danger-button"
                )

            output = gr.Textbox(
                label="π Answer from Old Documents",
                lines=12,
                interactive=False,
                elem_classes="input-container"
            )

        # Right column: static info panel + query-history placeholder.
        with gr.Column(scale=1):
            gr.HTML("""
            <div class="stats-panel">
                <h4 style="color: #8B4513; margin-bottom: 1rem;">π Old Documents Info</h4>
                <div style="line-height: 1.6; font-size: 0.9em;">
                    <p><strong>Purpose:</strong> Process historical and legacy documents that require specialized handling.</p>
                    <p><strong>Best for:</strong></p>
                    <ul style="margin-left: 1rem;">
                        <li>Archived medical records</li>
                        <li>Historical research papers</li>
                        <li>Legacy documentation</li>
                        <li>Older format materials</li>
                    </ul>
                    <p><strong>Features:</strong></p>
                    <ul style="margin-left: 1rem;">
                        <li>Specialized processing pipeline</li>
                        <li>Historical context awareness</li>
                        <li>Legacy format support</li>
                    </ul>
                </div>
            </div>
            """)

            # Placeholder HTML; returned so callers can update it with real
            # query history later.
            old_doc_history = gr.HTML(
                """
                <div class="stats-panel">
                    <h4 style="color: #8B4513; margin-bottom: 1rem;">π Recent Queries</h4>
                    <p style="color: #a0aec0; font-style: italic;">No recent queries yet.</p>
                </div>
                """
            )

    return {
        'query': query,
        'doc_file': doc_file,
        'output': output,
        'submit_btn': submit_btn,
        'clear_old_btn': clear_old_btn,
        'old_doc_history': old_doc_history
    }
|
|
|
|
|
def create_voice_chat_tab():
    """Create the voice chat tab with WebRTC.

    Builds the real-time speech interface plus static info/visualizer panels.
    Returns the interactive components for event wiring.
    """
    with gr.Row():
        with gr.Column(scale=2):
            gr.Markdown("## ποΈ Real-time Voice Chat")
            gr.Markdown("Experience real-time speech-to-speech conversation with Wisal using advanced voice AI.")

            # NOTE(review): `WebRTC` and `get_cloudflare_turn_credentials_async`
            # are not imported in the visible module header — confirm they are
            # imported elsewhere in this file (e.g. from fastrtc), otherwise
            # this raises NameError at build time.
            webrtc_audio = WebRTC(
                label="π€ Voice Chat Interface",
                modality="audio",
                mode="send-receive",
                elem_id="voice-chat-source",
                rtc_configuration=get_cloudflare_turn_credentials_async,
                icon="https://www.gstatic.com/lamda/images/gemini_favicon_f069958c85030456e93de685481c559f160ea06b.png",
                pulse_color="rgb(79, 172, 254)",
                icon_button_color="rgb(255, 255, 255)",
                elem_classes="webrtc-container"
            )

            with gr.Row():
                voice_status = gr.Textbox(
                    label="π Voice Chat Status",
                    value="Ready to start voice conversation...",
                    interactive=False,
                    elem_classes="input-container"
                )

            with gr.Row():
                start_voice_btn = gr.Button(
                    "π€ Start Voice Chat",
                    variant="primary",
                    elem_classes="enhanced-button primary-button"
                )
                stop_voice_btn = gr.Button(
                    "βΉοΈ Stop Voice Chat",
                    variant="secondary",
                    elem_classes="enhanced-button danger-button"
                )

        # Right column: static feature list and a CSS-animated visualizer.
        with gr.Column(scale=1):
            gr.HTML("""
            <div class="stats-panel">
                <h4 style="color: #4facfe; margin-bottom: 1rem;">ποΈ Voice Chat Features</h4>
                <div style="line-height: 1.6; font-size: 0.9em;">
                    <p><strong>Real-time Features:</strong></p>
                    <ul style="margin-left: 1rem;">
                        <li>Live speech recognition</li>
                        <li>Instant AI responses</li>
                        <li>Natural conversation flow</li>
                        <li>Low-latency processing</li>
                    </ul>
                    <p><strong>Voice Quality:</strong></p>
                    <ul style="margin-left: 1rem;">
                        <li>High-quality audio</li>
                        <li>Noise cancellation</li>
                        <li>Multiple voice options</li>
                        <li>Emotional tone support</li>
                    </ul>
                </div>
            </div>
            """)

            gr.HTML("""
            <div class="stats-panel">
                <h4 style="color: #4facfe; margin-bottom: 1rem;">π΅ Audio Visualizer</h4>
                <div class="audio-visualizer">
                    <div class="audio-bar"></div>
                    <div class="audio-bar"></div>
                    <div class="audio-bar"></div>
                    <div class="audio-bar"></div>
                    <div class="audio-bar"></div>
                    <div class="audio-bar"></div>
                    <div class="audio-bar"></div>
                    <div class="audio-bar"></div>
                </div>
            </div>
            """)

    return {
        'webrtc_audio': webrtc_audio,
        'voice_status': voice_status,
        'start_voice_btn': start_voice_btn,
        'stop_voice_btn': stop_voice_btn
    }
|
|
|
|
|
|
|
|
def create_document_management_tab():
    """Create the enhanced document management tab.

    Left column: single-document upload group and batch-processing group.
    Right column: live stats panel plus static category/tips panels.
    Returns the interactive components for event wiring.
    """
    with gr.Row():
        with gr.Column(scale=2):
            gr.Markdown("## π Advanced Document Management")
            gr.Markdown("Upload and manage different types of documents with specialized processing pipelines.")

            # --- Single document upload ---
            with gr.Group():
                gr.Markdown("### π€ Upload Documents")

                upload_file = gr.File(
                    label="π Select Document",
                    file_types=[".pdf", ".docx", ".txt", ".md"],
                    elem_classes="input-container"
                )

                # Dropdown choices come from the shared DOCUMENT_TYPES config.
                upload_doc_type = gr.Dropdown(
                    label="π Document Category",
                    choices=[(v["label"], k) for k, v in DOCUMENT_TYPES.items()],
                    value="user_specific",
                    elem_classes="input-container"
                )

                upload_query = gr.Textbox(
                    placeholder="Optional: Ask a question about this document...",
                    label="β Optional Query",
                    lines=2,
                    elem_classes="input-container"
                )

                process_doc_btn = gr.Button(
                    "π Process Document",
                    variant="primary",
                    elem_classes="enhanced-button primary-button"
                )

                doc_result = gr.Textbox(
                    label="π Processing Result",
                    lines=8,
                    interactive=False,
                    elem_classes="input-container"
                )

            # --- Batch upload ---
            with gr.Group():
                gr.Markdown("### π¦ Batch Processing")

                batch_files = gr.File(
                    label="π Upload Multiple Documents",
                    file_count="multiple",
                    file_types=[".pdf", ".docx", ".txt"],
                    elem_classes="input-container"
                )

                batch_type = gr.Dropdown(
                    label="π Batch Category",
                    choices=[(v["label"], k) for k, v in DOCUMENT_TYPES.items()],
                    value="domain_documents",
                    elem_classes="input-container"
                )

                batch_process_btn = gr.Button(
                    "β‘ Process Batch",
                    variant="secondary",
                    elem_classes="enhanced-button secondary-button"
                )

                batch_result = gr.Textbox(
                    label="π Batch Results",
                    lines=6,
                    interactive=False,
                    elem_classes="input-container"
                )

        with gr.Column(scale=1):
            doc_stats_display = gr.HTML(
                create_stats_display(),
                label="π Document Statistics"
            )

            # Category legend rendered from DOCUMENT_TYPES at build time
            # (f-string evaluated once when the tab is constructed).
            gr.HTML(f"""
            <div class="stats-panel">
                <h4 style="color: #4facfe; margin-bottom: 1rem;">π Document Categories</h4>
                <div style="font-size: 0.85em; line-height: 1.4;">
                    {"".join([f'''
                    <div style="margin-bottom: 1rem; padding: 0.5rem; border-left: 3px solid {info["color"]}; background: rgba(255,255,255,0.02);">
                        <strong style="color: {info["color"]};">{info["label"]}</strong><br>
                        <span style="color: #a0aec0;">{info["description"]}</span>
                    </div>
                    ''' for info in DOCUMENT_TYPES.values()])}
                </div>
            </div>
            """)

            gr.HTML("""
            <div class="stats-panel">
                <h4 style="color: #4facfe; margin-bottom: 1rem;">π‘ Processing Tips</h4>
                <div style="font-size: 0.9em; line-height: 1.5; color: #a0aec0;">
                    <p><strong>File Formats:</strong> PDF, DOCX, TXT, MD</p>
                    <p><strong>Size Limit:</strong> Up to 50MB per file</p>
                    <p><strong>Batch Processing:</strong> Up to 10 files at once</p>
                    <p><strong>Processing Time:</strong> 10-30 seconds per document</p>
                </div>
            </div>
            """)

    return {
        'upload_file': upload_file,
        'upload_doc_type': upload_doc_type,
        'upload_query': upload_query,
        'doc_result': doc_result,
        'batch_files': batch_files,
        'batch_type': batch_type,
        'batch_result': batch_result,
        'process_doc_btn': process_doc_btn,
        'batch_process_btn': batch_process_btn,
        'doc_stats_display': doc_stats_display
    }
|
|
|
|
|
def process_batch_documents(files, doc_type: str) -> str:
    """Process multiple documents in batch.

    Each file is routed through handle_document_upload with the given
    doc_type; per-file failures are recorded but do not abort the batch.

    Fix: the per-file "Success" line and the summary header were broken
    multi-line string literals (encoding damage split the status marker
    across physical lines); they are now valid single-line f-strings.
    """
    if not files:
        return "β οΈ No files selected for batch processing."

    if doc_type == "None" or not doc_type:
        return "β οΈ Please select a document type for batch processing."

    results = []
    successful = 0
    failed = 0

    for file in files:
        try:
            result = handle_document_upload(file, doc_type)
            # Upload helpers signal failure by prefixing the message
            # with the "β" marker.
            if result.startswith("β"):
                failed += 1
                results.append(f"β {os.path.basename(file.name)}: Failed")
            else:
                successful += 1
                results.append(f"β {os.path.basename(file.name)}: Success")
        except Exception as e:
            failed += 1
            results.append(f"β {os.path.basename(file.name)}: {str(e)[:50]}...")

    summary = f"π¦ Batch Processing Complete:\nβ Successful: {successful}\nβ Failed: {failed}\n\n"
    summary += "\n".join(results)
    return summary
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
""" |
|
|
specific_utils.py - Enhanced Document Processing with Gradio Integration |
|
|
|
|
|
This module integrates with gradio_utils.py to provide comprehensive document processing |
|
|
and audio handling for the Wisal application. |
|
|
""" |
|
|
|
|
|
import os |
|
|
import gradio as gr |
|
|
from typing import Tuple, Optional, Dict, Any, List |
|
|
import shutil |
|
|
from datetime import datetime |
|
|
|
|
|
|
|
|
from gradio_utils import ( |
|
|
DOCUMENT_TYPES, |
|
|
handle_document_upload, |
|
|
process_with_audio, |
|
|
process_with_stats, |
|
|
create_stats_display, |
|
|
demo_state, |
|
|
safe_audio_processing, |
|
|
process_old_doc_query, |
|
|
clear_chat, |
|
|
ENHANCED_CSS |
|
|
) |
|
|
|
|
|
|
|
|
from docs_utils import ( |
|
|
old_doc_ingestion, old_doc_qa, |
|
|
user_doc_ingest, user_doc_qa, |
|
|
rag_dom_ingest, rag_dom_qa |
|
|
) |
|
|
|
|
|
|
|
|
from audio_utils import get_transcription_or_text, generate_tts_response |
|
|
|
|
|
|
|
|
from pipeQuery import process_query, clean_pipeline_result |
|
|
|
|
|
from logger.custom_logger import CustomLoggerTracker |
|
|
|
|
|
|
|
|
# Module-level logger for specific_utils, routed through the project's
# custom logging tracker so records are tagged with this module's name.
custom_log = CustomLoggerTracker()
logger = custom_log.get_logger("specific_utils")
logger.info("Logger initialized for specific utilities module")
|
|
|
|
|
def get_all_document_choices() -> List[Tuple[str, str]]:
    """Return (display label, value) pairs for all four document categories,
    in the order they should appear in a dropdown."""
    # Table-driven form: insertion order of the dict fixes the dropdown order.
    labels_by_value = {
        "old_documents": "π Old Documents",
        "user_specific": "π€ User-Specific Documents",
        "domain_documents": "π₯ Domain Documents",
        "new_documents": "π New Documents",
    }
    return [(label, value) for value, label in labels_by_value.items()]
|
|
|
|
|
def enhanced_document_upload_handler(file, doc_type: str, query: str = "") -> str:
    """Route an uploaded file to the ingestion pipeline matching ``doc_type``
    and optionally answer ``query`` against the freshly ingested content.

    Returns a user-facing status string; never raises.
    """
    # Guard clauses: nothing to do without a file and a concrete category.
    if not file:
        return "β οΈ Please select a file to upload."
    if not doc_type or doc_type == "None":
        return "β οΈ Please select a document type."

    wants_answer = bool(query and query.strip())

    try:
        logger.info(f"Processing document upload: {file.name} as {doc_type}")

        if doc_type == "old_documents":
            demo_state.document_stats["old_documents"] += 1
            result = old_doc_ingestion(file.name)
            if wants_answer:
                return f"π Old Document processed: {result}\n\nQuery Answer:\n{old_doc_qa(query)}"
            return f"π Old Document ingested: {result}"

        if doc_type == "user_specific":
            demo_state.document_stats["user_specific"] += 1
            result = user_doc_ingest(file.name)
            if wants_answer:
                return f"π€ User Document processed: {result}\n\nQuery Answer:\n{user_doc_qa(query)}"
            return f"π€ User-specific document ingested: {result}"

        if doc_type == "domain_documents":
            demo_state.document_stats["domain_documents"] += 1
            result = rag_dom_ingest(file.name)
            if wants_answer:
                return f"π₯ Domain Document processed: {result}\n\nQuery Answer:\n{rag_dom_qa(query)}"
            return f"π₯ Domain document ingested: {result}"

        if doc_type == "new_documents":
            # Imported lazily: rag_steps is only needed for this category.
            from rag_steps import ingest_file
            demo_state.document_stats["new_documents"] += 1
            result = ingest_file(file.name)
            if wants_answer:
                raw = process_query(query)
                answer = clean_pipeline_result(
                    raw[0] if isinstance(raw, tuple) else str(raw)
                )
                return f"π New Document processed: {result}\n\nQuery Answer:\n{answer}"
            return f"π New document ingested: {result}"

        return f"β Unknown document type: {doc_type}"

    except Exception as e:
        logger.error(f"Document upload error: {e}")
        return f"β Document processing failed: {str(e)}"
|
|
|
|
|
def enhanced_audio_transcription(audio_file) -> Tuple[str, str]:
    """Transcribe an audio file, returning (text, status message).

    On any failure the text is "" and the status explains what went wrong;
    this function never raises.
    """
    if not audio_file:
        return "", "No audio file provided"

    try:
        logger.info(f"Processing audio transcription...")
        text, status = get_transcription_or_text("", audio_file)

        # The transcription backend signals errors via an "[ERROR]" prefix.
        if not text or text.startswith("[ERROR]"):
            logger.warning(f"Transcription failed: {status}")
            return "", f"β Audio transcription failed: {status}"

        logger.info(f"Transcription successful: {text[:50]}...")
        return text.strip(), "β Audio transcribed successfully"
    except Exception as e:
        logger.error(f"Audio transcription error: {e}")
        return "", f"β Transcription error: {str(e)}"
|
|
|
|
|
def process_text_with_audio_support(user_input: str, audio_input, chat_history: List) -> Tuple[List, str, str, str]:
    """Resolve the user's input (typed text, or transcribed audio when no text
    was typed) and produce a text-only chat response.

    Returns (updated history, cleared input, status message, stats HTML).
    """
    try:
        question = user_input.strip() if user_input else ""

        # Fall back to audio transcription only when no text was typed.
        if audio_input and not question:
            spoken, transcription_note = enhanced_audio_transcription(audio_input)
            if not spoken:
                return chat_history, "", f"β {transcription_note}", create_stats_display()
            question = spoken

        if not question:
            return chat_history, "", "β οΈ Please provide text or audio input.", create_stats_display()

        # Delegate to the shared pipeline; it returns extra fields we drop.
        updated_history, _, cleared, status, stats, _ = process_with_stats(question, None, chat_history)
        return updated_history, cleared, status, stats

    except Exception as e:
        logger.error(f"Text processing error: {e}")
        return chat_history, "", f"β Processing error: {str(e)}", create_stats_display()
|
|
|
|
|
def process_audio_only_response(user_input: str, audio_input, voice_dropdown: str, chat_history: List) -> Tuple[Optional[Any], str, str, str]:
    """Resolve the user's input and produce an audio-only response.

    Returns (audio response or None, cleared input, status message, stats HTML).
    """
    try:
        question = user_input.strip() if user_input else ""

        # Fall back to audio transcription only when no text was typed.
        if audio_input and not question:
            spoken, transcription_note = enhanced_audio_transcription(audio_input)
            if not spoken:
                return None, "", f"β {transcription_note}", create_stats_display()
            question = spoken

        if not question:
            return None, "", "β οΈ Please provide text or audio input.", create_stats_display()

        # Run the query pipeline and normalize its result to clean text.
        raw = process_query(question)
        if isinstance(raw, tuple):
            # Prefer the first element; fall back to the second when empty.
            reply_text = clean_pipeline_result(raw[0] if raw[0] else raw[1])
        else:
            reply_text = clean_pipeline_result(str(raw))

        # Synthesize speech for the reply with the selected voice.
        audio_reply = safe_audio_processing(reply_text, voice_dropdown)

        if audio_reply:
            note = "β Audio response generated successfully"
        else:
            note = "β οΈ Audio generation failed"
        return audio_reply, "", note, create_stats_display()

    except Exception as e:
        logger.error(f"Audio processing error: {e}")
        return None, "", f"β Audio processing error: {str(e)}", create_stats_display()
|
|
|
|
|
def process_both_text_and_audio_response(
    user_input: str,
    audio_input,
    voice_dropdown: str,
    chat_history: List
) -> Tuple[List, Optional[Any], str, str, str]:
    """Produce both a chat (text) response and an audio response.

    Thin wrapper around process_with_audio that converts any raised exception
    into an error status tuple instead of propagating it to the UI.
    """
    try:
        result = process_with_audio(user_input, audio_input, voice_dropdown, chat_history)
    except Exception as e:
        logger.error(f"Combined processing error: {e}")
        return chat_history, None, "", f"β Processing error: {str(e)}", create_stats_display()
    return result
|
|
|
|
|
def create_document_info_panel() -> str:
    """Create HTML info panel for document types.

    Returns a static HTML fragment describing the four document categories;
    the colors mirror those used by the upload dropdowns.
    """
    return """
    <div style="padding: 1rem; background: rgba(255,255,255,0.05); border-radius: 10px; margin: 0.5rem 0; border: 1px solid rgba(255,255,255,0.1);">
        <h4 style="color: #4facfe; margin: 0 0 1rem 0;">π Document Processing Options</h4>
        <div style="font-size: 0.85em; line-height: 1.4;">
            <div style="margin-bottom: 0.8rem; padding: 0.5rem; border-left: 3px solid #8B4513; background: rgba(139, 69, 19, 0.1);">
                <strong style="color: #8B4513;">π Old Documents</strong><br>
                <span style="color: #a0aec0;">Historical documents, legacy materials, archived content</span>
            </div>
            <div style="margin-bottom: 0.8rem; padding: 0.5rem; border-left: 3px solid #4facfe; background: rgba(79, 172, 254, 0.1);">
                <strong style="color: #4facfe;">π€ User-Specific Documents</strong><br>
                <span style="color: #a0aec0;">Personal files, user manuals, individual documents</span>
            </div>
            <div style="margin-bottom: 0.8rem; padding: 0.5rem; border-left: 3px solid #00f2fe; background: rgba(0, 242, 254, 0.1);">
                <strong style="color: #00f2fe;">π₯ Domain Documents</strong><br>
                <span style="color: #a0aec0;">Medical papers, research articles, domain knowledge</span>
            </div>
            <div style="padding: 0.5rem; border-left: 3px solid #667eea; background: rgba(102, 126, 234, 0.1);">
                <strong style="color: #667eea;">π New Documents</strong><br>
                <span style="color: #a0aec0;">Recent uploads, latest materials, standard processing</span>
            </div>
        </div>
        <div style="margin-top: 1rem; padding-top: 1rem; border-top: 1px solid rgba(255,255,255,0.1); font-size: 0.8em; color: #a0aec0;">
            π‘ <strong>Tip:</strong> Each document type uses specialized processing optimized for that content category.
        </div>
    </div>
    """
|
|
|
|
|
def get_enhanced_css() -> str:
    """Get enhanced CSS from gradio_utils.

    Thin re-export so downstream modules can fetch the stylesheet from this
    module without importing gradio_utils directly.
    """
    return ENHANCED_CSS
|
|
|
|
|
def create_enhanced_file_upload_interface():
    """Create an enhanced file upload interface with all 4 document types.

    Builds a single Gradio group (file picker, category dropdown, optional
    query, upload button, status box, info panel) and returns the components
    for event wiring. Widget creation order defines the rendered layout.
    """
    with gr.Group():
        gr.Markdown("### π Enhanced Document Upload")

        doc_file = gr.File(
            label="π Upload Document (PDF, DOCX, TXT)",
            file_types=[".pdf", ".docx", ".txt"],
            elem_classes="input-container"
        )

        # Category choices come from the module-level helper so the dropdown
        # stays in sync with the routing in enhanced_document_upload_handler.
        doc_type = gr.Dropdown(
            label="π Document Type",
            choices=get_all_document_choices(),
            value="user_specific",
            elem_classes="input-container"
        )

        doc_query = gr.Textbox(
            label="π Optional: Ask about this document",
            placeholder="What does this document say about...",
            lines=2,
            elem_classes="input-container"
        )

        upload_btn = gr.Button(
            "π€ Upload & Process",
            variant="primary",
            elem_classes="enhanced-button primary-button"
        )

        upload_status = gr.Textbox(
            label="π Upload Status",
            interactive=False,
            lines=5,
            elem_classes="input-container"
        )

        # Static legend describing the four document categories.
        doc_info = gr.HTML(create_document_info_panel())

    return {
        'doc_file': doc_file,
        'doc_type': doc_type,
        'doc_query': doc_query,
        'upload_btn': upload_btn,
        'upload_status': upload_status,
        'doc_info': doc_info
    }
|
|
|
|
|
|
|
|
# Public API of this module; names re-exported here (ENHANCED_CSS, demo_state)
# originate in gradio_utils.
__all__ = [
    'get_all_document_choices',
    'enhanced_document_upload_handler',
    'enhanced_audio_transcription',
    'process_text_with_audio_support',
    'process_audio_only_response',
    'process_both_text_and_audio_response',
    'create_document_info_panel',
    'create_enhanced_file_upload_interface',
    'get_enhanced_css',
    'ENHANCED_CSS',
    'demo_state'
]
|
|
|