fix daily compaction

This commit is contained in:
Piotr Oleszczyk 2025-05-11 13:32:32 +02:00
parent 823ca4c259
commit a5349123ce

View file

@ -115,12 +115,11 @@ def compact_positions(date_to_compact: date) -> str:
merge into one table, write a single daily file. merge into one table, write a single daily file.
Returns the path to that daily Parquet. Returns the path to that daily Parquet.
""" """
# build glob pattern for that day day_dir = os.path.join(LOCAL_DATA_DIR, f"positions/{date_to_compact:%Y/%m/%d}")
day_glob = os.path.join( if not os.path.isdir(day_dir):
LOCAL_DATA_DIR, f"positions/{date_to_compact:%Y/%m/%d}/**/*.parquet" # nothing to compact
) return ""
# load dataset dataset = ds.dataset(day_dir, format="parquet", partitioning="hive")
dataset = ds.dataset(day_glob, format="parquet", partitioning="hive")
table = dataset.to_table() # merges all batches table = dataset.to_table() # merges all batches
# write one daily file # write one daily file
@ -148,10 +147,10 @@ def compact_course_posts(date_to_compact: date) -> str:
Read all shards under courses/YYYY-MM-DD/*.parquet, Read all shards under courses/YYYY-MM-DD/*.parquet,
merge into one table, write a single daily file. merge into one table, write a single daily file.
""" """
pattern = os.path.join( day_dir = os.path.join(LOCAL_DATA_DIR, f"courses/{date_to_compact:%Y-%m-%d}")
LOCAL_DATA_DIR, f"courses/{date_to_compact:%Y-%m-%d}/*.parquet" if not os.path.isdir(day_dir):
) return ""
dataset = ds.dataset(pattern, format="parquet") dataset = ds.dataset(day_dir, format="parquet")
table = dataset.to_table() table = dataset.to_table()
out_dir = os.path.join(LOCAL_DATA_DIR, "courses_compacted") out_dir = os.path.join(LOCAL_DATA_DIR, "courses_compacted")
@ -215,10 +214,12 @@ def compact_course_geometry(date_to_compact: date) -> str:
merge into one table, write a single daily file. merge into one table, write a single daily file.
Returns the path to that daily Parquet. Returns the path to that daily Parquet.
""" """
pattern = os.path.join( day_dir = os.path.join(
LOCAL_DATA_DIR, f"courses_geometry/{date_to_compact:%Y-%m-%d}/*.parquet" LOCAL_DATA_DIR, f"courses_geometry/{date_to_compact:%Y-%m-%d}"
) )
dataset = ds.dataset(pattern, format="parquet") if not os.path.isdir(day_dir):
return ""
dataset = ds.dataset(day_dir, format="parquet")
table = dataset.to_table() table = dataset.to_table()
out_dir = os.path.join(LOCAL_DATA_DIR, "courses_geometry_compacted") out_dir = os.path.join(LOCAL_DATA_DIR, "courses_geometry_compacted")