Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[C1-1153] Modified payload parsing of received message from kafka #787

Merged
merged 3 commits into from
Jul 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/shiny-turkeys-play.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"steveo": minor
---

Re-align kafka consumer callback payload parsing
22 changes: 14 additions & 8 deletions packages/steveo/src/consumers/kafka.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import Kafka, {
import nullLogger from 'null-logger';
import { Pool } from 'generic-pool';
import BaseRunner from './base';
import { getDuration } from '../lib/context';
import { getContext, getDuration } from '../lib/context';
import { IRunner, Logger, KafkaConfiguration } from '../common';
import { Steveo } from '..';
import { Resource } from '../lib/pool';
Expand Down Expand Up @@ -75,18 +75,26 @@ class KafkaRunner
this.logger.debug(`waiting for pool ${c.topic}`);
resource = await this.pool.acquire();
this.logger.debug(`acquired pool`);
const valueString = c.payload.value?.toString() ?? '';

const { value: payloadValue, ...rest } = c.payload;

const valueString = payloadValue?.toString() ?? '';
let value = valueString;

try {
value = JSON.parse(valueString);
} catch (e) {
throw new JsonParsingError();
}

const runnerContext = getContext(value);
const parsed = {
...message,
value,
key: message.key?.toString(),
metadata: {
...rest,
},
...value,
};

this.registry.emit('runner_receive', c.topic, parsed, {
...message,
start: getDuration(),
Expand All @@ -107,9 +115,7 @@ class KafkaRunner

this.logger.debug('Start subscribe', c.topic, message);

// @ts-ignore
const context = parsed.value.context ?? null;
await task.subscribe(parsed, context);
await task.subscribe(parsed, runnerContext);

if (waitToCommit) {
this.logger.debug('committing message', message);
Expand Down
98 changes: 86 additions & 12 deletions packages/steveo/test/consumer/kafka_test.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { expect } from 'chai';
import sinon from 'sinon';

import { randomUUID } from 'crypto';
import Runner from '../../src/consumers/kafka';
import { build } from '../../src/lib/pool';
import Registry from '../../src/registry';
Expand Down Expand Up @@ -204,8 +204,8 @@ describe('runner/kafka', () => {
// @ts-ignore
pool: build(anotherRegistry),
manager: {
state: 'running'
}
state: 'running',
},
};
// @ts-ignore
const anotherRunner = new Runner(steveo);
Expand All @@ -217,16 +217,90 @@ describe('runner/kafka', () => {
anotherRunner.consumer,
'commitMessage'
);
await anotherRunner.consumeCallback(null, [{
value: Buffer.from(
'\x7B\x20\x22\x61\x22\x3A\x20\x22\x31\x32\x33\x22\x20\x7D'
),
size: 1000,
offset: 0,
topic: 'a-topic',
partition: 1,
}]);
await anotherRunner.consumeCallback(null, [
{
value: Buffer.from(
'\x7B\x20\x22\x61\x22\x3A\x20\x22\x31\x32\x33\x22\x20\x7D'
),
size: 1000,
offset: 0,
topic: 'a-topic',
partition: 1,
},
]);
expect(commitOffsetStub.callCount).to.equal(0);
expect(subscribeStub.callCount).to.equal(0);
});

it('should process a message', async () => {
const subscribeStub = sinon.stub().rejects();
const anotherRegistry = {
getTask: () => ({
publish: () => {},
subscribe: subscribeStub,
}),
emit: sandbox.stub(),
events: {
emit: sandbox.stub(),
},
};

const steveo = {
config: {
bootstrapServers: 'kafka:9200',
engine: 'kafka',
securityProtocol: 'plaintext',
waitToCommit: true,
},
// @ts-ignore
registry: anotherRegistry,
// @ts-ignore
pool: build(anotherRegistry),
manager: {
state: 'running',
},
};
// @ts-ignore
const anotherRunner = new Runner(steveo);

sandbox.stub(anotherRunner.consumer, 'commitMessage');

const jobId = randomUUID();
const payload = {
message: 'test runner',
context: {
jobId,
},
};

await anotherRunner.consumeCallback(null, [
{
value: JSON.stringify(payload),
size: 1000,
offset: 0,
topic: 'a-topic',
partition: 1,
},
]);

expect(subscribeStub.called).to.be.true;
const data = subscribeStub.args[0][0];
const context = subscribeStub.args[0][1];
expect(data, 'expected data').to.deep.equals({
metadata: {
size: 1000,
offset: 0,
topic: 'a-topic',
partition: 1,
},
message: 'test runner',
context: {
jobId,
},
});
expect(context, 'expected context').to.deep.equals({
duration: 0,
jobId,
});
});
});
Loading