655 lines
24 KiB
Python
655 lines
24 KiB
Python
from __future__ import annotations

import json
import logging
import os
from datetime import date, datetime, timedelta
from typing import Optional
from uuid import UUID

import httpx
from fastapi import HTTPException
from fastmcp import FastMCP
from google.genai import types as genai_types
from pydantic import BaseModel
from sqlmodel import Session, col, select

from db import engine
from innercontext.llm import call_gemini
from innercontext.models import (
    GroomingSchedule,
    LabResult,
    MedicationEntry,
    MedicationUsage,
    Product,
    ProductInventory,
    Routine,
    RoutineStep,
    SkinConditionSnapshot,
)
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Pydantic schemas for structured output
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class ProductSuggestionOut(BaseModel):
    # One suggested product *type* inside the structured shopping response.
    # Documented with comments rather than a docstring because pydantic copies
    # a model docstring into the generated JSON schema.
    # Field order is kept as declared; it determines the schema's property order.
    category: str
    product_type: str
    key_ingredients: list[str]
    target_concerns: list[str]
    why_needed: str
    recommended_time: str
    frequency: str
|
|
|
|
|
|
class ShoppingSuggestionsOut(BaseModel):
    # Top-level structured response for shopping suggestions: the individual
    # suggestions plus one free-text explanation.
    # Comments are used instead of a docstring because pydantic copies a model
    # docstring into the generated JSON schema.
    suggestions: list[ProductSuggestionOut]
    reasoning: str
|
|
|
|
|
|
class MarketProduct(BaseModel):
    # One product entry returned by the external market search service.
    # Required fields: name, brand, store, url.
    # price and in_stock may be absent (None); currency falls back to "PLN".
    name: str
    brand: str
    price: Optional[float] = None
    currency: str = "PLN"
    store: str
    url: str
    in_stock: Optional[bool] = None
|
|
|
|
|
|
# Server object whose ``tool()`` decorator registers the functions defined below.
mcp = FastMCP("innercontext")
|
|
|
|
|
|
# ── Products ──────────────────────────────────────────────────────────────────
|
|
|
|
|
|
@mcp.tool()
def get_products(
    category: Optional[str] = None,
    is_medication: bool = False,
    is_tool: bool = False,
) -> list[dict]:
    """List products. With the default flags this returns skincare products
    (medications and tools are excluded).

    Pass is_medication=True or is_tool=True to retrieve those categories instead.
    Both flags are matched exactly against the stored values; ``category``
    narrows the result only when it is given."""
    query = (
        select(Product)
        .where(Product.is_medication == is_medication)
        .where(Product.is_tool == is_tool)
    )
    if category is not None:
        query = query.where(Product.category == category)

    with Session(engine) as db:
        return [item.to_llm_context() for item in db.exec(query).all()]
|
|
|
|
|
|
@mcp.tool()
def get_product(product_id: str) -> dict:
    """Get full context for a single product (UUID) including all inventory entries.

    Args:
        product_id: The product's UUID as text.

    Returns:
        The product's ``to_llm_context()`` dict with an added ``"inventory"``
        list (one dict per inventory row, dates as ISO strings or None), or
        ``{"error": ...}`` when the id is not a valid UUID or no such product
        exists.
    """
    # Parse the id before touching the database so a malformed value is
    # reported the same way as a missing product instead of raising.
    try:
        product_uuid = UUID(product_id)
    except (ValueError, AttributeError, TypeError):
        return {"error": f"Invalid product id {product_id!r}: expected a UUID"}

    def _iso(value):
        # Dates are optional on inventory rows; keep None as None.
        return value.isoformat() if value else None

    with Session(engine) as session:
        product = session.get(Product, product_uuid)
        if product is None:
            return {"error": f"Product {product_id} not found"}

        ctx = product.to_llm_context()
        entries = session.exec(
            select(ProductInventory).where(ProductInventory.product_id == product.id)
        ).all()
        ctx["inventory"] = [
            {
                "id": str(inv.id),
                "is_opened": inv.is_opened,
                "opened_at": _iso(inv.opened_at),
                "finished_at": _iso(inv.finished_at),
                "expiry_date": _iso(inv.expiry_date),
                "current_weight_g": inv.current_weight_g,
            }
            for inv in entries
        ]
        return ctx
