Spaces:
Runtime error
Runtime error
| import streamlit as st | |
| import json | |
| from data_module import faq_data, model_options | |
| import uuid | |
| from chat_handler import ChatHandler | |
# Chat backend instance used by the helper functions and the question flow in this script.
# NOTE(review): Streamlit normally re-executes the script on each interaction, so this
# constructor would run on every rerun — confirm ChatHandler keeps its state elsewhere.
chat = ChatHandler()
def add_custom_css():
    """Inject the app's custom CSS via an HTML-enabled markdown element.

    The rule sets ``width: 35%`` on the ``.css-1d391kg`` class.
    NOTE(review): presumably this is Streamlit's generated sidebar class; such
    generated names can change between Streamlit releases — confirm it still matches.
    """
    custom_css = (
        "\n"
        "<style>\n"
        ".css-1d391kg { width: 35%; }\n"
        "</style>\n"
    )
    st.markdown(custom_css, unsafe_allow_html=True)
def generate_user_id():
    """Return a new identifier produced by the chat handler's ``generate_id``."""
    return chat.generate_id()
def clear_history(user_id):
    """Ask the chat handler to clear the stored history for ``user_id``.

    Args:
        user_id: Identifier forwarded unchanged to ``chat.clear_history``.

    Returns:
        The constant string ``'response'``. The handler's own return value is
        discarded, so the result is always truthy when no exception is raised.
    """
    chat.clear_history(user_id)
    success_marker = 'response'
    return success_marker
# Give each browser session its own user id the first time the script runs.
if 'user_id' not in st.session_state:
    st.session_state['user_id'] = generate_user_id()

# Load the available embedding models; each entry is read for its "model" and "dir" keys.
# JSON is decoded as UTF-8 explicitly so the result does not depend on the platform's
# default locale encoding.
with open("embeddings_db_model.json", "r", encoding="utf-8") as file:
    embedding_models = json.load(file)

embedding_model_names = [model["model"] for model in embedding_models]

# Without any configured embedding model the lookup below would fail with a bare
# StopIteration; report the problem in the page and halt this run instead.
if not embedding_model_names:
    st.error("Nenhum modelo de embedding encontrado em embeddings_db_model.json")
    st.stop()

agent_types = [
    'JSON_CHAT_MODEL',
    'REACT_TEXT',
]

# Sidebar selectors for the LLM, the embedding model and the agent type.
selected_model = st.sidebar.selectbox("Escolha o Modelo LLM", model_options)
selected_embedding_model = st.sidebar.selectbox("Escolha o Modelo de Embedding", embedding_model_names)
# Directory of the first config entry whose "model" matches the selection.
selected_embedding_dir = next(
    item for item in embedding_models if item["model"] == selected_embedding_model
)["dir"]
selected_agent_type = st.sidebar.selectbox("Escolha o Tipo de Agent", agent_types)

add_custom_css()
# Sidebar section with the "clear history" control.
with st.sidebar:
    st.write("## Opções de Controle")
    if st.button('Limpar Histórico'):
        # Ask the backend to clear this user's history.
        response = clear_history(st.session_state['user_id'])
        if not response:
            st.error("Erro ao limpar o histórico")
        else:
            # Reset the visible conversation and rerun so the page redraws from it.
            st.session_state.messages = [{"role": "assistant", "content": "Histórico limpo. Pode começar uma nova conversa."}]
            st.rerun()
# Two equal-width columns summarising the currently selected LLM and embedding model.
# NOTE(review): indentation was lost in the source listing; this container is placed at
# top level (main page) — confirm it was not meant to sit inside the sidebar block.
with st.container():
    col1, col2 = st.columns([1, 1])
    for column, label, value in (
        (col1, "LLM:", selected_model),
        (col2, "Embeddings:", selected_embedding_model),
    ):
        with column:
            st.caption(label)
            st.write(value)
# Page header.
st.title("⚖️ ChatBot Direito Tributário")
st.caption("Direito Tributário da Pessoa Jurídica")
st.caption("Projeto do Workshop de LLM UFG")

# First run of a session: seed the conversation and the pending-FAQ slot.
if "messages" not in st.session_state:
    st.session_state["messages"] = [{"role": "assistant", "content": "Olá como posso ajudar?"}]
if "faq_question" not in st.session_state:
    st.session_state["faq_question"] = None

# Replay the stored conversation, choosing the avatar image by message role.
for entry in st.session_state.messages:
    avatar_image = "server_icon.png" if entry['role'] == 'assistant' else 'user_icon.png'
    st.chat_message(entry["role"], avatar=avatar_image).write(entry["content"])
