initial commit

This commit is contained in:
Piotr Oleszczyk 2025-05-10 17:41:31 +02:00
commit c6d1d51b00
11 changed files with 1214 additions and 0 deletions

281
src/ptscrapper/scheduler.py Normal file
View file

@ -0,0 +1,281 @@
# src/ptscrapper/scheduler.py
import asyncio
import logging
import os
import sqlite3
from datetime import date, datetime, timedelta, timezone
from itertools import islice
import pyarrow as pa
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.triggers.cron import CronTrigger
from .config import LOCAL_DATA_DIR
from .fetchers import (
fetch_course_posts,
fetch_positions,
fetch_posts,
fetch_posts_lines,
get_async_client,
)
from .storage import (
cleanup_small_course_geometry,
cleanup_small_course_posts,
cleanup_small_positions,
compact_course_geometry,
compact_course_posts,
compact_positions,
push_to_nas,
write_course_geometry,
write_course_posts,
write_positions,
write_posts_lines,
write_stops,
)
# Module-level logger; name matches the package path for log filtering.
logger = logging.getLogger("ptscrapper.scheduler")
# Path to our local SQLite state database
STATE_DB = os.path.join(LOCAL_DATA_DIR, "state.db")
# Ensure the local data directory exists before any job touches it.
os.makedirs(LOCAL_DATA_DIR, exist_ok=True)
def init_db(db_path=None):
    """Create the courses state table if it doesn't exist.

    Args:
        db_path: Optional path to the SQLite database file; defaults to
            the module-level STATE_DB. Exposed mainly for testing.
    """
    conn = sqlite3.connect(db_path if db_path is not None else STATE_DB)
    try:
        conn.execute(
            """
            CREATE TABLE IF NOT EXISTS courses (
                course_id INTEGER PRIMARY KEY,
                last_fetched DATE
            )
            """
        )
        conn.commit()
    finally:
        # Close even when table creation raises, so the handle never leaks.
        conn.close()
def record_course_fetched(course_id: int, fetched_dt: datetime, db_path=None):
    """Upsert the last_fetched date for a course.

    Args:
        course_id: Course identifier (primary key of the courses table).
        fetched_dt: Fetch timestamp; only its date part is stored, as an
            ISO-format string.
        db_path: Optional SQLite path override; defaults to STATE_DB.
    """
    conn = sqlite3.connect(db_path if db_path is not None else STATE_DB)
    try:
        conn.execute(
            """
            INSERT INTO courses(course_id, last_fetched)
            VALUES (?, ?)
            ON CONFLICT(course_id) DO UPDATE SET last_fetched=excluded.last_fetched
            """,
            (course_id, fetched_dt.date().isoformat()),
        )
        conn.commit()
    finally:
        # Close even on failure so the connection never leaks.
        conn.close()
def should_fetch_course(course_id: int, db_path=None) -> bool:
    """Return True if we have not fetched this course today.

    Courses with no recorded fetch (no row) are always fetched.

    Args:
        course_id: Course identifier to look up.
        db_path: Optional SQLite path override; defaults to STATE_DB.
    """
    conn = sqlite3.connect(db_path if db_path is not None else STATE_DB)
    try:
        row = conn.execute(
            "SELECT last_fetched FROM courses WHERE course_id = ?", (course_id,)
        ).fetchone()
    finally:
        # Close even if the query raises, so the handle never leaks.
        conn.close()
    if row is None:
        return True
    # NOTE(review): this compares against the *local* date while fetch
    # timestamps elsewhere are UTC — near midnight the two can disagree;
    # confirm which is intended.
    return date.fromisoformat(row[0]) < date.today()
def chunked(iterable, size):
    """Yield successive chunks of `size` from `iterable`.

    The final chunk may be shorter than `size`; an empty iterable
    yields nothing.
    """
    source = iter(iterable)
    while piece := list(islice(source, size)):
        yield piece
async def positions_job():
    """Fetch current vehicle positions, write them locally, and trigger
    an immediate detail fetch for any course not yet fetched today.
    """
    # Fallback timestamp in case parsing below never runs.
    ts = datetime.now(timezone.utc)
    client = get_async_client()
    try:
        payload = await fetch_positions(client)
        # The first element of the payload is an ISO timestamp string;
        # the remaining elements are position records.
        stamp, *records = payload
        ts = datetime.fromisoformat(stamp)
        # Persist this shard locally only (NAS push happens elsewhere).
        write_positions(pa.Table.from_pylist(records), ts)
        # Kick off a detail fetch for courses we haven't seen today.
        for cid in {rec["c"] for rec in records}:
            if should_fetch_course(cid):
                await fetch_and_store_course(cid, ts, client)
        logger.info("wrote positions shard for %s", ts)
    except Exception:
        # Record full traceback; the scheduler keeps running.
        logger.exception("job=positions_job")
    finally:
        await client.aclose()
