import os
import asyncio
import uuid
from dotenv import load_dotenv
from datasets import Dataset
from typing import Sequence, Any, List

# Ragas and LangChain components
from ragas import evaluate
from ragas.metrics import (
    faithfulness,
    answer_relevancy,
    context_recall,
    context_precision,
)
from ragas.testset import TestsetGenerator
# NOTE: The 'evolutions' import has been completely removed.

# Your specific RAG components from app.py
from langchain_groq import ChatGroq
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_huggingface import HuggingFaceEmbeddings
from langchain_community.vectorstores import FAISS
from langchain.storage import InMemoryStore
from langchain_community.retrievers import BM25Retriever
from langchain.retrievers import EnsembleRetriever, ContextualCompressionRetriever
from langchain.retrievers.document_compressors.base import BaseDocumentCompressor
from langchain_core.documents import Document
from sentence_transformers.cross_encoder import CrossEncoder
from rag_processor import create_rag_chain
from langchain_community.chat_message_histories import ChatMessageHistory
import fitz  # PyMuPDF

# Load environment variables
load_dotenv()


# --- Re-implementing LocalReranker from app.py ---
class LocalReranker(BaseDocumentCompressor):
    model: Any
    top_n: int = 3

    class Config:
        arbitrary_types_allowed = True

    def compress_documents(self, documents: Sequence[Document], query: str, callbacks=None) -> Sequence[Document]:
        if not documents:
            return []
        # Score every (query, document) pair with the cross-encoder.
        pairs = [[query, doc.page_content] for doc in documents]
        scores = self.model.predict(pairs, show_progress_bar=False)
        doc_scores = list(zip(documents, scores))
        sorted_doc_scores = sorted(doc_scores, key=lambda x: x[1], reverse=True)
        # Keep only the top_n highest-scoring documents, recording each score.
        top_docs = []
        for doc, score in sorted_doc_scores[:self.top_n]:
            doc.metadata['rerank_score'] = float(score)
            top_docs.append(doc)
        return top_docs


# --- Helper Functions ---
def load_pdf_with_fallback(filepath):
    """Load a PDF using PyMuPDF, keeping one Document per non-empty page."""
    try:
        docs = []
        with fitz.open(filepath) as pdf_doc:
            for page_num, page in enumerate(pdf_doc):
                text = page.get_text()
                if text.strip():
                    docs.append(Document(
                        page_content=text,
                        metadata={"source": os.path.basename(filepath), "page": page_num + 1}
                    ))
        if docs:
            print(f"✓ Successfully loaded PDF: {filepath}")
            return docs
        else:
            raise ValueError("No text content found in PDF.")
    except Exception as e:
        print(f"✗ PyMuPDF failed for {filepath}: {e}")
        raise


async def main():
    """Main execution function"""
    print("\n" + "="*60 + "\nSTARTING RAGAS EVALUATION\n" + "="*60)

    pdf_path = "uploads/Unit_-_1_Introduction.pdf"
    if not os.path.exists(pdf_path):
        print(f"✗ Error: PDF not found at {pdf_path}")
        return

    try:
        # --- 1. Setup Models ---
        print("\n--- 1. Initializing Models ---")
        groq_api_key = os.getenv("GROQ_API_KEY")
        if not groq_api_key or groq_api_key == "your_groq_api_key_here":
            raise ValueError("GROQ_API_KEY not found or is a placeholder.")

        generator_llm = ChatGroq(model="llama-3.1-8b-instant", api_key=groq_api_key)
        critic_llm = ChatGroq(model="llama-3.1-70b-versatile", api_key=groq_api_key)
        embedding_model = HuggingFaceEmbeddings(model_name="sentence-transformers/all-MiniLM-L6-v2")
        reranker_model = CrossEncoder("cross-encoder/ms-marco-MiniLM-L-6-v2", device='cpu')
        print("✓ Models initialized.")

        # --- 2. Setup RAG Pipeline ---
        print("\n--- 2. Setting up RAG Pipeline ---")
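        # The block below implements a parent/child ("small-to-big") retrieval
        # scheme: small child chunks are what get embedded and matched, while
        # the larger parent chunks they came from are what is ultimately
        # returned, so the LLM sees more surrounding context than the matched
        # snippet alone.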
        documents = load_pdf_with_fallback(pdf_path)

        # Split documents into large parent chunks and small child chunks.
        parent_splitter = RecursiveCharacterTextSplitter(chunk_size=800, chunk_overlap=400)
        child_splitter = RecursiveCharacterTextSplitter(chunk_size=250, chunk_overlap=50)
        parent_docs = parent_splitter.split_documents(documents)
        doc_ids = [str(uuid.uuid4()) for _ in parent_docs]

        child_docs = []
        for i, doc in enumerate(parent_docs):
            _id = doc_ids[i]
            sub_docs = child_splitter.split_documents([doc])
            for child in sub_docs:
                child.metadata["doc_id"] = _id
            child_docs.extend(sub_docs)

        # Parents live in a docstore keyed by doc_id; children go into the indexes.
        store = InMemoryStore()
        store.mset(list(zip(doc_ids, parent_docs)))

        vectorstore = FAISS.from_documents(child_docs, embedding_model)
        bm25_retriever = BM25Retriever.from_documents(child_docs, k=10)
        faiss_retriever = vectorstore.as_retriever(search_kwargs={"k": 10})
        ensemble_retriever = EnsembleRetriever(
            retrievers=[bm25_retriever, faiss_retriever], weights=[0.4, 0.6]
        )

        reranker = LocalReranker(model=reranker_model, top_n=5)
        compression_retriever = ContextualCompressionRetriever(
            base_compressor=reranker, base_retriever=ensemble_retriever
        )

        def get_parents(docs: List[Document]) -> List[Document]:
            parent_ids = {d.metadata["doc_id"] for d in docs}
            # mget returns None for missing keys, so filter those out defensively.
            return [d for d in store.mget(list(parent_ids)) if d is not None]

        final_retriever = compression_retriever | get_parents

        message_histories = {}

        def get_session_history(session_id: str):
            if session_id not in message_histories:
                message_histories[session_id] = ChatMessageHistory()
            return message_histories[session_id]

        rag_chain = create_rag_chain(final_retriever, get_session_history)
        print("✓ RAG chain created successfully.")

        # --- 3. Generate Testset ---
        print("\n--- 3. Generating Test Questions ---")
        generator = TestsetGenerator.from_langchain(generator_llm, critic_llm, embedding_model)
        # Generate a simple test set without complex distributions
        testset = generator.generate_with_langchain_docs(documents, testset_size=5)
        print("✓ Testset generated.")

        # --- 4. Run RAG Chain on Testset ---
        print("\n--- 4. Running RAG Chain to Generate Answers ---")
        testset_df = testset.to_pandas()
        test_questions = testset_df["question"].tolist()
        ground_truths = testset_df["ground_truth"].tolist()

        answers = []
        contexts = []
        for i, question in enumerate(test_questions):
            print(f"  Processing question {i+1}/{len(test_questions)}...")
            # Retrieve contexts
            retrieved_docs = final_retriever.invoke(question)
            contexts.append([doc.page_content for doc in retrieved_docs])
            # Get answer from chain (fresh session per question so history doesn't leak)
            config = {"configurable": {"session_id": str(uuid.uuid4())}}
            answer = await rag_chain.ainvoke({"question": question}, config=config)
            answers.append(answer)

        # --- 5. Evaluate with Ragas ---
        print("\n--- 5. Evaluating Results with Ragas ---")
        eval_data = {
            'question': test_questions,
            'answer': answers,
            'contexts': contexts,
            'ground_truth': ground_truths,
        }
        eval_dataset = Dataset.from_dict(eval_data)

        result = evaluate(
            eval_dataset,
            metrics=[faithfulness, answer_relevancy, context_precision, context_recall],
            llm=critic_llm,
            embeddings=embedding_model,
        )

        print("\n" + "="*60 + "\nEVALUATION RESULTS\n" + "="*60)
        print(result)

        # --- 6. Save Results ---
        print("\n--- 6. Saving Results ---")
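        # result.to_pandas() gives one row per test question with a column per
        # metric score (exact column names can vary across ragas versions).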
        results_df = result.to_pandas()
        results_df.to_csv("evaluation_results.csv", index=False)
        print("✓ Evaluation results saved to evaluation_results.csv")

        print("\n" + "="*60 + "\nEVALUATION COMPLETE!\n" + "="*60)

    except Exception as e:
        print(f"\n✗ An error occurred during the process: {e}")
        import traceback
        traceback.print_exc()


if __name__ == "__main__":
    asyncio.run(main())
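# Usage: run this script directly. It expects GROQ_API_KEY in a .env file next
# to the script and the target PDF at uploads/Unit_-_1_Introduction.pdf.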