write positions in 10-minute batches

This commit is contained in:
Piotr Oleszczyk 2025-05-12 17:51:48 +02:00
parent 4fca094f89
commit a65049fd62
3 changed files with 105 additions and 30 deletions

View file

@@ -3,6 +3,7 @@
import asyncio import asyncio
import logging import logging
import pyarrow as pa
import uvloop import uvloop
from ptscrapper.scheduler import start_scheduler from ptscrapper.scheduler import start_scheduler
@@ -14,6 +15,8 @@ try:
except ImportError: except ImportError:
pass pass
pa.set_cpu_count(1)
def main(): def main():
# 1) Configure the root logger # 1) Configure the root logger

View file

@@ -32,7 +32,7 @@ from .storage import (
push_to_nas, push_to_nas,
write_course_geometry, write_course_geometry,
write_course_posts, write_course_posts,
write_positions, write_positions_rb,
write_posts_lines, write_posts_lines,
write_stops, write_stops,
) )
@@ -120,22 +120,9 @@ async def positions_job():
server_ts = datetime.fromisoformat(server_ts_str).replace( server_ts = datetime.fromisoformat(server_ts_str).replace(
tzinfo=ZoneInfo("Europe/Warsaw") tzinfo=ZoneInfo("Europe/Warsaw")
) )
# 3) hand off raw list to our super-light RecordBatchBuilder
# 3) build Arrow table enrich each dict with both timestamps write_positions_rb(positions, server_ts, fetch_ts)
enriched = [] logger.info("appended %d rows into 10 min shard %s", len(positions), server_ts)
for p in positions:
enriched.append(
{
**p,
"server_ts": server_ts,
"fetched_ts": fetch_ts,
}
)
table = pa.Table.from_pylist(enriched)
# 4) pass both timestamps through
write_positions(table, server_ts)
logger.info("wrote positions shard for %s", server_ts)
except Exception: except Exception:
# record error details # record error details
logger.exception("job=positions_job") logger.exception("job=positions_job")

View file

