Spaces:
Build error
Build error
| import streamlit as st | |
| from docx import Document | |
| import os | |
| from langchain_core.prompts import PromptTemplate | |
| from transformers import AutoTokenizer, AutoModelForCausalLM | |
| import torch | |
| import time | |
| from sentence_transformers import SentenceTransformer | |
| from langchain.vectorstores import Chroma | |
| from langchain.docstore.document import Document as Document2 | |
| from langchain_community.embeddings import HuggingFaceEmbeddings | |
| import cohere | |
| from langchain_core.prompts import PromptTemplate | |
# Read the Hugging Face access token from the environment.
token = os.getenv("HF_TOKEN")
# Report only whether a token is configured; the secret itself must never be
# written to the logs.
print("HF_TOKEN is", "set" if token else "not set")
# Folder (relative to the working directory) that holds the .docx sources.
docs_folder = "./converted_docs"
def load_docx_files_from_drive(drive_folder):
    """Read every ``.docx`` file in a folder and return each file's text.

    Args:
        drive_folder: Path of the directory to scan. Only its direct entries
            are considered; sub-folders are not searched.

    Returns:
        A list with one string per ``.docx`` file, in file-name order. Each
        string joins the file's non-blank paragraphs with newlines.
    """
    # os.listdir() returns entries in arbitrary order; sorting keeps the
    # document order (and so any later chunk numbering) reproducible.
    docx_files = sorted(
        name for name in os.listdir(drive_folder) if name.endswith(".docx")
    )
    documents = []
    for file_name in docx_files:
        doc = Document(os.path.join(drive_folder, file_name))
        # Skip paragraphs that are empty or whitespace-only.
        documents.append(
            "\n".join(p.text for p in doc.paragraphs if p.text.strip())
        )
    return documents
# Load the text of every .docx file found in docs_folder (a local path,
# despite the "drive" in the loader's name). One string per file.
documents = load_docx_files_from_drive(docs_folder)
def split_extracted_text_into_chunks(documents, marker="File Name:"):
    """Split each document's text into chunks that begin at a marker line.

    A new chunk starts at every line that begins with ``marker``. Lines that
    come before the first marker line of a document form a chunk of their
    own, and a chunk never spans two documents.

    Args:
        documents: Iterable of document texts (strings).
        marker: Line prefix that opens a new chunk. Defaults to
            ``"File Name:"``, the value that used to be hard-coded.

    Returns:
        A flat list of chunk strings in document order; the lines of each
        chunk are joined with newlines.
    """
    print("Splitting text into chunks")
    chunks = []
    for doc_text in documents:
        current_chunk = []
        for line in doc_text.splitlines():
            # A marker line closes the chunk collected so far (if any) and
            # becomes the first line of the next one.
            if line.startswith(marker) and current_chunk:
                chunks.append("\n".join(current_chunk))
                current_chunk = []
            current_chunk.append(line)
        # Flush whatever remains so the document's last chunk is not lost.
        if current_chunk:
            chunks.append("\n".join(current_chunk))
    return chunks
# Split the extracted documents into one flat list of chunks; a new chunk
# starts at each line beginning with "File Name:".
chunks = split_extracted_text_into_chunks(documents)
def save_chunks_to_file(chunks, output_file_path):
    """Write all chunks to one UTF-8 text file, numbered from 1.

    Every chunk is emitted as a ``Chunk <n>:`` header line, the chunk text,
    and a separator line made of 50 ``=`` characters.

    Args:
        chunks: Iterable of chunk strings.
        output_file_path: Destination path; the file is opened in write mode,
            so existing content is replaced.
    """
    print("Saving chunks to file")
    separator = "\n" + "=" * 50 + "\n"
    with open(output_file_path, "w", encoding="utf-8") as out:
        for number, text in enumerate(chunks, start=1):
            # Header, body and separator go out as one record per chunk.
            out.write(f"Chunk {number}:\n{text}{separator}")
