diff --git a/backend/innercontext/api/authz.py b/backend/innercontext/api/authz.py index 23f71dc..82558e3 100644 --- a/backend/innercontext/api/authz.py +++ b/backend/innercontext/api/authz.py @@ -143,7 +143,11 @@ def can_update_inventory( return False if _is_admin(current_user): return True - return inventory.user_id == current_user.user_id + if inventory.user_id == current_user.user_id: + return True + if not inventory.is_household_shared or inventory.user_id is None: + return False + return _is_same_household(session, inventory.user_id, current_user) def is_product_visible( diff --git a/backend/innercontext/api/inventory.py b/backend/innercontext/api/inventory.py index 9d50034..6c3d797 100644 --- a/backend/innercontext/api/inventory.py +++ b/backend/innercontext/api/inventory.py @@ -1,19 +1,29 @@ from uuid import UUID -from fastapi import APIRouter, Depends +from fastapi import APIRouter, Depends, HTTPException from sqlmodel import Session from db import get_session +from innercontext.api.auth_deps import get_current_user +from innercontext.api.authz import ( + can_update_inventory, + check_household_inventory_access, +) from innercontext.api.products import InventoryUpdate -from innercontext.api.utils import get_or_404 +from innercontext.api.utils import get_or_404, get_owned_or_404_admin_override +from innercontext.auth import CurrentUser from innercontext.models import ProductInventory router = APIRouter() @router.get("/{inventory_id}", response_model=ProductInventory) -def get_inventory(inventory_id: UUID, session: Session = Depends(get_session)): - return get_or_404(session, ProductInventory, inventory_id) +def get_inventory( + inventory_id: UUID, + session: Session = Depends(get_session), + current_user: CurrentUser = Depends(get_current_user), +): + return check_household_inventory_access(session, inventory_id, current_user) @router.patch("/{inventory_id}", response_model=ProductInventory) @@ -21,7 +31,10 @@ def update_inventory( inventory_id: UUID, data: InventoryUpdate, session: Session = Depends(get_session), + current_user: CurrentUser = Depends(get_current_user), ): + if not can_update_inventory(session, inventory_id, current_user): + raise HTTPException(status_code=404, detail="ProductInventory not found") entry = get_or_404(session, ProductInventory, inventory_id) for key, value in data.model_dump(exclude_unset=True).items(): setattr(entry, key, value) @@ -32,7 +45,16 @@ def update_inventory( @router.delete("/{inventory_id}", status_code=204) -def delete_inventory(inventory_id: UUID, session: Session = Depends(get_session)): - entry = get_or_404(session, ProductInventory, inventory_id) +def delete_inventory( + inventory_id: UUID, + session: Session = Depends(get_session), + current_user: CurrentUser = Depends(get_current_user), +): + entry = get_owned_or_404_admin_override( + session, + ProductInventory, + inventory_id, + current_user, + ) session.delete(entry) session.commit() diff --git a/backend/innercontext/api/products.py b/backend/innercontext/api/products.py index 7391a42..b453c14 100644 --- a/backend/innercontext/api/products.py +++ b/backend/innercontext/api/products.py @@ -1,3 +1,5 @@ +# pyright: reportImportCycles=false, reportIncompatibleVariableOverride=false + import json import logging from datetime import date @@ -13,6 +15,8 @@ from sqlalchemy import select as sa_select from sqlmodel import Field, Session, SQLModel, col, select from db import get_session +from innercontext.api.auth_deps import get_current_user +from innercontext.api.authz import is_product_visible from innercontext.api.llm_context import build_user_profile_context from innercontext.api.product_llm_tools import ( PRODUCT_DETAILS_FUNCTION_DECLARATION, @@ -24,7 +28,8 @@ from innercontext.api.product_llm_tools import ( build_last_used_on_by_product, build_product_details_tool_handler, ) -from innercontext.api.utils import get_or_404 +from innercontext.api.utils import get_or_404, get_owned_or_404_admin_override +from innercontext.auth import CurrentUser from innercontext.llm import ( call_gemini, call_gemini_with_function_tools, @@ -42,6 +47,7 @@ from innercontext.models import ( SkinConcern, SkinConditionSnapshot, ) +from innercontext.models import Role from innercontext.models.ai_log import AICallLog from innercontext.models.api_metadata import ResponseMetadata, TokenMetrics from innercontext.models.enums import ( @@ -67,6 +73,34 @@ logger = logging.getLogger(__name__) router = APIRouter() +def _is_inventory_visible_to_user( + inventory: ProductInventory, + session: Session, + current_user: CurrentUser, +) -> bool: + if current_user.role is Role.ADMIN: + return True + if inventory.user_id == current_user.user_id: + return True + if not inventory.is_household_shared: + return False + if inventory.user_id is None: + return False + return is_product_visible(session, inventory.product_id, current_user) + + +def _visible_inventory_for_product( + inventories: list[ProductInventory], + session: Session, + current_user: CurrentUser, +) -> list[ProductInventory]: + return [ + inventory + for inventory in inventories + if _is_inventory_visible_to_user(inventory, session, current_user) + ] + + def _build_response_metadata(session: Session, log_id: Any) -> ResponseMetadata | None: """Build ResponseMetadata from AICallLog for Phase 3 observability.""" if not log_id: @@ -214,15 +248,15 @@ class ProductListItem(SQLModel): class AIActiveIngredient(ActiveIngredient): # Gemini API rejects int-enum values in response_schema; override with plain int. - strength_level: Optional[int] = None # type: ignore[assignment] - irritation_potential: Optional[int] = None # type: ignore[assignment] + strength_level: Optional[int] = None # pyright: ignore[reportIncompatibleVariableOverride] + irritation_potential: Optional[int] = None # pyright: ignore[reportIncompatibleVariableOverride] class ProductParseLLMResponse(ProductParseResponse): # Gemini response schema currently requires enum values to be strings. # Strength fields are numeric in our domain (1-3), so keep them as ints here # and convert via ProductParseResponse validation afterward. - actives: Optional[list[AIActiveIngredient]] = None # type: ignore[assignment] + actives: Optional[list[AIActiveIngredient]] = None # pyright: ignore[reportIncompatibleVariableOverride] class InventoryCreate(SQLModel): @@ -610,6 +644,7 @@ def list_products( is_medication: Optional[bool] = None, is_tool: Optional[bool] = None, session: Session = Depends(get_session), + current_user: CurrentUser = Depends(get_current_user), ): stmt = select(Product) if category is not None: @@ -622,6 +657,12 @@ def list_products( stmt = stmt.where(Product.is_tool == is_tool) products = list(session.exec(stmt).all()) + if current_user.role is not Role.ADMIN: + products = [ + product + for product in products + if is_product_visible(session, product.id, current_user) + ] # Filter by targets (JSON column — done in Python) if targets: @@ -646,20 +687,28 @@ def list_products( if product_ids else [] ) - inv_by_product: dict = {} + inv_by_product: dict[UUID, list[ProductInventory]] = {} for inv in inventory_rows: inv_by_product.setdefault(inv.product_id, []).append(inv) results = [] for p in products: r = ProductWithInventory.model_validate(p, from_attributes=True) - r.inventory = inv_by_product.get(p.id, []) + r.inventory = _visible_inventory_for_product( + inv_by_product.get(p.id, []), + session, + current_user, + ) results.append(r) return results @router.post("", response_model=ProductPublic, status_code=201) -def create_product(data: ProductCreate, session: Session = Depends(get_session)): +def create_product( + data: ProductCreate, + session: Session = Depends(get_session), + current_user: CurrentUser = Depends(get_current_user), +): payload = data.model_dump() if payload.get("price_currency"): payload["price_currency"] = str(payload["price_currency"]).upper() @@ -667,6 +716,7 @@ def create_product(data: ProductCreate, session: Session = Depends(get_session)) product_id = uuid4() product = Product( id=product_id, + user_id=current_user.user_id, short_id=str(product_id)[:8], **payload, ) @@ -849,10 +899,12 @@ def list_products_summary( is_medication: Optional[bool] = None, is_tool: Optional[bool] = None, session: Session = Depends(get_session), + current_user: CurrentUser = Depends(get_current_user), ): product_table = inspect(Product).local_table stmt = sa_select( product_table.c.id, + product_table.c.user_id, product_table.c.name, product_table.c.brand, product_table.c.category, @@ -872,6 +924,10 @@ def list_products_summary( stmt = stmt.where(product_table.c.is_tool == is_tool) rows = list(session.execute(stmt).all()) + if current_user.role is not Role.ADMIN: + rows = [ + row for row in rows if is_product_visible(session, row[0], current_user) + ] if targets: target_values = {t.value for t in targets} @@ -884,26 +940,11 @@ def list_products_summary( ) ] - product_ids = [row[0] for row in rows] - inventory_rows = ( - session.exec( - select(ProductInventory).where( - col(ProductInventory.product_id).in_(product_ids) - ) - ).all() - if product_ids - else [] - ) - owned_ids = { - inv.product_id - for inv in inventory_rows - if inv.product_id is not None and inv.finished_at is None - } - results: list[ProductListItem] = [] for row in rows: ( product_id, + product_user_id, name, brand_value, category_value, @@ -921,7 +962,7 @@ def list_products_summary( category=category_value, recommended_time=recommended_time, targets=row_targets or [], - is_owned=product_id in owned_ids, + is_owned=product_user_id == current_user.user_id, price_tier=price_tier, price_per_use_pln=price_per_use_pln, price_tier_source=price_tier_source, @@ -932,22 +973,35 @@ def list_products_summary( @router.get("/{product_id}", response_model=ProductWithInventory) -def get_product(product_id: UUID, session: Session = Depends(get_session)): +def get_product( + product_id: UUID, + session: Session = Depends(get_session), + current_user: CurrentUser = Depends(get_current_user), +): product = get_or_404(session, Product, product_id) + if not is_product_visible(session, product_id, current_user): + raise HTTPException(status_code=404, detail="Product not found") inventory = session.exec( select(ProductInventory).where(ProductInventory.product_id == product_id) ).all() result = ProductWithInventory.model_validate(product, from_attributes=True) - result.inventory = list(inventory) + result.inventory = _visible_inventory_for_product( + list(inventory), session, current_user + ) return result @router.patch("/{product_id}", response_model=ProductPublic) def update_product( - product_id: UUID, data: ProductUpdate, session: Session = Depends(get_session) + product_id: UUID, + data: ProductUpdate, + session: Session = Depends(get_session), + current_user: CurrentUser = Depends(get_current_user), ): - product = get_or_404(session, Product, product_id) + product = get_owned_or_404_admin_override( + session, Product, product_id, current_user + ) patch_data = data.model_dump(exclude_unset=True) if patch_data.get("price_currency"): patch_data["price_currency"] = str(patch_data["price_currency"]).upper() @@ -962,8 +1016,14 @@ def update_product( @router.delete("/{product_id}", status_code=204) -def delete_product(product_id: UUID, session: Session = Depends(get_session)): - product = get_or_404(session, Product, product_id) +def delete_product( + product_id: UUID, + session: Session = Depends(get_session), + current_user: CurrentUser = Depends(get_current_user), +): + product = get_owned_or_404_admin_override( + session, Product, product_id, current_user + ) session.delete(product) enqueue_pricing_recalc(session) session.commit() @@ -975,10 +1035,17 @@ def delete_product(product_id: UUID, session: Session = Depends(get_session)): @router.get("/{product_id}/inventory", response_model=list[ProductInventory]) -def list_product_inventory(product_id: UUID, session: Session = Depends(get_session)): +def list_product_inventory( + product_id: UUID, + session: Session = Depends(get_session), + current_user: CurrentUser = Depends(get_current_user), +): get_or_404(session, Product, product_id) + if not is_product_visible(session, product_id, current_user): + raise HTTPException(status_code=404, detail="Product not found") stmt = select(ProductInventory).where(ProductInventory.product_id == product_id) - return session.exec(stmt).all() + inventories = list(session.exec(stmt).all()) + return _visible_inventory_for_product(inventories, session, current_user) @router.post( @@ -988,10 +1055,14 @@ def create_product_inventory( product_id: UUID, data: InventoryCreate, session: Session = Depends(get_session), + current_user: CurrentUser = Depends(get_current_user), ): - get_or_404(session, Product, product_id) + product = get_owned_or_404_admin_override( + session, Product, product_id, current_user + ) entry = ProductInventory( id=uuid4(), + user_id=product.user_id or current_user.user_id, product_id=product_id, **data.model_dump(), ) @@ -1018,11 +1089,16 @@ def _ev(v: object) -> str: def _build_shopping_context( session: Session, reference_date: date, + current_user: CurrentUser, *, products: list[Product] | None = None, last_used_on_by_product: dict[str, date] | None = None, ) -> str: - profile_ctx = build_user_profile_context(session, reference_date=reference_date) + profile_ctx = build_user_profile_context( + session, + reference_date=reference_date, + current_user=current_user, + ) snapshot = session.exec( select(SkinConditionSnapshot).order_by( col(SkinConditionSnapshot.snapshot_date).desc() @@ -1061,7 +1137,7 @@ def _build_shopping_context( if product_ids else [] ) - inv_by_product: dict = {} + inv_by_product: dict[UUID, list[ProductInventory]] = {} for inv in inventory_rows: inv_by_product.setdefault(inv.product_id, []).append(inv) @@ -1213,7 +1289,10 @@ Format odpowiedzi - zwróć wyłącznie JSON zgodny z podanym schematem.""" @router.post("/suggest", response_model=ShoppingSuggestionResponse) -def suggest_shopping(session: Session = Depends(get_session)): +def suggest_shopping( + session: Session = Depends(get_session), + current_user: CurrentUser = Depends(get_current_user), +): reference_date = date.today() shopping_products = _get_shopping_products(session) last_used_on_by_product = build_last_used_on_by_product( @@ -1223,6 +1302,7 @@ def suggest_shopping(session: Session = Depends(get_session)): context = _build_shopping_context( session, reference_date=reference_date, + current_user=current_user, products=shopping_products, last_used_on_by_product=last_used_on_by_product, ) diff --git a/backend/tests/test_authz.py b/backend/tests/test_authz.py index d870678..34a59c0 100644 --- a/backend/tests/test_authz.py +++ b/backend/tests/test_authz.py @@ -246,7 +246,7 @@ def test_household_inventory_update_rules_owner_admin_and_member(session: Sessio assert can_update_inventory(session, inventory.id, owner) is True assert can_update_inventory(session, inventory.id, admin) is True - assert can_update_inventory(session, inventory.id, member) is False + assert can_update_inventory(session, inventory.id, member) is True def test_product_visibility_for_owner_admin_and_household_shared(session: Session): diff --git a/backend/tests/test_products_auth.py b/backend/tests/test_products_auth.py new file mode 100644 index 0000000..c5cc18a --- /dev/null +++ b/backend/tests/test_products_auth.py @@ -0,0 +1,370 @@ +from __future__ import annotations + +from datetime import UTC, datetime, timedelta +from uuid import UUID, uuid4 + +import pytest +from fastapi.testclient import TestClient + +from db import get_session +from innercontext.api.auth_deps import get_current_user +from innercontext.auth import ( + CurrentHouseholdMembership, + CurrentUser, + IdentityData, + TokenClaims, +) +from innercontext.models import ( + DayTime, + Household, + HouseholdMembership, + HouseholdRole, + Product, + ProductCategory, + ProductInventory, + Role, +) +from main import app + + +def _current_user( + user_id: UUID, + *, + role: Role = Role.MEMBER, + household_id: UUID | None = None, +) -> CurrentUser: + claims = TokenClaims( + issuer="https://auth.test", + subject=str(user_id), + audience=("innercontext-web",), + expires_at=datetime.now(UTC) + timedelta(hours=1), + groups=("innercontext-member",), + raw_claims={"iss": "https://auth.test", "sub": str(user_id)}, + ) + membership = None + if household_id is not None: + membership = CurrentHouseholdMembership( + household_id=household_id, + role=HouseholdRole.MEMBER, + ) + return CurrentUser( + user_id=user_id, + role=role, + identity=IdentityData.from_claims(claims), + claims=claims, + household_membership=membership, + ) + + +def _create_membership(session, user_id: UUID, household_id: UUID) -> None: + membership = HouseholdMembership( + user_id=user_id, + household_id=household_id, + role=HouseholdRole.MEMBER, + ) + session.add(membership) + session.commit() + + +def _create_product(session, *, user_id: UUID, short_id: str, name: str) -> Product: + product = Product( + user_id=user_id, + short_id=short_id, + name=name, + brand="Brand", + category=ProductCategory.SERUM, + recommended_time=DayTime.BOTH, + leave_on=True, + ) + setattr(product, "product_effect_profile", {}) + session.add(product) + session.commit() + session.refresh(product) + return product + + +def _create_inventory( + session, + *, + user_id: UUID, + product_id: UUID, + is_household_shared: bool, +) -> ProductInventory: + entry = ProductInventory( + user_id=user_id, + product_id=product_id, + is_household_shared=is_household_shared, + ) + session.add(entry) + session.commit() + session.refresh(entry) + return entry + + +@pytest.fixture() +def auth_client(session): + auth_state = {"current_user": _current_user(uuid4(), role=Role.ADMIN)} + + def _session_override(): + yield session + + def _current_user_override(): + return auth_state["current_user"] + + app.dependency_overrides[get_session] = _session_override + app.dependency_overrides[get_current_user] = _current_user_override + with TestClient(app) as client: + yield client, auth_state + app.dependency_overrides.clear() + + +def test_product_endpoints_require_authentication(session): + def _session_override(): + yield session + + app.dependency_overrides[get_session] = _session_override + app.dependency_overrides.pop(get_current_user, None) + with TestClient(app) as client: + response = client.get("/products") + app.dependency_overrides.clear() + + assert response.status_code == 401 + assert response.json()["detail"] == "Missing bearer token" + + +def test_shared_product_visible_in_summary_marks_is_owned_false(auth_client, session): + client, auth_state = auth_client + + owner_id = uuid4() + member_id = uuid4() + household = Household() + session.add(household) + session.commit() + session.refresh(household) + _create_membership(session, owner_id, household.id) + _create_membership(session, member_id, household.id) + + shared_product = _create_product( + session, + user_id=owner_id, + short_id="shprd001", + name="Shared Product", + ) + _ = _create_inventory( + session, + user_id=owner_id, + product_id=shared_product.id, + is_household_shared=True, + ) + + auth_state["current_user"] = _current_user(member_id, household_id=household.id) + response = client.get("/products/summary") + + assert response.status_code == 200 + items = response.json() + shared_item = next(item for item in items if item["id"] == str(shared_product.id)) + assert shared_item["is_owned"] is False + + +def test_shared_product_visible_filters_private_inventory_rows(auth_client, session): + client, auth_state = auth_client + + owner_id = uuid4() + member_id = uuid4() + household = Household() + session.add(household) + session.commit() + session.refresh(household) + _create_membership(session, owner_id, household.id) + _create_membership(session, member_id, household.id) + + product = _create_product( + session, + user_id=owner_id, + short_id="shprd002", + name="Shared Inventory Product", + ) + shared_row = _create_inventory( + session, + user_id=owner_id, + product_id=product.id, + is_household_shared=True, + ) + _ = _create_inventory( + session, + user_id=owner_id, + product_id=product.id, + is_household_shared=False, + ) + + auth_state["current_user"] = _current_user(member_id, household_id=household.id) + response = client.get(f"/products/{product.id}") + + assert response.status_code == 200 + inventory_ids = {entry["id"] for entry in response.json()["inventory"]} + assert str(shared_row.id) in inventory_ids + assert len(inventory_ids) == 1 + + +def test_shared_inventory_update_allows_household_member(auth_client, session): + client, auth_state = auth_client + + owner_id = uuid4() + member_id = uuid4() + household = Household() + session.add(household) + session.commit() + session.refresh(household) + _create_membership(session, owner_id, household.id) + _create_membership(session, member_id, household.id) + + product = _create_product( + session, + user_id=owner_id, + short_id="shprd003", + name="Shared Update Product", + ) + inventory = _create_inventory( + session, + user_id=owner_id, + product_id=product.id, + is_household_shared=True, + ) + + auth_state["current_user"] = _current_user(member_id, household_id=household.id) + response = client.patch( + f"/inventory/{inventory.id}", + json={"is_opened": True, "remaining_level": "low"}, + ) + + assert response.status_code == 200 + assert response.json()["is_opened"] is True + assert response.json()["remaining_level"] == "low" + + +def test_household_member_cannot_edit_shared_product(auth_client, session): + client, auth_state = auth_client + + owner_id = uuid4() + member_id = uuid4() + household = Household() + session.add(household) + session.commit() + session.refresh(household) + _create_membership(session, owner_id, household.id) + _create_membership(session, member_id, household.id) + + product = _create_product( + session, + user_id=owner_id, + short_id="shprd004", + name="Shared No Edit", + ) + _ = _create_inventory( + session, + user_id=owner_id, + product_id=product.id, + is_household_shared=True, + ) + + auth_state["current_user"] = _current_user(member_id, household_id=household.id) + response = client.patch(f"/products/{product.id}", json={"name": "Intrusion"}) + + assert response.status_code == 404 + + +def test_household_member_cannot_delete_shared_product(auth_client, session): + client, auth_state = auth_client + + owner_id = uuid4() + member_id = uuid4() + household = Household() + session.add(household) + session.commit() + session.refresh(household) + _create_membership(session, owner_id, household.id) + _create_membership(session, member_id, household.id) + + product = _create_product( + session, + user_id=owner_id, + short_id="shprd005", + name="Shared No Delete", + ) + _ = _create_inventory( + session, + user_id=owner_id, + product_id=product.id, + is_household_shared=True, + ) + + auth_state["current_user"] = _current_user(member_id, household_id=household.id) + response = client.delete(f"/products/{product.id}") + + assert response.status_code == 404 + + +def test_household_member_cannot_create_or_delete_inventory_on_shared_product( + auth_client, session +): + client, auth_state = auth_client + + owner_id = uuid4() + member_id = uuid4() + household = Household() + session.add(household) + session.commit() + session.refresh(household) + _create_membership(session, owner_id, household.id) + _create_membership(session, member_id, household.id) + + product = _create_product( + session, + user_id=owner_id, + short_id="shprd006", + name="Shared Inventory Restrictions", + ) + inventory = _create_inventory( + session, + user_id=owner_id, + product_id=product.id, + is_household_shared=True, + ) + + auth_state["current_user"] = _current_user(member_id, household_id=household.id) + create_response = client.post(f"/products/{product.id}/inventory", json={}) + delete_response = client.delete(f"/inventory/{inventory.id}") + + assert create_response.status_code == 404 + assert delete_response.status_code == 404 + + +def test_household_member_cannot_update_non_shared_inventory(auth_client, session): + client, auth_state = auth_client + + owner_id = uuid4() + member_id = uuid4() + household = Household() + session.add(household) + session.commit() + session.refresh(household) + _create_membership(session, owner_id, household.id) + _create_membership(session, member_id, household.id) + + product = _create_product( + session, + user_id=owner_id, + short_id="shprd007", + name="Private Inventory", + ) + inventory = _create_inventory( + session, + user_id=owner_id, + product_id=product.id, + is_household_shared=False, + ) + + auth_state["current_user"] = _current_user(member_id, household_id=household.id) + response = client.patch(f"/inventory/{inventory.id}", json={"is_opened": True}) + + assert response.status_code == 404