feat(api): implement Phase 2 token optimization and reasoning capture

- Add tiered context system (summary/detailed/full) to reduce token usage by 70-80%
- Replace old _build_products_context with build_products_context_summary_list (Tier 1: ~15 tokens/product vs 150)
- Optimize function tool responses: exclude INCI list by default (saves ~15KB/product)
- Reduce actives from 24 to top 5 in function tools
- Add reasoning_chain field to AICallLog model for observability
- Implement _extract_thinking_content to capture LLM reasoning (MEDIUM thinking level)
- Strengthen prompt enforcement for prohibited fields (dose, amount, quantity)
- Update get_creative_config to use MEDIUM thinking level instead of LOW

Token Savings:
- Routine suggestions: 9,613 → ~1,300 tokens (-86%)
- Batch planning: 12,580 → ~1,800 tokens (-86%)
- Function tool responses: ~15KB → ~2KB per product (-87%)

Breaks discovered in log analysis (ai_call_log.json):
- Lines 10, 27, 61, 78: LLM returned prohibited dose field
- Line 85: MAX_TOKENS failure (output truncated)

Phase 2 complete. Next: two-phase batch planning with safety verification.
This commit is contained in:
Piotr Oleszczyk 2026-03-06 10:26:29 +01:00
parent e239f61408
commit c87d1b8581
6 changed files with 326 additions and 114 deletions

View file

@ -0,0 +1,31 @@
"""add reasoning_chain to ai_call_logs
Revision ID: 2697b4f1972d
Revises: 60c8e1ade29d
Create Date: 2026-03-06 10:23:33.889717
"""
from typing import Sequence, Union
import sqlalchemy as sa
from alembic import op
# revision identifiers, used by Alembic.
revision: str = "2697b4f1972d"
down_revision: Union[str, Sequence[str], None] = "60c8e1ade29d"
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
"""Upgrade schema."""
op.add_column(
"ai_call_logs", sa.Column("reasoning_chain", sa.Text(), nullable=True)
)
def downgrade() -> None:
"""Downgrade schema."""
op.drop_column("ai_call_logs", "reasoning_chain")

View file

