diff --git a/src/ptscrapper/scheduler.py b/src/ptscrapper/scheduler.py index c28a2d2..66267d4 100644 --- a/src/ptscrapper/scheduler.py +++ b/src/ptscrapper/scheduler.py @@ -32,13 +32,12 @@ from .storage import ( push_to_nas, write_course_geometry, write_course_posts, - write_positions_batched, + write_positions, write_posts_lines, write_stops, ) logger = logging.getLogger("ptscrapper.scheduler") -_client: httpx.AsyncClient = None # Path to our local SQLite state database STATE_DB = os.path.join(LOCAL_DATA_DIR, "state.db") @@ -112,8 +111,9 @@ def chunked(iterable, size): async def positions_job(): fetch_ts = datetime.now(timezone.utc) # when *we* fetched + client = get_async_client() try: - data = await fetch_positions(_client) + data = await fetch_positions(client) # 2) API‐reported timestamp server_ts_str, *positions = data # server_ts is local to Warsaw @@ -134,11 +134,13 @@ async def positions_job(): table = pa.Table.from_pylist(enriched) # 4) pass both timestamps through - write_positions_batched(table, server_ts) + write_positions(table, server_ts) logger.info("wrote positions shard for %s", server_ts) except Exception: # record error details logger.exception("job=positions_job") + finally: + await client.aclose() # schedule course fetches without awaiting for cid in {p["c"] for p in positions}: @@ -147,10 +149,14 @@ async def positions_job(): await course_queue.put((cid, server_ts, 0)) -async def fetch_and_store_course(course_id: int, ts: datetime): +async def fetch_and_store_course(course_id: int, ts: datetime, client=None): """Fetch one course’s details and store them.""" + own_client = False + if client is None: + client = get_async_client() + own_client = True try: - recs = await fetch_course_posts(_client, [course_id]) + recs = await fetch_course_posts(client, [course_id]) # now do your I/O; catch+log only errors here for rec in recs: rows = [] @@ -180,13 +186,17 @@ async def fetch_and_store_course(course_id: int, ts: datetime): if isinstance(e, ConnectTimeout): raise logger.exception("job=course_posts_job") + finally: + if own_client: + await client.aclose() async def daily_snapshot_job(): ts = datetime.now(timezone.utc) + client = get_async_client() try: # stops - posts = await fetch_posts(_client) + posts = await fetch_posts(client) rows = [] for grp in posts: # grp: { "n": name, "s": group_id, "t": group_type, "p": [ {s,x,y,t}, … ] } @@ -208,7 +218,7 @@ async def daily_snapshot_job(): push_to_nas(fp) # lines per stop - posts_lines = await fetch_posts_lines(_client) + posts_lines = await fetch_posts_lines(client) table2 = pa.Table.from_pylist(posts_lines) fp2 = write_posts_lines(table2, ts) push_to_nas(fp2) @@ -216,6 +226,8 @@ async def daily_snapshot_job(): logger.info("wrote daily snapshot for %s", ts) except Exception: logger.exception("job=daily_snapshot_job") + finally: + await client.aclose() async def daily_compact_and_push(): @@ -305,9 +317,6 @@ async def start_scheduler(): """Initialize state DB, schedule all jobs, and start the loop.""" init_db() - global _client - _client = get_async_client() - # spin up N workers before scheduling jobs for i in range(COURSE_WORKER_COUNT): asyncio.create_task(course_worker(f"worker-{i+1}")) @@ -331,8 +340,4 @@ async def start_scheduler(): ) scheduler.start() - - try: - await asyncio.Event().wait() - finally: - await _client.aclose() + await asyncio.Event().wait() diff --git a/src/ptscrapper/storage.py b/src/ptscrapper/storage.py index 710abcb..aa22a25 100644 --- a/src/ptscrapper/storage.py +++ b/src/ptscrapper/storage.py @@ -2,7 +2,6 @@ import glob import os -import threading from datetime import date, datetime import geoarrow.pyarrow as ga @@ -19,64 +18,29 @@ from .config import ( # base path on the remote, e.g. "pt-scraper-data" RCLONE_REMOTE_PATH, ) -# module‐level state for our rolling 10-min writer -_positions_writer = None # pq.ParquetWriter -_positions_batch = None # datetime for current batch start -_positions_path = None # where we’re writing right now -_positions_lock = threading.Lock() - def _ensure_dir(path: str): """Make sure the directory exists.""" os.makedirs(path, exist_ok=True) -def write_positions_batched(table: pa.Table, server_ts: datetime) -> None: +def write_positions( + table: pa.Table, + server_ts: datetime, +) -> str: """ - Append `table` into a rolling Parquet file, one file per 10-minute window. - When the window rolls over, we close+push the old file and start a new one. + Write out the raw positions (including x,y, server_ts, fetched_ts). + Geometry will be added later during daily compaction. """ - global _positions_writer, _positions_batch, _positions_path - # compute the ten-minute bucket start (floor to minute//10 * 10) - batch_start = server_ts.replace( - minute=(server_ts.minute // 10) * 10, second=0, microsecond=0 - ) + subdir = server_ts.strftime("positions/%Y/%m/%d/%H") + local_dir = os.path.join(LOCAL_DATA_DIR, subdir) + _ensure_dir(local_dir) - with _positions_lock: - # if we’ve moved into a new bucket, close & push the old - if _positions_batch is None or batch_start != _positions_batch: - if _positions_writer is not None: - _positions_writer.close() - # push the completed file - rel = os.path.relpath(_positions_path, LOCAL_DATA_DIR).replace( - os.sep, "/" - ) - rclone.copy( - _positions_path, - f"{RCLONE_REMOTE}:{RCLONE_REMOTE_PATH}/{rel}", - ignore_existing=True, - ) - - # start a new writer for the new bucket - subdir = batch_start.strftime("positions/%Y/%m/%d/%H") - local_dir = os.path.join(LOCAL_DATA_DIR, subdir) - _ensure_dir(local_dir) - fn = f"{batch_start.strftime('%M%S')}_{HOST_ID}.parquet" - _positions_path = os.path.join(local_dir, fn) - _positions_writer = pq.ParquetWriter( - _positions_path, - table.schema, - compression="snappy", - use_dictionary=False, - write_statistics=False, - ) - _positions_batch = batch_start - - # write this 10-s table into the current bucket’s file - _positions_writer.write_table(table) - - # no return path: we push only on rollover + filename = f"{server_ts.strftime('%M%S')}_{HOST_ID}.parquet" + local_path = os.path.join(local_dir, filename) + pq.write_table(table, local_path, compression="snappy") + return local_path def write_course_posts(table: pa.Table, course_id: int, ts: datetime) -> str: