import requests
from bs4 import BeautifulSoup
import re
import time

from .utils import timer_func
def remove_emoji(string):
    emoji_pattern = re.compile("["
                               u"\U0001F300-\U0001FAD6"  # symbols, pictographs & emoticons (incl. extended)
                               u"\U0001F300-\U0001F5FF"  # symbols & pictographs
                               u"\U0001F680-\U0001F6FF"  # transport & map symbols
                               u"\U0001F1E0-\U0001F1FF"  # flags (iOS)
                               u"\U00002702-\U000027B0"  # dingbats
                               u"\U000024C2-\U0001F251"  # enclosed characters
                               "]+", flags=re.UNICODE)
    return emoji_pattern.sub(r'', string)
def preprocess(texts):
    texts = [text.replace("_", " ") for text in texts]
    texts = [i.lower() for i in texts]
    texts = [remove_emoji(i) for i in texts]
    texts = [re.sub(r'[^\w\d\s]', '', i) for i in texts]  # strip punctuation
    texts = [re.sub(r'\s+|\n', ' ', i) for i in texts]    # collapse whitespace and newlines
    texts = [re.sub(r'^\s|\s$', '', i) for i in texts]    # trim leading/trailing spaces
    # texts = [ViTokenizer.tokenize(i) for i in texts]
    return texts
class MyCrawler:
    headers = {
        "user-agent": "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/87.0.4280.67 Safari/537.36",
        'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
        'Accept-Language': 'en-US,en;q=0.5',
        'Accept-Encoding': 'gzip, deflate',
        'DNT': '1',
        'Connection': 'keep-alive',
        'Upgrade-Insecure-Requests': '1'
    }
    # headers = {
    #     'User-Agent': 'Mozilla/5.0 (Windows NT 6.1; WOW64; rv:49.0) Gecko/20100101 Firefox/49.0',
    #     # 'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
    #     # 'Accept-Language': 'en-US,en;q=0.5',
    #     # 'Accept-Encoding': 'gzip, deflate',
    #     # 'DNT': '1',
    #     # 'Connection': 'keep-alive',
    #     # 'Upgrade-Insecure-Requests': '1'
    # }
    def getSoup(self, url: str):
        req = requests.get(url, headers=self.headers)
        return BeautifulSoup(req.text, 'html.parser')
    def crawl_byContainer(self, url: str, article_container: str, body_class: str):
        soup = self.getSoup(url)
        paragraphs = soup.find(article_container, {"class": body_class})
        if paragraphs:
            # Crawl all paragraphs inside the article body
            contents = []
            numOfParagraphs = 0
            for p in paragraphs.find_all("p"):
                contents.append(p.get_text())
                numOfParagraphs += 1
                # if numOfParagraphs > 10:
                #     break
            if contents:
                result = "\n".join(contents)
                if url.split("/")[2] == "vnexpress.net":
                    # vnexpress keeps its lead paragraph outside the article body
                    result = self.crawl_byElement(soup, "p", "description") + "\n" + result
                return result
        return ""
    def crawl_byElement(self, soup, element: str, ele_class: str):
        print("by Elements...")
        paragraph = soup.find(element, {"class": ele_class})
        if paragraph:
            print(paragraph.get_text())
            return paragraph.get_text()
        return ""
    def crawl_webcontent(self, url: str):
        provider = url.split("/")[2]
        content = ""
        if provider == "thanhnien.vn" or provider == "tuoitre.vn":
            content = self.crawl_byContainer(url, "div", "afcbc-body")
        elif provider == "vietnamnet.vn":
            content = self.crawl_byContainer(url, "div", "maincontent")
        elif provider == "vnexpress.net":
            content = self.crawl_byContainer(url, "article", "fck_detail")
        elif provider == "www.24h.com.vn":
            content = self.crawl_byContainer(url, "article", "cate-24h-foot-arti-deta-info")
        elif provider == "vov.vn":
            content = self.crawl_byContainer(url, "div", "article-content")
        elif provider == "vtv.vn":
            content = self.crawl_byContainer(url, "div", "ta-justify")
        elif provider == "vi.wikipedia.org":
            content = self.crawl_byContainer(url, "div", "mw-content-ltr")
        elif provider == "www.vinmec.com":
            content = self.crawl_byContainer(url, "div", "block-content")
        elif provider == "vietstock.vn":
            content = self.crawl_byContainer(url, "div", "single_post_heading")
        elif provider == "vneconomy.vn":
            content = self.crawl_byContainer(url, "article", "detail-wrap")
        elif provider == "dantri.com.vn":
            content = self.crawl_byContainer(url, "article", "singular-container")
        # elif provider == "plo.vn":
        #     content = self.crawl_byContainer(url, "div", "article__body")
        return provider, url, content

    # def crawl_redir(url):
    def search(self, claim: str, count: int = 1):
        processed_claim = preprocess([claim])[0]
        num_words = 100
        ls_word = processed_claim.split(" ")
        claim_short = " ".join(ls_word[:num_words])
        print(claim_short)
        query = claim_short
        # query = '+'.join(claim_short.split(" "))
        try:
            # Get all result URLs from Bing, retrying up to 3 times
            attempt_time = 0
            urls = []
            while len(urls) == 0 and attempt_time < 3:
                req = requests.get("https://www.bing.com/search", headers=self.headers, params={
                    "q": query,
                    # pass both filters as a list; a dict literal cannot repeat the same key
                    "responseFilter": ["-images", "-videos"]
                })
                print("Query URL: " + req.url)
                print("Crawling Attempt " + str(attempt_time))
                soup = BeautifulSoup(req.text, 'html.parser')
                completeData = soup.find_all("li", {"class": "b_algo"})
                for data in completeData:
                    urls.append(data.find("a", href=True)["href"])
                attempt_time += 1
                time.sleep(1)
            print("Got " + str(len(urls)) + " urls")
            result = []
            for url in urls:
                print("Crawling... " + url)
                provider, url, content = self.crawl_webcontent(url)
                if content:
                    result.append({
                        "provider": provider,
                        "url": url,
                        "content": content
                    })
                    count -= 1
                    if count == 0:
                        break
            return result
        except Exception as e:
            print(e)
            return []
    def searchGoogle(self, claim: str, count: int = 1):
        processed_claim = preprocess([claim])[0]
        num_words = 100
        ls_word = processed_claim.split(" ")
        claim_short = " ".join(ls_word[:num_words])
        print(claim_short)
        query = claim_short
        # query = '+'.join(claim_short.split(" "))
        try:
            # Get all result URLs from Google, retrying up to 3 times
            attempt_time = 0
            urls = []
            while len(urls) == 0 and attempt_time < 3:
                req = requests.get("https://www.google.com/search", headers=self.headers, params={
                    "q": query
                })
                print("Query URL: " + req.url)
                print("Crawling Attempt " + str(attempt_time))
                soup = BeautifulSoup(req.text, 'html.parser')
                completeData = soup.find_all("a", {"jsname": "UWckNb"})
                for data in completeData:
                    urls.append(data["href"])
                attempt_time += 1
                time.sleep(1)
            print("Got " + str(len(urls)) + " urls")
            result = []
            for url in urls:
                print("Crawling... " + url)
                provider, url, content = self.crawl_webcontent(url)
                if content:
                    result.append({
                        "provider": provider,
                        "url": url,
                        "content": content
                    })
                    count -= 1
                    if count == 0:
                        break
            return result
        except Exception as e:
            print(e)
            return []
    def scraping(self, url: str):
        try:
            provider, url, content = self.crawl_webcontent(url)
            return bool(content)
        except Exception as e:
            print(e)
            return False
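
# --- Usage sketch (illustrative only, not part of the original module) ---
# A minimal example of how this crawler might be driven. It assumes outbound
# HTTP access to Bing and the supported Vietnamese news sites; the claim string
# below is a placeholder. Because of the relative import of `timer_func`, the
# module has to be run as part of its package (e.g. `python -m <package>.<module>`),
# not as a standalone script.
if __name__ == "__main__":
    crawler = MyCrawler()
    # Search Bing for the claim and keep up to 2 articles whose content could be extracted.
    evidence = crawler.search("example claim text to verify", count=2)
    for item in evidence:
        print(item["provider"], item["url"], item["content"][:200])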