forked from aiven/aiven-go-client
-
Notifications
You must be signed in to change notification settings - Fork 1
/
flink_job.go
130 lines (113 loc) · 4.09 KB
/
flink_job.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
package aiven
type (
// FlinkJobHandler aiven go-client handler for Flink Jobs
FlinkJobHandler struct {
client *Client
}
// CreateFlinkJobRequest Aiven API request
// POST https://api.aiven.io/v1/project/<project>/service/<service_name>/flink/job
CreateFlinkJobRequest struct {
JobName string `json:"job_name,omitempty"`
Statement string `json:"statement"`
TablesIds []string `json:"table_ids"`
}
// CreateFlinkJobResponse Aiven API response
// POST https://api.aiven.io/v1/project/<project>/service/<service_name>/flink/job
CreateFlinkJobResponse struct {
APIResponse
JobName string `json:"job_name"`
JobId string `json:"job_id"`
}
// PatchFlinkJobRequest Aiven API request
// PATCH https://api.aiven.io/v1/project/<project>/service/<service_name>/flink/proxy/v1/jobs/<job_id>
PatchFlinkJobRequest struct {
JobId string `json:"job_id"`
}
// GetFlinkJobRequest Aiven API request
// GET https://api.aiven.io/v1/project/<project>/service/<service_name>/flink/job/<job_id>
GetFlinkJobRequest struct {
JobId string `json:"job_id"`
}
// GetFlinkJobResponse Aiven API response
// GET https://api.aiven.io/v1/project/<project>/service/<service_name>/flink/proxy/v1/jobs/<job_id>
GetFlinkJobResponse struct {
APIResponse
Name string `json:"name"`
JID string `json:"jid"`
IsStoppable bool `json:"isStoppable"`
Duration int `json:"duration"`
Now int `json:"now"`
EndTime int `json:"end-time"`
StartTime int `json:"start-time"`
MaxParallelism int `json:"maxParallelism"`
State string `json:"state"`
Plan struct {
JID string `json:"jid"`
Name string `json:"name"`
Nodes []struct {
Description string `json:"description"`
Id string `json:"id"`
Operator string `json:"operator"`
OperatorStrategy string `json:"operator_strategy"`
OptimizerProperties interface{} `json:"optimizer_properties"`
Parallelism int `json:"parallelism"`
} `json:"nodes"`
} `json:"plan"`
StatusCounts struct {
Canceled int `json:"CANCELED"`
Canceling int `json:"CANCELING"`
Created int `json:"CREATED"`
Deploying int `json:"DEPLOYING"`
Failed int `json:"FAILED"`
Finished int `json:"FINISHED"`
Initializing int `json:"INITIALIZING"`
Reconciling int `json:"RECONCILING"`
Running int `json:"RUNNING"`
Scheduled int `json:"SCHEDULED"`
} `json:"status-counts"`
Timestamps struct {
Canceled int `json:"CANCELED"`
Canceling int `json:"CANCELING"`
Created int `json:"CREATED"`
Deploying int `json:"DEPLOYING"`
Failed int `json:"FAILED"`
Finished int `json:"FINISHED"`
Initializing int `json:"INITIALIZING"`
Reconciling int `json:"RECONCILING"`
Running int `json:"RUNNING"`
Scheduled int `json:"SCHEDULED"`
} `json:"timestamps"`
Vertices []interface{} `json:"vertices"`
}
)
// Create creates a flink job
func (h *FlinkJobHandler) Create(project, service string, req CreateFlinkJobRequest) (*CreateFlinkJobResponse, error) {
path := buildPath("project", project, "service", service, "flink", "job")
bts, err := h.client.doPostRequest(path, req)
if err != nil {
return nil, err
}
var r CreateFlinkJobResponse
errR := checkAPIResponse(bts, &r)
return &r, errR
}
// Get gets a flink job
func (h *FlinkJobHandler) Get(project, service string, req GetFlinkJobRequest) (*GetFlinkJobResponse, error) {
path := buildPath("project", project, "service", service, "flink", "proxy", "v1", "jobs", req.JobId)
bts, err := h.client.doGetRequest(path, nil)
if err != nil {
return nil, err
}
var r GetFlinkJobResponse
errR := checkAPIResponse(bts, &r)
return &r, errR
}
// Patch patches a flink job
func (h *FlinkJobHandler) Patch(project, service string, req PatchFlinkJobRequest) error {
path := buildPath("project", project, "service", service, "flink", "proxy", "v1", "jobs", req.JobId)
bts, err := h.client.doPatchRequest(path, nil)
if err != nil {
return err
}
return checkAPIResponse(bts, nil)
}