Skip to content

Commit

Permalink
Merge pull request #67 from saagie/aws-batch
Browse files Browse the repository at this point in the history
AWS Batch
  • Loading branch information
youenchene authored Apr 16, 2020
2 parents 05054f7 + c68a3ba commit af15b04
Show file tree
Hide file tree
Showing 8 changed files with 331 additions and 3 deletions.
58 changes: 58 additions & 0 deletions technologies/job/aws-batch/job/context.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
id: job
label: Job
description: ""
recommended: true
trustLevel: experimental
endpoint:
features:
- type: TEXT
name: aws_access_key_id
label: Access Key ID
required: true
- type: PASSWORD
name: aws_secret_access_key
label: Secret Access Key
required: true
- type: TEXT
name: region
label: Region
helper: "AWS region. Example: us-east-1"
required: true
job:
features:
- type: ENDPOINT
name: endpoint
label: Endpoint
required: true
- type: SELECT
name: jobDefinition
label: Job Definition
required: true
options:
script: ./jobForm.js
function: getJobs
dependsOn:
- endpoint
- type: SELECT
name: jobQueue
label: Job Queue
required: true
options:
script: ./jobForm.js
function: getJobQueues
dependsOn:
- endpoint
instance:
actions:
onStart:
script: ./instanceActions.js
function: start
onStop:
script: ./instanceActions.js
function: stop
getStatus:
script: ./instanceActions.js
function: getStatus
getLogs:
script: ./instanceActions.js
function: getLogs
125 changes: 125 additions & 0 deletions technologies/job/aws-batch/job/instanceActions.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
const AWS = require('aws-sdk');
const { Response, JobStatus, Log, Stream } = require('@saagie/sdk');

/**
* Logic to start the external job instance.
* @param {Object} params
* @param {Object} params.job - Contains job data including featuresValues.
* @param {Object} params.instance - Contains instance data.
*/
exports.start = async ({ job, instance }) => {
try {
console.log('START INSTANCE:', instance);
AWS.config.update({credentials: { accessKeyId : job.featuresValues.endpoint.aws_access_key_id, secretAccessKey: job.featuresValues.endpoint.aws_secret_access_key}});
AWS.config.update({region: job.featuresValues.endpoint.region});

const batch = new AWS.Batch({apiVersion: '2016-08-10'});

const data = await batch.submitJob({
jobDefinition : job.featuresValues.jobDefinition.id ,
jobName : `${job.featuresValues.jobDefinition.label}-${instance.id}`,
jobQueue : job.featuresValues.jobQueue.id
}).promise();

return Response.success({ customId: data.jobId });
} catch (error) {
console.log(error);
return Response.error('Fail to start job', { error });
}
};

/**
* Logic to stop the external job instance.
* @param {Object} params
* @param {Object} params.job - Contains job data including featuresValues.
* @param {Object} params.instance - Contains instance data including the payload returned in the start function.
*/
exports.stop = async ({ job, instance }) => {
try {
console.log('STOP INSTANCE:', instance);
AWS.config.update({credentials: { accessKeyId : job.featuresValues.endpoint.aws_access_key_id, secretAccessKey: job.featuresValues.endpoint.aws_secret_access_key}});
AWS.config.update({region: job.featuresValues.endpoint.region});

const batch = new AWS.Batch({apiVersion: '2016-08-10'});

const data = await batch.terminateJob({
jobId : instance.payload.customId,
reason: "Terminating job from Saagie."
}).promise();

return Response.success();
} catch (error) {
console.log(error);
return Response.error('Fail to stop job', { error });
}
};

/**
* Logic to retrieve the external job instance status.
* @param {Object} params
* @param {Object} params.job - Contains job data including featuresValues.
* @param {Object} params.instance - Contains instance data including the payload returned in the start function.
*/
exports.getStatus = async ({ job, instance }) => {
try {
console.log('GET STATUS INSTANCE:', instance);
AWS.config.update({credentials: { accessKeyId : job.featuresValues.endpoint.aws_access_key_id, secretAccessKey: job.featuresValues.endpoint.aws_secret_access_key}});
AWS.config.update({region: job.featuresValues.endpoint.region});

const batch = new AWS.Batch({apiVersion: '2016-08-10'});

const data = await batch.describeJobs({jobs: [ instance.payload.customId ]}).promise();

const JOB_STATES = {
SUBMITTED: JobStatus.REQUESTED,
PENDING: JobStatus.QUEUED,
RUNNABLE: JobStatus.QUEUED,
STARTING: JobStatus.RUNNING,
RUNNING: JobStatus.RUNNING,
SUCCEEDED: JobStatus.SUCCEEDED,
FAILED: JobStatus.FAILED,
};
return Response.success(JOB_STATES[data.jobs[0].status]);
} catch (error) {
console.log(error);
return Response.error(`Failed to get status for job instance ${instance.customId}`, { error });
}
};

/**
* Logic to retrieve the external job instance logs.
* @param {Object} params
* @param {Object} params.job - Contains job data including featuresValues.
* @param {Object} params.instance - Contains instance data including the payload returned in the start function.
*/
exports.getLogs = async ({ job, instance }) => {
try {
console.log('GET LOG INSTANCE:', instance);
AWS.config.update({credentials: { accessKeyId : job.featuresValues.endpoint.aws_access_key_id, secretAccessKey: job.featuresValues.endpoint.aws_secret_access_key}});
AWS.config.update({region: job.featuresValues.endpoint.region});

// Get logstreamName
const batch = new AWS.Batch({apiVersion: '2016-08-10'});

const data = await batch.describeJobs({jobs: [ instance.payload.customId ]}).promise();

if (!data || !data.jobs || !data.jobs.length || !data.jobs[0].attempts || !data.jobs[0].attempts.length || !data.jobs[0].attempts[data.jobs[0].attempts.length-1].container || !data.jobs[0].attempts[data.jobs[0].attempts.length-1].container.logStreamName) {
return Response.empty('No logs availables');
}

const logStreamName=data.jobs[0].attempts[data.jobs[0].attempts.length-1].container.logStreamName;

// Gather logs
const cwl = new AWS.CloudWatchLogs({apiVersion: '2014-03-28'});
const params = {
logGroupName: '/aws/batch/job',
logStreamName: logStreamName
};
const logs = await cwl.getLogEvents(params).promise();

return Response.success(logs.events.map((item) => Log(item.message, Stream.STDOUT, new Date(item.timestamp*1000).toISOString())));
} catch (error) {
console.log(error);
return Response.error(`Failed to get log for job ${instance.payload.glueJobId}`, { error });
}
};
57 changes: 57 additions & 0 deletions technologies/job/aws-batch/job/jobForm.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
const AWS = require('aws-sdk');
const { Response } = require('@saagie/sdk');

/**
* Example of function to retrieve select options from an external endpoint.
* @param {Object} entity - Contains entity data including featuresValues.
* @param {Object} entity.featuresValues - Contains all the values from the entity features declared in the context.yaml
*/
exports.getJobs = async ({ featuresValues }) => {
try {
AWS.config.update({credentials: { accessKeyId : featuresValues.endpoint.aws_access_key_id, secretAccessKey: featuresValues.endpoint.aws_secret_access_key}});
AWS.config.update({region: featuresValues.endpoint.region});

var batch = new AWS.Batch({apiVersion: '2016-08-10'});

const data = await batch.describeJobDefinitions({ status: "ACTIVE" }).promise();

if (!data || !data.jobDefinitions || !data.jobDefinitions.length) {
return Response.empty('No job definitions availables');
}

return Response.success(
data.jobDefinitions.map(({ jobDefinitionArn, jobDefinitionName }) => ({
id: jobDefinitionArn,
label: jobDefinitionName,
})),
);
} catch (error) {
console.log(error);
return Response.error("Can't retrieve job definitions", { error });
}
};

exports.getJobQueues = async ({ featuresValues }) => {
try {
AWS.config.update({credentials: { accessKeyId : featuresValues.endpoint.aws_access_key_id, secretAccessKey: featuresValues.endpoint.aws_secret_access_key}});
AWS.config.update({region: featuresValues.endpoint.region});

var batch = new AWS.Batch({apiVersion: '2016-08-10'});

const data = await batch.describeJobQueues().promise();

if (!data || !data.jobQueues || !data.jobQueues.length) {
return Response.empty('No job definitions availables');
}

return Response.success(
data.jobQueues.map(({ jobQueueName, jobQueueArn }) => ({
id: jobQueueArn,
label: jobQueueName,
})),
);
} catch (error) {
console.log(error);
return Response.error("Can't retrieve job definitions", { error });
}
};
67 changes: 67 additions & 0 deletions technologies/job/aws-batch/metadata.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
version: v1
id: aws-batch
label: aws-batch
description: "Fully managed batch processing at any scale"
available: true
type: JOB
icon: job

contexts:
- id: job
label: Job
description: ""
recommended: true
trustLevel: experimental
endpoint:
features:
- type: TEXT
name: aws_access_key_id
label: Access Key ID
required: true
- type: PASSWORD
name: aws_secret_access_key
label: Secret Access Key
required: true
- type: TEXT
name: region
label: Region
helper: "AWS region. Example: us-east-1"
required: true
job:
features:
- type: ENDPOINT
name: endpoint
label: Endpoint
required: true
- type: SELECT
name: jobDefinition
label: Job Definition
required: true
options:
script: ./jobForm.js
function: getJobs
dependsOn:
- endpoint
- type: SELECT
name: jobQueue
label: Job Queue
required: true
options:
script: ./jobForm.js
function: getJobQueues
dependsOn:
- endpoint
instance:
actions:
onStart:
script: ./instanceActions.js
function: start
onStop:
script: ./instanceActions.js
function: stop
getStatus:
script: ./instanceActions.js
function: getStatus
getLogs:
script: ./instanceActions.js
function: getLogs
14 changes: 14 additions & 0 deletions technologies/job/aws-batch/package.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
{
"name": "aws-batch",
"version": "0.1.0",
"private": true,
"scripts": {
"start": "saagie-sdk start",
"build": "saagie-sdk build",
"new:context": "saagie-sdk init"
},
"dependencies": {
"aws-sdk": "2.646.0",
"@saagie/sdk": "0.7.0"
}
}
7 changes: 7 additions & 0 deletions technologies/job/aws-batch/technology.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
version: v1
id: aws-batch
label: aws-batch
description: "Fully managed batch processing at any scale"
available: true
type: JOB
icon: job
2 changes: 1 addition & 1 deletion technologies/job/aws-lambda/functions/instanceActions.js
Original file line number Diff line number Diff line change
Expand Up @@ -173,4 +173,4 @@ exports.getLogs = async ({ job, instance }) => {
console.log(error);
return Response.error(`Failed to get log for job ${job.featuresValues.functions.id}`, { error });
}
};
};
4 changes: 2 additions & 2 deletions version.properties
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
version.buildmeta=
version.buildmeta=aws-batch
version.major=1
version.minor=21
version.patch=0
version.prerelease=
version.semver=1.21.0
version.semver=1.21.0+aws-batch

0 comments on commit af15b04

Please sign in to comment.