-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy path调用华为codearts流水线脚本
96 lines (89 loc) · 4.13 KB
/
调用华为codearts流水线脚本
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
import json
import requests
from apig_sdk import signer
from datetime import datetime
import time
def sign_request(http_method, url, headers=None, body=None, key=None, secret=None):
    """Build a request object and sign it with the APIG SDK signer.

    Args:
        http_method: HTTP verb passed to ``signer.HttpRequest`` (e.g. "GET").
        url: Full request URL, including any query string.
        headers: Optional dict of request headers; an empty dict is used
            when omitted or empty.
        body: Optional JSON-serialisable payload. Any falsy value (None,
            ``{}``, ``""``) results in an empty-string body.
        key: Access key. When omitted, the ``HUAWEICLOUD_SDK_AK``
            environment variable is used, then the legacy in-file value.
        secret: Secret key. When omitted, the ``HUAWEICLOUD_SDK_SK``
            environment variable is used, then the legacy in-file value.

    Returns:
        The ``signer.HttpRequest`` after ``Signer.Sign`` has processed it;
        it carries ``url``, ``headers`` and ``body`` attributes.
    """
    # Function-scope import so this block does not depend on the file's
    # top-level import list.
    import os

    sig = signer.Signer()
    # Resolution order: explicit argument -> environment -> legacy literal.
    # NOTE(review): the literal fallbacks keep existing behaviour; credentials
    # should not live in source control, so prefer the environment variables.
    sig.Key = key or os.environ.get("HUAWEICLOUD_SDK_AK") or "xxxxQ"
    sig.Secret = secret or os.environ.get("HUAWEICLOUD_SDK_SK") or "xxxxx"

    req = signer.HttpRequest(http_method, url)
    req.headers = headers or {}
    req.body = json.dumps(body) if body else ""
    # Stored on the request so callers can read the target URL back from it.
    req.url = url
    sig.Sign(req)
    return req
def run_pipeline(pipeline_id, project_id, choose_jobs, choose_stages, timeout=30):
    """Trigger a pipeline run and return the ID of the new run.

    Args:
        pipeline_id: ID of the pipeline to run (interpolated into the URL).
        project_id: ID of the project that owns the pipeline.
        choose_jobs: Value sent as ``choose_jobs`` in the request body.
        choose_stages: Value sent as ``choose_stages`` in the request body.
        timeout: Seconds to wait for the HTTP call before ``requests``
            raises a timeout error. Pass None to wait indefinitely, which
            was the previous behaviour.

    Returns:
        The ``pipeline_run_id`` field of the JSON response, or None when
        the response has no such field.
    """
    url = f"https://cloudpipeline-ext.cn-east-3.myhuaweicloud.com/v5/{project_id}/api/pipelines/{pipeline_id}/run"
    headers = {"content-type": "application/json"}
    body = {
        "sources": [],
        "description": "",
        "variables": [],
        "choose_jobs": choose_jobs,
        "choose_stages": choose_stages,
    }
    req = sign_request("POST", url, headers=headers, body=body)
    # Without an explicit timeout, requests can block forever on a stalled
    # connection.
    resp = requests.post(req.url, headers=req.headers, data=req.body, timeout=timeout)
    data = resp.json()
    return data.get('pipeline_run_id')
def _ms_to_datetime(ms):
    """Convert epoch milliseconds to a naive local datetime; None stays None."""
    if ms is None:
        return None
    return datetime.fromtimestamp(ms / 1000)


def get_pipeline_status(project_id, pipeline_id, pipeline_run_id,
                        poll_interval=10, max_wait=None, request_timeout=30):
    """Poll a pipeline run until its status is no longer "RUNNING".

    Args:
        project_id: ID of the project that owns the pipeline.
        pipeline_id: ID of the pipeline.
        pipeline_run_id: ID of the run to watch.
        poll_interval: Seconds to sleep between polls.
        max_wait: Optional upper bound, in seconds, on the total polling
            time. None (the default) polls until the run leaves "RUNNING".
        request_timeout: Seconds to wait for each HTTP call before
            ``requests`` raises a timeout error.

    Returns:
        A dict with ``pipeline_name``, ``pipeline_run_status``,
        ``start_time`` and ``end_time``. The two times are ``datetime``
        objects, or None when the response omits them. Returns None if
        ``max_wait`` elapses while the run is still "RUNNING".

    Raises:
        RuntimeError: The response JSON has no ``status`` field.
    """
    url = f"https://cloudpipeline-ext.cn-east-3.myhuaweicloud.com/v5/{project_id}/api/pipelines/{pipeline_id}/pipeline-runs/detail?pipeline_run_id={pipeline_run_id}"
    deadline = None if max_wait is None else time.monotonic() + max_wait

    while True:
        # Re-sign on every poll, as the original loop did.
        req = sign_request("GET", url, headers={"content-type": "application/json"})
        response = requests.get(req.url, headers=req.headers, timeout=request_timeout)
        data = response.json()
        pipeline_run_status = data.get('status')

        if pipeline_run_status is None:
            # A payload without a status would otherwise be treated as a
            # finished run and fail later with an opaque TypeError.
            raise RuntimeError(f"Unexpected pipeline run detail response: {data!r}")

        if pipeline_run_status != "RUNNING":
            return {
                'pipeline_name': data.get('name'),
                'pipeline_run_status': pipeline_run_status,
                'start_time': _ms_to_datetime(data.get('start_time')),
                'end_time': _ms_to_datetime(data.get('end_time')),
            }

        if deadline is not None and time.monotonic() >= deadline:
            return None
        time.sleep(poll_interval)
