# Web-page capture residue (not part of the module): "Spaces: Paused / Paused"
"""
User authentication models and validation for OpenManus
Mobile number + password based authentication system
"""
import hashlib
import re
import secrets
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from typing import Optional

from pydantic import BaseModel, validator
class UserSignupRequest(BaseModel):
    """Request body for registering a new user.

    Each ``validate_*`` method is registered as a pydantic validator for the
    field it is named after and returns the (possibly normalised) value that
    is stored on the model.
    """

    # NOTE(review): the ``@validator`` decorators were missing from the
    # captured source; they are restored here from the method/field names.
    full_name: str
    mobile_number: str
    password: str
    confirm_password: str

    @validator("full_name")
    def validate_full_name(cls, v):
        """Return the name stripped of surrounding whitespace.

        Raises:
            ValueError: if the stripped name is shorter than 2 or longer
                than 100 characters.
        """
        stripped = v.strip() if v else ""
        if len(stripped) < 2:
            raise ValueError("Full name must be at least 2 characters long")
        if len(stripped) > 100:
            raise ValueError("Full name must be less than 100 characters")
        return stripped

    @validator("mobile_number")
    def validate_mobile_number(cls, v):
        """Return the mobile number reduced to its digits.

        Raises:
            ValueError: if the number does not contain 10-15 digits or does
                not match the expected format.
        """
        # Drop everything that is not a digit (spaces, dashes, "+", ...).
        digits_only = re.sub(r"\D", "", v)

        if len(digits_only) < 10 or len(digits_only) > 15:
            raise ValueError("Mobile number must be between 10-15 digits")

        # The "+" was already removed above, so the optional "\+" never
        # matches; what this check effectively enforces is a leading 1-9.
        if not re.match(r"^(\+?[1-9]\d{9,14})$", digits_only):
            raise ValueError("Invalid mobile number format")

        return digits_only

    @validator("password")
    def validate_password(cls, v):
        """Enforce password length and character-class rules.

        Raises:
            ValueError: if the password is shorter than 8 or longer than 128
                characters, or lacks an uppercase letter, a lowercase letter
                or a digit.
        """
        if len(v) < 8:
            raise ValueError("Password must be at least 8 characters long")
        if len(v) > 128:
            raise ValueError("Password must be less than 128 characters")

        # Require at least one character from each class.
        if not re.search(r"[A-Z]", v):
            raise ValueError("Password must contain at least one uppercase letter")
        if not re.search(r"[a-z]", v):
            raise ValueError("Password must contain at least one lowercase letter")
        if not re.search(r"\d", v):
            raise ValueError("Password must contain at least one digit")

        return v

    @validator("confirm_password")
    def validate_confirm_password(cls, v, values):
        """Ensure the confirmation matches ``password``.

        The comparison is skipped when ``password`` is absent from
        ``values``.

        Raises:
            ValueError: if both values are present and differ.
        """
        if "password" in values and v != values["password"]:
            raise ValueError("Passwords do not match")
        return v
class UserLoginRequest(BaseModel):
    """Request body for logging in with a mobile number and password."""

    # NOTE(review): the ``@validator`` decorator was missing from the
    # captured source; it is restored here from the method/field name.
    mobile_number: str
    password: str

    @validator("mobile_number")
    def validate_mobile_number(cls, v):
        """Return the mobile number reduced to its digits.

        Raises:
            ValueError: if the number does not contain 10-15 digits.
        """
        # Drop everything that is not a digit (spaces, dashes, "+", ...).
        digits_only = re.sub(r"\D", "", v)
        if len(digits_only) < 10 or len(digits_only) > 15:
            raise ValueError("Invalid mobile number")
        return digits_only
@dataclass
class User:
    """User account record.

    The first four fields are required; the remaining ones default to
    ``None`` (or ``True`` for ``is_active``).
    """

    id: str
    mobile_number: str
    full_name: str
    password_hash: str
    avatar_url: Optional[str] = None
    preferences: Optional[str] = None
    is_active: bool = True
    created_at: Optional[datetime] = None
    updated_at: Optional[datetime] = None
@dataclass
class UserSession:
    """Login session issued to a user, with creation and expiry timestamps."""

    session_id: str
    user_id: str
    mobile_number: str
    full_name: str
    created_at: datetime
    expires_at: datetime

    def is_valid(self) -> bool:
        """Return True while the current UTC time is before ``expires_at``.

        A naive ``expires_at`` is compared against naive UTC "now" (the same
        result ``datetime.utcnow()`` gave); a timezone-aware ``expires_at``
        is compared against aware UTC "now" instead of raising ``TypeError``.
        """
        now = datetime.now(timezone.utc)
        if self.expires_at.tzinfo is None:
            # Naive timestamps are treated as UTC wall-clock values.
            now = now.replace(tzinfo=None)
        return now < self.expires_at
