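# Scraped page header (Hugging Face file view), kept as a comment so it is not parsed as code:
#   MANOJSEQ's picture · Upload main.py · d9314fb verified · raw · history blame · 71 kB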
from fastapi import FastAPI, Query, HTTPException, Body
from typing import Optional, List, Dict, Any, Tuple, Set
import os
import time
import socket
import logging
import hashlib
from functools import lru_cache
from collections import Counter
import requests
import tldextract
import math
import nltk
from nltk.sentiment import SentimentIntensityAnalyzer
from geopy.geocoders import Nominatim
from geopy.exc import GeocoderUnavailable, GeocoderTimedOut
from fastapi.middleware.cors import CORSMiddleware
from countryinfo import CountryInfo
from sentence_transformers import SentenceTransformer, util
from domain_country_map import domain_country_map
from time import monotonic
from langdetect import detect, DetectorFactory
import re
from urllib.parse import urlparse, urlunparse, parse_qsl
from concurrent.futures import ThreadPoolExecutor, as_completed
from html import unescape
import threading
import difflib
from starlette.middleware.gzip import GZipMiddleware
from transformers import pipeline as hf_pipeline
import os
os.environ.setdefault("OMP_NUM_THREADS", "1")
from fastapi import Path
import torch
torch.set_num_threads(2)
# Optional runtime check for local OPUS tokenizers
try:
import sentencepiece as _spm # noqa: F401
_HAS_SENTENCEPIECE = True
except Exception:
_HAS_SENTENCEPIECE = False
from enum import Enum
class Speed(str, Enum):
    """Speed preset passed to the fetch, prefetch and cache helpers.

    The ``str`` mix-in makes every member compare equal to its plain string
    value (``Speed.fast == "fast"``).
    """

    fast = "fast"
    balanced = "balanced"
    max = "max"
# Local translation pipelines, created on first use and keyed by model id
# (filled by _translate_local).
_local_pipes = {}
# Topic classifier pipeline, created on first use by get_news_clf().
_news_clf = None
# SentenceTransformer model, created on first use by get_sbert().
_sbert = None
# --- Translation runtime flags / caches ---
# Remote Hugging Face inference is opt-in: only enabled when ALLOW_HF_REMOTE=1.
ALLOW_HF_REMOTE = os.getenv("ALLOW_HF_REMOTE", "0") == "1" # default OFF
# Model ids that answered HTTP 404; _hf_call skips them for the rest of the process.
_hf_bad_models: Set[str] = set()
def _translate_local(text: str, src: str, tgt: str) -> Optional[str]:
    """Translate *text* with a locally loaded OPUS-MT pipeline.

    Returns the translated string, or ``None`` when sentencepiece is not
    importable, when no OPUS model is mapped for the ``src``/``tgt`` pair, or
    when building or running the pipeline raises (logged as a warning).
    """
    # Marian tokenizers need sentencepiece; do not even try without it.
    if not _HAS_SENTENCEPIECE:
        return None

    model_id = opus_model_for(src, tgt)
    if not model_id:
        return None

    try:
        # Pipelines are expensive to build, so keep one per model id.
        if model_id not in _local_pipes:
            _local_pipes[model_id] = hf_pipeline("translation", model=model_id)
        translator = _local_pipes[model_id]
        result = translator(text, max_length=512)
        return result[0]["translation_text"]
    except Exception as exc:
        log.warning("Local translate failed for %s: %s", model_id, exc)
        return None
def fetch_gdelt_multi(limit=120, query=None, language=None, timespan="48h", category=None, speed: Speed = Speed.balanced):
    """Fetch GDELT articles across one or several languages.

    With an explicit *language*, that language is fetched using the caller's
    *timespan* and a small English booster (``max(10, limit // 6)`` articles)
    is appended.

    Without a language, the languages in ``LANG_ROTATION`` are rotated
    according to *speed*. NOTE: in this branch the *timespan* argument is
    replaced by the speed preset (fast: 3 languages / "24h", balanced:
    8 languages / "48h", anything else: all languages / "3d"). Unless the
    speed is ``fast``, extra English pulls restricted to individual source
    countries from ``COUNTRY_SEEDS`` are appended.

    Returns the concatenated article list; no de-duplication is done here.
    """
    # Forced language: honour it, plus a tiny English booster to catch global wires.
    if language:
        primary = fetch_gdelt_articles(
            limit=limit, query=query, language=language, timespan=timespan, category=category
        )
        booster = fetch_gdelt_articles(
            limit=max(10, limit // 6), query=query, language="en", timespan=timespan, category=category
        )
        return primary + booster

    # Otherwise rotate across languages; the preset decides breadth and window.
    if speed == Speed.fast:
        langs, timespan = LANG_ROTATION[:3], "24h"
    elif speed == Speed.balanced:
        langs, timespan = LANG_ROTATION[:8], "48h"
    else:
        langs, timespan = LANG_ROTATION, "3d"

    per_lang = max(8, math.ceil(limit / len(langs)))
    articles = []
    for lang_code in langs:
        articles.extend(
            fetch_gdelt_articles(
                limit=per_lang, query=query, language=lang_code, timespan=timespan, category=category
            )
        )

    if speed == Speed.fast:
        return articles

    # English pulls biased to different source countries broaden the outlet mix.
    if speed == Speed.max:
        per_country = max(4, limit // 30)
    else:
        per_country = max(2, limit // 40)
    seed_count = 8 if speed == Speed.balanced else 16
    for country_code in COUNTRY_SEEDS[:seed_count]:
        articles.extend(
            fetch_gdelt_articles(
                limit=per_country,
                query=query,
                language="en",
                timespan=timespan,
                category=category,
                extra_tokens=[f"sourcecountry:{country_code}"],
            )
        )
    return articles
def get_news_clf():
    """Return the shared topic-classification pipeline, building it on first use."""
    global _news_clf
    if _news_clf is not None:
        return _news_clf
    _news_clf = hf_pipeline(
        "text-classification",
        model="cardiffnlp/tweet-topic-21-multi",
        top_k=1,
    )
    return _news_clf
def get_sbert():
    """Return the shared SentenceTransformer model, loading it on first use."""
    global _sbert
    if _sbert is not None:
        return _sbert
    _sbert = SentenceTransformer("sentence-transformers/all-MiniLM-L6-v2")
    return _sbert
# globals
# One shared HTTP session so connections are pooled across requests.
SESSION = requests.Session()
# Pool of up to 64 connections, with 2 retries configured on the adapter.
ADAPTER = requests.adapters.HTTPAdapter(pool_connections=64, pool_maxsize=64, max_retries=2)
# Use the same adapter for both URL schemes.
SESSION.mount("http://", ADAPTER)
SESSION.mount("https://", ADAPTER)
def _session_get(url, **kwargs):
    """GET *url* through the shared ``SESSION``.

    A default ``User-Agent`` is added unless the caller supplies one, and the
    request times out after 12 seconds unless ``timeout`` is given. Every
    other keyword argument is forwarded to ``SESSION.get`` unchanged.
    """
    # Work on a copy: the caller's dict must not gain a User-Agent as a side
    # effect, and ``headers=None`` is treated the same as "no headers".
    headers = dict(kwargs.pop("headers", None) or {})
    headers.setdefault("User-Agent", "Mozilla/5.0 (compatible; NewsGlobe/1.0)")
    timeout = kwargs.pop("timeout", 12)
    return SESSION.get(url, headers=headers, timeout=timeout, **kwargs)
def _try_jina_reader(url: str, timeout: int) -> Optional[str]:
    """Best-effort summary of *url* obtained through the r.jina.ai reader.

    Returns the first two sentences of the reader output when together they
    are at least 80 characters long, otherwise only the first sentence.
    Returns ``None`` for a non-200 response, for output with no usable
    sentence, or when anything raises.
    """
    try:
        target = url.strip()
        if not target.startswith(("http://", "https://")):
            target = "https://" + target
        resp = _session_get(f"https://r.jina.ai/{target}", timeout=timeout)
        if resp.status_code != 200:
            return None
        sentences = _split_sentences(_clean_text(resp.text))
        if not sentences:
            return None
        lead = " ".join(sentences[:2])
        if len(lead) >= 80:
            return lead
        return sentences[0]
    except Exception:
        # Deliberately silent: this is a last-resort fallback.
        return None
# --- description cleanup helpers ---
# Case-insensitive pattern for site boilerplate phrases (subscription prompts,
# cookie/privacy notices, "read more" links, ad notices). _tidy_description
# removes every match from a description.
BOILER_DESC = re.compile(
r"(subscribe|sign in|sign up|enable javascript|cookies? (policy|settings)|"
r"privacy (policy|notice)|continue reading|read more|click here|"
r"accept (cookies|the terms)|by continuing|newsletter|advertisement|adblock)",
re.I
)
def _split_sentences(text: str) -> List[str]:
# light-weight splitter good enough for news blurbs
parts = re.split(r"(?<=[\.\?\!])\s+(?=[A-Z0-9])", (text or "").strip())
# also break on " β€’ " and long dashes if present
out = []
for p in parts:
out.extend(re.split(r"\s+[‒–—]\s+", p))
return [p.strip() for p in out if p and len(p.strip()) >= 2]
def _tidy_description(title: str, desc: str, source_name: str, max_chars: int = 240) -> str:
    """Turn a raw description into a short, headline-independent summary.

    Steps: remove the repeated title, delete boilerplate phrases, collapse
    whitespace, keep the first one or two sentences, soft-truncate to
    *max_chars*, prefer a sentence that does not parrot the title, and make
    sure the result ends with sentence punctuation.

    *source_name* is accepted for interface compatibility; it is not used.
    Returns ``""`` when *desc* is empty.
    """
    if not desc:
        return ""

    # Remove a repeated title, then obvious boilerplate.
    desc = _dedupe_title_from_desc(title, desc)
    desc = BOILER_DESC.sub("", desc)
    # Trim separator characters left at the edges: space, hyphen, figure dash,
    # en dash, colon, bullet, pipe. The previous literal held the mojibake
    # sequence "β€’" and so stripped "β", "€" and "’" instead of the bullet.
    desc = re.sub(r"\s+", " ", desc).strip(" -\u2012\u2013:\u2022|")

    # Choose the first 1-2 sentences that look like a summary.
    sents = _split_sentences(desc)
    if not sents:
        sents = [desc]
    best = " ".join(sents[:2]).strip()

    # Soft truncate, preferably at a sentence boundary.
    if len(best) > max_chars:
        if len(sents[0]) <= max_chars * 0.9:
            best = sents[0]
        else:
            best = best[:max_chars].rsplit(" ", 1)[0].rstrip(",;:-–—")

    # Avoid parroting the headline: try one of the following sentences.
    if _too_similar(title, best):
        for alt in sents[1:3]:
            if not _too_similar(title, alt):
                best = alt
                break

    # Ensure it ends neatly.
    if best and best[-1] not in ".!?":
        best += "."
    return best
def _too_similar(a: str, b: str, thresh: float = 0.92) -> bool:
"""Return True if strings are near-duplicates (or one contains the other)."""
a = (a or "").strip()
b = (b or "").strip()
if not a or not b:
return False
if a.lower() == b.lower():
return True
if a.lower() in b.lower() or b.lower() in a.lower():
return True
ratio = difflib.SequenceMatcher(None, a.lower(), b.lower()).ratio()
return ratio >= thresh
def _dedupe_title_from_desc(title: str, desc: str) -> str:
    """If the description contains the title, strip it and tidy up.

    A leading copy of the title is removed case-insensitively; further
    exact-case copies are removed anywhere in the text. Separator characters
    left at the edges are trimmed and the result is passed through
    ``_clean_text``. When either argument is empty, the stripped description
    is returned unchanged.
    """
    # Space, hyphen, figure dash, en dash, colon, bullet, pipe. The previous
    # literal contained the mojibake sequence "β€’", which made strip() eat
    # "β", "€" and "’" (e.g. a leading euro sign) instead of the bullet.
    separators = " -\u2012\u2013:\u2022|"

    t = (title or "").strip()
    d = (desc or "").strip()
    if not t or not d:
        return d

    # Remove an exact leading title.
    if d.lower().startswith(t.lower()):
        d = d[len(t):].lstrip(separators)
    # Remove inner repeats.
    d = d.replace(t, "").strip(separators)
    return _clean_text(d)
# Prevent duplicate upstream fetches when identical requests arrive together
_inflight_locks: Dict[Tuple, threading.Lock] = {}
_inflight_global_lock = threading.Lock()
def _get_inflight_lock(key: Tuple) -> threading.Lock:
with _inflight_global_lock:
lk = _inflight_locks.get(key)
if lk is None:
lk = threading.Lock()
_inflight_locks[key] = lk
return lk
# Taken by _desc_cache_put while it writes to DESC_CACHE.
DESC_CACHE_LOCK = threading.Lock()

# BeautifulSoup is optional; the HTML extractors fall back to regexes when
# the name is None.
try:
    from bs4 import BeautifulSoup
except Exception:
    BeautifulSoup = None

# -------- Description fetching config --------
DESC_FETCH_TIMEOUT = 3           # seconds per URL
DESC_MIN_LEN = 100               # shorter text counts as "weak" and is upgraded
DESC_CACHE_TTL = 24 * 60 * 60    # 24h, in seconds
MAX_DESC_FETCHES = 24            # cap on the number of fetches per request
DESC_WORKERS = 12                # parallel workers

# url -> {"text": str, "t": monotonic()}
DESC_CACHE: Dict[str, Dict[str, Any]] = {}
def _now_mono():
try:
return monotonic()
except Exception:
return time.time()
def _clean_text(s: str) -> str:
s = unescape(s or "")
s = re.sub(r"\s+", " ", s).strip()
return s
def _extract_desc_from_ld_json(html: str) -> Optional[str]:
    """Return the first usable description found in the page's JSON-LD blocks.

    Every ``<script type="application/ld+json">`` is parsed and searched
    depth-first for a ``description``, ``abstract`` or ``articleBody`` string
    of at least 40 characters (after cleaning). Returns ``None`` when
    BeautifulSoup is unavailable, *html* is empty, nothing qualifies, or
    parsing raises.
    """
    if not html or not BeautifulSoup:
        return None

    def find_desc(node):
        """Depth-first search of decoded JSON for a usable description."""
        if isinstance(node, list):
            for item in node:
                found = find_desc(item)
                if found:
                    return found
            return None
        if not isinstance(node, dict):
            return None
        # Direct fields first, in order of preference.
        for field in ("description", "abstract", "articleBody"):
            value = node.get(field)
            if isinstance(value, str):
                cleaned = _clean_text(value)
                if len(cleaned) >= 40:
                    return cleaned
        # Then anything nested.
        for child in node.values():
            if isinstance(child, (dict, list)):
                found = find_desc(child)
                if found:
                    return found
        return None

    try:
        import json

        soup = BeautifulSoup(html, "html.parser")
        for script in soup.find_all("script", {"type": "application/ld+json"}):
            try:
                payload = json.loads(script.string or "")
            except Exception:
                # Malformed or empty JSON-LD block: try the next one.
                continue
            desc = find_desc(payload)
            if desc and len(desc) >= 40:
                return desc
    except Exception:
        pass
    return None
CONSENT_HINTS = re.compile(r"(consent|gdpr|privacy choices|before you continue|we value your privacy)", re.I)
def _looks_like_consent_wall(html: str) -> bool:
if not html:
return False
if "consent.yahoo.com" in html.lower(): # common interstitial
return True
# generic phrasing
return bool(CONSENT_HINTS.search(html))
def _extract_desc_from_html(html: str) -> Optional[str]:
    """Extract a description from raw page HTML, or return ``None``.

    With BeautifulSoup available the order is: JSON-LD (accepted only at
    40-480 characters), then the ``og:description``, ``twitter:description``
    and ``description`` meta tags (at least 40 characters), then the first
    ``<p>`` with at least 80 characters of text. Without BeautifulSoup the
    same meta tags and the first ``<p>`` are matched with regexes; JSON-LD is
    not available on that path.
    """
    html = html or ""

    if not BeautifulSoup:
        # Regex fallbacks when bs4 is not installed.
        meta_patterns = (
            r'<meta[^>]+property=["\']og:description["\'][^>]+content=["\']([^"\']+)["\']',
            r'<meta[^>]+name=["\']twitter:description["\'][^>]+content=["\']([^"\']+)["\']',
            r'<meta[^>]+name=["\']description["\'][^>]+content=["\']([^"\']+)["\']',
        )
        for pattern in meta_patterns:
            match = re.search(pattern, html, flags=re.I | re.S)
            if match:
                text = _clean_text(match.group(1))
                if len(text) >= 40:
                    return text
        match = re.search(r"<p[^>]*>(.*?)</p>", html, flags=re.I | re.S)
        if match:
            text = _clean_text(re.sub("<[^>]+>", " ", match.group(1)))
            if len(text) >= 80:
                return text
        return None

    soup = BeautifulSoup(html, "html.parser")

    # JSON-LD first: it is usually the cleanest summary.
    ld_desc = _extract_desc_from_ld_json(html)
    if ld_desc:
        text = _clean_text(ld_desc)
        if 40 <= len(text) <= 480:
            return text

    meta_selectors = (
        ('meta[property="og:description"]', "content"),
        ('meta[name="twitter:description"]', "content"),
        ('meta[name="description"]', "content"),
    )
    for selector, attribute in meta_selectors:
        tag = soup.select_one(selector)
        if tag:
            text = _clean_text(tag.get(attribute, ""))
            if len(text) >= 40:
                return text

    # Fallback: first meaningful paragraph.
    for paragraph in soup.find_all("p"):
        text = _clean_text(paragraph.get_text(" "))
        if len(text) >= 80:
            return text
    return None
def _desc_cache_get(url: str) -> Optional[str]:
    """Return the cached description for *url*, or ``None`` if absent or expired.

    An entry older than ``DESC_CACHE_TTL`` seconds is removed on access.
    """
    entry = DESC_CACHE.get(url) if url else None
    if not entry:
        return None
    age = _now_mono() - entry["t"]
    if age > DESC_CACHE_TTL:
        DESC_CACHE.pop(url, None)
        return None
    return entry["text"]
def _desc_cache_put(url: str, text: str):
    """Store *text* as the description for *url*; no-op if either is empty."""
    if not (url and text):
        return
    with DESC_CACHE_LOCK:
        DESC_CACHE[url] = {"text": text, "t": _now_mono()}
def _attempt_fetch(url: str, timeout: int) -> Optional[str]:
    """Fetch *url* once and try to extract a 40-480 character description.

    Outcomes:
    * non-200 response, or a body that does not look like HTML -> ``None``;
    * consent wall detected -> whatever the Jina reader returns (or ``None``);
    * extraction yields a description of acceptable length -> that text;
    * extraction fails, or the request raises -> Jina reader as a last
      resort, accepted only at 40-480 characters.
    """
    request_headers = {
        "User-Agent": "Mozilla/5.0 (compatible; NewsGlobe/1.0; +mailto:you@yourdomain.com)",
        "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
        "Accept-Language": "en-US,en;q=0.9",
    }

    def acceptable(candidate) -> bool:
        """Length window shared by the direct and the last-resort paths."""
        return bool(candidate) and 40 <= len(candidate) <= 480

    try:
        resp = _session_get(url, headers=request_headers, timeout=timeout, allow_redirects=True)
        if resp.status_code != 200:
            return None
        content_type = (resp.headers.get("Content-Type") or "").lower()
        body = resp.text or ""
        looks_html = "html" in content_type or "<html" in body.lower()
        if not looks_html:
            return None
        if _looks_like_consent_wall(body):
            # Jump straight to Jina; its result is returned without a length check.
            return _try_jina_reader(url, timeout) or None
        extracted = _extract_desc_from_html(body)
        if acceptable(extracted):
            return extracted
    except Exception:
        # Fall through to the Jina reader.
        pass

    fallback = _try_jina_reader(url, timeout)
    return fallback if acceptable(fallback) else None
def fetch_page_description(url: str) -> Optional[str]:
    """Fetch and cache a best-effort article description from the page (incl. AMP retries).

    The cache is consulted first. On a miss the original URL is tried, then
    common AMP variants until one yields a description. Only successful
    results are cached (under the original URL). Returns ``None`` when
    nothing could be extracted.
    """
    if not url:
        return None

    cached = _desc_cache_get(url)
    if cached:
        return cached

    desc = _attempt_fetch(url, DESC_FETCH_TIMEOUT)
    if not desc:
        for candidate in _amp_candidate_urls(url):
            desc = _attempt_fetch(candidate, DESC_FETCH_TIMEOUT)
            if desc:
                break

    if not desc:
        return None
    _desc_cache_put(url, desc)
    return desc


def _amp_candidate_urls(url: str) -> List[str]:
    """Build the AMP variants of *url* to try: ``/amp`` path, ``amp=1``, ``outputType=amp``."""
    variants: List[str] = []
    try:
        parts = urlparse(url)
        # "/amp" path suffix, unless the path already ends with it.
        if not parts.path.endswith("/amp"):
            variants.append(urlunparse(parts._replace(path=parts.path.rstrip("/") + "/amp")))
        # Query-string flags, appended to any existing query.
        joiner = "&" if parts.query else ""
        for flag in ("amp=1", "outputType=amp"):
            variants.append(urlunparse(parts._replace(query=parts.query + joiner + flag)))
    except Exception:
        # Keep whatever was built before the failure.
        pass
    return variants
def _needs_desc_upgrade(a: Dict[str, Any]) -> bool:
    """Return True when article *a* has a URL and a weak description.

    "Weak" means missing, starting with "no description", shorter than
    ``DESC_MIN_LEN`` characters, or nearly identical to the title.
    """
    if not (a.get("url") or ""):
        return False
    title = (a.get("title") or "").strip()
    desc = (a.get("description") or "").strip()

    missing = not desc or desc.lower().startswith("no description")
    too_short = len(desc) < DESC_MIN_LEN
    # A description that merely repeats the title is worth upgrading too.
    return missing or too_short or _too_similar(title, desc)
def prefetch_descriptions(raw_articles: List[Dict[str, Any]], speed: Speed = Speed.balanced):
    """Warm the description cache for articles whose description is weak.

    Up to 6 / 8 / 16 distinct URLs (fast / balanced / otherwise) that need an
    upgrade and are not cached yet are fetched with 3 / 4 / 8 worker threads.
    The call blocks until every fetch has finished and returns nothing;
    results land in the description cache via ``fetch_page_description``.

    The per-URL timeout is the one ``fetch_page_description`` applies
    (``DESC_FETCH_TIMEOUT``); the per-speed ``timeout`` local that used to be
    computed here was never passed anywhere and has been removed.
    """
    if speed == Speed.fast:
        max_fetches, workers = 6, 3
    elif speed == Speed.balanced:
        max_fetches, workers = 8, 4
    else:
        max_fetches, workers = 16, 8

    candidates, seen = [], set()
    for article in raw_articles:
        url = article.get("url")
        if not url or url in seen:
            continue
        seen.add(url)
        if _needs_desc_upgrade(article) and not _desc_cache_get(url):
            candidates.append(url)
            if len(candidates) >= max_fetches:
                break

    if not candidates:
        return

    with ThreadPoolExecutor(max_workers=workers) as pool:
        futures = [pool.submit(fetch_page_description, u) for u in candidates]
        # Drain the futures; the fetched text is stored as a side effect.
        for _ in as_completed(futures):
            pass
def prefetch_descriptions_async(raw_articles, speed: Speed = Speed.balanced):
    """Run ``prefetch_descriptions`` in a daemon thread and return immediately."""
    worker = threading.Thread(
        target=prefetch_descriptions,
        args=(raw_articles, speed),
        daemon=True,
    )
    worker.start()
# The topic classifier is built lazily by get_news_clf(), not at import time.
# Fixed seed so language detection gives repeatable results.
DetectorFactory.seed = 0 # deterministic
# URL path segment (or hyphen/underscore-separated token of one) -> category.
# Consulted by _infer_category_from_url_path.
SECTION_HINTS = {
"sports": "sports",
"sport": "sports",
"business": "business",
"money": "business",
"market": "business",
"tech": "technology",
"technology": "technology",
"sci": "science",
"science": "science",
"health": "health",
"wellness": "health",
"entertainment": "entertainment",
"culture": "entertainment",
"showbiz": "entertainment",
"crime": "crime",
"world": "general",
"weather": "weather",
"environment": "environment",
"climate": "environment",
"travel": "travel",
"politics": "politics",
"election": "politics",
}
# Category -> whole-word keyword pattern. _infer_category_from_text applies
# them case-insensitively in insertion order and returns the first category
# that matches, so earlier entries take priority.
KEYWORDS = {
"sports": r"\b(NBA|NFL|MLB|NHL|Olympic|goal|match|tournament|coach|transfer)\b",
"business": r"\b(stocks?|earnings|IPO|merger|acquisition|revenue|inflation|market)\b",
"technology": r"\b(AI|software|chip|semiconductor|app|startup|cyber|hack|quantum|robot)\b",
"science": r"\b(researchers?|study|physics|astronomy|genome|spacecraft|telescope)\b",
"health": r"\b(virus|vaccine|disease|hospital|doctor|public health|covid)\b",
"entertainment": r"\b(movie|film|box office|celebrity|series|show|album|music)\b",
"crime": r"\b(arrested|charged|police|homicide|fraud|theft|court|lawsuit)\b",
"weather": r"\b(hurricane|storm|flood|heatwave|blizzard|tornado|forecast)\b",
"environment": r"\b(climate|emissions|wildfire|deforestation|biodiversity)\b",
"travel": r"\b(flight|airline|airport|tourism|visa|cruise|hotel)\b",
"politics": r"\b(president|parliament|congress|minister|policy|campaign|election)\b",
}
def _infer_category_from_url_path(url_path: str) -> Optional[str]:
    """Guess a category from the segments of a URL path, or return ``None``.

    Whole segments are checked against ``SECTION_HINTS`` first. Only if none
    matches are segments broken on ``-`` / ``_`` and their tokens checked
    (e.g. 'us-business' or 'tech-news').
    """
    segments = [seg for seg in url_path.lower().split("/") if seg]

    for seg in segments:
        if seg in SECTION_HINTS:
            return SECTION_HINTS[seg]

    tokens = (tok for seg in segments for tok in re.split(r"[-_]", seg))
    for tok in tokens:
        if tok in SECTION_HINTS:
            return SECTION_HINTS[tok]
    return None
def _infer_category_from_text(text: str) -> Optional[str]:
    """Return the first ``KEYWORDS`` category whose pattern matches *text*.

    Matching is case-insensitive; empty text or no match gives ``None``.
    """
    if not text:
        return None
    matches = (cat for cat, pattern in KEYWORDS.items() if re.search(pattern, text, flags=re.I))
    return next(matches, None)
def infer_category(article_url, title, description, provided):
    """Decide an article's category.

    Precedence: a non-blank *provided* value (stripped, lower-cased), then
    URL path hints, then keyword rules on title + description, then the ML
    classifier's top label (lower-cased). Returns ``"general"`` if the
    classifier fails.
    """
    explicit = provided.strip().lower() if provided else ""
    if explicit:
        return explicit

    # URL rules.
    try:
        from_url = _infer_category_from_url_path(urlparse(article_url).path or "")
        if from_url:
            return from_url
    except Exception:
        pass

    # Keyword rules.
    text = f"{title or ''} {description or ''}".strip()
    from_text = _infer_category_from_text(text)
    if from_text:
        return from_text

    # ML fallback; the classifier is loaded lazily.
    try:
        preds = get_news_clf()(text[:512])
        # With top_k set, the pipeline may wrap each result in a list.
        top = preds[0][0] if isinstance(preds[0], list) else preds[0]
        return top["label"].lower()
    except Exception as e:
        log.warning(f"ML category failed: {e}")
        return "general"
# Headline filler phrases that _cluster_text removes before texts are compared.
BOILER = re.compile(r"\b(live updates|breaking|what we know|in pictures|opinion)\b", re.I)
def _norm_text(s: str) -> str:
s = (s or "").strip()
s = re.sub(r"\s+", " ", s)
return s
def _cluster_text(a):
    """Build the normalised text that represents article *a* for clustering.

    The original-language title/description are preferred over the current
    ones. Filler phrases, clock times ("10:30 AM") and dates of the form
    "12 March 2024" are removed before whitespace is normalised.
    """
    title = a.get('orig_title') or a.get('title') or ''
    desc = a.get('orig_description') or a.get('description') or ''
    text = BOILER.sub("", f"{title} {desc}")
    text = re.sub(r"\b(\d{1,2}:\d{2}\s?(AM|PM))|\b(\d{1,2}\s\w+\s\d{4})", "", text, flags=re.I)
    return _norm_text(text)
def _canonical_url(u: str) -> str:
if not u:
return u
p = urlparse(u)
# drop tracking params
qs = [(k, v) for (k, v) in parse_qsl(p.query, keep_blank_values=False) if not k.lower().startswith(("utm_", "fbclid", "gclid"))]
clean = p._replace(query="&".join([f"{k}={v}" for k, v in qs]), fragment="")
# some sites add trailing slashes inconsistently
path = clean.path.rstrip("/") or "/"
clean = clean._replace(path=path)
return urlunparse(clean)
def detect_lang(text: str) -> Optional[str]:
    """Return the language code detected for *text* ('en', 'fr', ...), or ``None`` on failure."""
    try:
        code = detect(text)
    except Exception:
        return None
    return code
def _embed_texts(texts: List[str]):
    """Encode *texts* with the shared SBERT model.

    Returns what ``encode`` produces with ``convert_to_tensor=True`` and
    ``normalize_embeddings=True``; the progress bar is disabled.
    """
    model = get_sbert()
    return model.encode(
        texts,
        convert_to_tensor=True,
        normalize_embeddings=True,
        show_progress_bar=False,
    )
# ---- cache helpers ----
# How long a built events entry is served from the cache (seconds).
CACHE_TTL_SECS = 900
# Similarity threshold handed to cluster_articles.
SIM_THRESHOLD = 0.6
# cache key (see cache_key_for) -> {"t": monotonic(), "enriched": [...], "clusters": [...]}
_events_cache: Dict[Tuple, Dict[str, Any]] = {}
def cache_key_for(q, category, language, limit_each, translate=False, target_lang=None, speed=Speed.balanced):
    """Build the hashable events-cache key for a request.

    Missing text fields become ``""``, a missing/zero *limit_each* becomes
    50, *target_lang* is lower-cased and *speed* contributes its string value.
    """
    query_part = q or ""
    category_part = category or ""
    language_part = language or ""
    limit_part = int(limit_each or 50)
    target_part = (target_lang or "").lower()
    return (
        query_part,
        category_part,
        language_part,
        limit_part,
        bool(translate),
        target_part,
        speed.value,
    )
# True until the first successful build; that build uses a smaller, faster fetch.
_first_real_build = True  # module-global


def get_or_build_events_cache(q, category, language, translate, target_lang, limit_each, speed=Speed.balanced):
    """Return ``(key, enriched, clusters)`` for a request, building them on a cache miss.

    A fresh cache entry (younger than ``CACHE_TTL_SECS``) is returned
    directly. Otherwise the per-key in-flight lock is taken so identical
    concurrent requests trigger only one upstream fetch; the cache is checked
    again under the lock before building.

    *speed* selects the fetch window and caps the per-provider limit
    (fast: "24h" / 20, balanced: "48h" / 150, otherwise "3d" / uncapped).
    The very first build of the process is forced to "24h" and at most 100.

    *translate* and *target_lang* only take part in the cache key: articles
    are always enriched here with ``translate=False``. When *category* is
    given, enriched articles are filtered to that category before clustering.
    """
    global _first_real_build
    key = cache_key_for(q, category, language, limit_each, translate, target_lang, speed)
    now = monotonic()

    if speed == Speed.fast:
        use_timespan, use_limit = "24h", min(limit_each, 20)
    elif speed == Speed.balanced:
        use_timespan, use_limit = "48h", min(limit_each, 150)
    else:  # max
        use_timespan, use_limit = "3d", limit_each

    entry = _events_cache.get(key)
    if entry and now - entry["t"] < CACHE_TTL_SECS:
        log.info("CACHE HIT for %s", key)
        return key, entry["enriched"], entry["clusters"]

    lock = _get_inflight_lock(key)
    with lock:
        # Another thread may have finished the build while we waited.
        entry = _events_cache.get(key)
        if entry and now - entry["t"] < CACHE_TTL_SECS:
            log.info("CACHE HIT (post-lock) for %s", key)
            return key, entry["enriched"], entry["clusters"]

        if _first_real_build:
            # Keep the first build small so the first response arrives sooner.
            use_timespan = "24h"
            use_limit = min(use_limit, 100)

        # The message used to contain a garbled (mis-decoded) em dash.
        log.info(
            "CACHE MISS for %s \u2014 fetching (timespan=%s, limit_each=%s)",
            key, use_timespan, use_limit,
        )

        raw = combine_raw_articles(
            category=category,  # providers may use it; inference ignores it
            query=q,
            language=language,
            limit_each=use_limit,
            timespan=use_timespan,
            speed=speed,
        )
        prefetch_descriptions_async(raw, speed)

        enriched_all = [
            enrich_article(a, language=language, translate=False, target_lang=None)
            for a in raw
        ]
        if category:
            cat_norm = (category or "").strip().lower()
            enriched = [e for e in enriched_all if (e.get("category") or "").lower() == cat_norm]
        else:
            enriched = enriched_all

        clusters = cluster_articles(enriched, sim_threshold=SIM_THRESHOLD, speed=speed)
        _events_cache[key] = {"t": monotonic(), "enriched": enriched, "clusters": clusters}
        _first_real_build = False
        return key, enriched, clusters
# Which languages to rotate when user didn't restrict language
# Languages rotated by fetch_gdelt_multi when the caller did not restrict the
# language; the speed preset decides how many leading entries are used.
LANG_ROTATION = ["en", "es", "fr", "de", "ar", "ru", "pt", "zh", "hi", "ja", "ko"]
# A few sourcecountry seeds for English to diversify outlets (optional)
COUNTRY_SEEDS = ["US", "GB", "IN", "CA", "AU", "ZA", "SG", "NG", "DE", "FR", "BR", "MX", "ES", "RU", "JP", "KR", "CN"]
# ----------------- Config / Keys -----------------
# Provider switches: only GDELT is enabled.
USE_GNEWS_API = False
USE_NEWSDATA_API = False
USE_GDELT_API = True
USE_NEWSAPI = False

# Credentials are read from the environment only. Real-looking keys used to
# be hard-coded here as fallback defaults; secrets must not live in source
# control, and keys that were committed should be treated as compromised and
# rotated. A missing variable now yields None.
NEWSAPI_KEY = os.getenv("NEWSAPI_KEY")
GNEWS_API_KEY = os.getenv("GNEWS_API_KEY")
NEWSDATA_API_KEY = os.getenv("NEWSDATA_API_KEY")
HUGGINGFACE_API_TOKEN = os.getenv("HUGGINGFACE_API_TOKEN")
# Root logging: warnings and above, in a compact format.
logging.basicConfig(
    level=logging.WARNING,
    format="%(levelname)s:%(name)s:%(message)s",
)

# Main application logger.
log = logging.getLogger("newsglobe")
log.setLevel(logging.WARNING)

# Fetch summaries get their own INFO-level stream handler and do not
# propagate to the root logger (which is at WARNING).
fetch_log = logging.getLogger("newsglobe.fetch_summary")
fetch_log.setLevel(logging.INFO)
_fetch_handler = logging.StreamHandler()
_fetch_handler.setLevel(logging.INFO)
_fetch_handler.setFormatter(logging.Formatter("%(levelname)s:%(name)s:%(message)s"))
fetch_log.addHandler(_fetch_handler)
fetch_log.propagate = False

# Model libraries: warnings and above.
logging.getLogger("sentence_transformers").setLevel(logging.WARNING)
logging.getLogger("transformers").setLevel(logging.WARNING)

# HTTP client loggers: errors only and no propagation. This is the level
# that ends up in effect for them, so it is set once here.
for name in ("urllib3", "urllib3.connectionpool", "requests.packages.urllib3"):
    lg = logging.getLogger(name)
    lg.setLevel(logging.ERROR)
    lg.propagate = False
def _newsapi_enabled() -> bool:
    """Return True when a NewsAPI key is configured; otherwise warn and return False."""
    if NEWSAPI_KEY:
        return True
    log.warning("NewsAPI disabled: missing NEWSAPI_KEY env var")
    return False
def cluster_id(cluster, enriched_articles):
    """Return a stable 10-hex-character id for *cluster*.

    The id is the MD5 of the cluster members' non-empty URLs, sorted and
    joined with ``|`` (or of the literal ``"empty"`` when there are none).
    MD5 serves purely as a fingerprint here, not for security.
    """
    member_urls = []
    for idx in cluster["indices"]:
        link = enriched_articles[idx].get("url")
        if link:
            member_urls.append(link)
    member_urls.sort()

    fingerprint = "|".join(member_urls) if member_urls else "empty"
    return hashlib.md5(fingerprint.encode("utf-8")).hexdigest()[:10]
# ----------------- NLTK / VADER -----------------
# Make sure the VADER lexicon is available; download it when it is missing.
try:
    nltk.data.find("sentiment/vader_lexicon")
except LookupError:
    nltk.download("vader_lexicon")  # one-time fetch in a fresh container

# If the analyzer cannot be built, classify_sentiment falls back to "neutral".
try:
    _vader = SentimentIntensityAnalyzer()
except Exception:
    _vader = None
def classify_sentiment(text: str) -> str:
    """Label *text* as "positive", "negative" or "neutral" using VADER.

    The compound score decides: >= 0.2 is positive, <= -0.2 is negative,
    anything in between is neutral. Empty text, or a missing analyzer, also
    gives "neutral".
    """
    if not text or _vader is None:
        return "neutral"
    compound = _vader.polarity_scores(text)["compound"]
    if compound >= 0.2:
        return "positive"
    if compound <= -0.2:
        return "negative"
    return "neutral"
# ----------------- Geocode helpers -----------------
def get_country_centroid(country_name):
    """Return ``{"lat", "lon", "country"}`` for *country_name*.

    Despite the name, the coordinates are those of the country's capital
    (``CountryInfo.capital_latlng``). Unknown or empty names, and lookup
    failures, map to latitude/longitude 0.
    """
    if not country_name or country_name == "Unknown":
        return {"lat": 0, "lon": 0, "country": "Unknown"}
    try:
        latlng = CountryInfo(country_name).capital_latlng()
        return {"lat": latlng[0], "lon": latlng[1], "country": country_name}
    except Exception as e:
        log.info(f"Could not get centroid for {country_name}: {e}")
    return {"lat": 0, "lon": 0, "country": country_name or "Unknown"}
def resolve_domain_to_ip(domain):
    """Resolve *domain* to an IPv4 address string, or return ``None``.

    ``None`` is returned for an empty domain and for any resolution failure.
    Besides ``socket.gaierror`` this covers other ``OSError`` subclasses
    (e.g. timeouts) and the ``UnicodeError`` raised when a host name cannot
    be IDNA-encoded (for instance a label longer than 63 characters), so a
    malformed domain can no longer raise out of this helper.
    """
    if not domain:
        return None
    try:
        return socket.gethostbyname(domain)
    except (OSError, UnicodeError):
        # socket.gaierror is a subclass of OSError.
        return None
def geolocate_ip(ip):
    """Look up *ip* on ipwho.is and return ``{"lat", "lon", "country"}``.

    Returns ``None`` when the service does not report success or when the
    request or the response parsing raises.
    """
    endpoint = f"https://ipwho.is/{ip}?fields=success,country,latitude,longitude"
    try:
        payload = _session_get(endpoint, timeout=8).json()
        if payload.get("success"):
            return {
                "lat": payload["latitude"],
                "lon": payload["longitude"],
                "country": payload["country"],
            }
    except Exception:
        return None
    return None
# Nominatim client used by _refine_geo_async for the optional lookup.
geolocator = Nominatim(user_agent="newsglobe-app (contact: you@example.com)")
# "<source_text>|<domain>" -> {"lat", "lon", "country"}; filled by
# geocode_source and refined by _refine_geo_async.
domain_geo_cache: Dict[str, Dict[str, Any]] = {}
# Registered domain -> country for well-known outlets; checked before the
# generic domain map and the TLD-suffix fallback.
MAJOR_OUTLETS = {
"bbc.co.uk": "United Kingdom",
"theguardian.com": "United Kingdom",
"reuters.com": "United States",
"aljazeera.com": "Qatar",
"lemonde.fr": "France",
"dw.com": "Germany",
"abc.net.au": "Australia",
"ndtv.com": "India",
"globo.com": "Brazil",
"elpais.com": "Spain",
"lefigaro.fr": "France",
"kyodonews.net": "Japan",
"straitstimes.com": "Singapore",
"thesun.my": "Malaysia",
}
def geocode_source(source_text: str, domain: str = "", do_network: bool = False):
    """Return ``{"lat", "lon", "country"}`` for a news source without blocking.

    Resolution order: cached value, ``MAJOR_OUTLETS`` by registered domain,
    ``domain_country_map`` by domain name, then the TLD-suffix fallback. The
    result is cached under ``"<source_text>|<domain>"``. With
    ``do_network=True`` and only a suffix-based answer, a daemon thread is
    started to refine the cached value; this call still returns the
    suffix-based coordinates.
    """
    cache_key = f"{source_text}|{domain}"
    try:
        return domain_geo_cache[cache_key]
    except KeyError:
        pass

    def remember(coords):
        """Cache *coords* under the current key and hand them back."""
        domain_geo_cache[cache_key] = coords
        return coords

    ext = tldextract.extract(domain or "")
    fqdn = ".".join(part for part in (ext.domain, ext.suffix) if part)

    # 0) Major outlets / domain map.
    if fqdn in MAJOR_OUTLETS:
        return remember(get_country_centroid(MAJOR_OUTLETS[fqdn]))
    if ext.domain in domain_country_map:
        return remember(get_country_centroid(domain_country_map[ext.domain]))

    # 1) Suffix fallback (instant).
    coords = remember(get_country_centroid(_suffix_country(ext.suffix)))

    # 2) Optional async refinement (never block the hot path).
    if do_network:
        refiner = threading.Thread(
            target=_refine_geo_async,
            args=(cache_key, source_text, fqdn),
            daemon=True,
        )
        refiner.start()
    return coords
def _suffix_country(suffix: Optional[str]) -> str:
s = (suffix or "").split(".")[-1]
m = {
"au":"Australia","uk":"United Kingdom","gb":"United Kingdom","ca":"Canada","in":"India","us":"United States",
"ng":"Nigeria","de":"Germany","fr":"France","jp":"Japan","sg":"Singapore","za":"South Africa","nz":"New Zealand",
"ie":"Ireland","it":"Italy","es":"Spain","se":"Sweden","ch":"Switzerland","nl":"Netherlands","br":"Brazil",
"my":"Malaysia","id":"Indonesia","ph":"Philippines","th":"Thailand","vn":"Vietnam","sa":"Saudi Arabia",
"ae":"United Arab Emirates","tr":"Turkey","mx":"Mexico","ar":"Argentina","cl":"Chile","co":"Colombia",
"il":"Israel","kr":"South Korea","cn":"China","tw":"Taiwan","hk":"Hong Kong"
}
return m.get(s, "United States" if s in ("com","org","net") else "Unknown")
def _refine_geo_async(cache_key, source_text, fqdn):
    """Try to replace the cached coordinates for *cache_key* with better ones.

    First the domain is resolved and its IP geolocated; if that works the
    result is cached and the function returns. Otherwise Nominatim is queried
    for "<source_text> News Headquarters" with a 2-second timeout. Any
    exception is swallowed: this is a best-effort refinement.
    """
    try:
        # IP geolocation first (cheap).
        if fqdn:
            ip = resolve_domain_to_ip(fqdn)
            ip_coords = geolocate_ip(ip) if ip else None
            if ip_coords:
                domain_geo_cache[cache_key] = ip_coords
                return

        # Then Nominatim, with a short timeout.
        location = geolocator.geocode(f"{source_text} News Headquarters", timeout=2)
        if location and hasattr(location, "raw"):
            domain_geo_cache[cache_key] = {
                "lat": location.latitude,
                "lon": location.longitude,
                "country": location.raw.get("address", {}).get("country", "Unknown"),
            }
    except Exception:
        pass
# ----------------- HuggingFace translate (optional) -----------------
# Remote NLLB model id; None disables the NLLB step in _translate_cached.
HF_MODEL_PRIMARY = None # disable NLLB remote (avoids 404 spam); use OPUS + pivot/LibreTranslate
# 2-letter ISO -> NLLB codes
NLLB_CODES = {
"en": "eng_Latn",
"es": "spa_Latn",
"fr": "fra_Latn",
"de": "deu_Latn",
"it": "ita_Latn",
"pt": "por_Latn",
"zh": "zho_Hans",
"ru": "rus_Cyrl",
"ar": "arb_Arab",
"hi": "hin_Deva",
"ja": "jpn_Jpan",
"ko": "kor_Hang",
}
# OPUS-MT model map for common pairs (expand as needed)
# Languages with an OPUS-MT model mapped in both directions with English.
_OPUS_ENGLISH_PARTNERS = ("es", "fr", "de", "pt", "it", "ru", "zh", "ja", "ko", "hi", "ar")

# (source, target) -> model id, built once for every partner <-> "en" pair.
_OPUS_PAIRS = {
    pair: f"Helsinki-NLP/opus-mt-{pair[0]}-{pair[1]}"
    for lang in _OPUS_ENGLISH_PARTNERS
    for pair in ((lang, "en"), ("en", lang))
}


def opus_model_for(src2: str, tgt2: str) -> Optional[str]:
    """Return the OPUS-MT model id for translating *src2* -> *tgt2*, or ``None``.

    Only pairs with English on one side are mapped (expand as needed).
    """
    return _OPUS_PAIRS.get((src2, tgt2))
# Language codes translate_text accepts as a target (and normalises sources to).
SUPPORTED = {"en", "fr", "de", "es", "it", "hi", "ar", "ru", "ja", "ko", "pt", "zh"}
# Base URL of a LibreTranslate server; when unset, that translation path is skipped.
LIBRETRANSLATE_URL = os.getenv("LIBRETRANSLATE_URL") # e.g., http://127.0.0.1:5000
def _translate_via_libre(text: str, src: str, tgt: str) -> Optional[str]:
    """Translate *text* through the configured LibreTranslate server.

    Returns ``None`` when no server is configured, the text is empty, source
    and target are equal, the server answers with a non-200 status, the
    response carries no non-empty ``translatedText`` string, or the request
    raises (failures are logged as warnings).
    """
    base = LIBRETRANSLATE_URL
    if not base or not text or src == tgt:
        return None
    try:
        resp = SESSION.post(
            f"{base.rstrip('/')}/translate",
            json={"q": text, "source": src, "target": tgt, "format": "text"},
            timeout=6,
        )
        if resp.status_code != 200:
            log.warning("LibreTranslate HTTP %s: %s", resp.status_code, resp.text[:200])
            return None
        translated = resp.json().get("translatedText")
        if isinstance(translated, str) and translated:
            return translated
        return None
    except Exception as exc:
        log.warning("LibreTranslate failed: %s", exc)
        return None
def _hf_call(model_id: str, payload: dict) -> Optional[str]:
    """POST *payload* to the Hugging Face inference API for *model_id*.

    Requires both an API token and the ``ALLOW_HF_REMOTE`` opt-in. A model
    that answers 404 is remembered and skipped for the rest of the process.
    Returns the generated/translated text found in the response, or ``None``
    on any failure or unrecognised response shape.
    """
    if not HUGGINGFACE_API_TOKEN or not ALLOW_HF_REMOTE:
        return None
    if model_id in _hf_bad_models:
        return None

    endpoint = f"https://api-inference.huggingface.co/models/{model_id}"
    request_headers = {
        "Authorization": f"Bearer {HUGGINGFACE_API_TOKEN}",
        "HF-API-KEY": HUGGINGFACE_API_TOKEN,
        "Accept": "application/json",
        "Content-Type": "application/json",
    }
    try:
        resp = requests.post(endpoint, headers=request_headers, json=payload, timeout=25)
        if resp.status_code == 404:
            _hf_bad_models.add(model_id)
            log.warning("HF %s -> 404: Not Found (disabled for this process)", model_id)
            return None
        if resp.status_code != 200:
            log.warning("HF %s -> %s: %s", model_id, resp.status_code, resp.text[:300])
            return None
        data = resp.json()
    except Exception as exc:
        log.warning("HF request failed: %s", exc)
        return None

    # Accepted response shapes: [{...}], {...} or a bare string.
    if isinstance(data, list) and data and isinstance(data[0], dict):
        first = data[0]
        for field in ("generated_text", "translation_text"):
            if field in first:
                return first[field]
    if isinstance(data, dict) and "generated_text" in data:
        return data["generated_text"]
    if isinstance(data, str):
        return data
    return None
@lru_cache(maxsize=4096)
def _translate_cached(text: str, src: str, tgt: str) -> str:
    """Translate *text* from *src* to *tgt*, trying several backends in order.

    Order: LibreTranslate, remote OPUS (direct pair), remote NLLB (only when
    ``HF_MODEL_PRIMARY`` is set), a two-hop pivot through English for
    non-English pairs, and finally a local OPUS pipeline. If every path
    fails the original text is returned.

    NOTE: results are memoised by ``lru_cache``, and that includes the
    "return the original text" fallback.
    """
    if not text or src == tgt:
        return text

    # 0) LibreTranslate server, if one is configured.
    translated = _translate_via_libre(text, src, tgt)
    if translated:
        return translated

    # 1) Remote OPUS model for the direct pair.
    opus_model = opus_model_for(src, tgt)
    if opus_model:
        translated = _hf_call(opus_model, {"inputs": text})
        if translated:
            return translated

    # 2) Remote NLLB (optional; skipped while HF_MODEL_PRIMARY is None).
    try:
        if HF_MODEL_PRIMARY and (src in NLLB_CODES) and (tgt in NLLB_CODES):
            nllb_payload = {
                "inputs": text,
                "parameters": {"src_lang": NLLB_CODES[src], "tgt_lang": NLLB_CODES[tgt]},
                "options": {"wait_for_model": True},
            }
            translated = _hf_call(HF_MODEL_PRIMARY, nllb_payload)
            if translated:
                return translated
    except Exception:
        pass

    # 3) Two-hop pivot via English when neither side is English.
    if src != "en" and tgt != "en":
        via_english = _translate_cached(text, src, "en")
        # An unchanged result means the first hop failed.
        if via_english and via_english != text:
            translated = _translate_cached(via_english, "en", tgt)
            if translated:
                return translated

    # 4) Local OPUS pipeline for the direct pair.
    translated = _translate_local(text, src, tgt)
    if translated:
        return translated

    log.warning("All translate paths failed (%s->%s); returning original.", src, tgt)
    return text
def translate_text(text: str, target_lang: Optional[str], fallback_src: Optional[str] = None) -> str:
    """Translate *text* into *target_lang*, returning it unchanged when not possible.

    The text is returned as-is when it or the target is empty, when the
    target is not in ``SUPPORTED``, or when source and target are the same.
    The source language is *fallback_src* if given, otherwise the detected
    language, otherwise "en". Unsupported source codes are reduced to their
    two-letter prefix when that is supported, else treated as "en".
    """
    if not text or not target_lang:
        return text

    tgt = target_lang.lower()
    if tgt not in SUPPORTED:
        return text

    src = (fallback_src or detect_lang(text) or "en").lower()
    if src == tgt:
        return text

    if src not in SUPPORTED:
        prefix = src[:2]
        # Regional variants such as "zh-cn" or "pt-br" collapse to the base code.
        if src.startswith(("zh", "pt")):
            src = prefix
        elif prefix in SUPPORTED:
            src = prefix
        else:
            src = "en"
    return _translate_cached(text, src, tgt)
# ----------------- FastAPI -----------------
# The ASGI application object.
app = FastAPI()
# CORS: any origin, method and header is accepted; credentials are not allowed.
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=False,
allow_methods=["*"],
allow_headers=["*"],
)
# Gzip responses of at least 500 bytes.
app.add_middleware(GZipMiddleware, minimum_size=500)
# === Warm config ===
# Settings for the background warm-up in _warm_once.
WARM_LIMIT_EACH = 20 # smaller bite to prime caches
WARM_TIMESPAN = "24h" # narrower GDELT window for faster first fetch
# When False, the warm-up skips the description prefetch.
WARM_PREFETCH_DESCRIPTIONS = False
def _fmt_mmss(ms: float) -> str:
total_sec = int(round(ms / 1000.0))
m, s = divmod(total_sec, 60)
return f"{m}:{s:02d}"
def _warm_once():
    """Run one best-effort warm-up pass and store its result in ``_events_cache``.

    Steps: load the models, fetch a small English article set, enrich it
    without translation, cluster it, cache the outcome and log the timings.
    Any exception is caught and logged as a warning, so this never raises.
    """

    def _ms_since(mark: float) -> float:
        # Milliseconds elapsed since a perf_counter() reading.
        return (time.perf_counter() - mark) * 1000

    try:
        log.info("WARM: starting background warm-up (limit_each=%d, timespan=%s)", WARM_LIMIT_EACH, WARM_TIMESPAN)
        started = time.perf_counter()

        # Load the embedding and classifier models (also done by the startup hook).
        get_sbert()
        get_news_clf()

        # Small fetch; the per-provider summary log is switched off.
        # NOTE: combine_raw_articles replaces `timespan` with its own preset
        # for the default balanced speed, so WARM_TIMESPAN may not take effect.
        fetch_started = time.perf_counter()
        raw = combine_raw_articles(
            category=None,
            query=None,
            language="en",
            limit_each=WARM_LIMIT_EACH,
            timespan=WARM_TIMESPAN,
            log_summary=False,
        )
        fetch_ms = _ms_since(fetch_started)

        # Description prefetch is opt-in during warm-up.
        if WARM_PREFETCH_DESCRIPTIONS:
            prefetch_descriptions_async(raw)

        # Enrich without translation, then cluster once.
        enrich_started = time.perf_counter()
        enriched = [
            enrich_article(article, language="en", translate=False, target_lang=None)
            for article in raw
        ]
        enrich_ms = _ms_since(enrich_started)

        cluster_started = time.perf_counter()
        clusters = cluster_articles(enriched, sim_threshold=SIM_THRESHOLD)
        cluster_ms = _ms_since(cluster_started)

        # Cache under the key of an untranslated English request at balanced speed.
        # NOTE(review): the key uses limit_each=WARM_LIMIT_EACH while /events and
        # /news default to other limits — confirm with cache_key_for that default
        # requests actually hit this entry.
        warm_key = cache_key_for(
            q=None,
            category=None,
            language="en",
            limit_each=WARM_LIMIT_EACH,
            translate=False,
            target_lang=None,
            speed=Speed.balanced,
        )
        _events_cache[warm_key] = {"t": monotonic(), "enriched": enriched, "clusters": clusters}

        total_ms = _ms_since(started)
        log.info(
            "WARM: fetch=%s, enrich=%s, cluster=%s, total=%s (raw=%d, enriched=%d, clusters=%d)",
            _fmt_mmss(fetch_ms),
            _fmt_mmss(enrich_ms),
            _fmt_mmss(cluster_ms),
            _fmt_mmss(total_ms),
            len(raw),
            len(enriched),
            len(clusters),
        )
    except Exception as exc:
        log.warning(f"WARM: failed: {exc}")
@app.on_event("startup")
def warm():
    """Startup hook: load the models, then start the cache warm-up in the background."""
    # Models are loaded synchronously before the app starts serving.
    get_sbert()
    get_news_clf()
    # The fetch/enrich/cluster warm-up runs in a daemon thread so startup
    # does not wait for it.
    warm_thread = threading.Thread(target=_warm_once, daemon=True)
    warm_thread.start()
# ----------------- Providers -----------------
# Two-letter language code -> language name used with GDELT's `sourcelang:` query token.
# _gdelt_safe_query adds no `sourcelang:` token for codes missing from this table.
_GDELT_LANG = {
    "en": "english",
    "es": "spanish",
    "fr": "french",
    "de": "german",
    "it": "italian",
    "pt": "portuguese",
    "ru": "russian",
    "ar": "arabic",
    "hi": "hindi",
    "ja": "japanese",
    "ko": "korean",
    "zh": "chinese",
}
def _gdelt_safe_query(user_q, language):
parts = []
if user_q:
q = user_q.strip()
if len(q) < 3:
q = f'"{q}" news'
parts.append(q)
if language and (lg := _GDELT_LANG.get(language.lower())):
parts.append(f"sourcelang:{lg}")
if not parts:
# rotate or randomly choose one to diversify
parts.append("sourcelang:english")
return " ".join(parts)
def fetch_gdelt_articles(
    limit=50,
    query=None,
    language=None,
    timespan="3d",
    category=None,
    extra_tokens: Optional[List[str]] = None
):
    """Fetch recent articles from the GDELT DOC API and map them to the common article shape.

    Args:
        limit: Requested number of records; clamped to the range 1..250.
        query: Optional free-text query, combined with ``language`` by
            ``_gdelt_safe_query``.
        language: Optional language code used for the ``sourcelang:`` token.
        timespan: GDELT ``timespan`` value for the first attempt.
        category: Stored on each result as ``requested_category`` for
            reference only; it does not affect the request.
        extra_tokens: Extra query tokens appended verbatim to the query.

    Returns:
        A list of article dicts (possibly empty). Request failures, non-200
        responses and non-JSON bodies are logged and yield an empty list
        after one narrower retry.
    """
    q = _gdelt_safe_query(query, language)
    if extra_tokens:
        q = f"{q} " + " ".join(extra_tokens)
    url = "https://api.gdeltproject.org/api/v2/doc/doc"
    params = {
        "query": q,
        "mode": "ArtList",
        "format": "json",
        "sort": "DateDesc",
        "maxrecords": int(min(250, max(1, limit))),
        "timespan": timespan,
    }

    def _do_request(p):
        # Returns the decoded JSON body, or None on any failure.
        try:
            r = _session_get(url, params=p, timeout=10)
        except Exception as e:
            # Match the other provider fetchers: a network error must not
            # propagate and take down the whole aggregation.
            log.warning(f"GDELT request failed: {e}")
            return None
        log.info(f"GDELT URL: {r.url} (status={r.status_code})")
        if r.status_code != 200:
            log.warning(f"GDELT HTTP {r.status_code}: {r.text[:400]}")
            return None
        try:
            return r.json()
        except Exception:
            ct = r.headers.get("Content-Type", "")
            log.warning(f"GDELT non-JSON response. CT={ct}. Body[:400]: {r.text[:400]}")
            return None

    data = _do_request(params)
    if data is None:
        # Retry narrower and smaller if needed
        p2 = {**params, "timespan": "24h", "maxrecords": min(100, params["maxrecords"])}
        data = _do_request(p2)
    # A JSON body that is not an object has no "articles" key to read.
    if not data or not isinstance(data, dict):
        return []

    arts = data.get("articles") or []
    results = []
    for a in arts:
        desc = a.get("description") or a.get("content") or ""
        title = a.get("title") or ""
        # Drop descriptions that merely repeat the title.
        if desc and (
            desc.strip().lower() == title.strip().lower()
            or (len(desc) <= 60 and _too_similar(title, desc))
        ):
            desc = ""
        desc = desc or "No description available"
        results.append(
            {
                "title": title,
                "url": a.get("url"),
                "source": {"name": a.get("domain") or "GDELT"},
                "description": desc,
                "publishedAt": a.get("seendate"),
                "api_source": "gdelt",
                "gdelt_sourcecountry": a.get("sourcecountry"),
                # Keep the user's chosen category only for debugging/reference; do NOT use for inference.
                "requested_category": category,
            }
        )
    log.info(f"GDELT returned {len(results)}")
    return results
def fetch_newsdata_articles(category=None, limit=20, query=None, language=None):
    """Fetch up to ``limit`` raw articles from NewsData.io, following ``nextPage`` tokens.

    Args:
        category: Sent as the ``category`` parameter only when it is one of
            the names in the local allow-list.
        limit: Maximum number of articles to return.
        query: Optional search string sent as ``q``.
        language: Language code; defaults to ``"en"``.

    Returns:
        Raw NewsData article dicts, each tagged with ``api_source="newsdata"``
        and with ``publishedAt`` copied from ``pubDate``. Whatever was
        collected before a failed page is still returned.
    """
    base_url = "https://newsdata.io/api/1/news"
    allowed = (
        "business",
        "entertainment",
        "environment",
        "food",
        "health",
        "politics",
        "science",
        "sports",
        "technology",
        "top",
        "world",
    )
    params = {"apikey": NEWSDATA_API_KEY, "language": (language or "en")}
    if category and category in allowed:
        params["category"] = category
    if query:
        params["q"] = query

    all_articles, next_page = [], None
    while len(all_articles) < limit:
        if next_page:
            params["page"] = next_page
        try:
            resp = _session_get(base_url, params=params, timeout=12)
            if resp.status_code != 200:
                log.warning(f"NewsData HTTP {resp.status_code}")
                break
            data = resp.json()
        except Exception as e:
            # Only the exception type is logged: request errors can embed the
            # full URL, which carries the API key.
            log.warning(f"NewsData request failed: {type(e).__name__}")
            break
        articles = data.get("results") if isinstance(data, dict) else None
        # A null or non-list "results" cannot be iterated as articles, and an
        # empty page would otherwise keep the loop running as long as the API
        # returns a nextPage token.
        if not isinstance(articles, list) or not articles:
            break
        for a in articles:
            a["api_source"] = "newsdata"
        all_articles.extend(articles)
        next_page = data.get("nextPage")
        if not next_page:
            break

    # normalize timestamps if available
    for a in all_articles:
        a["publishedAt"] = a.get("pubDate")
    return all_articles[:limit]
def fetch_gnews_articles(limit=20, query=None, language=None):
    """Fetch top headlines from GNews.

    Args:
        limit: Value sent as the ``max`` parameter.
        query: Optional search string, URL-quoted into ``q``.
        language: Language code; defaults to ``"en"``.

    Returns:
        The ``articles`` list from the response, each item tagged with
        ``api_source="gnews"``; an empty list on any failure.
    """
    # `language` originates from a request parameter, so it is escaped before
    # being placed in the query string.
    lang = requests.utils.quote(str(language or "en"), safe="")
    url = f"https://gnews.io/api/v4/top-headlines?lang={lang}&max={limit}&token={GNEWS_API_KEY}"
    if query:
        url += f"&q={requests.utils.quote(query)}"
    try:
        r = _session_get(url, timeout=12)
        if r.status_code != 200:
            log.warning(f"GNews HTTP {r.status_code}")
            return []
        # "articles" may be present but null.
        arts = r.json().get("articles") or []
        for a in arts:
            a["api_source"] = "gnews"
        return arts
    except Exception as e:
        # Log only the exception type: request errors can embed the full URL,
        # which carries the API token.
        log.warning(f"GNews request failed: {type(e).__name__}")
        return []
# Country codes requested one at a time from NewsAPI /top-headlines by the NewsAPI fetchers below.
NEWSAPI_COUNTRIES = ["us", "gb", "ca", "au", "in", "za", "sg", "ie", "nz"]
def fetch_newsapi_headlines_multi(limit=50, language=None):
    """Collect NewsAPI top headlines across every country in ``NEWSAPI_COUNTRIES``.

    Args:
        limit: Maximum number of articles to return overall.
        language: Accepted for signature parity with the other fetchers; it
            is not used by this function.

    Returns:
        Up to ``limit`` article dicts tagged with ``api_source="newsapi"``;
        an empty list when ``_newsapi_enabled()`` is false. A country whose
        request fails is skipped.
    """
    if not _newsapi_enabled():
        return []
    all_ = []
    per = max(1, math.ceil(limit / max(1, len(NEWSAPI_COUNTRIES))))
    per = min(per, 100)  # NewsAPI pageSize cap
    for c in NEWSAPI_COUNTRIES:
        url = f"https://newsapi.org/v2/top-headlines?country={c}&pageSize={per}&apiKey={NEWSAPI_KEY}"
        try:
            r = _session_get(url, timeout=12)
            if r.status_code != 200:
                log.warning(f"NewsAPI top-headlines {c} -> HTTP {r.status_code}: {r.text[:200]}")
                continue
            arts = r.json().get("articles") or []
        except Exception as e:
            # One unreachable country must not discard the others. Only the
            # exception type is logged because request errors can embed the
            # URL, which carries the API key.
            log.warning(f"NewsAPI top-headlines {c} failed: {type(e).__name__}")
            continue
        for a in arts:
            a["api_source"] = "newsapi"
        all_.extend(arts)
        time.sleep(0.2)
    return all_[:limit]  # enforce exact limit
def fetch_newsapi_articles(category=None, limit=20, query=None, language=None):
    """Fetch articles from NewsAPI.

    With a ``query`` the ``/everything`` endpoint is used (optionally filtered
    by ``language``); otherwise ``/top-headlines`` is requested once per
    country in ``NEWSAPI_COUNTRIES`` (optionally filtered by ``category``).

    Args:
        category: Optional category for the top-headlines path.
        limit: Maximum number of articles to return.
        query: Optional search string; selects the ``/everything`` path.
        language: Optional language code for the ``/everything`` path.

    Returns:
        Up to ``limit`` article dicts tagged with ``api_source="newsapi"``;
        an empty list when NewsAPI is disabled or the request fails.
    """
    if not _newsapi_enabled():
        return []
    quote = requests.utils.quote

    # If a query is provided, use /everything (language allowed here)
    if query:
        # Keep pageSize within the same cap of 100 that
        # fetch_newsapi_headlines_multi applies.
        page_size = max(1, min(limit, 100))
        url = f"https://newsapi.org/v2/everything?pageSize={page_size}&apiKey={NEWSAPI_KEY}&q={quote(query)}"
        if language:
            # Request-supplied value: escape it so it cannot add parameters.
            url += f"&language={quote(str(language), safe='')}"
        try:
            r = _session_get(url, timeout=12)
            if r.status_code != 200:
                log.warning(f"NewsAPI /everything HTTP {r.status_code}: {r.text[:200]}")
                return []
            arts = r.json().get("articles") or []
            for a in arts:
                a["api_source"] = "newsapi"
            # DO NOT stamp category here; we infer later
            return arts[:limit]
        except Exception as e:
            log.warning(f"NewsAPI /everything request failed: {e}")
            return []

    # Otherwise, rotate /top-headlines by country (no language param)
    results = []
    per_country = min(100, max(5, limit // len(NEWSAPI_COUNTRIES)))
    for c in NEWSAPI_COUNTRIES:
        url = f"https://newsapi.org/v2/top-headlines?country={c}&pageSize={per_country}&apiKey={NEWSAPI_KEY}"
        if category:
            url += f"&category={quote(str(category), safe='')}"
        try:
            r = _session_get(url, timeout=12)
            if r.status_code != 200:
                log.warning(f"NewsAPI top-headlines {c} -> HTTP {r.status_code}: {r.text[:200]}")
                continue
            arts = r.json().get("articles") or []
            for a in arts:
                a["api_source"] = "newsapi"
            # DO NOT stamp category here; we infer later
            results.extend(arts)
        except Exception as e:
            log.warning(f"NewsAPI top-headlines {c} failed: {e}")
        time.sleep(0.2)
    return results[:limit]
def normalize_newsdata_article(article):
    """Map a raw NewsData.io article onto the common article shape.

    Args:
        article: Raw article dict as produced by ``fetch_newsdata_articles``.

    Returns:
        A dict with ``title``, ``url``, ``source``, ``description``,
        ``publishedAt``, ``api_source`` and ``category``. A list-valued
        category is reduced to its first element (``None`` when empty).
    """
    category = article.get("category")
    if isinstance(category, list):
        category = category[0] if category else None
    return {
        "title": article.get("title"),
        "url": article.get("link"),
        # `or` rather than a .get default: a present-but-null source_id must
        # also fall back, otherwise the name is None and later string
        # handling of the source name fails.
        "source": {"name": article.get("source_id") or "NewsData.io"},
        "description": article.get("description") or "No description available",
        "publishedAt": article.get("publishedAt"),
        "api_source": article.get("api_source", "newsdata"),
        "category": category,
    }
# ----------------- Enrichment -----------------
# GDELT source-country codes -> country names passed to get_country_centroid.
_GDELT_ISO2_COUNTRY_NAMES = {
    "US": "United States", "GB": "United Kingdom", "AU": "Australia", "CA": "Canada", "IN": "India",
    "DE": "Germany", "FR": "France", "IT": "Italy", "ES": "Spain", "BR": "Brazil", "JP": "Japan",
    "CN": "China", "RU": "Russia", "KR": "South Korea", "ZA": "South Africa", "NG": "Nigeria",
    "MX": "Mexico", "AR": "Argentina", "CL": "Chile", "CO": "Colombia", "NL": "Netherlands",
    "SE": "Sweden", "NO": "Norway", "DK": "Denmark", "FI": "Finland", "IE": "Ireland", "PL": "Poland",
    "PT": "Portugal", "GR": "Greece", "TR": "Turkey", "IL": "Israel", "SA": "Saudi Arabia",
    "AE": "United Arab Emirates", "SG": "Singapore", "MY": "Malaysia", "TH": "Thailand",
    "PH": "Philippines", "ID": "Indonesia", "NZ": "New Zealand",
}


def enrich_article(a, language=None, translate=False, target_lang=None):
    """Turn a raw provider article into the enriched record used by the endpoints.

    Adds a normalised source name, canonical URL, coordinates/country, a
    tidied description, detected language, sentiment, an inferred category
    and a short stable id.

    Args:
        a: Raw article dict in the common provider shape.
        language: Not read by this function; kept for call compatibility.
        translate: Not read by this function; kept for call compatibility.
        target_lang: Not read by this function; kept for call compatibility.

    Returns:
        The enriched article dict; ``translated`` is always ``False``.
    """
    # Normalize source name. Tolerate a missing/None name or a non-dict
    # source instead of raising on `.get` / `.strip`.
    source = a.get("source")
    raw_name = source.get("name") if isinstance(source, dict) else source
    source_name = str(raw_name or "").strip() or "Unknown"
    s_lower = source_name.lower()
    if "newsapi" in s_lower:
        source_name = "NewsAPI"
    elif "gnews" in s_lower:
        source_name = "GNews"
    elif "newsdata" in s_lower:
        source_name = "NewsData.io"

    # Canonicalize URL & derive domain
    article_url = _canonical_url(a.get("url") or "")
    try:
        ext = tldextract.extract(article_url)
        domain = ".".join([p for p in (ext.domain, ext.suffix) if p]) if (ext.domain or ext.suffix) else ""
    except Exception:
        domain = ""

    # Country guess from GDELT's source country: a known code maps to its
    # name; any other value longer than two characters is used as given.
    country_guess = None
    if a.get("api_source") == "gdelt":
        sc = a.get("gdelt_sourcecountry")
        if sc:
            country_guess = _GDELT_ISO2_COUNTRY_NAMES.get(str(sc).upper(), sc if len(str(sc)) > 2 else None)
    coords = get_country_centroid(country_guess) if country_guess else geocode_source(source_name, domain, do_network=False)

    # Title / description (raw)
    title = (a.get("title") or "").strip() or "(untitled)"
    description = (a.get("description") or "").strip()
    if description.lower().startswith("no description"):
        description = ""

    # Prefer cached page summary when weak/title-like. The placeholder text
    # was already blanked above, so an empty check covers it.
    cached_desc = _desc_cache_get(article_url)
    need_upgrade = (
        (not description)
        or len(description) < DESC_MIN_LEN
        or _too_similar(title, description)
    )
    if need_upgrade and cached_desc:
        description = cached_desc
    if description:
        description = _tidy_description(title, description, source_name)
    if (not description) or _too_similar(title, description):
        description = f"Quick take: {title.rstrip('.')}."

    # Save originals for categorization and debug
    orig_title = title
    orig_description = description

    # Language detection / sentiment
    detected_lang = (detect_lang(f"{title} {description}") or "").lower()
    ml_text = f"{orig_title}. {orig_description}".strip()
    sentiment = classify_sentiment(f"{orig_title} {orig_description}")

    # Stable id & category (ALWAYS infer; ignore provider/requested categories)
    seed = f"{source_name}|{article_url}|{title}"
    uid = hashlib.md5(seed.encode("utf-8")).hexdigest()[:12]
    cat = infer_category(article_url, orig_title, orig_description, None)

    return {
        "id": uid,
        "title": title,
        "url": article_url,
        "source": source_name,
        "lat": coords["lat"],
        "lon": coords["lon"],
        "country": coords.get("country", "Unknown"),
        "description": description,
        "sentiment": sentiment,
        "api_source": a.get("api_source", "unknown"),
        "publishedAt": a.get("publishedAt"),
        "_ml_text": ml_text,
        "orig_title": orig_title,
        "orig_description": orig_description,
        "detected_lang": detected_lang,
        "translated": False,
        "category": cat,
    }
# ----------------- Clustering into Events -----------------
# sbert_model = SentenceTransformer("sentence-transformers/paraphrase-multilingual-MiniLM-L12-v2")
# cluster_articles()
def cluster_articles(articles: List[Dict[str, Any]], sim_threshold=SIM_THRESHOLD, speed=Speed.balanced):
    """Group enriched articles into event clusters by embedding similarity.

    A greedy first pass assigns each article to the most similar existing
    centroid above ``sim_threshold`` (or opens a new cluster); a second pass
    merges clusters whose centroids are close.

    Args:
        articles: Enriched article dicts.
        sim_threshold: Minimum cosine similarity for joining a cluster. It is
            raised to at least 0.64 (fast) or 0.62 (balanced).
        speed: ``Speed.fast`` also limits the input to the first 150 articles.

    Returns:
        A list of cluster dicts with ``indices`` (positions in ``articles``),
        ``centroid`` and ``id``; an empty list for empty input.
    """
    if speed == Speed.fast:
        articles = articles[:150]  # early cap
        sim_threshold = max(sim_threshold, 0.64)
    elif speed == Speed.balanced:
        articles = articles[:]
        sim_threshold = max(sim_threshold, 0.62)

    # Nothing to embed: return early rather than handing the encoder an empty batch.
    if not articles:
        return []

    texts = [_cluster_text(a) for a in articles]
    embs = get_sbert().encode(texts, convert_to_tensor=True, normalize_embeddings=True, show_progress_bar=False)

    clusters = []  # [{indices:[...], centroid:tensor}]
    centroids = []
    for i, emb in enumerate(embs):
        best_idx, best_sim = -1, -1.0
        for ci, c_emb in enumerate(centroids):
            sim = util.cos_sim(emb, c_emb).item()
            if sim > sim_threshold and sim > best_sim:
                best_sim, best_idx = sim, ci
        if best_idx >= 0:
            clusters[best_idx]["indices"].append(i)
            # Recompute the centroid as the re-normalised mean of all members.
            idxs = clusters[best_idx]["indices"]
            new_c = embs[idxs].mean(dim=0)
            new_c = new_c / new_c.norm()
            centroids[best_idx] = new_c
            clusters[best_idx]["centroid"] = new_c
        else:
            # No provisional id here: the merge pass builds fresh dicts and
            # the final id is assigned below.
            clusters.append({"indices": [i], "centroid": emb})
            centroids.append(emb)

    # second-pass merge to reduce fragmenting
    merged = _merge_close_clusters(clusters, embs, threshold=0.70)

    # Assign each cluster its id via cluster_id
    for c in merged:
        c["id"] = cluster_id(c, articles)
    return merged
def event_payload_from_cluster(cluster, enriched_articles):
    """Summarise one cluster as an event payload.

    Args:
        cluster: Cluster dict whose ``indices`` point into ``enriched_articles``.
        enriched_articles: The article list the cluster was built from (or a
            translated view with the same ordering).

    Returns:
        A dict with the event id, the most frequent member title, up to eight
        keywords, article/source/country counts, the min/max ``publishedAt``
        values and up to three sample URLs.
    """
    idxs = cluster["indices"]
    arts = [enriched_articles[i] for i in idxs]
    title_counts = Counter([a["title"] for a in arts])
    canonical_title = title_counts.most_common(1)[0][0]
    # dict.fromkeys de-duplicates while keeping first-seen order, so the eight
    # keywords are the same on every run (slicing a set depends on hash order).
    keywords = list(
        dict.fromkeys(w.lower() for t in title_counts for w in t.split() if len(w) > 3)
    )[:8]
    sources = {a["source"] for a in arts}
    countries = {a["country"] for a in arts if a["country"] and a["country"] != "Unknown"}
    ts = [a.get("publishedAt") for a in arts if a.get("publishedAt")]
    return {
        "event_id": cluster_id(cluster, enriched_articles),  # <-- stable id
        "title": canonical_title,
        "keywords": keywords,
        "article_count": len(arts),
        "source_count": len(sources),
        "country_count": len(countries),
        "time_range": {"min": min(ts) if ts else None, "max": max(ts) if ts else None},
        "sample_urls": [a["url"] for a in arts[:3] if a.get("url")],
    }
def aggregate_event_by_country(cluster, enriched_articles):
    """Break one event cluster down into per-country summaries.

    Args:
        cluster: Cluster dict whose ``indices`` point into ``enriched_articles``.
        enriched_articles: The article list the cluster was built from (or a
            translated view with the same ordering).

    Returns:
        One dict per country with its centroid, article count, averaged
        sentiment label, up to three top sources, a two-headline summary and
        up to five sample articles.
    """
    idxs = cluster["indices"]
    arts = [enriched_articles[i] for i in idxs]

    by_country: Dict[str, Dict[str, Any]] = {}
    for a in arts:
        c = a.get("country") or "Unknown"
        if c not in by_country:
            coords = get_country_centroid(c)
            by_country[c] = {"country": c, "lat": coords["lat"], "lon": coords["lon"], "articles": []}
        by_country[c]["articles"].append(a)

    # Sentiment label -> score; unknown labels count as neutral (0).
    to_num = {"negative": -1, "neutral": 0, "positive": 1}

    # summarize per country
    results = []
    for c, block in by_country.items():
        arr = block["articles"]
        vals = [to_num.get(a["sentiment"], 0) for a in arr]
        avg = sum(vals) / max(len(vals), 1)
        avg_sent = "positive" if avg > 0.15 else "negative" if avg < -0.15 else "neutral"
        top_sources = [s for s, _ in Counter([a["source"] for a in arr]).most_common(3)]
        # tiny extractive summary: top 2 headlines joined by a bullet.
        # The separator literal was mis-encoded; it is restored here as U+2022
        # (assumed to be the intended character).
        summary = " \u2022 ".join([a["title"] for a in arr[:2]])
        results.append(
            {
                "country": c,
                "lat": block["lat"],
                "lon": block["lon"],
                "count": len(arr),
                "avg_sentiment": avg_sent,
                "top_sources": top_sources,
                "summary": summary,
                "samples": [
                    {
                        "title": a["title"],
                        "orig_title": a.get("orig_title"),
                        "orig_description": a.get("orig_description"),
                        "url": a["url"],
                        "source": a["source"],
                        "sentiment": a["sentiment"],
                        "detected_lang": a.get("detected_lang"),
                    }
                    for a in arr[:5]
                ],
            }
        )
    return results
def _merge_close_clusters(clusters, embs, threshold=0.68):
# clusters: [{"indices":[...], "centroid":tensor}, ...] – add centroid in your first pass
merged = []
used = set()
for i in range(len(clusters)):
if i in used:
continue
base = clusters[i]
group = [i]
for j in range(i + 1, len(clusters)):
if j in used:
continue
sim = util.cos_sim(base["centroid"], clusters[j]["centroid"]).item()
if sim >= threshold:
group.append(j)
# merge those groups
all_idx = []
cents = []
for g in group:
used.add(g)
all_idx.extend(clusters[g]["indices"])
cents.append(clusters[g]["centroid"])
# new centroid
newc = torch.stack(cents, dim=0).mean(dim=0)
newc = newc / newc.norm()
merged.append({"indices": sorted(set(all_idx)), "centroid": newc})
return merged
# ----------------- Endpoints -----------------
# Module-level flag; nothing in this part of the file reads it.
# NOTE(review): looks unused — confirm against the rest of the module before removing.
prefetch = False
@app.get("/events")
def get_events(
    q: Optional[str] = Query(None),
    category: Optional[str] = Query(None),
    language: Optional[str] = Query(None),
    translate: Optional[bool] = Query(False),
    target_lang: Optional[str] = Query(None),
    limit_each: int = Query(150, ge=5, le=250),
    max_events: int = Query(15, ge=5, le=50),
    min_countries: int = Query(2, ge=1, le=50),
    min_articles: int = Query(2, ge=1, le=200),
    speed: Speed = Query(Speed.balanced),
):
    """List the largest events (article clusters) for the given filters.

    Events need at least ``min_countries`` countries and ``min_articles``
    articles, are ordered by article count (descending) and truncated to
    ``max_events``. With ``translate`` and ``target_lang`` set, the payloads
    are built from translated copies; the cache itself is never mutated.

    Returns:
        ``{"events": [...], "cache_key": "<pipe-joined key>"}``.
    """
    # always build cache on untranslated data
    cache_key, enriched, clusters = get_or_build_events_cache(
        q, category, language, False, None, limit_each, speed=speed
    )

    # Select and rank on the untranslated data first: the counts used for
    # filtering and ordering do not depend on translation, so only the
    # articles of the events that are returned need to be translated.
    ranked = []
    for c in clusters:
        payload = event_payload_from_cluster(c, enriched)
        if payload["country_count"] >= min_countries and payload["article_count"] >= min_articles:
            ranked.append((c, payload))
    ranked.sort(key=lambda pair: pair[1]["article_count"], reverse=True)
    selected = ranked[:max_events]

    if translate and target_lang:
        # Shallow list copy; only the entries that get translated are replaced
        # by per-article dict copies, so cached dicts stay untouched.
        view = list(enriched)
        for c, _ in selected:
            for idx in c["indices"]:
                item = dict(enriched[idx])
                src_hint = item.get("detected_lang")
                item["title"] = translate_text(item.get("title") or "", target_lang, fallback_src=src_hint)
                item["description"] = translate_text(item.get("description") or "", target_lang, fallback_src=src_hint)
                item["translated"] = True
                view[idx] = item
        events = [event_payload_from_cluster(c, view) for c, _ in selected]
    else:
        events = [payload for _, payload in selected]

    return {"events": events, "cache_key": "|".join(map(str, cache_key))}
@app.get("/event/{event_id}")
def get_event_details(
    event_id: str,
    cache_key: Optional[str] = Query(None),
    q: Optional[str] = Query(None),
    category: Optional[str] = Query(None),
    language: Optional[str] = Query(None),
    translate: Optional[bool] = Query(False),
    target_lang: Optional[str] = Query(None),
    limit_each: int = Query(150, ge=5, le=250),
):
    """Return one event plus its per-country breakdown.

    The cluster set comes from ``_events_cache`` (via ``cache_key`` when
    given, otherwise via the key derived from the filters) or is rebuilt
    untranslated on a miss.

    Raises:
        HTTPException: 400 for a malformed ``cache_key``; 404 when no cluster
            has the requested ``event_id``.
    """
    if cache_key:
        parts = cache_key.split("|")
        if len(parts) != 7:
            raise HTTPException(status_code=400, detail="Bad cache_key")
        try:
            limit_part = int(parts[3])
        except ValueError:
            # A non-numeric limit is a client error, not a server fault.
            raise HTTPException(status_code=400, detail="Bad cache_key") from None
        speed_str = parts[6]
        try:
            speed_obj = Speed(speed_str)  # "fast" | "balanced" | "max"
        except ValueError:
            speed_obj = Speed.balanced
        key_tuple = (parts[0], parts[1], parts[2], limit_part,
                     parts[4] == "True", parts[5].lower(), speed_str)
    else:
        speed_obj = Speed.balanced
        key_tuple = cache_key_for(q, category, language, limit_each, translate, target_lang, speed=speed_obj)

    entry = _events_cache.get(key_tuple)
    if not entry:
        # always build untranslated
        _, enriched, clusters = get_or_build_events_cache(
            q, category, language, False, None, limit_each, speed=speed_obj
        )
    else:
        enriched, clusters = entry["enriched"], entry["clusters"]

    # Locate the cluster before translating anything, so a 404 costs no
    # translation work and a hit translates only its own articles.
    cluster = next((c for c in clusters if cluster_id(c, enriched) == event_id), None)
    if cluster is None:
        raise HTTPException(status_code=404, detail="Event not found with current filters")

    # optional post-translate view (do not mutate cache)
    eview = enriched
    if translate and target_lang:
        eview = list(enriched)
        for idx in cluster["indices"]:
            item = dict(enriched[idx])
            src_hint = item.get("detected_lang")
            item["title"] = translate_text(item.get("title") or "", target_lang, fallback_src=src_hint)
            item["description"] = translate_text(item.get("description") or "", target_lang, fallback_src=src_hint)
            item["translated"] = True
            eview[idx] = item

    payload = event_payload_from_cluster(cluster, eview)
    countries = aggregate_event_by_country(cluster, eview)
    payload["articles_in_event"] = sum(c["count"] for c in countries)
    return {"event": payload, "countries": countries}
@app.get("/news")
def get_news(
    cache_key: Optional[str] = Query(None),
    category: Optional[str] = Query(None),
    sentiment: Optional[str] = Query(None),
    q: Optional[str] = Query(None),
    language: Optional[str] = Query(None),
    translate: Optional[bool] = Query(False),
    target_lang: Optional[str] = Query(None),
    limit_each: int = Query(100, ge=5, le=100),
    lite: bool = Query(True),
    speed: Speed = Query(Speed.balanced),
    page: int = Query(1, ge=1),
    page_size: int = Query(120, ge=5, le=300),
):
    """Return one page of enriched articles, optionally filtered and translated.

    Articles come from ``_events_cache`` when ``cache_key`` resolves to an
    entry, otherwise from a fresh fetch. The category and sentiment filters
    are applied before pagination; translation is applied to the returned
    page only.

    Returns:
        ``{"items": [...], "total": <count after filters>, "page": ..., "page_size": ...}``.
    """
    enriched: List[Dict[str, Any]] = []

    # Pull from cache if provided; a malformed key is ignored and falls
    # through to a fresh fetch.
    if cache_key:
        parts = cache_key.split("|")
        if len(parts) == 7:
            try:
                key_tuple = (
                    parts[0],            # q
                    parts[1],            # category
                    parts[2],            # language
                    int(parts[3]),       # limit_each
                    parts[4] == "True",  # translate
                    parts[5].lower(),    # target_lang
                    parts[6],            # speed
                )
            except ValueError:
                # Non-numeric limit: treat like any other unusable key.
                key_tuple = None
            entry = _events_cache.get(key_tuple) if key_tuple is not None else None
            if entry:
                enriched = entry["enriched"]

    # Fetch fresh if no cached items
    if not enriched:
        raw = combine_raw_articles(category=category, query=q, language=language, limit_each=limit_each, speed=speed)
        prefetch_descriptions_async(raw, speed)
        enriched = [enrich_article(a, language=language, translate=False, target_lang=None) for a in raw]

    # Enforce the selected category on cached and fresh items alike.
    if category:
        cat_norm = (category or "").strip().lower()
        enriched = [e for e in enriched if (e.get("category") or "").lower() == cat_norm]

    # Optional sentiment filter (sentiment is computed on the original text,
    # so it is independent of translation)
    if sentiment:
        s = sentiment.strip().lower()
        enriched = [i for i in enriched if i.get("sentiment", "").lower() == s]

    # Pagination; items are copies so cached dicts are never mutated below.
    total = len(enriched)
    start = (page - 1) * page_size
    end = start + page_size
    items = [dict(i) for i in enriched[start:end]]

    # Translation (optional) — only the page that is returned is translated.
    if translate and target_lang:
        for i in items:
            i["original_title"] = i.get("orig_title") or i.get("title")
            i["original_description"] = i.get("orig_description") or i.get("description")
            src_hint = i.get("detected_lang")
            i["title"] = translate_text(i.get("title") or "", target_lang, fallback_src=src_hint)
            i["description"] = translate_text(i.get("description") or "", target_lang, fallback_src=src_hint)
            i["translated"] = True
            i["translated_from"] = (src_hint or "").lower()
            i["translated_to"] = target_lang.lower()

    # Trim debug fields
    if lite:
        drop = {"_ml_text"}
        for i in items:
            for k in drop:
                i.pop(k, None)

    return {
        "items": items,
        "total": total,
        "page": page,
        "page_size": page_size
    }
def combine_raw_articles(category=None, query=None, language=None, limit_each=30,
                         timespan="3d", speed=Speed.balanced, log_summary: bool = True):
    """Fetch from every enabled provider and merge the results, de-duplicated by canonical URL.

    Args:
        category: Optional category passed on to the providers that accept it.
        query: Optional search string.
        language: Optional language code.
        limit_each: Per-provider limit; capped at 20 (fast) or 150 (balanced).
        timespan: GDELT time window. NOTE: it is replaced by "24h" for
            ``Speed.fast`` and "48h" for ``Speed.balanced``, so the argument
            only takes effect for other speeds.
        speed: Speed preset.
        log_summary: When true, per-provider counts are written to ``fetch_log``.

    Returns:
        Merged article dicts in provider order NewsAPI, GNews, NewsData,
        GDELT; articles without a URL are dropped. A provider that raises is
        logged and contributes no articles.
    """
    if speed == Speed.fast:
        timespan = "24h"
        limit_each = min(limit_each, 20)
    elif speed == Speed.balanced:
        timespan = "48h"
        limit_each = min(limit_each, 150)

    a1 = []
    if USE_NEWSAPI:
        if not query:
            a1 = _fetch_or_empty(
                "NewsAPI",
                lambda: fetch_newsapi_headlines_multi(limit=limit_each, language=language),
            )
        else:
            a1 = _fetch_or_empty(
                "NewsAPI",
                lambda: fetch_newsapi_articles(category=category, limit=limit_each, query=query, language=language),
            )

    a2 = []
    if USE_NEWSDATA_API:
        a2 = _fetch_or_empty(
            "NewsData.io",
            lambda: [
                normalize_newsdata_article(a)
                for a in fetch_newsdata_articles(category=category, limit=limit_each, query=query, language=language)
                if a.get("link")
            ],
        )

    a3 = []
    if USE_GNEWS_API:
        a3 = _fetch_or_empty(
            "GNews",
            lambda: fetch_gnews_articles(limit=limit_each, query=query, language=language),
        )

    gdelt_limit = limit_each
    a4 = _fetch_or_empty(
        "GDELT",
        lambda: fetch_gdelt_multi(
            limit=gdelt_limit,
            query=query,
            language=language,
            timespan=timespan,
            category=category,
            speed=speed,
        ),
    )

    # Dedup by canonical URL (maintain source precedence)
    seen, merged = set(), []
    for a in a1 + a3 + a2 + a4:
        if a.get("url"):
            a["url"] = _canonical_url(a["url"])
            url = a["url"]
            if url not in seen:
                seen.add(url)
                merged.append(a)

    if log_summary:
        # The emoji in these messages were mis-encoded; restored here.
        fetch_log.info("----- Article Fetch Summary -----")
        fetch_log.info(f"📊 NewsAPI returned: {len(a1)} articles")
        fetch_log.info(f"📊 NewsData.io returned: {len(a2)} articles")
        fetch_log.info(f"📊 GNews returned: {len(a3)} articles")
        fetch_log.info(f"📊 GDELT returned: {len(a4)} articles")
        fetch_log.info(f"✅ Total merged articles after deduplication: {len(merged)}")
        fetch_log.info("---------------------------------")
    return merged


def _fetch_or_empty(provider, fetch):
    """Call one provider fetch; log and return an empty list if it raises."""
    try:
        return fetch()
    except Exception as e:
        # Aggregation boundary: one failing provider must not discard the
        # others. Only the exception type is logged because request errors
        # can embed URLs that carry API keys.
        log.warning(f"{provider} fetch failed: {type(e).__name__}")
        return []
@app.get("/related")
def related_articles(
    id: Optional[str] = Query(None, description="article id from /news"),
    title: Optional[str] = Query(None),
    description: Optional[str] = Query(None),
    q: Optional[str] = Query(None),
    category: Optional[str] = Query(None),
    language: Optional[str] = Query(None),
    limit_each: int = Query(50, ge=5, le=100),
    k: int = Query(10, ge=1, le=50),
):
    """Return the ``k`` articles most similar to a reference article or free text.

    The reference is the article with the given ``id`` in a fresh fetch or,
    without ``id``, the concatenation of ``title`` and ``description``.

    Raises:
        HTTPException: 400 when neither ``id`` nor any text is supplied;
            404 when ``id`` is not present in the current fetch.

    Returns:
        ``{"items": [...]}`` where each item is an enriched article plus a
        ``similarity`` score, ordered from most to least similar.
    """
    # Validate first: without a reference there is nothing to compare against,
    # so do not pay for the network fetch and enrichment.
    text = f"{title or ''} {description or ''}".strip()
    if not id and not text:
        raise HTTPException(400, "provide either id or title/description")

    # ensure we have a working article list (enriched) to search over
    raw = combine_raw_articles(category=category, query=q, language=language, limit_each=limit_each)
    enriched = [enrich_article(a, language=language, translate=False, target_lang=None) for a in raw]
    if not enriched:
        return {"items": []}

    # pick the query vector
    if id:
        base = next((a for a in enriched if a.get("id") == id), None)
        if not base:
            raise HTTPException(404, "article id not found in current fetch")
        query_text = base["_ml_text"]
    else:
        query_text = text

    corpus_texts = [a["_ml_text"] for a in enriched]
    corpus_embs = _embed_texts(corpus_texts)
    query_emb = _embed_texts([query_text])[0]

    # cosine similarities
    sims = util.cos_sim(query_emb, corpus_embs).cpu().numpy().flatten()

    # take top-k excluding the query itself (if id provided)
    idxs = sims.argsort()[::-1]
    items = []
    for idx in idxs:
        a = enriched[idx]
        if id and a["id"] == id:
            continue
        items.append({**a, "similarity": float(sims[idx])})
        if len(items) >= k:
            break
    return {"items": items}
@app.middleware("http")
async def timing_middleware(request, call_next):
    """Stamp each response with its handling time in an ``X-Process-Time-ms`` header.

    Exceptions from the downstream handler propagate unchanged; a failure to
    set the header is ignored.
    """
    started = time.perf_counter()
    response = await call_next(request)
    elapsed_ms = (time.perf_counter() - started) * 1000
    try:
        response.headers["X-Process-Time-ms"] = f"{elapsed_ms:.1f}"
    except Exception:
        # Best effort: never fail a request over the timing header.
        pass
    return response
@app.post("/client-metric")
def client_metric(payload: Dict[str, Any] = Body(...)):
    """Acknowledge a client-side metric report.

    Args:
        payload: JSON object sent by the client; only ``name`` is read.

    Returns:
        Always ``{"ok": True}``; nothing is stored or logged.
    """
    # The body is client-controlled: a non-string `name` (number, list, ...)
    # must not raise on `.strip()`.
    raw_name = payload.get("name")
    name = raw_name.strip() if isinstance(raw_name, str) else ""
    # Drop redraw spam if it ever slips through again
    if name in {"Load all article markers on globe", "Load event country markers on globe"}:
        return {"ok": True}
    # Other metrics are currently acknowledged without being recorded.
    return {"ok": True}
@app.get("/diag/translate")
def diag_translate():
    """Probe each translation path with a fixed Spanish phrase and report which ones answered.

    Returns:
        Flags for the Hugging Face token and for the remote, local and
        LibreTranslate paths, plus the first available sample translation
        (LibreTranslate, then remote, then local).
    """
    probe_text = "Hola mundo"
    remote = _hf_call("Helsinki-NLP/opus-mt-es-en", {"inputs": probe_text})
    local = _translate_local(probe_text, "es", "en")
    libre = _translate_via_libre(probe_text, "es", "en")

    report = {"token": bool(HUGGINGFACE_API_TOKEN)}
    report["remote_ok"] = bool(remote)
    report["local_ok"] = bool(local)
    report["libre_ok"] = bool(libre)
    report["sample"] = libre or remote or local
    return report