Spaces:
Paused
Paused
| import os | |
| import secrets | |
| import datetime | |
| import requests | |
| from fastapi import FastAPI, Depends, HTTPException, Header, Request | |
| from sqlalchemy import create_engine, Column, Integer, String, Boolean, DateTime, ForeignKey, func | |
| from sqlalchemy.ext.declarative import declarative_base | |
| from sqlalchemy.orm import sessionmaker, Session, relationship | |
| from pydantic import BaseModel | |
| from dotenv import load_dotenv | |
| from fastapi.templating import Jinja2Templates | |
| from fastapi.staticfiles import StaticFiles | |
| # Load environment variables from .env file | |
| load_dotenv() | |
| # Environment variables | |
| MYSQL_USER = os.getenv("MYSQL_USER") | |
| MYSQL_PASSWORD = os.getenv("MYSQL_PASSWORD") | |
| MYSQL_HOST = os.getenv("MYSQL_HOST") | |
| MYSQL_DB = os.getenv("MYSQL_DB") | |
| MAIN_API_KEY = os.getenv("MAIN_API_KEY") | |
| MAIN_API_URL = os.getenv("MAIN_API_URL", "https://api.typegpt.net/v1/chat/completions") | |
| MODEL_NAME = os.getenv("MODEL_NAME", "Image-Generator") | |
| DATABASE_URL = f"mysql+pymysql://{MYSQL_USER}:{MYSQL_PASSWORD}@{MYSQL_HOST}/{MYSQL_DB}" | |
| engine = create_engine(DATABASE_URL) | |
| SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine) | |
| Base = declarative_base() | |
| # --------------------------- | |
| # Database Models | |
| # --------------------------- | |
| class User(Base): | |
| __tablename__ = "users" | |
| id = Column(Integer, primary_key=True, index=True) | |
| username = Column(String(50), unique=True, index=True, nullable=False) | |
| hashed_password = Column(String(128), nullable=False) | |
| is_admin = Column(Boolean, default=False) | |
| credits = Column(Integer, default=0) | |
| created_at = Column(DateTime(timezone=True), server_default=func.now()) | |
| # Relationship to API keys | |
| api_keys = relationship("APIKey", back_populates="owner", cascade="all, delete-orphan") | |
| class APIKey(Base): | |
| __tablename__ = "api_keys" | |
| id = Column(Integer, primary_key=True, index=True) | |
| key = Column(String(64), unique=True, index=True, nullable=False) | |
| user_id = Column(Integer, ForeignKey("users.id"), nullable=False) | |
| expiry_date = Column(DateTime, nullable=True) | |
| active = Column(Boolean, default=True) | |
| created_at = Column(DateTime(timezone=True), server_default=func.now()) | |
| owner = relationship("User", back_populates="api_keys") | |
| Base.metadata.create_all(bind=engine) | |
| # --------------------------- | |
| # FastAPI App & Config | |
| # --------------------------- | |
| app = FastAPI(title="Real API Key Platform") | |
| app.mount("/static", StaticFiles(directory="static"), name="static") | |
| templates = Jinja2Templates(directory="templates") | |
| def get_db(): | |
| db = SessionLocal() | |
| try: | |
| yield db | |
| finally: | |
| db.close() | |
| def generate_api_key() -> str: | |
| return secrets.token_hex(16) | |
| # --------------------------- | |
| # Pydantic Models | |
| # --------------------------- | |
| class UserCreate(BaseModel): | |
| username: str | |
| password: str | |
| class UserOut(BaseModel): | |
| id: int | |
| username: str | |
| is_admin: bool | |
| credits: int | |
| created_at: datetime.datetime | |
| class Config: | |
| orm_mode = True | |
| class APIKeyOut(BaseModel): | |
| id: int | |
| key: str | |
| expiry_date: datetime.datetime | None = None | |
| active: bool | |
| created_at: datetime.datetime | |
| class Config: | |
| orm_mode = True | |
| class GenerateKeyPayload(BaseModel): | |
| expiry_date: datetime.datetime | None = None | |
| class RequestPayload(BaseModel): | |
| prompt: str | |
| class CreditPayload(BaseModel): | |
| username: str | |
| credits: int | |
| class DeactivateKeyPayload(BaseModel): | |
| key: str | |
| # --------------------------- | |
| # Authentication Dependency | |
| # --------------------------- | |
| def get_current_user(x_api_key: str = Header(...), db: Session = Depends(get_db)) -> User: | |
| api_key_obj = db.query(APIKey).filter(APIKey.key == x_api_key, APIKey.active == True).first() | |
| if not api_key_obj: | |
| raise HTTPException(status_code=401, detail="Invalid or inactive API Key") | |
| if api_key_obj.expiry_date and datetime.datetime.utcnow() > api_key_obj.expiry_date: | |
| raise HTTPException(status_code=401, detail="API Key expired") | |
| return api_key_obj.owner | |
| # --------------------------- | |
| # Endpoints | |
| # --------------------------- | |
| # Registration: Create a new user and generate a primary API key (no expiry) | |
| def register(user: UserCreate, db: Session = Depends(get_db)): | |
| if db.query(User).filter(User.username == user.username).first(): | |
| raise HTTPException(status_code=400, detail="Username already exists") | |
| new_user = User(username=user.username, hashed_password=user.password, is_admin=False) | |
| db.add(new_user) | |
| db.commit() | |
| db.refresh(new_user) | |
| primary_key = APIKey(key=generate_api_key(), user_id=new_user.id, expiry_date=None, active=True) | |
| db.add(primary_key) | |
| db.commit() | |
| db.refresh(primary_key) | |
| return new_user | |
| # User panel: Get current user details | |
| def read_user_me(current_user: User = Depends(get_current_user)): | |
| return current_user | |
| # List all API keys belonging to the current user | |
| def get_user_api_keys(current_user: User = Depends(get_current_user)): | |
| return current_user.api_keys | |
| # Allow user to generate a new API key (with optional expiry date) | |
| def generate_key(payload: GenerateKeyPayload, current_user: User = Depends(get_current_user), db: Session = Depends(get_db)): | |
| new_key = APIKey(key=generate_api_key(), user_id=current_user.id, expiry_date=payload.expiry_date, active=True) | |
| db.add(new_key) | |
| db.commit() | |
| db.refresh(new_key) | |
| return new_key | |
| # Test endpoint for users | |
| def test_api(current_user: User = Depends(get_current_user)): | |
| return {"message": "API is working", "username": current_user.username, "credits": current_user.credits} | |
| # Proxy endpoint: Forwards request to main API using the secured main API key | |
| def generate_image(payload: RequestPayload, current_user: User = Depends(get_current_user)): | |
| headers = { | |
| "Authorization": f"Bearer {MAIN_API_KEY}", | |
| "Content-Type": "application/json" | |
| } | |
| data = { | |
| "model": MODEL_NAME, | |
| "prompt": payload.prompt | |
| } | |
| response = requests.post(MAIN_API_URL, json=data, headers=headers) | |
| if response.status_code != 200: | |
| raise HTTPException(status_code=response.status_code, detail="Error from main API") | |
| return response.json() | |
| # --------------------------- | |
| # Admin Endpoints | |
| # --------------------------- | |
| # List all users (admin-only) | |
| def list_users(current_user: User = Depends(get_current_user), db: Session = Depends(get_db)): | |
| if not current_user.is_admin: | |
| raise HTTPException(status_code=403, detail="Not authorized") | |
| users = db.query(User).all() | |
| return users | |
| # Add credits to a user's account (admin-only) | |
| def add_credit(payload: CreditPayload, current_user: User = Depends(get_current_user), db: Session = Depends(get_db)): | |
| if not current_user.is_admin: | |
| raise HTTPException(status_code=403, detail="Not authorized") | |
| user = db.query(User).filter(User.username == payload.username).first() | |
| if not user: | |
| raise HTTPException(status_code=404, detail="User not found") | |
| user.credits += payload.credits | |
| db.commit() | |
| db.refresh(user) | |
| return {"message": f"Added {payload.credits} credits to {user.username}. Total credits: {user.credits}"} | |
| # List all API keys in the system (admin-only) | |
| def list_all_api_keys(current_user: User = Depends(get_current_user), db: Session = Depends(get_db)): | |
| if not current_user.is_admin: | |
| raise HTTPException(status_code=403, detail="Not authorized") | |
| keys = db.query(APIKey).all() | |
| return keys | |
| # Deactivate an API key (admin-only) | |
| def deactivate_key(payload: DeactivateKeyPayload, current_user: User = Depends(get_current_user), db: Session = Depends(get_db)): | |
| if not current_user.is_admin: | |
| raise HTTPException(status_code=403, detail="Not authorized") | |
| key_obj = db.query(APIKey).filter(APIKey.key == payload.key).first() | |
| if not key_obj: | |
| raise HTTPException(status_code=404, detail="API Key not found") | |
| key_obj.active = False | |
| db.commit() | |
| return {"message": f"API Key {payload.key} deactivated."} | |
| # --------------------------- | |
| # UI Endpoints (Panels) | |
| # --------------------------- | |
| def admin_ui(request: Request): | |
| return templates.TemplateResponse("admin.html", {"request": request}) | |
| def user_ui(request: Request): | |
| return templates.TemplateResponse("user.html", {"request": request}) | |