diff --git a/ndscheduler/static/index.html b/ndscheduler/static/index.html
index bc4bd2e..ee6de7b 100644
--- a/ndscheduler/static/index.html
+++ b/ndscheduler/static/index.html
@@ -83,6 +83,7 @@
+
@@ -111,6 +112,7 @@
+
diff --git a/simple_scheduler/jobs/ecs_job.py b/simple_scheduler/jobs/ecs_job.py
new file mode 100644
index 0000000..db3d17b
--- /dev/null
+++ b/simple_scheduler/jobs/ecs_job.py
@@ -0,0 +1,142 @@
+"""A sample job that prints string."""
+
+import time
+import logging
+import boto3
+from ndscheduler import job
+
+logger = logging.getLogger(__name__)
+
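+# Module-level ECS client; boto3 resolves region and credentials
+# from its standard configuration chain.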
+client = boto3.client('ecs')
+
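+# Seconds to sleep between ECS status polls (and between retries).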
+POLL_TIME = 2
+
+
+class ECSFailureException(Exception):
+    """Raised when DescribeTasks reports per-task failures."""
+
+
+class ECSResponseException(Exception):
+    """Raised when an ECS API call returns a non-200 status code."""
+
+
+class ECSJob(job.JobBase):
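+    """Runs a task on AWS ECS and blocks until every task has stopped."""
+
+    # Number of retries for a failed status request before giving up.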
+    retry_count = 3
+
+    @classmethod
+    def meta_info(cls):
+        return {
+            'job_class_string': '%s.%s' % (cls.__module__, cls.__name__),
+            'notes': 'Executes an AWS ECS RunTask.',
+            'arguments': [
+                {'type': 'string', 'description': 'ECS cluster to run on'},
+                {'type': 'string',
+                 'description': 'task_def_arn: ARN of an existing task '
+                                'definition, or None'},
+                {'type': 'dict',
+                 'description': 'task_def: task definition to register, '
+                                'or None'},
+                {'type': 'array[dict]',
+                 'description': 'Container overrides; becomes the '
+                                '`containerOverrides` list of the runTask '
+                                '`overrides` parameter'}
+            ],
+            'example_arguments': '["ClusterName", "arn:aws:ecs:'
+                                 '::task-definition/:", None, None]'
+        }
+
+    def _get_task_statuses(self, task_ids):
+        """Retrieve task statuses from the ECS API.
+
+        Returns a list of {RUNNING|PENDING|STOPPED}, one per id in task_ids.
+        """
+        logger.debug('Get status of task_ids: {}'.format(task_ids))
+        response = client.describe_tasks(tasks=task_ids, cluster=self.cluster)
+
+        # describe_tasks reports per-task problems in a 'failures' list
+        # rather than raising, so check it explicitly.
+        if response['failures']:
+            raise ECSFailureException('There were some failures:\n{0}'.format(
+                response['failures']))
+        status_code = response['ResponseMetadata']['HTTPStatusCode']
+        if status_code != 200:
+            msg = 'Task status request received status code {0}:\n{1}'
+            raise ECSResponseException(msg.format(status_code, response))
+
+        return [t['lastStatus'] for t in response['tasks']]
+
+    def get_task_statuses(self, task_ids):
+        """Like _get_task_statuses, but retry up to retry_count times."""
+        retries = 0
+        while True:
+            try:
+                return self._get_task_statuses(task_ids)
+            except ECSResponseException:
+                retries += 1
+                if retries <= self.retry_count:
+                    logger.warning('Response failed, retry attempt {}/{}'.format(
+                        retries, self.retry_count))
+                    time.sleep(POLL_TIME)
+                else:
+                    raise
+
+    def _track_tasks(self, task_ids):
+        """Poll task statuses until every task is STOPPED."""
+        while True:
+            statuses = self.get_task_statuses(task_ids)
+            if all(status == 'STOPPED' for status in statuses):
+                logger.info('ECS tasks {0} STOPPED'.format(','.join(task_ids)))
+                break
+            logger.debug('ECS task status for tasks {0}: {1}'.format(
+                ','.join(task_ids), statuses))
+            time.sleep(POLL_TIME)
+
+    @property
+    def cluster(self):
+        if not hasattr(self, '_cluster'):
+            logger.warning('Cluster not set!')
+            return None
+        return self._cluster
+
+    @cluster.setter
+    def cluster(self, cluster):
+        self._cluster = cluster
+        logger.debug('Set Cluster: {}'.format(cluster))
+
+    def run(self, cluster, task_def_arn=None, task_def=None, command=None,
+            *args, **kwargs):
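+        """Run an ECS task and block until it stops.
+
+        Exactly one of task_def_arn (an existing definition) or task_def
+        (a definition to register first) must be provided.
+        """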
+        self.cluster = cluster
+        if (not task_def and not task_def_arn) or \
+                (task_def and task_def_arn):
+            raise ValueError('Either (but not both) a task_def (dict) or a '
+                             'task_def_arn (string) must be provided')
+        if not task_def_arn:
+            # Register the task and get the assigned taskDefinition ID (arn)
+            response = client.register_task_definition(**task_def)
+            task_def_arn = response['taskDefinition']['taskDefinitionArn']
+        logger.debug('Task Definition ARN: {}'.format(task_def_arn))
+
+        # Submit the task to AWS ECS and get the assigned task IDs
+        # (a list containing one ARN string)
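+        # `command`, when given, is passed through as the runTask
+        # `overrides.containerOverrides` list.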
+        if command:
+            overrides = {'containerOverrides': command}
+        else:
+            overrides = {}
+        response = client.run_task(taskDefinition=task_def_arn,
+                                   overrides=overrides, cluster=self.cluster)
+        _task_ids = [task['taskArn'] for task in response['tasks']]
+
+        # Wait on task completion
+        self._track_tasks(_task_ids)
+
+
+if __name__ == "__main__":
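+    # NOTE: these calls hit the real AWS ECS API and need valid credentials.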
+    # You can easily test this job here
+    test_job = ECSJob.create_test_instance()
+    test_job.run('ClusterName', "arn:aws:ecs:::task-"
+                 "definition/:")
+    test_job.run('DataETLCluster', None, {
+        'family': 'hello-world',
+        'volumes': [],
+        'containerDefinitions': [
+            {
+                'memory': 1,
+                'essential': True,
+                'name': 'hello-world',
+                'image': 'ubuntu',
+                'command': ['/bin/echo', 'hello world']
+            }
+        ]
+    })
diff --git a/simple_scheduler/requirements.txt b/simple_scheduler/requirements.txt
index e4b1e17..89e4c40 100644
--- a/simple_scheduler/requirements.txt
+++ b/simple_scheduler/requirements.txt
@@ -10,3 +10,7 @@ requests == 2.9.1
# Uncomment this if you want to use MySQL as datastore
#
# pymysql == 0.6.7
+
+# Required by the ECS job (simple_scheduler/jobs/ecs_job.py)
+#
+boto3