| import os, sys | |
| import utils | |
| import uuid | |
| import json | |
| import subprocess, threading | |
# Absolute path of this file, kept private so only the two directory
# constants below are part of the module's public names.
_THIS_FILE = os.path.abspath(__file__)

# Directory containing this file, and the directory one level above it.
FILE_DIR = os.path.dirname(_THIS_FILE)
REPO_DIR = os.path.dirname(FILE_DIR)

# Make modules located in REPO_DIR importable from this process.
sys.path.append(REPO_DIR)
| import time | |
def get_openai_api_key():
    """Return the ``OPENAI_API_KEY`` environment variable, or ``None`` if unset."""
    return os.environ.get("OPENAI_API_KEY")
# Module-level list, initially empty. Nothing in this file reads or appends to
# it; presumably a registry of live AutoAPI instances for importers of this
# module -- TODO confirm against callers.
running_apis: list = []
def get_state(state_file):
    """Read ``state_file`` and return the JSON document it contains.

    Args:
        state_file: Path of the JSON state file to read.

    Returns:
        The decoded JSON value (a dict for the state files written by this
        module).
    """
    with open(state_file, "r") as handle:
        raw = handle.read()
    return json.loads(raw)
def set_state(state_file, state):
    """Serialise ``state`` as JSON and write it to ``state_file``.

    The state is encoded to a string *before* the file is opened for writing.
    Opening with ``"w"`` truncates the file immediately, so encoding first
    guarantees that a state that cannot be serialised raises without
    destroying the previously stored state.

    Args:
        state_file: Path of the JSON state file to (over)write.
        state: JSON-serialisable object to store.

    Raises:
        TypeError: If ``state`` contains a value ``json`` cannot encode; the
            existing file is left untouched in that case.
    """
    payload = json.dumps(state)
    with open(state_file, "w") as f:
        f.write(payload)
class AutoAPI:
    """UI-side handle for one AutoGPT worker subprocess.

    Creating an instance writes ``ai_settings.yaml`` into ``REPO_DIR``,
    creates a fresh JSON state file and a log file under ``FILE_DIR/state``,
    and launches ``ui/api.py`` as a subprocess. The two processes communicate
    only through the state file, which holds these keys:

    * ``pending_input`` -- text sent by the UI that the worker has not yet
      consumed, or ``None``.
    * ``awaiting_input`` -- ``True`` while the worker is blocked on input.
    * ``messages`` -- list of ``(title, content)`` pairs emitted by the worker.
    * ``last_message_read_index`` -- index of the last message already
      yielded to the UI (``-1`` when none has been read).
    """

    def __init__(self, openai_key, ai_name, ai_role, top_5_goals):
        """Write the AI settings, initialise the state file and start the worker.

        Args:
            openai_key: OpenAI API key forwarded to the worker process.
            ai_name: Value written to ``ai_name`` in ``ai_settings.yaml``.
            ai_role: Value written to ``ai_role`` in ``ai_settings.yaml``.
            top_5_goals: Iterable of sequences; the first element of each is a
                goal string. Entries whose first element is falsy are skipped.
        """
        self.openai_key = openai_key
        # Named ``run_id`` rather than ``hex`` to avoid shadowing the builtin.
        run_id = uuid.uuid4().hex
        print(run_id)

        # Create the state directory on demand so a fresh checkout does not
        # fail with FileNotFoundError when the files below are opened.
        state_dir = os.path.join(FILE_DIR, "state")
        os.makedirs(state_dir, exist_ok=True)
        self.state_file = os.path.join(state_dir, f"state_{run_id}.json")
        self.log_file = os.path.join(state_dir, f"log_{run_id}.json")

        # NOTE(review): values are interpolated into the YAML unquoted, so a
        # name, role or goal containing YAML-significant characters (e.g.
        # ``: `` or a leading ``#``) would produce a malformed settings file.
        goal_lines = "\n".join(
            f"- {goal[0]}" for goal in top_5_goals if goal[0]
        )
        with open(os.path.join(REPO_DIR, "ai_settings.yaml"), "w") as f:
            f.write(
                f"ai_goals:\n{goal_lines}\n"
                f"ai_name: {ai_name}\n"
                f"ai_role: {ai_role}\n"
            )

        state = {
            "pending_input": None,
            "awaiting_input": False,
            "messages": [],
            "last_message_read_index": -1,
        }
        set_state(self.state_file, state)

        # NOTE(review): the API key is passed on the command line because the
        # worker entry point unpacks it from ``sys.argv``; command lines may
        # be visible to other local users -- consider passing it via ``env``.
        with open(self.log_file, "w") as f:
            # Keep the handle so callers can poll or terminate the worker.
            self.process = subprocess.Popen(
                [
                    # Run the worker with the interpreter running this process
                    # so it sees the same installed packages; fall back to
                    # "python" if the interpreter path is unavailable.
                    sys.executable or "python",
                    os.path.join(REPO_DIR, "ui", "api.py"),
                    openai_key,
                    self.state_file,
                ],
                cwd=REPO_DIR,
                stdout=f,
                stderr=f,
            )

    def send_message(self, message="Y"):
        """Store ``message`` as the worker's pending input.

        Also clears ``awaiting_input`` so ``get_chatbot_response`` keeps
        polling until the worker asks for input again.

        Args:
            message: Text to hand to the worker; defaults to ``"Y"``.
        """
        state = get_state(self.state_file)
        state["pending_input"] = message
        state["awaiting_input"] = False
        set_state(self.state_file, state)

    def get_chatbot_response(self):
        """Yield unread worker messages until the worker awaits input.

        Polls the state file once per second while there is nothing new.
        Each yielded string is the optional bold ``**title**`` followed by the
        content with colour codes removed by ``utils.remove_color`` and
        newlines replaced by ``<br />``. The generator finishes once every
        message has been yielded and ``awaiting_input`` is set.

        Yields:
            str: One formatted message per unread entry in ``messages``.
        """
        while True:
            state = get_state(self.state_file)
            caught_up = (
                state["last_message_read_index"] >= len(state["messages"]) - 1
            )
            if caught_up:
                if state["awaiting_input"]:
                    break
                time.sleep(1)
                continue

            state["last_message_read_index"] += 1
            title, content = state["messages"][state["last_message_read_index"]]
            # Persist the new read index *before* yielding. The consumer may
            # hold the generator suspended for an arbitrary time (or never
            # resume it); writing afterwards would overwrite whatever the
            # worker appended in the meantime with this stale snapshot.
            set_state(self.state_file, state)
            yield (f"**{title.strip()}** " if title else "") + utils.remove_color(
                content
            ).replace("\n", "<br />")
