This repository has been archived by the owner on Jan 26, 2018. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 16
/
partition-set.js
118 lines (97 loc) · 3.05 KB
/
partition-set.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
module.exports = function (logger, inherits, EventEmitter, Consumer, Producer) {
/**
 * A PartitionSet contains all of the known Partitions (for a Topic).
 * It tracks which partitions are 'readable' and 'writable' by listening to
 * events on each added partition, and owns one Consumer and one Producer.
 *
 * Emits 'messages' and 'ready' (see the event handlers bound below).
 */
function PartitionSet() {
  // Map of partition name -> partition. Only own properties count as entries.
  this.partitions = {}
  this.consumer = new Consumer()
  this.producer = new Producer()
  // Handlers are bound once and stored so that remove() can detach the very
  // same function objects that add() attached.
  this.onConsumerMessages = consumerMessages.bind(this)
  this.onConsumerError = consumerError.bind(this)
  this.onReadableChanged = readableChanged.bind(this)
  this.onWritableChanged = writableChanged.bind(this)
  this.onPartitionReady = partitionReady.bind(this)
  this.onPartitionDestroy = partitionDestroy.bind(this)
  this.consumer.on('messages', this.onConsumerMessages)
  this.consumer.on('error', this.onConsumerError)
  EventEmitter.call(this)
}
inherits(PartitionSet, EventEmitter)

// Returns the partition stored under `name`, or undefined. Inherited
// Object.prototype members (e.g. 'constructor', 'toString') are ignored so
// they are never mistaken for registered partitions.
function lookupPartition(partitions, name) {
  if (Object.prototype.hasOwnProperty.call(partitions, name)) {
    return partitions[name]
  }
  return undefined
}

/**
 * @param {string} name partition name
 * @returns the partition registered under `name`, or undefined if none is
 */
PartitionSet.prototype.get = function (name) {
  return lookupPartition(this.partitions, name)
}

/**
 * Registers a partition and subscribes to its 'writable', 'readable',
 * 'ready' and 'destroy' events. Does nothing when a partition with the same
 * name is already registered.
 *
 * @param partition object exposing `name` and `on(event, listener)`
 */
PartitionSet.prototype.add = function (partition) {
  if (lookupPartition(this.partitions, partition.name)) {
    return
  }
  partition.on('writable', this.onWritableChanged)
  partition.on('readable', this.onReadableChanged)
  partition.on('ready', this.onPartitionReady)
  partition.on('destroy', this.onPartitionDestroy)
  this.partitions[partition.name] = partition
  logger.info('added partition', partition.name)
}

/**
 * Removes a partition from the consumer, the producer and this set, and
 * detaches the listeners that add() attached.
 *
 * @param partition object exposing `name` and `removeListener(event, listener)`
 */
PartitionSet.prototype.remove = function (partition) {
  this.consumer.remove(partition)
  this.producer.remove(partition)
  partition.removeListener('writable', this.onWritableChanged)
  partition.removeListener('readable', this.onReadableChanged)
  partition.removeListener('ready', this.onPartitionReady)
  partition.removeListener('destroy', this.onPartitionDestroy)
  delete this.partitions[partition.name]
  logger.info('removed partition', partition.name)
}

/**
 * @returns whatever the producer's isReady() reports
 */
PartitionSet.prototype.isReady = function () {
  return this.producer.isReady()
}

// Delegates to the consumer's pause().
PartitionSet.prototype.pause = function () {
  this.consumer.pause()
}

// Delegates to the consumer's resume().
PartitionSet.prototype.resume = function () {
  this.consumer.resume()
}

/**
 * Calls isReadable(false) on every registered partition.
 * NOTE(review): presumably this marks each partition as not readable and
 * triggers its 'readable' event - confirm against the Partition implementation.
 */
PartitionSet.prototype.stopConsuming = function () {
  var names = Object.keys(this.partitions)
  for (var i = 0; i < names.length; i++) {
    var partition = this.partitions[names[i]]
    partition.isReadable(false)
  }
}

/**
 * Delegates to the consumer's drain().
 * @param {Function} cb passed through to the consumer unchanged
 */
PartitionSet.prototype.drain = function (cb) {
  this.consumer.drain(cb)
}

/**
 * Delegates to the producer's write().
 * @param messages passed through to the producer unchanged
 * @param {Function} cb passed through to the producer unchanged
 */
PartitionSet.prototype.write = function (messages, cb) {
  this.producer.write(messages, cb)
}
// Event handlers
/**
 * Re-emits a batch received from the consumer as this set's own 'messages'
 * event. Invoked with `this` bound to the PartitionSet.
 *
 * @param partition the partition the messages belong to
 * @param messages the messages, forwarded untouched
 */
function consumerMessages(partition, messages) {
  var set = this
  set.emit('messages', partition, messages)
}
/**
 * Handles 'error' events from the consumer. The error is logged rather than
 * silently dropped; it is deliberately not re-emitted or thrown, so a
 * consumer error never takes the process down from here.
 *
 * @param err the error reported by the consumer
 */
function consumerError(err) {
  // Only logger.info is known to exist in this file; prefer logger.error when
  // the injected logger provides it.
  var report = typeof logger.error === 'function' ? logger.error : logger.info
  report.call(logger, 'consumer error', err)
}
/**
 * Listener for a partition's 'readable' event. Keeps the consumer in sync:
 * a partition reporting isReadable() is added to the consumer, any other
 * partition is removed from it. Invoked with `this` bound to the PartitionSet.
 *
 * @param partition the partition whose readable state changed
 */
function readableChanged(partition) {
  var consumer = this.consumer
  if (!partition.isReadable()) {
    consumer.remove(partition)
    return
  }
  consumer.add(partition)
}
/**
 * Listener for a partition's 'writable' event. A partition that is no longer
 * writable is removed from the producer. A writable one is added to the
 * producer, after which 'ready' is emitted if the set reports isReady().
 * Invoked with `this` bound to the PartitionSet.
 *
 * @param partition the partition whose writable state changed
 */
function writableChanged(partition) {
  if (!partition.isWritable()) {
    this.producer.remove(partition)
    return
  }
  this.producer.add(partition)
  // TODO: emit less
  if (!this.isReady()) {
    return
  }
  this.emit('ready')
}
/**
 * Listener for a partition's 'ready' event: emits 'ready' on the set itself,
 * with no arguments. Invoked with `this` bound to the PartitionSet.
 *
 * @param partition the partition that became ready (not forwarded)
 */
function partitionReady(partition) {
  var set = this
  set.emit('ready')
}
/**
 * Listener for a partition's 'destroy' event: removes that partition from
 * the set via remove(). Invoked with `this` bound to the PartitionSet.
 *
 * @param partition the partition being destroyed
 */
function partitionDestroy(partition) {
  var set = this
  set.remove(partition)
}
return PartitionSet
}