move geometry conversion to daily job

This commit is contained in:
Piotr Oleszczyk 2025-05-12 17:00:37 +02:00
parent 9e53b428cd
commit 3469511bd8
2 changed files with 51 additions and 24 deletions

View file

@@ -25,6 +25,7 @@ from .storage import (
     cleanup_small_course_geometry,
     cleanup_small_course_posts,
     cleanup_small_positions,
+    compact_and_geowkb_positions,
     compact_course_geometry,
     compact_course_posts,
     compact_positions,
@@ -233,11 +234,19 @@ async def daily_compact_and_push():
     # compact yesterdays positions once/day
     yesterday = date.today() - timedelta(days=1)
     try:
-        big_fp = compact_positions(yesterday)
-        push_to_nas(big_fp)
-        # if you want to reclaim local space:
+        # 1) merge raw minute-shards → one “raw” parquet
+        raw_fp = compact_positions(yesterday)
+        if raw_fp:
+            push_to_nas(raw_fp)
+        # 2) build geometry once/day
+        geo_fp = compact_and_geowkb_positions(yesterday)
+        if geo_fp:
+            push_to_nas(geo_fp)
+        # 3) clean up raw shards
         cleanup_small_positions(yesterday)
-        logger.info("wrote positions compacted file for %s", yesterday)
+        logger.info("daily compaction complete for %s", yesterday)
     except Exception:
         logger.exception("job=daily_compact_and_push_job")

View file

@@ -29,26 +29,9 @@ def write_positions(
     server_ts: datetime,
 ) -> str:
     """
-    Convert x/y to a Point geometry, drop the originals,
-    and write out a GeoParquet, carrying through
-    the server_ts and fetched_ts columns.
+    Write out the raw positions (including x,y, server_ts, fetched_ts).
+    Geometry will be added later during daily compaction.
     """
-    # 1) Extract x/y as numpy arrays (zero_copy_only=False to ensure a NumPy copy)
-    xs = table.column("x").to_numpy()
-    ys = table.column("y").to_numpy()
-    # 2) Build a PointType with the correct CRS
-    builder = ga.point().with_crs("EPSG:4326")
-    # 3) Create the geometry array directly from the Arrow buffers
-    geom = ga.as_wkb(
-        builder.from_geobuffers(None, xs, ys)
-    )  # ensure the geometry is in WKB format
-    # 4) drop old coords & append 'geometry'
-    table = table.drop(["x", "y"]).append_column("geometry", geom)
-    # (server_ts and fetched_ts are already columns in `table`)
     subdir = server_ts.strftime("positions/%Y/%m/%d/%H")
     local_dir = os.path.join(LOCAL_DATA_DIR, subdir)
@@ -57,7 +40,6 @@ def write_positions(
     filename = f"{server_ts.strftime('%M%S')}_{HOST_ID}.parquet"
     local_path = os.path.join(local_dir, filename)
     pq.write_table(table, local_path, compression="snappy")
     return local_path
@@ -162,6 +144,42 @@ def compact_positions(date_to_compact: date) -> str:
     return out_path
def compact_and_geowkb_positions(date_to_compact: date) -> str:
    """
    Merge the given day's raw position Parquets (x, y, server_ts,
    fetched_ts) into one table, replace the x/y columns with a single
    WKB Point "geometry" column (EPSG:4326), and write the result out
    as one GeoParquet file per day/host.

    Returns the output path, or "" when no raw shards exist for the day.
    """
    import geoarrow.pyarrow as ga

    # Locate the day's directory of raw minute-shards; nothing to do if absent.
    shard_dir = os.path.join(
        LOCAL_DATA_DIR, f"positions/{date_to_compact:%Y/%m/%d}"
    )
    if not os.path.isdir(shard_dir):
        return ""

    # Read every raw shard into a single Arrow table.
    merged = ds.dataset(shard_dir, format="parquet", partitioning="hive").to_table()

    # Pull the coordinates out as NumPy arrays (allow a copy so we always
    # get plain buffers to hand to geoarrow).
    x_vals = merged.column("x").to_numpy(zero_copy_only=False)
    y_vals = merged.column("y").to_numpy(zero_copy_only=False)

    # Build the point geometry from the coordinate buffers and encode as WKB.
    point_type = ga.point().with_crs("EPSG:4326")
    geometry = ga.as_wkb(point_type.from_geobuffers(None, x_vals, y_vals))

    # Swap the raw coordinate columns for the geometry column.
    enriched = merged.drop(["x", "y"]).append_column("geometry", geometry)

    # Persist the single geo-compacted parquet for the day.
    dest_dir = os.path.join(LOCAL_DATA_DIR, "positions_geocompacted")
    os.makedirs(dest_dir, exist_ok=True)
    dest_path = os.path.join(
        dest_dir, f"{date_to_compact.isoformat()}_{HOST_ID}.parquet"
    )
    pq.write_table(enriched, dest_path, compression="snappy")
    return dest_path
 def cleanup_small_positions(date_to_remove: date):
     """
     Optionally delete the raw minute files once compacted.