import logging
import unittest
from typing import List

import torch
import transformers
from transformers import pipeline, AutoTokenizer
from tabulate import tabulate

from mediaunmasked.scrapers.article_scraper import ArticleScraper

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class MediaUnmaskLLMTester(unittest.TestCase):
    transformers.logging.set_verbosity_error()

    def setUp(self):
        """Set up LLMs and scrape the article under test."""
        self.models = {
            # Upgraded evidence-based models
            "RoBERTa-MNLI": {"model": "roberta-large-mnli", "max_length": 512},  # Standard MNLI model
            "DeBERTa-Fact": {"model": "MoritzLaurer/DeBERTa-v3-large-mnli-fever-anli-ling-wanli", "max_length": 512},
            "T5-Large": {"model": "google/t5-v1_1-large", "max_length": 512},
            "SciBERT": {"model": "allenai/scibert_scivocab_uncased", "max_length": 512},
            "BART-FEVER": {"model": "facebook/bart-large", "max_length": 1024},  # Note: needs FEVER fine-tuning
            "MultiQA-MiniLM": {"model": "sentence-transformers/multi-qa-MiniLM-L6-cos-v1", "max_length": 512},
            # Existing models for benchmarking
            "BART-MNLI": {"model": "facebook/bart-large-mnli", "max_length": 1024},
            "RoBERTa-Bias": {"model": "cardiffnlp/twitter-roberta-base-hate", "max_length": 512},
            "DistilBERT-Sentiment": {"model": "distilbert-base-uncased-finetuned-sst-2-english", "max_length": 512},
            "GPT2-Generation": {"model": "gpt2", "max_length": 1024},
        }
        self.device = 0 if torch.cuda.is_available() else -1
        self.scraper = ArticleScraper()
        self.article_url = "https://www.snopes.com/fact-check/trump-super-bowl-cost-taxpayers/"
        self.article_data = self.scraper.scrape_article(self.article_url) or {}
        self.results = {
            "headline": self.article_data.get("headline", "No headline"),
            "content": self.article_data.get("content", "No content available"),
            "scores": {}
        }
        self.tokenizers = {name: AutoTokenizer.from_pretrained(cfg["model"]) for name, cfg in self.models.items()}

    def _split_content(self, model_name: str, content: str) -> List[str]:
        """Truncate content to the model's token limit and return it as a single-section list."""
        tokenizer = self.tokenizers[model_name]
        max_length = self.models[model_name]["max_length"]
        if not content or not content.strip():
            return ["No valid content"]
        # Tokenize with truncation so the text fits the model, then decode back to plain text.
        encoded = tokenizer(content, add_special_tokens=True, truncation=True, max_length=max_length)
        decoded = tokenizer.decode(encoded["input_ids"], skip_special_tokens=True)
        return [decoded] if decoded.strip() else ["No valid content"]

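    # A minimal sketch of true chunked splitting, in case multiple sections are ever wanted
    # instead of a single truncated one. It is not used by the tests below, and the helper
    # name `_split_content_chunked` is an illustrative assumption, not part of the original design.
    def _split_content_chunked(self, model_name: str, content: str) -> List[str]:
        tokenizer = self.tokenizers[model_name]
        max_length = self.models[model_name]["max_length"]
        if not content or not content.strip():
            return ["No valid content"]
        # Tokenize once without special tokens, then slice the ids into fixed-size windows,
        # leaving a little room for special tokens added when a model re-encodes the chunk.
        input_ids = tokenizer.encode(content, add_special_tokens=False)
        window = max(max_length - 2, 1)
        chunks = [input_ids[i:i + window] for i in range(0, len(input_ids), window)]
        decoded = [tokenizer.decode(chunk, skip_special_tokens=True) for chunk in chunks]
        return [d for d in decoded if d.strip()] or ["No valid content"]
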
    def _get_flagged_phrases(self, model_pipeline, sections, threshold=0.6, top_k=5, candidate_labels=None):
        """Extract top-scoring flagged phrases, handling empty input and per-sentence errors safely."""
        if not sections or not isinstance(sections, list):
            return [("None", "N/A")]
        flagged_phrases = []
        for section in sections:
            if not isinstance(section, str) or not section.strip():  # Skip anything that is not a usable string
                continue
            sentences = [s.strip() for s in section.split(". ") if s.strip()]
            for sentence in sentences:
                try:
                    if candidate_labels:
                        # Zero-shot pipelines require candidate labels and return a dict with
                        # parallel "labels"/"scores" lists sorted by descending score.
                        preds = model_pipeline(sentence, candidate_labels=candidate_labels)
                        top_pred = {"label": preds["labels"][0], "score": preds["scores"][0]}
                    else:
                        preds = model_pipeline(sentence)
                        if not preds or not isinstance(preds, list):
                            continue
                        top_pred = max(preds, key=lambda x: x["score"])
                    if top_pred["score"] >= threshold:
                        short_phrase = " ".join(sentence.split()[:10])  # Shorten for readability
                        flagged_phrases.append((short_phrase, top_pred["score"], top_pred["label"]))
                except Exception as e:
                    logger.error(f"Error analyzing sentence: {e}")
                    continue
        flagged_phrases.sort(key=lambda x: x[1], reverse=True)
        return [(phrase, label) for phrase, _, label in flagged_phrases[:top_k]] or [("None", "N/A")]

    def test_headline_vs_content(self):
        """Check headline-content alignment."""
        headline = self.results["headline"]
        content = self.results["content"]
        for model_name in self.models:
            with self.subTest(model=model_name):
                analyzer = pipeline("text-classification", model=self.models[model_name]["model"], device=self.device)
                sections = self._split_content(model_name, content)
                headline_score = max(analyzer(headline), key=lambda x: x["score"])["score"]
                content_scores = [max(analyzer(section), key=lambda x: x["score"])["score"] for section in sections]
                avg_content_score = sum(content_scores) / len(content_scores)
                # Lower is better: a small gap between headline and body scores means better alignment.
                consistency_score = abs(headline_score - avg_content_score)
                flagged_phrases = self._get_flagged_phrases(analyzer, sections)
                self.results["scores"].setdefault("headline_vs_content", {})[model_name] = {
                    "score": consistency_score,
                    "flagged_phrases": flagged_phrases
                }
                self.assertIsNotNone(consistency_score)

    def test_evidence_based(self):
        """Test evidence-based content."""
        content = self.results["content"]
        labels = ["evidence-based", "opinion", "misleading"]
        for model_name in self.models:
            if any(keyword in model_name.lower() for keyword in ["mnli", "fact", "fever", "qa"]):
                with self.subTest(model=model_name):
                    classifier = pipeline("zero-shot-classification", model=self.models[model_name]["model"], device=self.device)
                    sections = self._split_content(model_name, content)
                    results = [classifier(section, candidate_labels=labels) for section in sections]
                    avg_score = sum(r["scores"][r["labels"].index("evidence-based")] for r in results) / len(results)
                    # Zero-shot pipelines need candidate labels on every call, including phrase flagging.
                    flagged_phrases = self._get_flagged_phrases(classifier, sections, candidate_labels=labels)
                    self.results["scores"].setdefault("evidence_based", {})[model_name] = {
                        "score": avg_score,
                        "flagged_phrases": flagged_phrases
                    }
                    self.assertIsNotNone(avg_score)

    def test_manipulative_language(self):
        """Detect manipulative language."""
        content = self.results["content"]
        for model_name in self.models:
            if "sentiment" in model_name.lower() or "emotion" in model_name.lower() or "gpt" in model_name.lower():
                with self.subTest(model=model_name):
                    detector = pipeline("text-classification", model=self.models[model_name]["model"], device=self.device)
                    sections = self._split_content(model_name, content)
                    results = [max(detector(section), key=lambda x: x["score"]) for section in sections]
                    avg_score = sum(r["score"] for r in results) / len(results)
                    flagged_phrases = self._get_flagged_phrases(detector, sections)
                    self.results["scores"].setdefault("manipulative_language", {})[model_name] = {
                        "score": avg_score,
                        "flagged_phrases": flagged_phrases
                    }
                    self.assertIsNotNone(avg_score)

    def test_bias_detection(self):
        """Detect bias."""
        content = self.results["content"]
        for model_name in self.models:
            if "bias" in model_name.lower() or "toxic" in model_name.lower() or "roberta" in model_name.lower():
                with self.subTest(model=model_name):
                    detector = pipeline("text-classification", model=self.models[model_name]["model"], device=self.device)
                    sections = self._split_content(model_name, content)
                    results = [max(detector(section), key=lambda x: x["score"]) for section in sections]
                    avg_score = sum(r["score"] for r in results) / len(results)
                    flagged_phrases = self._get_flagged_phrases(detector, sections)
                    self.results["scores"].setdefault("bias_detection", {})[model_name] = {
                        "score": avg_score,
                        "flagged_phrases": flagged_phrases
                    }
                    self.assertIsNotNone(avg_score)

    def tearDown(self):
        """Print the top 2 models per test with clearer formatting."""
        print("\n=== Top Model Recommendations ===")
        for test_type, model_results in self.results["scores"].items():
            print(f"\nTop 2 Models for {test_type}:")
            sorted_results = sorted(
                model_results.items(),
                key=lambda x: x[1]["score"],
                reverse=(test_type != "headline_vs_content")
            )
            top_2 = sorted_results[:2]
            table = [
                [
                    model,
                    f"{res['score']:.6f}",
                    ", ".join(f"{phrase} ({label})" for phrase, label in res["flagged_phrases"])
                ]
                for model, res in top_2
            ]
            print(tabulate(table, headers=["Model", "Score", "Flagged Phrases"], tablefmt="grid"))
            criteria = "Lowest consistency score (better alignment)" if test_type == "headline_vs_content" else "Highest detection score"
            print(f"Criteria: {criteria}")

if __name__ == "__main__":
    unittest.main()
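# Usage sketch (the module path below is an assumption; adjust it to wherever this file lives):
#   python -m unittest tests.test_llm_models -v
# or run a single case:
#   python -m unittest tests.test_llm_models.MediaUnmaskLLMTester.test_bias_detection -v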