import uuid
import base64
import re
from io import BytesIO
from typing import Optional

from codeboxapi import CodeBox  # type: ignore
from codeboxapi.schema import CodeBoxOutput  # type: ignore
from langchain.tools import StructuredTool, BaseTool
from langchain.chat_models import ChatOpenAI
from langchain.chat_models.base import BaseChatModel
from langchain.prompts.chat import MessagesPlaceholder
from langchain.agents import AgentExecutor, BaseSingleActionAgent
from langchain.memory import ConversationBufferMemory

from codeinterpreterapi.schema import CodeInterpreterResponse, CodeInput, File, UserRequest
from codeinterpreterapi.config import settings
from codeinterpreterapi.chains.functions_agent import OpenAIFunctionsAgent
from codeinterpreterapi.prompts import code_interpreter_system_message
from codeinterpreterapi.callbacks import CodeCallbackHandler
from codeinterpreterapi.chains.modifications_check import get_file_modifications
from codeinterpreterapi.chains.remove_download_link import remove_download_link


class CodeInterpreterSession:
    def __init__(
        self,
        model=None,
        openai_api_key=settings.OPENAI_API_KEY,
        openai_proxy=settings.OPENAI_PROXY,
        openai_api_base=settings.OPENAI_API_BASE,
        verbose=settings.VERBOSE,
        tools: Optional[list[BaseTool]] = None,
    ) -> None:
        self.codebox = CodeBox()
        self.verbose = verbose
        self.tools: list[BaseTool] = self._tools(tools)
        self.llm: BaseChatModel = self._llm(
            model, openai_api_key, openai_proxy, openai_api_base
        )
        self.agent_executor: AgentExecutor = self._agent_executor()
        self.input_files: list[File] = []
        self.output_files: list[File] = []

    async def astart(self) -> None:
        await self.codebox.astart()

    def _tools(
        self, additional_tools: Optional[list[BaseTool]] = None
    ) -> list[BaseTool]:
        additional_tools = additional_tools or []
        return additional_tools + [
            StructuredTool(
                name="python",
                # TODO: variables as context to the agent
                # TODO: current files as context to the agent
                description="Input a string of code to a python interpreter (jupyter kernel). "
                "Variables are preserved between runs. ",
                func=self.run_handler,
                coroutine=self.arun_handler,
                args_schema=CodeInput,
            ),
        ]

    def _llm(
        self,
        model: Optional[str] = None,
        openai_api_key: Optional[str] = None,
        openai_proxy: Optional[str] = None,
        openai_api_base: Optional[str] = None,
    ) -> BaseChatModel:
        print("OpenAI API Key:", openai_api_key)
        print("OpenAI Proxy:", openai_proxy)
        print("OpenAI API Base:", openai_api_base)
        print("OpenAI Model:", model)
        if model is None:
            model = "gpt-3.5-turbo"
        if openai_api_key is None:
            raise ValueError(
                "OpenAI API key missing. Set the OPENAI_API_KEY env variable "
                "or pass `openai_api_key` to the session."
            )
        return ChatOpenAI(
            temperature=0.03,
            model=model,
            openai_api_key=openai_api_key,
            # openai_proxy=openai_proxy,
            openai_api_base=openai_api_base,
            max_retries=3,
            request_timeout=60 * 3,
        )  # type: ignore
    def _agent(self) -> BaseSingleActionAgent:
        return OpenAIFunctionsAgent.from_llm_and_tools(
            llm=self.llm,
            tools=self.tools,
            system_message=code_interpreter_system_message,
            extra_prompt_messages=[MessagesPlaceholder(variable_name="memory")],
        )

    def _agent_executor(self) -> AgentExecutor:
        return AgentExecutor.from_agent_and_tools(
            agent=self._agent(),
            callbacks=[CodeCallbackHandler(self)],
            max_iterations=9,
            tools=self.tools,
            verbose=self.verbose,
            memory=ConversationBufferMemory(
                memory_key="memory", return_messages=True
            ),
        )

    async def show_code(self, code: str) -> None:
        """Callback function to show code to the user."""
        if self.verbose:
            print(code)

    def run_handler(self, code: str):
        raise NotImplementedError("Use arun_handler for now.")

    async def arun_handler(self, code: str):
        """Run code in the container and send the output to the user."""
        output: CodeBoxOutput = await self.codebox.arun(code)

        if not isinstance(output.content, str):
            raise TypeError("Expected output.content to be a string.")

        if output.type == "image/png":
            filename = f"image-{uuid.uuid4()}.png"
            file_buffer = BytesIO(base64.b64decode(output.content))
            file_buffer.name = filename
            self.output_files.append(
                File(name=filename, content=file_buffer.read())
            )
            return f"Image {filename} was sent to the user."

        elif output.type == "error":
            if "ModuleNotFoundError" in output.content:
                if package := re.search(
                    r"ModuleNotFoundError: No module named '(.*)'", output.content
                ):
                    await self.codebox.ainstall(package.group(1))
                    return (
                        f"{package.group(1)} was missing but is now installed. "
                        "Please try again."
                    )
            else:
                pass  # TODO: preanalyze error to optimize next code generation
            if self.verbose:
                print("Error:", output.content)

        elif modifications := await get_file_modifications(code, self.llm):
            for filename in modifications:
                if filename in [file.name for file in self.input_files]:
                    continue
                fileb = await self.codebox.adownload(filename)
                if not fileb.content:
                    continue
                file_buffer = BytesIO(fileb.content)
                file_buffer.name = filename
                self.output_files.append(
                    File(name=filename, content=file_buffer.read())
                )

        return output.content

    async def input_handler(self, request: UserRequest):
        if not request.files:
            return
        if not request.content:
            request.content = (
                "I uploaded some file(s); just text me back and confirm that you got them."
            )
        request.content += "\n**The user uploaded the following files: **\n"
        for file in request.files:
            self.input_files.append(file)
            request.content += f"[Attachment: {file.name}]\n"
            await self.codebox.aupload(file.name, file.content)
        request.content += "**File(s) are now available in the cwd. **\n"
**\n" async def output_handler(self, final_response: str) -> CodeInterpreterResponse: """Embed images in the response""" for file in self.output_files: if str(file.name) in final_response: # rm ![Any](file.name) from the response final_response = re.sub( rf"\n\n!\[.*\]\(.*\)", "", final_response) if self.output_files and re.search(rf"\n\[.*\]\(.*\)", final_response): final_response = await remove_download_link(final_response, self.llm) return CodeInterpreterResponse(content=final_response, files=self.output_files) async def generate_response( self, user_msg: str, files: list[File] = [], detailed_error: bool = False, ) -> CodeInterpreterResponse: """Generate a Code Interpreter response based on the user's input.""" user_request = UserRequest(content=user_msg, files=files) try: await self.input_handler(user_request) response = await self.agent_executor.arun(input=user_request.content) return await self.output_handler(response) except Exception as e: if self.verbose: import traceback traceback.print_exc() if detailed_error: return CodeInterpreterResponse( content=f"Error in CodeInterpreterSession: {e.__class__.__name__} - {e}" ) else: return CodeInterpreterResponse( content="Sorry, something went while generating your response." "Please try again or restart the session." ) async def is_running(self) -> bool: return await self.codebox.astatus() == "running" async def astop(self) -> None: await self.codebox.astop() async def __aenter__(self) -> "CodeInterpreterSession": await self.astart() return self async def __aexit__(self, exc_type, exc_value, traceback) -> None: await self.astop()