write positions in 10-minute batches

Piotr Oleszczyk 2025-05-12 17:51:48 +02:00
parent 4fca094f89
commit 3344809106
3 changed files with 104 additions and 30 deletions
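
In short: instead of writing one Parquet file per fetch, position batches are now appended into a rolling per-host shard covering a ten-minute bucket. Reconstructed from the storage changes below (LOCAL_DATA_DIR and HOST_ID come from .config; the date is this commit's timestamp), the local layout looks roughly like:

<LOCAL_DATA_DIR>/positions/2025/05/12/17/5000_<HOST_ID>.parquet   # 17:50–17:59 bucket
<LOCAL_DATA_DIR>/positions/2025/05/12/18/0000_<HOST_ID>.parquet   # 18:00–18:09 bucket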

View file

@@ -3,6 +3,7 @@
 import asyncio
 import logging
+import pyarrow as pa
 import uvloop
 from ptscrapper.scheduler import start_scheduler
@@ -14,6 +15,8 @@ try:
 except ImportError:
     pass
+pa.set_cpu_count(1)
 def main():
     # 1) Configure the root logger
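
pa.set_cpu_count(1) caps Arrow's internal compute thread pool before the scheduler starts, presumably so the new ParquetWriter path does not spin up extra worker threads on the scraper host. A quick sketch of the knob (the I/O-thread line is an extra option, not part of this commit):

import pyarrow as pa

pa.set_cpu_count(1)    # Arrow's compute/encoding thread pool
print(pa.cpu_count())  # -> 1
# I/O threads live in a separate pool and can be capped independently:
pa.set_io_thread_count(1)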

View file

@@ -32,7 +32,7 @@ from .storage import (
     push_to_nas,
     write_course_geometry,
     write_course_posts,
-    write_positions,
+    write_positions_rb,
     write_posts_lines,
     write_stops,
 )
@@ -120,22 +120,9 @@ async def positions_job():
         server_ts = datetime.fromisoformat(server_ts_str).replace(
             tzinfo=ZoneInfo("Europe/Warsaw")
         )
-        # 3) build Arrow table enrich each dict with both timestamps
-        enriched = []
-        for p in positions:
-            enriched.append(
-                {
-                    **p,
-                    "server_ts": server_ts,
-                    "fetched_ts": fetch_ts,
-                }
-            )
-        table = pa.Table.from_pylist(enriched)
-        # 4) pass both timestamps through
-        write_positions(table, server_ts)
-        logger.info("wrote positions shard for %s", server_ts)
+        # 3) hand off raw list to our super-light RecordBatchBuilder
+        write_positions_rb(positions, server_ts, fetch_ts)
+        logger.info("appended %d rows into 10 min shard %s", len(positions), server_ts)
     except Exception:
         # record error details
         logger.exception("job=positions_job")
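
With that change the hot path of positions_job() reduces to parse-and-hand-off. A rough sketch of the new flow, assuming hypothetical names for the fetch helper and the fetch_ts bookkeeping, which sit outside this hunk:

import logging
from datetime import datetime, timezone
from zoneinfo import ZoneInfo

from .storage import write_positions_rb

logger = logging.getLogger(__name__)

async def positions_job():
    try:
        fetch_ts = datetime.now(timezone.utc)               # assumed: taken around the HTTP call
        positions, server_ts_str = await fetch_positions()  # assumed helper name and return shape
        server_ts = datetime.fromisoformat(server_ts_str).replace(
            tzinfo=ZoneInfo("Europe/Warsaw")
        )
        # no pa.Table is built here any more; storage owns batching and the 10-min rollover
        write_positions_rb(positions, server_ts, fetch_ts)
        logger.info("appended %d rows into 10 min shard %s", len(positions), server_ts)
    except Exception:
        logger.exception("job=positions_job")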

View file

@@ -2,6 +2,7 @@
 import glob
 import os
+import threading
 from datetime import date, datetime
 import geoarrow.pyarrow as ga
@@ -18,29 +19,112 @@ from .config import (  # base path on the remote, e.g. "pt-scraper-data"
     RCLONE_REMOTE_PATH,
 )
+
+# a fixed schema for every positions batch
+POSITIONS_SCHEMA = pa.schema(
+    [
+        ("v", pa.int64()),
+        ("c", pa.int64()),
+        ("x", pa.float64()),
+        ("y", pa.float64()),
+        ("l", pa.string()),
+        ("t", pa.string()),
+        ("s", pa.string()),
+        ("d", pa.string()),
+        ("e", pa.int64()),
+        ("server_ts", pa.timestamp("ns", tz="Europe/Warsaw")),
+        ("fetched_ts", pa.timestamp("ns", tz="UTC")),
+    ]
+)
+
+# module-level rolling writer state
+_positions_lock = threading.Lock()
+_positions_writer = None  # pq.ParquetWriter
+_positions_bucket = None  # datetime start of current 10-min
+_positions_path = None    # path to current .parquet file
+
 def _ensure_dir(path: str):
     """Make sure the directory exists."""
     os.makedirs(path, exist_ok=True)
-def write_positions(
-    table: pa.Table,
-    server_ts: datetime,
-) -> str:
-    """
-    Write out the raw positions (including x,y, server_ts, fetched_ts).
-    Geometry will be added later during daily compaction.
-    """
-    subdir = server_ts.strftime("positions/%Y/%m/%d/%H")
-    local_dir = os.path.join(LOCAL_DATA_DIR, subdir)
-    _ensure_dir(local_dir)
-    filename = f"{server_ts.strftime('%M%S')}_{HOST_ID}.parquet"
-    local_path = os.path.join(local_dir, filename)
-    pq.write_table(table, local_path, compression="snappy")
-    return local_path
+def _rollover_writer(server_ts):
+    """
+    Close the old bucket (if any), push it via rclone,
+    then open a new ParquetWriter for the 10-min bucket of `server_ts`.
+    """
+    global _positions_writer, _positions_bucket, _positions_path
+
+    # floor to ten-minute
+    bucket = server_ts.replace(
+        minute=(server_ts.minute // 10) * 10, second=0, microsecond=0
+    )
+    if bucket != _positions_bucket:
+        # -- close previous shard
+        if _positions_writer is not None:
+            _positions_writer.close()
+
+        # -- open new writer
+        subdir = bucket.strftime("positions/%Y/%m/%d/%H")
+        local_dir = os.path.join(LOCAL_DATA_DIR, subdir)
+        _ensure_dir(local_dir)
+        fn = f"{bucket.strftime('%M%S')}_{HOST_ID}.parquet"
+        _positions_path = os.path.join(local_dir, fn)
+
+        _positions_writer = pq.ParquetWriter(
+            _positions_path,
+            schema=POSITIONS_SCHEMA,
+            compression="none",  # compress only during the nightly compaction
+            use_dictionary=False,
+            write_statistics=False,
+        )
+        _positions_bucket = bucket
+
+def write_positions_rb(positions: list[dict], server_ts, fetched_ts) -> None:
+    """
+    Append the incoming `positions` (list of dicts) into the
+    current 10-minute Parquet shard via a RecordBatchBuilder.
+    """
+    global _positions_writer
+
+    with _positions_lock:
+        _rollover_writer(server_ts)
+
+        # 1) prepare one builder per column in the same order as SCHEMA
+        builders = [
+            pa.Int64Builder(),    # v
+            pa.Int64Builder(),    # c
+            pa.Float64Builder(),  # x
+            pa.Float64Builder(),  # y
+            pa.StringBuilder(),   # l
+            pa.StringBuilder(),   # t
+            pa.StringBuilder(),   # s
+            pa.StringBuilder(),   # d
+            pa.Int64Builder(),    # e
+            pa.TimestampBuilder("ns", tz="Europe/Warsaw"),  # server_ts
+            pa.TimestampBuilder("ns", tz="UTC"),            # fetched_ts
+        ]
+
+        # 2) append into each builder
+        for p in positions:
+            builders[0].append(p["v"])
+            builders[1].append(p["c"])
+            builders[2].append(p["x"])
+            builders[3].append(p["y"])
+            builders[4].append(p["l"])
+            builders[5].append(p["t"])
+            builders[6].append(p["s"])
+            builders[7].append(p["d"])
+            builders[8].append(p["e"])
+            builders[9].append(server_ts)
+            builders[10].append(fetched_ts)
+
+        # 3) finish → arrays, make a RecordBatch, write it
+        arrays = [b.finish() for b in builders]
+        batch = pa.RecordBatch.from_arrays(arrays, schema=POSITIONS_SCHEMA)
+        _positions_writer.write_batch(batch)
 def write_course_posts(table: pa.Table, course_id: int, ts: datetime) -> str:
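
For reference, a standalone sketch of the same append path built only from pa.array and pa.RecordBatch.from_arrays; pyarrow's Python bindings do not seem to expose typed builders beyond StringBuilder, so this is the variant to reach for if the Int64Builder/Float64Builder/TimestampBuilder calls above turn out to be unavailable. Field values and the timestamp are illustrative; the schema, bucketing, and path scheme follow the code above.

from datetime import datetime
from zoneinfo import ZoneInfo
import pyarrow as pa

# one illustrative row; keys and types follow POSITIONS_SCHEMA, values are made up
positions = [
    {"v": 1, "c": 2, "x": 21.0, "y": 52.2, "l": "L", "t": "T", "s": "S", "d": "D", "e": 0},
]
server_ts = datetime(2025, 5, 12, 17, 51, 48, tzinfo=ZoneInfo("Europe/Warsaw"))
fetched_ts = datetime.now(tz=ZoneInfo("UTC"))

# same flooring as _rollover_writer: 17:51:48 -> 17:50:00, so the shard would be
# <LOCAL_DATA_DIR>/positions/2025/05/12/17/5000_<HOST_ID>.parquet
bucket = server_ts.replace(minute=(server_ts.minute // 10) * 10, second=0, microsecond=0)

# build one array per column, cast to the exact schema types
n = len(positions)
arrays = [
    pa.array([p[k] for p in positions], type=POSITIONS_SCHEMA.field(k).type)
    for k in ("v", "c", "x", "y", "l", "t", "s", "d", "e")
]
arrays.append(pa.array([server_ts] * n, type=POSITIONS_SCHEMA.field("server_ts").type))
arrays.append(pa.array([fetched_ts] * n, type=POSITIONS_SCHEMA.field("fetched_ts").type))

batch = pa.RecordBatch.from_arrays(arrays, schema=POSITIONS_SCHEMA)
# _positions_writer.write_batch(batch)   # writer opened by _rollover_writer(server_ts)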