use shared httpx client
This commit is contained in:
parent
3469511bd8
commit
4fca094f89
1 changed file with 14 additions and 19 deletions
|
|
@@ -38,6 +38,7 @@ from .storage import (
|
|||
)
|
||||
|
||||
logger = logging.getLogger("ptscrapper.scheduler")
|
||||
_client: httpx.AsyncClient = None
|
||||
|
||||
# Path to our local SQLite state database
|
||||
STATE_DB = os.path.join(LOCAL_DATA_DIR, "state.db")
|
||||
|
|
@@ -111,9 +112,8 @@ def chunked(iterable, size):
|
|||
async def positions_job():
|
||||
fetch_ts = datetime.now(timezone.utc) # when *we* fetched
|
||||
|
||||
client = get_async_client()
|
||||
try:
|
||||
data = await fetch_positions(client)
|
||||
data = await fetch_positions(_client)
|
||||
# 2) API‐reported timestamp
|
||||
server_ts_str, *positions = data
|
||||
# server_ts is local to Warsaw
|
||||
|
|
@@ -139,8 +139,6 @@ async def positions_job():
|
|||
except Exception:
|
||||
# record error details
|
||||
logger.exception("job=positions_job")
|
||||
finally:
|
||||
await client.aclose()
|
||||
|
||||
# schedule course fetches without awaiting
|
||||
for cid in {p["c"] for p in positions}:
|
||||
|
|
@@ -149,14 +147,10 @@ async def positions_job():
|
|||
await course_queue.put((cid, server_ts, 0))
|
||||
|
||||
|
||||
async def fetch_and_store_course(course_id: int, ts: datetime, client=None):
|
||||
async def fetch_and_store_course(course_id: int, ts: datetime):
|
||||
"""Fetch one course’s details and store them."""
|
||||
own_client = False
|
||||
if client is None:
|
||||
client = get_async_client()
|
||||
own_client = True
|
||||
try:
|
||||
recs = await fetch_course_posts(client, [course_id])
|
||||
recs = await fetch_course_posts(_client, [course_id])
|
||||
# now do your I/O; catch+log only errors here
|
||||
for rec in recs:
|
||||
rows = []
|
||||
|
|
@@ -186,17 +180,13 @@ async def fetch_and_store_course(course_id: int, ts: datetime, client=None):
|
|||
if isinstance(e, ConnectTimeout):
|
||||
raise
|
||||
logger.exception("job=course_posts_job")
|
||||
finally:
|
||||
if own_client:
|
||||
await client.aclose()
|
||||
|
||||
|
||||
async def daily_snapshot_job():
|
||||
ts = datetime.now(timezone.utc)
|
||||
client = get_async_client()
|
||||
try:
|
||||
# stops
|
||||
posts = await fetch_posts(client)
|
||||
posts = await fetch_posts(_client)
|
||||
rows = []
|
||||
for grp in posts:
|
||||
# grp: { "n": name, "s": group_id, "t": group_type, "p": [ {s,x,y,t}, … ] }
|
||||
|
|
@@ -218,7 +208,7 @@ async def daily_snapshot_job():
|
|||
push_to_nas(fp)
|
||||
|
||||
# lines per stop
|
||||
posts_lines = await fetch_posts_lines(client)
|
||||
posts_lines = await fetch_posts_lines(_client)
|
||||
table2 = pa.Table.from_pylist(posts_lines)
|
||||
fp2 = write_posts_lines(table2, ts)
|
||||
push_to_nas(fp2)
|
||||
|
|
@@ -226,8 +216,6 @@ async def daily_snapshot_job():
|
|||
logger.info("wrote daily snapshot for %s", ts)
|
||||
except Exception:
|
||||
logger.exception("job=daily_snapshot_job")
|
||||
finally:
|
||||
await client.aclose()
|
||||
|
||||
|
||||
async def daily_compact_and_push():
|
||||
|
|
@@ -317,6 +305,9 @@ async def start_scheduler():
|
|||
"""Initialize state DB, schedule all jobs, and start the loop."""
|
||||
init_db()
|
||||
|
||||
global _client
|
||||
_client = get_async_client()
|
||||
|
||||
# spin up N workers before scheduling jobs
|
||||
for i in range(COURSE_WORKER_COUNT):
|
||||
asyncio.create_task(course_worker(f"worker-{i+1}"))
|
||||
|
|
@@ -340,4 +331,8 @@ async def start_scheduler():
|
|||
)
|
||||
|
||||
scheduler.start()
|
||||
await asyncio.Event().wait()
|
||||
|
||||
try:
|
||||
await asyncio.Event().wait()
|
||||
finally:
|
||||
await _client.aclose()
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue