Mahdiyar
Add initial implementation of TinyCodeAgent with Python code execution capabilities
ebb6a31
| import asyncio | |
| from tinyagent import tool | |
| from textwrap import dedent | |
| from typing import Optional, List, Dict, Any,Union | |
| from tinyagent.hooks.logging_manager import LoggingManager | |
| import modal | |
| import cloudpickle | |
| def clean_response(resp): | |
| return {k:v for k,v in resp.items() if k in ['printed_output','return_value','stderr','error_traceback']} | |
| def make_session_blob(ns: dict) -> bytes: | |
| clean = {} | |
| for name, val in ns.items(): | |
| try: | |
| # Try serializing just this one object | |
| cloudpickle.dumps(val) | |
| except Exception: | |
| # drop anything that fails | |
| continue | |
| else: | |
| clean[name] = val | |
| return cloudpickle.dumps(clean) | |
| def _run_python(code: str,globals_dict:Dict[str,Any]={},locals_dict:Dict[str,Any]={}): | |
| import contextlib | |
| import traceback | |
| import io | |
| import ast | |
| # Make copies to avoid mutating the original parameters | |
| updated_globals = globals_dict.copy() | |
| updated_locals = locals_dict.copy() | |
| tree = ast.parse(code, mode="exec") | |
| compiled = compile(tree, filename="<ast>", mode="exec") | |
| stdout_buf = io.StringIO() | |
| stderr_buf = io.StringIO() | |
| # --- 4. Execute with stdout+stderr capture and exception handling --- | |
| error_traceback = None | |
| output = None | |
| with contextlib.redirect_stdout(stdout_buf), contextlib.redirect_stderr(stderr_buf): | |
| try: | |
| output = exec(code, updated_globals, updated_locals) | |
| except Exception: | |
| # Capture the full traceback as a string | |
| error_traceback = traceback.format_exc() | |
| printed_output = stdout_buf.getvalue() | |
| stderr_output = stderr_buf.getvalue() | |
| error_traceback_output = error_traceback | |
| return { | |
| "printed_output": printed_output, | |
| "return_value": output, | |
| "stderr": stderr_output, | |
| "error_traceback": error_traceback_output, | |
| "updated_globals": updated_globals, | |
| "updated_locals": updated_locals | |
| } | |
| class PythonCodeInterpreter: | |
| executed_default_codes = False | |
| PYTHON_VERSION = "3.11" | |
| app = None | |
| _app_run_python = None | |
| _globals_dict = {} | |
| _locals_dict = {} | |
| def __init__(self,log_manager: LoggingManager, | |
| default_python_codes:Optional[List[str]]=[], | |
| code_tools:List[Dict[str,Any]]=[], | |
| pip_packages:List[str]=[], | |
| modal_secrets:Dict[str,Union[str,None]]={}, | |
| lazy_init:bool=True, | |
| **kwargs): | |
| self.log_manager = log_manager | |
| self.code_tools = code_tools | |
| self._globals_dict.update(**kwargs.get("globals_dict",{})) | |
| self._locals_dict.update(**kwargs.get("locals_dict",{})) | |
| self.default_python_codes = default_python_codes | |
| self.modal_secrets = modal.Secret.from_dict(modal_secrets) | |
| self.pip_packages = list(set(["cloudpickle"]+pip_packages)) | |
| self.lazy_init = lazy_init | |
| self.create_app(self.modal_secrets,self.pip_packages,self.code_tools) | |
| if not self.lazy_init: | |
| self.sandbox = self.create_sandbox(modal_secrets,pip_packages,self.code_tools) | |
| def create_app(self,modal_secrets:Dict[str,Union[str,None]],pip_packages:List[str]=[],code_tools:List[Dict[str,Any]]=[]): | |
| agent_image = modal.Image.debian_slim(python_version=self.PYTHON_VERSION).pip_install( | |
| "tinyagent-py[all]==0.0.6", | |
| "gradio", | |
| "arize-phoenix-otel", | |
| *pip_packages | |
| ) | |
| self.app = modal.App( | |
| name=self.sandbox_name, | |
| image=agent_image, | |
| secrets=[modal_secrets] | |
| ) | |
| self._app_run_python = self.app.function()(_run_python) | |
| self.add_tools(code_tools) | |
| return self.app | |
| def add_tools(self,tools): | |
| tools_str_list = ["import cloudpickle"] | |
| tools_str_list.append("###########<tools>###########\n") | |
| for tool in tools: | |
| tools_str_list.append(f"globals()['{tool._tool_metadata['name']}'] = cloudpickle.loads( {cloudpickle.dumps(tool)})") | |
| tools_str_list.append("\n\n") | |
| tools_str_list.append("###########</tools>###########\n") | |
| tools_str_list.append("\n\n") | |
| self.default_python_codes.extend(tools_str_list) | |
| def _python_executor(self,code: str,globals_dict:Dict[str,Any]={},locals_dict:Dict[str,Any]={}): | |
| with self.app.run(): | |
| if self.executed_default_codes: | |
| print("✔️ default codes already executed") | |
| full_code = code | |
| else: | |
| full_code = "\n".join(self.default_python_codes)+ "\n\n"+(code) | |
| self.executed_default_codes = True | |
| return self._app_run_python.remote(full_code,globals_dict,locals_dict) | |
| async def run_python(self,code_lines:list[str],timeout:int=120) -> str: | |
| if type(code_lines) == str: | |
| code_lines = [code_lines] | |
| code = code_lines | |
| full_code = "\n".join(code) | |
| print("##"*50) | |
| print("#########################code#########################") | |
| print(full_code) | |
| print("##"*50) | |
| response = self._python_executor(full_code,self._globals_dict,self._locals_dict) | |
| print("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!<response>!!!!!!!!!!!!!!!!!!!!!!!!!") | |
| # Update the instance globals and locals with the execution results | |
| self._globals_dict = cloudpickle.loads(make_session_blob(response["updated_globals"])) | |
| self._locals_dict = cloudpickle.loads(make_session_blob(response["updated_locals"])) | |
| print("#########################<printed_output>#########################") | |
| print(response["printed_output"]) | |
| print("#########################</printed_output>#########################") | |
| print("#########################<return_value>#########################") | |
| print(response["return_value"]) | |
| print("#########################</return_value>#########################") | |
| print("#########################<stderr>#########################") | |
| print(response["stderr"]) | |
| print("#########################</stderr>#########################") | |
| print("#########################<traceback>#########################") | |
| print(response["error_traceback"]) | |
| print("#########################</traceback>#########################") | |
| return clean_response(response) | |
| weather_global = '-' | |
| traffic_global = '-' | |
| def get_weather(city: str)->str: | |
| """Get the weather for a given city. | |
| Args: | |
| city: The city to get the weather for | |
| Returns: | |
| The weather for the given city | |
| """ | |
| import random | |
| global weather_global | |
| output = f"Last time weather was checked was {weather_global}" | |
| weather_global = random.choice(['sunny','cloudy','rainy','snowy']) | |
| output += f"\n\nThe weather in {city} is now {weather_global}" | |
| return output | |
| def get_traffic(city: str)->str: | |
| """Get the traffic for a given city. | |
| Args: | |
| city: The city to get the traffic for | |
| Returns: | |
| The traffic for the given city | |
| """ | |
| import random | |
| global traffic_global | |
| output = f"Last time traffic was checked was {traffic_global}" | |
| traffic_global = random.choice(['light','moderate','heavy','blocked']) | |
| output += f"\n\nThe traffic in {city} is now {traffic_global}" | |
| return output | |
| async def run_example(): | |
| """Example usage of GradioCallback with TinyAgent.""" | |
| import os | |
| import sys | |
| import tempfile | |
| import shutil | |
| import asyncio | |
| from tinyagent import TinyAgent # Assuming TinyAgent is importable | |
| from tinyagent.hooks.logging_manager import LoggingManager # Assuming LoggingManager exists | |
| from tinyagent.hooks.gradio_callback import GradioCallback | |
| import logging | |
| # --- Logging Setup (Simplified) --- | |
| log_manager = LoggingManager(default_level=logging.INFO) | |
| log_manager.set_levels({ | |
| 'tinyagent.hooks.gradio_callback': logging.DEBUG, | |
| 'tinyagent.tiny_agent': logging.DEBUG, | |
| 'tinyagent.mcp_client': logging.DEBUG, | |
| }) | |
| console_handler = logging.StreamHandler(sys.stdout) | |
| log_manager.configure_handler( | |
| console_handler, | |
| format_string='%(asctime)s - %(name)s - %(levelname)s - %(message)s', | |
| level=logging.DEBUG | |
| ) | |
| ui_logger = log_manager.get_logger('tinyagent.hooks.gradio_callback') | |
| agent_logger = log_manager.get_logger('tinyagent.tiny_agent') | |
| ui_logger.info("--- Starting GradioCallback Example ---") | |
| # --- End Logging Setup --- | |
| api_key = os.environ.get("OPENAI_API_KEY") | |
| if not api_key: | |
| ui_logger.error("OPENAI_API_KEY environment variable not set.") | |
| return | |
| # Create a temporary folder for file uploads | |
| upload_folder = tempfile.mkdtemp(prefix="gradio_uploads_") | |
| ui_logger.info(f"Created temporary upload folder: {upload_folder}") | |
| # Ensure we're using a single event loop for everything | |
| loop = asyncio.get_event_loop() | |
| ui_logger.debug(f"Using event loop: {loop}") | |
| # Initialize the agent | |
| from helper import translate_tool_for_code_agent,load_template,render_system_prompt,prompt_code_example | |
| tools = [get_weather,get_traffic] | |
| tools_meta_data = {} | |
| for tool in tools: | |
| metadata = translate_tool_for_code_agent(tool) | |
| tools_meta_data[metadata["name"]] = metadata | |
| template_str = load_template("./prompts/code_agent.yaml") | |
| system_prompt = render_system_prompt(template_str, tools_meta_data, {}, ["tinyagent","gradio","requests","asyncio"]) + prompt_code_example | |
| agent = TinyAgent(model="gpt-4.1-mini", api_key=api_key, | |
| logger=agent_logger, | |
| system_prompt=system_prompt) | |
| python_interpreter = PythonCodeInterpreter(log_manager=log_manager,code_tools=tools) | |
| agent.add_tool(python_interpreter.run_python) | |
| # Create the Gradio callback | |
| gradio_ui = GradioCallback( | |
| file_upload_folder=upload_folder, | |
| show_thinking=True, | |
| show_tool_calls=True, | |
| logger=ui_logger # Pass the specific logger | |
| ) | |
| agent.add_callback(gradio_ui) | |
| # Connect to MCP servers | |
| try: | |
| ui_logger.info("Connecting to MCP servers...") | |
| # Use standard MCP servers as per contribution guide | |
| #await agent.connect_to_server("npx",["-y","@openbnb/mcp-server-airbnb","--ignore-robots-txt"]) | |
| await agent.connect_to_server("npx", ["-y", "@modelcontextprotocol/server-sequential-thinking"]) | |
| ui_logger.info("Connected to MCP servers.") | |
| except Exception as e: | |
| ui_logger.error(f"Failed to connect to MCP servers: {e}", exc_info=True) | |
| # Continue without servers - we still have the local get_weather tool | |
| # Create the Gradio app but don't launch it yet | |
| #app = gradio_ui.create_app( | |
| # agent, | |
| # title="TinyAgent Chat Interface", | |
| # description="Chat with TinyAgent. Try asking: 'Plan a trip to Toronto for 7 days in the next month.'", | |
| #) | |
| # Configure the queue without extra parameters | |
| #app.queue() | |
| # Launch the app in a way that doesn't block our event loop | |
| ui_logger.info("Launching Gradio interface...") | |
| try: | |
| # Launch without blocking | |
| #app.launch( | |
| # share=False, | |
| # prevent_thread_lock=True, # Critical to not block our event loop | |
| # show_error=True | |
| #) | |
| gradio_ui.launch( | |
| agent, | |
| title="TinyCodeAgent Chat Interface", | |
| description="Chat with TinyAgent. Try asking: 'I need to know the weather and traffic in Toronto, Montreal, New York, Paris and San Francisco.'", | |
| share=False, | |
| prevent_thread_lock=True, # Critical to not block our event loop | |
| show_error=True | |
| ) | |
| ui_logger.info("Gradio interface launched (non-blocking).") | |
| # Keep the main event loop running to handle both Gradio and MCP operations | |
| # This is the key part - we need to keep our main event loop running | |
| # but also allow it to process both Gradio and MCP client operations | |
| while True: | |
| await asyncio.sleep(1) # More efficient than an Event().wait() | |
| except KeyboardInterrupt: | |
| ui_logger.info("Received keyboard interrupt, shutting down...") | |
| except Exception as e: | |
| ui_logger.error(f"Failed to launch or run Gradio app: {e}", exc_info=True) | |
| finally: | |
| # Clean up | |
| ui_logger.info("Cleaning up resources...") | |
| if os.path.exists(upload_folder): | |
| ui_logger.info(f"Removing temporary upload folder: {upload_folder}") | |
| shutil.rmtree(upload_folder) | |
| await agent.close() | |
| ui_logger.info("--- GradioCallback Example Finished ---") | |
| if __name__ == "__main__": | |
| asyncio.run(run_example()) |