Compare commits

..

No commits in common. "20238e02700ac5912c4032386148c54d77f59456" and "3469511bd8af3a13fdc7c8a30745894f026bc743" have entirely different histories.

2 changed files with 34 additions and 65 deletions

View file

@ -32,13 +32,12 @@ from .storage import (
push_to_nas, push_to_nas,
write_course_geometry, write_course_geometry,
write_course_posts, write_course_posts,
write_positions_batched, write_positions,
write_posts_lines, write_posts_lines,
write_stops, write_stops,
) )
logger = logging.getLogger("ptscrapper.scheduler") logger = logging.getLogger("ptscrapper.scheduler")
_client: httpx.AsyncClient = None
# Path to our local SQLite state database # Path to our local SQLite state database
STATE_DB = os.path.join(LOCAL_DATA_DIR, "state.db") STATE_DB = os.path.join(LOCAL_DATA_DIR, "state.db")
@ -112,8 +111,9 @@ def chunked(iterable, size):
async def positions_job(): async def positions_job():
fetch_ts = datetime.now(timezone.utc) # when *we* fetched fetch_ts = datetime.now(timezone.utc) # when *we* fetched
client = get_async_client()
try: try:
data = await fetch_positions(_client) data = await fetch_positions(client)
# 2) APIreported timestamp # 2) APIreported timestamp
server_ts_str, *positions = data server_ts_str, *positions = data
# server_ts is local to Warsaw # server_ts is local to Warsaw
@ -134,11 +134,13 @@ async def positions_job():
table = pa.Table.from_pylist(enriched) table = pa.Table.from_pylist(enriched)
# 4) pass both timestamps through # 4) pass both timestamps through
write_positions_batched(table, server_ts) write_positions(table, server_ts)
logger.info("wrote positions shard for %s", server_ts) logger.info("wrote positions shard for %s", server_ts)
except Exception: except Exception:
# record error details # record error details
logger.exception("job=positions_job") logger.exception("job=positions_job")
finally:
await client.aclose()
# schedule course fetches without awaiting # schedule course fetches without awaiting
for cid in {p["c"] for p in positions}: for cid in {p["c"] for p in positions}:
@ -147,10 +149,14 @@ async def positions_job():
await course_queue.put((cid, server_ts, 0)) await course_queue.put((cid, server_ts, 0))
async def fetch_and_store_course(course_id: int, ts: datetime): async def fetch_and_store_course(course_id: int, ts: datetime, client=None):
"""Fetch one courses details and store them.""" """Fetch one courses details and store them."""
own_client = False
if client is None:
client = get_async_client()
own_client = True
try: try:
recs = await fetch_course_posts(_client, [course_id]) recs = await fetch_course_posts(client, [course_id])
# now do your I/O; catch+log only errors here # now do your I/O; catch+log only errors here
for rec in recs: for rec in recs:
rows = [] rows = []
@ -180,13 +186,17 @@ async def fetch_and_store_course(course_id: int, ts: datetime):
if isinstance(e, ConnectTimeout): if isinstance(e, ConnectTimeout):
raise raise
logger.exception("job=course_posts_job") logger.exception("job=course_posts_job")
finally:
if own_client:
await client.aclose()
async def daily_snapshot_job(): async def daily_snapshot_job():
ts = datetime.now(timezone.utc) ts = datetime.now(timezone.utc)
client = get_async_client()
try: try:
# stops # stops
posts = await fetch_posts(_client) posts = await fetch_posts(client)
rows = [] rows = []
for grp in posts: for grp in posts:
# grp: { "n": name, "s": group_id, "t": group_type, "p": [ {s,x,y,t}, … ] } # grp: { "n": name, "s": group_id, "t": group_type, "p": [ {s,x,y,t}, … ] }
@ -208,7 +218,7 @@ async def daily_snapshot_job():
push_to_nas(fp) push_to_nas(fp)
# lines per stop # lines per stop
posts_lines = await fetch_posts_lines(_client) posts_lines = await fetch_posts_lines(client)
table2 = pa.Table.from_pylist(posts_lines) table2 = pa.Table.from_pylist(posts_lines)
fp2 = write_posts_lines(table2, ts) fp2 = write_posts_lines(table2, ts)
push_to_nas(fp2) push_to_nas(fp2)
@ -216,6 +226,8 @@ async def daily_snapshot_job():
logger.info("wrote daily snapshot for %s", ts) logger.info("wrote daily snapshot for %s", ts)
except Exception: except Exception:
logger.exception("job=daily_snapshot_job") logger.exception("job=daily_snapshot_job")
finally:
await client.aclose()
async def daily_compact_and_push(): async def daily_compact_and_push():
@ -305,9 +317,6 @@ async def start_scheduler():
"""Initialize state DB, schedule all jobs, and start the loop.""" """Initialize state DB, schedule all jobs, and start the loop."""
init_db() init_db()
global _client
_client = get_async_client()
# spin up N workers before scheduling jobs # spin up N workers before scheduling jobs
for i in range(COURSE_WORKER_COUNT): for i in range(COURSE_WORKER_COUNT):
asyncio.create_task(course_worker(f"worker-{i+1}")) asyncio.create_task(course_worker(f"worker-{i+1}"))
@ -331,8 +340,4 @@ async def start_scheduler():
) )
scheduler.start() scheduler.start()
await asyncio.Event().wait()
try:
await asyncio.Event().wait()
finally:
await _client.aclose()

