diff --git a/README.md b/README.md
index fb07a16..0d7a22a 100644
--- a/README.md
+++ b/README.md
@@ -41,4 +41,39 @@ This will attempt to ingest new data for each listed sync type, and will update
         "sync_version": "Version to use to indicate which sync this is from. If empty, will by default use vYYYYMMDD based on the current date."
     }
 }
-```
\ No newline at end of file
+```
+
+### Architecture
+
+We use AWS Step Functions and AWS Lambdas to orchestrate the pipeline. We pull fire alerts data from NASA FIRMS, deforestation data from Google Cloud Storage (GCS), and user area data from the ResourceWatch Areas API.
+
+```mermaid
+flowchart LR
+%% Nodes
+    A("NASA FIRMS")
+    B("GCS")
+    C("Area API")
+    D("Dispatcher")
+    E("Executor")
+    F("Postprocessor")
+    G("Sync Config (DynamoDB)")
+
+%% Flowchart
+    A -- Download last 7 days Shapefile for VIIRS/MODIS --> D
+    B -- Get links to latest alerts, verify number of tiles (GLAD-L, GLAD-S2, RADD) --> D
+    C -- Query API for pending areas --> D
+    D -- Create jobs based on command, run in parallel --> E
+    E -- Run job steps --> E
+    E -- Return job results --> F
+    F -- Update synced areas with status saved --> C
+    F -- Save syncing config to DynamoDB --> G
+
+```
+
+There are three lambdas:
+
+**Dispatcher**: This lambda is the entrypoint and takes in the command. It creates the jobs to run based on the command. For `sync` jobs, it will reach out to the different data providers (NASA, GCS, RW) and sync any data required to run the jobs.
+
+**Executor**: This lambda executes each job. Different classes in the `datapump` module define how to run each job. This is generally a multi-step process, such as uploading data to the Data API, running EMR jobs, or creating tile caches.
+
+**Postprocessor**: Once the jobs are complete, this lambda does any post-job work depending on the type of job and its parameters. If an `analysis` job was set to sync, it adds an entry to the DynamoDB table with the information necessary to sync new data every night: typically the dataset and version the analysis was run on, the version of the dataset to sync to, any additional data such as the geotrellis jar used, and a flag indicating whether to keep syncing. The primary key is a hash of these fields. For syncing user areas, the postprocessor also updates the RW Areas API with the status "saved" for any area that was processed, which lets GFW know results are ready for the area.
diff --git a/src/datapump/jobs/geotrellis.py b/src/datapump/jobs/geotrellis.py
index e33844d..6b9a9d6 100644
--- a/src/datapump/jobs/geotrellis.py
+++ b/src/datapump/jobs/geotrellis.py
@@ -23,6 +23,8 @@
     Partition,
     Partitions,
 )
+from botocore.exceptions import ClientError
+import time
 
 WORKER_INSTANCE_TYPES = ["r5.2xlarge", "r4.2xlarge"]  # "r6g.2xlarge"
 MASTER_INSTANCE_TYPE = "r5.2xlarge"
@@ -145,9 +147,22 @@ def cancel_analysis(self):
         client.terminate_job_flows(JobFlowIds=[self.emr_job_id])
 
     def check_analysis(self) -> JobStatus:
-        cluster_description = get_emr_client().describe_cluster(
-            ClusterId=self.emr_job_id
-        )
+        num_retries = 3
+        for i in range(num_retries):
+            try:
+                cluster_description = get_emr_client().describe_cluster(
+                    ClusterId=self.emr_job_id
+                )
+                break
+            except ClientError as e:
+                # Retry up to 3 times if we get a throttling exception
+                if i + 1 < num_retries and e.response['Error']['Code'] == 'ThrottlingException':
+                    print("Throttling exception occurred. Retrying in 30 seconds...")
+                    time.sleep(30)
+                    continue
+                else:
+                    raise
+
         status = cluster_description["Cluster"]["Status"]
 
         LOGGER.info(
diff --git a/src/datapump/sync/sync.py b/src/datapump/sync/sync.py
index f2e87e2..389330a 100644
--- a/src/datapump/sync/sync.py
+++ b/src/datapump/sync/sync.py
@@ -224,7 +224,11 @@ def build_jobs(self, config: DatapumpConfig) -> List[Job]:
                     band_count=1,
                     union_bands=True,
                     compute_stats=False,
-                    timeout_sec=21600,
+                    # This timeout allows about 5-6 hours for the date_conf and intensity
+                    # raster jobs (run in series), and then another 6-7 hours for the
+                    # default and intensity COG jobs (run in parallel). The
+                    # generation of the default COG takes the longest.
+                    timeout_sec=13 * 3600,
                 ),
                 tile_cache_parameters=RasterTileCacheParameters(
                     max_zoom=14,
@@ -242,7 +246,7 @@
                     data_type="uint8",
                     calc="(A > 0) * 255",
                     grid="10/100000",
-                    no_data=0,
+                    no_data=None,
                 )
             ]
             job.cog_asset_parameters = [
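
As a companion to the Postprocessor description in the README changes above, the sketch below illustrates roughly what writing a sync-config entry to DynamoDB could look like. It is a minimal illustration only: the table name, field names (`dataset`, `analysis_version`, `sync_version`, `geotrellis_jar`, `sync`), and the hashing scheme are assumptions for this example, not the actual schema used by the `datapump` module.

```python
# Hypothetical sketch of the postprocessor's DynamoDB write. Field names,
# table name, and hash scheme are assumptions, not the real datapump schema.
import hashlib
import json

import boto3


def save_sync_config(
    table_name: str,
    dataset: str,
    analysis_version: str,
    sync_version: str,
    geotrellis_jar: str,
    sync: bool = True,
) -> str:
    fields = {
        "dataset": dataset,
        "analysis_version": analysis_version,
        "sync_version": sync_version,
        "geotrellis_jar": geotrellis_jar,
        "sync": sync,
    }
    # The primary key is a hash of the config fields, so registering the same
    # analysis with the same parameters overwrites rather than duplicates.
    item_id = hashlib.md5(
        json.dumps(fields, sort_keys=True).encode("utf-8")
    ).hexdigest()

    table = boto3.resource("dynamodb").Table(table_name)
    table.put_item(Item={"id": item_id, **fields})
    return item_id
```

A nightly `sync` run can then scan this table for entries whose sync flag is still true and create the corresponding jobs.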