def _render_intermediary_steps(intermediary_steps, answer):
    """Render the agent trace for one answer inside a collapsible expander.

    Args:
        intermediary_steps: Second value returned by ``chat.post_message``. It may be
            falsy (no steps), start with the sentinel string ``'erro'``, or hold steps
            whose first element may expose ``tool``, ``tool_input`` and ``log``.
        answer: The final answer, echoed as the "Final Answer" action.
    """
    with st.expander("Ver Passos Intermediários"):
        if not intermediary_steps:
            # No recorded steps: show only the final answer between the chain markers.
            st.markdown("#### > Entering new AgentExecutor chain...\n")
            st.markdown("**Ação:** Final Answer")
            st.markdown(f"**Entrada da Ação:** `{answer}`")
            st.markdown("#### > Finished chain...")
            return

        if intermediary_steps[0] == 'erro':
            st.markdown("## ERROR...\n")
            return

        st.markdown("## > Entering new AgentExecutor chain...\n")
        for index, step in enumerate(intermediary_steps, start=1):
            agent_action = step[0]
            # Fall back to placeholders when the action object lacks an attribute.
            action = getattr(agent_action, 'tool', 'Unknown')
            action_input = getattr(agent_action, 'tool_input', 'N/A')
            log = getattr(agent_action, 'log', 'No log available')
            st.markdown(f"**Passo {index}:**")
            st.markdown(f" **Ação:** `{action}`")
            st.markdown(f" **Entrada da Ação:** `{action_input}`")
            st.code(log, language='json')
            st.markdown("---")
        # Append the "Final Answer" action after the recorded steps.
        st.markdown("**Ação:** Final Answer")
        st.markdown(f"**Entrada da Ação:** `{answer}`")
        st.markdown("## > Finished chain.")


def process_question(question):
    """Send ``question`` to the chat backend and render the exchange.

    The question is appended to ``st.session_state.messages`` and shown as a user
    message. The backend is then called with the current user id and the module-level
    sidebar selections (LLM, embedding model and directory, agent type). The answer is
    shown, stored in the history, and followed by an expander with the agent trace.

    Args:
        question: Text of the user's question.

    NOTE(review): indentation was lost in the source listing; the nesting below (only
    the backend call under the spinner, the trace inside the assistant message) is a
    reconstruction — confirm against the original layout.
    """
    st.session_state.messages.append({"role": "user", "content": question})
    st.chat_message("user", avatar="user_icon.png").write(question)

    with st.chat_message("assistant", avatar="server_icon.png"):
        with st.spinner("Thinking..."):
            data = dict(
                user_id=st.session_state['user_id'],
                text=question,
                embedding_model=selected_embedding_model,
                embedding_dir=selected_embedding_dir,
                model=selected_model,
                agent_type=selected_agent_type,
            )
            msg, intermediary_steps = chat.post_message(message=data)

        # Store exactly the text that is displayed, so replaying the history on a later
        # rerun renders the same content even when the backend returns a non-string.
        answer_text = str(msg)
        st.write(answer_text)
        st.session_state.messages.append({"role": "assistant", "content": answer_text})

        _render_intermediary_steps(intermediary_steps, msg)
def add_faq_question_to_chat(question):
    """Store ``question`` under ``st.session_state["faq_question"]``.

    Args:
        question: FAQ question text to place in the session-state slot.
    """
    st.session_state.update({"faq_question": question}) if False else st.session_state.__setitem__("faq_question", question)
# Sidebar with the frequently asked questions.
with st.sidebar:
    st.write("## Perguntas Frequentes")
    for index, item in enumerate(faq_data, start=1):
        # "\\." produces a literal backslash followed by a dot, the same runtime text as
        # before, without relying on the invalid escape sequence "\." (a SyntaxWarning
        # on recent Python versions). Presumably the backslash keeps Markdown from
        # treating "1." as an ordered-list marker — TODO confirm.
        question_with_number = f"{index}\\. {item['question']}"
        expander = st.expander(question_with_number, expanded=False)
        with expander:
            st.write(item["answer"])
            # One key per FAQ entry keeps the identically labelled buttons distinct.
            button_key = f"button_{index}"
            if st.button("Enviar esta pergunta", key=button_key):
                st.session_state['selected_question'] = item["question"]
# Move a question chosen in the FAQ list into the pending slot, then drop the selection.
if 'selected_question' in st.session_state:
    if st.session_state['selected_question']:
        add_faq_question_to_chat(st.session_state['selected_question'])
        del st.session_state['selected_question']

# Answer the pending FAQ question, if there is one, and empty the slot afterwards.
if st.session_state["faq_question"]:
    process_question(st.session_state["faq_question"])
    st.session_state["faq_question"] = None

# Free-text question typed into the chat input box.
prompt = st.chat_input()
if prompt:
    process_question(prompt)