Keep directory and use to infer write location #210

Merged: 2 commits, Feb 9, 2024
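This PR teaches `PartitionInfo` and `PartitionJoinInfo` to remember the catalog directory they were read from, so later write calls can infer their destination instead of requiring an explicit path. A minimal sketch of the new behavior, with a hypothetical catalog path:

from hipscat.catalog.partition_info import PartitionInfo

# Loading from a directory now captures the base directory on the object.
info = PartitionInfo.read_from_dir("/path/to/catalog")

# Writes partition_info.csv under /path/to/catalog; no path argument needed.
info.write_to_file()

# Writes the parquet _metadata file under the same inferred directory.
info.write_to_metadata_files()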
70 changes: 60 additions & 10 deletions src/hipscat/catalog/association_catalog/partition_join_info.py
@@ -34,8 +34,9 @@ class PartitionJoinInfo:
JOIN_PIXEL_COLUMN_NAME,
]

def __init__(self, join_info_df: pd.DataFrame) -> None:
def __init__(self, join_info_df: pd.DataFrame, catalog_base_dir: str = None) -> None:
self.data_frame = join_info_df
self.catalog_base_dir = catalog_base_dir
self._check_column_names()

def _check_column_names(self):
@@ -69,13 +70,21 @@ def primary_to_join_map(self) -> Dict[HealpixPixel, List[HealpixPixel]]:
primary_to_join = dict(primary_to_join)
return primary_to_join

def write_to_metadata_files(self, catalog_path: FilePointer, storage_options: dict = None):
def write_to_metadata_files(self, catalog_path: FilePointer = None, storage_options: dict = None):
"""Generate parquet metadata, using the known partitions.

Args:
catalog_path (FilePointer): base path for the catalog
storage_options (dict): dictionary that contains abstract filesystem credentials

Raises:
ValueError: if no path is provided and none could be inferred.
"""
if catalog_path is None:
if self.catalog_base_dir is None:
raise ValueError("catalog_path is required if info was not loaded from a directory")
catalog_path = self.catalog_base_dir

batches = [
[
pa.RecordBatch.from_arrays(
@@ -94,7 +103,7 @@ def write_to_metadata_files(self, catalog_path: FilePointer, storage_options: di

write_parquet_metadata_for_batches(batches, catalog_path, storage_options)

def write_to_csv(self, catalog_path: FilePointer, storage_options: dict = None):
def write_to_csv(self, catalog_path: FilePointer = None, storage_options: dict = None):
"""Write all partition data to CSV files.

Two files will be written::
Expand All @@ -106,7 +115,15 @@ def write_to_csv(self, catalog_path: FilePointer, storage_options: dict = None):
catalog_path: FilePointer to the directory where the
`partition_join_info.csv` file will be written
storage_options (dict): dictionary that contains abstract filesystem credentials

Raises:
ValueError: if no path is provided and none could be inferred.
"""
if catalog_path is None:
if self.catalog_base_dir is None:
raise ValueError("catalog_path is required if info was not loaded from a directory")
catalog_path = self.catalog_base_dir

partition_join_info_file = paths.get_partition_join_info_pointer(catalog_path)
file_io.write_dataframe_to_csv(
self.data_frame, partition_join_info_file, index=False, storage_options=storage_options
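Both write methods share the same fallback: an explicit `catalog_path` wins; otherwise the `catalog_base_dir` captured at load time is used; if neither is available, a `ValueError` is raised. A sketch of the resulting call patterns, with hypothetical paths:

from hipscat.catalog.association_catalog.partition_join_info import PartitionJoinInfo

join_info = PartitionJoinInfo.read_from_dir("/path/to/association_catalog")

join_info.write_to_csv()                              # inferred: the directory it was read from
join_info.write_to_csv(catalog_path="/path/to/copy")  # explicit path overrides the inferred one

try:
    # An instance built directly from a dataframe has no directory to infer from.
    PartitionJoinInfo(join_info.data_frame).write_to_csv()
except ValueError:
    pass  # no explicit path and nothing to infer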
@@ -141,29 +158,48 @@ def read_from_dir(cls, catalog_base_dir: FilePointer, storage_options: dict = No
metadata_file = paths.get_parquet_metadata_pointer(catalog_base_dir)
partition_join_info_file = paths.get_partition_join_info_pointer(catalog_base_dir)
if file_io.does_file_or_directory_exist(partition_join_info_file, storage_options=storage_options):
partition_join_info = PartitionJoinInfo.read_from_csv(
pixel_frame = PartitionJoinInfo._read_from_csv(
partition_join_info_file, storage_options=storage_options
)
elif file_io.does_file_or_directory_exist(metadata_file, storage_options=storage_options):
warnings.warn("Reading partitions from parquet metadata. This is typically slow.")
partition_join_info = PartitionJoinInfo.read_from_file(
pixel_frame = PartitionJoinInfo._read_from_metadata_file(
metadata_file, storage_options=storage_options
)
else:
raise FileNotFoundError(
f"_metadata or partition join info file is required in catalog directory {catalog_base_dir}"
)
return partition_join_info
return cls(pixel_frame, catalog_base_dir)
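`read_from_dir` prefers the CSV file and only falls back to scanning the parquet `_metadata`, warning that the latter is slow; either way the base directory is captured for later writes. A sketch exercising the fallback, assuming a hypothetical directory that contains only a `_metadata` file:

import warnings

from hipscat.catalog.association_catalog.partition_join_info import PartitionJoinInfo

with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")
    info = PartitionJoinInfo.read_from_dir("/path/to/metadata_only_catalog")

# The slow path warned, and the base directory was still remembered.
assert any("typically slow" in str(w.message) for w in caught)
assert info.catalog_base_dir == "/path/to/metadata_only_catalog"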

@classmethod
def read_from_file(
cls, metadata_file: FilePointer, strict=False, storage_options: dict = None
cls, metadata_file: FilePointer, strict: bool = False, storage_options: dict = None
) -> PartitionJoinInfo:
"""Read partition join info from a `_metadata` file to create an object

Args:
metadata_file (FilePointer): FilePointer to the `_metadata` file
storage_options (dict): dictionary that contains abstract filesystem credentials
strict (bool): use strict parsing of the _metadata file. This is slower, but
gives more helpful error messages in the case of invalid data.

Returns:
A `PartitionJoinInfo` object with the data from the file
"""
return cls(cls._read_from_metadata_file(metadata_file, strict, storage_options))

@classmethod
def _read_from_metadata_file(
cls, metadata_file: FilePointer, strict: bool = False, storage_options: dict = None
) -> pd.DataFrame:
"""Read partition join info from a `_metadata` file to create an object

Args:
metadata_file (FilePointer): FilePointer to the `_metadata` file
storage_options (dict): dictionary that contains abstract filesystem credentials
strict (bool): use strict parsing of the _metadata file. This is slower, but
gives more helpful error messages in the case of invalid data.

Returns:
A `pd.DataFrame` with the partition join info from the file
@@ -229,14 +265,29 @@ def read_from_file(
columns=cls.COLUMN_NAMES,
)

return cls(pixel_frame)
return pixel_frame
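Each reader is now split into a public classmethod that returns a constructed object and a private `_read_from_*` helper that returns only the raw data, so `read_from_dir` can build the object once with the base directory attached. A self-contained sketch of that shape (class and method names are illustrative, not the library's API):

class Info:
    def __init__(self, frame, base_dir=None):
        self.frame = frame
        self.base_dir = base_dir

    @classmethod
    def read_from_csv(cls, path):
        # Public entry point: constructs the object with no base directory.
        return cls(cls._read_from_csv(path))

    @classmethod
    def _read_from_csv(cls, path):
        # Private helper: returns raw data only, never an Info instance.
        return f"rows loaded from {path}"

    @classmethod
    def read_from_dir(cls, base_dir):
        # Reuses the helper and attaches the directory in one construction.
        return cls(cls._read_from_csv(f"{base_dir}/info.csv"), base_dir)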

@classmethod
def read_from_csv(
cls, partition_join_info_file: FilePointer, storage_options: dict = None
) -> PartitionJoinInfo:
"""Read partition join info from a `partition_join_info.csv` file to create an object

Args:
partition_join_info_file (FilePointer): FilePointer to the `partition_join_info.csv` file
storage_options (dict): dictionary that contains abstract filesystem credentials

Returns:
A `PartitionJoinInfo` object with the data from the file
"""
return cls(cls._read_from_csv(partition_join_info_file, storage_options))

@classmethod
def _read_from_csv(
cls, partition_join_info_file: FilePointer, storage_options: dict = None
) -> pd.DataFrame:
"""Read partition join info from a `partition_join_info.csv` file to create an object

Args:
partition_join_info_file (FilePointer): FilePointer to the `partition_join_info.csv` file
storage_options (dict): dictionary that contains abstract filesystem credentials
@@ -251,5 +302,4 @@ def read_from_csv(
f"No partition join info found where expected: {str(partition_join_info_file)}"
)

data_frame = file_io.load_csv_to_pandas(partition_join_info_file, storage_options=storage_options)
return cls(data_frame)
return file_io.load_csv_to_pandas(partition_join_info_file, storage_options=storage_options)
89 changes: 75 additions & 14 deletions src/hipscat/catalog/partition_info.py
@@ -25,8 +25,9 @@ class PartitionInfo:
METADATA_DIR_COLUMN_NAME = "Dir"
METADATA_PIXEL_COLUMN_NAME = "Npix"

def __init__(self, pixel_list: List[HealpixPixel]) -> None:
def __init__(self, pixel_list: List[HealpixPixel], catalog_base_dir: str = None) -> None:
self.pixel_list = pixel_list
self.catalog_base_dir = catalog_base_dir

def get_healpix_pixels(self) -> List[HealpixPixel]:
"""Get healpix pixel objects for all pixels represented as partitions.
@@ -45,25 +46,55 @@ def get_highest_order(self) -> int:
max_pixel = np.max(self.pixel_list)
return max_pixel.order

def write_to_file(self, partition_info_file: FilePointer, storage_options: dict = None):
def write_to_file(
self,
partition_info_file: FilePointer = None,
catalog_path: FilePointer = None,
storage_options: dict = None,
):
"""Write all partition data to CSV file.

If no paths are provided, the catalog base directory from the `read_from_dir` call is used.

Args:
partition_info_file: FilePointer to where the `partition_info.csv`
file will be written
file will be written.
catalog_path: base directory for a catalog where the `partition_info.csv`
file will be written.
storage_options (dict): dictionary that contains abstract filesystem credentials

Raises:
ValueError: if no path is provided and none could be inferred.
"""
if partition_info_file is None:
if catalog_path is not None:
partition_info_file = paths.get_partition_info_pointer(catalog_path)
elif self.catalog_base_dir is not None:
partition_info_file = paths.get_partition_info_pointer(self.catalog_base_dir)
else:
raise ValueError("partition_info_file is required if info was not loaded from a directory")

file_io.write_dataframe_to_csv(
self.as_dataframe(), partition_info_file, index=False, storage_options=storage_options
)

def write_to_metadata_files(self, catalog_path: FilePointer, storage_options: dict = None):
def write_to_metadata_files(self, catalog_path: FilePointer = None, storage_options: dict = None):
"""Generate parquet metadata, using the known partitions.

If no catalog_path is provided, the catalog base directory from the `read_from_dir` call is used.

Args:
catalog_path (FilePointer): base path for the catalog
storage_options (dict): dictionary that contains abstract filesystem credentials

Raises:
ValueError: if no path is provided and none could be inferred.
"""
if catalog_path is None:
if self.catalog_base_dir is None:
raise ValueError("catalog_path is required if info was not loaded from a directory")
catalog_path = self.catalog_base_dir

batches = [
[
pa.RecordBatch.from_arrays(
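`write_to_file` resolves its destination in precedence order: an explicit `partition_info_file` pointer, then a `catalog_path` base directory, then the directory remembered from `read_from_dir`. Illustrative calls, with hypothetical paths:

from hipscat.catalog.partition_info import PartitionInfo

info = PartitionInfo.read_from_dir("/path/to/catalog")

info.write_to_file()                                               # inferred base directory
info.write_to_file(catalog_path="/path/to/other_catalog")          # file name derived from base dir
info.write_to_file(partition_info_file="/tmp/partition_info.csv")  # fully explicit file pointer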
@@ -102,25 +133,46 @@ def read_from_dir(cls, catalog_base_dir: FilePointer, storage_options: dict = No
metadata_file = paths.get_parquet_metadata_pointer(catalog_base_dir)
partition_info_file = paths.get_partition_info_pointer(catalog_base_dir)
if file_io.does_file_or_directory_exist(partition_info_file, storage_options=storage_options):
partition_info = PartitionInfo.read_from_csv(partition_info_file, storage_options=storage_options)
pixel_list = PartitionInfo._read_from_csv(partition_info_file, storage_options=storage_options)
elif file_io.does_file_or_directory_exist(metadata_file, storage_options=storage_options):
warnings.warn("Reading partitions from parquet metadata. This is typically slow.")
partition_info = PartitionInfo.read_from_file(metadata_file, storage_options=storage_options)
pixel_list = PartitionInfo._read_from_metadata_file(
metadata_file, storage_options=storage_options
)
else:
raise FileNotFoundError(
f"_metadata or partition info file is required in catalog directory {catalog_base_dir}"
)
return partition_info
return cls(pixel_list, catalog_base_dir)

@classmethod
def read_from_file(
cls, metadata_file: FilePointer, strict=False, storage_options: dict = None
cls, metadata_file: FilePointer, strict: bool = False, storage_options: dict = None
) -> PartitionInfo:
"""Read partition info from a `_metadata` file to create an object

Args:
metadata_file (FilePointer): FilePointer to the `_metadata` file
storage_options (dict): dictionary that contains abstract filesystem credentials
strict (bool): use strict parsing of the _metadata file. This is slower, but
gives more helpful error messages in the case of invalid data.

Returns:
A `PartitionInfo` object with the data from the file
"""
return cls(cls._read_from_metadata_file(metadata_file, strict, storage_options))

@classmethod
def _read_from_metadata_file(
cls, metadata_file: FilePointer, strict: bool = False, storage_options: dict = None
) -> List[HealpixPixel]:
"""Read partition info list from a `_metadata` file.

Args:
metadata_file (FilePointer): FilePointer to the `_metadata` file
storage_options (dict): dictionary that contains abstract filesystem credentials
strict (bool): use strict parsing of the _metadata file. This is slower, but
gives more helpful error messages in the case of invalid data.

Returns:
A list of `HealpixPixel` objects for the partitions in the file
@@ -163,14 +215,25 @@ def read_from_file(
## Remove duplicates, preserving order.
## In the case of association partition join info, we may have multiple entries
## for the primary order/pixels.
pixel_list = list(dict.fromkeys(pixel_list))

return cls(pixel_list)
return list(dict.fromkeys(pixel_list))
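The `dict.fromkeys` idiom removes duplicates while keeping first-seen order, since dictionaries preserve insertion order in Python 3.7+:

pixels = ["Order0/Npix4", "Order0/Npix5", "Order0/Npix4"]
assert list(dict.fromkeys(pixels)) == ["Order0/Npix4", "Order0/Npix5"]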

@classmethod
def read_from_csv(cls, partition_info_file: FilePointer, storage_options: dict = None) -> PartitionInfo:
"""Read partition info from a `partition_info.csv` file to create an object

Args:
partition_info_file (FilePointer): FilePointer to the `partition_info.csv` file
storage_options (dict): dictionary that contains abstract filesystem credentials

Returns:
A `PartitionInfo` object with the data from the file
"""
return cls(cls._read_from_csv(partition_info_file, storage_options))

@classmethod
def _read_from_csv(cls, partition_info_file: FilePointer, storage_options: dict = None) -> List[HealpixPixel]:
"""Read the raw partition info pixel list from a `partition_info.csv` file.

Args:
partition_info_file (FilePointer): FilePointer to the `partition_info.csv` file
storage_options (dict): dictionary that contains abstract filesystem credentials
@@ -183,16 +246,14 @@ def read_from_csv(cls, partition_info_file: FilePointer, storage_options: dict =

data_frame = file_io.load_csv_to_pandas(partition_info_file, storage_options=storage_options)

pixel_list = [
return [
HealpixPixel(order, pixel)
for order, pixel in zip(
data_frame[cls.METADATA_ORDER_COLUMN_NAME],
data_frame[cls.METADATA_PIXEL_COLUMN_NAME],
)
]

return cls(pixel_list)

def as_dataframe(self):
"""Construct a pandas dataframe for the partition info pixels.

@@ -117,3 +117,28 @@ def test_read_from_missing_file(tmp_path):
file_pointer = file_io.get_file_pointer_from_path(wrong_path)
with pytest.raises(FileNotFoundError):
PartitionJoinInfo.read_from_csv(file_pointer)


def test_load_partition_info_from_dir_and_write(tmp_path, association_catalog_join_pixels):
info = PartitionJoinInfo(association_catalog_join_pixels)

## Path arguments are required if the info was not created from a `read_from_dir` call
with pytest.raises(ValueError):
info.write_to_csv()
with pytest.raises(ValueError):
info.write_to_metadata_files()

info.write_to_csv(catalog_path=tmp_path)
info = PartitionJoinInfo.read_from_dir(tmp_path)

## Can write out the partition info CSV by providing:
## - no arguments
## - new catalog directory
info.write_to_csv()
info.write_to_csv(catalog_path=tmp_path)

## Can write out the _metadata file by providing:
## - no arguments
## - new catalog directory
info.write_to_metadata_files()
info.write_to_metadata_files(catalog_path=tmp_path)
27 changes: 27 additions & 0 deletions tests/hipscat/catalog/test_partition_info.py
@@ -132,3 +132,30 @@ def test_write_to_file_sorted(tmp_path, pixel_list_depth_first, pixel_list_bread
new_partition_info = PartitionInfo.read_from_file(partition_info_pointer)

npt.assert_array_equal(pixel_list_breadth_first, new_partition_info.get_healpix_pixels())


def test_load_partition_info_from_dir_and_write(tmp_path, pixel_list_depth_first):
partition_info = PartitionInfo.from_healpix(pixel_list_depth_first)

## Path arguments are required if the info was not created from a `read_from_dir` call
with pytest.raises(ValueError):
partition_info.write_to_file()
with pytest.raises(ValueError):
partition_info.write_to_metadata_files()

partition_info.write_to_file(catalog_path=tmp_path)
info = PartitionInfo.read_from_dir(tmp_path)

## Can write out the partition info CSV by providing:
## - no arguments
## - new catalog directory
## - full path to the csv file
info.write_to_file()
info.write_to_file(catalog_path=tmp_path)
info.write_to_file(partition_info_file=os.path.join(tmp_path, "new_csv.csv"))

## Can write out the _metadata file by providing:
## - no arguments
## - new catalog directory
info.write_to_metadata_files()
info.write_to_metadata_files(catalog_path=tmp_path)