innercontext/backend/innercontext/api/products.py
Piotr Oleszczyk c8fa80be99 fix(api): rename 'metadata' to 'response_metadata' to avoid Pydantic conflict
The field name 'metadata' conflicts with Pydantic's internal ClassVar.
Renamed to 'response_metadata' throughout:
- Backend: RoutineSuggestion, BatchSuggestion, ShoppingSuggestionResponse
- Frontend: TypeScript types and component usages

This fixes the AttributeError when setting metadata on SQLModel instances.
2026-03-06 16:16:35 +01:00

1125 lines
38 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import json
import logging
from datetime import date
from typing import Any, Literal, Optional
from uuid import UUID, uuid4
from fastapi import APIRouter, Depends, HTTPException, Query
from google.genai import types as genai_types
from pydantic import BaseModel as PydanticBase
from pydantic import ValidationError
from sqlalchemy import inspect
from sqlalchemy import select as sa_select
from sqlmodel import Field, Session, SQLModel, col, select
from db import get_session
from innercontext.api.llm_context import build_user_profile_context
from innercontext.api.product_llm_tools import (
PRODUCT_DETAILS_FUNCTION_DECLARATION,
)
from innercontext.api.product_llm_tools import (
_extract_requested_product_ids as _shared_extract_requested_product_ids,
)
from innercontext.api.product_llm_tools import (
build_last_used_on_by_product,
build_product_details_tool_handler,
)
from innercontext.api.utils import get_or_404
from innercontext.llm import (
call_gemini,
call_gemini_with_function_tools,
get_creative_config,
get_extraction_config,
)
from innercontext.llm_safety import sanitize_user_input
from innercontext.models import (
Product,
ProductBase,
ProductCategory,
ProductInventory,
ProductPublic,
ProductWithInventory,
SkinConcern,
SkinConditionSnapshot,
)
from innercontext.models.ai_log import AICallLog
from innercontext.models.api_metadata import ResponseMetadata, TokenMetrics
from innercontext.models.enums import (
AbsorptionSpeed,
DayTime,
PriceTier,
SkinType,
TextureType,
)
from innercontext.models.product import (
ActiveIngredient,
ProductContext,
ProductEffectProfile,
)
from innercontext.services.fx import convert_to_pln
from innercontext.services.pricing_jobs import enqueue_pricing_recalc
from innercontext.validators import ProductParseValidator, ShoppingValidator
from innercontext.validators.shopping_validator import ShoppingValidationContext
logger = logging.getLogger(__name__)
router = APIRouter()
def _build_response_metadata(session: Session, log_id: Any) -> ResponseMetadata | None:
    """Assemble observability metadata for a response from its AI call log.

    Args:
        session: Database session used to look up the ``AICallLog`` row.
        log_id: Primary key of the log entry; any falsy value means "no log".

    Returns:
        A ``ResponseMetadata`` built from the log row, or ``None`` when
        ``log_id`` is falsy or no matching row exists.
    """
    log_entry = session.get(AICallLog, log_id) if log_id else None
    if not log_entry:
        return None

    # Token metrics are reported only when all three mandatory counters are
    # present; thoughts_tokens is passed through as-is.
    mandatory_counts = (
        log_entry.prompt_tokens,
        log_entry.completion_tokens,
        log_entry.total_tokens,
    )
    if any(count is None for count in mandatory_counts):
        metrics = None
    else:
        metrics = TokenMetrics(
            prompt_tokens=log_entry.prompt_tokens,
            completion_tokens=log_entry.completion_tokens,
            thoughts_tokens=log_entry.thoughts_tokens,
            total_tokens=log_entry.total_tokens,
        )

    return ResponseMetadata(
        model_used=log_entry.model,
        duration_ms=log_entry.duration_ms or 0,  # missing duration reported as 0
        reasoning_chain=log_entry.reasoning_chain,
        token_metrics=metrics,
    )
# How a product's price tier was derived: from thresholds of its own category
# ("category"), from the cross-category pool ("fallback"), or not at all because
# too few priced products were available ("insufficient_data").
PricingSource = Literal["category", "fallback", "insufficient_data"]
# (price tier, price per use in PLN, tier source) for a single product; every
# element is None when no price per use could be computed.
PricingOutput = tuple[PriceTier | None, float | None, PricingSource | None]
# Pricing results keyed by product id.
PricingOutputs = dict[UUID, PricingOutput]
# ---------------------------------------------------------------------------
# Request / response schemas
# ---------------------------------------------------------------------------
# Request body for creating a product (POST ""): exactly the ProductBase fields,
# with nothing added or overridden.
class ProductCreate(ProductBase):
    pass
# Partial-update (PATCH) payload for a product. Every field is optional and
# defaults to None; update_product dumps the model with exclude_unset=True, so
# only the fields the client actually sent are written to the stored product.
class ProductUpdate(SQLModel):
    name: Optional[str] = None
    brand: Optional[str] = None
    line_name: Optional[str] = None
    sku: Optional[str] = None
    url: Optional[str] = None
    barcode: Optional[str] = None
    category: Optional[ProductCategory] = None
    recommended_time: Optional[DayTime] = None
    texture: Optional[TextureType] = None
    absorption_speed: Optional[AbsorptionSpeed] = None
    leave_on: Optional[bool] = None
    # update_product upper-cases price_currency before storing it.
    price_amount: Optional[float] = None
    price_currency: Optional[str] = None
    size_ml: Optional[float] = None
    full_weight_g: Optional[float] = None
    empty_weight_g: Optional[float] = None
    pao_months: Optional[int] = None
    inci: Optional[list[str]] = None
    actives: Optional[list[ActiveIngredient]] = None
    recommended_for: Optional[list[SkinType]] = None
    targets: Optional[list[SkinConcern]] = None
    fragrance_free: Optional[bool] = None
    essential_oils_free: Optional[bool] = None
    alcohol_denat_free: Optional[bool] = None
    pregnancy_safe: Optional[bool] = None
    product_effect_profile: Optional[ProductEffectProfile] = None
    ph_min: Optional[float] = None
    ph_max: Optional[float] = None
    context_rules: Optional[ProductContext] = None
    min_interval_hours: Optional[int] = None
    max_frequency_per_week: Optional[int] = None
    is_medication: Optional[bool] = None
    is_tool: Optional[bool] = None
    needle_length_mm: Optional[float] = None
    # The two personal_* fields exist here but not on ProductParseResponse.
    personal_tolerance_notes: Optional[str] = None
    personal_repurchase_intent: Optional[bool] = None
