	| # ----------------- Imports (Stdlib + Typing) ----------------- | |
| from fastapi import FastAPI, Query, HTTPException, Body | |
| from typing import Optional, List, Dict, Any, Tuple, Set | |
| import os | |
| import time | |
| import socket | |
| import logging | |
| import hashlib | |
| from functools import lru_cache | |
| from collections import Counter | |
| import requests | |
| import tldextract | |
| import math | |
| import nltk | |
| from nltk.sentiment import SentimentIntensityAnalyzer | |
| from geopy.geocoders import Nominatim | |
| from fastapi.middleware.cors import CORSMiddleware | |
| from countryinfo import CountryInfo | |
| from sentence_transformers import SentenceTransformer, util | |
| from domain_country_map import domain_country_map | |
| from time import monotonic | |
| from langdetect import detect, DetectorFactory | |
| import re | |
| from urllib.parse import urlparse, urlunparse, parse_qsl | |
| from concurrent.futures import ThreadPoolExecutor, as_completed | |
| from html import unescape | |
| import threading | |
| import difflib | |
| from starlette.middleware.gzip import GZipMiddleware | |
| from transformers import pipeline as hf_pipeline | |
| os.environ.setdefault("OMP_NUM_THREADS", "1") | |
| from fastapi.responses import PlainTextResponse, JSONResponse | |
| from datetime import datetime, timezone | |
| # ----------------- Torch Runtime Settings ----------------- | |
| import torch | |
| torch.set_num_threads(2) | |
| # ----------------- Optional Local Tokenizers ----------------- | |
| try: | |
| import sentencepiece as _spm | |
| _HAS_SENTENCEPIECE = True | |
| except Exception: | |
| _HAS_SENTENCEPIECE = False | |
| # ----------------- Runtime Modes / Speed Enum ----------------- | |
| from enum import Enum | |
| class Speed(str, Enum): | |
| fast = "fast" | |
| balanced = "balanced" | |
| max = "max" | |
| # ----------------- Global Model Handles / Pipelines ----------------- | |
| _local_pipes = {} | |
| _news_clf = None | |
| _sbert = None | |
| # ----------------- tldextract (PSL-cached) ----------------- | |
| _TLD_CACHE = os.getenv("TLDEXTRACT_CACHE", "/data/tld_cache") | |
| try: | |
| _tld = tldextract.TLDExtract(cache_dir=_TLD_CACHE, suffix_list_urls=None) | |
| except Exception: | |
| _tld = tldextract.extract | |
| # ----------------- Translation Runtime Flags ----------------- | |
| ALLOW_HF_REMOTE = os.getenv("ALLOW_HF_REMOTE", "0") == "1" | |
| _hf_bad_models: Set[str] = set() | |
# ----------------- FastAPI App + Middleware -----------------
app = FastAPI()
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=False,
    allow_methods=["*"],
    allow_headers=["*"],
)
app.add_middleware(GZipMiddleware, minimum_size=500)

# Route decorators restored; the exact paths are assumed from the handler names
# (the "@app.get(...)" lines appear to have been lost in extraction).
@app.get("/")
def root():
    return JSONResponse({"ok": True, "service": "newsglobe-backend"})

@app.get("/healthz")
def healthz():
    return PlainTextResponse("OK", status_code=200)

@app.get("/favicon.ico")
def favicon():
    return PlainTextResponse("", status_code=204)
| # ----------------- HTTP Session (connection pooling) ----------------- | |
| SESSION = requests.Session() | |
| ADAPTER = requests.adapters.HTTPAdapter(pool_connections=64, pool_maxsize=64, max_retries=2) | |
| SESSION.mount("http://", ADAPTER) | |
| SESSION.mount("https://", ADAPTER) | |
| def _session_get(url, **kwargs): | |
| headers = kwargs.pop("headers", {}) | |
| headers.setdefault("User-Agent", "Mozilla/5.0 (compatible; NewsGlobe/1.0)") | |
| return SESSION.get(url, headers=headers, timeout=kwargs.pop("timeout", 12), **kwargs) | |
| # ----------------- Lightweight Reader Fallback (Jina) ----------------- | |
| def _try_jina_reader(url: str, timeout: int) -> Optional[str]: | |
| try: | |
| u = url.strip() | |
| if not u.startswith(("http://", "https://")): | |
| u = "https://" + u | |
| r = _session_get(f"https://r.jina.ai/{u}", timeout=timeout) | |
| if r.status_code == 200: | |
| txt = _clean_text(r.text) | |
| sents = _split_sentences(txt) | |
| if sents: | |
| best = " ".join(sents[:2]) | |
| return best if len(best) >= 80 else (sents[0] if sents else None) | |
| except Exception: | |
| pass | |
| return None | |
| # ----------------- Description Cleanup Helpers ----------------- | |
| BOILER_DESC = re.compile( | |
| r"(subscribe|sign in|sign up|enable javascript|cookies? (policy|settings)|" | |
| r"privacy (policy|notice)|continue reading|read more|click here|" | |
| r"accept (cookies|the terms)|by continuing|newsletter|advertisement|adblock)", | |
| re.I | |
| ) | |
| def _split_sentences(text: str) -> List[str]: | |
| parts = re.split(r"(?<=[\.\?\!])\s+(?=[A-Z0-9])", (text or "").strip()) | |
| out = [] | |
| for p in parts: | |
| out.extend(re.split(r"\s+[•–—]\s+", p)) | |
| return [p.strip() for p in out if p and len(p.strip()) >= 2] | |
def _too_similar(a: str, b: str, thresh: float = 0.92) -> bool:
    a = (a or "").strip()
    b = (b or "").strip()
    if not a or not b:
        return False
    if a.lower() == b.lower():
        return True
    if a.lower() in b.lower() or b.lower() in a.lower():
        return True
    ratio = difflib.SequenceMatcher(None, a.lower(), b.lower()).ratio()
    return ratio >= thresh
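# Illustrative example (not from the original source): _too_similar("Fed raises rates",
# "Fed raises rates again") -> True via substring containment; unrelated headlines fall
# through to the difflib ratio and stay below the 0.92 threshold.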
| def _dedupe_title_from_desc(title: str, desc: str) -> str: | |
| t = (title or "").strip() | |
| d = (desc or "").strip() | |
| if not t or not d: | |
| return d | |
| if d.lower().startswith(t.lower()): | |
| d = d[len(t):].lstrip(" -–:•|") | |
| d = d.replace(t, "").strip(" -–:•|") | |
| d = _clean_text(d) | |
| return d | |
| def _clean_text(s: str) -> str: | |
| s = unescape(s or "") | |
| s = re.sub(r"\s+", " ", s).strip() | |
| return s | |
def _tidy_description(title: str, desc: str, source_name: str, max_chars: int = 240) -> str:
    if not desc:
        return ""
    desc = _dedupe_title_from_desc(title, desc)
    desc = BOILER_DESC.sub("", desc)
    desc = re.sub(r"\s+", " ", desc).strip(" -–:•|")
    sents = _split_sentences(desc)
    if not sents:
        sents = [desc]
    best = " ".join(sents[:2]).strip()
    if len(best) > max_chars:
        if len(sents[0]) <= max_chars * 0.9:
            best = sents[0]
        else:
            best = best[:max_chars].rsplit(" ", 1)[0].rstrip(",;:-–—")
    if _too_similar(title, best):
        for alt in sents[1:3]:
            if not _too_similar(title, alt):
                best = alt
                break
    if best and best[-1] not in ".!?":
        best += "."
    return best
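# Illustrative example (not from the original source):
#   _tidy_description("Storm hits coast",
#                     "Storm hits coast - Residents evacuated as winds rise", "AnySource")
#   -> "Residents evacuated as winds rise."
# (title echo stripped, boilerplate removed, clipped to ~2 sentences, final period added)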
| # ----------------- Inflight Request Coalescing ----------------- | |
| _inflight_locks: Dict[Tuple, threading.Lock] = {} | |
| _inflight_global_lock = threading.Lock() | |
| def _get_inflight_lock(key: Tuple) -> threading.Lock: | |
| with _inflight_global_lock: | |
| lk = _inflight_locks.get(key) | |
| if lk is None: | |
| lk = threading.Lock() | |
| _inflight_locks[key] = lk | |
| return lk | |
| # ----------------- Description Fetching (Cache + Extract) ----------------- | |
| DESC_CACHE_LOCK = threading.Lock() | |
| try: | |
| from bs4 import BeautifulSoup | |
| except Exception: | |
| BeautifulSoup = None | |
| DESC_FETCH_TIMEOUT = 3 | |
| DESC_MIN_LEN = 100 | |
| DESC_CACHE_TTL = 24 * 3600 | |
| MAX_DESC_FETCHES = 24 | |
| DESC_WORKERS = 12 | |
| DESC_CACHE: Dict[str, Dict[str, Any]] = {} | |
| def _now_mono(): | |
| # Monotonic for TTL calculations | |
| try: | |
| return monotonic() | |
| except Exception: | |
| return time.time() | |
| def _extract_desc_from_ld_json(html: str) -> Optional[str]: | |
| # Prefer LD-JSON when present (often cleaner summaries) | |
| if not html or not BeautifulSoup: | |
| return None | |
| try: | |
| soup = BeautifulSoup(html, "html.parser") | |
| for tag in soup.find_all("script", {"type": "application/ld+json"}): | |
| try: | |
| import json | |
| data = json.loads(tag.string or "") | |
| except Exception: | |
| continue | |
| def find_desc(obj): | |
| if not isinstance(obj, (dict, list)): | |
| return None | |
| if isinstance(obj, list): | |
| for it in obj: | |
| v = find_desc(it) | |
| if v: | |
| return v | |
| return None | |
| for key in ("description", "abstract", "articleBody"): | |
| val = obj.get(key) | |
| if isinstance(val, str): | |
| txt = _clean_text(val) | |
| if len(txt) >= 40: | |
| return txt | |
| for k, v in obj.items(): | |
| if isinstance(v, (dict, list)): | |
| got = find_desc(v) | |
| if got: | |
| return got | |
| return None | |
| d = find_desc(data) | |
| if d and len(d) >= 40: | |
| return d | |
| except Exception: | |
| pass | |
| return None | |
| # Heuristic to detect consent walls and jump to reader fallback | |
| CONSENT_HINTS = re.compile(r"(consent|gdpr|privacy choices|before you continue|we value your privacy)", re.I) | |
| def _looks_like_consent_wall(html: str) -> bool: | |
| if not html: | |
| return False | |
| if "consent.yahoo.com" in html.lower(): | |
| return True | |
| return bool(CONSENT_HINTS.search(html)) | |
| def _extract_desc_from_html(html: str) -> Optional[str]: | |
| html = html or "" | |
| if BeautifulSoup: | |
| soup = BeautifulSoup(html, "html.parser") | |
| ld = _extract_desc_from_ld_json(html) | |
| if ld: | |
| txt = _clean_text(ld) | |
| if 40 <= len(txt) <= 480: | |
| return txt | |
| for sel, attr in [ | |
| ('meta[property="og:description"]', "content"), | |
| ('meta[name="twitter:description"]', "content"), | |
| ('meta[name="description"]', "content"), | |
| ]: | |
| tag = soup.select_one(sel) | |
| if tag: | |
| txt = _clean_text(tag.get(attr, "")) | |
| if len(txt) >= 40: | |
| return txt | |
| for p in soup.find_all("p"): | |
| txt = _clean_text(p.get_text(" ")) | |
| if len(txt) >= 80: | |
| return txt | |
| else: | |
| for pat in [ | |
| r'<meta[^>]+property=["\']og:description["\'][^>]+content=["\']([^"\']+)["\']', | |
| r'<meta[^>]+name=["\']twitter:description["\'][^>]+content=["\']([^"\']+)["\']', | |
| r'<meta[^>]+name=["\']description["\'][^>]+content=["\']([^"\']+)["\']', | |
| ]: | |
| m = re.search(pat, html, flags=re.I | re.S) | |
| if m: | |
| txt = _clean_text(m.group(1)) | |
| if len(txt) >= 40: | |
| return txt | |
| m = re.search(r"<p[^>]*>(.*?)</p>", html, flags=re.I | re.S) | |
| if m: | |
| txt = _clean_text(re.sub("<[^>]+>", " ", m.group(1))) | |
| if len(txt) >= 80: | |
| return txt | |
| return None | |
| def _desc_cache_get(url: str) -> Optional[str]: | |
| if not url: | |
| return None | |
| with DESC_CACHE_LOCK: | |
| entry = DESC_CACHE.get(url) | |
| if not entry: | |
| return None | |
| if _now_mono() - entry["t"] > DESC_CACHE_TTL: | |
| DESC_CACHE.pop(url, None) | |
| return None | |
| return entry["text"] | |
| def _desc_cache_put(url: str, text: str): | |
| if url and text: | |
| with DESC_CACHE_LOCK: | |
| DESC_CACHE[url] = {"text": text, "t": _now_mono()} | |
| def _attempt_fetch(url: str, timeout: int) -> Optional[str]: | |
| # Fetch page and extract description; fallback to reader if needed | |
| headers = { | |
| "User-Agent": "Mozilla/5.0 (compatible; NewsGlobe/1.0; +mailto:you@yourdomain.com)", | |
| "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8", | |
| "Accept-Language": "en-US,en;q=0.9", | |
| } | |
| try: | |
| r = _session_get(url, headers=headers, timeout=timeout, allow_redirects=True) | |
| if r.status_code != 200: | |
| return None | |
| ct = (r.headers.get("Content-Type") or "").lower() | |
| txt = r.text or "" | |
| if "html" not in ct and "<html" not in txt.lower(): | |
| return None | |
| if _looks_like_consent_wall(txt): | |
| jd = _try_jina_reader(url, timeout) | |
| if jd: | |
| return jd | |
| return None | |
| desc = _extract_desc_from_html(txt) | |
| if desc and 40 <= len(desc) <= 480: | |
| return desc | |
| except Exception: | |
| pass | |
| jd = _try_jina_reader(url, timeout) | |
| if jd and 40 <= len(jd) <= 480: | |
| return jd | |
| return None | |
def fetch_page_description(url: str) -> Optional[str]:
    # Public entry: consult cache -> fetch -> AMP variants -> cache
    if not url:
        return None
    cached = _desc_cache_get(url)
    if cached:
        return cached
    desc = _attempt_fetch(url, DESC_FETCH_TIMEOUT)
    if not desc:
        amp_candidates = []
        try:
            p = urlparse(url)
            if not p.path.endswith("/amp"):
                amp_candidates.append(urlunparse(p._replace(path=(p.path.rstrip("/") + "/amp"))))
            q = p.query
            amp_candidates.append(urlunparse(p._replace(query=(q + ("&" if q else "") + "amp=1"))))
            amp_candidates.append(urlunparse(p._replace(query=(q + ("&" if q else "") + "outputType=amp"))))
        except Exception:
            pass
        for amp_url in amp_candidates:
            desc = _attempt_fetch(amp_url, DESC_FETCH_TIMEOUT)
            if desc:
                break
    if desc:
        _desc_cache_put(url, desc)
        return desc
    return None
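# Illustrative flow (not from the original source): fetch_page_description() checks the
# TTL cache, fetches the page (LD-JSON, og:/twitter:/meta description, or first long <p>),
# falls back to /amp-style URL variants, and caches any 40-480 character summary it finds.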
| def _needs_desc_upgrade(a: Dict[str, Any]) -> bool: | |
| # Decide if we should try to refetch a better description | |
| url = a.get("url") or "" | |
| if not url: | |
| return False | |
| title = (a.get("title") or "").strip() | |
| desc = (a.get("description") or "").strip() | |
| if not desc or desc.lower().startswith("no description"): | |
| return True | |
| if len(desc) < DESC_MIN_LEN: | |
| return True | |
| if _too_similar(title, desc): | |
| return True | |
| return False | |
| def prefetch_descriptions(raw_articles: List[Dict[str, Any]], speed: Speed = Speed.balanced): | |
| # Parallel prefetch for weak descriptions (bounded to avoid stampedes) | |
| candidates, seen = [], set() | |
| max_fetches = 6 if speed == Speed.fast else 8 if speed == Speed.balanced else 16 | |
| timeout = 1 if speed == Speed.fast else 2 | |
| workers = 3 if speed == Speed.fast else 4 if speed == Speed.balanced else 8 | |
| for a in raw_articles: | |
| url = a.get("url") | |
| if not url or url in seen: | |
| continue | |
| seen.add(url) | |
| if _needs_desc_upgrade(a) and not _desc_cache_get(url): | |
| candidates.append(url) | |
| if len(candidates) >= max_fetches: | |
| break | |
| if not candidates: | |
| return | |
| with ThreadPoolExecutor(max_workers=workers) as ex: | |
| futs = [ex.submit(fetch_page_description, u) for u in candidates] | |
| for _ in as_completed(futs): | |
| pass | |
| def prefetch_descriptions_async(raw_articles, speed: Speed = Speed.balanced): | |
| threading.Thread(target=prefetch_descriptions, args=(raw_articles, speed), daemon=True).start() | |
| # ----------------- Category / Keyword Heuristics ----------------- | |
| DetectorFactory.seed = 0 | |
| SECTION_HINTS = { | |
| "sports": "sports", | |
| "sport": "sports", | |
| "business": "business", | |
| "money": "business", | |
| "market": "business", | |
| "tech": "technology", | |
| "technology": "technology", | |
| "sci": "science", | |
| "science": "science", | |
| "health": "health", | |
| "wellness": "health", | |
| "entertainment": "entertainment", | |
| "culture": "entertainment", | |
| "showbiz": "entertainment", | |
| "crime": "crime", | |
| "world": "general", | |
| "weather": "weather", | |
| "environment": "environment", | |
| "climate": "environment", | |
| "travel": "travel", | |
| "politics": "politics", | |
| "election": "politics", | |
| } | |
KEYWORDS = {
    "sports": r"\b(NBA|NFL|MLB|NHL|Olympic|goal|match|tournament|coach|transfer)\b",
    "business": r"\b(stocks?|earnings|IPO|merger|acquisition|revenue|inflation|market|tax|budget|deficit)\b",
    "technology": r"\b(AI|software|chip|semiconductor|app|startup|cyber|hack|quantum|robot)\b",
    "science": r"\b(researchers?|study|physics|astronomy|genome|spacecraft|telescope)\b",
    "health": r"\b(virus|vaccine|disease|hospital|doctor|public health|covid|recall|FDA|contamination|disease outbreak)\b",
    "entertainment": r"\b(movie|film|box office|celebrity|series|show|album|music)\b",
    "crime": r"\b(arrested|charged|police|homicide|fraud|theft|court|lawsuit)\b",
    "weather": r"\b(hurricane|storm|flood|heatwave|blizzard|tornado|forecast)\b",
    "environment": r"\b(climate|emissions|wildfire|deforestation|biodiversity)\b",
    "travel": r"\b(flight|airline|airport|tourism|visa|cruise|hotel)\b",
    "politics": r"\b(president|parliament|congress|minister|policy|campaign|election|rally|protest|demonstration)\b",
}
| # ----------------- Category normalization to frontend set ----------------- | |
| FRONTEND_CATS = { | |
| "politics","technology","sports","business","entertainment", | |
| "science","health","crime","weather","environment","travel", | |
| "viral","general" | |
| } | |
| ML_TO_FRONTEND = { | |
| "arts_&_culture": "entertainment", | |
| "business": "business", | |
| "business_&_entrepreneurs": "business", | |
| "celebrity_&_pop_culture": "entertainment", | |
| "crime": "crime", | |
| "diaries_&_daily_life": "viral", | |
| "entertainment": "entertainment", | |
| "environment": "environment", | |
| "fashion_&_style": "entertainment", | |
| "film_tv_&_video": "entertainment", | |
| "fitness_&_health": "health", | |
| "food_&_dining": "entertainment", | |
| "general": "general", | |
| "learning_&_educational": "science", | |
| "news_&_social_concern": "politics", | |
| "politics": "politics", | |
| "science_&_technology": "science", | |
| "sports": "sports", | |
| "technology": "technology", | |
| "travel_&_adventure": "travel", | |
| "other_hobbies": "viral" | |
| } | |
def normalize_category(c: Optional[str]) -> str:
    s = (c or "").strip().lower()
    if not s:
        return "general"
    if s in FRONTEND_CATS:
        return s
    return ML_TO_FRONTEND.get(s, "general")
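# Illustrative examples (not from the original source):
#   normalize_category("Science_&_Technology") -> "science"
#   normalize_category(None)                   -> "general"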
| def get_news_clf(): | |
| # Lazy-init topic classifier | |
| global _news_clf | |
| if _news_clf is None: | |
| _news_clf = hf_pipeline( | |
| "text-classification", | |
| model="cardiffnlp/tweet-topic-21-multi", | |
| top_k=1, | |
| ) | |
| return _news_clf | |
| def _infer_category_from_url_path(url_path: str) -> Optional[str]: | |
| # Order: provided -> URL path -> keyword -> ML fallback | |
| parts = [p for p in url_path.lower().split("/") if p] | |
| for p in parts: | |
| if p in SECTION_HINTS: | |
| return SECTION_HINTS[p] | |
| for p in parts: | |
| for tok in re.split(r"[-_]", p): | |
| if tok in SECTION_HINTS: | |
| return SECTION_HINTS[tok] | |
| return None | |
| def _infer_category_from_text(text: str) -> Optional[str]: | |
| if not text: | |
| return None | |
| for cat, pat in KEYWORDS.items(): | |
| if re.search(pat, text, flags=re.I): | |
| return cat | |
| return None | |
def infer_category(article_url, title, description, provided):
    if provided:
        got = normalize_category(provided)
        if got:
            return got
    try:
        p = urlparse(article_url).path or ""
        cat = _infer_category_from_url_path(p)
        if cat:
            return normalize_category(cat)
    except Exception:
        pass
    text = f"{title or ''} {description or ''}".strip()
    cat = _infer_category_from_text(text)
    if cat:
        return normalize_category(cat)
    try:
        preds = get_news_clf()(text[:512])
        label = preds[0][0]["label"] if isinstance(preds[0], list) else preds[0]["label"]
        return normalize_category(label)
    except Exception:
        return "general"
| # ----------------- Language Detection / Embeddings ----------------- | |
| def detect_lang(text: str) -> Optional[str]: | |
| try: | |
| return detect(text) | |
| except Exception: | |
| return None | |
| def get_sbert(): | |
| global _sbert | |
| if _sbert is None: | |
| _sbert = SentenceTransformer("sentence-transformers/all-MiniLM-L6-v2") | |
| return _sbert | |
| def _embed_texts(texts: List[str]): | |
| embs = get_sbert().encode(texts, convert_to_tensor=True, normalize_embeddings=True, show_progress_bar=False) | |
| return embs | |
| # ----------------- NLTK / VADER Sentiment ----------------- | |
| NLTK_DATA_DIR = os.environ.get("NLTK_DATA", "/app/nltk_data") | |
| if NLTK_DATA_DIR not in nltk.data.path: | |
| nltk.data.path.insert(0, NLTK_DATA_DIR) | |
| try: | |
| nltk.data.find("sentiment/vader_lexicon") | |
| except LookupError: | |
| try: | |
| os.makedirs(NLTK_DATA_DIR, exist_ok=True) | |
| nltk.download("vader_lexicon", download_dir=NLTK_DATA_DIR, quiet=True) | |
| except Exception: | |
| pass | |
| try: | |
| _vader = SentimentIntensityAnalyzer() | |
| except Exception: | |
| _vader = None | |
def classify_sentiment(text: str) -> str:
    if not text:
        return "neutral"
    if _vader is None:
        return "neutral"
    scores = _vader.polarity_scores(text)
    c = scores["compound"]
    return "positive" if c >= 0.2 else "negative" if c <= -0.2 else "neutral"
| # ----------------- Geocoding / Domain → Country ----------------- | |
| def get_country_centroid(country_name): | |
| if not country_name or country_name == "Unknown": | |
| return {"lat": 0, "lon": 0, "country": "Unknown"} | |
| try: | |
| country = CountryInfo(country_name) | |
| latlng = country.capital_latlng() | |
| return {"lat": latlng[0], "lon": latlng[1], "country": country_name} | |
| except Exception as e: | |
| log.info(f"Could not get centroid for {country_name}: {e}") | |
| return {"lat": 0, "lon": 0, "country": country_name or "Unknown"} | |
| def resolve_domain_to_ip(domain): | |
| if not domain: | |
| return None | |
| try: | |
| return socket.gethostbyname(domain) | |
| except socket.gaierror: | |
| return None | |
| def geolocate_ip(ip): | |
| try: | |
| r = _session_get(f"https://ipwho.is/{ip}?fields=success,country,latitude,longitude", timeout=8) | |
| j = r.json() | |
| if j.get("success"): | |
| return {"lat": j["latitude"], "lon": j["longitude"], "country": j["country"]} | |
| except Exception: | |
| pass | |
| return None | |
| # Nominatim for a light refinement pass (async) | |
| geolocator = Nominatim(user_agent="newsglobe-app (contact: you@example.com)") | |
| domain_geo_cache: Dict[str, Dict[str, Any]] = {} | |
| MAJOR_OUTLETS = { | |
| "bbc.co.uk": "United Kingdom", | |
| "theguardian.com": "United Kingdom", | |
| "reuters.com": "United States", | |
| "aljazeera.com": "Qatar", | |
| "lemonde.fr": "France", | |
| "dw.com": "Germany", | |
| "abc.net.au": "Australia", | |
| "ndtv.com": "India", | |
| "globo.com": "Brazil", | |
| "elpais.com": "Spain", | |
| "lefigaro.fr": "France", | |
| "kyodonews.net": "Japan", | |
| "straitstimes.com": "Singapore", | |
| "thesun.my": "Malaysia", | |
| } | |
| def geocode_source(source_text: str, domain: str = "", do_network: bool = False): | |
| cache_key = f"{source_text}|{domain}" | |
| if cache_key in domain_geo_cache: | |
| return domain_geo_cache[cache_key] | |
| ext = _tld(domain or "") | |
| fqdn = ".".join([p for p in (ext.domain, ext.suffix) if p]) if (ext.domain or ext.suffix) else "" | |
| if fqdn in MAJOR_OUTLETS: | |
| coords = get_country_centroid(MAJOR_OUTLETS[fqdn]); domain_geo_cache[cache_key] = coords; return coords | |
| if ext.domain in domain_country_map: | |
| coords = get_country_centroid(domain_country_map[ext.domain]); domain_geo_cache[cache_key] = coords; return coords | |
| coords = get_country_centroid(_suffix_country(ext.suffix)) | |
| domain_geo_cache[cache_key] = coords | |
| if do_network: | |
| threading.Thread(target=_refine_geo_async, args=(cache_key, source_text, fqdn), daemon=True).start() | |
| return coords | |
def _suffix_country(suffix: Optional[str]) -> str:
    s = (suffix or "").split(".")[-1]
    m = {
        "au": "Australia", "uk": "United Kingdom", "gb": "United Kingdom", "ca": "Canada", "in": "India", "us": "United States",
        "ng": "Nigeria", "de": "Germany", "fr": "France", "jp": "Japan", "sg": "Singapore", "za": "South Africa", "nz": "New Zealand",
        "ie": "Ireland", "it": "Italy", "es": "Spain", "se": "Sweden", "ch": "Switzerland", "nl": "Netherlands", "br": "Brazil",
        "my": "Malaysia", "id": "Indonesia", "ph": "Philippines", "th": "Thailand", "vn": "Vietnam", "sa": "Saudi Arabia",
        "ae": "United Arab Emirates", "tr": "Turkey", "mx": "Mexico", "ar": "Argentina", "cl": "Chile", "co": "Colombia",
        "il": "Israel", "kr": "South Korea", "cn": "China", "tw": "Taiwan", "hk": "Hong Kong",
    }
    return m.get(s, "United States" if s in ("com", "org", "net") else "Unknown")
| def _refine_geo_async(cache_key, source_text, fqdn): | |
| try: | |
| ip = resolve_domain_to_ip(fqdn) if fqdn else None | |
| if ip: | |
| coords = geolocate_ip(ip) | |
| if coords: | |
| domain_geo_cache[cache_key] = coords | |
| return | |
| location = geolocator.geocode(f"{source_text} News Headquarters", timeout=2) | |
| if location and hasattr(location, "raw"): | |
| coords = { | |
| "lat": location.latitude, | |
| "lon": location.longitude, | |
| "country": location.raw.get("address", {}).get("country", "Unknown"), | |
| } | |
| domain_geo_cache[cache_key] = coords | |
| except Exception: | |
| pass | |
| # ----------------- Translation (HF / Libre / Local) ----------------- | |
| HF_MODEL_PRIMARY = None | |
| NLLB_CODES = { | |
| "en": "eng_Latn", | |
| "es": "spa_Latn", | |
| "fr": "fra_Latn", | |
| "de": "deu_Latn", | |
| "it": "ita_Latn", | |
| "pt": "por_Latn", | |
| "zh": "zho_Hans", | |
| "ru": "rus_Cyrl", | |
| "ar": "arb_Arab", | |
| "hi": "hin_Deva", | |
| "ja": "jpn_Jpan", | |
| "ko": "kor_Hang", | |
| } | |
| def opus_model_for(src2: str, tgt2: str) -> Optional[str]: | |
| pairs = { | |
| ("es", "en"): "Helsinki-NLP/opus-mt-es-en", | |
| ("en", "es"): "Helsinki-NLP/opus-mt-en-es", | |
| ("fr", "en"): "Helsinki-NLP/opus-mt-fr-en", | |
| ("en", "fr"): "Helsinki-NLP/opus-mt-en-fr", | |
| ("de", "en"): "Helsinki-NLP/opus-mt-de-en", | |
| ("en", "de"): "Helsinki-NLP/opus-mt-en-de", | |
| ("pt", "en"): "Helsinki-NLP/opus-mt-pt-en", | |
| ("en", "pt"): "Helsinki-NLP/opus-mt-en-pt", | |
| ("it", "en"): "Helsinki-NLP/opus-mt-it-en", | |
| ("en", "it"): "Helsinki-NLP/opus-mt-en-it", | |
| ("ru", "en"): "Helsinki-NLP/opus-mt-ru-en", | |
| ("en", "ru"): "Helsinki-NLP/opus-mt-en-ru", | |
| ("zh", "en"): "Helsinki-NLP/opus-mt-zh-en", | |
| ("en", "zh"): "Helsinki-NLP/opus-mt-en-zh", | |
| ("ja", "en"): "Helsinki-NLP/opus-mt-ja-en", | |
| ("en", "ja"): "Helsinki-NLP/opus-mt-en-ja", | |
| ("ko", "en"): "Helsinki-NLP/opus-mt-ko-en", | |
| ("en", "ko"): "Helsinki-NLP/opus-mt-en-ko", | |
| ("hi", "en"): "Helsinki-NLP/opus-mt-hi-en", | |
| ("en", "hi"): "Helsinki-NLP/opus-mt-en-hi", | |
| ("ar", "en"): "Helsinki-NLP/opus-mt-ar-en", | |
| ("en", "ar"): "Helsinki-NLP/opus-mt-en-ar", | |
| } | |
| return pairs.get((src2, tgt2)) | |
| SUPPORTED = {"en", "fr", "de", "es", "it", "hi", "ar", "ru", "ja", "ko", "pt", "zh"} | |
| LIBRETRANSLATE_URL = os.getenv("LIBRETRANSLATE_URL") | |
def _lt_lang(code: str) -> str:
    if not code:
        return code
    c = code.lower()
    # LibreTranslate uses zh-Hans; normalize zh* to zh-Hans
    if c.startswith("zh"):
        return "zh-Hans"
    return c
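# Illustrative examples (not from the original source):
#   _lt_lang("zh-CN") -> "zh-Hans"
#   _lt_lang("EN")    -> "en"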
| def _translate_via_libre(text: str, src: str, tgt: str) -> Optional[str]: | |
| url = LIBRETRANSLATE_URL | |
| if not url or not text or src == tgt: | |
| return None | |
| payload = { | |
| "q": text, | |
| "source": _lt_lang(src), | |
| "target": _lt_lang(tgt), | |
| "format": "text", | |
| } | |
| # First call can be slow while LT warms models; retry once. | |
| for attempt in (1, 2): | |
| try: | |
| r = SESSION.post( | |
| f"{url.rstrip('/')}/translate", | |
| json=payload, | |
| timeout=15 # was 6 | |
| ) | |
| if r.status_code == 200: | |
| j = r.json() | |
| out = j.get("translatedText") | |
| return out if isinstance(out, str) and out else None | |
| else: | |
| log.warning("LibreTranslate HTTP %s: %s", r.status_code, r.text[:200]) | |
| return None | |
| except Exception as e: | |
| if attempt == 2: | |
| log.warning("LibreTranslate failed: %s", e) | |
| return None | |
| time.sleep(0.5) | |
| def _hf_call(model_id: str, payload: dict) -> Optional[str]: | |
| if not (HUGGINGFACE_API_TOKEN and ALLOW_HF_REMOTE): | |
| return None | |
| if model_id in _hf_bad_models: | |
| return None | |
| url = f"https://api-inference.huggingface.co/models/{model_id}" | |
| headers = { | |
| "Authorization": f"Bearer {HUGGINGFACE_API_TOKEN}", | |
| "HF-API-KEY": HUGGINGFACE_API_TOKEN, | |
| "Accept": "application/json", | |
| "Content-Type": "application/json", | |
| } | |
| try: | |
| r = requests.post(url, headers=headers, json=payload, timeout=25) | |
| if r.status_code != 200: | |
| if r.status_code == 404: | |
| _hf_bad_models.add(model_id) | |
| log.warning("HF %s -> 404: Not Found (disabled for this process)", model_id) | |
| else: | |
| log.warning("HF %s -> %s: %s", model_id, r.status_code, r.text[:300]) | |
| return None | |
| j = r.json() | |
| except Exception as e: | |
| log.warning("HF request failed: %s", e) | |
| return None | |
| if isinstance(j, list) and j and isinstance(j[0], dict): | |
| if "generated_text" in j[0]: | |
| return j[0]["generated_text"] | |
| if "translation_text" in j[0]: | |
| return j[0]["translation_text"] | |
| if isinstance(j, dict) and "generated_text" in j: | |
| return j["generated_text"] | |
| if isinstance(j, str): | |
| return j | |
| return None | |
@lru_cache(maxsize=4096)  # assumed maxsize; warm() calls _translate_cached.cache_clear(), so this decorator was evidently dropped
def _translate_cached(text: str, src: str, tgt: str) -> str:
| if not text or src == tgt: | |
| return text | |
| out = _translate_via_libre(text, src, tgt) | |
| if out: | |
| return out | |
| opus_model = opus_model_for(src, tgt) | |
| if opus_model: | |
| out = _hf_call(opus_model, {"inputs": text}) | |
| if out: | |
| return out | |
| try: | |
| if HF_MODEL_PRIMARY and (src in NLLB_CODES) and (tgt in NLLB_CODES): | |
| out = _hf_call( | |
| HF_MODEL_PRIMARY, | |
| { | |
| "inputs": text, | |
| "parameters": {"src_lang": NLLB_CODES[src], "tgt_lang": NLLB_CODES[tgt]}, | |
| "options": {"wait_for_model": True}, | |
| }, | |
| ) | |
| if out: | |
| return out | |
| except Exception: | |
| pass | |
| if src != "en" and tgt != "en": | |
| step_en = _translate_cached(text, src, "en") | |
| if step_en and step_en != text: | |
| out = _translate_cached(step_en, "en", tgt) | |
| if out: | |
| return out | |
| out = _translate_local(text, src, tgt) | |
| if out: | |
| return out | |
| log.warning("All translate paths failed (%s->%s); returning original.", src, tgt) | |
| return text | |
| def translate_text(text: str, target_lang: Optional[str], fallback_src: Optional[str] = None) -> str: | |
| if not text or not target_lang: | |
| return text | |
| tgt = target_lang.lower() | |
| if tgt not in SUPPORTED: | |
| return text | |
| src = (fallback_src or detect_lang(text) or "en").lower() | |
| if src == tgt: | |
| return text | |
| if src not in SUPPORTED: | |
| if src.startswith("zh"): | |
| src = "zh" | |
| elif src.startswith("pt"): | |
| src = "pt" | |
| elif src[:2] in SUPPORTED: | |
| src = src[:2] | |
| else: | |
| src = "en" | |
| return _translate_cached(text, src, tgt) | |
| def _translate_local(text: str, src: str, tgt: str) -> Optional[str]: | |
| if not _HAS_SENTENCEPIECE: | |
| return None | |
| model_id = opus_model_for(src, tgt) | |
| if not model_id: | |
| return None | |
| key = model_id | |
| try: | |
| if key not in _local_pipes: | |
| _local_pipes[key] = hf_pipeline("translation", model=model_id) | |
| out = _local_pipes[key](text, max_length=512) | |
| return out[0]["translation_text"] | |
| except Exception as e: | |
| log.warning("Local translate failed for %s: %s", model_id, e) | |
| return None | |
| # ----------------- Warmup Settings & Routine ----------------- | |
| WARM_LIMIT_EACH = 20 | |
| WARM_TIMESPAN = "24h" | |
| WARM_PREFETCH_DESCRIPTIONS = False | |
def _fmt_mmss(ms: float) -> str:
    total_sec = int(round(ms / 1000.0))
    m, s = divmod(total_sec, 60)
    return f"{m}:{s:02d}"
| def _warm_once(): | |
| try: | |
| log.info("WARM: starting background warm-up (limit_each=%d, timespan=%s)", WARM_LIMIT_EACH, WARM_TIMESPAN) | |
| t0 = time.perf_counter() | |
| get_sbert() | |
| get_news_clf() | |
| t1 = time.perf_counter() | |
| raw = combine_raw_articles( | |
| category=None, query=None, language="en", | |
| limit_each=WARM_LIMIT_EACH, timespan=WARM_TIMESPAN, | |
| log_summary=False | |
| ) | |
| t_fetch = (time.perf_counter() - t1) * 1000 | |
| if WARM_PREFETCH_DESCRIPTIONS: | |
| prefetch_descriptions_async(raw) | |
| t2 = time.perf_counter() | |
| enriched = [enrich_article(a, language="en", translate=False, target_lang=None) for a in raw] | |
| t_enrich = (time.perf_counter() - t2) * 1000 | |
| t3 = time.perf_counter() | |
| clusters = cluster_articles(enriched, sim_threshold=SIM_THRESHOLD) | |
| t_cluster = (time.perf_counter() - t3) * 1000 | |
| key = cache_key_for(q=None, category=None, language="en", | |
| limit_each=WARM_LIMIT_EACH, translate=False, target_lang=None, | |
| speed=Speed.balanced) | |
| _events_cache[key] = {"t": monotonic(), "enriched": enriched, "clusters": clusters} | |
| t_total = (time.perf_counter() - t0) * 1000 | |
| log.info( | |
| "WARM: fetch=%s, enrich=%s, cluster=%s, total=%s (raw=%d, enriched=%d, clusters=%d)", | |
| _fmt_mmss(t_fetch), _fmt_mmss(t_enrich), _fmt_mmss(t_cluster), _fmt_mmss(t_total), | |
| len(raw), len(enriched), len(clusters), | |
| ) | |
| except Exception as e: | |
| log.warning(f"WARM: failed: {e}") | |
# Startup hook (assumed): the decorator was likely lost in extraction; warm() is
# otherwise never called in this module.
@app.on_event("startup")
def warm():
    try:
        _translate_cached.cache_clear()
    except Exception:
        pass
    get_sbert()
    get_news_clf()
    threading.Thread(target=_warm_once, daemon=True).start()
| # ----------------- GDELT Query Helpers ----------------- | |
| _GDELT_LANG = { | |
| "en": "english", | |
| "es": "spanish", | |
| "fr": "french", | |
| "de": "german", | |
| "it": "italian", | |
| "pt": "portuguese", | |
| "ru": "russian", | |
| "ar": "arabic", | |
| "hi": "hindi", | |
| "ja": "japanese", | |
| "ko": "korean", | |
| "zh": "chinese", | |
| } | |
def _gdelt_safe_query(user_q, language):
    parts = []
    if user_q:
        q = user_q.strip()
        if len(q) < 3:
            q = f'"{q}" news'
        parts.append(q)
    if language and (lg := _GDELT_LANG.get(language.lower())):
        parts.append(f"sourcelang:{lg}")
    if not parts:
        parts.append("sourcelang:english")
    return " ".join(parts)
| # ----------------- GDELT Fetchers ----------------- | |
| def fetch_gdelt_articles( | |
| limit=50, | |
| query=None, | |
| language=None, | |
| timespan="3d", | |
| category=None, | |
| extra_tokens: Optional[List[str]] = None, | |
| start_utc: Optional[datetime] = None, | |
| end_utc: Optional[datetime] = None, | |
| ): | |
| q = _gdelt_safe_query(query, language) | |
| if extra_tokens: | |
| q = f"{q} " + " ".join(extra_tokens) | |
| url = "https://api.gdeltproject.org/api/v2/doc/doc" | |
| params = { | |
| "query": q, | |
| "mode": "ArtList", | |
| "format": "json", | |
| "sort": "DateDesc", | |
| "maxrecords": int(min(250, max(1, limit))), | |
| } | |
| if start_utc and end_utc: | |
| params["startdatetime"] = _gdelt_fmt(start_utc) | |
| params["enddatetime"] = _gdelt_fmt(end_utc) | |
| else: | |
| params["timespan"] = timespan | |
| headers = { | |
| "User-Agent": "Mozilla/5.0 (compatible; NewsGlobe/1.0; +mailto:you@yourdomain.com)", | |
| "Accept": "application/json", | |
| } | |
| def _do_request(p): | |
| r = _session_get(url, params=p, timeout=10) | |
| log.info(f"GDELT URL: {r.url} (status={r.status_code})") | |
| if r.status_code != 200: | |
| log.warning(f"GDELT HTTP {r.status_code}: {r.text[:400]}") | |
| return None | |
| try: | |
| return r.json() | |
| except Exception: | |
| ct = r.headers.get("Content-Type", "") | |
| log.warning(f"GDELT non-JSON response. CT={ct}. Body[:400]: {r.text[:400]}") | |
| return None | |
| data = _do_request(params) | |
| if data is None: | |
| p2 = {**params, "timespan": "24h", "maxrecords": min(100, params["maxrecords"])} | |
| data = _do_request(p2) | |
| if not data: | |
| return [] | |
| arts = data.get("articles") or [] | |
| results = [] | |
| for a in arts: | |
| desc = a.get("description") or a.get("content") or "" | |
| title = a.get("title") or "" | |
| if desc and ( | |
| desc.strip().lower() == title.strip().lower() or | |
| (len(desc) <= 60 and _too_similar(title, desc)) | |
| ): | |
| desc = "" | |
| desc = desc or "No description available" | |
| results.append( | |
| { | |
| "title": title, | |
| "url": a.get("url"), | |
| "source": {"name": a.get("domain") or "GDELT"}, | |
| "description": desc, | |
| "publishedAt": a.get("seendate"), | |
| "api_source": "gdelt", | |
| "gdelt_sourcecountry": a.get("sourcecountry"), | |
| "requested_category": category, | |
| } | |
| ) | |
| log.info(f"GDELT returned {len(results)}") | |
| return results | |
| def fetch_gdelt_multi( | |
| limit=120, query=None, language=None, timespan="48h", | |
| category=None, speed: Speed = Speed.balanced, | |
| start_utc: Optional[datetime] = None, end_utc: Optional[datetime] = None | |
| ): | |
| if language: | |
| primary = fetch_gdelt_articles(limit=limit, query=query, language=language, | |
| timespan=timespan, category=category, | |
| start_utc=start_utc, end_utc=end_utc) | |
| booster = fetch_gdelt_articles(limit=max(10, limit // 6), query=query, language="en", | |
| timespan=timespan, category=category, | |
| start_utc=start_utc, end_utc=end_utc) | |
| return primary + booster | |
| if speed == Speed.fast: | |
| langs = LANG_ROTATION[:3] | |
| timespan = "24h" | |
| elif speed == Speed.balanced: | |
| langs = LANG_ROTATION[:8] | |
| timespan = "48h" | |
| else: | |
| langs = LANG_ROTATION | |
| timespan = "3d" | |
| per_lang = max(8, math.ceil(limit / len(langs))) | |
| out = [] | |
| for lg in langs: | |
| out.extend(fetch_gdelt_articles(limit=per_lang, query=query, language=lg, | |
| timespan=timespan, category=category, | |
| start_utc=start_utc, end_utc=end_utc)) | |
| if speed != Speed.fast: | |
| per_cc = max(4, limit // 30) if speed == Speed.max else max(2, limit // 40) | |
| for cc in COUNTRY_SEEDS[: (8 if speed == Speed.balanced else 16)]: | |
| out.extend(fetch_gdelt_articles( | |
| limit=per_cc, query=query, language="en", | |
| timespan=timespan, category=category, | |
| extra_tokens=[f"sourcecountry:{cc}"], | |
| start_utc=start_utc, end_utc=end_utc | |
| )) | |
| return out | |
| # ----------------- Provider Flags / Keys / Logging ----------------- | |
| USE_GNEWS_API = True | |
| USE_NEWSDATA_API = True | |
| USE_GDELT_API = True | |
| USE_NEWSAPI = True | |
| NEWSAPI_KEY = os.getenv("NEWSAPI_KEY", "3b2d3fde45b84cdbb3f706dfb0110df4") | |
| GNEWS_API_KEY = os.getenv("GNEWS_API_KEY", "5419897c95e8a4b21074e0d3fe95a3dd") | |
| NEWSDATA_API_KEY = os.getenv("NEWSDATA_API_KEY", "pub_1feb49a71a844719af68d0844fb43a61") | |
| HUGGINGFACE_API_TOKEN = os.getenv("HUGGINGFACE_API_TOKEN") | |
| logging.basicConfig( | |
| level=logging.WARNING, | |
| format="%(levelname)s:%(name)s:%(message)s", | |
| ) | |
| log = logging.getLogger("newsglobe") | |
| log.setLevel(logging.WARNING) | |
| fetch_log = logging.getLogger("newsglobe.fetch_summary") | |
| fetch_log.setLevel(logging.INFO) | |
| _fetch_handler = logging.StreamHandler() | |
| _fetch_handler.setLevel(logging.INFO) | |
| _fetch_handler.setFormatter(logging.Formatter("%(levelname)s:%(name)s:%(message)s")) | |
| fetch_log.addHandler(_fetch_handler) | |
| fetch_log.propagate = False | |
| for name in ("urllib3", "urllib3.connectionpool", "requests.packages.urllib3"): | |
| lg = logging.getLogger(name) | |
| lg.setLevel(logging.ERROR) | |
| lg.propagate = False | |
| def _newsapi_enabled() -> bool: | |
| if not NEWSAPI_KEY: | |
| log.warning("NewsAPI disabled: missing NEWSAPI_KEY env var") | |
| return False | |
| return True | |
| # ----------------- Clustering Helpers ----------------- | |
| def cluster_id(cluster, enriched_articles): | |
| urls = sorted([(enriched_articles[i].get("url") or "") for i in cluster["indices"] if enriched_articles[i].get("url")]) | |
| base = "|".join(urls) if urls else "empty" | |
| return hashlib.md5(base.encode("utf-8")).hexdigest()[:10] | |
| BOILER = re.compile(r"\b(live updates|breaking|what we know|in pictures|opinion)\b", re.I) | |
| def _norm_text(s: str) -> str: | |
| s = (s or "").strip() | |
| s = re.sub(r"\s+", " ", s) | |
| return s | |
| def _cluster_text(a): | |
| base = f"{a.get('orig_title') or a.get('title') or ''} {a.get('orig_description') or a.get('description') or ''}" | |
| base = BOILER.sub("", base) | |
| base = re.sub(r"\b(\d{1,2}:\d{2}\s?(AM|PM))|\b(\d{1,2}\s\w+\s\d{4})", "", base, flags=re.I) | |
| return _norm_text(base) | |
def _canonical_url(u: str) -> str:
    if not u:
        return u
    p = urlparse(u)
    qs = [(k, v) for (k, v) in parse_qsl(p.query, keep_blank_values=False) if not k.lower().startswith(("utm_", "fbclid", "gclid"))]
    clean = p._replace(query="&".join([f"{k}={v}" for k, v in qs]), fragment="")
    path = clean.path.rstrip("/") or "/"
    clean = clean._replace(path=path)
    return urlunparse(clean)
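# Illustrative example (not from the original source):
#   _canonical_url("https://ex.com/story/?utm_source=x&id=7#frag") -> "https://ex.com/story?id=7"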
| # ----------------- Normalizers / Enrichment ----------------- | |
| def normalize_newsdata_article(article): | |
| return { | |
| "title": article.get("title"), | |
| "url": article.get("link"), | |
| "source": {"name": article.get("source_id", "NewsData.io")}, | |
| "description": article.get("description") or "No description available", | |
| "publishedAt": article.get("publishedAt"), | |
| "api_source": article.get("api_source", "newsdata"), | |
| "category": ((article.get("category") or [None])[0] if isinstance(article.get("category"), list) else article.get("category")), | |
| } | |
| def enrich_article(a, language=None, translate=False, target_lang=None): | |
| source_name = (a.get("source", {}) or {}).get("name", "").strip() or "Unknown" | |
| s_lower = source_name.lower() | |
| if "newsapi" in s_lower: | |
| source_name = "NewsAPI" | |
| elif "gnews" in s_lower: | |
| source_name = "GNews" | |
| elif "newsdata" in s_lower: | |
| source_name = "NewsData.io" | |
| article_url = _canonical_url(a.get("url") or "") | |
| try: | |
| ext = _tld(article_url) | |
| domain = ".".join([p for p in (ext.domain, ext.suffix) if p]) if (ext.domain or ext.suffix) else "" | |
| except Exception: | |
| domain = "" | |
| country_guess = None | |
| if a.get("api_source") == "gdelt": | |
| sc = a.get("gdelt_sourcecountry") | |
| if sc: | |
| iso2map = { | |
| "US": "United States", "GB": "United Kingdom", "AU": "Australia", "CA": "Canada", "IN": "India", | |
| "DE": "Germany", "FR": "France", "IT": "Italy", "ES": "Spain", "BR": "Brazil", "JP": "Japan", | |
| "CN": "China", "RU": "Russia", "KR": "South Korea", "ZA": "South Africa", "NG": "Nigeria", | |
| "MX": "Mexico", "AR": "Argentina", "CL": "Chile", "CO": "Colombia", "NL": "Netherlands", | |
| "SE": "Sweden", "NO": "Norway", "DK": "Denmark", "FI": "Finland", "IE": "Ireland", "PL": "Poland", | |
| "PT": "Portugal", "GR": "Greece", "TR": "Turkey", "IL": "Israel", "SA": "Saudi Arabia", | |
| "AE": "United Arab Emirates", "SG": "Singapore", "MY": "Malaysia", "TH": "Thailand", | |
| "PH": "Philippines", "ID": "Indonesia", "NZ": "New Zealand", | |
| } | |
| country_guess = iso2map.get(str(sc).upper(), sc if len(str(sc)) > 2 else None) | |
| coords = get_country_centroid(country_guess) if country_guess else geocode_source(source_name, domain, do_network=False) | |
| title = (a.get("title") or "").strip() or "(untitled)" | |
| description = (a.get("description") or "").strip() | |
| if description.lower().startswith("no description"): | |
| description = "" | |
| cached_desc = _desc_cache_get(article_url) | |
| need_upgrade = ( | |
| (not description) | |
| or description.lower().startswith("no description") | |
| or len(description) < DESC_MIN_LEN | |
| or _too_similar(title, description) | |
| ) | |
| if need_upgrade and cached_desc: | |
| description = cached_desc | |
| if description: | |
| description = _tidy_description(title, description, source_name) | |
| if (not description) or _too_similar(title, description): | |
| description = f"Quick take: {title.rstrip('.')}." | |
| orig_title = title | |
| orig_description = description | |
| detected_lang = (detect_lang(f"{title} {description}") or "").lower() | |
| ml_text = f"{orig_title}. {orig_description}".strip() | |
| sentiment = classify_sentiment(f"{orig_title} {orig_description}") | |
| seed = f"{source_name}|{article_url}|{title}" | |
| uid = hashlib.md5(seed.encode("utf-8")).hexdigest()[:12] | |
| provided = a.get("category") | |
| if provided and normalize_category(provided) != "general": | |
| cat = normalize_category(provided) | |
| else: | |
| cat = infer_category(article_url, orig_title, orig_description, provided) | |
| return { | |
| "id": uid, | |
| "title": title, | |
| "url": article_url, | |
| "source": source_name, | |
| "lat": coords["lat"], | |
| "lon": coords["lon"], | |
| "country": coords.get("country", "Unknown"), | |
| "description": description, | |
| "sentiment": sentiment, | |
| "api_source": a.get("api_source", "unknown"), | |
| "publishedAt": a.get("publishedAt"), | |
| "_ml_text": ml_text, | |
| "orig_title": orig_title, | |
| "orig_description": orig_description, | |
| "detected_lang": detected_lang, | |
| "translated": False, | |
| "category": cat, | |
| } | |
| # ----------------- Clustering (Semantic, single-pass + merge) ----------------- | |
def cluster_articles(articles: List[Dict[str, Any]], sim_threshold=0.6, speed: Speed = Speed.balanced):
    if speed == Speed.fast:
        articles = articles[:150]
        sim_threshold = max(sim_threshold, 0.64)
    elif speed == Speed.balanced:
        articles = articles[:]
        sim_threshold = max(sim_threshold, 0.62)
    texts = [_cluster_text(a) for a in articles]
    embs = get_sbert().encode(texts, convert_to_tensor=True, normalize_embeddings=True, show_progress_bar=False)
    clusters = []
    centroids = []
    for i, emb in enumerate(embs):
        best_idx, best_sim = -1, -1.0
        for ci, c_emb in enumerate(centroids):
            sim = util.cos_sim(emb, c_emb).item()
            if sim > sim_threshold and sim > best_sim:
                best_sim, best_idx = sim, ci
        if best_idx >= 0:
            clusters[best_idx]["indices"].append(i)
            idxs = clusters[best_idx]["indices"]
            new_c = embs[idxs].mean(dim=0)
            new_c = new_c / new_c.norm()
            centroids[best_idx] = new_c
            clusters[best_idx]["centroid"] = new_c
        else:
            event_id = hashlib.md5(texts[i].encode("utf-8")).hexdigest()[:10]
            clusters.append({"id": event_id, "indices": [i], "centroid": emb})
            centroids.append(emb)
    merged = _merge_close_clusters(clusters, embs, threshold=0.70)
    for c in merged:
        c["id"] = cluster_id(c, articles)
    return merged
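# Illustrative behaviour (not from the original source): each article joins the closest
# existing centroid when cosine similarity exceeds sim_threshold (0.60-0.64 depending on
# speed), otherwise it seeds a new cluster; _merge_close_clusters then fuses clusters
# whose centroids have cosine similarity >= 0.70.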
| def event_payload_from_cluster(cluster, enriched_articles): | |
| idxs = cluster["indices"] | |
| arts = [enriched_articles[i] for i in idxs] | |
| title_counts = Counter([a["title"] for a in arts]) | |
| canonical_title = title_counts.most_common(1)[0][0] | |
| keywords = list({w.lower() for t in title_counts for w in t.split() if len(w) > 3})[:8] | |
| sources = {a["source"] for a in arts} | |
| countries = {a["country"] for a in arts if a["country"] and a["country"] != "Unknown"} | |
| ts = [a.get("publishedAt") for a in arts if a.get("publishedAt")] | |
| return { | |
| "event_id": cluster_id(cluster, enriched_articles), | |
| "title": canonical_title, | |
| "keywords": keywords, | |
| "article_count": len(arts), | |
| "source_count": len(sources), | |
| "country_count": len(countries), | |
| "time_range": {"min": min(ts) if ts else None, "max": max(ts) if ts else None}, | |
| "sample_urls": [a["url"] for a in arts[:3] if a.get("url")], | |
| } | |
| def aggregate_event_by_country(cluster, enriched_articles, max_samples: int | None = 5): | |
| idxs = cluster["indices"] | |
| arts = [enriched_articles[i] for i in idxs] | |
| by_country: Dict[str, Dict[str, Any]] = {} | |
| for a in arts: | |
| c = a.get("country") or "Unknown" | |
| if c not in by_country: | |
| coords = get_country_centroid(c) | |
| by_country[c] = {"country": c, "lat": coords["lat"], "lon": coords["lon"], "articles": []} | |
| by_country[c]["articles"].append(a) | |
| results = [] | |
| for c, block in by_country.items(): | |
| arr = block["articles"] | |
| to_num = {"negative": -1, "neutral": 0, "positive": 1} | |
| vals = [to_num.get(a["sentiment"], 0) for a in arr] | |
| avg = sum(vals) / max(len(vals), 1) | |
| avg_sent = "positive" if avg > 0.15 else "negative" if avg < -0.15 else "neutral" | |
| top_sources = [s for s, _ in Counter([a["source"] for a in arr]).most_common(3)] | |
| summary = " • ".join([a["title"] for a in arr[:2]]) | |
| use = arr if (max_samples in (None, 0) or max_samples < 0) else arr[:max_samples] | |
| results.append( | |
| { | |
| "country": c, | |
| "lat": block["lat"], | |
| "lon": block["lon"], | |
| "count": len(arr), | |
| "avg_sentiment": avg_sent, | |
| "top_sources": top_sources, | |
| "summary": summary, | |
| "samples": [ | |
| { | |
| "title": a["title"], | |
| "orig_title": a.get("orig_title"), | |
| "orig_description": a.get("orig_description"), | |
| "url": a["url"], | |
| "source": a["source"], | |
| "sentiment": a["sentiment"], | |
| "detected_lang": a.get("detected_lang"), | |
| } | |
| # for a in arr[:5] | |
| for a in use | |
| ], | |
| } | |
| ) | |
| return results | |
| def _merge_close_clusters(clusters, embs, threshold=0.68): | |
| merged = [] | |
| used = set() | |
| for i in range(len(clusters)): | |
| if i in used: | |
| continue | |
| base = clusters[i] | |
| group = [i] | |
| for j in range(i + 1, len(clusters)): | |
| if j in used: | |
| continue | |
| sim = util.cos_sim(base["centroid"], clusters[j]["centroid"]).item() | |
| if sim >= threshold: | |
| group.append(j) | |
| all_idx = [] | |
| cents = [] | |
| for g in group: | |
| used.add(g) | |
| all_idx.extend(clusters[g]["indices"]) | |
| cents.append(clusters[g]["centroid"]) | |
| newc = torch.stack(cents, dim=0).mean(dim=0) | |
| newc = newc / newc.norm() | |
| merged.append({"indices": sorted(set(all_idx)), "centroid": newc}) | |
| return merged | |
| # ----------------- Event Cache / Keys ----------------- | |
| CACHE_TTL_SECS = 900 | |
| SIM_THRESHOLD = 0.6 | |
| _events_cache: Dict[Tuple, Dict[str, Any]] = {} | |
| # -------- Date parsing helpers (Option B) -------- | |
| ISO_BASIC_RE = re.compile(r'^(\d{4})(\d{2})(\d{2})(?:[T ]?(\d{2})(\d{2})(\d{2}))?(Z|[+-]\d{2}:?\d{2})?$') | |
def _parse_user_dt(s: Optional[str], which: str) -> Optional[datetime]:
    """Parse query 'start'/'end' into UTC-aware datetimes."""
    if not s:
        return None
    s = s.strip()
    try:
        # Normalize Z
        if s.endswith("Z"):
            s = s[:-1] + "+00:00"
        # Date-only
        if re.match(r'^\d{4}-\d{2}-\d{2}$', s):
            s = s + ("T00:00:00+00:00" if which == "start" else "T23:59:59+00:00")
        dt = datetime.fromisoformat(s)
        if dt.tzinfo is None:
            dt = dt.replace(tzinfo=timezone.utc)
        return dt.astimezone(timezone.utc)
    except Exception:
        m = ISO_BASIC_RE.match(s)
        if m:
            yyyy, MM, dd, hh, mm, ss, tz = m.groups()
            hh = hh or ("00" if which == "start" else "23")
            mm = mm or ("00" if which == "start" else "59")
            ss = ss or ("00" if which == "start" else "59")
            return datetime(int(yyyy), int(MM), int(dd), int(hh), int(mm), int(ss), tzinfo=timezone.utc)
        return None
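# Illustrative examples (not from the original source):
#   _parse_user_dt("2024-05-01", "end")   -> 2024-05-01 23:59:59+00:00
#   _parse_user_dt("20240501", "start")   -> 2024-05-01 00:00:00+00:00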
| def _gdelt_fmt(dt: datetime) -> str: | |
| return dt.astimezone(timezone.utc).strftime("%Y%m%d%H%M%S") | |
| def _parse_any_pubdate(s: Optional[str]) -> Optional[datetime]: | |
| """Best-effort parse of provider publishedAt strings to UTC.""" | |
| if not s: | |
| return None | |
| try: | |
| t = s.strip() | |
| if t.endswith("Z"): | |
| t = t[:-1] + "+00:00" | |
| return datetime.fromisoformat(t).astimezone(timezone.utc) | |
| except Exception: | |
| m = ISO_BASIC_RE.match(s) | |
| if m: | |
| yyyy, MM, dd, hh, mm, ss, tz = m.groups() | |
| hh = hh or "00"; mm = mm or "00"; ss = ss or "00" | |
| return datetime(int(yyyy), int(MM), int(dd), int(hh), int(mm), int(ss), tzinfo=timezone.utc) | |
| return None | |
def cache_key_for(
    q, category, language, limit_each,
    translate=False, target_lang=None,
    start_utc: Optional[datetime] = None,
    end_utc: Optional[datetime] = None,
    speed: Speed = Speed.balanced
):
    return (
        q or "", category or "", language or "", int(limit_each or 50),
        bool(translate), (target_lang or "").lower(),
        (start_utc and _gdelt_fmt(start_utc)) or "",
        (end_utc and _gdelt_fmt(end_utc)) or "",
        speed.value,
    )
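# Illustrative example (not from the original source):
#   cache_key_for("ai", None, "en", 50, speed=Speed.fast)
#   -> ("ai", "", "en", 50, False, "", "", "", "fast")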
| _first_real_build = True | |
| def get_or_build_events_cache( | |
| q, category, language, translate, target_lang, limit_each, | |
| start_utc: Optional[datetime] = None, | |
| end_utc: Optional[datetime] = None, | |
| speed: Speed = Speed.balanced | |
| ): | |
| global _first_real_build | |
| key = cache_key_for(q, category, language, limit_each, translate, target_lang, start_utc, end_utc, speed) | |
| now = monotonic() | |
| if speed == Speed.fast: | |
| use_timespan, use_limit = "24h", min(limit_each, 20) | |
| elif speed == Speed.balanced: | |
| use_timespan, use_limit = "48h", min(limit_each, 100) | |
| else: | |
| use_timespan, use_limit = "3d", limit_each | |
| entry = _events_cache.get(key) | |
| if entry and now - entry["t"] < CACHE_TTL_SECS: | |
| log.info(f"CACHE HIT for {key}") | |
| return key, entry["enriched"], entry["clusters"] | |
| lock = _get_inflight_lock(key) | |
| with lock: | |
| entry = _events_cache.get(key) | |
| if entry and now - entry["t"] < CACHE_TTL_SECS: | |
| log.info(f"CACHE HIT (post-lock) for {key}") | |
| return key, entry["enriched"], entry["clusters"] | |
| if _first_real_build and not (start_utc and end_utc): | |
| use_timespan = "24h" if use_timespan != "24h" else use_timespan | |
| use_limit = min(use_limit, 100) | |
| log.info(f"CACHE MISS for {key} — fetching (timespan={use_timespan}, limit_each={use_limit}, start={start_utc}, end={end_utc})") | |
| raw = combine_raw_articles( | |
| category=category, | |
| query=q, | |
| language=language, | |
| limit_each=use_limit, | |
| timespan=use_timespan, | |
| speed=speed, | |
| start_utc=start_utc, | |
| end_utc=end_utc, | |
| ) | |
| prefetch_descriptions_async(raw, speed) | |
| enriched_all = [enrich_article(a, language=language, translate=False, target_lang=None) for a in raw] | |
| if category: | |
| cat_norm = (category or "").strip().lower() | |
| enriched = [e for e in enriched_all if (e.get("category") or "").lower() == cat_norm] | |
| else: | |
| enriched = enriched_all | |
| clusters = cluster_articles(enriched, sim_threshold=SIM_THRESHOLD, speed=speed) | |
| _events_cache[key] = {"t": monotonic(), "enriched": enriched, "clusters": clusters} | |
| _first_real_build = False | |
| return key, enriched, clusters | |
| # ----------------- Language Rotation / Seeds ----------------- | |
| LANG_ROTATION = ["en", "es", "fr", "de", "ar", "ru", "pt", "zh", "hi", "ja", "ko"] | |
| COUNTRY_SEEDS = ["US", "GB", "IN", "CA", "AU", "ZA", "SG", "NG", "DE", "FR", "BR", "MX", "ES", "RU", "JP", "KR", "CN"] | |
| # ----------------- Other Providers (NewsData/GNews/NewsAPI) ----------------- | |
| def fetch_newsdata_articles(category=None, limit=20, query=None, language=None): | |
| base_url = "https://newsdata.io/api/1/news" | |
| allowed = [ | |
| "business", | |
| "entertainment", | |
| "environment", | |
| "food", | |
| "health", | |
| "politics", | |
| "science", | |
| "sports", | |
| "technology", | |
| "top", | |
| "world", | |
| ] | |
| params = {"apikey": NEWSDATA_API_KEY, "language": (language or "en")} | |
| if category and category in allowed: | |
| params["category"] = category | |
| if query: | |
| params["q"] = query | |
| all_articles, next_page = [], None | |
| while len(all_articles) < limit: | |
| if next_page: | |
| params["page"] = next_page | |
| resp = _session_get(base_url, params=params, timeout=12) | |
| if resp.status_code != 200: | |
| break | |
| data = resp.json() | |
| articles = data.get("results", []) | |
| for a in articles: | |
| a["api_source"] = "newsdata" | |
| all_articles.extend(articles) | |
| next_page = data.get("nextPage") | |
| if not next_page: | |
| break | |
| for a in all_articles: | |
| a["publishedAt"] = a.get("pubDate") | |
| return all_articles[:limit] | |
| def fetch_gnews_articles(limit=20, query=None, language=None): | |
| url = f"https://gnews.io/api/v4/top-headlines?lang={(language or 'en')}&max={limit}&token={GNEWS_API_KEY}" | |
| if query: | |
| url += f"&q={requests.utils.quote(query)}" | |
| try: | |
| r = _session_get(url, timeout=12) | |
| if r.status_code != 200: | |
| return [] | |
| arts = r.json().get("articles", []) | |
| for a in arts: | |
| a["api_source"] = "gnews" | |
| return arts | |
| except Exception: | |
| return [] | |
| NEWSAPI_COUNTRIES = ["us", "gb", "ca", "au", "in", "za", "sg", "ie", "nz"] | |
| def fetch_newsapi_headlines_multi(limit=50, language=None): | |
| if not _newsapi_enabled(): | |
| return [] | |
| all_ = [] | |
| per = max(1, math.ceil(limit / max(1, len(NEWSAPI_COUNTRIES)))) | |
| per = min(per, 100) | |
| for c in NEWSAPI_COUNTRIES: | |
| url = f"https://newsapi.org/v2/top-headlines?country={c}&pageSize={per}&apiKey={NEWSAPI_KEY}" | |
| try: | |
| r = _session_get(url, timeout=12) | |
| except Exception as e: | |
| log.warning(f"NewsAPI top-headlines {c} request failed: {e}") | |
| continue | |
| if r.status_code != 200: | |
| log.warning(f"NewsAPI top-headlines {c} -> HTTP {r.status_code}: {r.text[:200]}") | |
| continue | |
| arts = r.json().get("articles", []) | |
| for a in arts: | |
| a["api_source"] = "newsapi" | |
| all_.extend(arts) | |
| time.sleep(0.2) | |
| return all_[:limit] | |
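| # Worked example of the per-country split above: limit=50 across the 9 seed countries | |
| # gives per = min(ceil(50 / 9), 100) = 6, so at most 9 * 6 = 54 headlines are fetched | |
| # and the final slice trims the merged list back to 50. | |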
| def fetch_newsapi_articles( | |
| category=None, | |
| limit=20, | |
| query=None, | |
| language=None, | |
| start_utc: Optional[datetime] = None, | |
| end_utc: Optional[datetime] = None, | |
| ): | |
| if not _newsapi_enabled(): | |
| return [] | |
| if query: | |
| url = f"https://newsapi.org/v2/everything?pageSize={limit}&apiKey={NEWSAPI_KEY}&q={requests.utils.quote(query)}" | |
| if language: | |
| url += f"&language={language}" | |
| # Date range filter for /everything (mapped to from/to below) | |
| if start_utc: | |
| url += f"&from={start_utc.date().isoformat()}" | |
| if end_utc: | |
| url += f"&to={end_utc.date().isoformat()}" | |
| try: | |
| r = _session_get(url, timeout=12) | |
| if r.status_code != 200: | |
| log.warning(f"NewsAPI /everything HTTP {r.status_code}: {r.text[:200]}") | |
| return [] | |
| arts = r.json().get("articles", []) | |
| for a in arts: | |
| a["api_source"] = "newsapi" | |
| return arts[:limit] | |
| except Exception as e: | |
| log.warning(f"NewsAPI /everything request failed: {e}") | |
| return [] | |
| results = [] | |
| per_country = max(5, limit // len(NEWSAPI_COUNTRIES)) | |
| for c in NEWSAPI_COUNTRIES: | |
| url = f"https://newsapi.org/v2/top-headlines?country={c}&pageSize={per_country}&apiKey={NEWSAPI_KEY}" | |
| if category: | |
| url += f"&category={category}" | |
| try: | |
| r = _session_get(url, timeout=12) | |
| if r.status_code != 200: | |
| log.warning(f"NewsAPI top-headlines {c} -> HTTP {r.status_code}: {r.text[:200]}") | |
| continue | |
| arts = r.json().get("articles", []) | |
| for a in arts: | |
| a["api_source"] = "newsapi" | |
| results.extend(arts) | |
| except Exception as e: | |
| log.warning(f"NewsAPI top-headlines {c} failed: {e}") | |
| time.sleep(0.2) | |
| return results[:limit] | |
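| # Usage sketch (illustrative values): with a query the function hits /everything and maps | |
| # the optional window to date-only from/to parameters; without a query it falls back to | |
| # per-country /top-headlines. | |
| # hits = fetch_newsapi_articles(query="elections", limit=30, language="en", | |
| #                               start_utc=datetime(2024, 5, 1, tzinfo=timezone.utc), | |
| #                               end_utc=datetime(2024, 5, 7, tzinfo=timezone.utc)) | |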
| # ----------------- Provider Combiner / Dedup ----------------- | |
| def combine_raw_articles(category=None, query=None, language=None, limit_each=30, | |
| timespan="3d", speed=Speed.balanced, log_summary: bool = True, | |
| start_utc: Optional[datetime] = None, end_utc: Optional[datetime] = None): | |
| if speed == Speed.fast: | |
| timespan = "24h" | |
| limit_each = min(limit_each, 20) | |
| elif speed == Speed.balanced: | |
| timespan = "48h" | |
| limit_each = min(limit_each, 100) | |
| a1 = [] | |
| if USE_NEWSAPI: | |
| if not query: | |
| a1 = fetch_newsapi_headlines_multi(limit=limit_each, language=language) | |
| else: | |
| a1 = fetch_newsapi_articles(category=category, limit=limit_each, query=query, | |
| language=language, start_utc=start_utc, end_utc=end_utc) | |
| a2 = [] | |
| if USE_NEWSDATA_API: | |
| a2 = [ | |
| normalize_newsdata_article(a) | |
| for a in fetch_newsdata_articles(category=category, limit=limit_each, query=query, language=language) | |
| if a.get("link") | |
| ] | |
| a3 = fetch_gnews_articles(limit=limit_each, query=query, language=language) if USE_GNEWS_API else [] | |
| a4 = fetch_gdelt_multi( | |
| limit=limit_each, query=query, language=language, | |
| timespan=timespan, category=category, speed=speed, | |
| start_utc=start_utc, end_utc=end_utc | |
| ) if USE_GDELT_API else [] | |
| seen, merged = set(), [] | |
| for a in a1 + a3 + a2 + a4: | |
| if a.get("url"): | |
| a["url"] = _canonical_url(a["url"]) | |
| url = a["url"] | |
| if url not in seen: | |
| seen.add(url) | |
| merged.append(a) | |
| # Apply date filter locally (for providers that can’t filter server-side) | |
| if start_utc or end_utc: | |
| s_ts = start_utc.timestamp() if start_utc else None | |
| e_ts = end_utc.timestamp() if end_utc else None | |
| def _in_range(row): | |
| dt = _parse_any_pubdate(row.get("publishedAt") or "") | |
| if not dt: | |
| return False | |
| t = dt.timestamp() | |
| if s_ts and t < s_ts: return False | |
| if e_ts and t > e_ts: return False | |
| return True | |
| merged = [a for a in merged if _in_range(a)] | |
| if log_summary: | |
| fetch_log.info("----- Article Fetch Summary -----") | |
| fetch_log.info(f"📊 NewsAPI returned: {len(a1)} articles") | |
| fetch_log.info(f"📊 NewsData.io returned: {len(a2)} articles") | |
| fetch_log.info(f"📊 GNews returned: {len(a3)} articles") | |
| fetch_log.info(f"📊 GDELT returned: {len(a4)} articles") | |
| fetch_log.info(f"✅ Total merged articles after deduplication: {len(merged)}") | |
| fetch_log.info("---------------------------------") | |
| return merged | |
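| # Usage sketch (hypothetical query): Speed.fast clamps timespan to 24h and limit_each to 20 | |
| # before any provider call, and the merge keeps only the first article seen per | |
| # canonicalised URL, so the result is already deduplicated. | |
| # rows = combine_raw_articles(query="wildfires", language="en", limit_each=50, speed=Speed.fast) | |
| # urls = [r.get("url") for r in rows] | |
| # assert len(urls) == len(set(urls))            # one row per canonical URL | |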
| # ----------------- API: /events ----------------- | |
| @app.get("/events") | |
| def get_events( | |
| q: Optional[str] = Query(None), | |
| category: Optional[str] = Query(None), | |
| language: Optional[str] = Query(None), | |
| translate: Optional[bool] = Query(False), | |
| target_lang: Optional[str] = Query(None), | |
| limit_each: int = Query(150, ge=5, le=250), | |
| max_events: int = Query(15, ge=5, le=50), | |
| min_countries: int = Query(2, ge=1, le=50), | |
| min_articles: int = Query(2, ge=1, le=200), | |
| speed: Speed = Query(Speed.balanced), | |
| start: Optional[str] = Query(None), | |
| end: Optional[str] = Query(None), | |
| ): | |
| start_dt = _parse_user_dt(start, "start") | |
| end_dt = _parse_user_dt(end, "end") | |
| if start_dt and end_dt and start_dt > end_dt: | |
| start_dt, end_dt = end_dt, start_dt # swap | |
| cache_key, enriched, clusters = get_or_build_events_cache( | |
| q, category, language, False, None, limit_each, | |
| start_utc=start_dt, end_utc=end_dt, speed=speed | |
| ) | |
| view = enriched | |
| if translate and target_lang: | |
| view = [dict(i) for i in enriched] | |
| for i in view: | |
| src_hint = i.get("detected_lang") | |
| i["title"] = translate_text(i.get("title") or "", target_lang, fallback_src=src_hint) | |
| i["description"] = translate_text(i.get("description") or "", target_lang, fallback_src=src_hint) | |
| i["translated"] = True | |
| events = [event_payload_from_cluster(c, view) for c in clusters] | |
| events = [e for e in events if (e["country_count"] >= min_countries and e["article_count"] >= min_articles)] | |
| events.sort(key=lambda e: e["article_count"], reverse=True) | |
| return {"events": events[:max_events], "cache_key": "|".join(map(str, cache_key))} | |
| # ----------------- API: /event/{event_id} ----------------- | |
| @app.get("/event/{event_id}") | |
| def get_event_details( | |
| event_id: str, | |
| cache_key: Optional[str] = Query(None), | |
| q: Optional[str] = Query(None), | |
| category: Optional[str] = Query(None), | |
| language: Optional[str] = Query(None), | |
| translate: Optional[bool] = Query(False), | |
| target_lang: Optional[str] = Query(None), | |
| limit_each: int = Query(150, ge=5, le=250), | |
| max_samples: int = Query(5, ge=0, le=1000), | |
| start: Optional[str] = Query(None), | |
| end: Optional[str] = Query(None), | |
| ): | |
| start_dt = _parse_user_dt(start, "start") | |
| end_dt = _parse_user_dt(end, "end") | |
| if cache_key: | |
| parts = cache_key.split("|") | |
| if len(parts) == 9: | |
| speed_str = parts[8] | |
| try: | |
| speed_obj = Speed(speed_str) | |
| except ValueError: | |
| speed_obj = Speed.balanced | |
| key_tuple = (parts[0], parts[1], parts[2], int(parts[3]), | |
| parts[4] == "True", parts[5].lower(), | |
| parts[6], parts[7], speed_str) | |
| elif len(parts) == 7: # backwards compat | |
| speed_str = parts[6] | |
| try: | |
| speed_obj = Speed(speed_str) | |
| except ValueError: | |
| speed_obj = Speed.balanced | |
| key_tuple = (parts[0], parts[1], parts[2], int(parts[3]), | |
| parts[4] == "True", parts[5].lower(), "", "", speed_str) | |
| else: | |
| raise HTTPException(status_code=400, detail="Bad cache_key") | |
| else: | |
| speed_obj = Speed.balanced | |
| key_tuple = cache_key_for(q, category, language, limit_each, translate, target_lang, | |
| start_utc=start_dt, end_utc=end_dt, speed=speed_obj) | |
| entry = _events_cache.get(key_tuple) | |
| if not entry: | |
| _, enriched, clusters = get_or_build_events_cache( | |
| q, category, language, False, None, limit_each, | |
| start_utc=start_dt, end_utc=end_dt, speed=speed_obj | |
| ) | |
| else: | |
| enriched, clusters = entry["enriched"], entry["clusters"] | |
| eview = enriched | |
| if translate and target_lang: | |
| eview = [dict(i) for i in enriched] | |
| for i in eview: | |
| src_hint = i.get("detected_lang") | |
| i["title"] = translate_text(i.get("title") or "", target_lang, fallback_src=src_hint) | |
| i["description"] = translate_text(i.get("description") or "", target_lang, fallback_src=src_hint) | |
| i["translated"] = True | |
| cluster = next((c for c in clusters if cluster_id(c, enriched) == event_id), None) | |
| if not cluster: | |
| raise HTTPException(status_code=404, detail="Event not found with current filters") | |
| payload = event_payload_from_cluster(cluster, eview) | |
| countries = aggregate_event_by_country(cluster, eview, max_samples=max_samples) | |
| payload["articles_in_event"] = sum(c["count"] for c in countries) | |
| return {"event": payload, "countries": countries} | |
| # ----------------- API: /news ----------------- | |
| @app.get("/news") | |
| def get_news( | |
| cache_key: Optional[str] = Query(None), | |
| category: Optional[str] = Query(None), | |
| sentiment: Optional[str] = Query(None), | |
| q: Optional[str] = Query(None), | |
| language: Optional[str] = Query(None), | |
| translate: Optional[bool] = Query(False), | |
| target_lang: Optional[str] = Query(None), | |
| limit_each: int = Query(100, ge=5, le=100), | |
| lite: bool = Query(True), | |
| speed: Speed = Query(Speed.balanced), | |
| page: int = Query(1, ge=1), | |
| page_size: int = Query(120, ge=5, le=300), | |
| start: Optional[str] = Query(None), | |
| end: Optional[str] = Query(None), | |
| ): | |
| start_dt = _parse_user_dt(start, "start") | |
| end_dt = _parse_user_dt(end, "end") | |
| enriched: List[Dict[str, Any]] = [] | |
| if cache_key: | |
| parts = cache_key.split("|") | |
| if len(parts) == 9: | |
| key_tuple = (parts[0], parts[1], parts[2], int(parts[3]), | |
| parts[4] == "True", parts[5].lower(), parts[6], parts[7], parts[8]) | |
| entry = _events_cache.get(key_tuple) | |
| if entry: | |
| enriched = entry["enriched"] | |
| elif len(parts) == 7: # backwards compat | |
| key_tuple = (parts[0], parts[1], parts[2], int(parts[3]), | |
| parts[4] == "True", parts[5].lower(), "", "", parts[6]) | |
| entry = _events_cache.get(key_tuple) | |
| if entry: | |
| enriched = entry["enriched"] | |
| if not enriched: | |
| raw = combine_raw_articles(category=category, query=q, language=language, | |
| limit_each=limit_each, speed=speed, | |
| start_utc=start_dt, end_utc=end_dt) | |
| prefetch_descriptions_async(raw, speed) | |
| enriched_all = [enrich_article(a, language=language, translate=False, target_lang=None) for a in raw] | |
| if category: | |
| cat_norm = (category or "").strip().lower() | |
| enriched = [e for e in enriched_all if (e.get("category") or "").lower() == cat_norm] | |
| else: | |
| enriched = enriched_all | |
| else: | |
| if category: | |
| cat_norm = (category or "").strip().lower() | |
| enriched = [e for e in enriched if (e.get("category") or "").lower() == cat_norm] | |
| if translate and target_lang: | |
| enriched = [dict(i) for i in enriched] | |
| for i in enriched: | |
| i["original_title"] = i.get("orig_title") or i.get("title") | |
| i["original_description"] = i.get("orig_description") or i.get("description") | |
| src_hint = i.get("detected_lang") | |
| i["title"] = translate_text(i.get("title") or "", target_lang, fallback_src=src_hint) | |
| i["description"] = translate_text(i.get("description") or "", target_lang, fallback_src=src_hint) | |
| i["translated"] = True | |
| i["translated_from"] = (src_hint or "").lower() | |
| i["translated_to"] = target_lang.lower() | |
| if sentiment: | |
| s = sentiment.strip().lower() | |
| enriched = [i for i in enriched if (i.get("sentiment") or "").lower() == s]  # tolerate missing/None sentiment | |
| total = len(enriched) | |
| offset = (page - 1) * page_size | |
| end_idx = offset + page_size | |
| items = [dict(i) for i in enriched[offset:end_idx]] | |
| if lite: | |
| drop = {"_ml_text"} | |
| for i in items: | |
| for k in drop: | |
| i.pop(k, None) | |
| return { | |
| "items": items, | |
| "total": total, | |
| "page": page, | |
| "page_size": page_size | |
| } | |
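| # Pagination arithmetic for the slice above: page=3 with page_size=120 gives | |
| # offset = (3 - 1) * 120 = 240, so items[240:360] of the filtered list are returned | |
| # together with `total` (the size of the full filtered list) for client-side page counts. | |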
| # ----------------- API: /related ----------------- | |
| @app.get("/related") | |
| def related_articles( | |
| id: Optional[str] = Query(None, description="article id from /news"), | |
| title: Optional[str] = Query(None), | |
| description: Optional[str] = Query(None), | |
| q: Optional[str] = Query(None), | |
| category: Optional[str] = Query(None), | |
| language: Optional[str] = Query(None), | |
| limit_each: int = Query(50, ge=5, le=100), | |
| k: int = Query(10, ge=1, le=50), | |
| ): | |
| raw = combine_raw_articles(category=category, query=q, language=language, limit_each=limit_each) | |
| enriched = [enrich_article(a, language=language, translate=False, target_lang=None) for a in raw] | |
| if not enriched: | |
| return {"items": []} | |
| if id: | |
| base = next((a for a in enriched if a.get("id") == id), None) | |
| if not base: | |
| raise HTTPException(404, "article id not found in current fetch") | |
| query_text = base["_ml_text"] | |
| else: | |
| text = f"{title or ''} {description or ''}".strip() | |
| if not text: | |
| raise HTTPException(400, "provide either id or title/description") | |
| query_text = text | |
| corpus_texts = [a["_ml_text"] for a in enriched] | |
| corpus_embs = _embed_texts(corpus_texts) | |
| query_emb = _embed_texts([query_text])[0] | |
| sims = util.cos_sim(query_emb, corpus_embs).cpu().numpy().flatten() | |
| idxs = sims.argsort()[::-1] | |
| items = [] | |
| for idx in idxs: | |
| a = enriched[idx] | |
| if id and a["id"] == id: | |
| continue | |
| items.append({**a, "similarity": float(sims[idx])}) | |
| if len(items) >= k: | |
| break | |
| return {"items": items} | |
| # ----------------- Middleware: Request Timing ----------------- | |
| @app.middleware("http") | |
| async def timing_middleware(request, call_next): | |
| start = time.perf_counter() | |
| response = None | |
| try: | |
| response = await call_next(request) | |
| return response | |
| finally: | |
| dur_ms = (time.perf_counter() - start) * 1000 | |
| if response is not None: | |
| try: | |
| response.headers["X-Process-Time-ms"] = f"{dur_ms:.1f}" | |
| except Exception: | |
| pass | |
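| # Responses carry the header set above; a client can read it back (BASE_URL is an assumption): | |
| # import requests | |
| # r = requests.get(f"{BASE_URL}/news", params={"page_size": 5}) | |
| # print(r.headers.get("X-Process-Time-ms"))    # e.g. "3.2" | |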
| # ----------------- Misc: Client Metrics ----------------- | |
| def client_metric(payload: Dict[str, Any] = Body(...)): | |
| name = (payload.get("name") or "").strip() | |
| if name in {"Load all article markers on globe", "Load event country markers on globe"}: | |
| return {"ok": True} | |
| return {"ok": True} | |
| # ----------------- Diagnostics: Translation Health ----------------- | |
| def diag_translate( | |
| src: str = Query("pt"), | |
| tgt: str = Query("en"), | |
| text: str = Query("Olá mundo") | |
| ): | |
| # Try each translation path explicitly, in the same order the runtime uses | |
| libre = _translate_via_libre(text, src, tgt) | |
| remote = None | |
| local = None | |
| opus_id = opus_model_for(src, tgt) | |
| if opus_id: | |
| remote = _hf_call(opus_id, {"inputs": text}) | |
| local = _translate_local(text, src, tgt) | |
| # Optional: try primary NLLB if configured | |
| nllb = None | |
| if HF_MODEL_PRIMARY and (src in NLLB_CODES) and (tgt in NLLB_CODES): | |
| nllb = _hf_call( | |
| HF_MODEL_PRIMARY, | |
| { | |
| "inputs": text, | |
| "parameters": {"src_lang": NLLB_CODES[src], "tgt_lang": NLLB_CODES[tgt]}, | |
| "options": {"wait_for_model": True}, | |
| }, | |
| ) | |
| sample_out = libre or remote or local or nllb | |
| out_lang = detect_lang(sample_out or "") or None | |
| return { | |
| "src": src, "tgt": tgt, "text": text, | |
| "libre_url": LIBRETRANSLATE_URL, | |
| "token_present": bool(HUGGINGFACE_API_TOKEN), | |
| "libre_ok": bool(libre), | |
| "remote_ok": bool(remote), | |
| "local_ok": bool(local), | |
| "nllb_ok": bool(nllb), | |
| "sample_out": sample_out, | |
| "sample_out_lang_detected": out_lang, | |
| "lang_match": (out_lang == tgt) | |
| } | |
