Skip to content

Commit c3cc3b3

Browse files
eladconEyal Keren
andauthored
feat: fifo queue (#31)
* wip * wip * wip * wip * wip * add test * wip * change mts to cjs * fix indentation --------- Co-authored-by: Eyal Keren <[email protected]>
1 parent acc1fca commit c3cc3b3

File tree

13 files changed

+2143
-0
lines changed

13 files changed

+2143
-0
lines changed

.github/workflows/fifoqueue-pull.yaml

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
name: fifoqueue-pull
2+
on:
3+
pull_request:
4+
paths:
5+
- fifoqueue/**
6+
jobs:
7+
build:
8+
runs-on: ubuntu-latest
9+
steps:
10+
- name: Checkout
11+
uses: actions/checkout@v3
12+
with:
13+
sparse-checkout: fifoqueue
14+
- name: Setup Node.js
15+
uses: actions/setup-node@v3
16+
with:
17+
node-version: 18.x
18+
registry-url: https://registry.npmjs.org
19+
- name: Install winglang
20+
run: npm i -g winglang
21+
- name: Install dependencies
22+
run: npm install --include=dev
23+
working-directory: fifoqueue
24+
- name: Test
25+
run: wing test
26+
working-directory: fifoqueue
27+
- name: Pack
28+
run: wing pack
29+
working-directory: fifoqueue
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
name: fifoqueue-release
2+
on:
3+
push:
4+
branches:
5+
- main
6+
paths:
7+
- fifoqueue/**
8+
jobs:
9+
build:
10+
runs-on: ubuntu-latest
11+
steps:
12+
- name: Checkout
13+
uses: actions/checkout@v3
14+
with:
15+
sparse-checkout: fifoqueue
16+
- name: Setup Node.js
17+
uses: actions/setup-node@v3
18+
with:
19+
node-version: 18.x
20+
registry-url: https://registry.npmjs.org
21+
- name: Install winglang
22+
run: npm i -g winglang
23+
- name: Install dependencies
24+
run: npm install --include=dev
25+
working-directory: fifoqueue
26+
- name: Test
27+
run: wing test
28+
working-directory: fifoqueue
29+
- name: Pack
30+
run: wing pack
31+
working-directory: fifoqueue
32+
- name: Publish
33+
run: npm publish --access=public --registry https://registry.npmjs.org --tag
34+
latest *.tgz
35+
working-directory: fifoqueue
36+
env:
37+
NODE_AUTH_TOKEN: ${{ secrets.NPM_TOKEN }}

fifoqueue/.gitignore

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
target/
2+
node_modules/

fifoqueue/LICENSE

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
MIT License
2+
3+
Copyright (c) 2023 Wing
4+
5+
Permission is hereby granted, free of charge, to any person obtaining a copy
6+
of this software and associated documentation files (the "Software"), to deal
7+
in the Software without restriction, including without limitation the rights
8+
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
9+
copies of the Software, and to permit persons to whom the Software is
10+
furnished to do so, subject to the following conditions:
11+
12+
The above copyright notice and this permission notice shall be included in all
13+
copies or substantial portions of the Software.
14+
15+
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16+
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17+
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18+
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19+
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
20+
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
21+
SOFTWARE.

fifoqueue/README.md

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
# fifoqueue
2+
3+
A wing library to work with FIFO (first-in first-out) Queues.
4+
5+
To use the queue, set `groupId` to group messages and process them in an ordered fashion.
6+
7+
## Prerequisites
8+
9+
* [winglang](https://winglang.io).
10+
11+
## Installation
12+
13+
`sh
14+
npm i @winglibs/fifoqueue
15+
`
16+
17+
## Usage
18+
19+
`js
20+
bring fifoqueue;
21+
22+
let queue = new fifoqueue.FifoQueue();
23+
24+
queue.setConsumer(inflight (message: str) => {
25+
log("recieved message {message}");
26+
});
27+
28+
test "will push to queue" {
29+
queue.push("a new message", groupId: "myGroup");
30+
}
31+
`
32+
33+
## License
34+
35+
This library is licensed under the [MIT License](./LICENSE).

fifoqueue/api.w

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
bring cloud;
2+
3+
pub struct FifoQueueProps extends cloud.QueueProps{}
4+
5+
pub struct PushOptions {
6+
groupId: str;
7+
}
8+
9+
pub struct SetConsumerOptions extends cloud.QueueSetConsumerOptions {}
10+
11+
pub interface IFifoQueue {
12+
setConsumer(handler: inflight (str): void, options: SetConsumerOptions?);
13+
inflight push(message: str, options: PushOptions);
14+
}

fifoqueue/aws.js

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
const {
2+
SQSClient,
3+
SendMessageCommand,
4+
} = require("@aws-sdk/client-sqs");
5+
6+
exports._push = async (queueUrl, message, groupId) => {
7+
const client = new SQSClient();
8+
return client.send(new SendMessageCommand({
9+
QueueUrl: queueUrl,
10+
MessageBody: message,
11+
MessageGroupId: groupId,
12+
}))
13+
};

fifoqueue/fifo-queue.aws.w

Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
bring "cdktf" as cdktf;
2+
bring "@cdktf/provider-aws" as aws;
3+
bring cloud;
4+
bring aws as awsUtil;
5+
bring "./api.w" as api;
6+
7+
struct Record {
8+
body: str;
9+
}
10+
11+
struct SqsEvent {
12+
Records: Array<Record>;
13+
}
14+
15+
pub class FifoQueue_aws impl api.IFifoQueue {
16+
queue: aws.sqsQueue.SqsQueue;
17+
url: str;
18+
arn: str;
19+
new(props: api.FifoQueueProps?) {
20+
this.queue = new aws.sqsQueue.SqsQueue(
21+
visibilityTimeoutSeconds: props?.timeout?.seconds
22+
?? duration.fromSeconds(120).seconds,
23+
messageRetentionSeconds: props?.retentionPeriod?.seconds
24+
?? duration.fromHours(1).seconds,
25+
fifoQueue: true,
26+
contentBasedDeduplication: true,
27+
);
28+
this.url = this.queue.url;
29+
this.arn = this.queue.arn;
30+
}
31+
32+
pub setConsumer(handler: inflight (str) : void, options: api.SetConsumerOptions?) {
33+
let lambdaFn = new cloud.Function(inflight (event: str): void => {
34+
let json: Json = unsafeCast(event);
35+
let sqsEvent = SqsEvent.fromJson(event);
36+
for message in sqsEvent.Records {
37+
handler(message.body);
38+
}
39+
}, env: options?.env, logRetentionDays: options?.logRetentionDays, memory: options?.memory, timeout: options?.timeout);
40+
41+
let lambda = awsUtil.Function.from(lambdaFn);
42+
lambda?.addPolicyStatements({
43+
actions: [
44+
"sqs:ReceiveMessage",
45+
"sqs:ChangeMessageVisibility",
46+
"sqs:GetQueueUrl",
47+
"sqs:DeleteMessage",
48+
"sqs:GetQueueAttributes",
49+
],
50+
resources: [this.arn],
51+
});
52+
53+
new aws.lambdaEventSourceMapping.LambdaEventSourceMapping(
54+
functionName: "{lambda?.functionName}",
55+
eventSourceArn: this.arn,
56+
batchSize: options?.batchSize ?? 1
57+
);
58+
}
59+
60+
pub onLift(host: std.IInflightHost, ops: Array<str>) {
61+
if let lambda = awsUtil.Function.from(host) {
62+
if ops.contains("push") {
63+
lambda.addPolicyStatements({
64+
actions: ["sqs:SendMessage"],
65+
effect: awsUtil.Effect.ALLOW,
66+
resources: [
67+
this.arn
68+
]
69+
});
70+
}
71+
}
72+
}
73+
74+
pub inflight push(message: str, options: api.PushOptions) {
75+
FifoQueue_aws._push(this.url, message, options.groupId);
76+
}
77+
78+
extern "./aws.js" static inflight _push(queueUrl: str, message: str, groupId: str);
79+
}

fifoqueue/fifo-queue.sim.w

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
bring expect;
2+
bring util;
3+
bring cloud;
4+
bring "./api.w" as api;
5+
6+
struct FifoQueueMessage {
7+
groupId: str;
8+
message: str;
9+
}
10+
11+
pub class FifoQueue_sim impl api.IFifoQueue {
12+
queue: cloud.Queue;
13+
counter: cloud.Counter;
14+
15+
new(){
16+
this.queue = new cloud.Queue();
17+
this.counter = new cloud.Counter();
18+
}
19+
20+
pub setConsumer(handler: inflight (str): void, options: api.SetConsumerOptions?) {
21+
let counter = this.counter;
22+
this.queue.setConsumer(inflight (event: str) => {
23+
let message = FifoQueueMessage.parseJson(event);
24+
util.waitUntil(inflight () => {
25+
let value = counter.peek(message.groupId);
26+
if value == 0 {
27+
let acquired = counter.inc(1, message.groupId);
28+
if acquired == 0 {
29+
return true;
30+
} else {
31+
counter.dec(1, message.groupId);
32+
return false;
33+
}
34+
}
35+
return false;
36+
}, timeout: 30m);
37+
38+
try {
39+
handler(message.message);
40+
} finally {
41+
counter.dec(1, message.groupId);
42+
}
43+
});
44+
45+
}
46+
47+
inflight pub push(message: str, options: api.PushOptions) {
48+
this.queue.push(Json.stringify({ groupId: options.groupId, message: message }));
49+
}
50+
}

fifoqueue/fifo-queue.test.w

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
bring expect;
2+
bring util;
3+
bring cloud;
4+
bring "./fifo-queue.w" as fifo_queue;
5+
6+
let queue = new fifo_queue.FifoQueue();
7+
let counter = new cloud.Counter();
8+
9+
queue.setConsumer(inflight (message: str) => {
10+
log("start {message}");
11+
util.sleep(10s);
12+
counter.inc();
13+
log("end {message}");
14+
});
15+
16+
test "sequentially consume messages with the same group id " {
17+
queue.push("msg1", groupId: "123");
18+
queue.push("msg2", groupId: "123");
19+
util.sleep(15s);
20+
expect.equal(counter.peek(), 1);
21+
util.sleep(15s);
22+
expect.equal(counter.peek(), 2);
23+
}
24+
25+
test "parallelly consume messages with different group ids" {
26+
queue.push("msg1", groupId: "123");
27+
queue.push("msg2", groupId: "456");
28+
util.sleep(15s);
29+
expect.equal(counter.peek(), 2);
30+
}

0 commit comments

Comments
 (0)