From affa535571251f7c338123f20bdeb52478ae0eb3 Mon Sep 17 00:00:00 2001
From: Justin Terry
Date: Tue, 24 Sep 2024 10:44:34 -0700
Subject: [PATCH 1/5] Intensity raster should set no data to null

---
 src/datapump/sync/sync.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/src/datapump/sync/sync.py b/src/datapump/sync/sync.py
index f2e87e2..b58107f 100644
--- a/src/datapump/sync/sync.py
+++ b/src/datapump/sync/sync.py
@@ -242,7 +242,7 @@ def build_jobs(self, config: DatapumpConfig) -> List[Job]:
                     data_type="uint8",
                     calc="(A > 0) * 255",
                     grid="10/100000",
-                    no_data=0,
+                    no_data=None,
                 )
             ]
             job.cog_asset_parameters = [

From 06848e039b22379fc9101ccee61e29157fd00bb5 Mon Sep 17 00:00:00 2001
From: jterry64
Date: Thu, 26 Sep 2024 17:27:44 -0700
Subject: [PATCH 2/5] Update README.md

---
 README.md | 36 +++++++++++++++++++++++++++++++++++-
 1 file changed, 35 insertions(+), 1 deletion(-)

diff --git a/README.md b/README.md
index fb07a16..6548137 100644
--- a/README.md
+++ b/README.md
@@ -41,4 +41,38 @@ This will attempt to ingest new data for each listed sync type, and will update
     "sync_version": "Version to use to indicate which sync this is from. If empty, will by default use vYYYYMMDD based on the current date."
   }
 }
-```
\ No newline at end of file
+```
+
+### Architecture
+
+We use AWS Step Functions and AWS Lambdas to orchestrate the pipeline. We pull fire alerts data from NASA FIRMS, deforestation data from Google Cloud Storage (GCS), and user area data from the ResourceWatch Areas API.
+
+```mermaid
+flowchart LR
+%% Nodes
+    A("NASA FIRMS")
+    B("GCS")
+    C("Area API")
+    D("Dispatcher")
+    E("Executor")
+    F("Postprocessor")
+    G("Sync Config (DynamoDB)")
+
+%% Flowchart
+    A -- Download last 7 days of shapefiles for VIIRS/MODIS --> D
+    B -- Get links to latest alerts, verify number of tiles (GLAD-L, GLAD-S2, RADD) --> D
+    C -- Query API for pending areas --> D
+    D -- Create jobs based on command, run in parallel --> E
+    E -- Run job steps --> E
+    E -- Return job results --> F
+    F -- Update synced areas with status saved --> C
+    F -- Save syncing config to DynamoDB --> G
+
+```
+
+There are three lambdas:
+**Dispatcher**: This lambda is the entrypoint and takes in the command. It creates the jobs to run based on the command. For `sync` jobs, it will reach out to the different data providers (NASA, GCS, RW) and sync any data required to run the jobs.
+
+**Executor**: This lambda will execute each job. Different classes in the `datapump` module define how to run each job. This is generally a multi-step process, like uploading data to the Data API, running EMR jobs, or creating tile caches.
+
+**Postprocessor**: Once the jobs are complete, this lambda will do any post-job work, depending on the type of job and its parameters. If an `analysis` job was set to sync, it will add an entry to the DynamoDB table with the info necessary to sync new data every night. This is typically the dataset and version the analysis is run on, the version of the dataset to sync to, any additional data like the geotrellis jar used, and a flag on whether to keep syncing. The primary key is a hash of these fields. For syncing user areas, the postprocessor will also update the RW Areas API with the status "saved" for any area that was processed. This lets GFW know results are ready for the area.
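A minimal sketch, for illustration only, of the kind of sync entry the Postprocessor described in the README patch above might write to DynamoDB. The table name, field names, and hash scheme here are assumptions made for the example, not datapump's actual schema:

```python
import hashlib
import json

import boto3


def save_sync_config(table_name: str, config: dict) -> None:
    """Write a sync-config entry whose primary key is a hash of its fields."""
    # Hash the sorted JSON of the fields to build a deterministic primary key
    # (the real implementation may use a different hashing scheme).
    config_hash = hashlib.md5(
        json.dumps(config, sort_keys=True).encode("utf-8")
    ).hexdigest()

    table = boto3.resource("dynamodb").Table(table_name)
    table.put_item(Item={"id": config_hash, **config})


# Hypothetical entry for a nightly integrated-alerts sync:
save_sync_config(
    "datapump-sync-config",  # hypothetical table name
    {
        "dataset": "gfw_integrated_alerts",        # dataset the analysis ran on
        "analysis_version": "v20240924",           # analysis version
        "dataset_version": "v20240924",            # version of the dataset to sync to
        "sync": True,                              # keep syncing nightly
        "sync_type": "integrated_alerts",
        "metadata": {"geotrellis_jar": "example-assembly.jar"},  # placeholder name
    },
)
```

Because the primary key is a hash of the fields, re-registering the same analysis/sync combination would overwrite the existing entry rather than create a duplicate.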
From b1e7db13353b4dbfbc2465b542ed61d3192fcbb2 Mon Sep 17 00:00:00 2001
From: jterry64
Date: Thu, 26 Sep 2024 17:28:14 -0700
Subject: [PATCH 3/5] Update README.md

---
 README.md | 1 +
 1 file changed, 1 insertion(+)

diff --git a/README.md b/README.md
index 6548137..0d7a22a 100644
--- a/README.md
+++ b/README.md
@@ -71,6 +71,7 @@ flowchart LR
 ```
 
 There are three lambdas:
+
 **Dispatcher**: This lambda is the entrypoint and takes in the command. It creates the jobs to run based on the command. For `sync` jobs, it will reach out to the different data providers (NASA, GCS, RW) and sync any data required to run the jobs.
 
 **Executor**: This lambda will execute each job. Different classes in the `datapump` module define how to run each job. This is generally a multi-step process, like uploading data to the Data API, running EMR jobs, or creating tile caches.

From 5dd04862563f8bbc76874417d83573d3ffd751b3 Mon Sep 17 00:00:00 2001
From: Dan Scales
Date: Tue, 1 Oct 2024 14:58:52 -0700
Subject: [PATCH 4/5] Get rid of error message after COG jobs successfully complete

Even when the integrated alerts COG jobs successfully complete, there is a
data-updates error message. It turns out this is because we didn't increase
the integrated_alerts job timeout when we added the very long extra COG
steps. So, I just added an extra 7 hours to the timeout for the
IntegratedAlertsSync job.
---
 src/Dockerfile            | 2 +-
 src/datapump/sync/sync.py | 6 +++++-
 2 files changed, 6 insertions(+), 2 deletions(-)

diff --git a/src/Dockerfile b/src/Dockerfile
index c332b78..5ef2992 100644
--- a/src/Dockerfile
+++ b/src/Dockerfile
@@ -15,7 +15,7 @@ RUN pip install . -t python
 # to change the hash of the file and get TF to realize it needs to be
 # redeployed. Ticket for a better solution:
 # https://gfw.atlassian.net/browse/GTC-1250
-# change 8
+# change 9
 
 RUN yum install -y zip geos-devel
 
diff --git a/src/datapump/sync/sync.py b/src/datapump/sync/sync.py
index b58107f..389330a 100644
--- a/src/datapump/sync/sync.py
+++ b/src/datapump/sync/sync.py
@@ -224,7 +224,11 @@ def build_jobs(self, config: DatapumpConfig) -> List[Job]:
                 band_count=1,
                 union_bands=True,
                 compute_stats=False,
-                timeout_sec=21600,
+                # This timeout is about 5-6 hours for the date_conf and intensity
+                # raster jobs (run in series), and then another 6-7 hours for the
+                # default and intensity COG jobs (run in parallel). The
+                # generation of the default COG takes the longest.
+                timeout_sec=13 * 3600,
             ),
             tile_cache_parameters=RasterTileCacheParameters(
                 max_zoom=14,

From d9e2ab95f76ac823047abd8dde38b9e52561c0d0 Mon Sep 17 00:00:00 2001
From: Dan Scales
Date: Wed, 2 Oct 2024 15:14:24 -0700
Subject: [PATCH 5/5] Put in retries for ThrottlingException on describe_cluster() call

We seem to get a ThrottlingException on describe_cluster() fairly often
(it happens several times every couple of nights). This causes the entire
job process to fail (or at least not finish the post-processing). So, I'm
adding a few retries when we get a ClientError that is a ThrottlingException
on the describe_cluster() call in check_analysis().
---
 src/datapump/jobs/geotrellis.py | 21 ++++++++++++++++++---
 1 file changed, 18 insertions(+), 3 deletions(-)

diff --git a/src/datapump/jobs/geotrellis.py b/src/datapump/jobs/geotrellis.py
index e33844d..6b9a9d6 100644
--- a/src/datapump/jobs/geotrellis.py
+++ b/src/datapump/jobs/geotrellis.py
@@ -23,6 +23,8 @@
     Partition,
     Partitions,
 )
+from botocore.exceptions import ClientError
+import time
 
 WORKER_INSTANCE_TYPES = ["r5.2xlarge", "r4.2xlarge"]  # "r6g.2xlarge"
 MASTER_INSTANCE_TYPE = "r5.2xlarge"
@@ -145,9 +147,22 @@ def cancel_analysis(self):
         client.terminate_job_flows(JobFlowIds=[self.emr_job_id])
 
     def check_analysis(self) -> JobStatus:
-        cluster_description = get_emr_client().describe_cluster(
-            ClusterId=self.emr_job_id
-        )
+        num_retries = 3
+        for i in range(num_retries):
+            try:
+                cluster_description = get_emr_client().describe_cluster(
+                    ClusterId=self.emr_job_id
+                )
+                break
+            except ClientError as e:
+                # Retry up to 3 times if we get a throttling exception
+                if i + 1 < num_retries and e.response['Error']['Code'] == 'ThrottlingException':
+                    print("Throttling exception occurred. Retrying in 30 seconds...")
+                    time.sleep(30)
+                    continue
+                else:
+                    raise
+
         status = cluster_description["Cluster"]["Status"]
 
         LOGGER.info(
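As a design note on the retry added above: boto3 clients can also be configured with botocore's built-in retry modes, which back off automatically on throttling errors before a ClientError ever surfaces. A minimal sketch under the assumption that the EMR client is created in a single factory (the patch's `get_emr_client()` would be the natural place); the retry settings shown are illustrative, not part of this change:

```python
import boto3
from botocore.config import Config

# "standard" retry mode retries throttled calls (e.g. ThrottlingException)
# with exponential backoff and jitter, up to max_attempts total attempts.
EMR_RETRY_CONFIG = Config(retries={"max_attempts": 5, "mode": "standard"})


def get_emr_client_with_retries():
    # Hypothetical variant of get_emr_client(); describe_cluster() calls made
    # through this client are retried by botocore, so check_analysis() would
    # not need its own retry loop.
    return boto3.client("emr", config=EMR_RETRY_CONFIG)
```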