innercontext/tests/routine_benchmark.py
Piotr Oleszczyk dac787b81b test(auth): add multi-user regression coverage
- Enable backend tests in CI (remove if: false)
- Fix test_products_helpers.py to pass current_user parameter
- Fix test_routines_helpers.py to include short_id in products
- Fix llm_context.py to use product_effect_profile correctly
- All 221 tests passing
2026-03-12 16:42:00 +01:00

1060 lines
37 KiB
Python

#!/usr/bin/env python3
# pyright: reportMissingImports=false
"""Benchmark routines LLM prompt with variable snapshot-history window.
This script does NOT call /routines/suggest.
It reconstructs a production-like prompt and calls Gemini directly, while varying
the number of days included in the "SKIN SNAPSHOT HISTORY" section.
"""
from __future__ import annotations
import argparse
import csv
import json
import math
import os
import re
import sys
import time
from datetime import date, datetime, timedelta
from pathlib import Path
from typing import Any
from urllib.error import HTTPError, URLError
from urllib.request import Request, urlopen
# parents[1] is the directory that contains this file's own directory.
ROOT_DIR = Path(__file__).resolve().parents[1]
BACKEND_DIR = ROOT_DIR.joinpath("backend")
# Put the backend directory first on sys.path (once) so that the lazy
# imports performed later in this script can resolve.
if sys.path.count(str(BACKEND_DIR)) == 0:
    sys.path.insert(0, str(BACKEND_DIR))
# Cache of backend objects; populated by the first backend_symbols() call.
_BACKEND_SYMBOLS: dict[str, Any] | None = None


def backend_symbols() -> dict[str, Any]:
    """Return the backend objects this script borrows, importing them lazily.

    The imports run on the first call only. The resulting name -> object
    mapping is stored in the module-level ``_BACKEND_SYMBOLS`` and that same
    dict is returned by every later call.
    """
    global _BACKEND_SYMBOLS
    if _BACKEND_SYMBOLS is None:
        from fastapi import HTTPException
        from google.genai import types as genai_types  # type: ignore[import-not-found]
        from innercontext.api.product_llm_tools import (  # type: ignore[import-not-found]
            PRODUCT_DETAILS_FUNCTION_DECLARATION,
        )
        from innercontext.api.routines import (  # type: ignore[import-not-found]
            _DAY_NAMES,
            _ROUTINES_SINGLE_EXTRA,
            _ROUTINES_SYSTEM_PROMPT,
            _SuggestionOut,
        )
        from innercontext.llm import (  # type: ignore[import-not-found]
            call_gemini,
            call_gemini_with_function_tools,
            get_creative_config,
        )

        _BACKEND_SYMBOLS = dict(
            HTTPException=HTTPException,
            genai_types=genai_types,
            PRODUCT_DETAILS_FUNCTION_DECLARATION=PRODUCT_DETAILS_FUNCTION_DECLARATION,
            _DAY_NAMES=_DAY_NAMES,
            _ROUTINES_SINGLE_EXTRA=_ROUTINES_SINGLE_EXTRA,
            _ROUTINES_SYSTEM_PROMPT=_ROUTINES_SYSTEM_PROMPT,
            _SuggestionOut=_SuggestionOut,
            call_gemini=call_gemini,
            call_gemini_with_function_tools=call_gemini_with_function_tools,
            get_creative_config=get_creative_config,
        )
    return _BACKEND_SYMBOLS
# Default root URL of the API that supplies profile, snapshot, grooming,
# routine and product data (overridable with --base-url).
DEFAULT_BASE_URL = "http://192.168.101.82/api"
# Default history window sizes, in days, to benchmark (overridable with --windows).
DEFAULT_WINDOWS = [3, 5, 7]
# Default path of the CSV results file (overridable with --out).
DEFAULT_OUT = "routines_suggest_history_experiment.csv"
def _ev(v: object) -> str:
if v is None:
return ""
if hasattr(v, "value"):
value = getattr(v, "value")
if isinstance(value, str):
return value
return str(v)
def _parse_iso_date(raw: str | None) -> date | None:
if not raw:
return None
try:
return date.fromisoformat(raw)
except ValueError:
return None
def _contains_minoxidil_text(value: str | None) -> bool:
if not value:
return False
text = value.lower()
return "minoxidil" in text or "minoksydyl" in text
def _is_minoxidil_product(product: dict[str, Any]) -> bool:
    """Tell whether any descriptive field of *product* mentions minoxidil.

    Checked in order: ``name``, ``brand``, ``line_name``, every ``inci``
    entry, then the ``name`` of every dict entry in ``actives``.
    """
    for field in ("name", "brand", "line_name"):
        if _contains_minoxidil_text(product.get(field)):
            return True
    if any(_contains_minoxidil_text(str(inci)) for inci in product.get("inci") or []):
        return True
    return any(
        isinstance(active, dict)
        and _contains_minoxidil_text(str(active.get("name") or ""))
        for active in product.get("actives") or []
    )
