# Utilities to build a RAG system to query information from the
# gwIAS search pipeline using Langchain
# Thanks to Pablo Villanueva Domingo for sharing his CAMELS template
# https://huggingface.co/spaces/PabloVD/CAMELSDocBot

from langchain import hub
from langchain_chroma import Chroma
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnablePassthrough
from langchain_text_splitters import RecursiveCharacterTextSplitter
from langchain_community.document_loaders import WebBaseLoader, PyPDFLoader
from langchain.schema import Document
import requests
import json
import base64
from bs4 import BeautifulSoup
import re


def github_to_raw(url):
    """Convert GitHub URL to raw content URL"""
    return url.replace("github.com", "raw.githubusercontent.com").replace("/blob/", "/")


def load_github_notebook(url):
    """Load Jupyter notebook from GitHub URL using GitHub API"""
    try:
        # Convert GitHub blob URL to API URL
        if "github.com" in url and "/blob/" in url:
            # Extract owner, repo, branch and path from URL
            parts = url.replace("https://github.com/", "").split("/")
            owner = parts[0]
            repo = parts[1]
            branch = parts[3]  # usually 'main' or 'master'
            path = "/".join(parts[4:])
            api_url = f"https://api.github.com/repos/{owner}/{repo}/contents/{path}?ref={branch}"
        else:
            raise ValueError("URL must be a GitHub blob URL")

        # Fetch notebook content
        response = requests.get(api_url)
        response.raise_for_status()
        content_data = response.json()

        if content_data.get('encoding') == 'base64':
            notebook_content = base64.b64decode(content_data['content']).decode('utf-8')
        else:
            notebook_content = content_data['content']

        # Parse notebook JSON
        notebook = json.loads(notebook_content)

        docs = []
        cell_count = 0

        # Process each cell
        for cell in notebook.get('cells', []):
            cell_count += 1
            cell_type = cell.get('cell_type', 'unknown')
            source = cell.get('source', [])

            # Join source lines
            if isinstance(source, list):
                content = ''.join(source)
            else:
                content = str(source)

            if content.strip():  # Only add non-empty cells
                metadata = {
                    'source': url,
                    'cell_type': cell_type,
                    'cell_number': cell_count,
                    'name': f"{url} - Cell {cell_count} ({cell_type})"
                }
                # Add cell type prefix for better context
                formatted_content = f"[{cell_type.upper()} CELL {cell_count}]\n{content}"
                docs.append(Document(page_content=formatted_content, metadata=metadata))

        return docs

    except Exception as e:
        print(f"Error loading notebook from {url}: {str(e)}")
        return []
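
# Usage sketch (the notebook URL here is a hypothetical placeholder; real inputs
# come from urls.txt):
#   nb_docs = load_github_notebook("https://github.com/user/repo/blob/main/analysis.ipynb")
#   for d in nb_docs:
#       print(d.metadata["name"])  # e.g. "... - Cell 1 (markdown)"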


def clean_text(text):
    """Clean text content from a webpage"""
    # Remove excessive newlines
    text = re.sub(r'\n{3,}', '\n\n', text)
    # Remove excessive whitespace
    text = re.sub(r'\s{2,}', ' ', text)
    return text.strip()


def clean_github_content(html_content):
    """Extract meaningful content from GitHub pages"""
    # Ensure we're working with a BeautifulSoup object
    if isinstance(html_content, str):
        soup = BeautifulSoup(html_content, 'html.parser')
    else:
        soup = html_content

    # Remove navigation, footer, and other boilerplate
    for element in soup.find_all(['nav', 'footer', 'header']):
        element.decompose()

    # For README and code files
    readme_content = soup.find('article', class_='markdown-body')
    if readme_content:
        return clean_text(readme_content.get_text())

    # For code files
    code_content = soup.find('table', class_='highlight')
    if code_content:
        return clean_text(code_content.get_text())

    # For directory listings
    file_list = soup.find('div', role='grid')
    if file_list:
        return clean_text(file_list.get_text())

    # Fallback to main content
    main_content = soup.find('main')
    if main_content:
        return clean_text(main_content.get_text())

    # If no specific content found, get text from body
    body = soup.find('body')
    if body:
        return clean_text(body.get_text())

    # Final fallback
    return clean_text(soup.get_text())


class GitHubLoader(WebBaseLoader):
    """Custom loader for GitHub pages with better content cleaning"""

    def clean_text(self, text):
        """Clean text content"""
        # Remove excessive newlines and spaces
        text = re.sub(r'\n{2,}', '\n', text)
        text = re.sub(r'\s{2,}', ' ', text)
        # Remove common GitHub boilerplate
        text = re.sub(r'Skip to content|Sign in|Search or jump to|Footer navigation|Terms|Privacy|Security|Status|Docs', '', text)
        return text.strip()

    def _scrape(self, url: str, *args, **kwargs) -> str:
        response = requests.get(url)
        response.raise_for_status()

        # For directory listings (tree URLs), use the API
        if '/tree/' in url:
            parts = url.replace("https://github.com/", "").split("/")
            owner = parts[0]
            repo = parts[1]
            branch = parts[3]  # usually 'main' or 'master'
            path = "/".join(parts[4:]) if len(parts) > 4 else ""
            api_url = f"https://api.github.com/repos/{owner}/{repo}/contents/{path}?ref={branch}"
            api_response = requests.get(api_url)
            api_response.raise_for_status()
            contents = api_response.json()
            if isinstance(contents, list):
                files = [f"{item['name']} ({item['type']})" for item in contents]
                return "Directory contents:\n" + "\n".join(files)
            else:
                return f"Error: Unexpected API response for {url}"

        soup = BeautifulSoup(response.text, 'html.parser')

        # For README and markdown files
        readme_content = soup.find('article', class_='markdown-body')
        if readme_content and hasattr(readme_content, 'get_text'):
            return self.clean_text(readme_content.get_text())

        # For code files
        code_content = soup.find('table', class_='highlight')
        if code_content and hasattr(code_content, 'get_text'):
            return self.clean_text(code_content.get_text())

        # For other content, get main content
        main_content = soup.find('main')
        if main_content and hasattr(main_content, 'get_text'):
            return self.clean_text(main_content.get_text())

        # Final fallback: get all text from soup
        if hasattr(soup, 'get_text'):
            return self.clean_text(soup.get_text())
        else:
            return self.clean_text(str(soup))

    def load(self):
        docs = []
        for url in self.web_paths:
            text = self._scrape(url)
            docs.append(Document(page_content=text, metadata={"source": url}))
        return docs


class RawContentLoader(WebBaseLoader):
    """Loader for raw content from GitHub (Python files, etc.)"""

    def _scrape(self, url: str, *args, **kwargs) -> str:
        response = requests.get(url)
        response.raise_for_status()
        return response.text

    def load(self):
        docs = []
        for url in self.web_paths:
            text = self._scrape(url)
            docs.append(Document(page_content=text, metadata={"source": url}))
        return docs
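
# Usage sketch: both loaders follow the WebBaseLoader interface and return a
# list of Document objects (the URLs below are hypothetical placeholders).
#   page_docs = GitHubLoader(["https://github.com/user/repo"]).load()
#   raw_docs = RawContentLoader([github_to_raw("https://github.com/user/repo/blob/main/script.py")]).load()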


# Load documentation from urls
def load_docs():
    # Get urls
    with open("urls.txt") as urlsfile:
        urls = urlsfile.readlines()
    urls = [url.replace("\n", "") for url in urls if not url.strip().startswith("#") and url.strip()]

    # Load documents from URLs
    docs = []
    for url in urls:
        url = url.strip()
        if not url:
            continue

        # Handle PDF files
        if url.endswith('.pdf'):
            print(f"Loading PDF: {url}")
            try:
                loader = PyPDFLoader(url)
                pdf_docs = loader.load()
                for doc in pdf_docs:
                    doc.metadata['source'] = url
                docs.extend(pdf_docs)
            except Exception as e:
                print(f"Error loading PDF {url}: {str(e)}")

        # Check if URL is a Jupyter notebook
        elif url.endswith('.ipynb') and 'github.com' in url and '/blob/' in url:
            print(f"Loading notebook: {url}")
            notebook_docs = load_github_notebook(url)
            docs.extend(notebook_docs)

        # Handle raw content URLs (already in raw.githubusercontent.com format)
        elif 'raw.githubusercontent.com' in url:
            print(f"Loading raw content: {url}")
            try:
                loader = RawContentLoader([url])
                web_docs = loader.load()
                # Preserve original URL in metadata
                for doc in web_docs:
                    doc.metadata['source'] = url
                docs.extend(web_docs)
            except Exception as e:
                print(f"Error loading {url}: {str(e)}")

        # Handle Python and Markdown files using raw content (convert from blob to raw)
        elif url.endswith(('.py', '.md')) and 'github.com' in url and '/blob/' in url:
            print(f"Loading raw content: {url}")
            try:
                raw_url = github_to_raw(url)
                loader = RawContentLoader([raw_url])
                web_docs = loader.load()
                # Preserve original URL in metadata
                for doc in web_docs:
                    doc.metadata['source'] = url
                docs.extend(web_docs)
            except Exception as e:
                print(f"Error loading {url}: {str(e)}")

        # Handle directory listings
        elif '/tree/' in url and 'github.com' in url:
            print(f"Loading directory: {url}")
            try:
                # Parse URL components
                parts = url.replace("https://github.com/", "").split("/")
                owner = parts[0]
                repo = parts[1]
                branch = parts[3]  # usually 'main' or 'master'
                path = "/".join(parts[4:]) if len(parts) > 4 else ""

                # Construct API URL
                api_url = f"https://api.github.com/repos/{owner}/{repo}/contents/{path}?ref={branch}"
                response = requests.get(api_url)
                response.raise_for_status()

                # Parse directory listing
                contents = response.json()
                if isinstance(contents, list):
                    # Format directory contents
                    content = "Directory contents:\n" + "\n".join([f"{item['name']} ({item['type']})" for item in contents])
                    docs.append(Document(page_content=content, metadata={'source': url}))
                else:
                    print(f"Error: Unexpected API response for {url}")
            except Exception as e:
                print(f"Error loading directory {url}: {str(e)}")

        else:
            print(f"Loading web page: {url}")
            try:
                loader = GitHubLoader([url])  # Use custom loader
                web_docs = loader.load()
                docs.extend(web_docs)
            except Exception as e:
                print(f"Error loading {url}: {str(e)}")

    # Add source URLs as document names for reference
    for i, doc in enumerate(docs):
        if 'source' in doc.metadata:
            doc.metadata['name'] = doc.metadata['source']
        else:
            doc.metadata['name'] = f"Document {i+1}"

    print(f"Loaded {len(docs)} documents:")
    for doc in docs:
        print(f" - {doc.metadata.get('name')}")

    return docs
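
# urls.txt is read one URL per line; blank lines and lines starting with "#"
# are skipped. A hypothetical example file:
#   # gwIAS documentation sources
#   https://github.com/user/gwIAS/blob/main/README.md
#   https://github.com/user/gwIAS/tree/main/pipeline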


def extract_reference(url):
    """Extract a reference keyword from the URL for display in citations."""
    # Handle GitHub blob URLs
    if "blob/main" in url:
        return url.split("blob/main/")[-1]
    # Handle GitHub tree URLs
    elif "tree/main" in url:
        return url.split("tree/main/")[-1] or "root"
    # Handle raw.githubusercontent.com URLs
    elif "raw.githubusercontent.com" in url:
        # Example: https://raw.githubusercontent.com/user/repo/branch/path/to/file.py
        parts = url.split("raw.githubusercontent.com/")[-1].split("/")
        if len(parts) > 3:
            # Remove user, repo, branch
            return "/".join(parts[3:])
        else:
            return url
    # For arXiv PDFs and other URLs, just use the filename
    elif url.endswith('.pdf') or url.endswith('.ipynb') or url.endswith('.py') or url.endswith('.md'):
        return url.split("/")[-1]
    return url
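
# Illustrative examples (hypothetical URLs):
#   extract_reference("https://github.com/user/repo/blob/main/pipeline/search.py")    -> "pipeline/search.py"
#   extract_reference("https://raw.githubusercontent.com/user/repo/main/utils/io.py") -> "utils/io.py"
#   extract_reference("https://arxiv.org/pdf/paper.pdf")                              -> "paper.pdf"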


# Join content pages for processing
def format_docs(docs):
    formatted_docs = []
    for doc in docs:
        source = doc.metadata.get('source', 'Unknown source')
        reference = f"[{extract_reference(source)}]"
        content = doc.page_content
        formatted_docs.append(f"{content}\n\nReference: {reference}")
    return "\n\n---\n\n".join(formatted_docs)


# Create a RAG chain
def RAG(llm, docs, embeddings):
    # Split text
    text_splitter = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=200)
    splits = text_splitter.split_documents(docs)

    # Create vector store
    vectorstore = Chroma.from_documents(documents=splits, embedding=embeddings)

    # Retrieve and generate using the relevant snippets of the documents
    retriever = vectorstore.as_retriever()

    # Prompt basis example for RAG systems
    prompt = hub.pull("rlm/rag-prompt")

    # Add custom instructions to the prompt, keeping the question and context
    # placeholders from the original template
    template = prompt.messages[0].prompt.template
    template_parts = template.split("\nQuestion: {question}")
    combined_template = "You are an assistant for question-answering tasks. "\
        + "Use the following pieces of retrieved context to answer the question. "\
        + "If you don't know the answer, just say that you don't know. "\
        + "Try to keep the answer concise if possible. "\
        + "Write the names of the relevant functions from the retrieved code and include code snippets to aid the user's understanding. "\
        + "Include the references used in square brackets at the end of your answer."\
        + "\nQuestion: {question}"\
        + template_parts[1]
    prompt.messages[0].prompt.template = combined_template

    # Create the chain
    rag_chain = (
        {"context": retriever | format_docs, "question": RunnablePassthrough()}
        | prompt
        | llm
        | StrOutputParser()
    )

    return rag_chain
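

# Minimal usage sketch, guarded so nothing runs on import. The embedding and
# chat-model classes below are assumptions chosen for illustration only; this
# module itself only requires LangChain-compatible `llm` and `embeddings` objects.
if __name__ == "__main__":
    from langchain_huggingface import HuggingFaceEmbeddings
    from langchain_openai import ChatOpenAI  # assumed backend; swap for the app's model

    example_docs = load_docs()
    example_embeddings = HuggingFaceEmbeddings(model_name="sentence-transformers/all-MiniLM-L6-v2")
    example_llm = ChatOpenAI(model="gpt-4o-mini")  # hypothetical model choice
    example_chain = RAG(example_llm, example_docs, example_embeddings)
    print(example_chain.invoke("How does the gwIAS search pipeline rank candidate events?"))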