|
|
|
|
|
|
# ── Inventory ─────────────────────────────────────────────────────────────────
|
|
|
|
|
|
@mcp.tool()
def get_open_inventory() -> list[dict]:
    """Return all currently open packages (is_opened=True, finished_at=None).

    Every item carries the inventory id, product id, product name and brand,
    the opening date, the current weight (current_weight_g) and the expiry
    date. Dates are ISO strings, or None when not set."""
    open_packages = (
        select(ProductInventory, Product)
        .join(Product, col(ProductInventory.product_id) == col(Product.id))
        .where(col(ProductInventory.is_opened).is_(True))
        .where(col(ProductInventory.finished_at).is_(None))
    )

    payload: list[dict] = []
    with Session(engine) as db:
        for package, owner in db.exec(open_packages).all():
            opened = package.opened_at.isoformat() if package.opened_at else None
            expires = package.expiry_date.isoformat() if package.expiry_date else None
            payload.append(
                {
                    "inventory_id": str(package.id),
                    "product_id": str(owner.id),
                    "product_name": owner.name,
                    "brand": owner.brand,
                    "opened_at": opened,
                    "current_weight_g": package.current_weight_g,
                    "expiry_date": expires,
                }
            )
    return payload
|
|
|
|
|
|
# ── Routines ──────────────────────────────────────────────────────────────────
|
|
|
|
|
|
@mcp.tool()
def get_recent_routines(days: int = 14) -> list[dict]:
    """Get skincare routines from the last N days, newest first.

    Each routine includes its ordered steps with product name or action.

    Args:
        days: Size of the look-back window; routines dated on or after
            ``today - days`` are returned.

    Returns:
        One dict per routine with ``id``, ``date`` (ISO), ``part_of_day``,
        ``notes`` and ``steps``. A step always has ``order`` and, when
        available, ``product``/``product_id``, ``action``, ``notes``,
        ``dose`` and ``region``.
    """

    def _label(value: object) -> object:
        # Enum members are emitted by value; anything else is stringified.
        return value.value if hasattr(value, "value") else str(value)

    cutoff = date.today() - timedelta(days=days)

    with Session(engine) as session:
        routines = session.exec(
            select(Routine)
            .where(Routine.routine_date >= cutoff)
            .order_by(col(Routine.routine_date).desc())
        ).all()
        if not routines:
            return []

        # Fetch the steps of all routines in one query instead of one query
        # per routine; ordering by order_index keeps each group sorted.
        steps = session.exec(
            select(RoutineStep)
            .where(col(RoutineStep.routine_id).in_([r.id for r in routines]))
            .order_by(col(RoutineStep.order_index))
        ).all()
        steps_by_routine: dict = {}
        for step in steps:
            steps_by_routine.setdefault(step.routine_id, []).append(step)

        # Likewise resolve every referenced product with a single query.
        product_ids = list({s.product_id for s in steps if s.product_id})
        products_by_id: dict = {}
        if product_ids:
            products_by_id = {
                p.id: p
                for p in session.exec(
                    select(Product).where(col(Product.id).in_(product_ids))
                ).all()
            }

        result = []
        for routine in routines:
            steps_data = []
            for step in steps_by_routine.get(routine.id, []):
                step_dict: dict = {"order": step.order_index}
                product = (
                    products_by_id.get(step.product_id) if step.product_id else None
                )
                if product:
                    step_dict["product"] = product.name
                    step_dict["product_id"] = str(product.id)
                if step.action_type:
                    step_dict["action"] = _label(step.action_type)
                if step.action_notes:
                    step_dict["notes"] = step.action_notes
                if step.dose:
                    step_dict["dose"] = step.dose
                if step.region:
                    step_dict["region"] = step.region
                steps_data.append(step_dict)

            result.append(
                {
                    "id": str(routine.id),
                    "date": routine.routine_date.isoformat(),
                    "part_of_day": _label(routine.part_of_day),
                    "notes": routine.notes,
                    "steps": steps_data,
                }
            )
        return result
