Add retries to course fetching

This commit is contained in:
Piotr Oleszczyk 2025-05-11 13:17:07 +02:00
parent 36f3f8e50a
commit 823ca4c259
2 changed files with 45 additions and 17 deletions

View file

@ -18,7 +18,7 @@ AUTH_PASSWORD = os.getenv("PTS_PASSWORD", "g5crehAfUCh4Wust")
# HTTPX timeout (in seconds) # HTTPX timeout (in seconds)
# You can override any one by setting PTS_TIMEOUT_CONNECT, PTS_TIMEOUT_READ, etc. # You can override any one by setting PTS_TIMEOUT_CONNECT, PTS_TIMEOUT_READ, etc.
REQUEST_TIMEOUT = { REQUEST_TIMEOUT = {
"connect": float(os.getenv("PTS_TIMEOUT_CONNECT", "5.0")), "connect": float(os.getenv("PTS_TIMEOUT_CONNECT", "3.0")),
"read": float(os.getenv("PTS_TIMEOUT_READ", "10.0")), "read": float(os.getenv("PTS_TIMEOUT_READ", "10.0")),
"write": float(os.getenv("PTS_TIMEOUT_WRITE", "10.0")), "write": float(os.getenv("PTS_TIMEOUT_WRITE", "10.0")),
"pool": float(os.getenv("PTS_TIMEOUT_POOL", "5.0")), "pool": float(os.getenv("PTS_TIMEOUT_POOL", "5.0")),
@ -35,3 +35,4 @@ LOCAL_DATA_DIR = os.getenv("LOCAL_DATA_DIR")
HOST_ID = os.getenv("HOST_ID", socket.gethostname()) HOST_ID = os.getenv("HOST_ID", socket.gethostname())
RCLONE_REMOTE = os.getenv("RCLONE_REMOTE", "nas") RCLONE_REMOTE = os.getenv("RCLONE_REMOTE", "nas")
RCLONE_REMOTE_PATH = os.getenv("RCLONE_REMOTE_PATH", "pt-scraper-data") RCLONE_REMOTE_PATH = os.getenv("RCLONE_REMOTE_PATH", "pt-scraper-data")
MAX_COURSE_FETCH_ATTEMPTS = os.getenv("MAX_COURSE_FETCH_ATTEMPTS", 3)

View file

@ -7,11 +7,12 @@ import sqlite3
from datetime import date, datetime, timedelta, timezone from datetime import date, datetime, timedelta, timezone
from itertools import islice from itertools import islice
import httpx
import pyarrow as pa import pyarrow as pa
from apscheduler.schedulers.asyncio import AsyncIOScheduler from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.triggers.cron import CronTrigger from apscheduler.triggers.cron import CronTrigger
from .config import LOCAL_DATA_DIR from .config import LOCAL_DATA_DIR, MAX_COURSE_FETCH_ATTEMPTS
from .fetchers import ( from .fetchers import (
fetch_course_posts, fetch_course_posts,
fetch_positions, fetch_positions,
@ -44,7 +45,7 @@ os.makedirs(LOCAL_DATA_DIR, exist_ok=True)
COURSE_WORKER_COUNT = 5 COURSE_WORKER_COUNT = 5
# holds (course_id, ts) tuples to fetch # holds (course_id, ts) tuples to fetch
course_queue: asyncio.Queue[tuple[int, datetime]] = asyncio.Queue() course_queue: asyncio.Queue[tuple[int, datetime, int]] = asyncio.Queue()
# tracks which course_ids are already queued or being processed # tracks which course_ids are already queued or being processed
in_flight_courses: set[int] = set() in_flight_courses: set[int] = set()
@ -128,7 +129,7 @@ async def positions_job():
for cid in {p["c"] for p in positions}: for cid in {p["c"] for p in positions}:
if should_fetch_course(cid) and cid not in in_flight_courses: if should_fetch_course(cid) and cid not in in_flight_courses:
in_flight_courses.add(cid) in_flight_courses.add(cid)
await course_queue.put((cid, ts)) await course_queue.put((cid, ts, 0))
async def fetch_and_store_course(course_id: int, ts: datetime, client=None): async def fetch_and_store_course(course_id: int, ts: datetime, client=None):
@ -139,24 +140,28 @@ async def fetch_and_store_course(course_id: int, ts: datetime, client=None):
own_client = True own_client = True
try: try:
recs = await fetch_course_posts(client, [course_id]) recs = await fetch_course_posts(client, [course_id])
# now do your I/O; catch+log only errors here
for rec in recs: for rec in recs:
# flatten route stops rows = [
rows = [] {
for stop in rec.get("r", []): "fetch_date": ts,
rows.append( "course_id": rec["c"],
{ "stop_id": stop["s"],
"fetch_date": ts, "dep_time": stop["t"],
"course_id": rec["c"], }
"stop_id": stop["s"], for stop in rec.get("r", [])
"dep_time": stop["t"], ]
}
)
table = pa.Table.from_pylist(rows) table = pa.Table.from_pylist(rows)
write_course_posts(table, rec["c"], ts) write_course_posts(table, rec["c"], ts)
record_course_fetched(rec["c"], ts) record_course_fetched(rec["c"], ts)
write_course_geometry(rec["p"], rec["c"], ts) write_course_geometry(rec["p"], rec["c"], ts)
logger.info("wrote course %s", course_id) logger.info("wrote course %s", course_id)
except Exception: except Exception as e:
# if it's not a ConnectTimeout (which the worker handles), log
from httpx import ConnectTimeout
if isinstance(e, ConnectTimeout):
raise
logger.exception("job=course_posts_job") logger.exception("job=course_posts_job")
finally: finally:
if own_client: if own_client:
@ -226,9 +231,31 @@ async def daily_compact_and_push_course_geometry():
async def course_worker(name: str): async def course_worker(name: str):
"""Continuously process course IDs from the queue.""" """Continuously process course IDs from the queue."""
while True: while True:
course_id, ts = await course_queue.get() course_id, ts, attempts = await course_queue.get()
try: try:
await fetch_and_store_course(course_id, ts) await fetch_and_store_course(course_id, ts)
logger.info("worker=%s succeeded for course=%s", name, course_id)
except httpx.ConnectTimeout:
if attempts < MAX_COURSE_FETCH_ATTEMPTS:
backoff = 2**attempts
logger.warning(
"worker=%s timeout on course=%s, retry %s/%s after %ss",
name,
course_id,
attempts + 1,
MAX_COURSE_FETCH_ATTEMPTS,
backoff,
)
await asyncio.sleep(backoff)
# requeue with incremented attempts
await course_queue.put((course_id, ts, attempts + 1))
else:
logger.error(
"worker=%s giving up on course=%s after %s attempts",
name,
course_id,
MAX_COURSE_FETCH_ATTEMPTS,
)
except Exception: except Exception:
logger.exception( logger.exception(
"worker=%s fetch_and_store_course failed for course=%s", name, course_id "worker=%s fetch_and_store_course failed for course=%s", name, course_id