
Module 4: Stream Processing

In this module you'll use AWS Lambda to process data from the wildrydes-aggregated Amazon Kinesis Stream created in the last module. The Lambda function will read from the stream and write records to the UnicornSensorData Amazon DynamoDB table created in the first module.

Architecture Overview

Architecture diagram

Our producer is a sensor attached to a unicorn - Shadowfax - currently taking a passenger on a Wild Ryde. This sensor emits data every second, including the unicorn's current location, the distance traveled in the previous second, and its current magic points and hit points, so that our operations team can monitor the health of our unicorns from Wild Rydes headquarters.

The Amazon Kinesis Analytics application aggregates the per-second data and emits a single record each minute with a sum of distance traveled and minimum and maximum magic and health points for each unicorn. These aggregated messages are sent to another Amazon Kinesis Stream.
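For reference, an aggregated record on this stream might look like the following (the attribute names here are illustrative; check the output of your Amazon Kinesis Analytics application for the exact schema):

    {
      "Name": "Shadowfax",
      "StatusTime": "2017-06-05 09:17:00.000",
      "Distance": 9413,
      "MinMagicPoints": 153,
      "MaxMagicPoints": 159,
      "MinHealthPoints": 150,
      "MaxHealthPoints": 157
    }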

In this module you'll create an AWS Lambda function to process the aggregated stream and write those records to an Amazon DynamoDB table.

Before beginning the module ensure that you have the Kinesis command-line clients downloaded by following the installation instructions.

Implementation Instructions

1. Create an IAM role for your Lambda function

Use the IAM console to create a new role. Give it a name like WildRydesStreamProcessorRole and select AWS Lambda for the role type. Attach the managed policy called AWSLambdaKinesisExecutionRole to this role in order to grant permissions for your function to read from Amazon Kinesis streams and to log to Amazon CloudWatch Logs.

You'll need to grant this role permission to access the Amazon DynamoDB table created in the previous sections. Create an inline policy allowing the role access to the dynamodb:BatchWriteItem action on the Amazon DynamoDB table you created in the File Processing module.
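The resulting inline policy should look roughly like the following JSON document (REGION and ACCOUNT_ID are placeholders for your own values):

    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Effect": "Allow",
          "Action": "dynamodb:BatchWriteItem",
          "Resource": "arn:aws:dynamodb:REGION:ACCOUNT_ID:table/UnicornSensorData"
        }
      ]
    }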

Step-by-step instructions (expand for details)

  1. From the AWS Console, click on Services and then select IAM in the Security, Identity & Compliance section.

  2. Select Roles from the left navigation and then click Create new role.

  3. Select AWS Lambda for the role type from the AWS Service Role list.

    Note: Selecting a role type automatically creates a trust policy for your role that allows AWS services to assume this role on your behalf. If you were creating this role using the CLI, AWS CloudFormation or another mechanism, you would specify a trust policy directly.

  4. Click Next: Permissions.

  5. Begin typing AWSLambdaKinesisExecutionRole in the Filter text box and check the box next to that role.

  6. Click Next: Review.

  7. Enter WildRydesStreamProcessorRole for the Role Name.

  8. Click Create role.

  9. Type WildRydesStreamProcessorRole into the filter box on the Roles page and click the role you just created.

  10. On the Permissions tab, click the Add inline policy link to create a new inline policy.

    Inline policies screenshot

  11. Ensure Policy Generator is selected and click Select.

  12. Select Amazon DynamoDB from the AWS Service dropdown.

  13. Select BatchWriteItem from the Actions list.

  14. Type the ARN of the DynamoDB table you created in the previous section in the Amazon Resource Name (ARN) field. The ARN is in the format of:

    arn:aws:dynamodb:REGION:ACCOUNT_ID:table/UnicornSensorData
    

    For example, if you've deployed to US East (N. Virginia) and your account ID is 123456789012, your table ARN would be:

    arn:aws:dynamodb:us-east-1:123456789012:table/UnicornSensorData
    

    To find your AWS account ID number in the AWS Management Console, click on Support in the navigation bar in the upper-right, and then click Support Center. Your currently signed in account ID appears in the upper-right corner below the Support menu.

    Policy generator screenshot

  15. Click Add Statement.

    Policy screenshot

  16. Click Next Step then Apply Policy.
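If you'd rather script this step, a rough AWS CLI equivalent is sketched below. It assumes you've saved the Lambda trust policy to trust-policy.json and the DynamoDB inline policy shown earlier to inline-policy.json (both file names are placeholders):

    aws iam create-role --role-name WildRydesStreamProcessorRole \
      --assume-role-policy-document file://trust-policy.json
    aws iam attach-role-policy --role-name WildRydesStreamProcessorRole \
      --policy-arn arn:aws:iam::aws:policy/service-role/AWSLambdaKinesisExecutionRole
    aws iam put-role-policy --role-name WildRydesStreamProcessorRole \
      --policy-name DynamoDBBatchWriteItem --policy-document file://inline-policy.json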

2. Create a Lambda function for processing

Use the console to create a new Lambda function called WildRydesStreamProcessor that will be triggered whenever a new record is available in the wildrydes-aggregated stream created in the Streaming Aggregation module.

Use the provided index.js example implementation for your function code by copying and pasting the contents of that file into the Lambda console's editor. Ensure you create an environment variable with the key TABLE_NAME and the value UnicornSensorData.
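To understand what the function does before pasting it in, here is a minimal sketch along the same lines as the provided implementation (this is not the actual index.js; it assumes each Kinesis record carries a JSON object whose attributes match the table's schema, and that a batch never exceeds BatchWriteItem's 25-item limit):

    'use strict';

    const AWS = require('aws-sdk');
    const dynamoDB = new AWS.DynamoDB.DocumentClient();
    const tableName = process.env.TABLE_NAME;

    exports.handler = (event, context, callback) => {
      // Each Kinesis record's payload arrives base64-encoded
      const requests = event.Records.map((record) => {
        const item = JSON.parse(new Buffer(record.kinesis.data, 'base64').toString('utf8'));
        return { PutRequest: { Item: item } };
      });

      // Write the whole batch to the table named by the TABLE_NAME environment variable
      // (a production function would also retry any UnprocessedItems in the response)
      dynamoDB.batchWrite({ RequestItems: { [tableName]: requests } }, (err) => {
        if (err) {
          callback(err);
        } else {
          callback(null, 'Successfully processed ' + event.Records.length + ' records.');
        }
      });
    };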

Make sure you configure your function to use the WildRydesStreamProcessorRole IAM role you created in the previous section.

Step-by-step instructions (expand for details)

  1. Click on Services then select Lambda in the Compute section.

  2. Click Create function.

  3. Click on Author from scratch.

  4. Enter WildRydesStreamProcessor in the Name field.

  5. Select WildRydesStreamProcessorRole from the Existing Role dropdown.

    Create Lambda function screenshot

  6. Click on Create function.

  7. Click on Triggers, then click + Add trigger.

  8. Click on the dotted outline and select Kinesis. Select wildrydes-aggregated from Kinesis stream, select Trim horizon from Starting position, and tick the Enable trigger checkbox.

    Create Lambda trigger screenshot

    Starting position refers to the position in the stream where AWS Lambda should start reading; Trim horizon configures this to the oldest data record in the shard. See ShardIteratorType in the Amazon Kinesis API Reference for more details. (A CLI equivalent for creating this trigger is sketched after this list.)

  9. Click Submit.

  10. Click Configuration.

  11. Select Node.js 6.10 for the Runtime.

  12. Leave the default of index.handler for the Handler field.

  13. Copy and paste the code from index.js into the code entry area.

    Create Lambda function screenshot

  14. Expand the Environment variables section under the code entry area.

  15. In Environment variables, enter an environment variable with key TABLE_NAME and value UnicornSensorData. Lambda environment variable screenshot

  16. Scroll to the top and click Save (not Save and test, since we haven't configured a test event).
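As noted in step 8, you can also create the same trigger from the AWS CLI with create-event-source-mapping — a sketch, with REGION and ACCOUNT_ID as placeholders for your own values:

    aws lambda create-event-source-mapping \
      --function-name WildRydesStreamProcessor \
      --event-source-arn arn:aws:kinesis:REGION:ACCOUNT_ID:stream/wildrydes-aggregated \
      --starting-position TRIM_HORIZON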

Implementation Validation

  1. Run the producer to start emitting sensor data to the stream with a unique unicorn name. Replace YOUR_REGION_HERE with your region. For example, if you've created the stream in US West (Oregon), you'd replace the placeholder with us-west-2.

    ./producer -region YOUR_REGION_HERE -name Rocinante
  2. Click on Services then select DynamoDB in the Database section.

  3. Click on UnicornSensorData.

  4. Click on the Items tab, click Add Filter, enter Name in Enter attribute, and enter Rocinante in Enter value. Verify that the table is being populated with the data from the aggregated stream.

    DynamoDB items screenshot
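You can also check the table from the command line with a filtered scan (fine for a small test table, though a scan reads every item; #n aliases the Name attribute because Name is a DynamoDB reserved word):

    aws dynamodb scan --table-name UnicornSensorData \
      --filter-expression "#n = :name" \
      --expression-attribute-names '{"#n": "Name"}' \
      --expression-attribute-values '{":name": {"S": "Rocinante"}}'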

When you see items from the stream in the table, you can move on to the next module: Data Archiving.

Extra Credit

  • Create a new Lambda function to read from the stream and send proactive alerts to operations personnel if a unicorn's magic points vital sign falls below 50 points.
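As a starting point, such an alert function might look like the sketch below, which publishes to a hypothetical Amazon SNS topic. The UNICORN_ALERT_TOPIC_ARN environment variable and the MinMagicPoints attribute are assumptions; adjust them to your topic and to your aggregated record schema:

    'use strict';

    const AWS = require('aws-sdk');
    const sns = new AWS.SNS();

    exports.handler = (event, context, callback) => {
      // Publish an alert for any unicorn whose magic points fell below 50 this minute
      const alerts = event.Records
        .map((record) => JSON.parse(new Buffer(record.kinesis.data, 'base64').toString('utf8')))
        .filter((status) => status.MinMagicPoints < 50)
        .map((status) => sns.publish({
          TopicArn: process.env.UNICORN_ALERT_TOPIC_ARN, // hypothetical topic ARN
          Message: status.Name + ' is down to ' + status.MinMagicPoints + ' magic points'
        }).promise());

      Promise.all(alerts)
        .then(() => callback(null, 'Processed ' + event.Records.length + ' records.'))
        .catch(callback);
    };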