Compare commits

..

No commits in common. "96f918445c639d1987101f9c245a934b7be4b0b6" and "a5349123cec48a8507eb618f5f28b2086ffd18f5" have entirely different histories.

2 changed files with 28 additions and 98 deletions

View file

@@ -6,7 +6,6 @@ import os
import sqlite3 import sqlite3
from datetime import date, datetime, timedelta, timezone from datetime import date, datetime, timedelta, timezone
from itertools import islice from itertools import islice
from zoneinfo import ZoneInfo
import httpx import httpx
import pyarrow as pa import pyarrow as pa
@@ -108,33 +107,18 @@ def chunked(iterable, size):
async def positions_job(): async def positions_job():
fetch_ts = datetime.now(timezone.utc) # when *we* fetched ts = datetime.now(timezone.utc)
client = get_async_client() client = get_async_client()
try: try:
data = await fetch_positions(client) data = await fetch_positions(client)
# 2) API-reported timestamp # first element is timestamp string
server_ts_str, *positions = data ts_str, *positions = data
# server_ts is local to Warsaw ts = datetime.fromisoformat(ts_str)
server_ts = datetime.fromisoformat(server_ts_str).replace( # Build Arrow table
tzinfo=ZoneInfo("Europe/Warsaw") table = pa.Table.from_pylist(positions)
) # write locally only
write_positions(table, ts)
# 3) build Arrow table enrich each dict with both timestamps logger.info("wrote positions shard for %s", ts)
enriched = []
for p in positions:
enriched.append(
{
**p,
"server_ts": server_ts,
"fetched_ts": fetch_ts,
}
)
table = pa.Table.from_pylist(enriched)
# 4) pass both timestamps through
write_positions(table, server_ts)
logger.info("wrote positions shard for %s", server_ts)
except Exception: except Exception:
# record error details # record error details
logger.exception("job=positions_job") logger.exception("job=positions_job")
@@ -145,7 +129,7 @@ async def positions_job():
for cid in {p["c"] for p in positions}: for cid in {p["c"] for p in positions}:
if should_fetch_course(cid) and cid not in in_flight_courses: if should_fetch_course(cid) and cid not in in_flight_courses:
in_flight_courses.add(cid) in_flight_courses.add(cid)
await course_queue.put((cid, server_ts, 0)) await course_queue.put((cid, ts, 0))
async def fetch_and_store_course(course_id: int, ts: datetime, client=None): async def fetch_and_store_course(course_id: int, ts: datetime, client=None):
@@ -158,21 +142,15 @@ async def fetch_and_store_course(course_id: int, ts: datetime, client=None):
recs = await fetch_course_posts(client, [course_id]) recs = await fetch_course_posts(client, [course_id])
# now do your I/O; catch+log only errors here # now do your I/O; catch+log only errors here
for rec in recs: for rec in recs:
rows = [] rows = [
for seq, stop in enumerate(rec.get("r", []), start=1):
# stop["t"] is like "1900-01-01 05:53:00"
# parse out the time portion
t = datetime.strptime(stop["t"], "%Y-%m-%d %H:%M:%S").time()
rows.append(
{ {
"fetch_date": ts, # UTC fetch timestamp "fetch_date": ts,
"course_id": rec["c"], "course_id": rec["c"],
"stop_seq": seq, # preserve order!
"stop_id": stop["s"], "stop_id": stop["s"],
"dep_time": t, # real datetime.time "dep_time": stop["t"],
"dep_zone": "Europe/Warsaw",
} }
) for stop in rec.get("r", [])
]
table = pa.Table.from_pylist(rows) table = pa.Table.from_pylist(rows)
write_course_posts(table, rec["c"], ts) write_course_posts(table, rec["c"], ts)
record_course_fetched(rec["c"], ts) record_course_fetched(rec["c"], ts)
@@ -196,23 +174,7 @@ async def daily_snapshot_job():
try: try:
# stops # stops
posts = await fetch_posts(client) posts = await fetch_posts(client)
rows = [] table = pa.Table.from_pylist(posts)
for grp in posts:
# grp: { "n": name, "s": group_id, "t": group_type, "p": [ {s,x,y,t}, … ] }
for stop in grp.get("p", []):
rows.append(
{
"fetch_date": ts, # utc fetch timestamp
"group_name": grp["n"],
"group_id": grp["s"],
"group_type": grp["t"],
"stop_id": stop["s"],
"stop_type": stop["t"],
"x": stop["x"],
"y": stop["y"],
}
)
table = pa.Table.from_pylist(rows)
fp = write_stops(table, ts) fp = write_stops(table, ts)
push_to_nas(fp) push_to_nas(fp)

