253 lines
9.1 KiB
Python
253 lines
9.1 KiB
Python
from datetime import date
|
|
from typing import Any
|
|
from uuid import UUID
|
|
|
|
from fastapi import HTTPException
|
|
from sqlmodel import Session, col, select
|
|
|
|
from innercontext.auth import CurrentUser
|
|
from innercontext.models import Product, UserProfile
|
|
from innercontext.models.enums import Role
|
|
|
|
|
|
def _resolve_target_user_id(
|
|
current_user: CurrentUser,
|
|
user_id: UUID | None,
|
|
) -> UUID:
|
|
if user_id is None:
|
|
return current_user.user_id
|
|
if current_user.role is not Role.ADMIN:
|
|
raise HTTPException(status_code=403, detail="Admin role required")
|
|
return user_id
|
|
|
|
|
|
def get_user_profile(
    session: Session,
    current_user: CurrentUser | None = None,
    *,
    user_id: UUID | None = None,
) -> UserProfile | None:
    """Fetch the most recently created user profile.

    Args:
        session: Open database session used to run the query.
        current_user: Authenticated caller. When None, the query is not
            filtered by user and ``user_id`` is ignored.
        user_id: Optional explicit target user; resolved (and authorised)
            through ``_resolve_target_user_id``.

    Returns:
        The newest matching ``UserProfile`` (ordered by ``created_at``
        descending), or None when no row matches.
    """
    statement = select(UserProfile)

    if current_user is not None:
        # Scope the lookup to the resolved user; this may raise the 403
        # produced by _resolve_target_user_id.
        owner_id = _resolve_target_user_id(current_user, user_id)
        statement = statement.where(UserProfile.user_id == owner_id)

    statement = statement.order_by(col(UserProfile.created_at).desc())
    return session.exec(statement).first()
|
|
|
|
|
|
def calculate_age(birth_date: date, reference_date: date) -> int:
    """Return the number of whole years between two dates.

    Args:
        birth_date: Date of birth.
        reference_date: Date at which the age is evaluated.

    Returns:
        Completed years as of ``reference_date``. The result is negative
        when ``reference_date`` falls in a year before ``birth_date``.
    """
    # The birthday has not happened yet this year when the reference
    # (month, day) sorts before the birth (month, day).
    birthday_pending = (reference_date.month, reference_date.day) < (
        birth_date.month,
        birth_date.day,
    )
    return reference_date.year - birth_date.year - int(birthday_pending)
|
|
|
|
|
|
def build_user_profile_context(
    session: Session,
    reference_date: date,
    current_user: CurrentUser | None = None,
    *,
    user_id: UUID | None = None,
) -> str:
    """Render the user's profile as a short plain-text block.

    Args:
        session: Open database session, forwarded to ``get_user_profile``.
        reference_date: Date against which the age is computed.
        current_user: Authenticated caller, forwarded to
            ``get_user_profile``.
        user_id: Optional explicit target user, forwarded to
            ``get_user_profile``.

    Returns:
        A newline-terminated block starting with ``USER PROFILE:``, or the
        single line ``USER PROFILE: no data`` when no profile is found.
    """
    profile = get_user_profile(session, current_user, user_id=user_id)
    if profile is None:
        return "USER PROFILE: no data\n"

    out = ["USER PROFILE:"]

    born = profile.birth_date
    if born is None:
        out.append(" Age: unknown")
    else:
        # A birth date after the reference date is reported as age 0.
        age = max(calculate_age(born, reference_date), 0)
        out += [f" Age: {age}", f" Birth date: {born.isoformat()}"]

    sex = profile.sex_at_birth
    if sex is None:
        sex_label = "unknown"
    elif hasattr(sex, "value"):
        # Objects exposing .value (e.g. enum members) are shown by value.
        sex_label = sex.value
    else:
        sex_label = str(sex)
    out.append(f" Sex at birth: {sex_label}")

    return "\n".join(out) + "\n"
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Phase 2: Tiered Product Context Assembly
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
# (label shown in the summary, field read from the effect profile)
_SUMMARY_EFFECT_FIELDS: tuple[tuple[str, str], ...] = (
    ("hydration", "hydration_immediate"),
    ("exfoliation", "exfoliation_strength"),
    ("retinoid", "retinoid_strength"),
    ("irritation_risk", "irritation_risk"),
    ("barrier_risk", "barrier_disruption_risk"),
)


def _read_context_field(source: Any, field: str, default: Any = None) -> Any:
    """Read ``field`` from a dict or from an attribute-style object."""
    if isinstance(source, dict):
        return source.get(field, default)
    return getattr(source, field, default)