|
|
|
|
|
|
# ── Skin snapshots ────────────────────────────────────────────────────────────
|
|
|
|
|
|
def _snapshot_to_dict(s: SkinConditionSnapshot, *, full: bool) -> dict:
    """Convert a skin snapshot into a plain dict.

    The compact form holds id, date, overall state, hydration and sensitivity
    levels, barrier state and active concerns. With ``full=True`` the skin
    type, texture, sebum readings, risks, priorities and notes are appended.
    Values that expose a ``.value`` attribute are replaced by that value.
    """

    def unwrap(item: object) -> object:
        # None and objects without ``.value`` pass through untouched.
        if item is None or not hasattr(item, "value"):
            return item
        return item.value

    summary: dict = {
        "id": str(s.id),
        "date": s.snapshot_date.isoformat(),
        "overall_state": unwrap(s.overall_state),
        "hydration_level": s.hydration_level,
        "sensitivity_level": s.sensitivity_level,
        "barrier_state": unwrap(s.barrier_state),
        "active_concerns": [unwrap(concern) for concern in (s.active_concerns or [])],
    }
    if not full:
        return summary

    summary["skin_type"] = unwrap(s.skin_type)
    summary["texture"] = unwrap(s.texture)
    summary["sebum_tzone"] = s.sebum_tzone
    summary["sebum_cheeks"] = s.sebum_cheeks
    summary["risks"] = s.risks or []
    summary["priorities"] = s.priorities or []
    summary["notes"] = s.notes
    return summary
|
|
|
|
|
|
@mcp.tool()
def get_latest_skin_snapshot() -> dict | None:
    """Get the most recent skin condition snapshot with all metrics.

    Returns None when no snapshot has been recorded."""
    newest_first = select(SkinConditionSnapshot).order_by(
        col(SkinConditionSnapshot.snapshot_date).desc()
    )
    with Session(engine) as db:
        latest = db.exec(newest_first).first()
        return None if latest is None else _snapshot_to_dict(latest, full=True)
|
|
|
|
|
|
@mcp.tool()
def get_skin_history(weeks: int = 8) -> list[dict]:
    """Get skin condition snapshots from the last N weeks with key metrics.

    Snapshots are returned newest first, in their compact form."""
    with Session(engine) as db:
        earliest = date.today() - timedelta(weeks=weeks)
        recent = (
            select(SkinConditionSnapshot)
            .where(SkinConditionSnapshot.snapshot_date >= earliest)
            .order_by(col(SkinConditionSnapshot.snapshot_date).desc())
        )
        return [_snapshot_to_dict(row, full=False) for row in db.exec(recent).all()]
|
|
|
|
|
|
@mcp.tool()
def get_skin_snapshot_dates() -> list[str]:
    """List all dates (YYYY-MM-DD) for which skin snapshots exist, newest first.

    Returns:
        One ISO date string per stored snapshot row, in descending date order.
    """
    # Only the date column is needed, so avoid loading whole snapshot rows.
    dates_newest_first = select(SkinConditionSnapshot.snapshot_date).order_by(
        col(SkinConditionSnapshot.snapshot_date).desc()
    )
    with Session(engine) as session:
        return [d.isoformat() for d in session.exec(dates_newest_first).all()]
|
|
|
|
|
|
@mcp.tool()
def get_skin_snapshot(snapshot_date: str) -> dict | None:
    """Get the full skin condition snapshot for a specific date (YYYY-MM-DD).

    Returns None when no snapshot exists for that date. A string that
    ``date.fromisoformat`` cannot parse raises ValueError."""
    with Session(engine) as db:
        wanted = date.fromisoformat(snapshot_date)
        on_that_day = select(SkinConditionSnapshot).where(
            SkinConditionSnapshot.snapshot_date == wanted
        )
        match = db.exec(on_that_day).first()
        return _snapshot_to_dict(match, full=True) if match is not None else None
