diff --git a/src/ptscrapper/scheduler.py b/src/ptscrapper/scheduler.py index 893694e..fd2c53c 100644 --- a/src/ptscrapper/scheduler.py +++ b/src/ptscrapper/scheduler.py @@ -6,7 +6,6 @@ import os import sqlite3 from datetime import date, datetime, timedelta, timezone from itertools import islice -from zoneinfo import ZoneInfo import httpx import pyarrow as pa @@ -108,33 +107,18 @@ def chunked(iterable, size): async def positions_job(): - fetch_ts = datetime.now(timezone.utc) # when *we* fetched - + ts = datetime.now(timezone.utc) client = get_async_client() try: data = await fetch_positions(client) - # 2) API‐reported timestamp - server_ts_str, *positions = data - # server_ts is local to Warsaw - server_ts = datetime.fromisoformat(server_ts_str).replace( - tzinfo=ZoneInfo("Europe/Warsaw") - ) - - # 3) build Arrow table enrich each dict with both timestamps - enriched = [] - for p in positions: - enriched.append( - { - **p, - "server_ts": server_ts, - "fetched_ts": fetch_ts, - } - ) - table = pa.Table.from_pylist(enriched) - - # 4) pass both timestamps through - write_positions(table, server_ts) - logger.info("wrote positions shard for %s", server_ts) + # first element is timestamp string + ts_str, *positions = data + ts = datetime.fromisoformat(ts_str) + # Build Arrow table + table = pa.Table.from_pylist(positions) + # write locally only + write_positions(table, ts) + logger.info("wrote positions shard for %s", ts) except Exception: # record error details logger.exception("job=positions_job") @@ -145,7 +129,7 @@ async def positions_job(): for cid in {p["c"] for p in positions}: if should_fetch_course(cid) and cid not in in_flight_courses: in_flight_courses.add(cid) - await course_queue.put((cid, server_ts, 0)) + await course_queue.put((cid, ts, 0)) async def fetch_and_store_course(course_id: int, ts: datetime, client=None): @@ -158,21 +142,15 @@ async def fetch_and_store_course(course_id: int, ts: datetime, client=None): recs = await fetch_course_posts(client, [course_id]) # now do your I/O; catch+log only errors here for rec in recs: - rows = [] - for seq, stop in enumerate(rec.get("r", []), start=1): - # stop["t"] is like "1900-01-01 05:53:00" - # parse out the time portion - t = datetime.strptime(stop["t"], "%Y-%m-%d %H:%M:%S").time() - rows.append( - { - "fetch_date": ts, # UTC fetch timestamp - "course_id": rec["c"], - "stop_seq": seq, # preserve order! - "stop_id": stop["s"], - "dep_time": t, # real datetime.time - "dep_zone": "Europe/Warsaw", - } - ) + rows = [ + { + "fetch_date": ts, + "course_id": rec["c"], + "stop_id": stop["s"], + "dep_time": stop["t"], + } + for stop in rec.get("r", []) + ] table = pa.Table.from_pylist(rows) write_course_posts(table, rec["c"], ts) record_course_fetched(rec["c"], ts) @@ -196,23 +174,7 @@ async def daily_snapshot_job(): try: # stops posts = await fetch_posts(client) - rows = [] - for grp in posts: - # grp: { "n": name, "s": group_id, "t": group_type, "p": [ {s,x,y,t}, … ] } - for stop in grp.get("p", []): - rows.append( - { - "fetch_date": ts, # utc fetch timestamp - "group_name": grp["n"], - "group_id": grp["s"], - "group_type": grp["t"], - "stop_id": stop["s"], - "stop_type": stop["t"], - "x": stop["x"], - "y": stop["y"], - } - ) - table = pa.Table.from_pylist(rows) + table = pa.Table.from_pylist(posts) fp = write_stops(table, ts) push_to_nas(fp) diff --git a/src/ptscrapper/storage.py b/src/ptscrapper/storage.py index 3691fc0..879a24e 100644 --- a/src/ptscrapper/storage.py +++ b/src/ptscrapper/storage.py @@ -24,14 +24,10 @@ def _ensure_dir(path: str): os.makedirs(path, exist_ok=True) -def write_positions( - table: pa.Table, - server_ts: datetime, -) -> str: +def write_positions(table: pa.Table, ts: datetime) -> str: """ Convert x/y to a Point geometry, drop the originals, - and write out a GeoParquet, carrying through - the server_ts and fetched_ts columns. + and write out a GeoParquet. """ # 1) Extract x/y as numpy arrays (zero_copy_only=False to ensure a NumPy copy) xs = table.column("x").to_numpy() @@ -48,13 +44,11 @@ def write_positions( # 4) drop old coords & append 'geometry' table = table.drop(["x", "y"]).append_column("geometry", geom) - # (server_ts and fetched_ts are already columns in `table`) - - subdir = server_ts.strftime("positions/%Y/%m/%d/%H") + subdir = ts.strftime("positions/%Y/%m/%d/%H") local_dir = os.path.join(LOCAL_DATA_DIR, subdir) _ensure_dir(local_dir) - filename = f"{server_ts.strftime('%M%S')}_{HOST_ID}.parquet" + filename = f"{ts.strftime('%M%S')}_{HOST_ID}.parquet" local_path = os.path.join(local_dir, filename) pq.write_table(table, local_path, compression="snappy") @@ -78,47 +72,21 @@ def write_course_posts(table: pa.Table, course_id: int, ts: datetime) -> str: def write_stops(table: pa.Table, snapshot_dt: datetime) -> str: """ - Take a flat table with columns - fetch_date, group_name, group_id, group_type, - stop_id, stop_type, x, y - and convert x/y → a WKB Point geometry, - drop the raw coords, and write GeoParquet. + Write the daily snapshot of stops (getPosts). """ - # 1) pull x,y into numpy - xs = table.column("x").to_numpy() - ys = table.column("y").to_numpy() - - # 2) build PointType with CRS - pt_builder = ga.point().with_crs("EPSG:4326") - - # 3) build WKB geometry - geom = ga.as_wkb(pt_builder.from_geobuffers(None, xs, ys)) - - # 4) drop coords & append geometry column - table = table.drop(["x", "y"]).append_column("geometry", geom) - - # 5) write out subdir = snapshot_dt.strftime("stops/%Y-%m-%d") local_dir = os.path.join(LOCAL_DATA_DIR, subdir) _ensure_dir(local_dir) - filename = f"stops_{HOST_ID}.parquet" - out_path = os.path.join(local_dir, filename) - pq.write_table(table, out_path, compression="snappy") - return out_path + local_path = os.path.join(local_dir, f"stops_{HOST_ID}.parquet") + pq.write_table(table, local_path, compression="snappy") + return local_path def write_posts_lines(table: pa.Table, snapshot_dt: datetime) -> str: """ Write the daily stop→lines mapping (getPostsLines). """ - # ensure every row gets a fetch_date - if "fetch_date" not in table.column_names: - table = table.append_column( - "fetch_date", - pa.array([snapshot_dt] * table.num_rows, type=pa.timestamp("ns", tz="UTC")), - ) - subdir = snapshot_dt.strftime("stops_lines/%Y-%m-%d") local_dir = os.path.join(LOCAL_DATA_DIR, subdir) _ensure_dir(local_dir)