"""
Enhanced CSV Test Runner with Response Source Tracking and Answer Similarity Analysis

Processes questions through the autism AI pipeline and compares the generated
answers with ground-truth answers.
"""

import os
import sys
import time
import re
import json
import asyncio
import traceback
from datetime import datetime
from typing import Dict, Any, Tuple, List

import numpy as np
import pandas as pd
from dotenv import load_dotenv
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity

# Make local modules importable and load environment variables before the
# pipeline imports below.
sys.path.append(os.path.dirname(os.path.abspath(__file__)))
load_dotenv()

from pipeQuery import process_query
from logger.custom_logger import CustomLoggerTracker
from rag_utils import rag_autism, encode_query
from clients import init_weaviate_client

custom_log = CustomLoggerTracker()
logger = custom_log.get_logger("test_evaluation")


class ResponseSourceTracker:
    """Track which source (RAG, LLM, Web Search) contributed to the final response."""

    def __init__(self):
        self.source_analysis = {}
        self.reranking_data = {}

    def analyze_response_sources(self, question: str, pipeline_logs: str) -> Dict[str, Any]:
        """Analyze pipeline logs to determine response sources and reranking decisions."""
        source_info = {
            'primary_source': 'unknown',
            'sources_used': [],
            'reranking_winner': 'unknown',
            'web_search_used': False,
            'rag_used': False,
            'llm_generation_used': False,
            'confidence_scores': {},
            'reranking_details': {}
        }

        try:
            # Web search usage
            if 'Web Search answer:' in pipeline_logs:
                source_info['web_search_used'] = True
                source_info['sources_used'].append('web_search')

                web_match = re.search(r'Web Search answer: (.*?)(?=\n|$)', pipeline_logs)
                if web_match:
                    source_info['web_search_content'] = web_match.group(1)[:200] + "..."

            # LLM generation usage
            if 'LLM Generated:' in pipeline_logs:
                source_info['llm_generation_used'] = True
                source_info['sources_used'].append('llm_generation')

                llm_match = re.search(r'LLM Generated: (.*?)(?=\nRAG|$)', pipeline_logs, re.DOTALL)
                if llm_match:
                    source_info['llm_content_preview'] = llm_match.group(1)[:200] + "..."

            # RAG usage
            if 'RAG Contexts:' in pipeline_logs:
                source_info['rag_used'] = True
                source_info['sources_used'].append('rag')

                rag_match = re.search(r'RAG Contexts: \[(.*?)\]', pipeline_logs, re.DOTALL)
                if rag_match:
                    contexts = rag_match.group(1).split("', '")
                    source_info['rag_context_count'] = len(contexts)

            # Reranking decision: compare the reranked document against the
            # source previews to infer which candidate won.
            if 'Reranked doc:' in pipeline_logs:
                rerank_match = re.search(r'Reranked doc: (.*?)(?=\nWisal|$)', pipeline_logs, re.DOTALL)
                if rerank_match:
                    reranked_content = rerank_match.group(1)[:200]
                    source_info['reranking_winner_preview'] = reranked_content

                    if source_info['llm_generation_used'] and 'llm_content_preview' in source_info:
                        if reranked_content in source_info['llm_content_preview']:
                            source_info['reranking_winner'] = 'llm_generation'
                            source_info['primary_source'] = 'llm_generation'
                        elif source_info['web_search_used']:
                            source_info['reranking_winner'] = 'web_search'
                            source_info['primary_source'] = 'web_search'
                        else:
                            source_info['reranking_winner'] = 'rag'
                            source_info['primary_source'] = 'rag'

            # Hallucination score, if present
            halluc_match = re.search(r'Score: (\d+)', pipeline_logs)
            if halluc_match:
                source_info['hallucination_score'] = int(halluc_match.group(1))

            # Fallback when the reranking comparison did not settle the primary source
            if source_info['primary_source'] == 'unknown':
                if source_info['llm_generation_used']:
                    source_info['primary_source'] = 'llm_generation'
                elif source_info['rag_used']:
                    source_info['primary_source'] = 'rag'
                elif source_info['web_search_used']:
                    source_info['primary_source'] = 'web_search'

        except Exception as e:
            logger.error(f"Error analyzing response sources: {e}")
            source_info['analysis_error'] = str(e)

        return source_info
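

# Illustrative usage sketch (comments only; the log string is hypothetical but
# uses the markers analyze_response_sources() looks for):
#
#     tracker = ResponseSourceTracker()
#     info = tracker.analyze_response_sources(
#         "What is autism?",
#         "LLM Generated: Autism is ...\n"
#         "RAG Contexts: ['context one', 'context two']\n"
#         "Reranked doc: Autism is ...\n"
#         "Hallucination Score Raw: Score: 5",
#     )
#     print(info['primary_source'], info['sources_used'], info.get('hallucination_score'))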


class AnswerSimilarityAnalyzer:
    """Analyze similarity between generated answers and ground truth answers."""

    def __init__(self):
        self.vectorizer = TfidfVectorizer(stop_words='english', max_features=1000)
        self.embeddings_cache = {}

    def calculate_text_similarity(self, generated_answer: str, ground_truth: str) -> Dict[str, float]:
        """Calculate multiple similarity metrics between generated and ground truth answers."""
        gen_clean = self.clean_text(generated_answer)
        truth_clean = self.clean_text(ground_truth)

        similarities = {
            'cosine_tfidf': 0.0,
            'jaccard_similarity': 0.0,
            'word_overlap_ratio': 0.0,
            'length_ratio': 0.0,
            'semantic_keywords_overlap': 0.0
        }

        try:
            # TF-IDF cosine similarity
            tfidf_matrix = self.vectorizer.fit_transform([gen_clean, truth_clean])
            cosine_sim = cosine_similarity(tfidf_matrix[0:1], tfidf_matrix[1:2])[0][0]
            similarities['cosine_tfidf'] = round(cosine_sim, 4)

            # Jaccard similarity over word sets
            gen_words = set(gen_clean.lower().split())
            truth_words = set(truth_clean.lower().split())

            if len(gen_words.union(truth_words)) > 0:
                jaccard = len(gen_words.intersection(truth_words)) / len(gen_words.union(truth_words))
                similarities['jaccard_similarity'] = round(jaccard, 4)

            # Share of ground-truth words covered by the generated answer
            if len(truth_words) > 0:
                overlap_ratio = len(gen_words.intersection(truth_words)) / len(truth_words)
                similarities['word_overlap_ratio'] = round(overlap_ratio, 4)

            # Length ratio (shorter / longer)
            if len(truth_clean) > 0:
                length_ratio = min(len(gen_clean), len(truth_clean)) / max(len(gen_clean), len(truth_clean))
                similarities['length_ratio'] = round(length_ratio, 4)

            # Overlap on autism-domain keywords
            autism_keywords = {
                'autism', 'asd', 'spectrum', 'disorder', 'developmental', 'social',
                'communication', 'behavior', 'sensory', 'repetitive', 'stimming',
                'intervention', 'therapy', 'support', 'diagnosis', 'symptoms'
            }

            gen_autism_words = gen_words.intersection(autism_keywords)
            truth_autism_words = truth_words.intersection(autism_keywords)

            if len(truth_autism_words) > 0:
                keyword_overlap = len(gen_autism_words.intersection(truth_autism_words)) / len(truth_autism_words)
                similarities['semantic_keywords_overlap'] = round(keyword_overlap, 4)

        except Exception as e:
            logger.error(f"Error calculating similarity: {e}")
            similarities['calculation_error'] = str(e)

        return similarities

    def clean_text(self, text: str) -> str:
        """Clean text for similarity analysis."""
        if not text:
            return ""

        # Strip HTML tags, collapse whitespace, and drop uncommon punctuation.
        clean_text = re.sub('<[^<]+?>', '', str(text))
        clean_text = ' '.join(clean_text.split())
        clean_text = re.sub(r'[^\w\s\.\!\?\,\-]', '', clean_text)

        return clean_text.strip()

    def generate_similarity_grade(self, similarities: Dict[str, float]) -> str:
        """Generate an overall similarity grade based on multiple metrics."""
        weights = {
            'cosine_tfidf': 0.4,
            'jaccard_similarity': 0.2,
            'word_overlap_ratio': 0.2,
            'semantic_keywords_overlap': 0.2
        }

        weighted_score = 0.0
        total_weight = 0.0

        for metric, weight in weights.items():
            if metric in similarities and isinstance(similarities[metric], (int, float)):
                weighted_score += similarities[metric] * weight
                total_weight += weight

        final_score = weighted_score / total_weight if total_weight > 0 else 0.0

        if final_score >= 0.8:
            return f"A+ (Excellent - {final_score:.2f})"
        elif final_score >= 0.6:
            return f"A (Good - {final_score:.2f})"
        elif final_score >= 0.4:
            return f"B (Fair - {final_score:.2f})"
        elif final_score >= 0.2:
            return f"C (Poor - {final_score:.2f})"
        else:
            return f"F (Very Poor - {final_score:.2f})"
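

# Illustrative usage sketch (comments only; the strings and scores below are hypothetical):
#
#     analyzer = AnswerSimilarityAnalyzer()
#     metrics = analyzer.calculate_text_similarity(
#         "Autism is a developmental disorder affecting communication and behavior.",
#         "Autism is a neurodevelopmental disorder characterized by difficulties in "
#         "social communication and repetitive behavior.",
#     )
#     print(metrics)                                        # e.g. {'cosine_tfidf': 0.41, ...}
#     print(analyzer.generate_similarity_grade(metrics))    # e.g. "B (Fair - 0.45)"
#
# The grade is the weighted mean of cosine_tfidf (0.4), jaccard_similarity (0.2),
# word_overlap_ratio (0.2), and semantic_keywords_overlap (0.2), mapped onto the
# A+/A/B/C/F bands above.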


def clean_html_response(html_text: str) -> str:
    """Clean HTML tags from response text."""
    if not html_text:
        return ""
    clean_text = re.sub('<[^<]+?>', '', html_text)
    clean_text = ' '.join(clean_text.split())
    return clean_text.strip()


def process_single_question_with_evaluation(question: str, ground_truth: str,
                                            question_index: int) -> Dict[str, Any]:
    """Process a single question with comprehensive evaluation against ground truth."""
    start_time = time.time()

    results = {
        'question': question,
        'ground_truth_answer': ground_truth,
        'generated_answer': '',
        'clean_generated_answer': '',
        'status': 'success',
        'error_message': '',
        'processing_time_seconds': 0,
        'similarity_analysis': {},
        'similarity_grade': '',
        'response_source_analysis': {},
        'pipeline_logs': '',
        'hallucination_score': 0,
        'response_source': 'unknown',
        'sources_used': [],
        'reranking_winner': 'unknown'
    }

    source_tracker = ResponseSourceTracker()
    similarity_analyzer = AnswerSimilarityAnalyzer()

    try:
        logger.info(f"Processing Question {question_index + 1}: {question}")

        # Run the question through the pipeline and time it.
        pipeline_start = time.time()
        response_html = process_query(question, first_turn=True)
        processing_time = time.time() - pipeline_start

        clean_response = clean_html_response(response_html)

        results['generated_answer'] = response_html
        results['clean_generated_answer'] = clean_response
        results['processing_time_seconds'] = round(processing_time, 3)

        # process_query() does not return its internal logs, so build a mock log
        # string in the format the source tracker expects. Replace this with real
        # captured pipeline logs to get genuine source attribution.
        mock_logs = f"""
Original Query: {question}
Corrected Query: {question}
Relevance Check: RELATED
Web Search answer: [Web search result would be here]
LLM Generated: [LLM generation would be here]
RAG Contexts: [RAG contexts would be here]
Reranked doc: {clean_response[:200]}...
Wisal Answer: {clean_response}
Hallucination Score Raw: Score: 5
"""

        results['pipeline_logs'] = mock_logs

        # Source tracking
        source_analysis = source_tracker.analyze_response_sources(question, mock_logs)
        results['response_source_analysis'] = source_analysis
        results['response_source'] = source_analysis['primary_source']
        results['sources_used'] = ', '.join(source_analysis['sources_used'])
        results['reranking_winner'] = source_analysis['reranking_winner']
        results['hallucination_score'] = source_analysis.get('hallucination_score', 0)

        # Similarity analysis against the ground truth answer
        similarity_metrics = similarity_analyzer.calculate_text_similarity(
            clean_response, ground_truth
        )
        results['similarity_analysis'] = similarity_metrics
        results['similarity_grade'] = similarity_analyzer.generate_similarity_grade(similarity_metrics)

        # Flatten the headline metrics for the CSV output
        results['cosine_similarity'] = similarity_metrics.get('cosine_tfidf', 0)
        results['jaccard_similarity'] = similarity_metrics.get('jaccard_similarity', 0)
        results['word_overlap_ratio'] = similarity_metrics.get('word_overlap_ratio', 0)
        results['semantic_keywords_overlap'] = similarity_metrics.get('semantic_keywords_overlap', 0)

        results['total_time_seconds'] = round(time.time() - start_time, 3)

        logger.info(f"✅ Question {question_index + 1} completed - Similarity Grade: {results['similarity_grade']}")

    except Exception as e:
        error_msg = f"Error processing question {question_index + 1}: {str(e)}"
        logger.error(error_msg)
        logger.error(traceback.format_exc())

        results['status'] = 'error'
        results['error_message'] = str(e)
        results['total_time_seconds'] = round(time.time() - start_time, 3)
        results['generated_answer'] = f"[ERROR] {str(e)}"
        results['clean_generated_answer'] = f"Error: {str(e)}"

    return results
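

# Illustrative call (comments only; the answer text is abbreviated):
#
#     result = process_single_question_with_evaluation(
#         "What is autism?",
#         "Autism is a neurodevelopmental disorder ...",
#         question_index=0,
#     )
#     print(result['similarity_grade'], result['response_source'])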


def run_enhanced_csv_evaluation(input_csv_path: str, output_csv_path: str = None,
                                question_column: str = 'Question',
                                answer_column: str = 'Answer') -> str:
    """Run enhanced CSV evaluation with similarity analysis and source tracking."""
    if not os.path.exists(input_csv_path):
        raise FileNotFoundError(f"Input CSV file not found: {input_csv_path}")

    if output_csv_path is None:
        base_name = os.path.splitext(input_csv_path)[0]
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        output_csv_path = f"{base_name}_enhanced_evaluation_{timestamp}.csv"

    logger.info("Starting Enhanced CSV Evaluation")
    logger.info(f"Input file: {input_csv_path}")
    logger.info(f"Output file: {output_csv_path}")
    logger.info(f"Question column: '{question_column}', Answer column: '{answer_column}'")

    try:
        df = pd.read_csv(input_csv_path)
        df.columns = df.columns.str.strip()

        logger.info(f"Available columns: {list(df.columns)}")
        logger.info(f"Loaded {len(df)} questions from CSV")

        if question_column.strip() not in df.columns:
            raise ValueError(f"Question column '{question_column}' not found. Available: {list(df.columns)}")
        if answer_column.strip() not in df.columns:
            raise ValueError(f"Answer column '{answer_column}' not found. Available: {list(df.columns)}")

        # Limit the run size while testing
        if len(df) > 5:
            df = df.head(5)
            logger.info("Limited to first 5 questions for testing")

        # Add result columns: text columns default to '', numeric metrics to 0.0.
        result_columns = [
            'generated_answer', 'clean_generated_answer', 'processing_time_seconds',
            'status', 'error_message', 'similarity_grade', 'cosine_similarity',
            'jaccard_similarity', 'word_overlap_ratio', 'semantic_keywords_overlap',
            'response_source', 'sources_used', 'reranking_winner', 'hallucination_score',
            'processed_timestamp', 'total_time_seconds'
        ]
        text_columns = {
            'generated_answer', 'clean_generated_answer', 'status', 'error_message',
            'similarity_grade', 'response_source', 'sources_used', 'reranking_winner',
            'processed_timestamp'
        }

        for col in result_columns:
            df[col] = '' if col in text_columns else 0.0

        successful_questions = 0
        similarity_scores = []
        processing_times = []
        source_distribution = {}

        for index, row in df.iterrows():
            question = str(row[question_column.strip()]).strip()
            ground_truth = str(row[answer_column.strip()]).strip()

            if not question or question.lower() == 'nan':
                logger.warning(f"Skipping empty question at row {index}")
                df.at[index, 'status'] = 'skipped'
                df.at[index, 'error_message'] = 'Empty question'
                continue

            logger.info(f"\nProcessing Question {index + 1}/{len(df)}")

            result = process_single_question_with_evaluation(question, ground_truth, index)

            # Copy the flat result fields into the dataframe
            for key, value in result.items():
                if key in df.columns:
                    df.at[index, key] = value

            df.at[index, 'processed_timestamp'] = datetime.now().isoformat()

            if result['status'] == 'success':
                successful_questions += 1
                processing_times.append(result['processing_time_seconds'])

                if 'cosine_similarity' in result:
                    similarity_scores.append(result['cosine_similarity'])

                source = result['response_source']
                source_distribution[source] = source_distribution.get(source, 0) + 1

            # Brief pause between questions to avoid overloading the pipeline
            time.sleep(0.5)

        avg_similarity = np.mean(similarity_scores) if similarity_scores else 0
        avg_processing_time = np.mean(processing_times) if processing_times else 0

        df.to_csv(output_csv_path, index=False)

        # Summary report
        print("\n" + "=" * 100)
        print("ENHANCED CSV EVALUATION RESULTS")
        print("=" * 100)
        print(f"Input file: {input_csv_path}")
        print(f"Output file: {output_csv_path}")
        print(f"Questions processed: {len(df)}")
        print(f"Successful: {successful_questions}")
        print(f"Failed: {len(df) - successful_questions}")
        print(f"Average processing time: {avg_processing_time:.3f} seconds")
        print(f"Average similarity score: {avg_similarity:.3f}")
        print("\nResponse Source Distribution:")
        for source, count in source_distribution.items():
            percentage = (count / successful_questions * 100) if successful_questions > 0 else 0
            print(f"  {source}: {count} ({percentage:.1f}%)")

        print("\nSimilarity Grade Distribution:")
        grade_counts = df['similarity_grade'].value_counts()
        for grade, count in grade_counts.items():
            if grade:
                print(f"  {grade}: {count}")

        print("=" * 100)

        logger.info(f"Enhanced evaluation completed. Results saved to: {output_csv_path}")
        return output_csv_path

    except Exception as e:
        error_msg = f"Error during enhanced CSV evaluation: {str(e)}"
        logger.error(error_msg)
        logger.error(traceback.format_exc())
        raise
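

# Programmatic usage sketch (illustrative; the CSV name matches the sample file below):
#
#     output_path = run_enhanced_csv_evaluation(
#         "sample_qa_evaluation.csv",
#         question_column="Question",
#         answer_column="Answer",
#     )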


def create_sample_evaluation_csv(file_path: str = "sample_qa_evaluation.csv"):
    """Create a sample CSV with questions and ground truth answers for testing."""
    sample_data = [
        {
            "Question": "What is autism?",
            "Answer": "Autism is a neurodevelopmental disorder characterized by difficulties in social communication and interaction, along with restricted and repetitive patterns of behavior, interests, or activities."
        },
        {
            "Question": "What are the early signs of autism in children?",
            "Answer": "Early signs include delayed speech development, limited eye contact, difficulty with social interactions, repetitive behaviors, and sensitivity to sensory input."
        },
        {
            "Question": "How can I help my autistic child with social skills?",
            "Answer": "Social skills can be developed through structured social stories, role-playing activities, peer interaction opportunities, and working with speech-language pathologists or behavioral therapists."
        },
        {
            "Question": "What are sensory processing issues in autism?",
            "Answer": "Sensory processing issues involve over- or under-responsiveness to sensory stimuli like sounds, textures, lights, or smells, which can cause distress or seeking behaviors."
        },
        {
            "Question": "What educational strategies work best for autistic students?",
            "Answer": "Effective strategies include visual supports, structured routines, individualized education plans (IEPs), sensory breaks, and clear, consistent communication methods."
        }
    ]

    df = pd.DataFrame(sample_data)
    df.to_csv(file_path, index=False)
    print(f"Sample evaluation CSV created: {file_path}")
    return file_path


def main():
    """Main function for enhanced CSV evaluation."""
    import argparse

    parser = argparse.ArgumentParser(
        description="Enhanced CSV evaluation with similarity analysis and source tracking"
    )
    parser.add_argument("input_csv", nargs='?', help="Path to input CSV file")
    parser.add_argument("--output", "-o", help="Path to output CSV file")
    parser.add_argument("--question-col", default="Question", help="Name of question column")
    parser.add_argument("--answer-col", default="Answer", help="Name of answer column")
    parser.add_argument("--create-sample", action="store_true", help="Create sample CSV")

    args = parser.parse_args()

    try:
        if args.create_sample:
            sample_file = create_sample_evaluation_csv()
            print(f"Sample CSV created: {sample_file}")
            if not args.input_csv:
                args.input_csv = sample_file

        if not args.input_csv:
            print("Error: Please provide an input CSV file or use --create-sample")
            parser.print_help()
            return

        output_file = run_enhanced_csv_evaluation(
            input_csv_path=args.input_csv,
            output_csv_path=args.output,
            question_column=args.question_col,
            answer_column=args.answer_col
        )

        print("\nEnhanced evaluation completed successfully!")
        print(f"Detailed results saved to: {output_file}")

    except Exception as e:
        print(f"Error: {e}")
        logger.error(f"Main execution error: {e}")


if __name__ == "__main__":
    main()