
Commit 58fc655

Reverted enqueue_job; added job.insert() where needed
1 parent fb96ce4 commit 58fc655

File tree: 5 files changed, +96 -89 lines

    api/dao/containerstorage.py
    api/jobs/batch.py
    api/jobs/handlers.py
    api/jobs/queue.py
    api/jobs/rules.py

api/dao/containerstorage.py

Lines changed: 1 addition & 0 deletions

@@ -372,6 +372,7 @@ def create_job_and_analysis(self, cont_name, cid, analysis, job, origin, uid):
         try:

             job = Queue.enqueue_job(job, origin, perm_check_uid=uid)
+            job.insert()
         except Exception as e:
             # NOTE #775 remove unusable analysis - until jobs have a 'hold' state
             self.delete_el(analysis['_id'])
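Note that both the enqueue call and the new job.insert() sit inside the existing try block, so a failure while persisting the job still removes the unusable analysis. A minimal sketch of the resulting pattern (a fragment of the method above, abbreviated, with names taken from the diff):

    try:
        # enqueue_job now only validates and builds the Job object...
        job = Queue.enqueue_job(job, origin, perm_check_uid=uid)
        # ...so persisting it is a separate, explicit step.
        job.insert()
    except Exception as e:
        # NOTE #775 remove unusable analysis - until jobs have a 'hold' state
        self.delete_el(analysis['_id'])
        # (the rest of the error handling is unchanged and elided here)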

api/jobs/batch.py

Lines changed: 87 additions & 74 deletions

@@ -126,108 +126,121 @@ def update(batch_id, payload):
     if result.modified_count != 1:
         raise Exception('Batch job not updated')

-def run(batch_job):
+def run_preconstructed_jobs(origin, preconstructed_jobs):
     """
-    Creates jobs from proposed inputs, returns jobs enqueued.
+    Enqueues jobs and returns list of created jobs
     """
+    jobs = []

-    proposal = batch_job.get('proposal')
-    if not proposal:
-        raise APIStorageException('The batch job is not formatted correctly.')
-    preconstructed_jobs = proposal.get('preconstructed_jobs')
-
-    # If Running a batch from already-constructed jobs
-    if preconstructed_jobs:
-        origin = batch_job.get('origin')
-        jobs = []
-        job_ids = []
+    for preconstructed_job in preconstructed_jobs:
+        job = Queue.enqueue_job(preconstructed_job, origin)
+        job.insert()
+        jobs.append(job)

-        for preconstructed_job in preconstructed_jobs:
-            job = Queue.enqueue_job(preconstructed_job, origin)
-            job_id = job.id_
-            jobs.append(job)
-            job_ids.append(job_id)
+    return jobs

-    # Otherwise create jobs from the containers and gear id provided in the proposal
-    else:
-        proposed_inputs = proposal.get('inputs', [])
-        proposed_destinations = proposal.get('destinations', [])
+def run_container_jobs(batch_job, proposal):
+    # Create jobs from the containers and gear id provided in the proposal
+    proposed_inputs = proposal.get('inputs', [])
+    proposed_destinations = proposal.get('destinations', [])
+
+    gear_id = batch_job['gear_id']
+    gear = gears.get_gear(gear_id)
+    gear_name = gear['gear']['name']
+
+    config_ = batch_job.get('config')
+    origin = batch_job.get('origin')
+    tags = proposal.get('tags', [])
+    tags.append('batch')
+
+    if gear.get('category') == 'analysis':
+        analysis_base = proposal.get('analysis', {})
+        if not analysis_base.get('label'):
+            time_now = datetime.datetime.utcnow()
+            analysis_base['label'] = {'label': '{} {}'.format(gear_name, time_now)}
+        an_storage = AnalysisStorage()
+        acq_storage = AcquisitionStorage()
+
+    jobs = []
+    job_ids = []
+
+    job_defaults = {
+        'config': config_,
+        'gear_id': gear_id,
+        'tags': tags,
+        'batch': str(batch_job.get('_id')),
+        'inputs': {}
+    }

-        gear_id = batch_job['gear_id']
-        gear = gears.get_gear(gear_id)
-        gear_name = gear['gear']['name']
+    for inputs in proposed_inputs:

-        config_ = batch_job.get('config')
-        origin = batch_job.get('origin')
-        tags = proposal.get('tags', [])
-        tags.append('batch')
+        job_map = copy.deepcopy(job_defaults)
+        job_map['inputs'] = inputs

         if gear.get('category') == 'analysis':
-            analysis_base = proposal.get('analysis', {})
-            if not analysis_base.get('label'):
-                time_now = datetime.datetime.utcnow()
-                analysis_base['label'] = {'label': '{} {}'.format(gear_name, time_now)}
-            an_storage = AnalysisStorage()
-            acq_storage = AcquisitionStorage()

-        jobs = []
-        job_ids = []
+            analysis = copy.deepcopy(analysis_base)

-        job_defaults = {
-            'config': config_,
-            'gear_id': gear_id,
-            'tags': tags,
-            'batch': str(batch_job.get('_id')),
-            'inputs': {}
-        }
+            # Create analysis
+            acquisition_id = inputs.values()[0].get('id')
+            session_id = acq_storage.get_container(acquisition_id, projection={'session':1}).get('session')
+            result = an_storage.create_job_and_analysis('sessions', session_id, analysis, job_map, origin, None)
+            job = result.get('job')
+            job_id = result.get('job_id')

-        for inputs in proposed_inputs:
-
-            job_map = copy.deepcopy(job_defaults)
-            job_map['inputs'] = inputs
+        else:

-            if gear.get('category') == 'analysis':
+            job = Queue.enqueue_job(job_map, origin)
+            job.insert()
+            job_id = job.id_

-                analysis = copy.deepcopy(analysis_base)

-                # Create analysis
-                acquisition_id = inputs.values()[0].get('id')
-                session_id = acq_storage.get_container(acquisition_id, projection={'session':1}).get('session')
-                result = an_storage.create_job_and_analysis('sessions', session_id, analysis, job_map, origin, None)
-                job = result.get('job')
-                job_id = result.get('job_id')
+        jobs.append(job)
+        job_ids.append(job_id)

-            else:
+    for dest in proposed_destinations:

-                job = Queue.enqueue_job(job_map, origin)
-                job_id = job.id_
+        job_map = copy.deepcopy(job_defaults)
+        job_map['destination'] = dest

+        if gear.get('category') == 'analysis':

-            jobs.append(job)
-            job_ids.append(job_id)
+            analysis = copy.deepcopy(analysis_base)

-        for dest in proposed_destinations:
+            # Create analysis
+            result = an_storage.create_job_and_analysis('sessions', bson.ObjectId(dest['id']), analysis, job_map, origin, None)
+            job = result.get('job')
+            job_id = result.get('job_id')

-            job_map = copy.deepcopy(job_defaults)
-            job_map['destination'] = dest
+        else:

-            if gear.get('category') == 'analysis':
+            job = Queue.enqueue_job(job_map, origin)
+            job.insert()
+            job_id = job.id_

-                analysis = copy.deepcopy(analysis_base)

-                # Create analysis
-                result = an_storage.create_job_and_analysis('sessions', bson.ObjectId(dest['id']), analysis, job_map, origin, None)
-                job = result.get('job')
-                job_id = result.get('job_id')
+        jobs.append(job)
+        job_ids.append(job_id)

-            else:
+    return jobs, job_ids

-                job = Queue.enqueue_job(job_map, origin)
-                job_id = job.id_
+def run(batch_job):
+    """
+    Creates jobs from proposed inputs, returns jobs enqueued.
+    """

+    proposal = batch_job.get('proposal')
+    if not proposal:
+        raise APIStorageException('The batch job is not formatted correctly.')
+    preconstructed_jobs = proposal.get('preconstructed_jobs')

-            jobs.append(job)
-            job_ids.append(job_id)
+    # If Running a batch from already-constructed jobs
+    if preconstructed_jobs:
+        origin = batch_job.get('origin')
+        jobs = run_preconstructed_jobs(origin, preconstructed_jobs)
+        job_ids = [job.id_ for job in jobs]
+    else:
+        jobs, job_ids = run_container_jobs(batch_job, proposal)

     update(batch_job['_id'], {'state': 'running', 'jobs': job_ids})
     return jobs
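Taken together, the batch change is mostly a mechanical split of run() into two helpers. After the patch the control flow reads roughly as follows (condensed from the added lines above, with the helper bodies omitted):

    def run(batch_job):
        proposal = batch_job.get('proposal')
        if not proposal:
            raise APIStorageException('The batch job is not formatted correctly.')
        preconstructed_jobs = proposal.get('preconstructed_jobs')

        if preconstructed_jobs:
            # Already-constructed jobs: each is enqueued and inserted explicitly.
            jobs = run_preconstructed_jobs(batch_job.get('origin'), preconstructed_jobs)
            job_ids = [job.id_ for job in jobs]
        else:
            # Build jobs from the proposal's inputs/destinations and gear id.
            jobs, job_ids = run_container_jobs(batch_job, proposal)

        update(batch_job['_id'], {'state': 'running', 'jobs': job_ids})
        return jobs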

api/jobs/handlers.py

Lines changed: 2 additions & 1 deletion

@@ -280,6 +280,7 @@ def add(self):
         uid = self.uid

         job = Queue.enqueue_job(payload,self.origin, perm_check_uid=uid)
+        job.insert()

         return { '_id': job.id_ }

@@ -713,7 +714,7 @@ def post_with_jobs(self):

         for job_number, job_ in enumerate(jobs_):
             try:
-                Queue.validate_job(job_, self.origin, create_job=False, perm_check_uid=uid)
+                Queue.enqueue_job(job_, self.origin, perm_check_uid=uid)
             except InputValidationException as e:
                 raise InputValidationException("Job {}: {}".format(job_number, str(e)))
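In post_with_jobs the replacement call is deliberately not followed by insert(): with the reverted enqueue_job, building the Job still runs the checks (gear exists, config matches the manifest, permission check via perm_check_uid) but persists nothing, so the loop remains a pure validation pass. A sketch of the contrast with add() above, using the same names:

    # Validation only (post_with_jobs): the Job is built and validated,
    # but never inserted, so the queue is untouched.
    Queue.enqueue_job(job_, self.origin, perm_check_uid=uid)

    # Validate and actually enqueue (add): insert() persists the job.
    job = Queue.enqueue_job(payload, self.origin, perm_check_uid=uid)
    job.insert()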

api/jobs/queue.py

Lines changed: 4 additions & 13 deletions

@@ -128,10 +128,11 @@ def retry(job, force=False):

         return new_id

+
     @staticmethod
-    def validate_job(job_map, origin, create_job=False, perm_check_uid=None):
+    def enqueue_job(job_map, origin, perm_check_uid=None):
         """
-        Using a payload for a proposed job, creates and returns(if create_job is True) (but does not insert)
+        Using a payload for a proposed job, creates and returns (but does not insert)
         a Job object. This preperation includes:
         - confirms gear exists
         - validates config against gear manifest

@@ -250,18 +251,8 @@ def validate_job(job_map, origin, create_job=False, perm_check_uid=None):

         if gear_name not in tags:
             tags.append(gear_name)
-        if create_job:
-            job = Job(str(gear['_id']), inputs, destination=destination, tags=tags, config_=config_, now=now_flag, attempt=attempt_n, previous_job_id=previous_job_id, origin=origin, batch=batch)
-            return job
-        return True

-    @staticmethod
-    def enqueue_job(job_map, origin, perm_check_uid=None):
-        """
-        Validates, Creates, Inserts, and Returns job
-        """
-        job = Queue.validate_job(job_map, origin, create_job=True, perm_check_uid=perm_check_uid)
-        job.insert()
+        job = Job(str(gear['_id']), inputs, destination=destination, tags=tags, config_=config_, now=now_flag, attempt=attempt_n, previous_job_id=previous_job_id, origin=origin, batch=batch)
         return job

     @staticmethod
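The net effect on callers: enqueue_job reverts to the create-and-validate contract ("creates and returns (but does not insert) a Job object"), and the former one-call behavior is now spelled out at each call site. A minimal before/after sketch (job_map and origin are placeholders):

    # Before this commit: enqueue_job validated, created, AND inserted the job.
    job = Queue.enqueue_job(job_map, origin)

    # After this commit: enqueue_job only builds the Job; the caller persists it.
    job = Queue.enqueue_job(job_map, origin)
    job.insert()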

api/jobs/rules.py

Lines changed: 2 additions & 1 deletion

@@ -246,7 +246,8 @@ def create_jobs(db, container_before, container_after, container_type):

     for pj in potential_jobs:
         job_map = pj['job'].map()
-        Queue.enqueue_job(job_map, origin)
+        job = Queue.enqueue_job(job_map, origin)
+        job.insert()

         spawned_jobs.append(pj['rule']['alg'])
