Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support more APScheduler job-types #59

Open
wants to merge 12 commits into
base: master
Choose a base branch
from
12 changes: 6 additions & 6 deletions ndscheduler/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,17 +11,17 @@
"""

import importlib
import logging
import os
import sys
import logging

from ndscheduler import default_settings

# import sys
logger = logging.getLogger()
ch = logging.StreamHandler(sys.stdout)
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
ch.setFormatter(formatter)
logger.addHandler(ch)
# ch = logging.StreamHandler(sys.stdout)
# formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
# ch.setFormatter(formatter)
# logger.addHandler(ch)


ENVIRONMENT_VARIABLE = 'NDSCHEDULER_SETTINGS_MODULE'
Expand Down
59 changes: 56 additions & 3 deletions ndscheduler/corescheduler/core/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import json

from ndscheduler import settings
from apscheduler.schedulers import tornado as apscheduler_tornado

from ndscheduler.corescheduler import constants
Expand Down Expand Up @@ -144,9 +145,61 @@ def add_scheduler_job(self, job_class_string, name, pub_args=None,
datastore.db_config, datastore.table_names]
arguments.extend(pub_args)

self.add_job(self.run_job,
'cron', month=month, day=day, day_of_week=day_of_week, hour=hour,
minute=minute, args=arguments, kwargs=kwargs, name=name, id=job_id)
self.add_job(
func = self.run_job, # noqa
trigger = 'cron', # noqa
month = month, # noqa
day = day, # noqa
day_of_week = day_of_week, # noqa
hour = hour, # noqa
minute = minute, # noqa
args = arguments, # noqa
kwargs = kwargs, # noqa
name = name, # noqa
id = job_id # noqa
)
return job_id

def add_trigger_scheduler_job(self, job_class_string, name, pub_args, trigger,
**kwargs):
"""Add a job. Job infomation will be persistent in postgres.

This is a NON-BLOCKING operation, as internally, apscheduler calls wakeup()
that is async.

:param str job_class_string: String for job class, e.g., myscheduler.jobs.a_job.NiceJob
:param str name: String for job name, e.g., Check Melissa job.
:param str pub_args: List for arguments passed to publish method of a task.
:param str month: String for month cron string, e.g., */10
:param str day_of_week: String for day of week cron string, e.g., 1-6
:param str day: String for day cron string, e.g., */1
:param str hour: String for hour cron string, e.g., */2
:param str minute: String for minute cron string, e.g., */3
:param dict kwargs: Other keyword arguments passed to run_job function.
:return: String of job id, e.g., 6bca19736d374ef2b3df23eb278b512e
:rtype: str

