diff --git a/src/ptscrapper/storage.py b/src/ptscrapper/storage.py index bfce5d7..879a24e 100644 --- a/src/ptscrapper/storage.py +++ b/src/ptscrapper/storage.py @@ -115,12 +115,11 @@ def compact_positions(date_to_compact: date) -> str: merge into one table, write a single daily file. Returns the path to that daily Parquet. """ - # build glob pattern for that day - day_glob = os.path.join( - LOCAL_DATA_DIR, f"positions/{date_to_compact:%Y/%m/%d}/**/*.parquet" - ) - # load dataset - dataset = ds.dataset(day_glob, format="parquet", partitioning="hive") + day_dir = os.path.join(LOCAL_DATA_DIR, f"positions/{date_to_compact:%Y/%m/%d}") + if not os.path.isdir(day_dir): + # nothing to compact + return "" + dataset = ds.dataset(day_dir, format="parquet", partitioning="hive") table = dataset.to_table() # merges all batches # write one daily file @@ -148,10 +147,10 @@ def compact_course_posts(date_to_compact: date) -> str: Read all shards under courses/YYYY-MM-DD/*.parquet, merge into one table, write a single daily file. """ - pattern = os.path.join( - LOCAL_DATA_DIR, f"courses/{date_to_compact:%Y-%m-%d}/*.parquet" - ) - dataset = ds.dataset(pattern, format="parquet") + day_dir = os.path.join(LOCAL_DATA_DIR, f"courses/{date_to_compact:%Y-%m-%d}") + if not os.path.isdir(day_dir): + return "" + dataset = ds.dataset(day_dir, format="parquet") table = dataset.to_table() out_dir = os.path.join(LOCAL_DATA_DIR, "courses_compacted") @@ -215,10 +214,12 @@ def compact_course_geometry(date_to_compact: date) -> str: merge into one table, write a single daily file. Returns the path to that daily Parquet. """ - pattern = os.path.join( - LOCAL_DATA_DIR, f"courses_geometry/{date_to_compact:%Y-%m-%d}/*.parquet" + day_dir = os.path.join( + LOCAL_DATA_DIR, f"courses_geometry/{date_to_compact:%Y-%m-%d}" ) - dataset = ds.dataset(pattern, format="parquet") + if not os.path.isdir(day_dir): + return "" + dataset = ds.dataset(day_dir, format="parquet") table = dataset.to_table() out_dir = os.path.join(LOCAL_DATA_DIR, "courses_geometry_compacted")