@@ -2,8 +2,14 @@ import { DynamoDBStreamHandler } from "aws-lambda";
22import { unmarshall } from "@aws-sdk/util-dynamodb" ;
33import { AttributeValue , DynamoDBClient } from "@aws-sdk/client-dynamodb" ;
44import { DynamoDBDocumentClient , UpdateCommand } from "@aws-sdk/lib-dynamodb" ;
5+ import {
6+ SchedulerClient ,
7+ CreateScheduleCommand ,
8+ } from "@aws-sdk/client-scheduler" ;
59
610const ENV_VARIABLE_REVANT_COST_TABLE_NAME = "REVANT_COST_TABLE_NAME" ;
11+ const ENV_VARIABLE_REVANT_COST_LIMIT_PREFIX = "REVANT_COST_LIMIT" ;
12+ const ENV_VARIABLE_REVANT_SCHEDULE_ROLE_ARN = "REVANT_SCHEDULE_ROLE_ARN" ;
713
814const DYNAMODB_ACCRUED_EXPENSES_ATTRIBUTE_NAME = "accruedExpenses" ;
915const DYNAMODB_INCURRED_EXPENSES_RATE_ATTRIBUTE_NAME = "incurredExpensesRate" ;
@@ -23,6 +29,7 @@ type BudgetUpdateOperation = {
2329
2430const dynamoDBClient = new DynamoDBClient ( { } ) ;
2531const dynamoDBDocumentClient = DynamoDBDocumentClient . from ( dynamoDBClient ) ;
32+ const schedulerClient = new SchedulerClient ( { } ) ;
2633
2734const isBudgetUpdateOperation = ( {
2835 oldBudget,
@@ -41,6 +48,25 @@ const calculateNewAccruedExpenses = ({
4148 1000
4249 ) * oldBudget [ DYNAMODB_INCURRED_EXPENSES_RATE_ATTRIBUTE_NAME ] ;
4350
51+ const calculateBudgetReachedEstimatedDate = ( {
52+ accruedExpenses,
53+ incurredExpensesRate,
54+ updatedAt,
55+ budget,
56+ } : {
57+ accruedExpenses : number ;
58+ incurredExpensesRate : number ;
59+ updatedAt : Date ;
60+ budget : number ;
61+ } ) : Date => {
62+ const budgetReachedDate = new Date ( updatedAt ) ;
63+ budgetReachedDate . setSeconds (
64+ budgetReachedDate . getSeconds ( ) +
65+ ( budget - accruedExpenses ) / incurredExpensesRate
66+ ) ;
67+ return budgetReachedDate ;
68+ } ;
69+
4470export const handler : DynamoDBStreamHandler = async ( { Records } ) => {
4571 console . log ( `${ Records . length } records received` ) ;
4672 const budgetUpdatesOperations = Records . map ( ( record ) => ( {
@@ -56,12 +82,21 @@ export const handler: DynamoDBStreamHandler = async ({ Records }) => {
5682 `${ budgetUpdatesOperations . length } budget update operations received`
5783 ) ;
5884
85+ const budgets = Object . fromEntries (
86+ Object . entries ( process . env )
87+ . filter ( ( [ key ] ) => key . startsWith ( ENV_VARIABLE_REVANT_COST_LIMIT_PREFIX ) )
88+ . map ( ( [ key , value ] ) => [
89+ key . slice ( ENV_VARIABLE_REVANT_COST_LIMIT_PREFIX . length + 1 ) ,
90+ Number ( value ) ,
91+ ] )
92+ ) ;
93+
5994 const failedUpdateIds : { itemIdentifier : string } [ ] = [ ] ;
6095 await Promise . all (
6196 budgetUpdatesOperations . map (
6297 async ( { itemIdentifier, oldBudget, newBudget } ) => {
6398 try {
64- await dynamoDBDocumentClient . send (
99+ const { Attributes } = await dynamoDBDocumentClient . send (
65100 new UpdateCommand ( {
66101 TableName : process . env [ ENV_VARIABLE_REVANT_COST_TABLE_NAME ] ,
67102 Key : { PK : oldBudget . PK } ,
@@ -75,8 +110,38 @@ export const handler: DynamoDBStreamHandler = async ({ Records }) => {
75110 newBudget,
76111 } ) ,
77112 } ,
113+ ReturnValues : "ALL_NEW" ,
78114 } )
79115 ) ;
116+ if ( Attributes === undefined ) {
117+ console . error ( "Did not get any updated budget from DynamDB" ) ;
118+ return ;
119+ }
120+
121+ const address = oldBudget . PK . split ( "#" ) [ 1 ] ;
122+ const budget = Number ( budgets [ address ) ;
123+ const budgetReachedDate = calculateBudgetReachedEstimatedDate ( {
124+ accruedExpenses : Attributes [
125+ DYNAMODB_ACCRUED_EXPENSES_ATTRIBUTE_NAME
126+ ] as number ,
127+ incurredExpensesRate : Attributes [
128+ DYNAMODB_INCURRED_EXPENSES_RATE_ATTRIBUTE_NAME
129+ ] as number ,
130+ updatedAt : new Date (
131+ Attributes [ DYNAMODB_LAST_UPDATE_ATTRIBUTE_NAME ]
132+ ) ,
133+ budget,
134+ } ) ;
135+ await schedulerClient . send ( new CreateScheduleCommand ( {
136+ Name : address ,
137+ ScheduleExpression : `at(${ budgetReachedDate . toISOString ( ) . split ( '.' ) [ 0 ] } )` ,
138+ Target : {
139+ RoleArn : process . env [ ENV_VARIABLE_REVANT_SCHEDULE_ROLE_ARN ]
140+ } ,
141+ FlexibleTimeWindow : {
142+ Mode : "OFF"
143+ }
144+ } ) ) ;
80145 } catch ( error ) {
81146 failedUpdateIds . push ( { itemIdentifier } ) ;
82147 }
0 commit comments