Skip to content

Commit 76d29bd

Browse files
committed
[IMP] queue_job: handle same uuid in duplicated db
1 parent 6792545 commit 76d29bd

File tree

1 file changed

+79
-14
lines changed

1 file changed

+79
-14
lines changed

queue_job/jobrunner/channels.py

Lines changed: 79 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -799,6 +799,62 @@ class ChannelManager:
799799
>>> cm.notify(db, 'S', 'S3', 3, 0, 10, None, 'done')
800800
>>> pp(list(cm.get_jobs_to_run(now=105)))
801801
[]
802+
803+
Test handling of duplicate job UUIDs across different databases.
804+
This can happen when a database is duplicated/cloned and both databases
805+
notify the same job runner with the same job UUID.
806+
807+
>>> cm = ChannelManager()
808+
>>> cm.simple_configure('root:4,D:2')
809+
810+
Create a job in db1
811+
812+
>>> cm.notify('db1', 'D', 'dup-uuid-1', 1, 0, 10, None, 'pending')
813+
>>> job = cm._jobs_by_uuid.get('dup-uuid-1')
814+
>>> job.db_name
815+
'db1'
816+
>>> len(cm._jobs_by_uuid)
817+
1
818+
819+
Same UUID notified from a different database (db2) - simulates cloned DB.
820+
The job should be recreated for the new database.
821+
822+
>>> cm.notify('db2', 'D', 'dup-uuid-1', 1, 0, 10, None, 'pending')
823+
>>> job = cm._jobs_by_uuid.get('dup-uuid-1')
824+
>>> job.db_name
825+
'db2'
826+
>>> len(cm._jobs_by_uuid)
827+
1
828+
829+
Verify job can be run from the new database
830+
831+
>>> pp(list(cm.get_jobs_to_run(now=100)))
832+
[<ChannelJob dup-uuid-1>]
833+
834+
Test multiple database switches for the same UUID
835+
836+
>>> cm.notify('db3', 'D', 'dup-uuid-1', 1, 0, 10, None, 'pending')
837+
>>> job = cm._jobs_by_uuid.get('dup-uuid-1')
838+
>>> job.db_name
839+
'db3'
840+
841+
Create another job with different UUID in db1 and verify it coexists
842+
843+
>>> cm.notify('db1', 'D', 'other-uuid', 2, 0, 10, None, 'pending')
844+
>>> len(cm._jobs_by_uuid)
845+
2
846+
>>> cm._jobs_by_uuid.get('dup-uuid-1').db_name
847+
'db3'
848+
>>> cm._jobs_by_uuid.get('other-uuid').db_name
849+
'db1'
850+
851+
Both jobs should be available to run
852+
853+
>>> jobs = list(cm.get_jobs_to_run(now=100))
854+
>>> len(jobs)
855+
2
856+
>>> sorted([j.uuid for j in jobs])
857+
['dup-uuid-1', 'other-uuid']
802858
"""
803859

804860
def __init__(self):
@@ -1029,22 +1085,31 @@ def notify(
10291085
channel = self.get_channel_by_name(channel_name, parent_fallback=True)
10301086
job = self._jobs_by_uuid.get(uuid)
10311087
if job:
1032-
# db_name is invariant
1033-
assert job.db_name == db_name
1034-
# date_created is invariant
1035-
assert job.date_created == date_created
1036-
# if one of the job properties that influence
1037-
# scheduling order has changed, we remove the job
1038-
# from the queues and create a new job object
1039-
if (
1040-
seq != job.seq
1041-
or priority != job.priority
1042-
or eta != job.eta
1043-
or channel != job.channel
1044-
):
1045-
_logger.debug("job %s properties changed, rescheduling it", uuid)
1088+
# if db_name differs, this is likely a cloned database
1089+
if job.db_name != db_name:
1090+
_logger.warning(
1091+
"job %s exists in multiple databases (%s and %s). recreating it",
1092+
uuid,
1093+
job.db_name,
1094+
db_name,
1095+
)
10461096
self.remove_job(uuid)
10471097
job = None
1098+
else:
1099+
# date_created is invariant
1100+
assert job.date_created == date_created
1101+
# if one of the job properties that influence
1102+
# scheduling order has changed, we remove the job
1103+
# from the queues and create a new job object
1104+
if (
1105+
seq != job.seq
1106+
or priority != job.priority
1107+
or eta != job.eta
1108+
or channel != job.channel
1109+
):
1110+
_logger.debug("job %s properties changed, rescheduling it", uuid)
1111+
self.remove_job(uuid)
1112+
job = None
10481113
if not job:
10491114
job = ChannelJob(db_name, channel, uuid, seq, date_created, priority, eta)
10501115
self._jobs_by_uuid[uuid] = job

0 commit comments

Comments
 (0)