Merge pull request #145 from wri/gtc-2580a
GTC-2580 Create COG assets of integrated alerts in datapump
danscales authored Jun 18, 2024
2 parents 7113df2 + 0a5de1c commit 5d97e84
Showing 5 changed files with 141 additions and 45 deletions.
4 changes: 4 additions & 0 deletions src/datapump/clients/data_api.py
@@ -186,6 +186,10 @@ def delete_version(self, dataset: str, version: str) -> None:
         uri = f"{GLOBALS.data_api_uri}/dataset/{dataset}/{version}"
         self._send_request(ValidMethods.delete, uri)
 
+    def get_asset_creation_options(self, asset_id: str) -> Dict[str, Any]:
+        uri = f"{GLOBALS.data_api_uri}/asset/{asset_id}/creation_options"
+        return self._send_request(ValidMethods.get, uri)["data"]
+
     def update_version_metadata(
         self, dataset: str, version: str, metadata: Dict[str, Any]
     ):
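For reference, a minimal sketch of calling the new client method (the asset ID is a hypothetical placeholder; the returned dict mirrors the creation options the asset was built with, e.g. its pixel_meaning):

    from datapump.clients.data_api import DataApiClient

    client = DataApiClient()
    # Placeholder asset ID; in practice this comes from client.get_assets(...)
    options = client.get_asset_creation_options("1b2c3d4e-0000-0000-0000-000000000000")
    print(options["pixel_meaning"])  # e.g. "date_conf" for the integrated alerts tile set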
7 changes: 7 additions & 0 deletions src/datapump/commands/version_update.py
@@ -45,3 +45,10 @@ class RasterVersionUpdateParameters(StrictBaseModel):
 class RasterVersionUpdateCommand(StrictBaseModel):
     command: str
     parameters: RasterVersionUpdateParameters
+
+
+class CogAssetParameters(StrictBaseModel):
+    implementation: str
+    source_pixel_meaning: str
+    blocksize: int
+    resampling: str = "mode"
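A quick sketch of instantiating the new model (values match those used later in sync.py; resampling falls back to "mode" when omitted):

    from datapump.commands.version_update import CogAssetParameters

    params = CogAssetParameters(
        implementation="default",
        source_pixel_meaning="date_conf",
        blocksize=1024,
    )
    assert params.resampling == "mode"  # default from the model definition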
54 changes: 54 additions & 0 deletions src/datapump/jobs/version_update.py
@@ -5,6 +5,7 @@
 from datapump.commands.version_update import (
     RasterTileCacheParameters,
     RasterTileSetParameters,
+    CogAssetParameters,
 )
 from datapump.util.models import ContentDateRange

@@ -19,6 +20,7 @@ class RasterVersionUpdateJobStep(str, Enum):
     creating_tile_set = "creating_tile_set"
     creating_tile_cache = "creating_tile_cache"
     creating_aux_assets = "creating_aux_assets"
+    creating_cog_assets = "creating_cog_assets"
     mark_latest = "mark_latest"


@@ -29,6 +31,7 @@ class RasterVersionUpdateJob(Job):
     tile_set_parameters: RasterTileSetParameters
     tile_cache_parameters: Optional[RasterTileCacheParameters] = None
     aux_tile_set_parameters: List[RasterTileSetParameters] = []
+    cog_asset_parameters: List[CogAssetParameters] = []
     timeout_sec = 24 * 60 * 60
 
     def next_step(self):
@@ -84,6 +87,20 @@ def next_step(self):
                 self.status = JobStatus.failed
 
         elif self.step == RasterVersionUpdateJobStep.creating_aux_assets:
             status = self._check_aux_assets_status()
             if status == JobStatus.complete:
+                if self.cog_asset_parameters:
+                    self.step = RasterVersionUpdateJobStep.creating_cog_assets
+                    for cog_asset_param in self.cog_asset_parameters:
+                        if self._create_cog_asset(cog_asset_param) == "":
+                            self.status = JobStatus.failed
+                            break
+                else:
+                    self.status = JobStatus.complete
+            elif status == JobStatus.failed:
+                self.status = JobStatus.failed
+
+        elif self.step == RasterVersionUpdateJobStep.creating_cog_assets:
+            status = self._check_aux_assets_status()
+            if status == JobStatus.complete:
                 self.status = JobStatus.complete
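Roughly, the step sequence for a job with COG parameters becomes the following (a sketch inferred from the transitions above; the creating_cog_assets step reuses _check_aux_assets_status because COG assets are tracked like the other auxiliary assets):

    # creating_tile_set -> creating_tile_cache (if configured) -> creating_aux_assets
    #     -> creating_cog_assets  (entered only when cog_asset_parameters is non-empty;
    #                              _create_cog_asset() is issued for each entry)
    #     -> complete             (once the COG assets report complete)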
@@ -173,6 +190,43 @@ def _create_aux_tile_set(self, tile_set_parameters: RasterTileSetParameters) ->
 
         return data["asset_id"]
 
+    def _create_cog_asset(self, cog_asset_parameters: CogAssetParameters) -> str:
+        """
+        Create COG asset and return its asset ID, or an empty string on error.
+        """
+        client = DataApiClient()
+
+        co = cog_asset_parameters
+
+        assets = client.get_assets(self.dataset, self.version)
+        asset_id = ""
+        for asset in assets:
+            if asset["asset_type"] == "Raster tile set":
+                creation_options = client.get_asset_creation_options(asset["asset_id"])
+                if creation_options["pixel_meaning"] == co.source_pixel_meaning:
+                    if asset_id != "":
+                        self.errors.append(f"Multiple assets with pixel meaning '{co.source_pixel_meaning}'")
+                        return ""
+                    asset_id = asset["asset_id"]
+
+        if asset_id == "":
+            self.errors.append(f"Could not find asset with pixel meaning '{co.source_pixel_meaning}'")
+            return ""
+
+        payload = {
+            "asset_type": "COG",
+            "creation_options": {
+                "implementation": co.implementation,
+                "source_asset_id": asset_id,
+                "resampling": co.resampling,
+                "block_size": co.blocksize,
+            },
+        }
+
+        data = client.create_aux_asset(self.dataset, self.version, payload)
+
+        return data["asset_id"]
+
     def _check_tile_set_status(self) -> JobStatus:
         client = DataApiClient()
 
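To make the request concrete: for the "default" integrated-alerts COG configured in sync.py below, the payload passed to create_aux_asset would look roughly like this (the source asset ID is a placeholder for whichever "Raster tile set" asset matched pixel meaning "date_conf"):

    payload = {
        "asset_type": "COG",
        "creation_options": {
            "implementation": "default",
            "source_asset_id": "<asset ID of the date_conf tile set>",  # placeholder
            "resampling": "mode",
            "block_size": 1024,  # note: the model field is "blocksize", the API key "block_size"
        },
    }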
117 changes: 73 additions & 44 deletions src/datapump/sync/sync.py
@@ -11,7 +11,7 @@
 from ..clients.datapump_store import DatapumpConfig
 from ..commands.analysis import FIRES_ANALYSES, AnalysisInputTable
 from ..commands.sync import SyncType
-from ..commands.version_update import RasterTileCacheParameters, RasterTileSetParameters
+from ..commands.version_update import RasterTileCacheParameters, RasterTileSetParameters, CogAssetParameters
 from ..globals import GLOBALS, LOGGER
 from ..jobs.geotrellis import FireAlertsGeotrellisJob, GeotrellisJob, Job
 from ..jobs.jobs import JobStatus
@@ -203,57 +203,86 @@ def build_jobs(self, config: DatapumpConfig) -> List[Job]:
             for dataset, version in latest_versions.items()
         ]
 
-        if not self._should_update(latest_versions):
-            return []
+        # For testing, always force an integrated_alerts update
+        # if not self._should_update(latest_versions):
+        #     return []
 
         jobs = []
 
         if config.dataset == "gadm":
-            jobs.append(
-                RasterVersionUpdateJob(
-                    id=str(uuid1()),
-                    status=JobStatus.starting,
-                    dataset=self.DATASET_NAME,
-                    version=self.sync_version,
-                    tile_set_parameters=RasterTileSetParameters(
-                        source_uri=source_uris,
-                        calc=self.INPUT_CALC,
-                        grid="10/100000",
-                        data_type="uint16",
-                        no_data=0,
-                        pixel_meaning="date_conf",
-                        band_count=1,
-                        union_bands=True,
-                        compute_stats=False,
-                        timeout_sec=21600,
-                    ),
-                    tile_cache_parameters=RasterTileCacheParameters(
-                        max_zoom=14,
-                        resampling="med",
-                        symbology={"type": "date_conf_intensity_multi_8"},
-                    ),
-                    content_date_range=ContentDateRange(
-                        start_date="2014-12-31", end_date=str(date.today())
-                    ),
-                )
-            )
-            jobs.append(
-                GeotrellisJob(
-                    id=str(uuid1()),
-                    status=JobStatus.starting,
-                    analysis_version=config.analysis_version,
-                    sync_version=self.sync_version,
-                    sync_type=config.sync_type,
-                    table=AnalysisInputTable(
-                        dataset=config.dataset,
-                        version=config.dataset_version,
-                        analysis=config.analysis,
-                    ),
-                    features_1x1=config.metadata["features_1x1"],
-                    geotrellis_version=config.metadata["geotrellis_version"],
-                    timeout_sec=6 * 3600,
-                )
-            )
+            job = RasterVersionUpdateJob(
+                id=str(uuid1()),
+                status=JobStatus.starting,
+                dataset=self.DATASET_NAME,
+                version=self.sync_version,
+                tile_set_parameters=RasterTileSetParameters(
+                    source_uri=source_uris,
+                    calc=self.INPUT_CALC,
+                    grid="10/100000",
+                    data_type="uint16",
+                    no_data=0,
+                    pixel_meaning="date_conf",
+                    band_count=1,
+                    union_bands=True,
+                    compute_stats=False,
+                    timeout_sec=21600,
+                ),
+                tile_cache_parameters=RasterTileCacheParameters(
+                    max_zoom=14,
+                    resampling="med",
+                    symbology={"type": "date_conf_intensity_multi_8"},
+                ),
+                content_date_range=ContentDateRange(
+                    start_date="2014-12-31", end_date=str(date.today())
+                ),
+            )
+            job.aux_tile_set_parameters = [
+                RasterTileSetParameters(
+                    source_uri=None,
+                    pixel_meaning="intensity",
+                    data_type="uint8",
+                    calc="(A > 0) * 255",
+                    grid="10/100000",
+                    no_data=0,
+                )
+            ]
+            job.cog_asset_parameters = [
+                # Created from the "date_conf" asset
+                CogAssetParameters(
+                    source_pixel_meaning="date_conf",
+                    resampling="mode",
+                    implementation="default",
+                    blocksize=1024,
+                ),
+                # Created from the "intensity" asset
+                CogAssetParameters(
+                    source_pixel_meaning="intensity",
+                    resampling="bilinear",
+                    implementation="intensity",
+                    blocksize=1024,
+                ),
+            ]
+
+            jobs.append(job)
+
+            # Disable for testing
+            # jobs.append(
+            #     GeotrellisJob(
+            #         id=str(uuid1()),
+            #         status=JobStatus.starting,
+            #         analysis_version=config.analysis_version,
+            #         sync_version=self.sync_version,
+            #         sync_type=config.sync_type,
+            #         table=AnalysisInputTable(
+            #             dataset=config.dataset,
+            #             version=config.dataset_version,
+            #             analysis=config.analysis,
+            #         ),
+            #         features_1x1=config.metadata["features_1x1"],
+            #         geotrellis_version=config.metadata["geotrellis_version"],
+            #         timeout_sec=6 * 3600,
+            #     )
+            # )
 
         return jobs

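As a rough end-to-end sketch, a syncer carrying these changes emits a single RasterVersionUpdateJob whose COG parameters can be inspected directly (the syncer class name and constructor signature are assumed from context, not shown in this diff):

    syncer = IntegratedAlertsSync(sync_version="v20240618")  # assumed class/signature
    jobs = syncer.build_jobs(config_row)  # config_row: a DatapumpConfig for the gadm dataset
    print([p.implementation for p in jobs[0].cog_asset_parameters])
    # -> ['default', 'intensity']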
4 changes: 3 additions & 1 deletion src/lambdas/dispatcher/src/lambda_function.py
@@ -128,10 +128,12 @@ def _sync(command: SyncCommand):
     if not sync_config:
         slack_webhook(
             "WARNING",
-            f"No DyanamoDB rows found for sync type {sync_type}!"
+            f"No DynamoDB rows found for sync type {sync_type}!"
         )
+        LOGGER.warning(f"No DynamoDB rows found for sync type {sync_type}!")
     for row in sync_config:
         syncer_jobs = syncer.build_jobs(row)
+        LOGGER.info(f"Processing row {row}, got jobs {syncer_jobs}!")
         if syncer_jobs:
             jobs += [job.dict() for job in syncer_jobs]