# Request body for POST /parse-text.
class ProductParseRequest(SQLModel):
    # Raw text to extract product data from; parse_product_text sanitizes it
    # with max_length=10000 before sending it to the LLM.
    text: str
# Response of POST /parse-text: product fields extracted from free text.
# Every field is optional and defaults to None, so anything the LLM omitted
# simply stays unset. The field list matches ProductUpdate minus its two
# personal_* fields.
class ProductParseResponse(SQLModel):
    name: Optional[str] = None
    brand: Optional[str] = None
    line_name: Optional[str] = None
    sku: Optional[str] = None
    url: Optional[str] = None
    barcode: Optional[str] = None
    category: Optional[ProductCategory] = None
    recommended_time: Optional[DayTime] = None
    texture: Optional[TextureType] = None
    absorption_speed: Optional[AbsorptionSpeed] = None
    leave_on: Optional[bool] = None
    price_amount: Optional[float] = None
    price_currency: Optional[str] = None
    size_ml: Optional[float] = None
    full_weight_g: Optional[float] = None
    empty_weight_g: Optional[float] = None
    pao_months: Optional[int] = None
    inci: Optional[list[str]] = None
    # Overridden with a looser element type in ProductParseLLMResponse.
    actives: Optional[list[ActiveIngredient]] = None
    recommended_for: Optional[list[SkinType]] = None
    targets: Optional[list[SkinConcern]] = None
    fragrance_free: Optional[bool] = None
    essential_oils_free: Optional[bool] = None
    alcohol_denat_free: Optional[bool] = None
    pregnancy_safe: Optional[bool] = None
    product_effect_profile: Optional[ProductEffectProfile] = None
    ph_min: Optional[float] = None
    ph_max: Optional[float] = None
    context_rules: Optional[ProductContext] = None
    min_interval_hours: Optional[int] = None
    max_frequency_per_week: Optional[int] = None
    is_medication: Optional[bool] = None
    is_tool: Optional[bool] = None
    needle_length_mm: Optional[float] = None
# One row of the lightweight listing returned by GET /summary.
class ProductListItem(SQLModel):
    id: UUID
    name: str
    brand: str
    category: ProductCategory
    recommended_time: DayTime
    targets: list[SkinConcern] = Field(default_factory=list)
    # Set by list_products_summary: True when the product has at least one
    # inventory row whose finished_at is None.
    is_owned: bool
    # Pricing columns are read straight from the product table by
    # list_products_summary; they are not recomputed per request.
    price_tier: PriceTier | None = None
    price_per_use_pln: float | None = None
    price_tier_source: PricingSource | None = None
# ActiveIngredient variant used only inside the LLM response schema
# (see ProductParseLLMResponse.actives).
class AIActiveIngredient(ActiveIngredient):
    # Gemini API rejects int-enum values in response_schema; override with plain int.
    strength_level: Optional[int] = None  # type: ignore[assignment]
    irritation_potential: Optional[int] = None  # type: ignore[assignment]
# Schema handed to Gemini as response_schema by parse_product_text, and the
# model the raw LLM JSON is validated against first.
class ProductParseLLMResponse(ProductParseResponse):
    # Gemini response schema currently requires enum values to be strings.
    # Strength fields are numeric in our domain (1-3), so keep them as ints here
    # and convert via ProductParseResponse validation afterward.
    actives: Optional[list[AIActiveIngredient]] = None  # type: ignore[assignment]
# Request body for POST /{product_id}/inventory; create_product_inventory
# passes every field straight into a new ProductInventory row.
class InventoryCreate(SQLModel):
    is_opened: bool = False
    opened_at: Optional[date] = None
    # Rows with finished_at set are not counted as owned / in stock by
    # list_products_summary and _build_shopping_context.
    finished_at: Optional[date] = None
    expiry_date: Optional[date] = None
    current_weight_g: Optional[float] = None
    last_weighed_at: Optional[date] = None
    notes: Optional[str] = None
# All-optional counterpart of InventoryCreate (same fields, every default None).
# NOTE(review): no route in the visible part of this file consumes it;
# presumably it is the PATCH payload of an inventory router - confirm.
class InventoryUpdate(SQLModel):
    is_opened: Optional[bool] = None
    opened_at: Optional[date] = None
    finished_at: Optional[date] = None
    expiry_date: Optional[date] = None
    current_weight_g: Optional[float] = None
    last_weighed_at: Optional[date] = None
    notes: Optional[str] = None
# ---------------------------------------------------------------------------
# Shopping suggestion schemas
# ---------------------------------------------------------------------------
# One suggested product type in the public /suggest response. All fields are
# free-form strings in this model; suggest_shopping builds instances directly
# from the LLM's JSON and then runs them through ShoppingValidator.
class ProductSuggestion(PydanticBase):
    category: str
    product_type: str
    key_ingredients: list[str]
    target_concerns: list[str]
    why_needed: str
    recommended_time: str
    frequency: str
# Response model of POST /suggest.
class ShoppingSuggestionResponse(PydanticBase):
    suggestions: list[ProductSuggestion]
    reasoning: str
    # Phase 3: Observability fields
    # Filled in by suggest_shopping after validation; each stays None when the
    # validator reported nothing of that kind.
    validation_warnings: list[str] | None = None
    auto_fixes_applied: list[str] | None = None
    # Built from the AICallLog row of the LLM call (see _build_response_metadata).
    response_metadata: "ResponseMetadata | None" = None
