innercontext/backend/innercontext/api/products.py

1126 lines
38 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import json
from datetime import date
from typing import Literal, Optional
from uuid import UUID, uuid4
from fastapi import APIRouter, Depends, HTTPException, Query
from google.genai import types as genai_types
from pydantic import BaseModel as PydanticBase
from pydantic import ValidationError
from sqlmodel import Session, SQLModel, col, select
from db import get_session
from innercontext.api.utils import get_or_404
from innercontext.llm import (
call_gemini,
call_gemini_with_function_tools,
get_creative_config,
get_extraction_config,
)
from innercontext.services.fx import convert_to_pln
from innercontext.models import (
Product,
ProductBase,
ProductCategory,
ProductInventory,
ProductPublic,
ProductWithInventory,
SkinConcern,
SkinConditionSnapshot,
)
from innercontext.models.enums import (
AbsorptionSpeed,
DayTime,
PriceTier,
SkinType,
TextureType,
)
from innercontext.models.product import (
ActiveIngredient,
ProductContext,
ProductEffectProfile,
)
# Router that every endpoint in this module registers on.
# NOTE(review): the URL prefix is presumably applied where this router is
# included into the application — confirm in the app setup.
router = APIRouter()
# ---------------------------------------------------------------------------
# Request / response schemas
# ---------------------------------------------------------------------------
class ProductCreate(ProductBase):
    """Request body for creating a product; adds no fields to ``ProductBase``."""

    pass
class ProductUpdate(SQLModel):
    """Partial-update payload for a product.

    Every field is optional and defaults to ``None``. ``update_product`` dumps
    this model with ``exclude_unset=True``, so only the fields the client
    actually sent are written to the stored product.
    """

    # Identity
    name: str | None = None
    brand: str | None = None
    line_name: str | None = None
    sku: str | None = None
    url: str | None = None
    barcode: str | None = None

    # Classification
    category: ProductCategory | None = None
    recommended_time: DayTime | None = None
    texture: TextureType | None = None
    absorption_speed: AbsorptionSpeed | None = None
    leave_on: bool | None = None

    # Price and packaging
    price_amount: float | None = None
    price_currency: str | None = None
    size_ml: float | None = None
    full_weight_g: float | None = None
    empty_weight_g: float | None = None
    pao_months: int | None = None

    # Composition
    inci: list[str] | None = None
    actives: list[ActiveIngredient] | None = None

    # Suitability
    recommended_for: list[SkinType] | None = None
    targets: list[SkinConcern] | None = None
    contraindications: list[str] | None = None
    usage_notes: str | None = None

    # Safety flags
    fragrance_free: bool | None = None
    essential_oils_free: bool | None = None
    alcohol_denat_free: bool | None = None
    pregnancy_safe: bool | None = None

    # Effect profile and pH
    product_effect_profile: ProductEffectProfile | None = None
    ph_min: float | None = None
    ph_max: float | None = None

    # Usage constraints
    context_rules: ProductContext | None = None
    min_interval_hours: int | None = None
    max_frequency_per_week: int | None = None

    # Product kind
    is_medication: bool | None = None
    is_tool: bool | None = None
    needle_length_mm: float | None = None

    # Personal notes
    personal_tolerance_notes: str | None = None
    personal_repurchase_intent: bool | None = None
class ProductParseRequest(SQLModel):
    """Request body for ``POST /parse-text``."""

    # Raw text that is forwarded to the LLM for product-data extraction.
    text: str
class ProductParseResponse(SQLModel):
    """Structured product data extracted from free text by ``POST /parse-text``.

    Every field is optional and defaults to ``None``, so a response carries
    only what could be extracted.
    """

    # Identity
    name: str | None = None
    brand: str | None = None
    line_name: str | None = None
    sku: str | None = None
    url: str | None = None
    barcode: str | None = None

    # Classification
    category: ProductCategory | None = None
    recommended_time: DayTime | None = None
    texture: TextureType | None = None
    absorption_speed: AbsorptionSpeed | None = None
    leave_on: bool | None = None

    # Price and packaging
    price_amount: float | None = None
    price_currency: str | None = None
    size_ml: float | None = None
    full_weight_g: float | None = None
    empty_weight_g: float | None = None
    pao_months: int | None = None

    # Composition
    inci: list[str] | None = None
    actives: list[ActiveIngredient] | None = None

    # Suitability
    recommended_for: list[SkinType] | None = None
    targets: list[SkinConcern] | None = None
    contraindications: list[str] | None = None
    usage_notes: str | None = None

    # Safety flags
    fragrance_free: bool | None = None
    essential_oils_free: bool | None = None
    alcohol_denat_free: bool | None = None
    pregnancy_safe: bool | None = None

    # Effect profile and pH
    product_effect_profile: ProductEffectProfile | None = None
    ph_min: float | None = None
    ph_max: float | None = None

    # Usage constraints
    context_rules: ProductContext | None = None
    min_interval_hours: int | None = None
    max_frequency_per_week: int | None = None

    # Product kind
    is_medication: bool | None = None
    is_tool: bool | None = None
    needle_length_mm: float | None = None
class AIActiveIngredient(ActiveIngredient):
    """``ActiveIngredient`` variant used only inside the Gemini response schema."""

    # The Gemini API rejects int-enum values in response_schema, so both level
    # fields are redeclared here as plain optional integers.
    strength_level: int | None = None  # type: ignore[assignment]
    irritation_potential: int | None = None  # type: ignore[assignment]
class ProductParseLLMResponse(ProductParseResponse):
    """Schema handed to Gemini as ``response_schema`` for ``/parse-text``."""

    # Gemini's response schema currently requires enum values to be strings.
    # Strength fields are numeric in our domain (1-3), so they stay ints here
    # and are converted by re-validating as ProductParseResponse afterwards.
    actives: list[AIActiveIngredient] | None = None  # type: ignore[assignment]
class InventoryCreate(SQLModel):
    """Request body for adding an inventory entry to a product."""

    # A new entry counts as unopened unless the client says otherwise.
    is_opened: bool = False

    # Lifecycle dates
    opened_at: date | None = None
    finished_at: date | None = None
    expiry_date: date | None = None

    # Weight tracking
    current_weight_g: float | None = None
    last_weighed_at: date | None = None

    notes: str | None = None
class InventoryUpdate(SQLModel):
    """Partial-update payload for an inventory entry; every field is optional."""

    is_opened: bool | None = None

    # Lifecycle dates
    opened_at: date | None = None
    finished_at: date | None = None
    expiry_date: date | None = None

    # Weight tracking
    current_weight_g: float | None = None
    last_weighed_at: date | None = None

    notes: str | None = None
# ---------------------------------------------------------------------------
# Shopping suggestion schemas
# ---------------------------------------------------------------------------
class ProductSuggestion(PydanticBase):
    """One suggested product type, as exposed in ``ShoppingSuggestionResponse``.

    All fields are required; ``suggest_shopping`` builds instances directly
    from the items of the LLM's ``suggestions`` array.
    """

    category: str
    product_type: str
    key_ingredients: list[str]
    target_concerns: list[str]
    why_needed: str
    recommended_time: str
    frequency: str
class ShoppingSuggestionResponse(PydanticBase):
    """Response model of ``POST /suggest``: the suggestions plus the LLM's reasoning."""

    suggestions: list[ProductSuggestion]
    reasoning: str
class _ProductSuggestionOut(PydanticBase):
    """Item schema of ``_ShoppingSuggestionsOut``; same fields as ``ProductSuggestion``."""

    category: str
    product_type: str
    key_ingredients: list[str]
    target_concerns: list[str]
    why_needed: str
    recommended_time: str
    frequency: str
class _ShoppingSuggestionsOut(PydanticBase):
    """Schema passed to the LLM as ``response_schema`` by ``suggest_shopping``."""

    suggestions: list[_ProductSuggestionOut]
    reasoning: str
# ---------------------------------------------------------------------------
# Pricing helpers
# ---------------------------------------------------------------------------
# Minimum number of priced products needed to compute percentile thresholds,
# both for a single category and for the cross-category fallback pool.
_MIN_PRODUCTS_FOR_PRICE_TIER = 8
# A category below the minimum above may still borrow the fallback thresholds,
# but only if it has at least this many priced products.
_MIN_CATEGORY_SIZE_FOR_FALLBACK = 4
# Estimated amount consumed per application, by category. It is divided into
# the pack size (size_ml, or net weight in grams as a fallback), so the value
# is in that same unit. Categories missing here get no price-per-use.
_ESTIMATED_AMOUNT_PER_USE: dict[ProductCategory, float] = {
    ProductCategory.CLEANSER: 1.5,
    ProductCategory.TONER: 1.5,
    ProductCategory.ESSENCE: 1.0,
    ProductCategory.SERUM: 0.35,
    ProductCategory.MOISTURIZER: 0.8,
    ProductCategory.SPF: 1.2,
    ProductCategory.MASK: 2.5,
    ProductCategory.EXFOLIANT: 0.7,
    ProductCategory.HAIR_TREATMENT: 1.0,
    ProductCategory.SPOT_TREATMENT: 0.1,
    ProductCategory.OIL: 0.35,
}
def _estimated_amount_per_use(category: ProductCategory) -> float | None:
    """Look up the estimated per-application amount for *category*.

    Returns ``None`` when the category has no entry in
    ``_ESTIMATED_AMOUNT_PER_USE``.
    """
    if category in _ESTIMATED_AMOUNT_PER_USE:
        return _ESTIMATED_AMOUNT_PER_USE[category]
    return None
def _net_weight_g(product: Product) -> float | None:
    """Return the product's content weight in grams (full minus empty).

    Returns ``None`` when either weight is missing or the difference is not
    positive.
    """
    full, empty = product.full_weight_g, product.empty_weight_g
    if full is None or empty is None:
        return None
    contents = full - empty
    return None if contents <= 0 else contents
def _price_per_use_pln(product: Product) -> float | None:
    """Estimate the cost of one application of *product* in PLN.

    The pack size is ``size_ml`` when positive, otherwise the net weight in
    grams. Returns ``None`` when the price, currency, per-use estimate, pack
    size or currency conversion is unavailable.
    """
    price, currency = product.price_amount, product.price_currency
    if price is None or currency is None:
        return None

    per_use = _estimated_amount_per_use(product.category)
    if per_use is None or per_use <= 0:
        return None

    pack = product.size_ml
    if pack is None or pack <= 0:
        # No usable volume: fall back to the weighed content of the pack.
        pack = _net_weight_g(product)
        if pack is None or pack <= 0:
            return None

    uses = pack / per_use
    if uses <= 0:
        return None

    converted = convert_to_pln(price, currency.upper())
    if converted is None:
        return None
    return converted / uses
def _percentile(sorted_values: list[float], fraction: float) -> float:
if not sorted_values:
raise ValueError("sorted_values cannot be empty")
if len(sorted_values) == 1:
return sorted_values[0]
position = (len(sorted_values) - 1) * fraction
low = int(position)
high = min(low + 1, len(sorted_values) - 1)
weight = position - low
return sorted_values[low] * (1 - weight) + sorted_values[high] * weight
def _tier_from_thresholds(
    value: float,
    *,
    p25: float,
    p50: float,
    p75: float,
) -> PriceTier:
    """Map *value* to a price tier using inclusive upper bounds.

    Values up to ``p25`` are BUDGET, up to ``p50`` MID, up to ``p75`` PREMIUM,
    and anything above is LUXURY.
    """
    ladder = (
        (p25, PriceTier.BUDGET),
        (p50, PriceTier.MID),
        (p75, PriceTier.PREMIUM),
    )
    for ceiling, tier in ladder:
        if value <= ceiling:
            return tier
    return PriceTier.LUXURY
