# src/ptscraper/scheduler.py
"""Job scheduling for the ptscraper pipeline.

Responsibilities:
- poll live vehicle positions every 10 seconds and write local shards;
- lazily fetch per-course details the first time a course is seen each day,
  tracked in a small SQLite state database;
- run daily jobs that snapshot stops/lines, re-sweep all known courses,
  and compact + push the previous day's shards to the NAS.

All timestamps are timezone-aware UTC; date bookkeeping therefore also
uses the UTC calendar (see ``_utc_today``).
"""

import asyncio
import logging
import os
import sqlite3
from datetime import date, datetime, timedelta, timezone
from itertools import islice

import pyarrow as pa
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.triggers.cron import CronTrigger

from .config import LOCAL_DATA_DIR
from .fetchers import (
    fetch_course_posts,
    fetch_positions,
    fetch_posts,
    fetch_posts_lines,
    get_async_client,
)
from .storage import (
    cleanup_small_course_geometry,
    cleanup_small_course_posts,
    cleanup_small_positions,
    compact_course_geometry,
    compact_course_posts,
    compact_positions,
    push_to_nas,
    write_course_geometry,
    write_course_posts,
    write_positions,
    write_posts_lines,
    write_stops,
)

# FIX: the logger name was misspelled "ptscrapper.scheduler" (double "p"),
# which placed it outside the "ptscraper" logging hierarchy configured for
# the rest of the package.
logger = logging.getLogger("ptscraper.scheduler")

# Path to our local SQLite state database
STATE_DB = os.path.join(LOCAL_DATA_DIR, "state.db")
os.makedirs(LOCAL_DATA_DIR, exist_ok=True)


def _utc_today() -> date:
    """Return today's date on the UTC calendar.

    Every timestamp in this module comes from ``datetime.now(timezone.utc)``,
    so date comparisons must use the same clock. The original code mixed in
    the server-local ``date.today()``, which disagrees with the stored UTC
    dates around midnight on non-UTC hosts.
    """
    return datetime.now(timezone.utc).date()


def init_db() -> None:
    """Create the courses state table if it doesn't exist."""
    conn = sqlite3.connect(STATE_DB)
    try:
        conn.execute(
            """
            CREATE TABLE IF NOT EXISTS courses (
                course_id INTEGER PRIMARY KEY,
                last_fetched DATE
            )
            """
        )
        conn.commit()
    finally:
        # FIX: close the connection even if the DDL raises (original leaked it).
        conn.close()


def record_course_fetched(course_id: int, fetched_dt: datetime) -> None:
    """Upsert the last_fetched (UTC) date for a course.

    :param course_id: numeric course identifier from the feed.
    :param fetched_dt: timezone-aware fetch timestamp; only its date is kept.
    """
    conn = sqlite3.connect(STATE_DB)
    try:
        conn.execute(
            """
            INSERT INTO courses(course_id, last_fetched) VALUES (?, ?)
            ON CONFLICT(course_id) DO UPDATE SET last_fetched=excluded.last_fetched
            """,
            (course_id, fetched_dt.date().isoformat()),
        )
        conn.commit()
    finally:
        conn.close()


def should_fetch_course(course_id: int) -> bool:
    """Return True if this course has not yet been fetched today (UTC)."""
    conn = sqlite3.connect(STATE_DB)
    try:
        row = conn.execute(
            "SELECT last_fetched FROM courses WHERE course_id = ?", (course_id,)
        ).fetchone()
    finally:
        conn.close()
    if row is None:
        return True
    # FIX: compare against the UTC date, matching how last_fetched is recorded
    # (the original used local date.today()).
    return date.fromisoformat(row[0]) < _utc_today()


def chunked(iterable, size):
    """Yield successive chunks of `size` from `iterable`."""
    it = iter(iterable)
    while True:
        batch = list(islice(it, size))
        if not batch:
            break
        yield batch


def _store_course_record(rec: dict, ts: datetime) -> None:
    """Flatten one course record and persist its posts + geometry.

    Shared by :func:`fetch_and_store_course` and
    :func:`daily_course_sweep_job`, which previously duplicated this logic.
    Record shape (per the feed): ``c`` = course id, ``r`` = route stops
    (each with ``s`` = stop id, ``t`` = departure time), ``p`` = geometry.
    """
    rows = [
        {
            "fetch_date": ts,
            "course_id": rec["c"],
            "stop_id": stop["s"],
            "dep_time": stop["t"],
        }
        for stop in rec.get("r", [])
    ]
    table = pa.Table.from_pylist(rows)
    write_course_posts(table, rec["c"], ts)
    record_course_fetched(rec["c"], ts)
    write_course_geometry(rec["p"], rec["c"], ts)


