feat(repo): expand lab results workflows across backend and frontend

This commit is contained in:
Piotr Oleszczyk 2026-03-05 12:46:49 +01:00
parent f1b104909d
commit 0a4ccefe28
19 changed files with 1330 additions and 170 deletions

View file

@ -3,8 +3,9 @@ from datetime import datetime
from typing import Optional
from uuid import UUID, uuid4
from fastapi import APIRouter, Depends
from fastapi import APIRouter, Depends, Query
from pydantic import field_validator
from sqlalchemy import Integer, cast, func, or_
from sqlmodel import Session, SQLModel, col, select
from db import get_session
@ -120,6 +121,13 @@ class LabResultUpdate(SQLModel):
notes: Optional[str] = None
class LabResultListResponse(SQLModel):
items: list[LabResult]
total: int
limit: int
offset: int
# ---------------------------------------------------------------------------
# Helper
# ---------------------------------------------------------------------------
@ -251,27 +259,86 @@ def delete_usage(usage_id: UUID, session: Session = Depends(get_session)):
# ---------------------------------------------------------------------------
@router.get("/lab-results", response_model=list[LabResult])
@router.get("/lab-results", response_model=LabResultListResponse)
def list_lab_results(
q: Optional[str] = None,
test_code: Optional[str] = None,
flag: Optional[ResultFlag] = None,
lab: Optional[str] = None,
flags: list[ResultFlag] = Query(default_factory=list),
from_date: Optional[datetime] = None,
to_date: Optional[datetime] = None,
latest_only: bool = False,
limit: int = Query(default=50, ge=1, le=200),
offset: int = Query(default=0, ge=0),
session: Session = Depends(get_session),
):
stmt = select(LabResult)
filters = []
if q is not None and q.strip():
query = f"%{q.strip()}%"
filters.append(
or_(
col(LabResult.test_code).ilike(query),
col(LabResult.test_name_original).ilike(query),
)
)
if test_code is not None:
stmt = stmt.where(LabResult.test_code == test_code)
filters.append(LabResult.test_code == test_code)
if flag is not None:
stmt = stmt.where(LabResult.flag == flag)
if lab is not None:
stmt = stmt.where(LabResult.lab == lab)
filters.append(LabResult.flag == flag)
if flags:
filters.append(col(LabResult.flag).in_(flags))
if from_date is not None:
stmt = stmt.where(LabResult.collected_at >= from_date)
filters.append(LabResult.collected_at >= from_date)
if to_date is not None:
stmt = stmt.where(LabResult.collected_at <= to_date)
return session.exec(stmt).all()
filters.append(LabResult.collected_at <= to_date)
if latest_only:
ranked_stmt = select(
col(LabResult.record_id).label("record_id"),
func.row_number()
.over(
partition_by=LabResult.test_code,
order_by=(
col(LabResult.collected_at).desc(),
col(LabResult.created_at).desc(),
col(LabResult.record_id).desc(),
),
)
.label("rank"),
)
if filters:
ranked_stmt = ranked_stmt.where(*filters)
ranked_subquery = ranked_stmt.subquery()
latest_ids = select(ranked_subquery.c.record_id).where(
ranked_subquery.c.rank == 1
)
stmt = select(LabResult).where(col(LabResult.record_id).in_(latest_ids))
count_stmt = select(func.count()).select_from(
select(LabResult.record_id)
.where(col(LabResult.record_id).in_(latest_ids))
.subquery()
)
else:
stmt = select(LabResult)
count_stmt = select(func.count()).select_from(LabResult)
if filters:
stmt = stmt.where(*filters)
count_stmt = count_stmt.where(*filters)
test_code_numeric = cast(
func.replace(col(LabResult.test_code), "-", ""),
Integer,
)
stmt = stmt.order_by(
col(LabResult.collected_at).desc(),
test_code_numeric.asc(),
col(LabResult.record_id).asc(),
)
total = session.exec(count_stmt).one()
items = list(session.exec(stmt.offset(offset).limit(limit)).all())
return LabResultListResponse(items=items, total=total, limit=limit, offset=offset)
@router.post("/lab-results", response_model=LabResult, status_code=201)