# -*- coding: utf-8 -*-
"""Gradio chatbot for paediatric cardiology backed by retrieval over text files.

Created on Thu Oct 2 18:21:39 2025

@author: joana

Hugging Face Space entry point (app.py). The conversation flow is:

1. the user names a disease or procedure; the best-matching document is
   selected from LLM-generated summaries of the files in ``folder_path``;
2. each question is answered twice (temperature 0 and 1) from the chunks
   retrieved out of that document's Chroma collection, and the user picks
   the preferred option with a button;
3. typing "novo tema" asks a usability question before changing subject.

Interactions are appended as rows to the "Chat History" Google Sheet.
"""

import os
import gradio as gr
import uuid
from datetime import datetime
import csv
from huggingface_hub import InferenceClient
from transformers import AutoTokenizer, AutoModel
import torch
import torch.nn.functional as F
from torch import Tensor
from sentence_transformers import SentenceTransformer, util
import chromadb
from langchain_community.document_loaders import TextLoader
from langchain_text_splitters import RecursiveCharacterTextSplitter
import json
import gspread
from oauth2client.service_account import ServiceAccountCredentials
from typing import List, Literal


########### GOOGLE SHEETS CONFIGURATION ###########
scope = ["https://spreadsheets.google.com/feeds",
         "https://www.googleapis.com/auth/drive"]
gcp_credentials_json = os.environ["GOOGLE_APPLICATION_CREDENTIALS_JSON"]
creds_dict = json.loads(gcp_credentials_json)
creds = ServiceAccountCredentials.from_json_keyfile_dict(creds_dict, scope)
client_gs = gspread.authorize(creds)
sheet = client_gs.open("Chat History").sheet1  # name of the Google Sheet


def _new_interaction():
    """Return a fresh, empty interaction record (one spreadsheet row)."""
    return {"pergunta": "", "resposta": "", "opcao_escolhida": "", "usabilidade": ""}


# Keeps everything that will be saved on the same row.
current_interaction = _new_interaction()


def save_interaction_final(state):
    """Append the current interaction of *state* to the sheet and reset it.

    A session id is created and stored in ``state["session_id"]`` the first
    time this is called for a session. After the row is written,
    ``state["current_interaction"]`` is replaced by an empty record.
    """
    ci = state.get("current_interaction") or _new_interaction()
    session_id = state.setdefault("session_id", str(uuid.uuid4()))
    timestamp = datetime.now().strftime("%m-%d %H:%M")
    sheet.append_row([
        session_id,
        timestamp,
        ci["pergunta"],
        ci["resposta"],
        ci["opcao_escolhida"],
        ci["usabilidade"],
    ])
    state["current_interaction"] = _new_interaction()


########### CHROMADB AND CACHE ###########
folder_path = "./data_EN_PT"
pasta_cache = "./resumos_cache"
chroma_client = chromadb.PersistentClient(path="./chroma_db")

########### CUDA ###########
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
torch.set_default_device(device)

########### LaBSE ###########
model_LaBSE = SentenceTransformer('sentence-transformers/LaBSE')


def generate_embeddings_LaBSE(text: str) -> List[float]:
    """Embed *text* with LaBSE and return the L2-normalised vector as a list.

    The model takes care of tokenisation and preprocessing itself.
    """
    embeddings = model_LaBSE.encode(text, convert_to_tensor=True)
    embeddings = F.normalize(embeddings, p=2, dim=0)
    return embeddings.cpu().numpy().tolist()


class CustomEmbeddingFunction_LaBSE(chromadb.EmbeddingFunction):
    """Chroma embedding function that embeds each document with LaBSE."""

    def __call__(self, texts: chromadb.Documents) -> chromadb.Embeddings:
        return [generate_embeddings_LaBSE(text) for text in texts]


########### HUGGING FACE LLM ###########
hf_token = os.environ["HF_TOKEN"]
client = InferenceClient("meta-llama/Llama-3.3-70B-Instruct", token=hf_token)


########### DOCUMENT SUMMARIES ###########
def resumir_documentos_cache(folder_path, client, pasta_cache):
    """Return ``{base_name: summary}`` for every ``.txt`` file in *folder_path*.

    A summary is read from ``pasta_cache/<base_name>.txt`` when that file
    exists; otherwise it is requested from *client* and written to the cache.
    """
    os.makedirs(pasta_cache, exist_ok=True)
    descricoes = {}
    for filename in os.listdir(folder_path):
        if not filename.endswith(".txt"):
            continue
        base_name = os.path.splitext(filename)[0]
        cache_path = os.path.join(pasta_cache, f"{base_name}.txt")
        if os.path.exists(cache_path):
            with open(cache_path, "r", encoding="utf-8") as f:
                resumo = f.read()
        else:
            file_path = os.path.join(folder_path, filename)
            with open(file_path, "r", encoding="utf-8") as file:
                texto = file.read()
            prompt_resumo = (
                "Lê o seguinte texto da área da cardiologia pediátrica e extrai as "
                "seguintes informações organizadas em 3 linhas separadas: doenças "
                "mencionadas, exames médicos mencionados e todos os processos, "
                "intervenções ou procedimentos médicos. "
                "Escreve em lista, em português europeu, sem explicações adicionais. "
                f"Texto: {texto} Resumo:"
            )
            messages = [{"role": "user", "content": prompt_resumo}]
            response = client.chat_completion(messages=messages, max_tokens=200)
            resumo = response.choices[0].message.content.strip()
            with open(cache_path, "w", encoding="utf-8") as f:
                f.write(resumo)
        descricoes[base_name] = resumo
    return descricoes


document_descriptions = resumir_documentos_cache(folder_path, client, pasta_cache)


########### SPLIT DOCUMENTS ###########
def process_split_txt_create_collection(folder_path, descriptions):
    """Chunk every described document and store it in its own collection.

    For each key of *descriptions*, ``<key>.txt`` in *folder_path* is split
    into token-based chunks and written to the collection ``doc_<key>``.
    Missing files and files that yield no chunks are skipped.
    """
    # The splitter does not depend on the file, so build it once.
    text_splitter = RecursiveCharacterTextSplitter.from_tiktoken_encoder(
        encoding_name="cl100k_base", chunk_size=512, chunk_overlap=50)
    for filename in descriptions:
        file_path = os.path.join(folder_path, filename + ".txt")
        if not os.path.exists(file_path):
            continue
        documents = TextLoader(file_path, encoding='utf-8').load()
        pages = text_splitter.split_documents(documents)
        chunks = [str(p.page_content) for p in pages]
        if not chunks:
            # Chroma rejects empty id/document lists.
            continue
        ids = [f"{filename}_{i}" for i in range(len(chunks))]
        collection = chroma_client.get_or_create_collection(
            name=f"doc_{filename}",
            embedding_function=CustomEmbeddingFunction_LaBSE())
        # The database is persistent and the ids are deterministic, so this
        # runs against already-populated collections on every restart;
        # upsert keeps the operation idempotent.
        collection.upsert(documents=chunks, ids=ids)


process_split_txt_create_collection(folder_path, document_descriptions)

########### SELECT THE DOCUMENT ###########
model_st = SentenceTransformer('all-MiniLM-L6-v2')


def select_document(disease, descriptions):
    """Return the key of *descriptions* whose summary is closest to *disease*.

    Closeness is the cosine similarity between MiniLM sentence embeddings.
    """
    descr_keys = list(descriptions.keys())
    descr_values = list(descriptions.values())
    disease_embedding = model_st.encode(disease, convert_to_tensor=True)
    descr_embeddings = model_st.encode(descr_values, convert_to_tensor=True)
    scores = util.cos_sim(disease_embedding, descr_embeddings)[0]
    chosen_idx = torch.argmax(scores).item()
    return descr_keys[chosen_idx]


########### GENERATION PHASE ###########
def get_collection_by_doc(filename):
    """Return the existing Chroma collection that holds *filename*'s chunks."""
    return chroma_client.get_collection(
        name=f"doc_{filename}",
        embedding_function=CustomEmbeddingFunction_LaBSE())


def create_context(query, collection, k=3):
    """Return the *k* chunks nearest to *query*, formatted with their distances."""
    results = collection.query(query_texts=[query], n_results=k)
    documents = results.get('documents', [[]])[0]
    distances = results.get('distances', [[]])[0]
    return "".join(
        f"\nDOCUMENT: '{doc}'\nDISTANCE: {dist}\n"
        for doc, dist in zip(documents, distances)
    )


def generate_prompt(query, collection):
    """Build the answer prompt for *query* from the retrieved context."""
    context = create_context(query, collection)
    prompt = (
        "Tu és um assistente português. Usa o contexto para responder à pergunta. "
        "Se não estiver no contexto, diz que não consegues responder. "
        f"Question: {query} Context: {context} Answer:"
    )
    return prompt


def generate_answer_chat_completion(client, query, collection, temp):
    """Ask *client* to answer *query* from *collection* at temperature *temp*.

    Returns the raw chat-completion response.
    """
    prompt = generate_prompt(query, collection)
    messages = [{"role": "user", "content": prompt}]
    return client.chat_completion(messages=messages, max_tokens=384, temperature=temp)


def safety_check_query(text, client):
    """Return the model's verdict on *text*, expected to be "valid" or "not valid".

    The reply is lower-cased and stripped of surrounding whitespace, quotes
    and punctuation so that answers such as ``Valid.`` still compare equal
    to ``"valid"``.
    """
    prompt = f'Verifica se a pergunta é segura e relacionada com cardiologia pediátrica. Responde apenas "valid" ou "not valid". Pergunta: {text}'
    messages = [{"role": "user", "content": prompt}]
    response = client.chat_completion(messages=messages, max_tokens=50)
    verdict = response.choices[0].message.content.strip().lower()
    return verdict.strip(" \t\n.!\"'`*")


########### CHATBOT ###########
class ChatBot:
    """Per-session conversation state machine.

    The bot is in one of three phases: waiting for a disease/procedure
    (``awaiting_disease``), waiting for the usability rating
    (``awaiting_usability``), or answering questions about the selected
    document (both flags false).
    """

    def __init__(self, client):
        self.client = client
        self.awaiting_disease = True
        self.awaiting_usability = False
        self.selected_disease = None
        self.collection = None

    def reset(self):
        """Go back to the initial phase: no topic selected, no rating pending."""
        self.awaiting_disease = True
        self.awaiting_usability = False
        self.selected_disease = None
        self.collection = None

    def start(self):
        """Return the greeting shown at the start of a conversation."""
        return "Olá! Sobre que doença ou procedimento quer saber mais?."

    def answer(self, query, state):
        """Handle one user message and return the bot's reply.

        *state* is the per-session dict; its ``"current_interaction"`` record
        is filled in as the conversation progresses and is written to the
        sheet once the usability rating has been received.
        """
        if state.get("current_interaction") is None:
            state["current_interaction"] = _new_interaction()
        ci = state["current_interaction"]

        if query.strip().lower() == "novo tema":
            self.awaiting_usability = True
            return ("Antes de passar ao próximo tema: a informação que recebeu estava clinicamente correta? \n "
                    "1 - Discordo totalmente\n"
                    "2 - Discordo parcialmente\n"
                    "3 - Neutro\n"
                    "4 - Concordo parcialmente\n"
                    "5 - Concordo totalmente\n\n"
                    "Pode adicionar um comentário se desejar")

        if self.awaiting_usability:
            # This message is the rating (plus optional comment).
            msg = "Obrigado! A sua resposta foi registada."
            ci["usabilidade"] = query
            save_interaction_final(state)
            self.reset()
            return msg + "\nSobre que tema gostaria de saber mais agora?"

        if self.awaiting_disease:
            self.selected_disease = query.strip().lower()
            try:
                best_doc = select_document(self.selected_disease, document_descriptions)
                print(f"[LOG] Doença recebida: '{self.selected_disease}'")
                print(f"[LOG] Documento selecionado: '{best_doc}'")
                self.collection = get_collection_by_doc(best_doc)
                self.awaiting_disease = False
                return f"O que gostaria de saber sobre '{self.selected_disease}' ?."
            except Exception as e:
                return f"Erro ao selecionar documento: {e}"

        ci["pergunta"] = query
        try:
            # The safety check is a remote call too, so it sits inside the
            # same error handling as the answer generation.
            if safety_check_query(query, self.client) == "valid":
                answer1 = generate_answer_chat_completion(self.client, query, self.collection, temp=0)
                answer2 = generate_answer_chat_completion(self.client, query, self.collection, temp=1)
                first = answer1.choices[0].message.content.strip()
                second = answer2.choices[0].message.content.strip()
                bot_message = f"Opção 1:\n{first}\n\nOpção 2:\n{second} \n\nEscolha a sua opção preferida."
            else:
                bot_message = "A pergunta não parece estar ligada a cardiologia pediátrica."
        except Exception as e:
            bot_message = f"Erro ao gerar resposta: {e}"
        ci["resposta"] = bot_message
        return bot_message