async def positions_job() -> None:
    """Fetch the live position feed, write a shard, and schedule per-course
    detail fetches for any course not yet seen today.

    Runs every 10 seconds (see :func:`start_scheduler`).
    """
    ts = datetime.now(timezone.utc)
    # FIX: pre-bind so a fetch failure doesn't trigger UnboundLocalError in
    # the scheduling loop below (the original referenced `positions` after
    # the except block had swallowed the fetch error).
    positions: list = []
    client = get_async_client()
    try:
        data = await fetch_positions(client)
        # first element is the feed's own timestamp string
        ts_str, *positions = data
        ts = datetime.fromisoformat(ts_str)
        table = pa.Table.from_pylist(positions)
        # write locally only; daily compaction pushes to the NAS
        write_positions(table, ts)
        logger.info("wrote positions shard for %s", ts)
    except Exception:
        logger.exception("job=positions_job")
    finally:
        await client.aclose()

    # Schedule course fetches without awaiting them; `c` is the course id.
    course_ids = {p["c"] for p in positions}
    for cid in course_ids:
        if should_fetch_course(cid):
            asyncio.create_task(fetch_and_store_course(cid, ts))


async def fetch_and_store_course(course_id: int, ts: datetime, client=None) -> None:
    """Fetch one course's details and store them.

    :param course_id: course to fetch.
    :param ts: fetch timestamp recorded with the data.
    :param client: optional shared HTTP client; when None a dedicated one is
        created and closed here.
    """
    own_client = client is None
    if own_client:
        client = get_async_client()
    try:
        recs = await fetch_course_posts(client, [course_id])
        for rec in recs:
            _store_course_record(rec, ts)
        logger.info("wrote course %s", course_id)
    except Exception:
        logger.exception("job=course_posts_job")
    finally:
        if own_client:
            await client.aclose()


async def daily_snapshot_job() -> None:
    """Snapshot the full stops list and per-stop lines, pushing both to NAS."""
    ts = datetime.now(timezone.utc)
    client = get_async_client()
    try:
        # stops
        posts = await fetch_posts(client)
        stops_table = pa.Table.from_pylist(posts)
        push_to_nas(write_stops(stops_table, ts))
        # lines per stop
        posts_lines = await fetch_posts_lines(client)
        lines_table = pa.Table.from_pylist(posts_lines)
        push_to_nas(write_posts_lines(lines_table, ts))
        logger.info("wrote daily snapshot for %s", ts)
    except Exception:
        logger.exception("job=daily_snapshot_job")
    finally:
        await client.aclose()


async def daily_course_sweep_job() -> None:
    """Once-a-day refetch of all known courses to detect changes."""
    ts = datetime.now(timezone.utc)
    conn = sqlite3.connect(STATE_DB)
    try:
        course_ids = [row[0] for row in conn.execute("SELECT course_id FROM courses")]
    finally:
        conn.close()

    client = get_async_client()
    try:
        # fetch in batches of 10; a failing batch doesn't abort the sweep
        for batch in chunked(course_ids, 10):
            try:
                recs = await fetch_course_posts(client, batch)
                for rec in recs:
                    _store_course_record(rec, ts)
                    logger.info("wrote course %s during daily sweep", rec["c"])
            except Exception:
                logger.exception("job=daily_course_sweep_job")
    finally:
        await client.aclose()


async def daily_compact_and_push() -> None:
    """Compact yesterday's (UTC) position shards, push, then clean up."""
    yesterday = _utc_today() - timedelta(days=1)
    try:
        push_to_nas(compact_positions(yesterday))
        # reclaim local space now that the compacted file is on the NAS
        cleanup_small_positions(yesterday)
        logger.info("wrote positions compacted file for %s", yesterday)
    except Exception:
        logger.exception("job=daily_compact_and_push_job")


async def daily_compact_and_push_courses() -> None:
    """Merge yesterday's (UTC) course-posts, push, then clean up."""
    yesterday = _utc_today() - timedelta(days=1)
    try:
        push_to_nas(compact_course_posts(yesterday))
        cleanup_small_course_posts(yesterday)
        logger.info("wrote course compacted file for %s", yesterday)
    except Exception:
        logger.exception("job=daily_compact_and_push_courses_job")


async def daily_compact_and_push_course_geometry() -> None:
    """Merge yesterday's (UTC) course geometries, push, then clean up."""
    yesterday = _utc_today() - timedelta(days=1)
    try:
        push_to_nas(compact_course_geometry(yesterday))
        cleanup_small_course_geometry(yesterday)
        logger.info("wrote course geometry compacted file for %s", yesterday)
    except Exception:
        logger.exception("job=daily_compact_and_push_course_geometry_job")


async def start_scheduler() -> None:
    """Initialize the state DB, register all jobs, and run forever."""
    init_db()
    scheduler = AsyncIOScheduler()
    # every 10 seconds for positions; next_run_time makes the first run immediate
    scheduler.add_job(
        positions_job,
        trigger="interval",
        seconds=10,
        next_run_time=datetime.now(timezone.utc),
    )
    # daily data jobs, staggered hourly (UTC) so each finishes before the next
    scheduler.add_job(daily_snapshot_job, CronTrigger(hour=2, minute=0))
    scheduler.add_job(daily_course_sweep_job, CronTrigger(hour=3, minute=0))
    scheduler.add_job(daily_compact_and_push, CronTrigger(hour=4, minute=0))
    scheduler.add_job(daily_compact_and_push_courses, CronTrigger(hour=5, minute=0))
    scheduler.add_job(
        daily_compact_and_push_course_geometry, CronTrigger(hour=6, minute=0)
    )
    scheduler.start()
    # block forever; APScheduler drives the registered coroutines
    await asyncio.Event().wait()