|
|
|
|
|
|
# ── Health / medications ───────────────────────────────────────────────────────
|
|
|
|
|
|
@mcp.tool()
def get_medications() -> list[dict]:
    """Get all medication entries with their currently active usage records
    (valid_to IS NULL or >= today).

    Returns:
        One dict per medication with ``id``, ``product_name``, ``kind``,
        ``active_substance``, ``formulation``, ``route``, ``notes`` and
        ``active_usages`` (possibly empty). Each usage carries ``id``,
        ``dose`` ("<value> <unit>" or None), ``frequency``, ``schedule_text``,
        ``as_needed``, ``valid_from`` and ``valid_to`` (ISO strings or None).
    """

    def _usage_to_dict(u: MedicationUsage) -> dict:
        # A dose is shown only when both the number and the unit are present.
        has_dose = u.dose_value is not None and u.dose_unit
        return {
            "id": str(u.record_id),
            "dose": f"{u.dose_value} {u.dose_unit}" if has_dose else None,
            "frequency": u.frequency,
            "schedule_text": u.schedule_text,
            "as_needed": u.as_needed,
            "valid_from": u.valid_from.isoformat() if u.valid_from else None,
            "valid_to": u.valid_to.isoformat() if u.valid_to else None,
        }

    # Midnight at the start of today, comparable with datetime valid_to values.
    today = datetime.combine(date.today(), datetime.min.time())

    with Session(engine) as session:
        medications = session.exec(select(MedicationEntry)).all()

        # Load every active usage once and group in memory, rather than
        # issuing one query per medication.
        active_usages = session.exec(
            select(MedicationUsage).where(
                col(MedicationUsage.valid_to).is_(None)
                | (col(MedicationUsage.valid_to) >= today)
            )
        ).all()
        usages_by_medication: dict = {}
        for usage in active_usages:
            usages_by_medication.setdefault(usage.medication_record_id, []).append(
                usage
            )

        return [
            {
                "id": str(med.record_id),
                "product_name": med.product_name,
                "kind": med.kind.value if hasattr(med.kind, "value") else str(med.kind),
                "active_substance": med.active_substance,
                "formulation": med.formulation,
                "route": med.route,
                "notes": med.notes,
                "active_usages": [
                    _usage_to_dict(u)
                    for u in usages_by_medication.get(med.record_id, [])
                ],
            }
            for med in medications
        ]
|
|
|
|
|
|
# ── Expiring inventory ────────────────────────────────────────────────────────
|
|
|
|
|
|
@mcp.tool()
def get_expiring_inventory(days: int = 30) -> list[dict]:
    """List open packages whose expiry date falls within the next N days.

    Sorted by days remaining (soonest first). Only an upper bound is applied
    to the expiry date, so open packages that have already expired are
    included as well, with a negative ``days_remaining``.

    Args:
        days: Look-ahead window in days, counted from today.

    Returns:
        One dict per package with ``product_name``, ``brand``,
        ``expiry_date`` (ISO string), ``days_remaining`` and
        ``current_weight_g``.
    """
    # Read the clock once so the cutoff and days_remaining use the same day.
    today = date.today()
    cutoff = today + timedelta(days=days)

    with Session(engine) as session:
        stmt = (
            select(ProductInventory, Product)
            .join(Product, col(ProductInventory.product_id) == col(Product.id))
            .where(col(ProductInventory.is_opened).is_(True))
            .where(col(ProductInventory.finished_at).is_(None))
            .where(col(ProductInventory.expiry_date).is_not(None))
            .where(col(ProductInventory.expiry_date) <= cutoff)
        )
        rows = session.exec(stmt).all()
        result = [
            {
                "product_name": product.name,
                "brand": product.brand,
                "expiry_date": inv.expiry_date.isoformat() if inv.expiry_date else None,
                "days_remaining": (
                    (inv.expiry_date - today).days if inv.expiry_date else None
                ),
                "current_weight_g": inv.current_weight_g,
            }
            for inv, product in rows
        ]
        # The query excludes NULL expiry dates; ``or 0`` only guards the sort
        # key should a None slip through.
        return sorted(result, key=lambda item: item["days_remaining"] or 0)
|
|
|
|
|
|
# ── Grooming schedule ─────────────────────────────────────────────────────────
|
|
|
|
|
|
@mcp.tool()
def get_grooming_schedule() -> list[dict]:
    """Get the full grooming schedule sorted by day of week (0=Monday, 6=Sunday).

    Each entry has ``id``, ``day_of_week``, ``action`` and ``notes``."""
    by_weekday = select(GroomingSchedule).order_by(col(GroomingSchedule.day_of_week))

    schedule: list[dict] = []
    with Session(engine) as db:
        for entry in db.exec(by_weekday).all():
            action = entry.action
            schedule.append(
                {
                    "id": str(entry.id),
                    "day_of_week": entry.day_of_week,
                    # Enum members are emitted by value, anything else as text.
                    "action": action.value if hasattr(action, "value") else str(action),
                    "notes": entry.notes,
                }
            )
    return schedule
|
|
|
|
|
|
# ── Lab results ───────────────────────────────────────────────────────────────
|
|
|
|
|
|
def _lab_result_to_dict(r: LabResult) -> dict:
    """Convert one lab result row into a plain dict.

    ``unit`` prefers ``unit_ucum`` and falls back to ``unit_original``;
    ``flag`` is replaced by its ``.value`` when it is truthy and has one.
    """
    flag = r.flag
    if flag and hasattr(flag, "value"):
        flag = flag.value

    unit = r.unit_ucum or r.unit_original

    return {
        "id": str(r.record_id),
        "collected_at": r.collected_at.isoformat(),
        "test_code": r.test_code,
        "test_name_loinc": r.test_name_loinc,
        "test_name_original": r.test_name_original,
        "value_num": r.value_num,
        "value_text": r.value_text,
        "value_bool": r.value_bool,
        "unit": unit,
        "ref_low": r.ref_low,
        "ref_high": r.ref_high,
        "ref_text": r.ref_text,
        "flag": flag,
        "lab": r.lab,
        "notes": r.notes,
    }
|
|
|
|
|
|
@mcp.tool()
def get_recent_lab_results(limit: int = 30) -> list[dict]:
    """Get the most recent lab results sorted by collection date descending.

    At most ``limit`` results are returned."""
    newest = select(LabResult).order_by(col(LabResult.collected_at).desc()).limit(limit)
    with Session(engine) as db:
        return [_lab_result_to_dict(row) for row in db.exec(newest).all()]
|
|
|
|
|
|
@mcp.tool()
def get_available_lab_tests() -> list[dict]:
    """List all distinct lab tests ever performed, grouped by LOINC test_code.

    Returns test_code, LOINC name, original lab names, result count, and last
    collection date. Entries are sorted by test_code; the original names of
    each test are sorted alphabetically.
    """
    summary: dict[str, dict] = {}

    with Session(engine) as db:
        for row in db.exec(select(LabResult)).all():
            entry = summary.get(row.test_code)
            if entry is None:
                # First result seen for this code seeds the aggregate.
                entry = summary[row.test_code] = {
                    "test_code": row.test_code,
                    "test_name_loinc": row.test_name_loinc,
                    "test_names_original": set(),
                    "count": 0,
                    "last_collected_at": row.collected_at,
                }
            entry["count"] += 1
            if row.test_name_original:
                entry["test_names_original"].add(row.test_name_original)
            if row.collected_at > entry["last_collected_at"]:
                entry["last_collected_at"] = row.collected_at

    ordered = sorted(summary.values(), key=lambda item: item["test_code"])
    listing: list[dict] = []
    for item in ordered:
        listing.append(
            {
                "test_code": item["test_code"],
                "test_name_loinc": item["test_name_loinc"],
                "test_names_original": sorted(item["test_names_original"]),
                "count": item["count"],
                "last_collected_at": item["last_collected_at"].isoformat(),
            }
        )
    return listing
|
|
|
|
|
|
@mcp.tool()
def get_lab_results_for_test(test_code: str) -> list[dict]:
    """Get the full chronological history of results for a specific LOINC test code.

    Results are ordered oldest first by collection time."""
    history = (
        select(LabResult)
        .where(LabResult.test_code == test_code)
        .order_by(col(LabResult.collected_at).asc())
    )
    with Session(engine) as db:
        return [_lab_result_to_dict(row) for row in db.exec(history).all()]
|
|
|
|
|
|
# ── Shopping assistant ────────────────────────────────────────────────────────
|
|
|
|
|
|
def _ev(v: object) -> str:
    """Render a value as text for prompt building.

    None becomes an empty string; an object whose ``value`` attribute is a
    string yields that string; everything else goes through ``str()``.
    """
    if v is None:
        return ""
    inner = getattr(v, "value", None)
    return inner if isinstance(inner, str) else str(v)
|
|
|
|
|
|
def _build_shopping_context(session: Session) -> tuple[str, list[dict]]:
    """Build the text context and product data used for shopping suggestions.

    Args:
        session: Open database session used for all queries.

    Returns:
        A ``(context, products_data)`` tuple. ``context`` is a text block with
        the latest skin snapshot followed by the list of owned products;
        ``products_data`` holds one dict per product that is neither a tool
        nor a medication, with ``id``, ``name``, ``brand``, ``category``,
        ``targets``, ``actives`` and ``has_inventory``.
    """

    def _active_to_dict(active: object) -> dict:
        # Actives may be stored as plain dicts or as objects; read ``percent``
        # from either form instead of dropping it for objects.
        if isinstance(active, dict):
            return {"name": active.get("name"), "percent": active.get("percent")}
        return {"name": active.name, "percent": getattr(active, "percent", None)}

    snapshot = session.exec(
        select(SkinConditionSnapshot).order_by(
            col(SkinConditionSnapshot.snapshot_date).desc()
        )
    ).first()

    skin_lines = ["STAN SKÓRY:"]
    if snapshot:
        skin_lines.append(f" Data: {snapshot.snapshot_date}")
        skin_lines.append(f" Ogólny stan: {_ev(snapshot.overall_state)}")
        skin_lines.append(f" Typ skóry: {_ev(snapshot.skin_type)}")
        skin_lines.append(f" Nawilżenie: {snapshot.hydration_level}/5")
        skin_lines.append(f" Wrażliwość: {snapshot.sensitivity_level}/5")
        skin_lines.append(f" Bariera: {_ev(snapshot.barrier_state)}")
        concerns = ", ".join(_ev(c) for c in (snapshot.active_concerns or []))
        skin_lines.append(f" Aktywne problemy: {concerns or 'brak'}")
        if snapshot.priorities:
            skin_lines.append(f" Priorytety: {', '.join(snapshot.priorities)}")
    else:
        skin_lines.append(" (brak danych)")

    # Tools are excluded in SQL; medications are dropped here, before the
    # inventory query, so no inventory is fetched for rows that are skipped.
    products = [
        p
        for p in session.exec(
            select(Product).where(col(Product.is_tool).is_(False))
        ).all()
        if not p.is_medication
    ]

    product_ids = [p.id for p in products]
    inventory_rows = (
        session.exec(
            select(ProductInventory).where(
                col(ProductInventory.product_id).in_(product_ids)
            )
        ).all()
        if product_ids
        else []
    )
    inv_by_product: dict[UUID, list[ProductInventory]] = {}
    for inv in inventory_rows:
        inv_by_product.setdefault(inv.product_id, []).append(inv)

    products_data = []
    for p in products:
        # "In stock" means at least one package that is opened and not finished.
        has_stock = any(
            i.is_opened and i.finished_at is None
            for i in inv_by_product.get(p.id, [])
        )
        products_data.append(
            {
                "id": str(p.id),
                "name": p.name,
                "brand": p.brand or "",
                "category": _ev(p.category),
                "targets": p.targets or [],
                "actives": [_active_to_dict(a) for a in (p.actives or [])],
                "has_inventory": has_stock,
            }
        )

    products_lines = ["POSIADANE PRODUKTY:"]
    for p in products_data:
        stock = "✓" if p["has_inventory"] else "✗"
        products_lines.append(
            f" [{stock}] {p['name']} ({p['brand']}) - {p['category']}, "
            f"targets: {p['targets']}"
        )

    return "\n".join(skin_lines) + "\n\n" + "\n".join(products_lines), products_data
