karthikeya1212 commited on
Commit
22ef263
·
verified ·
1 Parent(s): 82f04d1

Update services/queue_manager.py

Browse files
Files changed (1) hide show
  1. services/queue_manager.py +80 -3
services/queue_manager.py CHANGED
@@ -1,3 +1,79 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
  import asyncio
2
  import uuid
3
  from typing import Dict, Any, Optional
@@ -32,7 +108,7 @@ async def add_task(idea: str) -> str:
32
  tasks[task_id] = {
33
  "id": task_id,
34
  "idea": idea,
35
- "status": TaskStatus.PENDING,
36
  "result": {},
37
  "confirmation_required": False
38
  }
@@ -51,7 +127,8 @@ async def confirm_task(task_id: str):
51
  task = tasks.get(task_id)
52
  if not task:
53
  return {"error": "Invalid task ID"}
54
- if task["status"] != TaskStatus.WAITING_CONFIRMATION:
 
55
  return {"error": "Task is not waiting for confirmation"}
56
 
57
  event = pending_confirmations.get(task_id)
@@ -69,4 +146,4 @@ def get_task_status(task_id: str) -> Optional[Dict[str, Any]]:
69
  # Worker (optional)
70
  # ---------------------------
71
  def start_worker():
72
- print("⚙️ Worker loop not required, pipeline runs per task")
 
1
+ # import asyncio
2
+ # import uuid
3
+ # from typing import Dict, Any, Optional
4
+ # from enum import Enum
5
+ # from pipeline.pipeline import run_pipeline
6
+
7
+ # # ---------------------------
8
+ # # Task status enum
9
+ # # ---------------------------
10
+ # class TaskStatus(str, Enum):
11
+ # PENDING = "pending"
12
+ # RUNNING = "running"
13
+ # WAITING_CONFIRMATION = "waiting_for_confirmation"
14
+ # CONFIRMED = "confirmed"
15
+ # COMPLETED = "completed"
16
+ # FAILED = "failed"
17
+
18
+ # # ---------------------------
19
+ # # Globals
20
+ # # ---------------------------
21
+ # tasks: Dict[str, Dict[str, Any]] = {}
22
+ # task_queue = asyncio.Queue()
23
+ # pending_confirmations: Dict[str, asyncio.Event] = {}
24
+
25
+ # # ---------------------------
26
+ # # Add task
27
+ # # ---------------------------
28
+ # async def add_task(idea: str) -> str:
29
+ # task_id = str(uuid.uuid4())
30
+ # confirmation_event = asyncio.Event()
31
+
32
+ # tasks[task_id] = {
33
+ # "id": task_id,
34
+ # "idea": idea,
35
+ # "status": TaskStatus.PENDING,
36
+ # "result": {},
37
+ # "confirmation_required": False
38
+ # }
39
+ # pending_confirmations[task_id] = confirmation_event
40
+ # await task_queue.put(task_id)
41
+
42
+ # # Start the pipeline immediately in background
43
+ # asyncio.create_task(run_pipeline(tasks[task_id], confirmation_event))
44
+ # print(f"🧩 Task added and pipeline started: {task_id}")
45
+ # return task_id
46
+
47
+ # # ---------------------------
48
+ # # Confirm task
49
+ # # ---------------------------
50
+ # async def confirm_task(task_id: str):
51
+ # task = tasks.get(task_id)
52
+ # if not task:
53
+ # return {"error": "Invalid task ID"}
54
+ # if task["status"] != TaskStatus.WAITING_CONFIRMATION:
55
+ # return {"error": "Task is not waiting for confirmation"}
56
+
57
+ # event = pending_confirmations.get(task_id)
58
+ # if event:
59
+ # event.set()
60
+ # return {"message": f"Task {task_id} confirmed"}
61
+
62
+ # # ---------------------------
63
+ # # Get task status
64
+ # # ---------------------------
65
+ # def get_task_status(task_id: str) -> Optional[Dict[str, Any]]:
66
+ # return tasks.get(task_id)
67
+
68
+ # # ---------------------------
69
+ # # Worker (optional)
70
+ # # ---------------------------
71
+ # def start_worker():
72
+ # print("⚙️ Worker loop not required, pipeline runs per task")
73
+
74
+
75
+
76
+
77
  import asyncio
78
  import uuid
79
  from typing import Dict, Any, Optional
 
108
  tasks[task_id] = {
109
  "id": task_id,
110
  "idea": idea,
111
+ "status": TaskStatus.PENDING.value,
112
  "result": {},
113
  "confirmation_required": False
114
  }
 
127
  task = tasks.get(task_id)
128
  if not task:
129
  return {"error": "Invalid task ID"}
130
+ # compare to the enum .value (task status stored as string)
131
+ if task["status"] != TaskStatus.WAITING_CONFIRMATION.value:
132
  return {"error": "Task is not waiting for confirmation"}
133
 
134
  event = pending_confirmations.get(task_id)
 
146
  # Worker (optional)
147
  # ---------------------------
148
  def start_worker():
149
+ print("⚙️ Worker loop not required, pipeline runs per task")