-
Notifications
You must be signed in to change notification settings - Fork 0
/
python_client.py
217 lines (184 loc) · 7.75 KB
/
python_client.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
#!/usr/bin/python3
import threading
import sys
import requests
import time
import os
class GpcJobEngineClient:
    """HTTP client for a GPC job engine, plus an optional polling worker thread.

    Every request is sent to ``url + <endpoint>``. Endpoints are appended
    verbatim (for example ``"jobs"``), so ``url`` should end with ``/``.
    """

    def __init__(self, url, userWorker = None, noJobsSleep = 5, noJobsSignal = None,
                 workerId = '', terminateOnNoJobs = True, DEBUG = False, family='', **kwargs):
        """
        Create a client.
        :param url: base URL that endpoint paths are appended to
        :param userWorker: callable invoked as ``userWorker(job, client)`` for each checked-out job
        :param noJobsSleep: seconds to sleep between polls when idle, paused, or after a checkout error
        :param noJobsSignal: stored on the instance; not used by this class
        :param workerId: sent as ``workerid`` when checking out a job
        :param terminateOnNoJobs: when true the worker loop stops as soon as the queue is empty or paused
        :param DEBUG: when truthy, ``log`` writes to stdout
        :param family: default job family added to GET/POST/DELETE requests
        :param kwargs: accepted and ignored
        """
        self.url = url
        self.processing = False
        self.workerThread = None
        self.noJobsSleep = noJobsSleep
        self.noJobsSignal = noJobsSignal
        self.userWorker = userWorker
        self.DEBUG = DEBUG
        self.terminateOnNoJobs = terminateOnNoJobs
        self.workerId = workerId
        self.family = family

    def log(self, s):
        """
        Log a value to stdout when DEBUG is enabled
        :param s: value to log (formatted with str())
        """
        if self.DEBUG:
            # f-string instead of "%s" % s so tuple arguments are not misread
            # as multiple format arguments.
            sys.stdout.write(f"{s}\n")

    def _withFamily(self, params):
        """
        Return a copy of params with 'family' defaulted to self.family.
        Copying keeps the caller's dict (and any shared default) unmodified.
        """
        merged = dict(params) if params else {}
        merged.setdefault('family', self.family)
        return merged

    def addJob(self, payload, friendlyName = "", jobFamily = "", priority=1):
        """
        Add a job to queue
        :param payload: dict of job settings
        :param friendlyName: sent as ``friendlyName``
        :param jobFamily: sent as ``jobFamily``
        :param priority=1: int optional priority for job
            NOTE(review): accepted but currently not included in the request.
        :return: parsed JSON response from the API
        """
        return self.post("jobs", { 'payload': payload, "friendlyName" : friendlyName, "jobFamily" : jobFamily})

    def addJobsFromFile(self, filepath):
        """
        Upload a file of jobs to the ``jobs`` endpoint
        :param filepath: path of the file to upload
        :return: parsed JSON response from the API
        :raises Exception: if filepath is not an existing regular file
        """
        if not os.path.isfile(filepath):
            raise Exception(f"{filepath} is not a file according to os.path.isfile")
        with open(filepath, "r") as f:
            return self.postFile("jobs", f)

    def get(self, m, params = None):
        """
        Issue a GET request
        :param m: endpoint path appended to the base URL
        :param params: optional dict of query parameters; 'family' is added if absent
        :return: parsed JSON on HTTP 200, False on HTTP 204
        :raises Exception: on any other status code
        """
        params = self._withFamily(params)
        self.log("Request GET " + self.url + m + " with parameters " + str(params))
        r = requests.get(url = self.url + m, params = params)
        if r.status_code == 200:
            data = r.json()
            self.log(data)
            return data
        if r.status_code == 204:
            return False
        raise Exception("Not sure how to handle response code {}".format(r.status_code))

    def postFile(self, m, file):
        """
        POST an open file as the ``file`` upload field
        :param m: endpoint path appended to the base URL
        :param file: open file object to upload
        :return: parsed JSON on HTTP 201
        :raises Exception: on any other status code
        """
        self.log("Request POST file " + self.url + m)
        r = requests.post(url = self.url + m, files={'file' : file})
        if r.status_code != 201:
            raise Exception(f"Not sure how to handle response code {r.status_code} message from api: {r.text}")
        data = r.json()
        self.log(data)
        return data

    def post(self, m, params = None):
        """
        Issue a POST request
        :param m: endpoint path appended to the base URL
        :param params: optional dict sent as the request body; 'family' is added if absent
        :return: parsed JSON on HTTP 201
        :raises Exception: on any other status code
        """
        params = self._withFamily(params)
        self.log("Request POST " + self.url + m)
        r = requests.post(url = self.url + m, data = params)
        if r.status_code != 201:
            raise Exception(f"Not sure how to handle response code {r.status_code} message from api: {r.text}")
        data = r.json()
        self.log(data)
        return data

    def put(self, m, params = None):
        """
        Issue a PUT request (no 'family' is added here)
        :param m: endpoint path appended to the base URL
        :param params: optional dict sent as the request body
        :return: True on HTTP 201
        :raises Exception: on any other status code
        """
        body = {} if params is None else params
        self.log("Request PUT " + self.url + m)
        r = requests.put(url = self.url + m, data = body)
        if r.status_code != 201:
            raise Exception(f"Not sure how to handle response code {r.status_code} message from api: {r.text}")
        return True

    def delete(self, m, params = None):
        """
        Issue a DELETE request
        :param m: endpoint path appended to the base URL
        :param params: optional dict sent as the request body; 'family' is added if absent
        :return: True on HTTP 204
        :raises Exception: on any other status code
        """
        params = self._withFamily(params)
        self.log("Request DELETE " + self.url + m)
        r = requests.delete(url = self.url + m, data = params)
        if r.status_code != 204:
            raise Exception(f"Not sure how to handle response code {r.status_code} message from api: {r.text}")
        return True

    def getJobById(self, _id):
        """ GET jobs/<_id> """
        return self.get(f"jobs/{_id}")

    def checkoutJob(self):
        """ GET jobs/checkout_one for this worker id; False when the API answers 204 """
        return self.get("jobs/checkout_one", {"workerid" : self.workerId})

    def getJobs(self, limit=1000):
        """ GET jobs, limited to `limit` entries """
        return self.get("jobs", {"limit" : limit})

    def getPendingJobs(self, limit=1000):
        """ GET jobs with status "pending" """
        return self.get("jobs", {"limit" : limit, "status" : "pending"})

    def getRunningJobs(self, limit=1000):
        """ GET jobs with status "running" """
        return self.get("jobs", {"limit" : limit, "status" : "running"})

    def getCompletedJobs(self, limit=1000):
        """ GET jobs with status "completed" """
        return self.get("jobs", {"limit" : limit, "status" : "completed"})

    def completeJob(self, _id, result):
        """ PUT a `result` value to jobs/<_id> """
        return self.put(f"jobs/{_id}", {"result": result })

    def progressJob(self, _id, progress):
        """ PUT a `progress` value to jobs/<_id> """
        return self.put(f"jobs/{_id}", {"progress" : progress})

    def failJob(self, _id, error):
        """ PUT an `error` value to jobs/<_id> """
        return self.put(f"jobs/{_id}", {"error": error })

    def deleteAllJobs(self):
        """ DELETE jobs (the request carries this client's family) """
        return self.delete("jobs")

    def deleteJob(self, _id):
        """ DELETE jobs/<_id> """
        return self.delete(f"jobs/{_id}")

    def getFamilies(self):
        """ GET jobs/families """
        return self.get("jobs/families")

    def getCount(self, family):
        """
        GET jobs/count for one family
        :param family: family name; converted to str before sending
        """
        self.log(f"getcount(family={family})")
        return self.get("jobs/count", { "family" : str(family) })

    def getCounts(self):
        """
        Collect getCount() for every family reported by getFamilies()
        :return: dict mapping family name to its count response; families
            whose "_id" is the empty string are skipped
        """
        counts = {}
        # getFamilies() yields False on a 204 response; treat that as "none".
        for item in self.getFamilies() or []:
            family = item["_id"]
            if family == "":
                continue
            c = self.getCount(family)
            self.log(f"c={c}, family={family}")
            counts[family] = c
        return counts

    ### refactor, belongs in client side
    def worker(self):
        """
        Internal worker thread entry point.
        Polls checkoutJob() while self.processing is set and hands each job
        to userWorker(job, self).
        """
        self.log("Worker thread active")
        while self.processing:  # keep looping until signaled to stop
            try:
                job = self.checkoutJob()
            except Exception as e:
                # Checkout failures are logged and retried after a sleep.
                self.log(f"Exception when trying to checkout a job. Performing a sleep.\n Exception detail {e}")
                time.sleep(self.noJobsSleep)
                continue
            if job is False:
                # no job
                if self.terminateOnNoJobs:
                    # self terminate worker thread since no jobs pending
                    print("Terminating because terminateOnNoJobs is true and there are no jobs pending.")
                    self.stopProcessing()
                else:
                    # long running poll mode
                    time.sleep(self.noJobsSleep)
            elif job == "PAUSED":
                # the checkout call reported the family queue as paused
                if self.terminateOnNoJobs:
                    print("Terminating because terminateNoJobs is true and the family queue is paused")
                    self.stopProcessing()
                else:
                    print("Job family queue is paused")
                    time.sleep(self.noJobsSleep)
            else:
                # got a job
                self.userWorker(job, self)

    def start(self,poolid=None):
        """
        starts the worker thread
        :param poolid: optional worker pool id; only logged
        :raises Exception: if no userWorker was supplied
        """
        if self.userWorker is None:
            raise Exception("userWorker not defined")
        if poolid is not None:
            self.log(f"worker pool id is {poolid}")
        self.processing = True
        self.workerThread = threading.Thread(target=self.worker)
        self.log("Starting job engine")
        self.workerThread.start()

    def stopProcessing(self):
        """ signals that processing should be stopped """
        self.processing = False

    def end(self):
        """ stops processing, joins worker thread (if one was started) and returns"""
        self.stopProcessing()
        # start() may never have been called; there is nothing to join then.
        if self.workerThread is not None:
            self.workerThread.join()
        self.log("ending job engine")

    def getFamily(self, family):
        """ GET jobs/families/<family> """
        self.log(f"in getFamilyConfig family={family}")
        return self.get(f"jobs/families/{family}")