| from fastapi import FastAPI, Query, HTTPException, Body | |
| from typing import Optional, List, Dict, Any, Tuple, Set | |
| import os | |
| import time | |
| import socket | |
| import logging | |
| import hashlib | |
| from functools import lru_cache | |
| from collections import Counter | |
| import requests | |
| import tldextract | |
| import math | |
| import nltk | |
| from nltk.sentiment import SentimentIntensityAnalyzer | |
| from geopy.geocoders import Nominatim | |
| from geopy.exc import GeocoderUnavailable, GeocoderTimedOut | |
| from fastapi.middleware.cors import CORSMiddleware | |
| from countryinfo import CountryInfo | |
| from sentence_transformers import SentenceTransformer, util | |
| from domain_country_map import domain_country_map | |
| from time import monotonic | |
| from langdetect import detect, DetectorFactory | |
| import re | |
| from urllib.parse import urlparse, urlunparse, parse_qsl | |
| from concurrent.futures import ThreadPoolExecutor, as_completed | |
| from html import unescape | |
| import threading | |
| import difflib | |
| from starlette.middleware.gzip import GZipMiddleware | |
| from transformers import pipeline as hf_pipeline | |
os.environ.setdefault("OMP_NUM_THREADS", "1")
| from fastapi import Path | |
| import torch | |
| torch.set_num_threads(2) | |
| # Optional runtime check for local OPUS tokenizers | |
| try: | |
| import sentencepiece as _spm # noqa: F401 | |
| _HAS_SENTENCEPIECE = True | |
| except Exception: | |
| _HAS_SENTENCEPIECE = False | |
| from enum import Enum | |
| class Speed(str, Enum): | |
| fast = "fast" | |
| balanced = "balanced" | |
| max = "max" | |
| _local_pipes = {} | |
| _news_clf = None | |
| _sbert = None | |
| # --- Translation runtime flags / caches --- | |
| ALLOW_HF_REMOTE = os.getenv("ALLOW_HF_REMOTE", "0") == "1" # default OFF | |
| _hf_bad_models: Set[str] = set() | |
| def _translate_local(text: str, src: str, tgt: str) -> Optional[str]: | |
| if not _HAS_SENTENCEPIECE: | |
| # Avoid attempting to download/instantiate Marian tokenizers without sentencepiece | |
| return None | |
| model_id = opus_model_for(src, tgt) | |
| if not model_id: | |
| return None | |
| key = model_id | |
| try: | |
| if key not in _local_pipes: | |
| _local_pipes[key] = hf_pipeline("translation", model=model_id) | |
| out = _local_pipes[key](text, max_length=512) | |
| return out[0]["translation_text"] | |
| except Exception as e: | |
| log.warning("Local translate failed for %s: %s", model_id, e) | |
| return None | |
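# Illustrative sketch (not called anywhere in the app): how the local OPUS
# fallback above could be exercised once sentencepiece is installed. The sample
# text and the es->en pair are assumptions; the first call downloads the
# Helsinki-NLP/opus-mt-es-en model.
def _demo_translate_local() -> Optional[str]:
    sample = "Hola mundo, estas son las noticias de hoy."
    return _translate_local(sample, "es", "en")  # None if sentencepiece/model is unavailable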
| def fetch_gdelt_multi(limit=120, query=None, language=None, timespan="48h", category=None, speed: Speed = Speed.balanced): | |
| # If user forced a language, honor it (but add a small English boost for coverage) | |
| if language: | |
| primary = fetch_gdelt_articles(limit=limit, query=query, language=language, timespan=timespan, category=category) | |
| # tiny English booster to catch global wires | |
| booster = fetch_gdelt_articles(limit=max(10, limit // 6), query=query, language="en", timespan=timespan, category=category) | |
| return primary + booster | |
| # Otherwise rotate across multiple languages | |
| if speed == Speed.fast: | |
| langs = LANG_ROTATION[:3] # quicker | |
| timespan = "24h" | |
| elif speed == Speed.balanced: | |
| langs = LANG_ROTATION[:8] # good mix | |
| timespan = "48h" | |
| else: | |
| langs = LANG_ROTATION # max coverage | |
| timespan = "3d" | |
| per_lang = max(8, math.ceil(limit / len(langs))) | |
| out = [] | |
| for lg in langs: | |
| out.extend(fetch_gdelt_articles(limit=per_lang, query=query, language=lg, timespan=timespan, category=category)) | |
| # Optional: add a few English pulls biased to different source countries (broadens outlets) | |
| if speed != Speed.fast: | |
| per_cc = max(4, limit // 30) if speed == Speed.max else max(2, limit // 40) | |
| for cc in COUNTRY_SEEDS[: (8 if speed == Speed.balanced else 16)]: | |
| out.extend( | |
| fetch_gdelt_articles( | |
| limit=per_cc, | |
| query=query, | |
| language="en", | |
| timespan=timespan, | |
| category=category, | |
| extra_tokens=[f"sourcecountry:{cc}"] | |
| ) | |
| ) | |
| return out | |
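# Minimal sketch of the per-language budgeting used above, with no network calls.
# The default limit of 120 mirrors fetch_gdelt_multi's signature; LANG_ROTATION is
# defined further down in this module and is resolved at call time.
def _demo_gdelt_budget(limit: int = 120) -> Dict[str, int]:
    budgets = {}
    for speed, n_langs in ((Speed.fast, 3), (Speed.balanced, 8), (Speed.max, len(LANG_ROTATION))):
        budgets[speed.value] = max(8, math.ceil(limit / n_langs))
    return budgets  # e.g. {"fast": 40, "balanced": 15, "max": 11}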
| def get_news_clf(): | |
| global _news_clf | |
| if _news_clf is None: | |
| _news_clf = hf_pipeline( | |
| "text-classification", | |
| model="cardiffnlp/tweet-topic-21-multi", | |
| top_k=1, | |
| ) | |
| return _news_clf | |
| def get_sbert(): | |
| global _sbert | |
| if _sbert is None: | |
| _sbert = SentenceTransformer("sentence-transformers/all-MiniLM-L6-v2") | |
| return _sbert | |
| # globals | |
| SESSION = requests.Session() | |
| ADAPTER = requests.adapters.HTTPAdapter(pool_connections=64, pool_maxsize=64, max_retries=2) | |
| SESSION.mount("http://", ADAPTER) | |
| SESSION.mount("https://", ADAPTER) | |
| def _session_get(url, **kwargs): | |
| headers = kwargs.pop("headers", {}) | |
| headers.setdefault("User-Agent", "Mozilla/5.0 (compatible; NewsGlobe/1.0)") | |
| return SESSION.get(url, headers=headers, timeout=kwargs.pop("timeout", 12), **kwargs) | |
| def _try_jina_reader(url: str, timeout: int) -> Optional[str]: | |
| try: | |
| u = url.strip() | |
| if not u.startswith(("http://", "https://")): | |
| u = "https://" + u | |
| r = _session_get(f"https://r.jina.ai/{u}", timeout=timeout) | |
| if r.status_code == 200: | |
| txt = _clean_text(r.text) | |
| sents = _split_sentences(txt) | |
| if sents: | |
| best = " ".join(sents[:2]) | |
| return best if len(best) >= 80 else (sents[0] if sents else None) | |
| except Exception: | |
| pass | |
| return None | |
| # --- description cleanup helpers --- | |
| BOILER_DESC = re.compile( | |
| r"(subscribe|sign in|sign up|enable javascript|cookies? (policy|settings)|" | |
| r"privacy (policy|notice)|continue reading|read more|click here|" | |
| r"accept (cookies|the terms)|by continuing|newsletter|advertisement|adblock)", | |
| re.I | |
| ) | |
def _split_sentences(text: str) -> List[str]:
    # light-weight splitter good enough for news blurbs
    parts = re.split(r"(?<=[\.\?\!])\s+(?=[A-Z0-9])", (text or "").strip())
    # also break on " • " and long dashes if present
    out = []
    for p in parts:
        out.extend(re.split(r"\s+[•–—]\s+", p))
    return [p.strip() for p in out if p and len(p.strip()) >= 2]
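# Tiny self-contained check of the splitter above (illustrative only; the blurb
# is made up). Expected result: ["Markets rallied today.", "Analysts were
# surprised", "More at 9:00 PM"].
def _demo_split_sentences() -> List[str]:
    return _split_sentences("Markets rallied today. Analysts were surprised • More at 9:00 PM")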
def _tidy_description(title: str, desc: str, source_name: str, max_chars: int = 240) -> str:
    if not desc:
        return ""
    # remove repeated title
    desc = _dedupe_title_from_desc(title, desc)
    # strip obvious boilerplate
    desc = BOILER_DESC.sub("", desc)
    desc = re.sub(r"\s+", " ", desc).strip(" -–:•|")
    # choose first 1–2 sentences that look like a summary
    sents = _split_sentences(desc)
    if not sents:
        sents = [desc]
    best = " ".join(sents[:2]).strip()
    # soft truncate at sentence boundary
    if len(best) > max_chars:
        # try only the first sentence
        if len(sents[0]) <= max_chars * 0.9:
            best = sents[0]
        else:
            best = best[:max_chars].rsplit(" ", 1)[0].rstrip(",;:-–—")
    # avoid parroting the headline
    if _too_similar(title, best):
        # try the next sentence if we have it
        for alt in sents[1:3]:
            if not _too_similar(title, alt):
                best = alt
                break
    # ensure it ends neatly
    if best and best[-1] not in ".!?":
        best += "."
    return best
| def _too_similar(a: str, b: str, thresh: float = 0.92) -> bool: | |
| """Return True if strings are near-duplicates (or one contains the other).""" | |
| a = (a or "").strip() | |
| b = (b or "").strip() | |
| if not a or not b: | |
| return False | |
| if a.lower() == b.lower(): | |
| return True | |
| if a.lower() in b.lower() or b.lower() in a.lower(): | |
| return True | |
| ratio = difflib.SequenceMatcher(None, a.lower(), b.lower()).ratio() | |
| return ratio >= thresh | |
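# Behaviour sketch for _too_similar with the default 0.92 threshold (the sample
# headlines are assumptions): containment counts as a near-duplicate, while an
# unrelated paraphrase does not.
def _demo_too_similar() -> Tuple[bool, bool]:
    a = "Fed raises rates"
    return (
        _too_similar(a, "Fed raises rates again"),              # True (containment)
        _too_similar(a, "Central bank lifts borrowing costs"),  # False
    )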
def _dedupe_title_from_desc(title: str, desc: str) -> str:
    """If the description contains the title, strip it and tidy up."""
    t = (title or "").strip()
    d = (desc or "").strip()
    if not t or not d:
        return d
    # Remove exact leading title
    if d.lower().startswith(t.lower()):
        d = d[len(t):].lstrip(" -–:•|")
    # Remove inner repeats
    d = d.replace(t, "").strip(" -–:•|")
    d = _clean_text(d)
    return d
| # Prevent duplicate upstream fetches when identical requests arrive together | |
| _inflight_locks: Dict[Tuple, threading.Lock] = {} | |
| _inflight_global_lock = threading.Lock() | |
| def _get_inflight_lock(key: Tuple) -> threading.Lock: | |
| with _inflight_global_lock: | |
| lk = _inflight_locks.get(key) | |
| if lk is None: | |
| lk = threading.Lock() | |
| _inflight_locks[key] = lk | |
| return lk | |
| DESC_CACHE_LOCK = threading.Lock() | |
| try: | |
| from bs4 import BeautifulSoup # optional but nice to have | |
| except Exception: | |
| BeautifulSoup = None | |
| # -------- Description fetching config -------- | |
| DESC_FETCH_TIMEOUT = 3 # seconds per URL | |
| DESC_MIN_LEN = 100 # consider shorter text as "weak" and try to upgrade | |
| DESC_CACHE_TTL = 24 * 3600 # 24h | |
| MAX_DESC_FETCHES = 24 # cap number of fetches per request | |
| DESC_WORKERS = 12 # parallel workers | |
| # url -> {"text": str, "t": monotonic()} | |
| DESC_CACHE: Dict[str, Dict[str, Any]] = {} | |
| def _now_mono(): | |
| try: | |
| return monotonic() | |
| except Exception: | |
| return time.time() | |
| def _clean_text(s: str) -> str: | |
| s = unescape(s or "") | |
| s = re.sub(r"\s+", " ", s).strip() | |
| return s | |
| def _extract_desc_from_ld_json(html: str) -> Optional[str]: | |
| if not html or not BeautifulSoup: | |
| return None | |
| try: | |
| soup = BeautifulSoup(html, "html.parser") | |
| for tag in soup.find_all("script", {"type": "application/ld+json"}): | |
| try: | |
| import json | |
| data = json.loads(tag.string or "") | |
| except Exception: | |
| continue | |
| def find_desc(obj): | |
| if not isinstance(obj, (dict, list)): | |
| return None | |
| if isinstance(obj, list): | |
| for it in obj: | |
| v = find_desc(it) | |
| if v: | |
| return v | |
| return None | |
| # dict | |
| for key in ("description", "abstract", "articleBody"): | |
| val = obj.get(key) | |
| if isinstance(val, str): | |
| txt = _clean_text(val) | |
| if len(txt) >= 40: | |
| return txt | |
| # nested | |
| for k, v in obj.items(): | |
| if isinstance(v, (dict, list)): | |
| got = find_desc(v) | |
| if got: | |
| return got | |
| return None | |
| d = find_desc(data) | |
| if d and len(d) >= 40: | |
| return d | |
| except Exception: | |
| pass | |
| return None | |
| CONSENT_HINTS = re.compile(r"(consent|gdpr|privacy choices|before you continue|we value your privacy)", re.I) | |
| def _looks_like_consent_wall(html: str) -> bool: | |
| if not html: | |
| return False | |
| if "consent.yahoo.com" in html.lower(): # common interstitial | |
| return True | |
| # generic phrasing | |
| return bool(CONSENT_HINTS.search(html)) | |
| def _extract_desc_from_html(html: str) -> Optional[str]: | |
| html = html or "" | |
| if BeautifulSoup: | |
| soup = BeautifulSoup(html, "html.parser") | |
# try JSON-LD first
| ld = _extract_desc_from_ld_json(html) | |
| if ld: | |
| txt = _clean_text(ld) | |
| if 40 <= len(txt) <= 480: | |
| return txt | |
| for sel, attr in [ | |
| ('meta[property="og:description"]', "content"), | |
| ('meta[name="twitter:description"]', "content"), | |
| ('meta[name="description"]', "content"), | |
| ]: | |
| tag = soup.select_one(sel) | |
| if tag: | |
| txt = _clean_text(tag.get(attr, "")) | |
| if len(txt) >= 40: | |
| return txt | |
| # Fallback: first meaningful <p> | |
| for p in soup.find_all("p"): | |
| txt = _clean_text(p.get_text(" ")) | |
| if len(txt) >= 80: | |
| return txt | |
| else: | |
# regex fallbacks when BeautifulSoup is unavailable
| for pat in [ | |
| r'<meta[^>]+property=["\']og:description["\'][^>]+content=["\']([^"\']+)["\']', | |
| r'<meta[^>]+name=["\']twitter:description["\'][^>]+content=["\']([^"\']+)["\']', | |
| r'<meta[^>]+name=["\']description["\'][^>]+content=["\']([^"\']+)["\']', | |
| ]: | |
| m = re.search(pat, html, flags=re.I | re.S) | |
| if m: | |
| txt = _clean_text(m.group(1)) | |
| if len(txt) >= 40: | |
| return txt | |
| m = re.search(r"<p[^>]*>(.*?)</p>", html, flags=re.I | re.S) | |
| if m: | |
| txt = _clean_text(re.sub("<[^>]+>", " ", m.group(1))) | |
| if len(txt) >= 80: | |
| return txt | |
# JSON-LD parsing needs bs4, so there is no further fallback here
| return None | |
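# Small self-contained example of the extractor above. The HTML snippet is made
# up; with bs4 installed the og:description tag wins, and the regex fallback
# finds the same tag when bs4 is absent.
def _demo_extract_desc() -> Optional[str]:
    html = (
        '<html><head>'
        '<meta property="og:description" content="A concise standfirst describing the article, comfortably longer than forty characters.">'
        '</head><body><p>Short paragraph.</p></body></html>'
    )
    return _extract_desc_from_html(html)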
| def _desc_cache_get(url: str) -> Optional[str]: | |
| if not url: | |
| return None | |
| entry = DESC_CACHE.get(url) | |
| if not entry: | |
| return None | |
| if _now_mono() - entry["t"] > DESC_CACHE_TTL: | |
| DESC_CACHE.pop(url, None) | |
| return None | |
| return entry["text"] | |
| def _desc_cache_put(url: str, text: str): | |
| if url and text: | |
| with DESC_CACHE_LOCK: | |
| DESC_CACHE[url] = {"text": text, "t": _now_mono()} | |
| def _attempt_fetch(url: str, timeout: int) -> Optional[str]: | |
| headers = { | |
| "User-Agent": "Mozilla/5.0 (compatible; NewsGlobe/1.0; +mailto:you@yourdomain.com)", | |
| "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8", | |
| "Accept-Language": "en-US,en;q=0.9", | |
| } | |
| try: | |
| r = _session_get(url, headers=headers, timeout=timeout, allow_redirects=True) | |
| if r.status_code != 200: | |
| return None | |
| ct = (r.headers.get("Content-Type") or "").lower() | |
| txt = r.text or "" | |
| if "html" not in ct and "<html" not in txt.lower(): | |
| return None | |
| if _looks_like_consent_wall(txt): | |
| # jump straight to Jina if a consent wall is detected | |
| jd = _try_jina_reader(url, timeout) | |
| if jd: | |
| return jd | |
| return None | |
| desc = _extract_desc_from_html(txt) | |
| if desc and 40 <= len(desc) <= 480: | |
| return desc | |
| except Exception: | |
| # fall through to Jina | |
| pass | |
| # Last-resort: Jina reader | |
| jd = _try_jina_reader(url, timeout) | |
| if jd and 40 <= len(jd) <= 480: | |
| return jd | |
| return None | |
| def fetch_page_description(url: str) -> Optional[str]: | |
| """Fetch and cache a best-effort article description from the page (incl. AMP retries).""" | |
| if not url: | |
| return None | |
| cached = _desc_cache_get(url) | |
| if cached: | |
| return cached | |
| # Try the original URL first | |
| desc = _attempt_fetch(url, DESC_FETCH_TIMEOUT) | |
| if not desc: | |
| # Try common AMP variants | |
| amp_candidates = [] | |
| try: | |
| p = urlparse(url) | |
| # /amp path | |
| if not p.path.endswith("/amp"): | |
| amp_candidates.append(urlunparse(p._replace(path=(p.path.rstrip("/") + "/amp")))) | |
| # ?amp | |
| q = p.query | |
| amp_candidates.append(urlunparse(p._replace(query=(q + ("&" if q else "") + "amp=1")))) | |
| # ?outputType=amp (CNN, some US sites) | |
| amp_candidates.append(urlunparse(p._replace(query=(q + ("&" if q else "") + "outputType=amp")))) | |
| except Exception: | |
| pass | |
| for amp_url in amp_candidates: | |
| desc = _attempt_fetch(amp_url, DESC_FETCH_TIMEOUT) | |
| if desc: | |
| break | |
| if desc: | |
| _desc_cache_put(url, desc) | |
| return desc | |
| return None | |
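# Sketch of the AMP variants the fetcher above tries when the original page
# yields nothing (pure string manipulation, no network; the domain is an
# assumption): the /amp path, ?amp=1, and ?outputType=amp.
def _demo_amp_candidates(url: str = "https://example.com/world/story") -> List[str]:
    p = urlparse(url)
    q = p.query
    return [
        urlunparse(p._replace(path=p.path.rstrip("/") + "/amp")),
        urlunparse(p._replace(query=(q + ("&" if q else "") + "amp=1"))),
        urlunparse(p._replace(query=(q + ("&" if q else "") + "outputType=amp"))),
    ]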
| def _needs_desc_upgrade(a: Dict[str, Any]) -> bool: | |
| url = a.get("url") or "" | |
| if not url: | |
| return False | |
| title = (a.get("title") or "").strip() | |
| desc = (a.get("description") or "").strip() | |
| if not desc or desc.lower().startswith("no description"): | |
| return True | |
| if len(desc) < DESC_MIN_LEN: | |
| return True | |
# if desc ≈ title, trigger an upgrade
| if _too_similar(title, desc): | |
| return True | |
| return False | |
def prefetch_descriptions(raw_articles: List[Dict[str, Any]], speed: Speed = Speed.balanced):
    candidates, seen = [], set()
    max_fetches = 6 if speed == Speed.fast else 8 if speed == Speed.balanced else 16
    timeout = 1 if speed == Speed.fast else 2
    workers = 3 if speed == Speed.fast else 4 if speed == Speed.balanced else 8
    for a in raw_articles:
        url = a.get("url")
        if not url or url in seen:
            continue
        seen.add(url)
        if _needs_desc_upgrade(a) and not _desc_cache_get(url):
            candidates.append(url)
        if len(candidates) >= max_fetches:
            break
    if not candidates:
        return
    with ThreadPoolExecutor(max_workers=workers) as ex:
        futs = [ex.submit(fetch_page_description, u) for u in candidates]
        for _ in as_completed(futs):
            pass

def prefetch_descriptions_async(raw_articles, speed: Speed = Speed.balanced):
    threading.Thread(target=prefetch_descriptions, args=(raw_articles, speed), daemon=True).start()
| # news_clf = pipeline("text-classification", model="cardiffnlp/tweet-topic-21-multi", top_k=1) | |
| DetectorFactory.seed = 0 # deterministic | |
| SECTION_HINTS = { | |
| "sports": "sports", | |
| "sport": "sports", | |
| "business": "business", | |
| "money": "business", | |
| "market": "business", | |
| "tech": "technology", | |
| "technology": "technology", | |
| "sci": "science", | |
| "science": "science", | |
| "health": "health", | |
| "wellness": "health", | |
| "entertainment": "entertainment", | |
| "culture": "entertainment", | |
| "showbiz": "entertainment", | |
| "crime": "crime", | |
| "world": "general", | |
| "weather": "weather", | |
| "environment": "environment", | |
| "climate": "environment", | |
| "travel": "travel", | |
| "politics": "politics", | |
| "election": "politics", | |
| } | |
| KEYWORDS = { | |
| "sports": r"\b(NBA|NFL|MLB|NHL|Olympic|goal|match|tournament|coach|transfer)\b", | |
| "business": r"\b(stocks?|earnings|IPO|merger|acquisition|revenue|inflation|market)\b", | |
| "technology": r"\b(AI|software|chip|semiconductor|app|startup|cyber|hack|quantum|robot)\b", | |
| "science": r"\b(researchers?|study|physics|astronomy|genome|spacecraft|telescope)\b", | |
| "health": r"\b(virus|vaccine|disease|hospital|doctor|public health|covid)\b", | |
| "entertainment": r"\b(movie|film|box office|celebrity|series|show|album|music)\b", | |
| "crime": r"\b(arrested|charged|police|homicide|fraud|theft|court|lawsuit)\b", | |
| "weather": r"\b(hurricane|storm|flood|heatwave|blizzard|tornado|forecast)\b", | |
| "environment": r"\b(climate|emissions|wildfire|deforestation|biodiversity)\b", | |
| "travel": r"\b(flight|airline|airport|tourism|visa|cruise|hotel)\b", | |
| "politics": r"\b(president|parliament|congress|minister|policy|campaign|election)\b", | |
| } | |
| def _infer_category_from_url_path(url_path: str) -> Optional[str]: | |
| parts = [p for p in url_path.lower().split("/") if p] | |
| for p in parts: | |
| if p in SECTION_HINTS: | |
| return SECTION_HINTS[p] | |
| # also try hyphenated tokens like 'us-business' or 'tech-news' | |
| for p in parts: | |
| for tok in re.split(r"[-_]", p): | |
| if tok in SECTION_HINTS: | |
| return SECTION_HINTS[tok] | |
| return None | |
| def _infer_category_from_text(text: str) -> Optional[str]: | |
| if not text: | |
| return None | |
| for cat, pat in KEYWORDS.items(): | |
| if re.search(pat, text, flags=re.I): | |
| return cat | |
| return None | |
| def infer_category(article_url, title, description, provided): | |
| if provided: | |
| p = provided.strip().lower() | |
| if p: | |
| return p | |
| # url rules | |
| try: | |
| p = urlparse(article_url).path or "" | |
| cat = _infer_category_from_url_path(p) | |
| if cat: | |
| return cat | |
| except Exception: | |
| pass | |
| # keyword rules | |
| text = f"{title or ''} {description or ''}".strip() | |
| cat = _infer_category_from_text(text) | |
| if cat: | |
| return cat | |
| try: | |
| preds = get_news_clf()(text[:512]) # lazy-loaded | |
| if isinstance(preds[0], list): | |
| label = preds[0][0]["label"] | |
| else: | |
| label = preds[0]["label"] | |
| return label.lower() | |
| except Exception as e: | |
| log.warning(f"ML category failed: {e}") | |
| return "general" | |
| BOILER = re.compile(r"\b(live updates|breaking|what we know|in pictures|opinion)\b", re.I) | |
| def _norm_text(s: str) -> str: | |
| s = (s or "").strip() | |
| s = re.sub(r"\s+", " ", s) | |
| return s | |
| def _cluster_text(a): | |
| base = f"{a.get('orig_title') or a.get('title') or ''} {a.get('orig_description') or a.get('description') or ''}" | |
| base = BOILER.sub("", base) | |
| base = re.sub(r"\b(\d{1,2}:\d{2}\s?(AM|PM))|\b(\d{1,2}\s\w+\s\d{4})", "", base, flags=re.I) | |
| return _norm_text(base) | |
| def _canonical_url(u: str) -> str: | |
| if not u: | |
| return u | |
| p = urlparse(u) | |
| # drop tracking params | |
| qs = [(k, v) for (k, v) in parse_qsl(p.query, keep_blank_values=False) if not k.lower().startswith(("utm_", "fbclid", "gclid"))] | |
| clean = p._replace(query="&".join([f"{k}={v}" for k, v in qs]), fragment="") | |
| # some sites add trailing slashes inconsistently | |
| path = clean.path.rstrip("/") or "/" | |
| clean = clean._replace(path=path) | |
| return urlunparse(clean) | |
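# Example of the canonicalisation above on a hypothetical URL: tracking params
# and the fragment are dropped, and the trailing slash is normalised.
#   -> "https://example.com/story?id=7"
def _demo_canonical_url() -> str:
    return _canonical_url("https://example.com/story/?utm_source=x&fbclid=abc&id=7#frag")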
| def detect_lang(text: str) -> Optional[str]: | |
| try: | |
| return detect(text) # returns 'en','fr','de',... | |
| except Exception: | |
| return None | |
| def _embed_texts(texts: List[str]): | |
| embs = get_sbert().encode(texts, convert_to_tensor=True, normalize_embeddings=True, show_progress_bar=False) | |
| return embs | |
| # ---- cache helpers ---- | |
| CACHE_TTL_SECS = 900 | |
| SIM_THRESHOLD = 0.6 | |
| _events_cache: Dict[Tuple, Dict[str, Any]] = {} | |
| def cache_key_for(q, category, language, limit_each, translate=False, target_lang=None, speed=Speed.balanced): | |
| return (q or "", category or "", language or "", int(limit_each or 50), | |
| bool(translate), (target_lang or "").lower(), speed.value) | |
| _first_real_build = True # module-global | |
| def get_or_build_events_cache(q, category, language, translate, target_lang, limit_each, speed=Speed.balanced): | |
| global _first_real_build | |
| key = cache_key_for(q, category, language, limit_each, translate, target_lang, speed) | |
| now = monotonic() | |
| if speed == Speed.fast: | |
| use_timespan, use_limit = "24h", min(limit_each, 20) | |
| elif speed == Speed.balanced: | |
| use_timespan, use_limit = "48h", min(limit_each, 150) | |
| else: # max | |
| use_timespan, use_limit = "3d", limit_each | |
| entry = _events_cache.get(key) | |
| if entry and now - entry["t"] < CACHE_TTL_SECS: | |
| log.info(f"CACHE HIT for {key}") | |
| return key, entry["enriched"], entry["clusters"] | |
| lock = _get_inflight_lock(key) | |
| with lock: | |
| entry = _events_cache.get(key) | |
| if entry and now - entry["t"] < CACHE_TTL_SECS: | |
| log.info(f"CACHE HIT (post-lock) for {key}") | |
| return key, entry["enriched"], entry["clusters"] | |
if _first_real_build:
    # first real build: keep it small so startup stays responsive
    use_timespan = "24h"
    use_limit = min(use_limit, 100)
log.info(f"CACHE MISS for {key} -> fetching (timespan={use_timespan}, limit_each={use_limit})")
| raw = combine_raw_articles( | |
| category=category, # providers may use it; inference ignores it | |
| query=q, | |
| language=language, | |
| limit_each=use_limit, | |
| timespan=use_timespan, | |
| speed=speed, | |
| ) | |
| prefetch_descriptions_async(raw, speed) | |
| enriched_all = [enrich_article(a, language=language, translate=False, target_lang=None) for a in raw] | |
| if category: | |
| cat_norm = (category or "").strip().lower() | |
| enriched = [e for e in enriched_all if (e.get("category") or "").lower() == cat_norm] | |
| else: | |
| enriched = enriched_all | |
| clusters = cluster_articles(enriched, sim_threshold=SIM_THRESHOLD, speed=speed) | |
| _events_cache[key] = {"t": monotonic(), "enriched": enriched, "clusters": clusters} | |
| _first_real_build = False | |
| return key, enriched, clusters | |
| # Which languages to rotate when user didn't restrict language | |
| LANG_ROTATION = ["en", "es", "fr", "de", "ar", "ru", "pt", "zh", "hi", "ja", "ko"] | |
| # A few sourcecountry seeds for English to diversify outlets (optional) | |
| COUNTRY_SEEDS = ["US", "GB", "IN", "CA", "AU", "ZA", "SG", "NG", "DE", "FR", "BR", "MX", "ES", "RU", "JP", "KR", "CN"] | |
| # ----------------- Config / Keys ----------------- | |
| USE_GNEWS_API = False | |
| USE_NEWSDATA_API = False | |
| USE_GDELT_API = True | |
| USE_NEWSAPI = False | |
| NEWSAPI_KEY = os.getenv("NEWSAPI_KEY", "ea734c66dc4044fa8e4501ad7b90e753") | |
| GNEWS_API_KEY = os.getenv("GNEWS_API_KEY", "5419897c95e8a4b21074e0d3fe95a3dd") | |
| NEWSDATA_API_KEY = os.getenv("NEWSDATA_API_KEY", "pub_1feb49a71a844719af68d0844fb43a61") | |
| HUGGINGFACE_API_TOKEN = os.getenv("HUGGINGFACE_API_TOKEN") | |
| logging.basicConfig( | |
| level=logging.WARNING, | |
| format="%(levelname)s:%(name)s:%(message)s", | |
| ) | |
| log = logging.getLogger("newsglobe") | |
| log.setLevel(logging.WARNING) | |
| fetch_log = logging.getLogger("newsglobe.fetch_summary") | |
| fetch_log.setLevel(logging.INFO) | |
| _fetch_handler = logging.StreamHandler() | |
| _fetch_handler.setLevel(logging.INFO) | |
| _fetch_handler.setFormatter(logging.Formatter("%(levelname)s:%(name)s:%(message)s")) | |
| fetch_log.addHandler(_fetch_handler) | |
| fetch_log.propagate = False # don't pass to root (which is WARNING) | |
| logging.getLogger("urllib3").setLevel(logging.WARNING) | |
| logging.getLogger("urllib3.connectionpool").setLevel(logging.WARNING) | |
| logging.getLogger("requests.packages.urllib3").setLevel(logging.WARNING) | |
| logging.getLogger("sentence_transformers").setLevel(logging.WARNING) | |
| logging.getLogger("transformers").setLevel(logging.WARNING) | |
| for name in ("urllib3", "urllib3.connectionpool", "requests.packages.urllib3"): | |
| lg = logging.getLogger(name) | |
| lg.setLevel(logging.ERROR) | |
| lg.propagate = False | |
| def _newsapi_enabled() -> bool: | |
| if not NEWSAPI_KEY: | |
| log.warning("NewsAPI disabled: missing NEWSAPI_KEY env var") | |
| return False | |
| return True | |
| def cluster_id(cluster, enriched_articles): | |
| urls = sorted([(enriched_articles[i].get("url") or "") for i in cluster["indices"] if enriched_articles[i].get("url")]) | |
| base = "|".join(urls) if urls else "empty" | |
| return hashlib.md5(base.encode("utf-8")).hexdigest()[:10] | |
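# Sketch of the stable event id above: the same member URLs always hash to the
# same 10-character id, regardless of article order. The URLs are assumptions.
def _demo_cluster_id() -> str:
    arts = [{"url": "https://example.com/b"}, {"url": "https://example.com/a"}]
    return cluster_id({"indices": [0, 1]}, arts)  # deterministic md5 prefix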
| # ----------------- NLTK / VADER ----------------- | |
| try: | |
| nltk.data.find("sentiment/vader_lexicon") | |
| except LookupError: | |
| nltk.download("vader_lexicon") # one-time fetch in a fresh container | |
| try: | |
| _vader = SentimentIntensityAnalyzer() | |
| except Exception: | |
| _vader = None | |
| def classify_sentiment(text: str) -> str: | |
| if not text: | |
| return "neutral" | |
| if _vader is None: | |
| return "neutral" | |
| scores = _vader.polarity_scores(text) | |
| c = scores["compound"] | |
| return "positive" if c >= 0.2 else "negative" if c <= -0.2 else "neutral" | |
| # ----------------- Geocode helpers ----------------- | |
| def get_country_centroid(country_name): | |
| if not country_name or country_name == "Unknown": | |
| return {"lat": 0, "lon": 0, "country": "Unknown"} | |
| try: | |
| country = CountryInfo(country_name) | |
| latlng = country.capital_latlng() | |
| return {"lat": latlng[0], "lon": latlng[1], "country": country_name} | |
| except Exception as e: | |
| log.info(f"Could not get centroid for {country_name}: {e}") | |
| return {"lat": 0, "lon": 0, "country": country_name or "Unknown"} | |
| def resolve_domain_to_ip(domain): | |
| if not domain: | |
| return None | |
| try: | |
| return socket.gethostbyname(domain) | |
| except socket.gaierror: | |
| return None | |
| def geolocate_ip(ip): | |
| try: | |
| r = _session_get(f"https://ipwho.is/{ip}?fields=success,country,latitude,longitude", timeout=8) | |
| j = r.json() | |
| if j.get("success"): | |
| return {"lat": j["latitude"], "lon": j["longitude"], "country": j["country"]} | |
| except Exception: | |
| pass | |
| return None | |
| geolocator = Nominatim(user_agent="newsglobe-app (contact: you@example.com)") | |
| domain_geo_cache: Dict[str, Dict[str, Any]] = {} | |
| MAJOR_OUTLETS = { | |
| "bbc.co.uk": "United Kingdom", | |
| "theguardian.com": "United Kingdom", | |
| "reuters.com": "United States", | |
| "aljazeera.com": "Qatar", | |
| "lemonde.fr": "France", | |
| "dw.com": "Germany", | |
| "abc.net.au": "Australia", | |
| "ndtv.com": "India", | |
| "globo.com": "Brazil", | |
| "elpais.com": "Spain", | |
| "lefigaro.fr": "France", | |
| "kyodonews.net": "Japan", | |
| "straitstimes.com": "Singapore", | |
| "thesun.my": "Malaysia", # <-- add this | |
| } | |
| def geocode_source(source_text: str, domain: str = "", do_network: bool = False): | |
| cache_key = f"{source_text}|{domain}" | |
| if cache_key in domain_geo_cache: | |
| return domain_geo_cache[cache_key] | |
| ext = tldextract.extract(domain or "") | |
| fqdn = ".".join([p for p in (ext.domain, ext.suffix) if p]) if (ext.domain or ext.suffix) else "" | |
# 0) Major outlets / domain map
if fqdn in MAJOR_OUTLETS:
    coords = get_country_centroid(MAJOR_OUTLETS[fqdn])
    domain_geo_cache[cache_key] = coords
    return coords
if ext.domain in domain_country_map:
    coords = get_country_centroid(domain_country_map[ext.domain])
    domain_geo_cache[cache_key] = coords
    return coords
| # 1) Suffix fallback (instant) | |
| coords = get_country_centroid(_suffix_country(ext.suffix)) | |
| domain_geo_cache[cache_key] = coords | |
| # 2) Optional async refinement (never block hot path) | |
| if do_network: | |
| threading.Thread(target=_refine_geo_async, args=(cache_key, source_text, fqdn), daemon=True).start() | |
| return coords | |
| def _suffix_country(suffix: Optional[str]) -> str: | |
| s = (suffix or "").split(".")[-1] | |
| m = { | |
| "au":"Australia","uk":"United Kingdom","gb":"United Kingdom","ca":"Canada","in":"India","us":"United States", | |
| "ng":"Nigeria","de":"Germany","fr":"France","jp":"Japan","sg":"Singapore","za":"South Africa","nz":"New Zealand", | |
| "ie":"Ireland","it":"Italy","es":"Spain","se":"Sweden","ch":"Switzerland","nl":"Netherlands","br":"Brazil", | |
| "my":"Malaysia","id":"Indonesia","ph":"Philippines","th":"Thailand","vn":"Vietnam","sa":"Saudi Arabia", | |
| "ae":"United Arab Emirates","tr":"Turkey","mx":"Mexico","ar":"Argentina","cl":"Chile","co":"Colombia", | |
| "il":"Israel","kr":"South Korea","cn":"China","tw":"Taiwan","hk":"Hong Kong" | |
| } | |
| return m.get(s, "United States" if s in ("com","org","net") else "Unknown") | |
| def _refine_geo_async(cache_key, source_text, fqdn): | |
| try: | |
| # Try IP geo (cheap) | |
| ip = resolve_domain_to_ip(fqdn) if fqdn else None | |
| if ip: | |
| coords = geolocate_ip(ip) | |
| if coords: | |
| domain_geo_cache[cache_key] = coords | |
| return | |
| # Try Nominatim FAST (lower timeout) | |
| location = geolocator.geocode(f"{source_text} News Headquarters", timeout=2) | |
| if location and hasattr(location, "raw"): | |
| coords = { | |
| "lat": location.latitude, | |
| "lon": location.longitude, | |
| "country": location.raw.get("address", {}).get("country", "Unknown"), | |
| } | |
| domain_geo_cache[cache_key] = coords | |
| except Exception: | |
| pass | |
| # ----------------- HuggingFace translate (optional) ----------------- | |
| HF_MODEL_PRIMARY = None # disable NLLB remote (avoids 404 spam); use OPUS + pivot/LibreTranslate | |
| # 2-letter ISO -> NLLB codes | |
| NLLB_CODES = { | |
| "en": "eng_Latn", | |
| "es": "spa_Latn", | |
| "fr": "fra_Latn", | |
| "de": "deu_Latn", | |
| "it": "ita_Latn", | |
| "pt": "por_Latn", | |
| "zh": "zho_Hans", | |
| "ru": "rus_Cyrl", | |
| "ar": "arb_Arab", | |
| "hi": "hin_Deva", | |
| "ja": "jpn_Jpan", | |
| "ko": "kor_Hang", | |
| } | |
| # OPUS-MT model map for common pairs (expand as needed) | |
| def opus_model_for(src2: str, tgt2: str) -> Optional[str]: | |
| pairs = { | |
| ("es", "en"): "Helsinki-NLP/opus-mt-es-en", | |
| ("en", "es"): "Helsinki-NLP/opus-mt-en-es", | |
| ("fr", "en"): "Helsinki-NLP/opus-mt-fr-en", | |
| ("en", "fr"): "Helsinki-NLP/opus-mt-en-fr", | |
| ("de", "en"): "Helsinki-NLP/opus-mt-de-en", | |
| ("en", "de"): "Helsinki-NLP/opus-mt-en-de", | |
| ("pt", "en"): "Helsinki-NLP/opus-mt-pt-en", | |
| ("en", "pt"): "Helsinki-NLP/opus-mt-en-pt", | |
| ("it", "en"): "Helsinki-NLP/opus-mt-it-en", | |
| ("en", "it"): "Helsinki-NLP/opus-mt-en-it", | |
| ("ru", "en"): "Helsinki-NLP/opus-mt-ru-en", | |
| ("en", "ru"): "Helsinki-NLP/opus-mt-en-ru", | |
| ("zh", "en"): "Helsinki-NLP/opus-mt-zh-en", | |
| ("en", "zh"): "Helsinki-NLP/opus-mt-en-zh", | |
| ("ja", "en"): "Helsinki-NLP/opus-mt-ja-en", | |
| ("en", "ja"): "Helsinki-NLP/opus-mt-en-ja", | |
| ("ko", "en"): "Helsinki-NLP/opus-mt-ko-en", | |
| ("en", "ko"): "Helsinki-NLP/opus-mt-en-ko", | |
| ("hi", "en"): "Helsinki-NLP/opus-mt-hi-en", | |
| ("en", "hi"): "Helsinki-NLP/opus-mt-en-hi", | |
| ("ar", "en"): "Helsinki-NLP/opus-mt-ar-en", | |
| ("en", "ar"): "Helsinki-NLP/opus-mt-en-ar", | |
| } | |
| return pairs.get((src2, tgt2)) | |
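# Sketch of how the pair table above interacts with the pivot logic in
# _translate_cached: es->en resolves to a direct OPUS model, while es->fr has no
# direct entry and therefore goes through the two-hop English pivot.
def _demo_opus_routing() -> Tuple[Optional[str], Optional[str]]:
    direct = opus_model_for("es", "en")   # "Helsinki-NLP/opus-mt-es-en"
    missing = opus_model_for("es", "fr")  # None -> pivot es->en->fr
    return direct, missing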
| SUPPORTED = {"en", "fr", "de", "es", "it", "hi", "ar", "ru", "ja", "ko", "pt", "zh"} | |
| LIBRETRANSLATE_URL = os.getenv("LIBRETRANSLATE_URL") # e.g., http://127.0.0.1:5000 | |
| def _translate_via_libre(text: str, src: str, tgt: str) -> Optional[str]: | |
| url = LIBRETRANSLATE_URL | |
| if not url or not text or src == tgt: | |
| return None | |
| try: | |
| r = SESSION.post( | |
| f"{url.rstrip('/')}/translate", | |
| json={"q": text, "source": src, "target": tgt, "format": "text"}, | |
| timeout=6 | |
| ) | |
| if r.status_code == 200: | |
| j = r.json() | |
| out = j.get("translatedText") | |
| return out if isinstance(out, str) and out else None | |
| else: | |
| log.warning("LibreTranslate HTTP %s: %s", r.status_code, r.text[:200]) | |
| except Exception as e: | |
| log.warning("LibreTranslate failed: %s", e) | |
| return None | |
| def _hf_call(model_id: str, payload: dict) -> Optional[str]: | |
| # require both a token and explicit opt-in | |
| if not (HUGGINGFACE_API_TOKEN and ALLOW_HF_REMOTE): | |
| return None | |
| if model_id in _hf_bad_models: | |
| return None | |
| url = f"https://api-inference.huggingface.co/models/{model_id}" | |
| headers = { | |
| "Authorization": f"Bearer {HUGGINGFACE_API_TOKEN}", | |
| "HF-API-KEY": HUGGINGFACE_API_TOKEN, | |
| "Accept": "application/json", | |
| "Content-Type": "application/json", | |
| } | |
| try: | |
| r = requests.post(url, headers=headers, json=payload, timeout=25) | |
| if r.status_code != 200: | |
| if r.status_code == 404: | |
| _hf_bad_models.add(model_id) | |
| log.warning("HF %s -> 404: Not Found (disabled for this process)", model_id) | |
| else: | |
| log.warning("HF %s -> %s: %s", model_id, r.status_code, r.text[:300]) | |
| return None | |
| j = r.json() | |
| except Exception as e: | |
| log.warning("HF request failed: %s", e) | |
| return None | |
| if isinstance(j, list) and j and isinstance(j[0], dict): | |
| if "generated_text" in j[0]: | |
| return j[0]["generated_text"] | |
| if "translation_text" in j[0]: | |
| return j[0]["translation_text"] | |
| if isinstance(j, dict) and "generated_text" in j: | |
| return j["generated_text"] | |
| if isinstance(j, str): | |
| return j | |
| return None | |
| def _translate_cached(text: str, src: str, tgt: str) -> str: | |
| if not text or src == tgt: | |
| return text | |
| # 0) Local LibreTranslate (fast & free, if running) | |
| out = _translate_via_libre(text, src, tgt) | |
| if out: | |
| return out | |
| # 1) OPUS serverless (direct pair) β try this first | |
| opus_model = opus_model_for(src, tgt) | |
| if opus_model: | |
| out = _hf_call(opus_model, {"inputs": text}) | |
| if out: | |
| return out | |
| # 2) NLLB serverless (optional; disabled if HF_MODEL_PRIMARY is None) | |
| try: | |
| if HF_MODEL_PRIMARY and (src in NLLB_CODES) and (tgt in NLLB_CODES): | |
| out = _hf_call( | |
| HF_MODEL_PRIMARY, | |
| { | |
| "inputs": text, | |
| "parameters": {"src_lang": NLLB_CODES[src], "tgt_lang": NLLB_CODES[tgt]}, | |
| "options": {"wait_for_model": True}, | |
| }, | |
| ) | |
| if out: | |
| return out | |
| except Exception: | |
| pass | |
# 3) Two-hop pivot via English for non-English → non-English pairs
| if src != "en" and tgt != "en": | |
| step_en = _translate_cached(text, src, "en") | |
| if step_en and step_en != text: | |
| out = _translate_cached(step_en, "en", tgt) | |
| if out: | |
| return out | |
| # 4) Local OPUS fallback (direct pair with local pipeline) | |
| out = _translate_local(text, src, tgt) | |
| if out: | |
| return out | |
| log.warning("All translate paths failed (%s->%s); returning original.", src, tgt) | |
| return text | |
| def translate_text(text: str, target_lang: Optional[str], fallback_src: Optional[str] = None) -> str: | |
| if not text or not target_lang: | |
| return text | |
| tgt = target_lang.lower() | |
| if tgt not in SUPPORTED: | |
| return text | |
| src = (fallback_src or detect_lang(text) or "en").lower() | |
| if src == tgt: | |
| return text | |
| if src not in SUPPORTED: | |
| if src.startswith("zh"): | |
| src = "zh" | |
| elif src.startswith("pt"): | |
| src = "pt" | |
| elif src[:2] in SUPPORTED: | |
| src = src[:2] | |
| else: | |
| src = "en" | |
| return _translate_cached(text, src, tgt) | |
| # ----------------- FastAPI ----------------- | |
| app = FastAPI() | |
| app.add_middleware( | |
| CORSMiddleware, | |
| allow_origins=["*"], | |
| allow_credentials=False, | |
| allow_methods=["*"], | |
| allow_headers=["*"], | |
| ) | |
| app.add_middleware(GZipMiddleware, minimum_size=500) | |
| # === Warm config === | |
| WARM_LIMIT_EACH = 20 # smaller bite to prime caches | |
| WARM_TIMESPAN = "24h" # narrower GDELT window for faster first fetch | |
| WARM_PREFETCH_DESCRIPTIONS = False | |
| def _fmt_mmss(ms: float) -> str: | |
| total_sec = int(round(ms / 1000.0)) | |
| m, s = divmod(total_sec, 60) | |
| return f"{m}:{s:02d}" | |
| def _warm_once(): | |
| try: | |
| log.info("WARM: starting background warm-up (limit_each=%d, timespan=%s)", WARM_LIMIT_EACH, WARM_TIMESPAN) | |
| t0 = time.perf_counter() | |
# models (also warmed at startup; calling the getters again is a cheap no-op once loaded)
| get_sbert() | |
| get_news_clf() | |
| # fetch a small set with shorter timespan | |
| t1 = time.perf_counter() | |
| raw = combine_raw_articles( | |
| category=None, query=None, language="en", | |
| limit_each=WARM_LIMIT_EACH, timespan=WARM_TIMESPAN, | |
log_summary=False  # silence the warm-up fetch summary
| ) | |
| t_fetch = (time.perf_counter() - t1) * 1000 | |
| # optional: skip description prefetch during warm to save time | |
| if WARM_PREFETCH_DESCRIPTIONS: | |
| prefetch_descriptions_async(raw) | |
| # enrich + cluster once (no translation on warm) | |
| t2 = time.perf_counter() | |
| enriched = [enrich_article(a, language="en", translate=False, target_lang=None) for a in raw] | |
| t_enrich = (time.perf_counter() - t2) * 1000 | |
| t3 = time.perf_counter() | |
| clusters = cluster_articles(enriched, sim_threshold=SIM_THRESHOLD) | |
| t_cluster = (time.perf_counter() - t3) * 1000 | |
| # stash in cache under the common default key so /news and /events hit warm data | |
| key = cache_key_for(q=None, category=None, language="en", | |
| limit_each=WARM_LIMIT_EACH, translate=False, target_lang=None, | |
speed=Speed.balanced)  # speed is part of the cache key
| _events_cache[key] = {"t": monotonic(), "enriched": enriched, "clusters": clusters} | |
| t_total = (time.perf_counter() - t0) * 1000 | |
| log.info( | |
| "WARM: fetch=%s, enrich=%s, cluster=%s, total=%s (raw=%d, enriched=%d, clusters=%d)", | |
| _fmt_mmss(t_fetch), _fmt_mmss(t_enrich), _fmt_mmss(t_cluster), _fmt_mmss(t_total), | |
| len(raw), len(enriched), len(clusters), | |
| ) | |
| except Exception as e: | |
| log.warning(f"WARM: failed: {e}") | |
| def warm(): | |
# load the models up front
| get_sbert() | |
| get_news_clf() | |
| # fire-and-forget warm in a background thread so startup stays snappy | |
| threading.Thread(target=_warm_once, daemon=True).start() | |
| # ----------------- Providers ----------------- | |
# ISO 639-1 -> GDELT 'sourcelang:' names
| _GDELT_LANG = { | |
| "en": "english", | |
| "es": "spanish", | |
| "fr": "french", | |
| "de": "german", | |
| "it": "italian", | |
| "pt": "portuguese", | |
| "ru": "russian", | |
| "ar": "arabic", | |
| "hi": "hindi", | |
| "ja": "japanese", | |
| "ko": "korean", | |
| "zh": "chinese", | |
| } | |
| def _gdelt_safe_query(user_q, language): | |
| parts = [] | |
| if user_q: | |
| q = user_q.strip() | |
| if len(q) < 3: | |
| q = f'"{q}" news' | |
| parts.append(q) | |
| if language and (lg := _GDELT_LANG.get(language.lower())): | |
| parts.append(f"sourcelang:{lg}") | |
| if not parts: | |
| # rotate or randomly choose one to diversify | |
| parts.append("sourcelang:english") | |
| return " ".join(parts) | |
| def fetch_gdelt_articles( | |
| limit=50, | |
| query=None, | |
| language=None, | |
| timespan="3d", | |
| category=None, | |
| extra_tokens: Optional[List[str]] = None | |
| ): | |
| q = _gdelt_safe_query(query, language) | |
| if extra_tokens: | |
| q = f"{q} " + " ".join(extra_tokens) | |
| url = "https://api.gdeltproject.org/api/v2/doc/doc" | |
| params = { | |
| "query": q, | |
| "mode": "ArtList", | |
| "format": "json", | |
| "sort": "DateDesc", | |
| "maxrecords": int(min(250, max(1, limit))), | |
| "timespan": timespan, | |
| } | |
| headers = { | |
| "User-Agent": "Mozilla/5.0 (compatible; NewsGlobe/1.0; +mailto:you@yourdomain.com)", | |
| "Accept": "application/json", | |
| } | |
| def _do_request(p): | |
| r = _session_get(url, params=p, timeout=10) | |
| log.info(f"GDELT URL: {r.url} (status={r.status_code})") | |
| if r.status_code != 200: | |
| log.warning(f"GDELT HTTP {r.status_code}: {r.text[:400]}") | |
| return None | |
| try: | |
| return r.json() | |
| except Exception: | |
| ct = r.headers.get("Content-Type", "") | |
| log.warning(f"GDELT non-JSON response. CT={ct}. Body[:400]: {r.text[:400]}") | |
| return None | |
| data = _do_request(params) | |
| if data is None: | |
| # Retry narrower and smaller if needed | |
| p2 = {**params, "timespan": "24h", "maxrecords": min(100, params["maxrecords"])} | |
| data = _do_request(p2) | |
| if not data: | |
| return [] | |
| arts = data.get("articles") or [] | |
| results = [] | |
| for a in arts: | |
| desc = a.get("description") or a.get("content") or "" | |
| title = a.get("title") or "" | |
| if desc and ( | |
| desc.strip().lower() == title.strip().lower() or | |
| (len(desc) <= 60 and _too_similar(title, desc)) | |
| ): | |
| desc = "" | |
| desc = desc or "No description available" | |
| results.append( | |
| { | |
| "title": title, | |
| "url": a.get("url"), | |
| "source": {"name": a.get("domain") or "GDELT"}, | |
| "description": desc, | |
| "publishedAt": a.get("seendate"), | |
| "api_source": "gdelt", | |
| "gdelt_sourcecountry": a.get("sourcecountry"), | |
| # Keep the user's chosen category only for debugging/reference; do NOT use for inference. | |
| "requested_category": category, | |
| } | |
| ) | |
| log.info(f"GDELT returned {len(results)}") | |
| return results | |
| def fetch_newsdata_articles(category=None, limit=20, query=None, language=None): | |
| base_url = "https://newsdata.io/api/1/news" | |
| allowed = [ | |
| "business", | |
| "entertainment", | |
| "environment", | |
| "food", | |
| "health", | |
| "politics", | |
| "science", | |
| "sports", | |
| "technology", | |
| "top", | |
| "world", | |
| ] | |
| params = {"apikey": NEWSDATA_API_KEY, "language": (language or "en")} | |
| if category and category in allowed: | |
| params["category"] = category | |
| if query: | |
| params["q"] = query | |
| all_articles, next_page = [], None | |
| while len(all_articles) < limit: | |
| if next_page: | |
| params["page"] = next_page | |
| resp = _session_get(base_url, params=params, timeout=12) | |
| if resp.status_code != 200: | |
| break | |
| data = resp.json() | |
| articles = data.get("results", []) | |
| for a in articles: | |
| a["api_source"] = "newsdata" | |
| all_articles.extend(articles) | |
| next_page = data.get("nextPage") | |
| if not next_page: | |
| break | |
| # normalize timestamps if available | |
| for a in all_articles: | |
| a["publishedAt"] = a.get("pubDate") | |
| return all_articles[:limit] | |
| def fetch_gnews_articles(limit=20, query=None, language=None): | |
| url = f"https://gnews.io/api/v4/top-headlines?lang={(language or 'en')}&max={limit}&token={GNEWS_API_KEY}" | |
| if query: | |
| url += f"&q={requests.utils.quote(query)}" | |
| try: | |
| r = _session_get(url, timeout=12) | |
| if r.status_code != 200: | |
| return [] | |
| arts = r.json().get("articles", []) | |
| for a in arts: | |
| a["api_source"] = "gnews" | |
| return arts | |
| except Exception: | |
| return [] | |
| NEWSAPI_COUNTRIES = ["us", "gb", "ca", "au", "in", "za", "sg", "ie", "nz"] | |
| def fetch_newsapi_headlines_multi(limit=50, language=None): | |
| if not _newsapi_enabled(): | |
| return [] | |
| all_ = [] | |
| per = max(1, math.ceil(limit / max(1, len(NEWSAPI_COUNTRIES)))) | |
| per = min(per, 100) # NewsAPI pageSize cap | |
| for c in NEWSAPI_COUNTRIES: | |
| url = f"https://newsapi.org/v2/top-headlines?country={c}&pageSize={per}&apiKey={NEWSAPI_KEY}" | |
| r = _session_get(url, timeout=12) | |
| if r.status_code != 200: | |
| log.warning(f"NewsAPI top-headlines {c} -> HTTP {r.status_code}: {r.text[:200]}") | |
| continue | |
| arts = r.json().get("articles", []) | |
| for a in arts: | |
| a["api_source"] = "newsapi" | |
| all_.extend(arts) | |
| time.sleep(0.2) | |
return all_[:limit]  # enforce the exact limit
| def fetch_newsapi_articles(category=None, limit=20, query=None, language=None): | |
| if not _newsapi_enabled(): | |
| return [] | |
| # If a query is provided, use /everything (language allowed here) | |
| if query: | |
| url = f"https://newsapi.org/v2/everything?pageSize={limit}&apiKey={NEWSAPI_KEY}&q={requests.utils.quote(query)}" | |
| if language: | |
| url += f"&language={language}" | |
| try: | |
| r = _session_get(url, timeout=12) | |
| if r.status_code != 200: | |
| log.warning(f"NewsAPI /everything HTTP {r.status_code}: {r.text[:200]}") | |
| return [] | |
| arts = r.json().get("articles", []) | |
| for a in arts: | |
| a["api_source"] = "newsapi" | |
| # DO NOT stamp category here; we infer later | |
| return arts[:limit] | |
| except Exception as e: | |
| log.warning(f"NewsAPI /everything request failed: {e}") | |
| return [] | |
| # Otherwise, rotate /top-headlines by country (no language param) | |
| results = [] | |
| per_country = max(5, limit // len(NEWSAPI_COUNTRIES)) | |
| for c in NEWSAPI_COUNTRIES: | |
| url = f"https://newsapi.org/v2/top-headlines?country={c}&pageSize={per_country}&apiKey={NEWSAPI_KEY}" | |
| if category: | |
| url += f"&category={category}" | |
| try: | |
| r = _session_get(url, timeout=12) | |
| if r.status_code != 200: | |
| log.warning(f"NewsAPI top-headlines {c} -> HTTP {r.status_code}: {r.text[:200]}") | |
| continue | |
| arts = r.json().get("articles", []) | |
| for a in arts: | |
| a["api_source"] = "newsapi" | |
| # DO NOT stamp category here; we infer later | |
| results.extend(arts) | |
| except Exception as e: | |
| log.warning(f"NewsAPI top-headlines {c} failed: {e}") | |
| time.sleep(0.2) | |
| return results[:limit] | |
| def normalize_newsdata_article(article): | |
| return { | |
| "title": article.get("title"), | |
| "url": article.get("link"), | |
| "source": {"name": article.get("source_id", "NewsData.io")}, | |
| "description": article.get("description") or "No description available", | |
| "publishedAt": article.get("publishedAt"), | |
| "api_source": article.get("api_source", "newsdata"), | |
| "category": ((article.get("category") or [None])[0] if isinstance(article.get("category"), list) else article.get("category")), | |
| } | |
| # ----------------- Enrichment ----------------- | |
| def enrich_article(a, language=None, translate=False, target_lang=None): | |
| # Normalize source name | |
| source_name = (a.get("source", {}) or {}).get("name", "").strip() or "Unknown" | |
| s_lower = source_name.lower() | |
| if "newsapi" in s_lower: | |
| source_name = "NewsAPI" | |
| elif "gnews" in s_lower: | |
| source_name = "GNews" | |
| elif "newsdata" in s_lower: | |
| source_name = "NewsData.io" | |
| # Canonicalize URL & derive domain | |
| article_url = _canonical_url(a.get("url") or "") | |
| try: | |
| ext = tldextract.extract(article_url) | |
| domain = ".".join([p for p in (ext.domain, ext.suffix) if p]) if (ext.domain or ext.suffix) else "" | |
| except Exception: | |
| domain = "" | |
| # Country guess (GDELT provides ISO2) | |
| country_guess = None | |
| if a.get("api_source") == "gdelt": | |
| sc = a.get("gdelt_sourcecountry") | |
| if sc: | |
| iso2map = { | |
| "US": "United States", "GB": "United Kingdom", "AU": "Australia", "CA": "Canada", "IN": "India", | |
| "DE": "Germany", "FR": "France", "IT": "Italy", "ES": "Spain", "BR": "Brazil", "JP": "Japan", | |
| "CN": "China", "RU": "Russia", "KR": "South Korea", "ZA": "South Africa", "NG": "Nigeria", | |
| "MX": "Mexico", "AR": "Argentina", "CL": "Chile", "CO": "Colombia", "NL": "Netherlands", | |
| "SE": "Sweden", "NO": "Norway", "DK": "Denmark", "FI": "Finland", "IE": "Ireland", "PL": "Poland", | |
| "PT": "Portugal", "GR": "Greece", "TR": "Turkey", "IL": "Israel", "SA": "Saudi Arabia", | |
| "AE": "United Arab Emirates", "SG": "Singapore", "MY": "Malaysia", "TH": "Thailand", | |
| "PH": "Philippines", "ID": "Indonesia", "NZ": "New Zealand", | |
| } | |
| country_guess = iso2map.get(str(sc).upper(), sc if len(str(sc)) > 2 else None) | |
| coords = get_country_centroid(country_guess) if country_guess else geocode_source(source_name, domain, do_network=False) | |
| # Title / description (raw) | |
| title = (a.get("title") or "").strip() or "(untitled)" | |
| description = (a.get("description") or "").strip() | |
| if description.lower().startswith("no description"): | |
| description = "" | |
| # Prefer cached page summary when weak/title-like | |
| cached_desc = _desc_cache_get(article_url) | |
| need_upgrade = ( | |
| (not description) | |
| or description.lower().startswith("no description") | |
| or len(description) < DESC_MIN_LEN | |
| or _too_similar(title, description) | |
| ) | |
| if need_upgrade and cached_desc: | |
| description = cached_desc | |
| if description: | |
| description = _tidy_description(title, description, source_name) | |
| if (not description) or _too_similar(title, description): | |
| description = f"Quick take: {title.rstrip('.')}." | |
| # Save originals for categorization and debug | |
| orig_title = title | |
| orig_description = description | |
| # Language detection / sentiment | |
| detected_lang = (detect_lang(f"{title} {description}") or "").lower() | |
| ml_text = f"{orig_title}. {orig_description}".strip() | |
| sentiment = classify_sentiment(f"{orig_title} {orig_description}") | |
| # Stable id & category (ALWAYS infer; ignore provider/requested categories) | |
| seed = f"{source_name}|{article_url}|{title}" | |
| uid = hashlib.md5(seed.encode("utf-8")).hexdigest()[:12] | |
| cat = infer_category(article_url, orig_title, orig_description, None) | |
| return { | |
| "id": uid, | |
| "title": title, | |
| "url": article_url, | |
| "source": source_name, | |
| "lat": coords["lat"], | |
| "lon": coords["lon"], | |
| "country": coords.get("country", "Unknown"), | |
| "description": description, | |
| "sentiment": sentiment, | |
| "api_source": a.get("api_source", "unknown"), | |
| "publishedAt": a.get("publishedAt"), | |
| "_ml_text": ml_text, | |
| "orig_title": orig_title, | |
| "orig_description": orig_description, | |
| "detected_lang": detected_lang, | |
| "translated": False, | |
| "category": cat, | |
| } | |
| # ----------------- Clustering into Events ----------------- | |
| # sbert_model = SentenceTransformer("sentence-transformers/paraphrase-multilingual-MiniLM-L12-v2") | |
| # cluster_articles() | |
| def cluster_articles(articles: List[Dict[str, Any]], sim_threshold=SIM_THRESHOLD, speed=Speed.balanced): | |
| if speed == Speed.fast: | |
| articles = articles[:150] # early cap | |
| sim_threshold = max(sim_threshold, 0.64) | |
| elif speed == Speed.balanced: | |
| articles = articles[:] | |
| sim_threshold = max(sim_threshold, 0.62) | |
| texts = [_cluster_text(a) for a in articles] | |
| embs = get_sbert().encode(texts, convert_to_tensor=True, normalize_embeddings=True, show_progress_bar=False) | |
| clusters = [] # [{indices:[...], centroid:tensor}] | |
| centroids = [] | |
| for i, emb in enumerate(embs): | |
| best_idx, best_sim = -1, -1.0 | |
| for ci, c_emb in enumerate(centroids): | |
| sim = util.cos_sim(emb, c_emb).item() | |
| if sim > sim_threshold and sim > best_sim: | |
| best_sim, best_idx = sim, ci | |
| if best_idx >= 0: | |
| clusters[best_idx]["indices"].append(i) | |
| idxs = clusters[best_idx]["indices"] | |
| new_c = embs[idxs].mean(dim=0) | |
| new_c = new_c / new_c.norm() | |
| centroids[best_idx] = new_c | |
| clusters[best_idx]["centroid"] = new_c | |
| else: | |
# seed a new cluster id from the cluster text
| event_id = hashlib.md5(texts[i].encode("utf-8")).hexdigest()[:10] | |
| clusters.append({"id": event_id, "indices": [i], "centroid": emb}) | |
| centroids.append(emb) | |
| # second-pass merge to reduce fragmenting | |
| merged = _merge_close_clusters(clusters, embs, threshold=0.70) | |
| # keep ids stable: recompute with URLs of member articles | |
| for c in merged: | |
| c["id"] = cluster_id(c, articles) | |
| return merged | |
| def event_payload_from_cluster(cluster, enriched_articles): | |
| idxs = cluster["indices"] | |
| arts = [enriched_articles[i] for i in idxs] | |
| title_counts = Counter([a["title"] for a in arts]) | |
| canonical_title = title_counts.most_common(1)[0][0] | |
| keywords = list({w.lower() for t in title_counts for w in t.split() if len(w) > 3})[:8] | |
| sources = {a["source"] for a in arts} | |
| countries = {a["country"] for a in arts if a["country"] and a["country"] != "Unknown"} | |
| ts = [a.get("publishedAt") for a in arts if a.get("publishedAt")] | |
| return { | |
| "event_id": cluster_id(cluster, enriched_articles), # <-- stable id | |
| "title": canonical_title, | |
| "keywords": keywords, | |
| "article_count": len(arts), | |
| "source_count": len(sources), | |
| "country_count": len(countries), | |
| "time_range": {"min": min(ts) if ts else None, "max": max(ts) if ts else None}, | |
| "sample_urls": [a["url"] for a in arts[:3] if a.get("url")], | |
| } | |
| def aggregate_event_by_country(cluster, enriched_articles): | |
| idxs = cluster["indices"] | |
| arts = [enriched_articles[i] for i in idxs] | |
| by_country: Dict[str, Dict[str, Any]] = {} | |
| for a in arts: | |
| c = a.get("country") or "Unknown" | |
| if c not in by_country: | |
| coords = get_country_centroid(c) | |
| by_country[c] = {"country": c, "lat": coords["lat"], "lon": coords["lon"], "articles": []} | |
| by_country[c]["articles"].append(a) | |
| # summarize per country | |
| results = [] | |
| for c, block in by_country.items(): | |
| arr = block["articles"] | |
| # avg sentiment mapped to -1/0/+1 | |
| to_num = {"negative": -1, "neutral": 0, "positive": 1} | |
| vals = [to_num.get(a["sentiment"], 0) for a in arr] | |
| avg = sum(vals) / max(len(vals), 1) | |
| avg_sent = "positive" if avg > 0.15 else "negative" if avg < -0.15 else "neutral" | |
| top_sources = [s for s, _ in Counter([a["source"] for a in arr]).most_common(3)] | |
| # tiny extractive summary: top 2 headlines | |
summary = " • ".join([a["title"] for a in arr[:2]])
| results.append( | |
| { | |
| "country": c, | |
| "lat": block["lat"], | |
| "lon": block["lon"], | |
| "count": len(arr), | |
| "avg_sentiment": avg_sent, | |
| "top_sources": top_sources, | |
| "summary": summary, | |
| "samples": [ | |
| { | |
| "title": a["title"], | |
| "orig_title": a.get("orig_title"), | |
| "orig_description": a.get("orig_description"), # π add this | |
| "url": a["url"], | |
| "source": a["source"], | |
| "sentiment": a["sentiment"], | |
| "detected_lang": a.get("detected_lang"), | |
| } | |
| for a in arr[:5] | |
| ], | |
| } | |
| ) | |
| return results | |
| def _merge_close_clusters(clusters, embs, threshold=0.68): | |
# clusters: [{"indices": [...], "centroid": tensor}, ...]; centroids are set in the first pass above
| merged = [] | |
| used = set() | |
| for i in range(len(clusters)): | |
| if i in used: | |
| continue | |
| base = clusters[i] | |
| group = [i] | |
| for j in range(i + 1, len(clusters)): | |
| if j in used: | |
| continue | |
| sim = util.cos_sim(base["centroid"], clusters[j]["centroid"]).item() | |
| if sim >= threshold: | |
| group.append(j) | |
| # merge those groups | |
| all_idx = [] | |
| cents = [] | |
| for g in group: | |
| used.add(g) | |
| all_idx.extend(clusters[g]["indices"]) | |
| cents.append(clusters[g]["centroid"]) | |
| # new centroid | |
| newc = torch.stack(cents, dim=0).mean(dim=0) | |
| newc = newc / newc.norm() | |
| merged.append({"indices": sorted(set(all_idx)), "centroid": newc}) | |
| return merged | |
| # ----------------- Endpoints ----------------- | |
| prefetch = False | |
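| # NOTE: the handlers below are written as plain functions; the FastAPI route | |
| # decorators that mount them (e.g. @app.get("/events")) are assumed to be applied | |
| # elsewhere in the module. | |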
| def get_events( | |
| q: Optional[str] = Query(None), | |
| category: Optional[str] = Query(None), | |
| language: Optional[str] = Query(None), | |
| translate: Optional[bool] = Query(False), | |
| target_lang: Optional[str] = Query(None), | |
| limit_each: int = Query(150, ge=5, le=250), | |
| max_events: int = Query(15, ge=5, le=50), | |
| min_countries: int = Query(2, ge=1, le=50), | |
| min_articles: int = Query(2, ge=1, le=200), | |
| speed: Speed = Query(Speed.balanced), | |
| ): | |
| # always build cache on untranslated data | |
| cache_key, enriched, clusters = get_or_build_events_cache( | |
| q, category, language, False, None, limit_each, speed=speed | |
| ) | |
| # optional post-translate view (does not mutate cache) | |
| view = enriched | |
| if translate and target_lang: | |
| view = [dict(i) for i in enriched] | |
| for i in view: | |
| src_hint = i.get("detected_lang") | |
| i["title"] = translate_text(i.get("title") or "", target_lang, fallback_src=src_hint) | |
| i["description"] = translate_text(i.get("description") or "", target_lang, fallback_src=src_hint) | |
| i["translated"] = True | |
| events = [event_payload_from_cluster(c, view) for c in clusters] | |
| events = [e for e in events if (e["country_count"] >= min_countries and e["article_count"] >= min_articles)] | |
| events.sort(key=lambda e: e["article_count"], reverse=True) | |
| return {"events": events[:max_events], "cache_key": "|".join(map(str, cache_key))} | |
| def get_event_details( | |
| event_id: str, | |
| cache_key: Optional[str] = Query(None), | |
| q: Optional[str] = Query(None), | |
| category: Optional[str] = Query(None), | |
| language: Optional[str] = Query(None), | |
| translate: Optional[bool] = Query(False), | |
| target_lang: Optional[str] = Query(None), | |
| limit_each: int = Query(150, ge=5, le=250), | |
| ): | |
| # /event/{event_id} | |
| if cache_key: | |
| parts = cache_key.split("|") | |
| if len(parts) != 7: | |
| raise HTTPException(status_code=400, detail="Bad cache_key") | |
| speed_str = parts[6] | |
| try: | |
| speed_obj = Speed(speed_str) # "fast" | "balanced" | "max" | |
| except ValueError: | |
| speed_obj = Speed.balanced | |
| key_tuple = (parts[0], parts[1], parts[2], int(parts[3]), | |
| parts[4] == "True", parts[5].lower(), speed_str) | |
| else: | |
| speed_obj = Speed.balanced | |
| key_tuple = cache_key_for(q, category, language, limit_each, translate, target_lang, speed=speed_obj) | |
| entry = _events_cache.get(key_tuple) | |
| if not entry: | |
| # always build untranslated | |
| _, enriched, clusters = get_or_build_events_cache( | |
| q, category, language, False, None, limit_each, speed=speed_obj | |
| ) | |
| else: | |
| enriched, clusters = entry["enriched"], entry["clusters"] | |
| # optional post-translate view (do not mutate cache) | |
| eview = enriched | |
| if translate and target_lang: | |
| eview = [dict(i) for i in enriched] | |
| for i in eview: | |
| src_hint = i.get("detected_lang") | |
| i["title"] = translate_text(i.get("title") or "", target_lang, fallback_src=src_hint) | |
| i["description"] = translate_text(i.get("description") or "", target_lang, fallback_src=src_hint) | |
| i["translated"] = True | |
| cluster = next((c for c in clusters if cluster_id(c, enriched) == event_id), None) | |
| if not cluster: | |
| raise HTTPException(status_code=404, detail="Event not found with current filters") | |
| payload = event_payload_from_cluster(cluster, eview) | |
| countries = aggregate_event_by_country(cluster, eview) | |
| payload["articles_in_event"] = sum(c["count"] for c in countries) | |
| return {"event": payload, "countries": countries} | |
| def get_news( | |
| cache_key: Optional[str] = Query(None), | |
| category: Optional[str] = Query(None), | |
| sentiment: Optional[str] = Query(None), | |
| q: Optional[str] = Query(None), | |
| language: Optional[str] = Query(None), | |
| translate: Optional[bool] = Query(False), | |
| target_lang: Optional[str] = Query(None), | |
| limit_each: int = Query(100, ge=5, le=100), | |
| lite: bool = Query(True), | |
| speed: Speed = Query(Speed.balanced), | |
| page: int = Query(1, ge=1), | |
| page_size: int = Query(120, ge=5, le=300), | |
| ): | |
| enriched: List[Dict[str, Any]] = [] | |
| # Pull from cache if provided | |
| if cache_key: | |
| parts = cache_key.split("|") | |
| if len(parts) == 7: | |
| key_tuple = ( | |
| parts[0], # q | |
| parts[1], # category | |
| parts[2], # language | |
| int(parts[3]), # limit_each | |
| parts[4] == "True", # translate | |
| parts[5].lower(), # target_lang | |
| parts[6], # speed | |
| ) | |
| entry = _events_cache.get(key_tuple) | |
| if entry: | |
| enriched = entry["enriched"] | |
| # Fetch fresh if no cached items | |
| if not enriched: | |
| raw = combine_raw_articles(category=category, query=q, language=language, limit_each=limit_each, speed=speed) | |
| prefetch_descriptions_async(raw, speed) | |
| enriched_all = [enrich_article(a, language=language, translate=False, target_lang=None) for a in raw] | |
| if category: | |
| cat_norm = (category or "").strip().lower() | |
| enriched = [e for e in enriched_all if (e.get("category") or "").lower() == cat_norm] | |
| else: | |
| enriched = enriched_all | |
| else: | |
| # If we got cached items but want to ensure the selected category is enforced: | |
| if category: | |
| cat_norm = (category or "").strip().lower() | |
| enriched = [e for e in enriched if (e.get("category") or "").lower() == cat_norm] | |
| # Translation (optional) | |
| if translate and target_lang: | |
| enriched = [dict(i) for i in enriched] | |
| for i in enriched: | |
| i["original_title"] = i.get("orig_title") or i.get("title") | |
| i["original_description"] = i.get("orig_description") or i.get("description") | |
| src_hint = i.get("detected_lang") | |
| i["title"] = translate_text(i.get("title") or "", target_lang, fallback_src=src_hint) | |
| i["description"] = translate_text(i.get("description") or "", target_lang, fallback_src=src_hint) | |
| i["translated"] = True | |
| i["translated_from"] = (src_hint or "").lower() | |
| i["translated_to"] = target_lang.lower() | |
| # Optional sentiment filter | |
| if sentiment: | |
| s = sentiment.strip().lower() | |
| enriched = [i for i in enriched if i.get("sentiment", "").lower() == s] | |
| # Pagination | |
| total = len(enriched) | |
| start = (page - 1) * page_size | |
| end = start + page_size | |
| items = [dict(i) for i in enriched[start:end]] | |
| # Trim debug fields | |
| if lite: | |
| drop = {"_ml_text"} | |
| for i in items: | |
| for k in drop: | |
| i.pop(k, None) | |
| return { | |
| "items": items, | |
| "total": total, | |
| "page": page, | |
| "page_size": page_size | |
| } | |
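| # combine_raw_articles fans out to the enabled providers (NewsAPI, NewsData.io, | |
| # GNews, GDELT), clamps limits and timespan according to the requested speed, and | |
| # dedupes by canonical URL; the a1 + a3 + a2 + a4 merge order sets source precedence. | |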
| def combine_raw_articles(category=None, query=None, language=None, limit_each=30, | |
| timespan="3d", speed=Speed.balanced, log_summary: bool = True): | |
| if speed == Speed.fast: | |
| timespan = "24h" | |
| limit_each = min(limit_each, 20) | |
| elif speed == Speed.balanced: | |
| timespan = "48h" | |
| limit_each = min(limit_each, 150) | |
| a1 = [] | |
| if USE_NEWSAPI: | |
| if not query: | |
| a1 = fetch_newsapi_headlines_multi(limit=limit_each, language=language) | |
| else: | |
| a1 = fetch_newsapi_articles(category=category, limit=limit_each, query=query, language=language) | |
| a2 = [] | |
| if USE_NEWSDATA_API: | |
| a2 = [ | |
| normalize_newsdata_article(a) | |
| for a in fetch_newsdata_articles(category=category, limit=limit_each, query=query, language=language) | |
| if a.get("link") | |
| ] | |
| a3 = fetch_gnews_articles(limit=limit_each, query=query, language=language) if USE_GNEWS_API else [] | |
| # The earlier single-language fetch_gdelt_articles call was replaced by the | |
| # multi-language fetch_gdelt_multi rotation below. | |
| gdelt_limit = limit_each | |
| a4 = fetch_gdelt_multi( | |
| limit=gdelt_limit, | |
| query=query, | |
| language=language, # if provided, we honor it (with small EN boost) | |
| timespan=timespan, | |
| category=category, | |
| speed=speed, | |
| ) | |
| # Dedup by canonical URL (maintain source precedence) | |
| seen, merged = set(), [] | |
| for a in a1 + a3 + a2 + a4: | |
| if a.get("url"): | |
| a["url"] = _canonical_url(a["url"]) | |
| url = a["url"] | |
| if url not in seen: | |
| seen.add(url) | |
| merged.append(a) | |
| if log_summary: | |
| fetch_log.info("----- Article Fetch Summary -----") | |
| fetch_log.info(f"π NewsAPI returned: {len(a1)} articles") | |
| fetch_log.info(f"π NewsData.io returned: {len(a2)} articles") | |
| fetch_log.info(f"π GNews returned: {len(a3)} articles") | |
| fetch_log.info(f"π GDELT returned: {len(a4)} articles") | |
| fetch_log.info(f"β Total merged articles after deduplication: {len(merged)}") | |
| fetch_log.info("---------------------------------") | |
| return merged | |
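| # related_articles embeds the current fetch with SBERT and ranks everything by | |
| # cosine similarity against either an article picked by id or free-text | |
| # title/description, returning the top-k matches (the query article itself is | |
| # skipped when an id was given). | |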
| def related_articles( | |
| id: Optional[str] = Query(None, description="article id from /news"), | |
| title: Optional[str] = Query(None), | |
| description: Optional[str] = Query(None), | |
| q: Optional[str] = Query(None), | |
| category: Optional[str] = Query(None), | |
| language: Optional[str] = Query(None), | |
| limit_each: int = Query(50, ge=5, le=100), | |
| k: int = Query(10, ge=1, le=50), | |
| ): | |
| # ensure we have a working article list (enriched) to search over | |
| raw = combine_raw_articles(category=category, query=q, language=language, limit_each=limit_each) | |
| enriched = [enrich_article(a, language=language, translate=False, target_lang=None) for a in raw] | |
| if not enriched: | |
| return {"items": []} | |
| # pick the query vector | |
| if id: | |
| base = next((a for a in enriched if a.get("id") == id), None) | |
| if not base: | |
| raise HTTPException(404, "article id not found in current fetch") | |
| query_text = base["_ml_text"] | |
| else: | |
| text = f"{title or ''} {description or ''}".strip() | |
| if not text: | |
| raise HTTPException(400, "provide either id or title/description") | |
| query_text = text | |
| corpus_texts = [a["_ml_text"] for a in enriched] | |
| corpus_embs = _embed_texts(corpus_texts) | |
| query_emb = _embed_texts([query_text])[0] | |
| # cosine similarities | |
| sims = util.cos_sim(query_emb, corpus_embs).cpu().numpy().flatten() | |
| # take top-k excluding the query itself (if id provided) | |
| idxs = sims.argsort()[::-1] | |
| items = [] | |
| for idx in idxs: | |
| a = enriched[idx] | |
| if id and a["id"] == id: | |
| continue | |
| items.append({**a, "similarity": float(sims[idx])}) | |
| if len(items) >= k: | |
| break | |
| return {"items": items} | |
| async def timing_middleware(request, call_next): | |
| start = time.perf_counter() | |
| response = None | |
| try: | |
| response = await call_next(request) | |
| return response | |
| finally: | |
| dur_ms = (time.perf_counter() - start) * 1000 | |
| # log.info(f"{request.method} {request.url.path} -> {dur_ms:.1f} ms ({_fmt_mmss(dur_ms)})") | |
| if response is not None: | |
| try: | |
| response.headers["X-Process-Time-ms"] = f"{dur_ms:.1f}" | |
| except Exception: | |
| pass | |
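| # client_metric is a lightweight sink for client-side telemetry: known redraw-spam | |
| # names are acknowledged and dropped, and nothing is persisted server-side. | |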
| def client_metric(payload: Dict[str, Any] = Body(...)): | |
| name = (payload.get("name") or "").strip() | |
| # Drop redraw spam if it ever slips through again | |
| if name in {"Load all article markers on globe", "Load event country markers on globe"}: | |
| return {"ok": True} | |
| return {"ok": True} | |
| def diag_translate(): | |
| remote = _hf_call("Helsinki-NLP/opus-mt-es-en", {"inputs":"Hola mundo"}) | |
| local = _translate_local("Hola mundo", "es", "en") | |
| libre = _translate_via_libre("Hola mundo", "es", "en") | |
| return { | |
| "token": bool(HUGGINGFACE_API_TOKEN), | |
| "remote_ok": bool(remote), | |
| "local_ok": bool(local), | |
| "libre_ok": bool(libre), | |
| "sample": libre or remote or local | |
| } | |