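"""Run an enhanced CSV-driven test of the question-answering pipeline.

Reads questions from a CSV file, answers each one via process_query(), times
every pipeline step (embedding, Weaviate retrieval, response cleaning), and
writes the answers plus a detailed performance breakdown back to CSV.
"""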

import os
import sys
import time
import re
import asyncio
import traceback
from datetime import datetime
from typing import Dict, Any

import pandas as pd
from dotenv import load_dotenv

# Make sibling modules importable when the script is run directly.
sys.path.append(os.path.dirname(os.path.abspath(__file__)))
load_dotenv()
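
# Project-local imports are placed after load_dotenv(), presumably so that any
# module-level client initialisation can see the environment variables.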
from utils import process_query
from logger.custom_logger import CustomLoggerTracker
from rag_utils import encode_query
from clients import init_weaviate_client

custom_log = CustomLoggerTracker()
logger = custom_log.get_logger("csv_test")


class DetailedTimingTracker:
    """Enhanced timing tracker with detailed pipeline step analysis."""

    def __init__(self):
        self.timings = {}
        self.start_times = {}
        self.total_start_time = None
        self.step_logs = []
        self.rag_performance_data = {}

    def start_timer(self, step_name: str):
        """Start timing a specific step."""
        timestamp = datetime.now()
        self.start_times[step_name] = time.time()
        if self.total_start_time is None:
            self.total_start_time = time.time()

        log_entry = f"[{timestamp.strftime('%H:%M:%S.%f')[:-3]}] STARTED: {step_name}"
        self.step_logs.append(log_entry)
        logger.debug(log_entry)

    def end_timer(self, step_name: str, additional_info: str = ""):
        """End timing a specific step, with optional additional information."""
        if step_name in self.start_times:
            elapsed = time.time() - self.start_times[step_name]
            self.timings[step_name] = round(elapsed, 3)
            timestamp = datetime.now()

            log_entry = f"[{timestamp.strftime('%H:%M:%S.%f')[:-3]}] COMPLETED: {step_name} - {elapsed:.3f}s"
            if additional_info:
                log_entry += f" | {additional_info}"

            self.step_logs.append(log_entry)
            logger.debug(log_entry)

            # Track RAG-related steps separately for the detailed analysis.
            if "rag" in step_name.lower() or "retrieval" in step_name.lower():
                self.rag_performance_data[step_name] = {
                    'duration': elapsed,
                    'additional_info': additional_info,
                    'timestamp': timestamp.isoformat()
                }

            return elapsed
        return 0

    def add_rag_analysis(self, query: str, results_count: int, embedding_time: float,
                         weaviate_time: float, total_rag_time: float):
        """Add detailed RAG performance analysis."""
        analysis = {
            'query_length': len(query),
            'query_words': len(query.split()),
            'results_count': results_count,
            'embedding_generation_time': round(embedding_time, 3),
            'weaviate_query_time': round(weaviate_time, 3),
            'total_rag_time': round(total_rag_time, 3),
            'embedding_percentage': round((embedding_time / total_rag_time) * 100, 2) if total_rag_time > 0 else 0,
            'weaviate_percentage': round((weaviate_time / total_rag_time) * 100, 2) if total_rag_time > 0 else 0,
        }

        self.rag_performance_data['detailed_analysis'] = analysis

        log_entry = (f"RAG ANALYSIS: Query({len(query)} chars, {len(query.split())} words) -> "
                     f"Embedding: {embedding_time:.3f}s ({analysis['embedding_percentage']}%), "
                     f"Weaviate: {weaviate_time:.3f}s ({analysis['weaviate_percentage']}%), "
                     f"Results: {results_count}, Total: {total_rag_time:.3f}s")

        self.step_logs.append(log_entry)
        logger.info(log_entry)

    def log_step(self, message: str, level: str = "info"):
        """Log a custom step message."""
        timestamp = datetime.now()
        log_entry = f"[{timestamp.strftime('%H:%M:%S.%f')[:-3]}] {message}"
        self.step_logs.append(log_entry)

        if level == "error":
            logger.error(log_entry)
        elif level == "warning":
            logger.warning(log_entry)
        else:
            logger.info(log_entry)

    def get_total_time(self):
        """Get total processing time in seconds."""
        if self.total_start_time:
            return round(time.time() - self.total_start_time, 3)
        return 0

    def get_step_logs(self):
        """Get all step logs as a formatted string."""
        return "\n".join(self.step_logs)

    def analyze_performance_bottlenecks(self):
        """Analyze and identify performance bottlenecks."""
        if not self.timings:
            return "No timing data available"

        sorted_timings = sorted(self.timings.items(), key=lambda x: x[1], reverse=True)
        total_time = sum(self.timings.values())

        analysis = ["PERFORMANCE BOTTLENECK ANALYSIS:"]
        analysis.append(f"Total Pipeline Time: {total_time:.3f}s")
        analysis.append("-" * 50)

        for step, duration in sorted_timings:
            percentage = (duration / total_time) * 100 if total_time > 0 else 0
            status = "🔴 SLOW" if percentage > 25 else "🟡 MODERATE" if percentage > 10 else "🟢 FAST"
            analysis.append(f"{status} {step}: {duration:.3f}s ({percentage:.1f}%)")

        if self.rag_performance_data:
            analysis.append("\nRAG DETAILED ANALYSIS:")
            analysis.append("-" * 30)
            if 'detailed_analysis' in self.rag_performance_data:
                rag_data = self.rag_performance_data['detailed_analysis']
                analysis.append(f"Query complexity: {rag_data['query_words']} words, {rag_data['query_length']} chars")
                analysis.append(f"Embedding generation: {rag_data['embedding_generation_time']}s ({rag_data['embedding_percentage']}%)")
                analysis.append(f"Weaviate query: {rag_data['weaviate_query_time']}s ({rag_data['weaviate_percentage']}%)")
                analysis.append(f"Results retrieved: {rag_data['results_count']}")

                if rag_data['embedding_percentage'] > 50:
                    analysis.append("⚠️ ISSUE: Embedding generation is the bottleneck")
                    analysis.append("💡 SUGGESTION: Consider caching embeddings or using a faster embedding model")

                if rag_data['weaviate_percentage'] > 60:
                    analysis.append("⚠️ ISSUE: Weaviate query is the bottleneck")
                    analysis.append("💡 SUGGESTION: Check Weaviate connection, indexing, or reduce vector dimensions")

        return "\n".join(analysis)

    def reset(self):
        """Reset all timers and logs."""
        self.timings.clear()
        self.start_times.clear()
        self.step_logs.clear()
        self.rag_performance_data.clear()
        self.total_start_time = None
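
# Illustrative usage of DetailedTimingTracker (a sketch; the step name and
# metadata below are hypothetical, not steps used elsewhere in this script):
#
#     tracker = DetailedTimingTracker()
#     tracker.start_timer("Embedding")
#     vec = encode_query("example question")          # the timed work
#     tracker.end_timer("Embedding", f"dim: {len(vec)}")
#     print(tracker.analyze_performance_bottlenecks())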


def clean_html_response(html_text: str) -> str:
    """Strip HTML tags from a response and normalise whitespace."""
    if not html_text:
        return ""

    # Remove tags only; HTML entities such as &amp; are left untouched.
    clean_text = re.sub(r'<[^>]+>', '', html_text)
    clean_text = ' '.join(clean_text.split())
    return clean_text.strip()
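
# For example: clean_html_response("<p>Hello <b>world</b></p>") -> "Hello world".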


async def detailed_rag_analysis(query: str):
    """Perform detailed RAG analysis with a per-step timing breakdown."""
    start_time = time.time()

    # Generate the query embedding.
    embedding_start = time.time()
    query_embedding = encode_query(query)
    embedding_time = time.time() - embedding_start

    if not query_embedding:
        return {
            'results': [],
            'embedding_time': embedding_time,
            'weaviate_time': 0,
            'total_time': time.time() - start_time,
            'results_count': 0,
            'error': 'Failed to generate embedding'
        }

    # Retrieve the nearest chunks from Weaviate.
    weaviate_start = time.time()
    client = None
    try:
        client = init_weaviate_client()
        if client is None:
            raise RuntimeError("Weaviate client not initialized")

        coll = client.collections.get("Books")
        res = coll.query.near_vector(
            near_vector=query_embedding,
            limit=3
        )

        results = []
        if getattr(res, "objects", None):
            results = [obj.properties.get("text", "[No Text]") for obj in res.objects]

        weaviate_time = time.time() - weaviate_start

    except Exception as e:
        weaviate_time = time.time() - weaviate_start
        return {
            'results': [],
            'embedding_time': embedding_time,
            'weaviate_time': weaviate_time,
            'total_time': time.time() - start_time,
            'results_count': 0,
            'error': str(e)
        }
    finally:
        # Close the client on both the success and the error path.
        if client is not None:
            client.close()

    total_time = time.time() - start_time

    return {
        'results': results,
        'embedding_time': embedding_time,
        'weaviate_time': weaviate_time,
        'total_time': total_time,
        'results_count': len(results),
        'error': None
    }
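
# Note: all three return paths above share the same dict shape, so callers can
# read the timing fields and 'error' (None on success) without special-casing.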


def process_single_question_enhanced(question: str, question_index: int) -> Dict[str, Any]:
    """
    Process a single question with enhanced logging and timing analysis.

    Args:
        question: The question to process
        question_index: Index of the question, used for logging

    Returns:
        Dictionary containing the detailed answer and timing information
    """
    timer = DetailedTimingTracker()
    results = {
        'question': question,
        'answer': '',
        'clean_answer': '',
        'status': 'success',
        'error_message': '',
        'total_time_seconds': 0,
        'processing_steps': {},
        'detailed_logs': '',
        'performance_analysis': '',
        'rag_analysis': {}
    }

    try:
        logger.info("=" * 80)
        logger.info(f"PROCESSING QUESTION {question_index + 1}")
        logger.info(f"Question: {question}")
        logger.info("=" * 80)

        timer.log_step(f"Starting processing for question {question_index + 1}: '{question[:100]}...'")

        # Starting any timer also arms the tracker's overall clock.
        timer.start_timer('Total_Processing')

        # Step 1: detailed RAG analysis with its own timing breakdown.
        timer.start_timer('RAG_Analysis')
        timer.log_step("Performing detailed RAG analysis...")

        # This function is synchronous and called from the CLI entry point, so
        # no event loop is running here; asyncio.run() drives the coroutine.
        rag_analysis = asyncio.run(detailed_rag_analysis(question))

        timer.end_timer('RAG_Analysis',
                        f"Retrieved {rag_analysis['results_count']} results, "
                        f"Embedding: {rag_analysis['embedding_time']:.3f}s, "
                        f"Weaviate: {rag_analysis['weaviate_time']:.3f}s")

        timer.add_rag_analysis(
            question,
            rag_analysis['results_count'],
            rag_analysis['embedding_time'],
            rag_analysis['weaviate_time'],
            rag_analysis['total_time']
        )

        results['rag_analysis'] = rag_analysis

        # Step 2: main pipeline processing.
        timer.start_timer('Pipeline_Processing')
        timer.log_step("Starting main pipeline processing...")

        response_html = process_query(question, first_turn=True)

        pipeline_time = timer.end_timer('Pipeline_Processing', f"Response length: {len(response_html)} chars")

        # Step 3: strip HTML from the response.
        timer.start_timer('Response_Cleaning')
        clean_response = clean_html_response(response_html)
        cleaning_time = timer.end_timer('Response_Cleaning', f"Cleaned length: {len(clean_response)} chars")

        # Collect results and per-step timings.
        results['answer'] = response_html
        results['clean_answer'] = clean_response
        results['total_time_seconds'] = timer.get_total_time()
        results['processing_steps'] = {
            'pipeline_processing_seconds': pipeline_time,
            'response_cleaning_seconds': cleaning_time,
            'rag_analysis_seconds': timer.timings.get('RAG_Analysis', 0),
            'embedding_generation_seconds': rag_analysis['embedding_time'],
            'weaviate_query_seconds': rag_analysis['weaviate_time'],
            **timer.timings
        }

        results['detailed_logs'] = timer.get_step_logs()
        results['performance_analysis'] = timer.analyze_performance_bottlenecks()

        timer.log_step(f"Question {question_index + 1} completed successfully in {results['total_time_seconds']:.3f}s")

        logger.info(f"✅ Question {question_index + 1} completed in {results['total_time_seconds']:.3f}s")
        logger.info(f"Performance Analysis:\n{results['performance_analysis']}")

    except Exception as e:
        error_msg = f"Error processing question {question_index + 1}: {str(e)}"
        timer.log_step(error_msg, "error")
        logger.error(error_msg)
        logger.error(f"Traceback: {traceback.format_exc()}")

        results['status'] = 'error'
        results['error_message'] = str(e)
        results['total_time_seconds'] = timer.get_total_time()
        results['answer'] = f"[ERROR] {str(e)}"
        results['clean_answer'] = f"Error: {str(e)}"
        results['detailed_logs'] = timer.get_step_logs()
        results['performance_analysis'] = timer.analyze_performance_bottlenecks()

    return results
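
# Example (sketch): result = process_single_question_enhanced("What is autism?", 0)
# leaves the per-step seconds in result['processing_steps'], keyed by step name.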


def run_csv_test(input_csv_path: str, output_csv_path: str = None, question_column: str = 'question', max_samples: int = None) -> str:
    """Run the pipeline over every question in a CSV and write detailed results.

    Returns the path of the output CSV, which contains the original questions
    plus answers, timings, per-step breakdowns, and a performance grade.
    """
    if not os.path.exists(input_csv_path):
        raise FileNotFoundError(f"Input CSV file not found: {input_csv_path}")

    if output_csv_path is None:
        base_name = os.path.splitext(input_csv_path)[0]
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        sample_suffix = f"_{max_samples}samples" if max_samples else ""
        output_csv_path = f"{base_name}_detailed_results{sample_suffix}_{timestamp}.csv"

    logger.info("STARTING ENHANCED CSV TEST RUN")
    logger.info(f"Input file: {input_csv_path}")
    logger.info(f"Output file: {output_csv_path}")
    if max_samples:
        logger.info(f"Max samples to process: {max_samples}")
    logger.info("=" * 100)

    try:
        df = pd.read_csv(input_csv_path)
        logger.info(f"Loaded {len(df)} questions from CSV")

        if question_column not in df.columns:
            raise ValueError(f"Column '{question_column}' not found. Available columns: {list(df.columns)}")

        # Optionally limit the number of questions processed.
        original_count = len(df)
        if max_samples is not None and max_samples > 0:
            if len(df) > max_samples:
                df = df.head(max_samples)
                logger.info(f"⚠️ Limited to first {max_samples} questions as specified (from {original_count} total)")
            else:
                logger.info(f"Processing all {len(df)} questions (max_samples={max_samples} not reached)")
        else:
            logger.info(f"Processing all {len(df)} questions (no sample limit specified)")

        # Initialise result columns: text columns to '', numeric columns to 0.0.
        text_columns = {'answer', 'clean_answer', 'status', 'error_message',
                        'processed_timestamp', 'detailed_logs', 'performance_analysis',
                        'bottleneck_step', 'performance_grade'}
        result_columns = [
            'answer', 'clean_answer', 'total_time_seconds', 'status', 'error_message',
            'processed_timestamp', 'pipeline_processing_seconds', 'response_cleaning_seconds',
            'rag_analysis_seconds', 'embedding_generation_seconds', 'weaviate_query_seconds',
            'rag_results_count', 'rag_embedding_time', 'rag_weaviate_time', 'rag_total_time',
            'detailed_logs', 'performance_analysis', 'bottleneck_step', 'performance_grade'
        ]
        for col in result_columns:
            df[col] = '' if col in text_columns else 0.0

        total_start_time = time.time()
        successful_questions = 0

        for index, row in df.iterrows():
            question = str(row[question_column]).strip()

            if not question or question.lower() == 'nan':
                logger.warning(f"Skipping empty question at row {index}")
                df.at[index, 'status'] = 'skipped'
                df.at[index, 'error_message'] = 'Empty question'
                continue

            logger.info(f"\nProcessing Question {index + 1}/{len(df)}")

            result = process_single_question_enhanced(question, index)

            # Copy the core results into the dataframe.
            df.at[index, 'answer'] = result['answer']
            df.at[index, 'clean_answer'] = result['clean_answer']
            df.at[index, 'total_time_seconds'] = result['total_time_seconds']
            df.at[index, 'status'] = result['status']
            df.at[index, 'error_message'] = result['error_message']
            df.at[index, 'processed_timestamp'] = datetime.now().isoformat()
            df.at[index, 'detailed_logs'] = result['detailed_logs']
            df.at[index, 'performance_analysis'] = result['performance_analysis']

            # Copy any per-step timings that match the prepared columns.
            steps = result['processing_steps']
            for step_key, time_value in steps.items():
                if step_key in df.columns:
                    df.at[index, step_key] = time_value

            # RAG-specific metrics.
            rag_analysis = result.get('rag_analysis', {})
            df.at[index, 'rag_results_count'] = rag_analysis.get('results_count', 0)
            df.at[index, 'rag_embedding_time'] = rag_analysis.get('embedding_time', 0)
            df.at[index, 'rag_weaviate_time'] = rag_analysis.get('weaviate_time', 0)
            df.at[index, 'rag_total_time'] = rag_analysis.get('total_time', 0)

            # Record the slowest step as this question's bottleneck.
            if steps:
                slowest_step = max(steps.items(), key=lambda x: x[1])
                df.at[index, 'bottleneck_step'] = slowest_step[0]

            # Grade the question by total processing time.
            total_time = result['total_time_seconds']
            if total_time < 2:
                grade = "A+ (Excellent)"
            elif total_time < 4:
                grade = "A (Good)"
            elif total_time < 6:
                grade = "B (Average)"
            elif total_time < 10:
                grade = "C (Slow)"
            else:
                grade = "D (Very Slow)"

            df.at[index, 'performance_grade'] = grade

            if result['status'] == 'success':
                successful_questions += 1

            # Brief pause between questions, presumably to avoid overloading
            # the backing services (rate limits, connection churn).
            time.sleep(1)

        # Summary statistics (means over successful rows only).
        total_test_time = time.time() - total_start_time
        successful = df[df['status'] == 'success']
        avg_time_per_question = successful['total_time_seconds'].mean()
        median_time = successful['total_time_seconds'].median()

        avg_rag_time = successful['rag_total_time'].mean()
        avg_embedding_time = successful['rag_embedding_time'].mean()
        avg_weaviate_time = successful['rag_weaviate_time'].mean()

        logger.info("\n" + "=" * 100)
        logger.info("🎯 ENHANCED TEST RUN COMPLETED!")
        logger.info("=" * 100)
        logger.info("SUMMARY STATISTICS:")
        logger.info(f"  Total questions in dataset: {original_count}")
        logger.info(f"  Questions processed: {len(df)}")
        logger.info(f"  ✅ Successful: {successful_questions}")
        logger.info(f"  ❌ Failed: {len(df) - successful_questions}")
        logger.info(f"  Total test time: {total_test_time:.2f} seconds")
        logger.info(f"  ⏱️ Average time per question: {avg_time_per_question:.3f} seconds")
        logger.info(f"  Median time per question: {median_time:.3f} seconds")
        logger.info("")
        logger.info("RAG PERFORMANCE ANALYSIS:")
        logger.info(f"  Average RAG total time: {avg_rag_time:.3f} seconds")
        if avg_rag_time and avg_rag_time > 0:
            logger.info(f"  Average embedding time: {avg_embedding_time:.3f} seconds ({avg_embedding_time / avg_rag_time * 100:.1f}%)")
            logger.info(f"  Average Weaviate time: {avg_weaviate_time:.3f} seconds ({avg_weaviate_time / avg_rag_time * 100:.1f}%)")

        # Which step was the bottleneck most often?
        bottleneck_counts = df['bottleneck_step'].value_counts()
        logger.info("")
        logger.info("🚨 BOTTLENECK ANALYSIS:")
        for step, count in bottleneck_counts.items():
            logger.info(f"  {step}: {count} times ({count / len(df) * 100:.1f}%)")

        df.to_csv(output_csv_path, index=False)
        logger.info(f"💾 Results saved to: {output_csv_path}")

        # Console summary.
        print("\n" + "=" * 100)
        print("ENHANCED CSV TEST RESULTS SUMMARY")
        print("=" * 100)
        print(f"Input file: {input_csv_path}")
        print(f"Output file: {output_csv_path}")
        print(f"Total questions in dataset: {original_count}")
        print(f"Questions processed: {len(df)}")
        print(f"✅ Successful: {successful_questions}")
        print(f"❌ Failed: {len(df) - successful_questions}")
        print(f"Total time: {total_test_time:.2f} seconds")
        print(f"⏱️ Average time per question: {avg_time_per_question:.3f} seconds")
        print(f"RAG avg time: {avg_rag_time:.3f}s (Embedding: {avg_embedding_time:.3f}s, Weaviate: {avg_weaviate_time:.3f}s)")
        print("=" * 100)

        return output_csv_path

    except Exception as e:
        error_msg = f"❌ Error during enhanced CSV test run: {str(e)}"
        logger.error(error_msg)
        logger.error(f"Traceback: {traceback.format_exc()}")
        raise
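
# Example (sketch): run_csv_test("questions.csv", max_samples=10) processes the
# first ten rows and returns the path of the timestamped results CSV it wrote.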


def create_sample_csv(file_path: str = "sample_questions.csv", num_samples: int = 5):
    """Create a sample CSV file with autism-related questions for testing."""
    base_questions = [
        "Can you tell me more about autism?",
        "What are the early signs of autism in children?",
        "How can I help my autistic child with social skills?",
        "What are sensory processing issues in autism?",
        "What educational strategies work best for autistic students?",
        "How do I support an autistic family member?",
        "What are common myths about autism?",
        "How does autism affect communication?",
        "What therapies are available for autism?",
        "How can schools better support autistic students?",
        "What are the strengths often associated with autism?",
        "How do I explain autism to other children?",
        "What workplace accommodations help autistic employees?",
        "How does autism present differently in girls vs boys?",
        "What should I know about autism diagnosis?"
    ]

    if num_samples <= len(base_questions):
        sample_questions = base_questions[:num_samples]
    else:
        # Repeat the base questions cyclically once the list is exhausted.
        sample_questions = []
        for i in range(num_samples):
            sample_questions.append(base_questions[i % len(base_questions)])

    df = pd.DataFrame({'question': sample_questions})
    df.to_csv(file_path, index=False)
    print(f"✅ Sample CSV created with {num_samples} questions: {file_path}")
    return file_path
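
# Example (sketch): create_sample_csv("smoke_test.csv", num_samples=3) writes a
# one-column CSV ('question') containing the first three base questions.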


def main():
    """Main function to run the enhanced CSV test."""
    import argparse

    parser = argparse.ArgumentParser(
        description="Run enhanced tests on questions from a CSV file with detailed analysis",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
  python script.py questions.csv --samples 10
  python script.py questions.csv --samples 50 --output results.csv
  python script.py --create-sample --sample-size 10
  python script.py questions.csv --column "user_question" --samples 5
"""
    )

    parser.add_argument("input_csv", nargs='?', help="Path to input CSV file with questions")
    parser.add_argument("--output", "-o", help="Path to output CSV file")
    parser.add_argument("--column", "-c", default="question", help="Name of question column (default: 'question')")
    parser.add_argument("--samples", "-s", type=int, default=None,
                        help="Maximum number of samples to process (default: process all)")
    parser.add_argument("--create-sample", action="store_true", help="Create a sample CSV file for testing")
    parser.add_argument("--sample-size", type=int, default=5,
                        help="Number of questions to include in sample CSV (default: 5, max: 15)")

    args = parser.parse_args()

    try:
        if args.create_sample:
            # Clamp the sample size to the number of available base questions.
            sample_size = max(1, min(args.sample_size, 15))
            sample_file = create_sample_csv(num_samples=sample_size)
            if not args.input_csv:
                args.input_csv = sample_file

        if not args.input_csv:
            print("❌ Error: Please provide an input CSV file or use --create-sample")
            parser.print_help()
            return
print(f"\nπ§ CONFIGURATION:") |
|
|
print(f" Input file: {args.input_csv}") |
|
|
print(f" Question column: {args.column}") |
|
|
print(f" Max samples: {args.samples if args.samples else 'No limit (process all)'}") |
|
|
if args.output: |
|
|
print(f" Output file: {args.output}") |
|
|
print(f" {'='*50}") |
|
|
|
|
|
|
|
|
output_file = run_csv_test( |
|
|
input_csv_path=args.input_csv, |
|
|
output_csv_path=args.output, |
|
|
question_column=args.column, |
|
|
max_samples=args.samples |
|
|
) |
|
|
|
|
|
print(f"\nπ Enhanced test completed successfully!") |
|
|
print(f"π Detailed results saved to: {output_file}") |
|
|
|
|
|

    except KeyboardInterrupt:
        print("\n⏹️ Test interrupted by user")
        logger.info("Test run interrupted by user")
    except Exception as e:
        print(f"❌ Error: {e}")
        logger.error(f"Main execution error: {e}")


if __name__ == "__main__":
    main()