import gradio as gr
import chromadb
from langchain_chroma import Chroma
from transformers import AutoModelForCausalLM, AutoTokenizer, pipeline
from langchain_huggingface import HuggingFaceEmbeddings, HuggingFacePipeline
from langchain_core.prompts import PromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_core.documents import Document
from langchain_community.document_loaders import PyPDFLoader
from langchain_text_splitters import RecursiveCharacterTextSplitter
from huggingface_hub import login
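# A rough sketch of the dependencies these imports assume (package names only; pin
# exact versions in the Space's requirements.txt as needed):
#   gradio, chromadb, transformers, torch, sentence-transformers, pypdf,
#   huggingface_hub, langchain-core, langchain-community, langchain-chroma,
#   langchain-huggingface, langchain-text-splitters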
# Initialize an in-memory ChromaDB client
client = chromadb.Client()

# Load the embeddings model
model_kwargs = {"device": "cpu"}
encode_kwargs = {"normalize_embeddings": True}
embeddings = HuggingFaceEmbeddings(
    model_name="sentence-transformers/paraphrase-distilroberta-base-v1",
    model_kwargs=model_kwargs,
    encode_kwargs=encode_kwargs,
)

# Initialize the vector store using the in-memory client
vectorDB = Chroma(
    client=client,
    collection_name="embeddings",
    embedding_function=embeddings,
)
# Process a PDF file and ingest its chunks into the vector store
def process_pdf(file_path):
    # Use PyPDFLoader to load the PDF
    loader = PyPDFLoader(file_path)
    documents = loader.load()

    # Split the documents into smaller chunks for better retrieval
    text_splitter = RecursiveCharacterTextSplitter(chunk_size=400, chunk_overlap=30)
    texts = text_splitter.split_documents(documents)

    # Attach metadata to each chunk; the docId groups all chunks from this upload
    metadata_chunks = []
    for i, chunk in enumerate(texts):
        metadata = {"source": f"example_source_{i}", "docId": "42"}
        doc_with_metadata = Document(
            page_content=chunk.page_content, metadata=metadata, id=str(i)
        )
        metadata_chunks.append(doc_with_metadata)
    print("Done")

    # Add the documents to the vector database
    try:
        vectorDB.add_documents(metadata_chunks)
    except Exception as e:
        raise gr.Error(f"Failed to add documents to the vector store: {e}")

    gr.Info("PDF processed and ready for questions!")
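# A quick, illustrative way to sanity-check ingestion outside the UI (the query text
# and k value are arbitrary, and "example.pdf" is a placeholder path):
#
#   process_pdf("example.pdf")
#   hits = vectorDB.similarity_search("What is this document about?", k=2, filter={"docId": "42"})
#   for hit in hits:
#       print(hit.metadata["source"], hit.page_content[:80])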
# Generate an answer with a local Hugging Face pipeline, using the retrieved
# context and the running conversation history
def generate_answer(query, results, conversation_history, token):
    try:
        if token:
            gr.Info("Attempting to log in to Hugging Face...")
            login(token=token)
            gr.Info("Login successful!")
        else:
            gr.Warning("No Hugging Face token provided. Gated models may not be accessible.")
    except Exception as e:
        gr.Warning(f"Hugging Face login failed: {e}")
        return "An error occurred during authentication. Please check your token and try again."

    # Flatten the conversation history into plain text for the prompt
    history_text = ""
    for item in conversation_history:
        if "question" in item and item["question"]:
            history_text += f"User: {item['question']}\n"
        if "answer" in item and item["answer"]:
            history_text += f"Assistant: {item['answer']}\n"

    prompt_template = """
You are a helpful assistant. Answer the following question using the provided context and previous conversation history.
If the context does not contain the answer, only then reply with: "Sorry, I don't have enough information."
Conversation History: {history}
Context: {results}
Question: {query}
"""
    template = PromptTemplate(
        input_variables=["history", "results", "query"], template=prompt_template,
    )

    # Join the retrieved chunks into a single context string
    doc_texts = "\n".join([doc.page_content for doc in results])

    # Load the model and build a text-generation pipeline (done on every call,
    # which is slow but keeps the example simple)
    model_id = "meta-llama/Llama-3.2-3B-Instruct"
    tokenizer = AutoTokenizer.from_pretrained(model_id)
    model = AutoModelForCausalLM.from_pretrained(model_id)
    pipe = pipeline(
        "text-generation",
        model=model,
        tokenizer=tokenizer,
        max_new_tokens=256,
        do_sample=True,
        temperature=0.7,
        top_p=0.95,
        repetition_penalty=1.2,
    )
    llm = HuggingFacePipeline(pipeline=pipe)

    # Compose the prompt, model, and output parser into a simple chain
    rag_chain = template | llm | StrOutputParser()
    answer = rag_chain.invoke({"history": history_text, "results": doc_texts, "query": query})
    return answer
# Retrieve the most relevant chunks for the query and pass them to the LLM
def retrieving_response(docId, query, conversation_history, token):
    retriever = vectorDB.as_retriever(
        search_type="similarity",
        search_kwargs={
            "k": 4,
            "filter": {"docId": docId},
        },
    )
    results = retriever.invoke(query)

    # Deduplicate retrieved chunks by their text content
    unique_results = []
    seen_texts = set()
    for result in results:
        if result.page_content not in seen_texts:
            unique_results.append(result)
            seen_texts.add(result.page_content)

    llm_result = generate_answer(query, unique_results, conversation_history, token)
    return llm_result
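# Example of exercising the retrieval + generation path directly (hypothetical values;
# assumes a PDF has already been ingested under docId "42" and that a real Hugging Face
# token string is supplied in place of "hf_..."):
#
#   answer = retrieving_response("42", "Summarise the document in two sentences.", [], "hf_...")
#   print(answer)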
# Gradio wrapper: routes file uploads to ingestion and text queries to the RAG chain
def gradio_rag_wrapper(message, history, token):
    # 'message' is a dict with "text" and "files" keys because the input is a MultimodalTextbox
    uploaded_files = message.get("files", [])

    # If a PDF was uploaded, ingest it and confirm
    if uploaded_files:
        for file_path in uploaded_files:
            process_pdf(file_path)
        return "PDF uploaded and processed. You can now ask questions about the content."

    # Otherwise, handle the text query
    text_query = message.get("text", "")
    if not text_query.strip():
        return "Please upload a document or enter a text query."

    # Convert the Gradio chat history into the question/answer format used above
    rag_history = []
    for user_msg, bot_msg in history:
        # User messages may be dicts when they contained files
        user_text = user_msg.get("text", "") if isinstance(user_msg, dict) else user_msg
        rag_history.append({"question": user_text, "answer": bot_msg})

    # All ingested chunks are stored under the fixed docId "42" (see process_pdf)
    docId = "42"
    response = retrieving_response(docId, text_query, rag_history, token)
    return response
# Create the Gradio interface with multimodal input
with gr.Blocks(title="Contextual RAG Chatbot on Hugging Face Spaces") as demo:
    gr.Markdown("## Contextual RAG Chatbot")
    gr.Markdown(
        "Please enter your Hugging Face access token to use gated models such as Llama 3.2. "
        "You can generate a token from your [Hugging Face settings](https://huggingface.co/settings/tokens)."
    )
    hf_token_textbox = gr.Textbox(
        label="Hugging Face Access Token",
        type="password",
        interactive=True,
    )

    # Use gr.Chatbot and gr.MultimodalTextbox for more control over the layout
    chatbot = gr.Chatbot(label="Chatbot")
    msg = gr.MultimodalTextbox(
        placeholder="Upload a PDF file or enter your query...",
        file_types=[".pdf"],
        interactive=True,
    )

    # Submit handler: process the user input and update the chatbot
    def respond(message, chat_history, hf_token_from_textbox):
        user_message_text = message.get("text") or "File uploaded."
        response = gradio_rag_wrapper(message, chat_history, hf_token_from_textbox)
        chat_history.append((user_message_text, response))
        # Clear the input box and return the updated chat history
        return gr.MultimodalTextbox(value=None), chat_history

    # Wire the submit event to the respond function
    msg.submit(
        respond,
        inputs=[msg, chatbot, hf_token_textbox],
        outputs=[msg, chatbot],
    )
if __name__ == "__main__":
    # No documents are pre-loaded; upload a PDF in the chat box to populate the vector store
    demo.launch()