Skip to content

Commit 5eef1b8

Browse files
check flaky bb consumer
1 parent 7b9fa2b commit 5eef1b8

File tree

4 files changed

+36
-13
lines changed

4 files changed

+36
-13
lines changed

package.json

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@
2222
"ft_test": "mocha --recursive $(find tests/functional -name '*.js') --timeout 30000 --exit",
2323
"ft_test:notification": "mocha --recursive $(find tests/functional/notification -name '*.js') --timeout 30000 --exit",
2424
"ft_test:replication": "mocha --recursive $(find tests/functional/replication -name '*.js') --timeout 30000 --exit",
25-
"ft_test:replication2": "mocha tests/functional/replication/streamedCopy.spec.js --timeout 60000 --exit",
2625
"ft_test:lib": "mocha --recursive $(find tests/functional/lib -name '*.js') --timeout 30000 --exit",
2726
"ft_test:lifecycle": "mocha --recursive $(find tests/functional/lifecycle -name '*.js') --timeout 30000 --exit",
2827
"ft_test:ingestion": "mocha --recursive $(find tests/functional/ingestion -name '*.js') --timeout 30000 --exit",

tests/functional/lib/BackbeatConsumer.js

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,12 @@ describe('BackbeatConsumer main tests', () => {
6262
], done);
6363
});
6464
afterEach(() => {
65+
console.log('[AFTER_EACH] Cleaning up, unsubscribing consumer...');
66+
// Stop the Kafka consumer from fetching messages
67+
consumer._consumer.unsubscribe();
68+
// Clear the consumed messages array for next test
6569
consumedMessages = [];
70+
// Remove event handlers to prevent cross-test pollution
6671
consumer.removeAllListeners('consumed');
6772
});
6873
after(function after(done) {
@@ -79,6 +84,8 @@ describe('BackbeatConsumer main tests', () => {
7984

8085
it('should be able to read messages sent to the topic and publish ' +
8186
'topic metrics', done => {
87+
console.log('[TEST_START] consumedMessages at start:', consumedMessages.length,
88+
'messages:', consumedMessages.map(b => b.toString()));
8289
let consumeCb = null;
8390
let totalConsumed = 0;
8491
let topicOffset;
@@ -90,39 +97,56 @@ describe('BackbeatConsumer main tests', () => {
9097
// reset to 0 before the test
9198
latestConsumedMetric.reset();
9299

100+
console.log('[TEST] Starting test - expecting', messages.length, 'messages');
101+
console.log('[TEST] Expected messages:', messages.map(e => e.message));
102+
93103
function _checkZkMetrics(done) {
104+
console.log('[TEST] Checking ZK metrics...');
94105
async.waterfall([
95106
next => zookeeper.getData(`${zkMetricsPath}/topic`, next),
96107
(topicOffsetData, stat, next) => {
97108
topicOffset = Number.parseInt(topicOffsetData, 10);
109+
console.log('[TEST] Topic offset from ZK:', topicOffset);
98110
zookeeper.getData(`${zkMetricsPath}/consumers/${groupId}`,
99111
next);
100112
},
101113
], (err, consumerOffsetData) => {
102114
assert.ifError(err);
103115
consumerOffset = Number.parseInt(consumerOffsetData, 10);
116+
console.log('[TEST] Consumer offset from ZK:', consumerOffset);
117+
console.log('[TEST] Offsets match:', topicOffset === consumerOffset);
104118
assert.strictEqual(topicOffset, consumerOffset);
105119
done();
106120
});
107121
}
108122
async function _checkPromMetrics() {
109123
const latestConsumedMetricValues =
110124
(await latestConsumedMetric.get()).values;
125+
console.log('[TEST] Prometheus metric values:', latestConsumedMetricValues);
111126
assert.strictEqual(latestConsumedMetricValues.length, 1);
112127
assert(latestConsumedMetricValues[0].value >= beforeConsume / 1000);
113128
}
114129
consumer.subscribe();
130+
console.log('[TEST] Consumer subscribed');
115131
consumer.on('consumed', messagesConsumed => {
116132
totalConsumed += messagesConsumed;
133+
console.log('[TEST] Consumed event fired - messagesConsumed:', messagesConsumed,
134+
'totalConsumed:', totalConsumed, 'expected:', messages.length);
135+
console.log('[TEST] Current consumedMessages array length:', consumedMessages.length);
136+
console.log('[TEST] Current consumedMessages content:',
137+
consumedMessages.map(buffer => buffer.toString()));
117138
assert(totalConsumed <= messages.length);
118139
if (totalConsumed === messages.length) {
140+
console.log('[TEST] Reached expected message count!');
119141
assert.deepStrictEqual(
120142
messages.map(e => e.message),
121143
consumedMessages.map(buffer => buffer.toString()));
122144
// metrics are published every second, so they
123145
// should be there after 5s
146+
console.log('[TEST] Waiting 5s for metrics to be published...');
124147
setTimeout(() => {
125148
_checkZkMetrics(() => {
149+
console.log('[TEST] Test complete, calling done()');
126150
consumeCb();
127151
consumer._consumer.unsubscribe();
128152
});
@@ -136,8 +160,10 @@ describe('BackbeatConsumer main tests', () => {
136160
}
137161
});
138162
consumeCb = done;
163+
console.log('[TEST] Sending', messages.length, 'test messages...');
139164
producer.send(messages, err => {
140165
assert.ifError(err);
166+
console.log('[TEST] Messages sent to Kafka');
141167
});
142168

143169
// Check that rdkafka metrics are indeed exported

tests/functional/replication/queueProcessor.js

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,6 @@ function escapeForXML(string) {
6666
: string;
6767
}
6868

69-
const mockServerPort = 7777;
7069
const constants = {
7170
source: {
7271
s3: '127.0.0.1',
@@ -75,8 +74,8 @@ const constants = {
7574
'ab30293a044eca5215068c6a06cfdb1b636a16e4'],
7675
},
7776
target: {
78-
hosts: [{ host: '127.0.0.3', port: mockServerPort },
79-
{ host: '127.0.0.4', port: mockServerPort }],
77+
hosts: [{ host: '127.0.0.3', port: 7777 },
78+
{ host: '127.0.0.4', port: 7777 }],
8079
dataPartsKeys: ['7d808697fbaf9f16fb32b94be189b80b3b9b2890',
8180
'e54e2ced6625f67e07f4735fb7b897a7bc81d603'],
8281
},
@@ -805,9 +804,9 @@ function getQueueProcessorSiteConfig(site) {
805804
{ hosts: 'localhost:9092' },
806805
{ auth: { type: 'role',
807806
vault: { host: constants.source.vault,
808-
port: mockServerPort } },
807+
port: 7777 } },
809808
s3: { host: constants.source.s3,
810-
port: mockServerPort },
809+
port: 7777 },
811810
transport: 'http',
812811
},
813812
{
@@ -869,9 +868,9 @@ describe('queue processor functional tests with mocking', () => {
869868
{ hosts: 'localhost:9092' },
870869
{ auth: { type: 'role',
871870
vault: { host: constants.source.vault,
872-
port: mockServerPort } },
871+
port: 7777 } },
873872
s3: { host: constants.source.s3,
874-
port: mockServerPort },
873+
port: 7777 },
875874
transport: 'http',
876875
},
877876
{ replicationStatusTopic:
@@ -917,7 +916,7 @@ describe('queue processor functional tests with mocking', () => {
917916
s3mock = new S3Mock();
918917
httpServer = http.createServer(
919918
(req, res) => s3mock.onRequest(req, res));
920-
httpServer.listen(mockServerPort);
919+
httpServer.listen(7777);
921920
});
922921

923922
after(done => {

tests/functional/replication/streamedCopy.spec.js

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@ const constants = {
2525
// data size should be sufficient to have data held in the socket
2626
// buffers, 10MB seems to work
2727
contents: Buffer.alloc(10000000).fill('Z'),
28-
mockServerPort: 7777,
2928
};
3029

3130
const qpParams = [
@@ -36,12 +35,12 @@ const qpParams = [
3635
{ hosts: 'localhost:9092' },
3736
{ auth: { type: 'account', account: 'bart' },
3837
s3: { host: '127.0.0.1',
39-
port: constants.mockServerPort },
38+
port: 7777 },
4039
transport: 'http',
4140
},
4241
{ auth: { type: 'account', account: 'bart' },
4342
replicationEndpoint: {
44-
site: 'sf', servers: [`127.0.0.2:${constants.mockServerPort}`],
43+
site: 'sf', servers: [`127.0.0.2:${7777}`],
4544
},
4645
transport: 'http' },
4746
{ topic: 'backbeat-func-test-dummy-topic',
@@ -192,7 +191,7 @@ describe('streamed copy functional tests', () => {
192191
s3mock = new S3Mock();
193192
httpServer = http.createServer(
194193
(req, res) => s3mock.onRequest(req, res));
195-
httpServer.listen(constants.mockServerPort, err => {
194+
httpServer.listen(7777, err => {
196195
if (err) {
197196
return done(err);
198197
}

0 commit comments

Comments
 (0)