-
Notifications
You must be signed in to change notification settings - Fork 4
/
test_run_job.py
84 lines (74 loc) · 3.42 KB
/
test_run_job.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
import os
import logging
import logging.handlers
import httpx
from .crud import (
get_scheduler_job,
)
# Keep the log file next to this module, independent of the working directory.
dir_path = os.path.split(os.path.realpath(__file__))[0]
logname = os.path.join(dir_path, 'test_run_job.log')

# Record layout: "[timestamp] [LEVEL] logger-name: message" ('{'-style fields).
dt_fmt = '%Y-%m-%d %H:%M:%S'
formatter = logging.Formatter(
    fmt='[{asctime}] [{levelname}] {name}: {message}',
    datefmt=dt_fmt,
    style='{',
)

# Records are appended to the UTF-8 encoded log file.
handler = logging.FileHandler(logname, mode='a', encoding='utf-8')
handler.setFormatter(formatter)

# Dedicated logger for test runs, set to DEBUG level.
logger = logging.getLogger('scheduler testlog')
logger.setLevel(logging.DEBUG)
logger.addHandler(handler)
async def test_job(job_id: str, adminkey: str) -> str:
    '''
    A clone of what is actually run when run_cron_job.py is executed.
    This is used to execute the API call and log the result.

    Looks up the job via get_scheduler_job(job_id), sends a single HTTP
    request built from the job's selectedverb, url, headers and body, and
    logs both the request and the response to the module logger.

    Args:
        job_id: Identifier handed to get_scheduler_job.
        adminkey: Not used by this function; kept so existing callers
            keep working.

    Returns:
        The response text on success. If anything fails (job lookup,
        unsupported verb, request error, ...) the exception is logged and
        its message is returned instead of being raised.
    '''
    try:
        jobinfo = await get_scheduler_job(job_id)
        if jobinfo is None:
            # Guard in case the lookup yields nothing, so the caller gets a
            # clear message instead of an AttributeError on None.
            raise ValueError(f'scheduler job not found: {job_id}')

        method_name = jobinfo.selectedverb
        url = jobinfo.url
        body = jobinfo.body

        # A missing or empty body is sent as an empty mapping.
        body_json = body if body else {}
        # Flatten the key/value header entries into a plain dict; a job
        # without headers (None) is treated as having no headers.
        json_headers = {h.key: h.value for h in (jobinfo.headers or [])}

        # NOTE(review): headers are written to the log file verbatim; confirm
        # they never carry credentials that should not be persisted.
        logger.info(
            '[test_run_job]: url: %s headers: %s body: %s',
            url, json_headers, body_json,
        )

        # Per-verb request arguments: GET sends the body as query parameters,
        # POST/PUT send it as request data, DELETE sends no body at all.
        request_kwargs = {'headers': json_headers}
        if method_name == 'GET':
            request_kwargs['params'] = body_json
        elif method_name in ('POST', 'PUT'):
            # NOTE(review): if the stored body is a raw string, httpx
            # recommends content= over data= - confirm the body type.
            request_kwargs['data'] = body_json
        elif method_name != 'DELETE':
            # Previously an unknown verb fell through and surfaced as an
            # unbound `response` variable; fail with an explicit message.
            raise ValueError(f'unsupported HTTP verb: {method_name!r}')

        async with httpx.AsyncClient() as client:
            response = await client.request(method_name, url, **request_kwargs)

        logger.info(
            '[test_run_job]: response status from api call: %s',
            response.status_code,
        )
        logger.info(
            '[test_run_job]: response text from api call: %s',
            response.text,
        )
        return response.text
    except Exception as e:
        # Deliberate best-effort boundary: report the failure as text so the
        # caller always receives a string.
        logger.error('[test_job]:Exception thrown in [test_job]: %s', e)
        return str(e)