Spaces:
Running
Running
| import requests | |
| from io import StringIO | |
| from Bio import SeqIO | |
| import os | |
| import time | |
| import pandas as pd | |
| import intervaltree | |
def _collect_domain_entries(matches):
    """Group DOMAIN-type matches by accession with merged, sorted locations.

    Args:
        matches: The ``matches`` list of one InterProScan JSON result. Each
            element is expected to provide ``signature.entry`` and a
            ``locations`` list whose items have ``start`` and ``end``.

    Returns:
        A dict keyed by entry accession (in order of first appearance). Each
        value is ``{'name': <entry name>, 'locations': [<'start-end'>, ...]}``
        with the locations de-duplicated, merged where they overlap, and
        sorted numerically by (start, end).
    """
    entries = {}
    for match in matches:
        entry = match['signature']['entry']
        # ``entry`` is not always a dict, so guard before indexing into it;
        # only entries typed as DOMAIN are of interest here.
        if not isinstance(entry, dict) or entry['type'] != 'DOMAIN':
            continue
        accession = entry['accession']
        if accession not in entries:
            entries[accession] = {'name': entry['name'], 'locations': set()}
        # A set de-duplicates identical hits reported by several signatures.
        entries[accession]['locations'].update(
            f"{loc['start']}-{loc['end']}" for loc in match['locations']
        )

    # Merge and sort once per accession instead of after every single match.
    for record in entries.values():
        locations = list(record['locations'])
        if len(locations) > 1:
            locations = merge_locations(locations)
        record['locations'] = sorted(
            locations,
            key=lambda loc: tuple(int(part) for part in loc.split('-')[:2]),
        )
    return entries


def find_domains(email, sequence, name):
    """Run InterProScan on a sequence and tabulate the domains it reports.

    The sequence is submitted to the EBI ``iprscan5`` REST service and the
    JSON result is polled up to 10 times, one minute apart.

    Args:
        email: Contact e-mail sent along with the job submission.
        sequence: Sequence to analyse (submitted with ``stype='p'``).
        name: Label written to the ``protein_name`` column of the result.

    Returns:
        A list whose first element is a status message:

        * ``['InterProScan job failed', job_id, response_text]`` when the
          submission is rejected or no result is available after polling;
        * ``['No domains found.']`` when no DOMAIN-type entry was matched;
        * ``['Domains found.', domains_df]`` otherwise, where ``domains_df``
          has the columns ``protein_name``, ``domain_accession``,
          ``domain_name`` and ``domain_locations``.
    """
    base_url = 'https://www.ebi.ac.uk/Tools/services/rest/iprscan5'
    request_timeout = 60  # seconds; without it a stalled connection hangs forever
    max_attempts = 10
    poll_interval = 60  # seconds between result polls

    # send request to interproscan api
    submit_response = requests.post(
        f'{base_url}/run',
        headers={
            'Content-Type': 'application/x-www-form-urlencoded',
            'Accept': 'text/plain',
        },
        data={
            'email': email,
            'stype': 'p',
            'sequence': f'{sequence}',
        },
        timeout=request_timeout,
    )
    # The body is used verbatim inside the result URL, so drop stray whitespace.
    job_id = submit_response.text.strip()
    if not submit_response.ok:
        # A rejected submission has no job to poll; fail fast instead of
        # polling a URL built from the error body for ten minutes.
        return ['InterProScan job failed', job_id, submit_response.text]

    # get results
    job_result_url = f'{base_url}/result/{job_id}/json'
    json_output = None
    with requests.Session() as session:
        for attempt in range(1, max_attempts + 1):
            job_result_response = session.get(
                job_result_url,
                headers={'Accept': 'application/json'},
                timeout=request_timeout,
            )
            if job_result_response.status_code == 200:
                json_output = job_result_response.json()['results'][0]
                print('InterProScan job done')
                break
            # No point in waiting once the last attempt has failed.
            if attempt < max_attempts:
                time.sleep(poll_interval)

    if json_output is None:
        return ['InterProScan job failed', job_id, job_result_response.text]

    entries = _collect_domain_entries(json_output['matches'])
    if not entries:
        return ['No domains found.']

    # create domains dataframe
    domains_df = pd.DataFrame.from_dict(entries, orient='index').reset_index()
    domains_df['protein_name'] = name
    domains_df = domains_df[['protein_name', 'index', 'name', 'locations']]
    domains_df.columns = ['protein_name', 'domain_accession', 'domain_name', 'domain_locations']
    return ['Domains found.', domains_df]
# helpers for merging domain locations and generating protein function predictions based on domain2go mappings
def merge_locations(locations):
    """Merge overlapping ``'start-end'`` ranges into a minimal sorted list.

    Coordinates are treated as inclusive on both ends, so two ranges that
    share at least one position (e.g. ``'1-10'`` and ``'10-20'``) are merged.
    Ranges that are merely adjacent (``'1-10'`` and ``'11-20'``) stay separate.

    Args:
        locations: Iterable of strings of the form ``'<start>-<end>'`` with
            integer start and end.

    Returns:
        A list of ``'<start>-<end>'`` strings sorted by start position, with
        no two ranges overlapping. An empty input yields an empty list.
    """
    spans = []
    for location in locations:
        parts = location.split('-')
        spans.append((int(parts[0]), int(parts[1])))
    spans.sort()

    # Plain sort-and-sweep instead of an interval tree: interval trees model
    # half-open intervals, which drops overlaps on a shared end position and
    # cannot represent a single-position range such as '7-7'.
    merged = []
    for start, end in spans:
        if merged and start <= merged[-1][1]:
            merged[-1][1] = max(merged[-1][1], end)
        else:
            merged.append([start, end])
    return [f'{start}-{end}' for start, end in merged]
def generate_function_predictions(domains_df, mapping_path):
    """Map the domains of a protein to GO terms via the Domain2GO table.

    Args:
        domains_df: DataFrame with at least the columns ``protein_name``,
            ``domain_accession``, ``domain_name`` and ``domain_locations``.
        mapping_path: Directory containing ``finalized_domain2go_mappings.txt``,
            a CSV read with ``pd.read_csv`` that provides the columns
            ``Interpro``, ``GO``, ``GO_name``, ``GO_aspect`` and ``s``.

    Returns:
        ``['No function predictions found.']`` when none of the domains has a
        mapping, otherwise ``['Function predictions found.', predictions_df]``
        where ``predictions_df`` has the columns ``protein_name``, ``GO_ID``,
        ``GO_term``, ``GO_category``, ``sequence_region``, ``probability``,
        ``domain_accession`` and ``domain_name``.
    """
    # read domain2go mappings
    mapping_file = os.path.join(mapping_path, 'finalized_domain2go_mappings.txt')
    domain2go_df = pd.read_csv(mapping_file)
    print('Domain2GO mappings loaded')

    # Inner join: only domains that have a Domain2GO mapping survive.
    predictions = domains_df.merge(
        domain2go_df,
        left_on='domain_accession',
        right_on='Interpro',
    )
    print('Function predictions generated.')

    if predictions.empty:
        return ['No function predictions found.']

    predictions['protein_name'] = domains_df['protein_name'].iloc[0]

    # Source column -> output column, in output order.
    output_columns = {
        'protein_name': 'protein_name',
        'GO': 'GO_ID',
        'GO_name': 'GO_term',
        'GO_aspect': 'GO_category',
        'domain_locations': 'sequence_region',
        's': 'probability',
        'domain_accession': 'domain_accession',
        'domain_name': 'domain_name',
    }
    predictions = predictions[list(output_columns.keys())]
    predictions.columns = list(output_columns.values())
    return ['Function predictions found.', predictions]