Skip to content

Commit 456fa3e

Browse files
committed
Create batch with premade jobs
1 parent d850b0f commit 456fa3e

File tree

4 files changed

+137
-64
lines changed

4 files changed

+137
-64
lines changed

api/api.py

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -157,12 +157,13 @@ def prefix(path, routes):
157157

158158
# Batch jobs
159159

160-
route('/batch', BatchHandler, h='get_all', m=['GET']),
161-
route('/batch', BatchHandler, m=['POST']),
160+
route('/batch', BatchHandler, h='get_all', m=['GET']),
161+
route('/batch', BatchHandler, m=['POST']),
162162
prefix('/batch', [
163-
route('/<:[^/]+>', BatchHandler, h='get', m=['GET']),
164-
route('/<:[^/]+>/run', BatchHandler, h='run', m=['POST']),
165-
route('/<:[^/]+>/cancel', BatchHandler, h='cancel', m=['POST']),
163+
route('/jobs', BatchHandler, h='post_with_jobs', m=['POST']),
164+
route('/<:[^/]+>', BatchHandler, h='get', m=['GET']),
165+
route('/<:[^/]+>/run', BatchHandler, h='run', m=['POST']),
166+
route('/<:[^/]+>/cancel', BatchHandler, h='cancel', m=['POST']),
166167
]),
167168

168169

api/jobs/batch.py

Lines changed: 75 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -134,84 +134,100 @@ def run(batch_job):
134134
proposal = batch_job.get('proposal')
135135
if not proposal:
136136
raise APIStorageException('The batch job is not formatted correctly.')
137-
proposed_inputs = proposal.get('inputs', [])
138-
proposed_destinations = proposal.get('destinations', [])
139-
140-
gear_id = batch_job['gear_id']
141-
gear = gears.get_gear(gear_id)
142-
gear_name = gear['gear']['name']
143-
144-
config_ = batch_job.get('config')
145-
origin = batch_job.get('origin')
146-
tags = proposal.get('tags', [])
147-
tags.append('batch')
148-
149-
if gear.get('category') == 'analysis':
150-
analysis_base = proposal.get('analysis', {})
151-
if not analysis_base.get('label'):
152-
time_now = datetime.datetime.utcnow()
153-
analysis_base['label'] = {'label': '{} {}'.format(gear_name, time_now)}
154-
an_storage = AnalysisStorage()
155-
acq_storage = AcquisitionStorage()
156-
157-
jobs = []
158-
job_ids = []
159-
160-
job_defaults = {
161-
'config': config_,
162-
'gear_id': gear_id,
163-
'tags': tags,
164-
'batch': str(batch_job.get('_id')),
165-
'inputs': {}
166-
}
137+
preconstructed_jobs = proposal.get('preconstructed_jobs')
138+
139+
# If running a batch from already-constructed jobs
140+
if preconstructed_jobs:
141+
origin = batch_job.get('origin')
142+
jobs = []
143+
job_ids = []
144+
145+
for preconstructed_job in preconstructed_jobs:
146+
job = Queue.enqueue_job(preconstructed_job, origin)
147+
job_id = job.id_
148+
jobs.append(job)
149+
job_ids.append(job_id)
167150

168-
for inputs in proposed_inputs:
151+
# Otherwise create jobs from the containers and gear id provided in the proposal
152+
else:
153+
proposed_inputs = proposal.get('inputs', [])
154+
proposed_destinations = proposal.get('destinations', [])
155+
156+
gear_id = batch_job['gear_id']
157+
gear = gears.get_gear(gear_id)
158+
gear_name = gear['gear']['name']
169159

170-
job_map = copy.deepcopy(job_defaults)
171-
job_map['inputs'] = inputs
160+
config_ = batch_job.get('config')
161+
origin = batch_job.get('origin')
162+
tags = proposal.get('tags', [])
163+
tags.append('batch')
172164

173165
if gear.get('category') == 'analysis':
166+
analysis_base = proposal.get('analysis', {})
167+
if not analysis_base.get('label'):
168+
time_now = datetime.datetime.utcnow()
169+
analysis_base['label'] = {'label': '{} {}'.format(gear_name, time_now)}
170+
an_storage = AnalysisStorage()
171+
acq_storage = AcquisitionStorage()
174172

175-
analysis = copy.deepcopy(analysis_base)
173+
jobs = []
174+
job_ids = []
176175

177-
# Create analysis
178-
acquisition_id = inputs.values()[0].get('id')
179-
session_id = acq_storage.get_container(acquisition_id, projection={'session':1}).get('session')
180-
result = an_storage.create_job_and_analysis('sessions', session_id, analysis, job_map, origin, None)
181-
job = result.get('job')
182-
job_id = result.get('job_id')
176+
job_defaults = {
177+
'config': config_,
178+
'gear_id': gear_id,
179+
'tags': tags,
180+
'batch': str(batch_job.get('_id')),
181+
'inputs': {}
182+
}
183183

184-
else:
184+
for inputs in proposed_inputs:
185185

186-
job = Queue.enqueue_job(job_map, origin)
187-
job_id = job.id_
186+
job_map = copy.deepcopy(job_defaults)
187+
job_map['inputs'] = inputs
188188

189+
if gear.get('category') == 'analysis':
189190

190-
jobs.append(job)
191-
job_ids.append(job_id)
191+
analysis = copy.deepcopy(analysis_base)
192192

193-
for dest in proposed_destinations:
193+
# Create analysis
194+
acquisition_id = inputs.values()[0].get('id')
195+
session_id = acq_storage.get_container(acquisition_id, projection={'session':1}).get('session')
196+
result = an_storage.create_job_and_analysis('sessions', session_id, analysis, job_map, origin, None)
197+
job = result.get('job')
198+
job_id = result.get('job_id')
194199

195-
job_map = copy.deepcopy(job_defaults)
196-
job_map['destination'] = dest
200+
else:
197201

198-
if gear.get('category') == 'analysis':
202+
job = Queue.enqueue_job(job_map, origin)
203+
job_id = job.id_
199204

200-
analysis = copy.deepcopy(analysis_base)
201205

202-
# Create analysis
203-
result = an_storage.create_job_and_analysis('sessions', bson.ObjectId(dest['id']), analysis, job_map, origin, None)
204-
job = result.get('job')
205-
job_id = result.get('job_id')
206+
jobs.append(job)
207+
job_ids.append(job_id)
206208

207-
else:
209+
for dest in proposed_destinations:
208210

209-
job = Queue.enqueue_job(job_map, origin)
210-
job_id = job.id_
211+
job_map = copy.deepcopy(job_defaults)
212+
job_map['destination'] = dest
213+
214+
if gear.get('category') == 'analysis':
215+
216+
analysis = copy.deepcopy(analysis_base)
211217

218+
# Create analysis
219+
result = an_storage.create_job_and_analysis('sessions', bson.ObjectId(dest['id']), analysis, job_map, origin, None)
220+
job = result.get('job')
221+
job_id = result.get('job_id')
212222

213-
jobs.append(job)
214-
job_ids.append(job_id)
223+
else:
224+
225+
job = Queue.enqueue_job(job_map, origin)
226+
job_id = job.id_
227+
228+
229+
jobs.append(job)
230+
job_ids.append(job_id)
215231

216232
update(batch_job['_id'], {'state': 'running', 'jobs': job_ids})
217233
return jobs

api/jobs/handlers.py

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -699,6 +699,27 @@ def post(self):
699699

700700
return batch_proposal
701701

702+
@require_login
703+
def post_with_jobs(self):
704+
"""
705+
Creates a batch from preconstructed jobs
706+
"""
707+
payload = self.request.json
708+
jobs_ = payload.get('jobs', [])
709+
710+
batch_proposal = {
711+
'proposal': {
712+
'preconstructed_jobs': jobs_
713+
},
714+
'origin': self.origin,
715+
'state': 'pending',
716+
'_id': bson.ObjectId()
717+
}
718+
batch.insert(batch_proposal)
719+
batch_proposal['preconstructed_jobs'] = batch_proposal.pop('proposal')
720+
721+
return batch_proposal
722+
702723
@require_login
703724
def run(self, _id):
704725
"""

tests/integration_tests/python/test_batch.py

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,23 @@ def test_batch(data_builder, as_user, as_admin, as_root):
8484
assert r.ok
8585
analysis_batch_id = r.json()['_id']
8686

87+
# create a batch with preconstructed jobs
88+
r = as_admin.post('/batch/jobs', json={
89+
'jobs': [
90+
{
91+
'gear_id': gear,
92+
'config': { 'two-digit multiple of ten': 20 },
93+
'destination': {
94+
'type': 'acquisition',
95+
'id': acquisition
96+
},
97+
'tags': [ 'test-tag' ]
98+
}
99+
]
100+
})
101+
assert r.ok
102+
job_batch_id = r.json()['_id']
103+
87104
# try to get non-existent batch
88105
r = as_admin.get('/batch/000000000000000000000000')
89106
assert r.status_code == 404
@@ -97,11 +114,21 @@ def test_batch(data_builder, as_user, as_admin, as_root):
97114
assert r.ok
98115
assert r.json()['state'] == 'pending'
99116

117+
# get batch from jobs
118+
r = as_admin.get('/batch/' + job_batch_id)
119+
assert r.ok
120+
assert r.json()['state'] == 'pending'
121+
100122
# get batch w/ ?jobs=true
101123
r = as_admin.get('/batch/' + batch_id, params={'jobs': 'true'})
102124
assert r.ok
103125
assert 'jobs' in r.json()
104126

127+
# get job batch w/ ?jobs=true
128+
r = as_admin.get('/batch/' + job_batch_id, params={'jobs': 'true'})
129+
assert r.ok
130+
assert 'jobs' in r.json()
131+
105132
# try to cancel non-running batch
106133
r = as_admin.post('/batch/' + batch_id + '/cancel')
107134
assert r.status_code == 400
@@ -134,6 +161,14 @@ def test_batch(data_builder, as_user, as_admin, as_root):
134161
r = as_admin.get('/batch/' + analysis_batch_id)
135162
assert r.json()['state'] == 'running'
136163

164+
# run job batch
165+
r = as_admin.post('/batch/' + job_batch_id + '/run')
166+
print r.json()
167+
assert r.ok
168+
169+
# test batch.state after calling run
170+
r = as_admin.get('/batch/' + job_batch_id)
171+
assert r.json()['state'] == 'running'
137172

138173
# Test batch complete
139174
# create a batch w/ acquisition target and target_context

0 commit comments

Comments (0)