"""Ragas evaluation harness for a hybrid-retrieval RAG pipeline.

Loads a PDF, builds a parent/child retriever with BM25 + FAISS and a
cross-encoder reranker, generates a synthetic test set with Ragas, runs the
RAG chain over it, and scores the answers.
"""
import os
import asyncio
import uuid
from typing import Any, List, Sequence

from dotenv import load_dotenv
from datasets import Dataset

from ragas import evaluate
from ragas.metrics import (
    faithfulness,
    answer_relevancy,
    context_recall,
    context_precision,
)
from ragas.testset import TestsetGenerator

from langchain_groq import ChatGroq
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_huggingface import HuggingFaceEmbeddings
from langchain_community.vectorstores import FAISS
from langchain.storage import InMemoryStore
from langchain_community.retrievers import BM25Retriever
from langchain.retrievers import EnsembleRetriever, ContextualCompressionRetriever
from langchain.retrievers.document_compressors.base import BaseDocumentCompressor
from langchain_core.documents import Document
from langchain_community.chat_message_histories import ChatMessageHistory
from sentence_transformers.cross_encoder import CrossEncoder
import fitz  # PyMuPDF

from rag_processor import create_rag_chain

load_dotenv()
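

# A cross-encoder reranker: unlike the bi-encoder used for the initial vector
# search, it scores each (query, document) pair jointly, which is slower but
# typically more accurate, so it is applied only to the retrieved shortlist.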
class LocalReranker(BaseDocumentCompressor):
    """Rerank documents with a local cross-encoder, keeping the top_n best."""

    model: Any
    top_n: int = 3

    class Config:
        arbitrary_types_allowed = True

    def compress_documents(self, documents: Sequence[Document], query: str, callbacks=None) -> Sequence[Document]:
        if not documents:
            return []
        pairs = [[query, doc.page_content] for doc in documents]
        scores = self.model.predict(pairs, show_progress_bar=False)
        doc_scores = list(zip(documents, scores))
        sorted_doc_scores = sorted(doc_scores, key=lambda x: x[1], reverse=True)
        top_docs = []
        for doc, score in sorted_doc_scores[:self.top_n]:
            doc.metadata['rerank_score'] = float(score)
            top_docs.append(doc)
        return top_docs


def load_pdf_with_fallback(filepath):
    """Load a PDF with PyMuPDF, raising if no extractable text is found."""
    try:
        docs = []
        with fitz.open(filepath) as pdf_doc:
            for page_num, page in enumerate(pdf_doc):
                text = page.get_text()
                if text.strip():
                    docs.append(Document(
                        page_content=text,
                        metadata={"source": os.path.basename(filepath), "page": page_num + 1}
                    ))
        if docs:
            print(f"✓ Successfully loaded PDF: {filepath}")
            return docs
        else:
            raise ValueError("No text content found in PDF.")
    except Exception as e:
        print(f"✗ PyMuPDF failed for {filepath}: {e}")
        raise


async def main():
    """Main execution function"""
    print("\n" + "="*60 + "\nSTARTING RAGAS EVALUATION\n" + "="*60)

    pdf_path = "uploads/Unit_-_1_Introduction.pdf"
    if not os.path.exists(pdf_path):
        print(f"✗ Error: PDF not found at {pdf_path}")
        return

    try:
        print("\n--- 1. Initializing Models ---")
        groq_api_key = os.getenv("GROQ_API_KEY")
        if not groq_api_key or groq_api_key == "your_groq_api_key_here":
            raise ValueError("GROQ_API_KEY not found or is a placeholder.")

        generator_llm = ChatGroq(model="llama-3.1-8b-instant", api_key=groq_api_key)
        critic_llm = ChatGroq(model="llama-3.1-70b-versatile", api_key=groq_api_key)
        embedding_model = HuggingFaceEmbeddings(model_name="sentence-transformers/all-MiniLM-L6-v2")
        reranker_model = CrossEncoder("cross-encoder/ms-marco-MiniLM-L-6-v2", device='cpu')
        print("✓ Models initialized.")

        print("\n--- 2. Setting up RAG Pipeline ---")
        documents = load_pdf_with_fallback(pdf_path)
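
        # Parent/child ("small-to-big") chunking: small child chunks are indexed
        # for precise retrieval, while the larger parent chunks they were cut
        # from are what ultimately gets passed to the LLM.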
        parent_splitter = RecursiveCharacterTextSplitter(chunk_size=800, chunk_overlap=400)
        child_splitter = RecursiveCharacterTextSplitter(chunk_size=250, chunk_overlap=50)
        parent_docs = parent_splitter.split_documents(documents)
        doc_ids = [str(uuid.uuid4()) for _ in parent_docs]

        child_docs = []
        for i, doc in enumerate(parent_docs):
            _id = doc_ids[i]
            sub_docs = child_splitter.split_documents([doc])
            for child in sub_docs:
                child.metadata["doc_id"] = _id
            child_docs.extend(sub_docs)

        store = InMemoryStore()
        store.mset(list(zip(doc_ids, parent_docs)))
        vectorstore = FAISS.from_documents(child_docs, embedding_model)
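
        # Hybrid retrieval: BM25 catches exact keyword matches, the FAISS vector
        # store catches semantic paraphrases; EnsembleRetriever fuses the two
        # ranked lists (weighted reciprocal rank fusion) with the weights below.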
        bm25_retriever = BM25Retriever.from_documents(child_docs, k=10)
        faiss_retriever = vectorstore.as_retriever(search_kwargs={"k": 10})
        ensemble_retriever = EnsembleRetriever(retrievers=[bm25_retriever, faiss_retriever], weights=[0.4, 0.6])

        reranker = LocalReranker(model=reranker_model, top_n=5)
        compression_retriever = ContextualCompressionRetriever(base_compressor=reranker, base_retriever=ensemble_retriever)
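
        # Net effect: the ensemble casts a wide net (up to 20 candidates), and
        # the cross-encoder keeps only the 5 strongest.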

        def get_parents(docs: List[Document]) -> List[Document]:
            """Map child chunks back to their deduplicated parent documents."""
            parent_ids = {d.metadata["doc_id"] for d in docs}
            return [d for d in store.mget(list(parent_ids)) if d is not None]

        # Piping coerces get_parents into a RunnableLambda, so the final
        # retriever returns parent documents for the reranked child hits.
        final_retriever = compression_retriever | get_parents
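
        # Quick sanity check (optional): final_retriever.invoke("<any question>")
        # should now return a short list of parent-sized Documents.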

        message_histories = {}

        def get_session_history(session_id: str):
            if session_id not in message_histories:
                message_histories[session_id] = ChatMessageHistory()
            return message_histories[session_id]

        rag_chain = create_rag_chain(final_retriever, get_session_history)
        print("✓ RAG chain created successfully.")

        print("\n--- 3. Generating Test Questions ---")
        generator = TestsetGenerator.from_langchain(generator_llm, critic_llm, embedding_model)
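        # NOTE: this two-LLM from_langchain signature matches older Ragas
        # releases (separate generator and critic models); newer Ragas versions
        # take a single LLM, so adjust this call to the installed version.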
        testset = generator.generate_with_langchain_docs(documents, testset_size=5)
        print("✓ Testset generated.")

        print("\n--- 4. Running RAG Chain to Generate Answers ---")
        testset_df = testset.to_pandas()
        test_questions = testset_df['question'].tolist()
        ground_truths = testset_df['ground_truth'].tolist()

        answers = []
        contexts = []
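
        # Contexts are captured via a separate retriever call here, on the
        # assumption that this matches what the chain built by create_rag_chain
        # retrieves internally for the same question.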
        for i, question in enumerate(test_questions):
            print(f"  Processing question {i+1}/{len(test_questions)}...")

            retrieved_docs = final_retriever.invoke(question)
            contexts.append([doc.page_content for doc in retrieved_docs])

            # Fresh session id per question so chat history cannot leak
            # between evaluation samples.
            config = {"configurable": {"session_id": str(uuid.uuid4())}}
            answer = await rag_chain.ainvoke({"question": question}, config=config)
            answers.append(answer)

        print("\n--- 5. Evaluating Results with Ragas ---")
        eval_data = {
            'question': test_questions,
            'answer': answers,
            'contexts': contexts,
            'ground_truth': ground_truths
        }
        eval_dataset = Dataset.from_dict(eval_data)

        result = evaluate(
            eval_dataset,
            metrics=[faithfulness, answer_relevancy, context_precision, context_recall],
            llm=critic_llm,
            embeddings=embedding_model
        )
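
        # faithfulness and answer_relevancy judge the generated answer (grounding
        # in the contexts, relevance to the question); context_precision and
        # context_recall judge the retrieval step against the ground truth.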

        print("\n" + "="*60 + "\nEVALUATION RESULTS\n" + "="*60)
        print(result)

        print("\n--- 6. Saving Results ---")
        results_df = result.to_pandas()
        results_df.to_csv("evaluation_results.csv", index=False)
        print("✓ Evaluation results saved to evaluation_results.csv")

        print("\n" + "="*60 + "\nEVALUATION COMPLETE!\n" + "="*60)

    except Exception as e:
        print(f"\n✗ An error occurred during the process: {e}")
        import traceback
        traceback.print_exc()


if __name__ == "__main__":
    asyncio.run(main())