Merge pull request #146 from wri/develop
Merge to master - GTC-2850 Create COG asset of integrated alerts in the data pump
danscales authored Jul 2, 2024
2 parents c6b719a + 0d606de commit 9bef83f
Showing 8 changed files with 132 additions and 37 deletions.
6 changes: 1 addition & 5 deletions docker-compose-test.yml
@@ -29,13 +29,10 @@ services:

  localstack:
    container_name: localstack
-    image: localstack/localstack-pro:2.2.0  # required for Pro
+    image: localstack/localstack:2.2.0
    ports:
      - "127.0.0.1:4566:4566"            # LocalStack Gateway
      - "127.0.0.1:4510-4559:4510-4559"  # external services port range
-      - "127.0.0.1:53:53"                # DNS config (required for Pro)
-      - "127.0.0.1:53:53/udp"            # DNS config (required for Pro)
-      - "127.0.0.1:443:443"              # LocalStack HTTPS Gateway (required for Pro)
    environment:
      - SERVICES=cloudformation,cloudwatch,dynamodb,ec2,emr,events,iam,lambda,s3,secretsmanager,stepfunctions,sts
      - DEBUG=1
@@ -44,7 +41,6 @@ services:
      - HOSTNAME_EXTERNAL=localstack
      - LAMBDA_KEEPALIVE_MS=0
      - PERSISTENCE=${PERSISTENCE-}
-      - LOCALSTACK_API_KEY=${LOCALSTACK_API_KEY-}  # required for Pro
      - DOCKER_HOST=unix:///var/run/docker.sock
    volumes:
      - "${TMPDIR:-./localstack}:/tmp/localstack"
2 changes: 1 addition & 1 deletion src/Dockerfile
@@ -15,7 +15,7 @@ RUN pip install . -t python
# to change the hash of the file and get TF to realize it needs to be
# redeployed. Ticket for a better solution:
# https://gfw.atlassian.net/browse/GTC-1250
-# change 3
+# change 6

RUN yum install -y zip geos-devel

4 changes: 4 additions & 0 deletions src/datapump/clients/data_api.py
@@ -186,6 +186,10 @@ def delete_version(self, dataset: str, version: str) -> None:
        uri = f"{GLOBALS.data_api_uri}/dataset/{dataset}/{version}"
        self._send_request(ValidMethods.delete, uri)

+    def get_asset_creation_options(self, asset_id: str) -> Dict[str, Any]:
+        uri = f"{GLOBALS.data_api_uri}/asset/{asset_id}/creation_options"
+        return self._send_request(ValidMethods.get, uri)["data"]
+
    def update_version_metadata(
        self, dataset: str, version: str, metadata: Dict[str, Any]
    ):
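
A minimal sketch of how the new client method might be used, assuming (as the method body suggests) that the Data API wraps responses in a "data" envelope; the asset ID here is a placeholder:

    client = DataApiClient()
    # "some-asset-id" stands in for a real asset UUID
    creation_options = client.get_asset_creation_options("some-asset-id")
    # The returned dict is expected to carry keys such as "pixel_meaning",
    # which _create_cog_asset (below) matches against source_pixel_meaning.
    print(creation_options["pixel_meaning"])
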
8 changes: 8 additions & 0 deletions src/datapump/commands/version_update.py
@@ -45,3 +45,11 @@ class RasterVersionUpdateParameters(StrictBaseModel):
class RasterVersionUpdateCommand(StrictBaseModel):
    command: str
    parameters: RasterVersionUpdateParameters
+
+
+class CogAssetParameters(StrictBaseModel):
+    implementation: str
+    source_pixel_meaning: str
+    blocksize: int
+    resampling: str = "mode"
+    export_to_gee: bool = False
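
CogAssetParameters is instantiated in sync.py later in this diff; a minimal sketch relying only on the defaults declared above (resampling falls back to "mode", export_to_gee to False):

    params = CogAssetParameters(
        implementation="default",
        source_pixel_meaning="date_conf",
        blocksize=1024,
    )
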
57 changes: 57 additions & 0 deletions src/datapump/jobs/version_update.py
@@ -5,8 +5,10 @@
from datapump.commands.version_update import (
    RasterTileCacheParameters,
    RasterTileSetParameters,
+    CogAssetParameters,
)
from datapump.util.models import ContentDateRange
+from datapump.util.slack import slack_webhook

from ..clients.data_api import DataApiClient
from ..globals import LOGGER
@@ -19,6 +21,7 @@ class RasterVersionUpdateJobStep(str, Enum):
    creating_tile_set = "creating_tile_set"
    creating_tile_cache = "creating_tile_cache"
    creating_aux_assets = "creating_aux_assets"
+    creating_cog_assets = "creating_cog_assets"
    mark_latest = "mark_latest"

@@ -29,6 +32,7 @@ class RasterVersionUpdateJob(Job):
    tile_set_parameters: RasterTileSetParameters
    tile_cache_parameters: Optional[RasterTileCacheParameters] = None
    aux_tile_set_parameters: List[RasterTileSetParameters] = []
+    cog_asset_parameters: List[CogAssetParameters] = []
    timeout_sec = 24 * 60 * 60

    def next_step(self):
@@ -84,6 +88,20 @@ def next_step(self):
                self.status = JobStatus.failed

        elif self.step == RasterVersionUpdateJobStep.creating_aux_assets:
+            status = self._check_aux_assets_status()
+            if status == JobStatus.complete:
+                if self.cog_asset_parameters:
+                    self.step = RasterVersionUpdateJobStep.creating_cog_assets
+                    for cog_asset_param in self.cog_asset_parameters:
+                        if self._create_cog_asset(cog_asset_param) == "":
+                            self.status = JobStatus.failed
+                            break
+                else:
+                    self.status = JobStatus.complete
+            elif status == JobStatus.failed:
+                self.status = JobStatus.failed
+
+        elif self.step == RasterVersionUpdateJobStep.creating_cog_assets:
            status = self._check_aux_assets_status()
            if status == JobStatus.complete:
                self.status = JobStatus.complete
@@ -173,6 +191,45 @@ def _create_aux_tile_set(self, tile_set_parameters: RasterTileSetParameters) ->

        return data["asset_id"]

+    def _create_cog_asset(self, cog_asset_parameters: CogAssetParameters) -> str:
+        """
+        Create a COG asset and return its asset ID, or an empty string on error.
+        """
+        client = DataApiClient()
+
+        co = cog_asset_parameters
+
+        assets = client.get_assets(self.dataset, self.version)
+        asset_id = ""
+        for asset in assets:
+            if asset["asset_type"] == "Raster tile set":
+                creation_options = client.get_asset_creation_options(asset["asset_id"])
+                if creation_options["pixel_meaning"] == co.source_pixel_meaning:
+                    if asset_id != "":
+                        self.errors.append(f"Multiple assets with pixel meaning '{co.source_pixel_meaning}'")
+                        return ""
+                    asset_id = asset["asset_id"]
+
+        if asset_id == "":
+            self.errors.append(f"Could not find asset with pixel meaning '{co.source_pixel_meaning}'")
+            return ""
+
+        payload = {
+            "asset_type": "COG",
+            "creation_options": {
+                "implementation": co.implementation,
+                "source_asset_id": asset_id,
+                "resampling": co.resampling,
+                "block_size": co.blocksize,
+                "export_to_gee": co.export_to_gee,
+            },
+        }
+
+        slack_webhook("INFO", f"Starting COG asset job {self.dataset}/{self.version} {co.implementation}")
+        data = client.create_aux_asset(self.dataset, self.version, payload)
+
+        return data["asset_id"]
+
    def _check_tile_set_status(self) -> JobStatus:
        client = DataApiClient()

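
For the "date_conf" COG configured in sync.py below, the payload handed to create_aux_asset would look roughly like this (the source_asset_id is a placeholder; the real value is looked up from the version's raster tile sets):

    payload = {
        "asset_type": "COG",
        "creation_options": {
            "implementation": "default",
            "source_asset_id": "<asset ID of the date_conf raster tile set>",
            "resampling": "mode",
            "block_size": 1024,
            "export_to_gee": True,
        },
    }
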
86 changes: 57 additions & 29 deletions src/datapump/sync/sync.py
@@ -11,7 +11,7 @@
from ..clients.datapump_store import DatapumpConfig
from ..commands.analysis import FIRES_ANALYSES, AnalysisInputTable
from ..commands.sync import SyncType
-from ..commands.version_update import RasterTileCacheParameters, RasterTileSetParameters
+from ..commands.version_update import RasterTileCacheParameters, RasterTileSetParameters, CogAssetParameters
from ..globals import GLOBALS, LOGGER
from ..jobs.geotrellis import FireAlertsGeotrellisJob, GeotrellisJob, Job
from ..jobs.jobs import JobStatus
@@ -206,37 +206,65 @@ def build_jobs(self, config: DatapumpConfig) -> List[Job]:
        if not self._should_update(latest_versions):
            return []

-        jobs = []
+        jobs: List[Job] = []

        if config.dataset == "gadm":
-            jobs.append(
-                RasterVersionUpdateJob(
-                    id=str(uuid1()),
-                    status=JobStatus.starting,
-                    dataset=self.DATASET_NAME,
-                    version=self.sync_version,
-                    tile_set_parameters=RasterTileSetParameters(
-                        source_uri=source_uris,
-                        calc=self.INPUT_CALC,
-                        grid="10/100000",
-                        data_type="uint16",
-                        no_data=0,
-                        pixel_meaning="date_conf",
-                        band_count=1,
-                        union_bands=True,
-                        compute_stats=False,
-                        timeout_sec=21600,
-                    ),
-                    tile_cache_parameters=RasterTileCacheParameters(
-                        max_zoom=14,
-                        resampling="med",
-                        symbology={"type": "date_conf_intensity_multi_8"},
-                    ),
-                    content_date_range=ContentDateRange(
-                        start_date="2014-12-31", end_date=str(date.today())
-                    ),
-                )
-            )
+            job = RasterVersionUpdateJob(
+                id=str(uuid1()),
+                status=JobStatus.starting,
+                dataset=self.DATASET_NAME,
+                version=self.sync_version,
+                tile_set_parameters=RasterTileSetParameters(
+                    source_uri=source_uris,
+                    calc=self.INPUT_CALC,
+                    grid="10/100000",
+                    data_type="uint16",
+                    no_data=0,
+                    pixel_meaning="date_conf",
+                    band_count=1,
+                    union_bands=True,
+                    compute_stats=False,
+                    timeout_sec=21600,
+                ),
+                tile_cache_parameters=RasterTileCacheParameters(
+                    max_zoom=14,
+                    resampling="med",
+                    symbology={"type": "date_conf_intensity_multi_8"},
+                ),
+                content_date_range=ContentDateRange(
+                    start_date="2014-12-31", end_date=str(date.today())
+                ),
+            )
+            job.aux_tile_set_parameters = [
+                RasterTileSetParameters(
+                    source_uri=None,
+                    pixel_meaning="intensity",
+                    data_type="uint8",
+                    calc="(A > 0) * 255",
+                    grid="10/100000",
+                    no_data=0,
+                )
+            ]
+            job.cog_asset_parameters = [
+                # Created from the "date_conf" asset
+                CogAssetParameters(
+                    source_pixel_meaning="date_conf",
+                    resampling="mode",
+                    implementation="default",
+                    blocksize=1024,
+                    export_to_gee=True,
+                ),
+                # Created from the "intensity" asset
+                CogAssetParameters(
+                    source_pixel_meaning="intensity",
+                    resampling="bilinear",
+                    implementation="intensity",
+                    blocksize=1024,
+                ),
+            ]
+
+            jobs.append(job)

            jobs.append(
                GeotrellisJob(
                    id=str(uuid1()),
4 changes: 3 additions & 1 deletion src/lambdas/dispatcher/src/lambda_function.py
@@ -128,10 +128,12 @@ def _sync(command: SyncCommand):
    if not sync_config:
        slack_webhook(
            "WARNING",
-            f"No DyanamoDB rows found for sync type {sync_type}!"
+            f"No DynamoDB rows found for sync type {sync_type}!"
        )
+        LOGGER.warning(f"No DynamoDB rows found for sync type {sync_type}!")
    for row in sync_config:
        syncer_jobs = syncer.build_jobs(row)
+        LOGGER.info(f"Processing row {row}, got jobs {syncer_jobs}!")
        if syncer_jobs:
            jobs += [job.dict() for job in syncer_jobs]

2 changes: 1 addition & 1 deletion terraform/main.tf
@@ -46,7 +46,7 @@ module "datapump" {
  geotrellis_jar_path       = var.geotrellis_jar_path
  pipelines_bucket          = data.terraform_remote_state.core.outputs.pipelines_bucket
  tags                      = local.tags
-  sfn_wait_time             = 30
+  sfn_wait_time             = 120
  data_api_uri              = var.data_api_uri
  data_lake_bucket          = data.terraform_remote_state.core.outputs.data-lake_bucket
  rasterio_lambda_layer_arn = data.terraform_remote_state.lambda-layers.outputs.py310_rasterio_138_arn
