From f37fd68e079003bfc2dc161e04e4314700a1aa45 Mon Sep 17 00:00:00 2001 From: Piotr Oleszczyk Date: Mon, 12 May 2025 14:30:43 +0200 Subject: [PATCH 1/3] store timestamps for positions --- src/ptscrapper/scheduler.py | 36 ++++++++++++++++++++++++++---------- src/ptscrapper/storage.py | 14 ++++++++++---- 2 files changed, 36 insertions(+), 14 deletions(-) diff --git a/src/ptscrapper/scheduler.py b/src/ptscrapper/scheduler.py index fd2c53c..bb5656e 100644 --- a/src/ptscrapper/scheduler.py +++ b/src/ptscrapper/scheduler.py @@ -6,6 +6,7 @@ import os import sqlite3 from datetime import date, datetime, timedelta, timezone from itertools import islice +from zoneinfo import ZoneInfo import httpx import pyarrow as pa @@ -107,18 +108,33 @@ def chunked(iterable, size): async def positions_job(): - ts = datetime.now(timezone.utc) + fetch_ts = datetime.now(timezone.utc) # when *we* fetched + client = get_async_client() try: data = await fetch_positions(client) - # first element is timestamp string - ts_str, *positions = data - ts = datetime.fromisoformat(ts_str) - # Build Arrow table - table = pa.Table.from_pylist(positions) - # write locally only - write_positions(table, ts) - logger.info("wrote positions shard for %s", ts) + # 2) API‐reported timestamp + server_ts_str, *positions = data + # server_ts is local to Warsaw + server_ts = datetime.fromisoformat(server_ts_str).replace( + tzinfo=ZoneInfo("Europe/Warsaw") + ) + + # 3) build Arrow table enrich each dict with both timestamps + enriched = [] + for p in positions: + enriched.append( + { + **p, + "server_ts": server_ts, + "fetched_ts": fetch_ts, + } + ) + table = pa.Table.from_pylist(enriched) + + # 4) pass both timestamps through + write_positions(table, server_ts) + logger.info("wrote positions shard for %s", server_ts) except Exception: # record error details logger.exception("job=positions_job") @@ -129,7 +145,7 @@ async def positions_job(): for cid in {p["c"] for p in positions}: if should_fetch_course(cid) and cid not in in_flight_courses: in_flight_courses.add(cid) - await course_queue.put((cid, ts, 0)) + await course_queue.put((cid, server_ts, 0)) async def fetch_and_store_course(course_id: int, ts: datetime, client=None): diff --git a/src/ptscrapper/storage.py b/src/ptscrapper/storage.py index 879a24e..f5ae915 100644 --- a/src/ptscrapper/storage.py +++ b/src/ptscrapper/storage.py @@ -24,10 +24,14 @@ def _ensure_dir(path: str): os.makedirs(path, exist_ok=True) -def write_positions(table: pa.Table, ts: datetime) -> str: +def write_positions( + table: pa.Table, + server_ts: datetime, +) -> str: """ Convert x/y to a Point geometry, drop the originals, - and write out a GeoParquet. + and write out a GeoParquet, carrying through + the server_ts and fetched_ts columns. """ # 1) Extract x/y as numpy arrays (zero_copy_only=False to ensure a NumPy copy) xs = table.column("x").to_numpy() @@ -44,11 +48,13 @@ def write_positions(table: pa.Table, ts: datetime) -> str: # 4) drop old coords & append 'geometry' table = table.drop(["x", "y"]).append_column("geometry", geom) - subdir = ts.strftime("positions/%Y/%m/%d/%H") + # (server_ts and fetched_ts are already columns in `table`) + + subdir = server_ts.strftime("positions/%Y/%m/%d/%H") local_dir = os.path.join(LOCAL_DATA_DIR, subdir) _ensure_dir(local_dir) - filename = f"{ts.strftime('%M%S')}_{HOST_ID}.parquet" + filename = f"{server_ts.strftime('%M%S')}_{HOST_ID}.parquet" local_path = os.path.join(local_dir, filename) pq.write_table(table, local_path, compression="snappy") From 6891e3b206c6331318e88f9bb72cae7db7466f5d Mon Sep 17 00:00:00 2001 From: Piotr Oleszczyk Date: Mon, 12 May 2025 14:36:37 +0200 Subject: [PATCH 2/3] process times in courses properly --- src/ptscrapper/scheduler.py | 24 +++++++++++++++--------- 1 file changed, 15 insertions(+), 9 deletions(-) diff --git a/src/ptscrapper/scheduler.py b/src/ptscrapper/scheduler.py index bb5656e..9ff59bf 100644 --- a/src/ptscrapper/scheduler.py +++ b/src/ptscrapper/scheduler.py @@ -158,15 +158,21 @@ async def fetch_and_store_course(course_id: int, ts: datetime, client=None): recs = await fetch_course_posts(client, [course_id]) # now do your I/O; catch+log only errors here for rec in recs: - rows = [ - { - "fetch_date": ts, - "course_id": rec["c"], - "stop_id": stop["s"], - "dep_time": stop["t"], - } - for stop in rec.get("r", []) - ] + rows = [] + for seq, stop in enumerate(rec.get("r", []), start=1): + # stop["t"] is like "1900-01-01 05:53:00" + # parse out the time portion + t = datetime.strptime(stop["t"], "%Y-%m-%d %H:%M:%S").time() + rows.append( + { + "fetch_date": ts, # UTC fetch timestamp + "course_id": rec["c"], + "stop_seq": seq, # preserve order! + "stop_id": stop["s"], + "dep_time": t, # real datetime.time + "dep_zone": "Europe/Warsaw", + } + ) table = pa.Table.from_pylist(rows) write_course_posts(table, rec["c"], ts) record_course_fetched(rec["c"], ts) From 96f918445c639d1987101f9c245a934b7be4b0b6 Mon Sep 17 00:00:00 2001 From: Piotr Oleszczyk Date: Mon, 12 May 2025 14:46:33 +0200 Subject: [PATCH 3/3] correct daily snapshot: flatten stops, add fetch date --- src/ptscrapper/scheduler.py | 18 +++++++++++++++++- src/ptscrapper/storage.py | 34 ++++++++++++++++++++++++++++++---- 2 files changed, 47 insertions(+), 5 deletions(-) diff --git a/src/ptscrapper/scheduler.py b/src/ptscrapper/scheduler.py index 9ff59bf..893694e 100644 --- a/src/ptscrapper/scheduler.py +++ b/src/ptscrapper/scheduler.py @@ -196,7 +196,23 @@ async def daily_snapshot_job(): try: # stops posts = await fetch_posts(client) - table = pa.Table.from_pylist(posts) + rows = [] + for grp in posts: + # grp: { "n": name, "s": group_id, "t": group_type, "p": [ {s,x,y,t}, … ] } + for stop in grp.get("p", []): + rows.append( + { + "fetch_date": ts, # utc fetch timestamp + "group_name": grp["n"], + "group_id": grp["s"], + "group_type": grp["t"], + "stop_id": stop["s"], + "stop_type": stop["t"], + "x": stop["x"], + "y": stop["y"], + } + ) + table = pa.Table.from_pylist(rows) fp = write_stops(table, ts) push_to_nas(fp) diff --git a/src/ptscrapper/storage.py b/src/ptscrapper/storage.py index f5ae915..3691fc0 100644 --- a/src/ptscrapper/storage.py +++ b/src/ptscrapper/storage.py @@ -78,21 +78,47 @@ def write_course_posts(table: pa.Table, course_id: int, ts: datetime) -> str: def write_stops(table: pa.Table, snapshot_dt: datetime) -> str: """ - Write the daily snapshot of stops (getPosts). + Take a flat table with columns + fetch_date, group_name, group_id, group_type, + stop_id, stop_type, x, y + and convert x/y → a WKB Point geometry, + drop the raw coords, and write GeoParquet. """ + # 1) pull x,y into numpy + xs = table.column("x").to_numpy() + ys = table.column("y").to_numpy() + + # 2) build PointType with CRS + pt_builder = ga.point().with_crs("EPSG:4326") + + # 3) build WKB geometry + geom = ga.as_wkb(pt_builder.from_geobuffers(None, xs, ys)) + + # 4) drop coords & append geometry column + table = table.drop(["x", "y"]).append_column("geometry", geom) + + # 5) write out subdir = snapshot_dt.strftime("stops/%Y-%m-%d") local_dir = os.path.join(LOCAL_DATA_DIR, subdir) _ensure_dir(local_dir) - local_path = os.path.join(local_dir, f"stops_{HOST_ID}.parquet") - pq.write_table(table, local_path, compression="snappy") - return local_path + filename = f"stops_{HOST_ID}.parquet" + out_path = os.path.join(local_dir, filename) + pq.write_table(table, out_path, compression="snappy") + return out_path def write_posts_lines(table: pa.Table, snapshot_dt: datetime) -> str: """ Write the daily stop→lines mapping (getPostsLines). """ + # ensure every row gets a fetch_date + if "fetch_date" not in table.column_names: + table = table.append_column( + "fetch_date", + pa.array([snapshot_dt] * table.num_rows, type=pa.timestamp("ns", tz="UTC")), + ) + subdir = snapshot_dt.strftime("stops_lines/%Y-%m-%d") local_dir = os.path.join(LOCAL_DATA_DIR, subdir) _ensure_dir(local_dir)