def _thresholds(values: list[float]) -> tuple[float, float, float]:
    """Return the 25th, 50th and 75th percentiles of *values* (any order)."""
    ordered = sorted(values)
    p25 = _percentile(ordered, 0.25)
    p50 = _percentile(ordered, 0.50)
    p75 = _percentile(ordered, 0.75)
    return p25, p50, p75
def _compute_pricing_outputs(
    products: list[Product],
) -> dict[
    UUID,
    tuple[
        PriceTier | None,
        float | None,
        Literal["category", "fallback", "insufficient_data"] | None,
    ],
]:
    """Compute ``(price_tier, price_per_use_pln, tier_source)`` per product id.

    A product without a computable price-per-use gets ``(None, None, None)``.
    A priced product starts as ``(None, ppu, "insufficient_data")`` and is
    upgraded when thresholds are available:

    * ``"category"``: its category has at least
      ``_MIN_PRODUCTS_FOR_PRICE_TIER`` priced products, so the category's own
      quartiles are used.
    * ``"fallback"``: its category is smaller but has at least
      ``_MIN_CATEGORY_SIZE_FOR_FALLBACK`` priced products, and the fallback
      pool is large enough to provide quartiles.
    """
    ppu_by_id: dict[UUID, float] = {}
    rows_by_category: dict[ProductCategory, list[tuple[UUID, float]]] = {}
    for product in products:
        ppu = _price_per_use_pln(product)
        if ppu is None:
            continue
        ppu_by_id[product.id] = ppu
        rows_by_category.setdefault(product.category, []).append((product.id, ppu))

    outputs: dict[
        UUID,
        tuple[
            PriceTier | None,
            float | None,
            Literal["category", "fallback", "insufficient_data"] | None,
        ],
    ] = {}
    for product in products:
        priced = product.id in ppu_by_id
        outputs[product.id] = (
            None,
            ppu_by_id.get(product.id),
            "insufficient_data" if priced else None,
        )

    # Fallback pool: priced products that are not tools, not medications and
    # are leave-on.
    fallback_values = [
        ppu_by_id[product.id]
        for product in products
        if product.id in ppu_by_id
        and not (product.is_tool or product.is_medication or not product.leave_on)
    ]
    fallback_thresholds: tuple[float, float, float] | None = None
    if len(fallback_values) >= _MIN_PRODUCTS_FOR_PRICE_TIER:
        fallback_thresholds = _thresholds(fallback_values)

    for rows in rows_by_category.values():
        source: Literal["category", "fallback"]
        if len(rows) >= _MIN_PRODUCTS_FOR_PRICE_TIER:
            cutoffs = _thresholds([ppu for _, ppu in rows])
            source = "category"
        elif (
            len(rows) >= _MIN_CATEGORY_SIZE_FOR_FALLBACK
            and fallback_thresholds is not None
        ):
            cutoffs = fallback_thresholds
            source = "fallback"
        else:
            # Too little data: keep the "insufficient_data" defaults.
            continue
        p25, p50, p75 = cutoffs
        for product_id, ppu in rows:
            tier = _tier_from_thresholds(ppu, p25=p25, p50=p50, p75=p75)
            outputs[product_id] = (tier, ppu, source)
    return outputs
def _with_pricing(
    view: ProductPublic,
    pricing: tuple[
        PriceTier | None,
        float | None,
        Literal["category", "fallback", "insufficient_data"] | None,
    ],
) -> ProductPublic:
    """Copy a ``(tier, price_per_use, source)`` triple onto *view*.

    The view is mutated in place and also returned for convenience.
    """
    tier, per_use, source = pricing
    setattr(view, "price_tier", tier)
    setattr(view, "price_per_use_pln", per_use)
    setattr(view, "price_tier_source", source)
    return view
# ---------------------------------------------------------------------------
# Product routes
# ---------------------------------------------------------------------------
@router.get("", response_model=list[ProductWithInventory])
def list_products(
    category: Optional[ProductCategory] = None,
    brand: Optional[str] = None,
    targets: Optional[list[SkinConcern]] = Query(default=None),
    is_medication: Optional[bool] = None,
    is_tool: Optional[bool] = None,
    session: Session = Depends(get_session),
):
    """List products with their inventory and pricing, optionally filtered.

    ``category``, ``brand``, ``is_medication`` and ``is_tool`` are applied in
    SQL. ``targets`` is matched in Python and keeps products that share at
    least one target. Pricing is computed over all stored products, not just
    the filtered ones.
    """
    equality_filters = (
        (Product.category, category),
        (Product.brand, brand),
        (Product.is_medication, is_medication),
        (Product.is_tool, is_tool),
    )
    stmt = select(Product)
    for column, wanted in equality_filters:
        if wanted is not None:
            stmt = stmt.where(column == wanted)
    products = session.exec(stmt).all()

    # Filter by targets (JSON column — done in Python)
    if targets:
        wanted_targets = {t.value for t in targets}
        products = [
            product
            for product in products
            if any(
                getattr(t, "value", t) in wanted_targets
                for t in (product.targets or [])
            )
        ]

    # Bulk-load inventory for all products in one query
    product_ids = [product.id for product in products]
    if product_ids:
        inventory_rows = session.exec(
            select(ProductInventory).where(
                col(ProductInventory.product_id).in_(product_ids)
            )
        ).all()
    else:
        inventory_rows = []
    inventory_by_product: dict = {}
    for row in inventory_rows:
        inventory_by_product.setdefault(row.product_id, []).append(row)

    pricing_pool = list(session.exec(select(Product)).all()) if products else []
    pricing_outputs = _compute_pricing_outputs(pricing_pool)

    results = []
    for product in products:
        item = ProductWithInventory.model_validate(product, from_attributes=True)
        _with_pricing(item, pricing_outputs.get(product.id, (None, None, None)))
        item.inventory = inventory_by_product.get(product.id, [])
        results.append(item)
    return results
@router.post("", response_model=ProductPublic, status_code=201)
def create_product(data: ProductCreate, session: Session = Depends(get_session)):
    """Create a product with a fresh UUID and return the stored row.

    A non-empty ``price_currency`` is upper-cased before saving.
    """
    fields = data.model_dump()
    currency = fields.get("price_currency")
    if currency:
        fields["price_currency"] = str(currency).upper()
    product = Product(id=uuid4(), **fields)
    session.add(product)
    session.commit()
    session.refresh(product)
    return product
def _product_parse_system_prompt() -> str:
return """\
You are a skincare and cosmetics product data extraction expert. \
Given raw text (product page copy, ingredient list, label scan, etc.), \
extract structured product data and return it as a single JSON object.
RULES:
- Return ONLY raw JSON — no markdown code fences, no explanation, no preamble, no indentation or extra whitespace (minified).
- Omit any field you cannot confidently determine from the text. Do not guess.
- All enum values must exactly match the allowed strings listed below.
- For INCI lists: return each ingredient as a separate string in the array, \
preserving standard INCI names exactly as they appear.
- For actives: extract name, concentration (numeric, 0100), functions \
(use the allowed strings), and strength/irritation level if inferable.
- For effect_profile scores (05 int): ALWAYS return the full product_effect_profile \
object with all 13 fields. Infer each score from ingredient activity and product claims. \
Use 0 only when you truly have no basis for an estimate.
- For pH: extract from explicit mention (e.g. "pH 5.5", "pH range 4.05.0"). \
Do not infer from ingredients alone.
- For context_rules: infer from usage instructions and ingredient interactions \
(e.g. "do not use with AHAs" → safe_after_acids: false).
- fragrance_free / essential_oils_free / alcohol_denat_free: infer from INCI \
or explicit claims. Fragrance = "Parfum" or "Fragrance" in INCI → fragrance_free: false.
- For leave_on: true = leave-on treatment, false = rinse-off (cleanser, mask to rinse).
- recommended_time: "am" if contains SPF or vitamin C; "pm" if retinoid/retinol; \
"both" otherwise (when unclear, use "both").
ENUM ALLOWED VALUES (use ONLY these exact strings):
category: "cleanser" | "toner" | "essence" | "serum" | "moisturizer" | "spf" | \
"mask" | "exfoliant" | "hair_treatment" | "tool" | "spot_treatment" | "oil"
recommended_time: "am" | "pm" | "both"
texture: "watery" | "gel" | "emulsion" | "cream" | "oil" | "balm" | "foam" | "fluid"
absorption_speed: "very_fast" | "fast" | "moderate" | "slow" | "very_slow"
recommended_for (array, pick applicable):
"dry" | "oily" | "combination" | "sensitive" | "normal" | "acne_prone"
targets (array, pick applicable):
"acne" | "rosacea" | "hyperpigmentation" | "aging" | "dehydration" | "redness" | \
"damaged_barrier" | "pore_visibility" | "uneven_texture" | "hair_growth" | "sebum_excess"
actives[].functions (array, pick applicable):
"humectant" | "emollient" | "occlusive" | "exfoliant_aha" | "exfoliant_bha" | \
"exfoliant_pha" | "retinoid" | "antioxidant" | "soothing" | "barrier_support" | \
"brightening" | "anti_acne" | "ceramide" | "niacinamide" | "sunscreen" | "peptide" | \
"hair_growth_stimulant" | "prebiotic" | "vitamin_c" | "anti_aging"
actives[].strength_level: 1 (low) | 2 (medium) | 3 (high)
actives[].irritation_potential: 1 (low) | 2 (medium) | 3 (high)
OUTPUT SCHEMA (all fields optional — omit what you cannot determine):
{
"name": string,
"brand": string,
"line_name": string,
"sku": string,
"url": string,
"barcode": string,
"category": string,
"recommended_time": string,
"texture": string,
"absorption_speed": string,
"leave_on": boolean,
"price_amount": number,
"price_currency": string,
"size_ml": number,
"full_weight_g": number,
"empty_weight_g": number,
"pao_months": integer,
"inci": [string, ...],
"actives": [
{
"name": string,
"percent": number,
"functions": [string, ...],
"strength_level": 1|2|3,
"irritation_potential": 1|2|3
}
],
"recommended_for": [string, ...],
"targets": [string, ...],
"contraindications": [string, ...],
"usage_notes": string,
"fragrance_free": boolean,
"essential_oils_free": boolean,
"alcohol_denat_free": boolean,
"pregnancy_safe": boolean,
"product_effect_profile": {
"hydration_immediate": integer (0-5),
"hydration_long_term": integer (0-5),
"barrier_repair_strength": integer (0-5),
"soothing_strength": integer (0-5),
"exfoliation_strength": integer (0-5),
"retinoid_strength": integer (0-5),
"irritation_risk": integer (0-5),
"comedogenic_risk": integer (0-5),
"barrier_disruption_risk": integer (0-5),
"dryness_risk": integer (0-5),
"brightening_strength": integer (0-5),
"anti_acne_strength": integer (0-5),
"anti_aging_strength": integer (0-5)
},
"ph_min": number,
"ph_max": number,
"context_rules": {
"safe_after_shaving": boolean,
"safe_after_acids": boolean,
"safe_after_retinoids": boolean,
"safe_with_compromised_barrier": boolean,
"low_uv_only": boolean
},
"min_interval_hours": integer,
"max_frequency_per_week": integer,
"is_medication": boolean,
"is_tool": boolean,
"needle_length_mm": number
}
"""
@router.post("/parse-text", response_model=ProductParseResponse)
def parse_product_text(data: ProductParseRequest) -> ProductParseResponse:
    """Extract structured product data from free text via the LLM.

    Raises:
        HTTPException: 502 when the LLM returns an empty or non-JSON response;
            422 when the JSON does not validate against the product schema.
    """
    response = call_gemini(
        endpoint="products/parse-text",
        contents=f"Extract product data from this text:\n\n{data.text}",
        config=get_extraction_config(
            system_instruction=_product_parse_system_prompt(),
            response_schema=ProductParseLLMResponse,
            max_output_tokens=16384,
        ),
        user_input=data.text,
    )
    raw = response.text
    if not raw:
        raise HTTPException(status_code=502, detail="LLM returned an empty response")
    try:
        parsed = json.loads(raw)
    except json.JSONDecodeError as e:
        # Chain the cause so logs show the decoding failure behind the 502.
        raise HTTPException(
            status_code=502, detail=f"LLM returned invalid JSON: {e}"
        ) from e
    try:
        # Validate with the LLM-facing schema first (plain-int strength fields),
        # then re-validate the dump as the public response model.
        llm_parsed = ProductParseLLMResponse.model_validate(parsed)
        return ProductParseResponse.model_validate(llm_parsed.model_dump())
    except ValidationError as e:
        raise HTTPException(status_code=422, detail=e.errors()) from e
