Skip to content

CDK Lab to schedule serverless tasks with EventBridge, Lambda and SQS

Notifications You must be signed in to change notification settings

Vivicorp-AWS/cdk-lab-serverless-task-scheduler

Repository files navigation

CDK Lab - Serverless Task Scheduler

I used to schedule some cron tasks and send the results to IM application, such as Slack, Discord ...etc.

What I did before was implementing a gigantic, stable, famous workflow platform like Apache Airflow and Prefect. They are still good choices, but it's time for me to adopt a lighter, event-driven architecture.

When talk about AWS, we can use Amazon EventBridge, AWS Lambda, Amazon SQS to build the same architecture to fulfill the same job:

Hope this will cost less money and effort. Let's get hands dirty 🛠️ and have the party started! 🎉

Components

All we need are:

  • EventBridge Scheduler (Event Scheduler): Scheduler to invoke a Lambda function with a Role attached
    • Service Role: Has permission to invoke a Lambda function
  • Lambda function (Send Tasks): Send messages to queue with a Role attached
    • Service Role: Has permission to send messages to SQS, and manage CloudWatch logs
  • Queue: Store the tasks
  • Lambda function (Run Task): Receive messages from queue with a Role attached
    • Service Role: Has permission to receive message from queue, and manage CloudWatch logs

File Sturcture

.
├── README.md
├── app.py                  # Entrypoint
├── cdk.context.json        # Set this stack's prefix
├── cdk.json
├── diagram-detailed.jpg
├── diagram-services.jpg
├── diagram-story.jpg
├── lambda
│   ├── run_task
│   │   ├── __init__.py
│   │   └── index.py        # Task processing logic
│   └── send_task
│       ├── __init__.py
│       └── index.py        # Task sending logic
├── layer
│   └── python              # Lambda's Python dependencies
├── poetry.lock
├── pyproject.toml
├── requirements-layer.txt  # Lambda's Python dependency list
├── requirements.txt
├── stacks
│   ├── __init__.py
│   ├── __pycache__
│   ├── lambda_stack.py     # Lambdas' configurations
│   ├── scheduler_stack.py  # EventBridge Scheduler's configurations
│   └── sqs_stack.py        # SQS Queue's configurations
└── tests                   # Test (Incomplete)
    ├── __init__.py
    └── unit

Deployment

Step 1: Configure AWS Credentials

Install AWS CLi first, then create the config file by executing:

aws configure

Step 2: Set the stacks' prefix

The deployed stack has a prefix, the default value is cdklab. Edit the value in runtime context file (cdk.context.json):

{
  "prefix": "cdklab"
}

Step 3: Reset the scheduler

The default schedule is set at 23:59:59 on 2037-12-31, Refer to stacks.scheduler_stack.scheduler_lambda.

Choose to set it as an rate-based, cron-based or one-time job (Reference), and replace the value string of the schedule_expression parameter that follows the syntax below:

And set the timezone by replacing the value of the schedule_expression_timezone parameter with the TZ identifier like Asia/Taipei.

Step 4: Create Task Definitions

By default, the tasks are stored in the tasks List object in lambda/send_task/index.py. Such as:

tasks = [
    "Hello from Lambda #1!",
    "Hello from Lambda #2!",
]

Simply modify the content and each String object will be the task that the next (run-task) Lambda function should execute with.

If the task definitions is much larger than this example, try store into another text file then read it in:

# e.g. The tasks are now in tasks.txt, each line represents a unique task definition
with open("tasks.txt", "r") as file:
    tasks = file.open().splitlines()

(Optional) Step 5: Install Python Lambda Layer (Python dependencies)

All Python dependencies must stored under layer/python/ as Lambda Layer then pack and send to AWS Lambda.

First add the dependencies to requirements-layer.txt, then install the dependencies with the command below:

mkdir -p ./layer/python/
pip install --target ./layer/python -r requirements-layer.txt

[NOTE] Remember to manually add dependencies to requirements-layer.txt.

Step 6: Add Business Logic Code

Insert the code into lambda/run_task/index.py.

Step 7: Configure timeout duration of Lambda functions

Find lambda_stack.py and change the timeout duration. These are configured by default:

  • For send-task Lambda function: 30 seconds
  • For run-task Lambda function: 1 minute

Feel free to change the duration from 1 second to 15 minutes. Evaluate the duration by planning a dry run of your business logics.

(Optional) Step 8: Change time related parameters of SQS Queue

There are 3 parameters:

  • visibility_timeout: How long does a Lambda function process the task?
    • Default value: 1 minute
  • retention_period: How long does the task remain in the queue?
    • Default value: 15 minutes
  • receive_message_wait_time: How long does the "ReceiveMessage" (Polling) action takes?
    • Default value: 0 minute (Immediate)

Estimate the settings by the program you goona run, end override them by editing the stack/sqs_stack.py file.

Step 9: Deploy with CDK toolkit (cdk command)

Install the CDK toolkit then deploy by executing:

cdk deploy --all --require-approval=never

(Optional) Step 10: Clean all resources

Not going to use these anymore? Remove them with:

cdk destroy --all

Some Development Tips & Explainations

Role creation

CDK will try to create roles and add necessary permission policies by evaluate the resources that are going to be created. So, if we create a Lambda function, and set a SQS Queue to trigger the function, CDK will automatically create the policies below:

  • CloudWatch log related policies
  • Read SQS Queue message related policies

But there are exceptions in this project:

  1. If the action is written in Lambda function, CDK won't understand what permission does the function need (Refer to stacks.lambda_stack.lambda_sendtask)
  2. If we use L1 construct to create the resources, everything in the CloudFormation template should be created by ourselves (Refer to stacks.scheduler_stack.scheduler_lambda)

If these situations exists, try to create the Roles and Policies manually first.

JSON serialization in Lambda function

We can use the standard JSON library and call json.dumps() function to acomplish the serialization process. But try to do with the code below:

import json

json.dumps(json)  # TypeError: Object of type module is not JSON serializable

Error occurs: module object can not be JSON serialized. Sometimes it's annoying because we can't realize if the object is serializable or not at a glance.

JSONPickle is an amazing project for making JSON serialization. The reason to use JSONPickle better than calling json.dumps is based on the project itself's description:

"It can take almost any Python object and turn the object into JSON"

We added into the Python Lambda Layer by default and already implemented into the Lambda function. Try and enjoy it! 🍻

Example References