"""In-memory task manager that starts one pipeline run per submitted idea.

Task records live in the module-level ``tasks`` dict, keyed by a UUID string.
Each submission creates an ``asyncio.Event`` that is handed to
``run_pipeline`` together with the task record; ``confirm_task`` sets that
event. All state is process-local and is lost when the process exits.
"""

import asyncio
import functools
import uuid
from enum import Enum
from typing import Any, Dict, Optional, Set

from pipeline.pipeline import run_pipeline


# ---------------------------
# Task status enum
# ---------------------------
class TaskStatus(str, Enum):
    """Values stored in a task record's ``"status"`` field.

    The enum subclasses ``str``, so members compare equal to their plain
    string values.
    """

    PENDING = "pending"
    RUNNING = "running"
    WAITING_CONFIRMATION = "waiting_for_confirmation"
    CONFIRMED = "confirmed"
    COMPLETED = "completed"
    FAILED = "failed"


# ---------------------------
# Globals
# ---------------------------
# Task records keyed by task id.
tasks: Dict[str, Dict[str, Any]] = {}

# Ids of submitted tasks, in submission order. Nothing in this module consumes
# the queue (see start_worker); it is kept so existing importers keep working.
task_queue = asyncio.Queue()

# Confirmation events for tasks whose pipeline has not finished yet.
pending_confirmations: Dict[str, asyncio.Event] = {}

# Strong references to running pipeline tasks. The event loop only keeps weak
# references, so an otherwise unreferenced task may be garbage-collected
# before it completes.
_background_tasks: Set[asyncio.Task] = set()


def _finalize_pipeline(task_id: str, pipeline_task: asyncio.Task) -> None:
    """Release per-task bookkeeping once a pipeline task has finished.

    Runs as a done-callback. Drops the strong task reference and the
    confirmation event, and marks the record as failed when the pipeline was
    cancelled or raised, so the failure is visible through
    ``get_task_status`` instead of being silently discarded.

    Args:
        task_id: Id of the task record the pipeline was working on.
        pipeline_task: The finished asyncio task that ran the pipeline.
    """
    _background_tasks.discard(pipeline_task)
    pending_confirmations.pop(task_id, None)

    if pipeline_task.cancelled():
        # Task.exception() raises CancelledError on a cancelled task, so this
        # case has to be checked first.
        failure = "Pipeline was cancelled"
    else:
        exc = pipeline_task.exception()
        if exc is None:
            # Normal completion: the pipeline owns the final status.
            return
        failure = f"{type(exc).__name__}: {exc}"

    record = tasks.get(task_id)
    if record is not None:
        record["status"] = TaskStatus.FAILED.value
        record["error"] = failure
    print(f"Task {task_id} failed: {failure}")


# ---------------------------
# Add task
# ---------------------------
async def add_task(idea: str) -> str:
    """Register a new task and start its pipeline in the background.

    Must be called from within a running event loop, because it schedules
    the pipeline with ``asyncio.create_task``.

    Args:
        idea: Free-form input stored on the record under ``"idea"``.

    Returns:
        The generated task id (a UUID4 string).
    """
    task_id = str(uuid.uuid4())
    confirmation_event = asyncio.Event()

    record: Dict[str, Any] = {
        "id": task_id,
        "idea": idea,
        "status": TaskStatus.PENDING.value,
        "result": {},
        "confirmation_required": False,
    }
    tasks[task_id] = record
    pending_confirmations[task_id] = confirmation_event

    await task_queue.put(task_id)

    # Start the pipeline immediately in the background. The task is stored in
    # _background_tasks so it cannot be garbage-collected mid-run; the
    # done-callback removes it again and records failures.
    pipeline_task = asyncio.create_task(run_pipeline(record, confirmation_event))
    _background_tasks.add(pipeline_task)
    pipeline_task.add_done_callback(
        functools.partial(_finalize_pipeline, task_id)
    )

    print(f"🧩 Task added and pipeline started: {task_id}")
    return task_id


# ---------------------------
# Confirm task
# ---------------------------
async def confirm_task(task_id: str) -> Dict[str, str]:
    """Signal the confirmation event of a task that is waiting for it.

    Args:
        task_id: Id returned by ``add_task``.

    Returns:
        ``{"message": ...}`` when the event was set, otherwise
        ``{"error": ...}`` for an unknown id or a task that is not waiting
        for confirmation.
    """
    task = tasks.get(task_id)
    if not task:
        return {"error": "Invalid task ID"}

    # Status is stored as a plain string, so compare against the enum value.
    if task["status"] != TaskStatus.WAITING_CONFIRMATION.value:
        return {"error": "Task is not waiting for confirmation"}

    event = pending_confirmations.get(task_id)
    if event is None:
        # The pipeline already finished, so nothing is waiting on an event.
        return {"error": "Task is not waiting for confirmation"}

    event.set()
    return {"message": f"Task {task_id} confirmed"}


# ---------------------------
# Get task status
# ---------------------------
def get_task_status(task_id: str) -> Optional[Dict[str, Any]]:
    """Return the live record for ``task_id``, or ``None`` if it is unknown.

    The returned dict is the stored record itself, not a copy.
    """
    return tasks.get(task_id)


# ---------------------------
# Worker (optional)
# ---------------------------
def start_worker():
    """Print a notice that no worker loop is needed; starts nothing."""
    print("⚙️ Worker loop not required, pipeline runs per task")