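"""RAG Pipeline Demo: a Gradio app for financial document Q&A.

Pipeline: PDF text extraction (PyPDF2) -> overlapping character chunks ->
OpenAI embeddings stored in a local ChromaDB collection -> semantic search ->
answer generation with an OpenAI chat model.
"""
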
import gradio as gr
import PyPDF2
import chromadb
from openai import OpenAI
import numpy as np
from typing import List, Dict, Tuple
import json
import io
import os
from datetime import datetime
import pandas as pd


class RAGPipeline:
    def __init__(self):
        # Initialize the local ChromaDB client using the persistent-client configuration
        try:
            self.chroma_client = chromadb.PersistentClient(path="./chroma_db")
        except Exception as e:
            print(f"ChromaDB initialization error: {e}")
            self.chroma_client = None
        # OpenAI client (set through the UI)
        self.openai_client = None
        self.openai_api_key = None
        # Collection for storing document chunks
        self.collection = None
        # Store document metadata and the full extracted text
        self.document_metadata = {}
        self.full_extracted_text = ""  # Full text is stored here

    def set_openai_key(self, openai_key: str):
        """Set the OpenAI API key and create a client."""
        self.openai_api_key = openai_key
        if openai_key:
            self.openai_client = OpenAI(api_key=openai_key)
    def get_openai_embedding(self, text: str) -> List[float]:
        """Generate an embedding using OpenAI's text-embedding-ada-002 model."""
        if not self.openai_client:
            raise ValueError("OpenAI client not initialized")
        try:
            response = self.openai_client.embeddings.create(
                model="text-embedding-ada-002",
                input=text
            )
            return response.data[0].embedding
        except Exception as e:
            raise Exception(f"OpenAI embedding generation failed: {str(e)}")
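    # Illustrative note (not executed): each call above returns one vector, and
    # text-embedding-ada-002 embeddings are 1536-dimensional, so for example:
    #     vec = rag_pipeline.get_openai_embedding("Q4 revenue grew 12%")
    #     assert len(vec) == 1536
    # Every chunk stored below therefore costs one embeddings API call.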
    def extract_text_from_pdf(self, pdf_file) -> Tuple[str, Dict]:
        """Extract text from an uploaded PDF file."""
        try:
            # Handle the different file types Gradio may pass in
            if hasattr(pdf_file, 'name'):
                # A file path: read the file from disk
                with open(pdf_file.name, 'rb') as file:
                    pdf_content = file.read()
            elif isinstance(pdf_file, bytes):
                # Already raw bytes
                pdf_content = pdf_file
            else:
                # A file-like object: read it
                pdf_content = pdf_file.read() if hasattr(pdf_file, 'read') else pdf_file

            # Read the PDF
            pdf_reader = PyPDF2.PdfReader(io.BytesIO(pdf_content))
            text = ""
            page_count = len(pdf_reader.pages)

            # Extract text from all pages
            for page_num, page in enumerate(pdf_reader.pages):
                page_text = page.extract_text()
                if page_text.strip():  # Only add non-empty pages
                    text += f"\n--- Page {page_num + 1} ---\n"
                    text += page_text + "\n"

            # Clean up the text
            text = text.strip()

            # Store the full text on the pipeline object
            self.full_extracted_text = text
            print(f"DEBUG: Stored full text length: {len(self.full_extracted_text)}")

            # Create extraction metadata
            metadata = {
                "total_pages": page_count,
                "total_characters": len(text),
                "extraction_timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
                "file_size_bytes": len(pdf_content),
                "pages_with_text": sum(1 for page in pdf_reader.pages if page.extract_text().strip()),
                "average_chars_per_page": len(text) // page_count if page_count > 0 else 0
            }
            return text, metadata
        except Exception as e:
            return f"Error extracting PDF: {str(e)}", {}
    def chunk_text(self, text: str, chunk_size: int = 1000, overlap: int = 200) -> Tuple[List[str], Dict]:
        """Split text into overlapping chunks."""
        if not text or len(text.strip()) == 0:
            return [], {"error": "No text provided for chunking"}

        # Clean the text first
        text = text.strip()
        chunks = []
        start = 0
        print(f"DEBUG: Starting chunking with text length: {len(text)}")
        print(f"DEBUG: Chunk size: {chunk_size}, Overlap: {overlap}")

        while start < len(text):
            end = start + chunk_size
            # If we're not at the end, try to break at a sentence or word boundary
            if end < len(text):
                last_period = text.rfind('.', start, end)
                last_newline = text.rfind('\n', start, end)
                last_space = text.rfind(' ', start, end)
                # Choose the latest available breaking point
                break_point = max(last_period, last_newline, last_space)
                if break_point > start:
                    end = break_point + 1

            chunk = text[start:end].strip()
            if chunk and len(chunk) > 50:  # Only keep meaningful chunks
                chunks.append(chunk)
                print(f"DEBUG: Added chunk {len(chunks)}: length={len(chunk)}")

            # Move the start position forward, keeping the requested overlap
            if end >= len(text):
                break
            new_start = end - overlap
            # Prevent an infinite loop: if the boundary adjustment made this
            # chunk shorter than the overlap, jump to the chunk end instead
            if new_start <= start:
                new_start = end
            start = new_start

        print(f"DEBUG: Final chunks count: {len(chunks)}")

        # Create chunking metadata
        chunk_lengths = [len(chunk) for chunk in chunks]
        metadata = {
            "total_chunks": len(chunks),
            "chunk_size": chunk_size,
            "overlap": overlap,
            "avg_chunk_length": np.mean(chunk_lengths) if chunks else 0,
            "min_chunk_length": min(chunk_lengths) if chunks else 0,
            "max_chunk_length": max(chunk_lengths) if chunks else 0,
            "total_text_length": len(text),
            "chunking_timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        }
        return chunks, metadata
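    # Worked example (using the defaults above): with chunk_size=1000 and
    # overlap=200 the effective stride is ~800 characters, so a 2,400-character
    # document yields roughly ceil((2400 - 1000) / 800) + 1 = 3 chunks with
    # start positions near 0, 800, and 1600. Actual boundaries shift slightly
    # because each chunk snaps back to the nearest sentence/word break.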
    def store_in_chromadb(self, chunks: List[str], document_name: str) -> Dict:
        """Store chunks in ChromaDB with OpenAI embeddings."""
        if not self.openai_client:
            return {"error": "OpenAI client not initialized for embedding generation"}
        if not self.chroma_client:
            return {"error": "ChromaDB client not initialized"}
        try:
            # Create a fresh collection, replacing any existing one with the same name
            collection_name = f"financial_docs_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
            try:
                self.chroma_client.delete_collection(collection_name)
            except Exception:
                pass
            self.collection = self.chroma_client.create_collection(
                name=collection_name,
                metadata={"hnsw:space": "cosine"}
            )

            # Generate embeddings for the chunks using OpenAI
            embeddings = []
            embedding_metadata = {
                "model_used": "text-embedding-ada-002",
                "total_chunks_processed": len(chunks),
                "embedding_start_time": datetime.now().isoformat()
            }
            for i, chunk in enumerate(chunks):
                try:
                    embedding = self.get_openai_embedding(chunk)
                    embeddings.append(embedding)
                except Exception as e:
                    return {"error": f"Failed to generate embedding for chunk {i}: {str(e)}"}
            embedding_metadata["embedding_end_time"] = datetime.now().isoformat()
            embedding_metadata["embedding_dimension"] = len(embeddings[0]) if embeddings else 0

            # Create unique IDs and per-chunk metadata
            ids = [f"chunk_{i}" for i in range(len(chunks))]
            metadatas = [
                {
                    "chunk_id": i,
                    "document_name": document_name,
                    "chunk_length": len(chunk),
                    "created_at": datetime.now().isoformat(),
                    "embedding_model": "text-embedding-ada-002"
                }
                for i, chunk in enumerate(chunks)
            ]

            # Store in ChromaDB
            self.collection.add(
                embeddings=embeddings,
                documents=chunks,
                metadatas=metadatas,
                ids=ids
            )

            # Create storage metadata
            storage_metadata = {
                "collection_name": collection_name,
                "total_vectors_stored": len(chunks),
                "embedding_dimension": len(embeddings[0]) if embeddings else 0,
                "embedding_model": "text-embedding-ada-002",
                "storage_timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
                "database_status": "Successfully stored",
                "database_type": "ChromaDB Local",
                "database_path": "./chroma_db",
                "embedding_metadata": embedding_metadata
            }
            return storage_metadata
        except Exception as e:
            return {"error": f"Storage failed: {str(e)}"}
    def semantic_search(self, query: str, top_k: int = 5) -> Tuple[List[Dict], Dict]:
        """Perform semantic search using OpenAI embeddings and return the top-k results."""
        if not self.collection:
            return [], {"error": "No collection available. Please upload and process a document first."}
        if not self.openai_client:
            return [], {"error": "OpenAI client not initialized for query embedding generation"}
        try:
            # Generate the query embedding using OpenAI
            query_embedding = self.get_openai_embedding(query)

            # Search in ChromaDB
            results = self.collection.query(
                query_embeddings=[query_embedding],
                n_results=top_k,
                include=['documents', 'metadatas', 'distances']
            )

            # Format the results
            search_results = []
            for i in range(len(results['documents'][0])):
                document = results['documents'][0][i]
                result = {
                    "chunk_id": results['metadatas'][0][i]['chunk_id'],
                    "similarity_score": 1 - results['distances'][0][i],  # Convert distance to similarity
                    "content": document[:500] + "..." if len(document) > 500 else document,
                    "full_content": document,
                    "metadata": results['metadatas'][0][i]
                }
                search_results.append(result)

            # Create search metadata
            search_metadata = {
                "query": query,
                "results_found": len(search_results),
                "search_timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
                "top_similarity_score": max([r["similarity_score"] for r in search_results]) if search_results else 0,
                "query_embedding_model": "text-embedding-ada-002",
                "vector_database": "ChromaDB Local"
            }
            return search_results, search_metadata
        except Exception as e:
            return [], {"error": f"Search failed: {str(e)}"}
    def generate_llm_response(self, query: str, search_results: List[Dict]) -> Tuple[str, Dict]:
        """Generate the final response using an OpenAI LLM."""
        if not self.openai_client:
            return "OpenAI client not initialized for LLM response generation.", {}
        try:
            # Prepare the context from the search results
            context = "\n\n".join([
                f"Chunk {result['chunk_id']} (Similarity: {result['similarity_score']:.3f}):\n{result['full_content']}"
                for result in search_results
            ])

            # Create the prompt
            prompt = f"""Based on the following financial document excerpts, please provide a comprehensive and accurate answer to the user's question.

Context from financial document:
{context}

User Question: {query}

Instructions:
1. Provide a detailed, well-structured answer based solely on the provided context
2. If the context doesn't contain enough information to fully answer the question, clearly state this
3. Include specific numbers, dates, and financial figures when available
4. Structure your response clearly with proper formatting
5. Cite which chunk(s) your information comes from when possible

Answer:"""

            # Generate the response using OpenAI
            response = self.openai_client.chat.completions.create(
                model="gpt-3.5-turbo",
                messages=[
                    {"role": "system", "content": "You are a financial analyst AI assistant. Provide accurate, well-structured responses based on the given financial document context."},
                    {"role": "user", "content": prompt}
                ],
                max_tokens=1000,
                temperature=0.1
            )
            llm_response = response.choices[0].message.content

            # Create response metadata
            response_metadata = {
                "model_used": "gpt-3.5-turbo",
                "response_length": len(llm_response),
                "tokens_used": response.usage.total_tokens,
                "prompt_tokens": response.usage.prompt_tokens,
                "completion_tokens": response.usage.completion_tokens,
                "generation_timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
                "context_chunks_used": len(search_results),
                "temperature": 0.1,
                "max_tokens": 1000
            }
            return llm_response, response_metadata
        except Exception as e:
            return f"LLM Generation failed: {str(e)}", {"error": str(e)}
# Initialize the RAG pipeline
rag_pipeline = RAGPipeline()
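# Note: this single module-level instance holds all shared state (extracted
# text, the active ChromaDB collection) across Gradio callbacks, which assumes
# one user/session at a time.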
def configure_openai_api(openai_key):
    """Configure the OpenAI API key."""
    try:
        # Set the OpenAI API key
        rag_pipeline.set_openai_key(openai_key)

        # Test the OpenAI connection
        if openai_key:
            try:
                # Test with a simple API call
                test_response = rag_pipeline.openai_client.models.list()
                openai_status = "✅ OpenAI API key validated successfully"
            except Exception as e:
                openai_status = f"❌ OpenAI API key validation failed: {str(e)}"
        else:
            openai_status = "❌ OpenAI API key required"

        # ChromaDB status (local setup)
        if rag_pipeline.chroma_client:
            chroma_status = "✅ ChromaDB local database ready (./chroma_db)"
        else:
            chroma_status = "❌ ChromaDB local database initialization failed"

        return f"{openai_status}\n{chroma_status}"
    except Exception as e:
        return f"❌ Configuration failed: {str(e)}"
# No module-level text store is needed: the full extracted text lives on the
# pipeline object (rag_pipeline.full_extracted_text).
def process_pdf_upload(pdf_file):
    """Process an uploaded PDF and extract its text."""
    if pdf_file is None:
        return "No file uploaded", "{}"

    # Extract text using the pipeline
    text, metadata = rag_pipeline.extract_text_from_pdf(pdf_file)
    if text.startswith("Error"):
        return text, json.dumps(metadata, indent=2)

    # Show the first 3,000 characters as a preview
    if len(text) > 3000:
        preview_text = (
            text[:3000]
            + f"...\n\n[SHOWING FIRST 3000 CHARACTERS OF {len(text)} TOTAL CHARACTERS]"
            + f"\n[FULL TEXT STORED FOR PROCESSING - Total Length: {len(rag_pipeline.full_extracted_text)} chars]"
        )
    else:
        preview_text = text
    return preview_text, json.dumps(metadata, indent=2)
def process_chunking(text, chunk_size, overlap):
    """Process text chunking."""
    # Always use the full text stored on the pipeline object, not the preview
    if not rag_pipeline.full_extracted_text:
        return "No text available for chunking. Please upload a PDF first.", "{}"

    full_text = rag_pipeline.full_extracted_text
    print(f"DEBUG: Using full text for chunking, length: {len(full_text)}")
    if len(full_text.strip()) == 0:
        return "No valid text available for chunking.", "{}"

    chunks, metadata = rag_pipeline.chunk_text(full_text, int(chunk_size), int(overlap))
    if not chunks:
        return "No chunks created. Please check your text and parameters.", json.dumps(metadata, indent=2)

    # Display the first few chunks as a preview
    preview = "=== CHUNKING RESULTS ===\n"
    preview += f"Total chunks created: {len(chunks)}\n"
    preview += f"Full text length processed: {len(full_text)} characters\n\n"
    preview += "--- CHUNK PREVIEW ---\n\n"
    for i, chunk in enumerate(chunks[:3]):
        preview += f"Chunk {i+1} (Length: {len(chunk)} chars):\n"
        preview += f"{chunk[:200]}...\n\n"
        preview += "-" * 50 + "\n\n"
    if len(chunks) > 3:
        preview += f"... and {len(chunks)-3} more chunks\n"
    preview += f"Shortest chunk: {min(len(c) for c in chunks)} chars\n"
    preview += f"Longest chunk: {max(len(c) for c in chunks)} chars\n"
    return preview, json.dumps(metadata, indent=2)
def process_vector_storage(text, chunk_size, overlap, doc_name):
    """Process vector storage in the local ChromaDB database."""
    if not rag_pipeline.openai_client:
        return "Please configure your OpenAI API key first in the Configuration tab", "{}"
    if not rag_pipeline.chroma_client:
        return "ChromaDB local database not available. Please restart the application.", "{}"

    # Always use the stored full text
    if not rag_pipeline.full_extracted_text:
        return "No valid text to store. Please upload a PDF first.", "{}"
    full_text = rag_pipeline.full_extracted_text
    print(f"DEBUG: Using full text for storage, length: {len(full_text)}")

    # Re-chunk the full text
    chunks, _ = rag_pipeline.chunk_text(full_text, int(chunk_size), int(overlap))
    if not chunks:
        return "No chunks to store", "{}"

    # Store in ChromaDB
    storage_metadata = rag_pipeline.store_in_chromadb(chunks, doc_name or "financial_document")
    if "error" in storage_metadata:
        return f"Storage failed: {storage_metadata['error']}", json.dumps(storage_metadata, indent=2)
    return (
        f"Successfully stored {len(chunks)} chunks in local ChromaDB using OpenAI embeddings"
        f"\nFull text length: {len(full_text)} characters",
        json.dumps(storage_metadata, indent=2)
    )
def process_semantic_search(query, top_k):
    """Process a semantic search query."""
    if not query.strip():
        return "Please enter a search query", "{}", None
    search_results, search_metadata = rag_pipeline.semantic_search(query, int(top_k))
    if not search_results:
        return "No results found", json.dumps(search_metadata, indent=2), None

    # Format the results for display
    results_display = "=== TOP MATCHING CHUNKS ===\n\n"
    for i, result in enumerate(search_results, 1):
        results_display += f"RESULT {i}:\n"
        results_display += f"Chunk ID: {result['chunk_id']}\n"
        results_display += f"Similarity Score: {result['similarity_score']:.4f}\n"
        results_display += f"Content Preview: {result['content']}\n"
        results_display += "-" * 50 + "\n\n"

    # Build a DataFrame for the structured view
    df_data = []
    for result in search_results:
        df_data.append({
            "Chunk ID": result['chunk_id'],
            "Similarity Score": f"{result['similarity_score']:.4f}",
            "Content Length": len(result['full_content']),
            "Preview": result['content'][:100] + "..."
        })
    df = pd.DataFrame(df_data)
    return results_display, json.dumps(search_metadata, indent=2), df
def generate_final_response(query, top_k):
    """Generate the final LLM response."""
    if not rag_pipeline.openai_client:
        return "Please configure your OpenAI API key first in the Configuration tab", "{}"
    if not query.strip():
        return "Please enter a query first", "{}"

    # Get the search results
    search_results, _ = rag_pipeline.semantic_search(query, int(top_k))
    if not search_results:
        return "No search results available for LLM generation", "{}"

    # Generate the LLM response
    response, metadata = rag_pipeline.generate_llm_response(query, search_results)
    return response, json.dumps(metadata, indent=2)
def create_gradio_interface():
    """Create the Gradio interface."""
    with gr.Blocks(title="RAG Pipeline Demo - Financial Document Analysis", theme=gr.themes.Soft()) as demo:
        gr.Markdown("""
        # 🏦 RAG Pipeline Demo - Financial Document Analysis

        This demo shows a complete Retrieval-Augmented Generation (RAG) pipeline with full transparency.
        Each step is clearly displayed so you can understand exactly what's happening in the backend.

        **🔧 Start by configuring your API key in the Configuration tab below.**
        """)

        # Configuration tab
        with gr.Tab("⚙️ Configuration"):
            gr.Markdown("### API Configuration")
            gr.Markdown("Configure your OpenAI API key. ChromaDB runs locally and stores data in the `./chroma_db` folder.")
            with gr.Row():
                with gr.Column():
                    gr.Markdown("#### OpenAI API Key")
                    gr.Markdown("Required for both embedding generation and LLM response generation")
                    openai_key_input = gr.Textbox(
                        label="OpenAI API Key",
                        type="password",
                        placeholder="sk-...",
                        info="Get your API key from: https://platform.openai.com/api-keys"
                    )
                with gr.Column():
                    gr.Markdown("#### ChromaDB Status")
                    gr.Markdown("✅ **Local ChromaDB**: Data is stored locally in `./chroma_db`")
                    gr.Markdown("📁 **Storage Location**: current directory/chroma_db")
                    gr.Markdown("💾 **Persistence**: Data persists between sessions")
            config_btn = gr.Button("Save OpenAI Configuration", variant="primary", size="lg")
            config_status = gr.Textbox(label="Configuration Status", lines=3)
        # Step 1: Document upload
        with gr.Tab("1️⃣ Document Upload"):
            gr.Markdown("### Step 1: Upload Your Financial PDF Document")
            with gr.Row():
                with gr.Column():
                    pdf_input = gr.File(label="Upload PDF Document", file_types=[".pdf"])
                    upload_btn = gr.Button("Extract Text from PDF", variant="primary")
                with gr.Column():
                    extraction_output = gr.Textbox(label="Extracted Text Preview", lines=15, max_lines=20)
                    extraction_metadata = gr.JSON(label="Extraction Metadata")
        # Step 2: Text chunking
        with gr.Tab("2️⃣ Text Chunking"):
            gr.Markdown("### Step 2: Split Text into Manageable Chunks")
            with gr.Row():
                with gr.Column():
                    chunk_size = gr.Slider(minimum=200, maximum=2000, value=1000, label="Chunk Size (characters)")
                    overlap = gr.Slider(minimum=0, maximum=500, value=200, label="Overlap (characters)")
                    chunk_btn = gr.Button("Create Chunks", variant="primary")
                with gr.Column():
                    chunks_output = gr.Textbox(label="Chunks Preview", lines=15, max_lines=20)
                    chunking_metadata = gr.JSON(label="Chunking Metadata")
        # Step 3: Vector storage
        with gr.Tab("3️⃣ Vector Storage"):
            gr.Markdown("### Step 3: Store Chunks in the ChromaDB Vector Database")
            with gr.Row():
                with gr.Column():
                    doc_name = gr.Textbox(label="Document Name", value="financial_report", placeholder="Enter document name")
                    storage_btn = gr.Button("Store in ChromaDB", variant="primary")
                with gr.Column():
                    storage_output = gr.Textbox(label="Storage Status", lines=5)
                    storage_metadata = gr.JSON(label="Storage Metadata")
        # Step 4: Semantic search
        with gr.Tab("4️⃣ Semantic Search"):
            gr.Markdown("### Step 4: Search for Relevant Information")
            with gr.Row():
                with gr.Column():
                    search_query = gr.Textbox(label="Enter your question", placeholder="e.g., What was the revenue growth in Q4?")
                    top_k = gr.Slider(minimum=1, maximum=10, value=5, label="Number of results to retrieve")
                    search_btn = gr.Button("Search Vector Database", variant="primary")
                with gr.Column():
                    search_results_text = gr.Textbox(label="Search Results", lines=15, max_lines=20)
                    search_metadata = gr.JSON(label="Search Metadata")
            # Results table
            results_table = gr.DataFrame(label="Top Matching Chunks - Structured View")
        # Step 5: LLM response generation
        with gr.Tab("5️⃣ LLM Response"):
            gr.Markdown("### Step 5: Generate the Final Answer using OpenAI")
            gr.Markdown("*Note: the OpenAI API key must be configured in the Configuration tab*")
            with gr.Row():
                with gr.Column():
                    generate_btn = gr.Button("Generate Final Response", variant="primary")
                    gr.Markdown("**Current Query:** uses the query from Step 4")
                with gr.Column():
                    final_response = gr.Textbox(label="AI Generated Response", lines=15, max_lines=20)
                    response_metadata = gr.JSON(label="Response Metadata")
        # Complete pipeline tab
        with gr.Tab("🚀 Complete Pipeline"):
            gr.Markdown("### Run the Complete RAG Pipeline")
            gr.Markdown("*Note: make sure the API key is configured in the Configuration tab first*")
            with gr.Row():
                with gr.Column():
                    complete_pdf = gr.File(label="Upload PDF", file_types=[".pdf"])
                    complete_query = gr.Textbox(label="Your Question", placeholder="Ask about the financial document")
                with gr.Column():
                    complete_chunk_size = gr.Slider(minimum=200, maximum=2000, value=1000, label="Chunk Size")
                    complete_overlap = gr.Slider(minimum=0, maximum=500, value=200, label="Overlap")
                    complete_top_k = gr.Slider(minimum=1, maximum=10, value=5, label="Top K Results")
            complete_btn = gr.Button("Run Complete Pipeline", variant="primary", size="lg")
            with gr.Row():
                pipeline_status = gr.Textbox(label="Pipeline Status", lines=10)
                pipeline_response = gr.Textbox(label="Final Answer", lines=10)
        # Event handlers
        config_btn.click(
            configure_openai_api,
            inputs=[openai_key_input],
            outputs=[config_status]
        )
        upload_btn.click(
            process_pdf_upload,
            inputs=[pdf_input],
            outputs=[extraction_output, extraction_metadata]
        )
        chunk_btn.click(
            process_chunking,
            inputs=[extraction_output, chunk_size, overlap],
            outputs=[chunks_output, chunking_metadata]
        )
        storage_btn.click(
            process_vector_storage,
            inputs=[extraction_output, chunk_size, overlap, doc_name],
            outputs=[storage_output, storage_metadata]
        )
        search_btn.click(
            process_semantic_search,
            inputs=[search_query, top_k],
            outputs=[search_results_text, search_metadata, results_table]
        )
        generate_btn.click(
            generate_final_response,
            inputs=[search_query, top_k],
            outputs=[final_response, response_metadata]
        )
        # Complete pipeline function
        def run_complete_pipeline(pdf_file, query, chunk_size, overlap, top_k):
            if not pdf_file or not query:
                return "Please provide a PDF file and a query", ""
            if not rag_pipeline.openai_client:
                return "Please configure your OpenAI API key in the Configuration tab first", ""
            if not rag_pipeline.chroma_client:
                return "ChromaDB local database not available. Please restart the application.", ""

            status = "Starting RAG Pipeline...\n\n"
            status += "Using: ChromaDB Local + OpenAI API\n"
            status += "Storage: ./chroma_db directory\n\n"
            try:
                # Step 1: Extract text
                status += "Step 1: Extracting text from PDF...\n"
                text, _ = rag_pipeline.extract_text_from_pdf(pdf_file)
                if text.startswith("Error"):
                    return status + f"Failed: {text}", ""
                status += "✅ Text extraction completed\n\n"

                # Step 2: Chunk text (sliders may return floats, so cast to int)
                status += "Step 2: Chunking text...\n"
                chunks, _ = rag_pipeline.chunk_text(text, int(chunk_size), int(overlap))
                status += f"✅ Created {len(chunks)} chunks\n\n"

                # Step 3: Store in the vector database
                status += "Step 3: Generating OpenAI embeddings and storing in local ChromaDB...\n"
                storage_result = rag_pipeline.store_in_chromadb(chunks, "complete_pipeline_doc")
                if "error" in storage_result:
                    return status + f"Failed: {storage_result['error']}", ""
                status += "✅ Vectors stored in local ChromaDB using OpenAI embeddings\n\n"

                # Step 4: Search
                status += "Step 4: Performing semantic search with OpenAI embeddings...\n"
                search_results, _ = rag_pipeline.semantic_search(query, int(top_k))
                if not search_results:
                    return status + "❌ No search results found", ""
                status += f"✅ Found {len(search_results)} relevant chunks\n\n"

                # Step 5: Generate the response
                status += "Step 5: Generating LLM response...\n"
                response, _ = rag_pipeline.generate_llm_response(query, search_results)
                if response.startswith("LLM Generation failed"):
                    return status + f"Failed: {response}", ""
                status += "✅ Final response generated successfully!"
                return status, response
            except Exception as e:
                return status + f"❌ Pipeline failed: {str(e)}", ""

        complete_btn.click(
            run_complete_pipeline,
            inputs=[complete_pdf, complete_query, complete_chunk_size, complete_overlap, complete_top_k],
            outputs=[pipeline_status, pipeline_response]
        )

    return demo
# Launch the application
if __name__ == "__main__":
    print("Starting RAG Pipeline Demo...")
    print("Make sure you have installed the required packages:")
    print("pip install gradio PyPDF2 chromadb openai pandas numpy")
    print("\nConfiguration:")
    print("✅ ChromaDB: local storage (./chroma_db directory)")
    print("🔑 OpenAI: API key required for embeddings + LLM")
    print("💾 Data persistence: enabled across sessions")

    # Create and launch the Gradio interface
    demo = create_gradio_interface()
    demo.launch()
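    # Illustrative (standard Gradio options, not part of the original app):
    # to serve beyond localhost you could launch with, e.g.,
    #     demo.launch(server_name="0.0.0.0", server_port=7860)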