Spaces:
Paused
Paused
| from sqlalchemy.ext.asyncio import AsyncSession | |
| from sqlalchemy.future import select | |
| from sqlalchemy import update | |
| from passlib.context import CryptContext | |
| from typing import Optional, List, Dict, Any | |
| import logging | |
| from src.models.user import User | |
| from src.api.schemas import UserCreate, UserUpdate, UserInDB | |
| # Configure logger | |
| logger = logging.getLogger(__name__) | |
| # Password context for hashing and verification | |
| pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto") | |
| def verify_password(plain_password: str, hashed_password: str) -> bool: | |
| """ | |
| Verify password against hash. | |
| Args: | |
| plain_password: Plain password | |
| hashed_password: Hashed password | |
| Returns: | |
| bool: True if password is correct | |
| """ | |
| return pwd_context.verify(plain_password, hashed_password) | |
| def get_password_hash(password: str) -> str: | |
| """ | |
| Hash password. | |
| Args: | |
| password: Plain password | |
| Returns: | |
| str: Hashed password | |
| """ | |
| return pwd_context.hash(password) | |
| async def get_user_by_username(db: AsyncSession, username: str) -> Optional[UserInDB]: | |
| """ | |
| Get user by username. | |
| Args: | |
| db: Database session | |
| username: Username | |
| Returns: | |
| Optional[UserInDB]: User if found, None otherwise | |
| """ | |
| try: | |
| result = await db.execute(select(User).where(User.username == username)) | |
| user = result.scalars().first() | |
| if not user: | |
| return None | |
| # Convert SQLAlchemy model to Pydantic model | |
| user_dict = {c.name: getattr(user, c.name) for c in user.__table__.columns} | |
| return UserInDB(**user_dict) | |
| except Exception as e: | |
| logger.error(f"Error getting user by username: {e}") | |
| return None | |
| async def authenticate_user(db: AsyncSession, username: str, password: str) -> Optional[UserInDB]: | |
| """ | |
| Authenticate user. | |
| Args: | |
| db: Database session | |
| username: Username | |
| password: Plain password | |
| Returns: | |
| Optional[UserInDB]: User if authenticated, None otherwise | |
| """ | |
| user = await get_user_by_username(db, username) | |
| if not user: | |
| return None | |
| if not verify_password(password, user.hashed_password): | |
| return None | |
| return user | |
| async def create_user(db: AsyncSession, user_data: UserCreate) -> Optional[UserInDB]: | |
| """ | |
| Create a new user. | |
| Args: | |
| db: Database session | |
| user_data: User data | |
| Returns: | |
| Optional[UserInDB]: Created user | |
| """ | |
| try: | |
| # Check if user already exists | |
| existing_user = await get_user_by_username(db, user_data.username) | |
| if existing_user: | |
| return None | |
| # Create new user | |
| hashed_password = get_password_hash(user_data.password) | |
| user = User( | |
| username=user_data.username, | |
| email=user_data.email, | |
| full_name=user_data.full_name, | |
| hashed_password=hashed_password, | |
| is_active=user_data.is_active | |
| ) | |
| db.add(user) | |
| await db.commit() | |
| await db.refresh(user) | |
| # Convert SQLAlchemy model to Pydantic model | |
| user_dict = {c.name: getattr(user, c.name) for c in user.__table__.columns} | |
| return UserInDB(**user_dict) | |
| except Exception as e: | |
| logger.error(f"Error creating user: {e}") | |
| await db.rollback() | |
| return None | |
| async def update_user(db: AsyncSession, user_id: int, user_data: UserUpdate) -> Optional[UserInDB]: | |
| """ | |
| Update user. | |
| Args: | |
| db: Database session | |
| user_id: User ID | |
| user_data: User data | |
| Returns: | |
| Optional[UserInDB]: Updated user | |
| """ | |
| try: | |
| # Create update dictionary | |
| update_data = user_data.dict(exclude_unset=True) | |
| # Hash password if provided | |
| if "password" in update_data: | |
| update_data["hashed_password"] = get_password_hash(update_data.pop("password")) | |
| # Update user | |
| stmt = update(User).where(User.id == user_id).values(**update_data) | |
| await db.execute(stmt) | |
| await db.commit() | |
| # Get updated user | |
| result = await db.execute(select(User).where(User.id == user_id)) | |
| user = result.scalars().first() | |
| if not user: | |
| return None | |
| # Convert SQLAlchemy model to Pydantic model | |
| user_dict = {c.name: getattr(user, c.name) for c in user.__table__.columns} | |
| return UserInDB(**user_dict) | |
| except Exception as e: | |
| logger.error(f"Error updating user: {e}") | |
| await db.rollback() | |
| return None |