# import logging # from fastapi import FastAPI, HTTPException # from fastapi.middleware.cors import CORSMiddleware # from pydantic import BaseModel # from services import queue_manager # import os # from pathlib import Path # # CACHE PATCH BLOCK: place FIRST in pipeline.py! # HF_CACHE_DIR = Path("/tmp/hf_cache") # HF_CACHE_DIR.mkdir(parents=True, exist_ok=True) # os.environ.update({ # "HF_HOME": str(HF_CACHE_DIR), # "HF_HUB_CACHE": str(HF_CACHE_DIR), # "DIFFUSERS_CACHE": str(HF_CACHE_DIR), # "TRANSFORMERS_CACHE": str(HF_CACHE_DIR), # "XDG_CACHE_HOME": str(HF_CACHE_DIR), # "HF_DATASETS_CACHE": str(HF_CACHE_DIR), # "HF_MODULES_CACHE": str(HF_CACHE_DIR), # "TMPDIR": str(HF_CACHE_DIR), # "CACHE_DIR": str(HF_CACHE_DIR), # "TORCH_HOME": str(HF_CACHE_DIR), # "HOME": str(HF_CACHE_DIR) # }) # import os.path # if not hasattr(os.path, "expanduser_original"): # os.path.expanduser_original = os.path.expanduser # def safe_expanduser(path): # if ( # path.startswith("~") or # path.startswith("/.cache") or # path.startswith("/root/.cache") # ): # return str(HF_CACHE_DIR) # return os.path.expanduser_original(path) # os.path.expanduser = safe_expanduser # logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s") # app = FastAPI(title="AI ADD Generator", version="1.0") # app.add_middleware( # CORSMiddleware, # allow_origins=["*"], # allow_credentials=True, # allow_methods=["*"], # allow_headers=["*"], # ) # # --------------------------- # # Pydantic models # # --------------------------- # class IdeaRequest(BaseModel): # idea: str # class ConfirmationRequest(BaseModel): # task_id: str # confirm: bool # # --------------------------- # # API endpoints # # --------------------------- # @app.post("/submit_idea") # async def submit_idea(request: IdeaRequest): # task_id = await queue_manager.add_task(request.idea) # return {"status": "submitted", "task_id": task_id} # @app.post("/confirm") # async def confirm_task(request: ConfirmationRequest): # task = 
queue_manager.get_task_status(request.task_id) # if not task: # raise HTTPException(status_code=404, detail="Task not found") # if task["status"] != queue_manager.TaskStatus.WAITING_CONFIRMATION: # raise HTTPException(status_code=400, detail="Task not waiting for confirmation") # await queue_manager.confirm_task(request.task_id) # return {"status": "confirmed", "task": task} # @app.get("/status/{task_id}") # async def status(task_id: str): # task = queue_manager.get_task_status(task_id) # if not task: # raise HTTPException(status_code=404, detail="Task not found") # return task # @app.get("/") # async def health(): # return {"status": "running"} # import logging # from fastapi import FastAPI, HTTPException # from fastapi.middleware.cors import CORSMiddleware # from pydantic import BaseModel # from services import queue_manager # import os # from pathlib import Path # from typing import Optional # # CACHE PATCH BLOCK: place FIRST in pipeline.py! # HF_CACHE_DIR = Path("/tmp/hf_cache") # HF_CACHE_DIR.mkdir(parents=True, exist_ok=True) # os.environ.update({ # "HF_HOME": str(HF_CACHE_DIR), # "HF_HUB_CACHE": str(HF_CACHE_DIR), # "DIFFUSERS_CACHE": str(HF_CACHE_DIR), # "TRANSFORMERS_CACHE": str(HF_CACHE_DIR), # "XDG_CACHE_HOME": str(HF_CACHE_DIR), # "HF_DATASETS_CACHE": str(HF_CACHE_DIR), # "HF_MODULES_CACHE": str(HF_CACHE_DIR), # "TMPDIR": str(HF_CACHE_DIR), # "CACHE_DIR": str(HF_CACHE_DIR), # "TORCH_HOME": str(HF_CACHE_DIR), # "HOME": str(HF_CACHE_DIR) # }) # import os.path # if not hasattr(os.path, "expanduser_original"): # os.path.expanduser_original = os.path.expanduser # def safe_expanduser(path): # if ( # path.startswith("~") or # path.startswith("/.cache") or # path.startswith("/root/.cache") # ): # return str(HF_CACHE_DIR) # return os.path.expanduser_original(path) # os.path.expanduser = safe_expanduser # logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s") # app = FastAPI(title="AI ADD Generator", version="1.0") # 
app.add_middleware( # CORSMiddleware, # allow_origins=["*"], # allow_credentials=True, # allow_methods=["*"], # allow_headers=["*"], # ) # # --------------------------- # # Pydantic models # # --------------------------- # class IdeaRequest(BaseModel): # idea: str # class ConfirmationRequest(BaseModel): # task_id: str # confirm: bool # edited_script: Optional[str] = None # # --------------------------- # # API endpoints # # --------------------------- # @app.post("/submit_idea") # async def submit_idea(request: IdeaRequest): # task_id = await queue_manager.add_task(request.idea) # return {"status": "submitted", "task_id": task_id} # @app.post("/confirm") # async def confirm_task(request: ConfirmationRequest): # task = queue_manager.get_task_status(request.task_id) # if not task: # raise HTTPException(status_code=404, detail="Task not found") # # status values are stored as strings by queue_manager/pipeline # if task["status"] != queue_manager.TaskStatus.WAITING_CONFIRMATION.value: # raise HTTPException(status_code=400, detail="Task not waiting for confirmation") # # if frontend supplied an edited script, persist it before unblocking the pipeline # if request.edited_script: # task["result"]["script"] = request.edited_script # await queue_manager.confirm_task(request.task_id) # return {"status": "confirmed", "task": task} # @app.get("/status/{task_id}") # async def status(task_id: str): # task = queue_manager.get_task_status(task_id) # if not task: # raise HTTPException(status_code=404, detail="Task not found") # return task # @app.get("/") # async def health(): # return {"status": "running"} import os from pathlib import Path # CACHE PATCH BLOCK: place FIRST in pipeline.py! 
# ---------------------------------------------------------------------------
# CACHE PATCH BLOCK — must run before any HF/torch import pulls in a cache dir.
# All model/dataset caches are forced into a writable tmp directory; needed on
# hosts where $HOME (or /root/.cache) is read-only.
# ---------------------------------------------------------------------------
HF_CACHE_DIR = Path("/tmp/hf_cache")
HF_CACHE_DIR.mkdir(parents=True, exist_ok=True)

os.environ.update({
    "HF_HOME": str(HF_CACHE_DIR),
    "HF_HUB_CACHE": str(HF_CACHE_DIR),
    "DIFFUSERS_CACHE": str(HF_CACHE_DIR),
    "TRANSFORMERS_CACHE": str(HF_CACHE_DIR),
    "XDG_CACHE_HOME": str(HF_CACHE_DIR),
    "HF_DATASETS_CACHE": str(HF_CACHE_DIR),
    "HF_MODULES_CACHE": str(HF_CACHE_DIR),
    "TMPDIR": str(HF_CACHE_DIR),
    "CACHE_DIR": str(HF_CACHE_DIR),
    "TORCH_HOME": str(HF_CACHE_DIR),
    "HOME": str(HF_CACHE_DIR),
})

import os.path

# Stash the real implementation exactly once so a re-import of this module
# does not wrap the wrapper.
if not hasattr(os.path, "expanduser_original"):
    os.path.expanduser_original = os.path.expanduser


def safe_expanduser(path):
    """Redirect home/cache-relative paths into HF_CACHE_DIR.

    Fix: ``os.path.expanduser`` also accepts ``bytes`` and ``os.PathLike``
    arguments; the previous patch called ``str.startswith`` on them directly
    and raised ``TypeError`` for such callers.  Non-string-like input is now
    delegated to the original implementation untouched.
    """
    try:
        text = os.fspath(path)
    except TypeError:
        # Not path-like at all — let the original raise its own error.
        return os.path.expanduser_original(path)
    if isinstance(text, bytes):
        prefixes = (b"~", b"/.cache", b"/root/.cache")
        redirect = os.fsencode(str(HF_CACHE_DIR))
    else:
        prefixes = ("~", "/.cache", "/root/.cache")
        redirect = str(HF_CACHE_DIR)
    if text.startswith(prefixes):
        return redirect
    return os.path.expanduser_original(path)


os.path.expanduser = safe_expanduser

import asyncio
import logging

import core.script_gen as script_gen
import core.story_script as story_script
# IMAGE: enabled by default — pipeline will run image generation and return images
import core.image_generator as image_gen
# VIDEO / MUSIC / ASSEMBLER placeholders: uncomment to enable those stages
# import core.video_gen as video_gen
# import core.music_gen as music_gen
# import core.assemble as assemble

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s",
)


async def run_pipeline(task: dict, confirmation_event: asyncio.Event):
    """Execute the generation workflow for a single task.

    ``task["result"]`` and ``task["status"]`` are updated after every stage so
    the frontend can poll ``/status/{task_id}`` for intermediate outputs.

    Stages:
      1. Script generation -> ``result["script"]``; status becomes
         ``"waiting_for_confirmation"`` and the coroutine blocks on
         *confirmation_event* (the frontend may edit the script first).
      2. Story generation from the confirmed (possibly edited) script.
      3. Image generation; a failure is recorded in ``task["stage_errors"]``
         without aborting the pipeline.
      4-6. Optional video / music / assembly stages run only when their
         modules are imported (currently commented out above).

    Args:
        task: Mutable task record containing at least ``"id"``, ``"idea"``
            and a ``"result"`` dict; mutated in place.
        confirmation_event: Set by the API layer once the user confirms.

    Returns:
        The task's result dict.
    """
    task_id = task["id"]
    idea = task["idea"]

    logging.info("[Pipeline] Starting script generation for task %s", task_id)

    # 1) Script generation — publish immediately so the frontend can read it.
    script = await script_gen.generate_script(idea)
    task["result"]["script"] = script
    task["status"] = "waiting_for_confirmation"
    task["confirmation_required"] = True
    logging.info(
        "[Pipeline] Script ready for task %s; waiting for confirmation/edit...",
        task_id,
    )

    # Wait for frontend confirmation (an edited script may have been written
    # into task["result"]["script"] in the meantime).
    await confirmation_event.wait()

    # Confirmed — proceed.
    task["status"] = "confirmed"
    task["confirmation_required"] = False
    logging.info(
        "[Pipeline] Task %s confirmed. Generating story based on confirmed script...",
        task_id,
    )

    # 2) Story generation — use the possibly edited/confirmed script.
    confirmed_script = task["result"].get("script", script)
    story = await story_script.generate_story(confirmed_script)
    task["result"]["story_script"] = story
    task["status"] = "story_generated"
    logging.info("[Pipeline] Story ready for task %s", task_id)

    # 3) Image generation (enabled)
    try:
        logging.info("[Pipeline] Generating images for task %s", task_id)
        images = await image_gen.generate_images(story)
        task["result"]["images"] = images
        task["status"] = "images_generated"
        logging.info("[Pipeline] Images ready for task %s", task_id)
    except Exception as e:
        # Keep the pipeline going; record the error for this stage only.
        task.setdefault("stage_errors", {})["images"] = str(e)
        logging.exception(
            "[Pipeline] Image generation failed for task %s: %s", task_id, e
        )

    # 4) Optional: video generation (uncomment import to enable)
    # if "video_gen" in globals():
    #     try:
    #         logging.info("[Pipeline] Generating video for task %s", task_id)
    #         video = await video_gen.generate_video(task["result"].get("images"))
    #         task["result"]["video"] = video
    #         task["status"] = "video_generated"
    #         logging.info("[Pipeline] Video ready for task %s", task_id)
    #     except Exception as e:
    #         task.setdefault("stage_errors", {})["video"] = str(e)
    #         logging.exception(
    #             "[Pipeline] Video generation failed for task %s: %s", task_id, e
    #         )

    # 5) Optional: music generation (uncomment import to enable)
    # if "music_gen" in globals():
    #     try:
    #         logging.info("[Pipeline] Generating music for task %s", task_id)
    #         music = await music_gen.generate_music(story)
    #         task["result"]["music"] = music
    #         task["status"] = "music_generated"
    #         logging.info("[Pipeline] Music ready for task %s", task_id)
    #     except Exception as e:
    #         task.setdefault("stage_errors", {})["music"] = str(e)
    #         logging.exception(
    #             "[Pipeline] Music generation failed for task %s: %s", task_id, e
    #         )

    # 6) Optional: assembler (uncomment import to enable)
    # if "assemble" in globals():
    #     try:
    #         logging.info("[Pipeline] Assembling final output for task %s", task_id)
    #         final_output = await assemble.create_final(task["result"])
    #         task["result"]["final_output"] = final_output
    #         task["status"] = "assembled"
    #         logging.info("[Pipeline] Assembly complete for task %s", task_id)
    #     except Exception as e:
    #         task.setdefault("stage_errors", {})["assemble"] = str(e)
    #         logging.exception(
    #             "[Pipeline] Assembly failed for task %s: %s", task_id, e
    #         )

    # Finalize
    task["status"] = "completed"
    logging.info("[Pipeline] Task %s completed.", task_id)
    return task["result"]