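"""Enhanced batch benchmark runner for the pipeQuery autism AI pipeline.

Runs questions through the pipeline in batches (optionally in parallel threads),
records per-question and per-step timing, flags responses that need manual review,
and writes combined CSV results plus summary reports.

Typical invocations (see --help for all options):

    python benchmark_runner.py questions.csv
    python benchmark_runner.py questions.csv --batch-size 20 --max-workers 5
    python benchmark_runner.py --sample 25 --batch-size 5
"""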

import os
import sys
import time
import pandas as pd
from datetime import datetime
from typing import Dict, List, Any, Tuple
import argparse
import json
import threading
from concurrent.futures import ThreadPoolExecutor, as_completed
import queue
import re

# Make sibling modules (pipeQuery, logger) importable when running this script directly
sys.path.append(os.path.dirname(os.path.abspath(__file__)))

try:
    from pipeQuery import process_query, clean_pipeline_result
    from logger.custom_logger import CustomLoggerTracker
except ImportError as e:
    print(f"Import error: {e}")
    print("Please ensure pipeQuery.py and logger modules are available")
    sys.exit(1)

try:
    custom_log = CustomLoggerTracker()
    logger = custom_log.get_logger("benchmark")
except Exception as e:
    print(f"Logger initialization failed: {e}")

    # Fall back to a minimal console logger so the benchmark can still run
    class FallbackLogger:
        def info(self, msg): print(f"INFO: {msg}")
        def error(self, msg): print(f"ERROR: {msg}")
        def warning(self, msg): print(f"WARNING: {msg}")

    logger = FallbackLogger()


class EnhancedPipelineBenchmark:
    """Enhanced benchmark runner with detailed step timing for the pipeQuery pipeline."""
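
    # Minimal programmatic usage sketch (assumes pipeQuery and its dependencies are importable):
    #
    #   benchmark = EnhancedPipelineBenchmark(batch_size=5, max_workers=2)
    #   df, out_dir = benchmark.run_batch_benchmark(create_sample_questions()[:10])
    #   benchmark.save_final_results(df, out_dir)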

    def __init__(self, batch_size: int = 10, max_workers: int = 3):
        self.batch_size = batch_size
        self.max_workers = max_workers
        self.results = []
        self.start_time = None
        self.batch_results = []
        self.pipeline_issues = {
            'clarification_prompts': 0,
            'non_autism_queries': 0,
            'pipeline_failures': 0,
            'timeout_errors': 0
        }

    def analyze_pipeline_response(self, response: str, query: str) -> Dict[str, Any]:
        """Analyze pipeline response to categorize issues"""
        analysis = {
            'needs_review': False,
            'issue_type': None,
            'issue_reason': '',
            'autism_related': True,
            'response_quality': 'good'
        }

        response_lower = response.lower()

        # Clarification or redirection prompts
        clarification_indicators = [
            'do you mean:',
            'your query was not clearly related to autism',
            'please submit a question specifically about autism',
            'if you have any question related to autism'
        ]

        if any(indicator in response_lower for indicator in clarification_indicators):
            analysis['needs_review'] = True
            analysis['issue_type'] = 'clarification_prompt'
            analysis['issue_reason'] = 'Query required clarification or redirection'
            analysis['autism_related'] = False
            self.pipeline_issues['clarification_prompts'] += 1

        # Responses indicating the query was not recognized as autism-related
        non_autism_indicators = [
            "i'm wisal, an ai assistant developed by compumacy ai",
            "please submit a question specifically about autism",
            "hello i'm wisal",
            "if you have any question related to autism"
        ]

        if any(indicator in response_lower for indicator in non_autism_indicators):
            analysis['needs_review'] = True
            analysis['issue_type'] = 'non_autism_query'
            analysis['issue_reason'] = 'Query was not recognized as autism-related'
            analysis['autism_related'] = False
            self.pipeline_issues['non_autism_queries'] += 1

        # Error and failure indicators
        error_indicators = [
            'error',
            'failed',
            'exception',
            'timeout',
            'could not process',
            'unable to generate'
        ]

        if any(indicator in response_lower for indicator in error_indicators):
            analysis['needs_review'] = True
            analysis['issue_type'] = 'pipeline_failure'
            analysis['issue_reason'] = 'Pipeline encountered an error'
            analysis['response_quality'] = 'poor'
            self.pipeline_issues['pipeline_failures'] += 1

        # Flag very short responses for manual review
        if len(response.strip()) < 50:
            analysis['response_quality'] = 'poor'
            analysis['needs_review'] = True
            if not analysis['issue_type']:
                analysis['issue_type'] = 'short_response'
                analysis['issue_reason'] = 'Response too short (< 50 characters)'

        return analysis

    def simulate_step_timings(self, result: Dict, total_time: float):
        """Simulate step timings based on total time (replace with actual extraction when available)"""
        proportions = {
            'query_preprocessing_time': 0.05,
            'web_search_time': 0.25,
            'llm_generation_time': 0.20,
            'rag_retrieval_time': 0.15,
            'reranking_time': 0.10,
            'wisal_answer_time': 0.15,
            'hallucination_detection_time': 0.05,
            'paraphrasing_time': 0.03,
            'translation_time': 0.02
        }
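
        # For example, a query measured at 10 s in total would be reported as
        # web_search_time = 2.5 s, llm_generation_time = 2.0 s, rag_retrieval_time = 1.5 s,
        # and so on; the proportions sum to 1.0, so the simulated step times add up to total_time.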
        for step, proportion in proportions.items():
            result[step] = round(total_time * proportion, 3)

    def process_single_query(self, question: str, index: int) -> Dict[str, Any]:
        """Process a single query and measure detailed timing"""
        result = {
            'example_id': f'Q{index+1:04d}',
            'index': index,
            'question': question,
            'answer': '',
            'clean_answer': '',
            'total_time': 0.0,
            'status': 'success',
            'error_message': '',
            'timestamp': datetime.now().isoformat(),

            # Per-step timing fields (filled in by simulate_step_timings)
            'query_preprocessing_time': 0.0,
            'web_search_time': 0.0,
            'llm_generation_time': 0.0,
            'rag_retrieval_time': 0.0,
            'reranking_time': 0.0,
            'wisal_answer_time': 0.0,
            'hallucination_detection_time': 0.0,
            'paraphrasing_time': 0.0,
            'translation_time': 0.0,

            # Response analysis fields (filled in by analyze_pipeline_response)
            'needs_review': False,
            'issue_type': None,
            'issue_reason': '',
            'autism_related': True,
            'response_quality': 'good',
            'response_length': 0,
            'process_log_entries': 0
        }

        start_time = time.time()
        session_id = f"benchmark_session_{index}"

        try:
            logger.info(f"Processing question {index + 1}: {question[:50]}...")

            # Run the full pipeline for this question
            raw_response = process_query(
                query=question,
                first_turn=True,
                session_id=session_id
            )

            # Clean the raw pipeline output
            cleaned_response = clean_pipeline_result(raw_response)

            total_time = time.time() - start_time

            # Categorize any issues in the response
            analysis = self.analyze_pipeline_response(cleaned_response, question)

            result.update({
                'answer': str(raw_response),
                'clean_answer': str(cleaned_response),
                'total_time': round(total_time, 3),
                'status': 'success',
                'response_length': len(str(cleaned_response)),
                'needs_review': analysis['needs_review'],
                'issue_type': analysis['issue_type'],
                'issue_reason': analysis['issue_reason'],
                'autism_related': analysis['autism_related'],
                'response_quality': analysis['response_quality']
            })

            # Populate the per-step timing columns
            self.simulate_step_timings(result, total_time)

            logger.info(f"Question {index + 1} completed in {total_time:.3f}s")

        except Exception as e:
            total_time = time.time() - start_time
            error_msg = str(e)

            self.pipeline_issues['pipeline_failures'] += 1

            result.update({
                'answer': f'[ERROR] {error_msg}',
                'clean_answer': f'Error: {error_msg}',
                'total_time': round(total_time, 3),
                'status': 'error',
                'error_message': error_msg,
                'needs_review': True,
                'issue_type': 'pipeline_failure',
                'issue_reason': f'Exception: {error_msg}',
                'autism_related': False,
                'response_quality': 'failed'
            })

            logger.error(f"Question {index + 1} failed: {error_msg}")

        return result

    def process_batch(self, questions_batch: List[Tuple[str, int]], batch_num: int) -> List[Dict[str, Any]]:
        """Process a batch of questions with optional parallel processing"""
        batch_start_time = time.time()
        batch_results = []

        logger.info(f"Starting batch {batch_num + 1} with {len(questions_batch)} questions")

        if self.max_workers > 1:
            # Process the batch in parallel
            with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
                future_to_question = {
                    executor.submit(self.process_single_query, question, index): (question, index)
                    for question, index in questions_batch
                }

                for future in as_completed(future_to_question):
                    result = future.result()
                    batch_results.append(result)
        else:
            # Process the batch sequentially
            for question, index in questions_batch:
                result = self.process_single_query(question, index)
                batch_results.append(result)

                # Small delay between sequential queries
                time.sleep(0.2)

        # Restore original question order (parallel results complete out of order)
        batch_results.sort(key=lambda x: x['index'])

        batch_time = time.time() - batch_start_time
        successful_in_batch = sum(1 for r in batch_results if r['status'] == 'success')
        needs_review_in_batch = sum(1 for r in batch_results if r['needs_review'])

        logger.info(f"Batch {batch_num + 1} completed in {batch_time:.2f}s")
        logger.info(f" Successful: {successful_in_batch}/{len(questions_batch)}")
        logger.info(f" Needs Review: {needs_review_in_batch}/{len(questions_batch)}")
        logger.info(f" Average time per question: {batch_time/len(questions_batch):.3f}s")

        batch_metadata = {
            'batch_num': batch_num + 1,
            'batch_size': len(questions_batch),
            'batch_time': round(batch_time, 3),
            'successful_count': successful_in_batch,
            'failed_count': len(questions_batch) - successful_in_batch,
            'needs_review_count': needs_review_in_batch,
            'avg_time_per_question': round(batch_time / len(questions_batch), 3),
            'timestamp': datetime.now().isoformat()
        }

        self.batch_results.append(batch_metadata)

        return batch_results

    def create_batches(self, questions: List[str]) -> List[List[Tuple[str, int]]]:
        """Split questions into batches"""
        batches = []
        for i in range(0, len(questions), self.batch_size):
            batch = [(questions[j], j) for j in range(i, min(i + self.batch_size, len(questions)))]
            batches.append(batch)

        logger.info(f"Created {len(batches)} batches of size {self.batch_size}")
        return batches

    def save_batch_results(self, batch_results: List[Dict[str, Any]], batch_num: int, output_dir: str):
        """Save results for a single batch with enhanced columns"""
        if not batch_results:
            return

        batch_df = pd.DataFrame(batch_results)

        batch_filename = f"batch_{batch_num + 1:03d}_results.csv"
        batch_path = os.path.join(output_dir, batch_filename)
        batch_df.to_csv(batch_path, index=False)

        logger.info(f"Batch {batch_num + 1} results saved to: {batch_path}")

        return batch_path

    def run_batch_benchmark(self, questions: List[str], max_questions: int = None,
                            output_dir: str = None, save_individual_batches: bool = True) -> Tuple[pd.DataFrame, str]:
        """Run benchmark on batches of questions"""
        # Reset issue counters for this run
        self.pipeline_issues = {
            'clarification_prompts': 0,
            'non_autism_queries': 0,
            'pipeline_failures': 0,
            'timeout_errors': 0
        }

        if max_questions and len(questions) > max_questions:
            questions = questions[:max_questions]
            logger.info(f"Limited to {max_questions} questions")

        logger.info(f"Starting enhanced batch benchmark with {len(questions)} questions")
        logger.info(f"Batch size: {self.batch_size}, Max workers: {self.max_workers}")

        # Prepare the output directory
        if not output_dir:
            timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
            output_dir = f"benchmark_results_{timestamp}"

        if save_individual_batches:
            os.makedirs(output_dir, exist_ok=True)
            logger.info(f"Results will be saved to: {output_dir}")

        self.start_time = time.time()

        batches = self.create_batches(questions)

        # Process each batch in turn
        all_results = []
        for batch_num, batch in enumerate(batches):
            logger.info(f"\n{'='*60}")
            logger.info(f"PROCESSING BATCH {batch_num + 1}/{len(batches)}")
            logger.info(f"{'='*60}")

            batch_results = self.process_batch(batch, batch_num)
            all_results.extend(batch_results)

            if save_individual_batches:
                self.save_batch_results(batch_results, batch_num, output_dir)

            # Short pause between batches
            if batch_num < len(batches) - 1:
                logger.info("Waiting 2 seconds before next batch...")
                time.sleep(2)

        self.results = all_results

        df = pd.DataFrame(all_results)

        # Summarize the run
        total_time = time.time() - self.start_time
        successful = df[df['status'] == 'success']
        failed = df[df['status'] == 'error']
        needs_review = df[df['needs_review'] == True]

        logger.info(f"\n{'='*60}")
        logger.info("ENHANCED BENCHMARK COMPLETED")
        logger.info(f"{'='*60}")
        logger.info(f"Total time: {total_time:.2f} seconds")
        logger.info(f"Total questions: {len(df)}")
        logger.info(f"Total batches: {len(batches)}")
        logger.info(f"Successful: {len(successful)}")
        logger.info(f"Failed: {len(failed)}")
        logger.info(f"Needs Review: {len(needs_review)}")
        logger.info(f"Success rate: {len(successful)/len(df)*100:.1f}%")
        logger.info(f"Review rate: {len(needs_review)/len(df)*100:.1f}%")

        logger.info("\nPIPELINE ISSUES SUMMARY:")
        for issue_type, count in self.pipeline_issues.items():
            if count > 0:
                logger.info(f" {issue_type.replace('_', ' ').title()}: {count}")

        if len(successful) > 0:
            avg_time = successful['total_time'].mean()
            throughput = len(successful) / total_time

            logger.info("\nPERFORMANCE METRICS:")
            logger.info(f"Average response time: {avg_time:.3f}s")
            logger.info(f"Throughput: {throughput:.2f} questions/second")

            # Average per-step timings
            step_columns = [col for col in df.columns if col.endswith('_time') and col != 'total_time']
            if step_columns:
                logger.info("\nSTEP TIMING ANALYSIS (Average):")
                for step in step_columns:
                    avg_step_time = successful[step].mean()
                    step_name = step.replace('_time', '').replace('_', ' ').title()
                    logger.info(f" {step_name}: {avg_step_time:.3f}s")

        return df, output_dir

    def save_final_results(self, df: pd.DataFrame, output_dir: str) -> Tuple[str, str]:
        """Save final combined results and enhanced metadata"""
        # Combined per-question results
        combined_path = os.path.join(output_dir, "enhanced_combined_results.csv")
        df.to_csv(combined_path, index=False)
        logger.info(f"Enhanced combined results saved to: {combined_path}")

        # Per-batch metadata
        batch_metadata_df = pd.DataFrame(self.batch_results)
        batch_metadata_path = os.path.join(output_dir, "batch_metadata.csv")
        batch_metadata_df.to_csv(batch_metadata_path, index=False)
        logger.info(f"Batch metadata saved to: {batch_metadata_path}")

        # Additional analysis reports
        self.save_enhanced_summary_report(df, output_dir)
        self.save_pipeline_issues_report(df, output_dir)
        self.save_step_timing_analysis(df, output_dir)

        return combined_path, batch_metadata_path

    def save_enhanced_summary_report(self, df: pd.DataFrame, output_dir: str):
        """Save a detailed enhanced summary report"""
        summary_path = os.path.join(output_dir, "benchmark_summary.txt")

        with open(summary_path, 'w') as f:
            f.write("ENHANCED BATCH BENCHMARK SUMMARY REPORT\n")
            f.write("=" * 60 + "\n")
            f.write(f"Generated: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n\n")

            successful = df[df['status'] == 'success']
            failed = df[df['status'] == 'error']
            needs_review = df[df['needs_review'] == True]

            f.write("OVERALL STATISTICS:\n")
            f.write(f"Total Questions: {len(df)}\n")
            f.write(f"Successful: {len(successful)} ({len(successful)/len(df)*100:.1f}%)\n")
            f.write(f"Failed: {len(failed)} ({len(failed)/len(df)*100:.1f}%)\n")
            f.write(f"Needs Review: {len(needs_review)} ({len(needs_review)/len(df)*100:.1f}%)\n")
            f.write(f"Batch Size: {self.batch_size}\n")
            f.write(f"Max Workers: {self.max_workers}\n\n")

            f.write("PIPELINE ISSUES BREAKDOWN:\n")
            for issue_type, count in self.pipeline_issues.items():
                percentage = (count / len(df)) * 100 if len(df) > 0 else 0
                f.write(f"{issue_type.replace('_', ' ').title()}: {count} ({percentage:.1f}%)\n")
            f.write("\n")

            if len(successful) > 0:
                f.write("TIMING STATISTICS:\n")
                f.write(f"Average Time: {successful['total_time'].mean():.3f}s\n")
                f.write(f"Median Time: {successful['total_time'].median():.3f}s\n")
                f.write(f"Min Time: {successful['total_time'].min():.3f}s\n")
                f.write(f"Max Time: {successful['total_time'].max():.3f}s\n")
                f.write(f"Std Dev: {successful['total_time'].std():.3f}s\n\n")

                step_columns = [col for col in df.columns if col.endswith('_time') and col != 'total_time']
                if step_columns:
                    f.write("STEP TIMING ANALYSIS:\n")
                    for step in step_columns:
                        avg_time = successful[step].mean()
                        step_name = step.replace('_time', '').replace('_', ' ').title()
                        f.write(f"{step_name}: {avg_time:.3f}s avg\n")
                    f.write("\n")

            if 'response_quality' in df.columns:
                f.write("RESPONSE QUALITY ANALYSIS:\n")
                quality_counts = df['response_quality'].value_counts()
                for quality, count in quality_counts.items():
                    percentage = (count / len(df)) * 100
                    f.write(f"{quality.title()}: {count} ({percentage:.1f}%)\n")
                f.write("\n")

            f.write("BATCH PERFORMANCE:\n")
            for batch_meta in self.batch_results:
                f.write(f"Batch {batch_meta['batch_num']}: ")
                f.write(f"{batch_meta['successful_count']}/{batch_meta['batch_size']} successful, ")
                f.write(f"{batch_meta.get('needs_review_count', 0)} need review, ")
                f.write(f"{batch_meta['batch_time']:.2f}s total, ")
                f.write(f"{batch_meta['avg_time_per_question']:.3f}s avg\n")

        logger.info(f"Enhanced summary report saved to: {summary_path}")

    def save_pipeline_issues_report(self, df: pd.DataFrame, output_dir: str):
        """Save detailed pipeline issues analysis"""
        issues_path = os.path.join(output_dir, "pipeline_issues_analysis.csv")

        issues_df = df[df['needs_review'] == True].copy()

        if len(issues_df) > 0:
            issue_columns = [
                'example_id', 'question', 'clean_answer', 'issue_type',
                'issue_reason', 'autism_related', 'response_quality',
                'response_length', 'total_time', 'status'
            ]

            issues_analysis = issues_df[issue_columns]
            issues_analysis.to_csv(issues_path, index=False)
            logger.info(f"Pipeline issues analysis saved to: {issues_path}")
        else:
            logger.info("No pipeline issues found - skipping issues report")

    def save_step_timing_analysis(self, df: pd.DataFrame, output_dir: str):
        """Save detailed step timing analysis"""
        timing_path = os.path.join(output_dir, "step_timing_analysis.csv")

        successful_df = df[df['status'] == 'success'].copy()

        if len(successful_df) > 0:
            timing_columns = ['example_id', 'question', 'total_time']
            step_columns = [col for col in df.columns if col.endswith('_time') and col != 'total_time']
            timing_columns.extend(step_columns)

            timing_analysis = successful_df[timing_columns]
            timing_analysis.to_csv(timing_path, index=False)
            logger.info(f"Step timing analysis saved to: {timing_path}")
        else:
            logger.info("No successful queries for timing analysis")


def load_questions_from_csv(file_path: str, question_column: str = 'question') -> List[str]:
    """Load questions from CSV file"""
    if not os.path.exists(file_path):
        raise FileNotFoundError(f"File not found: {file_path}")

    try:
        df = pd.read_csv(file_path)
        logger.info(f"Loaded CSV with {len(df)} rows")

        if question_column not in df.columns:
            available_columns = list(df.columns)
            raise ValueError(f"Column '{question_column}' not found. Available: {available_columns}")

        # Keep non-empty, non-NaN questions only
        questions = []
        for _, row in df.iterrows():
            question = str(row[question_column]).strip()
            if question and question.lower() != 'nan':
                questions.append(question)

        logger.info(f"Extracted {len(questions)} valid questions")
        return questions

    except Exception as e:
        raise Exception(f"Error reading CSV file: {e}")


def create_sample_questions() -> List[str]:
    """Create sample autism-related questions for testing"""
    sample_questions = [
        "What are the early signs of autism in children?",
        "How can I help my autistic child with social skills?",
        "What are sensory processing issues in autism?",
        "What educational strategies work best for autistic students?",
        "How do I support an autistic family member?",
        "What are common myths about autism?",
        "How does autism affect communication?",
        "What therapies are available for autism?",
        "How can schools better support autistic students?",
        "What workplace accommodations help autistic employees?",
        "What is stimming and why do autistic people do it?",
        "How can I make my home more autism-friendly?",
        "What should I know about autism and employment?",
        "How do I explain autism to other children?",
        "What are the different types of autism spectrum disorders?",
        "How can technology help autistic individuals?",
        "What role does diet play in autism management?",
        "How do I find good autism resources in my area?",
        "What are the signs of autism in teenagers?",
        "How can I advocate for my autistic child at school?",
        # Non-autism questions, included to exercise the pipeline's relevance handling
        "Tell me about the weather today",
        "What's 2+2?",
    ]

    return sample_questions


def print_enhanced_summary_stats(df: pd.DataFrame, batch_metadata: List[Dict], pipeline_issues: Dict):
    """Print comprehensive enhanced summary statistics"""
    successful = df[df['status'] == 'success']
    failed = df[df['status'] == 'error']
    needs_review = df[df['needs_review'] == True]

    print("\n" + "="*80)
    print("ENHANCED BATCH BENCHMARK SUMMARY")
    print("="*80)
    print(f"Total Questions: {len(df)}")
    print(f"Total Batches: {len(batch_metadata)}")
    print(f"Successful: {len(successful)} ({len(successful)/len(df)*100:.1f}%)")
    print(f"Failed: {len(failed)} ({len(failed)/len(df)*100:.1f}%)")
    print(f"Needs Review: {len(needs_review)} ({len(needs_review)/len(df)*100:.1f}%)")

    print("\nPIPELINE ISSUES BREAKDOWN:")
    total_issues = sum(pipeline_issues.values())
    for issue_type, count in pipeline_issues.items():
        if count > 0:
            percentage = (count / len(df)) * 100 if len(df) > 0 else 0
            print(f" {issue_type.replace('_', ' ').title()}: {count} ({percentage:.1f}%)")

    if len(successful) > 0:
        print("\nOVERALL TIMING STATISTICS:")
        print(f"Average Time: {successful['total_time'].mean():.3f}s")
        print(f"Median Time: {successful['total_time'].median():.3f}s")
        print(f"Min Time: {successful['total_time'].min():.3f}s")
        print(f"Max Time: {successful['total_time'].max():.3f}s")
        print(f"Std Dev: {successful['total_time'].std():.3f}s")

        # Average per-step timings and their share of total time
        step_columns = [col for col in df.columns if col.endswith('_time') and col != 'total_time']
        if step_columns:
            print("\nSTEP TIMING ANALYSIS (Average):")
            for step in step_columns:
                avg_time = successful[step].mean()
                step_name = step.replace('_time', '').replace('_', ' ').title()
                percentage_of_total = (avg_time / successful['total_time'].mean()) * 100
                print(f" {step_name}: {avg_time:.3f}s ({percentage_of_total:.1f}% of total)")

        # Grade each successful question by its total response time
        def get_grade(time_val):
            if time_val < 15: return "A+ (Excellent)"
            elif time_val < 20: return "A (Good)"
            elif time_val < 25: return "B (Average)"
            elif time_val < 40: return "C (Slow)"
            else: return "D (Very Slow)"

        grades = successful['total_time'].apply(get_grade)
        grade_counts = grades.value_counts()

        print("\nPERFORMANCE GRADES:")
        for grade, count in grade_counts.items():
            print(f" {grade}: {count} questions ({count/len(successful)*100:.1f}%)")

    if 'response_quality' in df.columns:
        print("\nRESPONSE QUALITY ANALYSIS:")
        quality_counts = df['response_quality'].value_counts()
        for quality, count in quality_counts.items():
            percentage = (count / len(df)) * 100
            print(f" {quality.title()}: {count} ({percentage:.1f}%)")

    if 'autism_related' in df.columns:
        autism_related = df[df['autism_related'] == True]
        print("\nAUTISM RELEVANCE ANALYSIS:")
        print(f" Autism-related queries: {len(autism_related)} ({len(autism_related)/len(df)*100:.1f}%)")
        print(f" Non-autism queries: {len(df) - len(autism_related)} ({(len(df) - len(autism_related))/len(df)*100:.1f}%)")

    if batch_metadata:
        print("\nBATCH PERFORMANCE SUMMARY:")
        total_batch_time = sum(b['batch_time'] for b in batch_metadata)
        avg_batch_time = total_batch_time / len(batch_metadata)

        print(f"Average Batch Time: {avg_batch_time:.2f}s")
        print(f"Fastest Batch: {min(b['batch_time'] for b in batch_metadata):.2f}s")
        print(f"Slowest Batch: {max(b['batch_time'] for b in batch_metadata):.2f}s")

        print("\nINDIVIDUAL BATCH PERFORMANCE:")
        for batch in batch_metadata:
            success_rate = batch['successful_count'] / batch['batch_size'] * 100
            review_count = batch.get('needs_review_count', 0)
            print(f" Batch {batch['batch_num']:2d}: {batch['successful_count']:2d}/{batch['batch_size']:2d} "
                  f"({success_rate:5.1f}% success, {review_count:2d} review) "
                  f"in {batch['batch_time']:6.2f}s ({batch['avg_time_per_question']:.3f}s avg)")

    if len(failed) > 0:
        print("\nERROR ANALYSIS:")
        error_counts = failed['error_message'].value_counts()
        for error, count in error_counts.head(5).items():
            print(f" {error[:60]}...: {count} times")

    print("\nREVIEW RECOMMENDATIONS:")
    if len(needs_review) > 0:
        print(f" 📋 {len(needs_review)} questions need manual review")
        if 'issue_type' in df.columns:
            issue_types = needs_review['issue_type'].value_counts()
            for issue_type, count in issue_types.items():
                print(f" - {issue_type.replace('_', ' ').title()}: {count} questions")
    else:
        print(" ✅ No questions need manual review")

    print("="*80)


def main():
    """Main function to run the enhanced batch benchmark"""
    parser = argparse.ArgumentParser(
        description="Enhanced batch benchmark runner for pipeQuery autism AI pipeline with detailed step timing",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
  python benchmark_runner.py questions.csv
  python benchmark_runner.py questions.csv --batch-size 20 --max-workers 5
  python benchmark_runner.py questions.csv --max 50 --output my_results
  python benchmark_runner.py --sample 25 --batch-size 5
  python benchmark_runner.py --sample 100 --batch-size 10 --max-workers 3
        """
    )

    parser.add_argument('input_csv', nargs='?', help='Path to CSV file with questions')
    parser.add_argument('--column', '-c', default='question',
                        help='Name of question column (default: question)')
    parser.add_argument('--max', '-m', type=int,
                        help='Maximum number of questions to process')
    parser.add_argument('--output', '-o',
                        help='Output directory path')
    parser.add_argument('--sample', '-s', type=int,
                        help='Create and test with N sample questions')
    parser.add_argument('--batch-size', '-b', type=int, default=10,
                        help='Number of questions per batch (default: 10)')
    parser.add_argument('--max-workers', '-w', type=int, default=3,
                        help='Maximum worker threads per batch (default: 3)')
    parser.add_argument('--no-batch-files', action='store_true',
                        help='Do not save individual batch files')
    parser.add_argument('--detailed-timing', action='store_true', default=True,
                        help='Enable detailed step timing analysis (default: True)')

    args = parser.parse_args()

    try:
        # Initialize the benchmark runner
        benchmark = EnhancedPipelineBenchmark(
            batch_size=args.batch_size,
            max_workers=args.max_workers
        )

        # Determine the question source
        if args.sample:
            print(f"Creating {args.sample} sample questions...")
            all_sample_questions = create_sample_questions()
            # Repeat the sample set as needed to reach the requested count
            questions = (all_sample_questions * ((args.sample // len(all_sample_questions)) + 1))[:args.sample]
        elif args.input_csv:
            print(f"Loading questions from {args.input_csv}...")
            questions = load_questions_from_csv(args.input_csv, args.column)
        else:
            print("No input specified, using 15 sample questions...")
            questions = create_sample_questions()[:15]

        print(f"\nRunning enhanced batch benchmark on {len(questions)} questions...")
        print(f"Batch size: {args.batch_size}, Max workers: {args.max_workers}")
        print(f"Detailed timing: {'Enabled' if args.detailed_timing else 'Disabled'}")

        df, output_dir = benchmark.run_batch_benchmark(
            questions,
            args.max,
            args.output,
            save_individual_batches=not args.no_batch_files
        )

        combined_path, batch_metadata_path = benchmark.save_final_results(df, output_dir)

        print_enhanced_summary_stats(df, benchmark.batch_results, benchmark.pipeline_issues)

        print("\n📁 RESULTS SUMMARY:")
        print(f"Results directory: {output_dir}")
        print(f"Combined results: {combined_path}")
        print(f"Batch metadata: {batch_metadata_path}")

        additional_files = [
            "benchmark_summary.txt",
            "pipeline_issues_analysis.csv",
            "step_timing_analysis.csv"
        ]

        print("Additional analysis files:")
        for file in additional_files:
            file_path = os.path.join(output_dir, file)
            if os.path.exists(file_path):
                print(f" - {file}")

        # Key insights from the run
        successful = df[df['status'] == 'success']
        if len(successful) > 0:
            print("\n🎯 KEY INSIGHTS:")
            avg_time = successful['total_time'].mean()
            needs_review_count = len(df[df['needs_review'] == True])

            print(f" • Average processing time: {avg_time:.2f} seconds")
            print(f" • Questions needing review: {needs_review_count}/{len(df)} ({needs_review_count/len(df)*100:.1f}%)")

            if needs_review_count > 0:
                print(" • Review the pipeline_issues_analysis.csv for detailed breakdown")

            # Identify the slowest pipeline step
            step_columns = [col for col in df.columns if col.endswith('_time') and col != 'total_time']
            if step_columns:
                slowest_step = None
                slowest_time = 0
                for step in step_columns:
                    avg_step_time = successful[step].mean()
                    if avg_step_time > slowest_time:
                        slowest_time = avg_step_time
                        slowest_step = step.replace('_time', '').replace('_', ' ').title()

                if slowest_step:
                    print(f" • Slowest pipeline step: {slowest_step} ({slowest_time:.3f}s avg)")

    except KeyboardInterrupt:
        print("\nBenchmark interrupted by user")
    except Exception as e:
        print(f"Error: {e}")
        import traceback
        traceback.print_exc()
        return 1

    return 0


if __name__ == "__main__":
    sys.exit(main())