if __name__ == "__main__":
    # Worker entry point. Expected invocation:
    #     python api.py OPENAI_API_KEY STATE_FILE
    # It redirects AutoGPT's logging and input functions to the JSON state
    # file and then runs the AutoGPT CLI.
    if len(sys.argv) != 3:
        sys.exit(f"usage: {sys.argv[0]} OPENAI_API_KEY STATE_FILE")
    _, openai_key, state_file = sys.argv
    # Do not echo sys.argv: it contains the API key, and AutoAPI redirects
    # this process's stdout to a log file on disk.
    print(f"Starting worker with state file {state_file}")

    # Must be set before the autogpt imports below, as in the original order.
    os.environ["OPENAI_API_KEY"] = openai_key

    import autogpt.config.config
    from autogpt.logs import logger
    from autogpt.cli import main
    import autogpt.utils
    from autogpt.spinner import Spinner

    def add_message(title, content):
        """Append a ``(title, content)`` pair to ``messages`` in the state file."""
        state = get_state(state_file)
        state["messages"].append((title, content))
        set_state(state_file, state)

    def typewriter_log(title="", title_color="", content="", *args, **kwargs):
        """Replacement for ``logger.typewriter_log``: record title and content."""
        add_message(title, content)

    def warn(message, title="", *args, **kwargs):
        """Replacement for ``logger.warn``: record the warning as a message."""
        add_message(title, message)

    def error(title, message="", *args, **kwargs):
        """Replacement for ``logger.error``: record the error as a message."""
        add_message(title, message)

    def clean_input(prompt=""):
        """Replacement for ``autogpt.utils.clean_input``.

        Publishes ``prompt`` as a message, flags the state as awaiting input,
        then polls the state file once per second until ``pending_input`` is
        set. The pending input is cleared from the state file and returned.
        """
        add_message(None, prompt)
        state = get_state(state_file)
        state["awaiting_input"] = True
        set_state(state_file, state)
        # Sleep first, then re-read: the loop exits as soon as a read shows
        # pending input instead of sleeping one more second after it arrives.
        while state["pending_input"] is None:
            print("Waiting for input...")
            time.sleep(1)
            state = get_state(state_file)
        print("Got input")
        pending_input = state["pending_input"]
        state["pending_input"] = None
        set_state(state_file, state)
        return pending_input

    def spinner_start(*args, **kwargs):
        """Replacement for ``Spinner.spin``: post a single "Thinking..." message.

        Accepts and ignores any arguments: it is assigned as a class
        attribute, so calling it on an instance passes that instance as the
        first positional argument.
        """
        add_message(None, "Thinking...")

    # Route AutoGPT's console output and input through the state file.
    logger.typewriter_log = typewriter_log
    logger.warn = warn
    logger.error = error
    autogpt.utils.clean_input = clean_input
    Spinner.spin = spinner_start

    # Strip our own arguments so the AutoGPT CLI parser does not see them.
    sys.argv = sys.argv[:1]
    main()