-
Notifications
You must be signed in to change notification settings - Fork 17
/
Copy pathclient.go
166 lines (139 loc) · 4.12 KB
/
client.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
package connect
import (
"bytes"
"encoding/json"
"fmt"
"io"
"io/ioutil"
"net/http"
"net/url"
)
const (
// StatusUnprocessableEntity is the status code returned when sending a
// request with invalid fields.
StatusUnprocessableEntity = 422
)
const (
// DefaultHostURL is the default HTTP host used for connecting to a Kafka
// Connect REST API.
DefaultHostURL = "http://localhost:8083/"
userAgent = "go-kafka/0.9 connect/" + Version
)
// A Client manages communication with the Kafka Connect REST API.
type Client struct {
host *url.URL // Base host URL for API requests.
// HTTP client used to communicate with the API. By default
// http.DefaultClient will be used.
HTTPClient *http.Client
// User agent used when communicating with the Kafka Connect API.
UserAgent string
}
// NewClient returns a new Kafka Connect API client that communicates with the
// optional host. If no host is given, DefaultHostURL (localhost) is used.
func NewClient(host ...string) *Client {
var hostURL *url.URL
var err error
switch len(host) {
case 0:
hostURL, _ = url.Parse(DefaultHostURL)
case 1:
hostURL, err = url.Parse(host[0])
if err != nil {
panic(err.Error())
}
default:
panic("only one host URL can be given")
}
return &Client{host: hostURL, UserAgent: userAgent}
}
func (c *Client) httpClient() *http.Client {
if c.HTTPClient == nil {
return http.DefaultClient
}
return c.HTTPClient
}
// Host returns the API root URL the Client is configured to talk to.
func (c *Client) Host() string {
return c.host.String()
}
// NewRequest creates an API request. A relative URL can be provided in path,
// in which case it is resolved relative to the BaseURL of the Client.
// Relative URLs should always be specified without a preceding slash. If
// specified, the value pointed to by body is JSON-encoded and included as the
// request body.
func (c *Client) NewRequest(method, path string, body interface{}) (*http.Request, error) {
rel, err := url.Parse(path)
if err != nil {
return nil, err
}
url := c.host.ResolveReference(rel)
var contentType string
var buf io.ReadWriter
if body != nil {
buf = new(bytes.Buffer)
err := json.NewEncoder(buf).Encode(body)
if err != nil {
return nil, err
}
contentType = "application/json"
}
request, err := http.NewRequest(method, url.String(), buf)
if err != nil {
return nil, err
}
request.Header.Set("Accept", "application/json")
if contentType != "" {
request.Header.Set("Content-Type", contentType)
}
if c.UserAgent != "" {
request.Header.Set("User-Agent", c.UserAgent)
}
return request, nil
}
// Do sends an API request and returns the API response. The API response is
// JSON-decoded and stored in the value pointed to by v, or returned as an
// error if an API or HTTP error has occurred.
func (c *Client) Do(req *http.Request, v interface{}) (*http.Response, error) {
response, err := c.httpClient().Do(req)
if err != nil {
return nil, err
}
defer response.Body.Close()
if response.StatusCode >= 400 {
return response, buildError(req, response)
}
if v != nil {
err = json.NewDecoder(response.Body).Decode(v)
if err == io.EOF {
err = nil // ignore EOF, empty response body
}
}
return response, err
}
// Simple GET helper with no request body.
func (c *Client) get(path string, v interface{}) (*http.Response, error) {
return c.doRequest("GET", path, nil, v)
}
func (c *Client) delete(path string) (*http.Response, error) {
return c.doRequest("DELETE", path, nil, nil)
}
func (c *Client) doRequest(method, path string, body, v interface{}) (*http.Response, error) {
request, err := c.NewRequest(method, path, body)
if err != nil {
return nil, err
}
return c.Do(request, v)
}
func buildError(req *http.Request, resp *http.Response) error {
apiError := APIError{Response: resp}
data, err := ioutil.ReadAll(resp.Body)
if err == nil && data != nil {
_ = json.Unmarshal(data, &apiError) // Fall back on general error below
}
// Possibly a general HTTP error, e.g. we're not even talking to a valid
// Kafka Connect API host
if apiError.Code == 0 {
return fmt.Errorf("HTTP %v on %v %v", resp.Status, req.Method, req.URL)
}
return apiError
}