|
| 1 | +'use strict'; |
| 2 | + |
1 | 3 | /**
|
2 | 4 | * A connection handler for Amazon ES.
|
3 | 5 | *
|
|
9 | 11 | * @class HttpConnector
|
10 | 12 | */
|
11 | 13 |
|
12 |
| -import AWS from 'aws-sdk'; |
13 |
| -import HttpConnector from 'elasticsearch/src/lib/connectors/http' |
14 |
| -import _ from 'elasticsearch/src/lib/utils'; |
15 |
| -import zlib from 'zlib'; |
| 14 | +const AWS = require('aws-sdk'); |
| 15 | +const HttpConnector = require('elasticsearch/src/lib/connectors/http'); |
| 16 | +const zlib = require('zlib'); |
16 | 17 |
|
17 | 18 | class HttpAmazonESConnector extends HttpConnector {
|
18 | 19 | constructor(host, config) {
|
19 | 20 | super(host, config);
|
20 |
| - const { protocol, port } = host; |
| 21 | + |
| 22 | + const protocol = host.protocol; |
| 23 | + const port = host.port; |
21 | 24 | const endpoint = new AWS.Endpoint(host.host);
|
22 | 25 |
|
23 | 26 | if (protocol) endpoint.protocol = protocol.replace(/:?$/, ":");
|
24 | 27 | if (port) endpoint.port = port;
|
25 | 28 |
|
26 |
| - this.AWS = AWS; |
27 | 29 | this.awsConfig = config.awsConfig || AWS.config;
|
28 | 30 | this.endpoint = endpoint;
|
29 | 31 | this.httpOptions = config.httpOptions || this.awsConfig.httpOptions;
|
| 32 | + this.httpClient = new AWS.NodeHttpClient(); |
30 | 33 | }
|
31 | 34 |
|
32 |
| - async request(params, cb) { |
33 |
| - let incoming; |
34 |
| - let timeoutId; |
35 |
| - let request; |
| 35 | + request(params, cb) { |
| 36 | + const reqParams = this.makeReqParams(params); |
| 37 | + const request = this.createRequest(params, reqParams); |
| 38 | + const signer = new AWS.Signers.V4(request, 'es'); |
| 39 | + |
36 | 40 | let req;
|
37 | 41 | let status = 0;
|
38 | 42 | let headers = {};
|
39 |
| - let log = this.log; |
40 | 43 | let response;
|
41 |
| - const AWS = this.AWS; |
| 44 | + let incoming; |
42 | 45 |
|
43 |
| - let reqParams = this.makeReqParams(params); |
44 | 46 | // general clean-up procedure to run after the request
|
45 | 47 | // completes, has an error, or is aborted.
|
46 |
| - let cleanUp = _.bind(function (err) { |
47 |
| - clearTimeout(timeoutId); |
48 |
| - |
| 48 | + const cleanUp = (err) => { |
49 | 49 | req && req.removeAllListeners();
|
50 | 50 | incoming && incoming.removeAllListeners();
|
51 | 51 |
|
52 |
| - if ((err instanceof Error) === false) { |
53 |
| - err = void 0; |
54 |
| - } |
55 |
| - |
56 |
| - log.trace(params.method, reqParams, params.body, response, status); |
57 |
| - if (err) { |
| 52 | + this.log.trace(params.method, reqParams, params.body, response, status); |
| 53 | + if (err instanceof Error) { |
58 | 54 | cb(err);
|
59 | 55 | } else {
|
60 |
| - cb(err, response, status, headers); |
| 56 | + cb(null, response, status, headers); |
61 | 57 | }
|
62 |
| - }, this); |
63 |
| - |
64 |
| - request = new AWS.HttpRequest(this.endpoint); |
65 |
| - |
66 |
| - // copy across params |
67 |
| - for (let p in reqParams) { |
68 |
| - request[p] = reqParams[p]; |
69 |
| - } |
70 |
| - request.region = this.awsConfig.region; |
71 |
| - if (params.body) request.body = params.body; |
72 |
| - if (!request.headers) request.headers = {}; |
73 |
| - request.headers['presigned-expires'] = false; |
74 |
| - request.headers['Host'] = this.endpoint.host; |
| 58 | + }; |
75 | 59 |
|
76 | 60 | // load creds
|
77 |
| - let CREDS; |
78 |
| - try { |
79 |
| - CREDS = await this.getAWSCredentials(); |
80 |
| - |
81 |
| - // Sign the request (Sigv4) |
82 |
| - let signer = new AWS.Signers.V4(request, 'es'); |
83 |
| - signer.addAuthorization(CREDS, new Date()); |
84 |
| - } catch (e) { |
85 |
| - if (e && e.message) e.message = `AWS Credentials error: ${e.message}`; |
86 |
| - cleanUp(e); |
87 |
| - return () => {}; |
88 |
| - } |
89 |
| - |
90 |
| - let send = new AWS.NodeHttpClient(); |
91 |
| - req = send.handleRequest(request, this.httpOptions, function (_incoming) { |
92 |
| - incoming = _incoming; |
93 |
| - status = incoming.statusCode; |
94 |
| - headers = incoming.headers; |
95 |
| - response = ''; |
96 |
| - |
97 |
| - let encoding = (headers['content-encoding'] || '').toLowerCase(); |
98 |
| - if (encoding === 'gzip' || encoding === 'deflate') { |
99 |
| - incoming = incoming.pipe(zlib.createUnzip()); |
100 |
| - } |
101 |
| - |
102 |
| - incoming.setEncoding('utf8'); |
103 |
| - incoming.on('data', function (d) { |
104 |
| - response += d; |
| 61 | + return this.getAWSCredentials() |
| 62 | + .then(creds => { |
| 63 | + // Sign the request (Sigv4) |
| 64 | + signer.addAuthorization(creds, new Date()); |
| 65 | + |
| 66 | + req = this.httpClient.handleRequest(request, this.httpOptions, function (_incoming) { |
| 67 | + incoming = _incoming; |
| 68 | + status = incoming.statusCode; |
| 69 | + headers = incoming.headers; |
| 70 | + response = ''; |
| 71 | + |
| 72 | + let encoding = (headers['content-encoding'] || '').toLowerCase(); |
| 73 | + if (encoding === 'gzip' || encoding === 'deflate') { |
| 74 | + incoming = incoming.pipe(zlib.createUnzip()); |
| 75 | + } |
| 76 | + |
| 77 | + incoming.setEncoding('utf8'); |
| 78 | + incoming.on('data', function (d) { |
| 79 | + response += d; |
| 80 | + }); |
| 81 | + |
| 82 | + incoming.on('error', cleanUp); |
| 83 | + incoming.on('end', cleanUp); |
| 84 | + }, cleanUp); |
| 85 | + |
| 86 | + req.on('error', cleanUp); |
| 87 | + |
| 88 | + req.setNoDelay(true); |
| 89 | + req.setSocketKeepAlive(true); |
| 90 | + }) |
| 91 | + .then(() => { |
| 92 | + return () => req.abort(); |
| 93 | + }) |
| 94 | + .catch(e => { |
| 95 | + if (e && e.message) e.message = `AWS Credentials error: ${e.message}`; |
| 96 | + cleanUp(e); |
| 97 | + return () => {}; |
105 | 98 | });
|
106 |
| - |
107 |
| - incoming.on('error', cleanUp); |
108 |
| - incoming.on('end', cleanUp); |
109 |
| - }, cleanUp); |
110 |
| - |
111 |
| - req.on('error', cleanUp); |
112 |
| - |
113 |
| - req.setNoDelay(true); |
114 |
| - req.setSocketKeepAlive(true); |
115 |
| - |
116 |
| - return function () { |
117 |
| - req.abort(); |
118 |
| - }; |
119 | 99 | }
|
120 | 100 |
|
121 | 101 | getAWSCredentials() {
|
122 |
| - const { awsConfig } = this; |
123 |
| - |
124 | 102 | return new Promise((resolve, reject) => {
|
125 |
| - awsConfig.getCredentials((err, creds) => { |
| 103 | + this.awsConfig.getCredentials((err, creds) => { |
126 | 104 | if (err) return reject(err);
|
127 | 105 | return resolve(creds);
|
128 | 106 | });
|
129 | 107 | });
|
130 | 108 | }
|
| 109 | + |
| 110 | + createRequest(params, reqParams) { |
| 111 | + const request = new AWS.HttpRequest(this.endpoint); |
| 112 | + |
| 113 | + // copy across params |
| 114 | + Object.assign(request, reqParams); |
| 115 | + |
| 116 | + request.region = this.awsConfig.region; |
| 117 | + if (params.body) request.body = params.body; |
| 118 | + if (!request.headers) request.headers = {}; |
| 119 | + request.headers['presigned-expires'] = false; |
| 120 | + request.headers['Host'] = this.endpoint.host; |
| 121 | + |
| 122 | + return request; |
| 123 | + } |
131 | 124 | }
|
132 | 125 |
|
133 | 126 | module.exports = HttpAmazonESConnector;
|
0 commit comments