diff --git a/src/ptscrapper/scheduler.py b/src/ptscrapper/scheduler.py
index bdc8f91..c28a2d2 100644
--- a/src/ptscrapper/scheduler.py
+++ b/src/ptscrapper/scheduler.py
@@ -32,7 +32,7 @@ from .storage import (
     push_to_nas,
     write_course_geometry,
     write_course_posts,
-    write_positions,
+    write_positions_batched,
     write_posts_lines,
     write_stops,
 )
@@ -134,7 +134,7 @@ async def positions_job():
         table = pa.Table.from_pylist(enriched)
 
         # 4) pass both timestamps through
-        write_positions(table, server_ts)
+        write_positions_batched(table, server_ts)
         logger.info("wrote positions shard for %s", server_ts)
     except Exception:
         # record error details
diff --git a/src/ptscrapper/storage.py b/src/ptscrapper/storage.py
index aa22a25..710abcb 100644
--- a/src/ptscrapper/storage.py
+++ b/src/ptscrapper/storage.py
@@ -2,6 +2,8 @@
 
+import atexit
 import glob
 import os
+import threading
 from datetime import date, datetime
 
 import geoarrow.pyarrow as ga
@@ -18,29 +19,94 @@ from .config import (
     # base path on the remote, e.g. "pt-scraper-data"
     RCLONE_REMOTE_PATH,
 )
 
+# module-level state for the rolling 10-min positions writer
+_positions_writer = None  # pq.ParquetWriter for the current bucket
+_positions_batch = None  # datetime marking the current bucket's start
+_positions_path = None  # local path of the file being written right now
+_positions_lock = threading.Lock()  # guards all of the state above
 
 def _ensure_dir(path: str):
     """Make sure the directory exists."""
     os.makedirs(path, exist_ok=True)
 
 
-def write_positions(
-    table: pa.Table,
-    server_ts: datetime,
-) -> str:
+def write_positions_batched(table: pa.Table, server_ts: datetime) -> None:
     """
-    Write out the raw positions (including x,y, server_ts, fetched_ts).
-    Geometry will be added later during daily compaction.
+    Append `table` into a rolling Parquet file, one file per 10-minute window.
+    When the window rolls over, we close+push the old file and start a new one.
     """
+    global _positions_writer, _positions_batch, _positions_path
 
-    subdir = server_ts.strftime("positions/%Y/%m/%d/%H")
-    local_dir = os.path.join(LOCAL_DATA_DIR, subdir)
-    _ensure_dir(local_dir)
+    # compute the ten-minute bucket start (floor to minute//10 * 10)
+    batch_start = server_ts.replace(
+        minute=(server_ts.minute // 10) * 10, second=0, microsecond=0
+    )
-    filename = f"{server_ts.strftime('%M%S')}_{HOST_ID}.parquet"
-    local_path = os.path.join(local_dir, filename)
-    pq.write_table(table, local_path, compression="snappy")
-    return local_path
+    with _positions_lock:
+        # if we've moved into a new bucket, close & push the old
+        if _positions_batch is None or batch_start != _positions_batch:
+            if _positions_writer is not None:
+                _positions_writer.close()
+                # push the completed file
+                rel = os.path.relpath(_positions_path, LOCAL_DATA_DIR).replace(
+                    os.sep, "/"
+                )
+                rclone.copy(
+                    _positions_path,
+                    f"{RCLONE_REMOTE}:{RCLONE_REMOTE_PATH}/{rel}",
+                    ignore_existing=True,
+                )
+
+            # start a new writer for the new bucket
+            subdir = batch_start.strftime("positions/%Y/%m/%d/%H")
+            local_dir = os.path.join(LOCAL_DATA_DIR, subdir)
+            _ensure_dir(local_dir)
+            fn = f"{batch_start.strftime('%M%S')}_{HOST_ID}.parquet"
+            _positions_path = os.path.join(local_dir, fn)
+            # NOTE(review): the writer is pinned to this table's schema for the
+            # whole bucket — later tables must match or write_table raises.
+            _positions_writer = pq.ParquetWriter(
+                _positions_path,
+                table.schema,
+                compression="snappy",
+                use_dictionary=False,
+                write_statistics=False,
+            )
+            _positions_batch = batch_start
+
+        # write this 10-s table into the current bucket's file
+        _positions_writer.write_table(table)
+
+    # no return path: we push only on rollover
+
+
+def flush_positions() -> None:
+    """
+    Close and push the in-flight positions file, if any.
+
+    write_positions_batched() only pushes a bucket when the next bucket
+    starts, so without an explicit flush the final (partial) window would
+    be lost on exit.
+    """
+    global _positions_writer, _positions_batch, _positions_path
+
+    with _positions_lock:
+        if _positions_writer is None:
+            return
+        _positions_writer.close()
+        rel = os.path.relpath(_positions_path, LOCAL_DATA_DIR).replace(os.sep, "/")
+        rclone.copy(
+            _positions_path,
+            f"{RCLONE_REMOTE}:{RCLONE_REMOTE_PATH}/{rel}",
+            ignore_existing=True,
+        )
+        _positions_writer = None
+        _positions_batch = None
+        _positions_path = None
+
+
+# best-effort: push the last bucket when the interpreter exits
+atexit.register(flush_positions)
 
 
 def write_course_posts(table: pa.Table, course_id: int, ts: datetime) -> str: