Skip to content

Commit

Permalink
[aws-lambda] Merge pull request #65 from saagie/aws-lambda
Browse files Browse the repository at this point in the history
AWS Lambda
  • Loading branch information
youenchene authored Apr 16, 2020
2 parents 796a12c + 405239d commit 088135a
Show file tree
Hide file tree
Showing 10 changed files with 413 additions and 54 deletions.
36 changes: 18 additions & 18 deletions technologies/job/aws-glue/metadata.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@ type: JOB
icon: job

contexts:
- id: workflow
label: workflow
description: "A workflow is an orchestration used to visualize and manage the relationship and execution of multiple triggers, jobs and crawlers."
- id: crawler
label: crawler
description: "A crawler connects to a data store, progresses through a prioritized list of classifiers to determine the schema for your data, and then creates metadata tables in your data catalog."
recommended: false
trustLevel: experimental
endpoint:
Expand All @@ -34,25 +34,31 @@ contexts:
label: Endpoint
required: true
- type: SELECT
name: workflow
label: Workflow
name: crawler
label: Crawler
required: true
options:
script: ./jobForm.js
function: getWorkflows
function: getCrawlers
dependsOn:
- endpoint
instance:
actions:
onStart:
script: ./instanceActions.js
function: start
onStop:
script: ./instanceActions.js
function: stop
getStatus:
script: ./instanceActions.js
function: getStatus
- id: crawler
label: crawler
description: "A crawler connects to a data store, progresses through a prioritized list of classifiers to determine the schema for your data, and then creates metadata tables in your data catalog."
getLogs:
script: ./instanceActions.js
function: getLogs
- id: workflow
label: workflow
description: "A workflow is an orchestration used to visualize and manage the relationship and execution of multiple triggers, jobs and crawlers."
recommended: false
trustLevel: experimental
endpoint:
Expand All @@ -77,28 +83,22 @@ contexts:
label: Endpoint
required: true
- type: SELECT
name: crawler
label: Crawler
name: workflow
label: Workflow
required: true
options:
script: ./jobForm.js
function: getCrawlers
function: getWorkflows
dependsOn:
- endpoint
instance:
actions:
onStart:
script: ./instanceActions.js
function: start
onStop:
script: ./instanceActions.js
function: stop
getStatus:
script: ./instanceActions.js
function: getStatus
getLogs:
script: ./instanceActions.js
function: getLogs
- id: job
label: job
description: "A job is your business logic required to perform extract, transform and load (ETL) work. Job runs are initiated by triggers which can be scheduled or driven by events."
Expand Down
50 changes: 50 additions & 0 deletions technologies/job/aws-lambda/functions/context.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
id: functions
label: functions
description: "Only lambda with Kinesis, SQS and DynamoDB Stream."
recommended: true
trustLevel: experimental
endpoint:
features:
- type: TEXT
name: aws_access_key_id
label: Access Key ID
required: true
- type: PASSWORD
name: aws_secret_access_key
label: Secret Access Key
required: true
- type: TEXT
name: region
label: Region
helper: "AWS region. Example: us-east-1"
required: true
job:
features:
- type: ENDPOINT
name: endpoint
label: Endpoint
required: true
- type: SELECT
name: functions
label: Functions
helper: Only lambda with Kinesis, SQS and DynamoDB Stream.
required: true
options:
script: ./jobForm.js
function: getFunctions
dependsOn:
- endpoint
instance:
actions:
onStart:
script: ./instanceActions.js
function: start
onStop:
script: ./instanceActions.js
function: stop
getStatus:
script: ./instanceActions.js
function: getStatus
getLogs:
script: ./instanceActions.js
function: getLogs
176 changes: 176 additions & 0 deletions technologies/job/aws-lambda/functions/instanceActions.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,176 @@
const { Response, JobStatus, Log, Stream } = require('@saagie/sdk');
const AWS = require('aws-sdk');


const AWS_LAMBDA_OPTIONS = { apiVersion: '2015-03-31' };

/**
* Logic to start the external job instance.
* @param {Object} params
* @param {Object} params.job - Contains job data including featuresValues.
* @param {Object} params.instance - Contains instance data.
*/
exports.start = async ({ job, instance }) => {
try {
console.log('START INSTANCE:', instance);
AWS.config.update({credentials: { accessKeyId : job.featuresValues.endpoint.aws_access_key_id, secretAccessKey: job.featuresValues.endpoint.aws_secret_access_key}});
AWS.config.update({region: job.featuresValues.endpoint.region});

const lambda = new AWS.Lambda(AWS_LAMBDA_OPTIONS);

// Start all trigger/eventsource
const dataList = job.featuresValues.functions.sourceId.map(
(value) =>
lambda.updateEventSourceMapping({
Enabled: true,
FunctionName: job.featuresValues.functions.id,
UUID: value
}).promise()
);
await Promise.all(dataList);

// You can return any payload you want to get in the stop and getStatus functions.
return Response.success({});
} catch (error) {
console.log(error);
return Response.error('Fail to start job', { error });
}
};

/**
* Logic to stop the external job instance.
* @param {Object} params
* @param {Object} params.job - Contains job data including featuresValues.
* @param {Object} params.instance - Contains instance data including the payload returned in the start function.
*/
exports.stop = async ({ job, instance }) => {
try {
console.log('STOP INSTANCE:', instance);
AWS.config.update({credentials: { accessKeyId : job.featuresValues.endpoint.aws_access_key_id, secretAccessKey: job.featuresValues.endpoint.aws_secret_access_key}});
AWS.config.update({region: job.featuresValues.endpoint.region});

const lambda = new AWS.Lambda(AWS_LAMBDA_OPTIONS);

// Stop all trigger/eventsource
const dataList = job.featuresValues.functions.sourceId.map(
(value) =>
lambda.updateEventSourceMapping({
Enabled: false,
FunctionName: job.featuresValues.functions.id,
UUID: value
}).promise()
);
await Promise.all(dataList);

return Response.success();
} catch (error) {
console.log(error);
return Response.error('Fail to stop job', { error });
}
};

/**
* Logic to retrieve the external job instance status.
* @param {Object} params
* @param {Object} params.job - Contains job data including featuresValues.
* @param {Object} params.instance - Contains instance data including the payload returned in the start function.
*/
exports.getStatus = async ({ job, instance }) => {
try {
console.log('GET STATUS INSTANCE:', instance);

AWS.config.update({credentials: { accessKeyId : job.featuresValues.endpoint.aws_access_key_id, secretAccessKey: job.featuresValues.endpoint.aws_secret_access_key}});
AWS.config.update({region: job.featuresValues.endpoint.region});

const lambda = new AWS.Lambda(AWS_LAMBDA_OPTIONS);

let statusList = job.featuresValues.functions.sourceId.map(
(value) =>
lambda.getEventSourceMapping({UUID: value }).promise().then((data) => {
return ({
uuid: value,
status: data.State
})
}
));
statusList=await Promise.all(statusList);

// Computing worst case status if a lambda got different triggers/eventsource
const xrefStatus = {
"Creating": 30,
"Enabling": 10,
"Enabled": 0,
"Disabling": 40,
"Disabled": 50,
"Updating": 20,
"Deleting": 60
}

const status = statusList.reduce((consolidated, item) => {
if (xrefStatus[item.status] > xrefStatus[consolidated])
return item.status;
else
return consolidated;
}, "Enabled");

// Existing status : Creating, Enabling, Enabled, Disabling, Disabled, Updating, or Deleting
const JOB_STATES = {
Creating: JobStatus.REQUESTED,
Enabling: JobStatus.QUEUED,
Enabled: JobStatus.RUNNING,
Disabling: JobStatus.KILLING,
Disabled: JobStatus.KILLED,
Updating: JobStatus.AWAITING,
Deleting: JobStatus.AWAITING,
};

return Response.success(JOB_STATES[status] || JobStatus.AWAITING);
} catch (error) {
console.log(error);
return Response.error(`Failed to get status for functions ${job.featuresValues.functions.id}`, { error });
}
};

/**
* Logic to retrieve the external job instance logs.
* @param {Object} params
* @param {Object} params.job - Contains job data including featuresValues.
* @param {Object} params.instance - Contains instance data including the payload returned in the start function.
*/
exports.getLogs = async ({ job, instance }) => {
try {
console.log('GET LOG INSTANCE:', instance);
AWS.config.update({credentials: { accessKeyId : job.featuresValues.endpoint.aws_access_key_id, secretAccessKey: job.featuresValues.endpoint.aws_secret_access_key}});
AWS.config.update({region: job.featuresValues.endpoint.region});

const cwl = new AWS.CloudWatchLogs({apiVersion: '2014-03-28'});

const paramslogstreams = {
logGroupName: `/aws/lambda/${job.featuresValues.functions.label}`,
};

const logstreams = await cwl.describeLogStreams(paramslogstreams).promise();

var logs=logstreams.logStreams.map(
(ls) =>
cwl.getLogEvents(
{
logGroupName: `/aws/lambda/${job.featuresValues.functions.label}`,
logStreamName: ls.logStreamName
}
).promise().then((data) => {
return (
data.events
)
}
));

logs=(await Promise.all(logs)).flat();

return Response.success(logs.map((item) => Log(item.message, Stream.STDOUT, new Date(item.timestamp*1000).toISOString())));

} catch (error) {
console.log(error);
return Response.error(`Failed to get log for job ${job.featuresValues.functions.id}`, { error });
}
};
53 changes: 53 additions & 0 deletions technologies/job/aws-lambda/functions/jobForm.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
const { Response } = require('@saagie/sdk');
const AWS = require('aws-sdk');

/**
* Example of function to retrieve select options from an external endpoint.
* @param {Object} entity - Contains entity data including featuresValues.
* @param {Object} entity.featuresValues - Contains all the values from the entity features declared in the context.yaml
*/
exports.getFunctions = async ({ featuresValues }) => {
try {
AWS.config.update({credentials: { accessKeyId : featuresValues.endpoint.aws_access_key_id, secretAccessKey: featuresValues.endpoint.aws_secret_access_key}});
AWS.config.update({region: featuresValues.endpoint.region});

const lambda = new AWS.Lambda({apiVersion: '2015-03-31'});

// Get all lmabda functions
const data = await lambda.listFunctions().promise();

if (!data.Functions || !data.Functions.length) {
return Response.empty('No functions availables');
}

const functionsList = data.Functions.map(({ FunctionName, FunctionArn }) => ({
id: FunctionArn,
label: FunctionName,
}));

// And filtered out functions without an event source - Only lambda with Kinesis, SQS and DynamoDB Stream can be enable/disable.
// https://stackoverflow.com/questions/46199256/disable-and-enable-aws-lambda-trigger-programmatically

const filteredFunctions = functionsList.map(
({id, label}) =>
lambda.listEventSourceMappings({FunctionName : id }).promise().then((data) => {
if (data.EventSourceMappings && data.EventSourceMappings.length>0) {
return ({
id: id,
label: label,
sourceId: data.EventSourceMappings.map((item) => item.UUID),
})
} else {
return ({
id: id,
label: label
})
}
}));

const functions=(await Promise.all(filteredFunctions)).filter(item => (item.sourceId && item.sourceId.length>0));
return Response.success(functions);
} catch (error) {
return Response.error("Can't retrieve functions", { error });
}
};
Loading

0 comments on commit 088135a

Please sign in to comment.