def http_json(
    method: str,
    url: str,
    body: dict[str, Any] | None = None,
    timeout: int = 20,
) -> tuple[int, Any]:
    """Send a JSON request and return ``(status, decoded_payload)``.

    *body*, when given, is JSON-encoded as the request body. An empty
    response body decodes to ``{}``. An HTTP error response is returned with
    its status code and its JSON body (or ``{"detail": <raw text>}`` if the
    body is not JSON). ``URLError`` and ``TimeoutError`` are reported as
    status ``0`` with a ``detail`` message.
    """
    encoded_body = json.dumps(body).encode("utf-8") if body is not None else None
    request = Request(
        url=url,
        data=encoded_body,
        method=method,
        headers={"Content-Type": "application/json"},
    )
    try:
        with urlopen(request, timeout=timeout) as resp:
            text = resp.read().decode("utf-8")
            if text:
                return resp.status, json.loads(text)
            return resp.status, {}
    except HTTPError as err:
        error_text = err.read().decode("utf-8", errors="replace")
        if not error_text:
            return err.code, {}
        try:
            return err.code, json.loads(error_text)
        except json.JSONDecodeError:
            return err.code, {"detail": error_text}
    except (URLError, TimeoutError) as err:
        return 0, {"detail": f"Network error: {err}"}
def fetch_required_data(base_url: str, timeout: int) -> dict[str, Any]:
    """Download and normalise every dataset needed to build the prompt.

    Issues one GET per endpoint and raises ``RuntimeError`` on the first
    non-200 answer. Snapshots and routines get a parsed ``_d`` date, lose
    records whose date cannot be parsed, and are ordered newest first.
    Products get ``_id`` (full id as text) and ``_short_id`` (the API's
    ``short_id`` or the first eight characters of the id).
    """
    root = base_url.rstrip("/")
    results: dict[str, Any] = {}
    for key, path in (
        ("profile", "/profile"),
        ("snapshots", "/skincare"),
        ("grooming", "/routines/grooming-schedule"),
        ("routines", "/routines"),
        ("products", "/products"),
    ):
        status, payload = http_json("GET", f"{root}{path}", timeout=timeout)
        if status != 200:
            raise RuntimeError(f"GET {path} failed ({status}): {payload}")
        results[key] = payload

    # Same normalisation for both dated collections.
    for key, date_field in (
        ("snapshots", "snapshot_date"),
        ("routines", "routine_date"),
    ):
        records = list(results[key] or [])
        for record in records:
            record["_d"] = _parse_iso_date(record.get(date_field))
        dated = [record for record in records if record.get("_d") is not None]
        results[key] = sorted(dated, key=lambda record: record["_d"], reverse=True)

    products = list(results["products"] or [])
    for product in products:
        identifier = str(product.get("id") or "")
        product["_id"] = identifier
        product["_short_id"] = str(product.get("short_id") or identifier[:8])
    results["products"] = products
    return results
def build_user_profile_context(
    profile: dict[str, Any] | None, reference_date: date
) -> str:
    """Render the ``USER PROFILE`` prompt section.

    Age is computed in whole years as of *reference_date* and clamped at
    zero. A missing or unparseable birth date, or a missing sex, is reported
    as ``unknown``.
    """
    if profile is None:
        return "USER PROFILE: no data\n"

    born = _parse_iso_date(profile.get("birth_date"))
    if born is None:
        age_lines = [" Age: unknown"]
    else:
        # Subtract one year while this year's birthday is still ahead.
        birthday_pending = (reference_date.month, reference_date.day) < (
            born.month,
            born.day,
        )
        age = reference_date.year - born.year - (1 if birthday_pending else 0)
        age_lines = [f" Age: {max(age, 0)}", f" Birth date: {born.isoformat()}"]

    sex = profile.get("sex_at_birth")
    sex_line = " Sex at birth: unknown" if sex is None else f" Sex at birth: {sex}"
    return "\n".join(["USER PROFILE:", *age_lines, sex_line]) + "\n"
def pick_effective_snapshot(
    snapshots: list[dict[str, Any]],
    *,
    routine_date: date,
    days_window: int,
    max_fallback_days: int = 14,
) -> dict[str, Any] | None:
    """Choose the snapshot that describes the skin on *routine_date*.

    Returns the first snapshot (in list order) dated within the last
    *days_window* days, counting *routine_date* itself. Failing that, returns
    the first snapshot that is older than the window but at most
    *max_fallback_days* days old, or ``None``. Every snapshot must carry a
    ``_d`` date.
    """
    window_start = routine_date - timedelta(days=days_window - 1)
    within = [snap for snap in snapshots if window_start <= snap["_d"] <= routine_date]
    if within:
        return within[0]
    fallback_start = routine_date - timedelta(days=max_fallback_days)
    return next(
        (snap for snap in snapshots if fallback_start <= snap["_d"] < window_start),
        None,
    )
def build_skin_context(snapshot: dict[str, Any] | None) -> str:
    """Render the ``SKIN CONDITION`` prompt section for one snapshot (or its absence)."""
    if snapshot is None:
        return "SKIN CONDITION: no data\n"
    concerns = ", ".join(_ev(c) for c in (snapshot.get("active_concerns") or []))
    priorities = ", ".join(snapshot.get("priorities") or [])
    notes = snapshot.get("notes") or "none"
    section = [
        f"SKIN CONDITION (snapshot from {snapshot.get('snapshot_date')}):",
        f" Overall state: {_ev(snapshot.get('overall_state'))}",
        f" Hydration: {snapshot.get('hydration_level', '-')}/5",
        f" Barrier: {_ev(snapshot.get('barrier_state'))}",
        f" Active concerns: {concerns}",
        f" Priorities: {priorities}",
        f" Notes: {notes}",
    ]
    return "\n".join(section) + "\n"
