karthikeya1212 commited on
Commit
ad35e87
Β·
verified Β·
1 Parent(s): efdabc5

Update services/queue_manager.py

Browse files
Files changed (1) hide show
  1. services/queue_manager.py +56 -54
services/queue_manager.py CHANGED
@@ -1,18 +1,14 @@
1
-
2
  import asyncio
3
  import uuid
4
  from typing import Dict, Any, Optional
5
  from enum import Enum
6
- from core.script_gen import generate_script
7
- from core.story_script import generate_story
8
- # from core.image_gen import generate_images
9
- # from core.video_gen import generate_video
10
- # from core.music_gen import generate_music
11
- # from core.assemble import assemble_final_video
12
-
13
- # -------------------------------------------------------------
14
  # ENUMS AND GLOBALS
15
- # -------------------------------------------------------------
16
  class TaskStatus(str, Enum):
17
  PENDING = "pending"
18
  RUNNING = "running"
@@ -23,10 +19,11 @@ class TaskStatus(str, Enum):
23
 
24
  tasks: Dict[str, Dict[str, Any]] = {}
25
  task_queue = asyncio.Queue()
 
26
 
27
- # -------------------------------------------------------------
28
- # ADD NEW TASK
29
- # -------------------------------------------------------------
30
  async def add_task(idea: str) -> str:
31
  task_id = str(uuid.uuid4())
32
  tasks[task_id] = {
@@ -40,56 +37,46 @@ async def add_task(idea: str) -> str:
40
  print(f"🧩 Task added to queue: {task_id}")
41
  return task_id
42
 
43
- # -------------------------------------------------------------
44
- # WAIT FOR SCRIPT FOR CONFIRMATION
45
- # -------------------------------------------------------------
46
  async def wait_for_script(task_id: str, script_results: dict):
47
  task = tasks.get(task_id)
48
  if not task:
49
  return
50
 
51
  task["status"] = TaskStatus.RUNNING
52
- # Generate script
 
 
53
  script_result = await generate_script(task["idea"])
54
  task["result"] = {"script": script_result}
55
  task["status"] = TaskStatus.WAITING_CONFIRMATION
56
  task["confirmation_required"] = True
57
 
58
- # Keep script accessible for server endpoint
59
  script_results[task_id] = script_result
 
 
 
60
  print(f"βœ‹ Task {task_id} waiting for confirmation. Script ready.")
61
 
62
- # -------------------------------------------------------------
63
- # GENERATE STORY AFTER CONFIRMATION
64
- # -------------------------------------------------------------
65
- async def generate_story_after_confirm(script: str):
66
- story_result = await generate_story(script)
67
- return story_result
68
 
69
- # -------------------------------------------------------------
70
- # WORKER LOOP (Optional: future stages)
71
- # -------------------------------------------------------------
72
- async def worker():
73
- while True:
74
- task_id = await task_queue.get()
75
- task = tasks.get(task_id)
76
- if not task:
77
- task_queue.task_done()
78
- continue
79
- try:
80
- # Already handled script in wait_for_script
81
- # Future stages like images/video/music can go here
82
- pass
83
- except Exception as e:
84
- task["status"] = TaskStatus.FAILED
85
- task["result"] = {"error": str(e)}
86
- print(f"❌ Task {task_id} failed with error: {e}")
87
- finally:
88
- task_queue.task_done()
89
 
90
- # -------------------------------------------------------------
91
  # CONFIRM TASK
92
- # -------------------------------------------------------------
93
  async def confirm_task(task_id: str):
94
  task = tasks.get(task_id)
95
  if not task:
@@ -98,20 +85,35 @@ async def confirm_task(task_id: str):
98
  if task["status"] != TaskStatus.WAITING_CONFIRMATION:
99
  return {"error": "Task is not waiting for confirmation."}
100
 
101
- task["status"] = TaskStatus.CONFIRMED
102
- task["confirmation_required"] = False
103
- print(f"πŸ‘ Task {task_id} confirmed. Ready for story generation...")
 
 
104
  return {"message": f"Task {task_id} confirmed."}
105
 
106
- # -------------------------------------------------------------
107
- # GET TASK STATUS
108
- # -------------------------------------------------------------
109
  def get_task_status(task_id: str) -> Optional[Dict[str, Any]]:
110
  return tasks.get(task_id)
111
 
112
- # -------------------------------------------------------------
 
 
 
 
 
 
 
 
 
 
 
 
 
113
  # START WORKER
114
- # -------------------------------------------------------------
115
  def start_worker():
116
  loop = asyncio.get_event_loop()
117
  loop.create_task(worker())
 
 
1
  import asyncio
2
  import uuid
3
  from typing import Dict, Any, Optional
4
  from enum import Enum
5
+
6
+ # Import pipeline that handles all stages
7
+ from core.pipeline import run_pipeline # This runs story β†’ images β†’ video β†’ etc.
8
+
9
+ # -------------------------------
 
 
 
10
  # ENUMS AND GLOBALS
11
+ # -------------------------------
12
  class TaskStatus(str, Enum):
13
  PENDING = "pending"
14
  RUNNING = "running"
 
19
 
20
  tasks: Dict[str, Dict[str, Any]] = {}
21
  task_queue = asyncio.Queue()
22
+ pending_confirmations: Dict[str, asyncio.Event] = {}
23
 
24
+ # -------------------------------
25
+ # ADD TASK
26
+ # -------------------------------
27
  async def add_task(idea: str) -> str:
28
  task_id = str(uuid.uuid4())
29
  tasks[task_id] = {
 
37
  print(f"🧩 Task added to queue: {task_id}")
38
  return task_id
39
 
40
+ # -------------------------------
41
+ # WAIT FOR SCRIPT
42
+ # -------------------------------
43
  async def wait_for_script(task_id: str, script_results: dict):
44
  task = tasks.get(task_id)
45
  if not task:
46
  return
47
 
48
  task["status"] = TaskStatus.RUNNING
49
+
50
+ # === STEP 1: generate script ===
51
+ from core.script_gen import generate_script
52
  script_result = await generate_script(task["idea"])
53
  task["result"] = {"script": script_result}
54
  task["status"] = TaskStatus.WAITING_CONFIRMATION
55
  task["confirmation_required"] = True
56
 
57
+ # Store script for server endpoint
58
  script_results[task_id] = script_result
59
+
60
+ # Create event for confirmation
61
+ pending_confirmations[task_id] = asyncio.Event()
62
  print(f"βœ‹ Task {task_id} waiting for confirmation. Script ready.")
63
 
64
+ # Wait for user confirmation
65
+ await pending_confirmations[task_id].wait()
 
 
 
 
66
 
67
+ # Once confirmed, run full pipeline
68
+ task["status"] = TaskStatus.CONFIRMED
69
+ print(f"🎬 Task {task_id} confirmed. Running full pipeline...")
70
+
71
+ # Run pipeline (story, images, video, etc.)
72
+ final_result = await run_pipeline(task)
73
+ task["result"].update(final_result)
74
+ task["status"] = TaskStatus.COMPLETED
75
+ print(f"βœ… Task {task_id} completed. All stages done.")
 
 
 
 
 
 
 
 
 
 
 
76
 
77
+ # -------------------------------
78
  # CONFIRM TASK
79
+ # -------------------------------
80
  async def confirm_task(task_id: str):
81
  task = tasks.get(task_id)
82
  if not task:
 
85
  if task["status"] != TaskStatus.WAITING_CONFIRMATION:
86
  return {"error": "Task is not waiting for confirmation."}
87
 
88
+ # Set event so the worker can continue
89
+ event = pending_confirmations.get(task_id)
90
+ if event:
91
+ event.set()
92
+
93
  return {"message": f"Task {task_id} confirmed."}
94
 
95
+ # -------------------------------
96
+ # GET STATUS
97
+ # -------------------------------
98
  def get_task_status(task_id: str) -> Optional[Dict[str, Any]]:
99
  return tasks.get(task_id)
100
 
101
+ # -------------------------------
102
+ # WORKER
103
+ # -------------------------------
104
+ async def worker():
105
+ while True:
106
+ task_id = await task_queue.get()
107
+ task = tasks.get(task_id)
108
+ if not task:
109
+ task_queue.task_done()
110
+ continue
111
+ # Script is generated in wait_for_script, rest handled by pipeline
112
+ task_queue.task_done()
113
+
114
+ # -------------------------------
115
  # START WORKER
116
+ # -------------------------------
117
  def start_worker():
118
  loop = asyncio.get_event_loop()
119
  loop.create_task(worker())