# innercontext/backend/tests/test_authz.py
#
# 293 lines
# 8.7 KiB
# Python

from __future__ import annotations
from datetime import UTC, datetime, timedelta
from uuid import UUID, uuid4
import pytest
from fastapi import HTTPException
from sqlmodel import Session
from innercontext.api.authz import (
can_update_inventory,
check_household_inventory_access,
get_owned_or_404,
get_owned_or_404_admin_override,
is_product_visible,
list_owned,
list_owned_admin_override,
)
from innercontext.auth import (
CurrentHouseholdMembership,
CurrentUser,
IdentityData,
TokenClaims,
)
from innercontext.models import (
Household,
HouseholdMembership,
HouseholdRole,
DayTime,
MedicationEntry,
MedicationKind,
Product,
ProductCategory,
ProductInventory,
Role,
)
def _claims(subject: str) -> TokenClaims:
    """Build test ``TokenClaims`` for ``subject`` that expire in one hour.

    The issuer and audience are fixed test values; ``raw_claims`` carries only
    the ``iss`` and ``sub`` entries.
    """
    issuer = "https://auth.example.test"
    one_hour_from_now = datetime.now(UTC) + timedelta(hours=1)
    return TokenClaims(
        issuer=issuer,
        subject=subject,
        audience=("innercontext-web",),
        expires_at=one_hour_from_now,
        raw_claims={"iss": issuer, "sub": subject},
    )
def _current_user(
    user_id: UUID,
    *,
    role: Role = Role.MEMBER,
    household_id: UUID | None = None,
) -> CurrentUser:
    """Build a ``CurrentUser`` for ``user_id`` backed by freshly made claims.

    Args:
        user_id: Identifier of the user; its string form is the token subject.
        role: Application role given to the user.
        household_id: When provided, the user is attached to that household
            with ``HouseholdRole.MEMBER``; otherwise no membership is set.
    """
    token_claims = _claims(str(user_id))
    household_membership = (
        None
        if household_id is None
        else CurrentHouseholdMembership(
            household_id=household_id,
            role=HouseholdRole.MEMBER,
        )
    )
    return CurrentUser(
        user_id=user_id,
        role=role,
        identity=IdentityData.from_claims(token_claims),
        claims=token_claims,
        household_membership=household_membership,
    )
def _create_household(session: Session) -> Household:
    """Add a default ``Household`` to ``session``, commit, and return it refreshed."""
    new_household = Household()
    session.add(new_household)
    session.commit()
    # Reload the row after the commit before handing it back.
    session.refresh(new_household)
    return new_household
def _create_membership(
    session: Session, user_id: UUID, household_id: UUID
) -> HouseholdMembership:
    """Persist a ``HouseholdMembership`` linking ``user_id`` to ``household_id``.

    The row is added, committed, and refreshed before being returned.
    """
    link = HouseholdMembership(household_id=household_id, user_id=user_id)
    session.add(link)
    session.commit()
    session.refresh(link)
    return link
def _create_medication(session: Session, user_id: UUID) -> MedicationEntry:
    """Persist a prescription ``MedicationEntry`` owned by ``user_id``.

    The entry is always named "Test medication"; it is added, committed, and
    refreshed before being returned.
    """
    medication = MedicationEntry(
        user_id=user_id,
        kind=MedicationKind.PRESCRIPTION,
        product_name="Test medication",
    )
    session.add(medication)
    session.commit()
    session.refresh(medication)
    return medication
def _create_product(session: Session, user_id: UUID, short_id: str) -> Product:
    """Persist a moisturizer ``Product`` owned by ``user_id`` with ``short_id``.

    Every other field is a fixed test value. The product is added, committed,
    and refreshed before being returned.
    """
    new_product = Product(
        user_id=user_id,
        short_id=short_id,
        name="Shared product",
        brand="Test brand",
        category=ProductCategory.MOISTURIZER,
        recommended_time=DayTime.BOTH,
        leave_on=True,
    )
    # NOTE(review): presumably ``setattr`` is used instead of plain assignment
    # to sidestep static typing of this attribute -- confirm before changing.
    setattr(new_product, "product_effect_profile", {})
    session.add(new_product)
    session.commit()
    session.refresh(new_product)
    return new_product
def _create_inventory(
    session: Session,
    *,
    user_id: UUID,
    product_id: UUID,
    is_household_shared: bool,
) -> ProductInventory:
    """Persist a ``ProductInventory`` row for ``product_id`` owned by ``user_id``.

    Args:
        session: Database session used to add, commit, and refresh the row.
        user_id: Owner of the inventory row.
        product_id: Product the inventory row refers to.
        is_household_shared: Value stored in the row's sharing flag.
    """
    stock = ProductInventory(
        user_id=user_id,
        product_id=product_id,
        is_household_shared=is_household_shared,
    )
    session.add(stock)
    session.commit()
    session.refresh(stock)
    return stock
def test_owner_helpers_return_only_owned_records(session: Session):
    """``get_owned_or_404`` and ``list_owned`` expose only the caller's entries."""
    owner_id, other_id = uuid4(), uuid4()
    owner_user = _current_user(owner_id)
    own_entry = _create_medication(session, owner_id)
    # A second entry owned by someone else must not appear in the owner's list.
    _ = _create_medication(session, other_id)

    looked_up = get_owned_or_404(
        session, MedicationEntry, own_entry.record_id, owner_user
    )
    listed = list_owned(session, MedicationEntry, owner_user)

    assert looked_up.record_id == own_entry.record_id
    assert len(listed) == 1
    assert listed[0].user_id == owner_id
def test_admin_helpers_allow_admin_override_for_lookup_and_list(session: Session):
    """An admin can fetch and list a medication entry owned by another user."""
    owner_id = uuid4()
    admin = _current_user(uuid4(), role=Role.ADMIN)
    foreign_entry = _create_medication(session, owner_id)

    looked_up = get_owned_or_404_admin_override(
        session,
        MedicationEntry,
        foreign_entry.record_id,
        admin,
    )
    all_entries = list_owned_admin_override(session, MedicationEntry, admin)

    assert looked_up.record_id == foreign_entry.record_id
    assert len(all_entries) == 1
def test_owner_denied_for_non_owned_lookup_returns_404(session: Session):
    """Looking up another user's entry raises ``HTTPException`` with status 404."""
    owner_id = uuid4()
    intruder = _current_user(uuid4())
    foreign_entry = _create_medication(session, owner_id)

    with pytest.raises(HTTPException) as raised:
        _ = get_owned_or_404(
            session, MedicationEntry, foreign_entry.record_id, intruder
        )

    # Checked after the ``with`` block: code following the raising call inside
    # it would never execute.
    assert raised.value.status_code == 404
def test_household_shared_inventory_access_allows_same_household_member(
    session: Session,
):
    """A member of the owner's household can fetch a household-shared inventory."""
    owner_id, household_member_id = uuid4(), uuid4()
    shared_household = _create_household(session)
    for user_id in (owner_id, household_member_id):
        _ = _create_membership(session, user_id, shared_household.id)

    owned_product = _create_product(session, owner_id, short_id="abcd0001")
    shared_inventory = _create_inventory(
        session,
        user_id=owner_id,
        product_id=owned_product.id,
        is_household_shared=True,
    )
    viewer = _current_user(household_member_id, household_id=shared_household.id)

    result = check_household_inventory_access(session, shared_inventory.id, viewer)

    assert result.id == shared_inventory.id
def test_household_shared_inventory_denied_for_cross_household_member(session: Session):
    """A member of a different household gets a 404 for a shared inventory."""
    owner_id, outsider_id = uuid4(), uuid4()
    owner_household = _create_household(session)
    outsider_household = _create_household(session)
    memberships = (
        (owner_id, owner_household),
        (outsider_id, outsider_household),
    )
    for user_id, household in memberships:
        _ = _create_membership(session, user_id, household.id)

    owned_product = _create_product(session, owner_id, short_id="abcd0002")
    shared_inventory = _create_inventory(
        session,
        user_id=owner_id,
        product_id=owned_product.id,
        is_household_shared=True,
    )
    outsider = _current_user(outsider_id, household_id=outsider_household.id)

    with pytest.raises(HTTPException) as raised:
        _ = check_household_inventory_access(session, shared_inventory.id, outsider)

    # Checked after the ``with`` block so the assertion actually runs.
    assert raised.value.status_code == 404
def test_household_inventory_update_rules_owner_admin_and_member(session: Session):
    """Owner, admin, and same-household member may update a shared inventory.

    The admin user is built without any household membership.
    """
    owner_id, member_id = uuid4(), uuid4()
    shared_household = _create_household(session)
    for user_id in (owner_id, member_id):
        _ = _create_membership(session, user_id, shared_household.id)

    owned_product = _create_product(session, owner_id, short_id="abcd0003")
    shared_inventory = _create_inventory(
        session,
        user_id=owner_id,
        product_id=owned_product.id,
        is_household_shared=True,
    )

    owner = _current_user(owner_id, household_id=shared_household.id)
    admin = _current_user(uuid4(), role=Role.ADMIN)
    member = _current_user(member_id, household_id=shared_household.id)

    # Same order as the individual checks: owner, then admin, then member.
    for actor in (owner, admin, member):
        assert can_update_inventory(session, shared_inventory.id, actor) is True
def test_product_visibility_for_owner_admin_and_household_shared(session: Session):
    """A product with a shared inventory is visible to owner, admin, and member.

    The member belongs to the owner's household; the admin user is built
    without any household membership.
    """
    owner_id, member_id = uuid4(), uuid4()
    shared_household = _create_household(session)
    for user_id in (owner_id, member_id):
        _ = _create_membership(session, user_id, shared_household.id)

    owned_product = _create_product(session, owner_id, short_id="abcd0004")
    _ = _create_inventory(
        session,
        user_id=owner_id,
        product_id=owned_product.id,
        is_household_shared=True,
    )

    owner = _current_user(owner_id, household_id=shared_household.id)
    admin = _current_user(uuid4(), role=Role.ADMIN)
    member = _current_user(member_id, household_id=shared_household.id)

    # Same order as the individual checks: owner, then admin, then member.
    for viewer in (owner, admin, member):
        assert is_product_visible(session, owned_product.id, viewer) is True
def test_product_visibility_denied_for_cross_household_member(session: Session):
    """A product shared within one household is hidden from another household."""
    owner_id, outsider_id = uuid4(), uuid4()
    owner_household = _create_household(session)
    outsider_household = _create_household(session)
    memberships = (
        (owner_id, owner_household),
        (outsider_id, outsider_household),
    )
    for user_id, household in memberships:
        _ = _create_membership(session, user_id, household.id)

    owned_product = _create_product(session, owner_id, short_id="abcd0005")
    _ = _create_inventory(
        session,
        user_id=owner_id,
        product_id=owned_product.id,
        is_household_shared=True,
    )
    outsider = _current_user(outsider_id, household_id=outsider_household.id)

    visible = is_product_visible(session, owned_product.id, outsider)

    assert visible is False