View file

@@ -24,14 +24,10 @@ def _ensure_dir(path: str):
os.makedirs(path, exist_ok=True) os.makedirs(path, exist_ok=True)
def write_positions( def write_positions(table: pa.Table, ts: datetime) -> str:
table: pa.Table,
server_ts: datetime,
) -> str:
""" """
Convert x/y to a Point geometry, drop the originals, Convert x/y to a Point geometry, drop the originals,
and write out a GeoParquet, carrying through and write out a GeoParquet.
the server_ts and fetched_ts columns.
""" """
# 1) Extract x/y as numpy arrays (zero_copy_only=False to ensure a NumPy copy) # 1) Extract x/y as numpy arrays (zero_copy_only=False to ensure a NumPy copy)
xs = table.column("x").to_numpy() xs = table.column("x").to_numpy()
@@ -48,13 +44,11 @@ def write_positions(
# 4) drop old coords & append 'geometry' # 4) drop old coords & append 'geometry'
table = table.drop(["x", "y"]).append_column("geometry", geom) table = table.drop(["x", "y"]).append_column("geometry", geom)
# (server_ts and fetched_ts are already columns in `table`) subdir = ts.strftime("positions/%Y/%m/%d/%H")
subdir = server_ts.strftime("positions/%Y/%m/%d/%H")
local_dir = os.path.join(LOCAL_DATA_DIR, subdir) local_dir = os.path.join(LOCAL_DATA_DIR, subdir)
_ensure_dir(local_dir) _ensure_dir(local_dir)
filename = f"{server_ts.strftime('%M%S')}_{HOST_ID}.parquet" filename = f"{ts.strftime('%M%S')}_{HOST_ID}.parquet"
local_path = os.path.join(local_dir, filename) local_path = os.path.join(local_dir, filename)
pq.write_table(table, local_path, compression="snappy") pq.write_table(table, local_path, compression="snappy")
@@ -78,47 +72,21 @@ def write_course_posts(table: pa.Table, course_id: int, ts: datetime) -> str:
def write_stops(table: pa.Table, snapshot_dt: datetime) -> str: def write_stops(table: pa.Table, snapshot_dt: datetime) -> str:
""" """
Take a flat table with columns Write the daily snapshot of stops (getPosts).
fetch_date, group_name, group_id, group_type,
stop_id, stop_type, x, y
and convert x/y to a WKB Point geometry,
drop the raw coords, and write GeoParquet.
""" """
# 1) pull x,y into numpy
xs = table.column("x").to_numpy()
ys = table.column("y").to_numpy()
# 2) build PointType with CRS
pt_builder = ga.point().with_crs("EPSG:4326")
# 3) build WKB geometry
geom = ga.as_wkb(pt_builder.from_geobuffers(None, xs, ys))
# 4) drop coords & append geometry column
table = table.drop(["x", "y"]).append_column("geometry", geom)
# 5) write out
subdir = snapshot_dt.strftime("stops/%Y-%m-%d") subdir = snapshot_dt.strftime("stops/%Y-%m-%d")
local_dir = os.path.join(LOCAL_DATA_DIR, subdir) local_dir = os.path.join(LOCAL_DATA_DIR, subdir)
_ensure_dir(local_dir) _ensure_dir(local_dir)
filename = f"stops_{HOST_ID}.parquet" local_path = os.path.join(local_dir, f"stops_{HOST_ID}.parquet")
out_path = os.path.join(local_dir, filename) pq.write_table(table, local_path, compression="snappy")
pq.write_table(table, out_path, compression="snappy") return local_path
return out_path
def write_posts_lines(table: pa.Table, snapshot_dt: datetime) -> str: def write_posts_lines(table: pa.Table, snapshot_dt: datetime) -> str:
""" """
Write the daily stoplines mapping (getPostsLines). Write the daily stoplines mapping (getPostsLines).
""" """
# ensure every row gets a fetch_date
if "fetch_date" not in table.column_names:
table = table.append_column(
"fetch_date",
pa.array([snapshot_dt] * table.num_rows, type=pa.timestamp("ns", tz="UTC")),
)
subdir = snapshot_dt.strftime("stops_lines/%Y-%m-%d") subdir = snapshot_dt.strftime("stops_lines/%Y-%m-%d")
local_dir = os.path.join(LOCAL_DATA_DIR, subdir) local_dir = os.path.join(LOCAL_DATA_DIR, subdir)
_ensure_dir(local_dir) _ensure_dir(local_dir)