Spaces:
Runtime error
Runtime error
| from copy import deepcopy | |
| import models | |
| from psql_database import engine, Base, SessionLocal | |
| import sqlalchemy as sa | |
| import schemas as schemas | |
| import passlib.hash as pwd_hash | |
| import re | |
| from fastapi import HTTPException, Depends, security | |
| import jwt | |
| OAUTH2_SCHEME = security.OAuth2PasswordBearer(tokenUrl="/api/token") | |
| JWT_SECRET = 'myjwtsecret' | |
| def validate_email(email): | |
| pattern = r'^[\w\.-]+@[\w\.-]+\.\w+$' | |
| return bool(re.match(pattern, email)) | |
| def create_database_tables(): | |
| return Base.metadata.create_all(bind=engine) | |
| def get_db(): | |
| db = SessionLocal() | |
| try: | |
| yield db | |
| finally: | |
| db.close() | |
| async def get_user_by_email(email: str, db: sa.orm.Session): | |
| return db.query(models.User).filter(models.User.email == email).first() | |
| async def create_user(user: schemas.userCreate, db: sa.orm.Session): | |
| if not validate_email(user.email): | |
| raise HTTPException(status_code=400, detail="Invalid email") | |
| user_obj = models.User( | |
| first_name=user.first_name, | |
| last_name=user.last_name, | |
| email=user.email, | |
| password=pwd_hash.bcrypt.hash(user.password) | |
| ) | |
| db.add(user_obj) | |
| db.commit() # commit operation | |
| db.refresh(user_obj) # Await the refresh operation | |
| return user_obj | |
| async def authenticate_user(email, password, db: sa.orm.Session): | |
| user = await get_user_by_email(email=email, db=db) | |
| if not user: | |
| raise HTTPException(status_code=404, detail="User not found") | |
| return user if user.verify_password(password) else False | |
| async def create_token(user: models.User): | |
| user_dict = { | |
| "first_name": user.first_name, | |
| "last_name": user.last_name, | |
| "email": user.email, | |
| } | |
| token = jwt.encode(user_dict, JWT_SECRET) | |
| return dict(access_token=token, token_type='bearer', status='success') | |
| async def get_current_user(db: sa.orm.Session = Depends(get_db), token: str = Depends(OAUTH2_SCHEME)): | |
| try: | |
| payload = jwt.decode(token, JWT_SECRET, algorithms=['HS256']) | |
| print(payload) | |
| user = db.query(models.User).filter( | |
| models.User.email == payload['email']).first() | |
| if not user: | |
| raise HTTPException(status_code=404, detail="User not found") | |
| return schemas.User(**user.__dict__) | |
| except Exception as e: | |
| raise HTTPException(status_code=401, detail="Invalid token") from e | |
| async def create_todo(user: schemas.User, todo: schemas.TodoCreate, db: sa.orm.Session = Depends(get_db)): | |
| print(todo.dict()) | |
| todo_obj = models.Todo( | |
| task_name=todo.task_name, | |
| task_description=todo.task_description, | |
| priority=todo.priority, | |
| category=todo.category, | |
| due_date=todo.due_date, | |
| status=todo.status, | |
| user_id=user.user_id, | |
| ) | |
| db.add(todo_obj) | |
| db.commit() | |
| db.refresh(todo_obj) | |
| return todo_obj | |
| async def get_todos(user: schemas.User, db: sa.orm.Session = Depends(get_db)): | |
| return db.query(models.Todo).filter(models.Todo.user_id == user.user_id).all() | |
| async def delete_todo(todo_id: int, user: schemas.User, db: sa.orm.Session = Depends(get_db)): | |
| todo = db.query(models.Todo).filter(models.Todo.user_id == | |
| user.user_id, models.Todo.todo_id == todo_id).first() | |
| if not todo: | |
| raise HTTPException(status_code=404, detail="Todo not found") | |
| db.delete(todo) | |
| db.commit() | |
| return todo | |
| async def update_todo(todo_id: int, user: schemas.User, todo: schemas.TodoCreate, db: sa.orm.Session = Depends(get_db)): | |
| todo_db = db.query(models.Todo).filter( | |
| models.Todo.user_id == user.user_id, models.Todo.todo_id == todo_id).first() | |
| if not todo_db: | |
| raise HTTPException(status_code=404, detail="Todo not found") | |
| todo_db.task_name = todo.task_name | |
| todo_db.task_description = todo.task_description or todo.task_description | |
| todo_db.priority = todo.priority | |
| todo_db.category = todo.category | |
| todo_db.due_date = todo.due_date | |
| todo_db.status = todo.status | |
| db.commit() | |
| db.refresh(todo_db) | |
| return todo_db | |
| async def get_today_todos(user: schemas.User, db: sa.orm.Session = Depends(get_db)): | |
| return db.query(models.Todo).filter(models.Todo.user_id == user.user_id, sa.func.date(models.Todo.due_date) == sa.func.date(sa.func.now())).all() | |
| async def get_overdue_todos(user: schemas.User, db: sa.orm.Session = Depends(get_db)): | |
| return db.query(models.Todo).filter(models.Todo.user_id == user.user_id, models.Todo.due_date < sa.func.now()).all() | |
| async def get_upcoming_todos(user: schemas.User, db: sa.orm.Session = Depends(get_db)): | |
| return db.query(models.Todo).filter(models.Todo.user_id == user.user_id, models.Todo.due_date > sa.func.now()).all() | |
| async def get_completed_todos(user: schemas.User, db: sa.orm.Session = Depends(get_db)): | |
| return db.query(models.Todo).filter(models.Todo.user_id == user.user_id, models.Todo.status == True).all() | |