Compare commits
No commits in common. "20238e02700ac5912c4032386148c54d77f59456" and "3469511bd8af3a13fdc7c8a30745894f026bc743" have entirely different histories.
20238e0270
...
3469511bd8
2 changed files with 34 additions and 65 deletions
|
|
@ -32,13 +32,12 @@ from .storage import (
|
||||||
push_to_nas,
|
push_to_nas,
|
||||||
write_course_geometry,
|
write_course_geometry,
|
||||||
write_course_posts,
|
write_course_posts,
|
||||||
write_positions_batched,
|
write_positions,
|
||||||
write_posts_lines,
|
write_posts_lines,
|
||||||
write_stops,
|
write_stops,
|
||||||
)
|
)
|
||||||
|
|
||||||
logger = logging.getLogger("ptscrapper.scheduler")
|
logger = logging.getLogger("ptscrapper.scheduler")
|
||||||
_client: httpx.AsyncClient = None
|
|
||||||
|
|
||||||
# Path to our local SQLite state database
|
# Path to our local SQLite state database
|
||||||
STATE_DB = os.path.join(LOCAL_DATA_DIR, "state.db")
|
STATE_DB = os.path.join(LOCAL_DATA_DIR, "state.db")
|
||||||
|
|
@ -112,8 +111,9 @@ def chunked(iterable, size):
|
||||||
async def positions_job():
|
async def positions_job():
|
||||||
fetch_ts = datetime.now(timezone.utc) # when *we* fetched
|
fetch_ts = datetime.now(timezone.utc) # when *we* fetched
|
||||||
|
|
||||||
|
client = get_async_client()
|
||||||
try:
|
try:
|
||||||
data = await fetch_positions(_client)
|
data = await fetch_positions(client)
|
||||||
# 2) API‐reported timestamp
|
# 2) API‐reported timestamp
|
||||||
server_ts_str, *positions = data
|
server_ts_str, *positions = data
|
||||||
# server_ts is local to Warsaw
|
# server_ts is local to Warsaw
|
||||||
|
|
@ -134,11 +134,13 @@ async def positions_job():
|
||||||
table = pa.Table.from_pylist(enriched)
|
table = pa.Table.from_pylist(enriched)
|
||||||
|
|
||||||
# 4) pass both timestamps through
|
# 4) pass both timestamps through
|
||||||
write_positions_batched(table, server_ts)
|
write_positions(table, server_ts)
|
||||||
logger.info("wrote positions shard for %s", server_ts)
|
logger.info("wrote positions shard for %s", server_ts)
|
||||||
except Exception:
|
except Exception:
|
||||||
# record error details
|
# record error details
|
||||||
logger.exception("job=positions_job")
|
logger.exception("job=positions_job")
|
||||||
|
finally:
|
||||||
|
await client.aclose()
|
||||||
|
|
||||||
# schedule course fetches without awaiting
|
# schedule course fetches without awaiting
|
||||||
for cid in {p["c"] for p in positions}:
|
for cid in {p["c"] for p in positions}:
|
||||||
|
|
@ -147,10 +149,14 @@ async def positions_job():
|
||||||
await course_queue.put((cid, server_ts, 0))
|
await course_queue.put((cid, server_ts, 0))
|
||||||
|
|
||||||
|
|
||||||
async def fetch_and_store_course(course_id: int, ts: datetime):
|
async def fetch_and_store_course(course_id: int, ts: datetime, client=None):
|
||||||
"""Fetch one course’s details and store them."""
|
"""Fetch one course’s details and store them."""
|
||||||
|
own_client = False
|
||||||
|
if client is None:
|
||||||
|
client = get_async_client()
|
||||||
|
own_client = True
|
||||||
try:
|
try:
|
||||||
recs = await fetch_course_posts(_client, [course_id])
|
recs = await fetch_course_posts(client, [course_id])
|
||||||
# now do your I/O; catch+log only errors here
|
# now do your I/O; catch+log only errors here
|
||||||
for rec in recs:
|
for rec in recs:
|
||||||
rows = []
|
rows = []
|
||||||
|
|
@ -180,13 +186,17 @@ async def fetch_and_store_course(course_id: int, ts: datetime):
|
||||||
if isinstance(e, ConnectTimeout):
|
if isinstance(e, ConnectTimeout):
|
||||||
raise
|
raise
|
||||||
logger.exception("job=course_posts_job")
|
logger.exception("job=course_posts_job")
|
||||||
|
finally:
|
||||||
|
if own_client:
|
||||||
|
await client.aclose()
|
||||||
|
|
||||||
|
|
||||||
async def daily_snapshot_job():
|
async def daily_snapshot_job():
|
||||||
ts = datetime.now(timezone.utc)
|
ts = datetime.now(timezone.utc)
|
||||||
|
client = get_async_client()
|
||||||
try:
|
try:
|
||||||
# stops
|
# stops
|
||||||
posts = await fetch_posts(_client)
|
posts = await fetch_posts(client)
|
||||||
rows = []
|
rows = []
|
||||||
for grp in posts:
|
for grp in posts:
|
||||||
# grp: { "n": name, "s": group_id, "t": group_type, "p": [ {s,x,y,t}, … ] }
|
# grp: { "n": name, "s": group_id, "t": group_type, "p": [ {s,x,y,t}, … ] }
|
||||||
|
|
@ -208,7 +218,7 @@ async def daily_snapshot_job():
|
||||||
push_to_nas(fp)
|
push_to_nas(fp)
|
||||||
|
|
||||||
# lines per stop
|
# lines per stop
|
||||||
posts_lines = await fetch_posts_lines(_client)
|
posts_lines = await fetch_posts_lines(client)
|
||||||
table2 = pa.Table.from_pylist(posts_lines)
|
table2 = pa.Table.from_pylist(posts_lines)
|
||||||
fp2 = write_posts_lines(table2, ts)
|
fp2 = write_posts_lines(table2, ts)
|
||||||
push_to_nas(fp2)
|
push_to_nas(fp2)
|
||||||
|
|
@ -216,6 +226,8 @@ async def daily_snapshot_job():
|
||||||
logger.info("wrote daily snapshot for %s", ts)
|
logger.info("wrote daily snapshot for %s", ts)
|
||||||
except Exception:
|
except Exception:
|
||||||
logger.exception("job=daily_snapshot_job")
|
logger.exception("job=daily_snapshot_job")
|
||||||
|
finally:
|
||||||
|
await client.aclose()
|
||||||
|
|
||||||
|
|
||||||
async def daily_compact_and_push():
|
async def daily_compact_and_push():
|
||||||
|
|
@ -305,9 +317,6 @@ async def start_scheduler():
|
||||||
"""Initialize state DB, schedule all jobs, and start the loop."""
|
"""Initialize state DB, schedule all jobs, and start the loop."""
|
||||||
init_db()
|
init_db()
|
||||||
|
|
||||||
global _client
|
|
||||||
_client = get_async_client()
|
|
||||||
|
|
||||||
# spin up N workers before scheduling jobs
|
# spin up N workers before scheduling jobs
|
||||||
for i in range(COURSE_WORKER_COUNT):
|
for i in range(COURSE_WORKER_COUNT):
|
||||||
asyncio.create_task(course_worker(f"worker-{i+1}"))
|
asyncio.create_task(course_worker(f"worker-{i+1}"))
|
||||||
|
|
@ -331,8 +340,4 @@ async def start_scheduler():
|
||||||
)
|
)
|
||||||
|
|
||||||
scheduler.start()
|
scheduler.start()
|
||||||
|
await asyncio.Event().wait()
|
||||||
try:
|
|
||||||
await asyncio.Event().wait()
|
|
||||||
finally:
|
|
||||||
await _client.aclose()
|
|
||||||
|
|
|
||||||
|
|
@ -2,7 +2,6 @@
|
||||||
|
|
||||||
import glob
|
import glob
|
||||||
import os
|
import os
|
||||||
import threading
|
|
||||||
from datetime import date, datetime
|
from datetime import date, datetime
|
||||||
|
|
||||||
import geoarrow.pyarrow as ga
|
import geoarrow.pyarrow as ga
|
||||||
|
|
@ -19,64 +18,29 @@ from .config import ( # base path on the remote, e.g. "pt-scraper-data"
|
||||||
RCLONE_REMOTE_PATH,
|
RCLONE_REMOTE_PATH,
|
||||||
)
|
)
|
||||||
|
|
||||||
# module‐level state for our rolling 10-min writer
|
|
||||||
_positions_writer = None # pq.ParquetWriter
|
|
||||||
_positions_batch = None # datetime for current batch start
|
|
||||||
_positions_path = None # where we’re writing right now
|
|
||||||
_positions_lock = threading.Lock()
|
|
||||||
|
|
||||||
|
|
||||||
def _ensure_dir(path: str):
|
def _ensure_dir(path: str):
|
||||||
"""Make sure the directory exists."""
|
"""Make sure the directory exists."""
|
||||||
os.makedirs(path, exist_ok=True)
|
os.makedirs(path, exist_ok=True)
|
||||||
|
|
||||||
|
|
||||||
def write_positions_batched(table: pa.Table, server_ts: datetime) -> None:
|
def write_positions(
|
||||||
|
table: pa.Table,
|
||||||
|
server_ts: datetime,
|
||||||
|
) -> str:
|
||||||
"""
|
"""
|
||||||
Append `table` into a rolling Parquet file, one file per 10-minute window.
|
Write out the raw positions (including x,y, server_ts, fetched_ts).
|
||||||
When the window rolls over, we close+push the old file and start a new one.
|
Geometry will be added later during daily compaction.
|
||||||
"""
|
"""
|
||||||
global _positions_writer, _positions_batch, _positions_path
|
|
||||||
|
|
||||||
# compute the ten-minute bucket start (floor to minute//10 * 10)
|
subdir = server_ts.strftime("positions/%Y/%m/%d/%H")
|
||||||
batch_start = server_ts.replace(
|
local_dir = os.path.join(LOCAL_DATA_DIR, subdir)
|
||||||
minute=(server_ts.minute // 10) * 10, second=0, microsecond=0
|
_ensure_dir(local_dir)
|
||||||
)
|
|
||||||
|
|
||||||
with _positions_lock:
|
filename = f"{server_ts.strftime('%M%S')}_{HOST_ID}.parquet"
|
||||||
# if we’ve moved into a new bucket, close & push the old
|
local_path = os.path.join(local_dir, filename)
|
||||||
if _positions_batch is None or batch_start != _positions_batch:
|
pq.write_table(table, local_path, compression="snappy")
|
||||||
if _positions_writer is not None:
|
return local_path
|
||||||
_positions_writer.close()
|
|
||||||
# push the completed file
|
|
||||||
rel = os.path.relpath(_positions_path, LOCAL_DATA_DIR).replace(
|
|
||||||
os.sep, "/"
|
|
||||||
)
|
|
||||||
rclone.copy(
|
|
||||||
_positions_path,
|
|
||||||
f"{RCLONE_REMOTE}:{RCLONE_REMOTE_PATH}/{rel}",
|
|
||||||
ignore_existing=True,
|
|
||||||
)
|
|
||||||
|
|
||||||
# start a new writer for the new bucket
|
|
||||||
subdir = batch_start.strftime("positions/%Y/%m/%d/%H")
|
|
||||||
local_dir = os.path.join(LOCAL_DATA_DIR, subdir)
|
|
||||||
_ensure_dir(local_dir)
|
|
||||||
fn = f"{batch_start.strftime('%M%S')}_{HOST_ID}.parquet"
|
|
||||||
_positions_path = os.path.join(local_dir, fn)
|
|
||||||
_positions_writer = pq.ParquetWriter(
|
|
||||||
_positions_path,
|
|
||||||
table.schema,
|
|
||||||
compression="snappy",
|
|
||||||
use_dictionary=False,
|
|
||||||
write_statistics=False,
|
|
||||||
)
|
|
||||||
_positions_batch = batch_start
|
|
||||||
|
|
||||||
# write this 10-s table into the current bucket’s file
|
|
||||||
_positions_writer.write_table(table)
|
|
||||||
|
|
||||||
# no return path: we push only on rollover
|
|
||||||
|
|
||||||
|
|
||||||
def write_course_posts(table: pa.Table, course_id: int, ts: datetime) -> str:
|
def write_course_posts(table: pa.Table, course_id: int, ts: datetime) -> str:
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue