diff --git a/src/ptscrapper/scheduler.py b/src/ptscrapper/scheduler.py
index 66267d4..bdc8f91 100644
--- a/src/ptscrapper/scheduler.py
+++ b/src/ptscrapper/scheduler.py
@@ -38,6 +38,7 @@ from .storage import (
 )
 
 logger = logging.getLogger("ptscrapper.scheduler")
+_client: "httpx.AsyncClient | None" = None
 
 # Path to our local SQLite state database
 STATE_DB = os.path.join(LOCAL_DATA_DIR, "state.db")
@@ -111,9 +112,8 @@ def chunked(iterable, size):
 
 async def positions_job():
     fetch_ts = datetime.now(timezone.utc)  # when *we* fetched
-    client = get_async_client()
     try:
-        data = await fetch_positions(client)
+        data = await fetch_positions(_client)
         # 2) API‐reported timestamp
         server_ts_str, *positions = data
         # server_ts is local to Warsaw
@@ -139,8 +139,6 @@ async def positions_job():
     except Exception:
         # record error details
         logger.exception("job=positions_job")
-    finally:
-        await client.aclose()
 
     # schedule course fetches without awaiting
     for cid in {p["c"] for p in positions}:
@@ -149,14 +147,10 @@
         await course_queue.put((cid, server_ts, 0))
 
 
-async def fetch_and_store_course(course_id: int, ts: datetime, client=None):
+async def fetch_and_store_course(course_id: int, ts: datetime):
     """Fetch one course’s details and store them."""
-    own_client = False
-    if client is None:
-        client = get_async_client()
-        own_client = True
     try:
-        recs = await fetch_course_posts(client, [course_id])
+        recs = await fetch_course_posts(_client, [course_id])
         # now do your I/O; catch+log only errors here
         for rec in recs:
             rows = []
@@ -186,17 +180,13 @@ async def fetch_and_store_course(course_id: int, ts: datetime, client=None):
         if isinstance(e, ConnectTimeout):
             raise
         logger.exception("job=course_posts_job")
-    finally:
-        if own_client:
-            await client.aclose()
 
 
 async def daily_snapshot_job():
     ts = datetime.now(timezone.utc)
-    client = get_async_client()
     try:
         # stops
-        posts = await fetch_posts(client)
+        posts = await fetch_posts(_client)
         rows = []
         for grp in posts:
             # grp: { "n": name, "s": group_id, "t": group_type, "p": [ {s,x,y,t}, … ] }
@@ -218,7 +208,7 @@ async def daily_snapshot_job():
         push_to_nas(fp)
 
         # lines per stop
-        posts_lines = await fetch_posts_lines(client)
+        posts_lines = await fetch_posts_lines(_client)
         table2 = pa.Table.from_pylist(posts_lines)
         fp2 = write_posts_lines(table2, ts)
         push_to_nas(fp2)
@@ -226,8 +216,6 @@
         logger.info("wrote daily snapshot for %s", ts)
     except Exception:
         logger.exception("job=daily_snapshot_job")
-    finally:
-        await client.aclose()
 
 
 async def daily_compact_and_push():
@@ -317,6 +305,9 @@ async def start_scheduler():
     """Initialize state DB, schedule all jobs, and start the loop."""
     init_db()
 
+    global _client
+    _client = get_async_client()
+
     # spin up N workers before scheduling jobs
     for i in range(COURSE_WORKER_COUNT):
         asyncio.create_task(course_worker(f"worker-{i+1}"))
@@ -340,4 +331,8 @@
     )
     scheduler.start()
 
-    await asyncio.Event().wait()
+
+    try:
+        await asyncio.Event().wait()
+    finally:
+        await _client.aclose()