Spaces:
Runtime error
Runtime error
| from spacy.tokens import Span | |
| from spacy.tokens import Doc | |
| from spacy.tokens import Token | |
| import regex_spatial | |
| from spacy.language import Language | |
| import re | |
| from utils import llm_ent_extract | |
| id ="" | |
| rse_id = "rse_id" | |
| def set_extension(): | |
| Span.set_extension(rse_id, default = "",force = True) | |
| Doc.set_extension(rse_id, default = "",force = True) | |
| Token.set_extension(rse_id, default = "",force = True) | |
| def get_level1(doc, sentence, ent): | |
| return find_ent_by_regex(doc, sentence, ent, regex_spatial.get_level1_regex()) | |
| def get_level2(doc, sentence, ent): | |
| return find_ent_by_regex(doc, sentence, ent, regex_spatial.get_level2_regex()) | |
| def get_level3(doc, sentence, ent): | |
| return find_ent_by_regex(doc, sentence, ent, regex_spatial.get_level3_regex()) | |
| def find_ent_by_regex(doc, sentence, ent, regex): | |
| global id | |
| if id == "": | |
| id = ent.text | |
| for match in re.finditer(regex, doc.text): | |
| start, end = match.span() | |
| if(start>= sentence.start_char and start<= sentence.end_char): | |
| span = doc.char_span(start, end) | |
| if span is not None: | |
| id = span.text +"_"+ id | |
| if(start > ent.end_char): | |
| ent.end_char = end | |
| else: | |
| ent.start_char = start | |
| return ent | |
| return ent | |
| def update_entities(doc, entity_texts, replace=True): | |
| """ | |
| 根据给定的文本内容标注实体,并直接修改 doc.ents。 | |
| :param doc: spaCy 解析后的 Doc 对象 | |
| :param entity_texts: 字典,键是要标注的实体文本,值是对应的实体类别 | |
| :param replace: 布尔值,True 则替换现有实体,False 则保留现有实体并添加新的 | |
| """ | |
| new_ents = list(doc.ents) if not replace else [] # 如果 replace=False,保留已有实体 | |
| for ent_text, ent_label in entity_texts.items(): | |
| start = doc.text.find(ent_text) # 在全文中查找文本位置 | |
| if start != -1: | |
| start_token = len(doc.text[:start].split()) # 计算起始 token 索引 | |
| end_token = start_token + len(ent_text.split()) # 计算结束 token 索引 | |
| if start_token < len(doc) and end_token <= len(doc): # 确保索引不越界 | |
| new_ent = Span(doc, start_token, end_token, label=ent_label) | |
| new_ents.append(new_ent) | |
| doc.set_ents(new_ents) # 更新 doc.ents | |
| def get_relative_entity(doc, sentence, ent): | |
| global id | |
| id = "" | |
| rel_entity = get_level1(doc, sentence, ent) | |
| # print(1111 ,rel_entity) | |
| rel_entity = get_level2(doc, sentence, rel_entity) | |
| # print(2222 ,rel_entity) | |
| rel_entity = get_level3(doc, sentence, rel_entity) | |
| # print(3333 ,rel_entity) | |
| if("_" in id): | |
| rel_entity = doc.char_span(rel_entity.start_char, rel_entity.end_char, "RSE") | |
| rel_entity._.rse_id = id | |
| # print(id, 'idid') | |
| # print(rel_entity._.rse_id, '._._') | |
| return rel_entity | |
| rel_entity = doc.char_span(ent.start_char, ent.end_char, ent.label_) | |
| rel_entity._.rse_id = id | |
| # print(4444 ,rel_entity) | |
| return rel_entity | |
| def get_spatial_ent(doc): | |
| set_extension() | |
| new_ents = [] | |
| # ents = [ent for ent in doc.ents if ent.label_ == "GPE" or ent.label_ == "LOC"] # 筛选出ase | |
| # LLM 输出 | |
| # GPE = '[###Pyrmont###, ###Glebe###]' # LLM 输出的实体 | |
| GPE = llm_ent_extract.extract_GPE(doc.text) # LLM 输出的实体 | |
| print(doc.text, 'llmin') | |
| print(GPE, 'llout') | |
| GPE = llm_ent_extract.extract(GPE, 'GPE') | |
| print(GPE, 'llmout2') | |
| update_entities(doc, GPE, True) | |
| ents = doc.ents | |
| print(ents, 'eee') | |
| # print(doc, 'ddd') | |
| # print(ents, 'ddd') | |
| # GPE = llm_ent_extract.extract(llm_ent_extract.extract_GPE(doc.text), 'gpe') | |
| # update_entities(doc, GPE) | |
| # LLM 输出完毕 | |
| # print(doc.ents, 111) | |
| # print(doc.ents[2], 222) | |
| # print(type(doc.ents[2]), 222) | |
| # print(doc.ents[2].label_, 333) | |
| # print('----------') | |
| # doc.ents[2] = 'pp' | |
| # print(doc.ents[2], 111) | |
| # print(doc.ents[2].label_, 222) | |
| # print(type(doc.ents), 333) | |
| end = None | |
| for ent in ents: | |
| if ent.end != len(doc): | |
| next_token = doc[ent.end] | |
| if end is not None: | |
| start = end | |
| else: | |
| start = ent.sent.start | |
| if next_token.text.lower() in regex_spatial.get_keywords(): | |
| end = next_token.i | |
| else: | |
| end = ent.end | |
| else: | |
| start = ent.sent.start | |
| end = ent.end | |
| # print(doc, '//',start, '//', end, 999888) | |
| # print(doc[start],'//', doc[end]) | |
| # print(ents, 999) | |
| rsi_ent = get_relative_entity(doc,Span(doc, start, end), ent) | |
| # print(doc.ents[0]._.rse_id, '._._2') | |
| # print(rsi_ent.text, rsi_ent.label_, rsi_ent._.rse_id) | |
| new_ents.append(rsi_ent) | |
| doc.ents = new_ents | |
| return doc | |
| # def update_doc_ents(doc, new_dict): | |
| # """ | |
| # 更新 doc.ents, 将新的实体文本和标签添加到 doc 中。 | |
| # | |
| # 参数: | |
| # - doc: spaCy 的 Doc 对象 | |
| # - new_dict: 一个字典,键是实体文本,值是标签 | |
| # """ | |
| # modified_ents = [] | |
| # | |
| # # 遍历字典中的实体文本和标签 | |
| # for ent_text, label in new_dict.items(): | |
| # # 将实体文本拆分成单词 | |
| # ent_words = ent_text.split() | |
| # | |
| # # 遍历 doc 中的 token 来查找第一个单词 | |
| # start = None | |
| # for i in range(len(doc)): | |
| # # 如果当前 token 和实体的第一个单词匹配,确定 start | |
| # if doc[i].text == ent_words[0]: | |
| # start = i | |
| # # 然后检查后续的单词是否都匹配 | |
| # end = start + len(ent_words) # 计算 end 为 start + 单词数 | |
| # if all(doc[start + j].text == ent_words[j] for j in range(len(ent_words))): | |
| # # 创建 Span 对象 | |
| # new_ent = Span(doc, start, end, label=label) | |
| # modified_ents.append(new_ent) | |
| # break # 找到匹配后跳出循环 | |
| # | |
| # # 使用 doc.set_ents() 更新 doc.ents | |
| # doc.set_ents(modified_ents) | |
| # | |
| # | |
| # # def llm_extract(doc, model): | |
| # | |
| # def split_doc_into_sentences(doc): | |
| # """ | |
| # 将 doc 的文本按句子分割,并返回每个句子的字符串列表。 | |
| # """ | |
| # sentence_list = [sent.text.strip() for sent in doc.sents] | |
| # return sentence_list | |
| # | |
| # | |
| # @Language.component("spatial_pipeline") | |
| # def get_spatial_ent(doc): | |
| # | |
| # set_extension() | |
| # | |
| # split_sent = split_doc_into_sentences(doc) | |
| # for i in range(len(split_sent)): | |
| # gpe_dict = llm_ent_extract.extract_GPE(split_sent[i]) | |
| # loc_dict = llm_ent_extract.extract_LOC(split_sent[i]) | |
| # new_dict = gpe_dict|loc_dict | |
| # | |
| # | |
| # print(gpe_dict, '111') | |
| # print(loc_dict) | |
| # print(new_dict) | |
| # # new_dict = {'pp': 'ORG', 'France': 'GPE', 'Paris': 'GPE'} | |
| # | |
| # | |
| # # 调用新的函数更新 doc 的实体 | |
| # update_doc_ents(doc, new_dict) | |
| # | |
| # # 继续处理 doc.ents | |
| # ents = [ent for ent in doc.ents if ent.label_ == "GPE" or ent.label_ == "LOC"] | |
| # print(ents[1].label_) | |
| # | |
| # end = None | |
| # new_ents = [] | |
| # | |
| # for ent in ents: | |
| # if ent.end != len(doc): | |
| # next_token = doc[ent.end + 1] | |
| # if end is not None: | |
| # start = end | |
| # else: | |
| # start = ent.sent.start | |
| # if next_token.text.lower() in regex_spatial.get_keywords(): | |
| # end = next_token.i | |
| # else: | |
| # end = ent.end | |
| # else: | |
| # start = ent.sent.start | |
| # end = ent.end | |
| # | |
| # # 调用 get_relative_entity 来获得新的实体信息 | |
| # rsi_ent = get_relative_entity(doc, Span(doc, start, end), ent) | |
| # | |
| # # 将处理后的实体添加到新的实体列表中 | |
| # new_ents.append(rsi_ent) | |
| # | |
| # doc.ents = new_ents # 更新 doc.ents | |
| # print(new_ents, '111222') | |
| # | |
| # return doc |