diff --git a/src/hipscat/catalog/association_catalog/association_catalog.py b/src/hipscat/catalog/association_catalog/association_catalog.py index b74fc135..012c8f4f 100644 --- a/src/hipscat/catalog/association_catalog/association_catalog.py +++ b/src/hipscat/catalog/association_catalog/association_catalog.py @@ -3,6 +3,7 @@ from typing import Any, Dict, Tuple, Union import pandas as pd +import pyarrow as pa from mocpy import MOC from typing_extensions import TypeAlias @@ -35,11 +36,14 @@ def __init__( join_pixels: JoinPixelInputTypes, catalog_path=None, moc: MOC | None = None, + schema: pa.Schema | None = None, storage_options: Union[Dict[Any, Any], None] = None, ) -> None: if not catalog_info.catalog_type == CatalogType.ASSOCIATION: raise ValueError("Catalog info `catalog_type` must be 'association'") - super().__init__(catalog_info, pixels, catalog_path, moc=moc, storage_options=storage_options) + super().__init__( + catalog_info, pixels, catalog_path, moc=moc, schema=schema, storage_options=storage_options + ) self.join_info = self._get_partition_join_info_from_pixels(join_pixels) def get_join_pixels(self) -> pd.DataFrame: diff --git a/src/hipscat/catalog/catalog.py b/src/hipscat/catalog/catalog.py index b911184b..ccb8a929 100644 --- a/src/hipscat/catalog/catalog.py +++ b/src/hipscat/catalog/catalog.py @@ -5,6 +5,7 @@ from typing import Any, Dict, List, Tuple, Union import numpy as np +import pyarrow as pa from mocpy import MOC from typing_extensions import TypeAlias @@ -46,6 +47,7 @@ def __init__( pixels: PixelInputTypes, catalog_path: str = None, moc: MOC | None = None, + schema: pa.Schema | None = None, storage_options: Union[Dict[Any, Any], None] = None, ) -> None: """Initializes a Catalog @@ -56,8 +58,9 @@ def __init__( list of HealpixPixel, `PartitionInfo object`, or a `PixelTree` object catalog_path: If the catalog is stored on disk, specify the location of the catalog Does not load the catalog from this path, only store as metadata - storage_options: dictionary that contains abstract filesystem credentials moc (mocpy.MOC): MOC object representing the coverage of the catalog + schema (pa.Schema): The pyarrow schema for the catalog + storage_options: dictionary that contains abstract filesystem credentials """ if catalog_info.catalog_type not in self.HIPS_CATALOG_TYPES: raise ValueError( @@ -65,7 +68,12 @@ def __init__( f"{', '.join([t.value for t in self.HIPS_CATALOG_TYPES])}" ) super().__init__( - catalog_info, pixels, catalog_path=catalog_path, moc=moc, storage_options=storage_options + catalog_info, + pixels, + catalog_path=catalog_path, + moc=moc, + schema=schema, + storage_options=storage_options, ) def filter_by_cone(self, ra: float, dec: float, radius_arcsec: float) -> Catalog: diff --git a/src/hipscat/catalog/healpix_dataset/healpix_dataset.py b/src/hipscat/catalog/healpix_dataset/healpix_dataset.py index 572da159..419dd32f 100644 --- a/src/hipscat/catalog/healpix_dataset/healpix_dataset.py +++ b/src/hipscat/catalog/healpix_dataset/healpix_dataset.py @@ -1,10 +1,12 @@ from __future__ import annotations import dataclasses +import warnings from typing import Any, Dict, List, Tuple, Union import numpy as np import pandas as pd +import pyarrow as pa from mocpy import MOC from typing_extensions import Self, TypeAlias @@ -12,6 +14,7 @@ from hipscat.catalog.dataset import BaseCatalogInfo, Dataset from hipscat.catalog.partition_info import PartitionInfo from hipscat.io import FilePointer, file_io, paths +from hipscat.io.file_io import read_parquet_metadata from hipscat.pixel_math import HealpixPixel from hipscat.pixel_tree import PixelAlignment, PixelAlignmentType from hipscat.pixel_tree.moc_filter import filter_by_moc @@ -39,6 +42,7 @@ def __init__( pixels: PixelInputTypes, catalog_path: str = None, moc: MOC | None = None, + schema: pa.Schema | None = None, storage_options: Union[Dict[Any, Any], None] = None, ) -> None: """Initializes a Catalog @@ -49,13 +53,15 @@ def __init__( list of HealpixPixel, `PartitionInfo object`, or a `PixelTree` object catalog_path: If the catalog is stored on disk, specify the location of the catalog Does not load the catalog from this path, only store as metadata - storage_options: dictionary that contains abstract filesystem credentials moc (mocpy.MOC): MOC object representing the coverage of the catalog + schema (pa.Schema): The pyarrow schema for the catalog + storage_options: dictionary that contains abstract filesystem credentials """ super().__init__(catalog_info, catalog_path=catalog_path, storage_options=storage_options) self.partition_info = self._get_partition_info_from_pixels(pixels) self.pixel_tree = self._get_pixel_tree_from_pixels(pixels) self.moc = moc + self.schema = schema def get_healpix_pixels(self) -> List[HealpixPixel]: """Get healpix pixel objects for all pixels contained in the catalog. @@ -101,6 +107,7 @@ def _read_kwargs( ) -> dict: kwargs = super()._read_kwargs(catalog_base_dir, storage_options=storage_options) kwargs["moc"] = cls._read_moc_from_point_map(catalog_base_dir, storage_options) + kwargs["schema"] = cls._read_schema_from_metadata(catalog_base_dir, storage_options) return kwargs @classmethod @@ -118,10 +125,30 @@ def _read_moc_from_point_map( orders = np.full(ipix.shape, order) return MOC.from_healpix_cells(ipix, orders, order) + @classmethod + def _read_schema_from_metadata( + cls, catalog_base_dir: FilePointer, storage_options: dict | None = None + ) -> pa.Schema | None: + """Reads the schema information stored in the _common_metadata or _metadata files.""" + common_metadata_file = paths.get_common_metadata_pointer(catalog_base_dir) + common_metadata_exists = file_io.does_file_or_directory_exist( + common_metadata_file, storage_options=storage_options + ) + metadata_file = paths.get_parquet_metadata_pointer(catalog_base_dir) + metadata_exists = file_io.does_file_or_directory_exist(metadata_file, storage_options=storage_options) + if not (common_metadata_exists or metadata_exists): + warnings.warn( + "_common_metadata or _metadata files not found for this catalog." + "The arrow schema will not be set." + ) + return None + schema_file = common_metadata_file if common_metadata_exists else metadata_file + metadata = read_parquet_metadata(schema_file, storage_options=storage_options) + return metadata.schema.to_arrow_schema() + @classmethod def _check_files_exist(cls, catalog_base_dir: FilePointer, storage_options: dict = None): super()._check_files_exist(catalog_base_dir, storage_options=storage_options) - partition_info_file = paths.get_partition_info_pointer(catalog_base_dir) metadata_file = paths.get_parquet_metadata_pointer(catalog_base_dir) if not ( @@ -170,7 +197,7 @@ def filter_by_moc(self, moc: MOC) -> Self: filtered_tree = filter_by_moc(self.pixel_tree, moc) filtered_moc = self.moc.intersection(moc) if self.moc is not None else None filtered_catalog_info = dataclasses.replace(self.catalog_info, total_rows=None) - return self.__class__(filtered_catalog_info, filtered_tree, moc=filtered_moc) + return self.__class__(filtered_catalog_info, filtered_tree, moc=filtered_moc, schema=self.schema) def align( self, other_cat: Self, alignment_type: PixelAlignmentType = PixelAlignmentType.INNER diff --git a/src/hipscat/catalog/margin_cache/margin_catalog.py b/src/hipscat/catalog/margin_cache/margin_catalog.py index 53bd174a..6a882d6f 100644 --- a/src/hipscat/catalog/margin_cache/margin_catalog.py +++ b/src/hipscat/catalog/margin_cache/margin_catalog.py @@ -1,5 +1,6 @@ from __future__ import annotations +import pyarrow as pa from mocpy import MOC from typing_extensions import Self, TypeAlias @@ -30,6 +31,7 @@ def __init__( pixels: PixelInputTypes, catalog_path: str = None, moc: MOC | None = None, + schema: pa.Schema | None = None, storage_options: dict | None = None, ) -> None: """Initializes a Margin Catalog @@ -40,13 +42,19 @@ def __init__( list of HealpixPixel, `PartitionInfo object`, or a `PixelTree` object catalog_path: If the catalog is stored on disk, specify the location of the catalog Does not load the catalog from this path, only store as metadata - storage_options: dictionary that contains abstract filesystem credentials moc (mocpy.MOC): MOC object representing the coverage of the catalog + schema (pa.Schema): The pyarrow schema for the catalog + storage_options: dictionary that contains abstract filesystem credentials """ if catalog_info.catalog_type != CatalogType.MARGIN: raise ValueError(f"Catalog info `catalog_type` must equal {CatalogType.MARGIN}") super().__init__( - catalog_info, pixels, catalog_path=catalog_path, moc=moc, storage_options=storage_options + catalog_info, + pixels, + catalog_path=catalog_path, + moc=moc, + schema=schema, + storage_options=storage_options, ) def filter_by_moc(self, moc: MOC) -> Self: diff --git a/tests/conftest.py b/tests/conftest.py index ce08dc6d..b33b8714 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -4,6 +4,7 @@ from typing import List import pandas as pd +import pyarrow as pa import pytest from hipscat.catalog.association_catalog.association_catalog_info import AssociationCatalogInfo @@ -171,6 +172,76 @@ def index_catalog_info_with_extra() -> dict: } +@pytest.fixture +def small_sky_schema() -> pa.Schema: + return pa.schema( + [ + pa.field("id", pa.int64()), + pa.field("ra", pa.float64()), + pa.field("dec", pa.float64()), + pa.field("ra_error", pa.int64()), + pa.field("dec_error", pa.int64()), + pa.field("Norder", pa.uint8()), + pa.field("Dir", pa.uint64()), + pa.field("Npix", pa.uint64()), + pa.field("_hipscat_index", pa.uint64()), + ] + ) + + +@pytest.fixture +def small_sky_source_schema() -> pa.Schema: + return pa.schema( + [ + pa.field("source_id", pa.int64()), + pa.field("source_ra", pa.float64()), + pa.field("source_dec", pa.float64()), + pa.field("mjd", pa.float64()), + pa.field("mag", pa.float64()), + pa.field("band", pa.string()), + pa.field("object_id", pa.int64()), + pa.field("object_ra", pa.float64()), + pa.field("object_dec", pa.float64()), + pa.field("Norder", pa.uint8()), + pa.field("Dir", pa.uint64()), + pa.field("Npix", pa.uint64()), + pa.field("_hipscat_index", pa.uint64()), + ] + ) + + +@pytest.fixture +def association_catalog_schema() -> pa.Schema: + return pa.schema( + [ + pa.field("Norder", pa.int64()), + pa.field("Npix", pa.int64()), + pa.field("join_Norder", pa.int64()), + pa.field("join_Npix", pa.int64()), + ] + ) + + +@pytest.fixture +def margin_catalog_schema() -> pa.Schema: + return pa.schema( + [ + pa.field("id", pa.int64()), + pa.field("ra", pa.float64()), + pa.field("dec", pa.float64()), + pa.field("ra_error", pa.int64()), + pa.field("dec_error", pa.int64()), + pa.field("Norder", pa.uint8()), + pa.field("Dir", pa.uint64()), + pa.field("Npix", pa.uint64()), + pa.field("_hipscat_index", pa.uint64()), + pa.field("margin_Norder", pa.uint8()), + pa.field("margin_Dir", pa.uint64()), + pa.field("margin_Npix", pa.uint64()), + ] + ) + + @pytest.fixture def dataset_path(test_data_dir) -> str: return test_data_dir / "info_only" / "dataset" diff --git a/tests/hipscat/catalog/association_catalog/test_association_catalog.py b/tests/hipscat/catalog/association_catalog/test_association_catalog.py index 79fdf2a0..52e6bc80 100644 --- a/tests/hipscat/catalog/association_catalog/test_association_catalog.py +++ b/tests/hipscat/catalog/association_catalog/test_association_catalog.py @@ -2,6 +2,7 @@ import os import pandas as pd +import pyarrow as pa import pytest from hipscat.catalog import CatalogType @@ -49,7 +50,9 @@ def test_different_join_pixels_type(association_catalog_info, association_catalo pd.testing.assert_frame_equal(catalog.get_join_pixels(), association_catalog_join_pixels) -def test_read_from_file(association_catalog_path, association_catalog_join_pixels): +def test_read_from_file( + association_catalog_path, association_catalog_join_pixels, association_catalog_schema +): catalog = read_from_hipscat(association_catalog_path) assert isinstance(catalog, AssociationCatalog) @@ -66,6 +69,9 @@ def test_read_from_file(association_catalog_path, association_catalog_join_pixel assert info.join_catalog == "small_sky_order1" assert info.join_column == "id" + assert isinstance(catalog.schema, pa.Schema) + assert catalog.schema.equals(association_catalog_schema) + def test_empty_directory(tmp_path, association_catalog_info_data, association_catalog_join_pixels): """Test loading empty or incomplete data""" @@ -121,5 +127,6 @@ def test_csv_round_trip(tmp_path, association_catalog_info_data, association_cat part_info = PartitionJoinInfo(association_catalog_join_pixels) part_info.write_to_csv(catalog_path=catalog_path) - catalog = read_from_hipscat(catalog_path) + with pytest.warns(UserWarning, match="_common_metadata or _metadata files not found"): + catalog = read_from_hipscat(catalog_path) pd.testing.assert_frame_equal(catalog.get_join_pixels(), association_catalog_join_pixels) diff --git a/tests/hipscat/catalog/margin_cache/test_margin_catalog.py b/tests/hipscat/catalog/margin_cache/test_margin_catalog.py index 30f80272..aa260b30 100644 --- a/tests/hipscat/catalog/margin_cache/test_margin_catalog.py +++ b/tests/hipscat/catalog/margin_cache/test_margin_catalog.py @@ -1,6 +1,7 @@ import json import os +import pyarrow as pa import pytest from hipscat.catalog import CatalogType, MarginCatalog, PartitionInfo @@ -32,7 +33,7 @@ def test_wrong_catalog_info_type(catalog_info, margin_catalog_pixels): MarginCatalog(catalog_info, margin_catalog_pixels) -def test_read_from_file(margin_catalog_path, margin_catalog_pixels): +def test_read_from_file(margin_catalog_path, margin_catalog_pixels, margin_catalog_schema): catalog = read_from_hipscat(margin_catalog_path) assert isinstance(catalog, MarginCatalog) @@ -50,6 +51,9 @@ def test_read_from_file(margin_catalog_path, margin_catalog_pixels): assert info.primary_catalog == "small_sky_order1" assert info.margin_threshold == 7200 + assert isinstance(catalog.schema, pa.Schema) + assert catalog.schema.equals(margin_catalog_schema) + # pylint: disable=duplicate-code def test_empty_directory(tmp_path, margin_cache_catalog_info_data, margin_catalog_pixels): diff --git a/tests/hipscat/catalog/test_catalog.py b/tests/hipscat/catalog/test_catalog.py index 9c4e07dd..91778d0f 100644 --- a/tests/hipscat/catalog/test_catalog.py +++ b/tests/hipscat/catalog/test_catalog.py @@ -4,6 +4,7 @@ import astropy.units as u import numpy as np +import pyarrow as pa import pytest from mocpy import MOC @@ -79,7 +80,7 @@ def test_get_pixels_list(catalog_info, catalog_pixels): assert pixels == catalog_pixels -def test_load_catalog_small_sky(small_sky_dir): +def test_load_catalog_small_sky(small_sky_dir, small_sky_schema): """Instantiate a catalog with 1 pixel""" cat = read_from_hipscat(small_sky_dir) @@ -87,6 +88,9 @@ def test_load_catalog_small_sky(small_sky_dir): assert cat.catalog_name == "small_sky" assert len(cat.get_healpix_pixels()) == 1 + assert isinstance(cat.schema, pa.Schema) + assert cat.schema.equals(small_sky_schema) + def test_load_catalog_small_sky_order1(small_sky_order1_dir): """Instantiate a catalog with 4 pixels""" @@ -109,7 +113,7 @@ def test_load_catalog_small_sky_order1_moc(small_sky_order1_dir): assert np.all(cat.moc.flatten() == np.where(counts_skymap > 0)) -def test_load_catalog_small_sky_source(small_sky_source_dir): +def test_load_catalog_small_sky_source(small_sky_source_dir, small_sky_source_schema): """Instantiate a source catalog with 14 pixels""" cat = read_from_hipscat(small_sky_source_dir) @@ -117,6 +121,9 @@ def test_load_catalog_small_sky_source(small_sky_source_dir): assert cat.catalog_name == "small_sky_source" assert len(cat.get_healpix_pixels()) == 14 + assert isinstance(cat.schema, pa.Schema) + assert cat.schema.equals(small_sky_source_schema) + def test_max_coverage_order(small_sky_order1_catalog): assert small_sky_order1_catalog.get_max_coverage_order() >= small_sky_order1_catalog.moc.max_order diff --git a/tests/hipscat/io/conftest.py b/tests/hipscat/io/conftest.py index ef53ecad..f0fd09b1 100644 --- a/tests/hipscat/io/conftest.py +++ b/tests/hipscat/io/conftest.py @@ -3,7 +3,6 @@ import re import numpy.testing as npt -import pyarrow as pa import pytest from hipscat.io import file_io @@ -48,23 +47,6 @@ def assert_text_file_matches(expected_lines, file_name, storage_options: dict = return assert_text_file_matches -@pytest.fixture -def basic_catalog_parquet_metadata(): - return pa.schema( - [ - pa.field("id", pa.int64()), - pa.field("ra", pa.float64()), - pa.field("dec", pa.float64()), - pa.field("ra_error", pa.int64()), - pa.field("dec_error", pa.int64()), - pa.field("Norder", pa.uint8()), - pa.field("Dir", pa.uint64()), - pa.field("Npix", pa.uint64()), - pa.field("_hipscat_index", pa.uint64()), - ] - ) - - @pytest.fixture def check_parquet_schema(): def check_parquet_schema(file_name, expected_schema, expected_num_row_groups=1): diff --git a/tests/hipscat/io/test_parquet_metadata.py b/tests/hipscat/io/test_parquet_metadata.py index 355b1ee0..97115b95 100644 --- a/tests/hipscat/io/test_parquet_metadata.py +++ b/tests/hipscat/io/test_parquet_metadata.py @@ -16,9 +16,7 @@ from hipscat.pixel_math.healpix_pixel import HealpixPixel -def test_write_parquet_metadata( - tmp_path, small_sky_dir, basic_catalog_parquet_metadata, check_parquet_schema -): +def test_write_parquet_metadata(tmp_path, small_sky_dir, small_sky_schema, check_parquet_schema): """Copy existing catalog and create new metadata files for it""" catalog_base_dir = tmp_path / "catalog" shutil.copytree( @@ -27,27 +25,27 @@ def test_write_parquet_metadata( ) total_rows = write_parquet_metadata(catalog_base_dir) assert total_rows == 131 - check_parquet_schema(catalog_base_dir / "_metadata", basic_catalog_parquet_metadata) + check_parquet_schema(catalog_base_dir / "_metadata", small_sky_schema) ## _common_metadata has 0 row groups check_parquet_schema( catalog_base_dir / "_common_metadata", - basic_catalog_parquet_metadata, + small_sky_schema, 0, ) ## Re-write - should still have the same properties. total_rows = write_parquet_metadata(catalog_base_dir) assert total_rows == 131 - check_parquet_schema(catalog_base_dir / "_metadata", basic_catalog_parquet_metadata) + check_parquet_schema(catalog_base_dir / "_metadata", small_sky_schema) ## _common_metadata has 0 row groups check_parquet_schema( catalog_base_dir / "_common_metadata", - basic_catalog_parquet_metadata, + small_sky_schema, 0, ) def test_write_parquet_metadata_order1( - tmp_path, small_sky_order1_dir, basic_catalog_parquet_metadata, check_parquet_schema + tmp_path, small_sky_order1_dir, small_sky_schema, check_parquet_schema ): """Copy existing catalog and create new metadata files for it, using a catalog with multiple files.""" @@ -62,19 +60,19 @@ def test_write_parquet_metadata_order1( ## 4 row groups for 4 partitioned parquet files check_parquet_schema( temp_path / "_metadata", - basic_catalog_parquet_metadata, + small_sky_schema, 4, ) ## _common_metadata has 0 row groups check_parquet_schema( temp_path / "_common_metadata", - basic_catalog_parquet_metadata, + small_sky_schema, 0, ) def test_write_parquet_metadata_sorted( - tmp_path, small_sky_order1_dir, basic_catalog_parquet_metadata, check_parquet_schema + tmp_path, small_sky_order1_dir, small_sky_schema, check_parquet_schema ): """Copy existing catalog and create new metadata files for it, using a catalog with multiple files.""" @@ -89,13 +87,13 @@ def test_write_parquet_metadata_sorted( ## 4 row groups for 4 partitioned parquet files check_parquet_schema( temp_path / "_metadata", - basic_catalog_parquet_metadata, + small_sky_schema, 4, ) ## _common_metadata has 0 row groups check_parquet_schema( temp_path / "_common_metadata", - basic_catalog_parquet_metadata, + small_sky_schema, 0, ) diff --git a/tests/hipscat/io/test_write_metadata.py b/tests/hipscat/io/test_write_metadata.py index cfe80411..8df9bc96 100644 --- a/tests/hipscat/io/test_write_metadata.py +++ b/tests/hipscat/io/test_write_metadata.py @@ -178,9 +178,7 @@ def test_write_partition_info_float(assert_text_file_matches, tmp_path): assert_text_file_matches(expected_lines, metadata_filename) -def test_write_parquet_metadata( - tmp_path, small_sky_dir, basic_catalog_parquet_metadata, check_parquet_schema -): +def test_write_parquet_metadata(tmp_path, small_sky_dir, small_sky_schema, check_parquet_schema): """Copy existing catalog and create new metadata files for it""" catalog_base_dir = tmp_path / "catalog" shutil.copytree( @@ -188,20 +186,20 @@ def test_write_parquet_metadata( catalog_base_dir, ) io.write_parquet_metadata(catalog_base_dir) - check_parquet_schema(catalog_base_dir / "_metadata", basic_catalog_parquet_metadata) + check_parquet_schema(catalog_base_dir / "_metadata", small_sky_schema) ## _common_metadata has 0 row groups check_parquet_schema( catalog_base_dir / "_common_metadata", - basic_catalog_parquet_metadata, + small_sky_schema, 0, ) ## Re-write - should still have the same properties. io.write_parquet_metadata(catalog_base_dir) - check_parquet_schema(catalog_base_dir / "_metadata", basic_catalog_parquet_metadata) + check_parquet_schema(catalog_base_dir / "_metadata", small_sky_schema) ## _common_metadata has 0 row groups check_parquet_schema( catalog_base_dir / "_common_metadata", - basic_catalog_parquet_metadata, + small_sky_schema, 0, )