File size: 6,150 Bytes
700863c |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 |
"""
gdelt_api.py - Module for interacting with the GDELT API
This module provides functions for fetching articles from GDELT
and filtering by trusted domains.
"""
import os
import requests
import datetime
from urllib.parse import urlparse
from dotenv import load_dotenv
# Import the whitelist from the separate module
from whitelisted_domains import WHITELISTED_DOMAINS
def _build_gdelt_params(query):
    """
    Translate a caller-supplied query string into GDELT request parameters.
    Args:
        query (str): Either a bare search expression, a 'query=...' string,
            or several 'key=value' pairs joined with '&'
    Returns:
        dict: Request parameters, always containing 'format' and 'maxrecords'
    """
    params = {
        'format': 'json',
        'maxrecords': int(os.getenv('MAX_ARTICLES_PER_QUERY', 250)),
    }
    if '&' in query:
        # Structured query: every 'key=value' part becomes its own parameter
        # (this also covers 'query=...'); the first bare part, if any, is
        # used as the search expression when none has been set yet.
        for part in query.split('&'):
            if '=' in part:
                key, value = part.split('=', 1)
                params[key] = value
            elif 'query' not in params:
                params['query'] = part
    elif query.startswith('query='):
        params['query'] = query[len('query='):]
    else:
        # A lone expression is passed through whole, even if it contains '='.
        params['query'] = query
    return params


def fetch_articles_from_gdelt(query):
    """
    Fetch news articles from GDELT API.
    Args:
        query (str): The news query to search for (can be structured GDELT query)
    Returns:
        list: List of article dictionaries, or empty list if no results or error
    """
    base_url = "https://api.gdeltproject.org/api/v2/doc/doc"
    try:
        params = _build_gdelt_params(query)
        # Percent-encode every value so spaces and quotes survive the request.
        query_string = "&".join(
            f"{k}={requests.utils.quote(str(v))}" for k, v in params.items()
        )
        url = f"{base_url}?{query_string}"
        print(f"DEBUG - Requesting URL: {url}")
        response = requests.get(url, timeout=30)
        if response.status_code == 200:
            data = response.json()
            # Only hand back the 'articles' field when the payload really has
            # the expected shape, so callers always receive a list.
            if isinstance(data, dict):
                articles = data.get('articles')
                if isinstance(articles, list):
                    return articles
        return []
    except Exception as e:
        # Best-effort fetch: network, JSON and configuration errors are
        # reported and turned into an empty result instead of propagating.
        print(f"⚠️ Error fetching articles from GDELT: {e}")
        return []
def is_whitelisted_domain(url, whitelist=None):
    """
    Check if the URL belongs to a whitelisted domain.
    A host matches when it equals a trusted domain or is a subdomain of it;
    a host that merely ends with the same characters (e.g. 'evilbbc.com'
    against 'bbc.com') does not match.
    Args:
        url (str): The URL to check
        whitelist (iterable of str, optional): Trusted domains to check
            against. Defaults to WHITELISTED_DOMAINS.
    Returns:
        bool: True if the domain is whitelisted, False otherwise
    """
    if not url or not isinstance(url, str):
        return False
    try:
        # .hostname drops any port/userinfo and is already lower-cased.
        host = urlparse(url).hostname
    except ValueError:
        # urlparse rejects malformed netlocs (e.g. unbalanced IPv6 brackets).
        return False
    if not host:
        return False
    host = host.rstrip('.')  # tolerate fully-qualified "example.com." hosts
    trusted_domains = WHITELISTED_DOMAINS if whitelist is None else whitelist
    for trusted_domain in trusted_domains:
        trusted = str(trusted_domain).strip().lower().lstrip('.')
        # Skip empty entries: they would otherwise match every host.
        if trusted and (host == trusted or host.endswith('.' + trusted)):
            return True
    return False
def format_timestamp(timestamp_str):
    """
    Format a GDELT timestamp string to a more user-friendly format.
    Args:
        timestamp_str (str): Timestamp string in GDELT format (e.g., "20250829T173000Z")
    Returns:
        str: Formatted timestamp (e.g., "Aug 29, 2025 17:30"); an empty string
            for empty input; the input unchanged when it is not a valid
            GDELT timestamp
    """
    if not timestamp_str:
        return ""
    if not isinstance(timestamp_str, str):
        return timestamp_str
    # Only the leading YYYYMMDDTHHMMSS portion matters; a trailing "Z" (or
    # anything else after the seconds) is ignored.
    stamp = timestamp_str[:15]
    looks_like_gdelt = (
        len(stamp) == 15
        and stamp[8] == "T"
        and stamp[:8].isdigit()
        and stamp[9:].isdigit()
    )
    if not looks_like_gdelt:
        # Return original if not in expected format
        return timestamp_str
    try:
        # strptime validates the calendar date and clock time, so values such
        # as day 99 or hour 25 are rejected instead of being echoed back.
        parsed = datetime.datetime.strptime(stamp, "%Y%m%dT%H%M%S")
    except ValueError as e:
        print(f"Error formatting timestamp {timestamp_str}: {e}")
        return timestamp_str
    month_name = parsed.strftime("%b")
    return (
        f"{month_name} {parsed.day}, {parsed.year:04d} "
        f"{parsed.hour:02d}:{parsed.minute:02d}"
    )
def filter_by_whitelisted_domains(articles):
    """
    Keep only the articles whose URL points at a trusted domain.
    Articles without a 'url' entry are dropped. A one-line summary of how
    many articles were removed and kept is printed.
    Args:
        articles (list): List of article dictionaries
    Returns:
        list: The articles that passed the whitelist check, in their
            original order (empty list for empty or missing input)
    """
    if not articles:
        return []

    incoming_count = len(articles)
    kept = [
        entry
        for entry in articles
        if 'url' in entry and is_whitelisted_domain(entry['url'])
    ]
    kept_count = len(kept)
    removed_count = incoming_count - kept_count
    print(f"Domain filtering: {removed_count} non-whitelisted articles removed, {kept_count} whitelisted articles kept")
    return kept
def _source_name_from_url(url):
    """Derive a display source name (host without a leading 'www.') from a URL."""
    if not isinstance(url, str):
        return ''
    try:
        domain = urlparse(url).netloc
    except ValueError:
        # One malformed URL must not abort normalization of the whole batch.
        return ''
    # Strip only a leading "www." -- str.replace would also mangle hosts that
    # merely contain that substring (e.g. "awww.com" -> "acom").
    return domain[4:] if domain.startswith('www.') else domain


def normalize_gdelt_articles(articles):
    """
    Normalize GDELT article format to match the expected format in the rest of the application.
    Args:
        articles (list): List of GDELT article dictionaries
    Returns:
        list: List of normalized article dictionaries with the keys 'title',
            'description', 'url', 'publishedAt' and 'source' (empty list for
            empty or missing input)
    """
    if not articles:
        return []

    normalized_articles = []
    for article in articles:
        # Fall back to the URL's host for the source name if none is supplied
        source_name = article.get('sourcename', '')
        if not source_name and 'url' in article:
            source_name = _source_name_from_url(article['url'])
        normalized_articles.append({
            'title': article.get('title', ''),
            'description': article.get('seentext', ''),
            'url': article.get('url', ''),
            # Format the timestamp for better readability
            'publishedAt': format_timestamp(article.get('seendate', '')),
            'source': {'name': source_name},
        })
    return normalized_articles
|