correct daily snapshot: flatten stops, add fetch date
This commit is contained in:
parent
6891e3b206
commit
96f918445c
2 changed files with 47 additions and 5 deletions
|
|
@ -196,7 +196,23 @@ async def daily_snapshot_job():
|
||||||
try:
|
try:
|
||||||
# stops
|
# stops
|
||||||
posts = await fetch_posts(client)
|
posts = await fetch_posts(client)
|
||||||
table = pa.Table.from_pylist(posts)
|
rows = []
|
||||||
|
for grp in posts:
|
||||||
|
# grp: { "n": name, "s": group_id, "t": group_type, "p": [ {s,x,y,t}, … ] }
|
||||||
|
for stop in grp.get("p", []):
|
||||||
|
rows.append(
|
||||||
|
{
|
||||||
|
"fetch_date": ts, # utc fetch timestamp
|
||||||
|
"group_name": grp["n"],
|
||||||
|
"group_id": grp["s"],
|
||||||
|
"group_type": grp["t"],
|
||||||
|
"stop_id": stop["s"],
|
||||||
|
"stop_type": stop["t"],
|
||||||
|
"x": stop["x"],
|
||||||
|
"y": stop["y"],
|
||||||
|
}
|
||||||
|
)
|
||||||
|
table = pa.Table.from_pylist(rows)
|
||||||
fp = write_stops(table, ts)
|
fp = write_stops(table, ts)
|
||||||
push_to_nas(fp)
|
push_to_nas(fp)
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -78,21 +78,47 @@ def write_course_posts(table: pa.Table, course_id: int, ts: datetime) -> str:
|
||||||
|
|
||||||
def write_stops(table: pa.Table, snapshot_dt: datetime) -> str:
|
def write_stops(table: pa.Table, snapshot_dt: datetime) -> str:
|
||||||
"""
|
"""
|
||||||
Write the daily snapshot of stops (getPosts).
|
Take a flat table with columns
|
||||||
|
fetch_date, group_name, group_id, group_type,
|
||||||
|
stop_id, stop_type, x, y
|
||||||
|
and convert x/y → a WKB Point geometry,
|
||||||
|
drop the raw coords, and write GeoParquet.
|
||||||
"""
|
"""
|
||||||
|
# 1) pull x,y into numpy
|
||||||
|
xs = table.column("x").to_numpy()
|
||||||
|
ys = table.column("y").to_numpy()
|
||||||
|
|
||||||
|
# 2) build PointType with CRS
|
||||||
|
pt_builder = ga.point().with_crs("EPSG:4326")
|
||||||
|
|
||||||
|
# 3) build WKB geometry
|
||||||
|
geom = ga.as_wkb(pt_builder.from_geobuffers(None, xs, ys))
|
||||||
|
|
||||||
|
# 4) drop coords & append geometry column
|
||||||
|
table = table.drop(["x", "y"]).append_column("geometry", geom)
|
||||||
|
|
||||||
|
# 5) write out
|
||||||
subdir = snapshot_dt.strftime("stops/%Y-%m-%d")
|
subdir = snapshot_dt.strftime("stops/%Y-%m-%d")
|
||||||
local_dir = os.path.join(LOCAL_DATA_DIR, subdir)
|
local_dir = os.path.join(LOCAL_DATA_DIR, subdir)
|
||||||
_ensure_dir(local_dir)
|
_ensure_dir(local_dir)
|
||||||
|
|
||||||
local_path = os.path.join(local_dir, f"stops_{HOST_ID}.parquet")
|
filename = f"stops_{HOST_ID}.parquet"
|
||||||
pq.write_table(table, local_path, compression="snappy")
|
out_path = os.path.join(local_dir, filename)
|
||||||
return local_path
|
pq.write_table(table, out_path, compression="snappy")
|
||||||
|
return out_path
|
||||||
|
|
||||||
|
|
||||||
def write_posts_lines(table: pa.Table, snapshot_dt: datetime) -> str:
|
def write_posts_lines(table: pa.Table, snapshot_dt: datetime) -> str:
|
||||||
"""
|
"""
|
||||||
Write the daily stop→lines mapping (getPostsLines).
|
Write the daily stop→lines mapping (getPostsLines).
|
||||||
"""
|
"""
|
||||||
|
# ensure every row gets a fetch_date
|
||||||
|
if "fetch_date" not in table.column_names:
|
||||||
|
table = table.append_column(
|
||||||
|
"fetch_date",
|
||||||
|
pa.array([snapshot_dt] * table.num_rows, type=pa.timestamp("ns", tz="UTC")),
|
||||||
|
)
|
||||||
|
|
||||||
subdir = snapshot_dt.strftime("stops_lines/%Y-%m-%d")
|
subdir = snapshot_dt.strftime("stops_lines/%Y-%m-%d")
|
||||||
local_dir = os.path.join(LOCAL_DATA_DIR, subdir)
|
local_dir = os.path.join(LOCAL_DATA_DIR, subdir)
|
||||||
_ensure_dir(local_dir)
|
_ensure_dir(local_dir)
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue