diff --git a/src/ptscrapper/config.py b/src/ptscrapper/config.py
index e0bf8a0..5b6f36f 100644
--- a/src/ptscrapper/config.py
+++ b/src/ptscrapper/config.py
@@ -18,7 +18,7 @@ AUTH_PASSWORD = os.getenv("PTS_PASSWORD", "g5crehAfUCh4Wust")
 # HTTPX timeout (in seconds)
 # You can override any one by setting PTS_TIMEOUT_CONNECT, PTS_TIMEOUT_READ, etc.
 REQUEST_TIMEOUT = {
-    "connect": float(os.getenv("PTS_TIMEOUT_CONNECT", "5.0")),
+    "connect": float(os.getenv("PTS_TIMEOUT_CONNECT", "3.0")),
     "read": float(os.getenv("PTS_TIMEOUT_READ", "10.0")),
     "write": float(os.getenv("PTS_TIMEOUT_WRITE", "10.0")),
     "pool": float(os.getenv("PTS_TIMEOUT_POOL", "5.0")),
@@ -35,3 +35,4 @@ LOCAL_DATA_DIR = os.getenv("LOCAL_DATA_DIR")
 HOST_ID = os.getenv("HOST_ID", socket.gethostname())
 RCLONE_REMOTE = os.getenv("RCLONE_REMOTE", "nas")
 RCLONE_REMOTE_PATH = os.getenv("RCLONE_REMOTE_PATH", "pt-scraper-data")
+MAX_COURSE_FETCH_ATTEMPTS = int(os.getenv("MAX_COURSE_FETCH_ATTEMPTS", "3"))
diff --git a/src/ptscrapper/scheduler.py b/src/ptscrapper/scheduler.py
index 7f84bd3..fd2c53c 100644
--- a/src/ptscrapper/scheduler.py
+++ b/src/ptscrapper/scheduler.py
@@ -7,11 +7,12 @@ import sqlite3
 from datetime import date, datetime, timedelta, timezone
 from itertools import islice
 
+import httpx
 import pyarrow as pa
 from apscheduler.schedulers.asyncio import AsyncIOScheduler
 from apscheduler.triggers.cron import CronTrigger
 
-from .config import LOCAL_DATA_DIR
+from .config import LOCAL_DATA_DIR, MAX_COURSE_FETCH_ATTEMPTS
 from .fetchers import (
     fetch_course_posts,
     fetch_positions,
@@ -44,7 +45,7 @@ os.makedirs(LOCAL_DATA_DIR, exist_ok=True)
 COURSE_WORKER_COUNT = 5
 
 # holds (course_id, ts) tuples to fetch
-course_queue: asyncio.Queue[tuple[int, datetime]] = asyncio.Queue()
+course_queue: asyncio.Queue[tuple[int, datetime, int]] = asyncio.Queue()
 # tracks which course_ids are already queued or being processed
 in_flight_courses: set[int] = set()
 
@@ -128,7 +129,7 @@ async def positions_job():
     for cid in {p["c"] for p in positions}:
         if should_fetch_course(cid) and cid not in in_flight_courses:
             in_flight_courses.add(cid)
-            await course_queue.put((cid, ts))
+            await course_queue.put((cid, ts, 0))
 
 
 async def fetch_and_store_course(course_id: int, ts: datetime, client=None):
@@ -139,24 +140,28 @@ async def fetch_and_store_course(course_id: int, ts: datetime, client=None):
         own_client = True
     try:
         recs = await fetch_course_posts(client, [course_id])
+        # flatten each record's route stops and persist them
        
 for rec in recs:
-            # flatten route stops
-            rows = []
-            for stop in rec.get("r", []):
-                rows.append(
-                    {
-                        "fetch_date": ts,
-                        "course_id": rec["c"],
-                        "stop_id": stop["s"],
-                        "dep_time": stop["t"],
-                    }
-                )
+            rows = [
+                {
+                    "fetch_date": ts,
+                    "course_id": rec["c"],
+                    "stop_id": stop["s"],
+                    "dep_time": stop["t"],
+                }
+                for stop in rec.get("r", [])
+            ]
             table = pa.Table.from_pylist(rows)
             write_course_posts(table, rec["c"], ts)
             record_course_fetched(rec["c"], ts)
             write_course_geometry(rec["p"], rec["c"], ts)
         logger.info("wrote course %s", course_id)
-    except Exception:
+    except httpx.ConnectTimeout:
+        # propagate connect timeouts so course_worker can requeue
+        # this course with exponential backoff
+        raise
+    except Exception:
+        # anything else is logged here; client cleanup happens in finally
         logger.exception("job=course_posts_job")
     finally:
         if own_client:
@@ -226,9 +231,31 @@ async def daily_compact_and_push_course_geometry():
 async def course_worker(name: str):
     """Continuously process course IDs from the queue."""
     while True:
-        course_id, ts = await course_queue.get()
+        course_id, ts, attempts = await course_queue.get()
         try:
             await fetch_and_store_course(course_id, ts)
+            logger.info("worker=%s succeeded for course=%s", name, course_id)
+        except httpx.ConnectTimeout:
+            if attempts < MAX_COURSE_FETCH_ATTEMPTS:
+                backoff = 2**attempts
+                logger.warning(
+                    "worker=%s timeout on course=%s, retry %s/%s after %ss",
+                    name,
+                    course_id,
+                    attempts + 1,
+                    MAX_COURSE_FETCH_ATTEMPTS,
+                    backoff,
+                )
+                await asyncio.sleep(backoff)
+                # requeue with incremented attempts
+                await course_queue.put((course_id, ts, attempts + 1))
+            else:
+                logger.error(
+                    "worker=%s giving up on course=%s after %s attempts",
+                    name,
+                    course_id,
+                    MAX_COURSE_FETCH_ATTEMPTS,
+                )
         except Exception:
             logger.exception(
                 "worker=%s fetch_and_store_course failed for course=%s", name, course_id