innercontext/backend/innercontext/mcp_server.py

655 lines
24 KiB
Python

from __future__ import annotations
import json
import os
from datetime import date, datetime, timedelta
from typing import Optional
from uuid import UUID
import httpx
from fastapi import HTTPException
from fastmcp import FastMCP
from google.genai import types as genai_types
from pydantic import BaseModel
from sqlmodel import Session, col, select
from db import engine
from innercontext.llm import call_gemini
from innercontext.models import (
GroomingSchedule,
LabResult,
MedicationEntry,
MedicationUsage,
Product,
ProductInventory,
Routine,
RoutineStep,
SkinConditionSnapshot,
)
# ---------------------------------------------------------------------------
# Pydantic schemas for structured output
# ---------------------------------------------------------------------------
class ProductSuggestionOut(BaseModel):
category: str
product_type: str
key_ingredients: list[str]
target_concerns: list[str]
why_needed: str
recommended_time: str
frequency: str
class ShoppingSuggestionsOut(BaseModel):
suggestions: list[ProductSuggestionOut]
reasoning: str
class MarketProduct(BaseModel):
name: str
brand: str
price: Optional[float] = None
currency: str = "PLN"
store: str
url: str
in_stock: Optional[bool] = None
mcp = FastMCP("innercontext")
# ── Products ──────────────────────────────────────────────────────────────────
@mcp.tool()
def get_products(
category: Optional[str] = None,
is_medication: bool = False,
is_tool: bool = False,
) -> list[dict]:
"""List products. By default returns skincare products (excludes medications and tools).
Pass is_medication=True or is_tool=True to retrieve those categories instead."""
with Session(engine) as session:
stmt = select(Product)
if category is not None:
stmt = stmt.where(Product.category == category)
stmt = stmt.where(Product.is_medication == is_medication)
stmt = stmt.where(Product.is_tool == is_tool)
products = session.exec(stmt).all()
return [p.to_llm_context() for p in products]
@mcp.tool()
def get_product(product_id: str) -> dict:
"""Get full context for a single product (UUID) including all inventory entries."""
with Session(engine) as session:
product = session.get(Product, UUID(product_id))
if product is None:
return {"error": f"Product {product_id} not found"}
ctx = product.to_llm_context()
entries = session.exec(
select(ProductInventory).where(ProductInventory.product_id == product.id)
).all()
ctx["inventory"] = [
{
"id": str(inv.id),
"is_opened": inv.is_opened,
"opened_at": inv.opened_at.isoformat() if inv.opened_at else None,
"finished_at": inv.finished_at.isoformat() if inv.finished_at else None,
"expiry_date": inv.expiry_date.isoformat() if inv.expiry_date else None,
"current_weight_g": inv.current_weight_g,
}
for inv in entries
]
return ctx
# ── Inventory ─────────────────────────────────────────────────────────────────
@mcp.tool()
def get_open_inventory() -> list[dict]:
"""Return all currently open packages (is_opened=True, finished_at=None)
with product name, opening date, weight, and expiry date."""
with Session(engine) as session:
stmt = (
select(ProductInventory, Product)
.join(Product, col(ProductInventory.product_id) == col(Product.id))
.where(col(ProductInventory.is_opened).is_(True))
.where(col(ProductInventory.finished_at).is_(None))
)
rows = session.exec(stmt).all()
return [
{
"inventory_id": str(inv.id),
"product_id": str(product.id),
"product_name": product.name,
"brand": product.brand,
"opened_at": inv.opened_at.isoformat() if inv.opened_at else None,
"current_weight_g": inv.current_weight_g,
"expiry_date": inv.expiry_date.isoformat() if inv.expiry_date else None,
}
for inv, product in rows
]
# ── Routines ──────────────────────────────────────────────────────────────────
@mcp.tool()
def get_recent_routines(days: int = 14) -> list[dict]:
"""Get skincare routines from the last N days, newest first.
Each routine includes its ordered steps with product name or action."""
with Session(engine) as session:
cutoff = date.today() - timedelta(days=days)
routines = session.exec(
select(Routine)
.where(Routine.routine_date >= cutoff)
.order_by(col(Routine.routine_date).desc())
).all()
result = []
for routine in routines:
steps = session.exec(
select(RoutineStep)
.where(RoutineStep.routine_id == routine.id)
.order_by(col(RoutineStep.order_index))
).all()
steps_data = []
for step in steps:
step_dict: dict = {"order": step.order_index}
if step.product_id:
product = session.get(Product, step.product_id)
if product:
step_dict["product"] = product.name
step_dict["product_id"] = str(product.id)
if step.action_type:
step_dict["action"] = (
step.action_type.value
if hasattr(step.action_type, "value")
else str(step.action_type)
)
if step.action_notes:
step_dict["notes"] = step.action_notes
if step.dose:
step_dict["dose"] = step.dose
if step.region:
step_dict["region"] = step.region
steps_data.append(step_dict)
result.append(
{
"id": str(routine.id),
"date": routine.routine_date.isoformat(),
"part_of_day": (
routine.part_of_day.value
if hasattr(routine.part_of_day, "value")
else str(routine.part_of_day)
),
"notes": routine.notes,
"steps": steps_data,
}
)
return result
# ── Skin snapshots ────────────────────────────────────────────────────────────
def _snapshot_to_dict(s: SkinConditionSnapshot, *, full: bool) -> dict:
def ev(v: object) -> object:
return v.value if v is not None and hasattr(v, "value") else v
d: dict = {
"id": str(s.id),
"date": s.snapshot_date.isoformat(),
"overall_state": ev(s.overall_state),
"hydration_level": s.hydration_level,
"sensitivity_level": s.sensitivity_level,
"barrier_state": ev(s.barrier_state),
"active_concerns": [ev(c) for c in (s.active_concerns or [])],
}
if full:
d.update(
{
"skin_type": ev(s.skin_type),
"texture": ev(s.texture),
"sebum_tzone": s.sebum_tzone,
"sebum_cheeks": s.sebum_cheeks,
"risks": s.risks or [],
"priorities": s.priorities or [],
"notes": s.notes,
}
)
return d
@mcp.tool()
def get_latest_skin_snapshot() -> dict | None:
"""Get the most recent skin condition snapshot with all metrics."""
with Session(engine) as session:
snapshot = session.exec(
select(SkinConditionSnapshot).order_by(
col(SkinConditionSnapshot.snapshot_date).desc()
)
).first()
if snapshot is None:
return None
return _snapshot_to_dict(snapshot, full=True)
@mcp.tool()
def get_skin_history(weeks: int = 8) -> list[dict]:
"""Get skin condition snapshots from the last N weeks with key metrics."""
with Session(engine) as session:
cutoff = date.today() - timedelta(weeks=weeks)
snapshots = session.exec(
select(SkinConditionSnapshot)
.where(SkinConditionSnapshot.snapshot_date >= cutoff)
.order_by(col(SkinConditionSnapshot.snapshot_date).desc())
).all()
return [_snapshot_to_dict(s, full=False) for s in snapshots]
@mcp.tool()
def get_skin_snapshot_dates() -> list[str]:
"""List all dates (YYYY-MM-DD) for which skin snapshots exist, newest first."""
with Session(engine) as session:
snapshots = session.exec(
select(SkinConditionSnapshot).order_by(
col(SkinConditionSnapshot.snapshot_date).desc()
)
).all()
return [s.snapshot_date.isoformat() for s in snapshots]
@mcp.tool()
def get_skin_snapshot(snapshot_date: str) -> dict | None:
"""Get the full skin condition snapshot for a specific date (YYYY-MM-DD)."""
with Session(engine) as session:
target = date.fromisoformat(snapshot_date)
snapshot = session.exec(
select(SkinConditionSnapshot).where(
SkinConditionSnapshot.snapshot_date == target
)
).first()
if snapshot is None:
return None
return _snapshot_to_dict(snapshot, full=True)
# ── Health / medications ───────────────────────────────────────────────────────
@mcp.tool()
def get_medications() -> list[dict]:
"""Get all medication entries with their currently active usage records
(valid_to IS NULL or >= today)."""
with Session(engine) as session:
medications = session.exec(select(MedicationEntry)).all()
today = datetime.combine(date.today(), datetime.min.time())
result = []
for med in medications:
usages = session.exec(
select(MedicationUsage)
.where(MedicationUsage.medication_record_id == med.record_id)
.where(
col(MedicationUsage.valid_to).is_(None)
| (col(MedicationUsage.valid_to) >= today)
)
).all()
result.append(
{
"id": str(med.record_id),
"product_name": med.product_name,
"kind": (
med.kind.value if hasattr(med.kind, "value") else str(med.kind)
),
"active_substance": med.active_substance,
"formulation": med.formulation,
"route": med.route,
"notes": med.notes,
"active_usages": [
{
"id": str(u.record_id),
"dose": (
f"{u.dose_value} {u.dose_unit}"
if u.dose_value is not None and u.dose_unit
else None
),
"frequency": u.frequency,
"schedule_text": u.schedule_text,
"as_needed": u.as_needed,
"valid_from": (
u.valid_from.isoformat() if u.valid_from else None
),
"valid_to": u.valid_to.isoformat() if u.valid_to else None,
}
for u in usages
],
}
)
return result
# ── Expiring inventory ────────────────────────────────────────────────────────
@mcp.tool()
def get_expiring_inventory(days: int = 30) -> list[dict]:
"""List open packages whose expiry date falls within the next N days.
Sorted by days remaining (soonest first)."""
with Session(engine) as session:
cutoff = date.today() + timedelta(days=days)
stmt = (
select(ProductInventory, Product)
.join(Product, col(ProductInventory.product_id) == col(Product.id))
.where(col(ProductInventory.is_opened).is_(True))
.where(col(ProductInventory.finished_at).is_(None))
.where(col(ProductInventory.expiry_date).is_not(None))
.where(col(ProductInventory.expiry_date) <= cutoff)
)
rows = session.exec(stmt).all()
today = date.today()
result = [
{
"product_name": product.name,
"brand": product.brand,
"expiry_date": inv.expiry_date.isoformat() if inv.expiry_date else None,
"days_remaining": (
(inv.expiry_date - today).days if inv.expiry_date else None
),
"current_weight_g": inv.current_weight_g,
}
for inv, product in rows
]
return sorted(result, key=lambda x: x["days_remaining"] or 0)
# ── Grooming schedule ─────────────────────────────────────────────────────────
@mcp.tool()
def get_grooming_schedule() -> list[dict]:
"""Get the full grooming schedule sorted by day of week (0=Monday, 6=Sunday)."""
with Session(engine) as session:
entries = session.exec(
select(GroomingSchedule).order_by(col(GroomingSchedule.day_of_week))
).all()
return [
{
"id": str(e.id),
"day_of_week": e.day_of_week,
"action": (
e.action.value if hasattr(e.action, "value") else str(e.action)
),
"notes": e.notes,
}
for e in entries
]
# ── Lab results ───────────────────────────────────────────────────────────────
def _lab_result_to_dict(r: LabResult) -> dict:
return {
"id": str(r.record_id),
"collected_at": r.collected_at.isoformat(),
"test_code": r.test_code,
"test_name_loinc": r.test_name_loinc,
"test_name_original": r.test_name_original,
"value_num": r.value_num,
"value_text": r.value_text,
"value_bool": r.value_bool,
"unit": r.unit_ucum or r.unit_original,
"ref_low": r.ref_low,
"ref_high": r.ref_high,
"ref_text": r.ref_text,
"flag": r.flag.value if r.flag and hasattr(r.flag, "value") else r.flag,
"lab": r.lab,
"notes": r.notes,
}
@mcp.tool()
def get_recent_lab_results(limit: int = 30) -> list[dict]:
"""Get the most recent lab results sorted by collection date descending."""
with Session(engine) as session:
results = session.exec(
select(LabResult).order_by(col(LabResult.collected_at).desc()).limit(limit)
).all()
return [_lab_result_to_dict(r) for r in results]
@mcp.tool()
def get_available_lab_tests() -> list[dict]:
"""List all distinct lab tests ever performed, grouped by LOINC test_code.
Returns test_code, LOINC name, original lab names, result count, and last collection date.
"""
with Session(engine) as session:
results = session.exec(select(LabResult)).all()
tests: dict[str, dict] = {}
for r in results:
code = r.test_code
if code not in tests:
tests[code] = {
"test_code": code,
"test_name_loinc": r.test_name_loinc,
"test_names_original": set(),
"count": 0,
"last_collected_at": r.collected_at,
}
tests[code]["count"] += 1
if r.test_name_original:
tests[code]["test_names_original"].add(r.test_name_original)
if r.collected_at > tests[code]["last_collected_at"]:
tests[code]["last_collected_at"] = r.collected_at
return [
{
"test_code": v["test_code"],
"test_name_loinc": v["test_name_loinc"],
"test_names_original": sorted(v["test_names_original"]),
"count": v["count"],
"last_collected_at": v["last_collected_at"].isoformat(),
}
for v in sorted(tests.values(), key=lambda x: x["test_code"])
]
@mcp.tool()
def get_lab_results_for_test(test_code: str) -> list[dict]:
"""Get the full chronological history of results for a specific LOINC test code."""
with Session(engine) as session:
results = session.exec(
select(LabResult)
.where(LabResult.test_code == test_code)
.order_by(col(LabResult.collected_at).asc())
).all()
return [_lab_result_to_dict(r) for r in results]
# ── Shopping assistant ────────────────────────────────────────────────────────
def _ev(v: object) -> str:
if v is None:
return ""
value = getattr(v, "value", None)
if isinstance(value, str):
return value
return str(v)
def _build_shopping_context(session: Session) -> tuple[str, list[dict]]:
snapshot = session.exec(
select(SkinConditionSnapshot).order_by(
col(SkinConditionSnapshot.snapshot_date).desc()
)
).first()
skin_lines = ["STAN SKÓRY:"]
if snapshot:
skin_lines.append(f" Data: {snapshot.snapshot_date}")
skin_lines.append(f" Ogólny stan: {_ev(snapshot.overall_state)}")
skin_lines.append(f" Typ skóry: {_ev(snapshot.skin_type)}")
skin_lines.append(f" Nawilżenie: {snapshot.hydration_level}/5")
skin_lines.append(f" Wrażliwość: {snapshot.sensitivity_level}/5")
skin_lines.append(f" Bariera: {_ev(snapshot.barrier_state)}")
concerns = ", ".join(_ev(c) for c in (snapshot.active_concerns or []))
skin_lines.append(f" Aktywne problemy: {concerns or 'brak'}")
if snapshot.priorities:
skin_lines.append(f" Priorytety: {', '.join(snapshot.priorities)}")
else:
skin_lines.append(" (brak danych)")
stmt = select(Product).where(col(Product.is_tool).is_(False))
products = session.exec(stmt).all()
product_ids = [p.id for p in products]
inventory_rows = (
session.exec(
select(ProductInventory).where(
col(ProductInventory.product_id).in_(product_ids)
)
).all()
if product_ids
else []
)
inv_by_product: dict[UUID, list[ProductInventory]] = {}
for inv in inventory_rows:
inv_by_product.setdefault(inv.product_id, []).append(inv)
products_data = []
for p in products:
if p.is_medication:
continue
active_inv = [i for i in inv_by_product.get(p.id, []) if i.finished_at is None]
has_stock = any(i.is_opened and i.finished_at is None for i in active_inv)
products_data.append(
{
"id": str(p.id),
"name": p.name,
"brand": p.brand or "",
"category": _ev(p.category),
"targets": p.targets or [],
"actives": [
{
"name": a.get("name") if isinstance(a, dict) else a.name,
"percent": a.get("percent") if isinstance(a, dict) else None,
}
for a in (p.actives or [])
],
"has_inventory": has_stock,
}
)
products_lines = ["POSIADANE PRODUKTY:"]
for p in products_data:
stock = "" if p["has_inventory"] else ""
products_lines.append(
f" [{stock}] {p['name']} ({p['brand']}) - {p['category']}, "
f"targets: {p['targets']}"
)
return "\n".join(skin_lines) + "\n\n" + "\n".join(products_lines), products_data
_SHOPPING_SYSTEM_PROMPT = """Jesteś asystentem zakupowym w dziedzinie pielęgnacji skóry.
Twoim zadaniem jest przeanalizować stan skóry użytkownika oraz produkty, które już posiada,
a następnie zasugerować TYPY produktów (bez marek), które mogłyby uzupełnić ich rutynę.
ZASADY:
0. Sugeruj tylko wtedy, gdy jest realna potrzeba - nie zwracaj stałej liczby produktów
1. Sugeruj TYLKO typy produktów, NIGDY konkretne marki (np. "Salicylic Acid 2% Masque", nie "La Roche-Posay")
2. Koncentruj się na produktach, których użytkownik NIE posiada w swoim inventarzu
3. Bierz pod uwagę aktywne problemy skóry (acne, hyperpigmentacja, aging, etc.)
4. Sugeruj realistyczną częstotliwość użycia (dzienna, 2-3x tygodniowo, etc.)
5. Zachowaj kolejność warstw: cleanse → toner → serum → moisturizer → SPF
6. Jeśli użytkownik ma uszkodzoną barierę, unikaj silnych eksfoliantów i retinoidów
7. Odpowiadaj w języku polskim
Format odpowiedzi - zwróć wyłącznie JSON zgodny z podanym schematem."""
@mcp.tool()
def get_shopping_suggestions() -> dict:
"""Analyze skin condition and inventory to suggest product types that could fill gaps.
Returns generic product suggestions (e.g., 'Salicylic Acid 2% Masque'), not specific brands.
"""
with Session(engine) as session:
context, products = _build_shopping_context(session)
prompt = (
f"Na podstawie poniższych danych przeanalizuj, jakie TYPY produktów "
f"mogłyby uzupełnić rutynę pielęgnacyjną użytkownika.\n\n"
f"{context}\n\n"
f"Zwróć wyłącznie JSON zgodny ze schematem."
)
response = call_gemini(
endpoint="shopping/suggest",
contents=prompt,
config=genai_types.GenerateContentConfig(
system_instruction=_SHOPPING_SYSTEM_PROMPT,
response_mime_type="application/json",
response_schema=ShoppingSuggestionsOut,
max_output_tokens=4096,
temperature=0.4,
),
user_input=prompt,
)
raw = response.text
if not raw:
raise HTTPException(status_code=502, detail="LLM returned empty response")
try:
parsed = json.loads(raw)
except json.JSONDecodeError as e:
raise HTTPException(status_code=502, detail=f"Invalid JSON from LLM: {e}")
return {
"suggestions": parsed.get("suggestions", []),
"reasoning": parsed.get("reasoning", ""),
}
def _call_market_service(
query: str, stores: Optional[list[str]] = None
) -> list[MarketProduct]:
market_url = os.environ.get("MARKET_SERVICE_URL")
if not market_url:
return []
try:
with httpx.Client(timeout=10.0) as client:
resp = client.get(
f"{market_url}/search",
params={"q": query, "stores": stores or ["rossmann", "dm", "hebe"]},
)
resp.raise_for_status()
data = resp.json()
return [MarketProduct(**item) for item in data.get("products", [])]
except Exception:
return []
@mcp.tool()
def search_market_products(
query: str,
stores: Optional[list[str]] = None,
) -> list[dict]:
"""Search drug store catalogs for products matching the query.
Uses external market service to query Rossmann, DM, Hebe, etc.
Examples:
- query: "salicylic acid serum acne"
- stores: ["rossmann", "dm", "hebe"] (optional, queries all by default)"""
products = _call_market_service(query, stores)
return [p.model_dump(mode="json") for p in products]