354 lines
10 KiB
Python
354 lines
10 KiB
Python
from __future__ import annotations
|
|
|
|
from collections.abc import Generator
|
|
from datetime import UTC, datetime, timedelta
|
|
from typing import cast
|
|
from uuid import UUID, uuid4
|
|
|
|
import pytest
|
|
from fastapi.testclient import TestClient
|
|
from sqlmodel import Session
|
|
|
|
from db import get_session
|
|
from innercontext.api.auth_deps import get_current_user
|
|
from innercontext.auth import (
|
|
CurrentHouseholdMembership,
|
|
CurrentUser,
|
|
IdentityData,
|
|
TokenClaims,
|
|
)
|
|
from innercontext.models import (
|
|
Household,
|
|
HouseholdMembership,
|
|
HouseholdRole,
|
|
Role,
|
|
User,
|
|
)
|
|
from main import app
|
|
|
|
|
|
def _current_user(
    user_id: UUID,
    *,
    role: Role = Role.ADMIN,
    household_id: UUID | None = None,
) -> CurrentUser:
    """Build a ``CurrentUser`` for tests without going through real auth.

    Args:
        user_id: Identifier used both as the user id and as the token subject.
        role: Application role; selects the admin or member group claim.
        household_id: When given, the user is attached to that household
            with the ``HouseholdRole.MEMBER`` role; otherwise no household
            membership is set.

    Returns:
        A ``CurrentUser`` whose claims expire one hour from now.
    """
    issuer = "https://auth.test"
    subject = str(user_id)

    if role is Role.ADMIN:
        groups = ("innercontext-admin",)
    else:
        groups = ("innercontext-member",)

    token_claims = TokenClaims(
        issuer=issuer,
        subject=subject,
        audience=("innercontext-web",),
        expires_at=datetime.now(UTC) + timedelta(hours=1),
        groups=groups,
        raw_claims={"iss": issuer, "sub": subject},
    )

    household_membership = (
        None
        if household_id is None
        else CurrentHouseholdMembership(
            household_id=household_id,
            role=HouseholdRole.MEMBER,
        )
    )

    return CurrentUser(
        user_id=user_id,
        role=role,
        identity=IdentityData.from_claims(token_claims),
        claims=token_claims,
        household_membership=household_membership,
    )
|
|
|
|
|
|
def _create_user(
    session: Session,
    *,
    role: Role = Role.MEMBER,
    subject: str | None = None,
) -> User:
    """Insert a ``User`` row and return the refreshed instance.

    Args:
        session: Database session used to add, commit and refresh the row.
        role: Role stored on the user.
        subject: OIDC subject; a random UUID string is used when falsy.

    Returns:
        The persisted ``User``.
    """
    oidc_subject = subject if subject else str(uuid4())
    record = User(
        oidc_issuer="https://auth.test",
        oidc_subject=oidc_subject,
        role=role,
    )
    session.add(record)
    session.commit()
    session.refresh(record)
    return record
|
|
|
|
|
|
def _create_household(session: Session) -> Household:
    """Insert a default-constructed ``Household`` and return it refreshed."""
    record = Household()
    session.add(record)
    session.commit()
    session.refresh(record)
    return record
|
|
|
|
|
|
def _create_membership(
    session: Session,
    *,
    user_id: UUID,
    household_id: UUID,
    role: HouseholdRole = HouseholdRole.MEMBER,
) -> HouseholdMembership:
    """Insert a ``HouseholdMembership`` row linking a user to a household.

    Args:
        session: Database session used to add, commit and refresh the row.
        user_id: Id of the user being attached.
        household_id: Id of the household the user is attached to.
        role: Role the user holds within the household.

    Returns:
        The persisted ``HouseholdMembership``.
    """
    record = HouseholdMembership(
        user_id=user_id,
        household_id=household_id,
        role=role,
    )
    session.add(record)
    session.commit()
    session.refresh(record)
    return record
|
|
|
|
|
|
@pytest.fixture()
def auth_client(
    session: Session,
) -> Generator[tuple[TestClient, dict[str, CurrentUser]], None, None]:
    """Yield a ``TestClient`` plus a mutable auth-state dict.

    The fixture overrides ``get_session`` so requests use the test
    ``session`` and overrides ``get_current_user`` so requests are
    authenticated as ``auth_state["current_user"]``. The initial current
    user is an admin; a test can assign a different ``CurrentUser`` to that
    key to change who subsequent requests run as.

    Yields:
        A ``(client, auth_state)`` tuple.
    """
    auth_state = {"current_user": _current_user(uuid4(), role=Role.ADMIN)}

    def _session_override():
        yield session

    def _current_user_override():
        # Looked up on every call so tests can swap the user mid-test.
        return auth_state["current_user"]

    app.dependency_overrides[get_session] = _session_override
    app.dependency_overrides[get_current_user] = _current_user_override

    # try/finally guarantees the overrides are removed from the shared app
    # even if TestClient startup raises or the generator is closed early.
    try:
        with TestClient(app) as client:
            yield client, auth_state
    finally:
        app.dependency_overrides.clear()