View file

@ -2,7 +2,6 @@
import glob import glob
import os import os
import threading
from datetime import date, datetime from datetime import date, datetime
import geoarrow.pyarrow as ga import geoarrow.pyarrow as ga
@ -19,64 +18,29 @@ from .config import ( # base path on the remote, e.g. "pt-scraper-data"
RCLONE_REMOTE_PATH, RCLONE_REMOTE_PATH,
) )
# modulelevel state for our rolling 10-min writer
_positions_writer = None # pq.ParquetWriter
_positions_batch = None # datetime for current batch start
_positions_path = None # where were writing right now
_positions_lock = threading.Lock()
def _ensure_dir(path: str): def _ensure_dir(path: str):
"""Make sure the directory exists.""" """Make sure the directory exists."""
os.makedirs(path, exist_ok=True) os.makedirs(path, exist_ok=True)
def write_positions_batched(table: pa.Table, server_ts: datetime) -> None: def write_positions(
table: pa.Table,
server_ts: datetime,
) -> str:
""" """
Append `table` into a rolling Parquet file, one file per 10-minute window. Write out the raw positions (including x,y, server_ts, fetched_ts).
When the window rolls over, we close+push the old file and start a new one. Geometry will be added later during daily compaction.
""" """
global _positions_writer, _positions_batch, _positions_path
# compute the ten-minute bucket start (floor to minute//10 * 10) subdir = server_ts.strftime("positions/%Y/%m/%d/%H")
batch_start = server_ts.replace( local_dir = os.path.join(LOCAL_DATA_DIR, subdir)
minute=(server_ts.minute // 10) * 10, second=0, microsecond=0 _ensure_dir(local_dir)
)
with _positions_lock: filename = f"{server_ts.strftime('%M%S')}_{HOST_ID}.parquet"
# if weve moved into a new bucket, close & push the old local_path = os.path.join(local_dir, filename)
if _positions_batch is None or batch_start != _positions_batch: pq.write_table(table, local_path, compression="snappy")
if _positions_writer is not None: return local_path
_positions_writer.close()
# push the completed file
rel = os.path.relpath(_positions_path, LOCAL_DATA_DIR).replace(
os.sep, "/"
)
rclone.copy(
_positions_path,
f"{RCLONE_REMOTE}:{RCLONE_REMOTE_PATH}/{rel}",
ignore_existing=True,
)
# start a new writer for the new bucket
subdir = batch_start.strftime("positions/%Y/%m/%d/%H")
local_dir = os.path.join(LOCAL_DATA_DIR, subdir)
_ensure_dir(local_dir)
fn = f"{batch_start.strftime('%M%S')}_{HOST_ID}.parquet"
_positions_path = os.path.join(local_dir, fn)
_positions_writer = pq.ParquetWriter(
_positions_path,
table.schema,
compression="snappy",
use_dictionary=False,
write_statistics=False,
)
_positions_batch = batch_start
# write this 10-s table into the current buckets file
_positions_writer.write_table(table)
# no return path: we push only on rollover
def write_course_posts(table: pa.Table, course_id: int, ts: datetime) -> str: def write_course_posts(table: pa.Table, course_id: int, ts: datetime) -> str: