From 3469511bd8af3a13fdc7c8a30745894f026bc743 Mon Sep 17 00:00:00 2001 From: Piotr Oleszczyk Date: Mon, 12 May 2025 17:00:37 +0200 Subject: [PATCH] move geometry conversion to daily job --- src/ptscrapper/scheduler.py | 17 ++++++++--- src/ptscrapper/storage.py | 58 ++++++++++++++++++++++++------------- 2 files changed, 51 insertions(+), 24 deletions(-) diff --git a/src/ptscrapper/scheduler.py b/src/ptscrapper/scheduler.py index 893694e..66267d4 100644 --- a/src/ptscrapper/scheduler.py +++ b/src/ptscrapper/scheduler.py @@ -25,6 +25,7 @@ from .storage import ( cleanup_small_course_geometry, cleanup_small_course_posts, cleanup_small_positions, + compact_and_geowkb_positions, compact_course_geometry, compact_course_posts, compact_positions, @@ -233,11 +234,19 @@ async def daily_compact_and_push(): # compact yesterday’s positions once/day yesterday = date.today() - timedelta(days=1) try: - big_fp = compact_positions(yesterday) - push_to_nas(big_fp) - # if you want to reclaim local space: + # 1) merge raw minute-shards → one “raw” parquet + raw_fp = compact_positions(yesterday) + if raw_fp: + push_to_nas(raw_fp) + + # 2) build geometry once/day + geo_fp = compact_and_geowkb_positions(yesterday) + if geo_fp: + push_to_nas(geo_fp) + + # 3) clean up raw shards cleanup_small_positions(yesterday) - logger.info("wrote positions compacted file for %s", yesterday) + logger.info("daily compaction complete for %s", yesterday) except Exception: logger.exception("job=daily_compact_and_push_job") diff --git a/src/ptscrapper/storage.py b/src/ptscrapper/storage.py index 3691fc0..aa22a25 100644 --- a/src/ptscrapper/storage.py +++ b/src/ptscrapper/storage.py @@ -29,26 +29,9 @@ def write_positions( server_ts: datetime, ) -> str: """ - Convert x/y to a Point geometry, drop the originals, - and write out a GeoParquet, carrying through - the server_ts and fetched_ts columns. + Write out the raw positions (including x,y, server_ts, fetched_ts). + Geometry will be added later during daily compaction. """ - # 1) Extract x/y as numpy arrays (zero_copy_only=False to ensure a NumPy copy) - xs = table.column("x").to_numpy() - ys = table.column("y").to_numpy() - - # 2) Build a PointType with the correct CRS - builder = ga.point().with_crs("EPSG:4326") - - # 3) Create the geometry array directly from the Arrow buffers - geom = ga.as_wkb( - builder.from_geobuffers(None, xs, ys) - ) # ensure the geometry is in WKB format - - # 4) drop old coords & append 'geometry' - table = table.drop(["x", "y"]).append_column("geometry", geom) - - # (server_ts and fetched_ts are already columns in `table`) subdir = server_ts.strftime("positions/%Y/%m/%d/%H") local_dir = os.path.join(LOCAL_DATA_DIR, subdir) @@ -57,7 +40,6 @@ def write_positions( filename = f"{server_ts.strftime('%M%S')}_{HOST_ID}.parquet" local_path = os.path.join(local_dir, filename) pq.write_table(table, local_path, compression="snappy") - return local_path @@ -162,6 +144,42 @@ def compact_positions(date_to_compact: date) -> str: return out_path +def compact_and_geowkb_positions(date_to_compact: date) -> str: + """ + Read yesterday’s raw Parquets (with x,y,server_ts,fetched_ts), + build a single Big Table, convert x/y → WKB Point, + drop x,y, append geometry, and write out a GeoParquet. + """ + import geoarrow.pyarrow as ga + + # find the directory of raw shards + day_dir = os.path.join(LOCAL_DATA_DIR, f"positions/{date_to_compact:%Y/%m/%d}") + if not os.path.isdir(day_dir): + return "" + + # merge all raw shards + dataset = ds.dataset(day_dir, format="parquet", partitioning="hive") + raw = dataset.to_table() + + # extract coords into numpy + xs = raw.column("x").to_numpy(zero_copy_only=False) + ys = raw.column("y").to_numpy(zero_copy_only=False) + + # build WKB geometry + pt = ga.point().with_crs("EPSG:4326") + wkb = ga.as_wkb(pt.from_geobuffers(None, xs, ys)) + + # drop x,y and append geometry + enriched = raw.drop(["x", "y"]).append_column("geometry", wkb) + + # write out one big geo-parquet + out_dir = os.path.join(LOCAL_DATA_DIR, "positions_geocompacted") + os.makedirs(out_dir, exist_ok=True) + out_path = os.path.join(out_dir, f"{date_to_compact.isoformat()}_{HOST_ID}.parquet") + pq.write_table(enriched, out_path, compression="snappy") + return out_path + + def cleanup_small_positions(date_to_remove: date): """ Optionally delete the raw minute files once compacted.