diff --git a/src/ptscrapper/scheduler.py b/src/ptscrapper/scheduler.py
index feb73fc..61554d4 100644
--- a/src/ptscrapper/scheduler.py
+++ b/src/ptscrapper/scheduler.py
@@ -40,6 +40,18 @@ logger = logging.getLogger("ptscrapper.scheduler")
 STATE_DB = os.path.join(LOCAL_DATA_DIR, "state.db")
 os.makedirs(LOCAL_DATA_DIR, exist_ok=True)
 
+# how many course-fetch workers to run in parallel
+COURSE_WORKER_COUNT = 5
+
+# holds (course_id, ts) tuples to fetch
+course_queue: asyncio.Queue[tuple[int, datetime]] = asyncio.Queue()
+
+# tracks which course_ids are already queued or being processed
+in_flight_courses: set[int] = set()
+
+# strong references to the worker tasks; the event loop itself keeps only weak ones
+course_worker_tasks: set[asyncio.Task] = set()
+
 
 def init_db():
     """Create the courses table if it doesn't exist."""
@@ -116,10 +128,10 @@ async def positions_job():
         await client.aclose()
 
     # schedule course fetches without awaiting
-    course_ids = {p["c"] for p in positions}
-    for cid in course_ids:
-        if should_fetch_course(cid):
-            asyncio.create_task(fetch_and_store_course(cid, ts))
+    for cid in {p["c"] for p in positions}:
+        if should_fetch_course(cid) and cid not in in_flight_courses:
+            in_flight_courses.add(cid)
+            course_queue.put_nowait((cid, ts))
 
 
 async def fetch_and_store_course(course_id: int, ts: datetime, client=None):
@@ -253,10 +265,39 @@ async def daily_compact_and_push_course_geometry():
         logger.exception("job=daily_compact_and_push_course_geometry_job")
 
 
+async def course_worker(name: str):
+    """Continuously process (course_id, ts) items from ``course_queue``.
+
+    The loop has no exit condition. A failed fetch is logged and the worker moves on.
+
+    Args:
+        name: label used in this worker's log messages.
+    """
+    while True:
+        course_id, ts = await course_queue.get()
+        try:
+            await fetch_and_store_course(course_id, ts)
+        except Exception:
+            logger.exception(
+                "worker=%s fetch_and_store_course failed for course=%s", name, course_id
+            )
+        finally:
+            # done with this course, allow it to be enqueued again (tomorrow or if re-seen)
+            in_flight_courses.discard(course_id)
+            course_queue.task_done()
+
+
 async def start_scheduler():
     """Initialize state DB, schedule all jobs, and start the loop."""
     init_db()
 
+    # spin up N workers before scheduling jobs; hold a strong reference to each
+    # task so it cannot be garbage-collected while it is still running
+    for i in range(COURSE_WORKER_COUNT):
+        task = asyncio.create_task(course_worker(f"worker-{i+1}"))
+        course_worker_tasks.add(task)
+        task.add_done_callback(course_worker_tasks.discard)
+
     scheduler = AsyncIOScheduler()
 
     # every 10 seconds for positions