Spaces:
Sleeping
Sleeping
File size: 7,794 Bytes
903b444 5181b3c 903b444 5181b3c 903b444 5181b3c 903b444 5181b3c |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 |
# Flask app for Subtitle KIS — main routes + search flow
import os
import re
import json as flask_json
from flask import Flask, render_template, request, jsonify
from markupsafe import escape, Markup
from nltk.corpus import wordnet
from nltk.stem import WordNetLemmatizer
# NOTE: heavy imports moved to lazy inside perform_search()
# from semantic_search import search_query
# from nlp_summary import summarize_text
from autocomplete import get_suggestions
from config import ABBREVIATION_MAP, VIDEO_METADATA, SEARCH_CONFIG
# App setup: templates live in ../templates relative to this file.
base_dir = os.path.dirname(os.path.abspath(__file__))
template_dir = os.path.join(base_dir, '..', 'templates')
app = Flask(__name__, template_folder=template_dir)
# Security headers: Content-Security-Policy
@app.after_request
def apply_csp(response):
    """Attach a Content-Security-Policy header to every outgoing response.

    Restricts resources to same-origin, except YouTube thumbnails and
    data: URIs for images; inline scripts/styles are allowed.
    """
    directives = [
        "default-src 'self'",
        "img-src 'self' https://img.youtube.com data:",
        "script-src 'self' 'unsafe-inline'",
        "style-src 'self' 'unsafe-inline'",
    ]
    response.headers["Content-Security-Policy"] = "; ".join(directives) + ";"
    return response
# Route: Home page
@app.route("/")
def index():
    """Render the landing page with the search form."""
    return render_template("index.html")
# Health check (fast) — for HF Spaces readiness
@app.get("/health")
def health():
    """Lightweight readiness probe (no heavy imports are triggered here)."""
    return {"ok": True}, 200
# Template filter: convert HH:MM:SS to seconds
@app.template_filter("jump_time")
def jump_time(timestamp):
    """Convert an ``HH:MM:SS`` timestamp string to whole seconds.

    The result is rewound by 2 seconds (floored at 0) so video playback
    starts just before the matched subtitle line.  Malformed input yields 0.
    """
    try:
        h, m, s = timestamp.split(':')
        # Seconds may carry a fractional part (e.g. "00:01:23.456").
        total = int(h) * 3600 + int(m) * 60 + int(float(s))
        return max(total - 2, 0)
    except (AttributeError, ValueError):
        # AttributeError: non-string input; ValueError: wrong segment
        # count or non-numeric parts.  (Was a bare except, which also
        # swallowed SystemExit/KeyboardInterrupt.)
        return 0
# NLP helpers: lemmatizer + synonym expansion
lemmatizer = WordNetLemmatizer()


def get_synonyms(word):
    """Return the set of WordNet lemma names for *word*, with underscores
    replaced by spaces (WordNet joins multi-word lemmas with ``_``)."""
    return {
        lemma.name().replace("_", " ")
        for synset in wordnet.synsets(word)
        for lemma in synset.lemmas()
    }
# Highlighting:
def highlight_keywords(text, keyword, semantic_mode=False):
    """Wrap occurrences of *keyword* in HTML-escaped *text* with <mark> tags.

    Exact matches always win; when none are found and *semantic_mode* is
    True, lemmas and WordNet synonyms of each query word are highlighted
    instead.  Terms of 3 characters or fewer are matched only on word
    boundaries so short queries don't light up substrings.

    Returns the escaped text (a str/Markup) with <mark> spans inserted.
    """
    safe_text = escape(text)
    # Escape the keyword the same way the text was escaped; otherwise a
    # keyword containing &, <, >, or quotes (escaped to entities in
    # safe_text) could never match.
    safe_keyword = str(escape(keyword))
    if len(keyword) <= 3:
        pattern = re.compile(rf"(?<!\w){re.escape(safe_keyword)}(?!\w)", re.IGNORECASE)
    else:
        pattern = re.compile(re.escape(safe_keyword), re.IGNORECASE)
    if pattern.search(safe_text):
        return pattern.sub(lambda m: f"<mark>{m.group(0)}</mark>", safe_text)
    # Semantic mode: fall back to lemma/synonym highlighting per query word.
    if semantic_mode:
        for w in keyword.split():
            lemma = lemmatizer.lemmatize(w.lower())
            candidates = {lemma} | get_synonyms(w)
            for cand in candidates:
                safe_cand = str(escape(cand))
                if len(cand) <= 3:
                    syn_pattern = re.compile(rf"(?<!\w){re.escape(safe_cand)}(?!\w)", re.IGNORECASE)
                else:
                    syn_pattern = re.compile(rf"\b{re.escape(safe_cand)}\b", re.IGNORECASE)
                if syn_pattern.search(safe_text):
                    safe_text = syn_pattern.sub(lambda m: f"<mark>{m.group(0)}</mark>", safe_text)
        return safe_text
    return safe_text
# Core search orchestration
def perform_search(query, start=0, shown=0, previous_results=None, semantic_mode=False):
    """Shared search logic for both HTML and JSON endpoints.

    Fetches matches for *query*, takes one page starting at *start*,
    decorates each result dict in place (video title, neighbour-aware
    summary, highlighted context block), and appends the page to
    *previous_results*.

    Returns (combined_results, total_match_count, updated_shown, next_start).
    """
    if previous_results is None:
        previous_results = []
    # Lazy imports so the heavy model-backed modules load on the first
    # search request instead of at app boot (keeps /health fast).
    from semantic_search import search_query
    from nlp_summary import summarize_text
    raw_results, _ = search_query(query, offset=0, top_k=1000, semantic_mode=semantic_mode)
    # Keyword mode: keep only results whose text literally contains the
    # query (case-insensitive); semantic ranking alone is not enough here.
    if not semantic_mode:
        raw_results = [r for r in raw_results if re.search(re.escape(query), r["text"], re.IGNORECASE)]
    page_size = SEARCH_CONFIG.get("results_per_page", 5)
    paged_results = raw_results[start:start + page_size]
    new_results = []
    for idx, r in enumerate(paged_results):
        vid_id = r.get("video_id")
        # Reverse-lookup the friendly metadata key whose "id" matches this
        # result's video id; None if the video is unknown.
        friendly_key = next((k for k, v in VIDEO_METADATA.items() if v["id"] == vid_id), None)
        r["video_title"] = VIDEO_METADATA.get(friendly_key, {}).get("title", "Unknown Title")
        # Build summary input from the previous/current/next chunks on this
        # page so the summary has surrounding context.
        context_chunks = []
        if idx > 0:
            context_chunks.append(paged_results[idx - 1]["summary_input"])
        context_chunks.append(r["summary_input"])
        if idx + 1 < len(paged_results):
            context_chunks.append(paged_results[idx + 1]["summary_input"])
        summary = summarize_text(" ".join(context_chunks), query=query)
        highlighted_before = highlight_keywords(r["context_before"], query, semantic_mode)
        highlighted_match = highlight_keywords(r["text"], query, semantic_mode)
        highlighted_after = highlight_keywords(r["context_after"], query, semantic_mode)
        # Markup: highlight_keywords already escaped the text, so this
        # HTML is safe to render unescaped in the template.
        r["highlighted_block"] = Markup(f"{highlighted_before}\n{highlighted_match}\n{highlighted_after}")
        r["summary"] = summary
        new_results.append(r)
    combined_results = previous_results + new_results
    shown += len(new_results)
    return combined_results, len(raw_results), shown, start + len(new_results)
# HTML endpoint
@app.route("/search", methods=["POST"])
def search():
    """Handle the HTML search form POST and render results.html.

    Reads query/paging state from the form, re-runs the shared search
    logic, and offers an abbreviation expansion suggestion when the query
    matches ABBREVIATION_MAP.
    """
    query = request.form.get("query", "").strip()
    if not query:
        return render_template("index.html", error="Please enter a search query.")
    semantic_mode = request.form.get("semantic") == "true"
    start = int(request.form.get("start", 0))
    try:
        previous_results = flask_json.loads(request.form.get("previous_results", "[]"))
    except (TypeError, ValueError):
        # Narrowed from a bare except: only malformed JSON is expected here.
        previous_results = []
    # SECURITY NOTE(review): previous_results is client-supplied; wrapping
    # its highlighted_block in Markup trusts arbitrary client HTML and is a
    # potential XSS vector — consider re-highlighting server-side instead.
    for r in previous_results:
        if isinstance(r, dict) and "highlighted_block" in r:
            r["highlighted_block"] = Markup(r["highlighted_block"])
    shown = int(request.form.get("shown", len(previous_results)))
    combined_results, total_matches, shown, next_start = perform_search(
        query, start, shown, previous_results, semantic_mode
    )
    # Abbreviation suggestion: offer the expansion for an abbreviation,
    # or the abbreviation for a known expansion.
    suggestion_term = ""
    lower_query = query.lower()
    if lower_query in ABBREVIATION_MAP:
        suggestion_term = ABBREVIATION_MAP[lower_query]
    elif lower_query in ABBREVIATION_MAP.values():
        for abbr, full in ABBREVIATION_MAP.items():
            if full == lower_query:
                suggestion_term = abbr
                break
    return render_template(
        "results.html",
        query=query,
        results=combined_results,
        shown=shown,
        start=next_start,
        total_matches=total_matches,
        previous_results=combined_results,
        suggestion_term=suggestion_term,
        semantic=semantic_mode
    )
# JSON API endpoint
@app.route("/api/search", methods=["POST"])
def api_search():
    """JSON variant of /search: returns rendered result-card HTML plus
    paging state for the front-end's "load more" flow."""
    payload = request.get_json(force=True)
    query = payload.get("query", "").strip()
    semantic_mode = bool(payload.get("semantic", False))
    start = int(payload.get("start", 0))
    shown = int(payload.get("shown", 0))
    previous_results = payload.get("previous_results", [])
    combined_results, total_matches, shown, next_start = perform_search(
        query, start, shown, previous_results, semantic_mode
    )
    # Only the newest page of results is rendered to HTML cards.
    page_size = SEARCH_CONFIG.get("results_per_page", 5)
    rendered_cards = []
    for result in combined_results[-page_size:]:
        card = render_template(
            "_result_card.html", result=result, query=query, semantic=semantic_mode
        )
        rendered_cards.append(card)
    return jsonify({
        "html": rendered_cards,
        "shown": shown,
        "total_matches": total_matches,
        "next_start": next_start,
        "has_more": next_start < total_matches,
    })
# Autocomplete endpoint
@app.route("/autocomplete", methods=["GET"])
def autocomplete():
    """Return autocomplete suggestions for the partial query *term* as JSON."""
    term = request.args.get("term", "")
    # jsonify sets the application/json Content-Type; returning
    # flask_json.dumps(...) directly served the JSON as text/html.
    return jsonify(get_suggestions(term))
if __name__ == "__main__":
    # Hugging Face Spaces injects PORT; 7860 is its default.
    listen_port = int(os.environ.get("PORT", 7860))
    app.run(host="0.0.0.0", port=listen_port, debug=False)
|