import os import sys import time import pandas as pd import re from datetime import datetime from dotenv import load_dotenv from typing import Dict, Any, Tuple import traceback import asyncio # Setup paths and environment sys.path.append(os.path.dirname(os.path.abspath(__file__))) load_dotenv() # Import your existing modules from utils import process_query from logger.custom_logger import CustomLoggerTracker from rag_utils import rag_autism, encode_query from clients import init_weaviate_client # Initialize logger custom_log = CustomLoggerTracker() logger = custom_log.get_logger("csv_test") class DetailedTimingTracker: """Enhanced timing tracker with detailed pipeline step analysis.""" def __init__(self): self.timings = {} self.start_times = {} self.total_start_time = None self.step_logs = [] self.rag_performance_data = {} def start_timer(self, step_name: str): """Start timing a specific step.""" timestamp = datetime.now() self.start_times[step_name] = time.time() if self.total_start_time is None: self.total_start_time = time.time() log_entry = f"[{timestamp.strftime('%H:%M:%S.%f')[:-3]}] STARTED: {step_name}" self.step_logs.append(log_entry) logger.debug(log_entry) def end_timer(self, step_name: str, additional_info: str = ""): """End timing a specific step with optional additional information.""" if step_name in self.start_times: elapsed = time.time() - self.start_times[step_name] self.timings[step_name] = round(elapsed, 3) timestamp = datetime.now() log_entry = f"[{timestamp.strftime('%H:%M:%S.%f')[:-3]}] COMPLETED: {step_name} - {elapsed:.3f}s" if additional_info: log_entry += f" | {additional_info}" self.step_logs.append(log_entry) logger.debug(log_entry) # Special handling for RAG-related steps if "rag" in step_name.lower() or "retrieval" in step_name.lower(): self.rag_performance_data[step_name] = { 'duration': elapsed, 'additional_info': additional_info, 'timestamp': timestamp.isoformat() } return elapsed return 0 def add_rag_analysis(self, query: str, results_count: int, embedding_time: float, weaviate_time: float, total_rag_time: float): """Add detailed RAG performance analysis.""" analysis = { 'query_length': len(query), 'query_words': len(query.split()), 'results_count': results_count, 'embedding_generation_time': round(embedding_time, 3), 'weaviate_query_time': round(weaviate_time, 3), 'total_rag_time': round(total_rag_time, 3), 'embedding_percentage': round((embedding_time / total_rag_time) * 100, 2) if total_rag_time > 0 else 0, 'weaviate_percentage': round((weaviate_time / total_rag_time) * 100, 2) if total_rag_time > 0 else 0, } self.rag_performance_data['detailed_analysis'] = analysis log_entry = (f"RAG ANALYSIS: Query({len(query)} chars, {len(query.split())} words) -> " f"Embedding: {embedding_time:.3f}s ({analysis['embedding_percentage']}%), " f"Weaviate: {weaviate_time:.3f}s ({analysis['weaviate_percentage']}%), " f"Results: {results_count}, Total: {total_rag_time:.3f}s") self.step_logs.append(log_entry) logger.info(log_entry) def log_step(self, message: str, level: str = "info"): """Log a custom step message.""" timestamp = datetime.now() log_entry = f"[{timestamp.strftime('%H:%M:%S.%f')[:-3]}] {message}" self.step_logs.append(log_entry) if level == "error": logger.error(log_entry) elif level == "warning": logger.warning(log_entry) else: logger.info(log_entry) def get_total_time(self): """Get total processing time.""" if self.total_start_time: return round(time.time() - self.total_start_time, 3) return 0 def get_step_logs(self): """Get all step logs as a formatted string.""" return "\n".join(self.step_logs) def analyze_performance_bottlenecks(self): """Analyze and identify performance bottlenecks.""" if not self.timings: return "No timing data available" sorted_timings = sorted(self.timings.items(), key=lambda x: x[1], reverse=True) total_time = sum(self.timings.values()) analysis = ["PERFORMANCE BOTTLENECK ANALYSIS:"] analysis.append(f"Total Pipeline Time: {total_time:.3f}s") analysis.append("-" * 50) for step, duration in sorted_timings: percentage = (duration / total_time) * 100 if total_time > 0 else 0 status = "šŸ”“ SLOW" if percentage > 25 else "🟔 MODERATE" if percentage > 10 else "🟢 FAST" analysis.append(f"{status} {step}: {duration:.3f}s ({percentage:.1f}%)") # RAG-specific analysis if self.rag_performance_data: analysis.append("\nRAG DETAILED ANALYSIS:") analysis.append("-" * 30) if 'detailed_analysis' in self.rag_performance_data: rag_data = self.rag_performance_data['detailed_analysis'] analysis.append(f"Query complexity: {rag_data['query_words']} words, {rag_data['query_length']} chars") analysis.append(f"Embedding generation: {rag_data['embedding_generation_time']}s ({rag_data['embedding_percentage']}%)") analysis.append(f"Weaviate query: {rag_data['weaviate_query_time']}s ({rag_data['weaviate_percentage']}%)") analysis.append(f"Results retrieved: {rag_data['results_count']}") # Performance recommendations if rag_data['embedding_percentage'] > 50: analysis.append("āš ļø ISSUE: Embedding generation is the bottleneck") analysis.append("šŸ’” SUGGESTION: Consider caching embeddings or using faster embedding model") if rag_data['weaviate_percentage'] > 60: analysis.append("āš ļø ISSUE: Weaviate query is the bottleneck") analysis.append("šŸ’” SUGGESTION: Check Weaviate connection, indexing, or reduce vector dimensions") return "\n".join(analysis) def reset(self): """Reset all timers and logs.""" self.timings.clear() self.start_times.clear() self.step_logs.clear() self.rag_performance_data.clear() self.total_start_time = None def clean_html_response(html_text: str) -> str: """Clean HTML tags from response text.""" if not html_text: return "" # Remove HTML tags clean_text = re.sub('<[^<]+?>', '', html_text) # Clean up extra whitespace clean_text = ' '.join(clean_text.split()) return clean_text.strip() async def detailed_rag_analysis(query: str): """Perform detailed RAG analysis with timing breakdown.""" start_time = time.time() # Step 1: Embedding generation timing embedding_start = time.time() query_embedding = encode_query(query) embedding_time = time.time() - embedding_start if not query_embedding: return { 'results': [], 'embedding_time': embedding_time, 'weaviate_time': 0, 'total_time': time.time() - start_time, 'results_count': 0, 'error': 'Failed to generate embedding' } # Step 2: Weaviate query timing weaviate_start = time.time() try: client = init_weaviate_client() if client is None: raise Exception("Weaviate client not initialized") coll = client.collections.get("Books") # Assuming collection name from config res = coll.query.near_vector( near_vector=query_embedding, limit=3 ) results = [] if getattr(res, "objects", None): results = [obj.properties.get("text", "[No Text]") for obj in res.objects] client.close() weaviate_time = time.time() - weaviate_start except Exception as e: weaviate_time = time.time() - weaviate_start return { 'results': [], 'embedding_time': embedding_time, 'weaviate_time': weaviate_time, 'total_time': time.time() - start_time, 'results_count': 0, 'error': str(e) } total_time = time.time() - start_time return { 'results': results, 'embedding_time': embedding_time, 'weaviate_time': weaviate_time, 'total_time': total_time, 'results_count': len(results), 'error': None } def process_single_question_enhanced(question: str, question_index: int) -> Dict[str, Any]: """ Process a single question with enhanced logging and timing analysis. Args: question: The question to process question_index: Index of the question for logging Returns: Dictionary containing detailed answer and timing information """ timer = DetailedTimingTracker() results = { 'question': question, 'answer': '', 'clean_answer': '', 'status': 'success', 'error_message': '', 'total_time_seconds': 0, 'processing_steps': {}, 'detailed_logs': '', 'performance_analysis': '', 'rag_analysis': {} } try: logger.info(f"=" * 80) logger.info(f"PROCESSING QUESTION {question_index + 1}") logger.info(f"Question: {question}") logger.info(f"=" * 80) timer.log_step(f"Starting processing for question {question_index + 1}: '{question[:100]}...'") # Start total timing timer.start_timer('Total_Processing') # Detailed RAG analysis before main processing timer.start_timer('RAG_Analysis') timer.log_step("Performing detailed RAG analysis...") loop = asyncio.get_event_loop() if loop.is_running(): rag_analysis = loop.run_until_complete(detailed_rag_analysis(question)) else: rag_analysis = asyncio.run(detailed_rag_analysis(question)) timer.end_timer('RAG_Analysis', f"Retrieved {rag_analysis['results_count']} results, " f"Embedding: {rag_analysis['embedding_time']:.3f}s, " f"Weaviate: {rag_analysis['weaviate_time']:.3f}s") # Add RAG performance data to timer timer.add_rag_analysis( question, rag_analysis['results_count'], rag_analysis['embedding_time'], rag_analysis['weaviate_time'], rag_analysis['total_time'] ) # Store RAG analysis results results['rag_analysis'] = rag_analysis # Process the query using existing pipeline with detailed timing timer.start_timer('Pipeline_Processing') timer.log_step("Starting main pipeline processing...") response_html = process_query(question, first_turn=True) pipeline_time = timer.end_timer('Pipeline_Processing', f"Response length: {len(response_html)} chars") # Clean the response timer.start_timer('Response_Cleaning') clean_response = clean_html_response(response_html) cleaning_time = timer.end_timer('Response_Cleaning', f"Cleaned length: {len(clean_response)} chars") # Store results results['answer'] = response_html results['clean_answer'] = clean_response results['total_time_seconds'] = timer.get_total_time() results['processing_steps'] = { 'pipeline_processing_seconds': pipeline_time, 'response_cleaning_seconds': cleaning_time, 'rag_analysis_seconds': timer.timings.get('RAG_Analysis', 0), 'embedding_generation_seconds': rag_analysis['embedding_time'], 'weaviate_query_seconds': rag_analysis['weaviate_time'], **timer.timings } # Generate detailed logs and performance analysis results['detailed_logs'] = timer.get_step_logs() results['performance_analysis'] = timer.analyze_performance_bottlenecks() timer.log_step(f"Question {question_index + 1} completed successfully in {results['total_time_seconds']:.3f}s") logger.info(f"āœ… Question {question_index + 1} completed in {results['total_time_seconds']:.3f}s") logger.info(f"Performance Analysis:\n{results['performance_analysis']}") except Exception as e: error_msg = f"Error processing question {question_index + 1}: {str(e)}" timer.log_step(error_msg, "error") logger.error(error_msg) logger.error(f"Traceback: {traceback.format_exc()}") results['status'] = 'error' results['error_message'] = str(e) results['total_time_seconds'] = timer.get_total_time() results['answer'] = f"[ERROR] {str(e)}" results['clean_answer'] = f"Error: {str(e)}" results['detailed_logs'] = timer.get_step_logs() results['performance_analysis'] = timer.analyze_performance_bottlenecks() return results def run_csv_test(input_csv_path: str, output_csv_path: str = None, question_column: str = 'question', max_samples: int = None) -> str: # Validate input file if not os.path.exists(input_csv_path): raise FileNotFoundError(f"Input CSV file not found: {input_csv_path}") # Generate output path if not provided if output_csv_path is None: base_name = os.path.splitext(input_csv_path)[0] timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") sample_suffix = f"_{max_samples}samples" if max_samples else "" output_csv_path = f"{base_name}_detailed_results{sample_suffix}_{timestamp}_{max_samples}.csv" logger.info(f"šŸš€ STARTING ENHANCED CSV TEST RUN") logger.info(f"Input file: {input_csv_path}") logger.info(f"Output file: {output_csv_path}") if max_samples: logger.info(f"Max samples to process: {max_samples}") logger.info(f"=" * 100) try: # Read input CSV df = pd.read_csv(input_csv_path) logger.info(f"šŸ“Š Loaded {len(df)} questions from CSV") # Validate question column exists if question_column not in df.columns: raise ValueError(f"Column '{question_column}' not found. Available columns: {list(df.columns)}") # Limit to specified number of samples original_count = len(df) if max_samples is not None and max_samples > 0: if len(df) > max_samples: df = df.head(max_samples) logger.info(f"āš ļø Limited to first {max_samples} questions as specified (from {original_count} total)") else: logger.info(f"šŸ“‹ Processing all {len(df)} questions (max_samples={max_samples} not reached)") else: logger.info(f"šŸ“‹ Processing all {len(df)} questions (no sample limit specified)") # Initialize enhanced result columns result_columns = [ 'answer', 'clean_answer', 'total_time_seconds', 'status', 'error_message', 'processed_timestamp', 'pipeline_processing_seconds', 'response_cleaning_seconds', 'rag_analysis_seconds', 'embedding_generation_seconds', 'weaviate_query_seconds', 'rag_results_count', 'rag_embedding_time', 'rag_weaviate_time', 'rag_total_time', 'detailed_logs', 'performance_analysis', 'bottleneck_step', 'performance_grade' ] for col in result_columns: df[col] = '' if col in ['answer', 'clean_answer', 'status', 'error_message', 'processed_timestamp', 'detailed_logs', 'performance_analysis', 'bottleneck_step', 'performance_grade'] else 0.0 # Process each question with enhanced analysis total_start_time = time.time() successful_questions = 0 performance_data = [] for index, row in df.iterrows(): question = str(row[question_column]).strip() if not question or question.lower() == 'nan': logger.warning(f"ā­ļø Skipping empty question at row {index}") df.at[index, 'status'] = 'skipped' df.at[index, 'error_message'] = 'Empty question' continue logger.info(f"\nšŸ”„ Processing Question {index + 1}/{len(df)}") # Process the question with enhanced analysis result = process_single_question_enhanced(question, index) # Update dataframe with enhanced results df.at[index, 'answer'] = result['answer'] df.at[index, 'clean_answer'] = result['clean_answer'] df.at[index, 'total_time_seconds'] = result['total_time_seconds'] df.at[index, 'status'] = result['status'] df.at[index, 'error_message'] = result['error_message'] df.at[index, 'processed_timestamp'] = datetime.now().isoformat() df.at[index, 'detailed_logs'] = result['detailed_logs'] df.at[index, 'performance_analysis'] = result['performance_analysis'] # Extract processing step timings steps = result['processing_steps'] for step_key, time_value in steps.items(): if step_key in df.columns: df.at[index, step_key] = time_value # RAG-specific analysis rag_analysis = result.get('rag_analysis', {}) df.at[index, 'rag_results_count'] = rag_analysis.get('results_count', 0) df.at[index, 'rag_embedding_time'] = rag_analysis.get('embedding_time', 0) df.at[index, 'rag_weaviate_time'] = rag_analysis.get('weaviate_time', 0) df.at[index, 'rag_total_time'] = rag_analysis.get('total_time', 0) # Performance grading and bottleneck identification timings = result['processing_steps'] if timings: slowest_step = max(timings.items(), key=lambda x: x[1]) df.at[index, 'bottleneck_step'] = slowest_step[0] # Performance grading based on total time total_time = result['total_time_seconds'] if total_time < 2: grade = "A+ (Excellent)" elif total_time < 4: grade = "A (Good)" elif total_time < 6: grade = "B (Average)" elif total_time < 10: grade = "C (Slow)" else: grade = "D (Very Slow)" df.at[index, 'performance_grade'] = grade if result['status'] == 'success': successful_questions += 1 performance_data.append(result['total_time_seconds']) # Add delay between questions to avoid rate limiting time.sleep(1) # Calculate comprehensive summary statistics total_test_time = time.time() - total_start_time avg_time_per_question = df[df['status'] == 'success']['total_time_seconds'].mean() median_time = df[df['status'] == 'success']['total_time_seconds'].median() # RAG performance analysis across all questions avg_rag_time = df[df['status'] == 'success']['rag_total_time'].mean() avg_embedding_time = df[df['status'] == 'success']['rag_embedding_time'].mean() avg_weaviate_time = df[df['status'] == 'success']['rag_weaviate_time'].mean() logger.info(f"\n" + "=" * 100) logger.info(f"šŸŽÆ ENHANCED TEST RUN COMPLETED!") logger.info(f"=" * 100) logger.info(f"šŸ“ˆ SUMMARY STATISTICS:") logger.info(f" Total questions in dataset: {original_count}") logger.info(f" Questions processed: {len(df)}") logger.info(f" āœ… Successful: {successful_questions}") logger.info(f" āŒ Failed: {len(df) - successful_questions}") logger.info(f" šŸ• Total test time: {total_test_time:.2f} seconds") logger.info(f" ā±ļø Average time per question: {avg_time_per_question:.3f} seconds") logger.info(f" šŸ“Š Median time per question: {median_time:.3f} seconds") logger.info(f"") logger.info(f"šŸ” RAG PERFORMANCE ANALYSIS:") logger.info(f" Average RAG total time: {avg_rag_time:.3f} seconds") logger.info(f" Average embedding time: {avg_embedding_time:.3f} seconds ({(avg_embedding_time/avg_rag_time*100):.1f}%)") logger.info(f" Average Weaviate time: {avg_weaviate_time:.3f} seconds ({(avg_weaviate_time/avg_rag_time*100):.1f}%)") # Performance bottleneck analysis bottleneck_counts = df['bottleneck_step'].value_counts() logger.info(f"") logger.info(f"🚨 BOTTLENECK ANALYSIS:") for step, count in bottleneck_counts.items(): logger.info(f" {step}: {count} times ({count/len(df)*100:.1f}%)") # Save results to CSV df.to_csv(output_csv_path, index=False) logger.info(f"šŸ’¾ Results saved to: {output_csv_path}") # Print summary to console print("\n" + "="*100) print("šŸ¤– ENHANCED CSV TEST RESULTS SUMMARY") print("="*100) print(f"šŸ“ Input file: {input_csv_path}") print(f"šŸ“Š Output file: {output_csv_path}") print(f"šŸ“‹ Total questions in dataset: {original_count}") print(f"šŸ“‹ Questions processed: {len(df)}") print(f"āœ… Successful: {successful_questions}") print(f"āŒ Failed: {len(df) - successful_questions}") print(f"šŸ• Total time: {total_test_time:.2f} seconds") print(f"ā±ļø Average time per question: {avg_time_per_question:.3f} seconds") print(f"šŸ” RAG avg time: {avg_rag_time:.3f}s (Embedding: {avg_embedding_time:.3f}s, Weaviate: {avg_weaviate_time:.3f}s)") print("="*100) return output_csv_path except Exception as e: error_msg = f"āŒ Error during enhanced CSV test run: {str(e)}" logger.error(error_msg) logger.error(f"Traceback: {traceback.format_exc()}") raise def create_sample_csv(file_path: str = "sample_questions.csv", num_samples: int = 5): """Create a sample CSV file with autism-related questions for testing.""" base_questions = [ "Can you tell me more about autism?", "What are the early signs of autism in children?", "How can I help my autistic child with social skills?", "What are sensory processing issues in autism?", "What educational strategies work best for autistic students?", "How do I support an autistic family member?", "What are common myths about autism?", "How does autism affect communication?", "What therapies are available for autism?", "How can schools better support autistic students?", "What are the strengths often associated with autism?", "How do I explain autism to other children?", "What workplace accommodations help autistic employees?", "How does autism present differently in girls vs boys?", "What should I know about autism diagnosis?" ] # Select the specified number of questions (cycle through if needed) if num_samples <= len(base_questions): sample_questions = base_questions[:num_samples] else: # If more samples requested than available, repeat questions sample_questions = [] for i in range(num_samples): sample_questions.append(base_questions[i % len(base_questions)]) df = pd.DataFrame({'question': sample_questions}) df.to_csv(file_path, index=False) print(f"āœ… Sample CSV created with {num_samples} questions: {file_path}") return file_path def main(): """Main function to run the enhanced CSV test.""" import argparse parser = argparse.ArgumentParser( description="Run enhanced tests on questions from CSV file with detailed analysis", formatter_class=argparse.RawDescriptionHelpFormatter, epilog=""" Examples: python script.py questions.csv --samples 10 python script.py questions.csv --samples 50 --output results.csv python script.py --create-sample --samples 20 python script.py questions.csv --column "user_question" --samples 5 """ ) parser.add_argument("input_csv", nargs='?', help="Path to input CSV file with questions") parser.add_argument("--output", "-o", help="Path to output CSV file") parser.add_argument("--column", "-c", default="question", help="Name of question column (default: 'question')") parser.add_argument("--samples", "-s", type=int, default=None, help="Maximum number of samples to process (default: process all)") parser.add_argument("--create-sample", action="store_true", help="Create a sample CSV file for testing") parser.add_argument("--sample-size", type=int, default=5, help="Number of questions to include in sample CSV (default: 5, max: 15)") args = parser.parse_args() try: if args.create_sample: # Limit sample size to reasonable range sample_size = max(1, min(args.sample_size, 15)) sample_file = create_sample_csv(num_samples=sample_size) print(f"āœ… Sample CSV created with {sample_size} questions: {sample_file}") if not args.input_csv: args.input_csv = sample_file if not args.input_csv: print("āŒ Error: Please provide an input CSV file or use --create-sample") parser.print_help() return # Display configuration print(f"\nšŸ”§ CONFIGURATION:") print(f" Input file: {args.input_csv}") print(f" Question column: {args.column}") print(f" Max samples: {args.samples if args.samples else 'No limit (process all)'}") if args.output: print(f" Output file: {args.output}") print(f" {'='*50}") # Run the enhanced test output_file = run_csv_test( input_csv_path=args.input_csv, output_csv_path=args.output, question_column=args.column, max_samples=args.samples ) print(f"\nšŸŽ‰ Enhanced test completed successfully!") print(f"šŸ“Š Detailed results saved to: {output_file}") except KeyboardInterrupt: print(f"\nā¹ļø Test interrupted by user") logger.info("Test run interrupted by user") except Exception as e: print(f"āŒ Error: {e}") logger.error(f"Main execution error: {e}") if __name__ == "__main__": main()