@router.get("/{product_id}", response_model=ProductWithInventory)
def get_product(product_id: UUID, session: Session = Depends(get_session)):
    """Return one product with its pricing fields and inventory entries.

    The lookup goes through ``get_or_404``. Pricing is computed over all
    stored products.
    """
    product = get_or_404(session, Product, product_id)

    all_products = list(session.exec(select(Product)).all())
    pricing = _compute_pricing_outputs(all_products).get(
        product.id, (None, None, None)
    )

    inventory_stmt = select(ProductInventory).where(
        ProductInventory.product_id == product_id
    )
    inventory_rows = session.exec(inventory_stmt).all()

    result = ProductWithInventory.model_validate(product, from_attributes=True)
    _with_pricing(result, pricing)
    result.inventory = list(inventory_rows)
    return result
@router.patch("/{product_id}", response_model=ProductPublic)
def update_product(
    product_id: UUID, data: ProductUpdate, session: Session = Depends(get_session)
):
    """Apply a partial update to a product and return it with fresh pricing.

    Only fields present in the request are written. A non-empty
    ``price_currency`` is upper-cased first.
    """
    product = get_or_404(session, Product, product_id)

    changes = data.model_dump(exclude_unset=True)
    currency = changes.get("price_currency")
    if currency:
        changes["price_currency"] = str(currency).upper()
    for field_name, new_value in changes.items():
        setattr(product, field_name, new_value)

    session.add(product)
    session.commit()
    session.refresh(product)

    all_products = list(session.exec(select(Product)).all())
    pricing = _compute_pricing_outputs(all_products).get(
        product.id, (None, None, None)
    )
    view = ProductPublic.model_validate(product, from_attributes=True)
    return _with_pricing(view, pricing)
@router.delete("/{product_id}", status_code=204)
def delete_product(product_id: UUID, session: Session = Depends(get_session)):
    """Delete the product looked up via ``get_or_404`` and commit."""
    doomed = get_or_404(session, Product, product_id)
    session.delete(doomed)
    session.commit()
# ---------------------------------------------------------------------------
# Product inventory sub-routes
# ---------------------------------------------------------------------------
@router.get("/{product_id}/inventory", response_model=list[ProductInventory])
def list_product_inventory(product_id: UUID, session: Session = Depends(get_session)):
    """Return all inventory entries of a product.

    The product is looked up via ``get_or_404`` before the entries are read.
    """
    get_or_404(session, Product, product_id)
    entries = session.exec(
        select(ProductInventory).where(ProductInventory.product_id == product_id)
    )
    return entries.all()
@router.post(
    "/{product_id}/inventory", response_model=ProductInventory, status_code=201
)
def create_product_inventory(
    product_id: UUID,
    data: InventoryCreate,
    session: Session = Depends(get_session),
):
    """Create an inventory entry for a product and return the stored row.

    The product is looked up via ``get_or_404`` before the entry is created.
    """
    get_or_404(session, Product, product_id)
    fields = data.model_dump()
    entry = ProductInventory(id=uuid4(), product_id=product_id, **fields)
    session.add(entry)
    session.commit()
    session.refresh(entry)
    return entry