# LLM-facing twin of ProductSuggestion (identical fields); element type of
# _ShoppingSuggestionsOut.suggestions, which is sent to Gemini as the
# response schema.
class _ProductSuggestionOut(PydanticBase):
    category: str
    product_type: str
    key_ingredients: list[str]
    target_concerns: list[str]
    why_needed: str
    recommended_time: str
    frequency: str
# Schema passed as response_schema to Gemini by suggest_shopping; it carries
# only the LLM-produced part of the response (no observability fields).
class _ShoppingSuggestionsOut(PydanticBase):
    suggestions: list[_ProductSuggestionOut]
    reasoning: str
# ---------------------------------------------------------------------------
# Pricing helpers
# ---------------------------------------------------------------------------
# Minimum number of priced products a group needs before percentile thresholds
# are computed for it; _compute_pricing_outputs applies this both to each
# category and to the cross-category fallback pool.
_MIN_PRODUCTS_FOR_PRICE_TIER = 8
# A category smaller than the limit above is still tiered, using the fallback
# thresholds, once it has at least this many priced products.
_MIN_CATEGORY_SIZE_FOR_FALLBACK = 4
# Estimated amount consumed per application, by category. _price_per_use_pln
# divides the pack size (size_ml, else net weight in grams) by this value, so
# the unit follows the pack size used. Categories absent from this map get no
# price per use at all.
_ESTIMATED_AMOUNT_PER_USE: dict[ProductCategory, float] = {
    ProductCategory.CLEANSER: 1.5,
    ProductCategory.TONER: 1.5,
    ProductCategory.ESSENCE: 1.0,
    ProductCategory.SERUM: 0.35,
    ProductCategory.MOISTURIZER: 0.8,
    ProductCategory.SPF: 1.2,
    ProductCategory.MASK: 2.5,
    ProductCategory.EXFOLIANT: 0.7,
    ProductCategory.HAIR_TREATMENT: 1.0,
    ProductCategory.SPOT_TREATMENT: 0.1,
    ProductCategory.OIL: 0.35,
}
def _estimated_amount_per_use(category: ProductCategory) -> float | None:
    """Look up the estimated per-application amount for ``category``.

    Returns:
        The configured amount, or ``None`` when the category has no entry in
        ``_ESTIMATED_AMOUNT_PER_USE``.
    """
    try:
        return _ESTIMATED_AMOUNT_PER_USE[category]
    except KeyError:
        return None
def _net_weight_g(product: Product) -> float | None:
    """Return the product's content weight (full minus empty), in grams.

    Returns:
        The positive difference ``full_weight_g - empty_weight_g``, or ``None``
        when either weight is missing or the difference is zero or negative.
    """
    full, empty = product.full_weight_g, product.empty_weight_g
    if full is None or empty is None:
        return None
    contents = full - empty
    return None if contents <= 0 else contents
def _price_per_use_pln(product: Product) -> float | None:
    """Estimate what a single application of ``product`` costs, in PLN.

    The pack size (``size_ml``, falling back to net weight in grams) is divided
    by the category's estimated amount per use to get the number of uses; the
    price converted to PLN is then divided by that number.

    Returns:
        The price per use, or ``None`` when the price, currency, per-use
        estimate, pack size or currency conversion is unavailable.
    """
    amount, currency = product.price_amount, product.price_currency
    if amount is None or currency is None:
        return None

    per_use = _estimated_amount_per_use(product.category)
    if per_use is None or per_use <= 0:
        return None

    # Prefer the declared volume; fall back to the weighed contents.
    pack_size = product.size_ml
    if pack_size is None or pack_size <= 0:
        pack_size = _net_weight_g(product)
    if pack_size is None or pack_size <= 0:
        return None

    uses = pack_size / per_use
    if uses <= 0:
        return None

    total_pln = convert_to_pln(amount, currency.upper())
    return None if total_pln is None else total_pln / uses
def _percentile(sorted_values: list[float], fraction: float) -> float:
if not sorted_values:
raise ValueError("sorted_values cannot be empty")
if len(sorted_values) == 1:
return sorted_values[0]
position = (len(sorted_values) - 1) * fraction
low = int(position)
high = min(low + 1, len(sorted_values) - 1)
weight = position - low
return sorted_values[low] * (1 - weight) + sorted_values[high] * weight
def _tier_from_thresholds(
    value: float,
    *,
    p25: float,
    p50: float,
    p75: float,
) -> PriceTier:
    """Map a price-per-use value to a tier using quartile boundaries.

    Each boundary is inclusive: a value equal to ``p25`` is BUDGET, equal to
    ``p50`` is MID, equal to ``p75`` is PREMIUM. Anything above ``p75`` is LUXURY.
    """
    ladder = (
        (p25, PriceTier.BUDGET),
        (p50, PriceTier.MID),
        (p75, PriceTier.PREMIUM),
    )
    for upper_bound, tier in ladder:
        if value <= upper_bound:
            return tier
    return PriceTier.LUXURY
def _thresholds(values: list[float]) -> tuple[float, float, float]:
    """Return the 25th, 50th and 75th percentiles of ``values``.

    The input does not need to be sorted; a sorted copy is made here.
    """
    ordered = sorted(values)
    p25, p50, p75 = (_percentile(ordered, q) for q in (0.25, 0.50, 0.75))
    return p25, p50, p75
