Skip to content

Commit

Permalink
feat: adding possibility to bulk insert in JobLoggingDB
Browse files Browse the repository at this point in the history
  • Loading branch information
fstagni committed Nov 1, 2024
1 parent d75e45a commit aee7bb4
Show file tree
Hide file tree
Showing 3 changed files with 49 additions and 6 deletions.
19 changes: 17 additions & 2 deletions src/DIRAC/WorkloadManagementSystem/DB/JobLoggingDB.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,8 @@ def addLoggingRecord(
_date = TimeUtilities.fromString(date)
elif isinstance(date, datetime.datetime):
_date = date
elif isinstance(date, list):
_date = date
else:
self.log.error("Incorrect date for the logging record")
_date = datetime.datetime.utcnow()
Expand All @@ -78,10 +80,23 @@ def addLoggingRecord(

cmd = (
"INSERT INTO LoggingInfo (JobId, Status, MinorStatus, ApplicationStatus, "
+ "StatusTime, StatusTimeOrder, StatusSource) VALUES (%d,'%s','%s','%s','%s',%f,'%s')"
% (int(jobID), status, minorStatus, applicationStatus[:255], str(_date), epoc, source[:32])
+ "StatusTime, StatusTimeOrder, StatusSource) VALUES "
)

# if JobID is a list, make a bulk insert
if isinstance(jobID, list):
for i in enumerate(jobID):
cmd = cmd + "(%d,'%s','%s','%s','%s',%f,'%s')," % (
int(jobID[i]),
status[i],
minorStatus[i],
applicationStatus[:255],
str(_date[i]),
epoc,
source[:32],
)
cmd = cmd[:-1]

return self._update(cmd)

#############################################################################
Expand Down
23 changes: 19 additions & 4 deletions src/DIRAC/WorkloadManagementSystem/Service/JobManagerHandler.py
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,9 @@ def export_submitJob(self, jobDesc):
jobDescList = [jobDesc]

jobIDList = []
statusList = []
minorStatusList = []
timeStampList = []

if parametricJob:
initialStatus = JobStatus.SUBMITTING
Expand All @@ -201,11 +204,23 @@ def export_submitJob(self, jobDesc):
jobID = result["JobID"]
self.log.info(f'Job added to the JobDB", "{jobID} for {self.ownerDN}/{self.ownerGroup}')

self.jobLoggingDB.addLoggingRecord(
jobID, result["Status"], result["MinorStatus"], date=result["TimeStamp"], source="JobManager"
)

jobIDList.append(jobID)
statusList.append(result["Status"])
minorStatusList.append(result["MinorStatus"])
timeStampList.append(result["TimeStamp"])

# insert in logging DB

# if jobIDs are all different, I can do a bulk insert
if len(set(jobIDList)) == len(jobIDList):
result = self.jobLoggingDB.insertLoggingRecord(
jobIDList, statusList, minorStatusList, timeStampList, source="JobManager"
)
else:
for i in enumerate(jobIDList):
result = self.jobLoggingDB.insertLoggingRecord(
jobIDList[i], statusList[i], minorStatusList[i], timeStampList[i], source="JobManager"
)

# Set persistency flag
retVal = gProxyManager.getUserPersistence(self.ownerDN, self.ownerGroup)
Expand Down
13 changes: 13 additions & 0 deletions tests/Integration/WorkloadManagementSystem/Test_JobLoggingDB.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,4 +46,17 @@ def test_JobStatus(jobLoggingDB: JobLoggingDB):
result = jobLoggingDB.getWMSTimeStamps(1)
assert result["OK"] is True, result["Message"]

result = jobLoggingDB.addLoggingRecord([2, 3, 4, 5], status="testing", minorStatus="No date 2", source="Unittest")
assert result["OK"] is False

now = datetime.datetime.utcnow()
result = jobLoggingDB.addLoggingRecord(
[2, 3, 4, 5],
status=["testing", "testing", "testing", "testing"],
minorStatus=["mn", "mn", "mn", "mn"],
date=[now, now, now, now],
source="Unittest",
)
assert result["OK"] is False

jobLoggingDB.deleteJob(1)

0 comments on commit aee7bb4

Please sign in to comment.