Autism_QA / utils.py
A7m0d's picture
Upload folder using huggingface_hub
712579e verified
from logger.custom_logger import CustomLoggerTracker
from dotenv import load_dotenv
from docs_utils import *
from audio_utils import transcribe_audio
from pipeQuery import process_query
import os
import time
from typing import Dict, List, Tuple, Optional, Any
from configs import load_yaml_config
## loading config yaml
# Parsed once at import time. `config` is not referenced again in the visible
# part of this module.
config = load_yaml_config("config.yaml")
# Load .env early
# This must run before the os.getenv() lookups below so that values defined
# in a .env file are visible to them.
load_dotenv()
# ---------------------------
# Custom Logger Initialization
# ---------------------------
custom_log = CustomLoggerTracker()
# Module-wide logger used by every function in this file.
logger = custom_log.get_logger("utils")
logger.info("Logger initialized for utils/functions module")
# ---------------------------
# Environment & Globals
# ---------------------------
# Falls back to "production" when ENVIRONMENT is unset.
env = os.getenv("ENVIRONMENT", "production")
# NOTE(review): SESSION_ID and pending_clarifications are only printed by the
# self-test at the bottom of this file; presumably other modules import them.
SESSION_ID = "default"
pending_clarifications: Dict[str, str] = {}
# Missing SiliconFlow settings default to "" and only trigger the warnings
# below; nothing is raised at import time.
SILICONFLOW_API_KEY = os.getenv("SILICONFLOW_API_KEY", "")
SILICONFLOW_URL = os.getenv("SILICONFLOW_URL", "").strip()
SILICONFLOW_CHAT_URL = os.getenv(
    "SILICONFLOW_CHAT_URL", "https://api.siliconflow.com/v1/chat/completions").strip()
# Document types mapping for validation
# Keys are the labels compared against `doc_type` throughout this file
# (including the literal string "None"). The functions here consult only the
# keys; the values are merely printed by the self-test.
VALID_DOC_TYPES = {
    "Knowledge Document": "knowledge",
    "User-Specific Document": "user_specific",
    "Old Document": "old",
    "New Document": "new",
    "None": None
}
# Warn (rather than fail) when credentials/endpoints are missing.
if not SILICONFLOW_API_KEY:
    logger.warning("SILICONFLOW_API_KEY is not set. LLM/Reranker calls may fail.")
if not SILICONFLOW_URL:
    logger.warning("SILICONFLOW_URL is not set. OpenAI client base_url will not work.")
# ---------------------------
# Utility Functions - Refactored
# ---------------------------
def validate_document_type(doc_type: str) -> bool:
    """Report whether *doc_type* is one of the labels in ``VALID_DOC_TYPES``.

    The check is a plain key lookup, so the label must match exactly; the
    literal string ``"None"`` is itself an accepted label.
    """
    is_known_label = doc_type in VALID_DOC_TYPES
    return is_known_label
def get_upload_directory() -> str:
    """Return the ``uploaded_docs`` folder beside this module, creating it if absent."""
    module_dir = os.path.dirname(__file__)
    target_dir = os.path.join(module_dir, "uploaded_docs")
    # exist_ok keeps repeated calls harmless once the folder is there.
    os.makedirs(target_dir, exist_ok=True)
    return target_dir