async def fetch_and_store_course(course_id: int, ts: datetime, client=None):
    """Fetch one course's details and store them.

    Args:
        course_id: Course to fetch.
        ts: Timestamp recorded against the stored rows.
        client: Optional shared HTTP client; when None a private one is
            created and closed before returning.
    """
    owns_client = client is None
    if owns_client:
        client = get_async_client()
    try:
        for rec in await fetch_course_posts(client, [course_id]):
            # Flatten the route stops into one row per stop.
            stop_rows = [
                {
                    "fetch_date": ts,
                    "course_id": rec["c"],
                    "stop_id": stop["s"],
                    "dep_time": stop["t"],
                }
                for stop in rec.get("r", [])
            ]
            write_course_posts(pa.Table.from_pylist(stop_rows), rec["c"], ts)
            record_course_fetched(rec["c"], ts)
            write_course_geometry(rec["p"], rec["c"], ts)
        logger.info("wrote course %s", course_id)
    except Exception:
        logger.exception("job=course_posts_job")
    finally:
        # Only close a client we created ourselves.
        if owns_client:
            await client.aclose()
async def daily_snapshot_job():
    """Write the daily snapshot of stops and per-stop lines, pushing
    each file to the NAS as soon as it is written.
    """
    ts = datetime.now(timezone.utc)
    client = get_async_client()
    try:
        # Snapshot of all stops.
        stops = await fetch_posts(client)
        push_to_nas(write_stops(pa.Table.from_pylist(stops), ts))
        # Snapshot of the lines serving each stop.
        stop_lines = await fetch_posts_lines(client)
        push_to_nas(write_posts_lines(pa.Table.from_pylist(stop_lines), ts))
        logger.info("wrote daily snapshot for %s", ts)
    except Exception:
        logger.exception("job=daily_snapshot_job")
    finally:
        await client.aclose()
async def daily_course_sweep_job():
    """Once-a-day refetch of all known courses to detect changes."""
    ts = datetime.now(timezone.utc)
    # Collect every course id we have ever recorded.
    conn = sqlite3.connect(STATE_DB)
    known_ids = [r[0] for r in conn.execute("SELECT course_id FROM courses")]
    conn.close()
    client = get_async_client()
    try:
        # Fetch in batches of 10; a failed batch is logged and skipped so
        # the sweep continues with the remaining courses.
        for batch in chunked(known_ids, 10):
            try:
                for rec in await fetch_course_posts(client, batch):
                    # Flatten & store, same shape as fetch_and_store_course.
                    stop_rows = [
                        {
                            "fetch_date": ts,
                            "course_id": rec["c"],
                            "stop_id": stop["s"],
                            "dep_time": stop["t"],
                        }
                        for stop in rec.get("r", [])
                    ]
                    write_course_posts(pa.Table.from_pylist(stop_rows), rec["c"], ts)
                    record_course_fetched(rec["c"], ts)
                    write_course_geometry(rec["p"], rec["c"], ts)
                    logger.info("wrote course %s during daily sweep", rec["c"])
            except Exception:
                logger.exception("job=daily_course_sweep_job")
    finally:
        await client.aclose()
async def daily_compact_and_push():
    """Compact yesterday's position shards into one file, push it to the
    NAS, then delete the small local shards to reclaim disk space.
    """
    target_day = date.today() - timedelta(days=1)
    try:
        merged = compact_positions(target_day)
        push_to_nas(merged)
        # Reclaim local space now that the merged file is on the NAS.
        cleanup_small_positions(target_day)
        logger.info("wrote positions compacted file for %s", target_day)
    except Exception:
        logger.exception("job=daily_compact_and_push_job")
async def daily_compact_and_push_courses():
    """Merge yesterday's course-posts shards, push the result to the
    NAS, then clean up the small local files.
    """
    target_day = date.today() - timedelta(days=1)
    try:
        merged = compact_course_posts(target_day)
        push_to_nas(merged)
        cleanup_small_course_posts(target_day)
        logger.info("wrote course compacted file for %s", target_day)
    except Exception:
        logger.exception("job=daily_compact_and_push_courses_job")
async def daily_compact_and_push_course_geometry():
    """Merge yesterday's course-geometry shards, push the result to the
    NAS, then clean up the small local files.
    """
    target_day = date.today() - timedelta(days=1)
    try:
        merged = compact_course_geometry(target_day)
        push_to_nas(merged)
        cleanup_small_course_geometry(target_day)
        logger.info("wrote course geometry compacted file for %s", target_day)
    except Exception:
        logger.exception("job=daily_compact_and_push_course_geometry_job")
async def start_scheduler():
    """Initialize state DB, schedule all jobs, and start the loop."""
    init_db()
    scheduler = AsyncIOScheduler()
    # High-frequency positions poll; next_run_time=now makes the first
    # run fire immediately instead of waiting one interval.
    scheduler.add_job(
        positions_job,
        trigger="interval",
        seconds=10,
        next_run_time=datetime.now(timezone.utc),
    )
    # Daily jobs, staggered an hour apart (hours given in UTC).
    daily_schedule = (
        (daily_snapshot_job, 2),
        (daily_course_sweep_job, 3),
        (daily_compact_and_push, 4),
        (daily_compact_and_push_courses, 5),
        (daily_compact_and_push_course_geometry, 6),
    )
    for job, hour in daily_schedule:
        scheduler.add_job(job, CronTrigger(hour=hour, minute=0))
    scheduler.start()
    # Park this coroutine forever so the event loop (and scheduler) stays up.
    await asyncio.Event().wait()