import os

# Disable hf_transfer so model downloads do not require the optional package
os.environ['HF_HUB_ENABLE_HF_TRANSFER'] = '0'

import gradio as gr
from sentence_transformers import SentenceTransformer
import numpy as np
from pypdf import PdfReader
import torch
from transformers import AutoTokenizer, AutoModelForCausalLM
import re

# Global state shared across the Gradio callbacks
chunks = []
embeddings = []
model = None
tokenizer = None
embed_model = None
text_cache = ""


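# Pipeline overview (see the functions below):
#   process_pdf()          - extract text from the PDF, chunk it, and embed the chunks
#   find_relevant_chunks() - rank chunks by cosine similarity to the question
#   generate_response()    - answer with phi-1.5 using the retrieved chunks as context
#   chat()                 - Gradio callback tying retrieval and generation together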
def initialize_models():
    """Initialize models on startup with optimizations"""
    global model, tokenizer, embed_model

    print("Loading models...")

    # Small, CPU-friendly embedding model
    embed_model = SentenceTransformer(
        'sentence-transformers/paraphrase-MiniLM-L3-v2',
        device='cpu'
    )

    # Lightweight causal LM for answer generation
    model_name = "microsoft/phi-1_5"

    tokenizer = AutoTokenizer.from_pretrained(
        model_name,
        trust_remote_code=True
    )

    model = AutoModelForCausalLM.from_pretrained(
        model_name,
        torch_dtype=torch.float32,
        low_cpu_mem_usage=True,
        trust_remote_code=True
    )

    # phi-1.5 has no pad token; reuse the EOS token for padding
    if tokenizer.pad_token is None:
        tokenizer.pad_token = tokenizer.eos_token

    print("Models loaded successfully!")


def smart_chunk_text(text, chunk_size=500, overlap=100):
    """Smarter chunking that respects sentence boundaries"""
    sentences = re.split(r'[.!?]+', text)
    chunks = []
    current_chunk = ""

    for sentence in sentences:
        sentence = sentence.strip()
        if not sentence:
            continue

        # Close the current chunk once adding this sentence would exceed chunk_size
        if len(current_chunk) + len(sentence) > chunk_size and current_chunk:
            chunks.append(current_chunk)

            # Carry roughly `overlap` characters of trailing words into the next chunk
            words = current_chunk.split()
            tail = ""
            while words and len(tail) < overlap:
                tail = words.pop() + " " + tail
            current_chunk = tail + sentence
        else:
            current_chunk += " " + sentence

    if current_chunk:
        chunks.append(current_chunk.strip())

    return chunks


def process_pdf(pdf_file):
    """Process PDF and create embeddings - OPTIMIZED"""
    global chunks, embeddings, embed_model, text_cache

    if pdf_file is None:
        return "❌ Please upload a PDF file!", None

    try:
        # Extract raw text from every page
        pdf_reader = PdfReader(pdf_file.name)
        text = ""
        for page in pdf_reader.pages:
            text += page.extract_text() + "\n"

        if not text.strip():
            return "❌ Could not extract text from PDF!", None

        text_cache = text

        # Sentence-aware chunks with overlap for retrieval
        chunks = smart_chunk_text(text, chunk_size=500, overlap=100)

        # Embed all chunks in batches
        print(f"Creating embeddings for {len(chunks)} chunks...")
        embeddings = embed_model.encode(
            chunks,
            batch_size=32,
            show_progress_bar=False,
            convert_to_numpy=True
        )

        return f"✅ PDF processed! Created {len(chunks)} chunks. You can now ask questions!", None

    except Exception as e:
        print(f"Error processing PDF: {str(e)}")
        return f"❌ Error: {str(e)}", None


def find_relevant_chunks(query, top_k=2):
    """Find most relevant chunks - OPTIMIZED"""
    global chunks, embeddings, embed_model

    if not chunks or len(embeddings) == 0:
        return []

    # Embed the query
    query_embedding = embed_model.encode(
        [query],
        convert_to_numpy=True,
        show_progress_bar=False
    )[0]

    # Cosine similarity between the query and every chunk embedding
    embeddings_norm = embeddings / np.linalg.norm(embeddings, axis=1, keepdims=True)
    query_norm = query_embedding / np.linalg.norm(query_embedding)
    similarities = np.dot(embeddings_norm, query_norm)

    # Indices of the top_k most similar chunks, best first
    top_indices = np.argsort(similarities)[-top_k:][::-1]

    return [chunks[i] for i in top_indices]


def generate_response(question, context):
    """Generate response - OPTIMIZED"""
    global model, tokenizer

    # Keep the prompt short: only the first 800 characters of the retrieved context
    prompt = f"""Context: {context[:800]}

Question: {question}

Answer:"""

    inputs = tokenizer(
        prompt,
        return_tensors="pt",
        truncation=True,
        max_length=1024
    )

    with torch.no_grad():
        outputs = model.generate(
            **inputs,
            max_new_tokens=150,
            temperature=0.7,
            top_p=0.9,
            do_sample=True,
            pad_token_id=tokenizer.eos_token_id,
            num_beams=1
        )

    response = tokenizer.decode(outputs[0], skip_special_tokens=True)

    # Keep only the generated answer: drop the prompt echo and stop at the first newline
    if "Answer:" in response:
        response = response.split("Answer:")[-1].strip()

    response = response.split("\n")[0].strip()

    return response


def chat(message, history):
    """Handle chat - OPTIMIZED"""
    global chunks

    if not chunks:
        return history + [[message, "⚠️ Please upload and process a PDF first!"]]

    if not message.strip():
        return history

    try:
        # Retrieve the most relevant chunks and use them as context
        relevant_chunks = find_relevant_chunks(message, top_k=2)
        context = " ".join(relevant_chunks)

        response = generate_response(message, context)

        # Fall back to a generic reply if generation produced nothing usable
        if not response or len(response) < 10:
            response = "I found relevant information but couldn't generate a clear answer. Please try rephrasing your question."

        return history + [[message, response]]

    except Exception as e:
        print(f"Error in chat: {str(e)}")
        return history + [[message, f"❌ Error: {str(e)}"]]


def clear_all():
    """Clear everything"""
    global chunks, embeddings, text_cache
    chunks = []
    embeddings = []
    text_cache = ""
    return None, "Ready to process a new PDF"


# Build the Gradio interface
with gr.Blocks(title="Chat with PDF - Fast", theme=gr.themes.Soft()) as demo:
    gr.Markdown("# ⚡ Chat with PDF - Optimized Fast Version")
    gr.Markdown("*Using lightweight models for faster responses*")

    with gr.Row():
        with gr.Column(scale=1):
            pdf_input = gr.File(
                label="📄 Upload PDF",
                file_types=[".pdf"]
            )
            process_btn = gr.Button(
                "🚀 Process PDF",
                variant="primary",
                size="lg"
            )
            status = gr.Textbox(
                label="Status",
                lines=2,
                interactive=False
            )

            gr.Markdown("### Tips:")
            gr.Markdown("""
- Processing is much faster now!
- Ask specific questions
- Keep questions concise
""")

            clear_all_btn = gr.Button("🗑️ Clear All", variant="stop")

        with gr.Column(scale=2):
            chatbot = gr.Chatbot(
                label="💬 Chat",
                height=450,
                bubble_full_width=False
            )
            msg = gr.Textbox(
                label="Question",
                placeholder="Ask a question about the PDF...",
                lines=2
            )
            with gr.Row():
                send_btn = gr.Button("📤 Send", variant="primary")
                clear_btn = gr.Button("Clear Chat")

    # Wire UI events to the callbacks above
    process_btn.click(
        process_pdf,
        inputs=[pdf_input],
        outputs=[status, chatbot]
    )

    msg.submit(
        chat,
        inputs=[msg, chatbot],
        outputs=[chatbot]
    ).then(
        lambda: "",  # clear the input box after sending
        None,
        [msg]
    )

    send_btn.click(
        chat,
        inputs=[msg, chatbot],
        outputs=[chatbot]
    ).then(
        lambda: "",  # clear the input box after sending
        None,
        [msg]
    )

    clear_btn.click(lambda: None, None, [chatbot])
    clear_all_btn.click(clear_all, None, [chatbot, status])


# Load models once at startup so the first question is not delayed
initialize_models()

if __name__ == "__main__":
    demo.queue()  # enable request queuing so long generations don't block the UI
    demo.launch(share=False)
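
# Assumed runtime dependencies (not pinned by this script): gradio,
# sentence-transformers, transformers, torch, numpy, and pypdf.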