# Import necessary libraries
import os  # Interacting with the operating system (reading/writing files)
import chromadb  # High-performance vector database for storing/querying dense vectors
from dotenv import load_dotenv  # Loading environment variables from a .env file
import json  # Parsing and handling JSON data

# LangChain imports
from langchain_core.documents import Document  # Document data structures
from langchain_core.runnables import RunnablePassthrough  # LangChain core library for running pipelines
from langchain_core.output_parsers import StrOutputParser  # String output parser
from langchain_core.prompts import ChatPromptTemplate  # Template for chat prompts
from langchain.chains.query_constructor.base import AttributeInfo  # Base classes for query construction
from langchain.retrievers.self_query.base import SelfQueryRetriever  # Base classes for self-querying retrievers
from langchain.retrievers.document_compressors import LLMChainExtractor, CrossEncoderReranker  # Document compressors
from langchain.retrievers import ContextualCompressionRetriever  # Contextual compression retrievers

# LangChain community & experimental imports
from langchain_chroma import Chroma  # Chroma vector store integration
from langchain_community.document_loaders import PyPDFDirectoryLoader, PyPDFLoader  # Document loaders for PDFs
from langchain_community.cross_encoders import HuggingFaceCrossEncoder  # Cross-encoders from HuggingFace
from langchain_experimental.text_splitter import SemanticChunker  # Experimental text splitting methods
from langchain.text_splitter import (
    CharacterTextSplitter,  # Splitting text by characters
    RecursiveCharacterTextSplitter  # Recursive splitting of text by characters
)
from langchain_core.tools import tool  # Decorator that turns a function into a LangChain tool
from langchain.agents import create_tool_calling_agent, AgentExecutor

# LangChain OpenAI imports
from langchain_openai import OpenAIEmbeddings, ChatOpenAI  # OpenAI embeddings and chat models

# LlamaParse & LlamaIndex imports
from llama_parse import LlamaParse  # Document parsing library
from llama_index.core import Settings, SimpleDirectoryReader  # Core functionality of LlamaIndex

# LangGraph import
from langgraph.graph import StateGraph, END, START  # State graph for orchestrating the agent workflow

# Pydantic import
from pydantic import BaseModel  # Pydantic for data validation

# Typing imports
from typing import Dict, List, Tuple, Any, TypedDict  # Python typing for function annotations

# Other utilities
import numpy as np  # NumPy for numerical operations
from groq import Groq  # Groq client (used here for Llama Guard input filtering)
from mem0 import MemoryClient  # mem0 client for long-term conversation memory
import streamlit as st  # Streamlit for the chat UI
from datetime import datetime  # Timestamps for stored interactions
#====================================SETUP=====================================#
# Fetch secrets (injected by Hugging Face Spaces, or read from a local .env file;
# load_dotenv() is a no-op when no .env is present)
load_dotenv()
api_key = os.getenv("OPENAI_API_KEY")
groq_api_key = os.getenv("GROQ_API_KEY")
mem0_api_key = os.getenv("MEM0_API_KEY")

# LangSmith tracing configuration (default to an empty key rather than None,
# which os.environ would reject)
os.environ["LANGSMITH_TRACING"] = "true"
os.environ["LANGSMITH_ENDPOINT"] = "https://api.smith.langchain.com"
os.environ["LANGSMITH_API_KEY"] = os.getenv("LANGSMITH_API_KEY", "")
os.environ["LANGSMITH_PROJECT"] = "nutrition_bot"

# Initialize the OpenAI embedding function for Chroma
embedding_function = chromadb.utils.embedding_functions.OpenAIEmbeddingFunction(
    api_key=api_key,  # OpenAI API key
    model_name='text-embedding-3-small'  # Embedding model name
)
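# Note: embedding_function above is only needed when talking to Chroma through
# the raw chromadb client; the LangChain vector store created further below
# uses embedding_model instead.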
# Initialize the OpenAI Embeddings
embedding_model = OpenAIEmbeddings(
    openai_api_key=api_key,
    model='text-embedding-3-small'
)

# Initialize the Chat OpenAI model
llm = ChatOpenAI(
    openai_api_key=api_key,
    model="gpt-4o-mini",
    streaming=False
)

# Set the LLM and embedding model in the LlamaIndex settings. Note that the
# attribute is `embed_model` (not `embedding`), and these assignments only
# matter if the LlamaParse/LlamaIndex loaders imported above are actually used.
Settings.llm = llm
Settings.embed_model = embedding_model
#================================Creating Langgraph agent======================#
class AgentState(TypedDict):
    query: str                     # The current user query
    expanded_query: str            # The expanded version of the user query
    context: List[Dict[str, Any]]  # Retrieved documents (content and metadata)
    response: str                  # The generated response to the user query
    precision_score: float         # The precision score of the response
    groundedness_score: float      # The groundedness score of the response
    groundedness_loop_count: int   # Counter for groundedness refinement loops
    precision_loop_count: int      # Counter for precision refinement loops
    feedback: str                  # Suggestions for improving the response
    query_feedback: str            # Suggestions for improving the expanded query
    groundedness_check: bool       # Flag for the groundedness check
    loop_max_iter: int             # Maximum number of refinement iterations
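# A single AgentState dict is threaded through every node below; the two
# *_loop_count fields bound the refine_response and refine_query cycles so the
# graph cannot loop indefinitely.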
def expand_query(state):
    """
    Expands the user query to improve retrieval of nutrition disorder-related information.

    Args:
        state (Dict): The current state of the workflow, containing the user query.

    Returns:
        Dict: The updated state with the expanded query.
    """
    print("---------Expanding Query---------")
    system_message = '''You are an AI specializing in improving search queries to retrieve the most relevant nutrition disorder-related information.
Your task is to **refine** and **expand** the given query so that better search results are obtained, while **keeping the original intent** unchanged.

Guidelines:
- Add **specific details** where needed. Example: If a user asks about "anorexia," specify aspects like symptoms, causes, or treatment options.
- Include **related terms** to improve retrieval (e.g., "bulimia" → "bulimia nervosa vs binge eating disorder").
- If the user provides an unclear query, suggest necessary clarifications.
- **DO NOT** answer the question. Your job is only to enhance the query.

Examples:
1. User Query: "Tell me about eating disorders."
   Expanded Query: "Provide details on eating disorders, including types (e.g., anorexia nervosa, bulimia nervosa), symptoms, causes, and treatment options."
2. User Query: "What is anorexia?"
   Expanded Query: "Explain anorexia nervosa, including its symptoms, causes, risk factors, and treatment options."
3. User Query: "How to treat bulimia?"
   Expanded Query: "Describe treatment options for bulimia nervosa, including psychotherapy, medications, and lifestyle changes."
4. User Query: "What are the effects of malnutrition?"
   Expanded Query: "Explain the effects of malnutrition on physical and mental health, including specific nutrient deficiencies and their consequences."

Now, expand the following query:'''

    expand_prompt = ChatPromptTemplate.from_messages([
        ("system", system_message),
        ("user", "Expand this query: {query} using the feedback: {query_feedback}")
    ])
    chain = expand_prompt | llm | StrOutputParser()
    expanded_query = chain.invoke({"query": state['query'], "query_feedback": state["query_feedback"]})
    print("expanded_query", expanded_query)
    state["expanded_query"] = expanded_query
    return state
# Initialize the Chroma vector store for retrieving documents
vector_store = Chroma(
    collection_name="nutritional_hypotheticals",
    persist_directory="./nutritional_db",
    embedding_function=embedding_model
)

# Create a retriever from the vector store
retriever = vector_store.as_retriever(
    search_type='similarity',
    search_kwargs={'k': 3}
)
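# For local debugging, the retriever can be exercised on its own (this assumes
# the "./nutritional_db" collection has already been built), e.g.:
#     sample_docs = retriever.invoke("early warning signs of anorexia nervosa")
#     for d in sample_docs:
#         print(d.metadata.get("source"), "->", d.page_content[:80])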
def retrieve_context(state):
    """
    Retrieves context from the vector store using the expanded or original query.

    Args:
        state (Dict): The current state of the workflow, containing the query and expanded query.

    Returns:
        Dict: The updated state with the retrieved context.
    """
    print("---------retrieve_context---------")
    query = state['expanded_query']
    # print("Query used for retrieval:", query)  # Debugging: Print the query

    # Retrieve documents from the vector store
    docs = retriever.invoke(query)
    print("Retrieved documents:", docs)  # Debugging: Print the raw docs object

    # Extract both page_content and metadata from each document
    context = [
        {
            "content": doc.page_content,  # The actual content of the document
            "metadata": doc.metadata      # The metadata (e.g., source, page number, etc.)
        }
        for doc in docs
    ]
    state['context'] = context
    print("Extracted context with metadata:", context)  # Debugging: Print the extracted context
    # print(f"Groundedness loop count: {state['groundedness_loop_count']}")
    return state
def craft_response(state: Dict) -> Dict:
    """
    Generates a response using the retrieved context, focusing on nutrition disorders.

    Args:
        state (Dict): The current state of the workflow, containing the query and retrieved context.

    Returns:
        Dict: The updated state with the generated response.
    """
    system_message = '''You are a professional AI nutrition disorder specialist generating responses based on retrieved documents.
Your task is to use the given **context** to generate a highly accurate, informative, and user-friendly response.

Guidelines:
- **Be direct and concise** while ensuring completeness.
- **DO NOT include information that is not present in the context.**
- If multiple sources exist, synthesize them into a coherent response.
- If the context does not fully answer the query, state what additional information is needed.
- Use bullet points when explaining complex concepts.

Example:
User Query: "What are the symptoms of anorexia nervosa?"
Context:
1. Anorexia nervosa is characterized by extreme weight loss and fear of gaining weight.
2. Common symptoms include restricted eating, distorted body image, and excessive exercise.
Response:
"Anorexia nervosa is an eating disorder characterized by extreme weight loss and an intense fear of gaining weight. Common symptoms include:
- Restricted eating
- Distorted body image
- Excessive exercise
If you or someone you know is experiencing these symptoms, it is important to seek professional help."'''

    response_prompt = ChatPromptTemplate.from_messages([
        ("system", system_message),
        ("user", "Query: {query}\nContext: {context}\n\nResponse:")
    ])
    chain = response_prompt | llm | StrOutputParser()
    state['response'] = chain.invoke({
        "query": state['query'],
        "context": "\n".join([doc["content"] for doc in state['context']])  # Extract content from each document
    })
    return state
def score_groundedness(state: Dict) -> Dict:
    """
    Checks whether the response is grounded in the retrieved context.

    Args:
        state (Dict): The current state of the workflow, containing the response and context.

    Returns:
        Dict: The updated state with the groundedness score.
    """
    print("---------check_groundedness---------")
    system_message = '''You are an AI tasked with evaluating whether a response is grounded in the provided context and includes proper citations.

Guidelines:
1. **Groundedness Check**:
   - Verify that the response accurately reflects the information in the context.
   - Flag any unsupported claims or deviations from the context.
2. **Citation Check**:
   - Ensure that the response includes citations to the source material (e.g., "According to [Source], ...").
   - If citations are missing, suggest adding them.
3. **Scoring**:
   - Assign a groundedness score between 0 and 1, where 1 means fully grounded and properly cited.

Examples:
1. Response: "Anorexia nervosa is caused by genetic factors (Source 1)."
   Context: "Anorexia nervosa is influenced by genetic, environmental, and psychological factors (Source 1)."
   Evaluation: "The response is grounded and properly cited. Groundedness score: 1.0."
2. Response: "Bulimia nervosa can be cured with diet alone."
   Context: "Treatment for bulimia nervosa involves psychotherapy and medications (Source 2)."
   Evaluation: "The response is ungrounded and lacks citations. Groundedness score: 0.2."
3. Response: "Anorexia nervosa has a high mortality rate."
   Context: "Anorexia nervosa has one of the highest mortality rates among psychiatric disorders (Source 3)."
   Evaluation: "The response is grounded but lacks a citation. Groundedness score: 0.7."

****Return only a float score (e.g., 0.9). Do not provide explanations.****

Now, evaluate the following response:
'''
    groundedness_prompt = ChatPromptTemplate.from_messages([
        ("system", system_message),
        ("user", "Context: {context}\nResponse: {response}\n\nGroundedness score:")
    ])
    chain = groundedness_prompt | llm | StrOutputParser()
    groundedness_score = float(chain.invoke({
        "context": "\n".join([doc["content"] for doc in state['context']]),
        "response": state['response']
    }).strip())
    print("groundedness_score: ", groundedness_score)
    state['groundedness_loop_count'] += 1
    print("#########Groundedness Incremented###########")
    state['groundedness_score'] = groundedness_score
    return state
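# Note: the float() conversion assumes the judge model obeys the "return only a
# float" instruction. A more defensive variant could pull out the first number
# from the raw output, e.g.:
#     import re
#     m = re.search(r"\d+(?:\.\d+)?", raw_score_text)
#     score = float(m.group()) if m else 0.0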
def check_precision(state: Dict) -> Dict:
    """
    Checks whether the response precisely addresses the user's query.

    Args:
        state (Dict): The current state of the workflow, containing the query and response.

    Returns:
        Dict: The updated state with the precision score.
    """
    print("---------check_precision---------")
    system_message = '''You are an AI evaluator assessing the **precision** of the response.
Your task is to **score** how well the response addresses the user's original nutrition disorder-related query.

Scoring Criteria:
- 1.0 → The response is fully precise, directly answering the question.
- 0.7 → The response is mostly correct but contains some generalization.
- 0.5 → The response is somewhat relevant but lacks key details.
- 0.3 → The response is vague or only partially correct.
- 0.0 → The response is incorrect or misleading.

Examples:
1. Query: "What are the symptoms of anorexia nervosa?"
   Response: "The symptoms of anorexia nervosa include extreme weight loss, fear of gaining weight, and a distorted body image."
   Precision Score: 1.0
2. Query: "How is bulimia nervosa treated?"
   Response: "Bulimia nervosa is treated with therapy and medications."
   Precision Score: 0.7
3. Query: "What causes binge eating disorder?"
   Response: "Binge eating disorder is caused by a combination of genetic, psychological, and environmental factors."
   Precision Score: 0.5
4. Query: "What are the effects of malnutrition?"
   Response: "Malnutrition can lead to health problems."
   Precision Score: 0.3
5. Query: "What is the mortality rate of anorexia nervosa?"
   Response: "Anorexia nervosa is a type of eating disorder."
   Precision Score: 0.0

*****Return only a float score (e.g., 0.9). Do not provide explanations.*****

Now, evaluate the following query and response:
'''
    precision_prompt = ChatPromptTemplate.from_messages([
        ("system", system_message),
        ("user", "Query: {query}\nResponse: {response}\n\nPrecision score:")
    ])
    chain = precision_prompt | llm | StrOutputParser()
    # The same parsing caveat as in score_groundedness applies here.
    precision_score = float(chain.invoke({
        "query": state['query'],
        "response": state['response']
    }).strip())
    state['precision_score'] = precision_score
    print("precision_score:", precision_score)
    state['precision_loop_count'] += 1
    print("#########Precision Incremented###########")
    return state
def refine_response(state: Dict) -> Dict:
    """
    Suggests improvements for the generated response.

    Args:
        state (Dict): The current state of the workflow, containing the query and response.

    Returns:
        Dict: The updated state with response refinement suggestions.
    """
    print("---------refine_response---------")
    system_message = '''You are an AI response refinement assistant. Your task is to suggest **improvements** for the given response.

### Guidelines:
- Identify **gaps in the explanation** (missing key details).
- Highlight **unclear or vague parts** that need elaboration.
- Suggest **additional details** that should be included for better accuracy.
- Ensure the refined response is **precise** and **grounded** in the retrieved context.

### Examples:
1. Query: "What are the symptoms of anorexia nervosa?"
   Response: "The symptoms include weight loss and fear of gaining weight."
   Suggestions: "The response is missing key details about behavioral and emotional symptoms. Add details like 'distorted body image' and 'restrictive eating patterns.'"
2. Query: "How is bulimia nervosa treated?"
   Response: "Bulimia nervosa is treated with therapy."
   Suggestions: "The response is too vague. Specify the types of therapy (e.g., cognitive-behavioral therapy) and mention other treatments like nutritional counseling and medications."
3. Query: "What causes binge eating disorder?"
   Response: "Binge eating disorder is caused by psychological factors."
   Suggestions: "The response is incomplete. Add details about genetic and environmental factors, and explain how they contribute to the disorder."

Now, suggest improvements for the following response:
'''
    refine_response_prompt = ChatPromptTemplate.from_messages([
        ("system", system_message),
        ("user", "Query: {query}\nResponse: {response}\n\n"
                 "What improvements can be made to enhance accuracy and completeness?")
    ])
    chain = refine_response_prompt | llm | StrOutputParser()

    # Store response suggestions in a structured format
    feedback = f"Previous Response: {state['response']}\nSuggestions: {chain.invoke({'query': state['query'], 'response': state['response']})}"
    print("feedback: ", feedback)
    print(f"State: {state}")
    state['feedback'] = feedback
    return state
def refine_query(state: Dict) -> Dict:
    """
    Suggests improvements for the expanded query.

    Args:
        state (Dict): The current state of the workflow, containing the query and expanded query.

    Returns:
        Dict: The updated state with query refinement suggestions.
    """
    print("---------refine_query---------")
    system_message = '''You are an AI query refinement assistant. Your task is to suggest **improvements** for the expanded query.

### Guidelines:
- Add **specific keywords** to improve document retrieval.
- Identify **missing details** that should be included.
- Suggest **ways to narrow the scope** for better precision.

### Examples:
1. Original Query: "Tell me about eating disorders."
   Expanded Query: "Provide details on eating disorders, including types, symptoms, causes, and treatment options."
   Suggestions: "Add specific types of eating disorders like 'anorexia nervosa' and 'bulimia nervosa' to improve retrieval."
2. Original Query: "What is anorexia?"
   Expanded Query: "Explain anorexia nervosa, including its symptoms and causes."
   Suggestions: "Include details about treatment options and risk factors to make the query more comprehensive."
3. Original Query: "How to treat bulimia?"
   Expanded Query: "Describe treatment options for bulimia nervosa."
   Suggestions: "Specify types of treatments like 'cognitive-behavioral therapy' and 'medications' for better precision."

Now, suggest improvements for the following expanded query:
'''
    refine_query_prompt = ChatPromptTemplate.from_messages([
        ("system", system_message),
        ("user", "Original Query: {query}\nExpanded Query: {expanded_query}\n\n"
                 "What improvements can be made for a better search?")
    ])
    chain = refine_query_prompt | llm | StrOutputParser()

    # Store refinement suggestions without modifying the original expanded query
    query_feedback = f"Previous Expanded Query: {state['expanded_query']}\nSuggestions: {chain.invoke({'query': state['query'], 'expanded_query': state['expanded_query']})}"
    print("query_feedback: ", query_feedback)
    print(f"Groundedness loop count: {state['groundedness_loop_count']}")
    state['query_feedback'] = query_feedback
    return state
def should_continue_groundedness(state):
    """Decides if groundedness is sufficient or needs improvement."""
    print("---------should_continue_groundedness---------")
    print("groundedness loop count: ", state['groundedness_loop_count'])
    if state['groundedness_score'] >= 0.4:  # Threshold for groundedness
        print("Moving to precision")
        return "check_precision"
    else:
        if state["groundedness_loop_count"] > state['loop_max_iter']:
            return "max_iterations_reached"
        else:
            print("---------Groundedness Score Threshold Not met. Refining Response-----------")
            return "refine_response"
def should_continue_precision(state: Dict) -> str:
    """Decides if precision is sufficient or needs improvement."""
    print("---------should_continue_precision---------")
    print("precision loop count: ", state['precision_loop_count'])
    if state['precision_score'] >= 0.7:  # Threshold for precision
        return "pass"  # Complete the workflow
    else:
        if state['precision_loop_count'] > state['loop_max_iter']:  # Maximum allowed loops
            return "max_iterations_reached"
        else:
            print("---------Precision Score Threshold Not met. Refining Query-----------")  # Debugging
            return "refine_query"  # Loop back to query refinement
def max_iterations_reached(state: Dict) -> Dict:
    """Handles the case when the maximum number of iterations is reached."""
    print("---------max_iterations_reached---------")
    response = "I'm unable to refine the response further. Please provide more context or clarify your question."
    state['response'] = response
    return state
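# Control-flow summary of the graph wired up in create_workflow below:
#   expand_query -> retrieve_context -> craft_response -> score_groundedness
#     grounded enough       -> check_precision -> END (if precise)
#     not grounded          -> refine_response -> craft_response
#     not precise           -> refine_query -> expand_query
#     either loop exhausted -> max_iterations_reached -> END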
def create_workflow() -> StateGraph:
    """Creates the workflow for the AI nutrition agent."""
    workflow = StateGraph(AgentState)

    # Add processing nodes
    workflow.add_node("expand_query", expand_query)              # Step 1: Expand user query.
    workflow.add_node("retrieve_context", retrieve_context)     # Step 2: Retrieve relevant documents.
    workflow.add_node("craft_response", craft_response)          # Step 3: Generate a response based on retrieved data.
    workflow.add_node("score_groundedness", score_groundedness)  # Step 4: Evaluate response grounding.
    workflow.add_node("refine_response", refine_response)        # Step 5: Improve response if it's weakly grounded.
    workflow.add_node("check_precision", check_precision)        # Step 6: Evaluate response precision.
    workflow.add_node("refine_query", refine_query)              # Step 7: Improve query if response lacks precision.
    workflow.add_node("max_iterations_reached", max_iterations_reached)  # Step 8: Handle max iterations.

    # Main flow edges
    workflow.add_edge(START, "expand_query")
    workflow.add_edge("expand_query", "retrieve_context")
    workflow.add_edge("retrieve_context", "craft_response")
    workflow.add_edge("craft_response", "score_groundedness")

    # Conditional edges based on groundedness check
    workflow.add_conditional_edges(
        "score_groundedness",
        should_continue_groundedness,  # Use the conditional function
        {
            "check_precision": "check_precision",               # If well-grounded, proceed to precision check.
            "refine_response": "refine_response",               # If not, refine the response.
            "max_iterations_reached": "max_iterations_reached"  # If max loops reached, exit.
        }
    )
    workflow.add_edge("refine_response", "craft_response")  # Refined responses are reprocessed.

    # Conditional edges based on precision check
    workflow.add_conditional_edges(
        "check_precision",
        should_continue_precision,  # Use the conditional function
        {
            "pass": END,                                        # If precise, complete the workflow.
            "refine_query": "refine_query",                     # If imprecise, refine the query.
            "max_iterations_reached": "max_iterations_reached"  # If max loops reached, exit.
        }
    )
    workflow.add_edge("refine_query", "expand_query")  # Refined queries go through expansion again.
    workflow.add_edge("max_iterations_reached", END)

    return workflow
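# During development the compiled graph can be inspected (assumes a langgraph
# version that exposes get_graph()):
#     print(create_workflow().compile().get_graph().draw_mermaid())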
#=========================== Defining the agentic rag tool ====================#
WORKFLOW_APP = create_workflow().compile()

@tool
def agentic_rag(query: str):
    """
    Runs the RAG-based agent workflow to answer a nutrition disorder-related query.

    Args:
        query (str): The current user query.

    Returns:
        Dict[str, Any]: The final workflow state, including the generated response.
    """
    # Initialize state with necessary parameters
    inputs = {
        "query": query,                # Current user query
        "expanded_query": "",          # Expanded version of the query
        "context": [],                 # Retrieved documents (initially empty)
        "response": "",                # AI-generated response
        "precision_score": 0.0,        # Precision score of the response
        "groundedness_score": 0.0,     # Groundedness score of the response
        "groundedness_loop_count": 0,  # Counter for groundedness loops
        "precision_loop_count": 0,     # Counter for precision loops
        "feedback": "",
        "query_feedback": "",
        "loop_max_iter": 2
    }
    output = WORKFLOW_APP.invoke(inputs)
    return output
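# Quick sanity check outside the chat UI (assumes the vector store and API keys
# are in place). Because agentic_rag is now a LangChain tool, it is invoked with
# a dict of its arguments:
#     result = agentic_rag.invoke({"query": "What are the symptoms of bulimia nervosa?"})
#     print(result["response"])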
#================================ Guardrails ===========================#
llama_guard_client = Groq(api_key=groq_api_key)

# Function to filter user input with Llama Guard
def filter_input_with_llama_guard(user_input, model="meta-llama/llama-guard-4-12b"):
    """
    Classifies user input with Llama Guard to check whether it is safe.

    Parameters:
    - user_input: The input provided by the user.
    - model: The Llama Guard model to be used for filtering (default is "meta-llama/llama-guard-4-12b").

    Returns:
    - The Llama Guard verdict string (e.g., "safe", or "unsafe" plus a category
      code), or None if the request fails.
    """
    try:
        # Send the user input to Llama Guard for classification
        response = llama_guard_client.chat.completions.create(
            messages=[{"role": "user", "content": user_input}],
            model=model,
        )
        # Return the verdict
        return response.choices[0].message.content.strip()
    except Exception as e:
        print(f"Error with Llama Guard: {e}")
        return None
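# Llama Guard replies with "safe" or "unsafe" followed by a hazard category code
# (for example "S6", the category covering specialized medical/legal/financial
# advice). The UI below deliberately lets S6/S7 through so that legitimate
# health questions are not blocked.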
#============================= Adding Memory to the agent using mem0 ===============================#
class NutritionBot:
    def __init__(self):
        """
        Initialize the NutritionBot class, setting up memory, the LLM client, tools, and the agent executor.
        """
        # Initialize a memory client to store and retrieve customer interactions
        self.memory = MemoryClient(api_key=mem0_api_key)

        self.client = ChatOpenAI(
            model_name="gpt-4o-mini",  # Specify the model to use
            api_key=api_key,           # API key for authentication
            temperature=0              # Controls randomness in responses; 0 favors deterministic results
        )

        # Define tools available to the chatbot: the agentic RAG workflow
        tools = [agentic_rag]

        # Define the system prompt to set the behavior of the chatbot
        system_prompt = """You are a caring and knowledgeable Medical Support Agent, specializing in nutrition disorder-related guidance. Your goal is to provide accurate, empathetic, and tailored nutritional recommendations while ensuring a seamless customer experience.

Guidelines for Interaction:
- Maintain a polite, professional, and reassuring tone.
- Show genuine empathy for customer concerns and health challenges.
- Reference past interactions to provide personalized and consistent advice.
- Engage with the customer by asking about their food preferences, dietary restrictions, and lifestyle before offering recommendations.
- Ensure consistent and accurate information across conversations.
- If any detail is unclear or missing, proactively ask for clarification.
- Always use the agentic_rag tool to retrieve up-to-date and evidence-based nutrition insights.
- Keep track of ongoing issues and follow-ups to ensure continuity in support.

Your primary goal is to help customers make informed nutrition decisions that align with their health conditions and personal preferences.
"""

        # Build the prompt template for the agent
        prompt = ChatPromptTemplate.from_messages([
            ("system", system_prompt),             # System instructions
            ("human", "{input}"),                  # Placeholder for human input
            ("placeholder", "{agent_scratchpad}")  # Placeholder for intermediate reasoning steps
        ])

        # Create an agent capable of interacting with tools and executing tasks
        agent = create_tool_calling_agent(self.client, tools, prompt)

        # Wrap the agent in an executor to manage tool interactions and execution flow
        self.agent_executor = AgentExecutor(agent=agent, tools=tools, verbose=True)
    def store_customer_interaction(self, user_id: str, message: str, response: str, metadata: Dict = None):
        """
        Store customer interaction in memory for future reference.

        Args:
            user_id (str): Unique identifier for the customer.
            message (str): Customer's query or message.
            response (str): Chatbot's response.
            metadata (Dict, optional): Additional metadata for the interaction.
        """
        if metadata is None:
            metadata = {}

        # Add a timestamp to the metadata for tracking purposes
        metadata["timestamp"] = datetime.now().isoformat()

        # Format the conversation for storage
        conversation = [
            {"role": "user", "content": message},
            {"role": "assistant", "content": response}
        ]

        # Store the interaction in the memory client
        self.memory.add(
            conversation,
            user_id=user_id,
            output_format="v1.1",
            metadata=metadata
        )
    def get_relevant_history(self, user_id: str, query: str) -> List[Dict]:
        """
        Retrieve past interactions relevant to the current query.

        Args:
            user_id (str): Unique identifier for the customer.
            query (str): The customer's current query.

        Returns:
            List[Dict]: A list of relevant past interactions.
        """
        return self.memory.search(
            query=query,      # Search for interactions related to the query
            user_id=user_id,  # Restrict search to the specific user
            limit=5           # Retrieve up to 5 relevant interactions
        )
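    # memory.search() performs a semantic lookup over the user's stored
    # memories; each hit is a dict whose "memory" field holds the consolidated
    # text that handle_customer_query below folds into the prompt context.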
    def handle_customer_query(self, user_id: str, query: str) -> str:
        """
        Process a customer's query and provide a response, taking into account past interactions.

        Args:
            user_id (str): Unique identifier for the customer.
            query (str): Customer's query.

        Returns:
            str: Chatbot's response.
        """
        # Retrieve relevant past interactions for context
        relevant_history = self.get_relevant_history(user_id, query)

        # Build a context string from the relevant history
        context = "Previous relevant interactions:\n"
        for memory in relevant_history:
            context += f"- {memory['memory']}\n"  # Consolidated memory of a past exchange
            context += "---\n"

        # Print context for debugging purposes
        print("Context: ", context)

        # Prepare a prompt combining past context and the current query
        prompt = f"""
        Context:
        {context}

        Current customer query: {query}

        Provide a helpful response that takes into account any relevant past interactions.
        """

        # Generate a response using the agent
        response = self.agent_executor.invoke({"input": prompt})

        # Store the current interaction for future reference
        self.store_customer_interaction(
            user_id=user_id,
            message=query,
            response=response["output"],
            metadata={"type": "support_query"}
        )

        # Return the chatbot's response
        return response['output']
#=====================User Interface using streamlit ===========================#
def nutrition_disorder_streamlit():
    """
    A Streamlit-based UI for the Nutrition Disorder Specialist Agent.
    """
    st.title("Nutrition Disorder Specialist")
    st.write("Ask me anything about nutrition disorders, symptoms, causes, treatments, and more.")
    st.write("Type 'exit' to end the conversation.")

    # Initialize session state for chat history and user_id if they don't exist
    if 'chat_history' not in st.session_state:
        st.session_state.chat_history = []
    if 'user_id' not in st.session_state:
        st.session_state.user_id = None

    # Login form: shown only if the user is not logged in
    if st.session_state.user_id is None:
        with st.form("login_form", clear_on_submit=True):
            user_id = st.text_input("Please enter your name to begin:")
            submit_button = st.form_submit_button("Login")
            if submit_button and user_id:
                st.session_state.user_id = user_id
                st.session_state.chat_history.append({
                    "role": "assistant",
                    "content": f"Welcome, {user_id}! How can I help you with nutrition disorders today?"
                })
                st.session_state.login_submitted = True  # Set flag to trigger rerun
        # Trigger rerun outside the form if login was successful
        if st.session_state.get("login_submitted", False):
            st.session_state.pop("login_submitted")
            st.rerun()
    else:
        # Display chat history
        for message in st.session_state.chat_history:
            with st.chat_message(message["role"]):
                st.write(message["content"])

        # Chat input
        user_query = st.chat_input("Type your question here (or 'exit' to end)...")
        if user_query:
            # Check if user wants to exit
            if user_query.lower() == "exit":
                st.session_state.chat_history.append({"role": "user", "content": "exit"})
                with st.chat_message("user"):
                    st.write("exit")
                goodbye_msg = "Goodbye! Feel free to return if you have more questions about nutrition disorders."
                st.session_state.chat_history.append({"role": "assistant", "content": goodbye_msg})
                with st.chat_message("assistant"):
                    st.write(goodbye_msg)
                st.session_state.user_id = None
                st.rerun()
                return
            # Add user message to chat history
            st.session_state.chat_history.append({"role": "user", "content": user_query})
            with st.chat_message("user"):
                st.write(user_query)

            # Filter input through Llama Guard; normalize whitespace in the
            # verdict, since the model may separate "unsafe" and the category
            # code with a newline
            filtered_result = filter_input_with_llama_guard(user_query)
            verdict = " ".join(filtered_result.split()) if filtered_result else ""

            # Process through the agent
            with st.chat_message("assistant"):
                if verdict in ["safe", "unsafe S7", "unsafe S6"]:
                    try:
                        # Initialize chatbot if not already done
                        if 'chatbot' not in st.session_state:
                            st.session_state.chatbot = NutritionBot()
                        # Get response from the chatbot
                        response = st.session_state.chatbot.handle_customer_query(
                            st.session_state.user_id,
                            user_query
                        )
                        st.write(response)
                        st.session_state.chat_history.append({"role": "assistant", "content": response})
                    except Exception as e:
                        error_msg = f"Sorry, I encountered an error while processing your query. Please try again. Error: {str(e)}"
                        st.write(error_msg)
                        st.session_state.chat_history.append({"role": "assistant", "content": error_msg})
                else:
                    inappropriate_msg = "I apologize, but I cannot process that input as it may be inappropriate. Please try again."
                    st.write(inappropriate_msg)
                    st.session_state.chat_history.append({"role": "assistant", "content": inappropriate_msg})


if __name__ == "__main__":
    nutrition_disorder_streamlit()
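# To run locally (assumes this file is saved as app.py and the required API
# keys are available via environment variables or a .env file):
#     streamlit run app.py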