|
|
|
|
|
|
# System prompt (in Polish) for the shopping assistant. It asks the model to
# analyse the user's skin state and owned products, then suggest product TYPES
# (never brands) under the numbered rules below, answering in Polish with JSON
# matching the supplied schema. The text is sent to the model verbatim.
_SHOPPING_SYSTEM_PROMPT = """Jesteś asystentem zakupowym w dziedzinie pielęgnacji skóry.
Twoim zadaniem jest przeanalizować stan skóry użytkownika oraz produkty, które już posiada,
a następnie zasugerować TYPY produktów (bez marek), które mogłyby uzupełnić ich rutynę.

ZASADY:
0. Sugeruj tylko wtedy, gdy jest realna potrzeba - nie zwracaj stałej liczby produktów
1. Sugeruj TYLKO typy produktów, NIGDY konkretne marki (np. "Salicylic Acid 2% Masque", nie "La Roche-Posay")
2. Koncentruj się na produktach, których użytkownik NIE posiada w swoim inventarzu
3. Bierz pod uwagę aktywne problemy skóry (acne, hyperpigmentacja, aging, etc.)
4. Sugeruj realistyczną częstotliwość użycia (dzienna, 2-3x tygodniowo, etc.)
5. Zachowaj kolejność warstw: cleanse → toner → serum → moisturizer → SPF
6. Jeśli użytkownik ma uszkodzoną barierę, unikaj silnych eksfoliantów i retinoidów
7. Odpowiadaj w języku polskim

Format odpowiedzi - zwróć wyłącznie JSON zgodny z podanym schematem."""
|
|
|
|
|
|
@mcp.tool()
def get_shopping_suggestions() -> dict:
    """Analyze skin condition and inventory to suggest product types that could fill gaps.

    Returns generic product suggestions (e.g., 'Salicylic Acid 2% Masque'), not specific brands.

    Returns:
        A dict with ``suggestions`` (list, empty when the model returned none)
        and ``reasoning`` (text, empty when missing).

    Raises:
        HTTPException: status 502 when the model returns an empty response,
            invalid JSON, or JSON that is not an object.
    """
    with Session(engine) as session:
        # Only the rendered text is needed for the prompt.
        context, _ = _build_shopping_context(session)

    prompt = (
        "Na podstawie poniższych danych przeanalizuj, jakie TYPY produktów "
        "mogłyby uzupełnić rutynę pielęgnacyjną użytkownika.\n\n"
        f"{context}\n\n"
        "Zwróć wyłącznie JSON zgodny ze schematem."
    )

    response = call_gemini(
        endpoint="shopping/suggest",
        contents=prompt,
        config=genai_types.GenerateContentConfig(
            system_instruction=_SHOPPING_SYSTEM_PROMPT,
            response_mime_type="application/json",
            response_schema=ShoppingSuggestionsOut,
            max_output_tokens=4096,
            temperature=0.4,
        ),
        user_input=prompt,
    )

    raw = response.text
    if not raw:
        raise HTTPException(status_code=502, detail="LLM returned empty response")

    try:
        parsed = json.loads(raw)
    except json.JSONDecodeError as e:
        # Keep the decoding error attached as the cause for debugging.
        raise HTTPException(
            status_code=502, detail=f"Invalid JSON from LLM: {e}"
        ) from e

    # Valid JSON can still be a list or scalar; ``.get`` below needs an object.
    if not isinstance(parsed, dict):
        raise HTTPException(
            status_code=502, detail="Invalid JSON from LLM: expected a JSON object"
        )

    return {
        "suggestions": parsed.get("suggestions", []),
        "reasoning": parsed.get("reasoning", ""),
    }
|
|
|
|
|
|
def _call_market_service(
    query: str, stores: Optional[list[str]] = None
) -> list[MarketProduct]:
    """Query the external market service for products matching ``query``.

    The service base URL is read from the ``MARKET_SERVICE_URL`` environment
    variable. The lookup is best-effort: any failure is logged and reported
    as "no products" rather than raised.

    Args:
        query: Free-text search string, sent as the ``q`` parameter.
        stores: Store identifiers to search; defaults to
            ``["rossmann", "dm", "hebe"]`` when omitted or empty.

    Returns:
        Parsed products from the response's ``"products"`` list, or an empty
        list when the URL is not configured or the lookup fails.
    """
    market_url = os.environ.get("MARKET_SERVICE_URL")
    if not market_url:
        return []

    try:
        with httpx.Client(timeout=10.0) as client:
            resp = client.get(
                f"{market_url}/search",
                params={"q": query, "stores": stores or ["rossmann", "dm", "hebe"]},
            )
            resp.raise_for_status()
            data = resp.json()
            return [MarketProduct(**item) for item in data.get("products", [])]
    except Exception:
        # Deliberately broad: transport errors, bad status codes, malformed
        # JSON and schema mismatches all degrade to an empty result, but are
        # logged so the failure is not invisible.
        logging.getLogger(__name__).warning(
            "Market service lookup failed for query %r", query, exc_info=True
        )
        return []
|
|
|
|
|
|
@mcp.tool()
def search_market_products(
    query: str,
    stores: Optional[list[str]] = None,
) -> list[dict]:
    """Search drug store catalogs for products matching the query.

    Uses external market service to query Rossmann, DM, Hebe, etc.

    Examples:
    - query: "salicylic acid serum acne"
    - stores: ["rossmann", "dm", "hebe"] (optional, queries all by default)"""
    matches = _call_market_service(query, stores)
    serialised: list[dict] = []
    for match in matches:
        serialised.append(match.model_dump(mode="json"))
    return serialised
|