# ---------------------------------------------------------------------------
# Shopping suggestion
# ---------------------------------------------------------------------------
def _ev(v: object) -> str:
if v is None:
return ""
value = getattr(v, "value", None)
if isinstance(value, str):
return value
return str(v)
def _build_shopping_context(session: Session) -> str:
    """Build the text context (skin state plus owned products) for the LLM.

    The first section summarises the most recent ``SkinConditionSnapshot``.
    The second lists every product from ``_get_shopping_products`` with a
    stock marker, targets, active names and effect scores of 3 or more.

    The stock marker is ``✓`` when the product has at least one inventory
    entry with ``finished_at`` unset, otherwise ``✗``. This matches the legend
    emitted below and the one in the shopping system prompt.
    """
    snapshot = session.exec(
        select(SkinConditionSnapshot).order_by(
            col(SkinConditionSnapshot.snapshot_date).desc()
        )
    ).first()
    skin_lines = ["STAN SKÓRY:"]
    if snapshot:
        skin_lines.append(f" Data: {snapshot.snapshot_date}")
        skin_lines.append(f" Ogólny stan: {_ev(snapshot.overall_state)}")
        skin_lines.append(f" Typ skóry: {_ev(snapshot.skin_type)}")
        skin_lines.append(f" Nawilżenie: {snapshot.hydration_level}/5")
        skin_lines.append(f" Wrażliwość: {snapshot.sensitivity_level}/5")
        skin_lines.append(f" Bariera: {_ev(snapshot.barrier_state)}")
        concerns = ", ".join(_ev(c) for c in (snapshot.active_concerns or []))
        skin_lines.append(f" Aktywne problemy: {concerns or 'brak'}")
        if snapshot.priorities:
            skin_lines.append(f" Priorytety: {', '.join(snapshot.priorities)}")
    else:
        skin_lines.append(" (brak danych)")

    products = _get_shopping_products(session)
    product_ids = [p.id for p in products]
    inventory_rows = (
        session.exec(
            select(ProductInventory).where(
                col(ProductInventory.product_id).in_(product_ids)
            )
        ).all()
        if product_ids
        else []
    )
    inv_by_product: dict = {}
    for inv in inventory_rows:
        inv_by_product.setdefault(inv.product_id, []).append(inv)

    products_lines = ["POSIADANE PRODUKTY:"]
    products_lines.append(
        " Legenda: [✓] = produkt dostępny (w magazynie), [✗] = brak w magazynie"
    )
    for p in products:
        # Any unfinished inventory entry counts as "in stock".
        has_stock = any(
            i.finished_at is None for i in inv_by_product.get(p.id, [])
        )
        # Emit the markers the legend promises; an empty marker would make
        # every product look identical to the model.
        stock = "✓" if has_stock else "✗"

        actives = _extract_active_names(p)
        actives_str = f", actives: {actives}" if actives else ""

        # The profile may be missing, a plain dict, or a model instance.
        ep = p.product_effect_profile
        if ep is None:
            raw_effects: dict = {}
        elif isinstance(ep, dict):
            raw_effects = ep
        else:
            raw_effects = ep.model_dump()
        # Keep only numeric scores of 3 or more; skip unset (None) values
        # instead of failing on the comparison.
        effects = {
            k.replace("_strength", ""): v
            for k, v in raw_effects.items()
            if isinstance(v, (int, float)) and v >= 3
        }
        effects_str = f", effects: {effects}" if effects else ""

        targets = [_ev(t) for t in (p.targets or [])]
        products_lines.append(
            f" [{stock}] id={p.id} {p.name} ({p.brand or ''}) - {_ev(p.category)}, "
            f"targets: {targets}{actives_str}{effects_str}"
        )
    return "\n".join(skin_lines) + "\n\n" + "\n".join(products_lines)
def _get_shopping_products(session: Session) -> list[Product]:
    """Return products that are neither tools nor medications.

    Tools are excluded in the SQL query; medications are filtered in Python.
    """
    non_tool_query = select(Product).where(col(Product.is_tool).is_(False))
    non_tools = session.exec(non_tool_query).all()
    return [product for product in non_tools if not product.is_medication]
def _extract_active_names(product: Product) -> list[str]:
    """Return up to 12 distinct, non-empty active names in first-seen order.

    Actives may be plain dicts or objects with a ``name`` attribute.
    """
    collected: list[str] = []
    seen: set[str] = set()
    for active in product.actives or []:
        raw = (
            active.get("name")
            if isinstance(active, dict)
            else getattr(active, "name", "")
        )
        label = str(raw or "").strip()
        if not label or label in seen:
            continue
        seen.add(label)
        collected.append(label)
        if len(collected) >= 12:
            break
    return collected
def _extract_requested_product_ids(
args: dict[str, object], max_ids: int = 8
) -> list[str]:
raw_ids = args.get("product_ids")
if not isinstance(raw_ids, list):
return []
requested_ids: list[str] = []
seen: set[str] = set()
for raw_id in raw_ids:
if not isinstance(raw_id, str):
continue
if raw_id in seen:
continue
seen.add(raw_id)
requested_ids.append(raw_id)
if len(requested_ids) >= max_ids:
break
return requested_ids
def _build_product_details_tool_handler(products: list[Product], mapper):
    """Build a tool handler that maps requested product ids through *mapper*.

    The returned handler reads ids via ``_extract_requested_product_ids``,
    silently skips ids not present in *products*, and returns
    ``{"products": [mapper(product, id), ...]}``.
    """
    lookup = {str(product.id): product for product in products}

    def _handler(args: dict[str, object]) -> dict[str, object]:
        matches = (
            (pid, lookup.get(pid)) for pid in _extract_requested_product_ids(args)
        )
        return {
            "products": [
                mapper(product, pid) for pid, product in matches if product is not None
            ]
        }

    return _handler
def _build_inci_tool_handler(products: list[Product]):
    """Build the ``get_product_inci`` tool handler.

    Each product yields its id, name and at most 128 INCI entries, each
    truncated to 120 characters.
    """

    def _mapper(product: Product, pid: str) -> dict[str, object]:
        ingredients = (product.inci or [])[:128]
        trimmed = [str(entry)[:120] for entry in ingredients]
        return {"id": pid, "name": product.name, "inci": trimmed}

    return _build_product_details_tool_handler(products, mapper=_mapper)
def _build_actives_tool_handler(products: list[Product]):
    """Build the ``get_product_actives`` tool handler.

    Each product yields its id, name and up to 24 actives. An active carries
    its name plus, when present, ``percent``, up to four ``functions`` and
    ``strength_level``. Actives without a name are skipped.
    """

    def _describe(active: object) -> dict[str, object] | None:
        # Actives arrive either as plain dicts or as model objects. Pick the
        # matching accessor and text renderer once, then share the rest.
        if isinstance(active, dict):
            read = active.get
            render = str
        else:

            def read(field: str) -> object:
                return getattr(active, field, None)

            render = _ev

        name = str(read("name") or "").strip()
        if not name:
            return None
        item: dict[str, object] = {"name": name}
        percent = read("percent")
        if percent is not None:
            item["percent"] = percent
        functions = read("functions")
        if isinstance(functions, list):
            item["functions"] = [render(f) for f in functions[:4]]
        strength_level = read("strength_level")
        if strength_level is not None:
            item["strength_level"] = render(strength_level)
        return item

    def _mapper(product: Product, pid: str) -> dict[str, object]:
        described = (_describe(active) for active in product.actives or [])
        payload = [item for item in described if item is not None]
        return {"id": pid, "name": product.name, "actives": payload[:24]}

    return _build_product_details_tool_handler(products, mapper=_mapper)
def _build_usage_notes_tool_handler(products: list[Product]):
    """Build the ``get_product_usage_notes`` tool handler.

    Usage notes are whitespace-normalised and capped at 500 characters; longer
    notes are cut to 497 characters plus ``"..."``.
    """

    def _mapper(product: Product, pid: str) -> dict[str, object]:
        words = str(product.usage_notes or "").split()
        compact = " ".join(words)
        if len(compact) > 500:
            compact = f"{compact[:497]}..."
        return {"id": pid, "name": product.name, "usage_notes": compact}

    return _build_product_details_tool_handler(products, mapper=_mapper)
def _build_safety_rules_tool_handler(products: list[Product]):
    """Build the ``get_product_safety_rules`` tool handler.

    Values are read from ``product.to_llm_context()``. Contraindications are
    capped at 24 entries, and missing sections default to empty containers.
    """

    def _mapper(product: Product, pid: str) -> dict[str, object]:
        llm_context = product.to_llm_context()
        contraindications = llm_context.get("contraindications") or []
        payload: dict[str, object] = {"id": pid, "name": product.name}
        payload["contraindications"] = contraindications[:24]
        payload["context_rules"] = llm_context.get("context_rules") or {}
        payload["safety"] = llm_context.get("safety") or {}
        payload["min_interval_hours"] = llm_context.get("min_interval_hours")
        payload["max_frequency_per_week"] = llm_context.get("max_frequency_per_week")
        return payload

    return _build_product_details_tool_handler(products, mapper=_mapper)
def _product_ids_function_declaration(
    name: str, description: str
) -> genai_types.FunctionDeclaration:
    """Declare a tool whose only, required parameter is ``product_ids``.

    All four product-detail tools share this parameter schema: an array of
    string product UUIDs.
    """
    return genai_types.FunctionDeclaration(
        name=name,
        description=description,
        parameters=genai_types.Schema(
            type=genai_types.Type.OBJECT,
            properties={
                "product_ids": genai_types.Schema(
                    type=genai_types.Type.ARRAY,
                    items=genai_types.Schema(type=genai_types.Type.STRING),
                    description="Product UUIDs from POSIADANE PRODUKTY.",
                )
            },
            required=["product_ids"],
        ),
    )


# Tool declarations offered to the model by the shopping-suggestion endpoint.
_INCI_FUNCTION_DECLARATION = _product_ids_function_declaration(
    "get_product_inci",
    (
        "Return exact INCI ingredient lists for selected product UUIDs from "
        "POSIADANE PRODUKTY."
    ),
)
_SAFETY_RULES_FUNCTION_DECLARATION = _product_ids_function_declaration(
    "get_product_safety_rules",
    (
        "Return safety and compatibility rules for selected product UUIDs, "
        "including contraindications, context_rules and safety flags."
    ),
)
_ACTIVES_FUNCTION_DECLARATION = _product_ids_function_declaration(
    "get_product_actives",
    (
        "Return detailed active ingredients for selected product UUIDs, "
        "including concentration and functions when available."
    ),
)
_USAGE_NOTES_FUNCTION_DECLARATION = _product_ids_function_declaration(
    "get_product_usage_notes",
    (
        "Return compact usage notes for selected product UUIDs "
        "(timing, application method and cautions)."
    ),
)
# System prompt (in Polish) for the shopping-suggestion endpoint. It tells the
# model to suggest product TYPES rather than brands, explains the [✓]/[✗]
# stock legend used in the product list, and asks for a JSON-only answer.
_SHOPPING_SYSTEM_PROMPT = """Jesteś asystentem zakupowym w dziedzinie pielęgnacji skóry.
Twoim zadaniem jest przeanalizować stan skóry użytkownika oraz produkty, które już posiada,
a następnie zasugerować TYPY produktów (bez marek), które mogłyby uzupełnić ich rutynę.
LEGENDA:
- [✓] = produkt dostępny w magazynie (nawet jeśli jest zapieczętowany)
- [✗] = produkt niedostępny (brak w magazynie, wszystkie opakowania zużyte)
ZASADY:
0. Sugeruj tylko wtedy, gdy jest realna potrzeba - nie zwracaj stałej liczby produktów
1. Sugeruj TYLKO typy produktów, NIGDY konkretne marki (np. "Salicylic Acid 2% Masque", nie "La Roche-Posay")
2. Produkty oznaczone [✗] to te, których NIE MA w magazynie - możesz je zasugerować
3. Produkty oznaczone [✓] są już dostępne - nie sugeruj ich ponownie
4. Bierz pod uwagę aktywne problemy skóry (acne, hyperpigmentacja, aging, etc.)
5. Sugeruj realistyczną częstotliwość użycia (dzienna, 2-3x tygodniowo, etc.)
6. Zachowaj kolejność warstw: cleanse → toner → serum → moisturizer → SPF
7. Jeśli użytkownik ma uszkodzoną barierę, unikaj silnych eksfoliantów i retinoidów
8. Zwracaj uwagę na ewentualne konflikty polecanych składników z tymi, które użytkownik już posiada (np. nie polecaj peptydów miedziowych jeśli użytkownik nadużywa kwasów)
9. Odpowiadaj w języku polskim
Format odpowiedzi - zwróć wyłącznie JSON zgodny z podanym schematem."""
@router.post("/suggest", response_model=ShoppingSuggestionResponse)
def suggest_shopping(session: Session = Depends(get_session)):
    """Ask the LLM which product types would complement the user's routine.

    The model gets the skin/product context and four product-detail tools.
    If the tool-calling request fails with the 502 "Gemini requested too many
    function calls" error, the request is retried once without tools using a
    conservative prompt. Any other ``HTTPException`` is re-raised.

    Raises:
        HTTPException: 502 when the LLM response is empty, is not valid JSON,
            or does not match the expected suggestion structure.
    """
    context = _build_shopping_context(session)
    shopping_products = _get_shopping_products(session)
    prompt = (
        "Na podstawie poniższych danych przeanalizuj, jakie TYPY produktów "
        "mogłyby uzupełnić rutynę pielęgnacyjną użytkownika.\n\n"
        f"{context}\n\n"
        "NARZEDZIA:\n"
        "- Masz dostep do funkcji: get_product_inci, get_product_safety_rules, get_product_actives, get_product_usage_notes.\n"
        "- Wywoluj narzedzia tylko, gdy potrzebujesz detali do oceny konfliktow skladnikow lub ryzyka podraznien.\n"
        "- Grupuj UUID: staraj sie pobierac dane dla wielu produktow jednym wywolaniem.\n"
        "Zwróć wyłącznie JSON zgodny ze schematem."
    )
    config = get_creative_config(
        system_instruction=_SHOPPING_SYSTEM_PROMPT,
        response_schema=_ShoppingSuggestionsOut,
        max_output_tokens=4096,
    ).model_copy(
        update={
            "tools": [
                genai_types.Tool(
                    function_declarations=[
                        _INCI_FUNCTION_DECLARATION,
                        _SAFETY_RULES_FUNCTION_DECLARATION,
                        _ACTIVES_FUNCTION_DECLARATION,
                        _USAGE_NOTES_FUNCTION_DECLARATION,
                    ]
                )
            ],
            "tool_config": genai_types.ToolConfig(
                function_calling_config=genai_types.FunctionCallingConfig(
                    mode=genai_types.FunctionCallingConfigMode.AUTO,
                )
            ),
        }
    )
    function_handlers = {
        "get_product_inci": _build_inci_tool_handler(shopping_products),
        "get_product_safety_rules": _build_safety_rules_tool_handler(shopping_products),
        "get_product_actives": _build_actives_tool_handler(shopping_products),
        "get_product_usage_notes": _build_usage_notes_tool_handler(shopping_products),
    }
    try:
        response = call_gemini_with_function_tools(
            endpoint="products/suggest",
            contents=prompt,
            config=config,
            function_handlers=function_handlers,
            user_input=prompt,
            max_tool_roundtrips=3,
        )
    except HTTPException as exc:
        # Only the "too many function calls" failure triggers the fallback.
        if (
            exc.status_code != 502
            or str(exc.detail) != "Gemini requested too many function calls"
        ):
            raise
        conservative_prompt = (
            f"{prompt}\n\n"
            "TRYB AWARYJNY (KONSERWATYWNY):\n"
            "- Osiagnieto limit wywolan narzedzi.\n"
            "- Nie wywoluj narzedzi ponownie.\n"
            "- Zasugeruj tylko najbardziej bezpieczne i realistyczne typy produktow do uzupelnienia brakow,"
            " unikaj agresywnych aktywnych przy niepelnych danych.\n"
        )
        response = call_gemini(
            endpoint="products/suggest",
            contents=conservative_prompt,
            config=get_creative_config(
                system_instruction=_SHOPPING_SYSTEM_PROMPT,
                response_schema=_ShoppingSuggestionsOut,
                max_output_tokens=4096,
            ),
            user_input=conservative_prompt,
            tool_trace={
                "mode": "fallback_conservative",
                "reason": "max_tool_roundtrips_exceeded",
            },
        )
    raw = response.text
    if not raw:
        raise HTTPException(status_code=502, detail="LLM returned an empty response")
    try:
        parsed = json.loads(raw)
    except json.JSONDecodeError as e:
        raise HTTPException(
            status_code=502, detail=f"LLM returned invalid JSON: {e}"
        ) from e
    # Valid JSON is not necessarily the expected object. Report a malformed
    # payload as an upstream failure instead of letting it surface as a 500.
    if not isinstance(parsed, dict):
        raise HTTPException(
            status_code=502, detail="LLM returned JSON that is not an object"
        )
    try:
        return ShoppingSuggestionResponse(
            suggestions=[ProductSuggestion(**s) for s in parsed.get("suggestions", [])],
            reasoning=parsed.get("reasoning", ""),
        )
    except (ValidationError, TypeError) as e:
        # TypeError covers a non-iterable "suggestions" or non-mapping items.
        raise HTTPException(
            status_code=502,
            detail=f"LLM returned an invalid suggestion payload: {e}",
        ) from e