# Chat.py
# Chat functions for interacting with the LLMs as chatbots
#
# Imports
import json
import logging
import os
import re
import tempfile
from datetime import datetime
#
# External Imports
#
# Local Imports
from App_Function_Libraries.DB_Manager import get_conversation_name, save_chat_history_to_database
from App_Function_Libraries.LLM_API_Calls import chat_with_openai, chat_with_anthropic, chat_with_cohere, \
    chat_with_groq, chat_with_openrouter, chat_with_deepseek, chat_with_mistral, chat_with_huggingface, chat_with_vllm
from App_Function_Libraries.LLM_API_Calls_Local import chat_with_aphrodite, chat_with_local_llm, chat_with_ollama, \
    chat_with_kobold, chat_with_llama, chat_with_oobabooga, chat_with_tabbyapi
from App_Function_Libraries.SQLite_DB import load_media_content
from App_Function_Libraries.Utils import generate_unique_filename
#
####################################################################################################

def chat(message, history, media_content, selected_parts, api_endpoint, api_key, prompt, temperature,
         system_message=None):
    try:
        logging.info(f"Debug - Chat Function - Message: {message}")
        logging.info(f"Debug - Chat Function - Media Content: {media_content}")
        logging.info(f"Debug - Chat Function - Selected Parts: {selected_parts}")
        logging.info(f"Debug - Chat Function - API Endpoint: {api_endpoint}")
        # logging.info(f"Debug - Chat Function - Prompt: {prompt}")

        # Ensure selected_parts is a list
        if not isinstance(selected_parts, (list, tuple)):
            selected_parts = [selected_parts] if selected_parts else []
        # logging.debug(f"Debug - Chat Function - Selected Parts (after check): {selected_parts}")

        # Combine the selected parts of the media content
        combined_content = "\n\n".join(
            [f"{part.capitalize()}: {media_content.get(part, '')}" for part in selected_parts if part in media_content])
        # Print first 500 chars
        # logging.debug(f"Debug - Chat Function - Combined Content: {combined_content[:500]}...")

        # Prepare the input for the API
        if not history:
            input_data = f"{combined_content}\n\nUser: {message}\n"
        else:
            input_data = f"User: {message}\n"
        # Print first 500 chars
        # logging.info(f"Debug - Chat Function - Input Data: {input_data[:500]}...")

        if system_message:
            print(f"System message: {system_message}")
            logging.debug(f"Debug - Chat Function - System Message: {system_message}")
        temperature = float(temperature) if temperature else 0.7
        temp = temperature
        logging.debug(f"Debug - Chat Function - Temperature: {temperature}")
        logging.debug(f"Debug - Chat Function - API Key: {api_key[:10] if api_key else 'None'}")
        logging.debug(f"Debug - Chat Function - Prompt: {prompt}")
        # Use the existing API request code based on the selected endpoint
        logging.info(f"Debug - Chat Function - API Endpoint: {api_endpoint}")
        if api_endpoint.lower() == 'openai':
            response = chat_with_openai(api_key, input_data, prompt, temp, system_message)
        elif api_endpoint.lower() == "anthropic":
            response = chat_with_anthropic(api_key, input_data, prompt, temp, system_message)
        elif api_endpoint.lower() == "cohere":
            response = chat_with_cohere(api_key, input_data, prompt, temp, system_message)
        elif api_endpoint.lower() == "groq":
            response = chat_with_groq(api_key, input_data, prompt, temp, system_message)
        elif api_endpoint.lower() == "openrouter":
            response = chat_with_openrouter(api_key, input_data, prompt, temp, system_message)
        elif api_endpoint.lower() == "deepseek":
            response = chat_with_deepseek(api_key, input_data, prompt, temp, system_message)
        elif api_endpoint.lower() == "mistral":
            response = chat_with_mistral(api_key, input_data, prompt, temp, system_message)
        elif api_endpoint.lower() == "llama.cpp":
            response = chat_with_llama(input_data, prompt, temp, system_message)
        elif api_endpoint.lower() == "kobold":
            response = chat_with_kobold(input_data, api_key, prompt, temp, system_message)
        elif api_endpoint.lower() == "ooba":
            response = chat_with_oobabooga(input_data, api_key, prompt, temp, system_message)
        elif api_endpoint.lower() == "tabbyapi":
            response = chat_with_tabbyapi(input_data, prompt, temp, system_message)
        elif api_endpoint.lower() == "vllm":
            response = chat_with_vllm(input_data, prompt, system_message)
        elif api_endpoint.lower() == "local-llm":
            response = chat_with_local_llm(input_data, prompt, temp, system_message)
        elif api_endpoint.lower() == "huggingface":
            response = chat_with_huggingface(api_key, input_data, prompt, temp)  # , system_message)
        elif api_endpoint.lower() == "ollama":
            response = chat_with_ollama(input_data, prompt, temp, system_message)
        elif api_endpoint.lower() == "aphrodite":
            response = chat_with_aphrodite(input_data, prompt, temp, system_message)
        else:
            raise ValueError(f"Unsupported API endpoint: {api_endpoint}")

        return response
    except Exception as e:
        logging.error(f"Error in chat function: {str(e)}")
        return f"An error occurred: {str(e)}"

def save_chat_history_to_db_wrapper(chatbot, conversation_id, media_content):
    logging.info(f"Attempting to save chat history. Media content type: {type(media_content)}")
    try:
        # Extract the media_id and media_name from the media_content
        media_id = None
        media_name = None
        if isinstance(media_content, dict):
            logging.debug(f"Media content keys: {media_content.keys()}")
            if 'content' in media_content:
                try:
                    content = media_content['content']
                    if isinstance(content, str):
                        content_json = json.loads(content)
                    elif isinstance(content, dict):
                        content_json = content
                    else:
                        raise ValueError(f"Unexpected content type: {type(content)}")

                    # Use the webpage_url as the media_id
                    media_id = content_json.get('webpage_url')
                    # Use the title as the media_name
                    media_name = content_json.get('title')
                    logging.info(f"Extracted media_id: {media_id}, media_name: {media_name}")
                except json.JSONDecodeError:
                    logging.error("Failed to decode JSON from media_content['content']")
                except Exception as e:
                    logging.error(f"Error processing media_content: {str(e)}")
            else:
                logging.warning("'content' key not found in media_content")
        else:
            logging.warning(f"media_content is not a dictionary. Type: {type(media_content)}")

        if media_id is None:
            # If we couldn't find a media_id, we'll use a placeholder
            media_id = "unknown_media"
            logging.warning(f"Unable to extract media_id from media_content. Using placeholder: {media_id}")
        if media_name is None:
            media_name = "Unnamed Media"
            logging.warning(f"Unable to extract media_name from media_content. Using placeholder: {media_name}")

        # Generate a unique conversation name using media_id and current timestamp
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        conversation_name = f"Chat_{media_id}_{timestamp}"

        new_conversation_id = save_chat_history_to_database(chatbot, conversation_id, media_id, media_name,
                                                            conversation_name)
        return new_conversation_id, f"Chat history saved successfully as {conversation_name}!"
    except Exception as e:
        error_message = f"Failed to save chat history: {str(e)}"
        logging.error(error_message, exc_info=True)
        return conversation_id, error_message
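
# Illustrative input shape (hypothetical values): the wrapper only reads
# media_content["content"], which may be a dict or a JSON string containing
# "webpage_url" (used as the media_id) and "title" (used as the media_name), e.g.:
#
#   media_content = {
#       "content": '{"webpage_url": "https://example.com/video", "title": "Example Talk"}'
#   }
#   # chatbot_history and existing_conversation_id are placeholder names here
#   new_id, status = save_chat_history_to_db_wrapper(chatbot_history, existing_conversation_id, media_content)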

def save_chat_history(history, conversation_id, media_content):
    try:
        content, conversation_name = generate_chat_history_content(history, conversation_id, media_content)

        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        safe_conversation_name = re.sub(r'[^a-zA-Z0-9_-]', '_', conversation_name)
        base_filename = f"{safe_conversation_name}_{timestamp}.json"

        # Create a temporary file
        with tempfile.NamedTemporaryFile(mode='w', delete=False, suffix='.json') as temp_file:
            temp_file.write(content)
            temp_file_path = temp_file.name

        # Generate a unique filename
        unique_filename = generate_unique_filename(os.path.dirname(temp_file_path), base_filename)
        final_path = os.path.join(os.path.dirname(temp_file_path), unique_filename)

        # Rename the temporary file to the unique filename
        os.rename(temp_file_path, final_path)

        return final_path
    except Exception as e:
        logging.error(f"Error saving chat history: {str(e)}")
        return None
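
# Illustrative result (hypothetical path, and assuming generate_unique_filename returns
# the base name when it is not already taken): the JSON export lands in the system temp
# directory, so on a typical Linux host a call might return something like:
#
#   path = save_chat_history(history, conversation_id=7, media_content=media_content)
#   # path -> "/tmp/Example_Talk-chat_20240101_120000.json" (exact directory depends on the OS)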

def generate_chat_history_content(history, conversation_id, media_content):
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")

    conversation_name = get_conversation_name(conversation_id)
    if not conversation_name:
        media_name = extract_media_name(media_content)
        if media_name:
            conversation_name = f"{media_name}-chat"
        else:
            conversation_name = f"chat-{timestamp}"  # Fallback name

    chat_data = {
        "conversation_id": conversation_id,
        "conversation_name": conversation_name,
        "timestamp": timestamp,
        "history": [
            {
                "role": "user" if i % 2 == 0 else "bot",
                "content": msg[0] if isinstance(msg, tuple) else msg
            }
            for i, msg in enumerate(history)
        ]
    }

    return json.dumps(chat_data, indent=2), conversation_name
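
# Illustrative output (hypothetical values): roles alternate "user"/"bot" by list
# position, so a history of ["Hi", "Hello!"] serializes to roughly:
#
#   {
#     "conversation_id": 7,
#     "conversation_name": "Example Talk-chat",
#     "timestamp": "20240101_120000",
#     "history": [
#       {"role": "user", "content": "Hi"},
#       {"role": "bot", "content": "Hello!"}
#     ]
#   }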

def extract_media_name(media_content):
    if isinstance(media_content, dict):
        content = media_content.get('content', {})
        if isinstance(content, str):
            try:
                content = json.loads(content)
            except json.JSONDecodeError:
                logging.warning("Failed to parse media_content JSON string")
                return None

        # Try to extract title from the content
        if isinstance(content, dict):
            return content.get('title') or content.get('name')

    logging.warning(f"Unexpected media_content format: {type(media_content)}")
    return None

def update_chat_content(selected_item, use_content, use_summary, use_prompt, item_mapping):
    logging.debug(f"Debug - Update Chat Content - Selected Item: {selected_item}\n")
    logging.debug(f"Debug - Update Chat Content - Use Content: {use_content}\n\n\n\n")
    logging.debug(f"Debug - Update Chat Content - Use Summary: {use_summary}\n\n")
    logging.debug(f"Debug - Update Chat Content - Use Prompt: {use_prompt}\n\n")
    logging.debug(f"Debug - Update Chat Content - Item Mapping: {item_mapping}\n\n")

    if selected_item and selected_item in item_mapping:
        media_id = item_mapping[selected_item]
        content = load_media_content(media_id)
        selected_parts = []
        if use_content and "content" in content:
            selected_parts.append("content")
        if use_summary and "summary" in content:
            selected_parts.append("summary")
        if use_prompt and "prompt" in content:
            selected_parts.append("prompt")

        # Modified debug print
        if isinstance(content, dict):
            print(f"Debug - Update Chat Content - Content keys: {list(content.keys())}")
            for key, value in content.items():
                print(f"Debug - Update Chat Content - {key} (first 500 char): {str(value)[:500]}\n\n\n\n")
        else:
            print(f"Debug - Update Chat Content - Content (first 500 char): {str(content)[:500]}\n\n\n\n")

        print(f"Debug - Update Chat Content - Selected Parts: {selected_parts}")
        return content, selected_parts
    else:
        print("Debug - Update Chat Content - No item selected or item not in mapping")
        return {}, []
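
# Illustrative call (hypothetical values): item_mapping maps the label shown in the UI
# to a media_id that load_media_content() can resolve.
#
#   content, parts = update_chat_content(
#       selected_item="Example Talk",
#       use_content=True,
#       use_summary=True,
#       use_prompt=False,
#       item_mapping={"Example Talk": 42},
#   )
#   # parts -> ["content", "summary"] when both keys exist in the loaded content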
#
# End of Chat.py
##########################################################################################################################