write positions in 10-minute batches

This commit is contained in:
Piotr Oleszczyk 2025-05-12 17:51:48 +02:00
parent 4fca094f89
commit 20238e0270
2 changed files with 51 additions and 15 deletions

View file

@ -32,7 +32,7 @@ from .storage import (
push_to_nas, push_to_nas,
write_course_geometry, write_course_geometry,
write_course_posts, write_course_posts,
write_positions, write_positions_batched,
write_posts_lines, write_posts_lines,
write_stops, write_stops,
) )
@ -134,7 +134,7 @@ async def positions_job():
table = pa.Table.from_pylist(enriched) table = pa.Table.from_pylist(enriched)
# 4) pass both timestamps through # 4) pass both timestamps through
write_positions(table, server_ts) write_positions_batched(table, server_ts)
logger.info("wrote positions shard for %s", server_ts) logger.info("wrote positions shard for %s", server_ts)
except Exception: except Exception:
# record error details # record error details

View file

@ -2,6 +2,7 @@
import glob import glob
import os import os
import threading
from datetime import date, datetime from datetime import date, datetime
import geoarrow.pyarrow as ga import geoarrow.pyarrow as ga
@ -18,29 +19,64 @@ from .config import ( # base path on the remote, e.g. "pt-scraper-data"
RCLONE_REMOTE_PATH, RCLONE_REMOTE_PATH,
) )
# module-level state for our rolling 10-min writer
_positions_writer = None # pq.ParquetWriter
_positions_batch = None # datetime for current batch start
_positions_path = None # where we're writing right now
_positions_lock = threading.Lock()
def _ensure_dir(path: str): def _ensure_dir(path: str):
"""Make sure the directory exists.""" """Make sure the directory exists."""
os.makedirs(path, exist_ok=True) os.makedirs(path, exist_ok=True)
def write_positions( def write_positions_batched(table: pa.Table, server_ts: datetime) -> None:
table: pa.Table,
server_ts: datetime,
) -> str:
""" """
Write out the raw positions (including x,y, server_ts, fetched_ts). Append `table` into a rolling Parquet file, one file per 10-minute window.
Geometry will be added later during daily compaction. When the window rolls over, we close+push the old file and start a new one.
""" """
global _positions_writer, _positions_batch, _positions_path
subdir = server_ts.strftime("positions/%Y/%m/%d/%H") # compute the ten-minute bucket start (floor to minute//10 * 10)
batch_start = server_ts.replace(
minute=(server_ts.minute // 10) * 10, second=0, microsecond=0
)
with _positions_lock:
# if we've moved into a new bucket, close & push the old
if _positions_batch is None or batch_start != _positions_batch:
if _positions_writer is not None:
_positions_writer.close()
# push the completed file
rel = os.path.relpath(_positions_path, LOCAL_DATA_DIR).replace(
os.sep, "/"
)
rclone.copy(
_positions_path,
f"{RCLONE_REMOTE}:{RCLONE_REMOTE_PATH}/{rel}",
ignore_existing=True,
)
# start a new writer for the new bucket
subdir = batch_start.strftime("positions/%Y/%m/%d/%H")
local_dir = os.path.join(LOCAL_DATA_DIR, subdir) local_dir = os.path.join(LOCAL_DATA_DIR, subdir)
_ensure_dir(local_dir) _ensure_dir(local_dir)
fn = f"{batch_start.strftime('%M%S')}_{HOST_ID}.parquet"
_positions_path = os.path.join(local_dir, fn)
_positions_writer = pq.ParquetWriter(
_positions_path,
table.schema,
compression="snappy",
use_dictionary=False,
write_statistics=False,
)
_positions_batch = batch_start
filename = f"{server_ts.strftime('%M%S')}_{HOST_ID}.parquet" # write this 10-s table into the current bucket's file
local_path = os.path.join(local_dir, filename) _positions_writer.write_table(table)
pq.write_table(table, local_path, compression="snappy")
return local_path # no return path: we push only on rollover
def write_course_posts(table: pa.Table, course_id: int, ts: datetime) -> str: def write_course_posts(table: pa.Table, course_id: int, ts: datetime) -> str: