karthikeya1212 commited on
Commit
756c289
Β·
verified Β·
1 Parent(s): c5fb75b

Update services/queue_manager.py

Browse files
Files changed (1) hide show
  1. services/queue_manager.py +31 -79
services/queue_manager.py CHANGED
@@ -2,13 +2,11 @@ import asyncio
2
  import uuid
3
  from typing import Dict, Any, Optional
4
  from enum import Enum
 
5
 
6
- # Import pipeline that handles all stages
7
- from pipeline.pipeline import run_pipeline # This runs story β†’ images β†’ video β†’ etc.
8
-
9
- # -------------------------------
10
- # ENUMS AND GLOBALS
11
- # -------------------------------
12
  class TaskStatus(str, Enum):
13
  PENDING = "pending"
14
  RUNNING = "running"
@@ -17,104 +15,58 @@ class TaskStatus(str, Enum):
17
  COMPLETED = "completed"
18
  FAILED = "failed"
19
 
 
 
 
20
  tasks: Dict[str, Dict[str, Any]] = {}
21
  task_queue = asyncio.Queue()
22
  pending_confirmations: Dict[str, asyncio.Event] = {}
23
 
24
- # -------------------------------
25
- # ADD TASK
26
- # -------------------------------
27
  async def add_task(idea: str) -> str:
28
  task_id = str(uuid.uuid4())
 
 
29
  tasks[task_id] = {
30
  "id": task_id,
31
  "idea": idea,
32
  "status": TaskStatus.PENDING,
33
- "result": None,
34
  "confirmation_required": False
35
  }
 
36
  await task_queue.put(task_id)
37
- print(f"🧩 Task added to queue: {task_id}")
38
- return task_id
39
-
40
- # -------------------------------
41
- # WAIT FOR SCRIPT
42
- # -------------------------------
43
- async def wait_for_script(task_id: str, script_results: dict):
44
- task = tasks.get(task_id)
45
- if not task:
46
- return
47
-
48
- task["status"] = TaskStatus.RUNNING
49
-
50
- # === STEP 1: generate script ===
51
- from core.script_gen import generate_script
52
- script_result = await generate_script(task["idea"])
53
- task["result"] = {"script": script_result}
54
- task["status"] = TaskStatus.WAITING_CONFIRMATION
55
- task["confirmation_required"] = True
56
 
57
- # Store script for server endpoint
58
- script_results[task_id] = script_result
59
-
60
- # Create event for confirmation
61
- pending_confirmations[task_id] = asyncio.Event()
62
- print(f"βœ‹ Task {task_id} waiting for confirmation. Script ready.")
63
-
64
- # Wait for user confirmation
65
- await pending_confirmations[task_id].wait()
66
-
67
- # Once confirmed, run full pipeline
68
- task["status"] = TaskStatus.CONFIRMED
69
- print(f"🎬 Task {task_id} confirmed. Running full pipeline...")
70
-
71
- # Run pipeline (story, images, video, etc.)
72
- final_result = await run_pipeline(task)
73
- task["result"].update(final_result)
74
- task["status"] = TaskStatus.COMPLETED
75
- print(f"βœ… Task {task_id} completed. All stages done.")
76
 
77
- # -------------------------------
78
- # CONFIRM TASK
79
- # -------------------------------
80
  async def confirm_task(task_id: str):
81
  task = tasks.get(task_id)
82
  if not task:
83
- return {"error": "Invalid task ID."}
84
-
85
  if task["status"] != TaskStatus.WAITING_CONFIRMATION:
86
- return {"error": "Task is not waiting for confirmation."}
87
 
88
- # Set event so the worker can continue
89
  event = pending_confirmations.get(task_id)
90
  if event:
91
  event.set()
 
92
 
93
- return {"message": f"Task {task_id} confirmed."}
94
-
95
- # -------------------------------
96
- # GET STATUS
97
- # -------------------------------
98
  def get_task_status(task_id: str) -> Optional[Dict[str, Any]]:
99
  return tasks.get(task_id)
100
 
101
- # -------------------------------
102
- # WORKER
103
- # -------------------------------
104
- async def worker():
105
- while True:
106
- task_id = await task_queue.get()
107
- task = tasks.get(task_id)
108
- if not task:
109
- task_queue.task_done()
110
- continue
111
- # Script is generated in wait_for_script, rest handled by pipeline
112
- task_queue.task_done()
113
-
114
- # -------------------------------
115
- # START WORKER
116
- # -------------------------------
117
  def start_worker():
118
- loop = asyncio.get_event_loop()
119
- loop.create_task(worker())
120
- print("βš™οΈ Worker loop started.")
 
2
  import uuid
3
  from typing import Dict, Any, Optional
4
  from enum import Enum
5
+ from pipeline.pipeline import run_pipeline
6
 
7
+ # ---------------------------
8
+ # Task status enum
9
+ # ---------------------------
 
 
 
10
  class TaskStatus(str, Enum):
11
  PENDING = "pending"
12
  RUNNING = "running"
 
15
  COMPLETED = "completed"
16
  FAILED = "failed"
17
 
18
+ # ---------------------------
19
+ # Globals
20
+ # ---------------------------
21
  tasks: Dict[str, Dict[str, Any]] = {}
22
  task_queue = asyncio.Queue()
23
  pending_confirmations: Dict[str, asyncio.Event] = {}
24
 
25
+ # ---------------------------
26
+ # Add task
27
+ # ---------------------------
28
  async def add_task(idea: str) -> str:
29
  task_id = str(uuid.uuid4())
30
+ confirmation_event = asyncio.Event()
31
+
32
  tasks[task_id] = {
33
  "id": task_id,
34
  "idea": idea,
35
  "status": TaskStatus.PENDING,
36
+ "result": {},
37
  "confirmation_required": False
38
  }
39
+ pending_confirmations[task_id] = confirmation_event
40
  await task_queue.put(task_id)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
41
 
42
+ # Start the pipeline immediately in background
43
+ asyncio.create_task(run_pipeline(tasks[task_id], confirmation_event))
44
+ print(f"🧩 Task added and pipeline started: {task_id}")
45
+ return task_id
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
46
 
47
+ # ---------------------------
48
+ # Confirm task
49
+ # ---------------------------
50
  async def confirm_task(task_id: str):
51
  task = tasks.get(task_id)
52
  if not task:
53
+ return {"error": "Invalid task ID"}
 
54
  if task["status"] != TaskStatus.WAITING_CONFIRMATION:
55
+ return {"error": "Task is not waiting for confirmation"}
56
 
 
57
  event = pending_confirmations.get(task_id)
58
  if event:
59
  event.set()
60
+ return {"message": f"Task {task_id} confirmed"}
61
 
62
+ # ---------------------------
63
+ # Get task status
64
+ # ---------------------------
 
 
65
  def get_task_status(task_id: str) -> Optional[Dict[str, Any]]:
66
  return tasks.get(task_id)
67
 
68
+ # ---------------------------
69
+ # Worker (optional)
70
+ # ---------------------------
 
 
 
 
 
 
 
 
 
 
 
 
 
71
  def start_worker():
72
+ print("βš™οΈ Worker loop not required, pipeline runs per task")