Skip to content

Commit 72ec8c6

Browse files
authored
Add SQS fair queue support (#975)
* Add SQS fair queue support * Replace bitnami/kafka with official apache/kafka image
1 parent b78e529 commit 72ec8c6

File tree

5 files changed

+101
-12
lines changed

5 files changed

+101
-12
lines changed

.changeset/puny-paws-rule.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
"steveo": minor
3+
---
4+
5+
Steveo - Add sqs fair queues support

.github/workflows/main.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ jobs:
6363
run: |
6464
pip install localstack awscli-local[ver1]
6565
docker pull localstack/localstack
66-
docker run --hostname 127.0.0.1 --name kafka -d -p 9092:9092 -e KAFKA_ENABLE_KRAFT=yes -e KAFKA_CFG_NODE_ID=0 -e KAFKA_CFG_PROCESS_ROLES=controller,broker -e KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093 -e KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT -e KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@127.0.0.1:9093 -e KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER bitnami/kafka:3.5
66+
docker run --name kafka --network=host -d -e KAFKA_NODE_ID=1 -e KAFKA_PROCESS_ROLES=broker,controller -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092,CONTROLLER://0.0.0.0:9093 -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://127.0.0.1:9092 -e KAFKA_CONTROLLER_LISTENER_NAMES=CONTROLLER -e KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT -e KAFKA_CONTROLLER_QUORUM_VOTERS=1@127.0.0.1:9093 -e KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1 -e KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR=1 -e KAFKA_TRANSACTION_STATE_LOG_MIN_ISR=1 -e KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS=0 -e KAFKA_NUM_PARTITIONS=1 apache/kafka:latest
6767
localstack start -d
6868
localstack wait -t 30
6969
echo "Startup complete"

packages/steveo/src/common.ts

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -170,16 +170,22 @@ export interface KafkaMessageRoutingOptions {
170170
}
171171

172172
/**
173-
* @description SQS FIFO message routing options
173+
* @description SQS message routing options
174174
*/
175175
export interface SQSMessageRoutingOptions {
176176
/**
177-
* @description Groups messages with the same key.
178-
* See https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/using-messagegroupid-property.html
177+
* @description Groups messages with the same key (MessageGroupId).
178+
*
179+
* Behavior depends on queue type:
180+
* - **FIFO queues**: Messages with the same key are processed in strict order, one at a time.
181+
* - **Standard queues**: Enables Fair Queue behavior - messages with the same key can be processed
182+
* in parallel while maintaining fair distribution across different keys/tenants.
183+
*
184+
* See https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/using-messagegroupid-property.html
179185
*/
180186
key?: string;
181187
/**
182-
* @description The message deduplication ID.
188+
* @description The message deduplication ID (FIFO queues only).
183189
* SQS FIFO engine uses content-based deduplication by default if no message deduplication ID is provided.
184190
*/
185191
deDuplicationId?: string;

packages/steveo/src/producers/sqs.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -242,7 +242,7 @@ class SqsProducer extends BaseProducer implements IProducer {
242242
MessageAttributes: messageAttributes,
243243
MessageBody: JSON.stringify({ ...msg, _meta: messageMetadata }),
244244
QueueUrl: this.sqsUrls[sqsTopic],
245-
MessageGroupId: fifo && options.key ? options.key : undefined,
245+
MessageGroupId: options.key || undefined,
246246
MessageDeduplicationId:
247247
fifo && options.deDuplicationId ? options.deDuplicationId : undefined,
248248
} as SendMessageCommandInput;

packages/steveo/test/producer/sqs_test.ts

Lines changed: 84 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ import sinon from 'sinon';
33
import Producer from '../../src/producers/sqs';
44
import Registry from '../../src/runtime/registry';
55
import Task from '../../src/runtime/task';
6-
import {ITask, SQSMessageRoutingOptions} from '../../src/common';
6+
import { ITask, SQSMessageRoutingOptions } from '../../src/common';
77
import { createMessageMetadata } from '../../src/lib/context';
88
import { TaskOptions } from '../../src/types/task-options';
99

@@ -77,12 +77,12 @@ describe('SQS Producer', () => {
7777
const subscribeStub = sandbox.stub().resolves({ some: 'success' });
7878
const anotherRegistry = {
7979
registeredTasks: [],
80-
addNewTask: () => {},
81-
removeTask: () => {},
80+
addNewTask: () => { },
81+
removeTask: () => { },
8282
getTopics: () => [],
8383
getTaskTopics: () => [],
8484
getTask: () => ({
85-
publish: () => {},
85+
publish: () => { },
8686
subscribe: subscribeStub,
8787
options: {
8888
deadLetterQueue: true,
@@ -217,7 +217,7 @@ describe('SQS Producer', () => {
217217

218218
const sentAttributes =
219219
sendMessageStub.getCall(0).args[0].MessageAttributes[
220-
attributes[0].name
220+
attributes[0].name
221221
];
222222
const sentDataType = sentAttributes.DataType;
223223
const sentStringValue = sentAttributes.StringValue;
@@ -260,7 +260,7 @@ describe('SQS Producer', () => {
260260
},
261261
MessageBody: JSON.stringify(expectedMessageBody),
262262
QueueUrl: undefined,
263-
MessageGroupId: undefined,
263+
MessageGroupId: 'context',
264264
MessageDeduplicationId: undefined
265265
};
266266

@@ -408,5 +408,83 @@ describe('SQS Producer', () => {
408408
);
409409
sinon.assert.calledWith(sendMessageStub, expectedPayload);
410410
});
411+
412+
it('should set MessageGroupId on standard queue when key is provided', async () => {
413+
const task: ITask = new Task(
414+
//@ts-expect-error
415+
{ engine: 'sqs' },
416+
registry,
417+
producer,
418+
'test-task-standard-with-key',
419+
'test-topic',
420+
() => undefined,
421+
{} // Standard queue (no fifo flag)
422+
);
423+
registry.addNewTask(task);
424+
425+
const messagePayload: any = { a: 'payload' };
426+
const messageContext: SQSMessageRoutingOptions = { key: 'tenant-123' };
427+
const expectedMessageBody = {
428+
...messagePayload,
429+
_meta: { ...createMessageMetadata(messagePayload), ...messageContext },
430+
};
431+
432+
const expectedPayload = {
433+
MessageAttributes: {
434+
Timestamp: { DataType: 'Number', StringValue: clock.now.toString() },
435+
},
436+
MessageBody: JSON.stringify(expectedMessageBody),
437+
QueueUrl: undefined,
438+
MessageGroupId: messageContext.key, // Should be set on standard queue
439+
MessageDeduplicationId: undefined
440+
};
441+
442+
const sendMessageStub = sandbox.stub(producer.producer, 'sendMessage');
443+
await producer.send(
444+
'test-topic',
445+
messagePayload,
446+
messageContext
447+
);
448+
sinon.assert.calledWith(sendMessageStub, expectedPayload);
449+
});
450+
451+
it('should not set MessageGroupId when key is not provided', async () => {
452+
const task: ITask = new Task(
453+
//@ts-expect-error
454+
{ engine: 'sqs' },
455+
registry,
456+
producer,
457+
'test-task-no-key',
458+
'test-topic',
459+
() => undefined,
460+
{}
461+
);
462+
registry.addNewTask(task);
463+
464+
const messagePayload = { a: 'payload' };
465+
const messageContext: SQSMessageRoutingOptions = {}; // No key
466+
const expectedMessageBody = {
467+
...messagePayload,
468+
_meta: { ...createMessageMetadata(messagePayload), ...messageContext },
469+
};
470+
471+
const expectedPayload = {
472+
MessageAttributes: {
473+
Timestamp: { DataType: 'Number', StringValue: clock.now.toString() },
474+
},
475+
MessageBody: JSON.stringify(expectedMessageBody),
476+
QueueUrl: undefined,
477+
MessageGroupId: undefined, // Should be undefined without key
478+
MessageDeduplicationId: undefined
479+
};
480+
481+
const sendMessageStub = sandbox.stub(producer.producer, 'sendMessage');
482+
await producer.send(
483+
'test-topic',
484+
messagePayload,
485+
messageContext
486+
);
487+
sinon.assert.calledWith(sendMessageStub, expectedPayload);
488+
});
411489
});
412490
});

0 commit comments

Comments
 (0)