-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathtest.js
71 lines (56 loc) · 2.32 KB
/
test.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
const _ = require('lodash');
const async = require('async');
const Client = require('./Client');
const Cluster = require('./Cluster');
const cst = require('./protocol/constants');
const Response = require('./protocol/Response');
const Request = require('./protocol/Request');
const client = new Client('192.168.50.10:9092');
client.getMetadatas(null, (metadatas) => {
const cluster = new Cluster(metadatas);
const topicsPartitions = {};
metadatas.topic_metadata.forEach((topic_meta) => {
topicsPartitions[topic_meta.topic] = _.map(topic_meta.topic_metadata, 'partition');
});
const calls = [
(done) => cluster.getCommittedOffsets(
'live-deliveries',
topicsPartitions,
(data) => done(null, data)),
(done) => cluster.getOffsetsList(
topicsPartitions,
(data) => done(null, data))
];
async.parallel(calls, (err, results) => {
const committedOffsets = {};
const offsetsList = {};
_.each(results[0].responses, (response) => {
committedOffsets[response.topic] = {};
_.each(response.partitions, (partition_data) => {
committedOffsets[response.topic][partition_data.partition] = partition_data;
});
});
_.each(results[1].responses, (response) => {
offsetsList[response.topic] = {};
_.each(response.partitions, (partition_data) => {
offsetsList[response.topic][partition_data.partition] = partition_data;
});
});
const lag = {};
_.each(offsetsList, (partitions, topic) => {
lag[topic] = {};
_.each(partitions, (data) => {
if (committedOffsets[topic][data.partition].error_code !== 0) {
lag[topic][data.partition] = -1;
return
}
let offset = 0;
if (committedOffsets[topic][data.partition].offset > 0) {
offset = committedOffsets[topic][data.partition].offset;
}
lag[topic][data.partition] = data.offset - offset;
});
});
console.log(JSON.stringify(lag, null, 2));
});
});