store timestamps for positions

This commit is contained in:
Piotr Oleszczyk 2025-05-12 14:30:43 +02:00
parent a5349123ce
commit f37fd68e07
2 changed files with 36 additions and 14 deletions

View file

@@ -6,6 +6,7 @@ import os
import sqlite3 import sqlite3
from datetime import date, datetime, timedelta, timezone from datetime import date, datetime, timedelta, timezone
from itertools import islice from itertools import islice
from zoneinfo import ZoneInfo
import httpx import httpx
import pyarrow as pa import pyarrow as pa
@@ -107,18 +108,33 @@ def chunked(iterable, size):
async def positions_job(): async def positions_job():
ts = datetime.now(timezone.utc) fetch_ts = datetime.now(timezone.utc) # when *we* fetched
client = get_async_client() client = get_async_client()
try: try:
data = await fetch_positions(client) data = await fetch_positions(client)
# first element is timestamp string # 2) API-reported timestamp
ts_str, *positions = data server_ts_str, *positions = data
ts = datetime.fromisoformat(ts_str) # server_ts is local to Warsaw
# Build Arrow table server_ts = datetime.fromisoformat(server_ts_str).replace(
table = pa.Table.from_pylist(positions) tzinfo=ZoneInfo("Europe/Warsaw")
# write locally only )
write_positions(table, ts)
logger.info("wrote positions shard for %s", ts) # 3) build Arrow table enrich each dict with both timestamps
enriched = []
for p in positions:
enriched.append(
{
**p,
"server_ts": server_ts,
"fetched_ts": fetch_ts,
}
)
table = pa.Table.from_pylist(enriched)
# 4) pass both timestamps through
write_positions(table, server_ts)
logger.info("wrote positions shard for %s", server_ts)
except Exception: except Exception:
# record error details # record error details
logger.exception("job=positions_job") logger.exception("job=positions_job")
@@ -129,7 +145,7 @@ async def positions_job():
for cid in {p["c"] for p in positions}: for cid in {p["c"] for p in positions}:
if should_fetch_course(cid) and cid not in in_flight_courses: if should_fetch_course(cid) and cid not in in_flight_courses:
in_flight_courses.add(cid) in_flight_courses.add(cid)
await course_queue.put((cid, ts, 0)) await course_queue.put((cid, server_ts, 0))
async def fetch_and_store_course(course_id: int, ts: datetime, client=None): async def fetch_and_store_course(course_id: int, ts: datetime, client=None):

View file

@@ -24,10 +24,14 @@ def _ensure_dir(path: str):
os.makedirs(path, exist_ok=True) os.makedirs(path, exist_ok=True)
def write_positions(table: pa.Table, ts: datetime) -> str: def write_positions(
table: pa.Table,
server_ts: datetime,
) -> str:
""" """
Convert x/y to a Point geometry, drop the originals, Convert x/y to a Point geometry, drop the originals,
and write out a GeoParquet. and write out a GeoParquet, carrying through
the server_ts and fetched_ts columns.
""" """
# 1) Extract x/y as numpy arrays (zero_copy_only=False to ensure a NumPy copy) # 1) Extract x/y as numpy arrays (zero_copy_only=False to ensure a NumPy copy)
xs = table.column("x").to_numpy() xs = table.column("x").to_numpy()
@@ -44,11 +48,13 @@ def write_positions(table: pa.Table, ts: datetime) -> str:
# 4) drop old coords & append 'geometry' # 4) drop old coords & append 'geometry'
table = table.drop(["x", "y"]).append_column("geometry", geom) table = table.drop(["x", "y"]).append_column("geometry", geom)
subdir = ts.strftime("positions/%Y/%m/%d/%H") # (server_ts and fetched_ts are already columns in `table`)
subdir = server_ts.strftime("positions/%Y/%m/%d/%H")
local_dir = os.path.join(LOCAL_DATA_DIR, subdir) local_dir = os.path.join(LOCAL_DATA_DIR, subdir)
_ensure_dir(local_dir) _ensure_dir(local_dir)
filename = f"{ts.strftime('%M%S')}_{HOST_ID}.parquet" filename = f"{server_ts.strftime('%M%S')}_{HOST_ID}.parquet"
local_path = os.path.join(local_dir, filename) local_path = os.path.join(local_dir, filename)
pq.write_table(table, local_path, compression="snappy") pq.write_table(table, local_path, compression="snappy")