def get_chatbot(state):
    """Return the session's ChatBot, initialising missing keys of *state*.

    Ensures ``"chat_bot"``, ``"chat_history"`` and ``"current_interaction"``
    exist. When *state* is ``None`` a throw-away dict is used, so nothing is
    persisted for the caller in that case.
    """
    if state is None:
        state = {}
    if "chat_bot" not in state:
        state["chat_bot"] = ChatBot(client)
    state.setdefault("chat_history", [])
    if state.get("current_interaction") is None:
        state["current_interaction"] = _new_interaction()
    return state["chat_bot"]


########### GRADIO ###########
def respond(message, chat_history, state):
    """Gradio handler: answer *message* and return (textbox, history, state)."""
    chat_bot = get_chatbot(state)
    if not message or not message.strip():
        # Nothing to answer; leave the conversation exactly as displayed.
        return "", chat_history, state
    bot_message = chat_bot.answer(message, state)
    state["chat_history"].append((message, bot_message))
    return "", state["chat_history"], state


def iniciar_novo_tema(chat_history, state):
    """Gradio handler for the clear button: restart the conversation."""
    chat_bot = get_chatbot(state)
    bot_message = chat_bot.start()
    # Also clears a pending usability question, otherwise the next message
    # would be recorded as a rating.
    chat_bot.reset()
    state["current_interaction"] = _new_interaction()
    state["chat_history"] = []
    return "", [(None, bot_message)], state


def mensagem_inicial(state):
    """Gradio load handler: reset the bot and return the greeting as history."""
    chat_bot = get_chatbot(state)
    chat_bot.reset()
    return [(None, chat_bot.start())]


def responder_usabilidade(resposta, chat_history, state):
    """Record *resposta* as the usability rating and save the interaction."""
    get_chatbot(state)  # make sure the session keys exist
    state["current_interaction"]["usabilidade"] = resposta
    save_interaction_final(state)  # also resets the current interaction
    chat_history.append((None, f"Usabilidade registada: {resposta}"))
    return "", chat_history, state


def escolher_opcao(opcao, chat_history, state):
    """Record which answer option the user preferred and save the interaction."""
    get_chatbot(state)  # make sure the session keys exist
    state["current_interaction"]["opcao_escolhida"] = opcao
    save_interaction_final(state)  # also resets the current interaction
    state["chat_history"].append((None, f"Escolheu a {opcao}."))
    return state["chat_history"], state


with gr.Blocks() as demo:
    chatbot_ui = gr.Chatbot()
    msg = gr.Textbox(label="Escreva a sua pergunta ou 'novo tema' para mudar de assunto.")
    state = gr.State(value={})

    with gr.Row():
        btn1 = gr.Button("Opção 1")
        btn1.click(
            lambda chat_history, state: escolher_opcao("Opção 1", chat_history, state),
            inputs=[chatbot_ui, state],
            outputs=[chatbot_ui, state]
        )
        btn2 = gr.Button("Opção 2")
        btn2.click(
            lambda chat_history, state: escolher_opcao("Opção 2", chat_history, state),
            inputs=[chatbot_ui, state],
            outputs=[chatbot_ui, state]
        )

    with gr.Row():
        clear = gr.ClearButton([msg, chatbot_ui], value="Limpar conversa")
        send = gr.Button("Enviar")

    msg.submit(respond, [msg, chatbot_ui, state], [msg, chatbot_ui, state])
    send.click(respond, [msg, chatbot_ui, state], [msg, chatbot_ui, state])
    clear.click(iniciar_novo_tema, inputs=[chatbot_ui, state], outputs=[msg, chatbot_ui, state])
    demo.load(fn=mensagem_inicial, inputs=state, outputs=chatbot_ui)

demo.launch()