import os
import asyncio
import uuid
from dotenv import load_dotenv
from datasets import Dataset
import pandas as pd
from typing import Sequence, Any, List
# Ragas and LangChain components
from ragas import evaluate
from ragas.metrics import (
    faithfulness,
    answer_relevancy,
    context_recall,
    context_precision,
)
from ragas.testset import TestsetGenerator
# NOTE: The 'evolutions' import has been completely removed.
# Your specific RAG components from app.py
from langchain_groq import ChatGroq
from langchain_community.document_loaders import PyMuPDFLoader
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_huggingface import HuggingFaceEmbeddings
from langchain_community.vectorstores import FAISS
from langchain.storage import InMemoryStore
from langchain_community.retrievers import BM25Retriever
from langchain.retrievers import EnsembleRetriever, ContextualCompressionRetriever
from langchain.retrievers.document_compressors.base import BaseDocumentCompressor
from langchain_core.documents import Document
from sentence_transformers.cross_encoder import CrossEncoder
from rag_processor import create_rag_chain
from langchain_community.chat_message_histories import ChatMessageHistory
import fitz
# Load environment variables
load_dotenv()
# --- Re-implementing LocalReranker from app.py ---
class LocalReranker(BaseDocumentCompressor):
    """Cross-encoder reranker that keeps the top_n highest-scoring documents."""
    model: Any
    top_n: int = 3

    class Config:
        arbitrary_types_allowed = True

    def compress_documents(self, documents: Sequence[Document], query: str, callbacks=None) -> Sequence[Document]:
        if not documents:
            return []
        # Score each (query, document) pair with the cross-encoder, then keep
        # the top_n documents, stamping the score into their metadata.
        pairs = [[query, doc.page_content] for doc in documents]
        scores = self.model.predict(pairs, show_progress_bar=False)
        doc_scores = list(zip(documents, scores))
        sorted_doc_scores = sorted(doc_scores, key=lambda x: x[1], reverse=True)
        top_docs = []
        for doc, score in sorted_doc_scores[:self.top_n]:
            doc.metadata['rerank_score'] = float(score)
            top_docs.append(doc)
        return top_docs
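
# A minimal usage sketch (candidate_docs and the query string are hypothetical;
# the real wiring happens inside main() via ContextualCompressionRetriever):
#   reranker = LocalReranker(model=CrossEncoder("cross-encoder/ms-marco-MiniLM-L-6-v2"), top_n=3)
#   top_docs = reranker.compress_documents(candidate_docs, "What is an operating system?")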
# --- Helper Functions ---
def load_pdf_with_fallback(filepath):
    """Load a PDF with PyMuPDF, returning one Document per non-empty page."""
    try:
        docs = []
        with fitz.open(filepath) as pdf_doc:
            for page_num, page in enumerate(pdf_doc):
                text = page.get_text()
                if text.strip():
                    docs.append(Document(
                        page_content=text,
                        metadata={"source": os.path.basename(filepath), "page": page_num + 1}
                    ))
        if docs:
            print(f"βœ“ Successfully loaded PDF: {filepath}")
            return docs
        else:
            raise ValueError("No text content found in PDF.")
    except Exception as e:
        print(f"βœ— PyMuPDF failed for {filepath}: {e}")
        raise
async def main():
    """Main execution function"""
    print("\n" + "="*60 + "\nSTARTING RAGAS EVALUATION\n" + "="*60)

    pdf_path = "uploads/Unit_-_1_Introduction.pdf"
    if not os.path.exists(pdf_path):
        print(f"βœ— Error: PDF not found at {pdf_path}")
        return

    try:
        # --- 1. Setup Models ---
        print("\n--- 1. Initializing Models ---")
        groq_api_key = os.getenv("GROQ_API_KEY")
        if not groq_api_key or groq_api_key == "your_groq_api_key_here":
            raise ValueError("GROQ_API_KEY not found or is a placeholder.")
        generator_llm = ChatGroq(model="llama-3.1-8b-instant", api_key=groq_api_key)
        critic_llm = ChatGroq(model="llama-3.1-70b-versatile", api_key=groq_api_key)
        embedding_model = HuggingFaceEmbeddings(model_name="sentence-transformers/all-MiniLM-L6-v2")
        reranker_model = CrossEncoder("cross-encoder/ms-marco-MiniLM-L-6-v2", device='cpu')
        print("βœ“ Models initialized.")

        # --- 2. Setup RAG Pipeline ---
        print("\n--- 2. Setting up RAG Pipeline ---")
        documents = load_pdf_with_fallback(pdf_path)

        # Split documents into large parent chunks (context for the LLM) and
        # small child chunks (units for retrieval), linked by a shared doc_id
        parent_splitter = RecursiveCharacterTextSplitter(chunk_size=800, chunk_overlap=400)
        child_splitter = RecursiveCharacterTextSplitter(chunk_size=250, chunk_overlap=50)
        parent_docs = parent_splitter.split_documents(documents)
        doc_ids = [str(uuid.uuid4()) for _ in parent_docs]
        child_docs = []
        for i, doc in enumerate(parent_docs):
            _id = doc_ids[i]
            sub_docs = child_splitter.split_documents([doc])
            for child in sub_docs:
                child.metadata["doc_id"] = _id
            child_docs.extend(sub_docs)
        store = InMemoryStore()
        store.mset(list(zip(doc_ids, parent_docs)))
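        # The InMemoryStore maps doc_id -> parent chunk; get_parents() below
        # uses it to swap each retrieved child for its larger parent
        # ("small-to-big" retrieval). For example,
        # store.mget([child_docs[0].metadata["doc_id"]]) returns the parent
        # of the first child chunk.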
        vectorstore = FAISS.from_documents(child_docs, embedding_model)
        bm25_retriever = BM25Retriever.from_documents(child_docs, k=10)
        faiss_retriever = vectorstore.as_retriever(search_kwargs={"k": 10})
        ensemble_retriever = EnsembleRetriever(retrievers=[bm25_retriever, faiss_retriever], weights=[0.4, 0.6])
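        # Hybrid retrieval: BM25 scores exact lexical overlap while FAISS
        # scores dense embedding similarity; EnsembleRetriever merges the two
        # ranked lists (weighted Reciprocal Rank Fusion in LangChain),
        # favouring the dense results here (0.6 vs 0.4).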
        reranker = LocalReranker(model=reranker_model, top_n=5)
        compression_retriever = ContextualCompressionRetriever(base_compressor=reranker, base_retriever=ensemble_retriever)

        def get_parents(docs: List[Document]) -> List[Document]:
            parent_ids = {d.metadata["doc_id"] for d in docs}
            return store.mget(list(parent_ids))

        final_retriever = compression_retriever | get_parents
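        # Retrievers are Runnables, so piping one into a plain function wraps
        # the function in a RunnableLambda; final_retriever.invoke(query)
        # therefore returns reranked *parent* documents.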
        message_histories = {}

        def get_session_history(session_id: str):
            if session_id not in message_histories:
                message_histories[session_id] = ChatMessageHistory()
            return message_histories[session_id]

        rag_chain = create_rag_chain(final_retriever, get_session_history)
        print("βœ“ RAG chain created successfully.")

        # --- 3. Generate Testset ---
        print("\n--- 3. Generating Test Questions ---")
        generator = TestsetGenerator.from_langchain(generator_llm, critic_llm, embedding_model)
        # Generate a small test set without custom question-evolution distributions
        testset = generator.generate_with_langchain_docs(documents, testset_size=5)
        print("βœ“ Testset generated.")
        # --- 4. Run RAG Chain on Testset ---
        print("\n--- 4. Running RAG Chain to Generate Answers ---")
        testset_df = testset.to_pandas()
        test_questions = testset_df["question"].tolist()
        ground_truths = testset_df["ground_truth"].tolist()

        answers = []
        contexts = []
        for i, question in enumerate(test_questions):
            print(f" Processing question {i+1}/{len(test_questions)}...")
            # Retrieve contexts for the faithfulness/precision/recall metrics
            retrieved_docs = final_retriever.invoke(question)
            contexts.append([doc.page_content for doc in retrieved_docs])
            # Get answer from the chain; a fresh session_id per question keeps
            # chat history from leaking between evaluation questions
            config = {"configurable": {"session_id": str(uuid.uuid4())}}
            answer = await rag_chain.ainvoke({"question": question}, config=config)
            answers.append(answer)
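        # Caveat: contexts are collected with a separate retriever call rather
        # than captured from inside the chain, so if create_rag_chain rewrites
        # the question (e.g. history-aware retrieval) the logged contexts may
        # differ slightly from what the LLM actually saw.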
        # --- 5. Evaluate with Ragas ---
        print("\n--- 5. Evaluating Results with Ragas ---")
        eval_data = {
            'question': test_questions,
            'answer': answers,
            'contexts': contexts,
            'ground_truth': ground_truths
        }
        eval_dataset = Dataset.from_dict(eval_data)
        result = evaluate(
            eval_dataset,
            metrics=[faithfulness, answer_relevancy, context_precision, context_recall],
            llm=critic_llm,
            embeddings=embedding_model
        )
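        # Metric cheat-sheet: faithfulness = is the answer grounded in the
        # retrieved contexts; answer_relevancy = does the answer address the
        # question; context_precision = are relevant contexts ranked highly;
        # context_recall = do the contexts cover the ground-truth answer.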
print("\n" + "="*60 + "\nEVALUATION RESULTS\n" + "="*60)
print(result)
# --- 6. Save Results ---
print("\n--- 6. Saving Results ---")
results_df = result.to_pandas()
results_df.to_csv("evaluation_results.csv", index=False)
print("βœ“ Evaluation results saved to evaluation_results.csv")
print("\n" + "="*60 + "\nEVALUATION COMPLETE!\n" + "="*60)
except Exception as e:
print(f"\nβœ— An error occurred during the process: {e}")
import traceback
traceback.print_exc()
if __name__ == "__main__":
    asyncio.run(main())