From a65049fd62950d4001965a447c336ebc8aab1af6 Mon Sep 17 00:00:00 2001 From: Piotr Oleszczyk Date: Mon, 12 May 2025 17:51:48 +0200 Subject: [PATCH] write positions in 10-minute batches --- src/ptscrapper/main.py | 3 + src/ptscrapper/scheduler.py | 21 ++----- src/ptscrapper/storage.py | 111 +++++++++++++++++++++++++++++++----- 3 files changed, 105 insertions(+), 30 deletions(-) diff --git a/src/ptscrapper/main.py b/src/ptscrapper/main.py index 12abacc..a5e5c19 100644 --- a/src/ptscrapper/main.py +++ b/src/ptscrapper/main.py @@ -3,6 +3,7 @@ import asyncio import logging +import pyarrow as pa import uvloop from ptscrapper.scheduler import start_scheduler @@ -14,6 +15,8 @@ try: except ImportError: pass +pa.set_cpu_count(1) + def main(): # 1) Configure the root logger diff --git a/src/ptscrapper/scheduler.py b/src/ptscrapper/scheduler.py index bdc8f91..0833649 100644 --- a/src/ptscrapper/scheduler.py +++ b/src/ptscrapper/scheduler.py @@ -32,7 +32,7 @@ from .storage import ( push_to_nas, write_course_geometry, write_course_posts, - write_positions, + write_positions_rb, write_posts_lines, write_stops, ) @@ -120,22 +120,9 @@ async def positions_job(): server_ts = datetime.fromisoformat(server_ts_str).replace( tzinfo=ZoneInfo("Europe/Warsaw") ) - - # 3) build Arrow table enrich each dict with both timestamps - enriched = [] - for p in positions: - enriched.append( - { - **p, - "server_ts": server_ts, - "fetched_ts": fetch_ts, - } - ) - table = pa.Table.from_pylist(enriched) - - # 4) pass both timestamps through - write_positions(table, server_ts) - logger.info("wrote positions shard for %s", server_ts) + # 3) hand off raw list to our super-light RecordBatchBuilder + write_positions_rb(positions, server_ts, fetch_ts) + logger.info("appended %d rows into 10 min shard %s", len(positions), server_ts) except Exception: # record error details logger.exception("job=positions_job") diff --git a/src/ptscrapper/storage.py b/src/ptscrapper/storage.py index aa22a25..6d7ff76 100644 --- 
a/src/ptscrapper/storage.py +++ b/src/ptscrapper/storage.py @@ -2,6 +2,7 @@ import glob import os +import threading from datetime import date, datetime import geoarrow.pyarrow as ga @@ -9,6 +10,7 @@ import polyline import pyarrow as pa import pyarrow.dataset as ds import pyarrow.parquet as pq +from pyarrow import lib as _lib from rclone_python import rclone from .config import HOST_ID # unique identifier for this host/container @@ -18,29 +20,112 @@ from .config import ( # base path on the remote, e.g. "pt-scraper-data" RCLONE_REMOTE_PATH, ) +# a fixed schema for every positions batch +POSITIONS_SCHEMA = pa.schema( + [ + ("v", pa.int64()), + ("c", pa.int64()), + ("x", pa.float64()), + ("y", pa.float64()), + ("l", pa.string()), + ("t", pa.string()), + ("s", pa.string()), + ("d", pa.string()), + ("e", pa.int64()), + ("server_ts", pa.timestamp("ns", tz="Europe/Warsaw")), + ("fetched_ts", pa.timestamp("ns", tz="UTC")), + ] +) + +# module-level rolling writer state +_positions_lock = threading.Lock() +_positions_writer = None # pq.ParquetWriter +_positions_bucket = None # datetime start of current 10-min +_positions_path = None # path to current .parquet file + def _ensure_dir(path: str): """Make sure the directory exists.""" os.makedirs(path, exist_ok=True) -def write_positions( - table: pa.Table, - server_ts: datetime, -) -> str: +def _rollover_writer(server_ts): """ - Write out the raw positions (including x,y, server_ts, fetched_ts). - Geometry will be added later during daily compaction. + Close the previous bucket's writer (if any) — pushing finished shards + via rclone happens separately — then open a new ParquetWriter for the 10-min bucket of `server_ts`. 
""" + global _positions_writer, _positions_bucket, _positions_path - subdir = server_ts.strftime("positions/%Y/%m/%d/%H") - local_dir = os.path.join(LOCAL_DATA_DIR, subdir) - _ensure_dir(local_dir) + # floor to ten-minute + bucket = server_ts.replace( + minute=(server_ts.minute // 10) * 10, second=0, microsecond=0 + ) - filename = f"{server_ts.strftime('%M%S')}_{HOST_ID}.parquet" - local_path = os.path.join(local_dir, filename) - pq.write_table(table, local_path, compression="snappy") - return local_path + if bucket != _positions_bucket: + # -- close previous shard + if _positions_writer is not None: + _positions_writer.close() + + # -- open new writer + subdir = bucket.strftime("positions/%Y/%m/%d/%H") + local_dir = os.path.join(LOCAL_DATA_DIR, subdir) + _ensure_dir(local_dir) + + fn = f"{bucket.strftime('%M%S')}_{HOST_ID}.parquet" + _positions_path = os.path.join(local_dir, fn) + + _positions_writer = pq.ParquetWriter( + _positions_path, + schema=POSITIONS_SCHEMA, + compression="none", # or "none" if you prefer to compress only at nightly + use_dictionary=False, + write_statistics=False, + ) + _positions_bucket = bucket + + +def write_positions_rb(positions: list[dict], server_ts, fetched_ts) -> None: + """ + Append the incoming `positions` (list of dicts) into the + current 10-minute Parquet shard via a RecordBatchBuilder. 
+ """ + global _positions_writer + with _positions_lock: + _rollover_writer(server_ts) + + # 1) prepare one builder per column in the same order as POSITIONS_SCHEMA + builders = [ + _lib.Int64Builder(), # v + _lib.Int64Builder(), # c + _lib.Float64Builder(), # x + _lib.Float64Builder(), # y + _lib.StringBuilder(), # l + _lib.StringBuilder(), # t + _lib.StringBuilder(), # s + _lib.StringBuilder(), # d + _lib.Int64Builder(), # e + _lib.TimestampBuilder("ns", tz="Europe/Warsaw"), # server_ts + _lib.TimestampBuilder("ns", tz="UTC"), # fetched_ts + ] + + # 2) append into each builder + for p in positions: + builders[0].append(p["v"]) + builders[1].append(p["c"]) + builders[2].append(p["x"]) + builders[3].append(p["y"]) + builders[4].append(p["l"]) + builders[5].append(p["t"]) + builders[6].append(p["s"]) + builders[7].append(p["d"]) + builders[8].append(p["e"]) + builders[9].append(server_ts) + builders[10].append(fetched_ts) + + # 3) finish → arrays, make a RecordBatch, write it + arrays = [b.finish() for b in builders] + batch = pa.RecordBatch.from_arrays(arrays, schema=POSITIONS_SCHEMA) + _positions_writer.write_batch(batch) def write_course_posts(table: pa.Table, course_id: int, ts: datetime) -> str: