From 36d884244b56e73921a731faaf3a2e3f63612fe4 Mon Sep 17 00:00:00 2001
From: cjxkb210
Date: Fri, 7 Aug 2020 14:38:41 +0100
Subject: [PATCH 1/3] process S3 event messages from SQS

Added the ability to consume S3 event messages from an SQS queue. This
makes it possible to have multiple consumers of the S3 event
notifications for the same prefix: for example, you can subscribe an
SNS topic to the S3 event for a given prefix and then subscribe
multiple SQS queues to that topic, giving several varying consumers of
the same S3 prefix event.
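
For illustration, this is roughly the shape the handler now receives
from the SQS event source (bucket, key and field values below are
placeholders, and the S3 record is abbreviated):

    // one record as delivered by the Lambda SQS integration; the S3
    // event arrives as a JSON string in the message body (assuming raw
    // message delivery is enabled on the SNS subscription)
    var sqsRecord = {
        eventSource: "aws:sqs",
        body: JSON.stringify({
            Records: [ {
                eventSource: "aws:s3",
                eventName: "ObjectCreated:Put",
                s3: {
                    s3SchemaVersion: "1.0",
                    bucket: { name: "example-bucket" },
                    object: { key: "input/2020/08/07/file.csv", size: 1024 }
                }
            } ]
        })
    };

    // the handler recovers the original S3 event from the body
    var s3Event = JSON.parse(sqsRecord.body);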
---
 index.js | 196 ++++++++++++++++++++++++++++++++-----------------------
 1 file changed, 115 insertions(+), 81 deletions(-)

diff --git a/index.js b/index.js
index 30e7528..a22a2cc 100644
--- a/index.js
+++ b/index.js
@@ -480,7 +480,7 @@ function handler(event, context) {
     /*
      * process what happened if the iterative request to
      * write to the open pending batch timed out
-     * 
+     *
      * TODO Can we force a rotation of the current batch
      * at this point?
      */
@@ -1490,102 +1490,136 @@ function handler(event, context) {
         });
     }
 
-    /* end of runtime functions */
-
-    try {
-        logger.debug(JSON.stringify(event));
+    exports.processS3EventRecord = function (event) {
+        logger.info("processS3EventRecord " + JSON.stringify(event));
 
-        if (!event.Records) {
-            // filter out unsupported events
-            logger.error("Event type unsupported by Lambda Redshift Loader");
-            logger.info(JSON.stringify(event));
-            context.done(null, null);
+        if (event.Records.length > 1) {
+            context.done(error, "Unable to process multi-record events");
         } else {
-            if (event.Records.length > 1) {
-                context.done(error, "Unable to process multi-record events");
+            var r = event.Records[0];
+
+            // ensure that we can process this event based on a variety
+            // of criteria
+            var noProcessReason;
+            if (r.eventSource !== "aws:s3") {
+                noProcessReason = "Invalid Event Source " + r.eventSource;
+            }
+            if (!(r.eventName === "ObjectCreated:Copy" || r.eventName === "ObjectCreated:Put" || r.eventName === 'ObjectCreated:CompleteMultipartUpload')) {
+                noProcessReason = "Invalid Event Name " + r.eventName;
+            }
+            if (r.s3.s3SchemaVersion !== "1.0") {
+                noProcessReason = "Unknown S3 Schema Version " + r.s3.s3SchemaVersion;
+            }
+
+            if (noProcessReason) {
+                logger.error(noProcessReason);
+                context.done(error, noProcessReason);
             } else {
-                var r = event.Records[0];
+                // extract the s3 details from the event
+                var inputInfo = {
+                    bucket: undefined,
+                    key: undefined,
+                    prefix: undefined,
+                    inputFilename: undefined
+                };
 
-                // ensure that we can process this event based on a variety
-                // of criteria
-                var noProcessReason;
-                if (r.eventSource !== "aws:s3") {
-                    noProcessReason = "Invalid Event Source " + r.eventSource;
-                }
-                if (!(r.eventName === "ObjectCreated:Copy" || r.eventName === "ObjectCreated:Put" || r.eventName === 'ObjectCreated:CompleteMultipartUpload')) {
-                    noProcessReason = "Invalid Event Name " + r.eventName;
-                }
-                if (r.s3.s3SchemaVersion !== "1.0") {
-                    noProcessReason = "Unknown S3 Schema Version " + r.s3.s3SchemaVersion;
-                }
+                inputInfo.bucket = r.s3.bucket.name;
+                inputInfo.key = decodeURIComponent(r.s3.object.key);
 
-                if (noProcessReason) {
-                    logger.error(noProcessReason);
-                    context.done(error, noProcessReason);
-                } else {
-                    // extract the s3 details from the event
-                    var inputInfo = {
-                        bucket: undefined,
-                        key: undefined,
-                        prefix: undefined,
-                        inputFilename: undefined
-                    };
+                // remove the bucket name from the key, if we have
+                // received it - this happens on object copy
+                inputInfo.key = inputInfo.key.replace(inputInfo.bucket + "/", "");
 
-                    inputInfo.bucket = r.s3.bucket.name;
-                    inputInfo.key = decodeURIComponent(r.s3.object.key);
+                var keyComponents = inputInfo.key.split('/');
+                inputInfo.inputFilename = keyComponents[keyComponents.length - 1];
 
-                    // remove the bucket name from the key, if we have
-                    // received it - this happens on object copy
-                    inputInfo.key = inputInfo.key.replace(inputInfo.bucket + "/", "");
+                // remove the filename from the prefix value
+                var searchKey = inputInfo.key.replace(inputInfo.inputFilename, '').replace(/\/$/, '');
 
-                    var keyComponents = inputInfo.key.split('/');
-                    inputInfo.inputFilename = keyComponents[keyComponents.length - 1];
+                // transform hive style dynamic prefixes into static
+                // match prefixes and set the prefix in inputInfo
+                inputInfo.prefix = inputInfo.bucket + '/' + searchKey.transformHiveStylePrefix();
 
-                    // remove the filename from the prefix value
-                    var searchKey = inputInfo.key.replace(inputInfo.inputFilename, '').replace(/\/$/, '');
+                // add the object size to inputInfo
+                inputInfo.size = r.s3.object.size;
 
-                    // transform hive style dynamic prefixes into static
-                    // match prefixes and set the prefix in inputInfo
-                    inputInfo.prefix = inputInfo.bucket + '/' + searchKey.transformHiveStylePrefix();
+                resolveConfig(inputInfo.prefix, function (err, configData) {
+                    /*
+                     * we did get a configuration found by the resolveConfig
+                     * method
+                     */
+                    if (err) {
+                        logger.error(JSON.stringify(err));
+                        context.done(err, JSON.stringify(err));
+                    } else {
+                        // update the inputInfo prefix to match the
+                        // resolved
+                        // config entry
+                        inputInfo.prefix = configData.Item.s3Prefix.S;
 
-                    // add the object size to inputInfo
-                    inputInfo.size = r.s3.object.size;
+                        logger.debug(JSON.stringify(inputInfo));
 
-                    resolveConfig(inputInfo.prefix, function (err, configData) {
-                        /*
-                         * we did get a configuration found by the resolveConfig
-                         * method
-                         */
-                        if (err) {
-                            logger.error(JSON.stringify(err));
-                            context.done(err, JSON.stringify(err));
-                        } else {
-                            // update the inputInfo prefix to match the
-                            // resolved
-                            // config entry
-                            inputInfo.prefix = configData.Item.s3Prefix.S;
+                        // call the foundConfig method with the data
+                        // item
+                        foundConfig(inputInfo, null, configData);
+                    }
+                }, function (err) {
+                    // finish with no exception - where this file sits
+                    // in the S3 structure is not configured for redshift
+                    // loads, or there was an access issue that prevented us
+                    // querying DDB
+                    logger.error("No Configuration Found for " + inputInfo.prefix);
+                    if (err) {
+                        logger.error(err);
+                    }
 
-                            logger.debug(JSON.stringify(inputInfo));
+                    context.done(err, JSON.stringify(err));
+                });
+            }
+        }
+    }
 
-                            // call the foundConfig method with the data
-                            // item
-                            foundConfig(inputInfo, null, configData);
-                        }
-                    }, function (err) {
-                        // finish with no exception - where this file sits
-                        // in the S3 structure is not configured for redshift
-                        // loads, or there was an access issue that prevented us
-                        // querying DDB
-                        logger.error("No Configuration Found for " + inputInfo.prefix);
-                        if (err) {
-                            logger.error(err);
-                        }
+    /* end of runtime functions */
 
-                        context.done(err, JSON.stringify(err));
-                    });
+    try {
+        logger.debug(JSON.stringify(event));
+
+
+        if(!event.Records || event.Records.length == 0) {
+            // filter out unsupported events
+            logger.error("Event type unsupported by Lambda Redshift Loader");
+            logger.info(JSON.stringify(event));
+            context.done(null, null);
+        }
+
+        //obtain the first record in order to establish the eventSource
+        var record = event.Records[0];
+
+        var noProcessReason;
+
+        if (record.eventSource == "aws:s3") { //process the s3 event
+            logger.info("Processing message from S3 event source");
+
+            exports.processS3EventRecord(event);
+        } else if(record.eventSource == "aws:sqs") { //process the sqs message
+            logger.info("Processing " + event.Records.length + " message(s) from SQS event source.");
+
+            //process the s3 event contained in the body of each sqs message.
+            event.Records.forEach(function(record) {
+                if(!record.body) {
+                    noProcessReason = "Unable to process message body for event received via sqs event source, body was not present.";
+                    logger.error(noProcessReason);
+                    context.done(error, noProcessReason);
                 }
-            }
+                var messageBody = JSON.parse(record.body);
+
+                exports.processS3EventRecord(messageBody);
+            });
+        } else {
+            noProcessReason = "Invalid Event Source " + record.eventSource;
+            logger.error(noProcessReason);
+            context.done(error, noProcessReason);
        }
     } catch (e) {
         logger.error("Unhandled Exception");
@@ -1595,4 +1629,4 @@ function handler(event, context) {
     }
 }
 
-exports.handler = handler;
\ No newline at end of file
+exports.handler = handler;

From 18f3c5e56f275eee0af0b88a0b667d8d4428cf95 Mon Sep 17 00:00:00 2001
From: cjxkb210
Date: Fri, 11 Dec 2020 16:53:24 +0000
Subject: [PATCH 2/3] consume bad messages

[REQ-2126] - consume the message for bad events: log the problem and
move on. Returning without an error completes the invocation
successfully, so the SQS event source deletes the message instead of
redelivering it.
---
 index.js | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/index.js b/index.js
index a22a2cc..59fe69c 100644
--- a/index.js
+++ b/index.js
@@ -1589,7 +1589,7 @@ function handler(event, context) {
             // filter out unsupported events
             logger.error("Event type unsupported by Lambda Redshift Loader");
             logger.info(JSON.stringify(event));
-            context.done(null, null);
+            return;
         }
 
         //obtain the first record in order to establish the eventSource

From d0bd1afa2022321b72d9bc4e9ce27526c22dd496 Mon Sep 17 00:00:00 2001
From: cjxkb210
Date: Tue, 12 Jan 2021 11:30:03 +0000
Subject: [PATCH 3/3] safely consume and ignore AWS test events

[REQ-2126] Safely consume and ignore AWS test events, which do not
honour raw message delivery settings even though they are only test
notifications.
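
For reference, a sketch of the S3 test notification that the new guard
ignores (bucket name and timestamp are placeholders, and the elided
identifiers vary per event); note that it carries no Records array,
which is exactly what the added check detects:

    {
        "Service": "Amazon S3",
        "Event": "s3:TestEvent",
        "Time": "2021-01-12T11:30:03.000Z",
        "Bucket": "example-bucket",
        "RequestId": "...",
        "HostId": "..."
    }

When raw message delivery is disabled, SNS additionally wraps this
payload inside the Message field of its own notification envelope.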
---
 index.js | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/index.js b/index.js
index 59fe69c..2062b30 100644
--- a/index.js
+++ b/index.js
@@ -1493,7 +1493,9 @@ function handler(event, context) {
     exports.processS3EventRecord = function (event) {
         logger.info("processS3EventRecord " + JSON.stringify(event));
 
-        if (event.Records.length > 1) {
+        if (!event.Records) {
+            logger.error("The provided S3 event was not well-formed or was generated as a test event; it will be ignored.");
+        } else if (event.Records.length > 1) {
             context.done(error, "Unable to process multi-record events");
         } else {
             var r = event.Records[0];
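
As a quick sanity check of the final behaviour, the dispatch and the
test-event guard can be exercised locally with a stub context (the
require path and the stub are illustrative, not part of the patches):

    var lambda = require('./index');

    // minimal stand-in for the Lambda context object
    var context = {
        done: function (err, msg) {
            console.log('done:', err, msg);
        }
    };

    // an SQS message whose body is the S3 test notification: the body
    // parses, but processS3EventRecord finds no Records array, logs the
    // problem and returns, so the message is consumed rather than retried
    lambda.handler({
        Records: [ {
            eventSource: "aws:sqs",
            body: JSON.stringify({ Service: "Amazon S3", Event: "s3:TestEvent" })
        } ]
    }, context);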