import time
import re
import os
import urllib.request
import urllib.error
from urllib.parse import quote
from threading import Thread

import requests
import html2text
import markdownify
import tiktoken
from bs4 import BeautifulSoup
from dotenv import load_dotenv
from openai import OpenAI
from requests_html import HTMLSession
from seleniumbase import SB
from zenrows import ZenRowsClient

load_dotenv()
ZENROWS_KEY = os.getenv('ZENROWS_KEY')
you_key = os.getenv("YOU_API_KEY")
client = OpenAI()
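# Assumed .env layout, inferred from the os.getenv calls above; OPENAI_API_KEY is
# read implicitly by the OpenAI() client:
#   ZENROWS_KEY=...
#   YOU_API_KEY=...
#   OPENAI_API_KEY=...
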
def get_fast_url_source(url):
    session = HTMLSession()
    r = session.get(url)
    return r.text

def convert_html_to_text(html):
    h = html2text.HTML2Text()
    h.body_width = 0  # Disable line wrapping
    text = h.handle(html)
    text = re.sub(r'\n\s*', '', text)
    text = re.sub(r'\* \\', '', text)
    text = " ".join(text.split())
    return text

def get_google_search_url(query):
    url = 'https://www.google.com/search?q=' + quote(query)
    # Perform the request
    request = urllib.request.Request(url)
    # Set a normal User-Agent header, otherwise Google will block the request.
    request.add_header(
        'User-Agent',
        'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_6) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/87.0.4280.88 Safari/537.36'
    )
    raw_response = urllib.request.urlopen(request).read()
    # Read the response as a utf-8 string
    html = raw_response.decode("utf-8")
    soup = BeautifulSoup(html, 'html.parser')
    # Find all the search result divs
    divs = soup.select("#search div.g")
    urls = []
    for div in divs:
        # Take the first link of each result block, if one is present
        anchors = div.select('a')
        if anchors:
            urls.append(anchors[0]['href'])
    return urls

def format_text(text):
    # Keep only the main content tags and re-serialize them.
    soup = BeautifulSoup(text, 'html.parser')
    results = soup.find_all(['p', 'h1', 'h2', 'span'])
    text = ''
    for result in results:
        text = text + str(result) + ' '
    return text

def get_page_source_selenium_base(url):
    with SB(uc_cdp=True, guest_mode=True, headless=True) as sb:
        sb.open(url)
        sb.sleep(5)
        page_source = sb.driver.get_page_source()
    return page_source

def num_tokens_from_string(string: str, encoding_name: str) -> int:
    encoding = tiktoken.get_encoding(encoding_name)
    # encoding = tiktoken.encoding_for_model(encoding_name)
    num_tokens = len(encoding.encode(string))
    return num_tokens

def encoding_getter(encoding_type: str):
    """
    Returns the appropriate encoding based on the given encoding type (either an encoding string or a model name).
    """
    if "k_base" in encoding_type:
        return tiktoken.get_encoding(encoding_type)
    else:
        return tiktoken.encoding_for_model(encoding_type)

def tokenizer(string: str, encoding_type: str) -> list:
    """
    Returns the tokens in a text string using the specified encoding.
    """
    encoding = encoding_getter(encoding_type)
    tokens = encoding.encode(string)
    return tokens

def token_counter(string: str, encoding_type: str) -> int:
    """
    Returns the number of tokens in a text string using the specified encoding.
    """
    num_tokens = len(tokenizer(string, encoding_type))
    return num_tokens

def format_output(text):
    page_source = format_text(text)
    page_source = markdownify.markdownify(page_source)
    page_source = " ".join(page_source.split())
    return page_source

def clean_text(text):
    # Remove URLs
    text = re.sub(r'http[s]?://\S+', '', text)
    # Remove special characters and punctuation (keep only letters, numbers, and basic punctuation)
    text = re.sub(r'[^a-zA-Z0-9\s,.!?-]', '', text)
    # Normalize whitespace
    text = re.sub(r'\s+', ' ', text).strip()
    return text

def call_open_ai(system_prompt, max_tokens=800, stream=False):
    messages = [
        {
            "role": "user",
            "content": system_prompt
        }
    ]
    # Note: all callers here use stream=False; a streamed response would need to be
    # iterated chunk by chunk instead of read via .choices[0].message.content.
    response = client.chat.completions.create(
        model="gpt-3.5-turbo",
        messages=messages,
        temperature=0,
        max_tokens=max_tokens,
        top_p=0,
        frequency_penalty=0,
        presence_penalty=0,
        stream=stream
    )
    return response.choices[0].message.content

def url_summary(text, question):
    system_prompt = """
    Summarize the given text, including all the important topics and numerical data.
    While summarizing, please keep this question in mind.
    question:- {question}
    text:
    {text}
    """.format(question=question, text=text)
    return call_open_ai(system_prompt=system_prompt, max_tokens=800)

def get_google_search_query(question):
    system_prompt = """
    Convert this question to a Google search query and return only the query.
    question:- {question}
    """.format(question=question)
    return call_open_ai(system_prompt=system_prompt, max_tokens=50)

def is_urlfile(url):
    # Check if an online file exists (HTTP 200).
    try:
        r = urllib.request.urlopen(url)
        return r.getcode() == 200
    except urllib.error.HTTPError:
        return False

def check_url_pdf_file(url):
    r = requests.get(url)
    content_type = r.headers.get('content-type', '')
    return 'application/pdf' in content_type

def get_ai_snippets_for_query(query, num):
    headers = {"X-API-Key": you_key}
    # Pass both parameters via params instead of duplicating the query in the URL.
    params = {"query": query, "num_web_results": num}
    return requests.get(
        "https://api.ydc-index.io/search",
        params=params,
        headers=headers,
    ).json().get('hits')

def get_web_search_you(query, num):
    docs = get_ai_snippets_for_query(query, num)
    markdown = ""
    for doc in docs:
        for key, value in doc.items():
            if key == 'snippets':
                markdown += f"{key}:\n"
                for snippet in value:
                    markdown += f"- {snippet}\n"
            else:
                markdown += f"{key}: {value}\n"
        markdown += "\n"
    return markdown

def zenrows_scrapper(url):
    zen_client = ZenRowsClient(ZENROWS_KEY)
    params = {"js_render": "true"}
    response = zen_client.get(url, params=params)
    return response.text

def get_new_question_from_history(pre_question, new_question, answer):
    system_prompt = """
    Generate a new Google search query using the previous question and answer, and return only the query.
    previous question:- {pre_question}
    answer:- {answer}
    new question:- {new_question}
    """.format(pre_question=pre_question, answer=answer, new_question=new_question)
    return call_open_ai(system_prompt=system_prompt, max_tokens=50)

def scraping_job(strategy, question, url, results, key):
    # Scrape a single URL and store its summary in results[key].
    if strategy == 'Deep':
        # JS-rendered scrape; get_page_source_selenium_base(url) is a local alternative.
        page_source = zenrows_scrapper(url)
    else:
        page_source = get_fast_url_source(url)
    formatted_page_source = format_output(page_source)
    formatted_page_source = clean_text(formatted_page_source)
    tokens = token_counter(formatted_page_source, 'gpt-3.5-turbo')
    if tokens >= 15585:
        # Page is too large for the model's context window; skip it.
        results[key] = ''
    else:
        summary = url_summary(formatted_page_source, question)
        results[key] = summary

def get_docs_from_web(question, history, n_web_search, strategy):
    # Rewrite the question using the conversation history so the search query is self-contained.
    if history:
        question = get_new_question_from_history(history[0][0], question, history[0][1])
    docs = ''
    if strategy == 'Normal Fast':
        docs = get_web_search_you(question, n_web_search)
    else:
        urls = get_google_search_url(get_google_search_query(question))[:n_web_search]
        urls = list(set(urls))
        yield f"Scraping started for {len(urls)} urls:-\n\n"
        threads = [None] * len(urls)
        results = [None] * len(urls)
        for key, url in enumerate(urls):
            if '.pdf' in url or '.PDF' in url:
                yield f"Scraping skipped, pdf detected. {key + 1}/{len(urls)} - {url} ❌\n"
                results[key] = ''
                continue
            threads[key] = Thread(target=scraping_job, args=(strategy, question, url, results, key))
            threads[key].start()
        for i in range(len(threads)):
            if threads[i] is not None:
                threads[i].join()
        for key, result in enumerate(results):
            if result is not None and result != '':
                docs += result
                docs += '\n Source:-' + urls[key] + '\n\n'
                yield f"Scraping Done {key + 1}/{len(urls)} - {urls[key]} ✅\n"
    yield {"data": docs}
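
# Minimal usage sketch (not part of the original module; the question and settings
# below are illustrative assumptions). get_docs_from_web is a generator that yields
# progress strings and finally a {"data": ...} dict, so a caller can stream progress
# messages and then read the combined docs from the final payload.
if __name__ == "__main__":
    docs = ''
    for update in get_docs_from_web("What is the current population of Tokyo?",
                                    history=[], n_web_search=3, strategy='Deep'):
        if isinstance(update, dict):
            docs = update["data"]  # final payload: summaries plus their source URLs
        else:
            print(update, end='')  # progress messages
    print(docs)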