def save_uploaded_file(doc_file: Any, filename: Optional[str] = None) -> str:
    """Copy an uploaded document into the upload directory and return its path.

    Args:
        doc_file: Either an object exposing ``read()`` or a filesystem path
            (``str`` / ``os.PathLike``) pointing at the document.
        filename: Optional name to store the copy under. Only its final path
            component is used.

    Returns:
        Path of the saved copy inside the directory returned by
        ``get_upload_directory()``.

    Raises:
        ValueError: If ``doc_file`` is ``None``.
        IOError: If reading or writing fails, or the payload is empty. The
            underlying exception is attached as ``__cause__``.
    """
    if doc_file is None:
        raise ValueError("Document file cannot be None")

    # Decide which string the stored name is derived from.
    if filename:
        name_source = filename
    elif isinstance(doc_file, (str, os.PathLike)):
        # Path inputs carry no ``.name`` attribute. Take the name from the
        # path itself so the copy keeps its original name and extension
        # instead of degrading to an extension-less "document_<ts>".
        name_source = str(doc_file)
    else:
        name_source = getattr(doc_file, 'name', 'unknown_file')

    # basename() drops any directory part so the copy cannot land outside
    # the upload directory.
    safe_filename = os.path.basename(name_source)
    # "." and ".." survive basename() but name directories, not files.
    if safe_filename in ("", ".", "..", "unknown_file"):
        safe_filename = f"document_{int(time.time())}"

    upload_dir = get_upload_directory()
    save_path = os.path.join(upload_dir, safe_filename)
    logger.info(f"Saving document to: {save_path}")
    try:
        # Handle file-like objects vs path strings
        if hasattr(doc_file, 'read'):
            file_bytes = doc_file.read()
        else:
            with open(str(doc_file), 'rb') as f:
                file_bytes = f.read()
        # Validate file content
        if not file_bytes:
            raise ValueError("File appears to be empty")
        with open(save_path, "wb") as f:
            f.write(file_bytes)
        logger.info(f"Successfully saved uploaded file to {save_path}")
        return save_path
    except Exception as e:
        logger.error(f"Error saving file: {e}")
        # Callers see a single IOError; chain the cause so the original
        # traceback is not lost.
        raise IOError(f"Failed to save file: {e}") from e
def process_document_by_type(query: str, save_path: str, doc_type: str) -> str:
    """Ingest the saved document according to *doc_type*, then answer *query*.

    Each supported label runs an ingest helper on ``save_path`` followed by
    the matching QA helper on ``query``; the result is a banner line, the
    ingest status and the answer. "New Document" goes through the same
    ``user_doc_*`` helpers as "User-Specific Document".

    Raises:
        ValueError: If the label is not in ``VALID_DOC_TYPES``, or is valid
            but has no branch here (the "None" label).
        Exception: Anything raised by the helpers is logged and re-raised.
    """
    if not validate_document_type(doc_type):
        raise ValueError(f"Invalid document type: {doc_type}")

    try:
        if doc_type == "Knowledge Document":
            logger.info("Processing as Knowledge Document")
            ingest_report = rag_dom_ingest(save_path)
            reply = rag_dom_qa(query)
            return f"[Knowledge Document Uploaded]\n{ingest_report}\n\n{reply}"

        if doc_type == "User-Specific Document":
            logger.info("Processing as User-Specific Document")
            ingest_report = user_doc_ingest(save_path)
            reply = user_doc_qa(query)
            return f"[User-Specific Document Uploaded]\n{ingest_report}\n\n{reply}"

        if doc_type == "Old Document":
            logger.info("Processing as Old Document")
            ingest_report = old_doc_ingestion(save_path)
            reply = old_doc_qa(query)
            return f"[Old Document Uploaded]\n{ingest_report}\n\n{reply}"

        if doc_type == "New Document":
            logger.info("Processing as New Document")
            ingest_report = user_doc_ingest(save_path)
            reply = user_doc_qa(query)
            return f"[New Document Uploaded]\n{ingest_report}\n\n{reply}"

        # Valid label without a handler (e.g. "None").
        raise ValueError(f"Unsupported document type: {doc_type}")
    except Exception as err:
        logger.error(f"Error processing document type {doc_type}: {err}")
        raise
def validate_query(query: str) -> bool:
    """Return ``True`` when *query* is a string with non-whitespace content.

    Always returns a real ``bool`` (the previous expression leaked ``None``
    or ``""`` for falsy input) and treats non-string values as invalid
    instead of raising ``AttributeError`` on ``.strip()``.
    """
    return isinstance(query, str) and bool(query.strip())
# ---------------------------
# Public Interfaces
# ---------------------------
def main_pipeline_interface(query: str) -> str:
    """Run *query* through ``process_query`` as a first-turn request.

    Raises:
        ValueError: If the query fails ``validate_query``.
        Exception: Any error from ``process_query`` is logged and re-raised.
    """
    query_ok = validate_query(query)
    if not query_ok:
        raise ValueError("Query cannot be empty")

    logger.info("Main pipeline interface called")
    try:
        answer = process_query(query, first_turn=True)
    except Exception as err:
        logger.error(f"Error in main pipeline: {err}")
        raise
    return answer
