diff --git a/docker-compose-test.yml b/docker-compose-test.yml
index d48b8aa9..12642730 100644
--- a/docker-compose-test.yml
+++ b/docker-compose-test.yml
@@ -29,13 +29,10 @@ services:
   localstack:
     container_name: localstack
-    image: localstack/localstack-pro:2.2.0  # required for Pro
+    image: localstack/localstack:2.2.0
     ports:
       - "127.0.0.1:4566:4566"            # LocalStack Gateway
       - "127.0.0.1:4510-4559:4510-4559"  # external services port range
-      - "127.0.0.1:53:53"                # DNS config (required for Pro)
-      - "127.0.0.1:53:53/udp"            # DNS config (required for Pro)
-      - "127.0.0.1:443:443"              # LocalStack HTTPS Gateway (required for Pro)
     environment:
       - SERVICES=cloudformation,cloudwatch,dynamodb,ec2,emr,events,iam,lambda,s3,secretsmanager,stepfunctions,sts
       - DEBUG=1
@@ -44,7 +41,6 @@ services:
       - HOSTNAME_EXTERNAL=localstack
       - LAMBDA_KEEPALIVE_MS=0
       - PERSISTENCE=${PERSISTENCE-}
-      - LOCALSTACK_API_KEY=${LOCALSTACK_API_KEY-}  # required for Pro
       - DOCKER_HOST=unix:///var/run/docker.sock
     volumes:
       - "${TMPDIR:-./localstack}:/tmp/localstack"
diff --git a/src/Dockerfile b/src/Dockerfile
index e713cc80..12631f2e 100644
--- a/src/Dockerfile
+++ b/src/Dockerfile
@@ -15,7 +15,7 @@ RUN pip install . -t python
 
 # to change the hash of the file and get TF to realize it needs to be
 # redeployed. Ticket for a better solution:
 # https://gfw.atlassian.net/browse/GTC-1250
-# change 3
+# change 6
 
 RUN yum install -y zip geos-devel
diff --git a/src/datapump/clients/data_api.py b/src/datapump/clients/data_api.py
index c52994a2..b91c7dcc 100644
--- a/src/datapump/clients/data_api.py
+++ b/src/datapump/clients/data_api.py
@@ -186,6 +186,10 @@ def delete_version(self, dataset: str, version: str) -> None:
         uri = f"{GLOBALS.data_api_uri}/dataset/{dataset}/{version}"
         self._send_request(ValidMethods.delete, uri)
 
+    def get_asset_creation_options(self, asset_id: str) -> Dict[str, Any]:
+        uri = f"{GLOBALS.data_api_uri}/asset/{asset_id}/creation_options"
+        return self._send_request(ValidMethods.get, uri)["data"]
+
     def update_version_metadata(
         self, dataset: str, version: str, metadata: Dict[str, Any]
     ):
diff --git a/src/datapump/commands/version_update.py b/src/datapump/commands/version_update.py
index 2b1e00a6..fa3f8db7 100644
--- a/src/datapump/commands/version_update.py
+++ b/src/datapump/commands/version_update.py
@@ -45,3 +45,11 @@ class RasterVersionUpdateParameters(StrictBaseModel):
 class RasterVersionUpdateCommand(StrictBaseModel):
     command: str
     parameters: RasterVersionUpdateParameters
+
+
+class CogAssetParameters(StrictBaseModel):
+    implementation: str
+    source_pixel_meaning: str
+    blocksize: int
+    resampling: str = "mode"
+    export_to_gee: bool = False
diff --git a/src/datapump/jobs/version_update.py b/src/datapump/jobs/version_update.py
index bc67dbf8..b89c68b9 100644
--- a/src/datapump/jobs/version_update.py
+++ b/src/datapump/jobs/version_update.py
@@ -5,8 +5,10 @@
 from datapump.commands.version_update import (
     RasterTileCacheParameters,
     RasterTileSetParameters,
+    CogAssetParameters,
 )
 from datapump.util.models import ContentDateRange
+from datapump.util.slack import slack_webhook
 
 from ..clients.data_api import DataApiClient
 from ..globals import LOGGER
@@ -19,6 +21,7 @@ class RasterVersionUpdateJobStep(str, Enum):
     creating_tile_set = "creating_tile_set"
     creating_tile_cache = "creating_tile_cache"
     creating_aux_assets = "creating_aux_assets"
+    creating_cog_assets = "creating_cog_assets"
     mark_latest = "mark_latest"
 
 
@@ -29,6 +32,7 @@ class RasterVersionUpdateJob(Job):
     tile_set_parameters: RasterTileSetParameters
     tile_cache_parameters: Optional[RasterTileCacheParameters] = None
     aux_tile_set_parameters: List[RasterTileSetParameters] = []
+    cog_asset_parameters: List[CogAssetParameters] = []
     timeout_sec = 24 * 60 * 60
 
     def next_step(self):
@@ -84,6 +88,20 @@ def next_step(self):
                 self.status = JobStatus.failed
 
         elif self.step == RasterVersionUpdateJobStep.creating_aux_assets:
+            status = self._check_aux_assets_status()
+            if status == JobStatus.complete:
+                if self.cog_asset_parameters:
+                    self.step = RasterVersionUpdateJobStep.creating_cog_assets
+                    for cog_asset_param in self.cog_asset_parameters:
+                        if self._create_cog_asset(cog_asset_param) == "":
+                            self.status = JobStatus.failed
+                            break
+                else:
+                    self.status = JobStatus.complete
+            elif status == JobStatus.failed:
+                self.status = JobStatus.failed
+
+        elif self.step == RasterVersionUpdateJobStep.creating_cog_assets:
             status = self._check_aux_assets_status()
             if status == JobStatus.complete:
                 self.status = JobStatus.complete
@@ -173,6 +191,45 @@ def _create_aux_tile_set(self, tile_set_parameters: RasterTileSetParameters) -> str:
 
         return data["asset_id"]
 
+    def _create_cog_asset(self, cog_asset_parameters: CogAssetParameters) -> str:
+        """
+        Create a COG asset and return its asset ID, or an empty string on error.
+        """
+        client = DataApiClient()
+
+        co = cog_asset_parameters
+
+        assets = client.get_assets(self.dataset, self.version)
+        asset_id = ""
+        for asset in assets:
+            if asset["asset_type"] == "Raster tile set":
+                creation_options = client.get_asset_creation_options(asset["asset_id"])
+                if creation_options["pixel_meaning"] == co.source_pixel_meaning:
+                    if asset_id != "":
+                        self.errors.append(
+                            f"Multiple assets with pixel meaning '{co.source_pixel_meaning}'"
+                        )
+                        return ""
+                    asset_id = asset["asset_id"]
+
+        if asset_id == "":
+            self.errors.append(
+                f"Could not find asset with pixel meaning '{co.source_pixel_meaning}'"
+            )
+            return ""
+
+        payload = {
+            "asset_type": "COG",
+            "creation_options": {
+                "implementation": co.implementation,
+                "source_asset_id": asset_id,
+                "resampling": co.resampling,
+                "block_size": co.blocksize,
+                "export_to_gee": co.export_to_gee,
+            },
+        }
+
+        slack_webhook(
+            "INFO",
+            f"Starting COG asset job {self.dataset}/{self.version} {co.implementation}",
+        )
+        data = client.create_aux_asset(self.dataset, self.version, payload)
+
+        return data["asset_id"]
+
     def _check_tile_set_status(self) -> JobStatus:
         client = DataApiClient()
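
[Reviewer note] The creating_aux_assets -> creating_cog_assets handoff above is the
core of this PR: once auxiliary tile sets finish, each CogAssetParameters entry is
resolved to a source tile set by pixel_meaning and submitted as a COG asset. A minimal
sketch of that lookup, assuming a reachable Data API; the dataset/version names here
are hypothetical placeholders:

    from datapump.clients.data_api import DataApiClient

    client = DataApiClient()

    # pixel_meaning from each tile set's creation options is what
    # _create_cog_asset matches against source_pixel_meaning.
    for asset in client.get_assets("umd_glad_alerts", "v20230601"):
        if asset["asset_type"] == "Raster tile set":
            options = client.get_asset_creation_options(asset["asset_id"])
            print(asset["asset_id"], options["pixel_meaning"])
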
diff --git a/src/datapump/sync/sync.py b/src/datapump/sync/sync.py
index d7533a2a..99d4e12d 100644
--- a/src/datapump/sync/sync.py
+++ b/src/datapump/sync/sync.py
@@ -11,7 +11,7 @@
 from ..clients.datapump_store import DatapumpConfig
 from ..commands.analysis import FIRES_ANALYSES, AnalysisInputTable
 from ..commands.sync import SyncType
-from ..commands.version_update import RasterTileCacheParameters, RasterTileSetParameters
+from ..commands.version_update import RasterTileCacheParameters, RasterTileSetParameters, CogAssetParameters
 from ..globals import GLOBALS, LOGGER
 from ..jobs.geotrellis import FireAlertsGeotrellisJob, GeotrellisJob, Job
 from ..jobs.jobs import JobStatus
@@ -206,37 +206,65 @@ def build_jobs(self, config: DatapumpConfig) -> List[Job]:
         if not self._should_update(latest_versions):
             return []
 
-        jobs = []
+        jobs: List[Job] = []
         if config.dataset == "gadm":
-            jobs.append(
-                RasterVersionUpdateJob(
-                    id=str(uuid1()),
-                    status=JobStatus.starting,
-                    dataset=self.DATASET_NAME,
-                    version=self.sync_version,
-                    tile_set_parameters=RasterTileSetParameters(
-                        source_uri=source_uris,
-                        calc=self.INPUT_CALC,
-                        grid="10/100000",
-                        data_type="uint16",
-                        no_data=0,
-                        pixel_meaning="date_conf",
-                        band_count=1,
-                        union_bands=True,
-                        compute_stats=False,
-                        timeout_sec=21600,
-                    ),
-                    tile_cache_parameters=RasterTileCacheParameters(
-                        max_zoom=14,
-                        resampling="med",
-                        symbology={"type": "date_conf_intensity_multi_8"},
-                    ),
-                    content_date_range=ContentDateRange(
-                        start_date="2014-12-31", end_date=str(date.today())
-                    ),
-                )
-            )
+            job = RasterVersionUpdateJob(
+                id=str(uuid1()),
+                status=JobStatus.starting,
+                dataset=self.DATASET_NAME,
+                version=self.sync_version,
+                tile_set_parameters=RasterTileSetParameters(
+                    source_uri=source_uris,
+                    calc=self.INPUT_CALC,
+                    grid="10/100000",
+                    data_type="uint16",
+                    no_data=0,
+                    pixel_meaning="date_conf",
+                    band_count=1,
+                    union_bands=True,
+                    compute_stats=False,
+                    timeout_sec=21600,
+                ),
+                tile_cache_parameters=RasterTileCacheParameters(
+                    max_zoom=14,
+                    resampling="med",
+                    symbology={"type": "date_conf_intensity_multi_8"},
+                ),
+                content_date_range=ContentDateRange(
+                    start_date="2014-12-31", end_date=str(date.today())
+                ),
+            )
+            job.aux_tile_set_parameters = [
+                RasterTileSetParameters(
+                    source_uri=None,
+                    pixel_meaning="intensity",
+                    data_type="uint8",
+                    calc="(A > 0) * 255",
+                    grid="10/100000",
+                    no_data=0,
+                )
+            ]
+            job.cog_asset_parameters = [
+                # Created from the "date_conf" asset
+                CogAssetParameters(
+                    source_pixel_meaning="date_conf",
+                    resampling="mode",
+                    implementation="default",
+                    blocksize=1024,
+                    export_to_gee=True,
+                ),
+                # Created from the "intensity" asset
+                CogAssetParameters(
+                    source_pixel_meaning="intensity",
+                    resampling="bilinear",
+                    implementation="intensity",
+                    blocksize=1024,
+                ),
+            ]
+
+            jobs.append(job)
+
             jobs.append(
                 GeotrellisJob(
                     id=str(uuid1()),
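
[Reviewer note] Of the two COG configurations above, the first sets every
CogAssetParameters field explicitly, while the second leans on the export_to_gee
default. A standalone sketch of the model's defaulting behavior (pure pydantic,
no AWS or Data API required):

    from datapump.commands.version_update import CogAssetParameters

    cog = CogAssetParameters(
        implementation="default",
        source_pixel_meaning="date_conf",
        blocksize=1024,
    )
    assert cog.resampling == "mode"    # field default
    assert cog.export_to_gee is False  # field default
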
diff --git a/src/lambdas/dispatcher/src/lambda_function.py b/src/lambdas/dispatcher/src/lambda_function.py
index cc776e44..bac88c29 100644
--- a/src/lambdas/dispatcher/src/lambda_function.py
+++ b/src/lambdas/dispatcher/src/lambda_function.py
@@ -128,10 +128,12 @@ def _sync(command: SyncCommand):
     if not sync_config:
         slack_webhook(
             "WARNING",
-            f"No DyanamoDB rows found for sync type {sync_type}!"
+            f"No DynamoDB rows found for sync type {sync_type}!"
         )
+        LOGGER.warning(f"No DynamoDB rows found for sync type {sync_type}!")
 
     for row in sync_config:
         syncer_jobs = syncer.build_jobs(row)
+        LOGGER.info(f"Processing row {row}, got jobs {syncer_jobs}!")
         if syncer_jobs:
             jobs += [job.dict() for job in syncer_jobs]
diff --git a/terraform/main.tf b/terraform/main.tf
index a21150b7..5f24c3a8 100644
--- a/terraform/main.tf
+++ b/terraform/main.tf
@@ -46,7 +46,7 @@ module "datapump" {
   geotrellis_jar_path       = var.geotrellis_jar_path
   pipelines_bucket          = data.terraform_remote_state.core.outputs.pipelines_bucket
   tags                      = local.tags
-  sfn_wait_time             = 30
+  sfn_wait_time             = 120
   data_api_uri              = var.data_api_uri
   data_lake_bucket          = data.terraform_remote_state.core.outputs.data-lake_bucket
   rasterio_lambda_layer_arn = data.terraform_remote_state.lambda-layers.outputs.py310_rasterio_138_arn
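
[Reviewer note] On the test setup as a whole: with the Pro image, LOCALSTACK_API_KEY,
and the Pro-only DNS/HTTPS ports removed, every emulated service is reachable through
the single community gateway on 4566. A hedged smoke test for the revised compose
stack (assumes docker-compose-test.yml is up and boto3 is installed; the credential
and region values are arbitrary placeholders that LocalStack accepts):

    import boto3

    # Point any AWS SDK client at the LocalStack Gateway port published above.
    dynamodb = boto3.client(
        "dynamodb",
        endpoint_url="http://127.0.0.1:4566",
        region_name="us-east-1",
        aws_access_key_id="test",
        aws_secret_access_key="test",
    )
    print(dynamodb.list_tables()["TableNames"])
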