Migration to CDK2 (#100)
fredliporace committed Feb 21, 2024
1 parent d1c2112 commit 579c44d
Showing 4 changed files with 49 additions and 69 deletions.
6 changes: 3 additions & 3 deletions README.md
@@ -34,15 +34,15 @@ Some lambdas require extra pip packages to be installed in the lambda directory
 
 ## CDK bootstrap
 
-Deployment uses AWS CDK.
+Deployment uses AWS CDK2.
 
 Requirements:
 * node: Use [nvm](https://heynode.com/tutorial/install-nodejs-locally-nvm/) to make sure a supported node is being used, tested with 18.0.0
 * AWS credentials configured
 
-To install and check AWS CDK:
+To install and check AWS CDK (tested with CDK 2.129.0):
 ```bash
-$ npm install -g aws-cdk@1.204.0
+$ npm install -g aws-cdk
 $ cdk --version
 
 $ cdk bootstrap # Deploys the CDK toolkit stack into an AWS environment
5 changes: 4 additions & 1 deletion cdk.json
@@ -1,3 +1,6 @@
 {
-  "app": "(export PYTHONPATH=. && python stack/app.py)"
+  "app": "(export PYTHONPATH=. && python stack/app.py)",
+  "context":{
+    "@aws-cdk/customresources:installLatestAwsSdkDefault":false
+  }
 }
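
The new `@aws-cdk/customresources:installLatestAwsSdkDefault` context flag stops CDK custom resources from downloading the latest AWS SDK at deploy time, making them use the SDK bundled with the Lambda runtime instead. For reference, a minimal sketch of the per-construct equivalent in CDK2 Python — the stack, resource ids, and the SSM call here are hypothetical, not part of this commit:

```python
# Hypothetical sketch (not part of this commit): the cdk.json context key
# "@aws-cdk/customresources:installLatestAwsSdkDefault" sets the default for
# AwsCustomResource; the same choice can also be made per construct.
from aws_cdk import Stack
from aws_cdk import custom_resources as cr
from constructs import Construct


class ExampleStack(Stack):
    def __init__(self, scope: Construct, stack_id: str, **kwargs) -> None:
        super().__init__(scope, stack_id, **kwargs)
        cr.AwsCustomResource(
            self,
            "example_resource",  # hypothetical id
            on_update=cr.AwsSdkCall(
                service="SSM",
                action="getParameter",
                parameters={"Name": "example-parameter"},  # hypothetical name
                physical_resource_id=cr.PhysicalResourceId.of("example-resource"),
            ),
            policy=cr.AwsCustomResourcePolicy.from_sdk_calls(
                resources=cr.AwsCustomResourcePolicy.ANY_RESOURCE
            ),
            # Same effect as the context flag, scoped to this resource: use the
            # SDK bundled with the Lambda runtime instead of installing the
            # latest one at deploy time.
            install_latest_aws_sdk=False,
        )
```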
23 changes: 1 addition & 22 deletions setup.py
@@ -34,28 +34,7 @@
         # Used in process_new_scene_queue lambda.
         "utm",
     ],
-    "deploy": [
-        "pydantic[dotenv]",
-        "aws-cdk.core",
-        "aws-cdk.aws-sqs",
-        "aws-cdk.aws-sns",
-        "aws-cdk.aws-sns-subscriptions",
-        "aws-cdk.aws-cloudwatch",
-        "aws-cdk.aws-cloudwatch-actions",
-        "aws-cdk.aws-lambda",
-        "aws-cdk.aws-s3",
-        "aws-cdk.aws-s3-deployment",
-        "aws-cdk.aws-s3-assets",
-        "aws-cdk.aws-iam",
-        "aws-cdk.aws-dynamodb",
-        "aws-cdk.aws-lambda-event-sources",
-        "aws-cdk.aws-events",
-        "aws-cdk.aws-events-targets",
-        "aws-cdk.aws-apigateway",
-        "aws-cdk.aws-elasticsearch",
-        "aws-cdk.aws-opensearchservice",
-        "aws-cdk.aws-synthetics",
-    ],
+    "deploy": ["pydantic[dotenv]", "aws-cdk-lib>=2.0.0", "constructs>=10.0.0",],
 }
 
 ENTRY_POINTS = """
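
In CDK2 every service module ships in the single `aws-cdk-lib` package and `Construct` moves to the separate `constructs` package, so the long list of per-service v1 packages collapses to two requirements. A minimal sketch of the resulting import pattern, assuming a hypothetical stack with one queue (the real changes to `stack/app.py` follow below):

```python
# Illustrative sketch of the CDK v1 -> v2 import changes (hypothetical stack,
# not part of this commit). In v1 these came from per-service packages such as
# aws-cdk.core and aws-cdk.aws-sqs; in v2 everything lives in aws-cdk-lib.
from aws_cdk import App, Duration, Stack
from aws_cdk import aws_sqs as sqs
from constructs import Construct  # moved out of aws_cdk.core in v2


class ExampleStack(Stack):
    """Minimal stack with a single queue."""

    def __init__(self, scope: Construct, stack_id: str, **kwargs) -> None:
        super().__init__(scope, stack_id, **kwargs)
        # v1's core.Duration becomes Duration, imported from aws_cdk
        sqs.Queue(self, "example_queue", visibility_timeout=Duration.seconds(60))


app = App()  # v1: app = core.App()
ExampleStack(app, "example-stack")
app.synth()
```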
84 changes: 41 additions & 43 deletions stack/app.py
@@ -5,7 +5,7 @@
 )
 from typing import Any, Dict, List
 
-# from aws_cdk import aws_s3_notifications as s3n
+from aws_cdk import App, CfnOutput, Duration, Fn, RemovalPolicy, Stack, Tags
 from aws_cdk import aws_apigateway as apigateway
 from aws_cdk import aws_cloudwatch as cloudwatch
 from aws_cdk import aws_cloudwatch_actions as cw_actions
@@ -21,9 +21,9 @@
 from aws_cdk import aws_sns_subscriptions as sns_subscriptions
 from aws_cdk import aws_sqs as sqs
 from aws_cdk import aws_synthetics as synthetics
-from aws_cdk import core
 from aws_cdk.aws_cloudwatch import ComparisonOperator
 from aws_cdk.aws_lambda_event_sources import SqsEventSource
+from constructs import Construct
 
 from cbers2stac.layers.common.dbtable import DBTable
 from cbers2stac.local.create_static_catalog_structure import (
@@ -34,7 +34,7 @@
 settings = StackSettings()
 
 
-class CBERS2STACStack(core.Stack):
+class CBERS2STACStack(Stack):
     """CBERS2STACStack"""
 
     lambdas_env_: Dict[str, str] = {}
@@ -115,7 +115,7 @@ def create_all_queues(self) -> None:
         # subscribe to CBERS 4/4A quicklook notification topics
         self.create_queue(
             id="process_new_scenes_queue_dlq",
-            retention_period=core.Duration.seconds(1209600),
+            retention_period=Duration.seconds(1209600),
         )
         process_new_scenes_queue_alarm = cloudwatch.Alarm(
             self,
@@ -132,8 +132,8 @@ def create_all_queues(self) -> None:
         )
         self.create_queue(
             id="new_scenes_queue",
-            visibility_timeout=core.Duration.seconds(385),
-            retention_period=core.Duration.seconds(1209600),
+            visibility_timeout=Duration.seconds(385),
+            retention_period=Duration.seconds(1209600),
             dead_letter_queue=sqs.DeadLetterQueue(
                 max_receive_count=1, queue=self.queues_["process_new_scenes_queue_dlq"]
             ),
@@ -180,8 +180,8 @@ def create_all_queues(self) -> None:
 
         self.create_queue(
             id="catalog_prefix_update_queue",
-            visibility_timeout=core.Duration.seconds(60),
-            retention_period=core.Duration.seconds(1209600),
+            visibility_timeout=Duration.seconds(60),
+            retention_period=Duration.seconds(1209600),
             dead_letter_queue=sqs.DeadLetterQueue(
                 max_receive_count=3, queue=self.queues_["dead_letter_queue"]
             ),
@@ -190,7 +190,7 @@ def create_all_queues(self) -> None:
         # Reconcile queue for INPE's XML metadata
         self.create_queue(
             id="consume_reconcile_queue_dlq",
-            retention_period=core.Duration.seconds(1209600),
+            retention_period=Duration.seconds(1209600),
         )
         consume_reconcile_queue_alarm = cloudwatch.Alarm(
             self,
@@ -207,8 +207,8 @@ def create_all_queues(self) -> None:
         )
         self.create_queue(
             id="reconcile_queue",
-            visibility_timeout=core.Duration.seconds(1000),
-            retention_period=core.Duration.seconds(1209600),
+            visibility_timeout=Duration.seconds(1000),
+            retention_period=Duration.seconds(1209600),
             dead_letter_queue=sqs.DeadLetterQueue(
                 max_receive_count=3, queue=self.queues_["consume_reconcile_queue_dlq"]
             ),
@@ -217,7 +217,7 @@ def create_all_queues(self) -> None:
         # Reconcile queue for STAC items
         self.create_queue(
             id="consume_stac_reconcile_queue_dlq",
-            retention_period=core.Duration.seconds(1209600),
+            retention_period=Duration.seconds(1209600),
         )
         consume_stac_reconcile_queue_alarm = cloudwatch.Alarm(
             self,
@@ -234,8 +234,8 @@ def create_all_queues(self) -> None:
         )
         self.create_queue(
             id="stac_reconcile_queue",
-            visibility_timeout=core.Duration.seconds(1000),
-            retention_period=core.Duration.seconds(1209600),
+            visibility_timeout=Duration.seconds(1000),
+            retention_period=Duration.seconds(1209600),
             dead_letter_queue=sqs.DeadLetterQueue(
                 max_receive_count=3,
                 queue=self.queues_["consume_stac_reconcile_queue_dlq"],
@@ -246,8 +246,8 @@ def create_all_queues(self) -> None:
         # topic with new stac items
         self.create_queue(
             id="insert_into_elasticsearch_queue",
-            visibility_timeout=core.Duration.seconds(180),
-            retention_period=core.Duration.seconds(1209600),
+            visibility_timeout=Duration.seconds(180),
+            retention_period=Duration.seconds(1209600),
             dead_letter_queue=sqs.DeadLetterQueue(
                 max_receive_count=3, queue=self.queues_["dead_letter_queue"]
             ),
@@ -272,8 +272,8 @@ def create_all_queues(self) -> None:
         # This queue subscribe only to new item topics
         self.create_queue(
             id="backup_insert_into_elasticsearch_queue",
-            visibility_timeout=core.Duration.seconds(180),
-            retention_period=core.Duration.days(settings.backup_queue_retention_days),
+            visibility_timeout=Duration.seconds(180),
+            retention_period=Duration.days(settings.backup_queue_retention_days),
             dead_letter_queue=sqs.DeadLetterQueue(
                 max_receive_count=3, queue=self.queues_["dead_letter_queue"]
             ),
@@ -287,7 +287,7 @@ def create_all_queues(self) -> None:
 
         # Queue for corrupted XML entries, see #89
         self.create_queue(
-            id="corrupted_xml_queue", retention_period=core.Duration.days(14),
+            id="corrupted_xml_queue", retention_period=Duration.days(14),
         )
 
     def create_all_topics(self) -> None:
@@ -303,7 +303,7 @@ def create_all_topics(self) -> None:
         )
         # Public STAC item topic for new STAC items
         self.topics_["stac_item_topic"] = sns.Topic(self, "stac_item_topic")
-        core.CfnOutput(
+        CfnOutput(
             self,
             "stac_item_topic_output",
             value=self.topics_["stac_item_topic"].topic_arn,
@@ -391,7 +391,7 @@ def create_all_lambdas(self) -> None:
                     "MESSAGE_BATCH_SIZE": "1",
                 },
             },
-            timeout=core.Duration.seconds(55),
+            timeout=Duration.seconds(55),
             dead_letter_queue=self.queues_["process_new_scenes_queue_dlq"],
             layers=[self.layers_["common_layer"]],
             description="Process new scenes from quicklook queue",
@@ -415,7 +415,7 @@ def create_all_lambdas(self) -> None:
                     ].queue_url
                 },
             },
-            timeout=core.Duration.seconds(900),
+            timeout=Duration.seconds(900),
             dead_letter_queue=self.queues_["dead_letter_queue"],
             layers=[self.layers_["common_layer"]],
             description="Generate levels into output table from input table",
@@ -429,7 +429,7 @@ def create_all_lambdas(self) -> None:
             handler="code.trigger_handler",
             runtime=aws_lambda.Runtime.PYTHON_3_7,
             environment={**self.lambdas_env_,},
-            timeout=core.Duration.seconds(55),
+            timeout=Duration.seconds(55),
             dead_letter_queue=self.queues_["dead_letter_queue"],
             layers=[self.layers_["common_layer"]],
             description="Update catalog from prefix",
@@ -451,7 +451,7 @@ def create_all_lambdas(self) -> None:
                 **self.lambdas_env_,
                 **{"RECONCILE_QUEUE": self.queues_["reconcile_queue"].queue_url},
             },
-            timeout=core.Duration.seconds(300),
+            timeout=Duration.seconds(300),
             dead_letter_queue=self.queues_["dead_letter_queue"],
             layers=[self.layers_["common_layer"]],
             description="Populates reconcile queue with S3 keys from a common prefix",
@@ -468,7 +468,7 @@ def create_all_lambdas(self) -> None:
                 **self.lambdas_env_,
                 **{"NEW_SCENES_QUEUE": self.queues_["new_scenes_queue"].queue_url},
             },
-            timeout=core.Duration.seconds(900),
+            timeout=Duration.seconds(900),
             dead_letter_queue=self.queues_["consume_reconcile_queue_dlq"],
             layers=[self.layers_["common_layer"]],
             description="Consume dirs from reconcile queue, populating "
@@ -513,7 +513,7 @@ def create_all_lambdas(self) -> None:
             runtime=aws_lambda.Runtime.PYTHON_3_7,
             environment={**self.lambdas_env_,},
             layers=[self.layers_["common_layer"]],
-            timeout=core.Duration.seconds(30),
+            timeout=Duration.seconds(30),
             dead_letter_queue=self.queues_["dead_letter_queue"],
             description="Create Elasticsearch stac index",
         )
@@ -530,7 +530,7 @@ def create_all_lambdas(self) -> None:
                 **{"ES_STRIPPED": "YES", "BULK_CALLS": "1", "BULK_SIZE": "10"},
             },
             layers=[self.layers_["common_layer"]],
-            timeout=core.Duration.seconds(30),
+            timeout=Duration.seconds(30),
             dead_letter_queue=self.queues_["dead_letter_queue"],
             # Concurrent executions tuned to work with t2.small.elasticsearch
             reserved_concurrent_executions=5,
@@ -551,7 +551,7 @@ def create_all_lambdas(self) -> None:
             runtime=aws_lambda.Runtime.PYTHON_3_7,
             environment=self.lambdas_env_,
             layers=[self.layers_["common_layer"]],
-            timeout=core.Duration.seconds(900),
+            timeout=Duration.seconds(900),
             description="Reindex STAC items from a prefix",
         )
         # Batch size changed from 5 to 2 to reduce the lambda work and increase
@@ -568,7 +568,7 @@ def create_all_lambdas(self) -> None:
             handler="code.populate_stac_reconcile_queue_handler",
             runtime=aws_lambda.Runtime.PYTHON_3_7,
             environment={**self.lambdas_env_,},
-            timeout=core.Duration.seconds(300),
+            timeout=Duration.seconds(300),
             dead_letter_queue=self.queues_["dead_letter_queue"],
             layers=[self.layers_["common_layer"]],
             description="Populates reconcile queue with STAC items from a common prefix",
@@ -591,7 +591,7 @@ def create_api_lambdas(self) -> None:
             runtime=aws_lambda.Runtime.PYTHON_3_7,
             environment={**self.lambdas_env_,},
             layers=[self.layers_["common_layer"]],
-            timeout=core.Duration.seconds(30),
+            timeout=Duration.seconds(30),
             dead_letter_queue=self.queues_["api_dead_letter_queue"],
             description="Implement / endpoint (landing page)",
         )
@@ -604,7 +604,7 @@ def create_api_lambdas(self) -> None:
             runtime=aws_lambda.Runtime.PYTHON_3_7,
             environment={**self.lambdas_env_,},
             layers=[self.layers_["common_layer"]],
-            timeout=core.Duration.seconds(55),
+            timeout=Duration.seconds(55),
             dead_letter_queue=self.queues_["api_dead_letter_queue"],
             description="Implement /search endpoint",
         )
@@ -623,9 +623,7 @@ def create_api_gateway(self) -> None:
             "openapi_asset",
             path="cbers2stac/openapi/core-item-search-query-integrated.yaml",
         )
-        data = core.Fn.transform(
-            "AWS::Include", {"Location": openapi_asset.s3_object_url}
-        )
+        data = Fn.transform("AWS::Include", {"Location": openapi_asset.s3_object_url})
         definition = apigateway.AssetApiDefinition.from_inline(data)
         apigw = apigateway.SpecRestApi(
             self,
@@ -640,12 +638,12 @@ def create_api_gateway(self) -> None:
             self,
             "canary_artifacts",
             auto_delete_objects=True,
-            removal_policy=core.RemovalPolicy.DESTROY,
+            removal_policy=RemovalPolicy.DESTROY,
         )
         canary = synthetics.Canary(
             self,
             "SearchEndpointCanary",
-            schedule=synthetics.Schedule.rate(core.Duration.hours(1)),
+            schedule=synthetics.Schedule.rate(Duration.hours(1)),
             runtime=synthetics.Runtime.SYNTHETICS_PYTHON_SELENIUM_1_0,
             test=synthetics.Test.custom(
                 code=synthetics.Code.from_asset("cbers2stac/canary", exclude=["*~"]),
@@ -659,7 +657,7 @@ def create_api_gateway(self) -> None:
         canary_alarm = cloudwatch.Alarm(
             self,
             "CanaryAlarm",
-            metric=canary.metric_failed(period=core.Duration.hours(1)),
+            metric=canary.metric_failed(period=Duration.hours(1)),
             evaluation_periods=1,
             threshold=0,
             comparison_operator=ComparisonOperator.GREATER_THAN_THRESHOLD,
@@ -697,7 +695,7 @@ def create_es_domain(self) -> None:
                     # No need to specify resource, the domain is implicit
                 )
             ],
-            removal_policy=core.RemovalPolicy.DESTROY,
+            removal_policy=RemovalPolicy.DESTROY,
             enable_version_upgrade=False,
         )
 
@@ -709,7 +707,7 @@ def create_es_domain(self) -> None:
 
     def __init__(
         self,
-        scope: core.Construct,
+        scope: Construct,
         stack_id: str,
         description: str,
         env: Dict[str, str],
@@ -773,7 +771,7 @@ def __init__(
                 block_public_acls=False,
                 restrict_public_buckets=False,
             ),
-            removal_policy=core.RemovalPolicy.DESTROY,
+            removal_policy=RemovalPolicy.DESTROY,
         )
         self.lambdas_env_.update({"STAC_BUCKET": stac_working_bucket.bucket_name})
         self.lambdas_perms_.append(
@@ -821,7 +819,7 @@ def __init__(
                 name=DBTable.pk_attr_name_, type=dynamodb.AttributeType.STRING
             ),
             billing_mode=dynamodb.BillingMode.PAY_PER_REQUEST,
-            removal_policy=core.RemovalPolicy.DESTROY,
+            removal_policy=RemovalPolicy.DESTROY,
         )
         self.lambdas_env_.update(
             {"CATALOG_UPDATE_TABLE": catalog_update_table.table_name}
@@ -899,15 +897,15 @@ def __init__(
         self.create_es_domain()
 
 
-app = core.App()
+app = App()
 
 # Tag infrastructure
 for key, value in {
     "project": f"{settings.name}-{settings.stage}",
     "cost_center": settings.cost_center,
 }.items():
     if value:
-        core.Tags.of(app).add(key, value)
+        Tags.of(app).add(key, value)
 
 stackname = f"{settings.name}-{settings.stage}"
 CBERS2STACStack(
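
Most of the `stack/app.py` changes are mechanical renames: `core.Duration` becomes `Duration`, `core.RemovalPolicy` becomes `RemovalPolicy`, `core.Stack`, `core.App`, `core.CfnOutput`, `core.Fn`, and `core.Tags` move to top-level `aws_cdk` imports, and `core.Construct` becomes `constructs.Construct`. The queue wiring itself is unchanged; a self-contained sketch of the dead-letter-queue-plus-alarm pattern the stack repeats, with hypothetical names in place of the repository's `create_queue` helper:

```python
# Self-contained sketch of the DLQ-plus-alarm pattern used throughout
# create_all_queues, written against CDK2. Names are hypothetical; the real
# stack wraps queue creation in a create_queue() helper.
from aws_cdk import App, Duration, Stack
from aws_cdk import aws_cloudwatch as cloudwatch
from aws_cdk import aws_sqs as sqs
from constructs import Construct


class QueuePatternStack(Stack):
    def __init__(self, scope: Construct, stack_id: str, **kwargs) -> None:
        super().__init__(scope, stack_id, **kwargs)
        # Dead letter queue keeps failed messages for the SQS maximum (14 days)
        dlq = sqs.Queue(
            self, "example_dlq", retention_period=Duration.seconds(1209600),
        )
        # Main queue hands messages to the DLQ after one failed receive
        sqs.Queue(
            self,
            "example_queue",
            visibility_timeout=Duration.seconds(385),
            retention_period=Duration.seconds(1209600),
            dead_letter_queue=sqs.DeadLetterQueue(max_receive_count=1, queue=dlq),
        )
        # Alarm as soon as anything lands in the DLQ
        cloudwatch.Alarm(
            self,
            "example_dlq_alarm",
            metric=dlq.metric_approximate_number_of_messages_visible(),
            threshold=0,
            evaluation_periods=1,
            comparison_operator=cloudwatch.ComparisonOperator.GREATER_THAN_THRESHOLD,
        )


app = App()
QueuePatternStack(app, "queue-pattern")
app.synth()
```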
