# NOTE(review): scrape banner removed — the hosting page ("Spaces:") reported
# "Runtime error" twice for this app; kept here as a comment so the file parses.
import os
import re
import shutil
import uuid

import faiss
import numpy as np
from flask import Flask, request, render_template, send_file, redirect, url_for
from PyPDF2 import PdfReader
from sentence_transformers import SentenceTransformer
from transformers import pipeline
# ── model / pipeline warm-up ──────────────────────────────────────────────────
print("✅ App starting...")

print("⏳ Loading SentenceTransformer model...")
model = SentenceTransformer('all-MiniLM-L6-v2')
print("✅ Model loaded.")

print("⏳ Loading NLI pipeline...")
# NOTE(review): `nli` is loaded eagerly but never referenced in this file —
# confirm whether another module uses it, otherwise this is dead startup cost.
nli = pipeline("text-classification", model="microsoft/deberta-large-mnli")
print("✅ NLI pipeline loaded.")

app = Flask(__name__)

# ── base folders ──────────────────────────────────────────────────────────────
BASE_UPLOADS = os.path.join(os.path.dirname(__file__), "uploads")
BASE_RESULTS = os.path.join(os.path.dirname(__file__), "results")
for _base in (BASE_UPLOADS, BASE_RESULTS):
    os.makedirs(_base, exist_ok=True)
# ── clear uploads at launch ───────────────────────────────────────────────────
def clear_uploads_folder():
    """Remove all files and subfolders inside the uploads folder on app launch.

    BASE_UPLOADS itself is left in place; only its contents are deleted.
    """
    for entry in os.listdir(BASE_UPLOADS):
        path = os.path.join(BASE_UPLOADS, entry)
        if os.path.isdir(path):
            # shutil.rmtree replaces the original hand-rolled bottom-up
            # os.walk() + os.remove/os.rmdir traversal — same effect, less code.
            shutil.rmtree(path)
        else:
            os.remove(path)
clear_uploads_folder()
print("✅ Uploads folder cleared.")

# Runtime cache keyed by search-id → (paragraphs, embeddings, faiss-index).
index_data = {}
# ── helpers ───────────────────────────────────────────────────────────────────
def get_paths(sid: str):
    """Return (upload_folder, result_folder, merged_file, result_file) for one search id.

    Both per-search folders are created on demand so callers never need to
    check for their existence.
    """
    # NOTE(review): `sid` is joined into filesystem paths unsanitized — a
    # crafted sid could escape the base folders; confirm upstream validation.
    upload_dir = os.path.join(BASE_UPLOADS, sid)
    result_dir = os.path.join(BASE_RESULTS, sid)
    for folder in (upload_dir, result_dir):
        os.makedirs(folder, exist_ok=True)
    return (
        upload_dir,
        result_dir,
        os.path.join(result_dir, "merged.txt"),
        os.path.join(result_dir, "results.txt"),
    )
def extract_text(file_path):
    """Extract plain text from a .txt or .pdf file; return "" for anything else.

    Extension matching is case-insensitive — the upload filter accepts e.g.
    "A.TXT"/"B.PDF" via .lower(), so this must too (the original compared the
    raw path and silently returned "" for upper-case extensions).
    """
    lower_path = file_path.lower()
    if lower_path.endswith('.txt'):
        with open(file_path, 'r', encoding='utf-8') as f:
            return f.read()
    elif lower_path.endswith('.pdf'):
        reader = PdfReader(file_path)
        # Join page texts, skipping pages whose extraction yields nothing.
        full_text = " ".join(page.extract_text() for page in reader.pages if page.extract_text())
        # Heuristically restore paragraph breaks lost by PDF extraction:
        # wide gaps after sentence-ending punctuation become blank lines,
        full_text = re.sub(r'(?<=[.!?])\s{2,}', '\n\n', full_text)
        # a "word. Capital" boundary becomes a paragraph break,
        full_text = re.sub(r'(?<=[a-z])\.\s+(?=[A-Z])', '.\n\n', full_text)
        # and runs of blank lines collapse to exactly one.
        full_text = re.sub(r'(\n\s*){2,}', '\n\n', full_text)
        return full_text
    return ""
def rebuild_merged_and_index(sid: str):
    """Re-extract, re-merge and re-embed every document for *this* search id.

    Writes the merged text to merged.txt and caches
    (paragraphs, embeddings, faiss index) in index_data[sid].
    """
    up_folder, _, merged_file, _ = get_paths(sid)

    # Concatenate the text of every supported upload, blank-line separated.
    merged_text = "".join(
        extract_text(os.path.join(up_folder, name)) + "\n\n"
        for name in os.listdir(up_folder)
        if name.lower().endswith((".pdf", ".txt"))
    )
    with open(merged_file, "w", encoding='utf-8') as f:
        f.write(merged_text)

    # Paragraphs = blank-line-separated chunks with more than 4 words.
    chunks = re.split(r'\n\s*\n+', merged_text)
    paras = [chunk.strip().replace('\n', ' ')
             for chunk in chunks
             if len(chunk.strip().split()) > 4]
    if not paras:
        index_data[sid] = ([], None, None)
        return

    embed = np.asarray(model.encode(paras, batch_size=32, show_progress_bar=False))
    if embed.ndim == 1:
        embed = embed[np.newaxis, :]  # single paragraph → keep 2-D shape
    # Normalise so inner product == cosine similarity.
    faiss.normalize_L2(embed)
    idx = faiss.IndexFlatIP(embed.shape[1])
    idx.add(embed)
    index_data[sid] = (paras, embed, idx)
# ── routes ────────────────────────────────────────────────────────────────────
# NOTE(review): no @app.route decorators were present in this section even
# though reset() calls url_for('index'), so no route was ever registered and
# every request 404'd (consistent with the Space's "Runtime error" status).
# The decorators were presumably lost in extraction; paths below are
# reconstructed — confirm against the original templates.
@app.route("/", methods=["GET", "POST"])
def index():
    """Main page: render the search form and, on POST, the top-k matches.

    Each *page load* gets its own UUID, preserved via hidden form fields/URLs.
    """
    sid = request.args.get("sid") or request.form.get("sid")
    if not sid:
        sid = str(uuid.uuid4())
    up_folder, _, _, _ = get_paths(sid)  # ensure per-search dirs exist
    paragraphs, embeddings, index_faiss = index_data.get(sid, ([], None, None))

    results = []
    query = ""
    k = 5
    if request.method == "POST":
        query = request.form.get("query", "").strip()
        try:
            k = int(request.form.get("topk", 5))
        except ValueError:
            k = 5  # fall back to the default on non-numeric input
        if paragraphs and query:
            q_embed = np.asarray(model.encode([query]))
            if q_embed.ndim == 1:
                q_embed = q_embed[np.newaxis, :]
            # Inner product on L2-normalised vectors == cosine similarity.
            faiss.normalize_L2(q_embed)
            D, I = index_faiss.search(q_embed, k=min(k, len(paragraphs)))
            results = [paragraphs[i] for i in I[0]]
            # Persist the hits so /download can serve them later.
            _, res_folder, _, result_file = get_paths(sid)
            with open(result_file, "w", encoding='utf-8') as f:
                for para in results:
                    f.write(para + "\n\n")
    return render_template("index.html", results=results, query=query, topk=k, sid=sid)
# NOTE(review): route decorator reconstructed (lost in extraction) — confirm
# the path against the upload form in index.html.
@app.route("/upload", methods=["POST"])
def upload_file():
    """Accept one or more .pdf/.txt uploads for a search id and reindex."""
    sid = request.args.get("sid")
    if not sid:
        return ("Missing sid", 400)
    up_folder, _, _, _ = get_paths(sid)
    for file in request.files.getlist("file"):
        if file and file.filename.lower().endswith((".pdf", ".txt")):
            # basename() strips client-supplied directory components so a
            # crafted name like "../../x.pdf" cannot escape up_folder.
            safe_name = os.path.basename(file.filename)
            file.save(os.path.join(up_folder, safe_name))
    rebuild_merged_and_index(sid)
    return ("", 204)
# NOTE(review): route decorator reconstructed (lost in extraction) — confirm path.
@app.route("/download")
def download():
    """Download the saved top-k search results for a search id."""
    sid = request.args.get("sid")
    if not sid:
        return ("Missing sid", 400)
    _, _, _, result_file = get_paths(sid)
    if not os.path.exists(result_file):
        return ("Nothing to download", 404)
    return send_file(result_file, as_attachment=True)
# NOTE(review): route decorator reconstructed (lost in extraction) — confirm path.
@app.route("/download_merged")
def download_merged():
    """Download the merged text of every uploaded document for a search id."""
    sid = request.args.get("sid")
    if not sid:
        return ("Missing sid", 400)
    _, _, merged_file, _ = get_paths(sid)
    if not os.path.exists(merged_file):
        return ("Nothing to download", 404)
    return send_file(merged_file, as_attachment=True)
# NOTE(review): route decorator reconstructed (lost in extraction) — confirm path.
@app.route("/reset")
def reset():
    """Delete all uploads/results for a search id and drop its cached index."""
    sid = request.args.get("sid")
    if not sid:
        return redirect(url_for('index'))
    up_folder, res_folder, _, _ = get_paths(sid)
    # Uploads never contain subdirectories (filenames are flat), so a shallow
    # per-file remove is sufficient here.
    for folder in (up_folder, res_folder):
        if os.path.exists(folder):
            for fname in os.listdir(folder):
                os.remove(os.path.join(folder, fname))
    index_data.pop(sid, None)  # drop cached embeddings
    return redirect(url_for('index'))
# Production alternative (Waitress is single-process; use threads to
# approximate "workers" on Windows):
#   from waitress import serve
#   serve(app, host="0.0.0.0", port=9001, threads=4)
if __name__ == "__main__":
    app.run(host="0.0.0.0", port=7860)