fix(backend): apply black/isort formatting and fix ruff noqa annotations

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
Piotr Oleszczyk 2026-03-01 17:27:07 +01:00
parent 4b0fedde35
commit 5cb44b2c65
6 changed files with 515 additions and 230 deletions

View file

@ -11,7 +11,13 @@ from sqlmodel import Session, SQLModel, col, select
from db import get_session
from innercontext.api.utils import get_or_404
from innercontext.llm import get_gemini_client
from innercontext.models import GroomingSchedule, Product, Routine, RoutineStep, SkinConditionSnapshot
from innercontext.models import (
GroomingSchedule,
Product,
Routine,
RoutineStep,
SkinConditionSnapshot,
)
from innercontext.models.enums import GroomingAction, PartOfDay
router = APIRouter()
@ -135,16 +141,30 @@ class _BatchOut(PydanticBase):
# Prompt helpers
# ---------------------------------------------------------------------------
_DAY_NAMES = ["poniedziałek", "wtorek", "środa", "czwartek", "piątek", "sobota", "niedziela"]
_DAY_NAMES = [
"poniedziałek",
"wtorek",
"środa",
"czwartek",
"piątek",
"sobota",
"niedziela",
]
def _ev(v: object) -> str:
return v.value if v is not None and hasattr(v, "value") else str(v) if v is not None else ""
return (
v.value
if v is not None and hasattr(v, "value")
else str(v) if v is not None else ""
)
def _build_skin_context(session: Session) -> str:
snapshot = session.exec(
select(SkinConditionSnapshot).order_by(col(SkinConditionSnapshot.snapshot_date).desc())
select(SkinConditionSnapshot).order_by(
col(SkinConditionSnapshot.snapshot_date).desc()
)
).first()
if snapshot is None:
return "STAN SKÓRY: brak danych\n"
@ -160,16 +180,24 @@ def _build_skin_context(session: Session) -> str:
)
def _build_grooming_context(session: Session, weekdays: Optional[list[int]] = None) -> str:
entries = session.exec(select(GroomingSchedule).order_by(GroomingSchedule.day_of_week)).all()
def _build_grooming_context(
session: Session, weekdays: Optional[list[int]] = None
) -> str:
entries = session.exec(
select(GroomingSchedule).order_by(GroomingSchedule.day_of_week)
).all()
if not entries:
return "HARMONOGRAM PIELĘGNACJI: brak\n"
lines = ["HARMONOGRAM PIELĘGNACJI:"]
for e in entries:
if weekdays is not None and e.day_of_week not in weekdays:
continue
day_name = _DAY_NAMES[e.day_of_week] if 0 <= e.day_of_week <= 6 else str(e.day_of_week)
lines.append(f" {day_name}: {_ev(e.action)}" + (f" ({e.notes})" if e.notes else ""))
day_name = (
_DAY_NAMES[e.day_of_week] if 0 <= e.day_of_week <= 6 else str(e.day_of_week)
)
lines.append(
f" {day_name}: {_ev(e.action)}" + (f" ({e.notes})" if e.notes else "")
)
if len(lines) == 1:
lines.append(" (brak wpisów dla podanych dni)")
return "\n".join(lines) + "\n"
@ -198,12 +226,18 @@ def _build_recent_history(session: Session) -> str:
step_names.append(p.name if p else str(s.product_id))
elif s.action_type:
step_names.append(_ev(s.action_type))
lines.append(f" {r.routine_date} {_ev(r.part_of_day).upper()}: {', '.join(step_names)}")
lines.append(
f" {r.routine_date} {_ev(r.part_of_day).upper()}: {', '.join(step_names)}"
)
return "\n".join(lines) + "\n"
def _build_products_context(session: Session, time_filter: Optional[str] = None) -> str:
stmt = select(Product).where(Product.is_medication == False).where(Product.is_tool == False) # noqa: E712
stmt = (
select(Product)
.where(Product.is_medication == False) # noqa: E712
.where(Product.is_tool == False) # noqa: E712
)
products = session.exec(stmt).all()
lines = ["DOSTĘPNE PRODUKTY:"]
for p in products:
@ -353,13 +387,17 @@ def suggest_batch(
):
delta = (data.to_date - data.from_date).days + 1
if delta > 14:
raise HTTPException(status_code=400, detail="Date range must not exceed 14 days.")
raise HTTPException(
status_code=400, detail="Date range must not exceed 14 days."
)
if data.from_date > data.to_date:
raise HTTPException(status_code=400, detail="from_date must be <= to_date.")
client, model = get_gemini_client()
weekdays = list({(data.from_date + timedelta(days=i)).weekday() for i in range(delta)})
weekdays = list(
{(data.from_date + timedelta(days=i)).weekday() for i in range(delta)}
)
skin_ctx = _build_skin_context(session)
grooming_ctx = _build_grooming_context(session, weekdays=weekdays)
history_ctx = _build_recent_history(session)
@ -437,7 +475,9 @@ def suggest_batch(
)
)
return BatchSuggestion(days=days, overall_reasoning=parsed.get("overall_reasoning", ""))
return BatchSuggestion(
days=days, overall_reasoning=parsed.get("overall_reasoning", "")
)
# Grooming-schedule GET must appear before /{routine_id} to avoid being shadowed

View file

@ -146,11 +146,17 @@ async def analyze_skin_photos(
parts: list[genai_types.Part] = []
for photo in photos:
if photo.content_type not in allowed:
raise HTTPException(status_code=422, detail=f"Unsupported type: {photo.content_type}")
raise HTTPException(
status_code=422, detail=f"Unsupported type: {photo.content_type}"
)
data = await photo.read()
if len(data) > MAX_IMAGE_BYTES:
raise HTTPException(status_code=413, detail=f"{photo.filename} exceeds 5 MB.")
parts.append(genai_types.Part.from_bytes(data=data, mime_type=photo.content_type))
raise HTTPException(
status_code=413, detail=f"{photo.filename} exceeds 5 MB."
)
parts.append(
genai_types.Part.from_bytes(data=data, mime_type=photo.content_type)
)
parts.append(
genai_types.Part.from_text(
text="Analyze the skin condition visible in the above photo(s) and return the JSON assessment."

View file

@ -5,7 +5,6 @@ import os
from fastapi import HTTPException
from google import genai
_DEFAULT_MODEL = "gemini-flash-latest"

View file

@ -287,9 +287,9 @@ def get_medications() -> list[dict]:
"frequency": u.frequency,
"schedule_text": u.schedule_text,
"as_needed": u.as_needed,
"valid_from": u.valid_from.isoformat()
if u.valid_from
else None,
"valid_from": (
u.valid_from.isoformat() if u.valid_from else None
),
"valid_to": u.valid_to.isoformat() if u.valid_to else None,
}
for u in usages
@ -323,9 +323,9 @@ def get_expiring_inventory(days: int = 30) -> list[dict]:
"product_name": product.name,
"brand": product.brand,
"expiry_date": inv.expiry_date.isoformat() if inv.expiry_date else None,
"days_remaining": (inv.expiry_date - today).days
if inv.expiry_date
else None,
"days_remaining": (
(inv.expiry_date - today).days if inv.expiry_date else None
),
"current_weight_g": inv.current_weight_g,
}
for inv, product in rows
@ -384,9 +384,7 @@ def get_recent_lab_results(limit: int = 30) -> list[dict]:
"""Get the most recent lab results sorted by collection date descending."""
with Session(engine) as session:
results = session.exec(
select(LabResult)
.order_by(col(LabResult.collected_at).desc())
.limit(limit)
select(LabResult).order_by(col(LabResult.collected_at).desc()).limit(limit)
).all()
return [_lab_result_to_dict(r) for r in results]
@ -394,7 +392,8 @@ def get_recent_lab_results(limit: int = 30) -> list[dict]:
@mcp.tool()
def get_available_lab_tests() -> list[dict]:
"""List all distinct lab tests ever performed, grouped by LOINC test_code.
Returns test_code, LOINC name, original lab names, result count, and last collection date."""
Returns test_code, LOINC name, original lab names, result count, and last collection date.
"""
with Session(engine) as session:
results = session.exec(select(LabResult)).all()
tests: dict[str, dict] = {}