def build_snapshot_history_context(
    snapshots: list[dict[str, Any]],
    routine_date: date,
    days_window: int,
    max_fallback_days: int = 14,
) -> tuple[str, int, bool]:
    """Render the ``SKIN SNAPSHOT HISTORY`` prompt section.

    Returns ``(text, snapshot_count, fallback_used)``. Snapshots dated within
    the last *days_window* days (counting *routine_date*) are listed in list
    order. If there are none, the single snapshot chosen by
    ``pick_effective_snapshot`` is listed with a ``fallback(Nd)`` marker; if
    that finds nothing either, the section reads ``none``.
    """

    def _render(snap: dict[str, Any], prefix: str = "") -> str:
        # One history line; `prefix` carries the fallback marker when needed.
        concerns = (
            ", ".join(_ev(c) for c in (snap.get("active_concerns") or [])) or "none"
        )
        return (
            " "
            + prefix
            + f"{snap.get('snapshot_date')}: "
            + f"overall={_ev(snap.get('overall_state'))}, "
            + f"barrier={_ev(snap.get('barrier_state'))}, "
            + f"hydration={snap.get('hydration_level', '-')}/5, "
            + f"sensitivity={snap.get('sensitivity_level', '-')}/5, "
            + f"concerns={concerns}"
        )

    header = f"SKIN SNAPSHOT HISTORY (last {days_window} days):"
    window_start = routine_date - timedelta(days=days_window - 1)
    recent = [snap for snap in snapshots if window_start <= snap["_d"] <= routine_date]
    if recent:
        body = [_render(snap) for snap in recent]
        return "\n".join([header, *body]) + "\n", len(recent), False

    fallback_snapshot = pick_effective_snapshot(
        snapshots,
        routine_date=routine_date,
        days_window=days_window,
        max_fallback_days=max_fallback_days,
    )
    if fallback_snapshot is None:
        return f"SKIN SNAPSHOT HISTORY (last {days_window} days): none\n", 0, False
    fallback_line = _render(fallback_snapshot, f"fallback({max_fallback_days}d) ")
    return "\n".join([header, fallback_line]) + "\n", 1, True
def build_upcoming_grooming_context(
    grooming_entries: list[dict[str, Any]],
    start_date: date,
    days: int = 7,
) -> str:
    """Render the ``UPCOMING GROOMING`` prompt section.

    Entries are matched to dates by their integer ``day_of_week`` against
    ``date.weekday()``. Each of the next *days* days (starting at
    *start_date*) that has entries gets one line with a relative label, the
    date, the backend's day name and the actions.
    """
    day_names = backend_symbols()["_DAY_NAMES"]
    if not grooming_entries:
        return f"UPCOMING GROOMING (next {days} days): none\n"

    by_weekday: dict[int, list[dict[str, Any]]] = {}
    for entry in grooming_entries:
        weekday = entry.get("day_of_week")
        if isinstance(weekday, int):
            by_weekday.setdefault(weekday, []).append(entry)

    header = f"UPCOMING GROOMING (next {days} days):"
    body: list[str] = []
    for offset in range(days):
        target_date = start_date + timedelta(days=offset)
        todays = by_weekday.get(target_date.weekday(), [])
        if not todays:
            continue
        relative_label = {0: "dzisiaj", 1: "jutro"}.get(offset, f"za {offset} dni")
        day_name = day_names[target_date.weekday()]
        described: list[str] = []
        for entry in todays:
            text = f"{_ev(entry.get('action'))}"
            if entry.get("notes"):
                text += f" ({entry.get('notes')})"
            described.append(text)
        actions = ", ".join(described)
        body.append(f" {relative_label} ({target_date}, {day_name}): {actions}")

    if not body:
        body.append(" (no entries in this window)")
    return "\n".join([header, *body]) + "\n"
def build_recent_history(
    routines: list[dict[str, Any]],
    products_by_id: dict[str, dict[str, Any]],
    *,
    routine_date: date,
    days_window: int,
) -> str:
    """Render the ``RECENT ROUTINES`` prompt section.

    Routines dated within the last *days_window* days (counting
    *routine_date*) are listed in list order. Steps are ordered by
    ``order_index`` and shown as ``category [short id]`` for known products,
    ``unknown [id prefix]`` for unknown ones, or ``action: ...`` for
    product-less steps; steps with neither are omitted.
    """
    window_start = routine_date - timedelta(days=days_window - 1)
    recent = [r for r in routines if window_start <= r["_d"] <= routine_date]
    if not recent:
        return "RECENT ROUTINES: none\n"

    def _describe_step(step: dict[str, Any]) -> str | None:
        # Label for one step, or None when the step has nothing to show.
        product_id = step.get("product_id")
        if product_id:
            product = products_by_id.get(str(product_id))
            if product:
                return f"{_ev(product.get('category'))} [{product.get('_short_id')}]"
            return f"unknown [{str(product_id)[:8]}]"
        if step.get("action_type"):
            return f"action: {_ev(step.get('action_type'))}"
        return None

    out = ["RECENT ROUTINES:"]
    for routine in recent:
        ordered = list(routine.get("steps") or [])
        ordered.sort(key=lambda s: int(s.get("order_index") or 0))
        labels = [label for label in map(_describe_step, ordered) if label is not None]
        slot = _ev(routine.get("part_of_day")).upper()
        out.append(f" {routine.get('routine_date')} {slot}: {', '.join(labels)}")
    return "\n".join(out) + "\n"
def build_day_context(leaving_home: bool | None) -> str:
    """Render the ``DAY CONTEXT`` prompt section; empty when *leaving_home* is ``None``."""
    if leaving_home is None:
        return ""
    if leaving_home:
        answer = "yes"
    else:
        answer = "no"
    return f"DAY CONTEXT:\n Leaving home: {answer}\n"
def build_objectives_context(include_minoxidil_beard: bool) -> str:
    """Render the ``USER OBJECTIVES`` prompt section; empty unless the beard objective is enabled."""
    if not include_minoxidil_beard:
        return ""
    objective_lines = (
        "USER OBJECTIVES:\n",
        " - Priority: improve beard and mustache density\n",
        " - If a product with minoxidil is available, include it adhering strictly to safety rules\n",
    )
    return "".join(objective_lines)
def build_last_used_on_by_product(routines: list[dict[str, Any]]) -> dict[str, date]:
    """Map each product id to the most recent routine date that used it.

    Each routine must carry a ``_d`` date; its ``steps`` may be missing or
    ``None``. Steps without a ``product_id`` are ignored. The latest date is
    kept explicitly, so the result does not depend on the order of
    *routines*.
    """
    last_used: dict[str, date] = {}
    for routine in routines:
        used_on = routine["_d"]
        for step in routine.get("steps") or []:
            product_id = step.get("product_id")
            if not product_id:
                continue
            key = str(product_id)
            previous = last_used.get(key)
            # Keep the newest date rather than the first one encountered.
            if previous is None or used_on > previous:
                last_used[key] = used_on
    return last_used
def get_available_products(
    products: list[dict[str, Any]],
    *,
    time_filter: str | None,
    include_minoxidil: bool,
) -> list[dict[str, Any]]:
    """Select the products that may be offered to the model.

    Dropped: tools; medications other than minoxidil products; minoxidil
    products when *include_minoxidil* is false; and, when *time_filter* is
    set, products whose ``recommended_time`` is neither that value nor
    ``"both"``.
    """

    def _eligible(product: dict[str, Any]) -> bool:
        if product.get("is_tool"):
            return False
        if product.get("is_medication") and not _is_minoxidil_product(product):
            return False
        if not include_minoxidil and _is_minoxidil_product(product):
            return False
        recommended = _ev(product.get("recommended_time"))
        return not (time_filter and recommended not in (time_filter, "both"))

    return [product for product in products if _eligible(product)]
def filter_products_by_interval(
    products: list[dict[str, Any]],
    *,
    routine_date: date,
    last_used_on_by_product: dict[str, date],
) -> list[dict[str, Any]]:
    """Drop products whose minimum re-use interval has not elapsed yet.

    Only a positive integer ``min_interval_hours`` is enforced. It is rounded
    up to whole days and counted from the product's last-used date; products
    never used before are always kept.
    """

    def _too_soon(product: dict[str, Any]) -> bool:
        hours = product.get("min_interval_hours")
        if not isinstance(hours, int) or hours <= 0:
            return False
        previous = last_used_on_by_product.get(product["_id"])
        if previous is None:
            return False
        earliest_allowed = previous + timedelta(days=math.ceil(hours / 24))
        return routine_date < earliest_allowed

    return [product for product in products if not _too_soon(product)]
def get_products_with_inventory(products: list[dict[str, Any]]) -> set[str]:
    """Return the ``_id`` of every product with at least one inventory entry lacking ``finished_at``."""
    return {
        product["_id"]
        for product in products
        if any(
            entry.get("finished_at") is None
            for entry in (product.get("inventory") or [])
        )
    }
def build_products_context_summary_list(
    products: list[dict[str, Any]],
    products_with_inventory: set[str],
) -> str:
    """Render the ``AVAILABLE PRODUCTS`` prompt section, one line per product.

    Each line shows an inventory marker, the short id, brand, name and
    category, followed by the effect-profile values that are greater than
    zero and the safety flags derived from ``context_rules``.
    """
    # (effect-profile key, label used in the prompt), in output order.
    effect_fields = (
        ("hydration_immediate", "hydration"),
        ("exfoliation_strength", "exfoliation"),
        ("retinoid_strength", "retinoid"),
        ("irritation_risk", "irritation_risk"),
        ("barrier_disruption_risk", "barrier_risk"),
    )
    out = ["AVAILABLE PRODUCTS:"]
    for product in products:
        marker = "[✓]" if product["_id"] in products_with_inventory else "[✗]"

        profile = product.get("product_effect_profile") or {}
        effects: list[str] = []
        if isinstance(profile, dict):
            effects = [
                f"{label}={profile[key]}"
                for key, label in effect_fields
                if int(profile.get(key, 0) or 0) > 0
            ]

        rules = product.get("context_rules") or {}
        flags: list[str] = []
        if isinstance(rules, dict):
            if rules.get("safe_with_compromised_barrier"):
                flags.append("barrier_ok")
            # Only an explicit False counts; a missing value adds no flag.
            if rules.get("safe_after_shaving") is False:
                flags.append("!post_shave")

        effects_part = (" effects={" + ",".join(effects) + "}") if effects else ""
        safety_part = (" safety={" + ",".join(flags) + "}") if flags else ""
        out.append(
            f" {marker} {product['_short_id']} | {product.get('brand')} "
            f"{product.get('name')} ({product.get('category')})"
            + effects_part
            + safety_part
        )
    return "\n".join(out) + "\n"
