From 579c44d4b2a6aa864bd5c4598ed62e1c1d1c6ded Mon Sep 17 00:00:00 2001 From: Frederico Liporace Date: Wed, 21 Feb 2024 18:58:29 -0300 Subject: [PATCH] Migration to CDK2 (#100) --- README.md | 6 ++-- cdk.json | 5 +++- setup.py | 23 +------------- stack/app.py | 84 +++++++++++++++++++++++++--------------------------- 4 files changed, 49 insertions(+), 69 deletions(-) diff --git a/README.md b/README.md index a05ed13..34f483a 100644 --- a/README.md +++ b/README.md @@ -34,15 +34,15 @@ Some lambdas require extra pip packages to be installed in the lambda directory ## CDK bootstrap -Deployment uses AWS CDK. +Deployment uses AWS CDK2. Requirements: * node: Use [nvm](https://heynode.com/tutorial/install-nodejs-locally-nvm/) to make sure a supported node is being used, tested with 18.0.0 * AWS credentials configured -To install and check AWS CDK: +To install and check AWS CDK (tested with CDK 2.129.0): ```bash -$ npm install -g aws-cdk@1.204.0 +$ npm install -g aws-cdk $ cdk --version $ cdk bootstrap # Deploys the CDK toolkit stack into an AWS environment diff --git a/cdk.json b/cdk.json index 68fd5c3..b31901b 100644 --- a/cdk.json +++ b/cdk.json @@ -1,3 +1,6 @@ { - "app": "(export PYTHONPATH=. && python stack/app.py)" + "app": "(export PYTHONPATH=. && python stack/app.py)", + "context":{ + "@aws-cdk/customresources:installLatestAwsSdkDefault":false + } } diff --git a/setup.py b/setup.py index 047ac57..72abbb7 100644 --- a/setup.py +++ b/setup.py @@ -34,28 +34,7 @@ # Used in process_new_scene_queue lambda. "utm", ], - "deploy": [ - "pydantic[dotenv]", - "aws-cdk.core", - "aws-cdk.aws-sqs", - "aws-cdk.aws-sns", - "aws-cdk.aws-sns-subscriptions", - "aws-cdk.aws-cloudwatch", - "aws-cdk.aws-cloudwatch-actions", - "aws-cdk.aws-lambda", - "aws-cdk.aws-s3", - "aws-cdk.aws-s3-deployment", - "aws-cdk.aws-s3-assets", - "aws-cdk.aws-iam", - "aws-cdk.aws-dynamodb", - "aws-cdk.aws-lambda-event-sources", - "aws-cdk.aws-events", - "aws-cdk.aws-events-targets", - "aws-cdk.aws-apigateway", - "aws-cdk.aws-elasticsearch", - "aws-cdk.aws-opensearchservice", - "aws-cdk.aws-synthetics", - ], + "deploy": ["pydantic[dotenv]", "aws-cdk-lib>=2.0.0", "constructs>=10.0.0",], } ENTRY_POINTS = """ diff --git a/stack/app.py b/stack/app.py index 591c301..21be234 100644 --- a/stack/app.py +++ b/stack/app.py @@ -5,7 +5,7 @@ ) from typing import Any, Dict, List -# from aws_cdk import aws_s3_notifications as s3n +from aws_cdk import App, CfnOutput, Duration, Fn, RemovalPolicy, Stack, Tags from aws_cdk import aws_apigateway as apigateway from aws_cdk import aws_cloudwatch as cloudwatch from aws_cdk import aws_cloudwatch_actions as cw_actions @@ -21,9 +21,9 @@ from aws_cdk import aws_sns_subscriptions as sns_subscriptions from aws_cdk import aws_sqs as sqs from aws_cdk import aws_synthetics as synthetics -from aws_cdk import core from aws_cdk.aws_cloudwatch import ComparisonOperator from aws_cdk.aws_lambda_event_sources import SqsEventSource +from constructs import Construct from cbers2stac.layers.common.dbtable import DBTable from cbers2stac.local.create_static_catalog_structure import ( @@ -34,7 +34,7 @@ settings = StackSettings() -class CBERS2STACStack(core.Stack): +class CBERS2STACStack(Stack): """CBERS2STACStack""" lambdas_env_: Dict[str, str] = {} @@ -115,7 +115,7 @@ def create_all_queues(self) -> None: # subscribe to CBERS 4/4A quicklook notification topics self.create_queue( id="process_new_scenes_queue_dlq", - retention_period=core.Duration.seconds(1209600), + retention_period=Duration.seconds(1209600), ) process_new_scenes_queue_alarm = cloudwatch.Alarm( self, @@ -132,8 +132,8 @@ def create_all_queues(self) -> None: ) self.create_queue( id="new_scenes_queue", - visibility_timeout=core.Duration.seconds(385), - retention_period=core.Duration.seconds(1209600), + visibility_timeout=Duration.seconds(385), + retention_period=Duration.seconds(1209600), dead_letter_queue=sqs.DeadLetterQueue( max_receive_count=1, queue=self.queues_["process_new_scenes_queue_dlq"] ), @@ -180,8 +180,8 @@ def create_all_queues(self) -> None: self.create_queue( id="catalog_prefix_update_queue", - visibility_timeout=core.Duration.seconds(60), - retention_period=core.Duration.seconds(1209600), + visibility_timeout=Duration.seconds(60), + retention_period=Duration.seconds(1209600), dead_letter_queue=sqs.DeadLetterQueue( max_receive_count=3, queue=self.queues_["dead_letter_queue"] ), @@ -190,7 +190,7 @@ def create_all_queues(self) -> None: # Reconcile queue for INPE's XML metadata self.create_queue( id="consume_reconcile_queue_dlq", - retention_period=core.Duration.seconds(1209600), + retention_period=Duration.seconds(1209600), ) consume_reconcile_queue_alarm = cloudwatch.Alarm( self, @@ -207,8 +207,8 @@ def create_all_queues(self) -> None: ) self.create_queue( id="reconcile_queue", - visibility_timeout=core.Duration.seconds(1000), - retention_period=core.Duration.seconds(1209600), + visibility_timeout=Duration.seconds(1000), + retention_period=Duration.seconds(1209600), dead_letter_queue=sqs.DeadLetterQueue( max_receive_count=3, queue=self.queues_["consume_reconcile_queue_dlq"] ), @@ -217,7 +217,7 @@ def create_all_queues(self) -> None: # Reconcile queue for STAC items self.create_queue( id="consume_stac_reconcile_queue_dlq", - retention_period=core.Duration.seconds(1209600), + retention_period=Duration.seconds(1209600), ) consume_stac_reconcile_queue_alarm = cloudwatch.Alarm( self, @@ -234,8 +234,8 @@ def create_all_queues(self) -> None: ) self.create_queue( id="stac_reconcile_queue", - visibility_timeout=core.Duration.seconds(1000), - retention_period=core.Duration.seconds(1209600), + visibility_timeout=Duration.seconds(1000), + retention_period=Duration.seconds(1209600), dead_letter_queue=sqs.DeadLetterQueue( max_receive_count=3, queue=self.queues_["consume_stac_reconcile_queue_dlq"], @@ -246,8 +246,8 @@ def create_all_queues(self) -> None: # topic with new stac items self.create_queue( id="insert_into_elasticsearch_queue", - visibility_timeout=core.Duration.seconds(180), - retention_period=core.Duration.seconds(1209600), + visibility_timeout=Duration.seconds(180), + retention_period=Duration.seconds(1209600), dead_letter_queue=sqs.DeadLetterQueue( max_receive_count=3, queue=self.queues_["dead_letter_queue"] ), @@ -272,8 +272,8 @@ def create_all_queues(self) -> None: # This queue subscribe only to new item topics self.create_queue( id="backup_insert_into_elasticsearch_queue", - visibility_timeout=core.Duration.seconds(180), - retention_period=core.Duration.days(settings.backup_queue_retention_days), + visibility_timeout=Duration.seconds(180), + retention_period=Duration.days(settings.backup_queue_retention_days), dead_letter_queue=sqs.DeadLetterQueue( max_receive_count=3, queue=self.queues_["dead_letter_queue"] ), @@ -287,7 +287,7 @@ def create_all_queues(self) -> None: # Queue for corrupted XML entries, see #89 self.create_queue( - id="corrupted_xml_queue", retention_period=core.Duration.days(14), + id="corrupted_xml_queue", retention_period=Duration.days(14), ) def create_all_topics(self) -> None: @@ -303,7 +303,7 @@ def create_all_topics(self) -> None: ) # Public STAC item topic for new STAC items self.topics_["stac_item_topic"] = sns.Topic(self, "stac_item_topic") - core.CfnOutput( + CfnOutput( self, "stac_item_topic_output", value=self.topics_["stac_item_topic"].topic_arn, @@ -391,7 +391,7 @@ def create_all_lambdas(self) -> None: "MESSAGE_BATCH_SIZE": "1", }, }, - timeout=core.Duration.seconds(55), + timeout=Duration.seconds(55), dead_letter_queue=self.queues_["process_new_scenes_queue_dlq"], layers=[self.layers_["common_layer"]], description="Process new scenes from quicklook queue", @@ -415,7 +415,7 @@ def create_all_lambdas(self) -> None: ].queue_url }, }, - timeout=core.Duration.seconds(900), + timeout=Duration.seconds(900), dead_letter_queue=self.queues_["dead_letter_queue"], layers=[self.layers_["common_layer"]], description="Generate levels into output table from input table", @@ -429,7 +429,7 @@ def create_all_lambdas(self) -> None: handler="code.trigger_handler", runtime=aws_lambda.Runtime.PYTHON_3_7, environment={**self.lambdas_env_,}, - timeout=core.Duration.seconds(55), + timeout=Duration.seconds(55), dead_letter_queue=self.queues_["dead_letter_queue"], layers=[self.layers_["common_layer"]], description="Update catalog from prefix", @@ -451,7 +451,7 @@ def create_all_lambdas(self) -> None: **self.lambdas_env_, **{"RECONCILE_QUEUE": self.queues_["reconcile_queue"].queue_url}, }, - timeout=core.Duration.seconds(300), + timeout=Duration.seconds(300), dead_letter_queue=self.queues_["dead_letter_queue"], layers=[self.layers_["common_layer"]], description="Populates reconcile queue with S3 keys from a common prefix", @@ -468,7 +468,7 @@ def create_all_lambdas(self) -> None: **self.lambdas_env_, **{"NEW_SCENES_QUEUE": self.queues_["new_scenes_queue"].queue_url}, }, - timeout=core.Duration.seconds(900), + timeout=Duration.seconds(900), dead_letter_queue=self.queues_["consume_reconcile_queue_dlq"], layers=[self.layers_["common_layer"]], description="Consume dirs from reconcile queue, populating " @@ -513,7 +513,7 @@ def create_all_lambdas(self) -> None: runtime=aws_lambda.Runtime.PYTHON_3_7, environment={**self.lambdas_env_,}, layers=[self.layers_["common_layer"]], - timeout=core.Duration.seconds(30), + timeout=Duration.seconds(30), dead_letter_queue=self.queues_["dead_letter_queue"], description="Create Elasticsearch stac index", ) @@ -530,7 +530,7 @@ def create_all_lambdas(self) -> None: **{"ES_STRIPPED": "YES", "BULK_CALLS": "1", "BULK_SIZE": "10"}, }, layers=[self.layers_["common_layer"]], - timeout=core.Duration.seconds(30), + timeout=Duration.seconds(30), dead_letter_queue=self.queues_["dead_letter_queue"], # Concurrent executions tuned to work with t2.small.elasticsearch reserved_concurrent_executions=5, @@ -551,7 +551,7 @@ def create_all_lambdas(self) -> None: runtime=aws_lambda.Runtime.PYTHON_3_7, environment=self.lambdas_env_, layers=[self.layers_["common_layer"]], - timeout=core.Duration.seconds(900), + timeout=Duration.seconds(900), description="Reindex STAC items from a prefix", ) # Batch size changed from 5 to 2 to reduce the lambda work and increase @@ -568,7 +568,7 @@ def create_all_lambdas(self) -> None: handler="code.populate_stac_reconcile_queue_handler", runtime=aws_lambda.Runtime.PYTHON_3_7, environment={**self.lambdas_env_,}, - timeout=core.Duration.seconds(300), + timeout=Duration.seconds(300), dead_letter_queue=self.queues_["dead_letter_queue"], layers=[self.layers_["common_layer"]], description="Populates reconcile queue with STAC items from a common prefix", @@ -591,7 +591,7 @@ def create_api_lambdas(self) -> None: runtime=aws_lambda.Runtime.PYTHON_3_7, environment={**self.lambdas_env_,}, layers=[self.layers_["common_layer"]], - timeout=core.Duration.seconds(30), + timeout=Duration.seconds(30), dead_letter_queue=self.queues_["api_dead_letter_queue"], description="Implement / endpoint (landing page)", ) @@ -604,7 +604,7 @@ def create_api_lambdas(self) -> None: runtime=aws_lambda.Runtime.PYTHON_3_7, environment={**self.lambdas_env_,}, layers=[self.layers_["common_layer"]], - timeout=core.Duration.seconds(55), + timeout=Duration.seconds(55), dead_letter_queue=self.queues_["api_dead_letter_queue"], description="Implement /search endpoint", ) @@ -623,9 +623,7 @@ def create_api_gateway(self) -> None: "openapi_asset", path="cbers2stac/openapi/core-item-search-query-integrated.yaml", ) - data = core.Fn.transform( - "AWS::Include", {"Location": openapi_asset.s3_object_url} - ) + data = Fn.transform("AWS::Include", {"Location": openapi_asset.s3_object_url}) definition = apigateway.AssetApiDefinition.from_inline(data) apigw = apigateway.SpecRestApi( self, @@ -640,12 +638,12 @@ def create_api_gateway(self) -> None: self, "canary_artifacts", auto_delete_objects=True, - removal_policy=core.RemovalPolicy.DESTROY, + removal_policy=RemovalPolicy.DESTROY, ) canary = synthetics.Canary( self, "SearchEndpointCanary", - schedule=synthetics.Schedule.rate(core.Duration.hours(1)), + schedule=synthetics.Schedule.rate(Duration.hours(1)), runtime=synthetics.Runtime.SYNTHETICS_PYTHON_SELENIUM_1_0, test=synthetics.Test.custom( code=synthetics.Code.from_asset("cbers2stac/canary", exclude=["*~"]), @@ -659,7 +657,7 @@ def create_api_gateway(self) -> None: canary_alarm = cloudwatch.Alarm( self, "CanaryAlarm", - metric=canary.metric_failed(period=core.Duration.hours(1)), + metric=canary.metric_failed(period=Duration.hours(1)), evaluation_periods=1, threshold=0, comparison_operator=ComparisonOperator.GREATER_THAN_THRESHOLD, @@ -697,7 +695,7 @@ def create_es_domain(self) -> None: # No need to specify resource, the domain is implicit ) ], - removal_policy=core.RemovalPolicy.DESTROY, + removal_policy=RemovalPolicy.DESTROY, enable_version_upgrade=False, ) @@ -709,7 +707,7 @@ def create_es_domain(self) -> None: def __init__( self, - scope: core.Construct, + scope: Construct, stack_id: str, description: str, env: Dict[str, str], @@ -773,7 +771,7 @@ def __init__( block_public_acls=False, restrict_public_buckets=False, ), - removal_policy=core.RemovalPolicy.DESTROY, + removal_policy=RemovalPolicy.DESTROY, ) self.lambdas_env_.update({"STAC_BUCKET": stac_working_bucket.bucket_name}) self.lambdas_perms_.append( @@ -821,7 +819,7 @@ def __init__( name=DBTable.pk_attr_name_, type=dynamodb.AttributeType.STRING ), billing_mode=dynamodb.BillingMode.PAY_PER_REQUEST, - removal_policy=core.RemovalPolicy.DESTROY, + removal_policy=RemovalPolicy.DESTROY, ) self.lambdas_env_.update( {"CATALOG_UPDATE_TABLE": catalog_update_table.table_name} @@ -899,7 +897,7 @@ def __init__( self.create_es_domain() -app = core.App() +app = App() # Tag infrastructure for key, value in { @@ -907,7 +905,7 @@ def __init__( "cost_center": settings.cost_center, }.items(): if value: - core.Tags.of(app).add(key, value) + Tags.of(app).add(key, value) stackname = f"{settings.name}-{settings.stage}" CBERS2STACStack(