def create_dingtalk_message(pipeline_name, start_time, end_time, pipeline_run_status,
                            pipeline_url, title="Spark Monitor"):
    """Build the DingTalk markdown message payload for a finished pipeline run.

    Args:
        pipeline_name: Name shown in the heading and the name line.
        start_time: Run start time; rendered with ``str()`` formatting.
        end_time: Run end time; rendered with ``str()`` formatting.
        pipeline_run_status: Status string; selects the font colour.
        pipeline_url: Link target for the pipeline address line.
        title: Value of the payload's ``markdown.title`` field. Defaults
            to the value that was previously hard-coded.

    Returns:
        A dict with ``msgtype`` set to "markdown" and a ``markdown`` dict
        holding ``title`` and ``text``.
    """
    status_color = {
        "COMPLETED": "#32CD32",
        "FAILED": "#FF0000",
        "OTHER": "#FFA500",
    }
    # Any status not listed above is rendered in orange.
    color = status_color.get(pipeline_run_status, "#FFA500")
    text = (
        f"### {pipeline_name} 测试用例完成\n\n"
        f"> **流水线名称:** {pipeline_name}\n\n"
        f"> **开始时间:** {start_time}\n\n"
        f"> **结束时间:** {end_time}\n\n"
        f"> **运行状态:** <font color={color}>{pipeline_run_status}</font>\n\n"
        f"> [**流水线地址**]({pipeline_url})\n\n"
        f"> **自动化测试** "
    )
    return {
        "msgtype": "markdown",
        "markdown": {
            "title": title,
            "text": text,
        },
    }
def send_dingtalk_message(webhook_url, message, timeout=10):
    """POST a message payload to a DingTalk robot webhook.

    Args:
        webhook_url: Full webhook URL, including its access token.
        message: JSON-serialisable payload to send.
        timeout: Seconds to wait for the HTTP call before ``requests``
            raises a timeout error. Pass None to wait indefinitely, which
            was the previous behaviour.

    Returns:
        The response body as text.
    """
    headers = {'Content-Type': 'application/json'}
    # Without an explicit timeout, requests can block forever on a stalled
    # connection.
    response = requests.post(
        webhook_url,
        data=json.dumps(message),
        headers=headers,
        timeout=timeout,
    )
    return response.text
if __name__ == '__main__':
    # Script entry point: trigger the pipeline, wait for it to finish, then
    # post a summary of the run to the DingTalk group.
    pipeline_id = "d7580ec1f3ab4f7bae2028c8bd5c66cd"
    project_id = "8672d4f0470f4eaf8bd75e2589934d21"
    choose_jobs = ["168864794392314618d37-850e-46c8-b911-d2d644b19430"]
    choose_stages = ["0"]
    # NOTE(review): this URL embeds the robot's access token. Consider
    # loading it from configuration instead of committing it to source.
    webhook_url = 'https://oapi.dingtalk.com/robot/send?access_token=425ff9a2f17b8143c46a951ad3eba20ff69e9b615fcb7d4b8b5bdf383dd56568'

    pipeline_run_id = run_pipeline(pipeline_id, project_id, choose_jobs, choose_stages)
    if not pipeline_run_id:
        # Previously this case ended silently with exit status 0; fail
        # loudly so schedulers and operators notice the run never started.
        raise SystemExit("Failed to start pipeline: response contained no pipeline_run_id")

    pipeline_status = get_pipeline_status(project_id, pipeline_id, pipeline_run_id)
    if not pipeline_status:
        raise SystemExit(f"No final status obtained for pipeline run {pipeline_run_id}")

    message = create_dingtalk_message(
        pipeline_status['pipeline_name'],
        pipeline_status['start_time'],
        pipeline_status['end_time'],
        pipeline_status['pipeline_run_status'],
        pipeline_url=f"https://devcloud.cn-east-3.huaweicloud.com/cicd/project/{project_id}/pipeline/detail/{pipeline_id}/{pipeline_run_id}?v=1",
    )
    response_text = send_dingtalk_message(webhook_url, message)
    print(response_text)