Spaces:
Running
Running
| import streamlit as st | |
| from together import Together | |
| import os | |
| from typing import Iterator | |
| from PIL import Image | |
| import base64 | |
| from PyPDF2 import PdfReader | |
| import json # λλ²κΉ μ© μΆκ° | |
| API_KEY = os.getenv("TOGETHER_API_KEY") | |
| if not API_KEY: | |
| raise ValueError("API key is missing! Make sure TOGETHER_API_KEY is set in the Secrets.") | |
| def get_client(): | |
| return Together(api_key=API_KEY) | |
| def process_file(file) -> str: | |
| if file is None: | |
| return "" | |
| try: | |
| if file.type == "application/pdf": | |
| text = "" | |
| pdf_reader = PdfReader(file) | |
| for page in pdf_reader.pages: | |
| text += page.extract_text() + "\n" | |
| return text | |
| elif file.type.startswith("image/"): | |
| return base64.b64encode(file.getvalue()).decode("utf-8") | |
| else: | |
| return file.getvalue().decode('utf-8') | |
| except Exception as e: | |
| st.error(f"νμΌ μ²λ¦¬ μ€ μ€λ₯ λ°μ: {str(e)}") | |
| return "" | |
| def format_message(role: str, content: str) -> dict: | |
| """API λ©μμ§ νμμ λ§κ² λ©μμ§λ₯Ό ν¬λ§·ν ν©λλ€.""" | |
| return { | |
| "role": role, | |
| "content": content | |
| } | |
| def get_formatted_history(messages: list) -> list: | |
| """λν νμ€ν 리λ₯Ό API νμμ λ§κ² λ³νν©λλ€.""" | |
| formatted_messages = [] | |
| for msg in messages: | |
| if isinstance(msg, dict) and "role" in msg and "content" in msg: | |
| # μν μ΄ μ¬λ°λ₯Έμ§ νμΈνκ³ μμ | |
| role = msg["role"] | |
| if role not in ["system", "user", "assistant"]: | |
| role = "user" if role == "human" else "assistant" | |
| formatted_messages.append(format_message(role, msg["content"])) | |
| return formatted_messages | |
| def generate_response( | |
| message: str, | |
| history: list, | |
| system_message: str, | |
| max_tokens: int, | |
| temperature: float, | |
| top_p: float, | |
| files=None | |
| ) -> Iterator[str]: | |
| client = get_client() | |
| try: | |
| # λ©μμ§ λ°°μ΄ μ΄κΈ°ν | |
| messages = [] | |
| # μμ€ν λ©μμ§ μΆκ° | |
| if system_message.strip(): | |
| messages.append(format_message("system", system_message)) | |
| # λν νμ€ν 리 μΆκ° | |
| formatted_history = get_formatted_history(history) | |
| messages.extend(formatted_history) | |
| # νμ¬ λ©μμ§μ νμΌ λ΄μ© μ€λΉ | |
| current_content = message | |
| if files: | |
| file_contents = [] | |
| for file in files: | |
| content = process_file(file) | |
| if content: | |
| file_contents.append(f"νμΌ λ΄μ©:\n{content}") | |
| if file_contents: | |
| current_content = current_content + "\n\n" + "\n\n".join(file_contents) | |
| # νμ¬ λ©μμ§ μΆκ° | |
| messages.append(format_message("user", current_content)) | |
| # λλ²κΉ : API μμ² λ΄μ© μΆλ ₯ | |
| st.write("API μμ² λ©μμ§:", json.dumps(messages, ensure_ascii=False, indent=2)) | |
| # API μμ² | |
| try: | |
| stream = client.chat.completions.create( | |
| model="deepseek-ai/DeepSeek-R1", | |
| messages=messages, | |
| max_tokens=max_tokens, | |
| temperature=temperature, | |
| top_p=top_p, | |
| stream=True | |
| ) | |
| for chunk in stream: | |
| if hasattr(chunk.choices[0].delta, 'content') and chunk.choices[0].delta.content: | |
| yield chunk.choices[0].delta.content | |
| except Exception as e: | |
| if "rate limit" in str(e).lower(): | |
| yield "API νΈμΆ νλμ λλ¬νμ΅λλ€. μ μ ν λ€μ μλν΄μ£ΌμΈμ." | |
| else: | |
| st.error(f"API μ€λ₯ μμΈ: {str(e)}") | |
| yield "μ£μ‘ν©λλ€. μ μ ν λ€μ μλν΄μ£ΌμΈμ." | |
| except Exception as e: | |
| st.error(f"μ 체 μ€λ₯ μμΈ: {str(e)}") | |
| yield "μ€λ₯κ° λ°μνμ΅λλ€. μ μ ν λ€μ μλν΄μ£ΌμΈμ." | |
| def main(): | |
| st.set_page_config(page_title="DeepSeek μ±ν ", page_icon="π", layout="wide") | |
| # μΈμ μν μ΄κΈ°ν | |
| if "messages" not in st.session_state: | |
| st.session_state.messages = [] | |
| st.title("DeepSeek μ±ν ") | |
| st.markdown("DeepSeek AI λͺ¨λΈκ³Ό λννμΈμ. νμν κ²½μ° νμΌμ μ λ‘λν μ μμ΅λλ€.") | |
| with st.sidebar: | |
| st.header("μ€μ ") | |
| system_message = st.text_area( | |
| "μμ€ν λ©μμ§", | |
| value="λΉμ μ κΉμ΄ μκ² μκ°νλ AIμ λλ€. λ¬Έμ λ₯Ό κΉμ΄ κ³ λ €νκ³ μ²΄κ³μ μΈ μΆλ‘ κ³Όμ μ ν΅ν΄ μ¬λ°λ₯Έ ν΄κ²°μ± μ λμΆνμΈμ. λ°λμ νκΈλ‘ λ΅λ³νμΈμ.", | |
| height=100 | |
| ) | |
| max_tokens = st.slider("μ΅λ ν ν° μ", 1, 4096, 2048) | |
| temperature = st.slider("μ¨λ", 0.0, 2.0, 0.7, 0.1) | |
| top_p = st.slider("Top-p", 0.0, 1.0, 0.7, 0.1) | |
| uploaded_file = st.file_uploader( | |
| "νμΌ μ λ‘λ (μ νμ¬ν)", | |
| type=['txt', 'py', 'md', 'pdf', 'png', 'jpg', 'jpeg'], | |
| accept_multiple_files=True | |
| ) | |
| # λ©μμ§ νμ | |
| for message in st.session_state.messages: | |
| with st.chat_message(message["role"]): | |
| st.markdown(message["content"]) | |
| # μ±ν μ λ ₯ | |
| if prompt := st.chat_input("무μμ μκ³ μΆμΌμ κ°μ?"): | |
| # μ¬μ©μ λ©μμ§ μΆκ° | |
| user_message = format_message("user", prompt) | |
| st.session_state.messages.append(user_message) | |
| with st.chat_message("user"): | |
| st.markdown(prompt) | |
| # μ΄μμ€ν΄νΈ μλ΅ μμ± | |
| with st.chat_message("assistant"): | |
| response_placeholder = st.empty() | |
| full_response = "" | |
| # generate_response νΈμΆ | |
| for response_chunk in generate_response( | |
| prompt, | |
| st.session_state.messages, | |
| system_message, | |
| max_tokens, | |
| temperature, | |
| top_p, | |
| uploaded_file | |
| ): | |
| full_response += response_chunk | |
| response_placeholder.markdown(full_response + "β") | |
| response_placeholder.markdown(full_response) | |
| # μλ΅ μ μ₯ | |
| assistant_message = format_message("assistant", full_response) | |
| st.session_state.messages.append(assistant_message) | |
| if __name__ == "__main__": | |
| main() |