fix(backend): resolve ty check errors across api, mcp, and lifespan typing

This commit is contained in:
Piotr Oleszczyk 2026-03-02 15:51:14 +01:00
parent 679e4e81f4
commit 389ca5ffdc
6 changed files with 36 additions and 28 deletions

View file

@ -6,7 +6,7 @@ from uuid import UUID, uuid4
from fastapi import APIRouter, Depends, HTTPException, Query
from google.genai import types as genai_types
from pydantic import ValidationError
from sqlmodel import Session, SQLModel, select
from sqlmodel import Session, SQLModel, col, select
from db import get_session
from innercontext.api.utils import get_or_404
@ -218,7 +218,7 @@ def list_products(
product_ids = [p.id for p in products]
inventory_rows = (
session.exec(
select(ProductInventory).where(ProductInventory.product_id.in_(product_ids))
select(ProductInventory).where(col(ProductInventory.product_id).in_(product_ids))
).all()
if product_ids
else []

View file

@ -187,11 +187,12 @@ def _is_minoxidil_product(product: Product) -> bool:
def _ev(v: object) -> str:
return (
v.value
if v is not None and hasattr(v, "value")
else str(v) if v is not None else ""
)
if v is None:
return ""
value = getattr(v, "value", None)
if isinstance(value, str):
return value
return str(v)
def _build_skin_context(session: Session) -> str:
@ -218,7 +219,7 @@ def _build_grooming_context(
session: Session, weekdays: Optional[list[int]] = None
) -> str:
entries = session.exec(
select(GroomingSchedule).order_by(GroomingSchedule.day_of_week)
select(GroomingSchedule).order_by(col(GroomingSchedule.day_of_week))
).all()
if not entries:
return "HARMONOGRAM PIELĘGNACJI: brak\n"
@ -251,7 +252,7 @@ def _build_recent_history(session: Session) -> str:
steps = session.exec(
select(RoutineStep)
.where(RoutineStep.routine_id == r.id)
.order_by(RoutineStep.order_index)
.order_by(col(RoutineStep.order_index))
).all()
step_names = []
for s in steps:
@ -267,12 +268,12 @@ def _build_recent_history(session: Session) -> str:
def _build_products_context(session: Session, time_filter: Optional[str] = None) -> str:
stmt = select(Product).where(Product.is_tool.is_(False))
stmt = select(Product).where(col(Product.is_tool).is_(False))
products = session.exec(stmt).all()
product_ids = [p.id for p in products]
inventory_rows = (
session.exec(
select(ProductInventory).where(ProductInventory.product_id.in_(product_ids))
select(ProductInventory).where(col(ProductInventory.product_id).in_(product_ids))
).all()
if product_ids
else []
@ -433,7 +434,7 @@ def list_routines(
steps_by_routine: dict = {}
if routine_ids:
all_steps = session.exec(
select(RoutineStep).where(RoutineStep.routine_id.in_(routine_ids))
select(RoutineStep).where(col(RoutineStep.routine_id).in_(routine_ids))
).all()
for step in all_steps:
steps_by_routine.setdefault(step.routine_id, []).append(step)

View file

@ -168,7 +168,7 @@ async def analyze_skin_photos(
)
image_summary = (
f"{len(photos)} image(s): {', '.join(p.content_type for p in photos)}"
f"{len(photos)} image(s): {', '.join((p.content_type or 'unknown') for p in photos)}"
)
response = call_gemini(
endpoint="skincare/analyze-photos",

View file

@ -64,8 +64,13 @@ def call_gemini(
response = client.models.generate_content(
model=model, contents=contents, config=config
)
with suppress(Exception):
finish_reason = response.candidates[0].finish_reason.name
candidates = getattr(response, "candidates", None)
if candidates:
first_candidate = candidates[0]
reason = getattr(first_candidate, "finish_reason", None)
reason_name = getattr(reason, "name", None)
if isinstance(reason_name, str):
finish_reason = reason_name
if finish_reason and finish_reason != "STOP":
success = False
error_detail = f"finish_reason: {finish_reason}"

View file

@ -1,6 +1,6 @@
from __future__ import annotations
from datetime import date, timedelta
from datetime import date, datetime, timedelta
from typing import Optional
from uuid import UUID
@ -79,9 +79,9 @@ def get_open_inventory() -> list[dict]:
with Session(engine) as session:
stmt = (
select(ProductInventory, Product)
.join(Product, ProductInventory.product_id == Product.id)
.where(ProductInventory.is_opened == True) # noqa: E712
.where(ProductInventory.finished_at == None) # noqa: E711
.join(Product, col(ProductInventory.product_id) == col(Product.id))
.where(col(ProductInventory.is_opened).is_(True))
.where(col(ProductInventory.finished_at).is_(None))
)
rows = session.exec(stmt).all()
return [
@ -118,7 +118,7 @@ def get_recent_routines(days: int = 14) -> list[dict]:
steps = session.exec(
select(RoutineStep)
.where(RoutineStep.routine_id == routine.id)
.order_by(RoutineStep.order_index)
.order_by(col(RoutineStep.order_index))
).all()
steps_data = []
@ -254,14 +254,14 @@ def get_medications() -> list[dict]:
(valid_to IS NULL or >= today)."""
with Session(engine) as session:
medications = session.exec(select(MedicationEntry)).all()
today = date.today()
today = datetime.combine(date.today(), datetime.min.time())
result = []
for med in medications:
usages = session.exec(
select(MedicationUsage)
.where(MedicationUsage.medication_record_id == med.record_id)
.where(
(MedicationUsage.valid_to == None) # noqa: E711
col(MedicationUsage.valid_to).is_(None)
| (col(MedicationUsage.valid_to) >= today)
)
).all()
@ -310,10 +310,10 @@ def get_expiring_inventory(days: int = 30) -> list[dict]:
cutoff = date.today() + timedelta(days=days)
stmt = (
select(ProductInventory, Product)
.join(Product, ProductInventory.product_id == Product.id)
.where(ProductInventory.is_opened == True) # noqa: E712
.where(ProductInventory.finished_at == None) # noqa: E711
.where(ProductInventory.expiry_date != None) # noqa: E711
.join(Product, col(ProductInventory.product_id) == col(Product.id))
.where(col(ProductInventory.is_opened).is_(True))
.where(col(ProductInventory.finished_at).is_(None))
.where(col(ProductInventory.expiry_date).is_not(None))
.where(col(ProductInventory.expiry_date) <= cutoff)
)
rows = session.exec(stmt).all()
@ -341,7 +341,7 @@ def get_grooming_schedule() -> list[dict]:
"""Get the full grooming schedule sorted by day of week (0=Monday, 6=Sunday)."""
with Session(engine) as session:
entries = session.exec(
select(GroomingSchedule).order_by(GroomingSchedule.day_of_week)
select(GroomingSchedule).order_by(col(GroomingSchedule.day_of_week))
).all()
return [
{

View file

@ -1,4 +1,5 @@
from contextlib import asynccontextmanager
from typing import Any, AsyncIterator, Mapping
from dotenv import load_dotenv
@ -7,6 +8,7 @@ load_dotenv() # load .env before db.py reads DATABASE_URL
from fastapi import FastAPI # noqa: E402
from fastapi.middleware.cors import CORSMiddleware # noqa: E402
from fastmcp.utilities.lifespan import combine_lifespans # noqa: E402
from starlette.applications import Starlette # noqa: E402
from db import create_db_and_tables # noqa: E402
from innercontext.api import ( # noqa: E402
@ -23,7 +25,7 @@ mcp_app = mcp.http_app(path="/mcp")
@asynccontextmanager
async def lifespan(app: FastAPI):
async def lifespan(app: Starlette) -> AsyncIterator[Mapping[str, Any] | None]:
create_db_and_tables()
yield