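"""
test_runner.py: Enhanced CSV test runner for the Autism_QA pipeline.

Reads questions from a CSV file, runs each one through the existing
RAG + LLM pipeline, times every step (embedding generation, Weaviate
retrieval, main pipeline, response cleaning), and writes per-question
answers, timings, and performance analysis to a results CSV.
"""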
import os
import sys
import time
import pandas as pd
import re
from datetime import datetime
from dotenv import load_dotenv
from typing import Dict, Any, Tuple
import traceback
import asyncio
# Setup paths and environment
sys.path.append(os.path.dirname(os.path.abspath(__file__)))
load_dotenv()
# Import your existing modules
from utils import process_query
from logger.custom_logger import CustomLoggerTracker
from rag_utils import rag_autism, encode_query
from clients import init_weaviate_client
# Initialize logger
custom_log = CustomLoggerTracker()
logger = custom_log.get_logger("csv_test")
class DetailedTimingTracker:
"""Enhanced timing tracker with detailed pipeline step analysis."""
def __init__(self):
self.timings = {}
self.start_times = {}
self.total_start_time = None
self.step_logs = []
self.rag_performance_data = {}
def start_timer(self, step_name: str):
"""Start timing a specific step."""
timestamp = datetime.now()
self.start_times[step_name] = time.time()
if self.total_start_time is None:
self.total_start_time = time.time()
log_entry = f"[{timestamp.strftime('%H:%M:%S.%f')[:-3]}] STARTED: {step_name}"
self.step_logs.append(log_entry)
logger.debug(log_entry)
def end_timer(self, step_name: str, additional_info: str = ""):
"""End timing a specific step with optional additional information."""
if step_name in self.start_times:
elapsed = time.time() - self.start_times[step_name]
self.timings[step_name] = round(elapsed, 3)
timestamp = datetime.now()
log_entry = f"[{timestamp.strftime('%H:%M:%S.%f')[:-3]}] COMPLETED: {step_name} - {elapsed:.3f}s"
if additional_info:
log_entry += f" | {additional_info}"
self.step_logs.append(log_entry)
logger.debug(log_entry)
# Special handling for RAG-related steps
if "rag" in step_name.lower() or "retrieval" in step_name.lower():
self.rag_performance_data[step_name] = {
'duration': elapsed,
'additional_info': additional_info,
'timestamp': timestamp.isoformat()
}
return elapsed
return 0
def add_rag_analysis(self, query: str, results_count: int, embedding_time: float,
weaviate_time: float, total_rag_time: float):
"""Add detailed RAG performance analysis."""
analysis = {
'query_length': len(query),
'query_words': len(query.split()),
'results_count': results_count,
'embedding_generation_time': round(embedding_time, 3),
'weaviate_query_time': round(weaviate_time, 3),
'total_rag_time': round(total_rag_time, 3),
'embedding_percentage': round((embedding_time / total_rag_time) * 100, 2) if total_rag_time > 0 else 0,
'weaviate_percentage': round((weaviate_time / total_rag_time) * 100, 2) if total_rag_time > 0 else 0,
}
self.rag_performance_data['detailed_analysis'] = analysis
log_entry = (f"RAG ANALYSIS: Query({len(query)} chars, {len(query.split())} words) -> "
f"Embedding: {embedding_time:.3f}s ({analysis['embedding_percentage']}%), "
f"Weaviate: {weaviate_time:.3f}s ({analysis['weaviate_percentage']}%), "
f"Results: {results_count}, Total: {total_rag_time:.3f}s")
self.step_logs.append(log_entry)
logger.info(log_entry)
def log_step(self, message: str, level: str = "info"):
"""Log a custom step message."""
timestamp = datetime.now()
log_entry = f"[{timestamp.strftime('%H:%M:%S.%f')[:-3]}] {message}"
self.step_logs.append(log_entry)
if level == "error":
logger.error(log_entry)
elif level == "warning":
logger.warning(log_entry)
else:
logger.info(log_entry)
def get_total_time(self):
"""Get total processing time."""
if self.total_start_time:
return round(time.time() - self.total_start_time, 3)
return 0
def get_step_logs(self):
"""Get all step logs as a formatted string."""
return "\n".join(self.step_logs)
def analyze_performance_bottlenecks(self):
"""Analyze and identify performance bottlenecks."""
if not self.timings:
return "No timing data available"
sorted_timings = sorted(self.timings.items(), key=lambda x: x[1], reverse=True)
total_time = sum(self.timings.values())
analysis = ["PERFORMANCE BOTTLENECK ANALYSIS:"]
analysis.append(f"Total Pipeline Time: {total_time:.3f}s")
analysis.append("-" * 50)
for step, duration in sorted_timings:
percentage = (duration / total_time) * 100 if total_time > 0 else 0
            status = "🔴 SLOW" if percentage > 25 else "🟡 MODERATE" if percentage > 10 else "🟢 FAST"
analysis.append(f"{status} {step}: {duration:.3f}s ({percentage:.1f}%)")
# RAG-specific analysis
if self.rag_performance_data:
analysis.append("\nRAG DETAILED ANALYSIS:")
analysis.append("-" * 30)
if 'detailed_analysis' in self.rag_performance_data:
rag_data = self.rag_performance_data['detailed_analysis']
analysis.append(f"Query complexity: {rag_data['query_words']} words, {rag_data['query_length']} chars")
analysis.append(f"Embedding generation: {rag_data['embedding_generation_time']}s ({rag_data['embedding_percentage']}%)")
analysis.append(f"Weaviate query: {rag_data['weaviate_query_time']}s ({rag_data['weaviate_percentage']}%)")
analysis.append(f"Results retrieved: {rag_data['results_count']}")
# Performance recommendations
if rag_data['embedding_percentage'] > 50:
analysis.append("⚠️ ISSUE: Embedding generation is the bottleneck")
                    analysis.append("💡 SUGGESTION: Consider caching embeddings or using a faster embedding model")
if rag_data['weaviate_percentage'] > 60:
analysis.append("⚠️ ISSUE: Weaviate query is the bottleneck")
                    analysis.append("💡 SUGGESTION: Check Weaviate connection, indexing, or reduce vector dimensions")
return "\n".join(analysis)
def reset(self):
"""Reset all timers and logs."""
self.timings.clear()
self.start_times.clear()
self.step_logs.clear()
self.rag_performance_data.clear()
self.total_start_time = None
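# Typical usage of DetailedTimingTracker (sketch; "Some_Step" is an illustrative name):
#   timer = DetailedTimingTracker()
#   timer.start_timer("Some_Step")
#   ...  # do the work being measured
#   timer.end_timer("Some_Step", "optional extra info")
#   print(timer.analyze_performance_bottlenecks())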
def clean_html_response(html_text: str) -> str:
"""Clean HTML tags from response text."""
if not html_text:
return ""
# Remove HTML tags
clean_text = re.sub('<[^<]+?>', '', html_text)
# Clean up extra whitespace
clean_text = ' '.join(clean_text.split())
return clean_text.strip()
async def detailed_rag_analysis(query: str):
"""Perform detailed RAG analysis with timing breakdown."""
start_time = time.time()
# Step 1: Embedding generation timing
embedding_start = time.time()
query_embedding = encode_query(query)
embedding_time = time.time() - embedding_start
    # Handles both list and array-like embeddings without relying on truthiness.
    if query_embedding is None or len(query_embedding) == 0:
return {
'results': [],
'embedding_time': embedding_time,
'weaviate_time': 0,
'total_time': time.time() - start_time,
'results_count': 0,
'error': 'Failed to generate embedding'
}
# Step 2: Weaviate query timing
    weaviate_start = time.time()
    client = None
    try:
        client = init_weaviate_client()
        if client is None:
            raise Exception("Weaviate client not initialized")
        coll = client.collections.get("Books")  # Assuming collection name from config
        res = coll.query.near_vector(
            near_vector=query_embedding,
            limit=3
        )
        results = []
        if getattr(res, "objects", None):
            results = [obj.properties.get("text", "[No Text]") for obj in res.objects]
        weaviate_time = time.time() - weaviate_start
    except Exception as e:
        weaviate_time = time.time() - weaviate_start
        return {
            'results': [],
            'embedding_time': embedding_time,
            'weaviate_time': weaviate_time,
            'total_time': time.time() - start_time,
            'results_count': 0,
            'error': str(e)
        }
    finally:
        # Close the Weaviate client on both the success and failure paths.
        if client is not None:
            client.close()
total_time = time.time() - start_time
return {
'results': results,
'embedding_time': embedding_time,
'weaviate_time': weaviate_time,
'total_time': total_time,
'results_count': len(results),
'error': None
}
def process_single_question_enhanced(question: str, question_index: int) -> Dict[str, Any]:
"""
Process a single question with enhanced logging and timing analysis.
Args:
question: The question to process
question_index: Index of the question for logging
Returns:
Dictionary containing detailed answer and timing information
"""
timer = DetailedTimingTracker()
results = {
'question': question,
'answer': '',
'clean_answer': '',
'status': 'success',
'error_message': '',
'total_time_seconds': 0,
'processing_steps': {},
'detailed_logs': '',
'performance_analysis': '',
'rag_analysis': {}
}
try:
logger.info(f"=" * 80)
logger.info(f"PROCESSING QUESTION {question_index + 1}")
logger.info(f"Question: {question}")
logger.info(f"=" * 80)
timer.log_step(f"Starting processing for question {question_index + 1}: '{question[:100]}...'")
# Start total timing
timer.start_timer('Total_Processing')
# Detailed RAG analysis before main processing
timer.start_timer('RAG_Analysis')
timer.log_step("Performing detailed RAG analysis...")
        # This function is called from synchronous code with no running event loop,
        # so asyncio.run() is the straightforward way to execute the coroutine.
        rag_analysis = asyncio.run(detailed_rag_analysis(question))
timer.end_timer('RAG_Analysis',
f"Retrieved {rag_analysis['results_count']} results, "
f"Embedding: {rag_analysis['embedding_time']:.3f}s, "
f"Weaviate: {rag_analysis['weaviate_time']:.3f}s")
# Add RAG performance data to timer
timer.add_rag_analysis(
question,
rag_analysis['results_count'],
rag_analysis['embedding_time'],
rag_analysis['weaviate_time'],
rag_analysis['total_time']
)
# Store RAG analysis results
results['rag_analysis'] = rag_analysis
# Process the query using existing pipeline with detailed timing
timer.start_timer('Pipeline_Processing')
timer.log_step("Starting main pipeline processing...")
response_html = process_query(question, first_turn=True)
pipeline_time = timer.end_timer('Pipeline_Processing', f"Response length: {len(response_html)} chars")
# Clean the response
timer.start_timer('Response_Cleaning')
clean_response = clean_html_response(response_html)
cleaning_time = timer.end_timer('Response_Cleaning', f"Cleaned length: {len(clean_response)} chars")
# Store results
results['answer'] = response_html
results['clean_answer'] = clean_response
results['total_time_seconds'] = timer.get_total_time()
results['processing_steps'] = {
'pipeline_processing_seconds': pipeline_time,
'response_cleaning_seconds': cleaning_time,
'rag_analysis_seconds': timer.timings.get('RAG_Analysis', 0),
'embedding_generation_seconds': rag_analysis['embedding_time'],
'weaviate_query_seconds': rag_analysis['weaviate_time'],
**timer.timings
}
# Generate detailed logs and performance analysis
results['detailed_logs'] = timer.get_step_logs()
results['performance_analysis'] = timer.analyze_performance_bottlenecks()
timer.log_step(f"Question {question_index + 1} completed successfully in {results['total_time_seconds']:.3f}s")
logger.info(f"βœ… Question {question_index + 1} completed in {results['total_time_seconds']:.3f}s")
logger.info(f"Performance Analysis:\n{results['performance_analysis']}")
except Exception as e:
error_msg = f"Error processing question {question_index + 1}: {str(e)}"
timer.log_step(error_msg, "error")
logger.error(error_msg)
logger.error(f"Traceback: {traceback.format_exc()}")
results['status'] = 'error'
results['error_message'] = str(e)
results['total_time_seconds'] = timer.get_total_time()
results['answer'] = f"[ERROR] {str(e)}"
results['clean_answer'] = f"Error: {str(e)}"
results['detailed_logs'] = timer.get_step_logs()
results['performance_analysis'] = timer.analyze_performance_bottlenecks()
return results
def run_csv_test(input_csv_path: str, output_csv_path: str = None, question_column: str = 'question', max_samples: int = None) -> str:
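    """
    Run the QA pipeline over every question in a CSV file and collect detailed results.

    Args:
        input_csv_path: Path to the input CSV; must contain a question column.
        output_csv_path: Path for the results CSV (auto-generated if None).
        question_column: Name of the column holding the questions (default 'question').
        max_samples: If set, only the first max_samples questions are processed.

    Returns:
        Path to the results CSV that was written.
    """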
# Validate input file
if not os.path.exists(input_csv_path):
raise FileNotFoundError(f"Input CSV file not found: {input_csv_path}")
# Generate output path if not provided
if output_csv_path is None:
base_name = os.path.splitext(input_csv_path)[0]
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
sample_suffix = f"_{max_samples}samples" if max_samples else ""
        output_csv_path = f"{base_name}_detailed_results{sample_suffix}_{timestamp}.csv"
logger.info(f"πŸš€ STARTING ENHANCED CSV TEST RUN")
logger.info(f"Input file: {input_csv_path}")
logger.info(f"Output file: {output_csv_path}")
if max_samples:
logger.info(f"Max samples to process: {max_samples}")
logger.info(f"=" * 100)
try:
# Read input CSV
df = pd.read_csv(input_csv_path)
logger.info(f"πŸ“Š Loaded {len(df)} questions from CSV")
# Validate question column exists
if question_column not in df.columns:
raise ValueError(f"Column '{question_column}' not found. Available columns: {list(df.columns)}")
# Limit to specified number of samples
original_count = len(df)
if max_samples is not None and max_samples > 0:
if len(df) > max_samples:
df = df.head(max_samples)
logger.info(f"⚠️ Limited to first {max_samples} questions as specified (from {original_count} total)")
else:
logger.info(f"πŸ“‹ Processing all {len(df)} questions (max_samples={max_samples} not reached)")
else:
logger.info(f"πŸ“‹ Processing all {len(df)} questions (no sample limit specified)")
# Initialize enhanced result columns
result_columns = [
'answer', 'clean_answer', 'total_time_seconds', 'status', 'error_message',
'processed_timestamp', 'pipeline_processing_seconds', 'response_cleaning_seconds',
'rag_analysis_seconds', 'embedding_generation_seconds', 'weaviate_query_seconds',
'rag_results_count', 'rag_embedding_time', 'rag_weaviate_time', 'rag_total_time',
'detailed_logs', 'performance_analysis', 'bottleneck_step', 'performance_grade'
]
for col in result_columns:
df[col] = '' if col in ['answer', 'clean_answer', 'status', 'error_message',
'processed_timestamp', 'detailed_logs', 'performance_analysis',
'bottleneck_step', 'performance_grade'] else 0.0
# Process each question with enhanced analysis
total_start_time = time.time()
successful_questions = 0
performance_data = []
for index, row in df.iterrows():
question = str(row[question_column]).strip()
if not question or question.lower() == 'nan':
logger.warning(f"⏭️ Skipping empty question at row {index}")
df.at[index, 'status'] = 'skipped'
df.at[index, 'error_message'] = 'Empty question'
continue
logger.info(f"\nπŸ”„ Processing Question {index + 1}/{len(df)}")
# Process the question with enhanced analysis
result = process_single_question_enhanced(question, index)
# Update dataframe with enhanced results
df.at[index, 'answer'] = result['answer']
df.at[index, 'clean_answer'] = result['clean_answer']
df.at[index, 'total_time_seconds'] = result['total_time_seconds']
df.at[index, 'status'] = result['status']
df.at[index, 'error_message'] = result['error_message']
df.at[index, 'processed_timestamp'] = datetime.now().isoformat()
df.at[index, 'detailed_logs'] = result['detailed_logs']
df.at[index, 'performance_analysis'] = result['performance_analysis']
# Extract processing step timings
steps = result['processing_steps']
for step_key, time_value in steps.items():
if step_key in df.columns:
df.at[index, step_key] = time_value
# RAG-specific analysis
rag_analysis = result.get('rag_analysis', {})
df.at[index, 'rag_results_count'] = rag_analysis.get('results_count', 0)
df.at[index, 'rag_embedding_time'] = rag_analysis.get('embedding_time', 0)
df.at[index, 'rag_weaviate_time'] = rag_analysis.get('weaviate_time', 0)
df.at[index, 'rag_total_time'] = rag_analysis.get('total_time', 0)
# Performance grading and bottleneck identification
timings = result['processing_steps']
if timings:
slowest_step = max(timings.items(), key=lambda x: x[1])
df.at[index, 'bottleneck_step'] = slowest_step[0]
# Performance grading based on total time
total_time = result['total_time_seconds']
if total_time < 2:
grade = "A+ (Excellent)"
elif total_time < 4:
grade = "A (Good)"
elif total_time < 6:
grade = "B (Average)"
elif total_time < 10:
grade = "C (Slow)"
else:
grade = "D (Very Slow)"
df.at[index, 'performance_grade'] = grade
if result['status'] == 'success':
successful_questions += 1
performance_data.append(result['total_time_seconds'])
# Add delay between questions to avoid rate limiting
time.sleep(1)
# Calculate comprehensive summary statistics
total_test_time = time.time() - total_start_time
avg_time_per_question = df[df['status'] == 'success']['total_time_seconds'].mean()
median_time = df[df['status'] == 'success']['total_time_seconds'].median()
# RAG performance analysis across all questions
avg_rag_time = df[df['status'] == 'success']['rag_total_time'].mean()
avg_embedding_time = df[df['status'] == 'success']['rag_embedding_time'].mean()
avg_weaviate_time = df[df['status'] == 'success']['rag_weaviate_time'].mean()
logger.info(f"\n" + "=" * 100)
logger.info(f"🎯 ENHANCED TEST RUN COMPLETED!")
logger.info(f"=" * 100)
logger.info(f"πŸ“ˆ SUMMARY STATISTICS:")
logger.info(f" Total questions in dataset: {original_count}")
logger.info(f" Questions processed: {len(df)}")
logger.info(f" βœ… Successful: {successful_questions}")
logger.info(f" ❌ Failed: {len(df) - successful_questions}")
logger.info(f" πŸ• Total test time: {total_test_time:.2f} seconds")
logger.info(f" ⏱️ Average time per question: {avg_time_per_question:.3f} seconds")
logger.info(f" πŸ“Š Median time per question: {median_time:.3f} seconds")
logger.info(f"")
logger.info(f"πŸ” RAG PERFORMANCE ANALYSIS:")
logger.info(f" Average RAG total time: {avg_rag_time:.3f} seconds")
logger.info(f" Average embedding time: {avg_embedding_time:.3f} seconds ({(avg_embedding_time/avg_rag_time*100):.1f}%)")
logger.info(f" Average Weaviate time: {avg_weaviate_time:.3f} seconds ({(avg_weaviate_time/avg_rag_time*100):.1f}%)")
# Performance bottleneck analysis
bottleneck_counts = df['bottleneck_step'].value_counts()
logger.info(f"")
logger.info(f"🚨 BOTTLENECK ANALYSIS:")
for step, count in bottleneck_counts.items():
logger.info(f" {step}: {count} times ({count/len(df)*100:.1f}%)")
# Save results to CSV
df.to_csv(output_csv_path, index=False)
logger.info(f"πŸ’Ύ Results saved to: {output_csv_path}")
# Print summary to console
print("\n" + "="*100)
print("πŸ€– ENHANCED CSV TEST RESULTS SUMMARY")
print("="*100)
print(f"πŸ“ Input file: {input_csv_path}")
print(f"πŸ“Š Output file: {output_csv_path}")
print(f"πŸ“‹ Total questions in dataset: {original_count}")
print(f"πŸ“‹ Questions processed: {len(df)}")
print(f"βœ… Successful: {successful_questions}")
print(f"❌ Failed: {len(df) - successful_questions}")
print(f"πŸ• Total time: {total_test_time:.2f} seconds")
print(f"⏱️ Average time per question: {avg_time_per_question:.3f} seconds")
print(f"πŸ” RAG avg time: {avg_rag_time:.3f}s (Embedding: {avg_embedding_time:.3f}s, Weaviate: {avg_weaviate_time:.3f}s)")
print("="*100)
return output_csv_path
except Exception as e:
        error_msg = f"❌ Error during enhanced CSV test run: {str(e)}"
logger.error(error_msg)
logger.error(f"Traceback: {traceback.format_exc()}")
raise
def create_sample_csv(file_path: str = "sample_questions.csv", num_samples: int = 5):
"""Create a sample CSV file with autism-related questions for testing."""
base_questions = [
"Can you tell me more about autism?",
"What are the early signs of autism in children?",
"How can I help my autistic child with social skills?",
"What are sensory processing issues in autism?",
"What educational strategies work best for autistic students?",
"How do I support an autistic family member?",
"What are common myths about autism?",
"How does autism affect communication?",
"What therapies are available for autism?",
"How can schools better support autistic students?",
"What are the strengths often associated with autism?",
"How do I explain autism to other children?",
"What workplace accommodations help autistic employees?",
"How does autism present differently in girls vs boys?",
"What should I know about autism diagnosis?"
]
# Select the specified number of questions (cycle through if needed)
if num_samples <= len(base_questions):
sample_questions = base_questions[:num_samples]
else:
# If more samples requested than available, repeat questions
sample_questions = []
for i in range(num_samples):
sample_questions.append(base_questions[i % len(base_questions)])
df = pd.DataFrame({'question': sample_questions})
df.to_csv(file_path, index=False)
print(f"βœ… Sample CSV created with {num_samples} questions: {file_path}")
return file_path
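# The generated sample CSV has a single 'question' column (the format run_csv_test expects), e.g.:
#   question
#   Can you tell me more about autism?
#   What are the early signs of autism in children?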
def main():
"""Main function to run the enhanced CSV test."""
import argparse
parser = argparse.ArgumentParser(
description="Run enhanced tests on questions from CSV file with detailed analysis",
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="""
Examples:
python test_runner.py questions.csv --samples 10
python test_runner.py questions.csv --samples 50 --output results.csv
python test_runner.py --create-sample --sample-size 10
python test_runner.py questions.csv --column "user_question" --samples 5
"""
)
parser.add_argument("input_csv", nargs='?', help="Path to input CSV file with questions")
parser.add_argument("--output", "-o", help="Path to output CSV file")
parser.add_argument("--column", "-c", default="question", help="Name of question column (default: 'question')")
parser.add_argument("--samples", "-s", type=int, default=None,
help="Maximum number of samples to process (default: process all)")
parser.add_argument("--create-sample", action="store_true", help="Create a sample CSV file for testing")
parser.add_argument("--sample-size", type=int, default=5,
help="Number of questions to include in sample CSV (default: 5, max: 15)")
args = parser.parse_args()
try:
if args.create_sample:
# Limit sample size to reasonable range
sample_size = max(1, min(args.sample_size, 15))
sample_file = create_sample_csv(num_samples=sample_size)
print(f"βœ… Sample CSV created with {sample_size} questions: {sample_file}")
if not args.input_csv:
args.input_csv = sample_file
if not args.input_csv:
print("❌ Error: Please provide an input CSV file or use --create-sample")
parser.print_help()
return
# Display configuration
print(f"\nπŸ”§ CONFIGURATION:")
print(f" Input file: {args.input_csv}")
print(f" Question column: {args.column}")
print(f" Max samples: {args.samples if args.samples else 'No limit (process all)'}")
if args.output:
print(f" Output file: {args.output}")
print(f" {'='*50}")
# Run the enhanced test
output_file = run_csv_test(
input_csv_path=args.input_csv,
output_csv_path=args.output,
question_column=args.column,
max_samples=args.samples
)
print(f"\nπŸŽ‰ Enhanced test completed successfully!")
print(f"πŸ“Š Detailed results saved to: {output_file}")
except KeyboardInterrupt:
print(f"\n⏹️ Test interrupted by user")
logger.info("Test run interrupted by user")
except Exception as e:
print(f"❌ Error: {e}")
logger.error(f"Main execution error: {e}")
if __name__ == "__main__":
main()