innercontext/backend/innercontext/api/auth_deps.py

57 lines
1.6 KiB
Python

from __future__ import annotations
from typing import Annotated
from fastapi import Depends, HTTPException, status
from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer
from sqlmodel import Session
from db import get_session
from innercontext.auth import (
AuthConfigurationError,
CurrentUser,
TokenValidationError,
sync_current_user,
validate_access_token,
)
from innercontext.models import Role
# Shared HTTP Bearer security scheme used as a dependency by get_current_user.
# auto_error=False is passed so that get_current_user can handle absent
# credentials itself (it checks for None and raises its own 401).
_bearer_scheme = HTTPBearer(auto_error=False)
def _unauthorized(detail: str) -> HTTPException:
    """Build (but do not raise) a 401 error carrying a Bearer challenge.

    Args:
        detail: Message placed in the exception's ``detail`` field.

    Returns:
        An ``HTTPException`` with status 401 and a
        ``WWW-Authenticate: Bearer`` header. The caller is responsible
        for raising it.
    """
    challenge_headers = {"WWW-Authenticate": "Bearer"}
    error = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail=detail,
        headers=challenge_headers,
    )
    return error
def get_current_user(
    credentials: Annotated[
        HTTPAuthorizationCredentials | None, Depends(_bearer_scheme)
    ],
    session: Session = Depends(get_session),
) -> CurrentUser:
    """Resolve the current user from the request's bearer token.

    The token is validated with ``validate_access_token`` and the resulting
    claims are handed, together with the database session, to
    ``sync_current_user``, whose result is returned.

    Args:
        credentials: Authorization credentials supplied by the
            ``_bearer_scheme`` dependency, or ``None``.
        session: Database session supplied by the ``get_session`` dependency.

    Returns:
        The ``CurrentUser`` produced by ``sync_current_user``.

    Raises:
        HTTPException: 401 with a Bearer challenge when credentials are
            absent, the scheme is not "bearer" (case-insensitive), or a
            ``TokenValidationError`` is raised while validating the token or
            syncing the user; 503 when an ``AuthConfigurationError`` is
            raised during those same steps.
    """
    # Anything other than a present, bearer-scheme credential is rejected.
    if not (credentials is not None and credentials.scheme.lower() == "bearer"):
        raise _unauthorized("Missing bearer token")

    # Both calls stay inside the try so that errors from either one are
    # translated into HTTP responses.
    try:
        token_claims = validate_access_token(credentials.credentials)
        user = sync_current_user(session, token_claims)
    except AuthConfigurationError as config_error:
        raise HTTPException(
            status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
            detail=str(config_error),
        ) from config_error
    except TokenValidationError as token_error:
        raise _unauthorized(str(token_error)) from token_error
    return user
def require_admin(current_user: CurrentUser = Depends(get_current_user)) -> CurrentUser:
    """Allow the request through only when the current user is an admin.

    Args:
        current_user: User supplied by the ``get_current_user`` dependency.

    Returns:
        The same ``current_user`` when its role is ``Role.ADMIN``.

    Raises:
        HTTPException: 403 when the user's role is not ``Role.ADMIN``.
    """
    if current_user.role is Role.ADMIN:
        return current_user

    raise HTTPException(
        status_code=status.HTTP_403_FORBIDDEN,
        detail="Admin role required",
    )