admaker / api /server.py
karthikeya1212's picture
Update api/server.py
efdabc5 verified
raw
history blame
7.5 kB
# import uuid
# import asyncio
# from fastapi import FastAPI, HTTPException, APIRouter
# from pydantic import BaseModel
# from services import queue_manager # βœ… import your actual queue module
# import logging
# from fastapi.middleware.cors import CORSMiddleware
# router = APIRouter()
# # -------------------------------
# # Setup logging
# # -------------------------------
# logging.basicConfig(
# level=logging.INFO,
# format="%(asctime)s [%(levelname)s] %(message)s"
# )
# # -------------------------------
# # FastAPI app
# # -------------------------------
# app = FastAPI(title="AI ADD Generator Server", version="1.0")
# # Enable CORS for local testing
# app.add_middleware(
# CORSMiddleware,
# allow_origins=["*"],
# allow_credentials=True,
# allow_methods=["*"],
# allow_headers=["*"],
# )
# # -------------------------------
# # Pydantic models
# # -------------------------------
# class IdeaRequest(BaseModel):
# idea: str
# class ConfirmationRequest(BaseModel):
# task_id: str
# confirm: bool
# # -------------------------------
# # In-memory confirmation tracker
# # -------------------------------
# pending_confirmations = {} # task_id -> asyncio.Event
# script_results = {} # task_id -> generated script for confirmation
# # -------------------------------
# # API Endpoints
# # -------------------------------
# @app.post("/submit_idea")
# async def submit_idea(request: IdeaRequest):
# """Receives a new ad idea and enqueues it."""
# task_id = await queue_manager.add_task(request.idea)
# logging.info(f"πŸ’‘ New idea received | Task ID: {task_id}")
# # Start worker listener
# asyncio.create_task(queue_manager.wait_for_script(task_id, script_results))
# return {
# "status": "submitted",
# "task_id": task_id,
# "message": "Idea received. Script will be generated shortly.",
# }
# @app.post("/confirm")
# async def confirm_task(request: ConfirmationRequest):
# """Confirms a paused task, generates story, and returns full JSON."""
# task_id = request.task_id
# task = queue_manager.get_task_status(task_id)
# if not task:
# raise HTTPException(status_code=404, detail="Task not found.")
# if task["status"] != queue_manager.TaskStatus.WAITING_CONFIRMATION:
# raise HTTPException(status_code=400, detail="Task not waiting for confirmation.")
# if request.confirm:
# # Confirm task
# await queue_manager.confirm_task(task_id)
# logging.info(f"βœ… Task {task_id} confirmed by user.")
# # Generate story immediately
# script_result = task["result"]["script"]
# story_result = await queue_manager.generate_story_after_confirm(script_result)
# task["result"]["story_script"] = story_result
# task["status"] = queue_manager.TaskStatus.COMPLETED
# logging.info(f"🎬 Task {task_id} story generated and task completed.")
# return {"status": "completed", "task": task}
# else:
# task["status"] = queue_manager.TaskStatus.FAILED
# return {"status": "rejected", "task_id": task_id}
# @app.get("/status/{task_id}")
# async def get_status(task_id: str):
# """Check the current status of a task."""
# task = queue_manager.get_task_status(task_id)
# if not task:
# raise HTTPException(status_code=404, detail="Task not found.")
# # If waiting confirmation, return script only
# if task["status"] == queue_manager.TaskStatus.WAITING_CONFIRMATION:
# return {"status": task["status"], "script": task["result"]["script"]}
# return task
# @app.get("/")
# async def health_check():
# return {"status": "running", "message": "AI ADD Generator is live."}
# # -------------------------------
# # Startup / Shutdown events
# # -------------------------------
# @app.on_event("startup")
# async def startup_event():
# logging.info("πŸš€ Server starting up...")
# queue_manager.start_worker()
# @app.on_event("shutdown")
# async def shutdown_event():
# logging.info("πŸ›‘ Server shutting down...")
# app.include_router(router)
import uuid
import asyncio
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from services import queue_manager # your updated queue_manager
import logging
from fastapi.middleware.cors import CORSMiddleware
# -------------------------------
# Setup logging
# -------------------------------
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(message)s"
)
# -------------------------------
# FastAPI app
# -------------------------------
app = FastAPI(title="AI ADD Generator Server", version="1.0")
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# -------------------------------
# Pydantic models
# -------------------------------
class IdeaRequest(BaseModel):
idea: str
class ConfirmationRequest(BaseModel):
task_id: str
confirm: bool
# -------------------------------
# In-memory tracker for scripts
# -------------------------------
script_results = {} # task_id -> script string
# -------------------------------
# API Endpoints
# -------------------------------
@app.post("/submit_idea")
async def submit_idea(request: IdeaRequest):
task_id = await queue_manager.add_task(request.idea)
logging.info(f"πŸ’‘ New idea received | Task ID: {task_id}")
# Start worker for this task (generates script first)
asyncio.create_task(queue_manager.wait_for_script(task_id, script_results))
return {
"status": "submitted",
"task_id": task_id,
"message": "Idea received. Script will be generated shortly.",
}
@app.post("/confirm")
async def confirm_task(request: ConfirmationRequest):
task_id = request.task_id
task = queue_manager.get_task_status(task_id)
if not task:
raise HTTPException(status_code=404, detail="Task not found.")
if task["status"] != queue_manager.TaskStatus.WAITING_CONFIRMATION:
raise HTTPException(status_code=400, detail="Task not waiting for confirmation.")
if request.confirm:
# βœ… Confirm the task, resume pipeline
await queue_manager.confirm_task(task_id)
logging.info(f"βœ… Task {task_id} confirmed by user. Pipeline will continue automatically.")
return {"status": "confirmed", "message": "Task confirmed. Story, images, and other stages will be generated automatically."}
else:
task["status"] = queue_manager.TaskStatus.FAILED
return {"status": "rejected", "task_id": task_id}
@app.get("/status/{task_id}")
async def get_status(task_id: str):
task = queue_manager.get_task_status(task_id)
if not task:
raise HTTPException(status_code=404, detail="Task not found.")
if task["status"] == queue_manager.TaskStatus.WAITING_CONFIRMATION:
return {"status": task["status"], "script": task["result"]["script"]}
return task
@app.get("/")
async def health_check():
return {"status": "running", "message": "AI ADD Generator is live."}
# -------------------------------
# Startup / Shutdown events
# -------------------------------
@app.on_event("startup")
async def startup_event():
logging.info("πŸš€ Server starting up...")
queue_manager.start_worker()
@app.on_event("shutdown")
async def shutdown_event():
logging.info("πŸ›‘ Server shutting down...")