"""
main.py - Server for the Fake News Detection system

This script creates a Flask server that exposes API endpoints to:

1. Take user input (news query) from the UI
2. Process the request through the fake news detection pipeline
3. Return the results to the UI for display
"""

import os
import time

from dotenv import load_dotenv
from flask import Flask, request, jsonify
from flask_cors import CORS

from gdelt_api import (
    fetch_articles_from_gdelt,
    filter_by_whitelisted_domains,
    normalize_gdelt_articles
)
from ranker import ArticleRanker
from gdelt_query_builder import generate_query, GEMINI_MODEL
import bias_analyzer

# Preload the embedding model at import time so the first request does not pay
# the model initialization cost; requests reuse this instance whenever the
# configured model matches (see detect_fake_news).
print("Preloading embedding model for faster request processing...")
global_ranker = ArticleRanker()


def format_results(query, ranked_articles):
    """
    Format the ranked results in a structured way for the UI.

    Args:
        query (str): The original query
        ranked_articles (list): List of ranked article dictionaries

    Returns:
        dict: Dictionary with formatted results
    """
    if not ranked_articles:
        result = {
            "status": "no_results",
            "message": "⚠️ No news found. Possibly Fake.",
            "details": "No reliable sources could verify this information.",
            "articles": []
        }
    else:
        # Display options are controlled via environment variables.
        show_scores = os.getenv('SHOW_SIMILARITY_SCORES', 'true').lower() == 'true'
        show_date = os.getenv('SHOW_PUBLISH_DATE', 'true').lower() == 'true'
        show_url = os.getenv('SHOW_URL', 'true').lower() == 'true'

        formatted_articles = []
        for article in ranked_articles:
            formatted_article = {
                "rank": article['rank'],
                "title": article['title'],
                "source": article['source']
            }

            if show_scores:
                formatted_article["similarity_score"] = round(article['similarity_score'], 4)

            if show_url:
                formatted_article["url"] = article['url']

            if show_date:
                formatted_article["published_at"] = article['published_at']

            formatted_articles.append(formatted_article)

        result = {
            "status": "success",
            "message": f"✅ Found {len(ranked_articles)} relevant articles for: '{query}'",
            "articles": formatted_articles,
            "footer": "If the news matches these reliable sources, it's likely true. If it contradicts them or no sources are found, it might be fake."
        }

    return result
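
# Illustrative shape of the success payload built above (values are
# hypothetical; per-article keys vary with the SHOW_* flags):
#
#   {
#       "status": "success",
#       "message": "✅ Found 2 relevant articles for: 'example query'",
#       "articles": [
#           {
#               "rank": 1,
#               "title": "Example headline",
#               "source": "example.com",
#               "similarity_score": 0.8732,
#               "url": "https://example.com/story",
#               "published_at": "2024-01-01T00:00:00Z"
#           }
#       ],
#       "footer": "If the news matches these reliable sources, ..."
#   }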


def remove_duplicates(articles):
    """
    Remove duplicate articles based on URL.

    Args:
        articles (list): List of article dictionaries

    Returns:
        list: List with duplicate articles removed
    """
    unique_urls = set()
    unique_articles = []

    for article in articles:
        if article['url'] not in unique_urls:
            unique_urls.add(article['url'])
            unique_articles.append(article)

    return unique_articles
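
# Minimal sketch of the deduplication behavior (hypothetical data): the first
# occurrence of each URL wins and input order is preserved.
#
#   articles = [
#       {"url": "https://example.com/a", "title": "A"},
#       {"url": "https://example.com/a", "title": "A (syndicated)"},
#       {"url": "https://example.com/b", "title": "B"},
#   ]
#   remove_duplicates(articles)
#   # -> [{"url": "https://example.com/a", "title": "A"},
#   #     {"url": "https://example.com/b", "title": "B"}]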


def main():
    """Main function to run the fake news detection pipeline as a server."""
    # Load configuration from a .env file, if one is present.
    load_dotenv()

    app = Flask(__name__, static_folder='static')
    CORS(app)
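
    # All tunables below are read from the environment (via load_dotenv above).
    # A sample .env, using the defaults the code falls back to:
    #
    #   PORT=5000
    #   DEBUG=false
    #   USE_WHITELIST_ONLY=false
    #   SIMILARITY_MODEL=intfloat/multilingual-e5-base
    #   TOP_K_ARTICLES=250
    #   MIN_SIMILARITY_THRESHOLD=0.1
    #   TOP_N_PER_CATEGORY=5
    #   SHOW_SIMILARITY_SCORES=true
    #   SHOW_PUBLISH_DATE=true
    #   SHOW_URL=true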

    @app.route('/')
    def index():
        """Serve the main page."""
        return app.send_static_file('front.html')

    @app.route('/api/detect', methods=['POST'])
    def detect_fake_news():
        """API endpoint to check if news is potentially fake."""
        start_time = time.time()

        # Tolerate missing or non-JSON request bodies instead of raising.
        data = request.get_json(silent=True) or {}
        query = (data.get('query') or '').strip()

        if not query:
            return jsonify({
                "status": "error",
                "message": "Please provide a news statement to verify."
            })

        # Expand the user's statement into search-friendly query variations.
        query_variations = generate_query(query)

        # The query builder flags sensitive or inappropriate input with a sentinel.
        if query_variations == ["INAPPROPRIATE_QUERY"]:
            return jsonify({
                "status": "error",
                "message": "I cannot provide information on this topic as it appears to contain sensitive or inappropriate content."
            })

        # Query GDELT once per variation and pool the results.
        all_articles = []
        for query_var in query_variations:
            articles = fetch_articles_from_gdelt(query_var)
            if articles:
                all_articles.extend(articles)

        if not all_articles:
            return jsonify({
                "status": "no_results",
                "message": "No articles found on this topic.",
                "details": "No reliable sources could be found covering this information.",
                "articles": []
            })

        unique_articles = remove_duplicates(all_articles)

        # Optionally keep only articles from whitelisted (trusted) domains.
        use_whitelist_only = os.getenv('USE_WHITELIST_ONLY', 'false').lower() == 'true'
        if use_whitelist_only:
            print("Filtering articles to only include whitelisted domains...")
            unique_articles = filter_by_whitelisted_domains(unique_articles)
            print(f"After whitelist filtering: {len(unique_articles)} articles remain")

        normalized_articles = normalize_gdelt_articles(unique_articles)

        if not normalized_articles:
            return jsonify(format_results(query, []))

        # Reuse the preloaded ranker when the configured model matches;
        # otherwise load the requested model for this request.
        model_name = os.getenv('SIMILARITY_MODEL', 'intfloat/multilingual-e5-base')
        if global_ranker.model_name == model_name:
            ranker = global_ranker
        else:
            ranker = ArticleRanker(model_name)

        TOP_K_ARTICLES = int(os.getenv('TOP_K_ARTICLES', '250'))
        min_threshold = float(os.getenv('MIN_SIMILARITY_THRESHOLD', '0.1'))

        # Rank articles by semantic similarity between the query and each
        # article's title plus description.
        article_texts = [
            f"{article['title']} {article.get('description') or ''}"
            for article in normalized_articles
        ]

        query_embedding, article_embeddings = ranker.create_embeddings(query, article_texts)
        similarities = ranker.calculate_similarities(query_embedding, article_embeddings)

        top_indices = ranker.get_top_articles(similarities, normalized_articles, TOP_K_ARTICLES, min_threshold)
        top_articles = ranker.format_results(top_indices, similarities, normalized_articles)

        # Identify the outlets behind the top articles and classify their bias.
        outlet_names = [article['source'] for article in top_articles]
        unique_outlets = list(set(outlet_names))
        print(f"Analyzing {len(unique_outlets)} unique news outlets for bias...")

        bias_analysis = bias_analyzer.analyze_bias(query, unique_outlets, GEMINI_MODEL)

        print("\n" + "=" * 80)
        print("EMBEDDING VECTORS BY BIAS CATEGORY")
        print("=" * 80)

        # Re-rank the articles within each bias category.
        category_rankings = bias_analyzer.categorize_and_rank_by_bias(
            query, normalized_articles, bias_analysis, ranker, min_threshold
        )
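
        # Illustrative shape of category_rankings (the category names here are
        # hypothetical; the only keys this code relies on are the two metadata
        # entries, "descriptions" and "reasoning", which are not article lists
        # and are skipped when slicing below):
        #
        #   {
        #       "left": [<ranked article dicts>],
        #       "center": [<ranked article dicts>],
        #       "right": [<ranked article dicts>],
        #       "descriptions": <metadata>,
        #       "reasoning": <metadata>
        #   }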

        TOP_N_PER_CATEGORY = int(os.getenv('TOP_N_PER_CATEGORY', '5'))

        # Count the ranked articles per bias category (metadata keys excluded).
        category_article_counts = {
            category: len(articles)
            for category, articles in category_rankings.items()
            if category not in ["descriptions", "reasoning"]
        }
        print(f"Articles per bias category: {category_article_counts}")

        # Keep only the top N articles in each category and log them.
        filtered_category_rankings = {}
        for category, articles in category_rankings.items():
            # Skip metadata entries; they are not article lists.
            if category in ["descriptions", "reasoning"]:
                continue

            filtered_category_rankings[category] = articles[:TOP_N_PER_CATEGORY]

            if len(filtered_category_rankings[category]) > 0:
                print(f"\n===== Top {len(filtered_category_rankings[category])} articles from {category} category =====")

                for i, article in enumerate(filtered_category_rankings[category], 1):
                    print(f"Article #{i}:")
                    print(f"  Title: {article['title']}")
                    print(f"  Source: {article['source']}")
                    print(f"  Similarity Score: {article['similarity_score']:.4f}")
                    print(f"  Rank: {article['rank']}")
                    print(f"  URL: {article['url']}")
                    print(f"  Published: {article['published_at']}")
                    print("-" * 50)

        print("\nGenerating factual summary using top articles from all categories...")

        # Pass the analyzer's reasoning along with the per-category articles so
        # the summary generator can draw on it.
        filtered_category_rankings["reasoning"] = bias_analysis.get("reasoning", "No reasoning provided")

        summary = bias_analyzer.generate_summary(
            query,
            normalized_articles,
            filtered_category_rankings,
            GEMINI_MODEL
        )

        print(summary)

        # Report how long the whole pipeline took for this request.
        print(f"Request processed in {time.time() - start_time:.2f} seconds")

        result = {
            "summary": summary
        }

        return jsonify(result)

    @app.route('/api/health', methods=['GET'])
    def health_check():
        """API endpoint to check if the server is running."""
        return jsonify({
            "status": "ok",
            "message": "Fake News Detection API is running"
        })
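
    # Quick liveness check (assuming the default port 5000):
    #
    #   curl http://localhost:5000/api/health
    #   # -> {"status": "ok", "message": "Fake News Detection API is running"}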

    port = int(os.getenv('PORT', '5000'))
    debug = os.getenv('DEBUG', 'false').lower() == 'true'

    print(f"Starting Fake News Detection API server on port {port}...")
    app.run(host='0.0.0.0', port=port, debug=debug)


if __name__ == "__main__":
    main()