def _compute_pricing_outputs(
    products: list[Product],
) -> PricingOutputs:
    """Compute price per use and price tier for every product.

    Tiers come from quartile thresholds. A category with at least
    ``_MIN_PRODUCTS_FOR_PRICE_TIER`` priced products uses its own thresholds
    (source ``"category"``). A smaller category with at least
    ``_MIN_CATEGORY_SIZE_FOR_FALLBACK`` priced products uses thresholds from the
    fallback pool (source ``"fallback"``), if that pool is large enough.
    Everything else keeps no tier: source ``"insufficient_data"`` when a price
    per use exists, otherwise all three elements are ``None``.

    Returns:
        A mapping from product id to ``(tier, price_per_use, source)``.
    """
    ppu_by_id: dict[UUID, float] = {}
    rows_by_category: dict[ProductCategory, list[tuple[UUID, float]]] = {}
    for item in products:
        cost = _price_per_use_pln(item)
        if cost is not None:
            ppu_by_id[item.id] = cost
            rows_by_category.setdefault(item.category, []).append((item.id, cost))

    # Baseline entry for every product; tiered entries overwrite these below.
    outputs: PricingOutputs = {}
    for item in products:
        if item.id in ppu_by_id:
            outputs[item.id] = (None, ppu_by_id[item.id], "insufficient_data")
        else:
            outputs[item.id] = (None, None, None)

    # Fallback pool: priced leave-on products that are neither tools nor medication.
    pool = [
        ppu_by_id[item.id]
        for item in products
        if item.id in ppu_by_id
        and not (item.is_tool or item.is_medication or not item.leave_on)
    ]
    pool_thresholds: tuple[float, float, float] | None = (
        _thresholds(pool) if len(pool) >= _MIN_PRODUCTS_FOR_PRICE_TIER else None
    )

    for rows in rows_by_category.values():
        source: PricingSource
        if len(rows) >= _MIN_PRODUCTS_FOR_PRICE_TIER:
            bounds, source = _thresholds([cost for _, cost in rows]), "category"
        elif (
            len(rows) >= _MIN_CATEGORY_SIZE_FOR_FALLBACK
            and pool_thresholds is not None
        ):
            bounds, source = pool_thresholds, "fallback"
        else:
            continue  # too little data: keep the baseline entries
        p25, p50, p75 = bounds
        for product_id, cost in rows:
            tier = _tier_from_thresholds(cost, p25=p25, p50=p50, p75=p75)
            outputs[product_id] = (tier, cost, source)
    return outputs
# ---------------------------------------------------------------------------
# Product routes
# ---------------------------------------------------------------------------
@router.get("", response_model=list[ProductWithInventory])
def list_products(
    category: Optional[ProductCategory] = None,
    brand: Optional[str] = None,
    targets: Optional[list[SkinConcern]] = Query(default=None),
    is_medication: Optional[bool] = None,
    is_tool: Optional[bool] = None,
    session: Session = Depends(get_session),
):
    # List products with their inventory rows attached. The scalar filters are
    # applied in SQL; `targets` keeps products sharing at least one requested
    # concern and is applied in Python.
    query = select(Product)
    equality_filters = (
        (Product.category, category),
        (Product.brand, brand),
        (Product.is_medication, is_medication),
        (Product.is_tool, is_tool),
    )
    for column, wanted in equality_filters:
        if wanted is not None:
            query = query.where(column == wanted)
    products = list(session.exec(query).all())

    # Filter by targets (JSON column, so it cannot be pushed into the query).
    if targets:
        wanted_targets = {t.value for t in targets}

        def _hits_target(product) -> bool:
            # Stored targets may be enum members or plain strings.
            return any(
                (t.value if hasattr(t, "value") else t) in wanted_targets
                for t in (product.targets or [])
            )

        products = [product for product in products if _hits_target(product)]

    # Load the inventory of all remaining products with a single query.
    product_ids = [product.id for product in products]
    if product_ids:
        inventory_rows = session.exec(
            select(ProductInventory).where(
                col(ProductInventory.product_id).in_(product_ids)
            )
        ).all()
    else:
        inventory_rows = []

    inventory_by_product: dict = {}
    for row in inventory_rows:
        inventory_by_product.setdefault(row.product_id, []).append(row)

    results = []
    for product in products:
        item = ProductWithInventory.model_validate(product, from_attributes=True)
        item.inventory = inventory_by_product.get(product.id, [])
        results.append(item)
    return results
@router.post("", response_model=ProductPublic, status_code=201)
def create_product(data: ProductCreate, session: Session = Depends(get_session)):
    # Create a product with a fresh UUID and queue a pricing recalculation in
    # the same transaction.
    fields = data.model_dump()
    currency = fields.get("price_currency")
    if currency:
        # Currency codes are stored upper-cased.
        fields["price_currency"] = str(currency).upper()

    new_product = Product(id=uuid4(), **fields)
    session.add(new_product)
    enqueue_pricing_recalc(session)
    session.commit()
    session.refresh(new_product)
    return new_product
