Spaces:
Running
Running
| import streamlit as st | |
| from huggingface_hub import InferenceClient | |
| import os | |
| from typing import Iterator | |
| from PIL import Image | |
| import pytesseract | |
| from PyPDF2 import PdfReader | |
| import base64 | |
| from together import Together | |
# Together AI credential, read once when the module is loaded. The check below
# stops start-up when the variable is unset or empty.
API_KEY = os.environ.get("TOGETHER_API_KEY")
if API_KEY is None or API_KEY == "":
    raise ValueError("API key is missing! Make sure TOGETHER_API_KEY is set in the Secrets.")
| # Initialize the client with Together AI provider | |
def get_client():
    """Create a Together AI client authenticated with the module-level key.

    Returns:
        A new ``Together`` instance constructed with ``api_key=API_KEY``.
    """
    together_client = Together(api_key=API_KEY)
    return together_client
def process_file(file) -> str:
    """Convert an uploaded file into a string that can be placed in a request.

    The handling is chosen from ``file.type`` (the upload's MIME type):

    * ``application/pdf``: the extracted text of every page that yields
      any, each page followed by a newline.
    * ``image/*``: the raw bytes, base64-encoded and returned as text.
    * anything else: the raw bytes decoded as UTF-8.

    Args:
        file: Uploaded-file object exposing ``type`` and ``getvalue()`` and
            accepted by ``PdfReader``; ``None`` is allowed.

    Returns:
        The extracted content; ``""`` when ``file`` is ``None``; or a string
        starting with ``"Error processing file: "`` if extraction raised, so
        that one bad upload does not abort the whole request.
    """
    if file is None:
        return ""

    try:
        # Treat a missing MIME type as "not PDF, not image" instead of
        # failing on ``None.startswith``.
        mime_type = file.type or ""

        if mime_type == "application/pdf":
            page_texts = (page.extract_text() for page in PdfReader(file).pages)
            # Pages with no extractable text contribute nothing.
            return "".join(f"{page_text}\n" for page_text in page_texts if page_text)

        if mime_type.startswith("image/"):
            return base64.b64encode(file.getvalue()).decode("utf-8")

        # Source files are not guaranteed to be UTF-8. Substituting U+FFFD for
        # undecodable bytes keeps the rest of the file usable instead of
        # replacing the entire content with an error message.
        return file.getvalue().decode("utf-8", errors="replace")
    except Exception as e:
        return f"Error processing file: {str(e)}"
def generate_response(
    message: str,
    history: list[tuple[str, str]],
    system_message: str,
    max_tokens: int,
    temperature: float,
    top_p: float,
    files=None
) -> Iterator[str]:
    """Stream the model's reply to ``message`` as text fragments.

    Uploaded files are converted with ``process_file``. If any of them is an
    image, the request goes to the Llama 3.2 vision model; otherwise it goes
    to DeepSeek-R1 together with the system message and prior history.

    Args:
        message: The user's current prompt.
        history: Earlier ``(user, assistant)`` message pairs, oldest first.
        system_message: System prompt for the text-only request.
        max_tokens: Completion length limit forwarded to the API.
        temperature: Sampling temperature forwarded to the API.
        top_p: Nucleus-sampling threshold forwarded to the API.
        files: Optional iterable of uploaded-file objects exposing ``type``
            and whatever ``process_file`` needs.

    Yields:
        Each non-empty content delta of the streamed completion. If the
        request or the stream raises, a single ``"Error: ..."`` string is
        yielded instead of propagating the exception.
    """
    client = get_client()

    text_blocks = []
    image_content = None  # base64 payload of the image to send, if any
    image_mime_type = None
    for file in files or []:
        content = process_file(file)
        mime_type = file.type or ""
        if mime_type.startswith("image/"):
            # Only one image is sent per request; a later image replaces an
            # earlier one.
            image_content = content
            image_mime_type = mime_type
        else:
            text_blocks.append({
                "type": "text",
                "text": f"File content:\n{content}"
            })

    # System prompt followed by the alternating user/assistant history.
    messages = [{"role": "system", "content": system_message}]
    for user_msg, assistant_msg in history:
        messages.append({"role": "user", "content": user_msg})
        messages.append({"role": "assistant", "content": assistant_msg})

    # The prompt plus the text of any non-image uploads, used by both models.
    user_content = [{"type": "text", "text": message}] + text_blocks

    # The sidebar settings apply to whichever model serves the request.
    sampling = {
        "max_tokens": max_tokens,
        "temperature": temperature,
        "top_p": top_p,
    }

    try:
        if image_content is not None:
            # NOTE(review): the vision request deliberately carries only the
            # current turn (no system prompt, no history), as before --
            # confirm whether the vision model should receive them.
            vision_messages = [{
                "role": "user",
                "content": user_content + [
                    {
                        "type": "image_url",
                        "image_url": {
                            "url": f"data:{image_mime_type};base64,{image_content}",
                        },
                    },
                ]
            }]
            stream = client.chat.completions.create(
                model="meta-llama/Llama-3.2-11B-Vision-Instruct-Turbo",
                messages=vision_messages,
                stream=True,
                **sampling,
            )
        else:
            messages.append({"role": "user", "content": user_content})
            stream = client.chat.completions.create(
                model="deepseek-ai/DeepSeek-R1",
                messages=messages,
                stream=True,
                **sampling,
            )

        for chunk in stream:
            # Some chunks carry no choices or an empty delta; skip those.
            if chunk.choices and chunk.choices[0].delta.content:
                yield chunk.choices[0].delta.content
    except Exception as e:
        yield f"Error: {str(e)}"
