#!/usr/bin/env python3
# pyright: reportMissingImports=false
"""Benchmark routines LLM prompt with variable snapshot-history window.

This script does NOT call /routines/suggest. It reconstructs a production-like
prompt and calls Gemini directly, while varying the number of days included in
the "SKIN SNAPSHOT HISTORY" section.
"""

from __future__ import annotations

import argparse
import csv
import json
import math
import os
import re
import sys
import time
from datetime import date, datetime, timedelta
from pathlib import Path
from typing import Any
from urllib.error import HTTPError, URLError
from urllib.request import Request, urlopen

# Make the backend package importable when this script runs from scripts/.
ROOT_DIR = Path(__file__).resolve().parents[1]
BACKEND_DIR = ROOT_DIR / "backend"
if str(BACKEND_DIR) not in sys.path:
    sys.path.insert(0, str(BACKEND_DIR))

# Lazily populated cache for backend_symbols(); None until first use.
_BACKEND_SYMBOLS: dict[str, Any] | None = None


def backend_symbols() -> dict[str, Any]:
    """Import and cache the backend / Gemini symbols on first use.

    Imports are deferred so that merely loading this script (e.g. for --help)
    does not require fastapi, google-genai, or the backend package.
    """
    global _BACKEND_SYMBOLS
    if _BACKEND_SYMBOLS is not None:
        return _BACKEND_SYMBOLS
    from fastapi import HTTPException
    from google.genai import types as genai_types  # type: ignore[import-not-found]
    from innercontext.api.product_llm_tools import (  # type: ignore[import-not-found]
        PRODUCT_DETAILS_FUNCTION_DECLARATION,
    )
    from innercontext.api.routines import (  # type: ignore[import-not-found]
        _DAY_NAMES,
        _ROUTINES_SINGLE_EXTRA,
        _ROUTINES_SYSTEM_PROMPT,
        _SuggestionOut,
    )
    from innercontext.llm import (  # type: ignore[import-not-found]
        call_gemini,
        call_gemini_with_function_tools,
        get_creative_config,
    )

    _BACKEND_SYMBOLS = {
        "HTTPException": HTTPException,
        "genai_types": genai_types,
        "PRODUCT_DETAILS_FUNCTION_DECLARATION": PRODUCT_DETAILS_FUNCTION_DECLARATION,
        "_DAY_NAMES": _DAY_NAMES,
        "_ROUTINES_SINGLE_EXTRA": _ROUTINES_SINGLE_EXTRA,
        "_ROUTINES_SYSTEM_PROMPT": _ROUTINES_SYSTEM_PROMPT,
        "_SuggestionOut": _SuggestionOut,
        "call_gemini": call_gemini,
        "call_gemini_with_function_tools": call_gemini_with_function_tools,
        "get_creative_config": get_creative_config,
    }
    return _BACKEND_SYMBOLS


DEFAULT_BASE_URL = "http://192.168.101.82/api"
DEFAULT_WINDOWS = [3, 5, 7]
DEFAULT_OUT = "routines_suggest_history_experiment.csv"


def _ev(v: object) -> str:
    """Render an enum-like value (anything with a str ``.value``) or object as str."""
    if v is None:
        return ""
    if hasattr(v, "value"):
        value = getattr(v, "value")
        if isinstance(value, str):
            return value
    return str(v)


def _parse_iso_date(raw: str | None) -> date | None:
    """Parse an ISO date string; return None for empty or malformed input."""
    if not raw:
        return None
    try:
        return date.fromisoformat(raw)
    except ValueError:
        return None


def _contains_minoxidil_text(value: str | None) -> bool:
    """True if the text mentions minoxidil (English or Polish spelling)."""
    if not value:
        return False
    text = value.lower()
    return "minoxidil" in text or "minoksydyl" in text


def _is_minoxidil_product(product: dict[str, Any]) -> bool:
    """Heuristically detect a minoxidil product via name/brand/line/INCI/actives."""
    if _contains_minoxidil_text(product.get("name")):
        return True
    if _contains_minoxidil_text(product.get("brand")):
        return True
    if _contains_minoxidil_text(product.get("line_name")):
        return True
    for inci in product.get("inci") or []:
        if _contains_minoxidil_text(str(inci)):
            return True
    for active in product.get("actives") or []:
        if isinstance(active, dict):
            if _contains_minoxidil_text(str(active.get("name") or "")):
                return True
    return False


def http_json(
    method: str,
    url: str,
    body: dict[str, Any] | None = None,
    timeout: int = 20,
) -> tuple[int, Any]:
    """Perform an HTTP request with an optional JSON body.

    Returns ``(status, parsed_json)``. Network failures are reported with
    status 0; a non-JSON body is wrapped as ``{"detail": raw}`` instead of
    raising.
    """
    data = None
    headers = {"Content-Type": "application/json"}
    if body is not None:
        data = json.dumps(body).encode("utf-8")
    req = Request(url=url, data=data, method=method, headers=headers)
    try:
        with urlopen(req, timeout=timeout) as resp:
            raw = resp.read().decode("utf-8")
            try:
                return resp.status, json.loads(raw) if raw else {}
            except json.JSONDecodeError:
                # FIX: a 2xx response with a non-JSON body previously raised an
                # uncaught JSONDecodeError and aborted the whole benchmark run;
                # report it the same way the HTTPError path does.
                return resp.status, {"detail": raw}
    except HTTPError as e:
        raw = e.read().decode("utf-8", errors="replace")
        try:
            parsed = json.loads(raw) if raw else {}
        except json.JSONDecodeError:
            parsed = {"detail": raw}
        return e.code, parsed
    except (URLError, TimeoutError) as e:
        return 0, {"detail": f"Network error: {e}"}


def fetch_required_data(base_url: str, timeout: int) -> dict[str, Any]:
    """Fetch profile, snapshots, grooming, routines and products from the API.

    Snapshots and routines get a parsed ``_d`` date key and are sorted newest
    first; products get ``_id`` / ``_short_id`` convenience keys.

    Raises RuntimeError when any endpoint does not answer 200.
    """
    base = base_url.rstrip("/")
    endpoints = {
        "profile": "/profile",
        "snapshots": "/skincare",
        "grooming": "/routines/grooming-schedule",
        "routines": "/routines",
        "products": "/products",
    }
    results: dict[str, Any] = {}
    for key, path in endpoints.items():
        status, payload = http_json("GET", f"{base}{path}", timeout=timeout)
        if status != 200:
            raise RuntimeError(f"GET {path} failed ({status}): {payload}")
        results[key] = payload

    snapshots = list(results["snapshots"] or [])
    for s in snapshots:
        s["_d"] = _parse_iso_date(s.get("snapshot_date"))
    snapshots = [s for s in snapshots if s.get("_d") is not None]
    snapshots.sort(key=lambda x: x["_d"], reverse=True)
    results["snapshots"] = snapshots

    routines = list(results["routines"] or [])
    for r in routines:
        r["_d"] = _parse_iso_date(r.get("routine_date"))
    routines = [r for r in routines if r.get("_d") is not None]
    routines.sort(key=lambda x: x["_d"], reverse=True)
    results["routines"] = routines

    products = list(results["products"] or [])
    for p in products:
        full_id = str(p.get("id") or "")
        p["_id"] = full_id
        p["_short_id"] = str(p.get("short_id") or full_id[:8])
    results["products"] = products
    return results


def build_user_profile_context(
    profile: dict[str, Any] | None, reference_date: date
) -> str:
    """Render the USER PROFILE prompt section (age, birth date, sex at birth)."""
    if profile is None:
        return "USER PROFILE: no data\n"
    lines = ["USER PROFILE:"]
    birth_date_raw = profile.get("birth_date")
    birth_date = _parse_iso_date(birth_date_raw)
    if birth_date is not None:
        # Age in completed years at reference_date (no birthday yet => -1).
        years = reference_date.year - birth_date.year
        if (reference_date.month, reference_date.day) < (
            birth_date.month,
            birth_date.day,
        ):
            years -= 1
        lines.append(f" Age: {max(years, 0)}")
        lines.append(f" Birth date: {birth_date.isoformat()}")
    else:
        lines.append(" Age: unknown")
    sex = profile.get("sex_at_birth")
    if sex is not None:
        lines.append(f" Sex at birth: {sex}")
    else:
        lines.append(" Sex at birth: unknown")
    return "\n".join(lines) + "\n"


def pick_effective_snapshot(
    snapshots: list[dict[str, Any]],
    *,
    routine_date: date,
    days_window: int,
    max_fallback_days: int = 14,
) -> dict[str, Any] | None:
    """Pick the most recent snapshot within the window, else within the fallback.

    ``snapshots`` must be sorted newest first (as from fetch_required_data).
    Returns None when nothing falls inside ``max_fallback_days``.
    """
    cutoff = routine_date - timedelta(days=days_window - 1)
    in_window = [s for s in snapshots if cutoff <= s["_d"] <= routine_date]
    if in_window:
        return in_window[0]
    fallback_cutoff = routine_date - timedelta(days=max_fallback_days)
    for s in snapshots:
        snapshot_date = s["_d"]
        if fallback_cutoff <= snapshot_date < cutoff:
            return s
    return None


def build_skin_context(snapshot: dict[str, Any] | None) -> str:
    """Render the SKIN CONDITION prompt section from one snapshot."""
    if snapshot is None:
        return "SKIN CONDITION: no data\n"
    return (
        f"SKIN CONDITION (snapshot from {snapshot.get('snapshot_date')}):\n"
        f" Overall state: {_ev(snapshot.get('overall_state'))}\n"
        f" Hydration: {snapshot.get('hydration_level', '-')}/5\n"
        f" Barrier: {_ev(snapshot.get('barrier_state'))}\n"
        f" Active concerns: {', '.join(_ev(c) for c in (snapshot.get('active_concerns') or []))}\n"
        f" Priorities: {', '.join(snapshot.get('priorities') or [])}\n"
        f" Notes: {snapshot.get('notes') or 'none'}\n"
    )


def build_snapshot_history_context(
    snapshots: list[dict[str, Any]],
    routine_date: date,
    days_window: int,
    max_fallback_days: int = 14,
) -> tuple[str, int, bool]:
    """Render the SKIN SNAPSHOT HISTORY section for the given day window.

    Returns ``(context_text, snapshots_in_window, fallback_used)``. When the
    window is empty, a single fallback snapshot (up to ``max_fallback_days``
    old) is rendered instead and ``fallback_used`` is True.
    """
    cutoff = routine_date - timedelta(days=days_window - 1)
    in_window = [s for s in snapshots if cutoff <= s["_d"] <= routine_date]
    if not in_window:
        fallback_snapshot = pick_effective_snapshot(
            snapshots,
            routine_date=routine_date,
            days_window=days_window,
            max_fallback_days=max_fallback_days,
        )
        if fallback_snapshot is None:
            return f"SKIN SNAPSHOT HISTORY (last {days_window} days): none\n", 0, False
        concerns = (
            ", ".join(_ev(c) for c in (fallback_snapshot.get("active_concerns") or []))
            or "none"
        )
        lines = [f"SKIN SNAPSHOT HISTORY (last {days_window} days):"]
        lines.append(
            " "
            + f"fallback({max_fallback_days}d) {fallback_snapshot.get('snapshot_date')}: "
            + f"overall={_ev(fallback_snapshot.get('overall_state'))}, "
            + f"barrier={_ev(fallback_snapshot.get('barrier_state'))}, "
            + f"hydration={fallback_snapshot.get('hydration_level', '-')}/5, "
            + f"sensitivity={fallback_snapshot.get('sensitivity_level', '-')}/5, "
            + f"concerns={concerns}"
        )
        return "\n".join(lines) + "\n", 1, True
    lines = [f"SKIN SNAPSHOT HISTORY (last {days_window} days):"]
    for s in in_window:
        concerns = ", ".join(_ev(c) for c in (s.get("active_concerns") or [])) or "none"
        lines.append(
            " "
            + f"{s.get('snapshot_date')}: "
            + f"overall={_ev(s.get('overall_state'))}, "
            + f"barrier={_ev(s.get('barrier_state'))}, "
            + f"hydration={s.get('hydration_level', '-')}/5, "
            + f"sensitivity={s.get('sensitivity_level', '-')}/5, "
            + f"concerns={concerns}"
        )
    return "\n".join(lines) + "\n", len(in_window), False


def build_upcoming_grooming_context(
    grooming_entries: list[dict[str, Any]],
    start_date: date,
    days: int = 7,
) -> str:
    """Render the UPCOMING GROOMING section for the next ``days`` days.

    Entries repeat weekly via ``day_of_week`` (Monday=0, matching
    ``date.weekday()``). Relative labels are in Polish, as in production.
    """
    day_names = backend_symbols()["_DAY_NAMES"]
    if not grooming_entries:
        return f"UPCOMING GROOMING (next {days} days): none\n"
    entries_by_weekday: dict[int, list[dict[str, Any]]] = {}
    for e in grooming_entries:
        day = e.get("day_of_week")
        if isinstance(day, int):
            entries_by_weekday.setdefault(day, []).append(e)
    lines = [f"UPCOMING GROOMING (next {days} days):"]
    for offset in range(days):
        target_date = start_date + timedelta(days=offset)
        day_entries = entries_by_weekday.get(target_date.weekday(), [])
        if not day_entries:
            continue
        if offset == 0:
            relative_label = "dzisiaj"
        elif offset == 1:
            relative_label = "jutro"
        else:
            relative_label = f"za {offset} dni"
        day_name = day_names[target_date.weekday()]
        actions = ", ".join(
            f"{_ev(e.get('action'))}"
            + (f" ({e.get('notes')})" if e.get("notes") else "")
            for e in day_entries
        )
        lines.append(f" {relative_label} ({target_date}, {day_name}): {actions}")
    if len(lines) == 1:
        lines.append(" (no entries in this window)")
    return "\n".join(lines) + "\n"


def build_recent_history(
    routines: list[dict[str, Any]],
    products_by_id: dict[str, dict[str, Any]],
    *,
    routine_date: date,
    days_window: int,
) -> str:
    """Render the RECENT ROUTINES section for routines inside the window."""
    cutoff = routine_date - timedelta(days=days_window - 1)
    selected = [r for r in routines if cutoff <= r["_d"] <= routine_date]
    if not selected:
        return "RECENT ROUTINES: none\n"
    lines = ["RECENT ROUTINES:"]
    for r in selected:
        steps = sorted(
            list(r.get("steps") or []),
            key=lambda s: int(s.get("order_index") or 0),
        )
        step_names: list[str] = []
        for step in steps:
            product_id = step.get("product_id")
            if product_id:
                product = products_by_id.get(str(product_id))
                if product:
                    step_names.append(
                        f"{_ev(product.get('category'))} [{product.get('_short_id')}]"
                    )
                else:
                    step_names.append(f"unknown [{str(product_id)[:8]}]")
            elif step.get("action_type"):
                step_names.append(f"action: {_ev(step.get('action_type'))}")
        part_of_day = _ev(r.get("part_of_day")).upper()
        lines.append(
            f" {r.get('routine_date')} {part_of_day}: {', '.join(step_names)}"
        )
    return "\n".join(lines) + "\n"


def build_day_context(leaving_home: bool | None) -> str:
    """Render the DAY CONTEXT section; empty string when unknown (PM runs)."""
    if leaving_home is None:
        return ""
    val = "yes" if leaving_home else "no"
    return f"DAY CONTEXT:\n Leaving home: {val}\n"


def build_objectives_context(include_minoxidil_beard: bool) -> str:
    """Render the USER OBJECTIVES section when the minoxidil goal is enabled."""
    if include_minoxidil_beard:
        return (
            "USER OBJECTIVES:\n"
            " - Priority: improve beard and mustache density\n"
            " - If a product with minoxidil is available, include it adhering strictly to safety rules\n"
        )
    return ""


def build_last_used_on_by_product(routines: list[dict[str, Any]]) -> dict[str, date]:
    """Map product id -> most recent routine date it appeared in.

    ``routines`` must be sorted newest first; the first hit per product wins.
    """
    last_used: dict[str, date] = {}
    for r in routines:
        routine_date = r["_d"]
        for step in r.get("steps") or []:
            product_id = step.get("product_id")
            if not product_id:
                continue
            key = str(product_id)
            if key in last_used:
                continue
            last_used[key] = routine_date
    return last_used


def get_available_products(
    products: list[dict[str, Any]],
    *,
    time_filter: str | None,
    include_minoxidil: bool,
) -> list[dict[str, Any]]:
    """Filter out tools, non-minoxidil medications, and time-of-day mismatches.

    Minoxidil products are kept only when ``include_minoxidil`` is True.
    """
    result: list[dict[str, Any]] = []
    for p in products:
        if p.get("is_tool"):
            continue
        if p.get("is_medication") and not _is_minoxidil_product(p):
            continue
        if not include_minoxidil and _is_minoxidil_product(p):
            continue
        rec_time = _ev(p.get("recommended_time"))
        if time_filter and rec_time not in (time_filter, "both"):
            continue
        result.append(p)
    return result


def filter_products_by_interval(
    products: list[dict[str, Any]],
    *,
    routine_date: date,
    last_used_on_by_product: dict[str, date],
) -> list[dict[str, Any]]:
    """Drop products whose ``min_interval_hours`` has not yet elapsed.

    The hour interval is rounded up to whole days since only dates are known.
    """
    result: list[dict[str, Any]] = []
    for p in products:
        min_interval_hours = p.get("min_interval_hours")
        if isinstance(min_interval_hours, int) and min_interval_hours > 0:
            last_used = last_used_on_by_product.get(p["_id"])
            if last_used is not None:
                days_needed = math.ceil(min_interval_hours / 24)
                if routine_date < (last_used + timedelta(days=days_needed)):
                    continue
        result.append(p)
    return result


def get_products_with_inventory(products: list[dict[str, Any]]) -> set[str]:
    """Return ids of products that have at least one unfinished inventory item."""
    ids: set[str] = set()
    for p in products:
        inventory = p.get("inventory") or []
        for inv in inventory:
            if inv.get("finished_at") is None:
                ids.add(p["_id"])
                break
    return ids


def build_products_context_summary_list(
    products: list[dict[str, Any]],
    products_with_inventory: set[str],
) -> str:
    """Render the AVAILABLE PRODUCTS section (one compact line per product)."""
    lines = ["AVAILABLE PRODUCTS:"]
    for p in products:
        # [✓]/[✗] marks whether an unfinished inventory item exists.
        status = "[✓]" if p["_id"] in products_with_inventory else "[✗]"
        effects: list[str] = []
        profile = p.get("product_effect_profile") or {}
        if isinstance(profile, dict):
            if int(profile.get("hydration_immediate", 0) or 0) > 0:
                effects.append(f"hydration={profile['hydration_immediate']}")
            if int(profile.get("exfoliation_strength", 0) or 0) > 0:
                effects.append(f"exfoliation={profile['exfoliation_strength']}")
            if int(profile.get("retinoid_strength", 0) or 0) > 0:
                effects.append(f"retinoid={profile['retinoid_strength']}")
            if int(profile.get("irritation_risk", 0) or 0) > 0:
                effects.append(f"irritation_risk={profile['irritation_risk']}")
            if int(profile.get("barrier_disruption_risk", 0) or 0) > 0:
                effects.append(f"barrier_risk={profile['barrier_disruption_risk']}")
        rules = p.get("context_rules") or {}
        safety_flags: list[str] = []
        if isinstance(rules, dict):
            if rules.get("safe_with_compromised_barrier"):
                safety_flags.append("barrier_ok")
            if rules.get("safe_after_shaving") is False:
                safety_flags.append("!post_shave")
        effects_str = f" effects={{{','.join(effects)}}}" if effects else ""
        safety_str = f" safety={{{','.join(safety_flags)}}}" if safety_flags else ""
        lines.append(
            f" {status} {p['_short_id']} | {p.get('brand')} {p.get('name')} ({p.get('category')})"
            f"{effects_str}{safety_str}"
        )
    return "\n".join(lines) + "\n"


