Spaces:
Runtime error
Runtime error
| import requests | |
| import json | |
| import yaml | |
| import scipdf | |
| import os | |
| import time | |
| import aiohttp | |
| import asyncio | |
| import numpy as np | |
| import random | |
def get_content_between_a_b(start_tag, end_tag, text):
    """Collect every span of ``text`` enclosed by ``start_tag`` and ``end_tag``.

    The text is scanned left to right. Each span between a start tag and the
    next end tag is collected, and scanning resumes right after that end tag.
    Scanning stops at the first start tag that has no end tag after it.

    Args:
        start_tag: Opening delimiter to look for.
        end_tag: Closing delimiter to look for.
        text: String to scan.

    Returns:
        The collected spans joined by single spaces, with surrounding
        whitespace stripped; an empty string when nothing was collected.
    """
    pieces = []
    cursor = text.find(start_tag)
    while cursor != -1:
        body_start = cursor + len(start_tag)
        body_end = text.find(end_tag, body_start)
        if body_end == -1:
            # Unterminated start tag: nothing further can be extracted.
            break
        pieces.append(text[body_start:body_end])
        cursor = text.find(start_tag, body_end + len(end_tag))
    return " ".join(pieces).strip()
def extract(text, type):
    """Return the content wrapped in ``<type>...</type>`` tags inside ``text``.

    Args:
        text: String to search; may be empty or None.
        type: Tag name, used to build the ``<type>`` / ``</type>`` delimiters.

    Returns:
        The tagged content when any is found, ``text`` itself when no tagged
        content is found, and an empty string when ``text`` is falsy.
    """
    if not text:
        return ""
    tagged = get_content_between_a_b(f"<{type}>", f"</{type}>", text)
    return tagged if tagged else text
def download(url):
    """Fetch ``url`` with an HTTP GET and return the raw response body.

    The request is sent with a browser-like User-Agent and a 120 second
    timeout. Failures are reported with ``print`` rather than raised.

    Args:
        url: Address of the file to download.

    Returns:
        The response content as bytes when the status code is 200, otherwise
        None (non-200 status, request error, or any other exception).
    """
    # Mimic a common browser's User-Agent
    browser_headers = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) '
                      'AppleWebKit/537.36 (KHTML, like Gecko) '
                      'Chrome/87.0.4280.88 Safari/537.36'
    }
    try:
        response = requests.get(url, headers=browser_headers, timeout=120)
        if response.status_code != 200:
            print(f"Failed to download the file from the URL: {url}")
            return None
        return response.content
    except requests.RequestException as error:
        print(f"An error occurred while downloading the file from the URL: {url}")
        print(error)
        return None
    except Exception as error:
        print(f"An unexpected error occurred while downloading the file from the URL: {url}")
        print(error)
        return None
class Result:
    """Plain record holding the data kept for one paper."""

    def __init__(self, title="", abstract="", article="", citations_conut=0, year=None) -> None:
        """Store the given values as attributes of the same names.

        Args:
            title: Paper title.
            abstract: Paper abstract.
            article: Parsed article content (callers in this file pass a
                parsed-PDF dict or None).
            citations_conut: Citation count. The misspelling is the
                established parameter/attribute name and is kept so existing
                callers keep working.
            year: Publication year, or None.
        """
        self.title, self.abstract = title, abstract
        self.article = article
        self.citations_conut = citations_conut
        self.year = year
# Paper field names (presumably for the Semantic Scholar Graph API `fields`
# parameter; not referenced by the code visible in this part of the file).
semantic_fields = [
    "title",
    "abstract",
    "year",
    "authors.name",
    "authors.paperCount",
    "authors.citationCount",
    "authors.hIndex",
    "url",
    "referenceCount",
    "citationCount",
    "influentialCitationCount",
    "isOpenAccess",
    "openAccessPdf",
    "fieldsOfStudy",
    "s2FieldsOfStudy",
    "embedding.specter_v1",
    "embedding.specter_v2",
    "publicationDate",
    "citations",
]
# Field-of-study labels. The list is kept verbatim, including its repeated
# "Art" and "Philosophy" entries.
fieldsOfStudy = [
    "Computer Science",
    "Medicine",
    "Chemistry",
    "Biology",
    "Materials Science",
    "Physics",
    "Geology",
    "Art",
    "History",
    "Geography",
    "Sociology",
    "Business",
    "Political Science",
    "Philosophy",
    "Art",
    "Literature",
    "Music",
    "Economics",
    "Philosophy",
    "Mathematics",
    "Engineering",
    "Environmental Science",
    "Agricultural and Food Sciences",
    "Education",
    "Law",
    "Linguistics",
]
# Nested citation fields:
# citations.paperId, citations.title, citations.year, citations.authors.name, citations.authors.paperCount, citations.authors.citationCount, citations.authors.hIndex, citations.url, citations.referenceCount, citations.citationCount, citations.influentialCitationCount, citations.isOpenAccess, citations.openAccessPdf, citations.fieldsOfStudy, citations.s2FieldsOfStudy, citations.publicationDate
# publicationDateOrYear formats: 2019-03-05 ; 2019-03 ; 2019 ; 2016-03-05:2020-06-06 ; 1981-08-25: ; :2020-06-06 ; 1981:2020
# publicationTypes values: Review ; JournalArticle ; CaseReport ; ClinicalTrial ; Dataset ; Editorial ; LettersAndComments ; MetaAnalysis ; News ; Study ; Book ; BookSection
def process_fields(fields):
    """Join an iterable of field-name strings into one comma-separated string."""
    separator = ","
    return separator.join(fields)