def main_pipeline_with_doc_and_history(
    query: str,
    doc_file: Any,
    doc_type: str,
    history: str
) -> Tuple[str, str]:
    """Answer *query* (optionally against a document) and extend a text transcript.

    Args:
        query: The user's question.
        doc_file: Uploaded document, or ``None`` for a plain query.
        doc_type: One of the labels in ``VALID_DOC_TYPES``.
        history: Transcript so far. A missing (``None``) history is treated
            as an empty transcript.

    Returns:
        ``(response, updated_history)``. The transcript gains a
        ``User:`` / ``Wisal:`` pair for this turn. When the query is invalid
        a fixed prompt is returned and the transcript is left as it was.
    """
    # Normalise first so a missing history is never interpolated as the
    # literal text "None" at the top of the transcript.
    history = history or ""

    if not validate_query(query):
        return "Please provide a valid query.", history

    logger.info("Pipeline with doc and history called")
    try:
        response = main_pipeline_with_doc(query, doc_file, doc_type)
        updated_history = f"{history}\nUser: {query}\nWisal: {response}\n"
        return response, updated_history
    except Exception as e:
        # Never let an error escape: report it as the reply and still
        # record the turn.
        logger.error(f"Error in pipeline with doc and history: {e}")
        error_response = f"Sorry, I encountered an error: {str(e)}"
        updated_history = f"{history}\nUser: {query}\nWisal: {error_response}\n"
        return error_response, updated_history
def main_pipeline_with_doc(query: str, doc_file: Any, doc_type: str) -> str:
    """Answer *query*, ingesting *doc_file* first when one is supplied.

    Errors are never raised to the caller: every failure path returns a
    human-readable message string instead.
    """
    if not validate_query(query):
        return "Please provide a valid query."

    logger.info(f"Pipeline with doc called - doc_type: {doc_type}")

    # Without a document (or with the "None" label) this is a plain query.
    no_document = doc_file is None or doc_type == "None"
    if no_document:
        logger.info("No document provided, using main pipeline")
        try:
            return process_query(query, first_turn=True)
        except Exception as err:
            logger.error(f"Error in main pipeline: {err}")
            return f"Sorry, I encountered an error processing your query: {str(err)}"

    # Reject labels that are not in the mapping before touching the file.
    if not validate_document_type(doc_type):
        logger.warning(f"Invalid document type: {doc_type}")
        allowed_labels = ', '.join(VALID_DOC_TYPES.keys())
        return f"Invalid document type: {doc_type}. Valid types are: {allowed_labels}"

    try:
        stored_path = save_uploaded_file(doc_file)
        return process_document_by_type(query, stored_path, doc_type)
    except Exception as err:
        logger.error(f"Error in document processing: {err}")
        return f"Sorry, I encountered an error processing your document: {str(err)}"
def pipeline_with_history(
    message: str,
    doc_file: Any,
    doc_type: str,
    history: List[List[str]]
) -> Tuple[List[List[str]], str]:
    """Process one chat turn and return the extended history.

    Args:
        message: The user's message.
        doc_file: Uploaded document, or ``None``.
        doc_type: One of the labels in ``VALID_DOC_TYPES``.
        history: Prior ``[user, bot]`` pairs; ``None`` or empty starts fresh.

    Returns:
        ``(new_history, "")``. ``new_history`` is a new list holding the
        prior pairs plus this turn; the caller's list is left untouched.
        The second element is always an empty string (presumably used to
        clear the input box; confirm against the UI wiring).
    """
    logger.info("Pipeline with history called")
    # Work on a copy: appending to the caller's list would make "before" and
    # "after" the same object, so callers could not compare them.
    history = list(history) if history else []

    if not validate_query(message):
        logger.warning("Empty message received")
        error_msg = "Please provide a valid message."
        history.append([message or "", error_msg])
        return history, ""

    try:
        response = main_pipeline_with_doc(message, doc_file, doc_type)
        history.append([message, response])
        logger.info("Successfully processed message with history")
        return history, ""
    except Exception as e:
        logger.error(f"Error in pipeline with history: {e}")
        error_response = f"Sorry, I encountered an error: {str(e)}"
        history.append([message, error_response])
        return history, ""
def unified_handler(
    user_text: Optional[str],
    audio_file: Any,
    chat_history: List[Tuple[str, str]]
) -> Tuple[List[Tuple[str, str]], str, Any]:
    """Handle one turn that arrives either as text or as an audio recording.

    Text wins when it passes ``validate_query``; otherwise the audio file is
    transcribed and the last non-status output is used as the message. The
    message is answered with ``process_query`` and both sides are appended
    to ``chat_history`` as ``(speaker, text)`` tuples.

    Returns:
        ``(chat_history, "", None)`` on every path. The trailing values are
        presumably there to reset the text and audio inputs (confirm with
        the UI wiring).
    """
    logger.info("Unified handler called")
    chat_history = chat_history or []

    def _finish(speaker, text):
        # Record one entry and hand back the standard return triple.
        chat_history.append((speaker, text))
        return chat_history, "", None

    pending_message = None
    if validate_query(user_text):
        pending_message = user_text
        logger.info("Processing text input")
    elif audio_file:
        logger.info("Processing audio input")
        try:
            candidate = ""
            for chunk in transcribe_audio(audio_file):
                if not isinstance(chunk, str):
                    continue
                if chunk.startswith("[ERROR]"):
                    # The transcriber reported a failure; surface it as-is.
                    return _finish("System", chunk)
                if not chunk.startswith("Status:"):
                    candidate = chunk
            if not validate_query(candidate):
                return _finish("System", "Could not transcribe audio properly")
            pending_message = candidate
            logger.info("Successfully transcribed audio")
        except Exception as exc:
            logger.error(f"Error processing audio: {exc}")
            return _finish("System", f"Audio processing error: {str(exc)}")

    if not pending_message:
        logger.warning("No valid input received in unified handler")
        return _finish("System", "Please provide either text or audio input")

    try:
        logger.info(f"Processing message: {pending_message[:50]}...")
        chat_history.append(("User", pending_message))
        answer = process_query(pending_message)
        chat_history.append(("Wisal", answer))
        logger.info("Successfully processed message in unified handler")
        return chat_history, "", None
    except Exception as exc:
        logger.error(f"Error processing query: {exc}")
        return _finish("System", f"Processing error: {str(exc)}")
def wisal_handler(
    user_text: Optional[str],
    audio_file: Any,
    chat_history: List[Tuple[str, str]]
) -> Tuple[List[Tuple[str, str]], str, Any]:
    """Answer a text message, or failing that a transcribed audio message.

    Valid text is answered directly with ``process_query``. Otherwise, when
    an audio file is given, it is transcribed and the last non-status output
    is answered. Entries are appended to ``chat_history`` as
    ``(speaker, text)`` tuples.

    Returns:
        ``(chat_history, "", None)`` on every path.
    """
    logger.info("Wisal handler called")
    chat_history = chat_history or []

    def _finish(*entries):
        # Record the given entries and hand back the standard return triple.
        chat_history.extend(entries)
        return chat_history, "", None

    if validate_query(user_text):
        logger.info("Processing text input in Wisal handler")
        try:
            reply = process_query(user_text)
            return _finish(("User", user_text), ("Wisal", reply))
        except Exception as exc:
            logger.error(f"Error processing text in Wisal handler: {exc}")
            return _finish(
                ("User", user_text),
                ("System", f"Processing error: {str(exc)}"),
            )

    if audio_file:
        logger.info("Processing audio input in Wisal handler")
        try:
            heard = None
            for piece in transcribe_audio(audio_file):
                if not isinstance(piece, str):
                    continue
                if piece.startswith("[ERROR]"):
                    # The transcriber reported a failure; surface it as-is.
                    return _finish(("System", piece))
                if not piece.startswith("Status:"):
                    heard = piece
            if not validate_query(heard):
                return _finish(("System", "Could not transcribe audio properly"))
            logger.info("Successfully transcribed audio")
            chat_history.append(("User", heard))
            reply = process_query(heard)
            return _finish(("Wisal", reply))
        except Exception as exc:
            logger.error(f"Error processing audio in Wisal handler: {exc}")
            return _finish(("System", f"Audio processing error: {str(exc)}"))

    logger.warning("No valid input received in Wisal handler")
    return _finish(("System", "Please provide either text or audio input"))
if __name__ == "__main__":
    # Manual smoke-test script for the functions above. Results are reported
    # with print() using ✓ / ✗ / ⚠ markers; nothing is asserted. Several
    # sections call the real pipeline functions and depend on the sample
    # files below being present relative to the working directory.
    # Test file paths
    pdf_test = "tests/Computational Requirements for Embed.pdf"
    docs_test = "tests/Computational Requirements for Embed.docx"
    txt_test = "assets/RAG_Documents/Autism_Books_1.txt"
    print(f"=" * 70)
    print("COMPREHENSIVE UTILS/FUNCTIONS TEST SUITE")
    print(f"=" * 70)
    # ===========================
    # Test 1: Utility Functions
    # ===========================
    print(f"\n{'=' * 50}")
    print("TEST 1: UTILITY FUNCTIONS")
    print(f"{'=' * 50}")
    # Test document type validation
    print("Testing document type validation...")
    valid_types = ["Knowledge Document", "User-Specific Document", "Old Document", "New Document", "None"]
    invalid_types = ["Random Document", "Invalid Type", "", None]
    for doc_type in valid_types:
        result = validate_document_type(doc_type)
        print(f"✓ Valid type '{doc_type}': {result}")
    for doc_type in invalid_types:
        result = validate_document_type(doc_type)
        print(f"✗ Invalid type '{doc_type}': {result}")
    # Test upload directory creation
    print("\nTesting upload directory creation...")
    try:
        upload_dir = get_upload_directory()
        if os.path.exists(upload_dir):
            print(f"✓ Upload directory created/exists: {upload_dir}")
        else:
            print(f"✗ Upload directory not found: {upload_dir}")
    except Exception as e:
        print(f"✗ Upload directory test failed: {e}")
    # Test query validation
    print("\nTesting query validation...")
    valid_queries = ["What is autism?", "Help me understand treatments", "a"]
    invalid_queries = ["", " ", None]
    for query in valid_queries:
        result = validate_query(query)
        print(f"✓ Valid query '{query}': {result}")
    for query in invalid_queries:
        result = validate_query(query)
        print(f"✗ Invalid query '{query}': {result}")
    # ===========================
    # Test 2: File Upload and Saving with Real Files
    # ===========================
    print(f"\n{'=' * 50}")
    print("TEST 2: FILE UPLOAD AND SAVING WITH REAL FILES")
    print(f"{'=' * 50}")
    test_files = [
        (pdf_test, "PDF"),
        (docs_test, "DOCX"),
        (txt_test, "TXT")
    ]
    for file_path, file_type in test_files:
        print(f"\nTesting {file_type} file upload: {os.path.basename(file_path)}")
        if os.path.exists(file_path):
            try:
                # Saved under a "test_" name so the cleanup step at the end
                # of this script removes it again.
                save_path = save_uploaded_file(file_path, f"test_{file_type.lower()}.{file_type.lower()}")
                if os.path.exists(save_path):
                    print(f"✓ {file_type} file saved successfully: {save_path}")
                    # Check file size
                    original_size = os.path.getsize(file_path)
                    saved_size = os.path.getsize(save_path)
                    if original_size == saved_size:
                        print(f"✓ {file_type} file size matches: {saved_size} bytes")
                    else:
                        print(f"✗ {file_type} file size mismatch: {original_size} vs {saved_size}")
                else:
                    print(f"✗ {file_type} file not found after saving")
            except Exception as e:
                print(f"✗ {file_type} file upload failed: {e}")
        else:
            print(f"✗ {file_type} test file not found: {file_path}")
    # Test error handling for invalid files
    print("\nTesting error handling for invalid files...")
    try:
        save_uploaded_file(None)
        print("✗ Should have failed with None file")
    except ValueError as e:
        print(f"✓ Correctly handled None file: {e}")
    except Exception as e:
        print(f"✓ Handled error: {e}")
    # ===========================
    # Test 3: Document Processing by Type with Real Files
    # ===========================
    print(f"\n{'=' * 50}")
    print("TEST 3: DOCUMENT PROCESSING BY TYPE WITH REAL FILES")
    print(f"{'=' * 50}")
    test_query = "What does this document say about computational requirements?"
    # Test with text file
    if os.path.exists(txt_test):
        print(f"Testing document processing with: {os.path.basename(txt_test)}")
        for doc_type in ["Knowledge Document", "User-Specific Document", "Old Document", "New Document"]:
            print(f"\nTesting {doc_type} processing...")
            try:
                result = process_document_by_type(test_query, txt_test, doc_type)
                print(f"✓ {doc_type} processed successfully")
                print(f" Response preview: {result[:150]}...")
            except Exception as e:
                print(f"✗ {doc_type} processing failed: {e}")
    else:
        print(f"✗ Text test file not found: {txt_test}")
    # Test with PDF file if available
    if os.path.exists(pdf_test):
        print(f"\nTesting PDF document processing: {os.path.basename(pdf_test)}")
        try:
            result = process_document_by_type(test_query, pdf_test, "Knowledge Document")
            print(f"✓ PDF processed as Knowledge Document successfully")
            print(f" Response preview: {result[:150]}...")
        except Exception as e:
            print(f"✗ PDF processing failed: {e}")
    else:
        print(f"✗ PDF test file not found: {pdf_test}")
    # Test invalid document type
    print(f"\nTesting invalid document type...")
    try:
        if os.path.exists(txt_test):
            process_document_by_type(test_query, txt_test, "Invalid Type")
            print("✗ Should have failed with invalid type")
        else:
            print("⚠ Skipping invalid type test - no test file available")
    except ValueError as e:
        print(f"✓ Correctly handled invalid document type: {e}")
    # ===========================
    # Test 4: Main Pipeline Interface
    # ===========================
    print(f"\n{'=' * 50}")
    print("TEST 4: MAIN PIPELINE INTERFACE")
    print(f"{'=' * 50}")
    # Test main pipeline with autism-related queries
    print("Testing main pipeline interface...")
    test_queries = [
        "What is autism?",
        "How can I help a child with autism?",
        "Tell me about autism interventions",
        "What are the symptoms of ASD?"
    ]
    for query in test_queries:
        print(f"\nTesting query: '{query}'")
        try:
            result = main_pipeline_interface(query)
            print(f"✓ Pipeline response received: {result[:100]}...")
        except Exception as e:
            print(f"✗ Pipeline failed for query '{query}': {e}")
    # Test with non-autism query
    print(f"\nTesting non-autism related query...")
    try:
        result = main_pipeline_interface("What's the weather like?")
        print(f"✓ Non-autism query handled: {result[:100]}...")
    except Exception as e:
        print(f"✗ Non-autism query failed: {e}")
    # Test with empty query
    print(f"\nTesting empty query...")
    try:
        main_pipeline_interface("")
        print("✗ Should have failed with empty query")
    except ValueError as e:
        print(f"✓ Correctly handled empty query: {e}")
    # ===========================
    # Test 5: Pipeline with Document and History
    # ===========================
    print(f"\n{'=' * 50}")
    print("TEST 5: PIPELINE WITH DOCUMENT AND HISTORY")
    print(f"{'=' * 50}")
    # Test with real document
    if os.path.exists(txt_test):
        print("Testing pipeline with document and history...")
        try:
            initial_history = "Previous conversation history here."
            response, updated_history = main_pipeline_with_doc_and_history(
                "What information is in this document about autism?",
                txt_test,
                "Knowledge Document",
                initial_history
            )
            print(f"✓ Pipeline with document successful")
            print(f" Response preview: {response[:100]}...")
            print(f" History updated: {'Yes' if len(updated_history) > len(initial_history) else 'No'}")
        except Exception as e:
            print(f"✗ Pipeline with document failed: {e}")
    else:
        print(f"✗ Cannot test with document - file not found: {txt_test}")
    # Test without document
    print("\nTesting pipeline without document...")
    try:
        response = main_pipeline_with_doc("What is autism spectrum disorder?", None, "None")
        print(f"✓ Pipeline without document successful: {response[:100]}...")
    except Exception as e:
        print(f"✗ Pipeline without document failed: {e}")
    # ===========================
    # Test 6: Pipeline with History Management
    # ===========================
    print(f"\n{'=' * 50}")
    print("TEST 6: PIPELINE WITH HISTORY MANAGEMENT")
    print(f"{'=' * 50}")
    print("Testing pipeline with history management...")
    try:
        initial_history = [["Previous user message", "Previous bot response"]]
        # Capture the length before the call: the returned list may be the
        # very object passed in, in which case comparing the two lists'
        # lengths afterwards could never show growth.
        initial_len = len(initial_history)
        history, cleared_input = pipeline_with_history(
            "Tell me about autism therapy approaches",
            None,
            "None",
            initial_history
        )
        if len(history) > initial_len:
            print("✓ History updated successfully")
            print(f" History entries: {len(history)}")
            print(f" Latest entry: {history[-1][0][:50]}...")
        else:
            print("✗ History not updated properly")
    except Exception as e:
        print(f"✗ Pipeline with history failed: {e}")
    # Test with document in history pipeline
    if os.path.exists(txt_test):
        print("\nTesting history pipeline with document...")
        try:
            history, cleared = pipeline_with_history(
                "Analyze this document for autism information",
                txt_test,
                "User-Specific Document",
                []
            )
            if len(history) > 0:
                print("✓ Document processing in history pipeline successful")
                print(f" Response preview: {history[-1][1][:100]}...")
            else:
                print("✗ No history entries created")
        except Exception as e:
            print(f"✗ History pipeline with document failed: {e}")
    # ===========================
    # Test 7: Unified Handler
    # ===========================
    print(f"\n{'=' * 50}")
    print("TEST 7: UNIFIED HANDLER")
    print(f"{'=' * 50}")
    # Test with text input
    print("Testing unified handler with text input...")
    try:
        history, cleared_text, cleared_audio = unified_handler(
            "What are the early signs of autism?",
            None,
            []
        )
        if len(history) >= 2:  # User message + Wisal response
            print("✓ Text input processed successfully")
            print(f" User message: {history[-2][1][:50]}...")
            print(f" Wisal response: {history[-1][1][:50]}...")
        else:
            print("✗ Text input not processed correctly")
    except Exception as e:
        print(f"✗ Unified handler with text failed: {e}")
    # Test with no input
    print("\nTesting unified handler with no input...")
    try:
        history, cleared_text, cleared_audio = unified_handler(None, None, [])
        if any("Please provide either text or audio input" in str(entry) for entry in history):
            print("✓ No input handled correctly")
        else:
            print("✗ No input not handled properly")
    except Exception as e:
        print(f"✗ Unified handler with no input failed: {e}")
    # ===========================
    # Test 8: Wisal Handler
    # ===========================
    print(f"\n{'=' * 50}")
    print("TEST 8: WISAL HANDLER")
    print(f"{'=' * 50}")
    # Test with text input
    print("Testing Wisal handler with text input...")
    try:
        history, cleared_text, cleared_audio = wisal_handler(
            "Explain autism sensory sensitivities",
            None,
            []
        )
        if len(history) >= 2:
            print("✓ Wisal text processing successful")
            print(f" User message: {history[-2][1][:50]}...")
            print(f" Wisal response: {history[-1][1][:50]}...")
        else:
            print("✗ Wisal response not found in history")
    except Exception as e:
        print(f"✗ Wisal handler with text failed: {e}")
    # Test Wisal handler with no input
    print("\nTesting Wisal handler with no input...")
    try:
        history, cleared_text, cleared_audio = wisal_handler(None, None, [])
        if len(history) > 0:
            print("✓ Wisal no input handled correctly")
            print(f" System message: {history[-1][1]}")
        else:
            print("✗ Wisal no input not handled")
    except Exception as e:
        print(f"✗ Wisal handler with no input failed: {e}")
    # ===========================
    # Test 9: Error Handling and Edge Cases
    # ===========================
    print(f"\n{'=' * 50}")
    print("TEST 9: ERROR HANDLING AND EDGE CASES")
    print(f"{'=' * 50}")
    # Test with very long query
    print("Testing with very long query...")
    try:
        long_query = "autism " * 100 + "what are the symptoms and treatments?"
        result = main_pipeline_interface(long_query)
        print("✓ Long query handled successfully")
        print(f" Response preview: {result[:100]}...")
    except Exception as e:
        print(f"✓ Long query error handled: {e}")
    # Test with special characters in query
    print("\nTesting with special characters...")
    try:
        special_query = "What about autism? 🧩💙 #autism @support"
        result = main_pipeline_interface(special_query)
        print("✓ Special characters handled successfully")
        print(f" Response preview: {result[:100]}...")
    except Exception as e:
        print(f"✓ Special characters error handled: {e}")
    # Test with non-existent file
    print("\nTesting with non-existent file...")
    try:
        result = main_pipeline_with_doc(
            "Analyze this document",
            "non_existent_file.txt",
            "Knowledge Document"
        )
        print(f"✓ Non-existent file handled: {result[:100]}...")
    except Exception as e:
        print(f"✓ Non-existent file error handled: {e}")
    # Test with invalid document type
    print("\nTesting with invalid document type...")
    try:
        if os.path.exists(txt_test):
            result = main_pipeline_with_doc(
                "Test query",
                txt_test,
                "Invalid Document Type"
            )
            print(f"✓ Invalid document type handled: {result}")
        else:
            print("⚠ Skipping invalid document type test - no test file")
    except Exception as e:
        print(f"✓ Invalid document type error handled: {e}")
    # ===========================
    # Test 10: Configuration and Environment
    # ===========================
    print(f"\n{'=' * 50}")
    print("TEST 10: CONFIGURATION AND ENVIRONMENT")
    print(f"{'=' * 50}")
    print("Checking environment variables...")
    env_vars = [
        "SILICONFLOW_API_KEY",
        "SILICONFLOW_URL",
        "SILICONFLOW_CHAT_URL",
        "ENVIRONMENT"
    ]
    for var in env_vars:
        value = os.getenv(var)
        if value:
            # Only the length is printed so secrets never reach the console.
            print(f"✓ {var}: Set (length: {len(value)})")
        else:
            print(f"✗ {var}: Not set")
    print(f"\nChecking global variables...")
    try:
        print(f"✓ Environment: {env}")
        print(f"✓ Session ID: {SESSION_ID}")
        print(f"✓ Valid doc types: {len(VALID_DOC_TYPES)} types")
        print(f"✓ Pending clarifications: {type(pending_clarifications)}")
        # Check VALID_DOC_TYPES mapping
        for key, value in VALID_DOC_TYPES.items():
            print(f" - {key}: {value}")
    except Exception as e:
        print(f"✗ Global variables error: {e}")
    # Check test files availability
    print(f"\nChecking test files availability...")
    test_files_check = [
        (pdf_test, "PDF"),
        (docs_test, "DOCX"),
        (txt_test, "TXT")
    ]
    available_files = 0
    for file_path, file_type in test_files_check:
        if os.path.exists(file_path):
            size = os.path.getsize(file_path)
            print(f"✓ {file_type} test file available: {os.path.basename(file_path)} ({size} bytes)")
            available_files += 1
        else:
            print(f"✗ {file_type} test file missing: {file_path}")
    print(f" Available test files: {available_files}/{len(test_files_check)}")
    # ===========================
    # Test Summary
    # ===========================
    # NOTE: the summary lines below are printed unconditionally; they list
    # which sections ran, not whether the individual checks passed.
    print(f"\n{'=' * 70}")
    print("TEST SUMMARY")
    print(f"{'=' * 70}")
    print("✓ Utility functions tested")
    print("✓ File upload and saving with real files verified")
    print("✓ Document processing by type with real files checked")
    print("✓ Main pipeline interface tested")
    print("✓ Pipeline with document and history verified")
    print("✓ History management tested")
    print("✓ Unified handler functionality checked")
    print("✓ Wisal handler tested")
    print("✓ Error handling and edge cases validated")
    print("✓ Configuration and environment checked")
    print(f"{'=' * 70}")
    print("UTILS/FUNCTIONS TEST SUITE COMPLETED")
    print(f"{'=' * 70}")
    # Cleanup uploaded test files
    try:
        upload_dir = get_upload_directory()
        if os.path.exists(upload_dir):
            test_file_count = 0
            for file in os.listdir(upload_dir):
                # Only files this script created under a test_/temp_ name
                # are removed; other uploads are left alone.
                if file.startswith(('test_', 'temp_')):
                    os.remove(os.path.join(upload_dir, file))
                    test_file_count += 1
            if test_file_count > 0:
                print(f"✓ Cleaned up {test_file_count} test files")
            else:
                print("✓ No test files to clean up")
    except Exception as e:
        print(f"✗ Cleanup warning: {e}")