def _extract_requested_product_ids(args: dict[str, Any], max_ids: int = 8) -> list[str]:
    """Extract up to ``max_ids`` unique string product ids from tool-call args."""
    raw_ids = args.get("product_ids")
    if not isinstance(raw_ids, list):
        return []
    requested_ids: list[str] = []
    seen: set[str] = set()
    for raw_id in raw_ids:
        if not isinstance(raw_id, str):
            continue
        if raw_id in seen:
            continue
        seen.add(raw_id)
        requested_ids.append(raw_id)
        if len(requested_ids) >= max_ids:
            break
    return requested_ids


def _compact_actives_payload(product: dict[str, Any]) -> list[dict[str, Any]]:
    """Build a trimmed actives list (max 5 items, 4 functions each) for the tool."""
    payload: list[dict[str, Any]] = []
    for active in product.get("actives") or []:
        if not isinstance(active, dict):
            continue
        name = str(active.get("name") or "").strip()
        if not name:
            continue
        item: dict[str, Any] = {"name": name}
        if active.get("percent") is not None:
            item["percent"] = active.get("percent")
        if isinstance(active.get("functions"), list):
            item["functions"] = [str(f) for f in active["functions"][:4]]
        if active.get("strength_level") is not None:
            item["strength_level"] = str(active.get("strength_level"))
        payload.append(item)
    return payload[:5]


def build_product_details_tool_handler(
    products: list[dict[str, Any]],
    *,
    last_used_on_by_product: dict[str, date],
):
    """Build the ``get_product_details`` function-tool handler.

    Products are looked up by both full id and short id; duplicates in one
    request are collapsed to a single payload entry.
    """
    available_by_id: dict[str, dict[str, Any]] = {}
    for p in products:
        available_by_id[p["_id"]] = p
        available_by_id[p["_short_id"]] = p

    def _handler(args: dict[str, Any]) -> dict[str, object]:
        requested_ids = _extract_requested_product_ids(args)
        payload: list[dict[str, Any]] = []
        seen: set[str] = set()
        for pid in requested_ids:
            product = available_by_id.get(pid)
            if product is None:
                continue
            full_id = product["_id"]
            if full_id in seen:
                continue
            seen.add(full_id)
            safety = {}
            for flag in (
                "fragrance_free",
                "essential_oils_free",
                "alcohol_denat_free",
                "pregnancy_safe",
            ):
                value = product.get(flag)
                if value is not None:
                    safety[flag] = value
            payload.append(
                {
                    "id": product["_short_id"],
                    "name": product.get("name"),
                    "brand": product.get("brand"),
                    "category": product.get("category"),
                    "recommended_time": product.get("recommended_time"),
                    "leave_on": product.get("leave_on"),
                    "targets": product.get("targets") or [],
                    "effect_profile": product.get("product_effect_profile") or {},
                    "actives": _compact_actives_payload(product),
                    "context_rules": product.get("context_rules") or {},
                    "safety": safety,
                    "min_interval_hours": product.get("min_interval_hours"),
                    "max_frequency_per_week": product.get("max_frequency_per_week"),
                    "last_used_on": (
                        last_used_on_by_product[full_id].isoformat()
                        if full_id in last_used_on_by_product
                        else None
                    ),
                }
            )
        return {"products": payload}

    return _handler


def build_prompt(
    *,
    routine_date: date,
    part_of_day: str,
    leaving_home: bool | None,
    include_minoxidil_beard: bool,
    profile_ctx: str,
    skin_ctx: str,
    snapshot_history_ctx: str,
    upcoming_grooming_ctx: str,
    recent_history_ctx: str,
    products_ctx: str,
    objectives_ctx: str,
) -> str:
    """Assemble the full production-like Polish prompt from pre-built sections."""
    symbols = backend_symbols()
    day_names = symbols["_DAY_NAMES"]
    single_extra = symbols["_ROUTINES_SINGLE_EXTRA"]
    weekday = routine_date.weekday()
    day_name = day_names[weekday]
    day_ctx = build_day_context(leaving_home)
    return (
        f"Zaproponuj rutynę pielęgnacyjną {part_of_day.upper()} "
        f"na {routine_date} ({day_name}).\n\n"
        "MODE: standard\n"
        "INPUT DATA:\n"
        f"{profile_ctx}"
        f"{skin_ctx}"
        f"{snapshot_history_ctx}"
        f"{upcoming_grooming_ctx}"
        f"{recent_history_ctx}"
        f"{day_ctx}"
        f"{products_ctx}"
        f"{objectives_ctx}"
        "\nNARZEDZIA:\n"
        "- Masz dostep do funkcji: get_product_details.\n"
        "- Wywoluj narzedzia tylko, gdy potrzebujesz detali do decyzji klinicznej/bezpieczenstwa.\n"
        "- Staraj sie grupowac zapytania: podawaj wszystkie potrzebne UUID w jednym wywolaniu narzedzia.\n"
        "- Nie zgaduj detali skladu i zasad bezpieczenstwa; jesli potrzebujesz szczegolow, wywolaj odpowiednie narzedzie.\n"
        f"{single_extra}\n"
        "Zwróć JSON zgodny ze schematem."
    )


def extract_usage(response: Any) -> tuple[str, str, str, str]:
    """Extract (prompt, completion, total, thoughts) token counts as CSV strings.

    Missing counters become "" — but a real count of 0 is preserved as "0".
    """
    usage = getattr(response, "usage_metadata", None)
    if not usage:
        return "", "", "", ""

    def _fmt(name: str) -> str:
        value = getattr(usage, name, None)
        # FIX: the previous `or ""` collapsed a legitimate count of 0 into an
        # empty cell, making "zero tokens" indistinguishable from "missing".
        return "" if value is None else str(value)

    return (
        _fmt("prompt_token_count"),
        _fmt("candidates_token_count"),
        _fmt("total_token_count"),
        _fmt("thoughts_token_count"),
    )


def call_routines_llm(
    prompt: str, function_handler
) -> tuple[int, dict[str, Any], float]:
    """Call Gemini with the routines prompt and the product-details tool.

    Returns ``(status, payload, elapsed_ms)``; status 502 signals an LLM-side
    failure (empty or invalid JSON). When the tool-roundtrip limit is hit, the
    call is retried once without tools using a conservative fallback prompt.
    """
    symbols = backend_symbols()
    http_exception = symbols["HTTPException"]
    genai_types = symbols["genai_types"]
    function_declaration = symbols["PRODUCT_DETAILS_FUNCTION_DECLARATION"]
    routines_system_prompt = symbols["_ROUTINES_SYSTEM_PROMPT"]
    suggestion_schema = symbols["_SuggestionOut"]
    get_creative_config = symbols["get_creative_config"]
    call_gemini = symbols["call_gemini"]
    call_gemini_with_function_tools = symbols["call_gemini_with_function_tools"]

    config = get_creative_config(
        system_instruction=routines_system_prompt,
        response_schema=suggestion_schema,
        max_output_tokens=8192,
    ).model_copy(
        update={
            "tools": [
                genai_types.Tool(
                    function_declarations=[function_declaration],
                )
            ],
            "tool_config": genai_types.ToolConfig(
                function_calling_config=genai_types.FunctionCallingConfig(
                    mode=genai_types.FunctionCallingConfigMode.AUTO,
                )
            ),
        }
    )
    t0 = time.perf_counter()
    try:
        response, _ = call_gemini_with_function_tools(
            endpoint="routines/suggest-benchmark",
            contents=prompt,
            config=config,
            function_handlers={"get_product_details": function_handler},
            user_input=prompt,
            max_tool_roundtrips=3,
        )
    except http_exception as exc:
        # Only the "too many function calls" 502 is retried; anything else
        # propagates to the caller.
        if (
            exc.status_code != 502
            or str(exc.detail) != "Gemini requested too many function calls"
        ):
            raise
        conservative_prompt = (
            f"{prompt}\n\n"
            "TRYB AWARYJNY (KONSERWATYWNY):\n"
            "- Osiagnieto limit wywolan narzedzi.\n"
            "- Nie wywoluj narzedzi ponownie.\n"
            "- Zaproponuj maksymalnie konserwatywna, bezpieczna rutyne na podstawie dostepnych juz danych,"
            " preferujac lagodne produkty wspierajace bariere i fotoprotekcje.\n"
            "- Gdy masz watpliwosci, pomijaj ryzykowne aktywne kroki.\n"
        )
        response, _ = call_gemini(
            endpoint="routines/suggest-benchmark",
            contents=conservative_prompt,
            config=get_creative_config(
                system_instruction=routines_system_prompt,
                response_schema=suggestion_schema,
                max_output_tokens=8192,
            ),
            user_input=conservative_prompt,
            tool_trace={
                "mode": "fallback_conservative",
                "reason": "max_tool_roundtrips_exceeded",
            },
        )
    elapsed_ms = (time.perf_counter() - t0) * 1000

    raw_text = getattr(response, "text", None)
    if not raw_text:
        return 502, {"detail": "LLM returned an empty response"}, elapsed_ms
    try:
        parsed = json.loads(raw_text)
    except json.JSONDecodeError as exc:
        return (
            502,
            {"detail": f"LLM returned invalid JSON: {exc}", "raw": raw_text},
            elapsed_ms,
        )
    if not isinstance(parsed, dict):
        # FIX: a top-level JSON array/scalar previously raised TypeError on the
        # `parsed["_usage"]` assignment below instead of producing a 502 row.
        return (
            502,
            {"detail": "LLM returned non-object JSON", "raw": raw_text},
            elapsed_ms,
        )
    prompt_tokens, completion_tokens, total_tokens, thoughts_tokens = extract_usage(
        response
    )
    parsed["_usage"] = {
        "prompt_tokens": prompt_tokens,
        "completion_tokens": completion_tokens,
        "total_tokens": total_tokens,
        "thoughts_tokens": thoughts_tokens,
    }
    return 200, parsed, elapsed_ms


def summarize_error(payload: Any) -> str:
    """Condense an error payload to a one-line string (prefers ``detail``)."""
    if isinstance(payload, dict):
        detail = payload.get("detail")
        if detail is not None:
            return str(detail)
        return str(payload)
    return str(payload)


def safe_csv_text(value: object, max_len: int = 240) -> str:
    """Collapse whitespace and truncate (with ellipsis) for a CSV cell."""
    text = "" if value is None else str(value)
    text = re.sub(r"\s+", " ", text).strip()
    if len(text) > max_len:
        return text[: max_len - 3] + "..."
    return text


def to_json_text(value: object) -> str:
    """Serialize to JSON, falling back to a wrapper when not serializable."""
    try:
        return json.dumps(value, ensure_ascii=False)
    except Exception:
        return json.dumps({"unserializable": str(value)}, ensure_ascii=False)


def run_experiment(
    *,
    base_url: str,
    routine_date: date,
    part_of_day: str,
    leaving_home: bool,
    include_minoxidil_beard: bool,
    windows: list[int],
    repeats: int,
    out_csv: str,
    out_jsonl: str,
    api_timeout: int,
    throttle_sec: float,
) -> int:
    """Run the benchmark grid (windows x repeats) and stream results to disk.

    Each run writes one CSV summary row and one JSONL row with the full
    payload; both files are flushed after every run so interrupts keep partial
    results. Returns the number of rows written.
    """
    dataset = fetch_required_data(base_url, timeout=api_timeout)
    profile = dataset["profile"]
    snapshots = dataset["snapshots"]
    routines = dataset["routines"]
    all_products = dataset["products"]
    products_by_id = {p["_id"]: p for p in all_products}

    # These sections do not depend on the history window, so build them once.
    profile_ctx = build_user_profile_context(profile, routine_date)
    upcoming_grooming_ctx = build_upcoming_grooming_context(
        dataset["grooming"],
        start_date=routine_date,
        days=7,
    )

    fieldnames = [
        "run_at",
        "days_window",
        "repeat",
        "routine_date",
        "part_of_day",
        "snapshots_in_window",
        "snapshot_fallback_used",
        "http_status",
        "duration_ms",
        "steps_count",
        "primary_goal",
        "confidence",
        "reasoning_excerpt",
        "prompt_tokens",
        "completion_tokens",
        "total_tokens",
        "thoughts_tokens",
        "routine_json",
        "error_detail",
    ]
    rows_written = 0
    with (
        open(out_csv, "w", newline="", encoding="utf-8") as f,
        open(out_jsonl, "w", encoding="utf-8") as jf,
    ):
        writer = csv.DictWriter(f, fieldnames=fieldnames)
        writer.writeheader()
        for window in windows:
            for rep in range(1, repeats + 1):
                print(f"[{window}d #{rep}] preparing prompt...")
                snapshot_history_ctx, snapshots_in_window, snapshot_fallback_used = (
                    build_snapshot_history_context(
                        snapshots=snapshots,
                        routine_date=routine_date,
                        days_window=window,
                    )
                )
                effective_snapshot = pick_effective_snapshot(
                    snapshots,
                    routine_date=routine_date,
                    days_window=window,
                    max_fallback_days=14,
                )
                skin_ctx = build_skin_context(effective_snapshot)
                recent_history_ctx = build_recent_history(
                    routines,
                    products_by_id=products_by_id,
                    routine_date=routine_date,
                    days_window=window,
                )
                available_products = get_available_products(
                    all_products,
                    time_filter=part_of_day,
                    include_minoxidil=include_minoxidil_beard,
                )
                last_used = build_last_used_on_by_product(routines)
                available_products = filter_products_by_interval(
                    available_products,
                    routine_date=routine_date,
                    last_used_on_by_product=last_used,
                )
                products_with_inventory = get_products_with_inventory(
                    available_products
                )
                products_ctx = build_products_context_summary_list(
                    available_products,
                    products_with_inventory,
                )
                objectives_ctx = build_objectives_context(include_minoxidil_beard)
                prompt = build_prompt(
                    routine_date=routine_date,
                    part_of_day=part_of_day,
                    # Leaving-home context only makes sense for morning runs.
                    leaving_home=leaving_home if part_of_day == "am" else None,
                    include_minoxidil_beard=include_minoxidil_beard,
                    profile_ctx=profile_ctx,
                    skin_ctx=skin_ctx,
                    snapshot_history_ctx=snapshot_history_ctx,
                    upcoming_grooming_ctx=upcoming_grooming_ctx,
                    recent_history_ctx=recent_history_ctx,
                    products_ctx=products_ctx,
                    objectives_ctx=objectives_ctx,
                )
                handler = build_product_details_tool_handler(
                    available_products,
                    last_used_on_by_product=last_used,
                )
                row = {
                    "run_at": datetime.now().isoformat(timespec="seconds"),
                    "days_window": window,
                    "repeat": rep,
                    "routine_date": routine_date.isoformat(),
                    "part_of_day": part_of_day,
                    "snapshots_in_window": snapshots_in_window,
                    "snapshot_fallback_used": snapshot_fallback_used,
                    "http_status": "",
                    "duration_ms": "",
                    "steps_count": "",
                    "primary_goal": "",
                    "confidence": "",
                    "reasoning_excerpt": "",
                    "prompt_tokens": "",
                    "completion_tokens": "",
                    "total_tokens": "",
                    "thoughts_tokens": "",
                    "routine_json": "",
                    "error_detail": "",
                }
                try:
                    print(f"[{window}d #{rep}] calling Gemini...")
                    status, payload, elapsed_ms = call_routines_llm(prompt, handler)
                    row["http_status"] = status
                    row["duration_ms"] = int(elapsed_ms)
                    if status == 200 and isinstance(payload, dict):
                        summary = payload.get("summary") or {}
                        usage = payload.get("_usage") or {}
                        row["steps_count"] = len(payload.get("steps") or [])
                        row["primary_goal"] = safe_csv_text(
                            summary.get("primary_goal"), 180
                        )
                        row["confidence"] = summary.get("confidence", "")
                        row["reasoning_excerpt"] = safe_csv_text(
                            payload.get("reasoning"), 240
                        )
                        row["prompt_tokens"] = usage.get("prompt_tokens", "")
                        row["completion_tokens"] = usage.get("completion_tokens", "")
                        row["total_tokens"] = usage.get("total_tokens", "")
                        row["thoughts_tokens"] = usage.get("thoughts_tokens", "")
                        row["routine_json"] = to_json_text(payload)
                    else:
                        row["routine_json"] = to_json_text(payload)
                        row["error_detail"] = safe_csv_text(
                            summarize_error(payload), 240
                        )
                except Exception as exc:
                    # A failed run is recorded as a row, never aborts the grid.
                    row["http_status"] = 0
                    row["error_detail"] = safe_csv_text(str(exc), 240)
                    row["routine_json"] = to_json_text({"error": str(exc)})

                writer.writerow(row)
                f.flush()
                jsonl_row = {
                    "run_at": row["run_at"],
                    "days_window": window,
                    "repeat": rep,
                    "routine_date": routine_date.isoformat(),
                    "part_of_day": part_of_day,
                    "http_status": row["http_status"],
                    "duration_ms": row["duration_ms"],
                    "payload": json.loads(row["routine_json"]),
                }
                jf.write(json.dumps(jsonl_row, ensure_ascii=False) + "\n")
                jf.flush()
                rows_written += 1
                print(
                    f"[{window}d #{rep}] status={row['http_status']} "
                    f"steps={row['steps_count'] or '-'} tokens={row['total_tokens'] or '-'}"
                )
                time.sleep(throttle_sec)
    return rows_written


def parse_windows(raw: str) -> list[int]:
    """Parse a comma-separated window list; raises ValueError on empty or < 1."""
    values = [int(x.strip()) for x in raw.split(",") if x.strip()]
    if not values:
        raise ValueError("--windows must contain at least one integer")
    if any(v < 1 for v in values):
        raise ValueError("--windows values must be >= 1")
    return values


def main() -> int:
    """CLI entry point: parse args, validate environment, run the benchmark."""
    parser = argparse.ArgumentParser(
        description="Benchmark routines LLM output across skincare snapshot history windows"
    )
    parser.add_argument("--base-url", default=DEFAULT_BASE_URL)
    parser.add_argument("--routine-date", default=date.today().isoformat())
    parser.add_argument("--part-of-day", choices=["am", "pm"], default="am")
    parser.add_argument("--leaving-home", action="store_true")
    parser.add_argument("--include-minoxidil-beard", action="store_true")
    parser.add_argument("--windows", default=",".join(str(w) for w in DEFAULT_WINDOWS))
    parser.add_argument("--repeats", type=int, default=3)
    parser.add_argument("--out", default=DEFAULT_OUT)
    parser.add_argument(
        "--out-jsonl",
        default="",
        # FIX: the help text previously read "(default: .jsonl)" with the
        # placeholder missing.
        help="JSONL output with full routine payload per run (default: <out>.jsonl)",
    )
    parser.add_argument("--api-timeout", type=int, default=20)
    parser.add_argument("--throttle-sec", type=float, default=0.4)
    args = parser.parse_args()

    if args.repeats < 1:
        raise ValueError("--repeats must be >= 1")
    if not os.environ.get("GEMINI_API_KEY"):
        raise RuntimeError("GEMINI_API_KEY is not set")
    windows = parse_windows(args.windows)
    routine_date = date.fromisoformat(args.routine_date)
    out_jsonl = args.out_jsonl or str(Path(args.out).with_suffix(".jsonl"))

    print("Starting benchmark with production-like prompt + function tools")
    print(
        f"windows={windows}, repeats={args.repeats}, out={args.out}, out_jsonl={out_jsonl}"
    )
    written = 0
    try:
        written = run_experiment(
            base_url=args.base_url,
            routine_date=routine_date,
            part_of_day=args.part_of_day,
            leaving_home=args.leaving_home,
            include_minoxidil_beard=args.include_minoxidil_beard,
            windows=windows,
            repeats=args.repeats,
            out_csv=args.out,
            out_jsonl=out_jsonl,
            api_timeout=args.api_timeout,
            throttle_sec=args.throttle_sec,
        )
    except KeyboardInterrupt:
        print("Interrupted by user. Partial CSV was already flushed to disk.")
        return 130
    print(f"Done. Wrote {written} rows to: {args.out} and {out_jsonl}")
    return 0


if __name__ == "__main__":
    raise SystemExit(main())