Skip to content

Commit e5076ca

Browse files
committed
Added Elasticsearch aggregation response parser.
Added tests for each case.
0 parents  commit e5076ca

File tree

4 files changed

+903
-0
lines changed

4 files changed

+903
-0
lines changed

.gitignore

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
#IDE folder
2+
.idea
3+
4+
#Node modules
5+
node_modules

index.js

Lines changed: 172 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,172 @@
1+
"use strict";
2+
3+
var merge = require('xtend');
4+
5+
var reduceObj = function (predicate, initial, obj, pastAgg) {
6+
return Object.keys(obj).reduce(function (acc, key) {
7+
return predicate(acc, obj[key], key, pastAgg);
8+
}, initial)
9+
};
10+
11+
function createKeyValueObject(key, value) {
12+
var obj = {};
13+
obj[key] = value;
14+
return obj
15+
}
16+
17+
function ensureDimensionIsComplete(agg) {
18+
if (!agg._dimensionName || !agg._dimensionValue && !(agg._dimensionValue === 0)) return agg;
19+
var newAgg = merge(createKeyValueObject(agg._dimensionName, agg._dimensionValue), agg);
20+
delete newAgg._dimensionName;
21+
delete newAgg._dimensionValue;
22+
return newAgg;
23+
}
24+
25+
function Aggregation(state) {
26+
this._state = state || [{}];
27+
}
28+
29+
Aggregation.prototype.map = function (iterator) {
30+
return new Aggregation(this._state.map(iterator));
31+
};
32+
33+
Aggregation.prototype.setDimensionName = function (dimensionName) {
34+
return this.map(function (agg) {
35+
return ensureDimensionIsComplete(merge(createKeyValueObject("_dimensionName", dimensionName), agg));
36+
})
37+
};
38+
39+
Aggregation.prototype.setDimensionValue = function (dimensionValue) {
40+
return this.map(function (agg) {
41+
return ensureDimensionIsComplete(merge(createKeyValueObject("_dimensionValue", dimensionValue), agg));
42+
})
43+
};
44+
45+
Aggregation.prototype.addMetric = function (name, val) {
46+
return this.map(function (agg) {
47+
return merge(agg, createKeyValueObject(name, val));
48+
});
49+
};
50+
51+
Aggregation.prototype.concat = function (agg) {
52+
return new Aggregation(this._state.concat(agg._state));
53+
};
54+
55+
Aggregation.prototype.addSubAggregation = function (agg) {
56+
return new Aggregation(this._state.reduce(function (acc, left) {
57+
return acc.concat((agg._state || []).map(function (right) {
58+
return merge(left, right);
59+
}));
60+
}, []));
61+
};
62+
63+
Aggregation.prototype.get = function () {
64+
return this._state;
65+
};
66+
67+
function EmptyAggregation() {
68+
}
69+
EmptyAggregation.prototype.map = function () {
70+
return this;
71+
};
72+
EmptyAggregation.prototype.setDimensionName = function (dimensionName) {
73+
return new Aggregation().setDimensionName(dimensionName)
74+
};
75+
EmptyAggregation.prototype.setDimensionValue = function (dimensionValue) {
76+
return new Aggregation().setDimensionValue(dimensionValue)
77+
};
78+
EmptyAggregation.prototype.addMetric = function (name, val) {
79+
return new Aggregation().addMetric(name, val)
80+
};
81+
EmptyAggregation.prototype.concat = function (agg) {
82+
return agg
83+
};
84+
EmptyAggregation.prototype.addSubAggregation = function (agg) {
85+
return agg
86+
};
87+
EmptyAggregation.prototype.get = function () {
88+
return []
89+
};
90+
91+
function isBucket(bucket) {
92+
return typeof bucket === 'object' && bucket.hasOwnProperty('key');
93+
}
94+
95+
function isSubAgg(subAgg) {
96+
return typeof subAgg === 'object' && subAgg.hasOwnProperty("buckets");
97+
}
98+
99+
function handleMetrics(next, first, agg, metric, metricName, pastAggregation) {
100+
if (!metric.hasOwnProperty("value")) return next(agg, metric, metricName, pastAggregation);
101+
return agg.addMetric(metricName, metric.value);
102+
}
103+
104+
function handleOneBucket(next, first, agg, bucket, key, pastAggregation) {
105+
if (!isBucket(bucket)) return next(agg, bucket, key, pastAggregation);
106+
return reduceObj(first, agg.setDimensionValue(bucket.key), bucket, agg.setDimensionValue(bucket.key));
107+
}
108+
109+
function handleBuckets(next, first, agg, buckets, key, pastAggregation) {
110+
if (!Array.isArray(buckets) || key != "buckets") return next(agg, buckets, key, pastAggregation);
111+
return buckets.map(/*first.bind(null, agg)*/function (bucket, idx) {
112+
return first(agg, bucket, idx, pastAggregation);
113+
}).reduce(function (acc, _agg) {
114+
return acc.concat(_agg);
115+
}, new EmptyAggregation())
116+
}
117+
118+
function handleSubAggregation(next, first, agg, subAgg, subAggName, pastAggregation) {
119+
if (!isSubAgg(subAgg)) return next(agg, subAgg, subAggName, pastAggregation);
120+
return addSubOrMissingAgg(first, agg, subAgg, subAggName, pastAggregation);
121+
}
122+
123+
function addSubOrMissingAgg(first, agg, subAgg, subAggName, pastAggregation) {
124+
pastAggregation = pastAggregation ? pastAggregation : new EmptyAggregation();
125+
if (agg._state && agg._state.length && Object.keys(agg._state[0]).length > 1) {
126+
return agg.concat(pastAggregation.addSubAggregation(reduceObj(first, new EmptyAggregation().setDimensionName(subAggName), subAgg, agg)))
127+
} else {
128+
return (agg._state && agg._state.length ? agg : pastAggregation).addSubAggregation(reduceObj(first, new EmptyAggregation().setDimensionName(subAggName), subAgg, agg))
129+
}
130+
}
131+
132+
function isNumeric(n) {
133+
return !isNaN(parseFloat(n)) && isFinite(n);
134+
}
135+
136+
function isMissingAggregation(missingAggName, missingAggs) {
137+
return (missingAggs.doc_count && (missingAggName && !isNumeric(missingAggName)) && typeof missingAggs === 'object' && missingAggName.indexOf("missing_") >= 0);
138+
}
139+
140+
function createSubAggWithMissingAgg(missingAggs) {
141+
return createKeyValueObject("buckets", [merge(createKeyValueObject("key", "null"), missingAggs)]);
142+
}
143+
144+
145+
function handleMissingAggregations(next, first, agg, missingAggs, missingAggName, pastAggregation) {
146+
if (!isMissingAggregation(missingAggName, missingAggs)) return next(agg, missingAggs, missingAggName, pastAggregation);
147+
return addSubOrMissingAgg(first, agg, createSubAggWithMissingAgg(missingAggs), missingAggName.slice(8), pastAggregation);
148+
}
149+
150+
var defaultHandlers = [
151+
handleMetrics,
152+
handleOneBucket,
153+
handleBuckets,
154+
handleSubAggregation,
155+
handleMissingAggregations
156+
];
157+
158+
function createCor(handlers, fallBack) {
159+
return handlers.reduce(function (nextHandler, handler) {
160+
return handler.bind(null, nextHandler, function first() {
161+
return createCor(handlers, fallBack).apply(null, arguments);
162+
});
163+
}, fallBack);
164+
}
165+
166+
module.exports.parse = function parseEsResponse(response, options) {
167+
options = options || {};
168+
if (!response.aggregations) return [];
169+
return reduceObj(createCor(options.handlers || defaultHandlers, function (handler) {
170+
return handler;
171+
}), new EmptyAggregation(), response.aggregations).get()
172+
};

package.json

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
{
2+
"name": "es-response-parser",
3+
"version": "1.0.0",
4+
"description": "A module that helps to parse Elasticsearch aggregation response to human readable form",
5+
"main": "index.js",
6+
"scripts": {
7+
"test": "npm test"
8+
},
9+
"repository": {
10+
"type": "git",
11+
"url": "git+ssh://[email protected]/Isabek/es-response-parser.git"
12+
},
13+
"keywords": [
14+
"Elasticsearch aggregation Node.JS",
15+
"response parser aggregation Node.JS",
16+
"human readable response elasticsearch"
17+
],
18+
"author": "[email protected]",
19+
"license": "ISC",
20+
"bugs": {
21+
"url": "https://github.com/Isabek/es-response-parser/issues"
22+
},
23+
"homepage": "https://github.com/Isabek/es-response-parser#readme",
24+
"devDependencies": {
25+
"assert": "^1.3.0"
26+
},
27+
"dependencies": {
28+
"mocha": "^2.3.4",
29+
"xtend": "^4.0.1"
30+
}
31+
}

0 commit comments

Comments
 (0)