# Path to save the chunks file
output_file_path = "./chunks_output.txt"
# `chunks` was already computed from `documents` earlier in this script;
# splitting a second time here only repeated the same work, so the existing
# list is written out as-is.
save_chunks_to_file(chunks, output_file_path)
# Step 1: Load the embedding model through LangChain's wrapper
embedding_model = HuggingFaceEmbeddings(
    model_name="Omartificial-Intelligence-Space/Arabic-Triplet-Matryoshka-V2"
)
print("#0")
# Step 2: Embed the chunks
def embed_chunks(chunks):
    """Embed each chunk and report progress in the Streamlit page.

    Args:
        chunks: Sequence of chunk strings (its length drives the progress
            display).

    Returns:
        A list of dicts, one per chunk and in input order, each holding the
        chunk text under ``"chunk"`` and the value returned by
        ``embedding_model.embed_query`` under ``"embedding"``.
    """
    status_box = st.empty()
    bar = st.progress(0)
    count = len(chunks)
    embedded = []
    for done, text in enumerate(chunks, start=1):
        vector = embedding_model.embed_query(text)
        embedded.append({"chunk": text, "embedding": vector})
        # Refresh the progress bar and the status line after every chunk.
        fraction = done / count
        bar.progress(fraction)
        status_box.text(f"Processed {done}/{count} chunks ({fraction:.0%})")
    # Force the final state so the display is also correct for empty input.
    bar.progress(1.0)
    status_box.text("Embedding complete!")
    return embedded
# Embed every chunk up front; each entry pairs the chunk text with its vector.
# NOTE(review): prepare_documents_for_chroma reads only entry["chunk"], and
# Chroma.from_documents is later handed embedding_model itself, so the vectors
# computed here look unused — confirm whether this pass is still needed.
embeddings = embed_chunks(chunks)
print("#1")
# Step 3: Prepare documents
def prepare_documents_for_chroma(embeddings):
    """Wrap each embedded chunk in a LangChain document, showing progress.

    Args:
        embeddings: Sequence of dicts that each carry the chunk text under
            the ``"chunk"`` key; no other key is read here.

    Returns:
        A list of ``Document2`` objects whose ``page_content`` is the chunk
        text and whose metadata stores the 1-based position as
        ``chunk_index``.
    """
    status_box = st.empty()
    bar = st.progress(0)
    count = len(embeddings)
    prepared = []
    for position, entry in enumerate(embeddings, start=1):
        prepared.append(
            Document2(
                page_content=entry["chunk"],
                metadata={"chunk_index": position},
            )
        )
        # Refresh the progress bar and the status line after every entry.
        fraction = position / count
        bar.progress(fraction)
        status_box.text(f"Processing document {position}/{count} ({fraction:.0%})")
    bar.progress(1.0)
    status_box.text(f"✅ Successfully prepared {count} documents")
    return prepared
print("#2")
documents = prepare_documents_for_chroma(embeddings)


# Step 4: Create the Chroma store.
@st.cache_resource
def _build_vectorstore():
    """Build the Chroma store once per server process and reuse it.

    Streamlit re-executes this script on every widget interaction. Without
    caching, each rerun called ``Chroma.from_documents`` again, re-embedding
    all documents and adding them once more to the collection persisted
    under ``./chroma_db``.

    NOTE(review): a process restart still builds into the same directory;
    confirm whether it should be cleared or reloaded instead. The cached
    store is also not refreshed if the source documents change while the
    process is running.
    """
    print("Creating the vectore store")
    return Chroma.from_documents(
        documents=documents,
        embedding=embedding_model,  # embedding object, not precomputed vectors
        persist_directory="./chroma_db",  # on-disk persistence
    )


