From ce95efae577c1d37638d285546f56e8956862131 Mon Sep 17 00:00:00 2001 From: manukala6 Date: Mon, 4 Nov 2024 11:19:17 -0800 Subject: [PATCH 01/14] GTC-2977 Read latest date --- src/datapump/commands/sync.py | 1 + src/datapump/sync/sync.py | 62 +++++++++++++++++++++++++++++++++++ 2 files changed, 63 insertions(+) diff --git a/src/datapump/commands/sync.py b/src/datapump/commands/sync.py index 888a0da6..1d313ff5 100644 --- a/src/datapump/commands/sync.py +++ b/src/datapump/commands/sync.py @@ -14,6 +14,7 @@ class SyncType(str, Enum): wur_radd_alerts = "wur_radd_alerts" umd_glad_landsat_alerts = "umd_glad_landsat_alerts" umd_glad_sentinel2_alerts = "umd_glad_sentinel2_alerts" + umd_land_disturbance_alerts = "umd_land_disturbance_alerts" @staticmethod def get_sync_types(dataset: str, analysis: Analysis): diff --git a/src/datapump/sync/sync.py b/src/datapump/sync/sync.py index eebd89c8..71f397df 100644 --- a/src/datapump/sync/sync.py +++ b/src/datapump/sync/sync.py @@ -723,6 +723,67 @@ def get_latest_release(self) -> Tuple[str, List[str]]: return latest_release, source_uri +class DISTAlertsSync(DeforestationAlertsSync): + """ + Defines jobs to create new DIST alerts assets once a new release is available. 
+ """ + + dataset_name = "umd_land_distburbance_alerts" + source_bucket = "earthenginepartners-hansen" + source_prefix = "DIST-ALERT" + input_calc = """ + np.where((A>=30) & (A<255) & (B>0) & (C>=2) & (C<255), + np.where(C<4, 20000 + B, 30000 + B), + 0 + ) + """ + number_of_tiles = 60 + grid = "10/40000" + data_type = "int16" + no_data = -1 + max_zoom = 12 + + def get_latest_release(self) -> Tuple[str, List[str]]: + """ + Get the version of the latest *complete* release in GCS + """ + + # Raw tiles are just updated in-place + source_uri = [ + f"gs://{self.source_bucket}/{self.source_prefix}/VEG-ANOM-MAX", + f"gs://{self.source_bucket}/{self.source_prefix}/VEG-DIST-DATE", + f"gs://{self.source_bucket}/{self.source_prefix}/VEG-DIST-COUNT", + ] + + # This file is updated once tiles are updated + upload_date_text = get_gs_file_as_text( + self.source_bucket, f"{self.source_prefix}/uploadDate.txt" + ) + + # Example string: "Updated Fri Apr 15 14:27:01 2022 UTC" + upload_date = upload_date_text[12:-5] + LOGGER.info(f"Last DIST-Alert upload date: {upload_date}") + latest_release = datetime.strptime(upload_date, "%b %d %H:%M:%S %Y").strftime( + "v%Y%m%d" + ) + + return latest_release, source_uri + + def get_raster_job( + self, version: str, source_uris: List[str] + ) -> RasterVersionUpdateJob: + raster_job = super().get_raster_job(version, source_uris) + + raster_job.tile_set_parameters.grid = self.grid + raster_job.tile_set_parameters.data_type = self.data_type + raster_job.tile_set_parameters.no_data = self.no_data + raster_job.tile_set_parameters.unify_projection = True # Contingent on GTC-3029 + raster_job.content_date_range = ContentDateRange( + start_date="2020-12-31", end_date=str(self.get_today()) + ) + + return raster_job + class RWAreasSync(Sync): def __init__(self, sync_version: str): self.sync_version = sync_version @@ -765,6 +826,7 @@ class Syncer: SyncType.wur_radd_alerts: RADDAlertsSync, SyncType.umd_glad_landsat_alerts: GLADLAlertsSync, 
SyncType.umd_glad_sentinel2_alerts: GLADS2AlertsSync, + SyncType.umd_land_distburbance_alerts: DISTAlertsSync, } def __init__(self, sync_types: List[SyncType], sync_version: str = None): From 72a1c39f6b68ad83fe3b65f8f94ff0035bccef70 Mon Sep 17 00:00:00 2001 From: manukala6 Date: Mon, 11 Nov 2024 11:04:12 -0800 Subject: [PATCH 02/14] GTC-2977 Add aux tile set jobs --- src/datapump/sync/sync.py | 110 ++++++++++++++++++++++++++++---------- 1 file changed, 82 insertions(+), 28 deletions(-) diff --git a/src/datapump/sync/sync.py b/src/datapump/sync/sync.py index 71f397df..897289ca 100644 --- a/src/datapump/sync/sync.py +++ b/src/datapump/sync/sync.py @@ -32,6 +32,14 @@ def __init__(self, sync_version: str): def build_jobs(self, config: DatapumpConfig) -> List[Job]: ... + @staticmethod + def get_latest_api_version(dataset_name: str) -> str: + """ + Get the version of the latest release in the Data API + """ + client = DataApiClient() + return client.get_latest_version(dataset_name) + class FireAlertsSync(Sync): def __init__(self, sync_version: str): @@ -428,14 +436,6 @@ def parse_version_as_dt(version: str) -> datetime: ) return datetime(year, month, day) - @staticmethod - def get_latest_api_version(dataset_name: str) -> str: - """ - Get the version of the latest release in the Data API - """ - client = DataApiClient() - return client.get_latest_version(dataset_name) - @staticmethod def get_today(): return date.today() @@ -723,12 +723,12 @@ def get_latest_release(self) -> Tuple[str, List[str]]: return latest_release, source_uri -class DISTAlertsSync(DeforestationAlertsSync): +class DISTAlertsSync(Sync): """ Defines jobs to create new DIST alerts assets once a new release is available. 
""" - dataset_name = "umd_land_distburbance_alerts" + dataset = "umd_dist_alerts" source_bucket = "earthenginepartners-hansen" source_prefix = "DIST-ALERT" input_calc = """ @@ -737,11 +737,9 @@ class DISTAlertsSync(DeforestationAlertsSync): 0 ) """ - number_of_tiles = 60 - grid = "10/40000" - data_type = "int16" - no_data = -1 - max_zoom = 12 + + def __init__(self, sync_version: str): + super().__init__(sync_version) def get_latest_release(self) -> Tuple[str, List[str]]: """ @@ -749,7 +747,7 @@ def get_latest_release(self) -> Tuple[str, List[str]]: """ # Raw tiles are just updated in-place - source_uri = [ + source_uris = [ f"gs://{self.source_bucket}/{self.source_prefix}/VEG-ANOM-MAX", f"gs://{self.source_bucket}/{self.source_prefix}/VEG-DIST-DATE", f"gs://{self.source_bucket}/{self.source_prefix}/VEG-DIST-COUNT", @@ -767,22 +765,78 @@ def get_latest_release(self) -> Tuple[str, List[str]]: "v%Y%m%d" ) - return latest_release, source_uri + return latest_release, source_uris - def get_raster_job( - self, version: str, source_uris: List[str] - ) -> RasterVersionUpdateJob: - raster_job = super().get_raster_job(version, source_uris) + def build_jobs (self, config: DatapumpConfig) -> List[Job]: + latest_api_version = self.get_latest_api_version(self.dataset_name) + latest_release, source_uris = self.get_latest_release() + + jobs: List[Job] = [] - raster_job.tile_set_parameters.grid = self.grid - raster_job.tile_set_parameters.data_type = self.data_type - raster_job.tile_set_parameters.no_data = self.no_data - raster_job.tile_set_parameters.unify_projection = True # Contingent on GTC-3029 - raster_job.content_date_range = ContentDateRange( - start_date="2020-12-31", end_date=str(self.get_today()) + job = RasterVersionUpdateJob( + id=str(uuid1()), + status=JobStatus.starting, + dataset=self.dataset, + version=latest_release, + tile_set_parameters=RasterTileSetParameters( + source_uri=source_uris, + calc=self.input_calc, + grid="10/40000", + data_type="int16", + 
no_data=-1, + pixel_meaning="currentweek", + band_count=1, + compute_stats=False, + union_bands=True, + unify_projection=True + ), + tile_cache_parameters=RasterTileCacheParameters( + max_zoom=12, + resampling="med", + symbology={"type": "date_conf_intensity"}, # what is the correct symbology? + ), + content_date_range=ContentDateRange( + start_date="2020-12-31", end_date=str(date.today()) + ) ) + job.aux_tile_set_parameters = [ + # Aggregation TODO: Might need to adjust calc for date + RasterTileSetParameters( + source_uri=f"s3://gfw-data-lake/{self.dataset_name}/{latest_api_version}/raster/epsg-4326/10/40000/currentweek/geotiff", + pixel_meaning="date_conf" - return raster_job + ), + # Intensity tile set + RasterTileSetParameters( + source_uri=None, + pixel_meaning="intensity", + data_type="uint8", + calc="(A > 0) * 55", + grid="10/40000", + no_data=None, + ) + ] + job.cog_asset_parameters = [ + # Created from the "date_conf" asset + CogAssetParameters( + source_pixel_meaning="date_conf", + resampling="mode", + implementation="default", + blocksize=1024, + export_to_gee=False, # are we exporting to GEE? 
+ ), + # Created from the "intensity" asset + CogAssetParameters( + source_pixel_meaning="intensity", + resampling="bilinear", + implementation="intensity", + blocksize=1024, + ) + ] + + jobs.append(job) + + return jobs class RWAreasSync(Sync): def __init__(self, sync_version: str): From 31fe41ec92b6af1ca06f269cc16b01fd03059d48 Mon Sep 17 00:00:00 2001 From: manukala6 Date: Wed, 13 Nov 2024 09:04:24 -0800 Subject: [PATCH 03/14] GTC-2977 Add job step for aggregated tile set --- src/datapump/commands/sync.py | 2 +- src/datapump/commands/version_update.py | 4 ++ src/datapump/jobs/version_update.py | 52 ++++++++++++++++++++++++- src/datapump/sync/sync.py | 33 ++++++++-------- 4 files changed, 72 insertions(+), 19 deletions(-) diff --git a/src/datapump/commands/sync.py b/src/datapump/commands/sync.py index 1d313ff5..845457c4 100644 --- a/src/datapump/commands/sync.py +++ b/src/datapump/commands/sync.py @@ -14,7 +14,7 @@ class SyncType(str, Enum): wur_radd_alerts = "wur_radd_alerts" umd_glad_landsat_alerts = "umd_glad_landsat_alerts" umd_glad_sentinel2_alerts = "umd_glad_sentinel2_alerts" - umd_land_disturbance_alerts = "umd_land_disturbance_alerts" + umd_dist_alerts = "umd_dist_alerts" @staticmethod def get_sync_types(dataset: str, analysis: Analysis): diff --git a/src/datapump/commands/version_update.py b/src/datapump/commands/version_update.py index fa3f8db7..b215e70e 100644 --- a/src/datapump/commands/version_update.py +++ b/src/datapump/commands/version_update.py @@ -53,3 +53,7 @@ class CogAssetParameters(StrictBaseModel): blocksize: int resampling: str = "mode" export_to_gee: bool = False + +class AuxTileSetParameters(RasterTileSetParameters): + source_uri: None = None + auxiliary_asset_pixel_meaning: str \ No newline at end of file diff --git a/src/datapump/jobs/version_update.py b/src/datapump/jobs/version_update.py index b89c68b9..e6e965bb 100644 --- a/src/datapump/jobs/version_update.py +++ b/src/datapump/jobs/version_update.py @@ -6,6 +6,7 @@ 
RasterTileCacheParameters, RasterTileSetParameters, CogAssetParameters, + AuxTileSetParameters ) from datapump.util.models import ContentDateRange from datapump.util.slack import slack_webhook @@ -20,6 +21,7 @@ class RasterVersionUpdateJobStep(str, Enum): starting = "starting" creating_tile_set = "creating_tile_set" creating_tile_cache = "creating_tile_cache" + creating_aggregated_tile_set = "creating_aggregated_tile_set" creating_aux_assets = "creating_aux_assets" creating_cog_assets = "creating_cog_assets" mark_latest = "mark_latest" @@ -31,7 +33,8 @@ class RasterVersionUpdateJob(Job): content_date_range: ContentDateRange tile_set_parameters: RasterTileSetParameters tile_cache_parameters: Optional[RasterTileCacheParameters] = None - aux_tile_set_parameters: List[RasterTileSetParameters] = [] + aggregated_tile_set_parameters: AuxTileSetParameters = None + aux_tile_set_parameters: List[AuxTileSetParameters] = [] cog_asset_parameters: List[CogAssetParameters] = [] timeout_sec = 24 * 60 * 60 @@ -61,6 +64,9 @@ def next_step(self): if self.tile_cache_parameters: self.step = RasterVersionUpdateJobStep.creating_tile_cache self._create_tile_cache() + elif self.aggregated_tile_set_parameters: + self.step = RasterVersionUpdateJobStep.creating_aggregated_tile_set + self._create_aggregated_tile_set() else: self.step = RasterVersionUpdateJobStep.mark_latest self._mark_latest() @@ -75,6 +81,14 @@ def next_step(self): elif status == JobStatus.failed: self.status = JobStatus.failed + elif self.step == RasterVersionUpdateJobStep.creating_aggregated_tile_set: + status = self._check_aux_assets_status() + if status == JobStatus.complete: + self.step = RasterVersionUpdateJobStep.mark_latest + self._mark_latest() + elif status == JobStatus.failed: + self.status = JobStatus.failed + elif self.step == RasterVersionUpdateJobStep.mark_latest: status = self._check_latest_status() if status == JobStatus.complete: @@ -161,7 +175,7 @@ def _create_tile_set(self): _ = 
client.create_version(self.dataset, self.version, payload) - def _create_aux_tile_set(self, tile_set_parameters: RasterTileSetParameters) -> str: + def _create_aux_tile_set(self, tile_set_parameters: AuxTileSetParameters) -> str: """ Create auxiliary tile set and return asset ID """ @@ -187,6 +201,16 @@ def _create_aux_tile_set(self, tile_set_parameters: RasterTileSetParameters) -> }, } + # Looks up asset ID of the raster tile set with auxiliary_asset_pixel_meaning + if co.auxiliary_asset_pixel_meaning: + latest_version = client.get_latest_version(self.dataset) + assets = client.get_assets(self.dataset, latest_version) + for asset in assets: + if asset["asset_type"] == "Raster tile set": + creation_options = client.get_asset_creation_options(asset['asset_id']) + if creation_options["pixel_meaning"] == co.auxiliary_assets_pixel_meaning: + payload["creation_options"]["auxiliary_assets"] = [asset["asset_id"]] + data = client.create_aux_asset(self.dataset, self.version, payload) return data["asset_id"] @@ -229,6 +253,30 @@ def _create_cog_asset(self, cog_asset_parameters: CogAssetParameters) -> str: data = client.create_aux_asset(self.dataset, self.version, payload) return data["asset_id"] + + def _create_aggregated_tile_set(self, aggregated_tile_set_parameters: AuxTileSetParameters): + """ + Create aggregated tile set + """ + client = DataApiClient() + + co = aggregated_tile_set_parameters + + assets = client.get_assets(self.dataset, self.version) + + payload = { + "asset_type": "Aggregated tile set", + "creation_options": { + "calc": co.calc, + "grid": co.grid, + "data_type": co.data_type, + "no_data": co.no_data, + "pixel_meaning": co.pixel_meaning, + "auxiliary_assets": co.auxiliary_assets + }, + } + + _ = client.create_aux_asset(self.dataset, self.version) def _check_tile_set_status(self) -> JobStatus: client = DataApiClient() diff --git a/src/datapump/sync/sync.py b/src/datapump/sync/sync.py index 897289ca..d45cbcb7 100644 --- a/src/datapump/sync/sync.py +++ 
b/src/datapump/sync/sync.py @@ -11,7 +11,7 @@ from ..clients.datapump_store import DatapumpConfig from ..commands.analysis import FIRES_ANALYSES, AnalysisInputTable from ..commands.sync import SyncType -from ..commands.version_update import RasterTileCacheParameters, RasterTileSetParameters, CogAssetParameters +from ..commands.version_update import RasterTileCacheParameters, RasterTileSetParameters, CogAssetParameters, AuxTileSetParameters from ..globals import GLOBALS, LOGGER from ..jobs.geotrellis import FireAlertsGeotrellisJob, GeotrellisJob, Job from ..jobs.jobs import JobStatus @@ -790,30 +790,31 @@ def build_jobs (self, config: DatapumpConfig) -> List[Job]: union_bands=True, unify_projection=True ), - tile_cache_parameters=RasterTileCacheParameters( - max_zoom=12, - resampling="med", - symbology={"type": "date_conf_intensity"}, # what is the correct symbology? - ), content_date_range=ContentDateRange( start_date="2020-12-31", end_date=str(date.today()) ) ) - job.aux_tile_set_parameters = [ - # Aggregation TODO: Might need to adjust calc for date - RasterTileSetParameters( - source_uri=f"s3://gfw-data-lake/{self.dataset_name}/{latest_api_version}/raster/epsg-4326/10/40000/currentweek/geotiff", - pixel_meaning="date_conf" - + job.aggregated_tile_set_parameters = [ + # Aggregated tile set + AuxTileSetParameters( + pixel_meaning="default", + grid="10/40000", + data_type="int16", + no_data=-1, + calc="np.where(A > 0, A, B)", + auxiliary_asset_pixel_meaning = "default" ), + ] + job.aux_tile_set_parameters = [ # Intensity tile set RasterTileSetParameters( source_uri=None, pixel_meaning="intensity", data_type="uint8", - calc="(A > 0) * 55", + calc="(B > 0) * 55", grid="10/40000", no_data=None, + auxiliary_assets_pixel_meaning = "default" ) ] job.cog_asset_parameters = [ @@ -823,14 +824,14 @@ def build_jobs (self, config: DatapumpConfig) -> List[Job]: resampling="mode", implementation="default", blocksize=1024, - export_to_gee=False, # are we exporting to GEE? 
+ export_to_gee=False ), # Created from the "intensity" asset CogAssetParameters( source_pixel_meaning="intensity", resampling="bilinear", implementation="intensity", - blocksize=1024, + blocksize=1024 ) ] @@ -880,7 +881,7 @@ class Syncer: SyncType.wur_radd_alerts: RADDAlertsSync, SyncType.umd_glad_landsat_alerts: GLADLAlertsSync, SyncType.umd_glad_sentinel2_alerts: GLADS2AlertsSync, - SyncType.umd_land_distburbance_alerts: DISTAlertsSync, + SyncType.umd_dist_alerts: DISTAlertsSync, } def __init__(self, sync_types: List[SyncType], sync_version: str = None): From 6f39459fb175291641dc6e15db817df749091075 Mon Sep 17 00:00:00 2001 From: manukala6 Date: Wed, 13 Nov 2024 11:31:01 -0800 Subject: [PATCH 04/14] GTC-2977 Add comments and lookup for aux assets --- src/datapump/jobs/version_update.py | 37 +++++------------------------ src/datapump/sync/sync.py | 19 +++++++++------ 2 files changed, 18 insertions(+), 38 deletions(-) diff --git a/src/datapump/jobs/version_update.py b/src/datapump/jobs/version_update.py index e6e965bb..2a5a0985 100644 --- a/src/datapump/jobs/version_update.py +++ b/src/datapump/jobs/version_update.py @@ -21,11 +21,10 @@ class RasterVersionUpdateJobStep(str, Enum): starting = "starting" creating_tile_set = "creating_tile_set" creating_tile_cache = "creating_tile_cache" - creating_aggregated_tile_set = "creating_aggregated_tile_set" + creating_aggregated_tile_set = "creating_aggregated_tile_set" # DIST-Alerts aggregation must run before mark_latest + mark_latest = "mark_latest" creating_aux_assets = "creating_aux_assets" creating_cog_assets = "creating_cog_assets" - mark_latest = "mark_latest" - class RasterVersionUpdateJob(Job): dataset: str @@ -33,7 +32,7 @@ class RasterVersionUpdateJob(Job): content_date_range: ContentDateRange tile_set_parameters: RasterTileSetParameters tile_cache_parameters: Optional[RasterTileCacheParameters] = None - aggregated_tile_set_parameters: AuxTileSetParameters = None + aggregated_tile_set_parameters: 
Optional[AuxTileSetParameters] = None aux_tile_set_parameters: List[AuxTileSetParameters] = [] cog_asset_parameters: List[CogAssetParameters] = [] timeout_sec = 24 * 60 * 60 @@ -66,7 +65,7 @@ def next_step(self): self._create_tile_cache() elif self.aggregated_tile_set_parameters: self.step = RasterVersionUpdateJobStep.creating_aggregated_tile_set - self._create_aggregated_tile_set() + self._create_aux_tile_set(self.aggregated_tile_set_parameters) else: self.step = RasterVersionUpdateJobStep.mark_latest self._mark_latest() @@ -201,14 +200,14 @@ def _create_aux_tile_set(self, tile_set_parameters: AuxTileSetParameters) -> str }, } - # Looks up asset ID of the raster tile set with auxiliary_asset_pixel_meaning + # Looks up asset ID of the latest version raster tile set with auxiliary_asset_pixel_meaning if co.auxiliary_asset_pixel_meaning: latest_version = client.get_latest_version(self.dataset) assets = client.get_assets(self.dataset, latest_version) for asset in assets: if asset["asset_type"] == "Raster tile set": creation_options = client.get_asset_creation_options(asset['asset_id']) - if creation_options["pixel_meaning"] == co.auxiliary_assets_pixel_meaning: + if creation_options["pixel_meaning"] == co.auxiliary_asset_pixel_meaning: payload["creation_options"]["auxiliary_assets"] = [asset["asset_id"]] data = client.create_aux_asset(self.dataset, self.version, payload) @@ -253,30 +252,6 @@ def _create_cog_asset(self, cog_asset_parameters: CogAssetParameters) -> str: data = client.create_aux_asset(self.dataset, self.version, payload) return data["asset_id"] - - def _create_aggregated_tile_set(self, aggregated_tile_set_parameters: AuxTileSetParameters): - """ - Create aggregated tile set - """ - client = DataApiClient() - - co = aggregated_tile_set_parameters - - assets = client.get_assets(self.dataset, self.version) - - payload = { - "asset_type": "Aggregated tile set", - "creation_options": { - "calc": co.calc, - "grid": co.grid, - "data_type": co.data_type, - 
"no_data": co.no_data, - "pixel_meaning": co.pixel_meaning, - "auxiliary_assets": co.auxiliary_assets - }, - } - - _ = client.create_aux_asset(self.dataset, self.version) def _check_tile_set_status(self) -> JobStatus: client = DataApiClient() diff --git a/src/datapump/sync/sync.py b/src/datapump/sync/sync.py index d45cbcb7..818caef5 100644 --- a/src/datapump/sync/sync.py +++ b/src/datapump/sync/sync.py @@ -728,13 +728,13 @@ class DISTAlertsSync(Sync): Defines jobs to create new DIST alerts assets once a new release is available. """ - dataset = "umd_dist_alerts" + dataset_name = "umd_dist_alerts" source_bucket = "earthenginepartners-hansen" source_prefix = "DIST-ALERT" input_calc = """ np.where((A>=30) & (A<255) & (B>0) & (C>=2) & (C<255), np.where(C<4, 20000 + B, 30000 + B), - 0 + -1 ) """ @@ -743,7 +743,7 @@ def __init__(self, sync_version: str): def get_latest_release(self) -> Tuple[str, List[str]]: """ - Get the version of the latest *complete* release in GCS + Get the version of the latest release in GCS """ # Raw tiles are just updated in-place @@ -771,9 +771,14 @@ def build_jobs (self, config: DatapumpConfig) -> List[Job]: latest_api_version = self.get_latest_api_version(self.dataset_name) latest_release, source_uris = self.get_latest_release() + # If the latest API version matches latest release from UMD, no need to update + if latest_api_version == latest_release: + return [] + jobs: List[Job] = [] job = RasterVersionUpdateJob( + # Current week alerts tile set id=str(uuid1()), status=JobStatus.starting, dataset=self.dataset, @@ -795,7 +800,7 @@ def build_jobs (self, config: DatapumpConfig) -> List[Job]: ) ) job.aggregated_tile_set_parameters = [ - # Aggregated tile set + # Aggregated tile set (to include all alerts) AuxTileSetParameters( pixel_meaning="default", grid="10/40000", @@ -807,18 +812,18 @@ def build_jobs (self, config: DatapumpConfig) -> List[Job]: ] job.aux_tile_set_parameters = [ # Intensity tile set - RasterTileSetParameters( + 
AuxTileSetParameters( source_uri=None, pixel_meaning="intensity", data_type="uint8", calc="(B > 0) * 55", grid="10/40000", no_data=None, - auxiliary_assets_pixel_meaning = "default" + auxiliary_asset_pixel_meaning = "default" ) ] job.cog_asset_parameters = [ - # Created from the "date_conf" asset + # Created from the "default" asset CogAssetParameters( source_pixel_meaning="date_conf", resampling="mode", From 1ad132d6f687f0dd9b538b91bf0e03316c8bebc4 Mon Sep 17 00:00:00 2001 From: manukala6 Date: Thu, 14 Nov 2024 10:52:02 -0800 Subject: [PATCH 05/14] GTC-2977 Correct types --- src/datapump/commands/version_update.py | 1 + src/datapump/jobs/version_update.py | 2 ++ src/datapump/sync/sync.py | 26 ++++++++++++------------- 3 files changed, 15 insertions(+), 14 deletions(-) diff --git a/src/datapump/commands/version_update.py b/src/datapump/commands/version_update.py index b215e70e..727dc83e 100644 --- a/src/datapump/commands/version_update.py +++ b/src/datapump/commands/version_update.py @@ -26,6 +26,7 @@ class RasterTileSetParameters(StrictBaseModel): timeout_sec: int = 7200 num_processes: Optional[int] = None resampling: str = "nearest" + unify_projection: bool = False class RasterTileCacheParameters(StrictBaseModel): diff --git a/src/datapump/jobs/version_update.py b/src/datapump/jobs/version_update.py index 2a5a0985..6503b60e 100644 --- a/src/datapump/jobs/version_update.py +++ b/src/datapump/jobs/version_update.py @@ -161,6 +161,7 @@ def _create_tile_set(self): "compute_histogram": co.compute_histogram, "timeout_sec": co.timeout_sec, "resampling": co.resampling, + "unify_projection": co.unify_projection }, "metadata": { "last_update": self.content_date_range.end_date, @@ -197,6 +198,7 @@ def _create_aux_tile_set(self, tile_set_parameters: AuxTileSetParameters) -> str "timeout_sec": co.timeout_sec, "num_processes": co.num_processes, "resampling": co.resampling, + "unify_projection": co.unify_projection }, } diff --git a/src/datapump/sync/sync.py 
b/src/datapump/sync/sync.py index 818caef5..72b181e2 100644 --- a/src/datapump/sync/sync.py +++ b/src/datapump/sync/sync.py @@ -248,7 +248,7 @@ def build_jobs(self, config: DatapumpConfig) -> List[Job]: ), ) job.aux_tile_set_parameters = [ - RasterTileSetParameters( + AuxTileSetParameters( source_uri=None, pixel_meaning="intensity", data_type="uint8", @@ -565,7 +565,7 @@ def get_raster_job( ) -> RasterVersionUpdateJob: raster_job = super().get_raster_job(version, source_uris) raster_job.aux_tile_set_parameters = [ - RasterTileSetParameters( + AuxTileSetParameters( grid="10/100000", data_type="uint16", pixel_meaning="date_conf", @@ -728,7 +728,7 @@ class DISTAlertsSync(Sync): Defines jobs to create new DIST alerts assets once a new release is available. """ - dataset_name = "umd_dist_alerts" + dataset_name = "umd__glad_dist_alerts" source_bucket = "earthenginepartners-hansen" source_prefix = "DIST-ALERT" input_calc = """ @@ -799,17 +799,15 @@ def build_jobs (self, config: DatapumpConfig) -> List[Job]: start_date="2020-12-31", end_date=str(date.today()) ) ) - job.aggregated_tile_set_parameters = [ + job.aggregated_tile_set_parameters = AuxTileSetParameters( # Aggregated tile set (to include all alerts) - AuxTileSetParameters( - pixel_meaning="default", - grid="10/40000", - data_type="int16", - no_data=-1, - calc="np.where(A > 0, A, B)", - auxiliary_asset_pixel_meaning = "default" - ), - ] + pixel_meaning="default", + grid="10/40000", + data_type="int16", + no_data=-1, + calc="np.where(A > 0, A, B)", + auxiliary_asset_pixel_meaning = "default" + ) job.aux_tile_set_parameters = [ # Intensity tile set AuxTileSetParameters( @@ -886,7 +884,7 @@ class Syncer: SyncType.wur_radd_alerts: RADDAlertsSync, SyncType.umd_glad_landsat_alerts: GLADLAlertsSync, SyncType.umd_glad_sentinel2_alerts: GLADS2AlertsSync, - SyncType.umd_dist_alerts: DISTAlertsSync, + SyncType.umd_glad_dist_alerts: DISTAlertsSync, } def __init__(self, sync_types: List[SyncType], sync_version: str = None): 
From c7a864f8a4e9c4d9a7cb1d5171a0c77f85e967f4 Mon Sep 17 00:00:00 2001 From: manukala6 Date: Fri, 15 Nov 2024 09:32:34 -0800 Subject: [PATCH 06/14] Further corrections --- src/datapump/commands/sync.py | 2 +- src/datapump/commands/version_update.py | 2 +- src/datapump/sync/sync.py | 12 +++++------- 3 files changed, 7 insertions(+), 9 deletions(-) diff --git a/src/datapump/commands/sync.py b/src/datapump/commands/sync.py index 845457c4..0c3e0dae 100644 --- a/src/datapump/commands/sync.py +++ b/src/datapump/commands/sync.py @@ -14,7 +14,7 @@ class SyncType(str, Enum): wur_radd_alerts = "wur_radd_alerts" umd_glad_landsat_alerts = "umd_glad_landsat_alerts" umd_glad_sentinel2_alerts = "umd_glad_sentinel2_alerts" - umd_dist_alerts = "umd_dist_alerts" + umd_glad_dist_alerts = "umd_glad_dist_alerts" @staticmethod def get_sync_types(dataset: str, analysis: Analysis): diff --git a/src/datapump/commands/version_update.py b/src/datapump/commands/version_update.py index 727dc83e..dd31e9b1 100644 --- a/src/datapump/commands/version_update.py +++ b/src/datapump/commands/version_update.py @@ -57,4 +57,4 @@ class CogAssetParameters(StrictBaseModel): class AuxTileSetParameters(RasterTileSetParameters): source_uri: None = None - auxiliary_asset_pixel_meaning: str \ No newline at end of file + auxiliary_asset_pixel_meaning: Optional[str] \ No newline at end of file diff --git a/src/datapump/sync/sync.py b/src/datapump/sync/sync.py index 72b181e2..c940d93e 100644 --- a/src/datapump/sync/sync.py +++ b/src/datapump/sync/sync.py @@ -739,7 +739,7 @@ class DISTAlertsSync(Sync): """ def __init__(self, sync_version: str): - super().__init__(sync_version) + self.sync_version = sync_version def get_latest_release(self) -> Tuple[str, List[str]]: """ @@ -758,12 +758,10 @@ def get_latest_release(self) -> Tuple[str, List[str]]: self.source_bucket, f"{self.source_prefix}/uploadDate.txt" ) - # Example string: "Updated Fri Apr 15 14:27:01 2022 UTC" - upload_date = upload_date_text[12:-5] + # Example 
string: "Updated Sat Nov 9 13:43:05 2024-11-09 UTC" + upload_date = upload_date_text[-14:-4] LOGGER.info(f"Last DIST-Alert upload date: {upload_date}") - latest_release = datetime.strptime(upload_date, "%b %d %H:%M:%S %Y").strftime( - "v%Y%m%d" - ) + latest_release = f"v{upload_date.replace("-", "")}" return latest_release, source_uris @@ -823,7 +821,7 @@ def build_jobs (self, config: DatapumpConfig) -> List[Job]: job.cog_asset_parameters = [ # Created from the "default" asset CogAssetParameters( - source_pixel_meaning="date_conf", + source_pixel_meaning="default", resampling="mode", implementation="default", blocksize=1024, From 823458fb7f3c08eb23dfc5991719bcc7b3e4e1d7 Mon Sep 17 00:00:00 2001 From: manukala6 Date: Mon, 18 Nov 2024 10:50:06 -0800 Subject: [PATCH 07/14] GTC-2977 Fix parsing of upload date --- src/Dockerfile | 2 +- src/datapump/sync/sync.py | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/src/Dockerfile b/src/Dockerfile index f31239cb..2851546e 100644 --- a/src/Dockerfile +++ b/src/Dockerfile @@ -15,7 +15,7 @@ RUN pip install . -t python # to change the hash of the file and get TF to realize it needs to be # redeployed. 
Ticket for a better solution: # https://gfw.atlassian.net/browse/GTC-1250 -# change 10 +# change 11 RUN yum install -y zip geos-devel diff --git a/src/datapump/sync/sync.py b/src/datapump/sync/sync.py index c940d93e..377ebbb6 100644 --- a/src/datapump/sync/sync.py +++ b/src/datapump/sync/sync.py @@ -759,9 +759,9 @@ def get_latest_release(self) -> Tuple[str, List[str]]: ) # Example string: "Updated Sat Nov 9 13:43:05 2024-11-09 UTC" - upload_date = upload_date_text[-14:-4] + upload_date = upload_date_text[-15:-5] LOGGER.info(f"Last DIST-Alert upload date: {upload_date}") - latest_release = f"v{upload_date.replace("-", "")}" + latest_release = f"v{upload_date.replace('-', '')}" return latest_release, source_uris From c821062857fac9663e7b433a2a7c692971ccb9d7 Mon Sep 17 00:00:00 2001 From: manukala6 Date: Mon, 18 Nov 2024 13:30:14 -0800 Subject: [PATCH 08/14] Fix sync type name --- src/datapump/sync/sync.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/datapump/sync/sync.py b/src/datapump/sync/sync.py index 377ebbb6..8ddd9572 100644 --- a/src/datapump/sync/sync.py +++ b/src/datapump/sync/sync.py @@ -728,7 +728,7 @@ class DISTAlertsSync(Sync): Defines jobs to create new DIST alerts assets once a new release is available. """ - dataset_name = "umd__glad_dist_alerts" + dataset_name = "umd_glad_dist_alerts" source_bucket = "earthenginepartners-hansen" source_prefix = "DIST-ALERT" input_calc = """ From 33bb6c45b30638d7698df28454dd93298ed87192 Mon Sep 17 00:00:00 2001 From: manukala6 Date: Mon, 18 Nov 2024 13:51:22 -0800 Subject: [PATCH 09/14] Increment TF --- src/Dockerfile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Dockerfile b/src/Dockerfile index 2851546e..33b065b7 100644 --- a/src/Dockerfile +++ b/src/Dockerfile @@ -15,7 +15,7 @@ RUN pip install . -t python # to change the hash of the file and get TF to realize it needs to be # redeployed. 
Ticket for a better solution: # https://gfw.atlassian.net/browse/GTC-1250 -# change 11 +# change 12 RUN yum install -y zip geos-devel From 6e7c1def22988e5b098cfa0f79cc7029be297342 Mon Sep 17 00:00:00 2001 From: manukala6 Date: Mon, 18 Nov 2024 14:48:47 -0800 Subject: [PATCH 10/14] Change dataset to dataset_name --- src/datapump/sync/sync.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/datapump/sync/sync.py b/src/datapump/sync/sync.py index 8ddd9572..374f2728 100644 --- a/src/datapump/sync/sync.py +++ b/src/datapump/sync/sync.py @@ -779,7 +779,7 @@ def build_jobs (self, config: DatapumpConfig) -> List[Job]: # Current week alerts tile set id=str(uuid1()), status=JobStatus.starting, - dataset=self.dataset, + dataset=self.dataset_name, version=latest_release, tile_set_parameters=RasterTileSetParameters( source_uri=source_uris, From 8fd677f1de026b823c9bb3f6b2d139618cfbbc35 Mon Sep 17 00:00:00 2001 From: manukala6 Date: Mon, 18 Nov 2024 15:05:59 -0800 Subject: [PATCH 11/14] Increment TF --- src/Dockerfile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Dockerfile b/src/Dockerfile index 33b065b7..be0c6e1b 100644 --- a/src/Dockerfile +++ b/src/Dockerfile @@ -15,7 +15,7 @@ RUN pip install . -t python # to change the hash of the file and get TF to realize it needs to be # redeployed. Ticket for a better solution: # https://gfw.atlassian.net/browse/GTC-1250 -# change 12 +# change 13 RUN yum install -y zip geos-devel From b55f67a8535ced6af84aa2a5f1b4140e6aa2a099 Mon Sep 17 00:00:00 2001 From: manukala6 Date: Tue, 19 Nov 2024 11:01:12 -0800 Subject: [PATCH 12/14] GTC-2977 Make input_calc one line --- src/Dockerfile | 2 +- src/datapump/sync/sync.py | 7 +------ 2 files changed, 2 insertions(+), 7 deletions(-) diff --git a/src/Dockerfile b/src/Dockerfile index be0c6e1b..a07a9599 100644 --- a/src/Dockerfile +++ b/src/Dockerfile @@ -15,7 +15,7 @@ RUN pip install . 
-t python # to change the hash of the file and get TF to realize it needs to be # redeployed. Ticket for a better solution: # https://gfw.atlassian.net/browse/GTC-1250 -# change 13 +# change 14 RUN yum install -y zip geos-devel diff --git a/src/datapump/sync/sync.py b/src/datapump/sync/sync.py index 374f2728..ed01214d 100644 --- a/src/datapump/sync/sync.py +++ b/src/datapump/sync/sync.py @@ -731,12 +731,7 @@ class DISTAlertsSync(Sync): dataset_name = "umd_glad_dist_alerts" source_bucket = "earthenginepartners-hansen" source_prefix = "DIST-ALERT" - input_calc = """ - np.where((A>=30) & (A<255) & (B>0) & (C>=2) & (C<255), - np.where(C<4, 20000 + B, 30000 + B), - -1 - ) - """ + input_calc = "np.where((A>=30) & (A<255) & (B>0) & (C>=2) & (C<255), np.where(C<4, 20000 + B, 30000 + B), -1)" def __init__(self, sync_version: str): self.sync_version = sync_version From 83d33e5ede3e3275b000d9e73f6c6e472fc89a3f Mon Sep 17 00:00:00 2001 From: Dan Scales Date: Thu, 21 Nov 2024 09:16:34 -0800 Subject: [PATCH 13/14] Schedule dist-alerts sync for 1pm PST every day New data from UMD should only be available on Saturday morning, so should only do a full run generating a new version once a week on Saturday/Sunday --- terraform/modules/datapump/cloudwatch.tf | 25 +++++++++++++++++++++++- 1 file changed, 24 insertions(+), 1 deletion(-) diff --git a/terraform/modules/datapump/cloudwatch.tf b/terraform/modules/datapump/cloudwatch.tf index 46b69e86..070a9444 100644 --- a/terraform/modules/datapump/cloudwatch.tf +++ b/terraform/modules/datapump/cloudwatch.tf @@ -1,3 +1,6 @@ +# Note: when it says est/EST here, it really means PST (Pacific Standard time) +# The hours in the cron() expression are in UTC. 
+ resource "aws_cloudwatch_event_rule" "everyday-11-pm-est" { name = substr("everyday-11-pm-est${local.name_suffix}", 0, 64) description = "Run everyday at 11 pm EST" @@ -26,6 +29,15 @@ resource "aws_cloudwatch_event_rule" "everyday-3-am-est" { tags = local.tags } +resource "aws_cloudwatch_event_rule" "everyday-1-pm-est" { + name = substr("everyday-1-pm-est${local.name_suffix}", 0, 64) + description = "Run everyday at 1 pm EST" + schedule_expression = "cron(0 21 ? * * *)" + tags = local.tags +} + +# The count condition in each of the resources below ensures that the CloudWatch +# events only happen in production. resource "aws_cloudwatch_event_target" "sync-areas" { rule = aws_cloudwatch_event_rule.everyday-7-pm-est.name target_id = substr("${local.project}-sync-areas${local.name_suffix}", 0, 64) @@ -78,4 +90,15 @@ resource "aws_cloudwatch_event_target" "sync-integrated-alerts" { input = "{\"command\": \"sync\", \"parameters\": {\"types\": [\"integrated_alerts\"]}}" role_arn = aws_iam_role.datapump_states.arn count = var.environment == "production" ? 1 : 0 -} \ No newline at end of file +} + +# Run every day at 1pm PST, but new data from UMD should only be available on Saturday morning, +# so should only do a full run generating a new version once a week on Saturday/Sunday. +resource "aws_cloudwatch_event_target" "sync-dist-alerts" { + rule = aws_cloudwatch_event_rule.everyday-1-pm-est.name + target_id = substr("${local.project}-sync-umd-glad-dist-alerts${local.name_suffix}", 0, 64) + arn = aws_sfn_state_machine.datapump.id + input = "{\"command\": \"sync\", \"parameters\": {\"types\": [\"umd_glad_dist_alerts\"]}}" + role_arn = aws_iam_role.datapump_states.arn + count = var.environment == "production" ? 1 : 0 +} From a67a18e32e34c8b4c5468090b4bb3bf52e43720c Mon Sep 17 00:00:00 2001 From: Dan Scales Date: Mon, 25 Nov 2024 13:43:46 -0800 Subject: [PATCH 14/14] Added slack msg for the start of the DIST alerts jobs. 
--- src/datapump/sync/sync.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/datapump/sync/sync.py b/src/datapump/sync/sync.py index ed01214d..643df5a4 100644 --- a/src/datapump/sync/sync.py +++ b/src/datapump/sync/sync.py @@ -21,6 +21,7 @@ from ..util.gcs import get_gs_file_as_text, get_gs_files, get_gs_subfolders from ..util.models import ContentDateRange from ..util.util import log_and_notify_error +from ..util.slack import slack_webhook class Sync(ABC): @@ -770,6 +771,8 @@ def build_jobs (self, config: DatapumpConfig) -> List[Job]: jobs: List[Job] = [] + slack_webhook("INFO", f"Starting dist-alerts jobs for {self.dataset_name}/{latest_release}") + job = RasterVersionUpdateJob( # Current week alerts tile set id=str(uuid1()),