def _product_parse_system_prompt() -> str:
return """\
You are a skincare and cosmetics product data extraction expert. \
Given raw text (product page copy, ingredient list, label scan, etc.), \
extract structured product data and return it as a single JSON object.
RULES:
- Return ONLY raw JSON — no markdown code fences, no explanation, no preamble, no indentation or extra whitespace (minified).
- Omit any field you cannot confidently determine from the text. Do not guess.
- All enum values must exactly match the allowed strings listed below.
- For INCI lists: return each ingredient as a separate string in the array, \
preserving standard INCI names exactly as they appear.
- For actives: extract name, concentration (numeric, 0100), functions \
(use the allowed strings), and strength/irritation level if inferable.
- For effect_profile scores (05 int): ALWAYS return the full product_effect_profile \
object with all 13 fields. Infer each score from ingredient activity and product claims. \
Use 0 only when you truly have no basis for an estimate.
- For pH: extract from explicit mention (e.g. "pH 5.5", "pH range 4.05.0"). \
Do not infer from ingredients alone.
- For context_rules: infer from usage instructions and ingredient interactions \
(e.g. "do not use with AHAs" → safe_after_acids: false).
- fragrance_free / essential_oils_free / alcohol_denat_free: infer from INCI \
or explicit claims. Fragrance = "Parfum" or "Fragrance" in INCI → fragrance_free: false.
- For leave_on: true = leave-on treatment, false = rinse-off (cleanser, mask to rinse).
- recommended_time: "am" if contains SPF or vitamin C; "pm" if retinoid/retinol; \
"both" otherwise (when unclear, use "both").
ENUM ALLOWED VALUES (use ONLY these exact strings):
category: "cleanser" | "toner" | "essence" | "serum" | "moisturizer" | "spf" | \
"mask" | "exfoliant" | "hair_treatment" | "tool" | "spot_treatment" | "oil"
recommended_time: "am" | "pm" | "both"
texture: "watery" | "gel" | "emulsion" | "cream" | "oil" | "balm" | "foam" | "fluid"
absorption_speed: "very_fast" | "fast" | "moderate" | "slow" | "very_slow"
recommended_for (array, pick applicable):
"dry" | "oily" | "combination" | "sensitive" | "normal" | "acne_prone"
targets (array, pick applicable):
"acne" | "rosacea" | "hyperpigmentation" | "aging" | "dehydration" | "redness" | \
"damaged_barrier" | "pore_visibility" | "uneven_texture" | "hair_growth" | "sebum_excess"
actives[].functions (array, pick applicable):
"humectant" | "emollient" | "occlusive" | "exfoliant_aha" | "exfoliant_bha" | \
"exfoliant_pha" | "retinoid" | "antioxidant" | "soothing" | "barrier_support" | \
"brightening" | "anti_acne" | "ceramide" | "niacinamide" | "sunscreen" | "peptide" | \
"hair_growth_stimulant" | "prebiotic" | "vitamin_c" | "anti_aging"
actives[].strength_level: 1 (low) | 2 (medium) | 3 (high)
actives[].irritation_potential: 1 (low) | 2 (medium) | 3 (high)
OUTPUT SCHEMA (all fields optional — omit what you cannot determine):
{
"name": string,
"brand": string,
"line_name": string,
"sku": string,
"url": string,
"barcode": string,
"category": string,
"recommended_time": string,
"texture": string,
"absorption_speed": string,
"leave_on": boolean,
"price_amount": number,
"price_currency": string,
"size_ml": number,
"full_weight_g": number,
"empty_weight_g": number,
"pao_months": integer,
"inci": [string, ...],
"actives": [
{
"name": string,
"percent": number,
"functions": [string, ...],
"strength_level": 1|2|3,
"irritation_potential": 1|2|3
}
],
"recommended_for": [string, ...],
"targets": [string, ...],
"fragrance_free": boolean,
"essential_oils_free": boolean,
"alcohol_denat_free": boolean,
"pregnancy_safe": boolean,
"product_effect_profile": {
"hydration_immediate": integer (0-5),
"hydration_long_term": integer (0-5),
"barrier_repair_strength": integer (0-5),
"soothing_strength": integer (0-5),
"exfoliation_strength": integer (0-5),
"retinoid_strength": integer (0-5),
"irritation_risk": integer (0-5),
"comedogenic_risk": integer (0-5),
"barrier_disruption_risk": integer (0-5),
"dryness_risk": integer (0-5),
"brightening_strength": integer (0-5),
"anti_acne_strength": integer (0-5),
"anti_aging_strength": integer (0-5)
},
"ph_min": number,
"ph_max": number,
"context_rules": {
"safe_after_shaving": boolean,
"safe_after_acids": boolean,
"safe_after_retinoids": boolean,
"safe_with_compromised_barrier": boolean,
"low_uv_only": boolean
},
"min_interval_hours": integer,
"max_frequency_per_week": integer,
"is_medication": boolean,
"is_tool": boolean,
"needle_length_mm": number
}
"""
@router.post("/parse-text", response_model=ProductParseResponse)
def parse_product_text(data: ProductParseRequest) -> ProductParseResponse:
    # Extract structured product data from free text via the LLM.
    # Failure mapping: empty or non-JSON LLM output -> 502; output rejected by
    # ProductParseValidator -> 502; Pydantic validation failure -> 422.

    # Phase 1: sanitize the input text before it reaches the model.
    clean_text = sanitize_user_input(data.text, max_length=10000)
    extraction_config = get_extraction_config(
        system_instruction=_product_parse_system_prompt(),
        response_schema=ProductParseLLMResponse,
        max_output_tokens=16384,
    )
    response, _ = call_gemini(
        endpoint="products/parse-text",
        contents=f"Extract product data from this text:\n\n{clean_text}",
        config=extraction_config,
        user_input=clean_text,
    )

    body = response.text
    if not body:
        raise HTTPException(status_code=502, detail="LLM returned an empty response")
    try:
        payload = json.loads(body)
    except json.JSONDecodeError as e:
        raise HTTPException(status_code=502, detail=f"LLM returned invalid JSON: {e}")

    try:
        candidate = ProductParseLLMResponse.model_validate(payload)
        # Phase 1: domain validation of the parsed product data.
        outcome = ProductParseValidator().validate(candidate)
        if outcome.is_valid:
            if outcome.warnings:
                logger.warning(f"Product parse warnings: {outcome.warnings}")
            # Re-validate through the public model so the plain-int strength
            # fields are converted by ProductParseResponse.
            return ProductParseResponse.model_validate(candidate.model_dump())
        logger.error(f"Product parse validation failed: {outcome.errors}")
        raise HTTPException(
            status_code=502,
            detail=f"Parsed product data failed validation: {'; '.join(outcome.errors)}",
        )
    except ValidationError as e:
        raise HTTPException(status_code=422, detail=e.errors())
@router.get("/summary", response_model=list[ProductListItem])
def list_products_summary(
    category: Optional[ProductCategory] = None,
    brand: Optional[str] = None,
    targets: Optional[list[SkinConcern]] = Query(default=None),
    is_medication: Optional[bool] = None,
    is_tool: Optional[bool] = None,
    session: Session = Depends(get_session),
):
    # Lightweight product listing: selects only the columns ProductListItem
    # needs and derives `is_owned` from unfinished inventory rows.
    columns = inspect(Product).local_table.c
    stmt = sa_select(
        columns.id,
        columns.name,
        columns.brand,
        columns.category,
        columns.recommended_time,
        columns.targets,
        columns.price_tier,
        columns.price_per_use_pln,
        columns.price_tier_source,
    )
    equality_filters = (
        (columns.category, category),
        (columns.brand, brand),
        (columns.is_medication, is_medication),
        (columns.is_tool, is_tool),
    )
    for column, wanted in equality_filters:
        if wanted is not None:
            stmt = stmt.where(column == wanted)
    rows = list(session.execute(stmt).all())

    if targets:
        wanted_targets = {t.value for t in targets}

        def _hits_target(row) -> bool:
            # row[5] is the targets column; entries may be enums or strings.
            return any(
                (t.value if hasattr(t, "value") else t) in wanted_targets
                for t in (row[5] or [])
            )

        rows = [row for row in rows if _hits_target(row)]

    product_ids = [row[0] for row in rows]
    if product_ids:
        inventory_rows = session.exec(
            select(ProductInventory).where(
                col(ProductInventory.product_id).in_(product_ids)
            )
        ).all()
    else:
        inventory_rows = []

    # A product counts as owned while any of its inventory rows is unfinished.
    owned_ids = set()
    for inv in inventory_rows:
        if inv.product_id is not None and inv.finished_at is None:
            owned_ids.add(inv.product_id)

    results: list[ProductListItem] = [
        ProductListItem(
            id=product_id,
            name=name,
            brand=brand_value,
            category=category_value,
            recommended_time=recommended_time,
            targets=row_targets or [],
            is_owned=product_id in owned_ids,
            price_tier=price_tier,
            price_per_use_pln=price_per_use_pln,
            price_tier_source=price_tier_source,
        )
        for (
            product_id,
            name,
            brand_value,
            category_value,
            recommended_time,
            row_targets,
            price_tier,
            price_per_use_pln,
            price_tier_source,
        ) in rows
    ]
    return results
@router.get("/{product_id}", response_model=ProductWithInventory)
def get_product(product_id: UUID, session: Session = Depends(get_session)):
    # Return one product with all of its inventory rows; the lookup goes
    # through get_or_404.
    product = get_or_404(session, Product, product_id)
    inventory_query = select(ProductInventory).where(
        ProductInventory.product_id == product_id
    )
    inventory_rows = session.exec(inventory_query).all()

    detailed = ProductWithInventory.model_validate(product, from_attributes=True)
    detailed.inventory = list(inventory_rows)
    return detailed
@router.patch("/{product_id}", response_model=ProductPublic)
def update_product(
    product_id: UUID, data: ProductUpdate, session: Session = Depends(get_session)
):
    # Apply a partial update: only fields present in the request body are
    # written. A pricing recalculation is queued in the same transaction.
    product = get_or_404(session, Product, product_id)

    changes = data.model_dump(exclude_unset=True)
    currency = changes.get("price_currency")
    if currency:
        # Currency codes are stored upper-cased.
        changes["price_currency"] = str(currency).upper()
    for field_name, new_value in changes.items():
        setattr(product, field_name, new_value)

    session.add(product)
    enqueue_pricing_recalc(session)
    session.commit()
    session.refresh(product)
    return ProductPublic.model_validate(product, from_attributes=True)
@router.delete("/{product_id}", status_code=204)
def delete_product(product_id: UUID, session: Session = Depends(get_session)):
    # Delete the product and queue a pricing recalculation in the same
    # transaction; the lookup goes through get_or_404.
    doomed = get_or_404(session, Product, product_id)
    session.delete(doomed)
    enqueue_pricing_recalc(session)
    session.commit()
# ---------------------------------------------------------------------------
# Product inventory sub-routes
# ---------------------------------------------------------------------------
@router.get("/{product_id}/inventory", response_model=list[ProductInventory])
def list_product_inventory(product_id: UUID, session: Session = Depends(get_session)):
    # List all inventory rows of one product. The product lookup is done only
    # for its existence check; its result is not used.
    get_or_404(session, Product, product_id)
    return session.exec(
        select(ProductInventory).where(ProductInventory.product_id == product_id)
    ).all()
@router.post(
    "/{product_id}/inventory", response_model=ProductInventory, status_code=201
)
def create_product_inventory(
    product_id: UUID,
    data: InventoryCreate,
    session: Session = Depends(get_session),
):
    # Add an inventory row to an existing product. The product lookup is done
    # only for its existence check.
    get_or_404(session, Product, product_id)

    fields = data.model_dump()
    new_entry = ProductInventory(id=uuid4(), product_id=product_id, **fields)
    session.add(new_entry)
    session.commit()
    session.refresh(new_entry)
    return new_entry
# ---------------------------------------------------------------------------
# Shopping suggestion
# ---------------------------------------------------------------------------
def _ev(v: object) -> str:
if v is None:
return ""
value = getattr(v, "value", None)
if isinstance(value, str):
return value
return str(v)
def _build_shopping_context(session: Session, reference_date: date) -> str:
    """Build the text context sent to the shopping-suggestion LLM.

    The result is the user profile context, followed by a "STAN SKÓRY" section
    describing the most recent skin snapshot, followed by a
    "POSIADANE PRODUKTY" section with one line per shopping-relevant product.
    Each product line starts with ``[✓]`` when the product has at least one
    unfinished inventory row and ``[✗]`` otherwise, as the legend line and the
    system prompt describe.

    Args:
        session: Database session used for all lookups.
        reference_date: Date forwarded to ``build_user_profile_context``.

    Returns:
        The assembled multi-line context string.
    """
    profile_ctx = build_user_profile_context(session, reference_date=reference_date)

    # Most recent snapshot by snapshot_date.
    snapshot = session.exec(
        select(SkinConditionSnapshot).order_by(
            col(SkinConditionSnapshot.snapshot_date).desc()
        )
    ).first()
    skin_lines = ["STAN SKÓRY:"]
    if snapshot:
        skin_lines.append(f" Data: {snapshot.snapshot_date}")
        skin_lines.append(f" Ogólny stan: {_ev(snapshot.overall_state)}")
        skin_lines.append(f" Typ skóry: {_ev(snapshot.skin_type)}")
        skin_lines.append(f" Nawilżenie: {snapshot.hydration_level}/5")
        skin_lines.append(f" Wrażliwość: {snapshot.sensitivity_level}/5")
        skin_lines.append(f" Bariera: {_ev(snapshot.barrier_state)}")
        concerns = ", ".join(_ev(c) for c in (snapshot.active_concerns or []))
        skin_lines.append(f" Aktywne problemy: {concerns or 'brak'}")
        if snapshot.priorities:
            skin_lines.append(f" Priorytety: {', '.join(snapshot.priorities)}")
    else:
        skin_lines.append(" (brak danych)")

    products = _get_shopping_products(session)
    product_ids = [p.id for p in products]
    inventory_rows = (
        session.exec(
            select(ProductInventory).where(
                col(ProductInventory.product_id).in_(product_ids)
            )
        ).all()
        if product_ids
        else []
    )
    inv_by_product: dict = {}
    for inv in inventory_rows:
        inv_by_product.setdefault(inv.product_id, []).append(inv)

    products_lines = ["POSIADANE PRODUKTY:"]
    products_lines.append(
        " Legenda: [✓] = produkt dostępny (w magazynie), [✗] = brak w magazynie"
    )
    for p in products:
        # Any unfinished inventory row means the product is in stock.
        has_stock = any(
            i.finished_at is None for i in inv_by_product.get(p.id, [])
        )
        # Fix: both branches used to be empty strings, so the line always
        # rendered "[]" and the legend was meaningless to the model.
        stock = "✓" if has_stock else "✗"

        actives = _extract_active_names(p)
        actives_str = f", actives: {actives}" if actives else ""

        # The effect profile may be a plain dict or a model; a missing profile
        # is treated as "no notable effects" instead of raising.
        ep = p.product_effect_profile
        if ep is None:
            scores = {}
        elif isinstance(ep, dict):
            scores = ep
        else:
            scores = ep.model_dump()
        # Only scores of 3 or more are listed; None scores are skipped.
        effects = {
            k.replace("_strength", ""): v
            for k, v in scores.items()
            if v is not None and v >= 3
        }
        effects_str = f", effects: {effects}" if effects else ""

        targets = [_ev(t) for t in (p.targets or [])]
        products_lines.append(
            f" [{stock}] id={p.id} {p.name} ({p.brand or ''}) - {_ev(p.category)}, "
            f"targets: {targets}{actives_str}{effects_str}"
        )
    return (
        profile_ctx + "\n" + "\n".join(skin_lines) + "\n\n" + "\n".join(products_lines)
    )
def _get_shopping_products(session: Session) -> list[Product]:
    """Return the products relevant for shopping suggestions.

    Tools are excluded in the SQL query; medications are filtered out
    afterwards in Python.
    """
    non_tool_query = select(Product).where(col(Product.is_tool).is_(False))
    selected: list[Product] = []
    for candidate in session.exec(non_tool_query).all():
        if not candidate.is_medication:
            selected.append(candidate)
    return selected
def _extract_active_names(product: Product) -> list[str]:
    """Collect the distinct active-ingredient names of a product.

    Entries may be dicts or objects with a ``name`` attribute. Names are
    stripped, blanks and duplicates are dropped, first-seen order is kept, and
    collection stops once 12 names have been gathered.
    """
    collected: list[str] = []
    for entry in product.actives or []:
        if isinstance(entry, dict):
            raw_name = entry.get("name")
        else:
            raw_name = getattr(entry, "name", "")
        cleaned = str(raw_name or "").strip()
        if cleaned and cleaned not in collected:
            collected.append(cleaned)
            if len(collected) >= 12:
                break
    return collected
def _extract_requested_product_ids(
    args: dict[str, object], max_ids: int = 8
) -> list[str]:
    """Delegate to the shared implementation in ``product_llm_tools``.

    Args:
        args: Tool-call arguments, forwarded unchanged.
        max_ids: Forwarded as the shared helper's ``max_ids`` keyword.
    """
    requested_ids = _shared_extract_requested_product_ids(args, max_ids=max_ids)
    return requested_ids
# System instruction (in Polish) for the shopping-suggestion LLM call in
# suggest_shopping. It explains the [✓]/[✗] stock legend used by
# _build_shopping_context, tells the model to suggest product TYPES only
# (never brands), and asks for JSON matching the response schema. This is
# runtime text sent to the model, so edit it with care.
_SHOPPING_SYSTEM_PROMPT = """Jesteś asystentem zakupowym w dziedzinie pielęgnacji skóry.
Twoim zadaniem jest przeanalizować stan skóry użytkownika oraz produkty, które już posiada,
a następnie zasugerować TYPY produktów (bez marek), które mogłyby uzupełnić ich rutynę.
LEGENDA:
- [✓] = produkt dostępny w magazynie (nawet jeśli jest zapieczętowany)
- [✗] = produkt niedostępny (brak w magazynie, wszystkie opakowania zużyte)
ZASADY:
0. Sugeruj tylko wtedy, gdy jest realna potrzeba - nie zwracaj stałej liczby produktów
1. Sugeruj TYLKO typy produktów, NIGDY konkretne marki (np. "Salicylic Acid 2% Masque", nie "La Roche-Posay")
2. Produkty oznaczone [✗] to te, których NIE MA w magazynie - możesz je zasugerować
3. Produkty oznaczone [✓] są już dostępne - nie sugeruj ich ponownie
4. Bierz pod uwagę aktywne problemy skóry (acne, hyperpigmentacja, aging, etc.)
5. Sugeruj realistyczną częstotliwość użycia (dzienna, 2-3x tygodniowo, etc.)
6. Zachowaj kolejność warstw: cleanse → toner → serum → moisturizer → SPF
7. Jeśli użytkownik ma uszkodzoną barierę, unikaj silnych eksfoliantów i retinoidów
8. Zwracaj uwagę na ewentualne konflikty polecanych składników z tymi, które użytkownik już posiada (np. nie polecaj peptydów miedziowych jeśli użytkownik nadużywa kwasów)
9. Odpowiadaj w języku polskim
Format odpowiedzi - zwróć wyłącznie JSON zgodny z podanym schematem."""
@router.post("/suggest", response_model=ShoppingSuggestionResponse)
def suggest_shopping(session: Session = Depends(get_session)):
    # Ask the LLM which product TYPES would complement the user's routine.
    #
    # Flow: build the context -> call Gemini with the get_product_details tool
    # -> if the tool round-trip limit is hit, retry once without tools in a
    # conservative mode -> parse and validate the JSON -> attach warnings,
    # auto-fixes and observability metadata.
    #
    # Every kind of unusable LLM output (empty, invalid JSON, wrong shape,
    # failed validation) is reported as HTTP 502.
    context = _build_shopping_context(session, reference_date=date.today())
    shopping_products = _get_shopping_products(session)
    last_used_on_by_product = build_last_used_on_by_product(
        session,
        product_ids=[p.id for p in shopping_products],
    )
    prompt = (
        "Na podstawie poniższych danych przeanalizuj, jakie TYPY produktów "
        "mogłyby uzupełnić rutynę pielęgnacyjną użytkownika.\n\n"
        f"{context}\n\n"
        "NARZEDZIA:\n"
        "- Masz dostep do funkcji: get_product_details.\n"
        "- Wywoluj narzedzia tylko, gdy potrzebujesz detali do oceny konfliktow skladnikow lub ryzyka podraznien.\n"
        "- Grupuj UUID: staraj sie pobierac dane dla wielu produktow jednym wywolaniem.\n"
        "Zwróć wyłącznie JSON zgodny ze schematem."
    )
    config = get_creative_config(
        system_instruction=_SHOPPING_SYSTEM_PROMPT,
        response_schema=_ShoppingSuggestionsOut,
        max_output_tokens=8192,
    ).model_copy(
        update={
            "tools": [
                genai_types.Tool(
                    function_declarations=[
                        PRODUCT_DETAILS_FUNCTION_DECLARATION,
                    ]
                )
            ],
            "tool_config": genai_types.ToolConfig(
                function_calling_config=genai_types.FunctionCallingConfig(
                    mode=genai_types.FunctionCallingConfigMode.AUTO,
                )
            ),
        }
    )
    function_handlers = {
        "get_product_details": build_product_details_tool_handler(
            shopping_products,
            last_used_on_by_product=last_used_on_by_product,
        ),
    }
    try:
        response, log_id = call_gemini_with_function_tools(
            endpoint="products/suggest",
            contents=prompt,
            config=config,
            function_handlers=function_handlers,
            user_input=prompt,
            max_tool_roundtrips=3,
        )
    except HTTPException as exc:
        # Only the "too many function calls" 502 triggers the fallback; any
        # other error is propagated unchanged.
        if (
            exc.status_code != 502
            or str(exc.detail) != "Gemini requested too many function calls"
        ):
            raise
        conservative_prompt = (
            f"{prompt}\n\n"
            "TRYB AWARYJNY (KONSERWATYWNY):\n"
            "- Osiagnieto limit wywolan narzedzi.\n"
            "- Nie wywoluj narzedzi ponownie.\n"
            "- Zasugeruj tylko najbardziej bezpieczne i realistyczne typy produktow do uzupelnienia brakow,"
            " unikaj agresywnych aktywnych przy niepelnych danych.\n"
        )
        # Fallback call uses a config without tools.
        response, log_id = call_gemini(
            endpoint="products/suggest",
            contents=conservative_prompt,
            config=get_creative_config(
                system_instruction=_SHOPPING_SYSTEM_PROMPT,
                response_schema=_ShoppingSuggestionsOut,
                max_output_tokens=8192,
            ),
            user_input=conservative_prompt,
            tool_trace={
                "mode": "fallback_conservative",
                "reason": "max_tool_roundtrips_exceeded",
            },
        )
    raw = response.text
    if not raw:
        raise HTTPException(status_code=502, detail="LLM returned an empty response")
    try:
        parsed = json.loads(raw)
    except json.JSONDecodeError as e:
        raise HTTPException(status_code=502, detail=f"LLM returned invalid JSON: {e}")
    # Valid JSON that is not an object (e.g. a bare list) cannot hold the
    # expected keys; report it as a bad LLM response instead of crashing.
    if not isinstance(parsed, dict):
        raise HTTPException(
            status_code=502, detail="LLM returned JSON that is not an object"
        )
    # Get products with inventory (those user already owns)
    products_with_inventory = session.exec(
        select(Product).join(ProductInventory).distinct()
    ).all()
    shopping_context = ShoppingValidationContext(
        owned_product_ids={p.id for p in products_with_inventory},
        valid_categories=set(ProductCategory),
        valid_targets=set(SkinConcern),
    )
    # Phase 1: Validate the shopping suggestions
    validator = ShoppingValidator()
    # Build initial shopping response without metadata. Missing or mistyped
    # fields in the LLM output are mapped to 502, like the other bad-output
    # cases, rather than escaping as an unhandled error.
    try:
        shopping_response = ShoppingSuggestionResponse(
            suggestions=[ProductSuggestion(**s) for s in parsed.get("suggestions", [])],
            reasoning=parsed.get("reasoning", ""),
        )
    except (ValidationError, TypeError) as e:
        raise HTTPException(
            status_code=502,
            detail=f"LLM returned suggestions in an unexpected shape: {e}",
        ) from e
    validation_result = validator.validate(shopping_response, shopping_context)
    if not validation_result.is_valid:
        logger.error(
            f"Shopping suggestion validation failed: {validation_result.errors}"
        )
        raise HTTPException(
            status_code=502,
            detail=f"Generated shopping suggestions failed validation: {'; '.join(validation_result.errors)}",
        )
    # Phase 3: Add warnings, auto-fixes, and metadata to response
    if validation_result.warnings:
        logger.warning(f"Shopping suggestion warnings: {validation_result.warnings}")
        shopping_response.validation_warnings = validation_result.warnings
    if validation_result.auto_fixes:
        shopping_response.auto_fixes_applied = validation_result.auto_fixes
    shopping_response.response_metadata = _build_response_metadata(session, log_id)
    return shopping_response