
Commit

DAS-1699 rechunk the zarr store (#39)
flamingbear committed Feb 15, 2023
1 parent 7cd1232 commit e230636
Showing 12 changed files with 347 additions and 46 deletions.
4 changes: 3 additions & 1 deletion .github/workflows/publish_docker_image.yml
@@ -11,6 +11,8 @@ on:
   push:
     branches: [ main ]
     paths: version.txt
+  workflow_dispatch:
+
 
 env:
   IMAGE_NAME: ${{ github.repository }}
@@ -83,7 +85,7 @@ jobs:
           tags: |
             type=semver,pattern={{version}},value=${{ env.semantic_version }}
-      - name: Push Docker image
+      - name: Build and Push Docker image
         uses: docker/build-push-action@v3
         with:
           context: .
3 changes: 2 additions & 1 deletion .github/workflows/run_tests.yml
@@ -6,7 +6,8 @@
 name: A reusable workflow to build and run the unit test suite
 
 on:
-  workflow_call
+  workflow_call:
+  workflow_dispatch:
 
 jobs:
   build_and_test:
6 changes: 6 additions & 0 deletions CHANGELOG.md
@@ -1,3 +1,9 @@
+## v1.1.0
+### 2023-02-08
+
+* DAS-1699 - Add secondary rechunking step to service. Concatenated requests
+  to the service will now be rechunked before being returned to the caller.
+
 ## v1.0.3
 ### 2022-12-13
 
4 changes: 0 additions & 4 deletions harmony_netcdf_to_zarr/__main__.py
@@ -26,10 +26,6 @@ def main(argv, **kwargs):
     -------
     None
     """
-    # TODO - Update this when working HARMONY-639
-    # DO NOT REMOVE THE FOLLOWING LINE - NEEDED AS WORKAROUND TO ARGO CHAINING ISSUE
-    print('MAIN STARTED')
-
     config = None
     # Optional: harmony.util.Config is injectable for tests
     if 'config' in kwargs:
20 changes: 14 additions & 6 deletions harmony_netcdf_to_zarr/adapter.py
@@ -9,13 +9,15 @@
 from os.path import join as path_join
 from shutil import rmtree
 from tempfile import mkdtemp
+from uuid import uuid4
 
 from harmony import BaseHarmonyAdapter
 from harmony.util import generate_output_filename, HarmonyException
 
-from .convert import make_localstack_s3fs, make_s3fs, mosaic_to_zarr
-from .download_utilities import download_granules
-from .stac_utilities import get_netcdf_urls, get_output_catalog
+from harmony_netcdf_to_zarr.convert import make_localstack_s3fs, make_s3fs, mosaic_to_zarr
+from harmony_netcdf_to_zarr.rechunk import rechunk_zarr
+from harmony_netcdf_to_zarr.download_utilities import download_granules
+from harmony_netcdf_to_zarr.stac_utilities import get_netcdf_urls, get_output_catalog
 
 
 ZARR_MEDIA_TYPES = ['application/zarr', 'application/x-zarr']
@@ -101,16 +103,22 @@ def process_items_many_to_one(self):
             collection = self._get_item_source(items[0]).collection
             output_name = f'{collection}_merged.zarr'
 
+            pre_rechunk_root = path_join(self.message.stagingLocation, f'{uuid4()}.zarr')
             zarr_root = path_join(self.message.stagingLocation, output_name)
-            zarr_store = self.s3.get_mapper(root=zarr_root, check=False,
+
+            zarr_store = self.s3.get_mapper(root=pre_rechunk_root,
+                                            check=False,
                                             create=True)
 
             mosaic_to_zarr(local_file_paths, zarr_store, logger=self.logger)
 
+            rechunk_zarr(pre_rechunk_root, zarr_root, self)
+
             return get_output_catalog(self.catalog, zarr_root)
         except Exception as service_exception:
             self.logger.error(service_exception, exc_info=1)
-            raise ZarrException('Could not create Zarr output: '
-                                f'{str(service_exception)}') from service_exception
+            raise ZarrException(
+                'Could not create Zarr output: '
+                f'{str(service_exception)}') from service_exception
         finally:
             rmtree(workdir)
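The adapter change above stages the mosaicked output under a throwaway UUID key and only publishes the rechunked store under the final `_merged.zarr` name. A minimal standalone sketch of that staged-write flow, assuming an `s3fs.S3FileSystem`; the names `stage_and_rechunk`, `staging_location`, `local_file_paths`, and the output key are illustrative, not service values:

```python
from logging import getLogger
from os.path import join as path_join
from uuid import uuid4

from s3fs import S3FileSystem

from harmony_netcdf_to_zarr.convert import mosaic_to_zarr
from harmony_netcdf_to_zarr.rechunk import rechunk_zarr_store


def stage_and_rechunk(s3: S3FileSystem, staging_location: str,
                      local_file_paths: list) -> str:
    """Mosaic into a scratch store, then rechunk into the final location."""
    # Write the unoptimised mosaic under a UUID key, so a failed rechunk
    # never leaves a partial store at the user-visible output key.
    pre_rechunk_root = path_join(staging_location, f'{uuid4()}.zarr')
    zarr_root = path_join(staging_location, 'collection_merged.zarr')

    source = s3.get_mapper(root=pre_rechunk_root, check=False, create=True)
    mosaic_to_zarr(local_file_paths, source, logger=getLogger(__name__))

    # rechunker also needs its own scratch store for intermediate chunks.
    temp = s3.get_mapper(root=pre_rechunk_root.replace('.zarr', '_tmp.zarr'),
                         check=False, create=True)
    target = s3.get_mapper(root=zarr_root, check=False, create=True)
    rechunk_zarr_store(source, target, temp)

    # The staged stores are disposable once the target store is complete.
    s3.rm(pre_rechunk_root, recursive=True)
    return zarr_root
```

In the service itself, `rechunk_zarr` (added in this commit, below) wraps the rechunking call with the scratch-store creation and cleanup.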
18 changes: 11 additions & 7 deletions harmony_netcdf_to_zarr/convert.py
@@ -26,11 +26,6 @@
 
 # Some global variables that may be shared by different methods
 region = environ.get('AWS_DEFAULT_REGION') or 'us-west-2'
-# This dictionary converts from a string representation of units, such as
-# kibibytes, mebibytes or gibibytes, to a raw number of bytes. This is used
-# when a compressed chunk size is expressed as a string. See the NIST standard
-# for binary prefix: https://physics.nist.gov/cuu/Units/binary.html.
-binary_prefix_conversion_map = {'Ki': 1024, 'Mi': 1048576, 'Gi': 1073741824}
 
 
 def make_localstack_s3fs() -> S3FileSystem:
@@ -96,6 +91,7 @@ def mosaic_to_zarr(input_granules: List[str], zarr_store: Union[FSMap, str],
     with Manager() as manager:
         output_queue = manager.Queue(len(input_granules))
         shared_namespace = manager.Namespace()
+        shared_namespace.granules_processed = 0
 
         if isinstance(zarr_store, DirectoryStore):
             shared_namespace.store_type = 'DirectoryStore'
@@ -110,7 +106,7 @@
         processes = [Process(target=_output_worker,
                              args=(output_queue, shared_namespace,
                                    aggregated_dimensions, dim_mapping,
-                                   variable_chunk_metadata))
+                                   variable_chunk_metadata, logger))
                      for _ in range(process_count)]
 
         monitor_processes(processes, shared_namespace,
@@ -143,7 +139,7 @@ def _finalize_metadata(store: MutableMapping) -> None:
 
 def _output_worker(output_queue: Queue, shared_namespace: Namespace,
                    aggregated_dimensions: Set[str], dim_mapping: DimensionsMapping,
-                   variable_chunk_metadata: Dict = {}) -> None:
+                   variable_chunk_metadata: Dict = {}, logger: Logger = None) -> None:
     """ This worker function is executed in a spawned process. It checks for
     items in the main queue, which correspond to local file paths for input
     NetCDF-4 files. If there is at least one URL left for writing, then the
@@ -176,6 +172,8 @@ def _output_worker(output_queue: Queue, shared_namespace: Namespace,
             break
 
         try:
+            shared_namespace.granules_processed += 1
+            logger.info(f'processing granule {shared_namespace.granules_processed}')
             with Dataset(input_granule, 'r') as input_dataset:
                 input_dataset.set_auto_maskandscale(False)
                 __copy_group(input_dataset,
@@ -578,6 +576,12 @@ def compute_chunksize(shape: Union[tuple, list],
                          '(https://physics.nist.gov/cuu/Units/binary.html)'
                          ' except that only Ki, Mi, and Gi are allowed.')
 
+    # This dictionary converts from a string representation of units, such as
+    # kibibytes, mebibytes or gibibytes, to a raw number of bytes. This is used
+    # when a compressed chunk size is expressed as a string. See the NIST standard
+    # for binary prefix: https://physics.nist.gov/cuu/Units/binary.html.
+    binary_prefix_conversion_map = {'Ki': 1024, 'Mi': 1048576, 'Gi': 1073741824}
+
     compressed_chunksize_byte = int(float(value)) * int(binary_prefix_conversion_map[unit])
 
     # get product of chunksize along different dimensions before compression
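The relocated `binary_prefix_conversion_map` backs the unit parsing immediately above it: a compressed chunk size given as a string such as `'10Mi'` splits into a numeric value and a two-character NIST binary prefix. A standalone sketch of that conversion; the helper name `parse_chunksize_string` is illustrative and not part of `convert.py`:

```python
# Standalone sketch of the unit parsing shown above; parse_chunksize_string
# is a hypothetical helper name, not part of the service code.
binary_prefix_conversion_map = {'Ki': 1024, 'Mi': 1048576, 'Gi': 1073741824}


def parse_chunksize_string(compressed_chunksize: str) -> int:
    """Convert a NIST binary-prefixed size such as '10Mi' to raw bytes."""
    value, unit = compressed_chunksize[:-2], compressed_chunksize[-2:]
    if unit not in binary_prefix_conversion_map:
        raise ValueError(f'Unsupported binary prefix: {unit}')
    return int(float(value)) * binary_prefix_conversion_map[unit]


assert parse_chunksize_string('10Mi') == 10_485_760    # 10 * 1024**2
assert parse_chunksize_string('1Gi') == 1_073_741_824  # 1024**3
```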
125 changes: 125 additions & 0 deletions harmony_netcdf_to_zarr/rechunk.py
@@ -0,0 +1,125 @@
"""Code that will rechunk an existing zarr store."""
from __future__ import annotations

from harmony_netcdf_to_zarr.convert import compute_chunksize

from fsspec.mapping import FSMap
from time import time
from rechunker import rechunk
from typing import List, Dict, TYPE_CHECKING
if TYPE_CHECKING:
from harmony_netcdf_to_zarr.adapter import NetCDFToZarrAdapter
from zarr import open_consolidated, consolidate_metadata, Group as zarrGroup
import xarray as xr


def rechunk_zarr(zarr_root: str, chunked_root: str, adapter: NetCDFToZarrAdapter) -> str:
"""Rechunks the zarr store found at zarr_root location.
Rechunks the store found at zarr_root, outputing a new rechunked store into
chunked root. Finally deleting the input zarr_root store.
"""
temp_root = zarr_root.replace('.zarr', '_tmp.zarr')

zarr_store = adapter.s3.get_mapper(root=zarr_root,
check=False,
create=False)

zarr_temp = adapter.s3.get_mapper(root=temp_root, check=False, create=True)
zarr_target = adapter.s3.get_mapper(root=chunked_root,
check=False,
create=True)

try:
adapter.s3.rm(temp_root, recursive=True)
except FileNotFoundError:
adapter.logger.info(f'Nothing to clean in {temp_root}')

try:
adapter.s3.rm(chunked_root, recursive=True)
except FileNotFoundError:
adapter.logger.info(f'Nothing to clean in {chunked_root}')

t1 = time()
rechunk_zarr_store(zarr_store, zarr_target, zarr_temp)
t2 = time()
adapter.logger.info(f'Function rechunk_zarr_store executed in {(t2-t1):.4f}s')

adapter.s3.rm(zarr_root, recursive=True)
adapter.s3.rm(temp_root, recursive=True)


def rechunk_zarr_store(zarr_store: FSMap, zarr_target: FSMap,
zarr_temp: FSMap) -> str:
"""Rechunks a zarr store that was created by the mosaic_to_zarr processes.
This is specific to tuning output zarr store variables to the chunksizes
given by compute_chunksize.
"""
target_chunks = get_target_chunks(zarr_store)
opened_zarr_store = open_consolidated(zarr_store, mode='r')
# This is a best guess on trial and error with an 8Gi Memory container
max_memory = '1GB'
array_plan = rechunk(opened_zarr_store,
target_chunks,
max_memory,
zarr_target,
temp_store=zarr_temp)
array_plan.execute()
consolidate_metadata(zarr_target)


def get_target_chunks(zarr_store: FSMap) -> Dict:
"""Determine the chuncking strategy for the input zarr store's variables.
Iterate through the zarr store, computing new chunksizes for all variables
that are not coordinates or coordinate bounds. Return a dictionary of the
variable and new chunksizes to be used in the rechunker.
"""
zarr_groups = _groups_from_zarr(zarr_store)

target_chunks = {}
# open with xr for each group?
for group in zarr_groups:
group_dataset = xr.open_dataset(zarr_store,
group=group,
mode='r',
engine='zarr')
for variable, varinfo in group_dataset.data_vars.items():
if not _bounds(variable):
target_chunks[f'{group}/{variable}'] = compute_chunksize(
varinfo.shape, varinfo.dtype)
else:
target_chunks[f'{group}/{variable}'] = None

for variable in group_dataset.coords.keys():
target_chunks[f'{group}/{variable}'] = None

return target_chunks


def _bounds(variable: str) -> bool:
"""Is a variable a bounds type variable.
Coordinates and coordinate bounds are not chunked in a zarr store, this is
just a convenience function to determine the coordinate bounds variables by
name.
"""
return variable.endswith(('_bnds', '_bounds'))


def _groups_from_zarr(zarr_root: str) -> List[str]:
"""Get the name of all groups in the zarr_store."""
original_zarr = open_consolidated(zarr_root, mode='r')
groups = ['']

def is_group(name: str) -> None:
"""Create function to test if the item is a group or not."""
if isinstance(original_zarr.get(name), zarrGroup):
groups.append(name)

original_zarr.visit(is_group)

return groups
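`get_target_chunks` returns a dictionary keyed by `'group/variable'` paths, where a tuple requests new chunk shapes and `None` tells rechunker to copy the array through unchanged (coordinates and bounds). A minimal local sketch of the `rechunker` call wrapped by `rechunk_zarr_store`; the store paths, group/variable names, and chunk shapes are illustrative:

```python
# Minimal local sketch of the rechunker API used in rechunk_zarr_store;
# store paths, variable names, and chunk shapes are illustrative only.
from rechunker import rechunk
from zarr import consolidate_metadata, open_consolidated

source = open_consolidated('source.zarr', mode='r')

# Data variables get new chunk shapes; coordinates and bounds map to None,
# so rechunker copies them through unchanged.
target_chunks = {
    'science/temperature': (1, 512, 512),
    'science/time': None,
    'science/time_bnds': None,
}

# max_memory bounds the size of any intermediate chunk held in memory.
plan = rechunk(source, target_chunks, '1GB', 'target.zarr',
               temp_store='temp.zarr')
plan.execute()

# Re-consolidate so later open_consolidated calls see the new chunking.
consolidate_metadata('target.zarr')
```

Swapping the local paths for the adapter's S3-backed `FSMap` objects gives essentially the call made in `rechunk_zarr_store` above.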
7 changes: 5 additions & 2 deletions requirements/core.txt
@@ -1,7 +1,10 @@
 boto3 ~= 1.24
+dask == 2023.1.1
 harmony-service-lib ~= 1.0.22
 netCDF4 ~= 1.6.1
-numpy ~= 1.23.3
+numpy ~= 1.24
 python-dateutil ~= 2.8.2
+rechunker == 0.5.0
 s3fs ~= 0.4.0
-zarr ~= 2.13.3
+xarray == 2022.9.0
+zarr == 2.13.3
14 changes: 7 additions & 7 deletions requirements/dev.txt
@@ -1,9 +1,9 @@
-autopep8 ~= 1.5
-coverage ~= 6.3.1
-flake8 ~= 3.8
-ipython ~= 7.17
+autopep8 ~= 2.0.1
+coverage ~= 7.1.0
+flake8 ~= 6.0
+ipython ~= 7.32
 isort ~= 5.5
 moto ~= 1.3
-pytest ~= 5.4
-python-dotenv ~=0.12
-safety ~= 1.8
+pytest ~= 7.2.1
+python-dotenv ~=0.21
+safety ~= 1.10