| class SementicSearcher: | |
| def __init__(self, ban_paper = []) -> None: | |
| self.ban_paper = ban_paper | |
def search_papers(self, query, limit=5, offset=0, fields=["title", "paperId", "abstract", "isOpenAccess", 'openAccessPdf', "year","publicationDate","citations.title","citations.abstract","citations.isOpenAccess","citations.openAccessPdf","citations.citationCount","citationCount","citations.year"],
                  publicationDate=None, minCitationCount=0, year=None,
                  publicationTypes=None, fieldsOfStudy=None):
    """Query the Semantic Scholar paper search endpoint.

    Args:
        query: Search string.
        limit: Maximum number of results requested.
        offset: Index of the first result requested.
        fields: Field names to return, as a list (joined with commas) or as
            an already comma-separated string.
        publicationDate: Sent as ``publicationDateOrYear``; omitted if None.
        minCitationCount: Sent as ``minCitationCount``.
        year: Sent as ``year``; omitted if None.
        publicationTypes: Sent as ``publicationTypes``; omitted if None.
        fieldsOfStudy: Sent as ``fieldsOfStudy``; omitted if None.

    Returns:
        The decoded JSON response on HTTP 200, otherwise None (any other
        status except 429, or a ``requests.RequestException``). HTTP 429
        responses are retried after a one second pause until another
        outcome occurs.
    """
    url = 'https://api.semanticscholar.org/graph/v1/paper/search'
    # The default ``fields`` list is only read here, never mutated, so the
    # shared default object is harmless.
    fields = process_fields(fields) if isinstance(fields, list) else fields
    query_params = {
        'query': query,
        "limit": limit,
        "offset": offset,
        'fields': fields,
        'publicationDateOrYear': publicationDate,
        'minCitationCount': minCitationCount,
        'year': year,
        'publicationTypes': publicationTypes,
        'fieldsOfStudy': fieldsOfStudy
    }
    # Parameters left as None are not sent at all.
    filtered_query_params = {key: value for key, value in query_params.items() if value is not None}
    # The API key is optional and comes from the environment.
    api_key = os.environ.get('SEMENTIC_SEARCH_API_KEY', None)
    headers = {'x-api-key': api_key} if api_key else None
    # Retry rate-limited requests in a loop rather than by recursion, so a
    # long run of 429 responses cannot exhaust the call stack.
    while True:
        try:
            # Without a timeout a stalled connection would block forever;
            # a timeout surfaces as a RequestException handled below.
            response = requests.get(url, params=filtered_query_params, headers=headers, timeout=120)
            if response.status_code == 200:
                return response.json()
            if response.status_code != 429:
                print(f"Request failed with status code {response.status_code}: {response.text}")
                return None
        except requests.RequestException as e:
            print(f"An error occurred: {e}")
            return None
        time.sleep(1)
        print(f"Request failed with status code {response.status_code}: begin to retry")
def cal_cosine_similarity(self, vec1, vec2):
    """Return the cosine similarity of two vectors.

    Computed as the dot product divided by the product of the two
    Euclidean norms. No special handling is applied for zero-length vectors.
    """
    numerator = np.dot(vec1, vec2)
    denominator = np.linalg.norm(vec1) * np.linalg.norm(vec2)
    return numerator / denominator
def cal_cosine_similarity_matric(self, matric1, matric2):
    """Return pairwise cosine similarities between the rows of two matrices.

    Lists are converted to arrays, and a one-dimensional input is treated as
    a single row.

    Returns:
        A flat Python list in row-major order: the similarity of row ``i`` of
        ``matric1`` and row ``j`` of ``matric2`` sits at index
        ``i * rows(matric2) + j``.
    """
    def as_rows(values):
        # Lists become arrays; a 1-D array becomes a single-row 2-D array.
        arr = np.array(values) if isinstance(values, list) else values
        return arr.reshape(1, -1) if len(arr.shape) == 1 else arr

    left = as_rows(matric1)
    right = as_rows(matric2)
    left_norms = np.linalg.norm(left, axis=1)
    right_norms = np.linalg.norm(right, axis=1)
    cos_sim = np.dot(left, right.T) / np.outer(left_norms, right_norms)
    return cos_sim.flatten().tolist()
def read_arxiv_from_path(self, pdf_path):
    """Parse a PDF into a dict with ``scipdf.parse_pdf_to_dict``.

    Args:
        pdf_path: Value handed to scipdf unchanged. When it is raw bytes
            (callers in this file pass downloaded content), it must start
            with the ``%PDF-`` header; other value types skip that check.

    Returns:
        The dict produced by scipdf, or None when the bytes do not look like
        a PDF or when parsing raises.
    """
    pdf_header = b'%PDF-'
    # Explicit type check instead of calling startswith() on anything and
    # swallowing the resulting exception for non-bytes inputs.
    if isinstance(pdf_path, (bytes, bytearray)) and not pdf_path.startswith(pdf_header):
        return None
    try:
        article_dict = scipdf.parse_pdf_to_dict(pdf_path)
    except Exception:
        # Best effort: any parser failure is reported and means "no article".
        print("Failed to parse the PDF")
        return None
    return article_dict
| def get_paper_embbeding_and_score(self,query_embedding, paper,llm): | |
| paper_content = f""" | |
| Title: {paper['title']} | |
| Abstract: {paper['abstract']} | |
| """ | |
| paper_embbeding = llm.get_embbeding(paper_content) | |
| paper_embbeding = np.array(paper_embbeding) | |
| score = self.cal_cosine_similarity(query_embedding,paper_embbeding) | |
| return [paper,score] | |
| def rerank_papers(self, query_embedding, paper_list,llm): | |
| if len(paper_list) == 0: | |
| return [] | |
| paper_list = [paper for paper in paper_list if paper] | |
| paper_contents = [] | |
| for paper in paper_list: | |
| paper_content = f""" | |
| Title: {paper['title']} | |
| Abstract: {paper['abstract']} | |
| """ | |
| paper_contents.append(paper_content) | |
| paper_contents_embbeding = llm.get_embbeding(paper_contents) | |
| paper_contents_embbeding = np.array(paper_contents_embbeding) | |
| scores = self.cal_cosine_similarity_matric(query_embedding,paper_contents_embbeding) | |
| # 根据score对paper_list进行排序 | |
| paper_list = sorted(zip(paper_list,scores),key = lambda x: x[1],reverse = True) | |
| paper_list = [paper[0] for paper in paper_list] | |
| return paper_list | |
| def search(self,query,max_results = 5 ,paper_list = None ,rerank_query = None,llm = None,year = None,publicationDate = None,need_download = True,fields = ["title", "paperId", "abstract", "isOpenAccess", 'openAccessPdf', "year","publicationDate","citationCount"]): | |
| if rerank_query: | |
| rerank_query_embbeding = llm.get_embbeding(rerank_query) | |
| rerank_query_embbeding = np.array(rerank_query_embbeding) | |
| readed_papers = [] | |
| if paper_list: | |
| if isinstance(paper_list,set): | |
| paper_list = list(paper_list) | |
| if len(paper_list) == 0 : | |
| pass | |
| elif isinstance(paper_list[0], str): | |
| readed_papers = paper_list | |
| elif isinstance(paper_list[0], Result): | |
| readed_papers = [paper.title for paper in paper_list] | |
| print(f"Searching for papers related to the query: <{query}>") | |
| results = self.search_papers(query,limit = 10 * max_results,year=year,publicationDate = publicationDate,fields = fields) | |
| if not results or "data" not in results: | |
| return [] | |
| new_results = [] | |
| for result in results['data']: | |
| if result['title'] in self.ban_paper: | |
| continue | |
| new_results.append(result) | |
| results = new_results | |
| final_results = [] | |
| if need_download: | |
| paper_candidates = [] | |
| for result in results: | |
| if not result['isOpenAccess'] or not result['openAccessPdf'] or result['title'] in readed_papers: | |
| continue | |
| else: | |
| paper_candidates.append(result) | |
| else: | |
| paper_candidates = results | |
| if llm and rerank_query: | |
| paper_candidates = self.rerank_papers(rerank_query_embbeding, paper_candidates,llm) | |
| if need_download: | |
| for result in paper_candidates: | |
| pdf_link = result['openAccessPdf']["url"] | |
| try: | |
| content = self.download_pdf(pdf_link) | |
| if not content: | |
| continue | |
| except Exception as e: | |
| continue | |
| title = result['title'] | |
| abstract = result['abstract'] | |
| citationCount = result['citationCount'] | |
| year = result['year'] | |
| article = self.read_arxiv_from_path(content) | |
| if not article: | |
| continue | |
| final_results.append(Result(title,abstract,article,citationCount,year)) | |
| if len(final_results) >= max_results: | |
| break | |
| else: | |
| for result in paper_candidates: | |
| title = result['title'] | |
| abstract = result['abstract'] | |
| citationCount = result['citationCount'] | |
| year = result['year'] | |
| final_results.append(Result(title,abstract,None,citationCount,year)) | |
| if len(final_results) >= max_results: | |
| break | |
| return final_results | |
| def search_related_paper(self,title,need_citation = True,need_reference = True,rerank_query = None,llm = None,paper_list = []): | |
| print(f"Searching for the related papers of <{title}>, need_citation: {need_citation}, need_reference: {need_reference}") | |
| fileds = ["title","abstract","citations.title","citations.abstract","citations.citationCount","references.title","references.abstract","references.citationCount","citations.isOpenAccess","citations.openAccessPdf","references.isOpenAccess","references.openAccessPdf","citations.year","references.year"] | |
| results = self.search_papers(title,limit = 3,fields=fileds) | |
| related_papers = [] | |
| related_papers_title = [] | |
| if not results or "data" not in results: | |
| return None | |
| for result in results["data"]: | |
| if not result: | |
| continue | |
| if need_citation: | |
| for citation in result["citations"]: | |
| if "openAccessPdf" not in citation or not citation["openAccessPdf"]: | |
| continue | |
| elif citation["title"] in related_papers_title or citation["title"] in self.ban_paper or citation["title"] in paper_list: | |
| continue | |
| elif citation["isOpenAccess"] == False or citation["openAccessPdf"] == None: | |
| continue | |
| else: | |
| related_papers.append(citation) | |
| related_papers_title.append(citation["title"]) | |
| if need_reference: | |
| for reference in result["references"]: | |
| if "openAccessPdf" not in reference or not reference["openAccessPdf"]: | |
| continue | |
| elif reference["title"] in related_papers_title or reference["title"] in self.ban_paper or reference["title"] in paper_list: | |
| continue | |
| elif reference["isOpenAccess"] == False or reference["openAccessPdf"] == None: | |
| continue | |
| else: | |
| related_papers.append(reference) | |
| related_papers_title.append(reference["title"]) | |
| if result: | |
| break | |
| if len(related_papers) >= 200: | |
| related_papers = related_papers[:200] | |
| if rerank_query and llm: | |
| rerank_query_embbeding = llm.get_embbeding(rerank_query) | |
| rerank_query_embbeding = np.array(rerank_query_embbeding) | |
| related_papers = self.rerank_papers(rerank_query_embbeding, related_papers,llm) | |
| related_papers = [[paper["title"],paper["abstract"],paper["openAccessPdf"]["url"],paper["citationCount"],paper['year']] for paper in related_papers] | |
| else: | |
| related_papers = [[paper["title"],paper["abstract"],paper["openAccessPdf"]["url"],paper["citationCount"],paper['year']] for paper in related_papers] | |
| related_papers = sorted(related_papers,key = lambda x: x[3],reverse = True) | |
| print(f"Found {len(related_papers)} related papers") | |
| for paper in related_papers: | |
| url = paper[2] | |
| content = self.download_pdf(url) | |
| if content: | |
| article = self.read_arxiv_from_path(content) | |
| if not article: | |
| continue | |
| result = Result(paper[0],paper[1],article,paper[3],paper[4]) | |
| return result | |
| return None | |
def download_pdf(self, pdf_link):
    """Download ``pdf_link`` with the module-level ``download`` helper.

    Returns:
        Whatever ``download`` returns for that link.
    """
    return download(pdf_link)
def read_paper_title_abstract(self, article):
    """Format an article's title and abstract as a short text header.

    Args:
        article: Mapping with ``"title"`` and ``"abstract"`` keys.

    Returns:
        A string with a leading newline, a ``Title:`` line and an
        ``Abstract:`` line, each newline-terminated.
    """
    return f"\nTitle: {article['title']}\nAbstract: {article['abstract']}\n"
| def read_paper_content(self,article): | |
| paper_content = self.read_paper_title_abstract(article) | |
| for section in article["sections"]: | |
| paper_content += f"section: {section['heading']}\n content: {section['text']}\n ref_ids: {section['publication_ref']}\n" | |
| return paper_content | |
| def read_paper_content_with_ref(self,article): | |
| paper_content = self.read_paper_content(article) | |
| paper_content += "<References>\n" | |
| i = 1 | |
| for refer in article["references"]: | |
| ref_id = refer["ref_id"] | |
| title = refer["title"] | |
| year = refer["year"] | |
| paper_content += f"Ref_id:{ref_id} Title: {title} Year: ({year})\n" | |
| i += 1 | |
| paper_content += "</References>\n" | |
| return paper_content | |