Returns:
String of job id, e.g., 6bca19736d374ef2b3df23eb278b512e
"""
if not pub_args:
pub_args = []

job_id = utils.generate_uuid()

datastore = self._lookup_jobstore('default')
arguments = [job_class_string, job_id, self.datastore_class_path,
datastore.db_config, datastore.table_names]
arguments.extend(pub_args)

self.add_job(
func = self.run_job, # noqa
trigger = trigger, # noqa
args = arguments, # noqa
kwargs = kwargs, # noqa
name = name, # noqa
id = job_id # noqa
)
return job_id

def modify_scheduler_job(self, job_id, **kwargs):
Expand Down
14 changes: 13 additions & 1 deletion ndscheduler/corescheduler/datastore/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@

import dateutil.tz
import dateutil.parser
import apscheduler.triggers.cron
import apscheduler.triggers.interval

from apscheduler.jobstores import sqlalchemy as sched_sqlalchemy
from sqlalchemy import desc, select, MetaData

Expand Down Expand Up @@ -125,7 +128,16 @@ def _build_execution(self, row):
'name': job.name,
'task_name': utils.get_job_name(job),
'pub_args': utils.get_job_args(job)}
return_json['job'].update(utils.get_cron_strings(job))

if isinstance(job.trigger, apscheduler.triggers.cron.CronTrigger):
return_json.update(utils.get_cron_strings(job))
return_json["trigger_type"] = "cron"
elif isinstance(job.trigger, apscheduler.triggers.interval.IntervalTrigger):
return_json["interval"] = job.trigger.interval.total_seconds()
return_json["trigger_type"] = "interval"
else:
return_json["trigger_type"] = "unknown"

return return_json

def get_time_isoformat_from_db(self, time_object):
Expand Down
12 changes: 10 additions & 2 deletions ndscheduler/corescheduler/datastore/providers/postgres.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
"""Represents Postgres datastore."""

import sys
from ndscheduler.corescheduler.datastore import base


Expand All @@ -18,7 +18,15 @@ def get_db_url(self):
}
:return: string db url
"""
return 'postgresql://%s:%s@%s:%d/%s?sslmode=%s' % (

# Work under Pypy, which doesn't have the default psycopg2
if '__pypy__' in sys.builtin_module_names:
db_wrapper = 'postgresql+psycopg2cffi'
else:
db_wrapper = 'postgresql'

return '%s://%s:%s@%s:%d/%s?sslmode=%s' % (
db_wrapper,
self.db_config['user'],
self.db_config['password'],
self.db_config['hostname'],
Expand Down
26 changes: 26 additions & 0 deletions ndscheduler/corescheduler/scheduler_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,32 @@ def add_job(self, job_class_string, name, pub_args=None, month=None,
return self.sched.add_scheduler_job(job_class_string, name, pub_args, month, day_of_week,
day, hour, minute, **kwargs)

def add_trigger_job(self, job_class_string, name, pub_args=None,
trigger=None,
**kwargs):
"""Add a job. Job infomation will be persistent in postgres.

This is a NON-BLOCKING operation, as internally, apscheduler calls wakeup()
that is async.

:param str job_class_string: String for job class, e.g., myscheduler.jobs.a_job.NiceJob
:param str name: String for job name, e.g., Check Melissa job.
:param str pub_args: List for arguments passed to publish method of a task.
:param str month: String for month cron string, e.g., */10
:param str day_of_week: String for day of week cron string, e.g., 1-6
:param str day: String for day cron string, e.g., */1
:param str hour: String for hour cron string, e.g., */2
:param str minute: String for minute cron string, e.g., */3
:param dict kwargs: Other keyword arguments passed to run_job function.
:return: String of job id, e.g., 6bca19736d374ef2b3df23eb278b512e
:rtype: str
"""
return self.sched.add_trigger_scheduler_job(job_class_string,
name,
pub_args,
trigger,
**kwargs)

def pause_job(self, job_id):
"""Pauses the schedule of a job.
This is a NON-BLOCKING operation, as internally, apscheduler calls wakeup()
Expand Down
8 changes: 0 additions & 8 deletions ndscheduler/default_settings.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
"""Default settings."""

import logging
import os


#
# Development mode or production mode
# If DEBUG is True, then auto-reload is enabled, i.e., when code is modified, server will be
Expand Down Expand Up @@ -92,11 +90,5 @@
# Please see ndscheduler/core/scheduler/base.py
SCHEDULER_CLASS = 'ndscheduler.corescheduler.core.base.BaseScheduler'

#
# Set logging level
#
logging.getLogger().setLevel(logging.INFO)


# Packages that contains job classes, e.g., simple_scheduler.jobs
JOB_CLASS_PACKAGES = []
17 changes: 14 additions & 3 deletions ndscheduler/server/handlers/jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,12 @@
import tornado.gen
import tornado.web

from ndscheduler.corescheduler import constants
from ndscheduler.corescheduler import utils
import apscheduler.triggers.cron
import apscheduler.triggers.interval

from ndscheduler.server.handlers import base
from ndscheduler.corescheduler import utils
from ndscheduler.corescheduler import constants


class Handler(base.BaseHandler):
Expand Down Expand Up @@ -42,7 +45,15 @@ def _build_job_dict(self, job):
'job_class_string': utils.get_job_name(job),
'pub_args': utils.get_job_args(job)}

return_dict.update(utils.get_cron_strings(job))
if isinstance(job.trigger, apscheduler.triggers.cron.CronTrigger):
return_dict.update(utils.get_cron_strings(job))
return_dict["trigger_type"] = "cron"
elif isinstance(job.trigger, apscheduler.triggers.interval.IntervalTrigger):
return_dict["interval"] = job.trigger.interval.total_seconds()
return_dict["trigger_type"] = "interval"
else:
return_dict["trigger_type"] = "unknown"

return return_dict

@tornado.concurrent.run_on_executor
Expand Down
35 changes: 32 additions & 3 deletions ndscheduler/static/js/models/job.js
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,26 @@ require.config({
}
});

function secondsToStr( seconds_in ) {
let temp = seconds_in;
const years = Math.floor( temp / 31536000 ),
days = Math.floor( ( temp %= 31536000 ) / 86400 ),
hours = Math.floor( ( temp %= 86400 ) / 3600 ),
minutes = Math.floor( ( temp %= 3600 ) / 60 ),
seconds = temp % 60;

if ( days || hours || seconds || minutes ) {
return ( years ? years + "y " : "" ) +
( days ? days + "d " : "" ) +
( hours ? hours + "h " : "" ) +
( minutes ? minutes + "m " : "" ) +
Number.parseFloat( seconds ).toFixed( 2 ) + "s";
}

return "< 1s";
}


define(['backbone', 'vendor/moment-timezone-with-data'], function(backbone, moment) {
'use strict';

Expand All @@ -31,9 +51,18 @@ define(['backbone', 'vendor/moment-timezone-with-data'], function(backbone, mome
* @return {string} schedule string for this job.
*/
getScheduleString: function() {
return 'minute: ' + this.get('minute') + ', hour: ' + this.get('hour') +
', day: ' + this.get('day') + ', month: ' + this.get('month') +
', day of week: ' + this.get('day_of_week');
var trig = this.get('trigger_type');

if (trig == 'cron')
return 'Cron: minute: ' + this.get('minute') + ', hour: ' + this.get('hour') +
', day: ' + this.get('day') + ', month: ' + this.get('month') +
', day of week: ' + this.get('day_of_week');
else if (trig == 'interval')
return 'Interval: ' + secondsToStr(this.get('interval'));
else
return 'Unknown trigger type!';


},

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
data-target="#edit-job-modal"
data-id="<%= job_id %>"
data-job-name="<%= job_name %>"
data-job-trigtype="cron"
data-job-month="<%= job_month %>"
data-job-day-of-week="<%= job_day_of_week %>"
data-job-day="<%= job_day %>"
Expand Down
14 changes: 14 additions & 0 deletions ndscheduler/static/js/templates/job-row-name-interval.html
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
<a href="/#jobs/<%= job_id %>"><i class="fa fa-link fa-lg"></i></a>

<a href="#" title="<%= job_class %>(<%= job_pubargs %>)"
data-toggle="modal"
data-target="#edit-job-modal"
data-id="<%= job_id %>"
data-job-name="<%= job_name %>"
data-job-trigtype="interval"
data-job-interval="<%= job_interval %>"
data-job-task="<%= job_class %>"
data-job-active="<%= job_active %>"
data-job-pubargs='<%= job_pubargs %>'>
<%= job_name %>
</a>
13 changes: 13 additions & 0 deletions ndscheduler/static/js/templates/job-row-name-unknown.html
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
<a href="/#jobs/<%= job_id %>"><i class="fa fa-link fa-lg"></i></a>

<a href="#" title="<%= job_class %>(<%= job_pubargs %>)"
data-toggle="modal"
data-target="#edit-job-modal"
data-id="<%= job_id %>"
data-job-name="<%= job_name %>"
data-job-trigtype="unknown"
data-job-task="<%= job_class %>"
data-job-active="<%= job_active %>"
data-job-pubargs='<%= job_pubargs %>'>
<%= job_name %>
</a>
1 change: 1 addition & 0 deletions ndscheduler/static/js/views/executions/table-view.js
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ define(['utils',
this.table = $('#executions-table').dataTable({
// Sorted by last updated time
'order': [[3, 'desc']],
"iDisplayLength": 50,
// Disable sorting on result column
"columnDefs": [
{ "orderable": false, "className": "table-result-column", "targets": 5 }
Expand Down
Loading