# src/ptscraper/storage.py
import glob
import os
import threading
from datetime import date, datetime

import geoarrow.pyarrow as ga
import polyline
import pyarrow as pa
import pyarrow.dataset as ds
import pyarrow.parquet as pq
from rclone_python import rclone

from .config import HOST_ID  # unique identifier for this host/container
from .config import LOCAL_DATA_DIR  # base local folder, e.g. "/data/pt-scraper"
from .config import RCLONE_REMOTE  # name of your rclone remote, e.g. "nas"
from .config import (  # base path on the remote, e.g. "pt-scraper-data"
    RCLONE_REMOTE_PATH,
)

# a fixed schema for every positions batch
POSITIONS_SCHEMA = pa.schema(
    [
        ("v", pa.int64()),
        ("c", pa.int64()),
        ("x", pa.float64()),
        ("y", pa.float64()),
        ("l", pa.string()),
        ("t", pa.string()),
        ("s", pa.string()),
        ("d", pa.string()),
        ("e", pa.int64()),
        ("server_ts", pa.timestamp("ns", tz="Europe/Warsaw")),
        ("fetched_ts", pa.timestamp("ns", tz="UTC")),
    ]
)

# module-level rolling writer state
_positions_lock = threading.Lock()
_positions_writer = None  # pq.ParquetWriter for the current shard
_positions_bucket = None  # datetime start of the current 10-minute bucket
_positions_path = None  # path to the current .parquet file


def _ensure_dir(path: str):
    """Make sure the directory exists."""
    os.makedirs(path, exist_ok=True)


def _rollover_writer(server_ts):
    """
    Close the previous 10-minute shard (if any) and open a new ParquetWriter
    for the bucket that contains `server_ts`.
    """
    global _positions_writer, _positions_bucket, _positions_path

    # floor to the ten-minute boundary
    bucket = server_ts.replace(
        minute=(server_ts.minute // 10) * 10, second=0, microsecond=0
    )
    if bucket != _positions_bucket:
        # -- close the previous shard
        if _positions_writer is not None:
            _positions_writer.close()

        # -- open a new writer
        subdir = bucket.strftime("positions/%Y/%m/%d/%H")
        local_dir = os.path.join(LOCAL_DATA_DIR, subdir)
        _ensure_dir(local_dir)
        fn = f"{bucket.strftime('%M%S')}_{HOST_ID}.parquet"
        _positions_path = os.path.join(local_dir, fn)
        _positions_writer = pq.ParquetWriter(
            _positions_path,
            schema=POSITIONS_SCHEMA,
            compression="none",  # raw shards stay uncompressed; nightly compaction rewrites with snappy
            use_dictionary=False,
            write_statistics=False,
        )
        _positions_bucket = bucket


def write_positions_rb(positions: list[dict], server_ts, fetched_ts) -> None:
    """
    Append the incoming `positions` (list of dicts) into the current
    10-minute Parquet shard as a single RecordBatch.
    """
    global _positions_writer
    with _positions_lock:
        _rollover_writer(server_ts)

        # build one array per field; column order must match POSITIONS_SCHEMA
        n = len(positions)
        arrays = [
            pa.array([p["v"] for p in positions], type=pa.int64()),
            pa.array([p["c"] for p in positions], type=pa.int64()),
            pa.array([p["x"] for p in positions], type=pa.float64()),
            pa.array([p["y"] for p in positions], type=pa.float64()),
            pa.array([p["l"] for p in positions], type=pa.string()),
            pa.array([p["t"] for p in positions], type=pa.string()),
            pa.array([p["s"] for p in positions], type=pa.string()),
            pa.array([p["d"] for p in positions], type=pa.string()),
            pa.array([p["e"] for p in positions], type=pa.int64()),
            pa.array([server_ts] * n, type=POSITIONS_SCHEMA.field("server_ts").type),
            pa.array([fetched_ts] * n, type=POSITIONS_SCHEMA.field("fetched_ts").type),
        ]
        batch = pa.RecordBatch.from_arrays(arrays, schema=POSITIONS_SCHEMA)
        _positions_writer.write_batch(batch)


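# Usage sketch (illustrative only, not called from this module): a polling loop
# elsewhere in the scraper is assumed to call write_positions_rb() once per fetch,
# passing the decoded vehicle rows plus the server/fetch timestamps. The
# fetch_vehicle_positions() helper below is a hypothetical name for that caller.
#
#   from datetime import datetime, timezone
#   from zoneinfo import ZoneInfo
#
#   rows = fetch_vehicle_positions()                      # hypothetical scraper call
#   server_ts = datetime.now(ZoneInfo("Europe/Warsaw"))
#   fetched_ts = datetime.now(timezone.utc)
#   write_positions_rb(rows, server_ts, fetched_ts)

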
""" subdir = ts.strftime("courses/%Y-%m-%d") local_dir = os.path.join(LOCAL_DATA_DIR, subdir) _ensure_dir(local_dir) filename = f"course_{course_id}_{ts.strftime('%H%M%S')}_{HOST_ID}.parquet" local_path = os.path.join(local_dir, filename) pq.write_table(table, local_path, compression="snappy") return local_path def write_stops(table: pa.Table, snapshot_dt: datetime) -> str: """ Take a flat table with columns fetch_date, group_name, group_id, group_type, stop_id, stop_type, x, y and convert x/y → a WKB Point geometry, drop the raw coords, and write GeoParquet. """ # 1) pull x,y into numpy xs = table.column("x").to_numpy() ys = table.column("y").to_numpy() # 2) build PointType with CRS pt_builder = ga.point().with_crs("EPSG:4326") # 3) build WKB geometry geom = ga.as_wkb(pt_builder.from_geobuffers(None, xs, ys)) # 4) drop coords & append geometry column table = table.drop(["x", "y"]).append_column("geometry", geom) # 5) write out subdir = snapshot_dt.strftime("stops/%Y-%m-%d") local_dir = os.path.join(LOCAL_DATA_DIR, subdir) _ensure_dir(local_dir) filename = f"stops_{HOST_ID}.parquet" out_path = os.path.join(local_dir, filename) pq.write_table(table, out_path, compression="snappy") return out_path def write_posts_lines(table: pa.Table, snapshot_dt: datetime) -> str: """ Write the daily stop→lines mapping (getPostsLines). """ # ensure every row gets a fetch_date if "fetch_date" not in table.column_names: table = table.append_column( "fetch_date", pa.array([snapshot_dt] * table.num_rows, type=pa.timestamp("ns", tz="UTC")), ) subdir = snapshot_dt.strftime("stops_lines/%Y-%m-%d") local_dir = os.path.join(LOCAL_DATA_DIR, subdir) _ensure_dir(local_dir) local_path = os.path.join(local_dir, f"posts_lines_{HOST_ID}.parquet") pq.write_table(table, local_path, compression="snappy") return local_path def push_to_nas(local_path: str): """ Push a local file to the configured rclone remote, preserving its relative path under RCLONE_REMOTE_PATH. """ # Compute the path on the remote rel_path = os.path.relpath(local_path, LOCAL_DATA_DIR).replace(os.sep, "/") remote_uri = f"{RCLONE_REMOTE}:{RCLONE_REMOTE_PATH}/{rel_path}" # Copy, skipping files that already exist remotely rclone.copy(local_path, remote_uri, ignore_existing=True) def compact_positions(date_to_compact: date) -> str: """ Read all small Parquet under positions/YYYY/MM/DD, merge into one table, write a single daily file. Returns the path to that daily Parquet. """ day_dir = os.path.join(LOCAL_DATA_DIR, f"positions/{date_to_compact:%Y/%m/%d}") if not os.path.isdir(day_dir): # nothing to compact return "" dataset = ds.dataset(day_dir, format="parquet", partitioning="hive") table = dataset.to_table() # merges all batches # write one daily file out_dir = os.path.join(LOCAL_DATA_DIR, "positions_compacted") os.makedirs(out_dir, exist_ok=True) out_path = os.path.join(out_dir, f"{date_to_compact.isoformat()}_{HOST_ID}.parquet") pq.write_table(table, out_path, compression="snappy") return out_path def compact_and_geowkb_positions(date_to_compact: date) -> str: """ Read yesterday’s raw Parquets (with x,y,server_ts,fetched_ts), build a single Big Table, convert x/y → WKB Point, drop x,y, append geometry, and write out a GeoParquet. 
""" import geoarrow.pyarrow as ga # find the directory of raw shards day_dir = os.path.join(LOCAL_DATA_DIR, f"positions/{date_to_compact:%Y/%m/%d}") if not os.path.isdir(day_dir): return "" # merge all raw shards dataset = ds.dataset(day_dir, format="parquet", partitioning="hive") raw = dataset.to_table() # extract coords into numpy xs = raw.column("x").to_numpy(zero_copy_only=False) ys = raw.column("y").to_numpy(zero_copy_only=False) # build WKB geometry pt = ga.point().with_crs("EPSG:4326") wkb = ga.as_wkb(pt.from_geobuffers(None, xs, ys)) # drop x,y and append geometry enriched = raw.drop(["x", "y"]).append_column("geometry", wkb) # write out one big geo-parquet out_dir = os.path.join(LOCAL_DATA_DIR, "positions_geocompacted") os.makedirs(out_dir, exist_ok=True) out_path = os.path.join(out_dir, f"{date_to_compact.isoformat()}_{HOST_ID}.parquet") pq.write_table(enriched, out_path, compression="snappy") return out_path def cleanup_small_positions(date_to_remove: date): """ Optionally delete the raw minute files once compacted. """ base = os.path.join(LOCAL_DATA_DIR, f"positions/{date_to_remove:%Y/%m/%d}") for path in glob.glob(f"{base}/**/*.parquet", recursive=True): try: os.remove(path) except OSError: pass def compact_course_posts(date_to_compact: date) -> str: """ Read all shards under courses/YYYY-MM-DD/*.parquet, merge into one table, write a single daily file. """ day_dir = os.path.join(LOCAL_DATA_DIR, f"courses/{date_to_compact:%Y-%m-%d}") if not os.path.isdir(day_dir): return "" dataset = ds.dataset(day_dir, format="parquet") table = dataset.to_table() out_dir = os.path.join(LOCAL_DATA_DIR, "courses_compacted") os.makedirs(out_dir, exist_ok=True) out_path = os.path.join(out_dir, f"{date_to_compact.isoformat()}_{HOST_ID}.parquet") pq.write_table(table, out_path, compression="snappy") return out_path def cleanup_small_course_posts(date_to_remove: date): """ Delete the original course shards once compacted. """ base = os.path.join(LOCAL_DATA_DIR, f"courses/{date_to_remove:%Y-%m-%d}") for fn in glob.glob(f"{base}/*.parquet"): try: os.remove(fn) except OSError: pass def write_course_geometry(polyline_str: str, course_id: int, ts: datetime) -> str: """ Decode the Google polyline into a GeoArrow LineString WKB, attach EPSG:4326, and write out a GeoParquet. """ # 1) Decode → list of (lon, lat) coords = polyline.decode(polyline_str, geojson=True) # 2) Build a WKT LINESTRING pts = ", ".join(f"{x} {y}" for x, y in coords) wkt = f"LINESTRING({pts})" # 3) Convert that WKT to a WKB ExtensionArray # as_wkb() will parse each WKT string into WKB wkb_arr = ga.as_wkb([wkt]) # 4) Cast into a WkbType with EPSG:4326 built in wkb_with_crs = wkb_arr.cast(ga.wkb().with_crs("EPSG:4326")) # 5) Build a 1-row table table = pa.Table.from_arrays( [pa.array([ts]), pa.array([course_id]), wkb_with_crs], names=["fetch_date", "course_id", "geometry"], ) # 6) Write out the GeoParquet subdir = ts.strftime("courses_geometry/%Y-%m-%d") out_dir = os.path.join(LOCAL_DATA_DIR, subdir) os.makedirs(out_dir, exist_ok=True) fn = f"shape_{course_id}_{ts.strftime('%H%M%S')}.parquet" out_path = os.path.join(out_dir, fn) pq.write_table(table, out_path, compression="snappy") return out_path def compact_course_geometry(date_to_compact: date) -> str: """ Read all geometry shards under courses_geometry/YYYY-MM-DD/*.parquet, merge into one table, write a single daily file. Returns the path to that daily Parquet. 
""" day_dir = os.path.join( LOCAL_DATA_DIR, f"courses_geometry/{date_to_compact:%Y-%m-%d}" ) if not os.path.isdir(day_dir): return "" dataset = ds.dataset(day_dir, format="parquet") table = dataset.to_table() out_dir = os.path.join(LOCAL_DATA_DIR, "courses_geometry_compacted") os.makedirs(out_dir, exist_ok=True) out_path = os.path.join(out_dir, f"{date_to_compact.isoformat()}_{HOST_ID}.parquet") pq.write_table(table, out_path, compression="snappy") return out_path def cleanup_small_course_geometry(date_to_remove: date): """ Delete the original course-geometry shards once compacted. """ base = os.path.join(LOCAL_DATA_DIR, f"courses_geometry/{date_to_remove:%Y-%m-%d}") for fn in glob.glob(f"{base}/*.parquet"): try: os.remove(fn) except OSError: pass