Compare commits
No commits in common. "96f918445c639d1987101f9c245a934b7be4b0b6" and "a5349123cec48a8507eb618f5f28b2086ffd18f5" have entirely different histories.
96f918445c
...
a5349123ce
2 changed files with 28 additions and 98 deletions
|
|
@ -6,7 +6,6 @@ import os
|
||||||
import sqlite3
|
import sqlite3
|
||||||
from datetime import date, datetime, timedelta, timezone
|
from datetime import date, datetime, timedelta, timezone
|
||||||
from itertools import islice
|
from itertools import islice
|
||||||
from zoneinfo import ZoneInfo
|
|
||||||
|
|
||||||
import httpx
|
import httpx
|
||||||
import pyarrow as pa
|
import pyarrow as pa
|
||||||
|
|
@ -108,33 +107,18 @@ def chunked(iterable, size):
|
||||||
|
|
||||||
|
|
||||||
async def positions_job():
|
async def positions_job():
|
||||||
fetch_ts = datetime.now(timezone.utc) # when *we* fetched
|
ts = datetime.now(timezone.utc)
|
||||||
|
|
||||||
client = get_async_client()
|
client = get_async_client()
|
||||||
try:
|
try:
|
||||||
data = await fetch_positions(client)
|
data = await fetch_positions(client)
|
||||||
# 2) API‐reported timestamp
|
# first element is timestamp string
|
||||||
server_ts_str, *positions = data
|
ts_str, *positions = data
|
||||||
# server_ts is local to Warsaw
|
ts = datetime.fromisoformat(ts_str)
|
||||||
server_ts = datetime.fromisoformat(server_ts_str).replace(
|
# Build Arrow table
|
||||||
tzinfo=ZoneInfo("Europe/Warsaw")
|
table = pa.Table.from_pylist(positions)
|
||||||
)
|
# write locally only
|
||||||
|
write_positions(table, ts)
|
||||||
# 3) build Arrow table enrich each dict with both timestamps
|
logger.info("wrote positions shard for %s", ts)
|
||||||
enriched = []
|
|
||||||
for p in positions:
|
|
||||||
enriched.append(
|
|
||||||
{
|
|
||||||
**p,
|
|
||||||
"server_ts": server_ts,
|
|
||||||
"fetched_ts": fetch_ts,
|
|
||||||
}
|
|
||||||
)
|
|
||||||
table = pa.Table.from_pylist(enriched)
|
|
||||||
|
|
||||||
# 4) pass both timestamps through
|
|
||||||
write_positions(table, server_ts)
|
|
||||||
logger.info("wrote positions shard for %s", server_ts)
|
|
||||||
except Exception:
|
except Exception:
|
||||||
# record error details
|
# record error details
|
||||||
logger.exception("job=positions_job")
|
logger.exception("job=positions_job")
|
||||||
|
|
@ -145,7 +129,7 @@ async def positions_job():
|
||||||
for cid in {p["c"] for p in positions}:
|
for cid in {p["c"] for p in positions}:
|
||||||
if should_fetch_course(cid) and cid not in in_flight_courses:
|
if should_fetch_course(cid) and cid not in in_flight_courses:
|
||||||
in_flight_courses.add(cid)
|
in_flight_courses.add(cid)
|
||||||
await course_queue.put((cid, server_ts, 0))
|
await course_queue.put((cid, ts, 0))
|
||||||
|
|
||||||
|
|
||||||
async def fetch_and_store_course(course_id: int, ts: datetime, client=None):
|
async def fetch_and_store_course(course_id: int, ts: datetime, client=None):
|
||||||
|
|
@ -158,21 +142,15 @@ async def fetch_and_store_course(course_id: int, ts: datetime, client=None):
|
||||||
recs = await fetch_course_posts(client, [course_id])
|
recs = await fetch_course_posts(client, [course_id])
|
||||||
# now do your I/O; catch+log only errors here
|
# now do your I/O; catch+log only errors here
|
||||||
for rec in recs:
|
for rec in recs:
|
||||||
rows = []
|
rows = [
|
||||||
for seq, stop in enumerate(rec.get("r", []), start=1):
|
|
||||||
# stop["t"] is like "1900-01-01 05:53:00"
|
|
||||||
# parse out the time portion
|
|
||||||
t = datetime.strptime(stop["t"], "%Y-%m-%d %H:%M:%S").time()
|
|
||||||
rows.append(
|
|
||||||
{
|
{
|
||||||
"fetch_date": ts, # UTC fetch timestamp
|
"fetch_date": ts,
|
||||||
"course_id": rec["c"],
|
"course_id": rec["c"],
|
||||||
"stop_seq": seq, # preserve order!
|
|
||||||
"stop_id": stop["s"],
|
"stop_id": stop["s"],
|
||||||
"dep_time": t, # real datetime.time
|
"dep_time": stop["t"],
|
||||||
"dep_zone": "Europe/Warsaw",
|
|
||||||
}
|
}
|
||||||
)
|
for stop in rec.get("r", [])
|
||||||
|
]
|
||||||
table = pa.Table.from_pylist(rows)
|
table = pa.Table.from_pylist(rows)
|
||||||
write_course_posts(table, rec["c"], ts)
|
write_course_posts(table, rec["c"], ts)
|
||||||
record_course_fetched(rec["c"], ts)
|
record_course_fetched(rec["c"], ts)
|
||||||
|
|
@ -196,23 +174,7 @@ async def daily_snapshot_job():
|
||||||
try:
|
try:
|
||||||
# stops
|
# stops
|
||||||
posts = await fetch_posts(client)
|
posts = await fetch_posts(client)
|
||||||
rows = []
|
table = pa.Table.from_pylist(posts)
|
||||||
for grp in posts:
|
|
||||||
# grp: { "n": name, "s": group_id, "t": group_type, "p": [ {s,x,y,t}, … ] }
|
|
||||||
for stop in grp.get("p", []):
|
|
||||||
rows.append(
|
|
||||||
{
|
|
||||||
"fetch_date": ts, # utc fetch timestamp
|
|
||||||
"group_name": grp["n"],
|
|
||||||
"group_id": grp["s"],
|
|
||||||
"group_type": grp["t"],
|
|
||||||
"stop_id": stop["s"],
|
|
||||||
"stop_type": stop["t"],
|
|
||||||
"x": stop["x"],
|
|
||||||
"y": stop["y"],
|
|
||||||
}
|
|
||||||
)
|
|
||||||
table = pa.Table.from_pylist(rows)
|
|
||||||
fp = write_stops(table, ts)
|
fp = write_stops(table, ts)
|
||||||
push_to_nas(fp)
|
push_to_nas(fp)
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -24,14 +24,10 @@ def _ensure_dir(path: str):
|
||||||
os.makedirs(path, exist_ok=True)
|
os.makedirs(path, exist_ok=True)
|
||||||
|
|
||||||
|
|
||||||
def write_positions(
|
def write_positions(table: pa.Table, ts: datetime) -> str:
|
||||||
table: pa.Table,
|
|
||||||
server_ts: datetime,
|
|
||||||
) -> str:
|
|
||||||
"""
|
"""
|
||||||
Convert x/y to a Point geometry, drop the originals,
|
Convert x/y to a Point geometry, drop the originals,
|
||||||
and write out a GeoParquet, carrying through
|
and write out a GeoParquet.
|
||||||
the server_ts and fetched_ts columns.
|
|
||||||
"""
|
"""
|
||||||
# 1) Extract x/y as numpy arrays (zero_copy_only=False to ensure a NumPy copy)
|
# 1) Extract x/y as numpy arrays (zero_copy_only=False to ensure a NumPy copy)
|
||||||
xs = table.column("x").to_numpy()
|
xs = table.column("x").to_numpy()
|
||||||
|
|
@ -48,13 +44,11 @@ def write_positions(
|
||||||
# 4) drop old coords & append 'geometry'
|
# 4) drop old coords & append 'geometry'
|
||||||
table = table.drop(["x", "y"]).append_column("geometry", geom)
|
table = table.drop(["x", "y"]).append_column("geometry", geom)
|
||||||
|
|
||||||
# (server_ts and fetched_ts are already columns in `table`)
|
subdir = ts.strftime("positions/%Y/%m/%d/%H")
|
||||||
|
|
||||||
subdir = server_ts.strftime("positions/%Y/%m/%d/%H")
|
|
||||||
local_dir = os.path.join(LOCAL_DATA_DIR, subdir)
|
local_dir = os.path.join(LOCAL_DATA_DIR, subdir)
|
||||||
_ensure_dir(local_dir)
|
_ensure_dir(local_dir)
|
||||||
|
|
||||||
filename = f"{server_ts.strftime('%M%S')}_{HOST_ID}.parquet"
|
filename = f"{ts.strftime('%M%S')}_{HOST_ID}.parquet"
|
||||||
local_path = os.path.join(local_dir, filename)
|
local_path = os.path.join(local_dir, filename)
|
||||||
pq.write_table(table, local_path, compression="snappy")
|
pq.write_table(table, local_path, compression="snappy")
|
||||||
|
|
||||||
|
|
@ -78,47 +72,21 @@ def write_course_posts(table: pa.Table, course_id: int, ts: datetime) -> str:
|
||||||
|
|
||||||
def write_stops(table: pa.Table, snapshot_dt: datetime) -> str:
|
def write_stops(table: pa.Table, snapshot_dt: datetime) -> str:
|
||||||
"""
|
"""
|
||||||
Take a flat table with columns
|
Write the daily snapshot of stops (getPosts).
|
||||||
fetch_date, group_name, group_id, group_type,
|
|
||||||
stop_id, stop_type, x, y
|
|
||||||
and convert x/y → a WKB Point geometry,
|
|
||||||
drop the raw coords, and write GeoParquet.
|
|
||||||
"""
|
"""
|
||||||
# 1) pull x,y into numpy
|
|
||||||
xs = table.column("x").to_numpy()
|
|
||||||
ys = table.column("y").to_numpy()
|
|
||||||
|
|
||||||
# 2) build PointType with CRS
|
|
||||||
pt_builder = ga.point().with_crs("EPSG:4326")
|
|
||||||
|
|
||||||
# 3) build WKB geometry
|
|
||||||
geom = ga.as_wkb(pt_builder.from_geobuffers(None, xs, ys))
|
|
||||||
|
|
||||||
# 4) drop coords & append geometry column
|
|
||||||
table = table.drop(["x", "y"]).append_column("geometry", geom)
|
|
||||||
|
|
||||||
# 5) write out
|
|
||||||
subdir = snapshot_dt.strftime("stops/%Y-%m-%d")
|
subdir = snapshot_dt.strftime("stops/%Y-%m-%d")
|
||||||
local_dir = os.path.join(LOCAL_DATA_DIR, subdir)
|
local_dir = os.path.join(LOCAL_DATA_DIR, subdir)
|
||||||
_ensure_dir(local_dir)
|
_ensure_dir(local_dir)
|
||||||
|
|
||||||
filename = f"stops_{HOST_ID}.parquet"
|
local_path = os.path.join(local_dir, f"stops_{HOST_ID}.parquet")
|
||||||
out_path = os.path.join(local_dir, filename)
|
pq.write_table(table, local_path, compression="snappy")
|
||||||
pq.write_table(table, out_path, compression="snappy")
|
return local_path
|
||||||
return out_path
|
|
||||||
|
|
||||||
|
|
||||||
def write_posts_lines(table: pa.Table, snapshot_dt: datetime) -> str:
|
def write_posts_lines(table: pa.Table, snapshot_dt: datetime) -> str:
|
||||||
"""
|
"""
|
||||||
Write the daily stop→lines mapping (getPostsLines).
|
Write the daily stop→lines mapping (getPostsLines).
|
||||||
"""
|
"""
|
||||||
# ensure every row gets a fetch_date
|
|
||||||
if "fetch_date" not in table.column_names:
|
|
||||||
table = table.append_column(
|
|
||||||
"fetch_date",
|
|
||||||
pa.array([snapshot_dt] * table.num_rows, type=pa.timestamp("ns", tz="UTC")),
|
|
||||||
)
|
|
||||||
|
|
||||||
subdir = snapshot_dt.strftime("stops_lines/%Y-%m-%d")
|
subdir = snapshot_dt.strftime("stops_lines/%Y-%m-%d")
|
||||||
local_dir = os.path.join(LOCAL_DATA_DIR, subdir)
|
local_dir = os.path.join(LOCAL_DATA_DIR, subdir)
|
||||||
_ensure_dir(local_dir)
|
_ensure_dir(local_dir)
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue