|
1 | 1 | package cmq_go
|
2 | 2 |
|
3 | 3 | import (
|
4 |
| - "net" |
5 | 4 | "time"
|
6 | 5 | "net/http"
|
7 | 6 | "fmt"
|
8 | 7 | "io/ioutil"
|
9 | 8 | "net/url"
|
10 | 9 | "bytes"
|
| 10 | + "errors" |
| 11 | + "net" |
| 12 | +) |
| 13 | + |
| 14 | +const ( |
| 15 | + DEFAULT_HTTP_TIMEOUT = 3000 //ms |
11 | 16 | )
|
12 | 17 |
|
13 | 18 | type CMQHttp struct {
|
14 |
| - timeout int |
15 | 19 | isKeepAlive bool
|
16 | 20 | conn *http.Client
|
17 | 21 | }
|
18 | 22 |
|
| 23 | +var DefaultTransport = &http.Transport{ |
| 24 | + Proxy: http.ProxyFromEnvironment, |
| 25 | + DialContext: (&net.Dialer{ |
| 26 | + Timeout: 30 * time.Second, |
| 27 | + KeepAlive: 30 * time.Second, |
| 28 | + DualStack: true, |
| 29 | + }).DialContext, |
| 30 | + MaxIdleConns: 500, |
| 31 | + MaxIdleConnsPerHost: 100, |
| 32 | + IdleConnTimeout: 90 * time.Second, |
| 33 | + TLSHandshakeTimeout: 10 * time.Second, |
| 34 | + ExpectContinueTimeout: 1 * time.Second, |
| 35 | +} |
| 36 | + |
19 | 37 | func NewCMQHttp() *CMQHttp {
|
20 | 38 | return &CMQHttp{
|
21 |
| - timeout: 10000, |
22 | 39 | isKeepAlive: true,
|
23 |
| - conn: nil, |
| 40 | + conn: &http.Client{ |
| 41 | + Timeout: DEFAULT_HTTP_TIMEOUT * time.Millisecond, |
| 42 | + Transport: DefaultTransport, |
| 43 | + }, |
24 | 44 | }
|
| 45 | + |
25 | 46 | }
|
26 | 47 |
|
27 |
| -func (this *CMQHttp) request(method, urlStr, reqStr, proxyUrlStr string, userTimeout int) (result string, err error){ |
28 |
| - var client *http.Client |
29 |
| - timeout := 0 |
30 |
| - if userTimeout >= 0 { |
31 |
| - timeout = userTimeout |
| 48 | +func (this *CMQHttp) setProxy(proxyUrlStr string) (err error) { |
| 49 | + if proxyUrlStr == "" { |
| 50 | + return |
32 | 51 | }
|
33 |
| - keepalive := 0 |
34 |
| - if this.isKeepAlive { |
35 |
| - keepalive = 30 |
| 52 | + proxyUrl, err := url.Parse(proxyUrlStr) |
| 53 | + if err != nil { |
| 54 | + return |
36 | 55 | }
|
| 56 | + transport, err := this.getTransport() |
| 57 | + if err != nil { |
| 58 | + return |
| 59 | + } |
| 60 | + transport.Proxy = http.ProxyURL(proxyUrl) |
| 61 | + return |
| 62 | +} |
37 | 63 |
|
38 |
| - if proxyUrlStr == "" { |
39 |
| - unproxyTransport := &http.Transport{ |
40 |
| - Proxy: http.ProxyFromEnvironment, |
41 |
| - DialContext: (&net.Dialer{ |
42 |
| - Timeout: 30 * time.Second, |
43 |
| - KeepAlive: time.Duration(keepalive) * time.Second, |
44 |
| - DualStack: true, |
45 |
| - }).DialContext, |
46 |
| - MaxIdleConns: 100, |
47 |
| - IdleConnTimeout: 90 * time.Second, |
48 |
| - TLSHandshakeTimeout: 10 * time.Second, |
49 |
| - ExpectContinueTimeout: 1 * time.Second, |
50 |
| - } |
| 64 | +func (this *CMQHttp) unsetProxy() (err error) { |
| 65 | + transport, err := this.getTransport() |
| 66 | + if err != nil { |
| 67 | + return |
| 68 | + } |
| 69 | + transport.Proxy = nil |
| 70 | + return |
| 71 | +} |
| 72 | + |
| 73 | +func (this *CMQHttp) getTransport() (*http.Transport, error) { |
| 74 | + if this.conn.Transport == nil { |
| 75 | + this.SetTransport(DefaultTransport) |
| 76 | + } |
51 | 77 |
|
52 |
| - client = &http.Client{ |
53 |
| - Timeout: time.Duration(timeout) * time.Second, |
54 |
| - Transport: unproxyTransport, |
55 |
| - } |
56 |
| - } else { |
57 |
| - proxyUrl, err := url.Parse(proxyUrlStr) |
58 |
| - if err != nil { |
59 |
| - panic(err) |
60 |
| - } |
61 |
| - proxyTransport := &http.Transport{ |
62 |
| - Proxy: http.ProxyURL(proxyUrl), |
63 |
| - DialContext: (&net.Dialer{ |
64 |
| - Timeout: 30 * time.Second, |
65 |
| - KeepAlive: time.Duration(keepalive) * time.Second, |
66 |
| - DualStack: true, |
67 |
| - }).DialContext, |
68 |
| - MaxIdleConns: 100, |
69 |
| - IdleConnTimeout: 90 * time.Second, |
70 |
| - TLSHandshakeTimeout: 10 * time.Second, |
71 |
| - ExpectContinueTimeout: 1 * time.Second, |
72 |
| - } |
73 |
| - client = &http.Client{ |
74 |
| - Timeout: time.Duration(timeout) * time.Second, |
75 |
| - Transport: proxyTransport, |
76 |
| - } |
| 78 | + if transport, ok := this.conn.Transport.(*http.Transport); ok { |
| 79 | + return transport, nil |
| 80 | + } |
| 81 | + return nil, errors.New("transport is not an *http.Transport instance") |
| 82 | +} |
| 83 | + |
| 84 | +func (this *CMQHttp) SetTransport(transport http.RoundTripper) { |
| 85 | + this.conn.Transport = transport |
| 86 | +} |
| 87 | + |
| 88 | +func (this *CMQHttp) request(method, urlStr, reqStr string, userTimeout int) (result string, err error) { |
| 89 | + timeout := DEFAULT_HTTP_TIMEOUT |
| 90 | + if userTimeout >= 0 { |
| 91 | + timeout += userTimeout |
77 | 92 | }
|
| 93 | + this.conn.Timeout = time.Duration(timeout) * time.Millisecond |
78 | 94 |
|
79 | 95 | req, err := http.NewRequest(method, urlStr, bytes.NewReader([]byte(reqStr)))
|
80 | 96 | if err != nil {
|
81 | 97 | return "", fmt.Errorf("make http req error %v", err)
|
82 | 98 | }
|
83 |
| - resp, err := client.Do(req) |
| 99 | + resp, err := this.conn.Do(req) |
84 | 100 | if err != nil {
|
85 | 101 | return "", fmt.Errorf("http error %v", err)
|
86 | 102 | }
|
87 | 103 | defer resp.Body.Close()
|
88 | 104 | if resp.StatusCode != http.StatusOK {
|
89 |
| - return "",fmt.Errorf("http error code %d", resp.StatusCode) |
| 105 | + return "", fmt.Errorf("http error code %d", resp.StatusCode) |
90 | 106 | }
|
91 | 107 | body, err := ioutil.ReadAll(resp.Body)
|
92 | 108 | if err != nil {
|
93 |
| - return "",fmt.Errorf("read http resp body error %v", err) |
| 109 | + return "", fmt.Errorf("read http resp body error %v", err) |
94 | 110 | }
|
95 | 111 | result = string(body)
|
96 | 112 | return
|
|
0 commit comments