From 72a1c39f6b68ad83fe3b65f8f94ff0035bccef70 Mon Sep 17 00:00:00 2001 From: manukala6 Date: Mon, 11 Nov 2024 11:04:12 -0800 Subject: [PATCH] GTC-2977 Add aux tile set jobs --- src/datapump/sync/sync.py | 110 ++++++++++++++++++++++++++++---------- 1 file changed, 82 insertions(+), 28 deletions(-) diff --git a/src/datapump/sync/sync.py b/src/datapump/sync/sync.py index 71f397d..897289c 100644 --- a/src/datapump/sync/sync.py +++ b/src/datapump/sync/sync.py @@ -32,6 +32,14 @@ def __init__(self, sync_version: str): def build_jobs(self, config: DatapumpConfig) -> List[Job]: ... + @staticmethod + def get_latest_api_version(dataset_name: str) -> str: + """ + Get the version of the latest release in the Data API + """ + client = DataApiClient() + return client.get_latest_version(dataset_name) + class FireAlertsSync(Sync): def __init__(self, sync_version: str): @@ -428,14 +436,6 @@ def parse_version_as_dt(version: str) -> datetime: ) return datetime(year, month, day) - @staticmethod - def get_latest_api_version(dataset_name: str) -> str: - """ - Get the version of the latest release in the Data API - """ - client = DataApiClient() - return client.get_latest_version(dataset_name) - @staticmethod def get_today(): return date.today() @@ -723,12 +723,12 @@ def get_latest_release(self) -> Tuple[str, List[str]]: return latest_release, source_uri -class DISTAlertsSync(DeforestationAlertsSync): +class DISTAlertsSync(Sync): """ Defines jobs to create new DIST alerts assets once a new release is available. """ - dataset_name = "umd_land_distburbance_alerts" + dataset = "umd_dist_alerts" source_bucket = "earthenginepartners-hansen" source_prefix = "DIST-ALERT" input_calc = """ @@ -737,11 +737,9 @@ class DISTAlertsSync(DeforestationAlertsSync): 0 ) """ - number_of_tiles = 60 - grid = "10/40000" - data_type = "int16" - no_data = -1 - max_zoom = 12 + + def __init__(self, sync_version: str): + super().__init__(sync_version) def get_latest_release(self) -> Tuple[str, List[str]]: """ @@ -749,7 +747,7 @@ def get_latest_release(self) -> Tuple[str, List[str]]: """ # Raw tiles are just updated in-place - source_uri = [ + source_uris = [ f"gs://{self.source_bucket}/{self.source_prefix}/VEG-ANOM-MAX", f"gs://{self.source_bucket}/{self.source_prefix}/VEG-DIST-DATE", f"gs://{self.source_bucket}/{self.source_prefix}/VEG-DIST-COUNT", @@ -767,22 +765,78 @@ def get_latest_release(self) -> Tuple[str, List[str]]: "v%Y%m%d" ) - return latest_release, source_uri + return latest_release, source_uris - def get_raster_job( - self, version: str, source_uris: List[str] - ) -> RasterVersionUpdateJob: - raster_job = super().get_raster_job(version, source_uris) + def build_jobs (self, config: DatapumpConfig) -> List[Job]: + latest_api_version = self.get_latest_api_version(self.dataset_name) + latest_release, source_uris = self.get_latest_release() + + jobs: List[Job] = [] - raster_job.tile_set_parameters.grid = self.grid - raster_job.tile_set_parameters.data_type = self.data_type - raster_job.tile_set_parameters.no_data = self.no_data - raster_job.tile_set_parameters.unify_projection = True # Contingent on GTC-3029 - raster_job.content_date_range = ContentDateRange( - start_date="2020-12-31", end_date=str(self.get_today()) + job = RasterVersionUpdateJob( + id=str(uuid1()), + status=JobStatus.starting, + dataset=self.dataset, + version=latest_release, + tile_set_parameters=RasterTileSetParameters( + source_uri=source_uris, + calc=self.input_calc, + grid="10/40000", + data_type="int16", + no_data=-1, + pixel_meaning="currentweek", + band_count=1, + compute_stats=False, + union_bands=True, + unify_projection=True + ), + tile_cache_parameters=RasterTileCacheParameters( + max_zoom=12, + resampling="med", + symbology={"type": "date_conf_intensity"}, # what is the correct symbology? + ), + content_date_range=ContentDateRange( + start_date="2020-12-31", end_date=str(date.today()) + ) ) + job.aux_tile_set_parameters = [ + # Aggregation TODO: Might need to adjust calc for date + RasterTileSetParameters( + source_uri=f"s3://gfw-data-lake/{self.dataset_name}/{latest_api_version}/raster/epsg-4326/10/40000/currentweek/geotiff", + pixel_meaning="date_conf" - return raster_job + ), + # Intensity tile set + RasterTileSetParameters( + source_uri=None, + pixel_meaning="intensity", + data_type="uint8", + calc="(A > 0) * 55", + grid="10/40000", + no_data=None, + ) + ] + job.cog_asset_parameters = [ + # Created from the "date_conf" asset + CogAssetParameters( + source_pixel_meaning="date_conf", + resampling="mode", + implementation="default", + blocksize=1024, + export_to_gee=False, # are we exporting to GEE? + ), + # Created from the "intensity" asset + CogAssetParameters( + source_pixel_meaning="intensity", + resampling="bilinear", + implementation="intensity", + blocksize=1024, + ) + ] + + jobs.append(job) + + return jobs class RWAreasSync(Sync): def __init__(self, sync_version: str):