# Source: ira/sim-search-api/app/api/routes/auth.py (99 lines, 2.7 KiB, Python)

"""
Authentication routes for the sim-search API.
This module defines the routes for user authentication and registration.
"""
from datetime import timedelta
from typing import Any
from fastapi import APIRouter, Depends, HTTPException, status
from fastapi.security import OAuth2PasswordRequestForm
from sqlalchemy.orm import Session
from app.api.dependencies import authenticate_user
from app.core.config import settings
from app.core.security import create_access_token, get_password_hash
from app.db.models import User
from app.db.session import get_db
from app.schemas.token import Token
from app.schemas.user import UserCreate, User as UserSchema
# Router on which this module's authentication endpoints (/token and /register)
# are registered via the @router.post decorators below.
router = APIRouter()
@router.post("/token", response_model=Token)
async def login_for_access_token(
    form_data: OAuth2PasswordRequestForm = Depends(),
    db: Session = Depends(get_db),
) -> Any:
    """
    Exchange OAuth2 password-form credentials for a bearer access token.

    Args:
        form_data: OAuth2 password request form supplying username and password
        db: Database session

    Returns:
        Mapping with the issued access token and its token type

    Raises:
        HTTPException: 401 if authenticate_user does not return a user
    """
    account = authenticate_user(db, form_data.username, form_data.password)
    if account:
        # Token lifetime comes from configuration, expressed in minutes.
        lifetime = timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES)
        token = create_access_token(subject=account.id, expires_delta=lifetime)
        return {"access_token": token, "token_type": "bearer"}

    # Credentials were rejected: answer with a bearer challenge.
    raise HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Incorrect email or password",
        headers={"WWW-Authenticate": "Bearer"},
    )
@router.post("/register", response_model=UserSchema)
async def register_user(
    user_in: UserCreate,
    db: Session = Depends(get_db),
) -> Any:
    """
    Register a new user.

    The email is checked before the insert. The commit is guarded as well, so
    that a concurrent registration with the same email which slips in between
    the check and the commit is reported as the same 400 error instead of
    surfacing as an unhandled database exception.

    Args:
        user_in: User creation data
        db: Database session

    Returns:
        Created user

    Raises:
        HTTPException: If a user with the same email already exists
    """
    # Imported locally so the handler is self-contained with respect to the
    # module's import block.
    from sqlalchemy.exc import IntegrityError

    def _email_taken() -> bool:
        """Return True if a user row with the requested email is present."""
        return bool(db.query(User).filter(User.email == user_in.email).first())

    duplicate_email = HTTPException(
        status_code=status.HTTP_400_BAD_REQUEST,
        detail="A user with this email already exists",
    )

    # Fast path: reject an email that is already registered.
    if _email_taken():
        raise duplicate_email

    # NOTE(review): is_active and is_superuser are copied straight from the
    # request body, and no auth dependency is declared on this route itself,
    # so a caller can request is_superuser=True. Confirm this is intended.
    user = User(
        email=user_in.email,
        hashed_password=get_password_hash(user_in.password),
        full_name=user_in.full_name,
        is_active=user_in.is_active,
        is_superuser=user_in.is_superuser,
    )
    db.add(user)
    try:
        db.commit()
    except IntegrityError:
        # Presumably User.email has a unique constraint (not visible here);
        # if so, a racing registration lands in this branch.
        db.rollback()  # leave the session usable after the failed flush
        if _email_taken():
            raise duplicate_email from None
        # Some other integrity failure: do not mislabel it as a duplicate.
        raise
    db.refresh(user)
    return user