@@ -2,6 +2,7 @@
import glob import glob
import os import os
import threading
from datetime import date, datetime from datetime import date, datetime
import geoarrow.pyarrow as ga import geoarrow.pyarrow as ga
@@ -9,6 +10,7 @@ import polyline
import pyarrow as pa import pyarrow as pa
import pyarrow.dataset as ds import pyarrow.dataset as ds
import pyarrow.parquet as pq import pyarrow.parquet as pq
from pyarrow import lib as _lib
from rclone_python import rclone from rclone_python import rclone
from .config import HOST_ID # unique identifier for this host/container from .config import HOST_ID # unique identifier for this host/container
@@ -18,29 +20,112 @@ from .config import ( # base path on the remote, e.g. "pt-scraper-data"
RCLONE_REMOTE_PATH, RCLONE_REMOTE_PATH,
) )
# Fixed Arrow schema shared by every positions batch; the writer rejects
# batches that deviate from it, so the shard layout stays uniform.
POSITIONS_SCHEMA = pa.schema(
    [
        # single-letter payload keys as delivered by the upstream feed
        # (presumably vehicle/course/coords/line/etc. — TODO confirm meanings)
        pa.field("v", pa.int64()),
        pa.field("c", pa.int64()),
        pa.field("x", pa.float64()),
        pa.field("y", pa.float64()),
        pa.field("l", pa.string()),
        pa.field("t", pa.string()),
        pa.field("s", pa.string()),
        pa.field("d", pa.string()),
        pa.field("e", pa.int64()),
        # timestamps added at ingest time, one per fetch cycle
        pa.field("server_ts", pa.timestamp("ns", tz="Europe/Warsaw")),
        pa.field("fetched_ts", pa.timestamp("ns", tz="UTC")),
    ]
)
# Module-level rolling-writer state for the positions stream.  These four
# names are mutated by _rollover_writer(); the only visible caller holds
# _positions_lock first (see write_positions_rb), which serialises
# bucket rollover against concurrent appends.
_positions_lock = threading.Lock()
_positions_writer = None # pq.ParquetWriter for the currently open shard, or None before first batch
_positions_bucket = None # datetime floor of the current 10-minute bucket
_positions_path = None # local filesystem path of the current .parquet shard
def _ensure_dir(path: str): def _ensure_dir(path: str):
"""Make sure the directory exists.""" """Make sure the directory exists."""
os.makedirs(path, exist_ok=True) os.makedirs(path, exist_ok=True)
def _rollover_writer(server_ts):
    """
    Roll the module-level ParquetWriter over to the 10-minute bucket
    containing `server_ts`.

    When `server_ts` falls into a new bucket, the previous shard's writer
    (if any) is closed so its Parquet footer gets written, and a fresh
    writer is opened under
    LOCAL_DATA_DIR/positions/YYYY/MM/DD/HH/MMSS_<HOST_ID>.parquet.

    NOTE(review): closed shards are NOT pushed anywhere in this function —
    confirm that a nightly compaction/push job uploads finished shards,
    otherwise they accumulate locally.

    Must be called with `_positions_lock` held (write_positions_rb does).
    """
    global _positions_writer, _positions_bucket, _positions_path

    # floor to the enclosing ten-minute boundary
    bucket = server_ts.replace(
        minute=(server_ts.minute // 10) * 10, second=0, microsecond=0
    )

    if bucket != _positions_bucket:
        # close the previous shard so the file is finalized
        if _positions_writer is not None:
            _positions_writer.close()

        # open a writer for the new bucket
        subdir = bucket.strftime("positions/%Y/%m/%d/%H")
        local_dir = os.path.join(LOCAL_DATA_DIR, subdir)
        _ensure_dir(local_dir)
        fn = f"{bucket.strftime('%M%S')}_{HOST_ID}.parquet"
        _positions_path = os.path.join(local_dir, fn)
        _positions_writer = pq.ParquetWriter(
            _positions_path,
            schema=POSITIONS_SCHEMA,
            # raw shards are written uncompressed; compression is deferred
            # to the nightly compaction step
            compression="none",
            use_dictionary=False,
            write_statistics=False,
        )
        _positions_bucket = bucket
def write_positions_rb(positions: list[dict], server_ts, fetched_ts) -> None:
    """
    Append the incoming `positions` (list of dicts) into the current
    10-minute Parquet shard.

    Each dict must carry the single-letter payload keys of
    POSITIONS_SCHEMA ("v", "c", "x", "y", "l", "t", "s", "d", "e");
    `server_ts` and `fetched_ts` are replicated into every row as the
    two timestamp columns.

    Rollover and append both happen under `_positions_lock`, so
    concurrent callers cannot interleave with a bucket switch.
    """
    if not positions:
        # nothing to append — don't churn the writer for empty batches
        return

    with _positions_lock:
        _rollover_writer(server_ts)

        n = len(positions)
        # Build one Arrow array per schema column via the public pa.array()
        # API.  The pyarrow.lib builder classes used previously
        # (Int64Builder, Float64Builder, TimestampBuilder, ...) are private
        # and not exported by all pyarrow builds, so relying on them is
        # fragile; pa.array() converts each column in a single C-level pass
        # and enforces the declared type.
        arrays = []
        for field in POSITIONS_SCHEMA:
            if field.name == "server_ts":
                column = [server_ts] * n
            elif field.name == "fetched_ts":
                column = [fetched_ts] * n
            else:
                column = [p[field.name] for p in positions]
            arrays.append(pa.array(column, type=field.type))

        batch = pa.RecordBatch.from_arrays(arrays, schema=POSITIONS_SCHEMA)
        _positions_writer.write_batch(batch)
def write_course_posts(table: pa.Table, course_id: int, ts: datetime) -> str: def write_course_posts(table: pa.Table, course_id: int, ts: datetime) -> str: