add queue for course fetching
This commit is contained in:
parent
8975c158a5
commit
746b569ad8
1 changed files with 33 additions and 4 deletions
|
|
@@ -40,6 +40,15 @@ logger = logging.getLogger("ptscrapper.scheduler")
|
||||||
STATE_DB = os.path.join(LOCAL_DATA_DIR, "state.db")
|
STATE_DB = os.path.join(LOCAL_DATA_DIR, "state.db")
|
||||||
os.makedirs(LOCAL_DATA_DIR, exist_ok=True)
|
os.makedirs(LOCAL_DATA_DIR, exist_ok=True)
|
||||||
|
|
||||||
|
# number of course-fetch workers to run in parallel
|
||||||
|
COURSE_WORKER_COUNT = 5
|
||||||
|
|
||||||
|
# holds (course_id, ts) tuples to fetch
|
||||||
|
course_queue: asyncio.Queue[tuple[int, datetime]] = asyncio.Queue()
|
||||||
|
|
||||||
|
# tracks which course_ids are already queued or being processed
|
||||||
|
in_flight_courses: set[int] = set()
|
||||||
|
|
||||||
|
|
||||||
def init_db():
|
def init_db():
|
||||||
"""Create the courses table if it doesn't exist."""
|
"""Create the courses table if it doesn't exist."""
|
||||||
|
|
@@ -116,10 +125,10 @@ async def positions_job():
|
||||||
await client.aclose()
|
await client.aclose()
|
||||||
|
|
||||||
# schedule course fetches without awaiting
|
# schedule course fetches without awaiting
|
||||||
course_ids = {p["c"] for p in positions}
|
for cid in {p["c"] for p in positions}:
|
||||||
for cid in course_ids:
|
if should_fetch_course(cid) and cid not in in_flight_courses:
|
||||||
if should_fetch_course(cid):
|
in_flight_courses.add(cid)
|
||||||
asyncio.create_task(fetch_and_store_course(cid, ts))
|
await course_queue.put((cid, ts))
|
||||||
|
|
||||||
|
|
||||||
async def fetch_and_store_course(course_id: int, ts: datetime, client=None):
|
async def fetch_and_store_course(course_id: int, ts: datetime, client=None):
|
||||||
|
|
@@ -253,10 +262,30 @@ async def daily_compact_and_push_course_geometry():
|
||||||
logger.exception("job=daily_compact_and_push_course_geometry_job")
|
logger.exception("job=daily_compact_and_push_course_geometry_job")
|
||||||
|
|
||||||
|
|
||||||
|
async def course_worker(name: str):
    """Consume ``course_queue`` forever, fetching one course per queue item.

    Each item is unpacked as a ``(course_id, ts)`` pair and handed to
    ``fetch_and_store_course``. A failing fetch is logged and does not stop
    the worker.

    Args:
        name: Label for this worker; used only in the failure log message.
    """
    while True:
        job = await course_queue.get()
        cid, fetch_ts = job
        try:
            await fetch_and_store_course(cid, fetch_ts)
        except Exception:
            # Log with traceback and keep looping so one bad course does not
            # take the worker down.
            logger.exception(
                "worker=%s fetch_and_store_course failed for course=%s",
                name,
                cid,
            )
        finally:
            # Whether the fetch succeeded or failed, release the id so the
            # course can be enqueued again, and mark the queue item as done.
            in_flight_courses.discard(cid)
            course_queue.task_done()
|
||||||
|
|
||||||
|
|
||||||
async def start_scheduler():
|
async def start_scheduler():
|
||||||
"""Initialize state DB, schedule all jobs, and start the loop."""
|
"""Initialize state DB, schedule all jobs, and start the loop."""
|
||||||
init_db()
|
init_db()
|
||||||
|
|
||||||
|
# spin up N workers before scheduling jobs
|
||||||
|
for i in range(COURSE_WORKER_COUNT):
|
||||||
|
asyncio.create_task(course_worker(f"worker-{i+1}"))
|
||||||
|
|
||||||
scheduler = AsyncIOScheduler()
|
scheduler = AsyncIOScheduler()
|
||||||
|
|
||||||
# every 10 seconds for positions
|
# every 10 seconds for positions
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue