@@ -2,8 +2,15 @@ import { DynamoDBStreamHandler } from "aws-lambda";
22import { unmarshall } from "@aws-sdk/util-dynamodb" ;
33import { AttributeValue , DynamoDBClient } from "@aws-sdk/client-dynamodb" ;
44import { DynamoDBDocumentClient , UpdateCommand } from "@aws-sdk/lib-dynamodb" ;
5+ import {
6+ SchedulerClient ,
7+ CreateScheduleCommand ,
8+ } from "@aws-sdk/client-scheduler" ;
59
610const ENV_VARIABLE_REVANT_COST_TABLE_NAME = "REVANT_COST_TABLE_NAME" ;
11+ const ENV_VARIABLE_REVANT_COST_LIMIT_PREFIX = "REVANT_COST_LIMIT" ;
12+ const ENV_VARIABLE_REVANT_SCHEDULE_ROLE_ARN = "REVANT_SCHEDULE_ROLE_ARN" ;
13+ const ENV_VARIABLE_REVANT_SCHEDULE_FUNCTION_ARN = "REVANT_SCHEDULE_FUNCTION_ARN" ;
714
815const DYNAMODB_ACCRUED_EXPENSES_ATTRIBUTE_NAME = "accruedExpenses" ;
916const DYNAMODB_INCURRED_EXPENSES_RATE_ATTRIBUTE_NAME = "incurredExpensesRate" ;
@@ -23,6 +30,7 @@ type BudgetUpdateOperation = {
2330
2431const dynamoDBClient = new DynamoDBClient ( { } ) ;
2532const dynamoDBDocumentClient = DynamoDBDocumentClient . from ( dynamoDBClient ) ;
33+ const schedulerClient = new SchedulerClient ( { } ) ;
2634
2735const isBudgetUpdateOperation = ( {
2836 oldBudget,
@@ -41,6 +49,25 @@ const calculateNewAccruedExpenses = ({
4149 1000
4250 ) * oldBudget [ DYNAMODB_INCURRED_EXPENSES_RATE_ATTRIBUTE_NAME ] ;
4351
52+ const calculateBudgetReachedEstimatedDate = ( {
53+ accruedExpenses,
54+ incurredExpensesRate,
55+ updatedAt,
56+ budget,
57+ } : {
58+ accruedExpenses : number ;
59+ incurredExpensesRate : number ;
60+ updatedAt : Date ;
61+ budget : number ;
62+ } ) : Date => {
63+ const budgetReachedDate = new Date ( updatedAt ) ;
64+ budgetReachedDate . setSeconds (
65+ budgetReachedDate . getSeconds ( ) +
66+ ( budget - accruedExpenses ) / incurredExpensesRate
67+ ) ;
68+ return budgetReachedDate ;
69+ } ;
70+
4471export const handler : DynamoDBStreamHandler = async ( { Records } ) => {
4572 console . log ( `${ Records . length } records received` ) ;
4673 const budgetUpdatesOperations = Records . map ( ( record ) => ( {
@@ -56,12 +83,21 @@ export const handler: DynamoDBStreamHandler = async ({ Records }) => {
5683 `${ budgetUpdatesOperations . length } budget update operations received`
5784 ) ;
5885
86+ const budgets = Object . fromEntries (
87+ Object . entries ( process . env )
88+ . filter ( ( [ key ] ) => key . startsWith ( ENV_VARIABLE_REVANT_COST_LIMIT_PREFIX ) )
89+ . map ( ( [ key , value ] ) => [
90+ key . slice ( ENV_VARIABLE_REVANT_COST_LIMIT_PREFIX . length + 1 ) ,
91+ Number ( value ) ,
92+ ] )
93+ ) ;
94+
5995 const failedUpdateIds : { itemIdentifier : string } [ ] = [ ] ;
6096 await Promise . all (
6197 budgetUpdatesOperations . map (
6298 async ( { itemIdentifier, oldBudget, newBudget } ) => {
6399 try {
64- await dynamoDBDocumentClient . send (
100+ const { Attributes } = await dynamoDBDocumentClient . send (
65101 new UpdateCommand ( {
66102 TableName : process . env [ ENV_VARIABLE_REVANT_COST_TABLE_NAME ] ,
67103 Key : { PK : oldBudget . PK } ,
@@ -75,8 +111,39 @@ export const handler: DynamoDBStreamHandler = async ({ Records }) => {
75111 newBudget,
76112 } ) ,
77113 } ,
114+ ReturnValues : "ALL_NEW" ,
78115 } )
79116 ) ;
117+ if ( Attributes === undefined ) {
118+ console . error ( "Did not get any updated budget from DynamDB" ) ;
119+ return ;
120+ }
121+
122+ const address = oldBudget . PK . split ( "#" ) [ 1 ] ;
123+ const budget = budgets [ address ] ;
124+ const budgetReachedDate = calculateBudgetReachedEstimatedDate ( {
125+ accruedExpenses : Attributes [
126+ DYNAMODB_ACCRUED_EXPENSES_ATTRIBUTE_NAME
127+ ] as number ,
128+ incurredExpensesRate : Attributes [
129+ DYNAMODB_INCURRED_EXPENSES_RATE_ATTRIBUTE_NAME
130+ ] as number ,
131+ updatedAt : new Date (
132+ Attributes [ DYNAMODB_LAST_UPDATE_ATTRIBUTE_NAME ]
133+ ) ,
134+ budget,
135+ } ) ;
136+ await schedulerClient . send ( new CreateScheduleCommand ( {
137+ Name : address ,
138+ ScheduleExpression : `at(${ budgetReachedDate . toISOString ( ) . split ( '.' ) [ 0 ] } )` ,
139+ Target : {
140+ RoleArn : process . env [ ENV_VARIABLE_REVANT_SCHEDULE_ROLE_ARN ] ,
141+ Arn : process . env [ ENV_VARIABLE_REVANT_SCHEDULE_FUNCTION_ARN ] ,
142+ } ,
143+ FlexibleTimeWindow : {
144+ Mode : "OFF"
145+ }
146+ } ) ) ;
80147 } catch ( error ) {
81148 failedUpdateIds . push ( { itemIdentifier } ) ;
82149 }
0 commit comments