vectorstore = _build_vectorstore()
class RAGPipeline:
    """Retrieval-augmented question answering backed by a Cohere model.

    For each question the pipeline retrieves chunks from the vector store,
    fills an Arabic instruction prompt with them, and asks Cohere to
    complete that prompt.
    """

    def __init__(self, vectorstore, api_key, model_name="c4ai-aya-expanse-8b", k=3):
        """Set up the retriever, the Cohere client and the prompt template.

        Args:
            vectorstore: Store exposing ``as_retriever``.
            api_key: Cohere API key handed to ``cohere.Client``.
            model_name: Name of the Cohere model to query.
            k: Number of chunks to retrieve per question.
        """
        print("Initializing RAG Pipeline")
        self.vectorstore = vectorstore
        self.model_name = model_name
        self.k = k
        self.api_key = api_key
        self.client = cohere.Client(api_key)
        # Honour the caller's ``k``: it used to be hard-coded to 3 here, so
        # the constructor argument had no effect on retrieval.
        self.retriever = self.vectorstore.as_retriever(
            search_type="mmr", search_kwargs={"k": self.k}
        )
        self.prompt_template = PromptTemplate.from_template(self._get_template())

    def _get_template(self):
        """Return the prompt text with ``{context}`` and ``{question}`` slots.

        The Arabic system section tells the model to be a helpful assistant,
        answer only in Arabic, answer exclusively from the given context, be
        precise and clear, and say it does not know when the context holds
        no answer.
        """
        # The text is model input, so its lines stay flush-left. The trailing
        # backslash keeps a newline from being appended after "[/INST]".
        return """<s>[INST] <<SYS>>
أنت مساعد مفيد يقدم إجابات باللغة العربية بناءً على السياق المقدم.
- أجب فقط باللغة العربية
- إذا لم تجد إجابة في السياق، قل أنك لا تعرف
- كن دقيقاً وواضحاً في إجاباتك
-جاوب من السياق حصريا
<</SYS>>
السياق: {context}
السؤال: {question}
الإجابة: [/INST]\
"""

    def generate_response(self, question):
        """Answer ``question`` from retrieved context.

        Args:
            question: The user's question text.

        Returns:
            The generated answer, stripped of surrounding whitespace.

        Raises:
            RuntimeError: If Cohere returns no generations.
        """
        context = self._retrieve_documents(question)
        prompt = self._create_prompt(context, question)
        return self._generate_response_cohere(prompt)

    def _retrieve_documents(self, question):
        """Retrieve chunks for ``question`` and merge them into one string."""
        retrieved_docs = self.retriever.invoke(question)
        # Merge the retrieved texts into a single space-separated context.
        return " ".join(doc.page_content for doc in retrieved_docs)

    def _create_prompt(self, docs, question):
        """Fill the prompt template with the context string and question."""
        return self.prompt_template.format(context=docs, question=question)

    def _generate_response_cohere(self, prompt):
        """Send ``prompt`` to Cohere's generate endpoint and return the text.

        Raises:
            RuntimeError: If the response contains no generations.
        """
        response = self.client.generate(
            model=self.model_name,
            prompt=prompt,
            max_tokens=2000,  # upper bound on the answer length
            temperature=0.3,  # low value keeps answers close to the context
            stop_sequences=None,
        )
        if not response.generations:
            raise RuntimeError("No response generated by Cohere API.")
        return response.generations[0].text.strip()
st.title("Simple Text Generator")
api_key = os.getenv("API_KEY")
if not api_key:
    # Slicing the key used to raise TypeError when the variable was unset;
    # report the problem in the page and end this script run instead.
    st.error("API_KEY environment variable is not set.")
    st.stop()
# Log only that a key is configured, never any part of the key itself.
print("KEY: ", "set")
rag_pipeline = RAGPipeline(vectorstore=vectorstore, api_key=api_key)
print("Enter your question Here: ")
question = st.text_input("أدخل سؤالك هنا")
if st.button("Generate Answer"):
    if question.strip():
        response = rag_pipeline.generate_response(question)
        st.write(response)
        print("Question: ", question)
        print("Response: ", response)
    else:
        # Avoid a retrieval and API round-trip for an empty question.
        st.warning("Please enter a question first.")