import logging
from typing import Dict, Any, List, Optional
from transformers import pipeline
import numpy as np
import nltk
from nltk.tokenize import sent_tokenize

logger = logging.getLogger(__name__)
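
# Illustrative note (assumption, not part of the original interface): the optional
# `model_registry` used by EvidenceAnalyzer below is only expected to be duck-typed
# with two attributes, roughly:
#
#     class SharedModelRegistry:   # hypothetical name
#         is_available: bool       # True once shared models are loaded
#         zero_shot: Any           # a transformers "zero-shot-classification" pipeline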


class EvidenceAnalyzer:
    def __init__(self, use_ai: bool = True, model_registry: Optional[Any] = None):
        """
        Initialize the evidence analyzer with LLM and traditional approaches.

        Args:
            use_ai: Whether to use AI-powered analysis (True) or traditional analysis (False)
            model_registry: Optional shared model registry for better performance
        """
        self.use_ai = use_ai
        self.llm_available = False
        self.model_registry = model_registry

        if use_ai:
            try:
                if model_registry and model_registry.is_available:
                    # Use shared models
                    self.classifier = model_registry.zero_shot
                    self.llm_available = True
                    logger.info("Using shared model pipeline for evidence analysis")
                else:
                    # Initialize a dedicated pipeline
                    self.classifier = pipeline(
                        "zero-shot-classification",
                        model="facebook/bart-large-mnli",
                        device=-1,
                        batch_size=8
                    )
                    self.llm_available = True
                    logger.info("Initialized dedicated model pipeline for evidence analysis")
            except Exception as e:
                logger.warning(f"Failed to initialize LLM pipeline: {str(e)}")
                self.llm_available = False
        else:
            logger.info("Initializing evidence analyzer in traditional mode")

        # Traditional markers for the fallback analysis (defined in both modes,
        # so the fallback still works if the LLM path fails at runtime)
        self.citation_markers = [
            "according to",
            "said",
            "reported",
            "stated",
            "shows",
            "found",
            "study",
            "research",
            "data",
            "evidence"
        ]
        self.vague_markers = [
            "some say",
            "many believe",
            "people think",
            "experts claim",
            "sources say",
            "it is believed",
            "reportedly",
            "allegedly"
        ]

    def _analyze_with_llm(self, text: str) -> Optional[Dict[str, Any]]:
        """Analyze evidence with the LLM pipeline; returns None if analysis fails."""
        try:
            logger.info("\n" + "=" * 50)
            logger.info("EVIDENCE ANALYSIS STARTED")
            logger.info("=" * 50)

            # Clean the text of formatting markers
            logger.info("Cleaning and preparing text...")
            cleaned_text = text.replace('$!/$', '').replace('##', '').replace('#', '')
            cleaned_text = '\n'.join(
                line for line in cleaned_text.split('\n')
                if not line.startswith('[') and not line.startswith('More on')
            )
            logger.info(f"Text prepared - Length: {len(cleaned_text)} characters")

            # Download NLTK data if needed
            try:
                nltk.data.find('tokenizers/punkt')
            except LookupError:
                logger.info("Downloading required NLTK data...")
                nltk.download('punkt')

            # Split text into 2,000-character chunks
            chunks = [cleaned_text[i:i + 2000] for i in range(0, len(cleaned_text), 2000)]
            logger.info(f"Split text into {len(chunks)} chunks for processing")

            # Categories for evidence classification
            evidence_categories = [
                "factual statement with source",
                "verifiable claim",
                "expert opinion",
                "data-backed claim",
                "unsubstantiated claim",
                "opinion statement"
            ]
            logger.info("\nUsing evidence categories:")
            for cat in evidence_categories:
                logger.info(f"  - {cat}")

            chunk_scores = []
            flagged_phrases = []

            for i, chunk in enumerate(chunks, 1):
                logger.info(f"\n{'-' * 30}")
                logger.info(f"Processing chunk {i}/{len(chunks)}")
                logger.info(f"Chunk length: {len(chunk)} characters")

                # Analyze each sentence in the chunk
                sentences = sent_tokenize(chunk)
                logger.info(f"Found {len(sentences)} sentences to analyze")

                sentence_count = 0
                strong_evidence_count = 0

                for sentence in sentences:
                    if len(sentence.strip()) > 10:
                        sentence_count += 1

                        # Classify the type of evidence
                        result = self.classifier(
                            sentence.strip(),
                            evidence_categories,
                            multi_label=True
                        )

                        # Map each category label to its classification score
                        evidence_scores = {
                            label: score
                            for label, score in zip(result['labels'], result['scores'])
                        }

                        # Strong evidence indicators (averaged)
                        strong_evidence = sum([
                            evidence_scores.get("factual statement with source", 0),
                            evidence_scores.get("data-backed claim", 0),
                            evidence_scores.get("expert opinion", 0)
                        ]) / 3

                        # Weak or no evidence indicators (averaged)
                        weak_evidence = sum([
                            evidence_scores.get("unsubstantiated claim", 0),
                            evidence_scores.get("opinion statement", 0)
                        ]) / 2

                        # Store scores for the overall calculation
                        chunk_scores.append({
                            'strong_evidence': strong_evidence,
                            'weak_evidence': weak_evidence
                        })

                        # Flag high-quality evidence, skipping navigation boilerplate
                        if strong_evidence > 0.7 and not any(
                            marker in sentence.lower()
                            for marker in ['more on this story', 'click here', 'read more']
                        ):
                            strong_evidence_count += 1
                            logger.info(f"Found strong evidence (score: {strong_evidence:.3f}):")
                            logger.info(f"  \"{sentence.strip()}\"")
                            flagged_phrases.append({
                                'text': sentence.strip(),
                                'type': 'strong_evidence',
                                'score': strong_evidence
                            })

                logger.info(f"Processed {sentence_count} sentences in chunk {i}")
                logger.info(f"Found {strong_evidence_count} sentences with strong evidence")

            # Calculate overall evidence score
            logger.info("\nCalculating final evidence scores...")
            if chunk_scores:
                avg_strong = np.mean([s['strong_evidence'] for s in chunk_scores])
                avg_weak = np.mean([s['weak_evidence'] for s in chunk_scores])
                logger.info("Average evidence scores:")
                logger.info(f"  - Strong evidence: {avg_strong:.3f}")
                logger.info(f"  - Weak evidence: {avg_weak:.3f}")

                # Evidence score formula:
                # - Reward strong evidence (70% weight)
                # - Penalize weak/unsubstantiated claims (30% weight)
                # - Cap the final score at 100
                evidence_score = min(100, (
                    (avg_strong * 0.7) +
                    ((1 - avg_weak) * 0.3)
                ) * 100)
            else:
                evidence_score = 0
                logger.warning("No scores available, defaulting to 0")

            logger.info(f"Final evidence score: {evidence_score:.1f}")

            # Sort and select the top evidence phrases
            sorted_phrases = sorted(
                flagged_phrases,
                key=lambda x: x['score'],
                reverse=True
            )

            # Filter out formatting text and duplicates
            unique_phrases = []
            seen = set()
            for phrase in sorted_phrases:
                clean_text = phrase['text'].strip()
                if clean_text not in seen and not any(
                    marker in clean_text.lower()
                    for marker in ['more on this story', 'click here', 'read more']
                ):
                    unique_phrases.append(clean_text)
                    seen.add(clean_text)
                if len(unique_phrases) >= 5:
                    break

            logger.info(f"\nFlagged {len(unique_phrases)} unique evidence-based phrases")
            logger.info("\nEvidence analysis completed successfully")

            return {
                "evidence_based_score": round(evidence_score, 1),
                "flagged_phrases": unique_phrases
            }

        except Exception as e:
            logger.error(f"LLM analysis failed: {str(e)}")
            return None

    def _analyze_traditional(self, text: str) -> Dict[str, Any]:
        """Traditional evidence analysis used as a fallback."""
        try:
            text_lower = text.lower()

            # Find citations and evidence
            evidence_phrases = []
            for marker in self.citation_markers:
                index = text_lower.find(marker)
                while index != -1:
                    # Get the sentence containing the marker
                    start = max(0, text_lower.rfind('.', 0, index) + 1)
                    end = text_lower.find('.', index)
                    if end == -1:
                        end = len(text_lower)
                    evidence_phrases.append(text[start:end].strip())
                    index = text_lower.find(marker, end)

            # Count vague references
            vague_count = sum(1 for marker in self.vague_markers if marker in text_lower)

            # Calculate score
            citation_count = len(evidence_phrases)
            base_score = min(citation_count * 20, 100)
            penalty = vague_count * 10
            evidence_score = max(0, base_score - penalty)
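
            # Worked example (illustrative numbers only): three citation phrases
            # give base_score = min(3 * 20, 100) = 60; one vague reference gives
            # penalty = 10, so evidence_score = max(0, 60 - 10) = 50.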

            return {
                "evidence_based_score": evidence_score,
                "flagged_phrases": list(set(evidence_phrases))[:5]  # Limit to top 5 unique phrases
            }

        except Exception as e:
            logger.error(f"Traditional analysis failed: {str(e)}")
            return {
                "evidence_based_score": 0,
                "flagged_phrases": []
            }

    def analyze(self, text: str) -> Dict[str, Any]:
        """Analyze evidence using the LLM with fallback to the traditional method."""
        try:
            # Try LLM analysis if enabled and available
            if self.use_ai and self.llm_available:
                llm_result = self._analyze_with_llm(text)
                if llm_result:
                    return llm_result

            # Use traditional analysis
            logger.info("Using traditional evidence analysis")
            return self._analyze_traditional(text)

        except Exception as e:
            logger.error(f"Error in evidence analysis: {str(e)}")
            return {
                "evidence_based_score": 0,
                "flagged_phrases": []
            }
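

# Example usage (illustrative sketch; the sample text is hypothetical).
# Traditional mode is used here so the example runs without downloading
# the transformers model:
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)

    analyzer = EvidenceAnalyzer(use_ai=False)
    sample = (
        "According to a 2023 survey, commute times fell by 12 percent. "
        "Some say the underlying data is misleading."
    )
    result = analyzer.analyze(sample)
    print(result["evidence_based_score"], result["flagged_phrases"])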