@ -1,8 +1,10 @@
from datetime import date from datetime import date
from typing import Any
from uuid import UUID
from sqlmodel import Session, col, select from sqlmodel import Session, col, select
from innercontext.models import UserProfile from innercontext.models import Product, UserProfile
def get_user_profile(session: Session) -> UserProfile | None: def get_user_profile(session: Session) -> UserProfile | None:
@ -42,3 +44,154 @@ def build_user_profile_context(session: Session, reference_date: date) -> str:
lines.append(" Sex at birth: unknown") lines.append(" Sex at birth: unknown")
return "\n".join(lines) + "\n" return "\n".join(lines) + "\n"
# ---------------------------------------------------------------------------
# Phase 2: Tiered Product Context Assembly
# ---------------------------------------------------------------------------
def build_product_context_summary(product: Product, has_inventory: bool = False) -> str:
"""
Build minimal product context (Tier 1: Summary).
Used for initial LLM context when detailed info isn't needed yet.
~15-20 tokens per product vs ~150 tokens in full mode.
Args:
product: Product to summarize
has_inventory: Whether product has active inventory
Returns:
Compact single-line product summary
"""
status = "[✓]" if has_inventory else "[✗]"
# Get effect profile scores if available
effects = []
if hasattr(product, "effect_profile") and product.effect_profile:
profile = product.effect_profile
# Only include notable effects (score > 0)
if profile.hydration_immediate and profile.hydration_immediate > 0:
effects.append(f"hydration={profile.hydration_immediate}")
if profile.exfoliation_strength and profile.exfoliation_strength > 0:
effects.append(f"exfoliation={profile.exfoliation_strength}")
if profile.retinoid_strength and profile.retinoid_strength > 0:
effects.append(f"retinoid={profile.retinoid_strength}")
if profile.irritation_risk and profile.irritation_risk > 0:
effects.append(f"irritation_risk={profile.irritation_risk}")
if profile.barrier_disruption_risk and profile.barrier_disruption_risk > 0:
effects.append(f"barrier_risk={profile.barrier_disruption_risk}")
effects_str = f" effects={{{','.join(effects)}}}" if effects else ""
# Safety flags
safety_flags = []
if hasattr(product, "context_rules") and product.context_rules:
if product.context_rules.safe_with_compromised_barrier:
safety_flags.append("barrier_ok")
if not product.context_rules.safe_after_shaving:
safety_flags.append("!post_shave")
safety_str = f" safety={{{','.join(safety_flags)}}}" if safety_flags else ""
return (
f"{status} {str(product.id)[:8]} | {product.brand} {product.name} "
f"({product.category}){effects_str}{safety_str}"
)
def build_product_context_detailed(
product: Product,
has_inventory: bool = False,
last_used_date: date | None = None,
) -> dict[str, Any]:
"""
Build detailed product context (Tier 2: Clinical Decision Data).
Used for function tool responses when LLM needs safety/clinical details.
Includes actives, effect_profile, context_rules, but OMITS full INCI list.
~40-50 tokens per product.
Args:
product: Product to detail
has_inventory: Whether product has active inventory
last_used_date: When product was last used
Returns:
Dict with clinical decision fields
"""
# Top actives only (limit to 5 for token efficiency)
top_actives = []
if hasattr(product, "actives") and product.actives:
for active in (product.actives or [])[:5]:
if isinstance(active, dict):
top_actives.append(
{
"name": active.get("name"),
"percent": active.get("percent"),
"functions": active.get("functions", []),
}
)
else:
top_actives.append(
{
"name": getattr(active, "name", None),
"percent": getattr(active, "percent", None),
"functions": getattr(active, "functions", []),
}
)
# Effect profile
effect_profile = None
if hasattr(product, "effect_profile") and product.effect_profile:
if isinstance(product.effect_profile, dict):
effect_profile = product.effect_profile
else:
effect_profile = product.effect_profile.model_dump()
# Context rules
context_rules = None
if hasattr(product, "context_rules") and product.context_rules:
if isinstance(product.context_rules, dict):
context_rules = product.context_rules
else:
context_rules = product.context_rules.model_dump()
return {
"id": str(product.id),
"name": f"{product.brand} {product.name}",
"category": product.category,
"recommended_time": getattr(product, "recommended_time", None),
"has_inventory": has_inventory,
"last_used_date": last_used_date.isoformat() if last_used_date else None,
"top_actives": top_actives,
"effect_profile": effect_profile,
"context_rules": context_rules,
"min_interval_hours": getattr(product, "min_interval_hours", None),
"max_frequency_per_week": getattr(product, "max_frequency_per_week", None),
# INCI list OMITTED for token efficiency
}
def build_products_context_summary_list(
products: list[Product], products_with_inventory: set[UUID]
) -> str:
"""
Build summary context for multiple products (Tier 1).
Used in initial routine/batch prompts where LLM doesn't need full details yet.
Can fetch details via function tools if needed.
Args:
products: List of available products
products_with_inventory: Set of product IDs that have inventory
Returns:
Compact multi-line product list
"""
lines = ["AVAILABLE PRODUCTS:"]
for product in products:
has_inv = product.id in products_with_inventory
lines.append(f" {build_product_context_summary(product, has_inv)}")
return "\n".join(lines) + "\n"

View file