def _extract_requested_product_ids(args: dict[str, Any], max_ids: int = 8) -> list[str]:
raw_ids = args.get("product_ids")
if not isinstance(raw_ids, list):
return []
requested_ids: list[str] = []
seen: set[str] = set()
for raw_id in raw_ids:
if not isinstance(raw_id, str):
continue
if raw_id in seen:
continue
seen.add(raw_id)
requested_ids.append(raw_id)
if len(requested_ids) >= max_ids:
break
return requested_ids
def _compact_actives_payload(product: dict[str, Any]) -> list[dict[str, Any]]:
payload: list[dict[str, Any]] = []
for active in product.get("actives") or []:
if not isinstance(active, dict):
continue
name = str(active.get("name") or "").strip()
if not name:
continue
item: dict[str, Any] = {"name": name}
if active.get("percent") is not None:
item["percent"] = active.get("percent")
if isinstance(active.get("functions"), list):
item["functions"] = [str(f) for f in active["functions"][:4]]
if active.get("strength_level") is not None:
item["strength_level"] = str(active.get("strength_level"))
payload.append(item)
return payload[:5]
def build_product_details_tool_handler(
    products: list[dict[str, Any]],
    *,
    last_used_on_by_product: dict[str, date],
):
    """Create the callback that answers ``get_product_details`` tool calls.

    The returned handler accepts the tool arguments, resolves each requested
    id (full or short) against *products*, and returns
    ``{"products": [...]}`` with one detail record per distinct product.
    Unknown ids are ignored.
    """
    safety_flags = (
        "fragrance_free",
        "essential_oils_free",
        "alcohol_denat_free",
        "pregnancy_safe",
    )
    # Products are reachable by both their full id and their short id.
    lookup: dict[str, dict[str, Any]] = {}
    for product in products:
        lookup[product["_id"]] = product
        lookup[product["_short_id"]] = product

    def _handler(args: dict[str, Any]) -> dict[str, object]:
        details: list[dict[str, Any]] = []
        emitted: set[str] = set()
        for requested_id in _extract_requested_product_ids(args):
            product = lookup.get(requested_id)
            if product is None:
                continue
            full_id = product["_id"]
            # The same product may be requested by full and short id.
            if full_id in emitted:
                continue
            emitted.add(full_id)

            safety = {
                flag: product.get(flag)
                for flag in safety_flags
                if product.get(flag) is not None
            }
            last_used = last_used_on_by_product.get(full_id) if full_id in last_used_on_by_product else None
            details.append(
                {
                    "id": product["_short_id"],
                    "name": product.get("name"),
                    "brand": product.get("brand"),
                    "category": product.get("category"),
                    "recommended_time": product.get("recommended_time"),
                    "leave_on": product.get("leave_on"),
                    "targets": product.get("targets") or [],
                    "effect_profile": product.get("product_effect_profile") or {},
                    "actives": _compact_actives_payload(product),
                    "context_rules": product.get("context_rules") or {},
                    "safety": safety,
                    "min_interval_hours": product.get("min_interval_hours"),
                    "max_frequency_per_week": product.get("max_frequency_per_week"),
                    "last_used_on": (
                        last_used_on_by_product[full_id].isoformat()
                        if full_id in last_used_on_by_product
                        else None
                    ),
                }
            )
            del last_used
        return {"products": details}

    return _handler
def build_prompt(
    *,
    routine_date: date,
    part_of_day: str,
    leaving_home: bool | None,
    include_minoxidil_beard: bool,
    profile_ctx: str,
    skin_ctx: str,
    snapshot_history_ctx: str,
    upcoming_grooming_ctx: str,
    recent_history_ctx: str,
    products_ctx: str,
    objectives_ctx: str,
) -> str:
    """Assemble the full user prompt sent to the model.

    The prompt consists of the request line, the pre-rendered context
    sections in a fixed order (with the day context derived from
    *leaving_home*), the tool-usage instructions, the backend's
    single-routine extra text and the final JSON instruction.
    *include_minoxidil_beard* is accepted but not read here.
    """
    symbols = backend_symbols()
    day_names = symbols["_DAY_NAMES"]
    single_extra = symbols["_ROUTINES_SINGLE_EXTRA"]
    day_name = day_names[routine_date.weekday()]
    day_ctx = build_day_context(leaving_home)

    request_line = (
        f"Zaproponuj rutynę pielęgnacyjną {part_of_day.upper()} "
        f"na {routine_date} ({day_name}).\n\n"
    )
    tools_section = (
        "\nNARZEDZIA:\n"
        "- Masz dostep do funkcji: get_product_details.\n"
        "- Wywoluj narzedzia tylko, gdy potrzebujesz detali do decyzji klinicznej/bezpieczenstwa.\n"
        "- Staraj sie grupowac zapytania: podawaj wszystkie potrzebne UUID w jednym wywolaniu narzedzia.\n"
        "- Nie zgaduj detali skladu i zasad bezpieczenstwa; jesli potrzebujesz szczegolow, wywolaj odpowiednie narzedzie.\n"
    )
    input_data = (
        f"{profile_ctx}{skin_ctx}{snapshot_history_ctx}{upcoming_grooming_ctx}"
        f"{recent_history_ctx}{day_ctx}{products_ctx}{objectives_ctx}"
    )
    return (
        request_line
        + "MODE: standard\n"
        + "INPUT DATA:\n"
        + input_data
        + tools_section
        + f"{single_extra}\n"
        + "Zwróć JSON zgodny ze schematem."
    )
def extract_usage(response: Any) -> tuple[str, str, str, str]:
    """Read token counts from ``response.usage_metadata`` as strings.

    Returns ``(prompt, completion, total, thoughts)``. A count that is
    missing or ``None`` becomes an empty string. A reported count of ``0`` is
    kept as ``"0"`` so that "zero tokens" stays distinguishable from "not
    reported" in the benchmark output. All four values are empty when the
    response has no usage metadata.
    """
    usage = getattr(response, "usage_metadata", None)
    if not usage:
        return "", "", "", ""

    def _count(attribute: str) -> str:
        value = getattr(usage, attribute, None)
        return "" if value is None else str(value)

    return (
        _count("prompt_token_count"),
        _count("candidates_token_count"),
        _count("total_token_count"),
        _count("thoughts_token_count"),
    )
def call_routines_llm(
    prompt: str, function_handler
) -> tuple[int, dict[str, Any], float]:
    """Send *prompt* to Gemini with the product-details tool enabled.

    Returns ``(status, payload, elapsed_ms)``:

    * ``200`` with the parsed JSON answer, extended with a ``_usage`` dict of
      token counts taken from the response;
    * ``502`` with a ``detail`` message when the response text is empty or is
      not valid JSON (the raw text is included in the latter case).

    If the tool-enabled call raises the backend ``HTTPException`` with status
    502 and detail "Gemini requested too many function calls", the request is
    retried once without tools, using a prompt extended with conservative
    fallback instructions. Any other ``HTTPException`` is re-raised.
    ``elapsed_ms`` covers both calls when the fallback is used.
    """
    symbols = backend_symbols()
    http_exception = symbols["HTTPException"]
    genai_types = symbols["genai_types"]
    function_declaration = symbols["PRODUCT_DETAILS_FUNCTION_DECLARATION"]
    routines_system_prompt = symbols["_ROUTINES_SYSTEM_PROMPT"]
    suggestion_schema = symbols["_SuggestionOut"]
    get_creative_config = symbols["get_creative_config"]
    call_gemini = symbols["call_gemini"]
    call_gemini_with_function_tools = symbols["call_gemini_with_function_tools"]
    # Base config from the backend, extended with the tool declaration and
    # AUTO function-calling mode.
    config = get_creative_config(
        system_instruction=routines_system_prompt,
        response_schema=suggestion_schema,
        max_output_tokens=8192,
    ).model_copy(
        update={
            "tools": [
                genai_types.Tool(
                    function_declarations=[function_declaration],
                )
            ],
            "tool_config": genai_types.ToolConfig(
                function_calling_config=genai_types.FunctionCallingConfig(
                    mode=genai_types.FunctionCallingConfigMode.AUTO,
                )
            ),
        }
    )
    t0 = time.perf_counter()
    try:
        response, _ = call_gemini_with_function_tools(
            endpoint="routines/suggest-benchmark",
            contents=prompt,
            config=config,
            function_handlers={"get_product_details": function_handler},
            user_input=prompt,
            max_tool_roundtrips=3,
        )
    except http_exception as exc:
        # Only the "too many function calls" failure triggers the fallback;
        # every other backend error propagates to the caller.
        if (
            exc.status_code != 502
            or str(exc.detail) != "Gemini requested too many function calls"
        ):
            raise
        conservative_prompt = (
            f"{prompt}\n\n"
            "TRYB AWARYJNY (KONSERWATYWNY):\n"
            "- Osiagnieto limit wywolan narzedzi.\n"
            "- Nie wywoluj narzedzi ponownie.\n"
            "- Zaproponuj maksymalnie konserwatywna, bezpieczna rutyne na podstawie dostepnych juz danych,"
            " preferujac lagodne produkty wspierajace bariere i fotoprotekcje.\n"
            "- Gdy masz watpliwosci, pomijaj ryzykowne aktywne kroki.\n"
        )
        # Fallback call: fresh config without tools, so no further tool
        # round-trips can be requested.
        response, _ = call_gemini(
            endpoint="routines/suggest-benchmark",
            contents=conservative_prompt,
            config=get_creative_config(
                system_instruction=routines_system_prompt,
                response_schema=suggestion_schema,
                max_output_tokens=8192,
            ),
            user_input=conservative_prompt,
            tool_trace={
                "mode": "fallback_conservative",
                "reason": "max_tool_roundtrips_exceeded",
            },
        )
    elapsed_ms = (time.perf_counter() - t0) * 1000
    raw_text = getattr(response, "text", None)
    if not raw_text:
        return 502, {"detail": "LLM returned an empty response"}, elapsed_ms
    try:
        parsed = json.loads(raw_text)
    except json.JSONDecodeError as exc:
        return (
            502,
            {"detail": f"LLM returned invalid JSON: {exc}", "raw": raw_text},
            elapsed_ms,
        )
    prompt_tokens, completion_tokens, total_tokens, thoughts_tokens = extract_usage(
        response
    )
    # NOTE(review): assumes the decoded JSON is an object (dict); a top-level
    # JSON array would make this assignment fail — confirm against the schema.
    parsed["_usage"] = {
        "prompt_tokens": prompt_tokens,
        "completion_tokens": completion_tokens,
        "total_tokens": total_tokens,
        "thoughts_tokens": thoughts_tokens,
    }
    return 200, parsed, elapsed_ms
def summarize_error(payload: Any) -> str:
    """Return a dict payload's ``detail`` as text, or the whole payload as text when there is none."""
    detail = payload.get("detail") if isinstance(payload, dict) else None
    return str(payload) if detail is None else str(detail)
