# # api/server.py # from fastapi import FastAPI, HTTPException # from pydantic import BaseModel # import asyncio # from pipeline.pipeline_runner import run_pipeline_task # app = FastAPI(title="AI ADD Maker API", version="1.0") # # Request body schema # class GenerateRequest(BaseModel): # idea: str # # Response schema (optional, for docs clarity) # class GenerateResponse(BaseModel): # task_id: str # scenes: list # images: list # video: dict # music: dict # final_output: dict # @app.post("/generate", response_model=GenerateResponse) # async def generate_video(request: GenerateRequest): # """ # Trigger the AI Ad/Video pipeline with a product or idea description. # Returns all intermediate and final outputs. # """ # idea = request.idea.strip() # if not idea: # raise HTTPException(status_code=400, detail="Idea cannot be empty.") # try: # # Run the pipeline asynchronously # result = await run_pipeline_task(idea) # return result # except Exception as e: # raise HTTPException(status_code=500, detail=f"Pipeline failed: {str(e)}") # # Optional health check endpoint # @app.get("/health") # async def health_check(): # return {"status": "ok"} # # server.py # import uuid # import asyncio # from fastapi import FastAPI, HTTPException # from pydantic import BaseModel # from queue import QueueManager # your queue.py module # import logging # # ------------------------------- # # Setup logging # # ------------------------------- # logging.basicConfig( # level=logging.INFO, # format="%(asctime)s [%(levelname)s] %(message)s" # ) # # ------------------------------- # # FastAPI app # # ------------------------------- # app = FastAPI(title="ADD Maker Server", version="1.0") # # ------------------------------- # # Pydantic models # # ------------------------------- # class IdeaRequest(BaseModel): # idea: str # class ConfirmationRequest(BaseModel): # task_id: str # confirm: bool # # ------------------------------- # # Queue Manager Instance # # ------------------------------- # queue_manager = QueueManager() # # In-memory task storage for confirmation # pending_confirmations = {} # task_id -> asyncio.Event # # ------------------------------- # # Helper function for confirmation # # ------------------------------- # async def wait_for_confirmation(task_id: str, timeout: int = 120): # """Wait for user confirmation or auto-confirm after timeout.""" # event = asyncio.Event() # pending_confirmations[task_id] = event # try: # await asyncio.wait_for(event.wait(), timeout=timeout) # logging.info(f"Task {task_id} confirmed by user.") # return True # except asyncio.TimeoutError: # logging.info(f"Task {task_id} auto-confirmed after {timeout} seconds.") # return True # finally: # pending_confirmations.pop(task_id, None) # # ------------------------------- # # API Endpoints # # ------------------------------- # @app.post("/submit_idea") # async def submit_idea(request: IdeaRequest): # task_id = str(uuid.uuid4()) # logging.info(f"Received idea: {request.idea} | Task ID: {task_id}") # # Push task to queue # await queue_manager.enqueue({ # "task_id": task_id, # "idea": request.idea # }) # # Start confirmation wait in background # asyncio.create_task(wait_for_confirmation(task_id)) # return {"status": "submitted", "task_id": task_id, "message": "Idea received, waiting for confirmation."} # @app.post("/confirm") # async def confirm_task(request: ConfirmationRequest): # task_id = request.task_id # if task_id not in pending_confirmations: # raise HTTPException(status_code=404, detail="Task not pending confirmation or already confirmed.") # if request.confirm: # pending_confirmations[task_id].set() # return {"status": "confirmed", "task_id": task_id} # else: # return {"status": "rejected", "task_id": task_id} # @app.get("/") # async def health_check(): # return {"status": "running"} # # ------------------------------- # # Startup / Shutdown events # # ------------------------------- # @app.on_event("startup") # async def startup_event(): # logging.info("Server starting up...") # @app.on_event("shutdown") # async def shutdown_event(): # logging.info("Server shutting down...") # # best # # server.py # import uuid # import asyncio # from fastapi import FastAPI, HTTPException # from pydantic import BaseModel # from services import queue_manager as queue_manager # ✅ import your actual queue module # import logging # from fastapi.middleware.cors import CORSMiddleware # # ------------------------------- # # Setup logging # # ------------------------------- # logging.basicConfig( # level=logging.INFO, # format="%(asctime)s [%(levelname)s] %(message)s" # ) # # ------------------------------- # # FastAPI app # # ------------------------------- # app = FastAPI(title="AI ADD Generator Server", version="1.0") # # Enable CORS for local testing # app.add_middleware( # CORSMiddleware, # allow_origins=["*"], # Allow requests from anywhere (for testing) # allow_credentials=True, # allow_methods=["*"], # Allow all HTTP methods # allow_headers=["*"], # Allow all headers # ) # # ------------------------------- # # Pydantic models # # ------------------------------- # class IdeaRequest(BaseModel): # idea: str # class ConfirmationRequest(BaseModel): # task_id: str # confirm: bool # # ------------------------------- # # In-memory confirmation tracker # # ------------------------------- # pending_confirmations = {} # task_id -> asyncio.Event # # ------------------------------- # # Helper function for confirmation # # ------------------------------- # async def wait_for_confirmation(task_id: str, timeout: int = 120): # """Wait for user confirmation or auto-confirm after timeout.""" # event = asyncio.Event() # pending_confirmations[task_id] = event # try: # await asyncio.wait_for(event.wait(), timeout=timeout) # logging.info(f"✅ Task {task_id} confirmed by user.") # await queue_manager.confirm_task(task_id) # return True # except asyncio.TimeoutError: # logging.info(f"⌛ Task {task_id} auto-confirmed after {timeout}s.") # await queue_manager.confirm_task(task_id) # return True # finally: # pending_confirmations.pop(task_id, None) # # ------------------------------- # # API Endpoints # # ------------------------------- # @app.post("/submit_idea") # async def submit_idea(request: IdeaRequest): # """Receives a new ad idea and enqueues it.""" # task_id = await queue_manager.add_task(request.idea) # logging.info(f"💡 New idea received | Task ID: {task_id}") # # Start confirmation listener # asyncio.create_task(wait_for_confirmation(task_id)) # return { # "status": "submitted", # "task_id": task_id, # "message": "Idea received. Waiting for user confirmation after script generation." # } # @app.post("/confirm") # async def confirm_task(request: ConfirmationRequest): # """Confirms a paused task and continues the pipeline.""" # task_id = request.task_id # if task_id not in pending_confirmations: # raise HTTPException(status_code=404, detail="Task not pending confirmation or already confirmed.") # if request.confirm: # pending_confirmations[task_id].set() # return {"status": "confirmed", "task_id": task_id} # else: # return {"status": "rejected", "task_id": task_id} # @app.get("/status/{task_id}") # async def get_status(task_id: str): # """Check the current status of a task.""" # status = queue_manager.get_task_status(task_id) # if not status: # raise HTTPException(status_code=404, detail="Task not found.") # return status # @app.get("/") # async def health_check(): # return {"status": "running", "message": "AI ADD Generator is live."} # # ------------------------------- # # Startup / Shutdown events # # ------------------------------- # @app.on_event("startup") # async def startup_event(): # logging.info("🚀 Server starting up...") # queue_manager.start_worker() # ✅ Start async worker loop # @app.on_event("shutdown") # async def shutdown_event(): # logging.info("🛑 Server shutting down...") import uuid import asyncio from fastapi import FastAPI, HTTPException from pydantic import BaseModel from services import queue_manager # ✅ import your actual queue module import logging from fastapi.middleware.cors import CORSMiddleware # ------------------------------- # Setup logging # ------------------------------- logging.basicConfig( level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s" ) # ------------------------------- # FastAPI app # ------------------------------- app = FastAPI(title="AI ADD Generator Server", version="1.0") # Enable CORS for local testing app.add_middleware( CORSMiddleware, allow_origins=["*"], allow_credentials=True, allow_methods=["*"], allow_headers=["*"], ) # ------------------------------- # Pydantic models # ------------------------------- class IdeaRequest(BaseModel): idea: str class ConfirmationRequest(BaseModel): task_id: str confirm: bool # ------------------------------- # In-memory confirmation tracker # ------------------------------- pending_confirmations = {} # task_id -> asyncio.Event script_results = {} # task_id -> generated script for confirmation # ------------------------------- # API Endpoints # ------------------------------- @app.post("/submit_idea") async def submit_idea(request: IdeaRequest): """Receives a new ad idea and enqueues it.""" task_id = await queue_manager.add_task(request.idea) logging.info(f"💡 New idea received | Task ID: {task_id}") # Start worker listener asyncio.create_task(queue_manager.wait_for_script(task_id, script_results)) return { "status": "submitted", "task_id": task_id, "message": "Idea received. Script will be generated shortly.", } @app.post("/confirm") async def confirm_task(request: ConfirmationRequest): """Confirms a paused task, generates story, and returns full JSON.""" task_id = request.task_id task = queue_manager.get_task_status(task_id) if not task: raise HTTPException(status_code=404, detail="Task not found.") if task["status"] != queue_manager.TaskStatus.WAITING_CONFIRMATION: raise HTTPException(status_code=400, detail="Task not waiting for confirmation.") if request.confirm: # Confirm task await queue_manager.confirm_task(task_id) logging.info(f"✅ Task {task_id} confirmed by user.") # Generate story immediately script_result = task["result"]["script"] story_result = await queue_manager.generate_story_after_confirm(script_result) task["result"]["story_script"] = story_result task["status"] = queue_manager.TaskStatus.COMPLETED logging.info(f"🎬 Task {task_id} story generated and task completed.") return {"status": "completed", "task": task} else: task["status"] = queue_manager.TaskStatus.FAILED return {"status": "rejected", "task_id": task_id} @app.get("/status/{task_id}") async def get_status(task_id: str): """Check the current status of a task.""" task = queue_manager.get_task_status(task_id) if not task: raise HTTPException(status_code=404, detail="Task not found.") # If waiting confirmation, return script only if task["status"] == queue_manager.TaskStatus.WAITING_CONFIRMATION: return {"status": task["status"], "script": task["result"]["script"]} return task @app.get("/") async def health_check(): return {"status": "running", "message": "AI ADD Generator is live."} # ------------------------------- # Startup / Shutdown events # ------------------------------- @app.on_event("startup") async def startup_event(): logging.info("🚀 Server starting up...") queue_manager.start_worker() @app.on_event("shutdown") async def shutdown_event(): logging.info("🛑 Server shutting down...")