def build_product_context_summary(product: Product, has_inventory: bool = False) -> str:
    """
    Build minimal product context (Tier 1: Summary).

    Produces one compact line of the form
    ``[status] short_id | brand name (category) effects={...} safety={...}``.
    The ``effects`` and ``safety`` groups are omitted when empty.

    ``effect_profile`` and ``context_rules`` may each be a dict or an
    object with attributes; both shapes are read the same way. A score
    that is missing, None or not greater than zero is skipped, so a dict
    holding an explicit None no longer raises ``TypeError``.

    Args:
        product: Product to summarize
        has_inventory: Whether product has active inventory

    Returns:
        Compact single-line product summary
    """
    status = "[✓]" if has_inventory else "[✗]"

    # Only notable effects (score > 0) make it into the summary.
    effects = []
    effect_profile = getattr(product, "effect_profile", None)
    if effect_profile:
        for label, field in _SUMMARY_EFFECT_FIELDS:
            score = _read_context_field(effect_profile, field)
            # The truthiness check filters None and 0 before comparing, so
            # nullable scores are safe for both dicts and objects.
            if score and score > 0:
                effects.append(f"{label}={score}")

    effects_str = f" effects={{{','.join(effects)}}}" if effects else ""

    # Safety flags
    safety_flags = []
    context_rules = getattr(product, "context_rules", None)
    if context_rules:
        if _read_context_field(context_rules, "safe_with_compromised_barrier"):
            safety_flags.append("barrier_ok")
        # A missing flag defaults to "safe"; an explicit falsy value
        # (including None) is flagged, as before.
        if not _read_context_field(context_rules, "safe_after_shaving", True):
            safety_flags.append("!post_shave")

    safety_str = f" safety={{{','.join(safety_flags)}}}" if safety_flags else ""

    return (
        f"{status} {product.short_id} | {product.brand} {product.name} "
        f"({product.category}){effects_str}{safety_str}"
    )
|
|
|
|
|
|
def build_product_context_detailed(
    product: Product,
    has_inventory: bool = False,
    last_used_date: date | None = None,
) -> dict[str, Any]:
    """
    Build detailed product context (Tier 2: Clinical Decision Data).

    Intended for function-tool responses where the LLM needs safety and
    clinical details. Carries actives, effect_profile and context_rules,
    and deliberately leaves out the full INCI list to save tokens.

    Args:
        product: Product to detail
        has_inventory: Whether product has active inventory
        last_used_date: When product was last used

    Returns:
        Dict with clinical decision fields. ``top_actives`` holds at most
        five entries; ``effect_profile`` and ``context_rules`` are None
        when the product has none.
    """

    def _active_entry(active: Any) -> dict[str, Any]:
        # Actives may be dicts or objects with attributes.
        if isinstance(active, dict):
            return {
                "name": active.get("name"),
                "percent": active.get("percent"),
                "functions": active.get("functions", []),
            }
        return {
            "name": getattr(active, "name", None),
            "percent": getattr(active, "percent", None),
            "functions": getattr(active, "functions", []),
        }

    def _as_mapping(value: Any) -> Any:
        # Empty/missing -> None; dicts pass through untouched; anything
        # else is expected to provide model_dump().
        if not value:
            return None
        if isinstance(value, dict):
            return value
        return value.model_dump()

    # Top actives only (limit to 5 for token efficiency)
    actives = getattr(product, "actives", None)
    top_actives = [_active_entry(item) for item in actives[:5]] if actives else []

    effect_profile = _as_mapping(getattr(product, "effect_profile", None))
    context_rules = _as_mapping(getattr(product, "context_rules", None))

    used_on = last_used_date.isoformat() if last_used_date else None

    # INCI list OMITTED for token efficiency
    return {
        "id": str(product.id),
        "name": f"{product.brand} {product.name}",
        "category": product.category,
        "recommended_time": getattr(product, "recommended_time", None),
        "has_inventory": has_inventory,
        "last_used_date": used_on,
        "top_actives": top_actives,
        "effect_profile": effect_profile,
        "context_rules": context_rules,
        "min_interval_hours": getattr(product, "min_interval_hours", None),
        "max_frequency_per_week": getattr(product, "max_frequency_per_week", None),
    }
|
|
|
|
|
|
def build_products_context_summary_list(
    products: list[Product], products_with_inventory: set[UUID]
) -> str:
    """
    Build summary context for multiple products (Tier 1).

    Emits an ``AVAILABLE PRODUCTS:`` header followed by one summary line
    per product, in the order given. Meant for initial routine/batch
    prompts where the LLM does not need full details yet.

    Args:
        products: List of available products
        products_with_inventory: Set of product IDs that have inventory

    Returns:
        Compact multi-line product list, newline-terminated
    """
    entries = (
        f" {build_product_context_summary(item, item.id in products_with_inventory)}"
        for item in products
    )
    return "\n".join(["AVAILABLE PRODUCTS:", *entries]) + "\n"
|