import os
import time
from typing import Dict, List, Tuple, Optional, Any

from dotenv import load_dotenv

from logger.custom_logger import CustomLoggerTracker
from docs_utils import *  # provides rag_dom_ingest, rag_dom_qa, user_doc_ingest, user_doc_qa, old_doc_ingestion, old_doc_qa
from audio_utils import transcribe_audio
from pipeQuery import process_query
from configs import load_yaml_config

config = load_yaml_config("config.yaml")

load_dotenv()

custom_log = CustomLoggerTracker()
logger = custom_log.get_logger("utils")
logger.info("Logger initialized for utils/functions module")

env = os.getenv("ENVIRONMENT", "production")
SESSION_ID = "default"
pending_clarifications: Dict[str, str] = {}

SILICONFLOW_API_KEY = os.getenv("SILICONFLOW_API_KEY", "")
SILICONFLOW_URL = os.getenv("SILICONFLOW_URL", "").strip()
SILICONFLOW_CHAT_URL = os.getenv(
    "SILICONFLOW_CHAT_URL", "https://api.siliconflow.com/v1/chat/completions").strip()

# Maps the UI-facing document-type labels to internal ingestion keys.
VALID_DOC_TYPES = {
    "Knowledge Document": "knowledge",
    "User-Specific Document": "user_specific",
    "Old Document": "old",
    "New Document": "new",
    "None": None,
}

if not SILICONFLOW_API_KEY:
    logger.warning("SILICONFLOW_API_KEY is not set. LLM/Reranker calls may fail.")
if not SILICONFLOW_URL:
    logger.warning("SILICONFLOW_URL is not set. OpenAI client base_url will not work.")
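
# A hedged sketch, not verified behaviour of this module: the warning above
# implies the chat model is reached through the OpenAI SDK pointed at
# SiliconFlow's OpenAI-compatible endpoint. Under that assumption, a minimal
# client construction would look like the following (the model id is
# hypothetical):
#
#     from openai import OpenAI
#
#     client = OpenAI(api_key=SILICONFLOW_API_KEY, base_url=SILICONFLOW_URL)
#     reply = client.chat.completions.create(
#         model="Qwen/Qwen2.5-7B-Instruct",  # hypothetical model id
#         messages=[{"role": "user", "content": "ping"}],
#     )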


def validate_document_type(doc_type: str) -> bool:
    """Return True if doc_type is one of the recognised VALID_DOC_TYPES labels."""
    return doc_type in VALID_DOC_TYPES


def get_upload_directory() -> str:
    """Return the uploads directory next to this module, creating it if needed."""
    upload_dir = os.path.join(os.path.dirname(__file__), "uploaded_docs")
    os.makedirs(upload_dir, exist_ok=True)
    return upload_dir


def save_uploaded_file(doc_file: Any, filename: Optional[str] = None) -> str:
    """Persist an uploaded file (file-like object or filesystem path) and return the saved path."""
    if doc_file is None:
        raise ValueError("Document file cannot be None")

    if filename:
        safe_filename = os.path.basename(filename)
    else:
        safe_filename = os.path.basename(getattr(doc_file, 'name', 'unknown_file'))

    if not safe_filename or safe_filename == 'unknown_file':
        safe_filename = f"document_{int(time.time())}"

    upload_dir = get_upload_directory()
    save_path = os.path.join(upload_dir, safe_filename)

    logger.info(f"Saving document to: {save_path}")

    try:
        # Accept both file-like objects (with .read()) and plain paths.
        if hasattr(doc_file, 'read'):
            file_bytes = doc_file.read()
        else:
            with open(str(doc_file), 'rb') as f:
                file_bytes = f.read()

        if not file_bytes:
            raise ValueError("File appears to be empty")

        with open(save_path, "wb") as f:
            f.write(file_bytes)

        logger.info(f"Successfully saved uploaded file to {save_path}")
        return save_path

    except Exception as e:
        logger.error(f"Error saving file: {e}")
        raise IOError(f"Failed to save file: {e}") from e


def process_document_by_type(query: str, save_path: str, doc_type: str) -> str:
    """Ingest the saved document according to doc_type, then answer the query against it."""
    if not validate_document_type(doc_type):
        raise ValueError(f"Invalid document type: {doc_type}")
    try:
        if doc_type == "Knowledge Document":
            logger.info("Processing as Knowledge Document")
            status = rag_dom_ingest(save_path)
            answer = rag_dom_qa(query)
            return f"[Knowledge Document Uploaded]\n{status}\n\n{answer}"

        elif doc_type == "User-Specific Document":
            logger.info("Processing as User-Specific Document")
            status = user_doc_ingest(save_path)
            answer = user_doc_qa(query)
            return f"[User-Specific Document Uploaded]\n{status}\n\n{answer}"

        elif doc_type == "Old Document":
            logger.info("Processing as Old Document")
            status = old_doc_ingestion(save_path)
            answer = old_doc_qa(query)
            return f"[Old Document Uploaded]\n{status}\n\n{answer}"

        elif doc_type == "New Document":
            # New documents currently share the user-specific ingestion path.
            logger.info("Processing as New Document")
            status = user_doc_ingest(save_path)
            answer = user_doc_qa(query)
            return f"[New Document Uploaded]\n{status}\n\n{answer}"

        else:
            raise ValueError(f"Unsupported document type: {doc_type}")

    except Exception as e:
        logger.error(f"Error processing document type {doc_type}: {e}")
        raise
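
# A condensed design sketch of the same routing (logging and error wrapping
# omitted): the if/elif chain in process_document_by_type could be driven by
# a dispatch table, keeping the label list and the handlers in one place.
# Shown for illustration only; not wired in:
#
#     _DOC_HANDLERS = {
#         "Knowledge Document": (rag_dom_ingest, rag_dom_qa),
#         "User-Specific Document": (user_doc_ingest, user_doc_qa),
#         "Old Document": (old_doc_ingestion, old_doc_qa),
#         "New Document": (user_doc_ingest, user_doc_qa),
#     }
#
#     def _process(query, save_path, doc_type):
#         ingest, qa = _DOC_HANDLERS[doc_type]  # KeyError marks an invalid type
#         return f"[{doc_type} Uploaded]\n{ingest(save_path)}\n\n{qa(query)}"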


def validate_query(query: Optional[str]) -> bool:
    """Return True if the query is a non-empty, non-whitespace string."""
    return bool(query and query.strip())


def main_pipeline_interface(query: str) -> str:
    """Run a first-turn query through the main pipeline; raises ValueError on empty input."""
    if not validate_query(query):
        raise ValueError("Query cannot be empty")
    logger.info("Main pipeline interface called")
    try:
        return process_query(query, first_turn=True)
    except Exception as e:
        logger.error(f"Error in main pipeline: {e}")
        raise


def main_pipeline_with_doc_and_history(
    query: str,
    doc_file: Any,
    doc_type: str,
    history: str
) -> Tuple[str, str]:
    """Answer a query (optionally against a document) and append the turn to a transcript string."""
    if not validate_query(query):
        return "Please provide a valid query.", history
    logger.info("Pipeline with doc and history called")
    try:
        response = main_pipeline_with_doc(query, doc_file, doc_type)
        updated_history = f"{history}\nUser: {query}\nWisal: {response}\n"
        return response, updated_history
    except Exception as e:
        logger.error(f"Error in pipeline with doc and history: {e}")
        error_response = f"Sorry, I encountered an error: {str(e)}"
        updated_history = f"{history}\nUser: {query}\nWisal: {error_response}\n"
        return error_response, updated_history


def main_pipeline_with_doc(query: str, doc_file: Any, doc_type: str) -> str:
    """Route through document ingestion when a document is supplied, else through the main pipeline."""
    if not validate_query(query):
        return "Please provide a valid query."
    logger.info(f"Pipeline with doc called - doc_type: {doc_type}")

    if doc_file is None or doc_type == "None":
        logger.info("No document provided, using main pipeline")
        try:
            return process_query(query, first_turn=True)
        except Exception as e:
            logger.error(f"Error in main pipeline: {e}")
            return f"Sorry, I encountered an error processing your query: {str(e)}"

    if not validate_document_type(doc_type):
        logger.warning(f"Invalid document type: {doc_type}")
        return f"Invalid document type: {doc_type}. Valid types are: {', '.join(VALID_DOC_TYPES.keys())}"

    try:
        save_path = save_uploaded_file(doc_file)
        return process_document_by_type(query, save_path, doc_type)
    except Exception as e:
        logger.error(f"Error in document processing: {e}")
        return f"Sorry, I encountered an error processing your document: {str(e)}"


def pipeline_with_history(
    message: str,
    doc_file: Any,
    doc_type: str,
    history: List[List[str]]
) -> Tuple[List[List[str]], str]:
    """Chatbot-style wrapper: append [message, response] pairs to history and clear the input box."""
    logger.info("Pipeline with history called")
    history = history or []
    if not validate_query(message):
        logger.warning("Empty message received")
        error_msg = "Please provide a valid message."
        history.append([message or "", error_msg])
        return history, ""
    try:
        response = main_pipeline_with_doc(message, doc_file, doc_type)
        history.append([message, response])
        logger.info("Successfully processed message with history")
        return history, ""
    except Exception as e:
        logger.error(f"Error in pipeline with history: {e}")
        error_response = f"Sorry, I encountered an error: {str(e)}"
        history.append([message, error_response])
        return history, ""


def unified_handler(
    user_text: Optional[str],
    audio_file: Any,
    chat_history: List[Tuple[str, str]]
) -> Tuple[List[Tuple[str, str]], str, Any]:
    """Accept text or audio input (audio is transcribed first) and reply via process_query."""
    logger.info("Unified handler called")
    chat_history = chat_history or []
    msg_from_user = None

    if validate_query(user_text):
        msg_from_user = user_text
        logger.info("Processing text input")
    elif audio_file:
        logger.info("Processing audio input")
        try:
            # transcribe_audio yields progress strings; keep the last
            # non-"Status:" string as the final transcript.
            transcription_gen = transcribe_audio(audio_file)
            last_out = ""
            for out in transcription_gen:
                if isinstance(out, str) and out.startswith("[ERROR]"):
                    chat_history.append(("System", out))
                    return chat_history, "", None
                elif isinstance(out, str) and not out.startswith("Status:"):
                    last_out = out
            if validate_query(last_out):
                msg_from_user = last_out
                logger.info("Successfully transcribed audio")
            else:
                chat_history.append(("System", "Could not transcribe audio properly"))
                return chat_history, "", None
        except Exception as e:
            logger.error(f"Error processing audio: {e}")
            chat_history.append(("System", f"Audio processing error: {str(e)}"))
            return chat_history, "", None

    if msg_from_user:
        try:
            logger.info(f"Processing message: {msg_from_user[:50]}...")
            chat_history.append(("User", msg_from_user))
            wisal_reply = process_query(msg_from_user)
            chat_history.append(("Wisal", wisal_reply))
            logger.info("Successfully processed message in unified handler")
            return chat_history, "", None
        except Exception as e:
            logger.error(f"Error processing query: {e}")
            chat_history.append(("System", f"Processing error: {str(e)}"))
            return chat_history, "", None

    logger.warning("No valid input received in unified handler")
    chat_history.append(("System", "Please provide either text or audio input"))
    return chat_history, "", None
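
# A hedged helper sketch capturing the transcription contract inferred from
# the call sites in this module (transcribe_audio yields "Status: ..."
# progress lines, possibly an "[ERROR] ..." line, and finally the transcript
# text). Not currently wired in; shown only to make the looping pattern above
# explicit:
#
#     def final_transcript(audio_file) -> str:
#         last = ""
#         for out in transcribe_audio(audio_file):
#             if isinstance(out, str) and out.startswith("[ERROR]"):
#                 raise RuntimeError(out)
#             if isinstance(out, str) and not out.startswith("Status:"):
#                 last = out
#         return last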


def wisal_handler(
    user_text: Optional[str],
    audio_file: Any,
    chat_history: List[Tuple[str, str]]
) -> Tuple[List[Tuple[str, str]], str, Any]:
    """Like unified_handler, but prefers text over audio and echoes the user turn before replying."""
    logger.info("Wisal handler called")
    chat_history = chat_history or []

    if validate_query(user_text):
        logger.info("Processing text input in Wisal handler")
        try:
            response = process_query(user_text)
            chat_history.append(("User", user_text))
            chat_history.append(("Wisal", response))
            return chat_history, "", None
        except Exception as e:
            logger.error(f"Error processing text in Wisal handler: {e}")
            chat_history.append(("User", user_text))
            chat_history.append(("System", f"Processing error: {str(e)}"))
            return chat_history, "", None

    if audio_file:
        logger.info("Processing audio input in Wisal handler")
        try:
            transcription = None
            for out in transcribe_audio(audio_file):
                if isinstance(out, str) and out.startswith("[ERROR]"):
                    chat_history.append(("System", out))
                    return chat_history, "", None
                if isinstance(out, str) and not out.startswith("Status:"):
                    transcription = out
            if validate_query(transcription):
                logger.info("Successfully transcribed audio")
                chat_history.append(("User", transcription))
                wisal_reply = process_query(transcription)
                chat_history.append(("Wisal", wisal_reply))
                return chat_history, "", None
            else:
                chat_history.append(("System", "Could not transcribe audio properly"))
                return chat_history, "", None

        except Exception as e:
            logger.error(f"Error processing audio in Wisal handler: {e}")
            chat_history.append(("System", f"Audio processing error: {str(e)}"))
            return chat_history, "", None

    logger.warning("No valid input received in Wisal handler")
    chat_history.append(("System", "Please provide either text or audio input"))
    return chat_history, "", None
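
# A hedged wiring sketch: the (history, cleared_text, cleared_audio) return
# shape of the handlers above matches a Gradio Blocks UI. The component names
# below are hypothetical and the real app wiring lives elsewhere; this only
# illustrates how the signatures line up:
#
#     import gradio as gr
#
#     with gr.Blocks() as demo:
#         chatbot = gr.Chatbot()
#         txt = gr.Textbox(placeholder="Ask Wisal...")
#         mic = gr.Audio(sources=["microphone"], type="filepath")
#         send = gr.Button("Send")
#         send.click(unified_handler, [txt, mic, chatbot], [chatbot, txt, mic])
#
#     demo.launch()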


if __name__ == "__main__":

    pdf_test = "tests/Computational Requirements for Embed.pdf"
    docs_test = "tests/Computational Requirements for Embed.docx"
    txt_test = "assets/RAG_Documents/Autism_Books_1.txt"

    print("=" * 70)
    print("COMPREHENSIVE UTILS/FUNCTIONS TEST SUITE")
    print("=" * 70)
print(f"\n{'=' * 50}")
|
|
|
print("TEST 1: UTILITY FUNCTIONS")
|
|
|
print(f"{'=' * 50}")
|
|
|
|
|
|
|
|
|
print("Testing document type validation...")
|
|
|
valid_types = ["Knowledge Document", "User-Specific Document", "Old Document", "New Document", "None"]
|
|
|
invalid_types = ["Random Document", "Invalid Type", "", None]
|
|
|
|
|
|
for doc_type in valid_types:
|
|
|
result = validate_document_type(doc_type)
|
|
|
print(f"β Valid type '{doc_type}': {result}")
|
|
|
|
|
|
for doc_type in invalid_types:
|
|
|
result = validate_document_type(doc_type)
|
|
|
print(f"β Invalid type '{doc_type}': {result}")
|
|
|
|
|
|
|
|
|
print("\nTesting upload directory creation...")
|
|
|
try:
|
|
|
upload_dir = get_upload_directory()
|
|
|
if os.path.exists(upload_dir):
|
|
|
print(f"β Upload directory created/exists: {upload_dir}")
|
|
|
else:
|
|
|
print(f"β Upload directory not found: {upload_dir}")
|
|
|
except Exception as e:
|
|
|
print(f"β Upload directory test failed: {e}")
|
|
|
|
|
|
|
|
|
print("\nTesting query validation...")
|
|
|
valid_queries = ["What is autism?", "Help me understand treatments", "a"]
|
|
|
invalid_queries = ["", " ", None]
|
|
|
|
|
|
for query in valid_queries:
|
|
|
result = validate_query(query)
|
|
|
print(f"β Valid query '{query}': {result}")
|
|
|
|
|
|
for query in invalid_queries:
|
|
|
result = validate_query(query)
|
|
|
print(f"β Invalid query '{query}': {result}")
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
print(f"\n{'=' * 50}")
|
|
|
print("TEST 2: FILE UPLOAD AND SAVING WITH REAL FILES")
|
|
|
print(f"{'=' * 50}")
|
|
|
|
|
|
test_files = [
|
|
|
(pdf_test, "PDF"),
|
|
|
(docs_test, "DOCX"),
|
|
|
(txt_test, "TXT")
|
|
|
]
|
|
|
|
|
|
for file_path, file_type in test_files:
|
|
|
print(f"\nTesting {file_type} file upload: {os.path.basename(file_path)}")
|
|
|
if os.path.exists(file_path):
|
|
|
try:
|
|
|
save_path = save_uploaded_file(file_path, f"test_{file_type.lower()}.{file_type.lower()}")
|
|
|
if os.path.exists(save_path):
|
|
|
print(f"β {file_type} file saved successfully: {save_path}")
|
|
|
|
|
|
|
|
|
original_size = os.path.getsize(file_path)
|
|
|
saved_size = os.path.getsize(save_path)
|
|
|
if original_size == saved_size:
|
|
|
print(f"β {file_type} file size matches: {saved_size} bytes")
|
|
|
else:
|
|
|
print(f"β {file_type} file size mismatch: {original_size} vs {saved_size}")
|
|
|
else:
|
|
|
print(f"β {file_type} file not found after saving")
|
|
|
|
|
|
except Exception as e:
|
|
|
print(f"β {file_type} file upload failed: {e}")
|
|
|
else:
|
|
|
print(f"β {file_type} test file not found: {file_path}")
|
|
|
|
|
|
|
|
|
print("\nTesting error handling for invalid files...")
|
|
|
try:
|
|
|
save_uploaded_file(None)
|
|
|
print("β Should have failed with None file")
|
|
|
except ValueError as e:
|
|
|
print(f"β Correctly handled None file: {e}")
|
|
|
except Exception as e:
|
|
|
print(f"β Handled error: {e}")
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
print(f"\n{'=' * 50}")
|
|
|
print("TEST 3: DOCUMENT PROCESSING BY TYPE WITH REAL FILES")
|
|
|
print(f"{'=' * 50}")
|
|
|
|
|
|
test_query = "What does this document say about computational requirements?"
|
|
|
|
|
|
|
|
|
if os.path.exists(txt_test):
|
|
|
print(f"Testing document processing with: {os.path.basename(txt_test)}")
|
|
|
|
|
|
for doc_type in ["Knowledge Document", "User-Specific Document", "Old Document", "New Document"]:
|
|
|
print(f"\nTesting {doc_type} processing...")
|
|
|
try:
|
|
|
result = process_document_by_type(test_query, txt_test, doc_type)
|
|
|
print(f"β {doc_type} processed successfully")
|
|
|
print(f" Response preview: {result[:150]}...")
|
|
|
except Exception as e:
|
|
|
print(f"β {doc_type} processing failed: {e}")
|
|
|
else:
|
|
|
print(f"β Text test file not found: {txt_test}")
|
|
|
|
|
|
|
|
|
if os.path.exists(pdf_test):
|
|
|
print(f"\nTesting PDF document processing: {os.path.basename(pdf_test)}")
|
|
|
try:
|
|
|
result = process_document_by_type(test_query, pdf_test, "Knowledge Document")
|
|
|
print(f"β PDF processed as Knowledge Document successfully")
|
|
|
print(f" Response preview: {result[:150]}...")
|
|
|
except Exception as e:
|
|
|
print(f"β PDF processing failed: {e}")
|
|
|
else:
|
|
|
print(f"β PDF test file not found: {pdf_test}")
|
|
|
|
|
|
|
|
|
print(f"\nTesting invalid document type...")
|
|
|
try:
|
|
|
if os.path.exists(txt_test):
|
|
|
process_document_by_type(test_query, txt_test, "Invalid Type")
|
|
|
print("β Should have failed with invalid type")
|
|
|
else:
|
|
|
print("β Skipping invalid type test - no test file available")
|
|
|
except ValueError as e:
|
|
|
print(f"β Correctly handled invalid document type: {e}")
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
print(f"\n{'=' * 50}")
|
|
|
print("TEST 4: MAIN PIPELINE INTERFACE")
|
|
|
print(f"{'=' * 50}")
|
|
|
|
|
|
|
|
|
print("Testing main pipeline interface...")
|
|
|
test_queries = [
|
|
|
"What is autism?",
|
|
|
"How can I help a child with autism?",
|
|
|
"Tell me about autism interventions",
|
|
|
"What are the symptoms of ASD?"
|
|
|
]
|
|
|
|
|
|
for query in test_queries:
|
|
|
print(f"\nTesting query: '{query}'")
|
|
|
try:
|
|
|
result = main_pipeline_interface(query)
|
|
|
print(f"β Pipeline response received: {result[:100]}...")
|
|
|
except Exception as e:
|
|
|
print(f"β Pipeline failed for query '{query}': {e}")
|
|
|
|
|
|
|
|
|
print(f"\nTesting non-autism related query...")
|
|
|
try:
|
|
|
result = main_pipeline_interface("What's the weather like?")
|
|
|
print(f"β Non-autism query handled: {result[:100]}...")
|
|
|
except Exception as e:
|
|
|
print(f"β Non-autism query failed: {e}")
|
|
|
|
|
|
|
|
|
print(f"\nTesting empty query...")
|
|
|
try:
|
|
|
main_pipeline_interface("")
|
|
|
print("β Should have failed with empty query")
|
|
|
except ValueError as e:
|
|
|
print(f"β Correctly handled empty query: {e}")
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
print(f"\n{'=' * 50}")
|
|
|
print("TEST 5: PIPELINE WITH DOCUMENT AND HISTORY")
|
|
|
print(f"{'=' * 50}")
|
|
|
|
|
|
|
|
|
if os.path.exists(txt_test):
|
|
|
print("Testing pipeline with document and history...")
|
|
|
try:
|
|
|
initial_history = "Previous conversation history here."
|
|
|
|
|
|
response, updated_history = main_pipeline_with_doc_and_history(
|
|
|
"What information is in this document about autism?",
|
|
|
txt_test,
|
|
|
"Knowledge Document",
|
|
|
initial_history
|
|
|
)
|
|
|
|
|
|
print(f"β Pipeline with document successful")
|
|
|
print(f" Response preview: {response[:100]}...")
|
|
|
print(f" History updated: {'Yes' if len(updated_history) > len(initial_history) else 'No'}")
|
|
|
|
|
|
except Exception as e:
|
|
|
print(f"β Pipeline with document failed: {e}")
|
|
|
else:
|
|
|
print(f"β Cannot test with document - file not found: {txt_test}")
|
|
|
|
|
|
|
|
|
print("\nTesting pipeline without document...")
|
|
|
try:
|
|
|
response = main_pipeline_with_doc("What is autism spectrum disorder?", None, "None")
|
|
|
print(f"β Pipeline without document successful: {response[:100]}...")
|
|
|
except Exception as e:
|
|
|
print(f"β Pipeline without document failed: {e}")
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
print(f"\n{'=' * 50}")
|
|
|
print("TEST 6: PIPELINE WITH HISTORY MANAGEMENT")
|
|
|
print(f"{'=' * 50}")
|
|
|
|
|
|
print("Testing pipeline with history management...")
|
|
|
try:
|
|
|
initial_history = [["Previous user message", "Previous bot response"]]
|
|
|
|
|
|
history, cleared_input = pipeline_with_history(
|
|
|
"Tell me about autism therapy approaches",
|
|
|
None,
|
|
|
"None",
|
|
|
initial_history
|
|
|
)
|
|
|
|
|
|
if len(history) > len(initial_history):
|
|
|
print("β History updated successfully")
|
|
|
print(f" History entries: {len(history)}")
|
|
|
print(f" Latest entry: {history[-1][0][:50]}...")
|
|
|
else:
|
|
|
print("β History not updated properly")
|
|
|
|
|
|
except Exception as e:
|
|
|
print(f"β Pipeline with history failed: {e}")
|
|
|
|
|
|
|
|
|
if os.path.exists(txt_test):
|
|
|
print("\nTesting history pipeline with document...")
|
|
|
try:
|
|
|
history, cleared = pipeline_with_history(
|
|
|
"Analyze this document for autism information",
|
|
|
txt_test,
|
|
|
"User-Specific Document",
|
|
|
[]
|
|
|
)
|
|
|
|
|
|
if len(history) > 0:
|
|
|
print("β Document processing in history pipeline successful")
|
|
|
print(f" Response preview: {history[-1][1][:100]}...")
|
|
|
else:
|
|
|
print("β No history entries created")
|
|
|
|
|
|
except Exception as e:
|
|
|
print(f"β History pipeline with document failed: {e}")
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
print(f"\n{'=' * 50}")
|
|
|
print("TEST 7: UNIFIED HANDLER")
|
|
|
print(f"{'=' * 50}")
|
|
|
|
|
|
|
|
|
print("Testing unified handler with text input...")
|
|
|
try:
|
|
|
history, cleared_text, cleared_audio = unified_handler(
|
|
|
"What are the early signs of autism?",
|
|
|
None,
|
|
|
[]
|
|
|
)
|
|
|
|
|
|
if len(history) >= 2:
|
|
|
print("β Text input processed successfully")
|
|
|
print(f" User message: {history[-2][1][:50]}...")
|
|
|
print(f" Wisal response: {history[-1][1][:50]}...")
|
|
|
else:
|
|
|
print("β Text input not processed correctly")
|
|
|
|
|
|
except Exception as e:
|
|
|
print(f"β Unified handler with text failed: {e}")
|
|
|
|
|
|
|
|
|
print("\nTesting unified handler with no input...")
|
|
|
try:
|
|
|
history, cleared_text, cleared_audio = unified_handler(None, None, [])
|
|
|
|
|
|
if any("Please provide either text or audio input" in str(entry) for entry in history):
|
|
|
print("β No input handled correctly")
|
|
|
else:
|
|
|
print("β No input not handled properly")
|
|
|
|
|
|
except Exception as e:
|
|
|
print(f"β Unified handler with no input failed: {e}")
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
print(f"\n{'=' * 50}")
|
|
|
print("TEST 8: WISAL HANDLER")
|
|
|
print(f"{'=' * 50}")
|
|
|
|
|
|
|
|
|
print("Testing Wisal handler with text input...")
|
|
|
try:
|
|
|
history, cleared_text, cleared_audio = wisal_handler(
|
|
|
"Explain autism sensory sensitivities",
|
|
|
None,
|
|
|
[]
|
|
|
)
|
|
|
|
|
|
if len(history) >= 2:
|
|
|
print("β Wisal text processing successful")
|
|
|
print(f" User message: {history[-2][1][:50]}...")
|
|
|
print(f" Wisal response: {history[-1][1][:50]}...")
|
|
|
else:
|
|
|
print("β Wisal response not found in history")
|
|
|
|
|
|
except Exception as e:
|
|
|
print(f"β Wisal handler with text failed: {e}")
|
|
|
|
|
|
|
|
|
print("\nTesting Wisal handler with no input...")
|
|
|
try:
|
|
|
history, cleared_text, cleared_audio = wisal_handler(None, None, [])
|
|
|
|
|
|
if len(history) > 0:
|
|
|
print("β Wisal no input handled correctly")
|
|
|
print(f" System message: {history[-1][1]}")
|
|
|
else:
|
|
|
print("β Wisal no input not handled")
|
|
|
|
|
|
except Exception as e:
|
|
|
print(f"β Wisal handler with no input failed: {e}")
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
print(f"\n{'=' * 50}")
|
|
|
print("TEST 9: ERROR HANDLING AND EDGE CASES")
|
|
|
print(f"{'=' * 50}")
|
|
|
|
|
|
|
|
|
print("Testing with very long query...")
|
|
|
try:
|
|
|
long_query = "autism " * 100 + "what are the symptoms and treatments?"
|
|
|
result = main_pipeline_interface(long_query)
|
|
|
print("β Long query handled successfully")
|
|
|
print(f" Response preview: {result[:100]}...")
|
|
|
except Exception as e:
|
|
|
print(f"β Long query error handled: {e}")
|
|
|
|
|
|
|
|
|
print("\nTesting with special characters...")
|
|
|
try:
|
|
|
special_query = "What about autism? π§©π #autism @support"
|
|
|
result = main_pipeline_interface(special_query)
|
|
|
print("β Special characters handled successfully")
|
|
|
print(f" Response preview: {result[:100]}...")
|
|
|
except Exception as e:
|
|
|
print(f"β Special characters error handled: {e}")
|
|
|
|
|
|
|
|
|
print("\nTesting with non-existent file...")
|
|
|
try:
|
|
|
result = main_pipeline_with_doc(
|
|
|
"Analyze this document",
|
|
|
"non_existent_file.txt",
|
|
|
"Knowledge Document"
|
|
|
)
|
|
|
print(f"β Non-existent file handled: {result[:100]}...")
|
|
|
except Exception as e:
|
|
|
print(f"β Non-existent file error handled: {e}")
|
|
|
|
|
|
|
|
|
print("\nTesting with invalid document type...")
|
|
|
try:
|
|
|
if os.path.exists(txt_test):
|
|
|
result = main_pipeline_with_doc(
|
|
|
"Test query",
|
|
|
txt_test,
|
|
|
"Invalid Document Type"
|
|
|
)
|
|
|
print(f"β Invalid document type handled: {result}")
|
|
|
else:
|
|
|
print("β Skipping invalid document type test - no test file")
|
|
|
except Exception as e:
|
|
|
print(f"β Invalid document type error handled: {e}")
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
print(f"\n{'=' * 50}")
|
|
|
print("TEST 10: CONFIGURATION AND ENVIRONMENT")
|
|
|
print(f"{'=' * 50}")
|
|
|
|
|
|
print("Checking environment variables...")
|
|
|
env_vars = [
|
|
|
"SILICONFLOW_API_KEY",
|
|
|
"SILICONFLOW_URL",
|
|
|
"SILICONFLOW_CHAT_URL",
|
|
|
"ENVIRONMENT"
|
|
|
]
|
|
|
|
|
|
for var in env_vars:
|
|
|
value = os.getenv(var)
|
|
|
if value:
|
|
|
print(f"β {var}: Set (length: {len(value)})")
|
|
|
else:
|
|
|
print(f"β {var}: Not set")
|
|
|
|
|
|
print(f"\nChecking global variables...")
|
|
|
try:
|
|
|
print(f"β Environment: {env}")
|
|
|
print(f"β Session ID: {SESSION_ID}")
|
|
|
print(f"β Valid doc types: {len(VALID_DOC_TYPES)} types")
|
|
|
print(f"β Pending clarifications: {type(pending_clarifications)}")
|
|
|
|
|
|
|
|
|
for key, value in VALID_DOC_TYPES.items():
|
|
|
print(f" - {key}: {value}")
|
|
|
|
|
|
except Exception as e:
|
|
|
print(f"β Global variables error: {e}")
|
|
|
|
|
|
|
|
|
print(f"\nChecking test files availability...")
|
|
|
test_files_check = [
|
|
|
(pdf_test, "PDF"),
|
|
|
(docs_test, "DOCX"),
|
|
|
(txt_test, "TXT")
|
|
|
]
|
|
|
|
|
|
available_files = 0
|
|
|
for file_path, file_type in test_files_check:
|
|
|
if os.path.exists(file_path):
|
|
|
size = os.path.getsize(file_path)
|
|
|
print(f"β {file_type} test file available: {os.path.basename(file_path)} ({size} bytes)")
|
|
|
available_files += 1
|
|
|
else:
|
|
|
print(f"β {file_type} test file missing: {file_path}")
|
|
|
|
|
|
print(f" Available test files: {available_files}/{len(test_files_check)}")
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
print(f"\n{'=' * 70}")
|
|
|
print("TEST SUMMARY")
|
|
|
print(f"{'=' * 70}")
|
|
|
print("β Utility functions tested")
|
|
|
print("β File upload and saving with real files verified")
|
|
|
print("β Document processing by type with real files checked")
|
|
|
print("β Main pipeline interface tested")
|
|
|
print("β Pipeline with document and history verified")
|
|
|
print("β History management tested")
|
|
|
print("β Unified handler functionality checked")
|
|
|
print("β Wisal handler tested")
|
|
|
print("β Error handling and edge cases validated")
|
|
|
print("β Configuration and environment checked")
|
|
|
print(f"{'=' * 70}")
|
|
|
print("UTILS/FUNCTIONS TEST SUITE COMPLETED")
|
|
|
print(f"{'=' * 70}")
|
|
|
|
|
|
|
|
|
try:
|
|
|
upload_dir = get_upload_directory()
|
|
|
if os.path.exists(upload_dir):
|
|
|
test_file_count = 0
|
|
|
for file in os.listdir(upload_dir):
|
|
|
if file.startswith(('test_', 'temp_')):
|
|
|
os.remove(os.path.join(upload_dir, file))
|
|
|
test_file_count += 1
|
|
|
if test_file_count > 0:
|
|
|
print(f"β Cleaned up {test_file_count} test files")
|
|
|
else:
|
|
|
print("β No test files to clean up")
|
|
|
except Exception as e:
|
|
|
print(f"β Cleanup warning: {e}") |