def safe_csv_text(value: object, max_len: int = 240) -> str:
    """Flatten *value* to single-line text of at most *max_len* characters.

    ``None`` becomes an empty string. Every run of whitespace (including
    newlines) collapses to one space and the ends are trimmed. Text longer
    than *max_len* is cut and ends with ``...``; when *max_len* is below 3
    there is no room for the ellipsis, so the text is simply cut.
    """
    text = "" if value is None else str(value)
    text = re.sub(r"\s+", " ", text).strip()
    if len(text) <= max_len:
        return text
    if max_len < 3:
        # A negative slice bound would keep most of the text instead of
        # shortening it, so clamp at zero.
        return text[: max(max_len, 0)]
    return text[: max_len - 3] + "..."
def to_json_text(value: object) -> str:
    """Serialise *value* to JSON text without ASCII escaping.

    If serialisation fails, returns a JSON object whose ``unserializable``
    key holds ``str(value)``.
    """
    try:
        encoded = json.dumps(value, ensure_ascii=False)
    except Exception:
        encoded = json.dumps({"unserializable": str(value)}, ensure_ascii=False)
    return encoded
def run_experiment(
    *,
    base_url: str,
    routine_date: date,
    part_of_day: str,
    leaving_home: bool,
    include_minoxidil_beard: bool,
    windows: list[int],
    repeats: int,
    out_csv: str,
    out_jsonl: str,
    api_timeout: int,
    throttle_sec: float,
) -> int:
    """Run the benchmark and write one CSV row and one JSONL record per run.

    Data is fetched once from *base_url*. For every window in *windows* the
    prompt is rebuilt with that many days of snapshot and routine history and
    sent to the model *repeats* times. Both output files are flushed after
    every run, so partial results survive an interruption. An exception
    raised by a single model call is recorded in that run's row (status 0)
    instead of aborting the experiment. *leaving_home* is only passed to the
    prompt when *part_of_day* is ``"am"``.

    Returns the number of runs written.
    """
    dataset = fetch_required_data(base_url, timeout=api_timeout)
    snapshots = dataset["snapshots"]
    routines = dataset["routines"]
    all_products = dataset["products"]
    products_by_id = {p["_id"]: p for p in all_products}

    # Everything in this section is independent of the history window, so it
    # is computed once instead of once per run.
    profile_ctx = build_user_profile_context(dataset["profile"], routine_date)
    upcoming_grooming_ctx = build_upcoming_grooming_context(
        dataset["grooming"],
        start_date=routine_date,
        days=7,
    )
    last_used = build_last_used_on_by_product(routines)
    available_products = filter_products_by_interval(
        get_available_products(
            all_products,
            time_filter=part_of_day,
            include_minoxidil=include_minoxidil_beard,
        ),
        routine_date=routine_date,
        last_used_on_by_product=last_used,
    )
    products_ctx = build_products_context_summary_list(
        available_products,
        get_products_with_inventory(available_products),
    )
    objectives_ctx = build_objectives_context(include_minoxidil_beard)
    handler = build_product_details_tool_handler(
        available_products,
        last_used_on_by_product=last_used,
    )
    prompt_leaving_home = leaving_home if part_of_day == "am" else None

    fieldnames = [
        "run_at",
        "days_window",
        "repeat",
        "routine_date",
        "part_of_day",
        "snapshots_in_window",
        "snapshot_fallback_used",
        "http_status",
        "duration_ms",
        "steps_count",
        "primary_goal",
        "confidence",
        "reasoning_excerpt",
        "prompt_tokens",
        "completion_tokens",
        "total_tokens",
        "thoughts_tokens",
        "routine_json",
        "error_detail",
    ]
    rows_written = 0
    with (
        open(out_csv, "w", newline="", encoding="utf-8") as f,
        open(out_jsonl, "w", encoding="utf-8") as jf,
    ):
        writer = csv.DictWriter(f, fieldnames=fieldnames)
        writer.writeheader()
        for window in windows:
            for rep in range(1, repeats + 1):
                print(f"[{window}d #{rep}] preparing prompt...")
                snapshot_history_ctx, snapshots_in_window, snapshot_fallback_used = (
                    build_snapshot_history_context(
                        snapshots=snapshots,
                        routine_date=routine_date,
                        days_window=window,
                    )
                )
                effective_snapshot = pick_effective_snapshot(
                    snapshots,
                    routine_date=routine_date,
                    days_window=window,
                    max_fallback_days=14,
                )
                skin_ctx = build_skin_context(effective_snapshot)
                recent_history_ctx = build_recent_history(
                    routines,
                    products_by_id=products_by_id,
                    routine_date=routine_date,
                    days_window=window,
                )
                prompt = build_prompt(
                    routine_date=routine_date,
                    part_of_day=part_of_day,
                    leaving_home=prompt_leaving_home,
                    include_minoxidil_beard=include_minoxidil_beard,
                    profile_ctx=profile_ctx,
                    skin_ctx=skin_ctx,
                    snapshot_history_ctx=snapshot_history_ctx,
                    upcoming_grooming_ctx=upcoming_grooming_ctx,
                    recent_history_ctx=recent_history_ctx,
                    products_ctx=products_ctx,
                    objectives_ctx=objectives_ctx,
                )

                # Start from blank columns; result columns are filled below.
                row: dict[str, Any] = dict.fromkeys(fieldnames, "")
                row.update(
                    {
                        "run_at": datetime.now().isoformat(timespec="seconds"),
                        "days_window": window,
                        "repeat": rep,
                        "routine_date": routine_date.isoformat(),
                        "part_of_day": part_of_day,
                        "snapshots_in_window": snapshots_in_window,
                        "snapshot_fallback_used": snapshot_fallback_used,
                    }
                )
                try:
                    print(f"[{window}d #{rep}] calling Gemini...")
                    status, payload, elapsed_ms = call_routines_llm(prompt, handler)
                    row["http_status"] = status
                    row["duration_ms"] = int(elapsed_ms)
                    if status == 200 and isinstance(payload, dict):
                        summary = payload.get("summary") or {}
                        usage = payload.get("_usage") or {}
                        row["steps_count"] = len(payload.get("steps") or [])
                        row["primary_goal"] = safe_csv_text(
                            summary.get("primary_goal"), 180
                        )
                        row["confidence"] = summary.get("confidence", "")
                        row["reasoning_excerpt"] = safe_csv_text(
                            payload.get("reasoning"), 240
                        )
                        row["prompt_tokens"] = usage.get("prompt_tokens", "")
                        row["completion_tokens"] = usage.get("completion_tokens", "")
                        row["total_tokens"] = usage.get("total_tokens", "")
                        row["thoughts_tokens"] = usage.get("thoughts_tokens", "")
                        row["routine_json"] = to_json_text(payload)
                    else:
                        row["routine_json"] = to_json_text(payload)
                        row["error_detail"] = safe_csv_text(
                            summarize_error(payload), 240
                        )
                except Exception as exc:
                    # Deliberate best effort: one failed run must not abort
                    # the remaining runs; the failure is recorded in the row.
                    row["http_status"] = 0
                    row["error_detail"] = safe_csv_text(str(exc), 240)
                    row["routine_json"] = to_json_text({"error": str(exc)})
                writer.writerow(row)
                f.flush()
                jsonl_row = {
                    "run_at": row["run_at"],
                    "days_window": window,
                    "repeat": rep,
                    "routine_date": routine_date.isoformat(),
                    "part_of_day": part_of_day,
                    "http_status": row["http_status"],
                    "duration_ms": row["duration_ms"],
                    "payload": json.loads(row["routine_json"]),
                }
                jf.write(json.dumps(jsonl_row, ensure_ascii=False) + "\n")
                jf.flush()
                rows_written += 1
                print(
                    f"[{window}d #{rep}] status={row['http_status']} "
                    f"steps={row['steps_count'] or '-'} tokens={row['total_tokens'] or '-'}"
                )
                time.sleep(throttle_sec)
    return rows_written
