feat(products): compute price tiers from objective price/use

This commit is contained in:
Piotr Oleszczyk 2026-03-04 14:47:18 +01:00
parent c5ea38880c
commit 83ba4cc5c0
13 changed files with 664 additions and 48 deletions

View file

@ -0,0 +1,45 @@
"""replace_price_tier_with_objective_price_fields
Revision ID: 7c91e4b2af38
Revises: e4f5a6b7c8d9
Create Date: 2026-03-04 00:00:00.000000
"""
from typing import Sequence, Union
import sqlalchemy as sa
from alembic import op
revision: str = "7c91e4b2af38"
down_revision: Union[str, None] = "e4f5a6b7c8d9"
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
op.add_column("products", sa.Column("price_amount", sa.Float(), nullable=True))
op.add_column(
"products", sa.Column("price_currency", sa.String(length=3), nullable=True)
)
op.drop_index(op.f("ix_products_price_tier"), table_name="products")
op.drop_column("products", "price_tier")
op.execute("DROP TYPE IF EXISTS pricetier")
def downgrade() -> None:
op.execute("CREATE TYPE pricetier AS ENUM ('BUDGET', 'MID', 'PREMIUM', 'LUXURY')")
op.add_column(
"products",
sa.Column(
"price_tier",
sa.Enum("BUDGET", "MID", "PREMIUM", "LUXURY", name="pricetier"),
nullable=True,
),
)
op.create_index(
op.f("ix_products_price_tier"), "products", ["price_tier"], unique=False
)
op.drop_column("products", "price_currency")
op.drop_column("products", "price_amount")

View file

@ -1,6 +1,6 @@
import json
from datetime import date
from typing import Optional
from typing import Literal, Optional
from uuid import UUID, uuid4
from fastapi import APIRouter, Depends, HTTPException, Query
@ -17,6 +17,7 @@ from innercontext.llm import (
get_creative_config,
get_extraction_config,
)
from innercontext.services.fx import convert_to_pln
from innercontext.models import (
Product,
ProductBase,
@ -67,8 +68,11 @@ class ProductUpdate(SQLModel):
absorption_speed: Optional[AbsorptionSpeed] = None
leave_on: Optional[bool] = None
price_tier: Optional[PriceTier] = None
price_amount: Optional[float] = None
price_currency: Optional[str] = None
size_ml: Optional[float] = None
full_weight_g: Optional[float] = None
empty_weight_g: Optional[float] = None
pao_months: Optional[int] = None
inci: Optional[list[str]] = None
@ -119,7 +123,8 @@ class ProductParseResponse(SQLModel):
texture: Optional[TextureType] = None
absorption_speed: Optional[AbsorptionSpeed] = None
leave_on: Optional[bool] = None
price_tier: Optional[PriceTier] = None
price_amount: Optional[float] = None
price_currency: Optional[str] = None
size_ml: Optional[float] = None
full_weight_g: Optional[float] = None
empty_weight_g: Optional[float] = None
@ -213,6 +218,188 @@ class _ShoppingSuggestionsOut(PydanticBase):
reasoning: str
# ---------------------------------------------------------------------------
# Pricing helpers
# ---------------------------------------------------------------------------
_MIN_PRODUCTS_FOR_PRICE_TIER = 8
_MIN_CATEGORY_SIZE_FOR_FALLBACK = 4
_ESTIMATED_AMOUNT_PER_USE: dict[ProductCategory, float] = {
ProductCategory.CLEANSER: 1.5,
ProductCategory.TONER: 1.5,
ProductCategory.ESSENCE: 1.0,
ProductCategory.SERUM: 0.35,
ProductCategory.MOISTURIZER: 0.8,
ProductCategory.SPF: 1.2,
ProductCategory.MASK: 2.5,
ProductCategory.EXFOLIANT: 0.7,
ProductCategory.HAIR_TREATMENT: 1.0,
ProductCategory.SPOT_TREATMENT: 0.1,
ProductCategory.OIL: 0.35,
}
def _estimated_amount_per_use(category: ProductCategory) -> float | None:
return _ESTIMATED_AMOUNT_PER_USE.get(category)
def _net_weight_g(product: Product) -> float | None:
if product.full_weight_g is None or product.empty_weight_g is None:
return None
net = product.full_weight_g - product.empty_weight_g
if net <= 0:
return None
return net
def _price_per_use_pln(product: Product) -> float | None:
if product.price_amount is None or product.price_currency is None:
return None
amount_per_use = _estimated_amount_per_use(product.category)
if amount_per_use is None or amount_per_use <= 0:
return None
pack_amount = product.size_ml
if pack_amount is None or pack_amount <= 0:
pack_amount = _net_weight_g(product)
if pack_amount is None or pack_amount <= 0:
return None
uses_per_pack = pack_amount / amount_per_use
if uses_per_pack <= 0:
return None
price_pln = convert_to_pln(product.price_amount, product.price_currency.upper())
if price_pln is None:
return None
return price_pln / uses_per_pack
def _percentile(sorted_values: list[float], fraction: float) -> float:
if not sorted_values:
raise ValueError("sorted_values cannot be empty")
if len(sorted_values) == 1:
return sorted_values[0]
position = (len(sorted_values) - 1) * fraction
low = int(position)
high = min(low + 1, len(sorted_values) - 1)
weight = position - low
return sorted_values[low] * (1 - weight) + sorted_values[high] * weight
def _tier_from_thresholds(
value: float,
*,
p25: float,
p50: float,
p75: float,
) -> PriceTier:
if value <= p25:
return PriceTier.BUDGET
if value <= p50:
return PriceTier.MID
if value <= p75:
return PriceTier.PREMIUM
return PriceTier.LUXURY
def _thresholds(values: list[float]) -> tuple[float, float, float]:
sorted_vals = sorted(values)
return (
_percentile(sorted_vals, 0.25),
_percentile(sorted_vals, 0.50),
_percentile(sorted_vals, 0.75),
)
def _compute_pricing_outputs(
products: list[Product],
) -> dict[
UUID,
tuple[
PriceTier | None,
float | None,
Literal["category", "fallback", "insufficient_data"] | None,
],
]:
price_per_use_by_id: dict[UUID, float] = {}
grouped: dict[ProductCategory, list[tuple[UUID, float]]] = {}
for product in products:
ppu = _price_per_use_pln(product)
if ppu is None:
continue
price_per_use_by_id[product.id] = ppu
grouped.setdefault(product.category, []).append((product.id, ppu))
outputs: dict[
UUID,
tuple[
PriceTier | None,
float | None,
Literal["category", "fallback", "insufficient_data"] | None,
],
] = {
p.id: (
None,
price_per_use_by_id.get(p.id),
"insufficient_data" if p.id in price_per_use_by_id else None,
)
for p in products
}
fallback_rows: list[tuple[UUID, float]] = []
for product in products:
ppu = price_per_use_by_id.get(product.id)
if ppu is None:
continue
if product.is_tool or product.is_medication or not product.leave_on:
continue
fallback_rows.append((product.id, ppu))
fallback_thresholds: tuple[float, float, float] | None = None
if len(fallback_rows) >= _MIN_PRODUCTS_FOR_PRICE_TIER:
fallback_thresholds = _thresholds([ppu for _, ppu in fallback_rows])
for category_rows in grouped.values():
if len(category_rows) < _MIN_PRODUCTS_FOR_PRICE_TIER:
if (
len(category_rows) >= _MIN_CATEGORY_SIZE_FOR_FALLBACK
and fallback_thresholds is not None
):
p25, p50, p75 = fallback_thresholds
for product_id, ppu in category_rows:
tier = _tier_from_thresholds(ppu, p25=p25, p50=p50, p75=p75)
outputs[product_id] = (tier, ppu, "fallback")
continue
p25, p50, p75 = _thresholds([ppu for _, ppu in category_rows])
for product_id, ppu in category_rows:
tier = _tier_from_thresholds(ppu, p25=p25, p50=p50, p75=p75)
outputs[product_id] = (tier, ppu, "category")
return outputs
def _with_pricing(
view: ProductPublic,
pricing: tuple[
PriceTier | None,
float | None,
Literal["category", "fallback", "insufficient_data"] | None,
],
) -> ProductPublic:
price_tier, price_per_use_pln, price_tier_source = pricing
view.price_tier = price_tier
view.price_per_use_pln = price_per_use_pln
view.price_tier_source = price_tier_source
return view
# ---------------------------------------------------------------------------
# Product routes
# ---------------------------------------------------------------------------
@ -267,8 +454,12 @@ def list_products(
inv_by_product.setdefault(inv.product_id, []).append(inv)
results = []
pricing_pool = list(session.exec(select(Product)).all()) if products else []
pricing_outputs = _compute_pricing_outputs(pricing_pool)
for p in products:
r = ProductWithInventory.model_validate(p, from_attributes=True)
_with_pricing(r, pricing_outputs.get(p.id, (None, None, None)))
r.inventory = inv_by_product.get(p.id, [])
results.append(r)
return results
@ -276,9 +467,13 @@ def list_products(
@router.post("", response_model=ProductPublic, status_code=201)
def create_product(data: ProductCreate, session: Session = Depends(get_session)):
payload = data.model_dump()
if payload.get("price_currency"):
payload["price_currency"] = str(payload["price_currency"]).upper()
product = Product(
id=uuid4(),
**data.model_dump(),
**payload,
)
session.add(product)
session.commit()
@ -324,8 +519,6 @@ texture: "watery" | "gel" | "emulsion" | "cream" | "oil" | "balm" | "foam" | "fl
absorption_speed: "very_fast" | "fast" | "moderate" | "slow" | "very_slow"
price_tier: "budget" | "mid" | "premium" | "luxury"
recommended_for (array, pick applicable):
"dry" | "oily" | "combination" | "sensitive" | "normal" | "acne_prone"
@ -355,7 +548,8 @@ OUTPUT SCHEMA (all fields optional — omit what you cannot determine):
"texture": string,
"absorption_speed": string,
"leave_on": boolean,
"price_tier": string,
"price_amount": number,
"price_currency": string,
"size_ml": number,
"full_weight_g": number,
"empty_weight_g": number,
@ -440,10 +634,14 @@ def parse_product_text(data: ProductParseRequest) -> ProductParseResponse:
@router.get("/{product_id}", response_model=ProductWithInventory)
def get_product(product_id: UUID, session: Session = Depends(get_session)):
product = get_or_404(session, Product, product_id)
pricing_pool = list(session.exec(select(Product)).all())
pricing_outputs = _compute_pricing_outputs(pricing_pool)
inventory = session.exec(
select(ProductInventory).where(ProductInventory.product_id == product_id)
).all()
result = ProductWithInventory.model_validate(product, from_attributes=True)
_with_pricing(result, pricing_outputs.get(product.id, (None, None, None)))
result.inventory = list(inventory)
return result
@ -453,12 +651,19 @@ def update_product(
product_id: UUID, data: ProductUpdate, session: Session = Depends(get_session)
):
product = get_or_404(session, Product, product_id)
for key, value in data.model_dump(exclude_unset=True).items():
patch_data = data.model_dump(exclude_unset=True)
if patch_data.get("price_currency"):
patch_data["price_currency"] = str(patch_data["price_currency"]).upper()
for key, value in patch_data.items():
setattr(product, key, value)
session.add(product)
session.commit()
session.refresh(product)
return product
pricing_pool = list(session.exec(select(Product)).all())
pricing_outputs = _compute_pricing_outputs(pricing_pool)
result = ProductPublic.model_validate(product, from_attributes=True)
return _with_pricing(result, pricing_outputs.get(product.id, (None, None, None)))
@router.delete("/{product_id}", status_code=204)

View file

@ -94,7 +94,8 @@ class ProductBase(SQLModel):
absorption_speed: AbsorptionSpeed | None = None
leave_on: bool
price_tier: PriceTier | None = None
price_amount: float | None = Field(default=None, gt=0)
price_currency: str | None = Field(default=None, min_length=3, max_length=3)
size_ml: float | None = Field(default=None, gt=0)
full_weight_g: float | None = Field(default=None, gt=0)
empty_weight_g: float | None = Field(default=None, gt=0)
@ -145,9 +146,6 @@ class Product(ProductBase, table=True):
id: UUID = Field(default_factory=uuid4, primary_key=True)
# Override: add index for table context
price_tier: PriceTier | None = Field(default=None, index=True)
# Override 9 JSON fields with sa_column (only in table model)
inci: list[str] = Field(
default_factory=list, sa_column=Column(JSON, nullable=False)
@ -217,9 +215,17 @@ class Product(ProductBase, table=True):
if self.is_medication and not self.usage_notes:
raise ValueError("Medication products must have usage_notes")
if self.price_currency is not None:
self.price_currency = self.price_currency.upper()
return self
def to_llm_context(self) -> dict:
def to_llm_context(
self,
*,
computed_price_tier: PriceTier | None = None,
price_per_use_pln: float | None = None,
) -> dict:
ctx: dict = {
"id": str(self.id),
"name": self.name,
@ -238,8 +244,14 @@ class Product(ProductBase, table=True):
ctx["texture"] = _ev(self.texture)
if self.absorption_speed is not None:
ctx["absorption_speed"] = _ev(self.absorption_speed)
if self.price_tier is not None:
ctx["price_tier"] = _ev(self.price_tier)
if self.price_amount is not None:
ctx["price_amount"] = self.price_amount
if self.price_currency is not None:
ctx["price_currency"] = self.price_currency
if computed_price_tier is not None:
ctx["price_tier"] = _ev(computed_price_tier)
if price_per_use_pln is not None:
ctx["price_per_use_pln"] = round(price_per_use_pln, 4)
if self.size_ml is not None:
ctx["size_ml"] = self.size_ml
if self.pao_months is not None:
@ -370,6 +382,9 @@ class ProductPublic(ProductBase):
id: UUID
created_at: datetime
updated_at: datetime
price_tier: PriceTier | None = None
price_per_use_pln: float | None = None
price_tier_source: str | None = None
class ProductWithInventory(ProductPublic):

View file

@ -0,0 +1,77 @@
import json
from datetime import datetime, timedelta, timezone
from threading import Lock
from urllib.error import URLError
from urllib.request import Request, urlopen
NBP_TABLE_A_URL = "https://api.nbp.pl/api/exchangerates/tables/A"
_CACHE_TTL = timedelta(hours=24)
_cache_lock = Lock()
_cached_rates: dict[str, float] | None = None
_cached_at: datetime | None = None
def _now_utc() -> datetime:
return datetime.now(timezone.utc)
def _cache_is_fresh() -> bool:
if _cached_rates is None or _cached_at is None:
return False
return _now_utc() - _cached_at < _CACHE_TTL
def _fetch_rates_from_nbp() -> dict[str, float]:
req = Request(NBP_TABLE_A_URL, headers={"Accept": "application/json"})
with urlopen(req, timeout=10) as response:
payload = json.loads(response.read().decode("utf-8"))
if not isinstance(payload, list) or not payload:
raise ValueError("Unexpected NBP payload")
table = payload[0]
rates = table.get("rates") if isinstance(table, dict) else None
if not isinstance(rates, list):
raise ValueError("NBP payload does not include rates")
parsed: dict[str, float] = {"PLN": 1.0}
for row in rates:
if not isinstance(row, dict):
continue
code = row.get("code")
mid = row.get("mid")
if not isinstance(code, str) or not isinstance(mid, (int, float)):
continue
parsed[code.upper()] = float(mid)
return parsed
def get_pln_rates() -> dict[str, float]:
global _cached_rates, _cached_at
with _cache_lock:
if _cache_is_fresh() and _cached_rates is not None:
return dict(_cached_rates)
try:
fresh = _fetch_rates_from_nbp()
except (URLError, TimeoutError, ValueError):
if _cached_rates is not None:
return dict(_cached_rates)
return {"PLN": 1.0}
_cached_rates = fresh
_cached_at = _now_utc()
return dict(fresh)
def convert_to_pln(amount: float, currency: str) -> float | None:
if amount <= 0:
return None
rates = get_pln_rates()
rate = rates.get(currency.upper())
if rate is None:
return None
return amount * rate

View file

@ -0,0 +1,177 @@
import uuid
from innercontext.api import products as products_api
from innercontext.models import Product
from innercontext.models.enums import DayTime, ProductCategory
def _product(
*, category: ProductCategory, price_amount: float, size_ml: float
) -> Product:
return Product(
id=uuid.uuid4(),
name=f"{category}-{price_amount}",
brand="Brand",
category=category,
recommended_time=DayTime.BOTH,
leave_on=True,
price_amount=price_amount,
price_currency="PLN",
size_ml=size_ml,
)
def test_compute_pricing_outputs_groups_by_category(monkeypatch):
monkeypatch.setattr(products_api, "convert_to_pln", lambda amount, currency: amount)
serums = [
_product(category=ProductCategory.SERUM, price_amount=float(i), size_ml=35.0)
for i in range(10, 90, 10)
]
cleansers = [
_product(
category=ProductCategory.CLEANSER, price_amount=float(i), size_ml=200.0
)
for i in range(40, 120, 10)
]
outputs = products_api._compute_pricing_outputs(serums + cleansers)
serum_tiers = [outputs[p.id][0] for p in serums]
cleanser_tiers = [outputs[p.id][0] for p in cleansers]
assert serum_tiers[0] == "budget"
assert serum_tiers[-1] == "luxury"
assert cleanser_tiers[0] == "budget"
assert cleanser_tiers[-1] == "luxury"
def test_price_tier_is_null_when_not_enough_products(client, monkeypatch):
monkeypatch.setattr(products_api, "convert_to_pln", lambda amount, currency: amount)
base = {
"brand": "B",
"recommended_time": "both",
"leave_on": True,
"size_ml": 35.0,
"price_currency": "PLN",
}
for i in range(7):
response = client.post(
"/products",
json={
**base,
"name": f"Serum {i}",
"category": "serum",
"price_amount": 30 + i,
},
)
assert response.status_code == 201
products = client.get("/products").json()
assert len(products) == 7
assert all(p["price_tier"] is None for p in products)
assert all(p["price_per_use_pln"] is not None for p in products)
def test_price_tier_is_computed_on_list(client, monkeypatch):
monkeypatch.setattr(products_api, "convert_to_pln", lambda amount, currency: amount)
base = {
"brand": "B",
"recommended_time": "both",
"leave_on": True,
"size_ml": 35.0,
"price_currency": "PLN",
"category": "serum",
}
for i in range(8):
response = client.post(
"/products",
json={**base, "name": f"Serum {i}", "price_amount": 20 + i * 10},
)
assert response.status_code == 201
products = client.get("/products").json()
assert len(products) == 8
assert any(p["price_tier"] == "budget" for p in products)
assert any(p["price_tier"] == "luxury" for p in products)
def test_price_tier_uses_fallback_for_medium_categories(client, monkeypatch):
monkeypatch.setattr(products_api, "convert_to_pln", lambda amount, currency: amount)
serum_base = {
"brand": "B",
"recommended_time": "both",
"leave_on": True,
"size_ml": 35.0,
"price_currency": "PLN",
"category": "serum",
}
for i in range(8):
response = client.post(
"/products",
json={**serum_base, "name": f"Serum {i}", "price_amount": 20 + i * 5},
)
assert response.status_code == 201
toner_base = {
"brand": "B",
"recommended_time": "both",
"leave_on": True,
"size_ml": 100.0,
"price_currency": "PLN",
"category": "toner",
}
for i in range(5):
response = client.post(
"/products",
json={**toner_base, "name": f"Toner {i}", "price_amount": 10 + i * 5},
)
assert response.status_code == 201
products = client.get("/products?category=toner").json()
assert len(products) == 5
assert all(p["price_tier"] is not None for p in products)
assert all(p["price_tier_source"] == "fallback" for p in products)
def test_price_tier_stays_null_for_tiny_categories_even_with_fallback_pool(
client, monkeypatch
):
monkeypatch.setattr(products_api, "convert_to_pln", lambda amount, currency: amount)
serum_base = {
"brand": "B",
"recommended_time": "both",
"leave_on": True,
"size_ml": 35.0,
"price_currency": "PLN",
"category": "serum",
}
for i in range(8):
response = client.post(
"/products",
json={**serum_base, "name": f"Serum {i}", "price_amount": 20 + i * 5},
)
assert response.status_code == 201
oil_base = {
"brand": "B",
"recommended_time": "both",
"leave_on": True,
"size_ml": 30.0,
"price_currency": "PLN",
"category": "oil",
}
for i in range(3):
response = client.post(
"/products",
json={**oil_base, "name": f"Oil {i}", "price_amount": 30 + i * 10},
)
assert response.status_code == 201
oils = client.get("/products?category=oil").json()
assert len(oils) == 3
assert all(p["price_tier"] is None for p in oils)
assert all(p["price_tier_source"] == "insufficient_data" for p in oils)