-
Notifications
You must be signed in to change notification settings - Fork 31
/
jobmodule.py
executable file
·317 lines (271 loc) · 13.3 KB
/
jobmodule.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
#!/usr/bin/python
# -*- coding: utf-8 -*-
#
# jobmodule.py
# August 2021
# March 2023 - Issue #137
#
# This module has the following functions in the folder.
# submit_job_definition is used by submit_jobdef.py to submit a job based on the job definition id. Depending if a
# corresponding job request was found for the job definition it would either call execute_job or would go to
# submit_job_request to create a new job request based on a default template in variable jobReq
#
# submit_job_request is used by submit_jobreq.py to submit a job based on job request id. If the call is coming from
# submit_job_definition then it will create a job request after which it will call execute_job.
#
# execute_job is called by both submit_job_definition and submit_job_request and is responsible for submitting the job.
#
# check_context verifies if the context provided by the user is the correct context, if it's not the program will error out
#
#
#
# NOTE: Above functions don't use callrestapi from the shared module, instead it makes requests calls.
# getauthtoken
# getbaseurl
# file_accessible
#
# There is another method called cancel_job, which allows users to manually cancel the job by pressing Ctrl + C on the
# keyboard.
#
#
# Copyright © 2018, SAS Institute Inc., Cary, NC, USA. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the License); you may not use this file except in compliance with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
# express or implied. See the License for the specific language governing permissions and limitations under the License.
#
import requests, sys, os, time, json
from sharedfunctions import callrestapi
class jobmodule:
def __init__(self):
self.head = {"Content-type": "application/json", "Accept": "application/json", "Authorization": jobmodule.getauthtoken(jobmodule.getbaseurl())}
self.verbose = None
self.sasjob_status = None
self.sasjob_status_details = None
self.saslog_location = None
self.sasout_location = None
self.sasres_location = None
self.job_definition_id = None
self.job_requests_id = None
self.job_execution_id = None
self.sasjob_error_details = None
self.cancel_job_uri = None
self.cancel_job_method = None
def submit_job_definition(self, contextName="SAS Job Execution compute context", id=None, verbose=False):
if verbose:
self.verbose = True
# jobID = "dbbc02b9-e191-4a0b-b549-388df207c933"
# contextName = "SAS Job Execution compute context"
self.job_definition_id = id
jobID = id
contextName = contextName
jobDefinitionUri = "/jobDefinitions/definitions/" + jobID
url = self.getbaseurl() + jobDefinitionUri
result = requests.get(url=url, headers=self.head)
if result.status_code == 404:
print("ERROR! Job Definition ID is invalid. No Job Definition was found with id: {}".format(id))
return
print ("Job Definition id: {}".format(id))
if self.verbose:
print ("Checking Response IDs associated with the job definition id.")
name = result.json()['name']
desc = result.json()['name'].strip() + " created by: " + os.getlogin() + " using pyviyatools"
url = self.getbaseurl() + "/jobExecution/jobRequests?filter=in('jobDefinitionUri','{}')&sortBy=modifiedTimeStamp:descending".format(jobDefinitionUri)
result = requests.get(url=url,headers=self.head)
count = result.json()['count']
if count == 0:
if self.verbose:
print ("No Job Responses were found associated to the job request. Creating new job request")
jobReq = {
"name": name,
"description": desc,
"jobDefinitionUri": jobDefinitionUri,
"arguments": {
"_contextName": contextName,
"_omitJsonLog": "true"
}
}
# March 2023 - Issue #137 Commented is not needed and the call is to be made for submit_job_request with json
# url = self.getbaseurl() + "jobExecution/jobRequests"
self.submit_job_request(job_req_json=jobReq)
elif count >= 1:
if self.verbose:
print ("Job Request Found using job request id {}".format(result.json()['items'][0]['id']))
self.job_requests_id = result.json()['items'][0]['id']
for links in result.json()['items'][0]['links']:
if links['rel'] == 'submitJob':
jobSubmitURI = links['uri']
url = self.getbaseurl() + jobSubmitURI
self.execute_job(url=url)
def submit_job_request(self, id=None, job_req_json=None, verbose=False):
if verbose:
self.verbose = verbose
if id is not None:
self.job_requests_id = id
if self.verbose:
print ("Checking if the job request id {} is valid".format(id))
url = self.getbaseurl() + "/jobExecution/jobRequests/{}".format(id)
result = requests.get(url=url,headers=self.head)
if result.status_code == 404:
print("ERROR! Job Request ID is invalid. No Job Request was found with id: {}".format(kwargs['jobID']))
sys.exit(1)
print ("Job Request id {} is found.")
for result in result.json()['links']:
if result['rel'] == 'submitJob':
submit_job_uri = self.getbaseurl() + result['uri']
self.execute_job(url=submit_job_uri)
elif job_req_json is not None:
if self.verbose:
print ("Submitting a new job request.")
url = self.getbaseurl() + "/jobExecution/jobRequests"
# March 2023 - Issue #137 Changed data to json.
result = requests.post(url=url,json=job_req_json,headers=self.head)
print ("New Job Request has been created. {}".format(result.json()['id']))
self.job_requests_id = result.json()['id']
for links in result.json()['links']:
# March 2023 - Issue #137 It was not going to the proper link
if result['rel'] == 'submitJob':
submit_job_url = self.getbaseurl() + links['uri'].strip()
self.execute_job(url=submit_job_url)
def execute_job(self, url):
sasout_loc = None
saslog_loc = None
sasresinfo = None
job_error_details = None
job_status_details = None
jobStatusURI = None
result = requests.post(url=url, headers=self.head)
print ("Job Submitted.")
print ("Job id: {} \nState: {}".format(result.json()['id'],result.json()['state']))
for links in result.json()['links']:
if links['rel'] == 'self':
jobStatusURI = links['uri']
url = self.getbaseurl() + jobStatusURI
if links['rel'] == 'updateState':
self.cancel_job_uri = links['uri'] + "?value=canceled"
self.cancel_job_method = links['method']
print ("Get Job Results > {}".format(url))
result = requests.get(url=url, headers=self.head)
while result.json()['state'] == 'running':
time.sleep(0.01)
url = self.getbaseurl() + jobStatusURI
result = requests.get(url=url,headers=self.head)
url = self.getbaseurl() + jobStatusURI
result = requests.get(url=url,headers=self.head)
job_status = result.json()['state']
if 'stateDetails' in result.json():
print ("Job {} with {}".format(job_status,job_status_details))
job_status_details = result.json()['stateDetails']
else:
print ("Job {}".format(job_status))
if 'error' in result.json():
job_error_details = json.dumps(result.json()['error'])
if 'results' in result.json():
computeJob = result.json()['results']['COMPUTE_JOB']
if computeJob + ".list" in result.json()['results']:
sasout_loc = result.json()['results'][computeJob + ".list.txt"]
saslog_loc = result.json()['logLocation']
sasresinfo = json.dumps(result.json()['results'])
self.sasjob_status = job_status
self.sasjob_status_details = job_status_details
self.saslog_location = saslog_loc
self.sasout_location = sasout_loc
self.sasres_location = sasresinfo
self.sasjob_error_details = job_error_details
def cancel_job(self):
if self.sasjob_status == "running":
result = callrestapi(self.cancel_job_uri, self.cancel_job_method, acceptType="text/plain", contentType="text/plain")
return result.text
def check_context(self,contextName):
context = ["SAS Job Execution compute context", "SAS Studio compute context"]
if contextName not in context:
print("Context provided, {} ,is not the default context".format(contextName))
check_context_uri = self.getbaseurl() + "/compute/contexts?filter=eq('name','{}')".format(contextName)
check_context_resp = requests.get(check_context_uri,headers=self.head)
count = check_context_resp.json()['count']
if count == 0:
session_context_nf_uri = self.getbaseurl() + "/compute/contexts"
session_context_nf_result = requests.get(session_context_nf_uri,headers=self.head)
listOfNames = []
for names in session_context_nf_result.json()['items']:
listOfNames.append(names['name'])
print(
"Invalid Context Name. {} is not available in SAS Viya. Here is the list of valid context names {}".format(
contextName, listOfNames))
sys.exit(1)
return True
@staticmethod
def getauthtoken(baseurl):
#get authentication information for the header
credential_file=os.path.join(os.path.expanduser('~'),'.sas','credentials.json')
# check that credential file is available and can be read
access_file=jobmodule.file_accessible(credential_file, 'r')
if access_file==False:
oaval=None
print("ERROR: Cannot read authentication credentials at: ", credential_file)
print("ERROR: Try refreshing your token with sas-admin auth login")
sys.exit()
with open(credential_file) as json_file:
data = json.load(json_file)
type(data)
# the sas-admin profile init creates an empty credential file
# check that credential is in file, if it is add it to the header, if not exit
# get the profile environment variable to use it
# if it is not set default to the default profile
cur_profile=os.environ.get("SAS_CLI_PROFILE","Default")
#print("LOGON: ", cur_profile )
if cur_profile in data:
oauthToken=data[cur_profile]['access-token']
oauthTokenType="bearer"
oaval=oauthTokenType + ' ' + oauthToken
if oauthToken == "":
oaval = None
print("ERROR: access token not in file: ", credential_file)
print("ERROR: Try refreshing your token with sas-admin auth login")
sys.exit()
else:
oaval=None
print("ERROR: access token not in file: ", credential_file)
print("ERROR: Try refreshing your token with sas-admin auth login")
sys.exit()
return oaval
@staticmethod
def getbaseurl():
# check that profile file is available and can be read
# note the path to the profile is hard-coded right now
endpointfile=os.path.join(os.path.expanduser('~'),'.sas','config.json')
access_file= jobmodule.file_accessible(endpointfile, 'r')
#profile does not exist
if access_file==False:
print("ERROR: Cannot read CLI profile at:",endpointfile,". Recreate profile with sas-admin profile init.")
sys.exit()
#profile is empty file
if os.stat(endpointfile).st_size==0:
print("ERROR: Cannot read CLI profile empty file at:",endpointfile,". Recreate profile with sas-admin profile init.")
sys.exit()
# get json from profile
with open(endpointfile) as json_file:
data = json.load(json_file)
# get the profile environment variable to use it
# if it is not set default to the default profile
cur_profile=os.environ.get("SAS_CLI_PROFILE","Default")
#print("URL: ",cur_profile )
# check that information is in profile
if cur_profile in data:
baseurl=data[cur_profile]['sas-endpoint']
else:
baseurl=None
print("ERROR: profile "+cur_profile+" does not exist. Recreate profile with sas-admin profile init.")
sys.exit()
return baseurl
@staticmethod
def file_accessible(filepath, mode):
try:
f = open(filepath, mode)
f.close()
except IOError as e:
return False
return True