def parse_windows(raw: str) -> list[int]:
    """Parse a comma-separated list of window sizes.

    Blank items are ignored. Raises ``ValueError`` when an item is not an
    integer, when no item remains, or when any value is below 1.
    """
    values: list[int] = []
    for piece in raw.split(","):
        token = piece.strip()
        if token:
            values.append(int(token))
    if not values:
        raise ValueError("--windows must contain at least one integer")
    if min(values) < 1:
        raise ValueError("--windows values must be >= 1")
    return values
def main() -> int:
    """Parse the command line, run the benchmark and return the exit code.

    Returns ``0`` on success and ``130`` when interrupted with Ctrl-C.
    Raises ``ValueError`` for an invalid ``--repeats``, ``--windows`` or
    ``--routine-date`` and ``RuntimeError`` when ``GEMINI_API_KEY`` is not
    set in the environment.
    """
    cli = argparse.ArgumentParser(
        description="Benchmark routines LLM output across skincare snapshot history windows"
    )
    cli.add_argument("--base-url", default=DEFAULT_BASE_URL)
    cli.add_argument("--routine-date", default=date.today().isoformat())
    cli.add_argument("--part-of-day", choices=["am", "pm"], default="am")
    cli.add_argument("--leaving-home", action="store_true")
    cli.add_argument("--include-minoxidil-beard", action="store_true")
    cli.add_argument("--windows", default=",".join(str(w) for w in DEFAULT_WINDOWS))
    cli.add_argument("--repeats", type=int, default=3)
    cli.add_argument("--out", default=DEFAULT_OUT)
    cli.add_argument(
        "--out-jsonl",
        default="",
        help="JSONL output with full routine payload per run (default: <out>.jsonl)",
    )
    cli.add_argument("--api-timeout", type=int, default=20)
    cli.add_argument("--throttle-sec", type=float, default=0.4)
    args = cli.parse_args()

    if args.repeats < 1:
        raise ValueError("--repeats must be >= 1")
    if not os.environ.get("GEMINI_API_KEY"):
        raise RuntimeError("GEMINI_API_KEY is not set")
    windows = parse_windows(args.windows)
    routine_date = date.fromisoformat(args.routine_date)

    # Without an explicit JSONL path, reuse the CSV path with a .jsonl suffix.
    if args.out_jsonl:
        out_jsonl = args.out_jsonl
    else:
        out_jsonl = str(Path(args.out).with_suffix(".jsonl"))

    print("Starting benchmark with production-like prompt + function tools")
    print(
        f"windows={windows}, repeats={args.repeats}, out={args.out}, out_jsonl={out_jsonl}"
    )
    try:
        written = run_experiment(
            base_url=args.base_url,
            routine_date=routine_date,
            part_of_day=args.part_of_day,
            leaving_home=args.leaving_home,
            include_minoxidil_beard=args.include_minoxidil_beard,
            windows=windows,
            repeats=args.repeats,
            out_csv=args.out,
            out_jsonl=out_jsonl,
            api_timeout=args.api_timeout,
            throttle_sec=args.throttle_sec,
        )
    except KeyboardInterrupt:
        print("Interrupted by user. Partial CSV was already flushed to disk.")
        return 130
    else:
        print(f"Done. Wrote {written} rows to: {args.out} and {out_jsonl}")
        return 0


if __name__ == "__main__":
    raise SystemExit(main())