# Patch summary (text before the first "diff --git" header is skipped by patch tools).
#
# src/datapump/jobs/version_update.py
#   * RasterVersionUpdateJobStep: the `mark_latest` member is moved to sit directly
#     after `creating_aggregated_tile_set`; the member values are unchanged.
#   * RasterVersionUpdateJob.aggregated_tile_set_parameters is re-annotated as
#     Optional[AuxTileSetParameters] (the default stays None).
#   * next_step(): the aggregated tile set is now created through
#     _create_aux_tile_set(self.aggregated_tile_set_parameters). The removed
#     _create_aggregated_tile_set() declared a required parameter that the old call
#     site did not pass, and it built a payload that it never sent.
#   * _create_aux_tile_set(): fixes the attribute name used in the comparison
#     (`auxiliary_assets_pixel_meaning` -> `auxiliary_asset_pixel_meaning`) so it
#     matches the attribute tested by the enclosing `if`.
#
# src/datapump/sync/sync.py (DISTAlertsSync)
#   * Class attribute `dataset` is renamed to `dataset_name`.
#   * The fallback value of `input_calc` changes from 0 to -1.
#   * build_jobs() returns [] early when latest_api_version == latest_release.
#   * The intensity tile set is built with AuxTileSetParameters instead of
#     RasterTileSetParameters, using the keyword `auxiliary_asset_pixel_meaning`.
#
# NOTE(review): the context line `dataset=self.dataset,` in build_jobs() is left
#   untouched although this patch renames the class attribute to `dataset_name`;
#   confirm `dataset` is still defined elsewhere (e.g. on the Sync base class).
# NOTE(review): build_jobs() assigns a *list* to job.aggregated_tile_set_parameters,
#   while the field is annotated Optional[AuxTileSetParameters] and is passed as-is
#   to _create_aux_tile_set(), which reads `co.auxiliary_asset_pixel_meaning`;
#   confirm a single AuxTileSetParameters instance is not what is intended.
# NOTE(review): the COG comment now says it is created from the "default" asset,
#   but `source_pixel_meaning="date_conf"` is unchanged; confirm which is correct.
# NOTE(review): the get_latest_release() docstring drops "*complete*", but no code
#   change to that method is visible here; confirm the behaviour matches.
# NOTE(review): no change to `no_data`/`data_type` accompanying the new -1 fallback
#   in `input_calc` is visible in this patch; confirm they are compatible.
diff --git a/src/datapump/jobs/version_update.py b/src/datapump/jobs/version_update.py index e6e965b..2a5a098 100644 --- a/src/datapump/jobs/version_update.py +++ b/src/datapump/jobs/version_update.py @@ -21,11 +21,10 @@ class RasterVersionUpdateJobStep(str, Enum): starting = "starting" creating_tile_set = "creating_tile_set" creating_tile_cache = "creating_tile_cache" - creating_aggregated_tile_set = "creating_aggregated_tile_set" + creating_aggregated_tile_set = "creating_aggregated_tile_set" # DIST-Alerts aggregation must run before mark_latest + mark_latest = "mark_latest" creating_aux_assets = "creating_aux_assets" creating_cog_assets = "creating_cog_assets" - mark_latest = "mark_latest" - class RasterVersionUpdateJob(Job): dataset: str @@ -33,7 +32,7 @@ class RasterVersionUpdateJob(Job): content_date_range: ContentDateRange tile_set_parameters: RasterTileSetParameters tile_cache_parameters: Optional[RasterTileCacheParameters] = None - aggregated_tile_set_parameters: AuxTileSetParameters = None + aggregated_tile_set_parameters: Optional[AuxTileSetParameters] = None aux_tile_set_parameters: List[AuxTileSetParameters] = [] cog_asset_parameters: List[CogAssetParameters] = [] timeout_sec = 24 * 60 * 60 @@ -66,7 +65,7 @@ def next_step(self): self._create_tile_cache() elif self.aggregated_tile_set_parameters: self.step = RasterVersionUpdateJobStep.creating_aggregated_tile_set - self._create_aggregated_tile_set() + self._create_aux_tile_set(self.aggregated_tile_set_parameters) else: self.step = RasterVersionUpdateJobStep.mark_latest self._mark_latest() @@ -201,14 +200,14 @@ def _create_aux_tile_set(self, tile_set_parameters: AuxTileSetParameters) -> str }, } - # Looks up asset ID of the raster tile set with auxiliary_asset_pixel_meaning + # Looks up asset ID of the latest version raster tile set with auxiliary_asset_pixel_meaning if co.auxiliary_asset_pixel_meaning: latest_version = client.get_latest_version(self.dataset) assets = client.get_assets(self.dataset, 
latest_version) for asset in assets: if asset["asset_type"] == "Raster tile set": creation_options = client.get_asset_creation_options(asset['asset_id']) - if creation_options["pixel_meaning"] == co.auxiliary_assets_pixel_meaning: + if creation_options["pixel_meaning"] == co.auxiliary_asset_pixel_meaning: payload["creation_options"]["auxiliary_assets"] = [asset["asset_id"]] data = client.create_aux_asset(self.dataset, self.version, payload) @@ -253,30 +252,6 @@ def _create_cog_asset(self, cog_asset_parameters: CogAssetParameters) -> str: data = client.create_aux_asset(self.dataset, self.version, payload) return data["asset_id"] - - def _create_aggregated_tile_set(self, aggregated_tile_set_parameters: AuxTileSetParameters): - """ - Create aggregated tile set - """ - client = DataApiClient() - - co = aggregated_tile_set_parameters - - assets = client.get_assets(self.dataset, self.version) - - payload = { - "asset_type": "Aggregated tile set", - "creation_options": { - "calc": co.calc, - "grid": co.grid, - "data_type": co.data_type, - "no_data": co.no_data, - "pixel_meaning": co.pixel_meaning, - "auxiliary_assets": co.auxiliary_assets - }, - } - - _ = client.create_aux_asset(self.dataset, self.version) def _check_tile_set_status(self) -> JobStatus: client = DataApiClient() diff --git a/src/datapump/sync/sync.py b/src/datapump/sync/sync.py index d45cbcb..818caef 100644 --- a/src/datapump/sync/sync.py +++ b/src/datapump/sync/sync.py @@ -728,13 +728,13 @@ class DISTAlertsSync(Sync): Defines jobs to create new DIST alerts assets once a new release is available. 
""" - dataset = "umd_dist_alerts" + dataset_name = "umd_dist_alerts" source_bucket = "earthenginepartners-hansen" source_prefix = "DIST-ALERT" input_calc = """ np.where((A>=30) & (A<255) & (B>0) & (C>=2) & (C<255), np.where(C<4, 20000 + B, 30000 + B), - 0 + -1 ) """ @@ -743,7 +743,7 @@ def __init__(self, sync_version: str): def get_latest_release(self) -> Tuple[str, List[str]]: """ - Get the version of the latest *complete* release in GCS + Get the version of the latest release in GCS """ # Raw tiles are just updated in-place @@ -771,9 +771,14 @@ def build_jobs (self, config: DatapumpConfig) -> List[Job]: latest_api_version = self.get_latest_api_version(self.dataset_name) latest_release, source_uris = self.get_latest_release() + # If the latest API version matches latest release from UMD, no need to update + if latest_api_version == latest_release: + return [] + jobs: List[Job] = [] job = RasterVersionUpdateJob( + # Current week alerts tile set id=str(uuid1()), status=JobStatus.starting, dataset=self.dataset, @@ -795,7 +800,7 @@ def build_jobs (self, config: DatapumpConfig) -> List[Job]: ) ) job.aggregated_tile_set_parameters = [ - # Aggregated tile set + # Aggregated tile set (to include all alerts) AuxTileSetParameters( pixel_meaning="default", grid="10/40000", @@ -807,18 +812,18 @@ def build_jobs (self, config: DatapumpConfig) -> List[Job]: ] job.aux_tile_set_parameters = [ # Intensity tile set - RasterTileSetParameters( + AuxTileSetParameters( source_uri=None, pixel_meaning="intensity", data_type="uint8", calc="(B > 0) * 55", grid="10/40000", no_data=None, - auxiliary_assets_pixel_meaning = "default" + auxiliary_asset_pixel_meaning = "default" ) ] job.cog_asset_parameters = [ - # Created from the "default" asset + # Created from the "default" asset CogAssetParameters( source_pixel_meaning="date_conf", resampling="mode",