|
|
""" |
|
|
gdelt_api.py - Module for interacting with the GDELT API |
|
|
|
|
|
This module provides functions for fetching articles from GDELT |
|
|
and filtering by trusted domains. |
|
|
""" |
|
|
|
|
|
import os |
|
|
import requests |
|
|
import datetime |
|
|
from urllib.parse import urlparse |
|
|
from dotenv import load_dotenv |
|
|
|
|
|
|
|
|
from whitelisted_domains import WHITELISTED_DOMAINS |
|
|
|
|
|
|
|
|
def fetch_articles_from_gdelt(query): |
|
|
""" |
|
|
Fetch news articles from GDELT API. |
|
|
|
|
|
Args: |
|
|
query (str): The news query to search for (can be structured GDELT query) |
|
|
|
|
|
Returns: |
|
|
list: List of article dictionaries, or empty list if no results or error |
|
|
""" |
|
|
try: |
|
|
|
|
|
base_url = "https://api.gdeltproject.org/api/v2/doc/doc" |
|
|
|
|
|
|
|
|
params = { |
|
|
'format': 'json', |
|
|
'maxrecords': int(os.getenv('MAX_ARTICLES_PER_QUERY', 250)) |
|
|
} |
|
|
|
|
|
|
|
|
if '&' in query: |
|
|
parts = query.split('&') |
|
|
for part in parts: |
|
|
if '=' in part: |
|
|
key, value = part.split('=', 1) |
|
|
params[key] = value |
|
|
elif part.startswith('query='): |
|
|
params['query'] = part[6:] |
|
|
else: |
|
|
|
|
|
if 'query' not in params: |
|
|
params['query'] = part |
|
|
else: |
|
|
|
|
|
if query.startswith('query='): |
|
|
params['query'] = query[6:] |
|
|
else: |
|
|
params['query'] = query |
|
|
|
|
|
|
|
|
query_string = "&".join([f"{k}={requests.utils.quote(str(v))}" for k, v in params.items()]) |
|
|
url = f"{base_url}?{query_string}" |
|
|
|
|
|
print(f"DEBUG - Requesting URL: {url}") |
|
|
|
|
|
|
|
|
response = requests.get(url, timeout=30) |
|
|
|
|
|
|
|
|
if response.status_code == 200: |
|
|
data = response.json() |
|
|
|
|
|
|
|
|
if 'articles' in data: |
|
|
return data['articles'] |
|
|
|
|
|
return [] |
|
|
|
|
|
except Exception as e: |
|
|
print(f"⚠️ Error fetching articles from GDELT: {e}") |
|
|
return [] |
|
|
|
|
|
|
|
|
def is_whitelisted_domain(url): |
|
|
""" |
|
|
Check if the URL belongs to a whitelisted domain. |
|
|
|
|
|
Args: |
|
|
url (str): The URL to check |
|
|
|
|
|
Returns: |
|
|
bool: True if the domain is whitelisted, False otherwise |
|
|
""" |
|
|
try: |
|
|
domain = urlparse(url).netloc |
|
|
|
|
|
|
|
|
return any(domain.endswith(trusted_domain) for trusted_domain in WHITELISTED_DOMAINS) |
|
|
except: |
|
|
return False |
|
|
|
|
|
|
|
|
def format_timestamp(timestamp_str): |
|
|
""" |
|
|
Format a GDELT timestamp string to a more user-friendly format. |
|
|
|
|
|
Args: |
|
|
timestamp_str (str): Timestamp string in GDELT format (e.g., "20250829T173000Z") |
|
|
|
|
|
Returns: |
|
|
str: Formatted timestamp (e.g., "Aug 29, 2025 17:30") |
|
|
""" |
|
|
if not timestamp_str: |
|
|
return "" |
|
|
|
|
|
try: |
|
|
|
|
|
if "T" in timestamp_str and len(timestamp_str) >= 15: |
|
|
|
|
|
year = timestamp_str[0:4] |
|
|
month = timestamp_str[4:6] |
|
|
day = timestamp_str[6:8] |
|
|
hour = timestamp_str[9:11] |
|
|
minute = timestamp_str[11:13] |
|
|
|
|
|
|
|
|
month_name = datetime.datetime.strptime(month, "%m").strftime("%b") |
|
|
|
|
|
return f"{month_name} {int(day)}, {year} {hour}:{minute}" |
|
|
else: |
|
|
|
|
|
return timestamp_str |
|
|
except Exception as e: |
|
|
print(f"Error formatting timestamp {timestamp_str}: {e}") |
|
|
return timestamp_str |
|
|
|
|
|
|
|
|
def filter_by_whitelisted_domains(articles): |
|
|
""" |
|
|
Filter articles to only include those from trusted domains. |
|
|
|
|
|
Args: |
|
|
articles (list): List of article dictionaries |
|
|
|
|
|
Returns: |
|
|
list: Filtered list of article dictionaries |
|
|
""" |
|
|
if not articles: |
|
|
return [] |
|
|
|
|
|
trusted_articles = [] |
|
|
total_articles = len(articles) |
|
|
|
|
|
for article in articles: |
|
|
if 'url' in article and is_whitelisted_domain(article['url']): |
|
|
trusted_articles.append(article) |
|
|
|
|
|
filtered_count = total_articles - len(trusted_articles) |
|
|
print(f"Domain filtering: {filtered_count} non-whitelisted articles removed, {len(trusted_articles)} whitelisted articles kept") |
|
|
|
|
|
return trusted_articles |
|
|
|
|
|
|
|
|
def normalize_gdelt_articles(articles): |
|
|
""" |
|
|
Normalize GDELT article format to match the expected format in the rest of the application. |
|
|
|
|
|
Args: |
|
|
articles (list): List of GDELT article dictionaries |
|
|
|
|
|
Returns: |
|
|
list: List of normalized article dictionaries |
|
|
""" |
|
|
normalized_articles = [] |
|
|
|
|
|
for article in articles: |
|
|
|
|
|
source_name = article.get('sourcename', '') |
|
|
if not source_name and 'url' in article: |
|
|
domain = urlparse(article['url']).netloc |
|
|
source_name = domain.replace('www.', '') |
|
|
|
|
|
|
|
|
raw_timestamp = article.get('seendate', '') |
|
|
formatted_timestamp = format_timestamp(raw_timestamp) |
|
|
|
|
|
normalized_articles.append({ |
|
|
'title': article.get('title', ''), |
|
|
'description': article.get('seentext', ''), |
|
|
'url': article.get('url', ''), |
|
|
'publishedAt': formatted_timestamp, |
|
|
'source': {'name': source_name} |
|
|
}) |
|
|
|
|
|
return normalized_articles |
|
|
|