Merge branch 'main' into issue/163/sorting
delucchi-cmu authored Nov 20, 2023
2 parents b4ed99a + 72dd5ba commit 61c60a2
Showing 4 changed files with 37 additions and 24 deletions.
20 changes: 11 additions & 9 deletions src/hipscat/catalog/partition_info.py
@@ -1,4 +1,6 @@
 """Container class to hold per-partition metadata"""
+from __future__ import annotations
+
 from typing import List

 import numpy as np
@@ -54,8 +56,8 @@ def write_to_metadata_files(self, catalog_path: FilePointer, storage_options: di
         """Generate parquet metadata, using the known partitions.

         Args:
-            catalog_path (str): base path for the catalog
-            storage_options: dictionary that contains abstract filesystem credentials
+            catalog_path (FilePointer): base path for the catalog
+            storage_options (dict): dictionary that contains abstract filesystem credentials
         """
         batches = [
             [
@@ -74,12 +76,12 @@ def write_to_metadata_files(self, catalog_path: FilePointer, storage_options: di
         write_parquet_metadata_for_batches(batches, catalog_path, storage_options)

     @classmethod
-    def read_from_file(cls, metadata_file: FilePointer, storage_options: dict = None):
+    def read_from_file(cls, metadata_file: FilePointer, storage_options: dict = None) -> PartitionInfo:
         """Read partition info from a `_metadata` file to create an object

         Args:
-            metadata_file: FilePointer to the `_metadata` file
-            storage_options: dictionary that contains abstract filesystem credentials
+            metadata_file (FilePointer): FilePointer to the `_metadata` file
+            storage_options (dict): dictionary that contains abstract filesystem credentials

         Returns:
             A `PartitionInfo` object with the data from the file
@@ -99,12 +101,12 @@ def read_from_file(cls, metadata_file: FilePointer, storage_options: dict = None
         return cls(pixel_list)

     @classmethod
-    def read_from_csv(cls, partition_info_file: FilePointer, storage_options: dict = None):
+    def read_from_csv(cls, partition_info_file: FilePointer, storage_options: dict = None) -> PartitionInfo:
         """Read partition info from a `partition_info.csv` file to create an object

         Args:
-            partition_info_file: FilePointer to the `partition_info.csv` file
-            storage_options: dictionary that contains abstract filesystem credentials
+            partition_info_file (FilePointer): FilePointer to the `partition_info.csv` file
+            storage_options (dict): dictionary that contains abstract filesystem credentials

         Returns:
             A `PartitionInfo` object with the data from the file
@@ -144,7 +146,7 @@ def as_dataframe(self):
         return pd.DataFrame.from_dict(partition_info_dict)

     @classmethod
-    def from_healpix(cls, healpix_pixels: List[HealpixPixel]):
+    def from_healpix(cls, healpix_pixels: List[HealpixPixel]) -> PartitionInfo:
         """Create a partition info object from a list of constituent healpix pixels.

         Args:
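A minimal usage sketch for the annotated readers, assuming a hypothetical local catalog at ./my_catalog and that `file_io` is importable from `hipscat.io` (the parquet_metadata module below uses it the same way):

from hipscat.catalog.partition_info import PartitionInfo
from hipscat.io import file_io

# Hypothetical catalog base path; point this at a real hipscat catalog.
metadata_file = file_io.get_file_pointer_from_path("./my_catalog/_metadata")

# read_from_file is now annotated to return a PartitionInfo.
partition_info = PartitionInfo.read_from_file(metadata_file)
print(partition_info.as_dataframe())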
25 changes: 18 additions & 7 deletions src/hipscat/io/parquet_metadata.py
@@ -13,9 +13,16 @@
 from hipscat.pixel_math.healpix_pixel_function import get_pixel_argsort


-def row_group_stat_single_value(row_group, stat_key):
+def row_group_stat_single_value(row_group, stat_key: str):
     """Convenience method to find the min and max inside a statistics dictionary,
-    and raise an error if they're unequal."""
+    and raise an error if they're unequal.
+
+    Args:
+        row_group: dataset fragment row group
+        stat_key (str): column name of interest.
+    Returns:
+        The value of the specified row group statistic
+    """
     if stat_key not in row_group.statistics:
         raise ValueError(f"row group doesn't have expected key {stat_key}")
     stat_dict = row_group.statistics[stat_key]
@@ -47,11 +54,12 @@ def get_healpix_pixel_from_metadata(metadata) -> HealpixPixel:
     return HealpixPixel(order, pixel)


-def write_parquet_metadata(
-    catalog_path, order_by_healpix=True, storage_options: dict = None, output_path: str = None
-):
+def write_parquet_metadata(catalog_path: str, order_by_healpix=True, storage_options: dict = None, output_path: str = None):
     """Generate parquet metadata, using the already-partitioned parquet files
-    for this catalog
+    for this catalog.
+
+    For more information on the general parquet metadata files, and why we write them, see
+    https://arrow.apache.org/docs/python/parquet.html#writing-metadata-and-common-metadata-files

     Args:
         catalog_path (str): base path for the catalog
@@ -80,6 +88,8 @@ def write_parquet_metadata(
     for hips_file in dataset.files:
         hips_file_pointer = file_io.get_file_pointer_from_path(hips_file, include_protocol=catalog_path)
         single_metadata = file_io.read_parquet_metadata(hips_file_pointer, storage_options=storage_options)
+
+        # Users must set the file path of each chunk before combining the metadata.
         relative_path = hips_file[len(catalog_path) :]
         single_metadata.set_file_path(relative_path)

@@ -118,6 +128,7 @@ def write_parquet_metadata_for_batches(
 ):
     """Write parquet metadata files for some pyarrow table batches.
+    This writes the batches to a temporary parquet dataset using local storage, and
     generates the metadata for the partitioned catalog parquet files.

     Args:
         batches (List[pa.RecordBatch]): create one batch per group of data (partition or row group)
@@ -133,7 +144,7 @@ def write_parquet_metadata_for_batches(
     write_parquet_metadata(temp_pq_file, storage_options=storage_options, output_path=output_path)


-def read_row_group_fragments(metadata_file, storage_options: dict = None):
+def read_row_group_fragments(metadata_file: str, storage_options: dict = None):
     """Generator for metadata fragment row groups in a parquet metadata file.

     Args:
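A rough sketch of the write-then-read flow these functions support, assuming a hypothetical ./my_catalog base path and a per-partition "Norder" column (neither is defined in this diff):

from hipscat.io.parquet_metadata import (
    read_row_group_fragments,
    row_group_stat_single_value,
    write_parquet_metadata,
)

# Consolidate the footers of the already-partitioned parquet files into
# catalog-level metadata under the hypothetical base path.
write_parquet_metadata("./my_catalog")

# read_row_group_fragments is a generator; row_group_stat_single_value raises
# a ValueError when the column's min and max statistics disagree.
for row_group in read_row_group_fragments("./my_catalog/_metadata"):
    print(row_group_stat_single_value(row_group, "Norder"))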
14 changes: 7 additions & 7 deletions src/hipscat/io/validation.py
@@ -7,7 +7,7 @@ def is_valid_catalog(pointer: FilePointer) -> bool:
     """Checks if a catalog is valid for a given base catalog pointer

     Args:
-        pointer: pointer to base catalog directory
+        pointer (FilePointer): pointer to base catalog directory

     Returns:
         True if both the catalog_info and partition_info files are
@@ -16,11 +16,11 @@ def is_valid_catalog(pointer: FilePointer) -> bool:
     return is_catalog_info_valid(pointer) and (is_partition_info_valid(pointer) or is_metadata_valid(pointer))


-def is_catalog_info_valid(pointer):
+def is_catalog_info_valid(pointer: FilePointer) -> bool:
     """Checks if catalog_info is valid for a given base catalog pointer

     Args:
-        pointer: pointer to base catalog directory
+        pointer (FilePointer): pointer to base catalog directory

     Returns:
         True if the catalog_info file exists, and it is correctly formatted,
@@ -34,11 +34,11 @@ def is_catalog_info_valid(pointer):
     return is_valid


-def is_partition_info_valid(pointer):
+def is_partition_info_valid(pointer: FilePointer) -> bool:
     """Checks if partition_info is valid for a given base catalog pointer

     Args:
-        pointer: pointer to base catalog directory
+        pointer (FilePointer): pointer to base catalog directory

     Returns:
         True if the partition_info file exists, False otherwise
@@ -48,11 +48,11 @@ def is_partition_info_valid(pointer):
     return partition_info_exists


-def is_metadata_valid(pointer):
+def is_metadata_valid(pointer: FilePointer) -> bool:
     """Checks if _metadata is valid for a given base catalog pointer

     Args:
-        pointer: pointer to base catalog directory
+        pointer (FilePointer): pointer to base catalog directory

     Returns:
         True if the _metadata file exists, False otherwise
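With FilePointer arguments and bool returns now explicit, a caller can validate before loading. A small sketch, again with a hypothetical ./my_catalog path:

from hipscat.io import file_io
from hipscat.io.validation import is_valid_catalog

catalog_base = file_io.get_file_pointer_from_path("./my_catalog")

# Per is_valid_catalog above: catalog_info must be valid, and at least one of
# partition_info or _metadata must also check out.
if not is_valid_catalog(catalog_base):
    raise ValueError("not a valid hipscat catalog")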
2 changes: 1 addition & 1 deletion src/hipscat/pixel_math/healpix_pixel.py
@@ -82,7 +82,7 @@ def convert_to_higher_order(self, delta_order: int) -> List[HealpixPixel]:
         return pixels

     @property
-    def dir(self):
+    def dir(self) -> int:
         """Directory number for the pixel.

         This is necessary for file systems that limit to 10,000 subdirectories.