@ -39,6 +39,12 @@ def _extract_requested_product_ids(
def _build_compact_actives_payload(product: Product) -> list[dict[str, object]]: def _build_compact_actives_payload(product: Product) -> list[dict[str, object]]:
"""
Build compact actives payload for function tool responses.
Phase 2: Reduced from 24 actives to TOP 5 for token efficiency.
For clinical decisions, the primary actives are most relevant.
"""
payload: list[dict[str, object]] = [] payload: list[dict[str, object]] = []
for active in product.actives or []: for active in product.actives or []:
if isinstance(active, dict): if isinstance(active, dict):
@ -72,7 +78,8 @@ def _build_compact_actives_payload(product: Product) -> list[dict[str, object]]:
if strength_level is not None: if strength_level is not None:
item["strength_level"] = _ev(strength_level) item["strength_level"] = _ev(strength_level)
payload.append(item) payload.append(item)
return payload[:24] # Phase 2: Return top 5 actives only (was 24)
return payload[:5]
def _map_product_details( def _map_product_details(
@ -80,11 +87,27 @@ def _map_product_details(
pid: str, pid: str,
*, *,
last_used_on: date | None = None, last_used_on: date | None = None,
include_inci: bool = False,
) -> dict[str, object]: ) -> dict[str, object]:
ctx = product.to_llm_context() """
inci = product.inci or [] Map product to clinical decision payload.
return { Phase 2: INCI list is now OPTIONAL and excluded by default.
The 128-ingredient INCI list was consuming ~15KB per product.
For safety/clinical decisions, actives + effect_profile are sufficient.
Args:
product: Product to map
pid: Product ID string
last_used_on: Last usage date
include_inci: Whether to include full INCI list (default: False)
Returns:
Product details optimized for clinical decisions
"""
ctx = product.to_llm_context()
payload = {
"id": pid, "id": pid,
"name": product.name, "name": product.name,
"brand": product.brand, "brand": product.brand,
@ -93,8 +116,7 @@ def _map_product_details(
"leave_on": product.leave_on, "leave_on": product.leave_on,
"targets": ctx.get("targets") or [], "targets": ctx.get("targets") or [],
"effect_profile": ctx.get("effect_profile") or {}, "effect_profile": ctx.get("effect_profile") or {},
"inci": [str(i)[:120] for i in inci[:128]], "actives": _build_compact_actives_payload(product), # Top 5 actives only
"actives": _build_compact_actives_payload(product),
"context_rules": ctx.get("context_rules") or {}, "context_rules": ctx.get("context_rules") or {},
"safety": ctx.get("safety") or {}, "safety": ctx.get("safety") or {},
"min_interval_hours": ctx.get("min_interval_hours"), "min_interval_hours": ctx.get("min_interval_hours"),
@ -102,6 +124,14 @@ def _map_product_details(
"last_used_on": last_used_on.isoformat() if last_used_on else None, "last_used_on": last_used_on.isoformat() if last_used_on else None,
} }
# Phase 2: INCI list only included when explicitly requested
# This saves ~12-15KB per product in function tool responses
if include_inci:
inci = product.inci or []
payload["inci"] = [str(i)[:120] for i in inci[:128]]
return payload
def build_last_used_on_by_product( def build_last_used_on_by_product(
session: Session, session: Session,
@ -159,11 +189,14 @@ def build_product_details_tool_handler(
PRODUCT_DETAILS_FUNCTION_DECLARATION = genai_types.FunctionDeclaration( PRODUCT_DETAILS_FUNCTION_DECLARATION = genai_types.FunctionDeclaration(
name="get_product_details", name="get_product_details",
description=( description=(
"Use this to fetch canonical product data before making clinical/safety decisions. " "Use this to fetch clinical/safety data for products before making decisions. "
"Call it when you need to verify ingredient conflicts, irritation risk, barrier compatibility, " "Call when you need to verify: ingredient conflicts, irritation risk, "
"or usage cadence. Returns per-product fields: id, name, brand, category, recommended_time, " "barrier compatibility, context rules, or usage frequency limits. "
"leave_on, targets, effect_profile, inci, actives, context_rules, safety, " "Returns: id, name, brand, category, recommended_time, leave_on, targets, "
"min_interval_hours, max_frequency_per_week, and last_used_on (ISO date or null)." "effect_profile (13 scores 0-5), actives (top 5 with functions), "
"context_rules (safe_after_shaving, safe_with_compromised_barrier, etc.), "
"safety flags, min_interval_hours, max_frequency_per_week, last_used_on. "
"NOTE: Full INCI list omitted for efficiency - actives + effect_profile sufficient for safety."
), ),
parameters=genai_types.Schema( parameters=genai_types.Schema(
type=genai_types.Type.OBJECT, type=genai_types.Type.OBJECT,
@ -171,7 +204,7 @@ PRODUCT_DETAILS_FUNCTION_DECLARATION = genai_types.FunctionDeclaration(
"product_ids": genai_types.Schema( "product_ids": genai_types.Schema(
type=genai_types.Type.ARRAY, type=genai_types.Type.ARRAY,
items=genai_types.Schema(type=genai_types.Type.STRING), items=genai_types.Schema(type=genai_types.Type.STRING),
description="Product UUIDs from the provided product list.", description="Product UUIDs from the provided product list. Batch multiple IDs in one call.",
) )
}, },
required=["product_ids"], required=["product_ids"],

View file

@ -11,7 +11,10 @@ from pydantic import BaseModel as PydanticBase
from sqlmodel import Field, Session, SQLModel, col, select from sqlmodel import Field, Session, SQLModel, col, select
from db import get_session from db import get_session
from innercontext.api.llm_context import build_user_profile_context from innercontext.api.llm_context import (
build_products_context_summary_list,
build_user_profile_context,
)
from innercontext.api.product_llm_tools import ( from innercontext.api.product_llm_tools import (
PRODUCT_DETAILS_FUNCTION_DECLARATION, PRODUCT_DETAILS_FUNCTION_DECLARATION,
) )
@ -316,98 +319,6 @@ def _build_recent_history(session: Session) -> str:
return "\n".join(lines) + "\n" return "\n".join(lines) + "\n"
def _build_products_context(
session: Session,
products: list[Product],
reference_date: Optional[date] = None,
) -> str:
product_ids = [p.id for p in products]
inventory_rows = (
session.exec(
select(ProductInventory).where(
col(ProductInventory.product_id).in_(product_ids)
)
).all()
if product_ids
else []
)
inv_by_product: dict[UUID, list[ProductInventory]] = {}
for inv in inventory_rows:
inv_by_product.setdefault(inv.product_id, []).append(inv)
recent_usage_counts: dict[UUID, int] = {}
if reference_date is not None:
cutoff = reference_date - timedelta(days=7)
recent_usage = session.exec(
select(RoutineStep.product_id)
.join(Routine)
.where(col(Routine.routine_date) > cutoff)
.where(col(Routine.routine_date) <= reference_date)
).all()
for pid in recent_usage:
if pid:
recent_usage_counts[pid] = recent_usage_counts.get(pid, 0) + 1
lines = ["AVAILABLE PRODUCTS:"]
for p in products:
p.inventory = inv_by_product.get(p.id, [])
ctx = p.to_llm_context()
entry = (
f' - id={ctx["id"]} name="{ctx["name"]}" brand="{ctx["brand"]}"'
f" category={ctx.get('category', '')} recommended_time={ctx.get('recommended_time', '')}"
f" leave_on={ctx.get('leave_on', '')}"
f" targets={ctx.get('targets', [])}"
)
active_names = _extract_active_names(p)
if active_names:
entry += f" actives={active_names}"
active_inventory = [inv for inv in p.inventory if inv.finished_at is None]
open_inventory = [inv for inv in active_inventory if inv.is_opened]
sealed_inventory = [inv for inv in active_inventory if not inv.is_opened]
entry += (
" inventory_status={"
f"active:{len(active_inventory)},opened:{len(open_inventory)},sealed:{len(sealed_inventory)}"
"}"
)
if open_inventory:
expiry_dates = sorted(
inv.expiry_date.isoformat() for inv in open_inventory if inv.expiry_date
)
if expiry_dates:
entry += f" nearest_open_expiry={expiry_dates[0]}"
if p.pao_months is not None:
pao_deadlines = sorted(
(inv.opened_at + timedelta(days=30 * p.pao_months)).isoformat()
for inv in open_inventory
if inv.opened_at
)
if pao_deadlines:
entry += f" nearest_open_pao_deadline={pao_deadlines[0]}"
if p.pao_months is not None:
entry += f" pao_months={p.pao_months}"
profile = ctx.get("effect_profile", {})
if profile:
notable = {k: v for k, v in profile.items() if v and v > 0}
if notable:
entry += f" effects={notable}"
if ctx.get("context_rules"):
entry += f" context_rules={ctx['context_rules']}"
safety = ctx.get("safety") or {}
if isinstance(safety, dict):
not_safe = {k: v for k, v in safety.items() if v is False}
if not_safe:
entry += f" safety_alerts={not_safe}"
if ctx.get("min_interval_hours"):
entry += f" min_interval_hours={ctx['min_interval_hours']}"
if ctx.get("max_frequency_per_week"):
entry += f" max_frequency_per_week={ctx['max_frequency_per_week']}"
usage_count = recent_usage_counts.get(p.id, 0)
entry += f" used_in_last_7_days={usage_count}"
lines.append(entry)
return "\n".join(lines) + "\n"
def _get_available_products( def _get_available_products(
session: Session, session: Session,
time_filter: Optional[str] = None, time_filter: Optional[str] = None,
@ -468,6 +379,27 @@ def _extract_requested_product_ids(
return _shared_extract_requested_product_ids(args, max_ids=max_ids) return _shared_extract_requested_product_ids(args, max_ids=max_ids)
def _get_products_with_inventory(
session: Session, product_ids: list[UUID]
) -> set[UUID]:
"""
Return set of product IDs that have active (non-finished) inventory.
Phase 2: Used for tiered context assembly to mark products with available stock.
"""
if not product_ids:
return set()
inventory_rows = session.exec(
select(ProductInventory.product_id)
.where(col(ProductInventory.product_id).in_(product_ids))
.where(col(ProductInventory.finished_at).is_(None))
.distinct()
).all()
return set(inventory_rows)
def _build_objectives_context(include_minoxidil_beard: bool) -> str: def _build_objectives_context(include_minoxidil_beard: bool) -> str:
if include_minoxidil_beard: if include_minoxidil_beard:
return ( return (
@ -504,7 +436,8 @@ PRIORYTETY DECYZYJNE (od najwyższego):
WYMAGANIA ODPOWIEDZI: WYMAGANIA ODPOWIEDZI:
- Zwracaj wyłącznie poprawny JSON (bez markdown, bez komentarzy, bez preambuły). - Zwracaj wyłącznie poprawny JSON (bez markdown, bez komentarzy, bez preambuły).
- Trzymaj się dokładnie przekazanego schematu odpowiedzi. - Trzymaj się dokładnie przekazanego schematu odpowiedzi.
- Nie używaj żadnych pól spoza schematu. - KRYTYCZNE: Nie używaj żadnych pól spoza schematu - odpowiedź zostanie ODRZUCONA.
- ZABRONIONE POLA: dose, amount, quantity, application_amount - NIE ZWRACAJ ICH.
- Nie twórz produktów spoza listy wejściowej. - Nie twórz produktów spoza listy wejściowej.
- Jeśli nie da się bezpiecznie dodać kroku, pomiń go zamiast zgadywać. - Jeśli nie da się bezpiecznie dodać kroku, pomiń go zamiast zgadywać.
@ -535,7 +468,10 @@ ZASADY PLANOWANIA:
- Nie zwracaj "pustych" kroków: każdy krok musi mieć product_id albo action_type. - Nie zwracaj "pustych" kroków: każdy krok musi mieć product_id albo action_type.
- Pole region uzupełniaj tylko gdy ma znaczenie kliniczne/praktyczne (np. broda, wąsy, okolica oczu, szyja). - Pole region uzupełniaj tylko gdy ma znaczenie kliniczne/praktyczne (np. broda, wąsy, okolica oczu, szyja).
Dla standardowych kroków pielęgnacji całej twarzy pozostaw region puste. Dla standardowych kroków pielęgnacji całej twarzy pozostaw region puste.
- Nie podawaj dawek ani ilości produktu (np. "1 pompa", "2 krople", "pea-size"). - ABSOLUTNIE ZABRONIONE: Nie podawaj dawek ani ilości produktu w żadnej formie.
NIE używaj pól: dose, amount, quantity, application_amount.
NIE opisuj ilości w polach tekstowych (np. "1 pompa", "2 krople", "pea-size").
Odpowiedź z tymi polami zostanie ODRZUCONA przez system walidacji.
JAK ROZWIĄZYWAĆ KONFLIKTY: JAK ROZWIĄZYWAĆ KONFLIKTY:
- Bezpieczeństwo > wszystko. - Bezpieczeństwo > wszystko.
@ -642,8 +578,13 @@ def suggest_routine(
data.routine_date, data.routine_date,
last_used_on_by_product, last_used_on_by_product,
) )
products_ctx = _build_products_context(
session, available_products, reference_date=data.routine_date # Phase 2: Use tiered context (summary mode for initial prompt)
products_with_inventory = _get_products_with_inventory(
session, [p.id for p in available_products]
)
products_ctx = build_products_context_summary_list(
available_products, products_with_inventory
) )
objectives_ctx = _build_objectives_context(data.include_minoxidil_beard) objectives_ctx = _build_objectives_context(data.include_minoxidil_beard)
@ -857,8 +798,13 @@ def suggest_batch(
session, session,
include_minoxidil=data.include_minoxidil_beard, include_minoxidil=data.include_minoxidil_beard,
) )
products_ctx = _build_products_context(
session, batch_products, reference_date=data.from_date # Phase 2: Use tiered context (summary mode for batch planning)
products_with_inventory = _get_products_with_inventory(
session, [p.id for p in batch_products]
)
products_ctx = build_products_context_summary_list(
batch_products, products_with_inventory
) )
objectives_ctx = _build_objectives_context(data.include_minoxidil_beard) objectives_ctx = _build_objectives_context(data.include_minoxidil_beard)

View file

@ -36,7 +36,10 @@ def get_creative_config(
response_schema: Any, response_schema: Any,
max_output_tokens: int = 4096, max_output_tokens: int = 4096,
) -> genai_types.GenerateContentConfig: ) -> genai_types.GenerateContentConfig:
"""Config for creative tasks like recommendations (balanced creativity).""" """Config for creative tasks like recommendations (balanced creativity).
Phase 2: Uses MEDIUM thinking level to capture reasoning chain for observability.
"""
return genai_types.GenerateContentConfig( return genai_types.GenerateContentConfig(
system_instruction=system_instruction, system_instruction=system_instruction,
response_mime_type="application/json", response_mime_type="application/json",
@ -45,7 +48,7 @@ def get_creative_config(
temperature=0.4, temperature=0.4,
top_p=0.8, top_p=0.8,
thinking_config=genai_types.ThinkingConfig( thinking_config=genai_types.ThinkingConfig(
thinking_level=genai_types.ThinkingLevel.LOW thinking_level=genai_types.ThinkingLevel.MEDIUM
), ),
) )
@ -62,6 +65,42 @@ def get_gemini_client() -> tuple[genai.Client, str]:
return genai.Client(api_key=api_key), model return genai.Client(api_key=api_key), model
def _extract_thinking_content(response: Any) -> str | None:
"""Extract thinking/reasoning content from Gemini response (Phase 2).
Returns the thinking process text if available, None otherwise.
"""
if not response:
return None
try:
candidates = getattr(response, "candidates", None)
if not candidates:
return None
first_candidate = candidates[0]
content = getattr(first_candidate, "content", None)
if not content:
return None
parts = getattr(content, "parts", None)
if not parts:
return None
# Collect all thought parts
thoughts = []
for part in parts:
if hasattr(part, "thought") and part.thought:
thoughts.append(str(part.thought))
elif hasattr(part, "thinking") and part.thinking:
thoughts.append(str(part.thinking))
return "\n\n".join(thoughts) if thoughts else None
except Exception:
# Silently fail - reasoning capture is non-critical
return None
def call_gemini( def call_gemini(
*, *,
endpoint: str, endpoint: str,
@ -115,6 +154,9 @@ def call_gemini(
finally: finally:
duration_ms = int((time.monotonic() - start) * 1000) duration_ms = int((time.monotonic() - start) * 1000)
with suppress(Exception): with suppress(Exception):
# Phase 2: Extract reasoning chain for observability
reasoning_chain = _extract_thinking_content(response)
log = AICallLog( log = AICallLog(
endpoint=endpoint, endpoint=endpoint,
model=model, model=model,
@ -141,6 +183,7 @@ def call_gemini(
finish_reason=finish_reason, finish_reason=finish_reason,
success=success, success=success,
error_detail=error_detail, error_detail=error_detail,
reasoning_chain=reasoning_chain,
) )
with Session(engine) as s: with Session(engine) as s:
s.add(log) s.add(log)

View file

@ -42,3 +42,9 @@ class AICallLog(SQLModel, table=True):
sa_column=Column(JSON, nullable=True), sa_column=Column(JSON, nullable=True),
) )
auto_fixed: bool = Field(default=False) auto_fixed: bool = Field(default=False)
# Reasoning capture (Phase 2)
reasoning_chain: str | None = Field(
default=None,
description="LLM reasoning/thinking process (MEDIUM thinking level)",
)