class UserAuth:
    """Stateless authentication helpers (hashing, id generation, sessions).

    Every method is a static method and can be called on the class or on an
    instance.
    """

    # Iteration count used for newly created PBKDF2 hashes. The count is
    # embedded in each stored hash, so changing it does not invalidate
    # existing hashes.
    PBKDF2_ITERATIONS = 600_000
    _PBKDF2_SCHEME = "pbkdf2_sha256"

    @staticmethod
    def hash_password(password: str) -> str:
        """Hash a password with PBKDF2-HMAC-SHA256 and a random salt.

        Returns:
            A string of the form
            ``pbkdf2_sha256$<iterations>$<salt hex>$<digest hex>``.
        """
        salt = secrets.token_bytes(16)
        iterations = UserAuth.PBKDF2_ITERATIONS
        digest = hashlib.pbkdf2_hmac("sha256", password.encode(), salt, iterations)
        return f"{UserAuth._PBKDF2_SCHEME}${iterations}${salt.hex()}${digest.hex()}"

    @staticmethod
    def verify_password(password: str, password_hash: str) -> bool:
        """Check a password against a stored hash.

        Accepts both the PBKDF2 format produced by ``hash_password`` and the
        older ``<salt>:<sha256 hex>`` format, so previously stored hashes
        keep working.

        Returns:
            True if the password matches; False if it does not or if the
            stored hash is missing or malformed.
        """
        if not isinstance(password, str) or not isinstance(password_hash, str):
            return False
        try:
            if password_hash.startswith(UserAuth._PBKDF2_SCHEME + "$"):
                _, iterations, salt_hex, stored = password_hash.split("$")
                computed = hashlib.pbkdf2_hmac(
                    "sha256",
                    password.encode(),
                    bytes.fromhex(salt_hex),
                    int(iterations),
                ).hex()
            else:
                # Legacy format: SHA-256 over password + salt.
                salt, stored = password_hash.split(":")
                computed = hashlib.sha256((password + salt).encode()).hexdigest()
            # Constant-time comparison; bytes avoid the ASCII-only
            # restriction compare_digest places on str arguments.
            return secrets.compare_digest(computed.encode(), stored.encode())
        except (ValueError, OverflowError):
            # Wrong number of parts, bad hex, bad iteration count, or text
            # that cannot be encoded.
            return False

    @staticmethod
    def generate_session_id() -> str:
        """Return a random URL-safe session id built from 32 random bytes."""
        return secrets.token_urlsafe(32)

    @staticmethod
    def generate_user_id() -> str:
        """Return a random user id: ``user_`` followed by 32 hex characters."""
        return f"user_{secrets.token_hex(16)}"

    @staticmethod
    def format_mobile_number(mobile_number: str) -> str:
        """Normalise a mobile number to ``+<digits>`` for storage.

        All non-digit characters are removed. A number with exactly 10
        digits gets a leading ``1`` added (it is assumed to be a US number
        without country code). Input without any digits yields ``"+"``.
        """
        digits_only = re.sub(r"\D", "", mobile_number)
        if len(digits_only) == 10:  # US format without country code
            digits_only = f"1{digits_only}"
        return f"+{digits_only}"

    @staticmethod
    def create_session(user: "User", duration_hours: int = 24) -> "UserSession":
        """Create a session for ``user`` that expires after ``duration_hours``.

        Timestamps are naive datetimes expressed in UTC.
        """
        # Same value datetime.utcnow() produced, without the deprecated call.
        created_at = datetime.now(timezone.utc).replace(tzinfo=None)
        return UserSession(
            session_id=UserAuth.generate_session_id(),
            user_id=user.id,
            mobile_number=user.mobile_number,
            full_name=user.full_name,
            created_at=created_at,
            expires_at=created_at + timedelta(hours=duration_hours),
        )
# Response models
class AuthResponse(BaseModel):
    """Result of an authentication request.

    ``success`` and ``message`` are always present; the session and user
    fields are optional and default to ``None``.
    """

    success: bool
    message: str
    session_id: Optional[str] = None
    user_id: Optional[str] = None
    full_name: Optional[str] = None
class UserProfile(BaseModel):
    """User profile returned to clients."""

    user_id: str
    full_name: str
    mobile_number: str  # Masked for security
    avatar_url: Optional[str] = None
    created_at: Optional[str] = None

    @staticmethod
    def mask_mobile_number(mobile_number: str) -> str:
        """Mask a mobile number so that only its last 4 characters show.

        Strings of 4 characters or fewer are masked completely.
        """
        visible = mobile_number[-4:] if len(mobile_number) > 4 else ""
        return "*" * (len(mobile_number) - len(visible)) + visible


# NOTE(review): indentation was lost in the captured source, so it is unclear
# whether mask_mobile_number was a static method of UserProfile or a
# module-level function. Both call styles are supported here; remove the one
# that callers do not use.
mask_mobile_number = UserProfile.mask_mobile_number