|
|
|
|
|
|
def test_list_users_returns_local_users_with_memberships(
    auth_client: tuple[TestClient, dict[str, CurrentUser]],
    session: Session,
):
    """GET /admin/users lists users with their household membership or None."""
    client = auth_client[0]

    # One user without a household and one who owns a household.
    loner = _create_user(session, subject="member-a")
    owner = _create_user(session, subject="member-b")
    home = _create_household(session)
    link = _create_membership(
        session,
        user_id=owner.id,
        household_id=home.id,
        role=HouseholdRole.OWNER,
    )

    resp = client.get("/admin/users")

    assert resp.status_code == 200
    rows = cast(list[dict[str, object]], resp.json())
    rows_by_id = {row["id"]: row for row in rows}

    assert rows_by_id[str(loner.id)]["household_membership"] is None

    expected_membership = {
        "id": str(link.id),
        "user_id": str(owner.id),
        "household_id": str(home.id),
        "role": "owner",
        "created_at": link.created_at.isoformat(),
        "updated_at": link.updated_at.isoformat(),
    }
    assert rows_by_id[str(owner.id)]["household_membership"] == expected_membership
|
|
|
|
|
|
def test_create_household_returns_new_household(
    auth_client: tuple[TestClient, dict[str, CurrentUser]],
    session: Session,
):
    """POST /admin/households returns 201 and the household exists in the DB."""
    client = auth_client[0]

    resp = client.post("/admin/households")

    assert resp.status_code == 201
    body = cast(dict[str, object], resp.json())
    raw_id = cast(str, body["id"])
    # The id in the response must resolve to a stored Household row.
    assert session.get(Household, UUID(raw_id)) is not None
|
|
|
|
|
|
def test_assign_member_creates_membership(
    auth_client: tuple[TestClient, dict[str, CurrentUser]],
    session: Session,
):
    """POSTing a user to a household creates an owner membership row."""
    client = auth_client[0]

    member = _create_user(session)
    home = _create_household(session)
    request_body = {"user_id": str(member.id), "role": "owner"}

    resp = client.post(f"/admin/households/{home.id}/members", json=request_body)

    assert resp.status_code == 201
    body = cast(dict[str, object], resp.json())
    assert body["user_id"] == str(member.id)
    assert body["household_id"] == str(home.id)
    assert body["role"] == "owner"

    # Confirm the response reflects what was actually persisted.
    membership_id = UUID(cast(str, body["id"]))
    stored = session.get(HouseholdMembership, membership_id)
    assert stored is not None
    assert stored.user_id == member.id
    assert stored.household_id == home.id
    assert stored.role is HouseholdRole.OWNER
|
|
|
|
|
|
def test_assign_member_rejects_already_assigned_user(
    auth_client: tuple[TestClient, dict[str, CurrentUser]],
    session: Session,
):
    """Assigning a user who already has a membership yields 409."""
    client = auth_client[0]

    member = _create_user(session)
    existing_home = _create_household(session)
    requested_home = _create_household(session)
    _create_membership(session, user_id=member.id, household_id=existing_home.id)

    url = f"/admin/households/{requested_home.id}/members"
    resp = client.post(url, json={"user_id": str(member.id)})

    assert resp.status_code == 409
    detail = resp.json()["detail"]
    assert detail == "User already belongs to a household"
|
|
|
|
|
|
def test_assign_member_rejects_unsynced_user(
    auth_client: tuple[TestClient, dict[str, CurrentUser]],
    session: Session,
):
    """Assigning a user id that was never inserted yields 404."""
    client = auth_client[0]
    home = _create_household(session)
    # Random id with no corresponding User row created by this test.
    missing_user_id = uuid4()

    url = f"/admin/households/{home.id}/members"
    resp = client.post(url, json={"user_id": str(missing_user_id)})

    assert resp.status_code == 404
    detail = resp.json()["detail"]
    assert detail == "User not found"
|
|
|
|
|
|
def test_move_member_moves_user_between_households(
    auth_client: tuple[TestClient, dict[str, CurrentUser]],
    session: Session,
):
    """PATCH moves the existing membership to the target household."""
    client = auth_client[0]

    member = _create_user(session)
    origin = _create_household(session)
    destination = _create_household(session)
    link = _create_membership(
        session,
        user_id=member.id,
        household_id=origin.id,
        role=HouseholdRole.OWNER,
    )

    url = f"/admin/households/{destination.id}/members/{member.id}"
    resp = client.patch(url)

    assert resp.status_code == 200
    body = cast(dict[str, object], resp.json())
    # Same membership row, new household, role unchanged.
    assert body["id"] == str(link.id)
    assert body["household_id"] == str(destination.id)
    assert body["role"] == "owner"

    # Reload from the database to verify the move was persisted.
    session.refresh(link)
    assert link.household_id == destination.id
|
|
|
|
|
|
def test_move_member_rejects_user_without_membership(
    auth_client: tuple[TestClient, dict[str, CurrentUser]],
    session: Session,
):
    """Moving a user who has no membership yields 404."""
    client = auth_client[0]

    member = _create_user(session)
    destination = _create_household(session)

    url = f"/admin/households/{destination.id}/members/{member.id}"
    resp = client.patch(url)

    assert resp.status_code == 404
    detail = resp.json()["detail"]
    assert detail == "HouseholdMembership not found"
|
|
|
|
|
|
def test_move_member_rejects_same_household_target(
    auth_client: tuple[TestClient, dict[str, CurrentUser]],
    session: Session,
):
    """Moving a user into the household they already belong to yields 409."""
    client = auth_client[0]

    member = _create_user(session)
    home = _create_household(session)
    _create_membership(session, user_id=member.id, household_id=home.id)

    url = f"/admin/households/{home.id}/members/{member.id}"
    resp = client.patch(url)

    assert resp.status_code == 409
    detail = resp.json()["detail"]
    assert detail == "User already belongs to this household"
|
|
|
|
|
|
def test_remove_membership_deletes_membership(
    auth_client: tuple[TestClient, dict[str, CurrentUser]],
    session: Session,
):
    """DELETE on a member returns 204 and removes the membership row."""
    client = auth_client[0]

    member = _create_user(session)
    home = _create_household(session)
    link = _create_membership(session, user_id=member.id, household_id=home.id)

    url = f"/admin/households/{home.id}/members/{member.id}"
    resp = client.delete(url)

    assert resp.status_code == 204
    stored = session.get(HouseholdMembership, link.id)
    assert stored is None
|
|
|
|
|
|
def test_remove_membership_requires_matching_household(
    auth_client: tuple[TestClient, dict[str, CurrentUser]],
    session: Session,
):
    """DELETE via a household the user is not a member of yields 404."""
    client = auth_client[0]

    member = _create_user(session)
    home = _create_household(session)
    unrelated_home = _create_household(session)
    _create_membership(session, user_id=member.id, household_id=home.id)

    # Address the member through the wrong household.
    url = f"/admin/households/{unrelated_home.id}/members/{member.id}"
    resp = client.delete(url)

    assert resp.status_code == 404
    detail = resp.json()["detail"]
    assert detail == "HouseholdMembership not found"
|
|
|
|
|
|
# Explicit ids keep the test node ids stable: the paths embed uuid4() values
# generated at collection time, which would otherwise leak into the
# auto-generated ids and change on every run (breaking --lf/--ff and
# pytest-xdist collection consistency).
@pytest.mark.parametrize(
    ("method", "path", "json_body"),
    [
        pytest.param("get", "/admin/users", None, id="list-users"),
        pytest.param("post", "/admin/households", None, id="create-household"),
        pytest.param(
            "post",
            f"/admin/households/{uuid4()}/members",
            {"user_id": str(uuid4())},
            id="assign-member",
        ),
        pytest.param(
            "patch",
            f"/admin/households/{uuid4()}/members/{uuid4()}",
            None,
            id="move-member",
        ),
        pytest.param(
            "delete",
            f"/admin/households/{uuid4()}/members/{uuid4()}",
            None,
            id="remove-membership",
        ),
    ],
)
def test_admin_household_routes_forbidden_for_member(
    auth_client: tuple[TestClient, dict[str, CurrentUser]],
    method: str,
    path: str,
    json_body: dict[str, str] | None,
):
    """Every admin household route returns 403 for a non-admin user.

    Args:
        auth_client: Fixture tuple of the test client and the auth-state dict.
        method: HTTP method passed to ``client.request``.
        path: Route under test.
        json_body: Optional JSON payload for the request.
    """
    client, auth_state = auth_client
    # Swap the authenticated user to a plain member before issuing the request.
    auth_state["current_user"] = _current_user(uuid4(), role=Role.MEMBER)

    response = client.request(method, path, json=json_body)

    assert response.status_code == 403
    assert response.json()["detail"] == "Admin role required"
|