def main():
    """Render the chat page and handle at most one new prompt per script run.

    Draws the settings sidebar (system message, sampling sliders, file
    uploader), replays the conversation stored in ``st.session_state.messages``
    and, when the user submits a prompt, streams the reply from
    ``generate_response`` into the page and appends both turns to the history.
    """
    st.set_page_config(page_title="DeepSeek Chat", page_icon="💭", layout="wide")

    # The conversation survives reruns by living in session state.
    if "messages" not in st.session_state:
        st.session_state.messages = []

    st.title("DeepSeek Chat with File Upload")
    st.markdown("Chat with DeepSeek AI model. You can optionally upload files for the model to analyze.")

    # Sidebar for parameters
    with st.sidebar:
        st.header("Settings")
        system_message = st.text_area(
            "System Message",
            value="You are a friendly Chatbot.",
            height=100
        )
        max_tokens = st.slider(
            "Max Tokens",
            min_value=1,
            max_value=8192,
            value=8192,
            step=1
        )
        # min_value is 0.0 so the declared range actually contains the 0.0
        # default (it was declared as 0.1, below which the default sat).
        temperature = st.slider(
            "Temperature",
            min_value=0.0,
            max_value=4.0,
            value=0.0,
            step=0.1
        )
        top_p = st.slider(
            "Top-p (nucleus sampling)",
            min_value=0.1,
            max_value=1.0,
            value=0.95,
            step=0.05
        )
        # accept_multiple_files=True is passed, hence the plural name.
        uploaded_files = st.file_uploader(
            "Upload File (optional)",
            type=['txt', 'py', 'md', 'swift', 'java', 'js', 'ts', 'rb', 'go',
                  'php', 'c', 'cpp', 'h', 'hpp', 'cs', 'html', 'css', 'kt', 'svelte',
                  'pdf', 'png', 'jpg', 'jpeg'],
            accept_multiple_files=True
        )

    # Replay the stored conversation.
    for message in st.session_state.messages:
        with st.chat_message(message["role"]):
            st.write(message["content"])

    if prompt := st.chat_input("What would you like to know?"):
        # Record and show the user's turn.
        st.session_state.messages.append({"role": "user", "content": prompt})
        with st.chat_message("user"):
            st.write(prompt)

        with st.chat_message("assistant"):
            response_placeholder = st.empty()
            full_response = ""

            # Pair each user turn with the assistant turn after it. The prompt
            # just appended has no partner yet, so zip() leaves it out.
            stored = st.session_state.messages
            history = [
                (user_turn["content"], assistant_turn["content"])
                for user_turn, assistant_turn in zip(stored[::2], stored[1::2])
            ]

            for response_chunk in generate_response(
                prompt,
                history,
                system_message,
                max_tokens,
                temperature,
                top_p,
                uploaded_files
            ):
                full_response += response_chunk
                # The trailing block character is a cursor shown while streaming.
                response_placeholder.markdown(full_response + "▌")

            response_placeholder.markdown(full_response)

        # Add assistant response to chat history
        st.session_state.messages.append({"role": "assistant", "content": full_response})


# Launch the app only when this file is run as a script.
if __name__ == "__main__":
    main()